--- /dev/null
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+ CREATE TABLE tbl1(val1 integer, val2 integer);
+ CREATE PUBLICATION pub;
+}
+
+teardown
+{
+ DROP TABLE tbl1;
+ DROP PUBLICATION pub;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
static void SnapBuildSnapIncRefcount(Snapshot snap);
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
uint32 xinfo);
}
/*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
*/
static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
{
dlist_iter txn_i;
ReorderBufferTXN *txn;
/*
* Iterate through all toplevel transactions. This can include
* subtransactions which we just don't yet know to be that, but that's
- * fine, they will just get an unnecessary snapshot queued.
+ * fine, they will just get an unnecessary snapshot and invalidations
+ * queued.
*/
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
{
* transaction which in turn implies we don't yet need a snapshot at
* all. We'll add a snapshot when the first change gets queued.
*
+ * Similarly, we don't need to add invalidations to a transaction whose
+ * base snapshot is not yet set. Once a base snapshot is built, it will
+ * include the xids of committed transactions that have modified the
+ * catalog, thus reflecting the new catalog contents. The existing
+ * catalog cache will have already been invalidated after processing
+ * the invalidations in the transaction that modified catalogs,
+ * ensuring that a fresh cache is constructed during decoding.
+ *
* NB: This works correctly even for subtransactions because
* ReorderBufferAssignChild() takes care to transfer the base snapshot
* to the top-level transaction, and while iterating the changequeue
continue;
/*
- * We don't need to add snapshot to prepared transactions as they
- * should not see the new catalog contents.
+ * We don't need to add snapshot or invalidations to prepared
+ * transactions as they should not see the new catalog contents.
*/
if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
continue;
- elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+ elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
txn->xid, LSN_FORMAT_ARGS(lsn));
/*
SnapBuildSnapIncRefcount(builder->snapshot);
ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
builder->snapshot);
+
+ /*
+ * Add invalidation messages to the reorder buffer of in-progress
+ * transactions except the current committed transaction, for which we
+ * will execute invalidations at the end.
+ *
+ * It is required, otherwise, we will end up using the stale catcache
+ * contents built by the current transaction even after its decoding,
+ * which should have been invalidated due to concurrent catalog
+ * changing transaction.
+ */
+ if (txn->xid != xid)
+ {
+ uint32 ninvalidations;
+ SharedInvalidationMessage *msgs = NULL;
+
+ ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+ xid, &msgs);
+
+ if (ninvalidations > 0)
+ {
+ Assert(msgs != NULL);
+
+ ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+ ninvalidations, msgs);
+ }
+ }
}
}
/* refcount of the snapshot builder for the new snapshot */
SnapBuildSnapIncRefcount(builder->snapshot);
- /* add a new catalog snapshot to all currently running transactions */
- SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+ /*
+ * Add a new catalog snapshot and invalidations messages to all
+ * currently running transactions.
+ */
+ SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
}
}