*/
int
-bkup_begin_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, GTM_IsolationLevel isolevel,
+bkup_begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel,
bool read_only, uint32 client_id, GTM_Timestamp timestamp)
{
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
gtmpqPutInt(MSG_BKUP_TXN_BEGIN, sizeof (GTM_MessageType), conn) ||
- gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) ||
gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) ||
gtmpqPutc(read_only, conn) ||
gtmpqPutInt(client_id, sizeof (uint32), conn) ||
}
int
-bkup_begin_transaction_gxid(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid,
+bkup_begin_transaction_gxid(GTM_Conn *conn, GlobalTransactionId gxid,
GTM_IsolationLevel isolevel, bool read_only,
uint32 client_id, GTM_Timestamp timestamp)
{
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID, sizeof (GTM_MessageType), conn) ||
- gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) ||
gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) ||
gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) ||
gtmpqPutc(read_only, conn) ||
int
-bkup_begin_transaction_autovacuum(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid,
+bkup_begin_transaction_autovacuum(GTM_Conn *conn, GlobalTransactionId gxid,
GTM_IsolationLevel isolevel, uint32 client_id)
{
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM, sizeof (GTM_MessageType), conn) ||
- gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) ||
gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) ||
gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) ||
gtmpqPutInt(client_id, sizeof (uint32), conn))
int
bkup_begin_transaction_multi(GTM_Conn *conn, int txn_count,
- GTM_TransactionHandle *txn, GlobalTransactionId start_gxid, GTM_IsolationLevel *isolevel,
+ GlobalTransactionId start_gxid, GTM_IsolationLevel *isolevel,
bool *read_only, uint32 *client_id,
GTMProxy_ConnID *txn_connid)
{
{
if (gxid == InvalidGlobalTransactionId)
gxid = FirstNormalGlobalTransactionId;
- if (gtmpqPutInt(txn[ii], sizeof(GTM_TransactionHandle), conn) ||
- gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) ||
+ if (gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) ||
gtmpqPutInt(isolevel[ii], sizeof(GTM_IsolationLevel), conn) ||
gtmpqPutc(read_only[ii], conn) ||
gtmpqPutInt(client_id[ii], sizeof (uint32), conn) ||
for (ii = 0; ii < txn_count; ii++)
{
- if (gtmpqPutc(false, conn) ||
- gtmpqPutnchar((char *)&gxid[ii],
+ if (gtmpqPutnchar((char *)&gxid[ii],
sizeof (GlobalTransactionId), conn))
goto send_failed;
}
void
-GTM_BkupBeginTransactionMulti(GTM_TransactionHandle *txn,
- GTM_IsolationLevel *isolevel,
+GTM_BkupBeginTransactionMulti(GTM_IsolationLevel *isolevel,
bool *readonly,
uint32 *client_id,
GTMProxy_ConnID *connid,
int txn_count)
{
+ GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];;
GTM_TransactionInfo *gtm_txninfo;
MemoryContext oldContext;
int kk;
+ int count;
gtm_txninfo = NULL;
oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
- GTM_RWLockAcquire(>MTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE);
- for (kk = 0; kk < txn_count; kk++)
- {
- gtm_txninfo = >MTransactions.gt_transactions_array[txn[kk]];
- if (gtm_txninfo->gti_in_use)
- {
- GTM_RWLockRelease(>MTransactions.gt_TransArrayLock);
- elog(ERROR, "GTM_TransactionInfo already in use. Cannot assign the transaction: handle (%d).",
- txn[kk]);
- return;
- }
-
- elog(DEBUG1, "GTM_BkupBeginTransactionMulti: handle(%u)", txn[kk]);
-
- init_GTM_TransactionInfo(gtm_txninfo, txn[kk],
- isolevel[kk], client_id[kk], connid[kk], readonly[kk]);
- GTMTransactions.gt_lastslot = txn[kk];
- GTMTransactions.gt_open_transactions = gtm_lappend(GTMTransactions.gt_open_transactions, gtm_txninfo);
- }
+ count = GTM_BeginTransactionMulti(isolevel, readonly, connid,
+ txn_count, txn);
+ if (count != txn_count)
+ ereport(ERROR,
+ (EINVAL,
+ errmsg("Failed to start %d new transactions", txn_count)));
- GTM_RWLockRelease(>MTransactions.gt_TransArrayLock);
MemoryContextSwitchTo(oldContext);
}
void
-GTM_BkupBeginTransaction(GTM_TransactionHandle txn,
- GTM_IsolationLevel isolevel,
+GTM_BkupBeginTransaction(GTM_IsolationLevel isolevel,
bool readonly,
uint32 client_id)
{
GTMProxy_ConnID connid = -1;
- GTM_BkupBeginTransactionMulti(&txn, &isolevel, &readonly,
+ GTM_BkupBeginTransactionMulti(&isolevel, &readonly,
&client_id, &connid, 1);
}
/*
/* Backup first */
if (GetMyThreadInfo->thr_conn->standby)
{
- bkup_begin_transaction(GetMyThreadInfo->thr_conn->standby, txn,
+ bkup_begin_transaction(GetMyThreadInfo->thr_conn->standby,
txn_isolation_level, txn_read_only,
GetMyThreadInfo->thr_client_id, timestamp);
/* Synch. with standby */
void
ProcessBkupBeginTransactionCommand(Port *myport, StringInfo message)
{
- GTM_TransactionHandle txn;
GTM_IsolationLevel txn_isolation_level;
bool txn_read_only;
GTM_Timestamp timestamp;
MemoryContext oldContext;
uint32 client_id;
- txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle));
txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel));
txn_read_only = pq_getmsgbyte(message);
client_id = pq_getmsgint(message, sizeof (uint32));
oldContext = MemoryContextSwitchTo(TopMemoryContext);
- GTM_BkupBeginTransaction(txn, txn_isolation_level, txn_read_only,
+ GTM_BkupBeginTransaction(txn_isolation_level, txn_read_only,
client_id);
MemoryContextSwitchTo(oldContext);
retry:
bkup_begin_transaction_gxid(GetMyThreadInfo->thr_conn->standby,
- txn, gxid, txn_isolation_level,
+ gxid, txn_isolation_level,
txn_read_only,
GetMyThreadInfo->thr_client_id,
timestamp);
}
static void
-GTM_BkupBeginTransactionGetGXIDMulti(GTM_TransactionHandle *txn,
- GlobalTransactionId *gxid,
+GTM_BkupBeginTransactionGetGXIDMulti(GlobalTransactionId *gxid,
GTM_IsolationLevel *isolevel,
bool *readonly,
uint32 *client_id,
GTMProxy_ConnID *connid,
int txn_count)
{
+ GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];;
GTM_TransactionInfo *gtm_txninfo;
int ii;
+ int count;
MemoryContext oldContext;
#ifdef XCP
bool save_control = false;
GlobalTransactionId xid = InvalidGlobalTransactionId;
#endif
-
oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
+
+ count = GTM_BeginTransactionMulti(isolevel, readonly, connid,
+ txn_count, txn);
+ if (count != txn_count)
+ ereport(ERROR,
+ (EINVAL,
+ errmsg("Failed to start %d new transactions", txn_count)));
+
//XCPTODO check oldContext = MemoryContextSwitchTo(TopMemoryContext);
GTM_RWLockAcquire(>MTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE);
for (ii = 0; ii < txn_count; ii++)
{
- gtm_txninfo = >MTransactions.gt_transactions_array[txn[ii]];
- if (gtm_txninfo->gti_in_use)
- {
- GTM_RWLockRelease(>MTransactions.gt_TransArrayLock);
- elog(ERROR, "GTM_TransactionInfo already in use. Cannot assign the transaction: handle (%d).",
- txn[ii]);
- MemoryContextSwitchTo(oldContext);
- return;
- }
- init_GTM_TransactionInfo(gtm_txninfo, txn[ii],
- isolevel[ii], client_id[ii], connid[ii], readonly[ii]);
- GTMTransactions.gt_lastslot = txn[ii];
+ gtm_txninfo = GTM_HandleToTransactionInfo(txn[ii]);
gtm_txninfo->gti_gxid = gxid[ii];
elog(DEBUG1, "GTM_BkupBeginTransactionGetGXIDMulti: xid(%u), handle(%u)",
GTMTransactions.gt_nextXid = gxid[ii] + 1;
if (!GlobalTransactionIdIsValid(GTMTransactions.gt_nextXid)) /* Handle wrap around too */
GTMTransactions.gt_nextXid = FirstNormalGlobalTransactionId;
- GTMTransactions.gt_open_transactions = gtm_lappend(GTMTransactions.gt_open_transactions, gtm_txninfo);
xid = GTMTransactions.gt_nextXid;
}
}
static void
-GTM_BkupBeginTransactionGetGXID(GTM_TransactionHandle txn,
- GlobalTransactionId gxid,
+GTM_BkupBeginTransactionGetGXID(GlobalTransactionId gxid,
GTM_IsolationLevel isolevel,
bool readonly,
uint32 client_id)
{
GTMProxy_ConnID connid = -1;
- GTM_BkupBeginTransactionGetGXIDMulti(&txn, &gxid, &isolevel,
+ GTM_BkupBeginTransactionGetGXIDMulti(&gxid, &isolevel,
&readonly, &client_id, &connid, 1);
}
void
ProcessBkupBeginTransactionGetGXIDCommand(Port *myport, StringInfo message)
{
- GTM_TransactionHandle txn;
GlobalTransactionId gxid;
GTM_IsolationLevel txn_isolation_level;
bool txn_read_only;
uint32 txn_client_id;
GTM_Timestamp timestamp;
- txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle));
gxid = pq_getmsgint(message, sizeof(GlobalTransactionId));
txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel));
txn_read_only = pq_getmsgbyte(message);
memcpy(×tamp, pq_getmsgbytes(message, sizeof(GTM_Timestamp)), sizeof(GTM_Timestamp));
pq_getmsgend(message);
- GTM_BkupBeginTransactionGetGXID(txn, gxid, txn_isolation_level,
+ GTM_BkupBeginTransactionGetGXID(gxid, txn_isolation_level,
txn_read_only, txn_client_id);
}
void
ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message)
{
- GTM_TransactionHandle txn;
GlobalTransactionId gxid;
GTM_IsolationLevel txn_isolation_level;
uint32 txn_client_id;
- txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle));
gxid = pq_getmsgint(message, sizeof(GlobalTransactionId));
txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel));
txn_client_id = pq_getmsgint(message, sizeof (uint32));
pq_getmsgend(message);
- GTM_BkupBeginTransactionGetGXID(txn, gxid, txn_isolation_level,
+ GTM_BkupBeginTransactionGetGXID(gxid, txn_isolation_level,
false, txn_client_id);
- GTM_SetDoVacuum(txn);
+ GTM_SetDoVacuum(GTM_GXIDToHandle(gxid));
}
/*
retry:
_gxid = bkup_begin_transaction_autovacuum(GetMyThreadInfo->thr_conn->standby,
- txn, gxid,
+ gxid,
txn_isolation_level,
GetMyThreadInfo->thr_client_id);
retry:
_rc = bkup_begin_transaction_multi(GetMyThreadInfo->thr_conn->standby,
txn_count,
- txn,
start_gxid,
txn_isolation_level,
txn_read_only,
ProcessBkupBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
{
int txn_count;
- GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];
GlobalTransactionId gxid[GTM_MAX_GLOBAL_TRANSACTIONS];
GTM_IsolationLevel txn_isolation_level[GTM_MAX_GLOBAL_TRANSACTIONS];
bool txn_read_only[GTM_MAX_GLOBAL_TRANSACTIONS];
for (ii = 0; ii < txn_count; ii++)
{
- txn[ii] = pq_getmsgint(message, sizeof(GTM_TransactionHandle));
gxid[ii] = pq_getmsgint(message, sizeof(GlobalTransactionId));
txn_isolation_level[ii] = pq_getmsgint(message, sizeof(GTM_IsolationLevel));
txn_read_only[ii] = pq_getmsgbyte(message);
txn_connid[ii] = pq_getmsgint(message, sizeof(GTMProxy_ConnID));
}
- GTM_BkupBeginTransactionGetGXIDMulti(txn, gxid, txn_isolation_level,
+ GTM_BkupBeginTransactionGetGXIDMulti(gxid, txn_isolation_level,
txn_read_only, txn_client_id, txn_connid, txn_count);
}
oldContext = MemoryContextSwitchTo(TopMemoryContext);
- elog(DEBUG1, "Committing transaction id %u", gxid);
-
/*
* Commit the transaction
*/
(EINVAL,
errmsg("Failed to get the information of prepared transaction")));
+
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int _rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
+ GTM_Timestamp timestamp;
+
+ elog(DEBUG1, "calling bkup_begin_transaction_gxid() for auxiliary transaction for standby GTM %p.",
+ GetMyThreadInfo->thr_conn->standby);
+
+retry:
+ /*
+ * The main XID was already backed up on the standby when it was
+ * started. Now also backup the new GXID we obtained above for running
+ * COMMIT/ROLLBACK PREPARED statements. This is necessary because GTM
+ * will later receive a COMMIT/ABORT message for this XID and the
+ * standby must be prepared to handle those messages as well
+ *
+ * Note: We use the same routine used to backup a new transaction
+ * instead of writing a routine specific to MSG_TXN_GET_GID_DATA
+ * message
+ */
+ _rc = bkup_begin_transaction_gxid(GetMyThreadInfo->thr_conn->standby,
+ gxid,
+ txn_isolation_level,
+ false,
+ -1,
+ timestamp);
+
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
+
+ }
+
/*
* Send a SUCCESS message back to the client
*/
/* No backup to the standby because this does not change internal status */
if (myport->remote_type != GTM_NODE_GTM_PROXY)
pq_flush(myport);
-
- /* I don't think the following backup is needed. K.Suzuki, 27th, Dec., 2011 */
-#if 0
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int _rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
-
- elog(DEBUG1, "calling get_gid_data() for standby GTM %p.",
- GetMyThreadInfo->thr_conn->standby);
-
-retry:
- _rc = get_gid_data(GetMyThreadInfo->thr_conn->standby,
- txn_isolation_level,
- gid,
- &gxid,
- &prepared_gxid,
- &nodestring);
-
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
-
- elog(DEBUG1, "get_gid_data() rc=%d done.", _rc);
- }
-#endif
-
return;
}
-
/*
* Process MSG_TXN_GXID_LIST
*/