Enforce memory limit during parallel GIN builds
authorTomas Vondra <[email protected]>
Tue, 4 Mar 2025 19:37:55 +0000 (20:37 +0100)
committerTomas Vondra <[email protected]>
Tue, 4 Mar 2025 19:41:13 +0000 (20:41 +0100)
Index builds are expected to respect maintenance_work_mem, just like
other maintenance operations. For serial builds this is done simply by
flushing the buffer in ginBuildCallback() into the index. But with
parallel builds it's more complicated, because there are multiple places
that can allocate memory.

ginBuildCallbackParallel() does the same thing as ginBuildCallback(),
except that the accumulated items are written into tuplesort. Then the
entries with the same key get merged - first in the worker, then in the
leader - and the TID lists may get (arbitrarily) long. It's unlikely it
would exceed the memory limit, but it's possible. We address this by
evicting some of the data if the list gets too long.

We can't simply dump the whole in-memory TID list. The GIN index bulk
insert code expects to see TIDs in monotonic order; it may fail if the
TIDs go backwards. If the TID lists overlap, evicting the whole current
TID list would break this (a later entry might add "old" TID values into
the already-written part).

In the workers this is not an issue, because the lists never overlap.
But the leader may see overlapping lists produced by the workers.

We can however derive a safe "horizon" TID - the entries (for a given
key) are sorted by (key, first TID), which means no future list can add
values before the last "first TID" we've seen. This patch tracks the
"frozen" part of the TID list, which we know can't change by merging
additional TID lists. If needed, we can evict this part of the list.

We don't want to do this too often - the smaller lists we evict, the
more expensive it'll be to merge them in the next step (especially in
the leader). Therefore we only trim the list if we have at least 1024
frozen items, and if the whole list is at least 64kB large.

These thresholds are somewhat arbitrary and conservative. We might
calculate the values from maintenance_work_mem, but tests show that does
not really improve anything (time, compression ratio, ...). So we stick
to these conservative values to release memory faster.

Author: Tomas Vondra
Reviewed-by: Matthias van de Meent, Andy Fan, Kirill Reshke
Discussion: https://postgr.es/m/6ab4003f-a8b8-4d75-a67f-f25ad98582dc%40enterprisedb.com

src/backend/access/gin/gininsert.c

index 27c14adbc3a39a25da5067eb67fb347d4fd3d53d..b2f89cad88023010a2301d6a903613fb5bd38f84 100644 (file)
@@ -1155,8 +1155,12 @@ typedef struct GinBuffer
    int16       typlen;
    bool        typbyval;
 
+   /* Number of TIDs to collect before attempt to write some out. */
+   int         maxitems;
+
    /* array of TID values */
    int         nitems;
+   int         nfrozen;
    SortSupport ssup;           /* for sorting/comparing keys */
    ItemPointerData *items;
 } GinBuffer;
@@ -1229,6 +1233,13 @@ GinBufferInit(Relation index)
                nKeys;
    TupleDesc   desc = RelationGetDescr(index);
 
+   /*
+    * How many items can we fit into the memory limit? We don't want to end
+    * with too many TIDs. and 64kB seems more than enough. But maybe this
+    * should be tied to maintenance_work_mem or something like that?
+    */
+   buffer->maxitems = (64 * 1024L) / sizeof(ItemPointerData);
+
    nKeys = IndexRelationGetNumberOfKeyAttributes(index);
 
    buffer->ssup = palloc0(sizeof(SortSupportData) * nKeys);
@@ -1336,6 +1347,48 @@ GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup)
    return (r == 0);
 }
 
+/*
+ * GinBufferShouldTrim
+ *     Should we trim the list of item pointers?
+ *
+ * By trimming we understand writing out and removing the tuple IDs that
+ * we know can't change by future merges. We can deduce the TID up to which
+ * this is guaranteed from the "first" TID in each GIN tuple, which provides
+ * a "horizon" (for a given key) thanks to the sort.
+ *
+ * We don't want to do this too often - compressing longer TID lists is more
+ * efficient. But we also don't want to accumulate too many TIDs, for two
+ * reasons. First, it consumes memory and we might exceed maintenance_work_mem
+ * (or whatever limit applies), even if that's unlikely because TIDs are very
+ * small so we can fit a lot of them. Second, and more importantly, long TID
+ * lists are an issue if the scan wraps around, because a key may get a very
+ * wide list (with min/max TID for that key), forcing "full" mergesorts for
+ * every list merged into it (instead of the efficient append).
+ *
+ * So we look at two things when deciding if to trim - if the resulting list
+ * (after adding TIDs from the new tuple) would be too long, and if there is
+ * enough TIDs to trim (with values less than "first" TID from the new tuple),
+ * we do the trim. By enough we mean at least 128 TIDs (mostly an arbitrary
+ * number).
+ */
+static bool
+GinBufferShouldTrim(GinBuffer *buffer, GinTuple *tup)
+{
+   /* not enough TIDs to trim (1024 is somewhat arbitrary number) */
+   if (buffer->nfrozen < 1024)
+       return false;
+
+   /* no need to trim if we have not hit the memory limit yet */
+   if ((buffer->nitems + tup->nitems) < buffer->maxitems)
+       return false;
+
+   /*
+    * OK, we have enough frozen TIDs to flush, and we have hit the memory
+    * limit, so it's time to write it out.
+    */
+   return true;
+}
+
 /*
  * GinBufferStoreTuple
  *     Add data (especially TID list) from a GIN tuple to the buffer.
@@ -1386,21 +1439,76 @@ GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
            buffer->key = (Datum) 0;
    }
 
+   /*
+    * Try freeze TIDs at the beginning of the list, i.e. exclude them from
+    * the mergesort. We can do that with TIDs before the first TID in the new
+    * tuple we're about to add into the buffer.
+    *
+    * We do this incrementally when adding data into the in-memory buffer,
+    * and not later (e.g. when hitting a memory limit), because it allows us
+    * to skip the frozen data during the mergesort, making it cheaper.
+    */
+
+   /*
+    * Check if the last TID in the current list is frozen. This is the case
+    * when merging non-overlapping lists, e.g. in each parallel worker.
+    */
+   if ((buffer->nitems > 0) &&
+       (ItemPointerCompare(&buffer->items[buffer->nitems - 1],
+                           GinTupleGetFirst(tup)) == 0))
+       buffer->nfrozen = buffer->nitems;
+
+   /*
+    * Now find the last TID we know to be frozen, i.e. the last TID right
+    * before the new GIN tuple.
+    *
+    * Start with the first not-yet-frozen tuple, and walk until we find the
+    * first TID that's higher. If we already know the whole list is frozen
+    * (i.e. nfrozen == nitems), this does nothing.
+    *
+    * XXX This might do a binary search for sufficiently long lists, but it
+    * does not seem worth the complexity. Overlapping lists should be rare
+    * common, TID comparisons are cheap, and we should quickly freeze most of
+    * the list.
+    */
+   for (int i = buffer->nfrozen; i < buffer->nitems; i++)
+   {
+       /* Is the TID after the first TID of the new tuple? Can't freeze. */
+       if (ItemPointerCompare(&buffer->items[i],
+                              GinTupleGetFirst(tup)) > 0)
+           break;
+
+       buffer->nfrozen++;
+   }
+
    /* add the new TIDs into the buffer, combine using merge-sort */
    {
        int         nnew;
        ItemPointer new;
 
-       new = ginMergeItemPointers(buffer->items, buffer->nitems,
+       /*
+        * Resize the array - we do this first, because we'll dereference the
+        * first unfrozen TID, which would fail if the array is NULL. We'll
+        * still pass 0 as number of elements in that array though.
+        */
+       if (buffer->items == NULL)
+           buffer->items = palloc((buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
+       else
+           buffer->items = repalloc(buffer->items,
+                                    (buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
+
+       new = ginMergeItemPointers(&buffer->items[buffer->nfrozen], /* first unfronzen */
+                                  (buffer->nitems - buffer->nfrozen),  /* num of unfrozen */
                                   items, tup->nitems, &nnew);
 
-       Assert(nnew == buffer->nitems + tup->nitems);
+       Assert(nnew == (tup->nitems + (buffer->nitems - buffer->nfrozen)));
 
-       if (buffer->items)
-           pfree(buffer->items);
+       memcpy(&buffer->items[buffer->nfrozen], new,
+              nnew * sizeof(ItemPointerData));
 
-       buffer->items = new;
-       buffer->nitems = nnew;
+       pfree(new);
+
+       buffer->nitems += tup->nitems;
 
        AssertCheckItemPointers(buffer);
    }
@@ -1432,11 +1540,29 @@ GinBufferReset(GinBuffer *buffer)
    buffer->category = 0;
    buffer->keylen = 0;
    buffer->nitems = 0;
+   buffer->nfrozen = 0;
 
    buffer->typlen = 0;
    buffer->typbyval = 0;
 }
 
+/*
+ * GinBufferTrim
+ *     Discard the "frozen" part of the TID list (which should have been
+ *     written to disk/index before this call).
+ */
+static void
+GinBufferTrim(GinBuffer *buffer)
+{
+   Assert((buffer->nfrozen > 0) && (buffer->nfrozen <= buffer->nitems));
+
+   memmove(&buffer->items[0], &buffer->items[buffer->nfrozen],
+           sizeof(ItemPointerData) * (buffer->nitems - buffer->nfrozen));
+
+   buffer->nitems -= buffer->nfrozen;
+   buffer->nfrozen = 0;
+}
+
 /*
  * GinBufferFree
  *     Release memory associated with the GinBuffer (including TID array).
@@ -1504,7 +1630,12 @@ _gin_parallel_merge(GinBuildState *state)
    /* do the actual sort in the leader */
    tuplesort_performsort(state->bs_sortstate);
 
-   /* initialize buffer to combine entries for the same key */
+   /*
+    * Initialize buffer to combine entries for the same key.
+    *
+    * The leader is allowed to use the whole maintenance_work_mem buffer to
+    * combine data. The parallel workers already completed.
+    */
    buffer = GinBufferInit(state->ginstate.index);
 
    /*
@@ -1562,6 +1693,32 @@ _gin_parallel_merge(GinBuildState *state)
            GinBufferReset(buffer);
        }
 
+       /*
+        * We're about to add a GIN tuple to the buffer - check the memory
+        * limit first, and maybe write out some of the data into the index
+        * first, if needed (and possible). We only flush the part of the TID
+        * list that we know won't change, and only if there's enough data for
+        * compression to work well.
+        */
+       if (GinBufferShouldTrim(buffer, tup))
+       {
+           Assert(buffer->nfrozen > 0);
+
+           /*
+            * Buffer is not empty and it's storing a different key - flush
+            * the data into the insert, and start a new entry for current
+            * GinTuple.
+            */
+           AssertCheckItemPointers(buffer);
+
+           ginEntryInsert(&state->ginstate,
+                          buffer->attnum, buffer->key, buffer->category,
+                          buffer->items, buffer->nfrozen, &state->buildStats);
+
+           /* truncate the data we've just discarded */
+           GinBufferTrim(buffer);
+       }
+
        /*
         * Remember data for the current tuple (either remember the new key,
         * or append if to the existing data).
@@ -1655,7 +1812,13 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort,
 
    GinBuffer  *buffer;
 
-   /* initialize buffer to combine entries for the same key */
+   /*
+    * Initialize buffer to combine entries for the same key.
+    *
+    * The workers are limited to the same amount of memory as during the sort
+    * in ginBuildCallbackParallel. But this probably should be the 32MB used
+    * during planning, just like there.
+    */
    buffer = GinBufferInit(state->ginstate.index);
 
    /* sort the raw per-worker data */
@@ -1711,6 +1874,39 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort,
            GinBufferReset(buffer);
        }
 
+       /*
+        * We're about to add a GIN tuple to the buffer - check the memory
+        * limit first, and maybe write out some of the data into the index
+        * first, if needed (and possible). We only flush the part of the TID
+        * list that we know won't change, and only if there's enough data for
+        * compression to work well.
+        */
+       if (GinBufferShouldTrim(buffer, tup))
+       {
+           GinTuple   *ntup;
+           Size        ntuplen;
+
+           Assert(buffer->nfrozen > 0);
+
+           /*
+            * Buffer is not empty and it's storing a different key - flush
+            * the data into the insert, and start a new entry for current
+            * GinTuple.
+            */
+           AssertCheckItemPointers(buffer);
+
+           ntup = _gin_build_tuple(buffer->attnum, buffer->category,
+                                   buffer->key, buffer->typlen, buffer->typbyval,
+                                   buffer->items, buffer->nfrozen, &ntuplen);
+
+           tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+
+           pfree(ntup);
+
+           /* truncate the data we've just discarded */
+           GinBufferTrim(buffer);
+       }
+
        /*
         * Remember data for the current tuple (either remember the new key,
         * or append if to the existing data).