Replace buffer I/O locks with condition variables. cv
authorRobert Haas <[email protected]>
Sat, 20 Feb 2016 08:14:53 +0000 (13:44 +0530)
committerRobert Haas <[email protected]>
Thu, 11 Aug 2016 19:18:11 +0000 (15:18 -0400)
src/backend/storage/buffer/buf_init.c
src/backend/storage/buffer/bufmgr.c
src/include/storage/buf_internals.h
src/include/storage/condition_variable.h

index a4163cf717d91a85a328c58a00d29852567fb2bf..37ebd37e4fa84116a892d4151666b9ed81cdfcdf 100644 (file)
@@ -20,7 +20,7 @@
 
 BufferDescPadded *BufferDescriptors;
 char      *BufferBlocks;
-LWLockMinimallyPadded *BufferIOLWLockArray = NULL;
+ConditionVariableMinimallyPadded *BufferIOCVArray = NULL;
 LWLockTranche BufferIOLWLockTranche;
 LWLockTranche BufferContentLWLockTranche;
 WritebackContext BackendWritebackContext;
@@ -71,7 +71,7 @@ InitBufferPool(void)
 {
        bool            foundBufs,
                                foundDescs,
-                               foundIOLocks,
+                               foundIOCV,
                                foundBufCkpt;
 
        /* Align descriptors to a cacheline boundary. */
@@ -85,16 +85,10 @@ InitBufferPool(void)
                                                NBuffers * (Size) BLCKSZ, &foundBufs);
 
        /* Align lwlocks to cacheline boundary */
-       BufferIOLWLockArray = (LWLockMinimallyPadded *)
-               ShmemInitStruct("Buffer IO Locks",
-                                               NBuffers * (Size) sizeof(LWLockMinimallyPadded),
-                                               &foundIOLocks);
-
-       BufferIOLWLockTranche.name = "buffer_io";
-       BufferIOLWLockTranche.array_base = BufferIOLWLockArray;
-       BufferIOLWLockTranche.array_stride = sizeof(LWLockMinimallyPadded);
-       LWLockRegisterTranche(LWTRANCHE_BUFFER_IO_IN_PROGRESS,
-                                                 &BufferIOLWLockTranche);
+       BufferIOCVArray = (ConditionVariableMinimallyPadded *)
+               ShmemInitStruct("Buffer IO Condition Variables",
+                         NBuffers * (Size) sizeof(ConditionVariableMinimallyPadded),
+                                                                          &foundIOCV);
 
        BufferContentLWLockTranche.name = "buffer_content";
        BufferContentLWLockTranche.array_base =
@@ -114,10 +108,10 @@ InitBufferPool(void)
                ShmemInitStruct("Checkpoint BufferIds",
                                                NBuffers * sizeof(CkptSortItem), &foundBufCkpt);
 
-       if (foundDescs || foundBufs || foundIOLocks || foundBufCkpt)
+       if (foundDescs || foundBufs || foundIOCV || foundBufCkpt)
        {
                /* should find all of these, or none of them */
-               Assert(foundDescs && foundBufs && foundIOLocks && foundBufCkpt);
+               Assert(foundDescs && foundBufs && foundIOCV && foundBufCkpt);
                /* note: this path is only taken in EXEC_BACKEND case */
        }
        else
@@ -147,8 +141,7 @@ InitBufferPool(void)
                        LWLockInitialize(BufferDescriptorGetContentLock(buf),
                                                         LWTRANCHE_BUFFER_CONTENT);
 
-                       LWLockInitialize(BufferDescriptorGetIOLock(buf),
-                                                        LWTRANCHE_BUFFER_IO_IN_PROGRESS);
+                       ConditionVariableInit(BufferDescriptorGetIOCV(buf));
                }
 
                /* Correct last entry of linked list */
index 76ade3727cd0ee8dc7364fdae21789c6a26847d0..403db6e291bdf6cc358ee0304fe203f760c39c70 100644 (file)
@@ -1314,8 +1314,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
        LWLockRelease(newPartitionLock);
 
        /*
-        * Buffer contents are currently invalid.  Try to get the io_in_progress
-        * lock.  If StartBufferIO returns false, then someone else managed to
+        * Buffer contents are currently invalid.  Try to obtain the right to start
+        * I/O.  If StartBufferIO returns false, then someone else managed to
         * read it before we did, so there's nothing left for BufferAlloc() to do.
         */
        if (StartBufferIO(buf, true))
@@ -1693,9 +1693,8 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
                uint32          buf_state;
                uint32          old_buf_state;
 
-               /* I'd better not still hold any locks on the buffer */
+               /* I'd better not still hold the buffer content lock */
                Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
-               Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
 
                /*
                 * Decrement the shared reference count.
@@ -2656,9 +2655,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
        uint32          buf_state;
 
        /*
-        * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
-        * false, then someone else flushed the buffer before we could, so we need
-        * not do anything.
+        * Try to start an I/O operation.  If StartBufferIO returns false, then
+        * someone else flushed the buffer before we could, so we need not do
+        * anything.
         */
        if (!StartBufferIO(buf, false))
                return;
@@ -2714,7 +2713,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
        /*
         * Now it's safe to write buffer to disk. Note that no one else should
         * have been able to write it while we were busy with log flushing because
-        * we have the io_in_progress lock.
+        * only one process at a time can set the BM_IO_IN_PROGRESS bit.
         */
        bufBlock = BufHdrGetBlock(buf);
 
@@ -2749,7 +2748,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 
        /*
         * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
-        * end the io_in_progress state.
+        * end the BM_IO_IN_PROGRESS state.
         */
        TerminateBufferIO(buf, true, 0);
 
@@ -3755,7 +3754,7 @@ ConditionalLockBufferForCleanup(Buffer buffer)
  *     Functions for buffer I/O handling
  *
  *     Note: We assume that nested buffer I/O never occurs.
- *     i.e at most one io_in_progress lock is held per proc.
+ *     i.e at most one BM_IO_IN_PROGRESS bit is set per proc.
  *
  *     Also note that these are used only for shared buffers, not local ones.
  */
@@ -3766,13 +3765,6 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 static void
 WaitIO(BufferDesc *buf)
 {
-       /*
-        * Changed to wait until there's no IO - Inoue 01/13/2000
-        *
-        * Note this is *necessary* because an error abort in the process doing
-        * I/O could release the io_in_progress_lock prematurely. See
-        * AbortBufferIO.
-        */
        for (;;)
        {
                uint32          buf_state;
@@ -3782,14 +3774,15 @@ WaitIO(BufferDesc *buf)
                 * here, but since this test is essential for correctness, we'd better
                 * play it safe.
                 */
+               ConditionVariablePrepareToSleep(BufferDescriptorGetIOCV(buf));
                buf_state = LockBufHdr(buf);
                UnlockBufHdr(buf, buf_state);
 
                if (!(buf_state & BM_IO_IN_PROGRESS))
                        break;
-               LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
-               LWLockRelease(BufferDescriptorGetIOLock(buf));
+               ConditionVariableSleep();
        }
+       ConditionVariableCancelSleep();
 }
 
 /*
@@ -3801,7 +3794,7 @@ WaitIO(BufferDesc *buf)
  * In some scenarios there are race conditions in which multiple backends
  * could attempt the same I/O operation concurrently.  If someone else
  * has already started I/O on this buffer then we will block on the
- * io_in_progress lock until he's done.
+ * I/O condition variable until he's done.
  *
  * Input operations are only attempted on buffers that are not BM_VALID,
  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -3819,25 +3812,11 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 
        for (;;)
        {
-               /*
-                * Grab the io_in_progress lock so that other processes can wait for
-                * me to finish the I/O.
-                */
-               LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
                buf_state = LockBufHdr(buf);
 
                if (!(buf_state & BM_IO_IN_PROGRESS))
                        break;
-
-               /*
-                * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
-                * lock isn't held is if the process doing the I/O is recovering from
-                * an error (see AbortBufferIO).  If that's the case, we must wait for
-                * him to get unwedged.
-                */
                UnlockBufHdr(buf, buf_state);
-               LWLockRelease(BufferDescriptorGetIOLock(buf));
                WaitIO(buf);
        }
 
@@ -3847,7 +3826,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
        {
                /* someone else already did the I/O */
                UnlockBufHdr(buf, buf_state);
-               LWLockRelease(BufferDescriptorGetIOLock(buf));
                return false;
        }
 
@@ -3865,7 +3843,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
  *     (Assumptions)
  *     My process is executing IO for the buffer
  *     BM_IO_IN_PROGRESS bit is set for the buffer
- *     We hold the buffer's io_in_progress lock
  *     The buffer is Pinned
  *
  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
@@ -3897,7 +3874,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
 
        InProgressBuf = NULL;
 
-       LWLockRelease(BufferDescriptorGetIOLock(buf));
+       ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
 }
 
 /*
@@ -3918,14 +3895,6 @@ AbortBufferIO(void)
        {
                uint32          buf_state;
 
-               /*
-                * Since LWLockReleaseAll has already been called, we're not holding
-                * the buffer's io_in_progress_lock. We have to re-acquire it so that
-                * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
-                * buffer will be in a busy spin until we succeed in doing this.
-                */
-               LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
                buf_state = LockBufHdr(buf);
                Assert(buf_state & BM_IO_IN_PROGRESS);
                if (IsForInput)
index e0dfb2f5b03b99add82df846f193d28e55d430f8..18ef400f41ba3075b1288ed1694a673980393842 100644 (file)
@@ -17,6 +17,7 @@
 
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
@@ -221,12 +222,12 @@ typedef union BufferDescPadded
 
 #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)
 
-#define BufferDescriptorGetIOLock(bdesc) \
-       (&(BufferIOLWLockArray[(bdesc)->buf_id]).lock)
+#define BufferDescriptorGetIOCV(bdesc) \
+       (&(BufferIOCVArray[(bdesc)->buf_id]).cv)
 #define BufferDescriptorGetContentLock(bdesc) \
        ((LWLock*) (&(bdesc)->content_lock))
 
-extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
+extern PGDLLIMPORT ConditionVariableMinimallyPadded *BufferIOCVArray;
 
 /*
  * The freeNext field is either the index of the next freelist entry,
index 54b7fba05b5b5bd238290f08d70344910bd4a688..38626d5acdfca4117fe3f9a3930b461b3066853e 100644 (file)
@@ -31,6 +31,17 @@ typedef struct
        proclist_head   wakeup;
 } ConditionVariable;
 
+/*
+ * Pad a condition variable to a power-of-two size so that an array of
+ * condition variables does not cross a cache line boundary.
+ */
+#define CV_MINIMAL_SIZE                (sizeof(ConditionVariable) <= 16 ? 16 : 32)
+typedef union ConditionVariableMinimallyPadded
+{
+       ConditionVariable       cv;
+       char            pad[CV_MINIMAL_SIZE];
+} ConditionVariableMinimallyPadded;
+
 /* Initialize a condition variable. */
 extern void ConditionVariableInit(ConditionVariable *);