* ----------------------------------------------------------------
*/
+/*
+ * Streaming read API callback for parallel sequential scans. Returns the next
+ * block the caller wants from the read stream or InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_parallel(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ HeapScanDesc scan = (HeapScanDesc) callback_private_data;
+
+ Assert(ScanDirectionIsForward(scan->rs_dir));
+ Assert(scan->rs_base.rs_parallel);
+
+ if (unlikely(!scan->rs_inited))
+ {
+ /* parallel scan */
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+ scan->rs_parallelworkerdata,
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+
+ /* may return InvalidBlockNumber if there are no more blocks */
+ scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ scan->rs_parallelworkerdata,
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+ scan->rs_inited = true;
+ }
+ else
+ {
+ scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
+ scan->rs_base.rs_parallel);
+ }
+
+ return scan->rs_prefetch_block;
+}
+
+/*
+ * Streaming read API callback for serial sequential and TID range scans.
+ * Returns the next block the caller wants from the read stream or
+ * InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_serial(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ HeapScanDesc scan = (HeapScanDesc) callback_private_data;
+
+ if (unlikely(!scan->rs_inited))
+ {
+ scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+ scan->rs_inited = true;
+ }
+ else
+ scan->rs_prefetch_block = heapgettup_advance_block(scan,
+ scan->rs_prefetch_block,
+ scan->rs_dir);
+
+ return scan->rs_prefetch_block;
+}
+
/* ----------------
* initscan - scan code common to heap_beginscan and heap_rescan
* ----------------
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
+ /*
+ * Initialize to ForwardScanDirection because it is most common and
+ * because heap scans go forward before going backward (e.g. CURSORs).
+ */
+ scan->rs_dir = ForwardScanDirection;
+ scan->rs_prefetch_block = InvalidBlockNumber;
+
/* page-at-a-time fields are always invalid when not rs_inited */
/*
/*
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
*
- * Read the next block of the scan relation into a buffer and pin that buffer
- * before saving it in the scan descriptor.
+ * Read the next block of the scan relation from the read stream and save it
+ * in the scan descriptor. It is already pinned.
*/
static inline void
heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
{
+ Assert(scan->rs_read_stream);
+
/* release previous scan buffer, if any */
if (BufferIsValid(scan->rs_cbuf))
{
*/
CHECK_FOR_INTERRUPTS();
- if (unlikely(!scan->rs_inited))
+ /*
+ * If the scan direction is changing, reset the prefetch block to the
+ * current block. Otherwise, we will incorrectly prefetch the blocks
+ * between the prefetch block and the current block again before
+ * prefetching blocks in the new, correct scan direction.
+ */
+ if (unlikely(scan->rs_dir != dir))
{
- scan->rs_cblock = heapgettup_initial_block(scan, dir);
+ scan->rs_prefetch_block = scan->rs_cblock;
+ read_stream_reset(scan->rs_read_stream);
+ }
- /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
- Assert(scan->rs_cblock != InvalidBlockNumber ||
- !BufferIsValid(scan->rs_cbuf));
+ scan->rs_dir = dir;
- scan->rs_inited = true;
- }
- else
- scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
- dir);
-
- /* read block if valid */
- if (BlockNumberIsValid(scan->rs_cblock))
- scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
- scan->rs_cblock, RBM_NORMAL,
- scan->rs_strategy);
+ scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+ if (BufferIsValid(scan->rs_cbuf))
+ scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
}
/*
heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
{
Assert(!scan->rs_inited);
+ Assert(scan->rs_base.rs_parallel == NULL);
/* When there are no pages to scan, return InvalidBlockNumber */
if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
if (ScanDirectionIsForward(dir))
{
- /* serial scan */
- if (scan->rs_base.rs_parallel == NULL)
- return scan->rs_startblock;
- else
- {
- /* parallel scan */
- table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
- scan->rs_parallelworkerdata,
- (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-
- /* may return InvalidBlockNumber if there are no more blocks */
- return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- scan->rs_parallelworkerdata,
- (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
- }
+ return scan->rs_startblock;
}
else
{
- /* backward parallel scan not supported */
- Assert(scan->rs_base.rs_parallel == NULL);
-
/*
* Disable reporting to syncscan logic in a backwards scan; it's not
* very likely anyone else is doing the same thing at the same time,
static inline BlockNumber
heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
{
- if (ScanDirectionIsForward(dir))
+ Assert(scan->rs_base.rs_parallel == NULL);
+
+ if (likely(ScanDirectionIsForward(dir)))
{
- if (scan->rs_base.rs_parallel == NULL)
- {
- block++;
+ block++;
- /* wrap back to the start of the heap */
- if (block >= scan->rs_nblocks)
- block = 0;
+ /* wrap back to the start of the heap */
+ if (block >= scan->rs_nblocks)
+ block = 0;
- /*
- * Report our new scan position for synchronization purposes. We
- * don't do that when moving backwards, however. That would just
- * mess up any other forward-moving scanners.
- *
- * Note: we do this before checking for end of scan so that the
- * final state of the position hint is back at the start of the
- * rel. That's not strictly necessary, but otherwise when you run
- * the same query multiple times the starting position would shift
- * a little bit backwards on every invocation, which is confusing.
- * We don't guarantee any specific ordering in general, though.
- */
- if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
- ss_report_location(scan->rs_base.rs_rd, block);
-
- /* we're done if we're back at where we started */
- if (block == scan->rs_startblock)
- return InvalidBlockNumber;
+ /*
+ * Report our new scan position for synchronization purposes. We don't
+ * do that when moving backwards, however. That would just mess up any
+ * other forward-moving scanners.
+ *
+ * Note: we do this before checking for end of scan so that the final
+ * state of the position hint is back at the start of the rel. That's
+ * not strictly necessary, but otherwise when you run the same query
+ * multiple times the starting position would shift a little bit
+ * backwards on every invocation, which is confusing. We don't
+ * guarantee any specific ordering in general, though.
+ */
+ if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
+ ss_report_location(scan->rs_base.rs_rd, block);
- /* check if the limit imposed by heap_setscanlimits() is met */
- if (scan->rs_numblocks != InvalidBlockNumber)
- {
- if (--scan->rs_numblocks == 0)
- return InvalidBlockNumber;
- }
+ /* we're done if we're back at where we started */
+ if (block == scan->rs_startblock)
+ return InvalidBlockNumber;
- return block;
- }
- else
+ /* check if the limit imposed by heap_setscanlimits() is met */
+ if (scan->rs_numblocks != InvalidBlockNumber)
{
- return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
- scan->rs_base.rs_parallel);
+ if (--scan->rs_numblocks == 0)
+ return InvalidBlockNumber;
}
+
+ return block;
}
else
{
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
+ scan->rs_prefetch_block = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
}
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
+ scan->rs_prefetch_block = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
}
initscan(scan, key, false);
+ scan->rs_read_stream = NULL;
+
+ /*
+ * Set up a read stream for sequential scans and TID range scans. This
+ * should be done after initscan() because initscan() allocates the
+ * BufferAccessStrategy object passed to the streaming read API.
+ */
+ if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+ scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+ {
+ ReadStreamBlockNumberCB cb;
+
+ if (scan->rs_base.rs_parallel)
+ cb = heap_scan_stream_read_next_parallel;
+ else
+ cb = heap_scan_stream_read_next_serial;
+
+ scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+ scan->rs_strategy,
+ scan->rs_base.rs_rd,
+ MAIN_FORKNUM,
+ cb,
+ scan,
+ 0);
+ }
+
+
return (TableScanDesc) scan;
}
Assert(scan->rs_empty_tuples_pending == 0);
+ /*
+ * The read stream is reset on rescan. This must be done before
+ * initscan(), as some state referred to by read_stream_reset() is reset
+ * in initscan().
+ */
+ if (scan->rs_read_stream)
+ read_stream_reset(scan->rs_read_stream);
+
/*
* reinitialize scan descriptor
*/
Assert(scan->rs_empty_tuples_pending == 0);
+ /*
+ * Must free the read stream before freeing the BufferAccessStrategy.
+ */
+ if (scan->rs_read_stream)
+ read_stream_end(scan->rs_read_stream);
+
/*
* decrement relation reference count and free scan descriptor storage
*/