read_stream: Introduce and use optional batchmode support
author     Andres Freund <[email protected]>
Sun, 30 Mar 2025 22:30:36 +0000 (18:30 -0400)
committer  Andres Freund <[email protected]>
Sun, 30 Mar 2025 22:36:41 +0000 (18:36 -0400)
Submitting IO in larger batches can be more efficient than doing so
one-by-one, particularly for many small reads. It does, however, require
the ReadStreamBlockNumberCB callback to abide by the restrictions of AIO
batching (c.f. pgaio_enter_batchmode()). Basically, the callback may not:
a) block without first calling pgaio_submit_staged(), unless a
   to-be-waited-on lock cannot be part of a deadlock, e.g. because it is
   never held while waiting for IO.

b) directly or indirectly start another batch by calling pgaio_enter_batchmode()

As this requires care and is nontrivial in some cases, batching is only
used with explicit opt-in.

This patch adds an explicit flag (READ_STREAM_USE_BATCHING) to read_stream and
uses it where appropriate.
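
To make the rules above concrete, a batchmode-safe callback can be as
simple as the following sketch (illustrative only, not part of the patch;
the function name is made up, while BlockRangeReadStreamPrivate and the
equivalent block_range_read_stream_cb already exist in the tree): it only
advances a counter in its private state and never blocks.

    #include "storage/read_stream.h"

    /* Sketch of a batchmode-safe callback: takes no locks, never blocks. */
    static BlockNumber
    range_read_stream_cb_sketch(ReadStream *stream,
                                void *callback_private_data,
                                void *per_buffer_data)
    {
        BlockRangeReadStreamPrivate *p = callback_private_data;

        if (p->current_blocknum < p->last_exclusive)
            return p->current_blocknum++;

        return InvalidBlockNumber;
    }

Such a callback can then be passed to read_stream_begin_relation() together
with READ_STREAM_USE_BATCHING, as the hunks below do for
block_range_read_stream_cb.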

There are two cases where batching would likely be beneficial, but where we
aren't using it yet:

1) bitmap heap scans, because the callback reads the VM

   This should soon be solved, because we are planning to remove the
   bitmap scan's use of the VM, as that check is not sound.

2) The first phase of heap vacuum

   This could be made to support batchmode, but would require some care.

Reviewed-by: Noah Misch <[email protected]>
Reviewed-by: Thomas Munro <[email protected]>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt

12 files changed:
contrib/amcheck/verify_heapam.c
contrib/pg_prewarm/pg_prewarm.c
contrib/pg_visibility/pg_visibility.c
src/backend/access/gist/gistvacuum.c
src/backend/access/heap/heapam.c
src/backend/access/heap/vacuumlazy.c
src/backend/access/nbtree/nbtree.c
src/backend/access/spgist/spgvacuum.c
src/backend/commands/analyze.c
src/backend/storage/aio/read_stream.c
src/backend/storage/buffer/bufmgr.c
src/include/storage/read_stream.h

index 9e4d558436bac50d9cf78607a87a05b9c9b6976a..1970fc8620aeb2ccd4e355f476c68e4f2e66add0 100644 (file)
@@ -447,12 +447,23 @@ verify_heapam(PG_FUNCTION_ARGS)
 
    if (skip_option == SKIP_PAGES_NONE)
    {
+       /*
+        * It is safe to use batchmode as block_range_read_stream_cb takes no
+        * locks.
+        */
        stream_cb = block_range_read_stream_cb;
-       stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
+       stream_flags = READ_STREAM_SEQUENTIAL |
+           READ_STREAM_FULL |
+           READ_STREAM_USE_BATCHING;
        stream_data = &stream_skip_data.range;
    }
    else
    {
+       /*
+        * It would not be safe to naively use use batchmode, as
+        * heapcheck_read_stream_next_unskippable takes locks. It shouldn't be
+        * too hard to convert though.
+        */
        stream_cb = heapcheck_read_stream_next_unskippable;
        stream_flags = READ_STREAM_DEFAULT;
        stream_data = &stream_skip_data;
index 63faf43d0bf3e12f835d9b3fa772eead26642d3f..c0efb530c4e5f9b1dc017375be960b959f980516 100644 (file)
@@ -198,7 +198,12 @@ pg_prewarm(PG_FUNCTION_ARGS)
        p.current_blocknum = first_block;
        p.last_exclusive = last_block + 1;
 
-       stream = read_stream_begin_relation(READ_STREAM_FULL,
+       /*
+        * It is safe to use batchmode as block_range_read_stream_cb takes no
+        * locks.
+        */
+       stream = read_stream_begin_relation(READ_STREAM_FULL |
+                                           READ_STREAM_USE_BATCHING,
                                            NULL,
                                            rel,
                                            forkNumber,
index ca91819852c747f6017046605c50e0880358d176..d79ef35006bfab6a4c98962715af1c3f97b576e7 100644 (file)
@@ -526,7 +526,13 @@ collect_visibility_data(Oid relid, bool include_pd)
    {
        p.current_blocknum = 0;
        p.last_exclusive = nblocks;
-       stream = read_stream_begin_relation(READ_STREAM_FULL,
+
+       /*
+        * It is safe to use batchmode as block_range_read_stream_cb takes no
+        * locks.
+        */
+       stream = read_stream_begin_relation(READ_STREAM_FULL |
+                                           READ_STREAM_USE_BATCHING,
                                            bstrategy,
                                            rel,
                                            MAIN_FORKNUM,
index 20b1bb5dbacedbaec6f7cedf0ca32b827157f71f..ce9d78d78d650f4374dcc8882381cfd46bfa3120 100644 (file)
@@ -210,7 +210,13 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
    needLock = !RELATION_IS_LOCAL(rel);
 
    p.current_blocknum = GIST_ROOT_BLKNO;
-   stream = read_stream_begin_relation(READ_STREAM_FULL,
+
+   /*
+    * It is safe to use batchmode as block_range_read_stream_cb takes no
+    * locks.
+    */
+   stream = read_stream_begin_relation(READ_STREAM_FULL |
+                                       READ_STREAM_USE_BATCHING,
                                        info->strategy,
                                        rel,
                                        MAIN_FORKNUM,
index b12b583c4d9004718dda5cbb6211eb17f89155c1..6e433db039ebb8b323a5cf7c25e67962a6c5f671 100644 (file)
@@ -1206,7 +1206,15 @@ heap_beginscan(Relation relation, Snapshot snapshot,
        else
            cb = heap_scan_stream_read_next_serial;
 
-       scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+       /* ---
+        * It is safe to use batchmode as the only locks taken by `cb`
+        * are never taken while waiting for IO:
+        * - SyncScanLock is used in the non-parallel case
+        * - in the parallel case, only spinlocks and atomics are used
+        * ---
+        */
+       scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL |
+                                                         READ_STREAM_USE_BATCHING,
                                                          scan->rs_strategy,
                                                          scan->rs_base.rs_rd,
                                                          MAIN_FORKNUM,
@@ -1216,6 +1224,12 @@ heap_beginscan(Relation relation, Snapshot snapshot,
    }
    else if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN)
    {
+       /*
+        * Currently we can't trivially use batching, due to the
+        * VM_ALL_VISIBLE check in bitmapheap_stream_read_next. While that
+        * could be made safe, we are about to remove the all-visible logic
+        * from bitmap scans due to its unsoundness.
+        */
        scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
                                                          scan->rs_strategy,
                                                          scan->rs_base.rs_rd,
index 6d287b38cf552609dc336d3fd21d211532f82549..f28326bad0951f5a600350e546508956f3e859a1 100644 (file)
@@ -1225,7 +1225,12 @@ lazy_scan_heap(LVRelState *vacrel)
    vacrel->next_unskippable_eager_scanned = false;
    vacrel->next_unskippable_vmbuffer = InvalidBuffer;
 
-   /* Set up the read stream for vacuum's first pass through the heap */
+   /*
+    * Set up the read stream for vacuum's first pass through the heap.
+    *
+    * This could be made safe for READ_STREAM_USE_BATCHING, but only with
+    * explicit work in heap_vac_scan_next_block.
+    */
    stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
                                        vacrel->bstrategy,
                                        vacrel->rel,
@@ -2669,6 +2674,8 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
  * Read stream callback for vacuum's third phase (second pass over the heap).
  * Gets the next block from the TID store and returns it or InvalidBlockNumber
  * if there are no further blocks to vacuum.
+ *
+ * NB: Assumed to be safe to use with READ_STREAM_USE_BATCHING.
  */
 static BlockNumber
 vacuum_reap_lp_read_stream_next(ReadStream *stream,
@@ -2732,8 +2739,16 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 
    iter = TidStoreBeginIterate(vacrel->dead_items);
 
-   /* Set up the read stream for vacuum's second pass through the heap */
-   stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+   /*
+    * Set up the read stream for vacuum's second pass through the heap.
+    *
+    * It is safe to use batchmode, as vacuum_reap_lp_read_stream_next() does
+    * not need to wait for IO and does not perform locking. Once we support
+    * parallelism it should still be fine, as presumably the holder of locks
+    * would never be blocked by IO while holding the lock.
+    */
+   stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+                                       READ_STREAM_USE_BATCHING,
                                        vacrel->bstrategy,
                                        vacrel->rel,
                                        MAIN_FORKNUM,
index 80b04d6ca2ac2fd0537bbf3088d960de0c87400c..4a0bf069f995474b511a1bd9d69e05e17b45abe2 100644 (file)
@@ -1064,7 +1064,13 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
    needLock = !RELATION_IS_LOCAL(rel);
 
    p.current_blocknum = BTREE_METAPAGE + 1;
-   stream = read_stream_begin_relation(READ_STREAM_FULL,
+
+   /*
+    * It is safe to use batchmode as block_range_read_stream_cb takes no
+    * locks.
+    */
+   stream = read_stream_begin_relation(READ_STREAM_FULL |
+                                       READ_STREAM_USE_BATCHING,
                                        info->strategy,
                                        rel,
                                        MAIN_FORKNUM,
index 77deb226b7e60dd08ecbd9e2491b20bed59196d9..b3df2d89074956056ecf50fbd2caf62406352498 100644 (file)
@@ -822,7 +822,13 @@ spgvacuumscan(spgBulkDeleteState *bds)
    /* We can skip locking for new or temp relations */
    needLock = !RELATION_IS_LOCAL(index);
    p.current_blocknum = SPGIST_METAPAGE_BLKNO + 1;
-   stream = read_stream_begin_relation(READ_STREAM_FULL,
+
+   /*
+    * It is safe to use batchmode as block_range_read_stream_cb takes no
+    * locks.
+    */
+   stream = read_stream_begin_relation(READ_STREAM_FULL |
+                                       READ_STREAM_USE_BATCHING,
                                        bds->info->strategy,
                                        index,
                                        MAIN_FORKNUM,
index ca76c0d2668055e43f9c69798041262f49b7820e..4fffb76e5573596ce0648fdca2468c5202df6dfa 100644 (file)
@@ -1237,7 +1237,12 @@ acquire_sample_rows(Relation onerel, int elevel,
    scan = table_beginscan_analyze(onerel);
    slot = table_slot_create(onerel, NULL);
 
-   stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+   /*
+    * It is safe to use batching, as block_sampling_read_stream_next never
+    * blocks.
+    */
+   stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+                                       READ_STREAM_USE_BATCHING,
                                        vac_strategy,
                                        scan->rs_rd,
                                        MAIN_FORKNUM,
index 26e5dfe77db5ef67bb848e074d510ad1ab20ff91..36c54fb695b0533ec6608819cd4ff0debc95cc68 100644 (file)
@@ -102,6 +102,7 @@ struct ReadStream
    int16       initialized_buffers;
    int         read_buffers_flags;
    bool        sync_mode;      /* using io_method=sync */
+   bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
    bool        advice_enabled;
    bool        temporary;
 
@@ -403,6 +404,15 @@ read_stream_start_pending_read(ReadStream *stream)
 static void
 read_stream_look_ahead(ReadStream *stream)
 {
+   /*
+    * Allow amortizing the cost of submitting IO over multiple IOs. This
+    * requires that we don't do any operations that could lead to a deadlock
+    * with staged-but-unsubmitted IO. The callback needs to opt in to being
+    * careful.
+    */
+   if (stream->batch_mode)
+       pgaio_enter_batchmode();
+
    while (stream->ios_in_progress < stream->max_ios &&
           stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
    {
@@ -450,6 +460,8 @@ read_stream_look_ahead(ReadStream *stream)
            {
                /* We've hit the buffer or I/O limit.  Rewind and stop here. */
                read_stream_unget_block(stream, blocknum);
+               if (stream->batch_mode)
+                   pgaio_exit_batchmode();
                return;
            }
        }
@@ -484,6 +496,9 @@ read_stream_look_ahead(ReadStream *stream)
     * time.
     */
    Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+
+   if (stream->batch_mode)
+       pgaio_exit_batchmode();
 }
 
 /*
@@ -617,6 +632,7 @@ read_stream_begin_impl(int flags,
            MAXALIGN(&stream->ios[Max(1, max_ios)]);
 
    stream->sync_mode = io_method == IOMETHOD_SYNC;
+   stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
 
 #ifdef USE_PREFETCH
 
index 6a18e334809557b83c3496cea56af1dbcc2c899a..f9681d09e1e74e41efad0f55934571cbd15fc7d1 100644 (file)
@@ -5100,7 +5100,13 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
    p.current_blocknum = 0;
    p.last_exclusive = nblocks;
    src_smgr = smgropen(srclocator, INVALID_PROC_NUMBER);
-   src_stream = read_stream_begin_smgr_relation(READ_STREAM_FULL,
+
+   /*
+    * It is safe to use batchmode as block_range_read_stream_cb takes no
+    * locks.
+    */
+   src_stream = read_stream_begin_smgr_relation(READ_STREAM_FULL |
+                                                READ_STREAM_USE_BATCHING,
                                                 bstrategy_src,
                                                 src_smgr,
                                                 permanent ? RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED,
index c11d8ce3300cdb523a9532a4c67b60e6799ac229..9b0d65161d02cdf99d424a4ccadf02883d3cc956 100644 (file)
  */
 #define READ_STREAM_FULL 0x04
 
+/* ---
+ * Opt-in to using AIO batchmode.
+ *
+ * Submitting IO in larger batches can be more efficient than doing so
+ * one-by-one, particularly for many small reads. It does, however, require
+ * the ReadStreamBlockNumberCB callback to abide by the restrictions of AIO
+ * batching (c.f. pgaio_enter_batchmode()). Basically, the callback may not:
+ *
+ * a) block without first calling pgaio_submit_staged(), unless a
+ *    to-be-waited-on lock cannot be part of a deadlock, e.g. because it is
+ *    never held while waiting for IO.
+ *
+ * b) start another batch (without first exiting batchmode and re-entering
+ *    before returning)
+ *
+ * As this requires care and is nontrivial in some cases, batching is only
+ * used with explicit opt-in.
+ * ---
+ */
+#define READ_STREAM_USE_BATCHING 0x08
+
 struct ReadStream;
 typedef struct ReadStream ReadStream;
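
For a callback that does have to block, rule (a) in the comment above can be
followed by submitting any staged IO first. A sketch only (nothing below is
part of this commit; the state type and lock are hypothetical):

    #include "storage/aio.h"
    #include "storage/lwlock.h"
    #include "storage/read_stream.h"

    typedef struct MyScanStateSketch
    {
        LWLock     *lock;       /* could be held by a backend waiting for IO */
        BlockNumber next_block;
        BlockNumber last_exclusive;
    } MyScanStateSketch;

    static BlockNumber
    blocking_read_stream_cb_sketch(ReadStream *stream,
                                   void *callback_private_data,
                                   void *per_buffer_data)
    {
        MyScanStateSketch *state = callback_private_data;
        BlockNumber result = InvalidBlockNumber;

        /*
         * The lock's holder could itself be waiting for IO, so submit any IO
         * staged in the current batch before possibly blocking on the lock,
         * per rule (a).
         */
        pgaio_submit_staged();

        LWLockAcquire(state->lock, LW_EXCLUSIVE);
        if (state->next_block < state->last_exclusive)
            result = state->next_block++;
        LWLockRelease(state->lock);

        return result;
    }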