From 75da2bece670059f3c1a3628dfbc3d24cc9638b8 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 13 Mar 2025 15:43:34 +1300 Subject: [PATCH] Fix read_stream.c for changing io_combine_limit. In a couple of places, read_stream.c assumed that io_combine_limit would be stable during the lifetime of a stream. That is not true in at least one unusual case: streams held by CURSORs where you could change the GUC between FETCH commands, with unpredictable results. Fix, by storing stream->io_combine_limit and referring only to that after construction. This mirrors the treatment of the other important setting {effective,maintenance}_io_concurrency, which is stored in stream->max_ios. One of the cases was the queue overflow space, which was sized for io_combine_limit and could be overrun if the GUC was increased. Since that coding was a little hard to follow, also introduce a variable for better readability instead of open-coding the arithmetic. Doing so revealed an off-by-one thinko while clamping max_pinned_buffers to INT16_MAX, though that wasn't a live bug due to the current limits on GUC values. Back-patch to 17. Discussion: https://postgr.es/m/CA%2BhUKG%2B2T9p-%2BzM6Eeou-RAJjTML6eit1qn26f9twznX59qtCA%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 38 +++++++++++++++++++-------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 04bdb5e6d4b..36fb9fe152c 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -109,6 +109,7 @@ typedef struct InProgressIO struct ReadStream { int16 max_ios; + int16 io_combine_limit; int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; @@ -236,7 +237,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) /* This should only be called with a pending read. */ Assert(stream->pending_read_nblocks > 0); - Assert(stream->pending_read_nblocks <= io_combine_limit); + Assert(stream->pending_read_nblocks <= stream->io_combine_limit); /* We had better not exceed the pin limit by starting this read. */ Assert(stream->pinned_buffers + stream->pending_read_nblocks <= @@ -324,7 +325,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) int16 buffer_index; void *per_buffer_data; - if (stream->pending_read_nblocks == io_combine_limit) + if (stream->pending_read_nblocks == stream->io_combine_limit) { read_stream_start_pending_read(stream, suppress_advice); suppress_advice = false; @@ -384,7 +385,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) * signaled end-of-stream, we start the read immediately. */ if (stream->pending_read_nblocks > 0 && - (stream->pending_read_nblocks == io_combine_limit || + (stream->pending_read_nblocks == stream->io_combine_limit || (stream->pending_read_nblocks == stream->distance && stream->pinned_buffers == 0) || stream->distance == 0) && @@ -415,6 +416,7 @@ read_stream_begin_impl(int flags, ReadStream *stream; size_t size; int16 queue_size; + int16 queue_overflow; int max_ios; int strategy_pin_limit; uint32 max_pinned_buffers; @@ -445,6 +447,14 @@ read_stream_begin_impl(int flags, /* Cap to INT16_MAX to avoid overflowing below */ max_ios = Min(max_ios, PG_INT16_MAX); + /* + * If starting a multi-block I/O near the end of the queue, we might + * temporarily need extra space for overflowing buffers before they are + * moved to regular circular position. This is the maximum extra space we + * could need. + */ + queue_overflow = io_combine_limit - 1; + /* * Choose the maximum number of buffers we're prepared to pin. We try to * pin fewer if we can, though. We add one so that we can make progress @@ -459,7 +469,7 @@ read_stream_begin_impl(int flags, */ max_pinned_buffers = (max_ios + 1) * io_combine_limit; max_pinned_buffers = Min(max_pinned_buffers, - PG_INT16_MAX - io_combine_limit - 1); + PG_INT16_MAX - queue_overflow - 1); /* Give the strategy a chance to limit the number of buffers we pin. */ strategy_pin_limit = GetAccessStrategyPinLimit(strategy); @@ -485,18 +495,17 @@ read_stream_begin_impl(int flags, * one big chunk. Though we have queue_size buffers, we want to be able * to assume that all the buffers for a single read are contiguous (i.e. * don't wrap around halfway through), so we allow temporary overflows of - * up to the maximum possible read size by allocating an extra - * io_combine_limit - 1 elements. + * up to the maximum possible overflow size. */ size = offsetof(ReadStream, buffers); - size += sizeof(Buffer) * (queue_size + io_combine_limit - 1); + size += sizeof(Buffer) * (queue_size + queue_overflow); size += sizeof(InProgressIO) * Max(1, max_ios); size += per_buffer_data_size * queue_size; size += MAXIMUM_ALIGNOF * 2; stream = (ReadStream *) palloc(size); memset(stream, 0, offsetof(ReadStream, buffers)); stream->ios = (InProgressIO *) - MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]); + MAXALIGN(&stream->buffers[queue_size + queue_overflow]); if (per_buffer_data_size > 0) stream->per_buffer_data = (void *) MAXALIGN(&stream->ios[Max(1, max_ios)]); @@ -523,7 +532,14 @@ read_stream_begin_impl(int flags, if (max_ios == 0) max_ios = 1; + /* + * Capture stable values for these two GUC-derived numbers for the + * lifetime of this stream, so we don't have to worry about the GUCs + * changing underneath us beyond this point. + */ stream->max_ios = max_ios; + stream->io_combine_limit = io_combine_limit; + stream->per_buffer_data_size = per_buffer_data_size; stream->max_pinned_buffers = max_pinned_buffers; stream->queue_size = queue_size; @@ -537,7 +553,7 @@ read_stream_begin_impl(int flags, * doing full io_combine_limit sized reads (behavior B). */ if (flags & READ_STREAM_FULL) - stream->distance = Min(max_pinned_buffers, io_combine_limit); + stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); else stream->distance = 1; @@ -752,14 +768,14 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) else { /* No advice; move towards io_combine_limit (behavior B). */ - if (stream->distance > io_combine_limit) + if (stream->distance > stream->io_combine_limit) { stream->distance--; } else { distance = stream->distance * 2; - distance = Min(distance, io_combine_limit); + distance = Min(distance, stream->io_combine_limit); distance = Min(distance, stream->max_pinned_buffers); stream->distance = distance; } -- 2.30.2