Replace buffer I/O locks with condition variables.
authorAndres Freund <[email protected]>
Thu, 29 Oct 2020 17:57:34 +0000 (10:57 -0700)
committerAndres Freund <[email protected]>
Mon, 11 Jan 2021 23:09:14 +0000 (15:09 -0800)
Author: Robert Haas
Discussion: https://postgr.es/m/CA+Tgmoaj2aPti0yho7FeEf2qt-JgQPRWb0gci_o1Hfr=C56Xng@mail.gmail.com

src/backend/postmaster/pgstat.c
src/backend/storage/buffer/buf_init.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/lmgr/lwlock.c
src/include/pgstat.h
src/include/storage/buf_internals.h
src/include/storage/condition_variable.h
src/include/storage/lwlock.h

index 5b66808a170635a4ed510379e89dbfff5db0dd65..3cf8db9fcc633e511dae2d4f363a68b7c3e68ad0 100644 (file)
@@ -4110,6 +4110,9 @@ pgstat_get_wait_io(WaitEventIO w)
        case WAIT_EVENT_BUFFILE_TRUNCATE:
            event_name = "BufFileTruncate";
            break;
+       case WAIT_EVENT_BUFFILE_WAITIO:
+           event_name = "BufFileWaitIO";
+           break;
        case WAIT_EVENT_CONTROL_FILE_READ:
            event_name = "ControlFileRead";
            break;
index e9e4f35bb5f2fe81aa3855c81dabdc457ad55d7d..850ef8af4f1ef4a571485c14c24342d69f85e681 100644 (file)
@@ -19,7 +19,7 @@
 
 BufferDescPadded *BufferDescriptors;
 char      *BufferBlocks;
-LWLockMinimallyPadded *BufferIOLWLockArray = NULL;
+ConditionVariableMinimallyPadded *BufferIOCVArray = NULL;
 WritebackContext BackendWritebackContext;
 CkptSortItem *CkptBufferIds;
 
@@ -68,7 +68,7 @@ InitBufferPool(void)
 {
    bool        foundBufs,
                foundDescs,
-               foundIOLocks,
+               foundIOCV,
                foundBufCkpt;
 
    /* Align descriptors to a cacheline boundary. */
@@ -82,10 +82,10 @@ InitBufferPool(void)
                        NBuffers * (Size) BLCKSZ, &foundBufs);
 
    /* Align lwlocks to cacheline boundary */
-   BufferIOLWLockArray = (LWLockMinimallyPadded *)
-       ShmemInitStruct("Buffer IO Locks",
-                       NBuffers * (Size) sizeof(LWLockMinimallyPadded),
-                       &foundIOLocks);
+   BufferIOCVArray = (ConditionVariableMinimallyPadded *)
+       ShmemInitStruct("Buffer IO Condition Variables",
+             NBuffers * (Size) sizeof(ConditionVariableMinimallyPadded),
+                                      &foundIOCV);
 
    /*
     * The array used to sort to-be-checkpointed buffer ids is located in
@@ -98,10 +98,10 @@ InitBufferPool(void)
        ShmemInitStruct("Checkpoint BufferIds",
                        NBuffers * sizeof(CkptSortItem), &foundBufCkpt);
 
-   if (foundDescs || foundBufs || foundIOLocks || foundBufCkpt)
+   if (foundDescs || foundBufs || foundIOCV || foundBufCkpt)
    {
        /* should find all of these, or none of them */
-       Assert(foundDescs && foundBufs && foundIOLocks && foundBufCkpt);
+       Assert(foundDescs && foundBufs && foundIOCV && foundBufCkpt);
        /* note: this path is only taken in EXEC_BACKEND case */
    }
    else
@@ -131,8 +131,7 @@ InitBufferPool(void)
            LWLockInitialize(BufferDescriptorGetContentLock(buf),
                             LWTRANCHE_BUFFER_CONTENT);
 
-           LWLockInitialize(BufferDescriptorGetIOLock(buf),
-                            LWTRANCHE_BUFFER_IO);
+           ConditionVariableInit(BufferDescriptorGetIOCV(buf));
        }
 
        /* Correct last entry of linked list */
index 71b5852224fcca27500442cec1aa47a887fdef2f..8cb455df382ebde75677f621bdac81ac7a305205 100644 (file)
@@ -1340,8 +1340,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
    LWLockRelease(newPartitionLock);
 
    /*
-    * Buffer contents are currently invalid.  Try to get the io_in_progress
-    * lock.  If StartBufferIO returns false, then someone else managed to
+    * Buffer contents are currently invalid.  Try to obtain the right to start
+    * I/O.  If StartBufferIO returns false, then someone else managed to
     * read it before we did, so there's nothing left for BufferAlloc() to do.
     */
    if (StartBufferIO(buf, true))
@@ -1765,9 +1765,8 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
         */
        VALGRIND_MAKE_MEM_NOACCESS(BufHdrGetBlock(buf), BLCKSZ);
 
-       /* I'd better not still hold any locks on the buffer */
+       /* I'd better not still hold the buffer content lock */
        Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
-       Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
 
        /*
         * Decrement the shared reference count.
@@ -2730,9 +2729,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
    uint32      buf_state;
 
    /*
-    * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
-    * false, then someone else flushed the buffer before we could, so we need
-    * not do anything.
+    * Try to start an I/O operation.  If StartBufferIO returns false, then
+    * someone else flushed the buffer before we could, so we need not do
+    * anything.
     */
    if (!StartBufferIO(buf, false))
        return;
@@ -2788,7 +2787,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
    /*
     * Now it's safe to write buffer to disk. Note that no one else should
     * have been able to write it while we were busy with log flushing because
-    * we have the io_in_progress lock.
+    * only one process at a time can set the BM_IO_IN_PROGRESS bit.
     */
    bufBlock = BufHdrGetBlock(buf);
 
@@ -2823,7 +2822,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 
    /*
     * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
-    * end the io_in_progress state.
+    * end the BM_IO_IN_PROGRESS state.
     */
    TerminateBufferIO(buf, true, 0);
 
@@ -4073,7 +4072,7 @@ IsBufferCleanupOK(Buffer buffer)
  * Functions for buffer I/O handling
  *
  * Note: We assume that nested buffer I/O never occurs.
- * i.e at most one io_in_progress lock is held per proc.
+ * i.e at most one BM_IO_IN_PROGRESS bit is set per proc.
  *
  * Also note that these are used only for shared buffers, not local ones.
  */
@@ -4084,13 +4083,10 @@ IsBufferCleanupOK(Buffer buffer)
 static void
 WaitIO(BufferDesc *buf)
 {
-   /*
-    * Changed to wait until there's no IO - Inoue 01/13/2000
-    *
-    * Note this is *necessary* because an error abort in the process doing
-    * I/O could release the io_in_progress_lock prematurely. See
-    * AbortBufferIO.
-    */
+   ConditionVariable   *cv = BufferDescriptorGetIOCV(buf);
+
+   ConditionVariablePrepareToSleep(cv);
+
    for (;;)
    {
        uint32      buf_state;
@@ -4105,9 +4101,9 @@ WaitIO(BufferDesc *buf)
 
        if (!(buf_state & BM_IO_IN_PROGRESS))
            break;
-       LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
-       LWLockRelease(BufferDescriptorGetIOLock(buf));
+       ConditionVariableSleep(cv, WAIT_EVENT_BUFFILE_WAITIO);
    }
+   ConditionVariableCancelSleep();
 }
 
 /*
@@ -4119,7 +4115,7 @@ WaitIO(BufferDesc *buf)
  * In some scenarios there are race conditions in which multiple backends
  * could attempt the same I/O operation concurrently.  If someone else
  * has already started I/O on this buffer then we will block on the
- * io_in_progress lock until he's done.
+ * I/O condition variable until he's done.
  *
  * Input operations are only attempted on buffers that are not BM_VALID,
  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -4137,25 +4133,11 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 
    for (;;)
    {
-       /*
-        * Grab the io_in_progress lock so that other processes can wait for
-        * me to finish the I/O.
-        */
-       LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
        buf_state = LockBufHdr(buf);
 
        if (!(buf_state & BM_IO_IN_PROGRESS))
            break;
-
-       /*
-        * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
-        * lock isn't held is if the process doing the I/O is recovering from
-        * an error (see AbortBufferIO).  If that's the case, we must wait for
-        * him to get unwedged.
-        */
        UnlockBufHdr(buf, buf_state);
-       LWLockRelease(BufferDescriptorGetIOLock(buf));
        WaitIO(buf);
    }
 
@@ -4165,7 +4147,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
    {
        /* someone else already did the I/O */
        UnlockBufHdr(buf, buf_state);
-       LWLockRelease(BufferDescriptorGetIOLock(buf));
        return false;
    }
 
@@ -4183,7 +4164,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
  * (Assumptions)
  * My process is executing IO for the buffer
  * BM_IO_IN_PROGRESS bit is set for the buffer
- * We hold the buffer's io_in_progress lock
  * The buffer is Pinned
  *
  * If clear_dirty is true and BM_JUST_DIRTIED is not set, we clear the
@@ -4215,7 +4195,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
 
    InProgressBuf = NULL;
 
-   LWLockRelease(BufferDescriptorGetIOLock(buf));
+   ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
 }
 
 /*
@@ -4236,14 +4216,6 @@ AbortBufferIO(void)
    {
        uint32      buf_state;
 
-       /*
-        * Since LWLockReleaseAll has already been called, we're not holding
-        * the buffer's io_in_progress_lock. We have to re-acquire it so that
-        * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
-        * buffer will be in a busy spin until we succeed in doing this.
-        */
-       LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
        buf_state = LockBufHdr(buf);
        Assert(buf_state & BM_IO_IN_PROGRESS);
        if (IsForInput)
index db7e59f8b70b6b5e4abdcbdc9515d6afeb892ac2..caad6fb623c498247efc7c1eac913e30cc5c3698 100644 (file)
@@ -146,8 +146,6 @@ static const char *const BuiltinTrancheNames[] = {
    "WALInsert",
    /* LWTRANCHE_BUFFER_CONTENT: */
    "BufferContent",
-   /* LWTRANCHE_BUFFER_IO: */
-   "BufferIO",
    /* LWTRANCHE_REPLICATION_ORIGIN_STATE: */
    "ReplicationOriginState",
    /* LWTRANCHE_REPLICATION_SLOT_IO: */
index c38b68971019c2969a783461f7c79cf3f1b757e5..83198f4ca3b834f38b7e23daac09e451e0440fcf 100644 (file)
@@ -994,6 +994,7 @@ typedef enum
    WAIT_EVENT_BUFFILE_READ,
    WAIT_EVENT_BUFFILE_WRITE,
    WAIT_EVENT_BUFFILE_TRUNCATE,
+   WAIT_EVENT_BUFFILE_WAITIO,
    WAIT_EVENT_CONTROL_FILE_READ,
    WAIT_EVENT_CONTROL_FILE_SYNC,
    WAIT_EVENT_CONTROL_FILE_SYNC_UPDATE,
index f6b578296539569007283d77cc1d2120ddf25fde..532711200a6cccc0f87d72d902e85dae11c85516 100644 (file)
@@ -18,6 +18,7 @@
 #include "port/atomics.h"
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
@@ -221,12 +222,12 @@ typedef union BufferDescPadded
 
 #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)
 
-#define BufferDescriptorGetIOLock(bdesc) \
-   (&(BufferIOLWLockArray[(bdesc)->buf_id]).lock)
+#define BufferDescriptorGetIOCV(bdesc) \
+   (&(BufferIOCVArray[(bdesc)->buf_id]).cv)
 #define BufferDescriptorGetContentLock(bdesc) \
    ((LWLock*) (&(bdesc)->content_lock))
 
-extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
+extern PGDLLIMPORT ConditionVariableMinimallyPadded *BufferIOCVArray;
 
 /*
  * The freeNext field is either the index of the next freelist entry,
index 0b7578f8c4cc5f5eddfb877fcad11af30ee6c4e9..4cba0eccffd720c6e20072e5937c4bfb4b681129 100644 (file)
@@ -31,6 +31,17 @@ typedef struct
    proclist_head wakeup;       /* list of wake-able processes */
 } ConditionVariable;
 
+/*
+ * Pad a condition variable to a power-of-two size so that an array of
+ * condition variables does not cross a cache line boundary.
+ */
+#define CV_MINIMAL_SIZE        (sizeof(ConditionVariable) <= 16 ? 16 : 32)
+typedef union ConditionVariableMinimallyPadded
+{
+   ConditionVariable   cv;
+   char        pad[CV_MINIMAL_SIZE];
+} ConditionVariableMinimallyPadded;
+
 /* Initialize a condition variable. */
 extern void ConditionVariableInit(ConditionVariable *cv);
 
index cbf2510fbf5652337efae69dae5c99b863308c59..0ed190f73b34b6f4ee1194253c657e354ed532d0 100644 (file)
@@ -202,7 +202,6 @@ typedef enum BuiltinTrancheIds
    LWTRANCHE_SERIAL_BUFFER,
    LWTRANCHE_WAL_INSERT,
    LWTRANCHE_BUFFER_CONTENT,
-   LWTRANCHE_BUFFER_IO,
    LWTRANCHE_REPLICATION_ORIGIN_STATE,
    LWTRANCHE_REPLICATION_SLOT_IO,
    LWTRANCHE_LOCK_FASTPATH,