#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
+#include "storage/aio.h"
+#include "storage/block.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
* ----------------------------------------------------------------
*/
+/*
+ * heap_pgsr_next_single
+ *      "Next block" callback handed to pg_streaming_read_alloc() for a
+ *      non-parallel heap scan.
+ *
+ * pgsr_private is the HeapScanDesc of the scan.  Every call moves
+ * rs_prefetch_block forward by one block -- beginning at rs_startblock while
+ * rs_prefetch_block is still InvalidBlockNumber, and wrapping to block 0
+ * once rs_nblocks is reached -- and requests that block through
+ * ReadBufferAsync().
+ *
+ * *read_private receives the resulting buffer (cast to uintptr_t), or 0 when
+ * there is nothing further to read.
+ *
+ * Returns PGSR_NEXT_END when the position has wrapped around to
+ * rs_startblock or the rs_numblocks limit (if one is set) has been used up;
+ * PGSR_NEXT_NO_IO when ReadBufferAsync() flagged the buffer as already
+ * valid; PGSR_NEXT_IO otherwise.
+ */
+static PgStreamingReadNextStatus
+heap_pgsr_next_single(uintptr_t pgsr_private, PgAioInProgress *aio, uintptr_t *read_private)
+{
+    HeapScanDesc hscan = (HeapScanDesc) pgsr_private;
+    BlockNumber next_block;
+    Buffer      buffer;
+    bool        cached;
+
+    Assert(hscan->rs_inited);
+    Assert(!hscan->rs_base.rs_parallel);
+    Assert(hscan->rs_nblocks > 0);
+
+    if (hscan->rs_prefetch_block != InvalidBlockNumber)
+    {
+        bool        exhausted;
+
+        /* step to the following block, wrapping at the end of the relation */
+        next_block = hscan->rs_prefetch_block + 1;
+        if (next_block >= hscan->rs_nblocks)
+            next_block = 0;
+        hscan->rs_prefetch_block = next_block;
+
+        /*
+         * The block limit is only consumed when we did not already stop
+         * because of reaching the start block again.
+         */
+        exhausted = (next_block == hscan->rs_startblock);
+        if (!exhausted && hscan->rs_numblocks != InvalidBlockNumber)
+            exhausted = (--hscan->rs_numblocks == 0);
+
+        if (exhausted)
+        {
+            *read_private = 0;
+            return PGSR_NEXT_END;
+        }
+    }
+    else
+    {
+        /* nothing requested so far: begin at the scan's start block */
+        next_block = hscan->rs_startblock;
+        hscan->rs_prefetch_block = next_block;
+    }
+
+    buffer = ReadBufferAsync(hscan->rs_base.rs_rd, MAIN_FORKNUM, next_block,
+                             RBM_NORMAL, hscan->rs_strategy, &cached,
+                             &aio);
+    *read_private = (uintptr_t) buffer;
+
+    return cached ? PGSR_NEXT_NO_IO : PGSR_NEXT_IO;
+}
+
+/*
+ * heap_pgsr_next_parallel
+ *      "Next block" callback handed to pg_streaming_read_alloc() for a
+ *      parallel heap scan.
+ *
+ * pgsr_private is the HeapScanDesc of the scan.  The block to read is
+ * obtained from table_block_parallelscan_nextpage() and then requested
+ * through ReadBufferAsync().
+ *
+ * *read_private receives the resulting buffer (cast to uintptr_t), or 0 when
+ * no block was handed out.
+ *
+ * Returns PGSR_NEXT_END when table_block_parallelscan_nextpage() yields
+ * InvalidBlockNumber; PGSR_NEXT_NO_IO when ReadBufferAsync() flagged the
+ * buffer as already valid; PGSR_NEXT_IO otherwise.
+ */
+static PgStreamingReadNextStatus
+heap_pgsr_next_parallel(uintptr_t pgsr_private, PgAioInProgress *aio, uintptr_t *read_private)
+{
+    HeapScanDesc hscan = (HeapScanDesc) pgsr_private;
+    ParallelBlockTableScanDesc shared =
+        (ParallelBlockTableScanDesc) hscan->rs_base.rs_parallel;
+    ParallelBlockTableScanWorker worker =
+        (ParallelBlockTableScanWorker) hscan->rs_base.rs_private;
+    BlockNumber next_block;
+    Buffer      buffer;
+    bool        cached;
+
+    Assert(hscan->rs_inited);
+    Assert(hscan->rs_base.rs_parallel);
+    Assert(hscan->rs_nblocks > 0);
+
+    next_block = table_block_parallelscan_nextpage(hscan->rs_base.rs_rd,
+                                                   worker, shared);
+
+    /* Other processes might have already finished the scan. */
+    if (next_block == InvalidBlockNumber)
+    {
+        *read_private = 0;
+        return PGSR_NEXT_END;
+    }
+
+    buffer = ReadBufferAsync(hscan->rs_base.rs_rd, MAIN_FORKNUM, next_block,
+                             RBM_NORMAL, hscan->rs_strategy, &cached,
+                             &aio);
+    *read_private = (uintptr_t) buffer;
+
+    return cached ? PGSR_NEXT_NO_IO : PGSR_NEXT_IO;
+}
+
+/*
+ * heap_pgsr_release
+ *      Release callback handed to pg_streaming_read_alloc() by the heap
+ *      scan code.
+ *
+ * pgsr_private is the HeapScanDesc of the scan; read_private is the buffer
+ * that one of the heap_pgsr_next_* callbacks stored in *read_private.  The
+ * release is logged at DEBUG2 and the buffer is then handed to
+ * ReleaseBuffer().
+ */
+static void
+heap_pgsr_release(uintptr_t pgsr_private, uintptr_t read_private)
+{
+    HeapScanDesc hscan = (HeapScanDesc) pgsr_private;
+    Relation    rel = hscan->rs_base.rs_rd;
+    Buffer      buffer = (Buffer) read_private;
+
+    ereport(DEBUG2,
+            errmsg("pgsr %s: releasing buf %d",
+                   NameStr(rel->rd_rel->relname),
+                   buffer),
+            errhidestmt(true),
+            errhidecontext(true));
+
+    /* only valid buffers are expected here */
+    Assert(BufferIsValid(buffer));
+    ReleaseBuffer(buffer);
+}
+
+/*
+ * heap_pgsr_single_alloc
+ *      Create the streaming-read state for a non-parallel heap scan.
+ *
+ * The depth passed to pg_streaming_read_alloc() is NBuffers / 128, clamped
+ * to the range 1..128.  The scan itself is registered as the callbacks'
+ * private data, with heap_pgsr_next_single and heap_pgsr_release as the
+ * callbacks.
+ */
+static PgStreamingRead *
+heap_pgsr_single_alloc(HeapScanDesc scan)
+{
+    int         depth = NBuffers / 128;
+
+    /* clamp to [1, 128] */
+    if (depth > 128)
+        depth = 128;
+    if (depth < 1)
+        depth = 1;
+
+    return pg_streaming_read_alloc(depth,
+                                   (uintptr_t) scan,
+                                   heap_pgsr_next_single,
+                                   heap_pgsr_release);
+}
+
+/*
+ * heap_pgsr_parallel_alloc
+ *      Create the streaming-read state for a parallel heap scan.
+ *
+ * The depth passed to pg_streaming_read_alloc() is NBuffers / 128, clamped
+ * to the range 1..128.  The scan itself is registered as the callbacks'
+ * private data, with heap_pgsr_next_parallel and heap_pgsr_release as the
+ * callbacks.
+ */
+static PgStreamingRead *
+heap_pgsr_parallel_alloc(HeapScanDesc scan)
+{
+    int         depth = NBuffers / 128;
+
+    /* clamp to [1, 128] */
+    if (depth > 128)
+        depth = 128;
+    if (depth < 1)
+        depth = 1;
+
+    return pg_streaming_read_alloc(depth,
+                                   (uintptr_t) scan,
+                                   heap_pgsr_next_parallel,
+                                   heap_pgsr_release);
+}
+
+
/* ----------------
* initscan - scan code common to heap_beginscan and heap_rescan
* ----------------
*/
if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN)
pgstat_count_heap_scan(scan->rs_base.rs_rd);
+
+ scan->rs_prefetch_block = InvalidBlockNumber;
+ if (scan->pgsr)
+ {
+ pg_streaming_read_free(scan->pgsr);
+ scan->pgsr = NULL;
+ }
+
+ /*
+ * FIXME: This probably should be done in the !rs_inited blocks instead.
+ */
+ scan->pgsr = NULL;
+ if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) &&
+ (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN))
+ {
+ if (scan->rs_base.rs_parallel)
+ scan->pgsr = heap_pgsr_parallel_alloc(scan);
+ else
+ scan->pgsr = heap_pgsr_single_alloc(scan);
+ }
}
/*
* which tuples on the page are visible.
*/
void
-heapgetpage(TableScanDesc sscan, BlockNumber page)
+heapgetpage(TableScanDesc sscan, BlockNumber page, Buffer pgsr_buffer)
{
HeapScanDesc scan = (HeapScanDesc) sscan;
Buffer buffer;
*/
CHECK_FOR_INTERRUPTS();
- /* read page using selected strategy */
- scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page,
- RBM_NORMAL, scan->rs_strategy);
+ if (BufferIsValid(pgsr_buffer))
+ {
+ Assert(scan->pgsr);
+ scan->rs_cbuf = pgsr_buffer;
+ }
+ else
+ {
+ Assert(!scan->pgsr);
+
+ /* read page using selected strategy */
+ scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page,
+ RBM_NORMAL, scan->rs_strategy);
+ }
scan->rs_cblock = page;
if (!(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE))
Snapshot snapshot = scan->rs_base.rs_snapshot;
bool backward = ScanDirectionIsBackward(dir);
BlockNumber page;
+ Buffer pgsr_buf = InvalidBuffer;
bool finished;
Page dp;
int lines;
*/
if (ScanDirectionIsForward(dir))
{
+ /*
+ * FIXME: This logic badly needs to be consolidated into one place,
+ * instead of being repeated at the top and bottom of both heapgettup()
+ * and heapgettup_pagemode().
+ */
if (!scan->rs_inited)
{
+ scan->rs_inited = true;
+
/*
* return null immediately if relation is empty
*/
tuple->t_data = NULL;
return;
}
- if (scan->rs_base.rs_parallel != NULL)
+ if (scan->pgsr)
+ {
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWorker pbscanwork =
+ (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
+
+ if (scan->rs_base.rs_parallel != NULL)
+ {
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, pbscanwork, pbscan);
+ }
+
+ pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
+
+ /* Other processes might have already finished the scan. */
+ if (scan->rs_base.rs_parallel != NULL)
+ {
+ if (!BufferIsValid(pgsr_buf))
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ page = BufferGetBlockNumber(pgsr_buf);
+ }
+ else
+ {
+ Assert(BufferGetBlockNumber(pgsr_buf) == scan->rs_startblock);
+ page = scan->rs_startblock; /* crosscheck */
+ }
+ }
+ else if (scan->rs_base.rs_parallel != NULL)
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
}
else
page = scan->rs_startblock; /* first page */
- heapgetpage((TableScanDesc) scan, page);
+
+ heapgetpage((TableScanDesc) scan, page, pgsr_buf);
lineoff = FirstOffsetNumber; /* first offnum */
- scan->rs_inited = true;
}
else
{
/* backward parallel scan not supported */
Assert(scan->rs_base.rs_parallel == NULL);
+ if (scan->pgsr)
+ {
+ pg_streaming_read_free(scan->pgsr);
+ scan->pgsr = NULL;
+ }
+
if (!scan->rs_inited)
{
/*
page = scan->rs_startblock - 1;
else
page = scan->rs_nblocks - 1;
- heapgetpage((TableScanDesc) scan, page);
+ heapgetpage((TableScanDesc) scan, page, InvalidBuffer);
}
else
{
page = ItemPointerGetBlockNumber(&(tuple->t_self));
if (page != scan->rs_cblock)
- heapgetpage((TableScanDesc) scan, page);
+ heapgetpage((TableScanDesc) scan, page, InvalidBuffer);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf);
* it's time to move to the next.
*/
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+ pgsr_buf = InvalidBuffer;
/*
* advance to next/prior page and detect end of scan
page = scan->rs_nblocks;
page--;
}
+ else if (scan->pgsr)
+ {
+ pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
+
+ if (BufferIsValid(pgsr_buf))
+ {
+ page = BufferGetBlockNumber(pgsr_buf);
+ /* FIXME: add crosscheck for block number */
+ /* FIXME: ss_report_location */
+ finished = false;
+ }
+ else
+ finished = true;
+ }
else if (scan->rs_base.rs_parallel != NULL)
{
ParallelBlockTableScanDesc pbscan =
return;
}
- heapgetpage((TableScanDesc) scan, page);
+ heapgetpage((TableScanDesc) scan, page, pgsr_buf);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
HeapTuple tuple = &(scan->rs_ctup);
bool backward = ScanDirectionIsBackward(dir);
BlockNumber page;
+ Buffer pgsr_buf = InvalidBuffer;
bool finished;
Page dp;
int lines;
{
if (!scan->rs_inited)
{
+ scan->rs_inited = true;
+
/*
* return null immediately if relation is empty
*/
tuple->t_data = NULL;
return;
}
- if (scan->rs_base.rs_parallel != NULL)
+ if (scan->pgsr)
+ {
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWorker pbscanwork =
+ (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
+
+ if (scan->rs_base.rs_parallel != NULL)
+ {
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, pbscanwork, pbscan);
+ }
+
+ pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
+
+ /* Other processes might have already finished the scan. */
+ if (scan->rs_base.rs_parallel != NULL)
+ {
+ if (!BufferIsValid(pgsr_buf))
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ page = BufferGetBlockNumber(pgsr_buf);
+ }
+ else
+ {
+ Assert(BufferGetBlockNumber(pgsr_buf) == scan->rs_startblock);
+ page = scan->rs_startblock; /* crosscheck */
+ }
+ }
+ else if (scan->rs_base.rs_parallel != NULL)
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
}
else
page = scan->rs_startblock; /* first page */
- heapgetpage((TableScanDesc) scan, page);
+ heapgetpage((TableScanDesc) scan, page, pgsr_buf);
lineindex = 0;
- scan->rs_inited = true;
}
else
{
/* backward parallel scan not supported */
Assert(scan->rs_base.rs_parallel == NULL);
+ if (scan->pgsr)
+ {
+ pg_streaming_read_free(scan->pgsr);
+ scan->pgsr = NULL;
+ }
+
if (!scan->rs_inited)
{
/*
page = scan->rs_startblock - 1;
else
page = scan->rs_nblocks - 1;
- heapgetpage((TableScanDesc) scan, page);
+ heapgetpage((TableScanDesc) scan, page, InvalidBuffer);
}
else
{
page = ItemPointerGetBlockNumber(&(tuple->t_self));
if (page != scan->rs_cblock)
- heapgetpage((TableScanDesc) scan, page);
+ heapgetpage((TableScanDesc) scan, page, InvalidBuffer);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf);
++lineindex;
}
+ pgsr_buf = InvalidBuffer;
+
/*
* if we get here, it means we've exhausted the items on this page and
* it's time to move to the next.
page = scan->rs_nblocks;
page--;
}
+ else if (scan->pgsr)
+ {
+ pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
+
+ if (BufferIsValid(pgsr_buf))
+ {
+ page = BufferGetBlockNumber(pgsr_buf);
+ /* FIXME: add crosscheck for block number */
+ /* FIXME: ss_report_location */
+ finished = false;
+ }
+ else
+ finished = true;
+ }
else if (scan->rs_base.rs_parallel != NULL)
{
ParallelBlockTableScanDesc pbscan =
return;
}
- heapgetpage((TableScanDesc) scan, page);
+ heapgetpage((TableScanDesc) scan, page, pgsr_buf);
dp = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
palloc(sizeof(ParallelBlockTableScanWorkerData));
scan->rs_strategy = NULL; /* set in initscan */
+ scan->pgsr = NULL;
+
/*
* Disable page-at-a-time mode if it's not a MVCC-safe snapshot.
*/
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
+ if (scan->pgsr)
+ {
+ pg_streaming_read_free(scan->pgsr);
+ scan->pgsr = NULL;
+ }
+
/*
* decrement relation reference count and free scan descriptor storage
*/