Fix memory counter update in ReorderBuffer.
authorMasahiko Sawada <[email protected]>
Mon, 26 Aug 2024 18:00:07 +0000 (11:00 -0700)
committerMasahiko Sawada <[email protected]>
Mon, 26 Aug 2024 18:00:07 +0000 (11:00 -0700)
Commit 5bec1d6bc5e changed the memory usage updates of the
ReorderBufferTXN to zero all at once by subtracting txn->size, rather
than updating it for each change. However, if TOAST reconstruction
data remained in the transaction when freeing it, there were cases
where it further subtracted the memory counter from zero, resulting in
an assertion failure.

This change calculates the memory size for each change and updates the
memory usage to precisely the amount that has been freed.

Backpatch to v17, where this was introducd.

Reviewed-by: Amit Kapila, Shlok Kyal
Discussion: https://postgr.es/m/CAD21AoAqkNUvicgKPT_dXzNoOwpPkVTg0QPPxEcWmzT0moCJ1g%40mail.gmail.com
Backpatch-through: 17

contrib/test_decoding/expected/stream.out
contrib/test_decoding/sql/stream.sql
src/backend/replication/logical/reorderbuffer.c

index 4ab2d47bf8d3c33d3a2420ee5aca79e70a291a11..a76f77601e26f7333565aea78503074a6bd99b16 100644 (file)
@@ -109,6 +109,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  committing streamed transaction
 (17 rows)
 
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ count 
+-------
+     5
+(1 row)
+
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
index 4feec62972a5e4b6ac40fddbaed3fc8836d645a7..7f43f0c2ab72a0a437298ed7d3d9553ebfb38eec 100644 (file)
@@ -44,5 +44,20 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
index 00a8327e771532f67e49c43ed27f940c787e8ad9..b3139c41e2b0c70f6b8bdb7135bc5ce27daeda47 100644 (file)
@@ -467,6 +467,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    /* Reset the toast hash */
    ReorderBufferToastReset(rb, txn);
 
+   /* All changes must be deallocated */
+   Assert(txn->size == 0);
+
    pfree(txn);
 }
 
@@ -1506,6 +1509,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
    bool        found;
    dlist_mutable_iter iter;
+   Size        mem_freed = 0;
 
    /* cleanup subtransactions & their changes */
    dlist_foreach_modify(iter, &txn->subtxns)
@@ -1535,9 +1539,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        /* Check we're not mixing changes from different transactions. */
        Assert(change->txn == txn);
 
+       /*
+        * Instead of updating the memory counter for individual changes,
+        * we sum up the size of memory to free so we can update the memory
+        * counter all together below. This saves costs of maintaining
+        * the max-heap.
+        */
+       mem_freed += ReorderBufferChangeSize(change);
+
        ReorderBufferReturnChange(rb, change, false);
    }
 
+   /* Update the memory counter */
+   ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
+
    /*
     * Cleanup the tuplecids we stored for decoding catalog snapshot access.
     * They are always stored in the toplevel transaction.
@@ -1594,9 +1609,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    if (rbtxn_is_serialized(txn))
        ReorderBufferRestoreCleanup(rb, txn);
 
-   /* Update the memory counter */
-   ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
-
    /* deallocate */
    ReorderBufferReturnTXN(rb, txn);
 }
@@ -1616,6 +1628,7 @@ static void
 ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 {
    dlist_mutable_iter iter;
+   Size    mem_freed = 0;
 
    /* cleanup subtransactions & their changes */
    dlist_foreach_modify(iter, &txn->subtxns)
@@ -1648,11 +1661,19 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
        /* remove the change from it's containing list */
        dlist_delete(&change->node);
 
+       /*
+        * Instead of updating the memory counter for individual changes,
+        * we sum up the size of memory to free so we can update the memory
+        * counter all together below. This saves costs of maintaining
+        * the max-heap.
+        */
+       mem_freed += ReorderBufferChangeSize(change);
+
        ReorderBufferReturnChange(rb, change, false);
    }
 
    /* Update the memory counter */
-   ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+   ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
 
    /*
     * Mark the transaction as streamed.
@@ -2062,6 +2083,9 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
        rb->stream_stop(rb, txn, last_lsn);
        ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
    }
+
+   /* All changes must be deallocated */
+   Assert(txn->size == 0);
 }
 
 /*