#include "pgxc/pause.h"
#endif
-static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes, bool sentToRemote,
- bool force_autocommit, RemoteQueryExecType exec_type,
+static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
+ bool sentToRemote,
+ bool force_autocommit,
+ RemoteQueryExecType exec_type,
+ bool is_temp,
+ bool add_context);
+static void ExecUtilityStmtOnNodesInternal(const char *queryString,
+ ExecNodes *nodes,
+ bool sentToRemote,
+ bool force_autocommit,
+ RemoteQueryExecType exec_type,
bool is_temp);
static RemoteQueryExecType ExecUtilityFindNodes(ObjectType objectType,
Oid relid,
static void DropStmtPreTreatment(DropStmt *stmt, const char *queryString, bool sentToRemote,
bool *is_temp, RemoteQueryExecType *exec_type);
static bool IsStmtAllowedInLockedMode(Node *parsetree, const char *queryString);
-static void ExecUtilityWithMessage(const char *queryString, bool sentToRemote, bool is_temp);
#endif
/* Hook for plugins to get control in ProcessUtility() */
completionTag);
}
+/*
+ * Do the necessary processing before executing the utility command locally on
+ * the coordinator.
+ */
static bool
ProcessUtilityPre(PlannedStmt *pstmt,
const char *queryString,
{
Node *parsetree = pstmt->utilityStmt;
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
+ bool all_done = false;
+ bool is_temp = false;
+ bool auto_commit = false;
+ bool add_context = false;
+ RemoteQueryExecType exec_type = EXEC_ON_NONE;
+
+ /*
+ * auto_commit and is_temp is initialised to false and changed if required.
+ *
+ * exec_type is initialised to EXEC_ON_NONE and updated iff the command
+ * needs remote execution during the preprocessing step.
+ */
switch (nodeTag(parsetree))
{
if (IS_PGXC_LOCAL_COORDINATOR)
{
if (!FinishRemotePreparedTransaction(stmt->gid, true) && !xc_maintenance_mode)
- return false; /* No need to commit locally */
+ all_done = true; /* No need to commit locally */
}
break;
if (IS_PGXC_LOCAL_COORDINATOR)
{
if (!FinishRemotePreparedTransaction(stmt->gid, false) && !xc_maintenance_mode)
- return false;
+ all_done = true;
}
break;
/* Clean also remote Coordinators */
sprintf(query, "CLEAN CONNECTION TO ALL FOR DATABASE %s;",
quote_identifier(stmt->dbname));
- ExecUtilityStmtOnNodes(query, NULL, sentToRemote, true, EXEC_ON_ALL_NODES, false);
+ ExecUtilityStmtOnNodes(query, NULL, sentToRemote, true,
+ EXEC_ON_ALL_NODES, false, false);
}
break;
* We have to run the command on nodes before Coordinator because
* vacuum() pops active snapshot and we can not send it to nodes
*/
- if (IS_PGXC_LOCAL_COORDINATOR &&
- !(stmt->options & VACOPT_COORDINATOR))
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
- true,
- EXEC_ON_DATANODES, false);
+ if (!(stmt->options & VACOPT_COORDINATOR))
+ exec_type = EXEC_ON_DATANODES;
+ auto_commit = true;
}
break;
case T_AlterNodeStmt:
PgxcNodeAlter((AlterNodeStmt *) parsetree);
if (((AlterNodeStmt *) parsetree)->cluster)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, false);
+ exec_type = EXEC_ON_ALL_NODES;
+ all_done = true;
break;
case T_CreateNodeStmt:
PgxcNodeCreate((CreateNodeStmt *) parsetree);
+ all_done = true;
break;
case T_DropNodeStmt:
PgxcNodeRemove((DropNodeStmt *) parsetree);
+ all_done = true;
break;
case T_CreateGroupStmt:
PgxcGroupCreate((CreateGroupStmt *) parsetree);
+ all_done = true;
break;
case T_DropGroupStmt:
PgxcGroupRemove((DropGroupStmt *) parsetree);
+ all_done = true;
break;
case T_ReindexStmt:
case T_RenameStmt:
{
RenameStmt *stmt = (RenameStmt *) parsetree;
- RemoteQueryExecType exec_type;
- bool is_temp = false;
if (IS_PGXC_LOCAL_COORDINATOR)
{
exec_type = ExecUtilityFindNodes(stmt->renameType,
InvalidOid,
&is_temp);
- ExecUtilityStmtOnNodes(queryString,
- NULL,
- sentToRemote,
- false,
- exec_type,
- is_temp);
}
}
break;
* connections, then clean local pooler
*/
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, true, EXEC_ON_ALL_NODES, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, true,
+ EXEC_ON_ALL_NODES, false, false);
CleanConnection((CleanConnStmt *) parsetree);
break;
break;
case T_RefreshMatViewStmt:
- if ((IS_PGXC_COORDINATOR) && !IsConnFromCoord())
+ if (IS_PGXC_LOCAL_COORDINATOR)
{
RefreshMatViewStmt *stmt = (RefreshMatViewStmt *) parsetree;
if (stmt->relation->relpersistence != RELPERSISTENCE_TEMP)
- ExecUtilityStmtOnNodes(queryString, NULL,
- sentToRemote, false, EXEC_ON_COORDS, false);
+ exec_type = EXEC_ON_COORDS;
}
break;
break;
}
- return true;
+ /*
+ * Send queryString to remote nodes, if needed.
+ */
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, auto_commit,
+ exec_type, is_temp, add_context);
+
+
+ return all_done;
}
static void
bool sentToRemote)
{
Node *parsetree = pstmt->utilityStmt;
+ bool is_temp = false;
+ bool auto_commit = false;
+ bool add_context = false;
+ RemoteQueryExecType exec_type = EXEC_ON_NONE;
+
+ /*
+ * auto_commit and is_temp is initialised to false and changed if required.
+ *
+ * exec_type is initialised to EXEC_ON_NONE and updated iff the command
+ * needs remote execution during the preprocessing step.
+ */
switch (nodeTag(parsetree))
{
case T_CreateTableSpaceStmt:
case T_CreatedbStmt:
- if (IS_PGXC_LOCAL_COORDINATOR)
- ExecUtilityWithMessage(queryString, sentToRemote, false);
+ add_context = true;
+ exec_type = EXEC_ON_ALL_NODES;
break;
case T_DropTableSpaceStmt:
case T_CreatePolicyStmt: /* CREATE POLICY */
case T_AlterPolicyStmt: /* ALTER POLICY */
case T_CreateAmStmt:
- if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, false);
+ exec_type = EXEC_ON_ALL_NODES;
break;
case T_TruncateStmt:
*/
if (IS_PGXC_LOCAL_COORDINATOR)
{
- bool is_temp = false;
ListCell *cell;
TruncateStmt *stmt = (TruncateStmt *) parsetree;
break;
}
}
-
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_DATANODES, is_temp);
+ exec_type = EXEC_ON_DATANODES;
}
break;
* If this is not a SET TABLESPACE statement, just propogate the
* cmd as usual.
*/
- if (!IsSetTableSpace((AlterDatabaseStmt*) parsetree))
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, false);
- else
- ExecUtilityWithMessage(queryString, sentToRemote, false);
+ if (IsSetTableSpace((AlterDatabaseStmt*) parsetree))
+ add_context = true;
+ exec_type = EXEC_ON_ALL_NODES;
}
break;
case T_LoadStmt:
case T_ConstraintsSetStmt:
- if (IS_PGXC_LOCAL_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_DATANODES, false);
+ exec_type = EXEC_ON_DATANODES;
break;
case T_ClusterStmt:
case T_CheckPointStmt:
- if (IS_PGXC_LOCAL_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, true, EXEC_ON_DATANODES, false);
+ auto_commit = true;
+ exec_type = EXEC_ON_DATANODES;
break;
case T_DiscardStmt:
* For example, temporary tables are created on all Datanodes
* and Coordinators.
*/
- if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, true, EXEC_ON_ALL_NODES, false);
+ auto_commit = true;
+ exec_type = EXEC_ON_ALL_NODES;
break;
case T_ReindexStmt:
{
ReindexStmt *stmt = (ReindexStmt *) parsetree;
Oid relid;
- RemoteQueryExecType exec_type;
- bool is_temp;
/* forbidden in parallel mode due to CommandIsReadOnly */
switch (stmt->kind)
}
if (IS_PGXC_LOCAL_COORDINATOR)
{
- bool auto_commit = (stmt->kind == REINDEX_OBJECT_DATABASE ||
+ auto_commit = (stmt->kind == REINDEX_OBJECT_DATABASE ||
stmt->kind == REINDEX_OBJECT_SCHEMA);
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
- auto_commit, exec_type, false);
}
}
break;
if (IS_PGXC_LOCAL_COORDINATOR)
{
RemoteQueryExecType remoteExecType = EXEC_ON_ALL_NODES;
- bool is_temp = false;
/* Launch GRANT on Coordinator if object is a sequence */
if ((stmt->objtype == ACL_OBJECT_RELATION &&
}
}
}
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, remoteExecType, is_temp);
+ exec_type = remoteExecType;
}
}
break;
if (IS_PGXC_LOCAL_COORDINATOR)
{
AlterObjectSchemaStmt *stmt = (AlterObjectSchemaStmt *) parsetree;
- RemoteQueryExecType exec_type;
- bool is_temp = false;
/* Try to use the object relation if possible */
if (stmt->relation)
InvalidOid,
&is_temp);
}
-
- ExecUtilityStmtOnNodes(queryString,
- NULL,
- sentToRemote,
- false,
- exec_type,
- is_temp);
}
break;
/* Comment objects depending on their object and temporary types */
if (IS_PGXC_LOCAL_COORDINATOR)
{
- bool is_temp = false;
CommentStmt *stmt = (CommentStmt *) parsetree;
- RemoteQueryExecType exec_type = GetNodesForCommentUtility(stmt, &is_temp);
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, exec_type, is_temp);
+ exec_type = GetNodesForCommentUtility(stmt, &is_temp);
}
break;
{
IndexStmt *stmt = (IndexStmt *) parsetree;
Oid relid;
- bool is_temp = false;
- RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES;
/* INDEX on a temporary table cannot use 2PC at commit */
relid = RangeVarGetRelid(stmt->relation, NoLock, true);
else
exec_type = EXEC_ON_NONE;
- if (IS_PGXC_COORDINATOR && !stmt->isconstraint && !IsConnFromCoord())
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
- stmt->concurrent, exec_type, is_temp);
+ auto_commit = stmt->concurrent;
+ if (stmt->isconstraint)
+ exec_type = EXEC_ON_NONE;
}
break;
* In this case force autocommit, this transaction cannot be launched
* inside a transaction block.
*/
- if (IS_PGXC_LOCAL_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false , EXEC_ON_ALL_NODES, false);
+ exec_type = EXEC_ON_ALL_NODES;
break;
case T_ViewStmt: /* CREATE VIEW */
if (IS_PGXC_LOCAL_COORDINATOR)
{
ViewStmt *stmt = (ViewStmt *) parsetree;
-
if (stmt->view->relpersistence != RELPERSISTENCE_TEMP)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_COORDS, false);
+ exec_type = EXEC_ON_COORDS;
}
break;
case T_RuleStmt: /* CREATE RULE */
if (IS_PGXC_LOCAL_COORDINATOR)
{
- RemoteQueryExecType exec_type;
- bool is_temp;
exec_type = GetNodesForRulesUtility(((RuleStmt *) parsetree)->relation,
&is_temp);
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, exec_type, is_temp);
}
break;
/* In case this query is related to a SERIAL execution, just bypass */
if (!stmt->is_serial)
- {
- bool is_temp = stmt->sequence->relpersistence == RELPERSISTENCE_TEMP;
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, is_temp);
- }
+ is_temp = stmt->sequence->relpersistence == RELPERSISTENCE_TEMP;
+ exec_type = EXEC_ON_ALL_NODES;
}
break;
/* In case this query is related to a SERIAL execution, just bypass */
if (!stmt->is_serial)
{
- bool is_temp;
- RemoteQueryExecType exec_type;
Oid relid = RangeVarGetRelid(stmt->sequence, NoLock, true);
if (!OidIsValid(relid))
relid,
&is_temp);
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, exec_type, is_temp);
}
}
break;
case T_CreateTableAsStmt:
- if ((IS_PGXC_COORDINATOR) && !IsConnFromCoord())
+ if (IS_PGXC_LOCAL_COORDINATOR)
{
CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
*/
Assert(stmt->relkind == OBJECT_MATVIEW);
if (stmt->into->rel->relpersistence != RELPERSISTENCE_TEMP)
- ExecUtilityStmtOnNodes(queryString, NULL,
- sentToRemote, false, EXEC_ON_COORDS, false);
+ exec_type = EXEC_ON_COORDS;
}
break;
case T_CreateStatsStmt:
if (IS_PGXC_LOCAL_COORDINATOR)
{
- bool is_temp;
CreateStatsStmt *stmt = (CreateStatsStmt *) parsetree;
RangeVar *rln = linitial(stmt->relations);
Relation rel = relation_openrv((RangeVar *) rln, ShareUpdateExclusiveLock);
* fetch the nodes. This is ok because the command must
* only be even used on some kind of relation.
*/
- RemoteQueryExecType exec_type =
+ exec_type =
ExecUtilityFindNodesRelkind(RelationGetRelid(rel), &is_temp);
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
- false, exec_type, false);
relation_close(rel, NoLock);
}
break;
(int) nodeTag(parsetree));
break;
}
+
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, auto_commit,
+ exec_type, is_temp, add_context);
}
/*
* standard_ProcessUtility itself deals only with utility commands for
if (completionTag)
completionTag[0] = '\0';
- if (!ProcessUtilityPre(pstmt, queryString, context, queryEnv, sentToRemote,
+ if (ProcessUtilityPre(pstmt, queryString, context, queryEnv, sentToRemote,
completionTag))
return;
/* DROP is done depending on the object type and its temporary type */
if (IS_PGXC_LOCAL_COORDINATOR)
ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false,
- exec_type, is_temp);
+ exec_type, is_temp, false);
}
#endif
break;
#ifdef PGXC
if (IS_PGXC_LOCAL_COORDINATOR)
ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false,
- exec_type, is_temp);
+ exec_type, is_temp, false);
}
#endif
break;
}
#ifdef PGXC
-
-/*
- * ExecUtilityWithMessage:
- * Execute the query on remote nodes in a transaction block.
- * If this fails on one of the nodes :
- * Add a context message containing the failed node names.
- * Rethrow the error with the message about the failed nodes.
- * If all are successful, just return.
- */
- static void
-ExecUtilityWithMessage(const char *queryString, bool sentToRemote, bool is_temp)
-{
- PG_TRY();
- {
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, is_temp);
- }
- PG_CATCH();
- {
-
- /*
- * Some nodes failed. Add context about what all nodes the query
- * failed
- */
- ExecNodes *coord_success_nodes = NULL;
- ExecNodes *data_success_nodes = NULL;
- char *msg_failed_nodes;
-
- pgxc_all_success_nodes(&data_success_nodes, &coord_success_nodes, &msg_failed_nodes);
- if (msg_failed_nodes)
- errcontext("%s", msg_failed_nodes);
- PG_RE_THROW();
- }
- PG_END_TRY();
-
-
-}
-
-/*
- * Execute a Utility statement on nodes, including Coordinators
- * If the DDL is received from a remote Coordinator,
- * it is not possible to push down DDL to Datanodes
- * as it is taken in charge by the remote Coordinator.
- */
- static void
-ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes, bool sentToRemote,
+static void
+ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool sentToRemote,
bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp)
{
/* Return if query is launched on no nodes */
}
}
+
+/*
+ * ExecUtilityStmtOnNodes
+ *
+ * Execute the query on remote nodes
+ *
+ * queryString is the raw query to be executed.
+ * If nodes is NULL then the list of nodes is computed from exec_type.
+ * If auto_commit is true, then the query is executed without a transaction
+ * block and auto-committed on the remote node.
+ * exec_type is used to compute the list of remote nodes on which the query is
+ * executed.
+ * is_temp is set to true if the query involves a temporary database object.
+ * If add_context is true and if this fails on one of the nodes then add a
+ * context message containing the failed node names.
+ */
+static void
+ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
+ bool sentToRemote, bool auto_commit, RemoteQueryExecType exec_type,
+ bool is_temp, bool add_context)
+{
+ PG_TRY();
+ {
+ ExecUtilityStmtOnNodesInternal(queryString, nodes, sentToRemote,
+ auto_commit, exec_type, is_temp);
+ }
+ PG_CATCH();
+ {
+ /*
+ * Some nodes failed. Add context about what all nodes the query
+ * failed
+ */
+ ExecNodes *coord_success_nodes = NULL;
+ ExecNodes *data_success_nodes = NULL;
+ char *msg_failed_nodes;
+
+ /*
+ * If the caller has asked for context information, add that and
+ * re-throw the error.
+ */
+ if (!add_context)
+ PG_RE_THROW();
+
+ pgxc_all_success_nodes(&data_success_nodes, &coord_success_nodes, &msg_failed_nodes);
+ if (msg_failed_nodes)
+ errcontext("%s", msg_failed_nodes);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+
+}
+
/*
* ExecUtilityFindNodes
*