@@ -94,6 +94,9 @@ bool UseCitusDBSelectLogic = false;
94
94
/* informs pg_shard to use the distributed transaction manager */
95
95
bool UseDtmTransactions = false;
96
96
97
+ /* Use two phase commit for update transactions */
98
+ bool DtmTwoPhaseCommit = false;
99
+
97
100
/* logs each statement used in a distributed plan */
98
101
bool LogDistributedStatements = false;
99
102
@@ -178,7 +181,7 @@ static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
178
181
/* XTM stuff */
179
182
static List * connectionsWithDtmTransactions = NIL ;
180
183
static csn_t currentGlobalTransactionId = 0 ;
181
- static int currentLocalTransactionId = 0 ;
184
+ static int currentLocalTransactionId = 0 ;
182
185
static bool commitCallbackSet = false;
183
186
184
187
#define TRACE (fmt , ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -642,7 +645,7 @@ ErrorIfQueryNotSupported(Query *queryTree)
642
645
/*
643
646
* if (!IsA(targetEntry->expr, Const))
644
647
* {
645
- * hasNonConstTargetEntryExprs = true;
648
+ * hasNonConstTargetEntryExprs = true;
646
649
* }
647
650
*/
648
651
@@ -656,7 +659,7 @@ ErrorIfQueryNotSupported(Query *queryTree)
656
659
* joinTree = queryTree->jointree;
657
660
* if (joinTree != NULL && contain_mutable_functions(joinTree->quals))
658
661
* {
659
- * hasNonConstQualExprs = true;
662
+ * hasNonConstQualExprs = true;
660
663
* }
661
664
*/
662
665
}
@@ -669,9 +672,9 @@ ErrorIfQueryNotSupported(Query *queryTree)
669
672
/*
670
673
* if (hasNonConstTargetEntryExprs || hasNonConstQualExprs)
671
674
* {
672
- * ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
673
- * errmsg("cannot plan sharded modification containing values "
674
- * "which are not constants or constant expressions")));
675
+ * ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
676
+ * errmsg("cannot plan sharded modification containing values "
677
+ * "which are not constants or constant expressions")));
675
678
* }
676
679
*
677
680
*/
@@ -1280,8 +1283,8 @@ PgShardExecutorStart(QueryDesc *queryDesc, int eflags)
1280
1283
/*
1281
1284
* if (!UseDtmTransactions)
1282
1285
* {
1283
- * PreventTransactionChain(topLevel, "distributed commands");
1284
- * eflags |= EXEC_FLAG_SKIP_TRIGGERS;
1286
+ * PreventTransactionChain(topLevel, "distributed commands");
1287
+ * eflags |= EXEC_FLAG_SKIP_TRIGGERS;
1285
1288
* }
1286
1289
*
1287
1290
*/
@@ -1492,6 +1495,8 @@ ExecuteMultipleShardSelect(DistributedPlan *distributedPlan,
1492
1495
1493
1496
ListCell * taskCell = NULL ;
1494
1497
1498
+ DtmTwoPhaseCommit = IsTransactionBlock ();
1499
+
1495
1500
foreach (taskCell , taskList )
1496
1501
{
1497
1502
Task * task = (Task * ) lfirst (taskCell );
@@ -1519,6 +1524,7 @@ ExecuteMultipleShardSelect(DistributedPlan *distributedPlan,
1519
1524
1520
1525
tuplestore_end (tupleStore );
1521
1526
}
1527
+
1522
1528
}
1523
1529
1524
1530
@@ -1893,6 +1899,7 @@ ExecuteDistributedModify(DistributedPlan *plan)
1893
1899
1894
1900
if (UseDtmTransactions )
1895
1901
{
1902
+ DtmTwoPhaseCommit = true;
1896
1903
PrepareDtmTransaction (task );
1897
1904
}
1898
1905
@@ -2000,7 +2007,12 @@ PrepareDtmTransaction(Task *task)
2000
2007
/* already started a transaction */
2001
2008
continue ;
2002
2009
}
2003
-
2010
+ if (!SendCommand (connection , "BEGIN" ))
2011
+ {
2012
+ PurgeConnection (connection );
2013
+ abortTransaction = true;
2014
+ continue ;
2015
+ }
2004
2016
if (!currentGlobalTransactionId )
2005
2017
{
2006
2018
/* Send dtm_begin_transaction to the first node */
@@ -2013,7 +2025,7 @@ PrepareDtmTransaction(Task *task)
2013
2025
continue ;
2014
2026
}
2015
2027
TRACE ("shard_xtm: conn#%p: Sent dtm_begin() to %s:%u -> %llu\n" ,
2016
- connection , nodeName , nodePort , currentGlobalTransactionId );
2028
+ connection , nodeName , nodePort , currentGlobalTransactionId );
2017
2029
}
2018
2030
else
2019
2031
{
@@ -2024,19 +2036,19 @@ PrepareDtmTransaction(Task *task)
2024
2036
continue ;
2025
2037
}
2026
2038
TRACE ("shard_xtm: conn#%p: Sent dtm_access(%llu) to %s:%u\n" ,
2027
- connection , currentGlobalTransactionId , nodeName , nodePort );
2039
+ connection , currentGlobalTransactionId , nodeName , nodePort );
2028
2040
}
2029
2041
2030
2042
newTransactions = lappend (newTransactions , connection );
2031
2043
}
2032
2044
2033
2045
if (abortTransaction )
2034
2046
{
2035
- /* make sure we abort all pending transactions */
2036
- connectionsWithDtmTransactions = newTransactions ;
2037
-
2038
2047
MemoryContextSwitchTo (oldContext );
2039
2048
2049
+ /* make sure we abort all pending transactions */
2050
+ connectionsWithDtmTransactions = newTransactions ;
2051
+
2040
2052
/*
2041
2053
* Since pg_shard reuses connections across transactions on the master,
2042
2054
* we need to abort pending transactions on the workers.
@@ -2045,40 +2057,8 @@ PrepareDtmTransaction(Task *task)
2045
2057
ereport (ERROR , (errmsg ("aborting distributed transaction due to failures" )));
2046
2058
}
2047
2059
2048
- foreach (taskPlacementCell , task -> taskPlacementList )
2049
- {
2050
- ShardPlacement * taskPlacement = (ShardPlacement * ) lfirst (taskPlacementCell );
2051
- char * nodeName = taskPlacement -> nodeName ;
2052
- int32 nodePort = taskPlacement -> nodePort ;
2053
- PGconn * connection = NULL ;
2054
-
2055
- connection = GetConnection (nodeName , nodePort , false);
2056
- if (connection == NULL )
2057
- {
2058
- ereport (WARNING , (errmsg ("failed to connect to %s:%d" ,
2059
- nodeName , nodePort )));
2060
- abortTransaction = true;
2061
- continue ;
2062
- }
2063
-
2064
- if (list_member_ptr (connectionsWithDtmTransactions , connection ))
2065
- {
2066
- /* already started a transaction */
2067
- continue ;
2068
- }
2069
-
2070
- if (!SendCommand (connection , "BEGIN" ))
2071
- {
2072
- PurgeConnection (connection );
2073
- abortTransaction = true;
2074
- continue ;
2075
- }
2076
- TRACE ("shard_xtm: conn#%p: Sent BEGIN to %s:%u\n" , connection , nodeName , nodePort );
2077
-
2078
- }
2079
-
2080
2060
connectionsWithDtmTransactions = list_union (connectionsWithDtmTransactions ,
2081
- newTransactions );
2061
+ newTransactions );
2082
2062
2083
2063
MemoryContextSwitchTo (oldContext );
2084
2064
@@ -2107,8 +2087,10 @@ SendDtmBeginTransaction(PGconn *connection)
2107
2087
char * resp = NULL ;
2108
2088
csn_t remoteTransactionId ;
2109
2089
2110
-
2111
- result = PQexec (connection , psprintf ("SELECT dtm_extend('%d.%d')" , MyProcPid , ++ currentLocalTransactionId ));
2090
+
2091
+ result = DtmTwoPhaseCommit
2092
+ ? PQexec (connection , psprintf ("SELECT dtm_extend('%d.%d')" , MyProcPid , ++ currentLocalTransactionId ))
2093
+ : PQexec (connection , "SELECT dtm_extend()" );
2112
2094
if (PQresultStatus (result ) != PGRES_TUPLES_OK )
2113
2095
{
2114
2096
ReportRemoteError (connection , result );
@@ -2135,7 +2117,9 @@ SendDtmJoinTransaction(PGconn *connection, csn_t TransactionId)
2135
2117
PGresult * result = NULL ;
2136
2118
bool resultOK = true;
2137
2119
2138
- result = PQexec (connection , psprintf ("SELECT dtm_access(%llu, '%d.%d')" , TransactionId , MyProcPid , currentLocalTransactionId ));
2120
+ result = PQexec (connection , DtmTwoPhaseCommit
2121
+ ? psprintf ("SELECT dtm_access(%llu, '%d.%d')" , TransactionId , MyProcPid , currentLocalTransactionId )
2122
+ : psprintf ("SELECT dtm_access(%llu)" , TransactionId ));
2139
2123
if (PQresultStatus (result ) != PGRES_TUPLES_OK )
2140
2124
{
2141
2125
ReportRemoteError (connection , result );
@@ -2170,23 +2154,28 @@ typedef bool (*DtmCommandResultHandler)(PGresult *result, void* arg);
2170
2154
2171
2155
static bool RunDtmStatement (char const * sql , unsigned expectedStatus , DtmCommandResultHandler handler , void * arg )
2172
2156
{
2173
- ListCell * connectionCell = NULL ;
2174
2157
int querySent = 0 ;
2175
2158
PGresult * result = NULL ;
2176
2159
PGconn * connection = NULL ;
2177
2160
bool allOk = true;
2161
+ ListCell * connectionCell = NULL ;
2162
+ ListCell * nextCell = list_head (connectionsWithDtmTransactions );
2163
+ ListCell * prevCell = NULL ;
2178
2164
2179
- foreach ( connectionCell , connectionsWithDtmTransactions )
2165
+ while (( connectionCell = nextCell ) != NULL )
2180
2166
{
2167
+ nextCell = lnext (connectionCell );
2181
2168
connection = (PGconn * ) lfirst (connectionCell );
2182
2169
querySent = PQsendQuery (connection , sql );
2183
2170
if (!querySent )
2184
2171
{
2185
2172
ReportRemoteError (connection , NULL );
2173
+ list_delete_cell (connectionsWithDtmTransactions , connectionCell , prevCell );
2186
2174
PurgeConnection (connection );
2187
2175
allOk = false;
2188
2176
continue ;
2189
2177
}
2178
+ prevCell = connectionCell ;
2190
2179
TRACE ("shard_xtm: conn#%p: Sent %s to %s:%s\n" , connection , sql , PQhost (connection ), PQport (connection ));
2191
2180
}
2192
2181
foreach (connectionCell , connectionsWithDtmTransactions )
@@ -2199,7 +2188,7 @@ static bool RunDtmStatement(char const* sql, unsigned expectedStatus, DtmCommand
2199
2188
allOk = false;
2200
2189
}
2201
2190
PQclear (result );
2202
- PQresultStatus ( result ); /* consume NULL result */
2191
+ PQgetResult ( connection ); /* consume NULL result */
2203
2192
}
2204
2193
return allOk ;
2205
2194
}
@@ -2234,37 +2223,46 @@ static bool DtmMaxCSN(PGresult *result, void* arg)
2234
2223
2235
2224
2236
2225
static void
2237
- FinishDtmTransaction (XactEvent event , void * arg )
2226
+ FinishDtmTransaction (XactEvent event , void * arg )
2238
2227
{
2239
- if (!(event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT )
2240
- || !connectionsWithDtmTransactions )
2241
- {
2242
- return ;
2243
- }
2244
- if (event == XACT_EVENT_COMMIT )
2245
- {
2246
- csn_t maxCSN = 0 ;
2247
-
2248
- if (!RunDtmCommand (psprintf ("PREPARE TRANSACTION '%d.%d'" , MyProcPid , currentLocalTransactionId )) ||
2249
- !RunDtmFunction (psprintf ("SELECT dtm_begin_prepare('%d.%d')" , MyProcPid , currentLocalTransactionId )) ||
2250
- !RunDtmStatement (psprintf ("SELECT dtm_prepare('%d.%d',0)" , MyProcPid , currentLocalTransactionId ), PGRES_TUPLES_OK , DtmMaxCSN , & maxCSN ) ||
2251
- !RunDtmFunction (psprintf ("SELECT dtm_end_prepare('%d.%d',%lld)" , MyProcPid , currentLocalTransactionId , maxCSN )) ||
2252
- !RunDtmCommand (psprintf ("COMMIT PREPARED '%d.%d'" , MyProcPid , currentLocalTransactionId )))
2253
- {
2254
- RunDtmCommand (psprintf ("ROLLBACK PREPARED '%d.%d'" , MyProcPid , currentLocalTransactionId ));
2228
+ if ((event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT ) && connectionsWithDtmTransactions )
2229
+ {
2230
+ if (DtmTwoPhaseCommit )
2231
+ {
2232
+ if (event == XACT_EVENT_COMMIT )
2233
+ {
2234
+ csn_t maxCSN = 0 ;
2235
+
2236
+ if (!RunDtmCommand (psprintf ("PREPARE TRANSACTION '%d.%d'" ,
2237
+ MyProcPid , currentLocalTransactionId )) ||
2238
+ !RunDtmFunction (psprintf ("SELECT dtm_begin_prepare('%d.%d')" ,
2239
+ MyProcPid , currentLocalTransactionId )) ||
2240
+ !RunDtmStatement (psprintf ("SELECT dtm_prepare('%d.%d',0)" ,
2241
+ MyProcPid , currentLocalTransactionId ), PGRES_TUPLES_OK , DtmMaxCSN , & maxCSN ) ||
2242
+ !RunDtmFunction (psprintf ("SELECT dtm_end_prepare('%d.%d',%lld)" ,
2243
+ MyProcPid , currentLocalTransactionId , maxCSN )) ||
2244
+ !RunDtmCommand (psprintf ("COMMIT PREPARED '%d.%d'" ,
2245
+ MyProcPid , currentLocalTransactionId )))
2246
+ {
2247
+ RunDtmCommand (psprintf ("ROLLBACK PREPARED '%d.%d'" ,
2248
+ MyProcPid , currentLocalTransactionId ));
2249
+ }
2250
+ } else {
2251
+ RunDtmCommand ("ROLLBACK" );
2252
+ }
2253
+ } else {
2254
+ RunDtmCommand ("COMMIT" );
2255
2255
}
2256
- } else {
2257
- RunDtmCommand ("ROLLBACK" );
2256
+ /*
2257
+ * Calling unregister inside callback itself leads to segfault when
2258
+ * there are several callbacks on the same event.
2259
+ */
2260
+ /*
2261
+ * UnregisterXactCallback(FinishDtmTransaction, NULL);
2262
+ */
2263
+ connectionsWithDtmTransactions = NIL ;
2264
+ currentGlobalTransactionId = 0 ;
2258
2265
}
2259
- /*
2260
- * Calling unregister inside callback itself leads to segfault when
2261
- * there are several callbacks on the same event.
2262
- */
2263
- /*
2264
- * UnregisterXactCallback(FinishDtmTransaction, NULL);
2265
- */
2266
- connectionsWithDtmTransactions = NIL ;
2267
- currentGlobalTransactionId = 0 ;
2268
2266
}
2269
2267
2270
2268
0 commit comments