Revert: Get rid of WALBufMappingLock
authorAlexander Korotkov <[email protected]>
Mon, 17 Feb 2025 10:35:28 +0000 (12:35 +0200)
committerAlexander Korotkov <[email protected]>
Mon, 17 Feb 2025 10:35:28 +0000 (12:35 +0200)
This commit reverts 6a2275b895.  Buildfarm failure on batta spots some
concurrency issue, which requires further investigation.

src/backend/access/transam/xlog.c
src/backend/utils/activity/wait_event_names.txt
src/include/storage/lwlocklist.h

index 75d5554c77c2a3f7f02f74ecaa2423ba240a39a3..25a5c6054049e8927b0bd8224e9a614139f0d572 100644 (file)
@@ -302,6 +302,11 @@ static bool doPageWrites;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * It is only held while initializing and changing the mapping.  If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
+ *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
@@ -468,32 +473,21 @@ typedef struct XLogCtlData
    pg_atomic_uint64 logFlushResult;    /* last byte + 1 flushed */
 
    /*
-    * Latest reserved for inititalization page in the cache (last byte
-    * position + 1).
+    * Latest initialized page in the cache (last byte position + 1).
     *
-    * To change the identity of a buffer, you need to advance
-    * InitializeReserved first.  To change the identity of a buffer that's
+    * To change the identity of a buffer (and InitializedUpTo), you need to
+    * hold WALBufMappingLock.  To change the identity of a buffer that's
     * still dirty, the old page needs to be written out first, and for that
     * you need WALWriteLock, and you need to ensure that there are no
     * in-progress insertions to the page by calling
     * WaitXLogInsertionsToFinish().
     */
-   pg_atomic_uint64 InitializeReserved;
-
-   /*
-    * Latest initialized page in the cache (last byte position + 1).
-    *
-    * InitializedUpTo is updated after the buffer initialization.  After
-    * update, waiters got notification using InitializedUpToCondVar.
-    */
-   pg_atomic_uint64 InitializedUpTo;
-   ConditionVariable InitializedUpToCondVar;
+   XLogRecPtr  InitializedUpTo;
 
    /*
     * These values do not change after startup, although the pointed-to pages
-    * and xlblocks values certainly do.  xlblocks values are changed
-    * lock-free according to the check for the xlog write position and are
-    * accompanied by changes of InitializeReserved and InitializedUpTo.
+    * and xlblocks values certainly do.  xlblocks values are protected by
+    * WALBufMappingLock.
     */
    char       *pages;          /* buffers for unwritten XLOG pages */
    pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -816,9 +810,9 @@ XLogInsertRecord(XLogRecData *rdata,
     * fullPageWrites from changing until the insertion is finished.
     *
     * Step 2 can usually be done completely in parallel. If the required WAL
-    * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
-    * which will ensure it is initialized. But the WAL writer tries to do that
-    * ahead of insertions to avoid that from happening in the critical path.
+    * page is not initialized yet, you have to grab WALBufMappingLock to
+    * initialize it, but the WAL writer tries to do that ahead of insertions
+    * to avoid that from happening in the critical path.
     *
     *----------
     */
@@ -1997,70 +1991,32 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
    XLogRecPtr  NewPageEndPtr = InvalidXLogRecPtr;
    XLogRecPtr  NewPageBeginPtr;
    XLogPageHeader NewPage;
-   XLogRecPtr  ReservedPtr;
    int         npages pg_attribute_unused() = 0;
 
-   /*
-    * We must run the loop below inside the critical section as we expect
-    * XLogCtl->InitializedUpTo to eventually keep up.  The most of callers
-    * already run inside the critical section. Except for WAL writer, which
-    * passed 'opportunistic == true', and therefore we don't perform
-    * operations that could error out.
-    *
-    * Start an explicit critical section anyway though.
-    */
-   Assert(CritSectionCount > 0 || opportunistic);
-   START_CRIT_SECTION();
+   LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
 
-   /*--
-    * Loop till we get all the pages in WAL buffer before 'upto' reserved for
-    * initialization.  Multiple process can initialize different buffers with
-    * this loop in parallel as following.
-    *
-    * 1. Reserve page for initialization using XLogCtl->InitializeReserved.
-    * 2. Initialize the reserved page.
-    * 3. Attempt to advance XLogCtl->InitializedUpTo,
+   /*
+    * Now that we have the lock, check if someone initialized the page
+    * already.
     */
-   ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
-   while (upto >= ReservedPtr || opportunistic)
+   while (upto >= XLogCtl->InitializedUpTo || opportunistic)
    {
-       Assert(ReservedPtr % XLOG_BLCKSZ == 0);
+       nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
 
        /*
-        * Get ending-offset of the buffer page we need to replace.
-        *
-        * We don't lookup into xlblocks, but rather calculate position we
-        * must wait to be written. If it was written, xlblocks will have this
-        * position (or uninitialized)
+        * Get ending-offset of the buffer page we need to replace (this may
+        * be zero if the buffer hasn't been used yet).  Fall through if it's
+        * already written out.
         */
-       if (ReservedPtr + XLOG_BLCKSZ > XLOG_BLCKSZ * XLOGbuffers)
-           OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - XLOG_BLCKSZ * XLOGbuffers;
-       else
-           OldPageRqstPtr = InvalidXLogRecPtr;
-
-       if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
+       OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
+       if (LogwrtResult.Write < OldPageRqstPtr)
        {
            /*
-            * If we just want to pre-initialize as much as we can without
-            * flushing, give up now.
+            * Nope, got work to do. If we just want to pre-initialize as much
+            * as we can without flushing, give up now.
             */
-           upto = ReservedPtr - 1;
-           break;
-       }
-
-       /*
-        * Attempt to reserve the page for initialization.  Failure means that
-        * this page got reserved by another process.
-        */
-       if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
-                                           &ReservedPtr,
-                                           ReservedPtr + XLOG_BLCKSZ))
-           continue;
-
-       /* Fall through if it's already written out. */
-       if (LogwrtResult.Write < OldPageRqstPtr)
-       {
-           /* Nope, got work to do. */
+           if (opportunistic)
+               break;
 
            /* Advance shared memory write request position */
            SpinLockAcquire(&XLogCtl->info_lck);
@@ -2075,6 +2031,14 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
            RefreshXLogWriteResult(LogwrtResult);
            if (LogwrtResult.Write < OldPageRqstPtr)
            {
+               /*
+                * Must acquire write lock. Release WALBufMappingLock first,
+                * to make sure that all insertions that we need to wait for
+                * can finish (up to this same position). Otherwise we risk
+                * deadlock.
+                */
+               LWLockRelease(WALBufMappingLock);
+
                WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
                LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2096,6 +2060,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
                    pgWalUsage.wal_buffers_full++;
                    TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
                }
+               /* Re-acquire WALBufMappingLock and retry */
+               LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+               continue;
            }
        }
 
@@ -2103,17 +2070,10 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
         * Now the next buffer slot is free and we can set it up to be the
         * next output page.
         */
-       NewPageBeginPtr = ReservedPtr;
+       NewPageBeginPtr = XLogCtl->InitializedUpTo;
        NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
-       nextidx = XLogRecPtrToBufIdx(ReservedPtr);
 
-#ifdef USE_ASSERT_CHECKING
-       {
-           XLogRecPtr  storedBound = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
-
-           Assert(storedBound == OldPageRqstPtr || storedBound == InvalidXLogRecPtr);
-       }
-#endif
+       Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
        NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
@@ -2179,50 +2139,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
        pg_write_barrier();
 
        pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-
-       /*
-        * Try to advance XLogCtl->InitializedUpTo.
-        *
-        * If the CAS operation failed, then some of previous pages are not
-        * initialized yet, and this backend gives up.
-        *
-        * Since initializer of next page might give up on advancing of
-        * InitializedUpTo, this backend have to attempt advancing until it
-        * find page "in the past" or concurrent backend succeeded at
-        * advancing.  When we finish advancing XLogCtl->InitializedUpTo, we
-        * notify all the waiters with XLogCtl->InitializedUpToCondVar.
-        */
-       while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
-       {
-           NewPageBeginPtr = NewPageEndPtr;
-           NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
-           nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
-
-           if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
-           {
-               /*
-                * Page at nextidx wasn't initialized yet, so we cann't move
-                * InitializedUpto further. It will be moved by backend which
-                * will initialize nextidx.
-                */
-               ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
-               break;
-           }
-       }
+       XLogCtl->InitializedUpTo = NewPageEndPtr;
 
        npages++;
    }
-
-   END_CRIT_SECTION();
-
-   /*
-    * All the pages in WAL buffer before 'upto' were reserved for
-    * initialization.  However, some pages might be reserved by concurrent
-    * processes.  Wait till they finish initialization.
-    */
-   while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
-       ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
-   ConditionVariableCancelSleep();
+   LWLockRelease(WALBufMappingLock);
 
 #ifdef WAL_DEBUG
    if (XLOG_DEBUG && npages > 0)
@@ -5123,10 +5044,6 @@ XLOGShmemInit(void)
    pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
    pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
    pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
-
-   pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
-   pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
-   ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }
 
 /*
@@ -6146,7 +6063,7 @@ StartupXLOG(void)
        memset(page + len, 0, XLOG_BLCKSZ - len);
 
        pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-       pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
+       XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
    }
    else
    {
@@ -6155,9 +6072,8 @@ StartupXLOG(void)
         * let the first attempt to insert a log record to initialize the next
         * buffer.
         */
-       pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
+       XLogCtl->InitializedUpTo = EndOfLog;
    }
-   pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
    /*
     * Update local and shared status.  This is OK to do without any locks
index ccf73781d81a6981fe62c579a4a85cc2cba389b0..e199f071628987ec0626847d8c18632293dd3e89 100644 (file)
@@ -155,7 +155,6 @@ REPLICATION_SLOT_DROP   "Waiting for a replication slot to become inactive so it c
 RESTORE_COMMAND    "Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT  "Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP   "Waiting for confirmation from a remote server during synchronous replication."
-WAL_BUFFER_INIT    "Waiting on WAL buffer to be initialized."
 WAL_RECEIVER_EXIT  "Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START    "Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY  "Waiting for a new WAL summary to be generated."
@@ -311,6 +310,7 @@ XidGen  "Waiting to allocate a new transaction ID."
 ProcArray  "Waiting to access the shared per-process data structures (typically, to get a snapshot or report a session's transaction ID)."
 SInvalRead "Waiting to retrieve messages from the shared catalog invalidation queue."
 SInvalWrite    "Waiting to add a message to the shared catalog invalidation queue."
+WALBufMapping  "Waiting to replace a page in WAL buffers."
 WALWrite   "Waiting for WAL buffers to be written to disk."
 ControlFile    "Waiting to read or update the <filename>pg_control</filename> file or create a new WAL file."
 MultiXactGen   "Waiting to read or update shared multixact state."
index ff897515769a2dcac6debe57999fc59d027de339..cf565452382be392f7db53e4baacb3bd27d133c5 100644 (file)
@@ -37,7 +37,7 @@ PG_LWLOCK(3, XidGen)
 PG_LWLOCK(4, ProcArray)
 PG_LWLOCK(5, SInvalRead)
 PG_LWLOCK(6, SInvalWrite)
-/* 7 was WALBufMapping */
+PG_LWLOCK(7, WALBufMapping)
 PG_LWLOCK(8, WALWrite)
 PG_LWLOCK(9, ControlFile)
 /* 10 was CheckpointLock */