Optimize pg_visibility with read streams.
author: Noah Misch <[email protected]>
Tue, 3 Sep 2024 17:46:20 +0000 (10:46 -0700)
committer: Noah Misch <[email protected]>
Tue, 3 Sep 2024 17:46:20 +0000 (10:46 -0700)
We've measured 5% performance improvement, and this arranges to benefit
automatically from future optimizations to the read_stream subsystem.

Nazir Bilal Yavuz

Discussion: https://postgr.es/m/CAN55FZ1_Ru3XpMgTwsU67FTH2fs_FrRROmb7x6zs+F44QBEiww@mail.gmail.com

contrib/pg_visibility/pg_visibility.c

index 773ba92e454fd288fccc025cb333f27ffc86a376..9975e8876e63077eb2328df3415ba890b6a77f86 100644 (file)
@@ -21,6 +21,7 @@
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
@@ -41,6 +42,17 @@ typedef struct corrupt_items
    ItemPointer tids;
 } corrupt_items;
 
+/*
+ * Private state for collect_corrupt_items_read_stream_next_block: the block
+ * range still to be examined, plus what the callback needs to consult the
+ * visibility map for each candidate block.
+ */
+struct collect_corrupt_items_read_stream_private
+{
+   bool        all_frozen;     /* return blocks the VM marks all-frozen */
+   bool        all_visible;    /* return blocks the VM marks all-visible */
+   BlockNumber current_blocknum;   /* next block the callback will examine */
+   BlockNumber last_exclusive; /* stop before this block number */
+   Relation    rel;            /* relation whose VM is consulted */
+   Buffer      vmbuffer;       /* VM buffer reused across lookups; the
+                                * caller releases it if still valid */
+};
+
 PG_FUNCTION_INFO_V1(pg_visibility_map);
 PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
 PG_FUNCTION_INFO_V1(pg_visibility);
@@ -478,6 +490,7 @@ collect_visibility_data(Oid relid, bool include_pd)
    BlockNumber blkno;
    Buffer      vmbuffer = InvalidBuffer;
    BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
+   ReadStream *stream = NULL;
 
    rel = relation_open(relid, AccessShareLock);
 
@@ -489,6 +502,22 @@ collect_visibility_data(Oid relid, bool include_pd)
    info->next = 0;
    info->count = nblocks;
 
+   /*
+    * Create a stream if reading main fork.  The callback state is palloc'd,
+    * not a local of this block: the stream keeps the pointer and runs the
+    * callback from read_stream_next_buffer(), after this block has exited.
+    */
+   if (include_pd)
+   {
+       BlockRangeReadStreamPrivate *p = palloc(sizeof(*p));
+
+       p->current_blocknum = 0;
+       p->last_exclusive = nblocks;
+       stream = read_stream_begin_relation(READ_STREAM_FULL, bstrategy, rel,
+                                           MAIN_FORKNUM,
+                                           block_range_read_stream_cb, p, 0);
+   }
+
    for (blkno = 0; blkno < nblocks; ++blkno)
    {
        int32       mapbits;
@@ -513,8 +542,7 @@ collect_visibility_data(Oid relid, bool include_pd)
            Buffer      buffer;
            Page        page;
 
-           buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-                                       bstrategy);
+           buffer = read_stream_next_buffer(stream, NULL);
            LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
            page = BufferGetPage(buffer);
@@ -525,6 +553,12 @@ collect_visibility_data(Oid relid, bool include_pd)
        }
    }
 
+   if (include_pd)
+   {
+       Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+       read_stream_end(stream);
+   }
+
    /* Clean up. */
    if (vmbuffer != InvalidBuffer)
        ReleaseBuffer(vmbuffer);
@@ -610,6 +644,38 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
    }
 }
 
+/*
+ * Read stream callback for collect_corrupt_items(): hand back the next block
+ * whose visibility map bits make it worth checking, or InvalidBlockNumber
+ * once the range described by the private state is exhausted.
+ */
+static BlockNumber
+collect_corrupt_items_read_stream_next_block(ReadStream *stream,
+                                            void *callback_private_data,
+                                            void *per_buffer_data)
+{
+   struct collect_corrupt_items_read_stream_private *state = callback_private_data;
+
+   while (state->current_blocknum < state->last_exclusive)
+   {
+       BlockNumber candidate;
+       bool        frozen_hit;
+       bool        visible_hit;
+
+       /* Make sure we are interruptible. */
+       CHECK_FOR_INTERRUPTS();
+
+       /* Consume the block now so the next call resumes after it. */
+       candidate = state->current_blocknum++;
+
+       frozen_hit = state->all_frozen &&
+           VM_ALL_FROZEN(state->rel, candidate, &state->vmbuffer);
+       visible_hit = state->all_visible &&
+           VM_ALL_VISIBLE(state->rel, candidate, &state->vmbuffer);
+
+       /* Only blocks carrying a requested VM bit need to be read. */
+       if (frozen_hit || visible_hit)
+           return candidate;
+   }
+
+   return InvalidBlockNumber;
+}
+
 /*
  * Returns a list of items whose visibility map information does not match
  * the status of the tuples on the page.
@@ -628,12 +694,13 @@ static corrupt_items *
 collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 {
    Relation    rel;
-   BlockNumber nblocks;
    corrupt_items *items;
-   BlockNumber blkno;
    Buffer      vmbuffer = InvalidBuffer;
    BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
    TransactionId OldestXmin = InvalidTransactionId;
+   struct collect_corrupt_items_read_stream_private p;
+   ReadStream *stream;
+   Buffer      buffer;
 
    rel = relation_open(relid, AccessShareLock);
 
@@ -643,8 +710,6 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
    if (all_visible)
        OldestXmin = GetStrictOldestNonRemovableTransactionId(rel);
 
-   nblocks = RelationGetNumberOfBlocks(rel);
-
    /*
     * Guess an initial array size. We don't expect many corrupted tuples, so
     * start with a small array.  This function uses the "next" field to track
@@ -658,42 +723,46 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
    items->count = 64;
    items->tids = palloc(items->count * sizeof(ItemPointerData));
 
+   p.current_blocknum = 0;
+   p.last_exclusive = RelationGetNumberOfBlocks(rel);
+   p.rel = rel;
+   p.vmbuffer = InvalidBuffer;
+   p.all_frozen = all_frozen;
+   p.all_visible = all_visible;
+   stream = read_stream_begin_relation(READ_STREAM_FULL,
+                                       bstrategy,
+                                       rel,
+                                       MAIN_FORKNUM,
+                                       collect_corrupt_items_read_stream_next_block,
+                                       &p,
+                                       0);
+
    /* Loop over every block in the relation. */
-   for (blkno = 0; blkno < nblocks; ++blkno)
+   while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    {
        bool        check_frozen = false;
        bool        check_visible = false;
-       Buffer      buffer;
        Page        page;
        OffsetNumber offnum,
                    maxoff;
+       BlockNumber blkno;
 
        /* Make sure we are interruptible. */
        CHECK_FOR_INTERRUPTS();
 
-       /* Use the visibility map to decide whether to check this page. */
-       if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
-           check_frozen = true;
-       if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
-           check_visible = true;
-       if (!check_visible && !check_frozen)
-           continue;
-
-       /* Read and lock the page. */
-       buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-                                   bstrategy);
        LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
        page = BufferGetPage(buffer);
        maxoff = PageGetMaxOffsetNumber(page);
+       blkno = BufferGetBlockNumber(buffer);
 
        /*
         * The visibility map bits might have changed while we were acquiring
         * the page lock.  Recheck to avoid returning spurious results.
         */
-       if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
+       if (all_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
            check_frozen = false;
-       if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
+       if (all_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
            check_visible = false;
        if (!check_visible && !check_frozen)
        {
@@ -778,10 +847,13 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 
        UnlockReleaseBuffer(buffer);
    }
+   read_stream_end(stream);
 
    /* Clean up. */
    if (vmbuffer != InvalidBuffer)
        ReleaseBuffer(vmbuffer);
+   if (p.vmbuffer != InvalidBuffer)
+       ReleaseBuffer(p.vmbuffer);
    relation_close(rel, AccessShareLock);
 
    /*