Use streaming I/O in sequential scans.
authorThomas Munro <[email protected]>
Sun, 7 Apr 2024 13:48:27 +0000 (01:48 +1200)
committerThomas Munro <[email protected]>
Sun, 7 Apr 2024 13:53:57 +0000 (01:53 +1200)
Instead of calling ReadBuffer() for each block, heap sequential scans
and TID range scans now use the streaming API introduced in b5a9b18cd0.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Thomas Munro <[email protected]>
Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com

src/backend/access/heap/heapam.c
src/include/access/heapam.h

index a32acc90473f1a6430faaa5d57ee65b260897698..2663f52d1a7caf60ce0b2996b7d14979ad151830 100644 (file)
@@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+/*
+ * Streaming read API callback for parallel sequential scans. Returns the next
+ * block the caller wants from the read stream or InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_parallel(ReadStream *stream,
+                                   void *callback_private_data,
+                                   void *per_buffer_data)
+{
+   HeapScanDesc scan = (HeapScanDesc) callback_private_data;
+
+   Assert(ScanDirectionIsForward(scan->rs_dir));
+   Assert(scan->rs_base.rs_parallel);
+
+   if (unlikely(!scan->rs_inited))
+   {
+       /* parallel scan */
+       table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+                                                scan->rs_parallelworkerdata,
+                                                (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+
+       /* may return InvalidBlockNumber if there are no more blocks */
+       scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+                                                                   scan->rs_parallelworkerdata,
+                                                                   (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+       scan->rs_inited = true;
+   }
+   else
+   {
+       scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+                                                                   scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
+                                                                   scan->rs_base.rs_parallel);
+   }
+
+   return scan->rs_prefetch_block;
+}
+
+/*
+ * Streaming read API callback for serial sequential and TID range scans.
+ * Returns the next block the caller wants from the read stream or
+ * InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_serial(ReadStream *stream,
+                                 void *callback_private_data,
+                                 void *per_buffer_data)
+{
+   HeapScanDesc scan = (HeapScanDesc) callback_private_data;
+
+   if (unlikely(!scan->rs_inited))
+   {
+       scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+       scan->rs_inited = true;
+   }
+   else
+       scan->rs_prefetch_block = heapgettup_advance_block(scan,
+                                                          scan->rs_prefetch_block,
+                                                          scan->rs_dir);
+
+   return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *     initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
    scan->rs_cbuf = InvalidBuffer;
    scan->rs_cblock = InvalidBlockNumber;
 
+   /*
+    * Initialize to ForwardScanDirection because it is most common and
+    * because heap scans go forward before going backward (e.g. CURSORs).
+    */
+   scan->rs_dir = ForwardScanDirection;
+   scan->rs_prefetch_block = InvalidBlockNumber;
+
    /* page-at-a-time fields are always invalid when not rs_inited */
 
    /*
@@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
 /*
  * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
  *
- * Read the next block of the scan relation into a buffer and pin that buffer
- * before saving it in the scan descriptor.
+ * Read the next block of the scan relation from the read stream and save it
+ * in the scan descriptor.  It is already pinned.
  */
 static inline void
 heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 {
+   Assert(scan->rs_read_stream);
+
    /* release previous scan buffer, if any */
    if (BufferIsValid(scan->rs_cbuf))
    {
@@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
     */
    CHECK_FOR_INTERRUPTS();
 
-   if (unlikely(!scan->rs_inited))
+   /*
+    * If the scan direction is changing, reset the prefetch block to the
+    * current block. Otherwise, we will incorrectly prefetch the blocks
+    * between the prefetch block and the current block again before
+    * prefetching blocks in the new, correct scan direction.
+    */
+   if (unlikely(scan->rs_dir != dir))
    {
-       scan->rs_cblock = heapgettup_initial_block(scan, dir);
+       scan->rs_prefetch_block = scan->rs_cblock;
+       read_stream_reset(scan->rs_read_stream);
+   }
 
-       /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-       Assert(scan->rs_cblock != InvalidBlockNumber ||
-              !BufferIsValid(scan->rs_cbuf));
+   scan->rs_dir = dir;
 
-       scan->rs_inited = true;
-   }
-   else
-       scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
-                                                  dir);
-
-   /* read block if valid */
-   if (BlockNumberIsValid(scan->rs_cblock))
-       scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-                                          scan->rs_cblock, RBM_NORMAL,
-                                          scan->rs_strategy);
+   scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+   if (BufferIsValid(scan->rs_cbuf))
+       scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
 
 /*
@@ -560,6 +629,7 @@ static pg_noinline BlockNumber
 heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 {
    Assert(!scan->rs_inited);
+   Assert(scan->rs_base.rs_parallel == NULL);
 
    /* When there are no pages to scan, return InvalidBlockNumber */
    if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
@@ -567,27 +637,10 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 
    if (ScanDirectionIsForward(dir))
    {
-       /* serial scan */
-       if (scan->rs_base.rs_parallel == NULL)
-           return scan->rs_startblock;
-       else
-       {
-           /* parallel scan */
-           table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-                                                    scan->rs_parallelworkerdata,
-                                                    (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-
-           /* may return InvalidBlockNumber if there are no more blocks */
-           return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-                                                    scan->rs_parallelworkerdata,
-                                                    (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-       }
+       return scan->rs_startblock;
    }
    else
    {
-       /* backward parallel scan not supported */
-       Assert(scan->rs_base.rs_parallel == NULL);
-
        /*
         * Disable reporting to syncscan logic in a backwards scan; it's not
         * very likely anyone else is doing the same thing at the same time,
@@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
 static inline BlockNumber
 heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
 {
-   if (ScanDirectionIsForward(dir))
+   Assert(scan->rs_base.rs_parallel == NULL);
+
+   if (likely(ScanDirectionIsForward(dir)))
    {
-       if (scan->rs_base.rs_parallel == NULL)
-       {
-           block++;
+       block++;
 
-           /* wrap back to the start of the heap */
-           if (block >= scan->rs_nblocks)
-               block = 0;
+       /* wrap back to the start of the heap */
+       if (block >= scan->rs_nblocks)
+           block = 0;
 
-           /*
-            * Report our new scan position for synchronization purposes. We
-            * don't do that when moving backwards, however. That would just
-            * mess up any other forward-moving scanners.
-            *
-            * Note: we do this before checking for end of scan so that the
-            * final state of the position hint is back at the start of the
-            * rel.  That's not strictly necessary, but otherwise when you run
-            * the same query multiple times the starting position would shift
-            * a little bit backwards on every invocation, which is confusing.
-            * We don't guarantee any specific ordering in general, though.
-            */
-           if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
-               ss_report_location(scan->rs_base.rs_rd, block);
-
-           /* we're done if we're back at where we started */
-           if (block == scan->rs_startblock)
-               return InvalidBlockNumber;
+       /*
+        * Report our new scan position for synchronization purposes. We don't
+        * do that when moving backwards, however. That would just mess up any
+        * other forward-moving scanners.
+        *
+        * Note: we do this before checking for end of scan so that the final
+        * state of the position hint is back at the start of the rel.  That's
+        * not strictly necessary, but otherwise when you run the same query
+        * multiple times the starting position would shift a little bit
+        * backwards on every invocation, which is confusing. We don't
+        * guarantee any specific ordering in general, though.
+        */
+       if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
+           ss_report_location(scan->rs_base.rs_rd, block);
 
-           /* check if the limit imposed by heap_setscanlimits() is met */
-           if (scan->rs_numblocks != InvalidBlockNumber)
-           {
-               if (--scan->rs_numblocks == 0)
-                   return InvalidBlockNumber;
-           }
+       /* we're done if we're back at where we started */
+       if (block == scan->rs_startblock)
+           return InvalidBlockNumber;
 
-           return block;
-       }
-       else
+       /* check if the limit imposed by heap_setscanlimits() is met */
+       if (scan->rs_numblocks != InvalidBlockNumber)
        {
-           return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-                                                    scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
-                                                    scan->rs_base.rs_parallel);
+           if (--scan->rs_numblocks == 0)
+               return InvalidBlockNumber;
        }
+
+       return block;
    }
    else
    {
@@ -879,6 +925,7 @@ continue_page:
 
    scan->rs_cbuf = InvalidBuffer;
    scan->rs_cblock = InvalidBlockNumber;
+   scan->rs_prefetch_block = InvalidBlockNumber;
    tuple->t_data = NULL;
    scan->rs_inited = false;
 }
@@ -974,6 +1021,7 @@ continue_page:
        ReleaseBuffer(scan->rs_cbuf);
    scan->rs_cbuf = InvalidBuffer;
    scan->rs_cblock = InvalidBlockNumber;
+   scan->rs_prefetch_block = InvalidBlockNumber;
    tuple->t_data = NULL;
    scan->rs_inited = false;
 }
@@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
    initscan(scan, key, false);
 
+   scan->rs_read_stream = NULL;
+
+   /*
+    * Set up a read stream for sequential scans and TID range scans. This
+    * should be done after initscan() because initscan() allocates the
+    * BufferAccessStrategy object passed to the streaming read API.
+    */
+   if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+       scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+   {
+       ReadStreamBlockNumberCB cb;
+
+       if (scan->rs_base.rs_parallel)
+           cb = heap_scan_stream_read_next_parallel;
+       else
+           cb = heap_scan_stream_read_next_serial;
+
+       scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+                                                         scan->rs_strategy,
+                                                         scan->rs_base.rs_rd,
+                                                         MAIN_FORKNUM,
+                                                         cb,
+                                                         scan,
+                                                         0);
+   }
+
+
    return (TableScanDesc) scan;
 }
 
@@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 
    Assert(scan->rs_empty_tuples_pending == 0);
 
+   /*
+    * The read stream is reset on rescan. This must be done before
+    * initscan(), as some state referred to by read_stream_reset() is reset
+    * in initscan().
+    */
+   if (scan->rs_read_stream)
+       read_stream_reset(scan->rs_read_stream);
+
    /*
     * reinitialize scan descriptor
     */
@@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan)
 
    Assert(scan->rs_empty_tuples_pending == 0);
 
+   /*
+    * Must free the read stream before freeing the BufferAccessStrategy.
+    */
+   if (scan->rs_read_stream)
+       read_stream_end(scan->rs_read_stream);
+
    /*
     * decrement relation reference count and free scan descriptor storage
     */
index 750ea30852e3aae4b5278b967875ebd845fb00ac..48936826bcc2e62caab043bd81bb4616694a2994 100644 (file)
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -70,6 +71,20 @@ typedef struct HeapScanDescData
 
    HeapTupleData rs_ctup;      /* current tuple in scan, if any */
 
+   /* For scans that stream reads */
+   ReadStream *rs_read_stream;
+
+   /*
+    * For sequential scans and TID range scans to stream reads. The read
+    * stream is allocated at the beginning of the scan and reset on rescan or
+    * when the scan direction changes. The scan direction is saved each time
+    * a new page is requested. If the scan direction changes from one page to
+    * the next, the read stream releases all previously pinned buffers and
+    * resets the prefetch block.
+    */
+   ScanDirection rs_dir;
+   BlockNumber rs_prefetch_block;
+
    /*
     * For parallel scans to store page allocation data.  NULL when not
     * performing a parallel scan.