step->combine_type = COMBINE_TYPE_NONE;
step->exec_nodes = NULL;
step->sql_statement = query;
- step->force_autocommit = false;
step->read_only = true;
step->exec_type = EXEC_ON_COORDS;
#define implicit2PC_head "_$XC$"
#endif
+/*
+ * Bits stored in TransactionStateData.remoteTransactionBlockFlags. They
+ * record whether remote queries of the current transaction must run in
+ * autocommit mode or inside a transaction block; the setter functions
+ * raise an ERROR when one bit is requested while the other is already set.
+ */
+#define XACT_REMOTE_TRANSACTION_AUTOCOMMIT 0x01
+#define XACT_REMOTE_TRANSACTION_BLOCK 0x02
/*
* User-tweakable parameters
int parallelModeLevel; /* Enter/ExitParallelMode counter */
struct TransactionStateData *parent; /* back link to parent */
#ifdef XCP
+ /*
+ * flags to track whether to run the remote transaction in a transaction
+ * block or in autocommit mode.
+ */
+ int remoteTransactionBlockFlags;
int waitedForXidsCount; /* count of xids we waited to finish */
TransactionId *waitedForXids; /* xids we waited to finish */
#endif
}
s->waitedForXids = NULL;
s->waitedForXidsCount = 0;
+ s->remoteTransactionBlockFlags = 0;
SetNextTransactionId(InvalidTransactionId);
}
}
s->waitedForXids = NULL;
s->waitedForXidsCount = 0;
+ s->remoteTransactionBlockFlags = 0;
#endif
SetNextTransactionId(InvalidTransactionId);
}
s->waitedForXids = NULL;
s->waitedForXidsCount = 0;
+ s->remoteTransactionBlockFlags = 0;
#endif
/*
* making callers do it.)
* stmtType: statement type name, for error messages.
*/
-void
-PreventTransactionChain(bool isTopLevel, const char *stmtType)
+static void
+PreventTransactionChainInternal(bool isTopLevel, const char *stmtType, bool remote)
{
/*
* xact block already started?
CurrentTransactionState->blockState != TBLOCK_STARTED)
elog(FATAL, "cannot prevent transaction chain");
/* all okay */
+
+ if (remote)
+ SetRequireRemoteTransactionAutoCommit();
+}
+
+/*
+ * PreventTransactionChain
+ *		Run the transaction-chain check and, when it passes, also mark the
+ *		remote transaction as requiring autocommit (remote = true is handed
+ *		to PreventTransactionChainInternal).
+ *
+ * isTopLevel: passed through unchanged to the internal routine.
+ * stmtType: statement type name, for error messages.
+ */
+void
+PreventTransactionChain(bool isTopLevel, const char *stmtType)
+{
+	const bool	mark_remote_autocommit = true;
+
+	PreventTransactionChainInternal(isTopLevel, stmtType,
+									mark_remote_autocommit);
+}
+
+/*
+ * PreventTransactionChainLocal
+ *		Run the transaction-chain check for the local node only: remote = false
+ *		is handed to PreventTransactionChainInternal, so the remote
+ *		transaction is not marked for autocommit.
+ *
+ * isTopLevel: passed through unchanged to the internal routine.
+ * stmtType: statement type name, for error messages.
+ */
+void
+PreventTransactionChainLocal(bool isTopLevel, const char *stmtType)
+{
+	const bool	mark_remote_autocommit = false;
+
+	PreventTransactionChainInternal(isTopLevel, stmtType,
+									mark_remote_autocommit);
}
/*
*/
if (IS_PGXC_LOCAL_COORDINATOR)
SetSendCommandId(true);
+
+ SetRequireRemoteTransactionBlock();
#endif
}
pq_putmessage('x', (const char *) &xid, sizeof (GlobalTransactionId));
}
}
+
+/*
+ * SetRequireRemoteTransactionBlock
+ *		Record in the current transaction state that remote queries must be
+ *		run inside a transaction block.
+ *
+ * Raises an ERROR if the transaction state is already marked for autocommit,
+ * because the two requests cannot both be honoured.
+ */
+void
+SetRequireRemoteTransactionBlock(void)
+{
+	TransactionState xact = CurrentTransactionState;
+	int			requested = xact->remoteTransactionBlockFlags;
+
+	/* A prior autocommit request conflicts with asking for a block. */
+	if ((requested & XACT_REMOTE_TRANSACTION_AUTOCOMMIT) != 0)
+		elog(ERROR, "Can't run a query marked for autocommit in a transaction block");
+
+	xact->remoteTransactionBlockFlags = requested | XACT_REMOTE_TRANSACTION_BLOCK;
+}
+
+/*
+ * IsRemoteTransactionBlockRequired
+ *		Report whether XACT_REMOTE_TRANSACTION_BLOCK is set in the current
+ *		transaction state's remoteTransactionBlockFlags.
+ *
+ * The masked value is compared against zero so the function returns a
+ * canonical true/false instead of the raw bit value (0x02); returning the
+ * raw bit from a bool function misbehaves if a caller compares the result
+ * with "true" or if the flag is ever moved to a bit that does not fit the
+ * bool type.
+ */
+bool
+IsRemoteTransactionBlockRequired(void)
+{
+	TransactionState s = CurrentTransactionState;
+
+	return (s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_BLOCK) != 0;
+}
+
+/*
+ * SetRequireRemoteTransactionAutoCommit
+ *		Record in the current transaction state that remote queries must be
+ *		run in autocommit mode.
+ *
+ * Raises an ERROR if the transaction state is already marked as requiring a
+ * transaction block, because the two requests cannot both be honoured.
+ */
+void
+SetRequireRemoteTransactionAutoCommit(void)
+{
+	TransactionState xact = CurrentTransactionState;
+	int			requested = xact->remoteTransactionBlockFlags;
+
+	/* A prior transaction-block request conflicts with autocommit. */
+	if ((requested & XACT_REMOTE_TRANSACTION_BLOCK) != 0)
+		elog(ERROR, "Can't run a query marked for a transaction block in autocommit mode");
+
+	xact->remoteTransactionBlockFlags = requested | XACT_REMOTE_TRANSACTION_AUTOCOMMIT;
+}
+
+/*
+ * IsRemoteTransactionAutoCommit
+ *		Report whether XACT_REMOTE_TRANSACTION_AUTOCOMMIT is set in the
+ *		current transaction state's remoteTransactionBlockFlags.
+ */
+bool
+IsRemoteTransactionAutoCommit(void)
+{
+	int			flags = CurrentTransactionState->remoteTransactionBlockFlags;
+
+	/* The mask is 0x01, so this yields the same 0/1 as the bare AND. */
+	return (flags & XACT_REMOTE_TRANSACTION_AUTOCOMMIT) != 0;
+}
#endif
#endif
step->combine_type = COMBINE_TYPE_NONE;
step->exec_nodes = NULL;
step->sql_statement = query.data;
- step->force_autocommit = true;
step->exec_type = EXEC_ON_DATANODES;
/* Add targetlist entries */
/* ... but we allow it on remote nodes */
if (IS_PGXC_LOCAL_COORDINATOR)
#endif
- PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
+ PreventTransactionChainLocal(isTopLevel, "ALTER DATABASE SET TABLESPACE");
movedb(stmt->dbname, defGetString(dtablespace));
return InvalidOid;
step->exec_nodes->nodeList =
list_make1_int(linitial_int(plan->exec_nodes->nodeList));
- step->force_autocommit = true;
step->exec_type = EXEC_ON_DATANODES;
dummy = makeVar(1, 1, TEXTOID, -1, InvalidOid, 0);
step->combine_type = COMBINE_TYPE_NONE;
step->exec_nodes = NULL;
step->sql_statement = query.data;
- step->force_autocommit = true;
step->exec_type = EXEC_ON_DATANODES;
/* Add targetlist entries */
COPY_SCALAR_FIELD(combine_type);
COPY_NODE_FIELD(sort);
COPY_SCALAR_FIELD(read_only);
- COPY_SCALAR_FIELD(force_autocommit);
COPY_STRING_FIELD(statement);
COPY_STRING_FIELD(cursor);
COPY_SCALAR_FIELD(rq_num_params);
WRITE_NODE_FIELD(exec_nodes);
WRITE_ENUM_FIELD(combine_type, CombineType);
WRITE_BOOL_FIELD(read_only);
- WRITE_BOOL_FIELD(force_autocommit);
WRITE_STRING_FIELD(statement);
WRITE_STRING_FIELD(cursor);
WRITE_INT_FIELD(rq_num_params);
step->combine_type = COMBINE_TYPE_NONE;
step->sort = NULL;
step->read_only = true;
- step->force_autocommit = false;
step->cursor = NULL;
/* This is needed by executor */
step->join_condition = NULL;
/* Change the list of nodes that will be executed for the query and others */
- step->force_autocommit = false;
step->combine_type = COMBINE_TYPE_SAME;
step->read_only = true;
step->exec_direct_type = EXEC_DIRECT_NONE;
step->combine_type = COMBINE_TYPE_SAME;
step->exec_nodes = exec_nodes;
step->sql_statement = pstrdup(sql);
- step->force_autocommit = false;
/* Redistribution operations only concern Datanodes */
step->exec_type = EXEC_ON_DATANODES;
#include "postgres.h"
#include "miscadmin.h"
#include "access/transam.h"
+#include "access/xact.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_class.h"
#include "catalog/pg_inherits_fn.h"
/* we need Coordinator for evaluation, invoke standard planner */
result = standard_planner(query, cursorOptions, boundParams);
+
+ /*
+ * For coordinator side execution, we must always force a transaction block
+ * on the remote side. This ensures that all queries resulting from the
+ * coordinator side execution are run within a block. For example, this
+ * could be a user-defined function, which internally runs several queries,
+ * where each query is separately checked for fast-query-shipping. We must
+ * run all these queries inside a block.
+ */
+ SetRequireRemoteTransactionBlock();
return result;
}
if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
BufferConnection(connections[i]);
+ elog(DEBUG2, "Sending gxid %u to remote node %s, need_tran_block %d",
+ gxid, connections[i]->nodename, need_tran_block);
+
/* Send GXID and check for errors */
if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid))
return EOF;
else if (IS_PGXC_REMOTE_COORDINATOR)
need_tran_block = false;
- elog(DEBUG5, "need_tran_block %d, connections[%d]->transaction_status %c",
+ elog(DEBUG2, "need_tran_block %d, connections[%d]->transaction_status %c",
need_tran_block, i, connections[i]->transaction_status);
/* Send BEGIN if not already in transaction */
if (need_tran_block && connections[i]->transaction_status == 'I')
* If more than one nodes are involved or if we are already in a
* transaction block, we must the remote statements in a transaction block
*/
- need_tran_block = (conn_count > 1) || (TransactionBlockStatusCode() == 'T');
+ need_tran_block = (conn_count > 1) || IsRemoteTransactionBlockRequired();
elog(DEBUG1, "conn_count = %d, need_tran_block = %s", conn_count,
need_tran_block ? "true" : "false");
{
RemoteQueryState *remotestate;
ResponseCombiner *combiner;
- bool force_autocommit = node->force_autocommit;
+ bool force_autocommit = IsRemoteTransactionAutoCommit();
RemoteQueryExecType exec_type = node->exec_type;
GlobalTransactionId gxid = InvalidGlobalTransactionId;
Snapshot snapshot = NULL;
* Start transaction on data nodes if we are in explicit transaction
* or going to use extended query protocol or write to multiple nodes
*/
- if (step->force_autocommit)
+ elog(DEBUG2, "cursor %s, read_only %d,"
+ " total_conn_count %d, transaction block status %c",
+ step->cursor, step->read_only,
+ total_conn_count, TransactionBlockStatusCode());
+
+ if (IsRemoteTransactionAutoCommit())
need_tran_block = false;
else
need_tran_block = step->cursor ||
(!step->read_only && total_conn_count > 1) ||
- (TransactionBlockStatusCode() == 'T');
+ IsRemoteTransactionBlockRequired();
stat_statement();
stat_transaction(total_conn_count);
#include "pgxc/poolutils.h"
#include "pgxc/pgxcnode.h"
#include "access/gtm.h"
+#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pgxc_node.h"
#include "commands/dbcommands.h"
Datum
pgxc_pool_reload(PG_FUNCTION_ARGS)
{
- if (IsTransactionBlock())
+ if (TransactionIdIsValid(GetTopTransactionIdIfAny()))
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
- errmsg("pgxc_pool_reload cannot run inside a transaction block")));
+ errmsg("pgxc_pool_reload cannot run with a transaction ID assigned")));
/*
* Always check if we can get away with a LESS destructive refresh
*/
querytree_list = pg_rewrite_query(query);
+ /*
+ * If we rewrote the query into more than one query, then we must
+ * enforce a transaction block while running remote queries.
+ */
+ if (list_length(querytree_list) > 1)
+ SetRequireRemoteTransactionBlock();
+
TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string);
return querytree_list;
*/
if (IS_PGXC_DATANODE && IsPostmasterEnvironment)
{
- if (IsA(parsetree->stmt, VacuumStmt) || IsA(parsetree->stmt, ClusterStmt))
- SetForceXidFromGTM(true);
- else if (IsA(parsetree->stmt, ReindexStmt))
+ if (IsA(parsetree, VacuumStmt))
+ {
+ VacuumStmt *stmt = (VacuumStmt *) parsetree;
+ if (stmt->options & VACOPT_VACUUM)
+ SetForceXidFromGTM(true);
+ }
+ else if (IsA(parsetree, ClusterStmt))
+ {
+ ClusterStmt *stmt = (ClusterStmt *) parsetree;
+ if (stmt->relation == NULL)
+ SetForceXidFromGTM(true);
+ }
+ else if (IsA(parsetree, ReindexStmt))
{
ReindexStmt *stmt = (ReindexStmt *) parsetree->stmt;
if (stmt->kind == REINDEX_OBJECT_SCHEMA ||
static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
bool sentToRemote,
- bool force_autocommit,
RemoteQueryExecType exec_type,
bool is_temp,
bool add_context);
static void ExecUtilityStmtOnNodesInternal(const char *queryString,
ExecNodes *nodes,
bool sentToRemote,
- bool force_autocommit,
RemoteQueryExecType exec_type,
bool is_temp);
static RemoteQueryExecType ExecUtilityFindNodes(ObjectType objectType,
case T_CreateTableSpaceStmt:
/* no event triggers for global objects */
if (IS_PGXC_LOCAL_COORDINATOR)
- PreventTransactionChain(isTopLevel, "CREATE TABLESPACE");
+ PreventTransactionChainLocal(isTopLevel, "CREATE TABLESPACE");
CreateTableSpace((CreateTableSpaceStmt *) parsetree);
break;
/* no event triggers for global objects */
/* Allow this to be run inside transaction block on remote nodes */
if (IS_PGXC_LOCAL_COORDINATOR)
- PreventTransactionChain(isTopLevel, "DROP TABLESPACE");
+ PreventTransactionChainLocal(isTopLevel, "DROP TABLESPACE");
DropTableSpace((DropTableSpaceStmt *) parsetree);
break;
case T_CreatedbStmt:
/* no event triggers for global objects */
if (IS_PGXC_LOCAL_COORDINATOR)
- PreventTransactionChain(isTopLevel, "CREATE DATABASE");
+ PreventTransactionChainLocal(isTopLevel, "CREATE DATABASE");
createdb(pstate, (CreatedbStmt *) parsetree);
break;
/* no event triggers for global objects */
if (IS_PGXC_LOCAL_COORDINATOR)
- PreventTransactionChain(isTopLevel, "DROP DATABASE");
+ PreventTransactionChainLocal(isTopLevel, "DROP DATABASE");
dropdb(stmt->dbname, stmt->missing_ok);
}
case T_ClusterStmt:
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery("CLUSTER");
+ if (((ClusterStmt *) parsetree)->relation == NULL)
+ PreventTransactionChain(isTopLevel, "CLUSTER");
/* forbidden in parallel mode due to CommandIsReadOnly */
cluster((ClusterStmt *) parsetree, isTopLevel);
break;
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
"VACUUM" : "ANALYZE");
+ /*
+ * We have to run the command on nodes before Coordinator because
+ * vacuum() pops active snapshot and we can not send it to nodes
+ */
+ if (IS_PGXC_LOCAL_COORDINATOR &&
+ !(stmt->options & VACOPT_COORDINATOR))
+ {
+ if (stmt->options & VACOPT_VACUUM)
+ SetRequireRemoteTransactionAutoCommit();
+ }
/* forbidden in parallel mode due to CommandIsReadOnly */
ExecVacuum(stmt, isTopLevel);
}
case T_ReindexStmt:
{
ReindexStmt *stmt = (ReindexStmt *) parsetree;
+ bool prevent_xact_chain = false;
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery("REINDEX");
(stmt->kind == REINDEX_OBJECT_SYSTEM) ? "REINDEX SYSTEM" :
"REINDEX DATABASE");
ReindexMultipleTables(stmt->name, stmt->kind, stmt->options);
+ prevent_xact_chain = true;
break;
default:
elog(ERROR, "unrecognized object type: %d",
(int) stmt->kind);
break;
}
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ {
+ if (prevent_xact_chain)
+ SetRequireRemoteTransactionAutoCommit();
+ }
}
break;
completionTag);
else
ExecRenameStmt(stmt);
-
}
break;
completionTag);
else
CommentObject(stmt);
- break;
}
break;
ExecDropStmt(DropStmt *stmt, bool isTopLevel)
#endif
{
+ bool prevent_xact_chain = false;
+
switch (stmt->removeType)
{
case OBJECT_INDEX:
if (stmt->concurrent)
+ {
PreventTransactionChain(isTopLevel,
"DROP INDEX CONCURRENTLY");
+ prevent_xact_chain = true;
+ }
/* fall through */
case OBJECT_TABLE:
#ifdef PGXC
/* DROP is done depending on the object type and its temporary type */
if (IS_PGXC_LOCAL_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false,
+ {
+ if (prevent_xact_chain)
+ SetRequireRemoteTransactionAutoCommit();
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
exec_type, is_temp, false);
+ }
}
#endif
break;
RemoveObjects(stmt);
#ifdef PGXC
if (IS_PGXC_LOCAL_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false,
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
exec_type, is_temp, false);
}
#endif
#ifdef PGXC
static void
ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool sentToRemote,
- bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp)
+ RemoteQueryExecType exec_type, bool is_temp)
{
/* Return if query is launched on no nodes */
if (exec_type == EXEC_ON_NONE)
step->combine_type = COMBINE_TYPE_SAME;
step->exec_nodes = nodes;
step->sql_statement = pstrdup(queryString);
- step->force_autocommit = force_autocommit;
step->exec_type = exec_type;
ExecRemoteUtility(step);
pfree(step->sql_statement);
}
plan->sql_statement = query;
- plan->force_autocommit = false;
/*
* We only need the target entry to determine result data type.
* So create dummy even if real expression is a function.
step->exec_nodes = NULL;
step->sql_statement = poolcmd.data;
/* force_autocommit is actually does not start transaction on nodes */
- step->force_autocommit = true;
step->exec_type = EXEC_ON_CURRENT;
ExecRemoteUtility(step);
pfree(step);
extern char TransactionBlockStatusCode(void);
extern void AbortOutOfAnyTransaction(void);
extern void PreventTransactionChain(bool isTopLevel, const char *stmtType);
+extern void PreventTransactionChainLocal(bool isTopLevel, const char *stmtType);
extern void RequireTransactionChain(bool isTopLevel, const char *stmtType);
extern void WarnNoTransactionChain(bool isTopLevel, const char *stmtType);
extern bool IsInTransactionChain(bool isTopLevel);
extern bool IsPGXCNodeXactReadOnly(void);
extern bool IsPGXCNodeXactDatanodeDirect(void);
extern void TransactionRecordXidWait(TransactionId xid);
+extern void SetRequireRemoteTransactionBlock(void);
+extern bool IsRemoteTransactionBlockRequired(void);
+extern void SetRequireRemoteTransactionAutoCommit(void);
+extern bool IsRemoteTransactionAutoCommit(void);
#endif
extern int xactGetCommittedChildren(TransactionId **ptr);
CombineType combine_type;
SimpleSort *sort;
bool read_only; /* do not use 2PC when committing read only steps */
- bool force_autocommit; /* some commands like VACUUM require autocommit mode */
char *statement; /* if specified use it as a PreparedStatement name on Datanodes */
char *cursor; /* if specified use it as a Portal name on Datanodes */
int rq_num_params; /* number of parameters present in
END; $$ LANGUAGE plpgsql;
SELECT * FROM list_partitioned_table() AS t;
+
+-- ensure that all statements in a function are correctly executed in a
+-- transaction block.
+create table plp_mt_tab(a int, b int);
+create function plpgsql_multistmt() returns void as $$
+begin
+ insert into plp_mt_tab(a) values (1);
+ insert into plp_mt_tab(a) values (2);
+ insert into plp_mt_tab(a) values (3/0);
+end
+$$ language plpgsql;
+
+select plpgsql_multistmt();
+select * from plp_mt_tab;
+
+create or replace function plpgsql_multistmt() returns void as $$
+begin
+ insert into plp_mt_tab(a) values (3);
+ update plp_mt_tab set b = 1 where (a / 0) = 0;
+end
+$$ language plpgsql;
+
+select plpgsql_multistmt();
+select * from plp_mt_tab;