localbuf: Introduce StartLocalBufferIO()
authorAndres Freund <[email protected]>
Sat, 15 Mar 2025 16:30:07 +0000 (12:30 -0400)
committerAndres Freund <[email protected]>
Sun, 16 Mar 2025 02:07:48 +0000 (22:07 -0400)
To initiate IO on a shared buffer we have StartBufferIO(). For temporary table
buffers no similar function exists - likely because the code for that
currently is very simple due to the lack of concurrency.

However, the upcoming AIO support will make it possible to re-encounter a
local buffer, while the buffer already is the target of IO. In that case we
need to wait for already in-progress IO to complete. This commit makes it
easier to add the necessary code, by introducing StartLocalBufferIO().

Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/CAAKRu_b9anbWzEs5AAF9WCvcEVmgz-1AkHSQ-CLLy-p7WHzvFw@mail.gmail.com

src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/include/storage/buf_internals.h

index f3c27d7e77adaf289861e02396bb58752824eee8..79ca9d18d07b04cea6fab7722d43867099aae3c8 100644 (file)
@@ -1038,7 +1038,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
    {
        /* Simple case for non-shared buffers. */
        bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-       need_to_zero = (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
+       need_to_zero = StartLocalBufferIO(bufHdr, true);
    }
    else
    {
@@ -1388,11 +1388,7 @@ static inline bool
 WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
    if (BufferIsLocal(buffer))
-   {
-       BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-
-       return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
-   }
+       return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true);
    else
        return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
index 8efde05c0a5fb6cc88f30ac558532ef00b44d86f..f172a5c7820732ba6e1ea56f183f1c4516ed59b4 100644 (file)
@@ -183,6 +183,13 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
    instr_time  io_start;
    Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
 
+   /*
+    * Try to start an I/O operation.  There currently are no reasons for
+    * StartLocalBufferIO to return false, so we raise an error in that case.
+    */
+   if (!StartLocalBufferIO(bufHdr, false))
+       elog(ERROR, "failed to start write IO on local buffer");
+
    /* Find smgr relation for buffer */
    if (reln == NULL)
        reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
@@ -406,11 +413,17 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
            PinLocalBuffer(existing_hdr, false);
            buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
 
+           /*
+            * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
+            */
            buf_state = pg_atomic_read_u32(&existing_hdr->state);
            Assert(buf_state & BM_TAG_VALID);
            Assert(!(buf_state & BM_DIRTY));
            buf_state &= ~BM_VALID;
            pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
+
+           /* no need to loop for local buffers */
+           StartLocalBufferIO(existing_hdr, true);
        }
        else
        {
@@ -425,6 +438,8 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
            pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
 
            hresult->id = victim_buf_id;
+
+           StartLocalBufferIO(victim_buf_hdr, true);
        }
    }
 
@@ -489,6 +504,27 @@ MarkLocalBufferDirty(Buffer buffer)
    pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 }
 
+/*
+ * Like StartBufferIO, but for local buffers
+ */
+bool
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
+{
+   uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+   if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
+   {
+       /* someone else already did the I/O */
+       return false;
+   }
+
+   /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
+
+   /* local buffers don't track IO using resowners */
+
+   return true;
+}
+
 /*
  * Like TerminateBufferIO, but for local buffers
  */
index 90bc7e0db7be25318d5726a93e7a882328bd5e81..9327f60c44ccf64efae05c458d70525c1bdb3620 100644 (file)
@@ -473,6 +473,7 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
                                   uint32 set_flag_bits);
+extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void DropRelationLocalBuffers(RelFileLocator rlocator,
                                     ForkNumber forkNum,