aio: heapam prefetching.
authorAndres Freund <[email protected]>
Mon, 9 Nov 2020 22:20:42 +0000 (14:20 -0800)
committerAndres Freund <[email protected]>
Mon, 11 Jan 2021 23:09:15 +0000 (15:09 -0800)
Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:

src/backend/access/heap/heapam.c
src/backend/access/heap/heapam_handler.c
src/include/access/heapam.h

index 61f7afbeca71cf0b73cc7a1117330bc2b862607a..4ffc11bcaba04e78d49b3eccc9aac9680151b242 100644 (file)
@@ -55,6 +55,8 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
+#include "storage/aio.h"
+#include "storage/block.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
@@ -201,6 +203,123 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+static PgStreamingReadNextStatus
+heap_pgsr_next_single(uintptr_t pgsr_private, PgAioInProgress *aio, uintptr_t *read_private)
+{
+   HeapScanDesc scan = (HeapScanDesc) pgsr_private;
+   Buffer buf;
+   bool already_valid;
+   BlockNumber blockno;
+
+   Assert(scan->rs_inited);
+   Assert(!scan->rs_base.rs_parallel);
+   Assert(scan->rs_nblocks > 0);
+
+   if (scan->rs_prefetch_block == InvalidBlockNumber)
+   {
+       scan->rs_prefetch_block = blockno = scan->rs_startblock;
+   }
+   else
+   {
+       blockno = ++scan->rs_prefetch_block;
+       if (blockno >= scan->rs_nblocks)
+           scan->rs_prefetch_block = blockno = 0;
+       if (blockno == scan->rs_startblock ||
+           (scan->rs_numblocks != InvalidBlockNumber &&
+            --scan->rs_numblocks == 0))
+       {
+           *read_private = 0;
+           return PGSR_NEXT_END;
+       }
+   }
+
+   buf = ReadBufferAsync(scan->rs_base.rs_rd, MAIN_FORKNUM, blockno,
+                         RBM_NORMAL, scan->rs_strategy, &already_valid,
+                         &aio);
+
+   *read_private = (uintptr_t) buf;
+
+   if (already_valid)
+       return PGSR_NEXT_NO_IO;
+   else
+       return PGSR_NEXT_IO;
+}
+
+static PgStreamingReadNextStatus
+heap_pgsr_next_parallel(uintptr_t pgsr_private, PgAioInProgress *aio, uintptr_t *read_private)
+{
+   HeapScanDesc scan = (HeapScanDesc) pgsr_private;
+   ParallelBlockTableScanDesc pbscan =
+       (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+   ParallelBlockTableScanWorker pbscanwork =
+       (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
+   BlockNumber blockno;
+   Buffer buf;
+   bool already_valid;
+
+   Assert(scan->rs_inited);
+   Assert(scan->rs_base.rs_parallel);
+   Assert(scan->rs_nblocks > 0);
+
+   blockno = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+                                               pbscanwork, pbscan);
+
+   /* Other processes might have already finished the scan. */
+   if (blockno == InvalidBlockNumber)
+   {
+       *read_private = 0;
+       return PGSR_NEXT_END;
+   }
+
+   buf = ReadBufferAsync(scan->rs_base.rs_rd, MAIN_FORKNUM, blockno,
+                         RBM_NORMAL, scan->rs_strategy, &already_valid,
+                         &aio);
+   *read_private = (uintptr_t) buf;
+
+   if (already_valid)
+       return PGSR_NEXT_NO_IO;
+   else
+       return PGSR_NEXT_IO;
+}
+
+static void
+heap_pgsr_release(uintptr_t pgsr_private, uintptr_t read_private)
+{
+   HeapScanDesc scan = (HeapScanDesc) pgsr_private;
+   Buffer buf = (Buffer) read_private;
+
+   ereport(DEBUG2,
+           errmsg("pgsr %s: releasing buf %d",
+                  NameStr(scan->rs_base.rs_rd->rd_rel->relname),
+                  buf),
+           errhidestmt(true),
+           errhidecontext(true));
+
+   Assert(BufferIsValid(buf));
+   ReleaseBuffer(buf);
+}
+
+static PgStreamingRead *
+heap_pgsr_single_alloc(HeapScanDesc scan)
+{
+   int iodepth = Max(Min(128, NBuffers / 128), 1);
+
+   return pg_streaming_read_alloc(iodepth, (uintptr_t) scan,
+                                  heap_pgsr_next_single,
+                                  heap_pgsr_release);
+}
+
+static PgStreamingRead *
+heap_pgsr_parallel_alloc(HeapScanDesc scan)
+{
+   int iodepth = Max(Min(128, NBuffers / 128), 1);
+
+   return pg_streaming_read_alloc(iodepth, (uintptr_t) scan,
+                                  heap_pgsr_next_parallel,
+                                  heap_pgsr_release);
+}
+
+
 /* ----------------
  *     initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -318,6 +437,26 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
     */
    if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN)
        pgstat_count_heap_scan(scan->rs_base.rs_rd);
+
+   scan->rs_prefetch_block = InvalidBlockNumber;
+   if (scan->pgsr)
+   {
+       pg_streaming_read_free(scan->pgsr);
+       scan->pgsr = NULL;
+   }
+
+   /*
+    * FIXME: This probably should be done in the !rs_inited blocks instead.
+    */
+   scan->pgsr = NULL;
+   if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) &&
+       (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN))
+   {
+       if (scan->rs_base.rs_parallel)
+           scan->pgsr = heap_pgsr_parallel_alloc(scan);
+       else
+           scan->pgsr = heap_pgsr_single_alloc(scan);
+   }
 }
 
 /*
@@ -350,7 +489,7 @@ heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlk
  * which tuples on the page are visible.
  */
 void
-heapgetpage(TableScanDesc sscan, BlockNumber page)
+heapgetpage(TableScanDesc sscan, BlockNumber page, Buffer pgsr_buffer)
 {
    HeapScanDesc scan = (HeapScanDesc) sscan;
    Buffer      buffer;
@@ -378,9 +517,19 @@ heapgetpage(TableScanDesc sscan, BlockNumber page)
     */
    CHECK_FOR_INTERRUPTS();
 
-   /* read page using selected strategy */
-   scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page,
-                                      RBM_NORMAL, scan->rs_strategy);
+   if (BufferIsValid(pgsr_buffer))
+   {
+       Assert(scan->pgsr);
+       scan->rs_cbuf = pgsr_buffer;
+   }
+   else
+   {
+       Assert(!scan->pgsr);
+
+       /* read page using selected strategy */
+       scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page,
+                                          RBM_NORMAL, scan->rs_strategy);
+   }
    scan->rs_cblock = page;
 
    if (!(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE))
@@ -501,6 +650,7 @@ heapgettup(HeapScanDesc scan,
    Snapshot    snapshot = scan->rs_base.rs_snapshot;
    bool        backward = ScanDirectionIsBackward(dir);
    BlockNumber page;
+   Buffer      pgsr_buf = InvalidBuffer;
    bool        finished;
    Page        dp;
    int         lines;
@@ -513,8 +663,15 @@ heapgettup(HeapScanDesc scan,
     */
    if (ScanDirectionIsForward(dir))
    {
+       /*
+        * FIXME: This logic badly needs to be consolidated into one
+        * place. Instead of having logic at the top and bottom of heapgettup
+        * and heapgettup_pagemode() each.
+        */
        if (!scan->rs_inited)
        {
+           scan->rs_inited = true;
+
            /*
             * return null immediately if relation is empty
             */
@@ -524,7 +681,38 @@ heapgettup(HeapScanDesc scan,
                tuple->t_data = NULL;
                return;
            }
-           if (scan->rs_base.rs_parallel != NULL)
+           if (scan->pgsr)
+           {
+               ParallelBlockTableScanDesc pbscan =
+               (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+               ParallelBlockTableScanWorker pbscanwork =
+               (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
+
+               if (scan->rs_base.rs_parallel != NULL)
+               {
+                   table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, pbscanwork, pbscan);
+               }
+
+               pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
+
+               /* Other processes might have already finished the scan. */
+               if (scan->rs_base.rs_parallel != NULL)
+               {
+                   if (!BufferIsValid(pgsr_buf))
+                   {
+                       Assert(!BufferIsValid(scan->rs_cbuf));
+                       tuple->t_data = NULL;
+                       return;
+                   }
+                   page = BufferGetBlockNumber(pgsr_buf);
+               }
+               else
+               {
+                   Assert(BufferGetBlockNumber(pgsr_buf) == scan->rs_startblock);
+                   page = scan->rs_startblock; /* crosscheck */
+               }
+           }
+           else if (scan->rs_base.rs_parallel != NULL)
            {
                ParallelBlockTableScanDesc pbscan =
                (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
@@ -547,9 +735,9 @@ heapgettup(HeapScanDesc scan,
            }
            else
                page = scan->rs_startblock; /* first page */
-           heapgetpage((TableScanDesc) scan, page);
+
+           heapgetpage((TableScanDesc) scan, page, pgsr_buf);
            lineoff = FirstOffsetNumber;    /* first offnum */
-           scan->rs_inited = true;
        }
        else
        {
@@ -573,6 +761,12 @@ heapgettup(HeapScanDesc scan,
        /* backward parallel scan not supported */
        Assert(scan->rs_base.rs_parallel == NULL);
 
+       if (scan->pgsr)
+       {
+           pg_streaming_read_free(scan->pgsr);
+           scan->pgsr = NULL;
+       }
+
        if (!scan->rs_inited)
        {
            /*
@@ -597,7 +791,7 @@ heapgettup(HeapScanDesc scan,
                page = scan->rs_startblock - 1;
            else
                page = scan->rs_nblocks - 1;
-           heapgetpage((TableScanDesc) scan, page);
+           heapgetpage((TableScanDesc) scan, page, InvalidBuffer);
        }
        else
        {
@@ -639,7 +833,7 @@ heapgettup(HeapScanDesc scan,
 
        page = ItemPointerGetBlockNumber(&(tuple->t_self));
        if (page != scan->rs_cblock)
-           heapgetpage((TableScanDesc) scan, page);
+           heapgetpage((TableScanDesc) scan, page, InvalidBuffer);
 
        /* Since the tuple was previously fetched, needn't lock page here */
        dp = BufferGetPage(scan->rs_cbuf);
@@ -714,6 +908,7 @@ heapgettup(HeapScanDesc scan,
         * it's time to move to the next.
         */
        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+       pgsr_buf = InvalidBuffer;
 
        /*
         * advance to next/prior page and detect end of scan
@@ -726,6 +921,20 @@ heapgettup(HeapScanDesc scan,
                page = scan->rs_nblocks;
            page--;
        }
+       else if (scan->pgsr)
+       {
+           pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
+
+           if (BufferIsValid(pgsr_buf))
+           {
+               page = BufferGetBlockNumber(pgsr_buf);
+               /* FIXME: add crosscheck for block number */
+               /* FIXME: ss_report_location */
+               finished = false;
+           }
+           else
+               finished = true;
+       }
        else if (scan->rs_base.rs_parallel != NULL)
        {
            ParallelBlockTableScanDesc pbscan =
@@ -775,7 +984,7 @@ heapgettup(HeapScanDesc scan,
            return;
        }
 
-       heapgetpage((TableScanDesc) scan, page);
+       heapgetpage((TableScanDesc) scan, page, pgsr_buf);
 
        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
@@ -818,6 +1027,7 @@ heapgettup_pagemode(HeapScanDesc scan,
    HeapTuple   tuple = &(scan->rs_ctup);
    bool        backward = ScanDirectionIsBackward(dir);
    BlockNumber page;
+   Buffer      pgsr_buf = InvalidBuffer;
    bool        finished;
    Page        dp;
    int         lines;
@@ -833,6 +1043,8 @@ heapgettup_pagemode(HeapScanDesc scan,
    {
        if (!scan->rs_inited)
        {
+           scan->rs_inited = true;
+
            /*
             * return null immediately if relation is empty
             */
@@ -842,7 +1054,38 @@ heapgettup_pagemode(HeapScanDesc scan,
                tuple->t_data = NULL;
                return;
            }
-           if (scan->rs_base.rs_parallel != NULL)
+           if (scan->pgsr)
+           {
+               ParallelBlockTableScanDesc pbscan =
+               (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+               ParallelBlockTableScanWorker pbscanwork =
+               (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
+
+               if (scan->rs_base.rs_parallel != NULL)
+               {
+                   table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, pbscanwork, pbscan);
+               }
+
+               pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
+
+               /* Other processes might have already finished the scan. */
+               if (scan->rs_base.rs_parallel != NULL)
+               {
+                   if (!BufferIsValid(pgsr_buf))
+                   {
+                       Assert(!BufferIsValid(scan->rs_cbuf));
+                       tuple->t_data = NULL;
+                       return;
+                   }
+                   page = BufferGetBlockNumber(pgsr_buf);
+               }
+               else
+               {
+                   Assert(BufferGetBlockNumber(pgsr_buf) == scan->rs_startblock);
+                   page = scan->rs_startblock; /* crosscheck */
+               }
+           }
+           else if (scan->rs_base.rs_parallel != NULL)
            {
                ParallelBlockTableScanDesc pbscan =
                (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
@@ -865,9 +1108,8 @@ heapgettup_pagemode(HeapScanDesc scan,
            }
            else
                page = scan->rs_startblock; /* first page */
-           heapgetpage((TableScanDesc) scan, page);
+           heapgetpage((TableScanDesc) scan, page, pgsr_buf);
            lineindex = 0;
-           scan->rs_inited = true;
        }
        else
        {
@@ -888,6 +1130,12 @@ heapgettup_pagemode(HeapScanDesc scan,
        /* backward parallel scan not supported */
        Assert(scan->rs_base.rs_parallel == NULL);
 
+       if (scan->pgsr)
+       {
+           pg_streaming_read_free(scan->pgsr);
+           scan->pgsr = NULL;
+       }
+
        if (!scan->rs_inited)
        {
            /*
@@ -912,7 +1160,7 @@ heapgettup_pagemode(HeapScanDesc scan,
                page = scan->rs_startblock - 1;
            else
                page = scan->rs_nblocks - 1;
-           heapgetpage((TableScanDesc) scan, page);
+           heapgetpage((TableScanDesc) scan, page, InvalidBuffer);
        }
        else
        {
@@ -951,7 +1199,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 
        page = ItemPointerGetBlockNumber(&(tuple->t_self));
        if (page != scan->rs_cblock)
-           heapgetpage((TableScanDesc) scan, page);
+           heapgetpage((TableScanDesc) scan, page, InvalidBuffer);
 
        /* Since the tuple was previously fetched, needn't lock page here */
        dp = BufferGetPage(scan->rs_cbuf);
@@ -1017,6 +1265,8 @@ heapgettup_pagemode(HeapScanDesc scan,
                ++lineindex;
        }
 
+       pgsr_buf = InvalidBuffer;
+
        /*
         * if we get here, it means we've exhausted the items on this page and
         * it's time to move to the next.
@@ -1029,6 +1279,20 @@ heapgettup_pagemode(HeapScanDesc scan,
                page = scan->rs_nblocks;
            page--;
        }
+       else if (scan->pgsr)
+       {
+           pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
+
+           if (BufferIsValid(pgsr_buf))
+           {
+               page = BufferGetBlockNumber(pgsr_buf);
+               /* FIXME: add crosscheck for block number */
+               /* FIXME: ss_report_location */
+               finished = false;
+           }
+           else
+               finished = true;
+       }
        else if (scan->rs_base.rs_parallel != NULL)
        {
            ParallelBlockTableScanDesc pbscan =
@@ -1078,7 +1342,7 @@ heapgettup_pagemode(HeapScanDesc scan,
            return;
        }
 
-       heapgetpage((TableScanDesc) scan, page);
+       heapgetpage((TableScanDesc) scan, page, pgsr_buf);
 
        dp = BufferGetPage(scan->rs_cbuf);
        TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
@@ -1175,6 +1439,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
        palloc(sizeof(ParallelBlockTableScanWorkerData));
    scan->rs_strategy = NULL;   /* set in initscan */
 
+   scan->pgsr = NULL;
+
    /*
     * Disable page-at-a-time mode if it's not a MVCC-safe snapshot.
     */
@@ -1272,6 +1538,12 @@ heap_endscan(TableScanDesc sscan)
    if (BufferIsValid(scan->rs_cbuf))
        ReleaseBuffer(scan->rs_cbuf);
 
+   if (scan->pgsr)
+   {
+       pg_streaming_read_free(scan->pgsr);
+       scan->pgsr = NULL;
+   }
+
    /*
     * decrement relation reference count and free scan descriptor storage
     */
index 10ddde4ecf949330d8a79329775e96884526bec0..d3fe83d79868f03381d7ac8916a0f6e1a570c0ce 100644 (file)
@@ -2328,7 +2328,7 @@ heapam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate)
        return false;
    }
 
-   heapgetpage(scan, blockno);
+   heapgetpage(scan, blockno, InvalidBuffer);
    hscan->rs_inited = true;
 
    return true;
index 95cb1e346b2d66cb66861d94271d786789947d80..d4674610385461f3edbe1f57507e85b96f891187 100644 (file)
@@ -57,6 +57,7 @@ typedef struct HeapScanDescData
    /* scan current state */
    bool        rs_inited;      /* false = scan not init'd yet */
    BlockNumber rs_cblock;      /* current block # in scan, if any */
+   BlockNumber rs_prefetch_block;      /* block being prefetched */
    Buffer      rs_cbuf;        /* current buffer in scan, if any */
    /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
 
@@ -65,6 +66,8 @@ typedef struct HeapScanDescData
 
    HeapTupleData rs_ctup;      /* current tuple in scan, if any */
 
+   struct PgStreamingRead *pgsr;
+
    /* these fields only used in page-at-a-time mode and for bitmap scans */
    int         rs_cindex;      /* current tuple's index in vistuples */
    int         rs_ntuples;     /* number of visible tuples on page */
@@ -114,7 +117,7 @@ extern TableScanDesc heap_beginscan(Relation relation, Snapshot snapshot,
                                    uint32 flags);
 extern void heap_setscanlimits(TableScanDesc scan, BlockNumber startBlk,
                               BlockNumber numBlks);
-extern void heapgetpage(TableScanDesc scan, BlockNumber page);
+extern void heapgetpage(TableScanDesc scan, BlockNumber page, Buffer buffer);
 extern void heap_rescan(TableScanDesc scan, ScanKey key, bool set_params,
                        bool allow_strat, bool allow_sync, bool allow_pagemode);
 extern void heap_endscan(TableScanDesc scan);