BlockNumber blockNum,
BufferAccessStrategy strategy,
bool *foundPtr, IOContext io_context);
+static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks);
+static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
return buffer;
}
+ /*
+ * Signal that we are going to immediately wait. If we're immediately
+ * waiting, there is no benefit in actually executing the IO
+ * asynchronously; it would just add dispatch overhead.
+ */
+ flags = READ_BUFFERS_SYNCHRONOUSLY;
if (mode == RBM_ZERO_ON_ERROR)
- flags = READ_BUFFERS_ZERO_ON_ERROR;
- else
- flags = 0;
+ flags |= READ_BUFFERS_ZERO_ON_ERROR;
operation.smgr = smgr;
operation.rel = rel;
operation.persistence = persistence;
{
int actual_nblocks = *nblocks;
int maxcombine = 0;
+ bool did_start_io;
Assert(*nblocks == 1 || allow_forwarding);
Assert(*nblocks > 0);
if (i == 0)
{
*nblocks = 1;
+
+#ifdef USE_ASSERT_CHECKING
+
+ /*
+ * Initialize enough of ReadBuffersOperation to make
+ * CheckReadBuffersOperation() work. Outside of assertions
+ * that's not necessary when no IO is issued.
+ */
+ operation->buffers = buffers;
+ operation->blocknum = blockNum;
+ operation->nblocks = 1;
+ operation->nblocks_done = 1;
+ CheckReadBuffersOperation(operation, true);
+#endif
return false;
}
operation->blocknum = blockNum;
operation->flags = flags;
operation->nblocks = actual_nblocks;
+ operation->nblocks_done = 0;
+ pgaio_wref_clear(&operation->io_wref);
- if (flags & READ_BUFFERS_ISSUE_ADVICE)
+ /*
+ * When using AIO, start the IO in the background. If not, issue prefetch
+ * requests if desired by the caller.
+ *
+ * The reason we have a dedicated path for IOMETHOD_SYNC here is to
+ * de-risk the introduction of AIO somewhat. It's a large architectural
+ * change, with lots of chances for unanticipated performance effects.
+ *
+ * Use of IOMETHOD_SYNC already leads to not actually performing IO
+ * asynchronously, but without the check here we'd execute IO earlier than
+ * we used to. Eventually this IOMETHOD_SYNC-specific path should go away.
+ */
+ if (io_method != IOMETHOD_SYNC)
{
/*
- * In theory we should only do this if PinBufferForBlock() had to
- * allocate new buffers above. That way, if two calls to
- * StartReadBuffers() were made for the same blocks before
- * WaitReadBuffers(), only the first would issue the advice. That'd be
- * a better simulation of true asynchronous I/O, which would only
- * start the I/O once, but isn't done here for simplicity.
+ * Try to start IO asynchronously. It's possible that no IO needs to
+ * be started, if another backend already performed the IO.
+ *
+ * Note that if an IO is started, it might not cover the entire
+ * requested range, e.g. because an intermediary block has been read
+ * in by another backend. In that case any "trailing" buffers we
+ * already pinned above will be "forwarded" by read_stream.c to the
+ * next call to StartReadBuffers().
+ *
+ * This is signalled to the caller by decrementing *nblocks *and*
+ * reducing operation->nblocks. The latter is done here, but not in
+ * WaitReadBuffers() below, as there we can't "shorten" the overall read
+ * size anymore; we need to retry until the read is done in its entirety
+ * or has failed.
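+ *
+ * For example, if eight blocks were requested but the fourth buffer has
+ * concurrently been read in by another backend, the IO started here
+ * covers only the first three blocks; *nblocks and operation->nblocks
+ * drop to 3 and the remaining pinned buffers are forwarded.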
*/
- smgrprefetch(operation->smgr,
- operation->forknum,
- blockNum,
- actual_nblocks);
+ did_start_io = AsyncReadBuffers(operation, nblocks);
+
+ operation->nblocks = *nblocks;
}
+ else
+ {
+ operation->flags |= READ_BUFFERS_SYNCHRONOUSLY;
- /* Indicate that WaitReadBuffers() should be called. */
- return true;
+ if (flags & READ_BUFFERS_ISSUE_ADVICE)
+ {
+ /*
+ * In theory we should only do this if PinBufferForBlock() had to
+ * allocate new buffers above. That way, if two calls to
+ * StartReadBuffers() were made for the same blocks before
+ * WaitReadBuffers(), only the first would issue the advice.
+ * That'd be a better simulation of true asynchronous I/O, which
+ * would only start the I/O once, but isn't done here for
+ * simplicity.
+ */
+ smgrprefetch(operation->smgr,
+ operation->forknum,
+ blockNum,
+ actual_nblocks);
+ }
+
+ /*
+ * Indicate that WaitReadBuffers() should be called. WaitReadBuffers()
+ * will initiate the necessary IO.
+ */
+ did_start_io = true;
+ }
+
+ CheckReadBuffersOperation(operation, !did_start_io);
+
+ return did_start_io;
}
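+
+/*
+ * A caller-side sketch of the StartReadBuffer()/WaitReadBuffers() contract,
+ * mirroring ReadBuffer_common above (rel and blockNum are assumed to be in
+ * scope):
+ *
+ *     ReadBuffersOperation op;
+ *     Buffer      buffer;
+ *
+ *     op.smgr = RelationGetSmgr(rel);
+ *     op.rel = rel;
+ *     op.persistence = rel->rd_rel->relpersistence;
+ *     op.forknum = MAIN_FORKNUM;
+ *     op.strategy = NULL;
+ *     if (StartReadBuffer(&op, &buffer, blockNum, READ_BUFFERS_SYNCHRONOUSLY))
+ *         WaitReadBuffers(&op);
+ *
+ * Afterwards the buffer is pinned and valid; if StartReadBuffer() returned
+ * false, no IO was needed and WaitReadBuffers() should not be called.
+ */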
/*
return result;
}
+/*
+ * Perform sanity checks on the ReadBuffersOperation.
+ */
+static void
+CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
+{
+#ifdef USE_ASSERT_CHECKING
+ Assert(operation->nblocks_done <= operation->nblocks);
+ Assert(!is_complete || operation->nblocks == operation->nblocks_done);
+
+ for (int i = 0; i < operation->nblocks; i++)
+ {
+ Buffer buffer = operation->buffers[i];
+ BufferDesc *buf_hdr = BufferIsLocal(buffer) ?
+ GetLocalBufferDescriptor(-buffer - 1) :
+ GetBufferDescriptor(buffer - 1);
+
+ Assert(BufferGetBlockNumber(buffer) == operation->blocknum + i);
+ Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_TAG_VALID);
+
+ if (i < operation->nblocks_done)
+ Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_VALID);
+ }
+#endif
+}
+
+/* helper for ReadBuffersCanStartIO(), to avoid repetition */
static inline bool
-WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
{
if (BufferIsLocal(buffer))
return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
}
+/*
+ * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
+ */
+static inline bool
+ReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+ /*
+ * If this backend currently has staged IO, we need to submit the pending
+ * IO before waiting for the right to issue IO, to avoid the potential for
+ * deadlocks (and, more commonly, unnecessary delays for other backends).
+ */
+ if (!nowait && pgaio_have_staged())
+ {
+ if (ReadBuffersCanStartIOOnce(buffer, true))
+ return true;
+
+ /*
+ * Unfortunately StartBufferIO() returning false doesn't allow us to
+ * distinguish between the buffer already being valid and IO already
+ * being in progress. Since IO already being in progress is quite
+ * rare, this approach seems fine.
+ */
+ pgaio_submit_staged();
+ }
+
+ return ReadBuffersCanStartIOOnce(buffer, nowait);
+}
+
+/*
+ * Helper for WaitReadBuffers() that processes the results of a readv
+ * operation, raising an error if necessary.
+ */
+static void
+ProcessReadBuffersResult(ReadBuffersOperation *operation)
+{
+ PgAioReturn *aio_ret = &operation->io_return;
+ PgAioResultStatus rs = aio_ret->result.status;
+ int newly_read_blocks = 0;
+
+ Assert(pgaio_wref_valid(&operation->io_wref));
+ Assert(aio_ret->result.status != PGAIO_RS_UNKNOWN);
+
+ /*
+ * SMGR reports the number of blocks successfully read as the result of
+ * the IO operation. Thus we can simply add that to ->nblocks_done.
+ */
+
+ if (likely(rs != PGAIO_RS_ERROR))
+ newly_read_blocks = aio_ret->result.result;
+
+ if (rs == PGAIO_RS_ERROR || rs == PGAIO_RS_WARNING)
+ pgaio_result_report(aio_ret->result, &aio_ret->target_data,
+ rs == PGAIO_RS_ERROR ? ERROR : WARNING);
+ else if (aio_ret->result.status == PGAIO_RS_PARTIAL)
+ {
+ /*
+ * We'll retry, so we just emit a debug message to the server log (or
+ * not even that in prod scenarios).
+ */
+ pgaio_result_report(aio_ret->result, &aio_ret->target_data, DEBUG1);
+ elog(DEBUG3, "partial read, will retry");
+ }
+
+ Assert(newly_read_blocks > 0);
+ Assert(newly_read_blocks <= MAX_IO_COMBINE_LIMIT);
+
+ operation->nblocks_done += newly_read_blocks;
+
+ Assert(operation->nblocks_done <= operation->nblocks);
+}
+
void
WaitReadBuffers(ReadBuffersOperation *operation)
{
- Buffer *buffers;
- int nblocks;
- BlockNumber blocknum;
- ForkNumber forknum;
+ PgAioReturn *aio_ret = &operation->io_return;
IOContext io_context;
IOObject io_object;
- char persistence;
-
- /* Find the range of the physical read we need to perform. */
- nblocks = operation->nblocks;
- buffers = &operation->buffers[0];
- blocknum = operation->blocknum;
- forknum = operation->forknum;
- persistence = operation->persistence;
- Assert(nblocks > 0);
- Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
-
- if (persistence == RELPERSISTENCE_TEMP)
+ if (operation->persistence == RELPERSISTENCE_TEMP)
{
io_context = IOCONTEXT_NORMAL;
io_object = IOOBJECT_TEMP_RELATION;
io_object = IOOBJECT_RELATION;
}
- for (int i = 0; i < nblocks; ++i)
+ /*
+ * If we get here without an IO operation having been issued, the
+ * io_method == IOMETHOD_SYNC path must have been used. Otherwise the
+ * caller should not have called WaitReadBuffers().
+ *
+ * In the case of IOMETHOD_SYNC, we start the IO in WaitReadBuffers() - as
+ * we used to before the introduction of AIO. This is done as part of the
+ * retry logic below; no extra code is required.
+ *
+ * This path is expected to eventually go away.
+ */
+ if (!pgaio_wref_valid(&operation->io_wref) && io_method != IOMETHOD_SYNC)
+ elog(ERROR, "waiting for read operation that didn't read");
+
+ /*
+ * To handle partial reads, and IOMETHOD_SYNC, we re-issue IO until we're
+ * done. We may need multiple retries, not just because we could get
+ * multiple partial reads, but also because some of the remaining
+ * to-be-read buffers may have been read in by other backends, limiting
+ * the IO size.
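+ *
+ * For instance, a 16-block read may complete only 10 blocks on the first
+ * attempt (a partial read); on retry the 11th buffer may turn out to have
+ * been read in by another backend, and a final IO then covers the
+ * remaining five blocks.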
+ */
+ while (true)
{
- int io_buffers_len;
- Buffer io_buffers[MAX_IO_COMBINE_LIMIT];
- void *io_pages[MAX_IO_COMBINE_LIMIT];
- instr_time io_start;
- BlockNumber io_first_block;
+ int ignored_nblocks_progress;
+
+ CheckReadBuffersOperation(operation, false);
/*
- * Skip this block if someone else has already completed it. If an
- * I/O is already in progress in another backend, this will wait for
- * the outcome: either done, or something went wrong and we will
- * retry.
+ * If there is an IO associated with the operation, we may need to
+ * wait for it.
*/
- if (!WaitReadBuffersCanStartIO(buffers[i], false))
+ if (pgaio_wref_valid(&operation->io_wref))
{
/*
- * Report and track this as a 'hit' for this backend, even though
- * it must have started out as a miss in PinBufferForBlock(). The
- * other backend will track this as a 'read'.
+ * Track the time spent waiting for the IO to complete. Tracking
+ * a wait even when we don't actually need to wait
+ *
+ * a) is not cheap, due to the timestamping overhead
+ *
+ * b) reports some time as waiting even though we never waited
+ *
+ * so we first check whether we already know the IO is complete.
*/
- TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
- operation->smgr->smgr_rlocator.locator.spcOid,
- operation->smgr->smgr_rlocator.locator.dbOid,
- operation->smgr->smgr_rlocator.locator.relNumber,
- operation->smgr->smgr_rlocator.backend,
- true);
-
- if (persistence == RELPERSISTENCE_TEMP)
- pgBufferUsage.local_blks_hit += 1;
+ if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+ !pgaio_wref_check_done(&operation->io_wref))
+ {
+ instr_time io_start = pgstat_prepare_io_time(track_io_timing);
+
+ pgaio_wref_wait(&operation->io_wref);
+
+ /*
+ * The IO operation itself was already counted earlier, in
+ * AsyncReadBuffers(), this just accounts for the wait time.
+ */
+ pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+ io_start, 0, 0);
+ }
else
- pgBufferUsage.shared_blks_hit += 1;
+ {
+ Assert(pgaio_wref_check_done(&operation->io_wref));
+ }
- if (operation->rel)
- pgstat_count_buffer_hit(operation->rel);
+ /*
+ * We now are sure the IO completed. Check the results. This
+ * includes reporting on errors if there were any.
+ */
+ ProcessReadBuffersResult(operation);
+ }
- pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+ /*
+ * Most of the time, the one IO we already started will read in
+ * everything. But we need to deal with partial reads and buffers not
+ * needing IO anymore.
+ */
+ if (operation->nblocks_done == operation->nblocks)
+ break;
- if (VacuumCostActive)
- VacuumCostBalance += VacuumCostPageHit;
+ CHECK_FOR_INTERRUPTS();
- continue;
- }
+ /*
+ * This may only complete the IO partially, either because some
+ * buffers were already valid, or because of a partial read.
+ *
+ * NB: In contrast to after the AsyncReadBuffers() call in
+ * StartReadBuffers(), we do *not* reduce
+ * ReadBuffersOperation->nblocks here; callers expect the full
+ * operation to be completed at this point (as more operations may
+ * have been queued).
+ */
+ AsyncReadBuffers(operation, &ignored_nblocks_progress);
+ }
+
+ CheckReadBuffersOperation(operation, true);
+
+ /* NB: READ_DONE tracepoint was already executed in completion callback */
+}
+
+/*
+ * Initiate IO for the ReadBuffersOperation
+ *
+ * This function starts at most a single IO at a time. The IO may cover fewer
+ * than the remaining to-be-read blocks if one of the buffers has concurrently
+ * been read in by another backend. If the first to-be-read buffer is already valid,
+ * no IO will be issued.
+ *
+ * To support retries after partial reads, the first operation->nblocks_done
+ * buffers are skipped.
+ *
+ * On return *nblocks_progress is updated to reflect the number of buffers
+ * affected by the call. If the first buffer is valid, *nblocks_progress is
+ * set to 1 and operation->nblocks_done is incremented.
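+ *
+ * For example, with nblocks = 8 and nblocks_done = 3, a call typically starts
+ * one readv covering the five remaining blocks and sets *nblocks_progress to
+ * 5; if the buffer at index 5 had concurrently been read in by another
+ * backend, the IO would cover only two blocks and *nblocks_progress would be 2.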
+ *
+ * Returns true if IO was initiated, false if no IO was necessary.
+ */
+static bool
+AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
+{
+ Buffer *buffers = &operation->buffers[0];
+ int flags = operation->flags;
+ BlockNumber blocknum = operation->blocknum;
+ ForkNumber forknum = operation->forknum;
+ char persistence = operation->persistence;
+ int16 nblocks_done = operation->nblocks_done;
+ Buffer *io_buffers = &operation->buffers[nblocks_done];
+ int io_buffers_len = 0;
+ PgAioHandle *ioh;
+ uint32 ioh_flags = 0;
+ void *io_pages[MAX_IO_COMBINE_LIMIT];
+ IOContext io_context;
+ IOObject io_object;
+ bool did_start_io;
+
+ /*
+ * When this IO is executed synchronously, either because the caller will
+ * immediately block waiting for the IO or because IOMETHOD_SYNC is used,
+ * the AIO subsystem needs to know.
+ */
+ if (flags & READ_BUFFERS_SYNCHRONOUSLY)
+ ioh_flags |= PGAIO_HF_SYNCHRONOUS;
+
+ if (persistence == RELPERSISTENCE_TEMP)
+ {
+ io_context = IOCONTEXT_NORMAL;
+ io_object = IOOBJECT_TEMP_RELATION;
+ ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
+ }
+ else
+ {
+ io_context = IOContextForStrategy(operation->strategy);
+ io_object = IOOBJECT_RELATION;
+ }
+
+ /*
+ * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
+ * flag. The reason for that is that, hopefully, zero_damaged_pages isn't
+ * set globally, but on a per-session basis. The completion callback,
+ * which may be run in other processes, e.g. in IO workers, may have a
+ * different value of the zero_damaged_pages GUC.
+ *
+ * XXX: We probably should eventually use a different flag for
+ * zero_damaged_pages, so we can report different log levels / error codes
+ * for zero_damaged_pages and ZERO_ON_ERROR.
+ */
+ if (zero_damaged_pages)
+ flags |= READ_BUFFERS_ZERO_ON_ERROR;
+
+ /*
+ * For the same reason as with zero_damaged_pages we need to use this
+ * backend's ignore_checksum_failure value.
+ */
+ if (ignore_checksum_failure)
+ flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;
+
+ /*
+ * To be allowed to report stats in the local completion callback we need
+ * to prepare to report stats now. This ensures we can safely report the
+ * checksum failure even in a critical section.
+ */
+ pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
+
+ /*
+ * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
+ * might block, which we don't want after setting IO_IN_PROGRESS.
+ *
+ * If we need to wait for IO before we can get a handle, submit
+ * already-staged IO first, so that other backends don't need to wait.
+ * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
+ * wait for already submitted IO, which doesn't require additional locks,
+ * but it could still cause undesirable waits.
+ *
+ * A secondary benefit is that this would allow us to measure the time in
+ * pgaio_io_acquire() without causing undue timer overhead in the common,
+ * non-blocking, case. However, currently the pgstats infrastructure
+ * doesn't really allow that, as it a) asserts that an operation can't
+ * have time without operations and b) doesn't have an API to report
+ * "accumulated" time.
+ */
+ ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
+ if (unlikely(!ioh))
+ {
+ pgaio_submit_staged();
+
+ ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
+ }
+
+ /*
+ * Check if we can start IO on the first to-be-read buffer.
+ *
+ * If an I/O is already in progress in another backend, we want to wait
+ * for the outcome: either done, or something went wrong and we will
+ * retry.
+ */
+ if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+ {
+ /*
+ * Someone else has already completed this block; we're done.
+ *
+ * When IO is necessary, ->nblocks_done is updated in
+ * ProcessReadBuffersResult(), but that is not called if no IO is
+ * necessary. Thus update here.
+ */
+ operation->nblocks_done += 1;
+ *nblocks_progress = 1;
+
+ pgaio_io_release(ioh);
+ pgaio_wref_clear(&operation->io_wref);
+ did_start_io = false;
+
+ /*
+ * Report and track this as a 'hit' for this backend, even though it
+ * must have started out as a miss in PinBufferForBlock(). The other
+ * backend will track this as a 'read'.
+ */
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + nblocks_done,
+ operation->smgr->smgr_rlocator.locator.spcOid,
+ operation->smgr->smgr_rlocator.locator.dbOid,
+ operation->smgr->smgr_rlocator.locator.relNumber,
+ operation->smgr->smgr_rlocator.backend,
+ true);
+
+ if (persistence == RELPERSISTENCE_TEMP)
+ pgBufferUsage.local_blks_hit += 1;
+ else
+ pgBufferUsage.shared_blks_hit += 1;
+
+ if (operation->rel)
+ pgstat_count_buffer_hit(operation->rel);
+
+ pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+ if (VacuumCostActive)
+ VacuumCostBalance += VacuumCostPageHit;
+ }
+ else
+ {
+ instr_time io_start;
/* We found a buffer that we need to read in. */
- io_buffers[0] = buffers[i];
- io_pages[0] = BufferGetBlock(buffers[i]);
- io_first_block = blocknum + i;
+ Assert(io_buffers[0] == buffers[nblocks_done]);
+ io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
io_buffers_len = 1;
/*
* How many neighboring-on-disk blocks can we scatter-read into other
* buffers at the same time? In this case we don't wait if we see an
- * I/O already in progress. We already hold BM_IO_IN_PROGRESS for the
+ * I/O already in progress. We already set BM_IO_IN_PROGRESS for the
* head block, so we should get on with that I/O as soon as possible.
- * We'll come back to this block again, above.
*/
- while ((i + 1) < nblocks &&
- WaitReadBuffersCanStartIO(buffers[i + 1], true))
+ for (int i = nblocks_done + 1; i < operation->nblocks; i++)
{
+ if (!ReadBuffersCanStartIO(buffers[i], true))
+ break;
/* Must be consecutive block numbers. */
- Assert(BufferGetBlockNumber(buffers[i + 1]) ==
- BufferGetBlockNumber(buffers[i]) + 1);
+ Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+ BufferGetBlockNumber(buffers[i]) - 1);
+ Assert(io_buffers[io_buffers_len] == buffers[i]);
- io_buffers[io_buffers_len] = buffers[++i];
io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
}
- io_start = pgstat_prepare_io_time(track_io_timing);
- smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
- pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
- 1, io_buffers_len * BLCKSZ);
+ /* get a reference to wait for in WaitReadBuffers() */
+ pgaio_io_get_wref(ioh, &operation->io_wref);
- /* Verify each block we read, and terminate the I/O. */
- for (int j = 0; j < io_buffers_len; ++j)
- {
- BufferDesc *bufHdr;
- Block bufBlock;
- int piv_flags;
- bool verified;
- bool checksum_failure;
-
- if (persistence == RELPERSISTENCE_TEMP)
- {
- bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
- bufBlock = LocalBufHdrGetBlock(bufHdr);
- }
- else
- {
- bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
- bufBlock = BufHdrGetBlock(bufHdr);
- }
-
- /* check for garbage data */
- piv_flags = PIV_LOG_WARNING;
- if (ignore_checksum_failure)
- piv_flags |= PIV_IGNORE_CHECKSUM_FAILURE;
- verified = PageIsVerified((Page) bufBlock, io_first_block + j,
- piv_flags, &checksum_failure);
- if (checksum_failure)
- {
- RelFileLocatorBackend rloc = operation->smgr->smgr_rlocator;
+ /* provide the list of buffers to the completion callbacks */
+ pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
- pgstat_prepare_report_checksum_failure(rloc.locator.dbOid);
- pgstat_report_checksum_failures_in_db(rloc.locator.dbOid, 1);
- }
+ pgaio_io_register_callbacks(ioh,
+ persistence == RELPERSISTENCE_TEMP ?
+ PGAIO_HCB_LOCAL_BUFFER_READV :
+ PGAIO_HCB_SHARED_BUFFER_READV,
+ flags);
- if (!verified)
- {
- if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
- {
- ereport(WARNING,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("invalid page in block %u of relation %s; zeroing out page",
- io_first_block + j,
- relpath(operation->smgr->smgr_rlocator, forknum).str)));
- memset(bufBlock, 0, BLCKSZ);
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("invalid page in block %u of relation %s",
- io_first_block + j,
- relpath(operation->smgr->smgr_rlocator, forknum).str)));
- }
+ pgaio_io_set_flag(ioh, ioh_flags);
- /* Set BM_VALID, terminate IO, and wake up any waiters */
- if (persistence == RELPERSISTENCE_TEMP)
- TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
- else
- TerminateBufferIO(bufHdr, false, BM_VALID, true, false);
-
- /* Report I/Os as completing individually. */
- TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
- operation->smgr->smgr_rlocator.locator.spcOid,
- operation->smgr->smgr_rlocator.locator.dbOid,
- operation->smgr->smgr_rlocator.locator.relNumber,
- operation->smgr->smgr_rlocator.backend,
- false);
- }
+ /* ---
+ * Even though we're trying to issue IO asynchronously, track the time
+ * in smgrstartreadv():
+ * - if io_method == IOMETHOD_SYNC, we will always perform the IO
+ * immediately
+ * - the io method might not support the IO (e.g. worker IO for a temp
+ * table)
+ * ---
+ */
+ io_start = pgstat_prepare_io_time(track_io_timing);
+ smgrstartreadv(ioh, operation->smgr, forknum,
+ blocknum + nblocks_done,
+ io_pages, io_buffers_len);
+ pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+ io_start, 1, io_buffers_len * BLCKSZ);
if (persistence == RELPERSISTENCE_TEMP)
pgBufferUsage.local_blks_read += io_buffers_len;
else
pgBufferUsage.shared_blks_read += io_buffers_len;
+ /*
+ * Track vacuum cost when issuing IO, not after waiting for it.
+ * Otherwise we could end up issuing a lot of IO in a short timespan,
+ * despite a low cost limit.
+ */
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+
+ *nblocks_progress = io_buffers_len;
+ did_start_io = true;
}
+
+ return did_start_io;
}
/*