From: Robert Haas Date: Sat, 20 Feb 2016 08:14:53 +0000 (+0530) Subject: Replace buffer I/O locks with condition variables. X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fcv;p=users%2Frhaas%2Fpostgres.git Replace buffer I/O locks with condition variables. --- diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index a4163cf717..37ebd37e4f 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -20,7 +20,7 @@ BufferDescPadded *BufferDescriptors; char *BufferBlocks; -LWLockMinimallyPadded *BufferIOLWLockArray = NULL; +ConditionVariableMinimallyPadded *BufferIOCVArray = NULL; LWLockTranche BufferIOLWLockTranche; LWLockTranche BufferContentLWLockTranche; WritebackContext BackendWritebackContext; @@ -71,7 +71,7 @@ InitBufferPool(void) { bool foundBufs, foundDescs, - foundIOLocks, + foundIOCV, foundBufCkpt; /* Align descriptors to a cacheline boundary. */ @@ -85,16 +85,10 @@ InitBufferPool(void) NBuffers * (Size) BLCKSZ, &foundBufs); /* Align lwlocks to cacheline boundary */ - BufferIOLWLockArray = (LWLockMinimallyPadded *) - ShmemInitStruct("Buffer IO Locks", - NBuffers * (Size) sizeof(LWLockMinimallyPadded), - &foundIOLocks); - - BufferIOLWLockTranche.name = "buffer_io"; - BufferIOLWLockTranche.array_base = BufferIOLWLockArray; - BufferIOLWLockTranche.array_stride = sizeof(LWLockMinimallyPadded); - LWLockRegisterTranche(LWTRANCHE_BUFFER_IO_IN_PROGRESS, - &BufferIOLWLockTranche); + BufferIOCVArray = (ConditionVariableMinimallyPadded *) + ShmemInitStruct("Buffer IO Condition Variables", + NBuffers * (Size) sizeof(ConditionVariableMinimallyPadded), + &foundIOCV); BufferContentLWLockTranche.name = "buffer_content"; BufferContentLWLockTranche.array_base = @@ -114,10 +108,10 @@ InitBufferPool(void) ShmemInitStruct("Checkpoint BufferIds", NBuffers * sizeof(CkptSortItem), &foundBufCkpt); - if (foundDescs || foundBufs || foundIOLocks || foundBufCkpt) + if (foundDescs || foundBufs || foundIOCV || foundBufCkpt) { /* should find all of these, or none of them */ - Assert(foundDescs && foundBufs && foundIOLocks && foundBufCkpt); + Assert(foundDescs && foundBufs && foundIOCV && foundBufCkpt); /* note: this path is only taken in EXEC_BACKEND case */ } else @@ -147,8 +141,7 @@ InitBufferPool(void) LWLockInitialize(BufferDescriptorGetContentLock(buf), LWTRANCHE_BUFFER_CONTENT); - LWLockInitialize(BufferDescriptorGetIOLock(buf), - LWTRANCHE_BUFFER_IO_IN_PROGRESS); + ConditionVariableInit(BufferDescriptorGetIOCV(buf)); } /* Correct last entry of linked list */ diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 76ade3727c..403db6e291 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1314,8 +1314,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, LWLockRelease(newPartitionLock); /* - * Buffer contents are currently invalid. Try to get the io_in_progress - * lock. If StartBufferIO returns false, then someone else managed to + * Buffer contents are currently invalid. Try to obtain the right to start + * I/O. If StartBufferIO returns false, then someone else managed to * read it before we did, so there's nothing left for BufferAlloc() to do. */ if (StartBufferIO(buf, true)) @@ -1693,9 +1693,8 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) uint32 buf_state; uint32 old_buf_state; - /* I'd better not still hold any locks on the buffer */ + /* I'd better not still hold the buffer content lock */ Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf))); - Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf))); /* * Decrement the shared reference count. @@ -2656,9 +2655,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) uint32 buf_state; /* - * Acquire the buffer's io_in_progress lock. If StartBufferIO returns - * false, then someone else flushed the buffer before we could, so we need - * not do anything. + * Try to start an I/O operation. If StartBufferIO returns false, then + * someone else flushed the buffer before we could, so we need not do + * anything. */ if (!StartBufferIO(buf, false)) return; @@ -2714,7 +2713,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) /* * Now it's safe to write buffer to disk. Note that no one else should * have been able to write it while we were busy with log flushing because - * we have the io_in_progress lock. + * only one process at a time can set the BM_IO_IN_PROGRESS bit. */ bufBlock = BufHdrGetBlock(buf); @@ -2749,7 +2748,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) /* * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and - * end the io_in_progress state. + * end the BM_IO_IN_PROGRESS state. */ TerminateBufferIO(buf, true, 0); @@ -3755,7 +3754,7 @@ ConditionalLockBufferForCleanup(Buffer buffer) * Functions for buffer I/O handling * * Note: We assume that nested buffer I/O never occurs. - * i.e at most one io_in_progress lock is held per proc. + * i.e at most one BM_IO_IN_PROGRESS bit is set per proc. * * Also note that these are used only for shared buffers, not local ones. */ @@ -3766,13 +3765,6 @@ ConditionalLockBufferForCleanup(Buffer buffer) static void WaitIO(BufferDesc *buf) { - /* - * Changed to wait until there's no IO - Inoue 01/13/2000 - * - * Note this is *necessary* because an error abort in the process doing - * I/O could release the io_in_progress_lock prematurely. See - * AbortBufferIO. - */ for (;;) { uint32 buf_state; @@ -3782,14 +3774,15 @@ WaitIO(BufferDesc *buf) * here, but since this test is essential for correctness, we'd better * play it safe. */ + ConditionVariablePrepareToSleep(BufferDescriptorGetIOCV(buf)); buf_state = LockBufHdr(buf); UnlockBufHdr(buf, buf_state); if (!(buf_state & BM_IO_IN_PROGRESS)) break; - LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED); - LWLockRelease(BufferDescriptorGetIOLock(buf)); + ConditionVariableSleep(); } + ConditionVariableCancelSleep(); } /* @@ -3801,7 +3794,7 @@ WaitIO(BufferDesc *buf) * In some scenarios there are race conditions in which multiple backends * could attempt the same I/O operation concurrently. If someone else * has already started I/O on this buffer then we will block on the - * io_in_progress lock until he's done. + * I/O condition variable until he's done. * * Input operations are only attempted on buffers that are not BM_VALID, * and output operations only on buffers that are BM_VALID and BM_DIRTY, @@ -3819,25 +3812,11 @@ StartBufferIO(BufferDesc *buf, bool forInput) for (;;) { - /* - * Grab the io_in_progress lock so that other processes can wait for - * me to finish the I/O. - */ - LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - buf_state = LockBufHdr(buf); if (!(buf_state & BM_IO_IN_PROGRESS)) break; - - /* - * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress - * lock isn't held is if the process doing the I/O is recovering from - * an error (see AbortBufferIO). If that's the case, we must wait for - * him to get unwedged. - */ UnlockBufHdr(buf, buf_state); - LWLockRelease(BufferDescriptorGetIOLock(buf)); WaitIO(buf); } @@ -3847,7 +3826,6 @@ StartBufferIO(BufferDesc *buf, bool forInput) { /* someone else already did the I/O */ UnlockBufHdr(buf, buf_state); - LWLockRelease(BufferDescriptorGetIOLock(buf)); return false; } @@ -3865,7 +3843,6 @@ StartBufferIO(BufferDesc *buf, bool forInput) * (Assumptions) * My process is executing IO for the buffer * BM_IO_IN_PROGRESS bit is set for the buffer - * We hold the buffer's io_in_progress lock * The buffer is Pinned * * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the @@ -3897,7 +3874,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) InProgressBuf = NULL; - LWLockRelease(BufferDescriptorGetIOLock(buf)); + ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf)); } /* @@ -3918,14 +3895,6 @@ AbortBufferIO(void) { uint32 buf_state; - /* - * Since LWLockReleaseAll has already been called, we're not holding - * the buffer's io_in_progress_lock. We have to re-acquire it so that - * we can use TerminateBufferIO. Anyone who's executing WaitIO on the - * buffer will be in a busy spin until we succeed in doing this. - */ - LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - buf_state = LockBufHdr(buf); Assert(buf_state & BM_IO_IN_PROGRESS); if (IsForInput) diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index e0dfb2f5b0..18ef400f41 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -17,6 +17,7 @@ #include "storage/buf.h" #include "storage/bufmgr.h" +#include "storage/condition_variable.h" #include "storage/latch.h" #include "storage/lwlock.h" #include "storage/shmem.h" @@ -221,12 +222,12 @@ typedef union BufferDescPadded #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1) -#define BufferDescriptorGetIOLock(bdesc) \ - (&(BufferIOLWLockArray[(bdesc)->buf_id]).lock) +#define BufferDescriptorGetIOCV(bdesc) \ + (&(BufferIOCVArray[(bdesc)->buf_id]).cv) #define BufferDescriptorGetContentLock(bdesc) \ ((LWLock*) (&(bdesc)->content_lock)) -extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray; +extern PGDLLIMPORT ConditionVariableMinimallyPadded *BufferIOCVArray; /* * The freeNext field is either the index of the next freelist entry, diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h index 54b7fba05b..38626d5acd 100644 --- a/src/include/storage/condition_variable.h +++ b/src/include/storage/condition_variable.h @@ -31,6 +31,17 @@ typedef struct proclist_head wakeup; } ConditionVariable; +/* + * Pad a condition variable to a power-of-two size so that an array of + * condition variables does not cross a cache line boundary. + */ +#define CV_MINIMAL_SIZE (sizeof(ConditionVariable) <= 16 ? 16 : 32) +typedef union ConditionVariableMinimallyPadded +{ + ConditionVariable cv; + char pad[CV_MINIMAL_SIZE]; +} ConditionVariableMinimallyPadded; + /* Initialize a condition variable. */ extern void ConditionVariableInit(ConditionVariable *);