Respect changing pin limits in read_stream.c.
authorThomas Munro <[email protected]>
Fri, 14 Mar 2025 07:39:43 +0000 (20:39 +1300)
committerThomas Munro <[email protected]>
Fri, 14 Mar 2025 08:21:09 +0000 (21:21 +1300)
To avoid pinning too much of the buffer pool at once, read_stream.c
previously used LimitAdditionalPins().  The coding was naive, and only
considered the available buffers at stream construction time.

This commit checks before each StartReadBuffers() call with
GetAdditionalPinLimit().  The result might change over time due to pins
acquired outside this stream by the same backend.  No extra CPU cycles
are added to the all-buffered fast-path code, but the I/O-starting path
now considers the up-to-date remaining buffer limit.

In practice it was quite difficult to exceed limits and cause any real
problems in v17, so no back-patch for now, but proposed changes will
make it easier.

Per code review from Andres, in the course of testing his AIO patches.

Reviewed-by: Andres Freund <[email protected]> (earlier versions)
Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com

src/backend/storage/aio/read_stream.c

index 36fb9fe152cf4355aa91a0c712d010d3293cf20c..175f8410baffcd1054aee97666140731e3f692d4 100644 (file)
@@ -116,6 +116,7 @@ struct ReadStream
    int16       pinned_buffers;
    int16       distance;
    bool        advice_enabled;
+   bool        temporary;
 
    /*
     * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -213,8 +214,9 @@ read_stream_get_block(ReadStream *stream, void *per_buffer_data)
 }
 
 /*
- * In order to deal with short reads in StartReadBuffers(), we sometimes need
- * to defer handling of a block until later.
+ * In order to deal with buffer shortages and I/O limits after short reads, we
+ * sometimes need to defer handling of a block we've already consumed from the
+ * registered callback until later.
  */
 static inline void
 read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
@@ -225,7 +227,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
    stream->buffered_blocknum = blocknum;
 }
 
-static void
+/*
+ * Start as much of the current pending read as we can.  If we have to split it
+ * because of the per-backend buffer limit, or the buffer manager decides to
+ * split it, then the pending read is adjusted to hold the remaining portion.
+ *
+ * We can always start a read of at least size one if we have no progress yet.
+ * Otherwise it's possible that we can't start a read at all because of a lack
+ * of buffers, and then false is returned.  Buffer shortages also reduce the
+ * distance to a level that prevents look-ahead until buffers are released.
+ */
+static bool
 read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 {
    bool        need_wait;
@@ -234,12 +246,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
    int16       io_index;
    int16       overflow;
    int16       buffer_index;
+   int16       buffer_limit;
 
    /* This should only be called with a pending read. */
    Assert(stream->pending_read_nblocks > 0);
    Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
 
-   /* We had better not exceed the pin limit by starting this read. */
+   /* We had better not exceed the per-stream buffer limit with this read. */
    Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
           stream->max_pinned_buffers);
 
@@ -260,10 +273,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
    else
        flags = 0;
 
-   /* We say how many blocks we want to read, but may be smaller on return. */
+   /* How many more buffers is this backend allowed? */
+   if (stream->temporary)
+       buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+   else
+       buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+   if (buffer_limit == 0 && stream->pinned_buffers == 0)
+       buffer_limit = 1;       /* guarantee progress */
+
+   /* Does the per-backend limit affect this read? */
+   nblocks = stream->pending_read_nblocks;
+   if (buffer_limit < nblocks)
+   {
+       int16       new_distance;
+
+       /* Shrink distance: no more look-ahead until buffers are released. */
+       new_distance = stream->pinned_buffers + buffer_limit;
+       if (stream->distance > new_distance)
+           stream->distance = new_distance;
+
+       /* Unless we have nothing to give the consumer, stop here. */
+       if (stream->pinned_buffers > 0)
+           return false;
+
+       /* A short read is required to make progress. */
+       nblocks = buffer_limit;
+   }
+
+   /*
+    * We say how many blocks we want to read, but it may be smaller on return
+    * if the buffer manager decides to shorten the read.
+    */
    buffer_index = stream->next_buffer_index;
    io_index = stream->next_io_index;
-   nblocks = stream->pending_read_nblocks;
    need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                 &stream->buffers[buffer_index],
                                 stream->pending_read_blocknum,
@@ -313,6 +355,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
    /* Adjust the pending read to cover the remaining portion, if any. */
    stream->pending_read_blocknum += nblocks;
    stream->pending_read_nblocks -= nblocks;
+
+   return true;
 }
 
 static void
@@ -361,14 +405,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
        /* We have to start the pending read before we can build another. */
        while (stream->pending_read_nblocks > 0)
        {
-           read_stream_start_pending_read(stream, suppress_advice);
-           suppress_advice = false;
-           if (stream->ios_in_progress == stream->max_ios)
+           if (!read_stream_start_pending_read(stream, suppress_advice) ||
+               stream->ios_in_progress == stream->max_ios)
            {
-               /* And we've hit the limit.  Rewind, and stop here. */
+               /* We've hit the buffer or I/O limit.  Rewind and stop here. */
                read_stream_unget_block(stream, blocknum);
                return;
            }
+
+           suppress_advice = false;
        }
 
        /* This is the start of a new pending read. */
@@ -382,15 +427,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
     * io_combine_limit size once more buffers have been consumed.  However,
     * if we've already reached io_combine_limit, or we've reached the
     * distance limit and there isn't anything pinned yet, or the callback has
-    * signaled end-of-stream, we start the read immediately.
+    * signaled end-of-stream, we start the read immediately.  Note that the
+    * pending read can exceed the distance goal, if the latter was reduced
+    * after hitting the per-backend buffer limit.
     */
    if (stream->pending_read_nblocks > 0 &&
        (stream->pending_read_nblocks == stream->io_combine_limit ||
-        (stream->pending_read_nblocks == stream->distance &&
+        (stream->pending_read_nblocks >= stream->distance &&
          stream->pinned_buffers == 0) ||
         stream->distance == 0) &&
        stream->ios_in_progress < stream->max_ios)
        read_stream_start_pending_read(stream, suppress_advice);
+
+   /*
+    * There should always be something pinned when we leave this function,
+    * whether started by this call or not, unless we've hit the end of the
+    * stream.  In the worst case we can always make progress one buffer at a
+    * time.
+    */
+   Assert(stream->pinned_buffers > 0 || stream->distance == 0);
 }
 
 /*
@@ -420,6 +475,7 @@ read_stream_begin_impl(int flags,
    int         max_ios;
    int         strategy_pin_limit;
    uint32      max_pinned_buffers;
+   uint32      max_possible_buffer_limit;
    Oid         tablespace_id;
 
    /*
@@ -475,12 +531,23 @@ read_stream_begin_impl(int flags,
    strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
    max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
 
-   /* Don't allow this backend to pin more than its share of buffers. */
+   /*
+    * Also limit our queue to the maximum number of pins we could ever be
+    * allowed to acquire according to the buffer manager.  We may not really
+    * be able to use them all due to other pins held by this backend, but
+    * we'll check that later in read_stream_start_pending_read().
+    */
    if (SmgrIsTemp(smgr))
-       LimitAdditionalLocalPins(&max_pinned_buffers);
+       max_possible_buffer_limit = GetLocalPinLimit();
    else
-       LimitAdditionalPins(&max_pinned_buffers);
-   Assert(max_pinned_buffers > 0);
+       max_possible_buffer_limit = GetPinLimit();
+   max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
+
+   /*
+    * The limit might be zero on a system configured with too few buffers for
+    * the number of connections.  We need at least one to make progress.
+    */
+   max_pinned_buffers = Max(1, max_pinned_buffers);
 
    /*
     * We need one extra entry for buffers and per-buffer data, because users
@@ -546,6 +613,7 @@ read_stream_begin_impl(int flags,
    stream->callback = callback;
    stream->callback_private_data = callback_private_data;
    stream->buffered_blocknum = InvalidBlockNumber;
+   stream->temporary = SmgrIsTemp(smgr);
 
    /*
     * Skip the initial ramp-up phase if the caller says we're going to be
@@ -674,6 +742,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
             * arbitrary I/O entry (they're all free).  We don't have to
             * adjust pinned_buffers because we're transferring one to caller
             * but pinning one more.
+            *
+            * In the fast path we don't need to check the pin limit.  We're
+            * always allowed at least one pin so that progress can be made,
+            * and that's all we need here.  Although two pins are momentarily
+            * held at the same time, the model used here is that the stream
+            * holds only one, and the other now belongs to the caller.
             */
            if (likely(!StartReadBuffer(&stream->ios[0].op,
                                        &stream->buffers[oldest_buffer_index],