Rename RBTXN_PREPARE to RBTXN_IS_PREPARE for better clarification.
authorMasahiko Sawada <[email protected]>
Thu, 13 Feb 2025 00:55:00 +0000 (16:55 -0800)
committerMasahiko Sawada <[email protected]>
Thu, 13 Feb 2025 00:55:00 +0000 (16:55 -0800)
RBTXN_PREPARE flag and rbtxn_prepared macro could be misinterpreted as
either indicating the transaction type (e.g. a prepared transaction or
a normal transaction) or its currentstate (e.g. skipped or its prepare
message is sent), especially after commit 072ee847ad4 introduced the
RBTXN_SENT_PREPARE flag and the rbtxn_sent_prepare macro.

The RBTXN_PREPARE flag (and its corresponding macro) have been renamed
to RBTXN_IS_PREPARE to explicitly indicate the transaction
type. Therefore, this commit also adds the RBTXN_IS_PREPARE flag to
the transaction that is a prepared transaction and has been skipped,
which previously had only the RBTXN_SKIPPED_PREPARE flag.

Reviewed-by: Amit Kapila, Peter Smith
Discussion: https://postgr.es/m/CAA4eK1KgNmBsG%3D155E7QQ6TX9RoWnM4z5Z20SvsbwxSe_QXYsg%40mail.gmail.com

src/backend/replication/logical/proto.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/snapbuild.c
src/include/replication/reorderbuffer.h

index dc72b7c8f77a9c85eed0051ff215cf3b7177c4cb..1a352b542dc5670d2838149c5e51e1b042b9254c 100644 (file)
@@ -164,7 +164,7 @@ logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
     * which case we expect to have a valid GID.
     */
    Assert(txn->gid != NULL);
-   Assert(rbtxn_prepared(txn));
+   Assert(rbtxn_is_prepared(txn));
    Assert(TransactionIdIsValid(txn->xid));
 
    /* send the flags field */
index ed5a2946dc1ae8c5995c325724653bd25846643e..b42f4002ba84589bc4efd6d58fe06d6dcba0735f 100644 (file)
@@ -1793,7 +1793,7 @@ ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn
     * and the toast reconstruction data. The full cleanup will happen as part
     * of decoding ABORT record of this transaction.
     */
-   ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+   ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
    ReorderBufferToastReset(rb, txn);
 
    /* All changes should be discarded */
@@ -1968,7 +1968,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
    ReorderBufferStreamTXN(rb, txn);
 
-   if (rbtxn_prepared(txn))
+   if (rbtxn_is_prepared(txn))
    {
        /*
         * Note, we send stream prepare even if a concurrent abort is
@@ -2150,7 +2150,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                      ReorderBufferChange *specinsert)
 {
    /* Discard the changes that we just streamed */
-   ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+   ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
 
    /* Free all resources allocated for toast reconstruction */
    ReorderBufferToastReset(rb, txn);
@@ -2238,7 +2238,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
         */
        if (!streaming)
        {
-           if (rbtxn_prepared(txn))
+           if (rbtxn_is_prepared(txn))
                rb->begin_prepare(rb, txn);
            else
                rb->begin(rb, txn);
@@ -2280,7 +2280,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
             * required for the cases when we decode the changes before the
             * COMMIT record is processed.
             */
-           if (streaming || rbtxn_prepared(change->txn))
+           if (streaming || rbtxn_is_prepared(change->txn))
            {
                curtxn = change->txn;
                SetupCheckXidLive(curtxn->xid);
@@ -2625,7 +2625,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
             * Call either PREPARE (for two-phase transactions) or COMMIT (for
             * regular ones).
             */
-           if (rbtxn_prepared(txn))
+           if (rbtxn_is_prepared(txn))
            {
                Assert(!rbtxn_sent_prepare(txn));
                rb->prepare(rb, txn, commit_lsn);
@@ -2680,12 +2680,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
         * For 4, as the entire txn has been decoded, we can fully clean up
         * the TXN reorder buffer.
         */
-       if (streaming || rbtxn_prepared(txn))
+       if (streaming || rbtxn_is_prepared(txn))
        {
            if (streaming)
                ReorderBufferMaybeMarkTXNStreamed(rb, txn);
 
-           ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+           ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
            /* Reset the CheckXidAlive */
            CheckXidAlive = InvalidTransactionId;
        }
@@ -2729,7 +2729,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
         * during a two-phase commit.
         */
        if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
-           (stream_started || rbtxn_prepared(txn)))
+           (stream_started || rbtxn_is_prepared(txn)))
        {
            /* curtxn must be set for streaming or prepared transactions */
            Assert(curtxn);
@@ -2816,7 +2816,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
         * Removing this txn before a commit might result in the computation
         * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
         */
-       if (!rbtxn_prepared(txn))
+       if (!rbtxn_is_prepared(txn))
            ReorderBufferCleanupTXN(rb, txn);
        return;
    }
@@ -2853,7 +2853,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
- * Record the prepare information for a transaction.
+ * Record the prepare information for a transaction. Also, mark the transaction
+ * as a prepared transaction.
  */
 bool
 ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
@@ -2879,6 +2880,10 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
    txn->origin_id = origin_id;
    txn->origin_lsn = origin_lsn;
 
+   /* Mark this transaction as a prepared transaction */
+   Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == 0);
+   txn->txn_flags |= RBTXN_IS_PREPARED;
+
    return true;
 }
 
@@ -2894,6 +2899,8 @@ ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
    if (txn == NULL)
        return;
 
+   /* txn must have been marked as a prepared transaction */
+   Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
    txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
 }
 
@@ -2915,12 +2922,16 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
    if (txn == NULL)
        return;
 
-   txn->txn_flags |= RBTXN_PREPARE;
-   txn->gid = pstrdup(gid);
-
-   /* The prepare info must have been updated in txn by now. */
+   /*
+    * txn must have been marked as a prepared transaction and must have
+    * neither been skipped nor sent a prepare. Also, the prepare info must
+    * have been updated in it by now.
+    */
+   Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
    Assert(txn->final_lsn != InvalidXLogRecPtr);
 
+   txn->gid = pstrdup(gid);
+
    ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
                        txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
 
@@ -2976,12 +2987,13 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
     */
    if ((txn->final_lsn < two_phase_at) && is_commit)
    {
-       txn->txn_flags |= RBTXN_PREPARE;
-
        /*
-        * The prepare info must have been updated in txn even if we skip
-        * prepare.
+        * txn must have been marked as a prepared transaction and skipped but
+        * not sent a prepare. Also, the prepare info must have been updated
+        * in txn even if we skip prepare.
         */
+       Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
+              (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));
        Assert(txn->final_lsn != InvalidXLogRecPtr);
 
        /*
index bbedd3de31831e9590610c2f2cd7fb6b64b72825..05687fd75e5b4c1065c9baf9295b86b1143bb9cb 100644 (file)
@@ -761,7 +761,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
         * We don't need to add snapshot to prepared transactions as they
         * should not see the new catalog contents.
         */
-       if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
+       if (rbtxn_is_prepared(txn))
            continue;
 
        elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
index 9d9ac2f08305a0ba88ea6874489c4ac080fff5ff..517a8e3634fd1e147d96de844aed32dc2de29cb8 100644 (file)
@@ -170,13 +170,15 @@ typedef struct ReorderBufferChange
 #define RBTXN_IS_SERIALIZED_CLEAR  0x0008
 #define RBTXN_IS_STREAMED          0x0010
 #define RBTXN_HAS_PARTIAL_CHANGE   0x0020
-#define RBTXN_PREPARE              0x0040
+#define RBTXN_IS_PREPARED          0x0040
 #define RBTXN_SKIPPED_PREPARE      0x0080
 #define RBTXN_HAS_STREAMABLE_CHANGE    0x0100
 #define RBTXN_SENT_PREPARE         0x0200
 #define RBTXN_IS_COMMITTED         0x0400
 #define RBTXN_IS_ABORTED           0x0800
 
+#define RBTXN_PREPARE_STATUS_MASK  (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
+
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
 ( \
@@ -234,9 +236,9 @@ typedef struct ReorderBufferChange
  * committed. To check whether a prepare or a stream_prepare has already
  * been sent for this transaction, we need to use rbtxn_sent_prepare().
  */
-#define rbtxn_prepared(txn) \
+#define rbtxn_is_prepared(txn) \
 ( \
-   ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
+   ((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \
 )
 
 /* Has a prepare or stream_prepare already been sent? */