committing streamed transaction
(17 rows)
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ count
+-------
+ 5
+(1 row)
+
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
/* Reset the toast hash */
ReorderBufferToastReset(rb, txn);
+ /* All changes must be deallocated */
+ Assert(txn->size == 0);
+
pfree(txn);
}
{
bool found;
dlist_mutable_iter iter;
+ Size mem_freed = 0;
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
+ /*
+ * Instead of updating the memory counter for individual changes,
+ * we sum up the size of memory to free so we can update the memory
+ * counter all together below. This saves costs of maintaining
+ * the max-heap.
+ */
+ mem_freed += ReorderBufferChangeSize(change);
+
ReorderBufferReturnChange(rb, change, false);
}
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
+
/*
* Cleanup the tuplecids we stored for decoding catalog snapshot access.
* They are always stored in the toplevel transaction.
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
- /* Update the memory counter */
- ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
-
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
}
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
dlist_mutable_iter iter;
+ Size mem_freed = 0;
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
/* remove the change from it's containing list */
dlist_delete(&change->node);
+ /*
+ * Instead of updating the memory counter for individual changes,
+ * we sum up the size of memory to free so we can update the memory
+ * counter all together below. This saves costs of maintaining
+ * the max-heap.
+ */
+ mem_freed += ReorderBufferChangeSize(change);
+
ReorderBufferReturnChange(rb, change, false);
}
/* Update the memory counter */
- ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
/*
* Mark the transaction as streamed.
rb->stream_stop(rb, txn, last_lsn);
ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
}
+
+ /* All changes must be deallocated */
+ Assert(txn->size == 0);
}
/*