* and the toast reconstruction data. The full cleanup will happen as part
* of decoding ABORT record of this transaction.
*/
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
ReorderBufferToastReset(rb, txn);
/* All changes should be discarded */
ReorderBufferStreamTXN(rb, txn);
- if (rbtxn_prepared(txn))
+ if (rbtxn_is_prepared(txn))
{
/*
* Note, we send stream prepare even if a concurrent abort is
ReorderBufferChange *specinsert)
{
/* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
/* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn);
*/
if (!streaming)
{
- if (rbtxn_prepared(txn))
+ if (rbtxn_is_prepared(txn))
rb->begin_prepare(rb, txn);
else
rb->begin(rb, txn);
* required for the cases when we decode the changes before the
* COMMIT record is processed.
*/
- if (streaming || rbtxn_prepared(change->txn))
+ if (streaming || rbtxn_is_prepared(change->txn))
{
curtxn = change->txn;
SetupCheckXidLive(curtxn->xid);
* Call either PREPARE (for two-phase transactions) or COMMIT (for
* regular ones).
*/
- if (rbtxn_prepared(txn))
+ if (rbtxn_is_prepared(txn))
{
Assert(!rbtxn_sent_prepare(txn));
rb->prepare(rb, txn, commit_lsn);
* For 4, as the entire txn has been decoded, we can fully clean up
* the TXN reorder buffer.
*/
- if (streaming || rbtxn_prepared(txn))
+ if (streaming || rbtxn_is_prepared(txn))
{
if (streaming)
ReorderBufferMaybeMarkTXNStreamed(rb, txn);
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
* during a two-phase commit.
*/
if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
- (stream_started || rbtxn_prepared(txn)))
+ (stream_started || rbtxn_is_prepared(txn)))
{
/* curtxn must be set for streaming or prepared transactions */
Assert(curtxn);
* Removing this txn before a commit might result in the computation
* of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
*/
- if (!rbtxn_prepared(txn))
+ if (!rbtxn_is_prepared(txn))
ReorderBufferCleanupTXN(rb, txn);
return;
}
}
/*
- * Record the prepare information for a transaction.
+ * Record the prepare information for a transaction. Also, mark the transaction
+ * as a prepared transaction.
*/
bool
ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
+ /* Mark this transaction as a prepared transaction */
+ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == 0);
+ txn->txn_flags |= RBTXN_IS_PREPARED;
+
return true;
}
if (txn == NULL)
return;
+ /* txn must have been marked as a prepared transaction */
+ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
}
if (txn == NULL)
return;
- txn->txn_flags |= RBTXN_PREPARE;
- txn->gid = pstrdup(gid);
-
- /* The prepare info must have been updated in txn by now. */
+ /*
+ * txn must have been marked as a prepared transaction and must have
+ * neither been skipped nor sent a prepare. Also, the prepare info must
+ * have been updated in it by now.
+ */
+ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
Assert(txn->final_lsn != InvalidXLogRecPtr);
+ txn->gid = pstrdup(gid);
+
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
*/
if ((txn->final_lsn < two_phase_at) && is_commit)
{
- txn->txn_flags |= RBTXN_PREPARE;
-
/*
- * The prepare info must have been updated in txn even if we skip
- * prepare.
+ * txn must have been marked as a prepared transaction and skipped but
+ * not sent a prepare. Also, the prepare info must have been updated
+ * in txn even if we skip prepare.
*/
+ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
+ (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));
Assert(txn->final_lsn != InvalidXLogRecPtr);
/*
#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
#define RBTXN_IS_STREAMED 0x0010
#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
-#define RBTXN_PREPARE 0x0040
+#define RBTXN_IS_PREPARED 0x0040
#define RBTXN_SKIPPED_PREPARE 0x0080
#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
#define RBTXN_SENT_PREPARE 0x0200
#define RBTXN_IS_COMMITTED 0x0400
#define RBTXN_IS_ABORTED 0x0800
+#define RBTXN_PREPARE_STATUS_MASK (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
+
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
( \
* committed. To check whether a prepare or a stream_prepare has already
* been sent for this transaction, we need to use rbtxn_sent_prepare().
*/
-#define rbtxn_prepared(txn) \
+#define rbtxn_is_prepared(txn) \
( \
- ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
+ ((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \
)
/* Has a prepare or stream_prepare already been sent? */