Track clearly whether to run a remote transaction in autocommit or a block
authorPavan Deolasee <[email protected]>
Mon, 9 Apr 2018 10:42:54 +0000 (16:12 +0530)
committerPavan Deolasee <[email protected]>
Fri, 18 May 2018 10:55:41 +0000 (16:25 +0530)
Chi Gao and Hengbing Wang reported certain issues around transaction handling
and demonstrated via xlogdump how certain transactions were getting marked
committed/aborted repeatedly on a datanode. When an already committed
transaction is attempted to be aborted again, it results in a PANIC. Upon
investigation, this uncovered a very serious yet long standing bug in
transaction handling.

If the client is running in autocommit mode, we try to avoid starting a
transaction block on the datanode side if only one datanode is going to be
involved in the transaction. This is an optimisation to speed up short queries
touching only a single node. But when the query rewriter transforms a single
statement into multiple statements, we would still (and incorrectly) run each
statement in an autocommit mode on the datanode. This can cause inconsistencies
when one statement commits but the next statement aborts. And it may also lead
to the PANIC situations if we continue to use the same global transaction
identifier for the statements.

This can also happen when the user invokes a user-defined function. If the
function has multiple statements, each statement that is fast-query-shipped
(FQS) will run in autocommit mode, thus again creating inconsistency if a
following statement in the function fails.

We now have a more elaborate mechanism to tackle autocommit and transaction
block needs. The special casing for force_autocommit is now removed, thus
making it more predictable. We also have specific conditions to check to ensure
that we don't mix up autocommit and transaction block for the same global xid.
Finally, if a query rewriter transforms a single statement into multiple
statements, we run those statements in a transaction block. Together these
changes should help us fix the problems.

20 files changed:
contrib/stormstats/stormstats.c
src/backend/access/transam/xact.c
src/backend/commands/analyze.c
src/backend/commands/dbcommands.c
src/backend/commands/explain.c
src/backend/commands/vacuum.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/parser/analyze.c
src/backend/pgxc/locator/redistrib.c
src/backend/pgxc/plan/planner.c
src/backend/pgxc/pool/execRemote.c
src/backend/pgxc/pool/poolutils.c
src/backend/tcop/postgres.c
src/backend/tcop/utility.c
src/backend/utils/adt/dbsize.c
src/backend/utils/misc/guc.c
src/include/access/xact.h
src/include/pgxc/planner.h
src/test/regress/sql/plpgsql.sql

index 72c7150fb0e5f5fd6a8314c9967e6040a9350de5..004f5c23de84758ba1fe95ed22904c6d322d4b84 100644 (file)
@@ -630,7 +630,6 @@ storm_gather_remote_coord_info(Oid funcid)
        step->combine_type = COMBINE_TYPE_NONE;
        step->exec_nodes = NULL;
        step->sql_statement = query;
-       step->force_autocommit = false;
        step->read_only = true;
        step->exec_type = EXEC_ON_COORDS;
 
index 4a83f77e3f6ff15661477f24aad0977d11757031..9bb805d1278f61536d5790a6898f3164958be402 100644 (file)
@@ -90,6 +90,8 @@
 #define implicit2PC_head "_$XC$"
 #endif
 
+#define XACT_REMOTE_TRANSACTION_AUTOCOMMIT     0x01
+#define XACT_REMOTE_TRANSACTION_BLOCK          0x02
 
 /*
  *     User-tweakable parameters
@@ -221,6 +223,11 @@ typedef struct TransactionStateData
        int                     parallelModeLevel;      /* Enter/ExitParallelMode counter */
        struct TransactionStateData *parent;    /* back link to parent */
 #ifdef XCP
+       /*
+        * flags to track whether to run the remote transaction in a transaction
+        * block or in autocommit mode.
+        */
+       int                             remoteTransactionBlockFlags;
        int                             waitedForXidsCount;     /* count of xids we waited to finish */
        TransactionId   *waitedForXids;         /* xids we waited to finish */
 #endif
@@ -2758,6 +2765,7 @@ AtEOXact_GlobalTxn(bool commit)
        }
        s->waitedForXids = NULL;
        s->waitedForXidsCount = 0;
+       s->remoteTransactionBlockFlags = 0;
 
        SetNextTransactionId(InvalidTransactionId);
 }
@@ -3116,6 +3124,7 @@ PrepareTransaction(void)
        }
        s->waitedForXids = NULL;
        s->waitedForXidsCount = 0;
+       s->remoteTransactionBlockFlags = 0;
 #endif
 
        SetNextTransactionId(InvalidTransactionId);
@@ -3429,6 +3438,7 @@ CleanupTransaction(void)
        }
        s->waitedForXids = NULL;
        s->waitedForXidsCount = 0;
+       s->remoteTransactionBlockFlags = 0;
 #endif
 
        /*
@@ -3930,8 +3940,8 @@ AbortCurrentTransaction(void)
  *     making callers do it.)
  *     stmtType: statement type name, for error messages.
  */
-void
-PreventTransactionChain(bool isTopLevel, const char *stmtType)
+static void
+PreventTransactionChainInternal(bool isTopLevel, const char *stmtType, bool remote)
 {
        /*
         * xact block already started?
@@ -3968,6 +3978,21 @@ PreventTransactionChain(bool isTopLevel, const char *stmtType)
                CurrentTransactionState->blockState != TBLOCK_STARTED)
                elog(FATAL, "cannot prevent transaction chain");
        /* all okay */
+
+       if (remote)
+               SetRequireRemoteTransactionAutoCommit();
+}
+
+void
+PreventTransactionChain(bool isTopLevel, const char *stmtType)
+{
+       PreventTransactionChainInternal(isTopLevel, stmtType, true);    /* remote = true: on success also calls SetRequireRemoteTransactionAutoCommit() */
+}
+
+void
+PreventTransactionChainLocal(bool isTopLevel, const char *stmtType)
+{
+       PreventTransactionChainInternal(isTopLevel, stmtType, false);   /* remote = false: same local check, but the remote autocommit flag is not set */
+}
 
 /*
@@ -4300,6 +4325,8 @@ BeginTransactionBlock(void)
         */
        if (IS_PGXC_LOCAL_COORDINATOR)
                SetSendCommandId(true);
+
+       SetRequireRemoteTransactionBlock();
 #endif
 }
 
@@ -6912,5 +6939,43 @@ SetTopTransactionId(GlobalTransactionId xid)
                        pq_putmessage('x', (const char *) &xid, sizeof (GlobalTransactionId));
        }
 }
+
+/* Flag the current transaction state as needing a remote transaction block; raises ERROR if autocommit was already requested. */
+void
+SetRequireRemoteTransactionBlock(void)
+{
+       TransactionState xact = CurrentTransactionState;
+       if ((xact->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_AUTOCOMMIT) != 0)
+               elog(ERROR, "Can't run a query marked for autocommit in a transaction block");
+       xact->remoteTransactionBlockFlags |= XACT_REMOTE_TRANSACTION_BLOCK;
+}
+
+/* Returns true when XACT_REMOTE_TRANSACTION_BLOCK is set on the current transaction state; the masked bits are normalised to a strict 0/1 value. */
+bool
+IsRemoteTransactionBlockRequired(void)
+{
+       TransactionState s = CurrentTransactionState;
+
+       return (s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_BLOCK) != 0;
+}
+
+/* Flag the current transaction state as requiring remote autocommit; raises ERROR if a transaction block was already requested. */
+void
+SetRequireRemoteTransactionAutoCommit(void)
+{
+       TransactionState xact = CurrentTransactionState;
+       if ((xact->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_BLOCK) != 0)
+               elog(ERROR, "Can't run a query marked for a transaction block in autocommit mode");
+       xact->remoteTransactionBlockFlags |= XACT_REMOTE_TRANSACTION_AUTOCOMMIT;
+}
+
+/* Returns true when XACT_REMOTE_TRANSACTION_AUTOCOMMIT is set on the current transaction state; the masked bits are normalised to a strict 0/1 value. */
+bool
+IsRemoteTransactionAutoCommit(void)
+{
+       TransactionState s = CurrentTransactionState;
+
+       return (s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_AUTOCOMMIT) != 0;
+}
 #endif
 #endif
index 832d99a065ddc6cba0a62526d9d51616b082b74b..8abc3432e3dbd7b691338368ce3578ebad1fc853 100644 (file)
@@ -3032,7 +3032,6 @@ coord_collect_simple_stats(Relation onerel, bool inh, int attr_cnt,
        step->combine_type = COMBINE_TYPE_NONE;
        step->exec_nodes = NULL;
        step->sql_statement = query.data;
-       step->force_autocommit = true;
        step->exec_type = EXEC_ON_DATANODES;
 
        /* Add targetlist entries */
index ce49f91166c7451e000def5e45ef71c49c6d09ba..2fd6497518c0da7e5de4f51d9f963f5fb57d8cae 100644 (file)
@@ -1613,7 +1613,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
                /* ... but we allow it on remote nodes */
                if (IS_PGXC_LOCAL_COORDINATOR)
 #endif
-                       PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
+                       PreventTransactionChainLocal(isTopLevel, "ALTER DATABASE SET TABLESPACE");
 
                movedb(stmt->dbname, defGetString(dtablespace));
                return InvalidOid;
index 69dc8d0508c18acd88fd688ad552e40204240581..4a2eb5b56e5b1da673b491f1723790e5105dbb3b 100644 (file)
@@ -3760,7 +3760,6 @@ ExplainRemoteQuery(RemoteQuery *plan, PlanState *planstate, List *ancestors, Exp
                        step->exec_nodes->nodeList =
                                list_make1_int(linitial_int(plan->exec_nodes->nodeList));
 
-               step->force_autocommit = true;
                step->exec_type = EXEC_ON_DATANODES;
 
                dummy = makeVar(1, 1, TEXTOID, -1, InvalidOid, 0);
index fffae63bccbb183a64fd58a31a3114d82186e0e9..edadfc48449ead6207301b1d8a71109519127da6 100644 (file)
@@ -1706,7 +1706,6 @@ get_remote_relstat(char *nspname, char *relname, bool replicated,
        step->combine_type = COMBINE_TYPE_NONE;
        step->exec_nodes = NULL;
        step->sql_statement = query.data;
-       step->force_autocommit = true;
        step->exec_type = EXEC_ON_DATANODES;
 
        /* Add targetlist entries */
index e64a074cd11d06f1ebe411ffb1b31de74786c045..067fa8296f1cd3addf6b41404b2526ecbe024880 100644 (file)
@@ -1232,7 +1232,6 @@ _copyRemoteQuery(const RemoteQuery *from)
        COPY_SCALAR_FIELD(combine_type);
        COPY_NODE_FIELD(sort);
        COPY_SCALAR_FIELD(read_only);
-       COPY_SCALAR_FIELD(force_autocommit);
        COPY_STRING_FIELD(statement);
        COPY_STRING_FIELD(cursor);
        COPY_SCALAR_FIELD(rq_num_params);
index 99b70b717fe08fa3ee5997c02eb78c8b46ce643e..06e12e0b6443ac8174d7c03549ae9af501d075a0 100644 (file)
@@ -873,7 +873,6 @@ _outRemoteQuery(StringInfo str, const RemoteQuery *node)
        WRITE_NODE_FIELD(exec_nodes);
        WRITE_ENUM_FIELD(combine_type, CombineType);
        WRITE_BOOL_FIELD(read_only);
-       WRITE_BOOL_FIELD(force_autocommit);
        WRITE_STRING_FIELD(statement);
        WRITE_STRING_FIELD(cursor);
        WRITE_INT_FIELD(rq_num_params);
index 7ccd3f3dd8961b1f995dd428be0b1dc7efa70a24..fc865ac47af6935a4ea3ed931717e86ec1d61ee0 100644 (file)
@@ -2712,7 +2712,6 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt)
        step->combine_type = COMBINE_TYPE_NONE;
        step->sort = NULL;
        step->read_only = true;
-       step->force_autocommit = false;
        step->cursor = NULL;
 
        /* This is needed by executor */
@@ -2735,7 +2734,6 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt)
        step->join_condition = NULL;
 
        /* Change the list of nodes that will be executed for the query and others */
-       step->force_autocommit = false;
        step->combine_type = COMBINE_TYPE_SAME;
        step->read_only = true;
        step->exec_direct_type = EXEC_DIRECT_NONE;
index 6999dc7e2a68d8cba3c581c60b015c17a1379897..1a081b473013e6e78b2f785edf696362bfba35f0 100644 (file)
@@ -888,7 +888,6 @@ distrib_execute_query(char *sql, bool is_temp, ExecNodes *exec_nodes)
        step->combine_type = COMBINE_TYPE_SAME;
        step->exec_nodes = exec_nodes;
        step->sql_statement = pstrdup(sql);
-       step->force_autocommit = false;
 
        /* Redistribution operations only concern Datanodes */
        step->exec_type = EXEC_ON_DATANODES;
index e86a945c25ccde24a6c81ab64966a120e523d9bb..9afb602359cf0f5595ba9778856a5bcc6234217f 100644 (file)
@@ -17,6 +17,7 @@
 #include "postgres.h"
 #include "miscadmin.h"
 #include "access/transam.h"
+#include "access/xact.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_inherits_fn.h"
@@ -232,6 +233,16 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
 
        /* we need Coordinator for evaluation, invoke standard planner */
        result = standard_planner(query, cursorOptions, boundParams);
+
+       /*
+        * For coordinator side execution, we must always force a transaction block
+        * on the remote side. This ensures that all queries resulting from the
+        * coordinator side execution are run within a block. For example, this
+        * could be a user-defined function, which internally runs several queries,
+        * where each query is separately checked for fast-query-shipping. We must
+        * run all these queries inside a block.
+        */ 
+       SetRequireRemoteTransactionBlock();
        return result;
 }
 
index 2fd23687fc9d15a93bf82cf0a059ba2e0076fcc2..b069b68be56245e17681abbc7f2f0ede4f627550 100644 (file)
@@ -1895,6 +1895,9 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections,
                if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
                        BufferConnection(connections[i]);
 
+               elog(DEBUG2, "Sending gxid %u to remote node %s, need_tran_block %d",
+                               gxid, connections[i]->nodename, need_tran_block);
+
                /* Send GXID and check for errors */
                if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid))
                        return EOF;
@@ -1908,7 +1911,7 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections,
                else if (IS_PGXC_REMOTE_COORDINATOR)
                        need_tran_block = false;
 
-               elog(DEBUG5, "need_tran_block %d, connections[%d]->transaction_status %c",
+               elog(DEBUG2, "need_tran_block %d, connections[%d]->transaction_status %c",
                                need_tran_block, i, connections[i]->transaction_status);
                /* Send BEGIN if not already in transaction */
                if (need_tran_block && connections[i]->transaction_status == 'I')
@@ -2805,7 +2808,7 @@ DataNodeCopyBegin(RemoteCopyData *rcstate)
         * If more than one nodes are involved or if we are already in a
         * transaction block, we must the remote statements in a transaction block
         */
-       need_tran_block = (conn_count > 1) || (TransactionBlockStatusCode() == 'T');
+       need_tran_block = (conn_count > 1) || IsRemoteTransactionBlockRequired();
 
        elog(DEBUG1, "conn_count = %d, need_tran_block = %s", conn_count,
                        need_tran_block ? "true" : "false");
@@ -3400,7 +3403,7 @@ ExecRemoteUtility(RemoteQuery *node)
 {
        RemoteQueryState *remotestate;
        ResponseCombiner *combiner;
-       bool            force_autocommit = node->force_autocommit;
+       bool            force_autocommit = IsRemoteTransactionAutoCommit();
        RemoteQueryExecType exec_type = node->exec_type;
        GlobalTransactionId gxid = InvalidGlobalTransactionId;
        Snapshot snapshot = NULL;
@@ -4624,12 +4627,17 @@ ExecRemoteQuery(PlanState *pstate)
                 * Start transaction on data nodes if we are in explicit transaction
                 * or going to use extended query protocol or write to multiple nodes
                 */
-               if (step->force_autocommit)
+               elog(DEBUG2, "cursor %s, read_only %d,"
+                               " total_conn_count %d, transaction block status %c",
+                               step->cursor, step->read_only,
+                               total_conn_count, TransactionBlockStatusCode());
+
+               if (IsRemoteTransactionAutoCommit())
                        need_tran_block = false;
                else
                        need_tran_block = step->cursor ||
                                        (!step->read_only && total_conn_count > 1) ||
-                                       (TransactionBlockStatusCode() == 'T');
+                                       IsRemoteTransactionBlockRequired();
 
                stat_statement();
                stat_transaction(total_conn_count);
index ad361a0812197edcd7ce282dc326ff473e03d462..ac8d1daff49b52e608ec27dd03c5ad55bf6011f1 100644 (file)
@@ -27,6 +27,7 @@
 #include "pgxc/poolutils.h"
 #include "pgxc/pgxcnode.h"
 #include "access/gtm.h"
+#include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/pgxc_node.h"
 #include "commands/dbcommands.h"
@@ -97,10 +98,10 @@ pgxc_pool_check(PG_FUNCTION_ARGS)
 Datum
 pgxc_pool_reload(PG_FUNCTION_ARGS)
 {
-       if (IsTransactionBlock())
+       if (TransactionIdIsValid(GetTopTransactionIdIfAny()))
                ereport(ERROR,
                                (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
-                                errmsg("pgxc_pool_reload cannot run inside a transaction block")));
+                                errmsg("pgxc_pool_reload cannot run with a transaction ID assigned")));
 
        /*
         * Always check if we can get away with a LESS destructive refresh
index 8aa7dc25dc1b08ad358e35a001232aa727a48ecb..3c73f9bc162c414c2f317fbcfb8afc3a4babd1c0 100644 (file)
@@ -805,6 +805,13 @@ pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string,
         */
        querytree_list = pg_rewrite_query(query);
 
+       /*
+        * If we rewrote the query into more than one queries, then we must
+        * enforce a transaction block while running remote queries.
+        */
+       if (list_length(querytree_list) > 1)
+               SetRequireRemoteTransactionBlock();
+
        TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string);
 
        return querytree_list;
@@ -1245,9 +1252,19 @@ exec_simple_query(const char *query_string)
                 */
                if (IS_PGXC_DATANODE && IsPostmasterEnvironment)
                {
-                       if (IsA(parsetree->stmt, VacuumStmt) || IsA(parsetree->stmt, ClusterStmt))
-                                SetForceXidFromGTM(true);
-                       else if (IsA(parsetree->stmt, ReindexStmt))
+                       if (IsA(parsetree->stmt, VacuumStmt))   /* parsetree is only the wrapper; inspect the wrapped statement */
+                       {
+                               VacuumStmt *stmt = (VacuumStmt *) parsetree->stmt;
+                               if (stmt->options & VACOPT_VACUUM)
+                                       SetForceXidFromGTM(true);
+                       }
+                       else if (IsA(parsetree->stmt, ClusterStmt))
+                       {
+                               ClusterStmt *stmt = (ClusterStmt *) parsetree->stmt;
+                               if (stmt->relation == NULL)
+                                       SetForceXidFromGTM(true);
+                       }
+                       else if (IsA(parsetree->stmt, ReindexStmt))
                        {
                                ReindexStmt *stmt = (ReindexStmt *) parsetree->stmt;
                                if (stmt->kind == REINDEX_OBJECT_SCHEMA ||
index 6dd3845153e71b26fcc85d8008c315fa544f33e0..6254f1c5a88e4e0f905856138d1fe3b310bc8ccf 100644 (file)
 
 static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
                                                                   bool sentToRemote,
-                                                                  bool force_autocommit,
                                                                   RemoteQueryExecType exec_type,
                                                                   bool is_temp,
                                                                   bool add_context);
 static void ExecUtilityStmtOnNodesInternal(const char *queryString,
                                                                   ExecNodes *nodes,
                                                                   bool sentToRemote,
-                                                                  bool force_autocommit,
                                                                   RemoteQueryExecType exec_type,
                                                                   bool is_temp);
 static RemoteQueryExecType ExecUtilityFindNodes(ObjectType objectType,
@@ -1535,7 +1533,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                case T_CreateTableSpaceStmt:
                        /* no event triggers for global objects */
                        if (IS_PGXC_LOCAL_COORDINATOR)
-                               PreventTransactionChain(isTopLevel, "CREATE TABLESPACE");
+                               PreventTransactionChainLocal(isTopLevel, "CREATE TABLESPACE");
                        CreateTableSpace((CreateTableSpaceStmt *) parsetree);
                        break;
 
@@ -1543,7 +1541,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                        /* no event triggers for global objects */
                        /* Allow this to be run inside transaction block on remote nodes */
                        if (IS_PGXC_LOCAL_COORDINATOR)
-                               PreventTransactionChain(isTopLevel, "DROP TABLESPACE");
+                               PreventTransactionChainLocal(isTopLevel, "DROP TABLESPACE");
 
                        DropTableSpace((DropTableSpaceStmt *) parsetree);
                        break;
@@ -1595,7 +1593,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                case T_CreatedbStmt:
                        /* no event triggers for global objects */
                        if (IS_PGXC_LOCAL_COORDINATOR)
-                               PreventTransactionChain(isTopLevel, "CREATE DATABASE");
+                               PreventTransactionChainLocal(isTopLevel, "CREATE DATABASE");
                        createdb(pstate, (CreatedbStmt *) parsetree);
                        break;
 
@@ -1615,7 +1613,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 
                                /* no event triggers for global objects */
                                if (IS_PGXC_LOCAL_COORDINATOR)
-                                       PreventTransactionChain(isTopLevel, "DROP DATABASE");
+                                       PreventTransactionChainLocal(isTopLevel, "DROP DATABASE");
 
                                dropdb(stmt->dbname, stmt->missing_ok);
                        }
@@ -1667,6 +1665,8 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                case T_ClusterStmt:
                        /* we choose to allow this during "read only" transactions */
                        PreventCommandDuringRecovery("CLUSTER");
+                       if (((ClusterStmt *) parsetree)->relation == NULL)
+                               PreventTransactionChain(isTopLevel, "CLUSTER");
                        /* forbidden in parallel mode due to CommandIsReadOnly */
                        cluster((ClusterStmt *) parsetree, isTopLevel);
                        break;
@@ -1678,6 +1678,16 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                                /* we choose to allow this during "read only" transactions */
                                PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
                                                                                         "VACUUM" : "ANALYZE");
+                               /*
+                                * We have to run the command on nodes before Coordinator because
+                                * vacuum() pops active snapshot and we can not send it to nodes
+                                */
+                               if (IS_PGXC_LOCAL_COORDINATOR &&
+                                       !(stmt->options & VACOPT_COORDINATOR))
+                               {
+                                       if (stmt->options & VACOPT_VACUUM)
+                                               SetRequireRemoteTransactionAutoCommit();
+                               }
                                /* forbidden in parallel mode due to CommandIsReadOnly */
                                ExecVacuum(stmt, isTopLevel);
                        }
@@ -1785,6 +1795,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                case T_ReindexStmt:
                        {
                                ReindexStmt *stmt = (ReindexStmt *) parsetree;
+                               bool                            prevent_xact_chain = false;
 
                                /* we choose to allow this during "read only" transactions */
                                PreventCommandDuringRecovery("REINDEX");
@@ -1812,12 +1823,18 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                                                                                                (stmt->kind == REINDEX_OBJECT_SYSTEM) ? "REINDEX SYSTEM" :
                                                                                                "REINDEX DATABASE");
                                                ReindexMultipleTables(stmt->name, stmt->kind, stmt->options);
+                                               prevent_xact_chain = true;
                                                break;
                                        default:
                                                elog(ERROR, "unrecognized object type: %d",
                                                         (int) stmt->kind);
                                                break;
                                }
+                               if (IS_PGXC_LOCAL_COORDINATOR)
+                               {
+                                       if (prevent_xact_chain)
+                                               SetRequireRemoteTransactionAutoCommit();
+                               }
                        }
                        break;
 
@@ -1868,7 +1885,6 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                                                                           completionTag);
                                else
                                        ExecRenameStmt(stmt);
-
                        }
                        break;
 
@@ -1929,7 +1945,6 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                                                                           completionTag);
                                else
                                        CommentObject(stmt);
-                               break;
                        }
                        break;
 
@@ -2824,12 +2839,17 @@ ExecDropStmt(DropStmt *stmt,
 ExecDropStmt(DropStmt *stmt, bool isTopLevel)
 #endif
 {
+       bool    prevent_xact_chain = false;
+
        switch (stmt->removeType)
        {
                case OBJECT_INDEX:
                        if (stmt->concurrent)
+                       {
                                PreventTransactionChain(isTopLevel,
                                                                                "DROP INDEX CONCURRENTLY");
+                               prevent_xact_chain = true;
+                       }
                        /* fall through */
 
                case OBJECT_TABLE:
@@ -2850,8 +2870,12 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel)
 #ifdef PGXC
                                /* DROP is done depending on the object type and its temporary type */
                                if (IS_PGXC_LOCAL_COORDINATOR)
-                                       ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false,
+                               {
+                                       if (prevent_xact_chain)
+                                               SetRequireRemoteTransactionAutoCommit();
+                                       ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
                                                        exec_type, is_temp, false);
+                               }
                        }
 #endif
                        break;
@@ -2868,7 +2892,7 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel)
                                RemoveObjects(stmt);
 #ifdef PGXC
                                if (IS_PGXC_LOCAL_COORDINATOR)
-                                       ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false,
+                                       ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
                                                        exec_type, is_temp, false);
                        }
 #endif
@@ -4596,7 +4620,7 @@ GetCommandLogLevel(Node *parsetree)
 #ifdef PGXC
 static void
 ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool sentToRemote,
-               bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp)
+               RemoteQueryExecType exec_type, bool is_temp)
 {
        /* Return if query is launched on no nodes */
        if (exec_type == EXEC_ON_NONE)
@@ -4620,7 +4644,6 @@ ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool s
                step->combine_type = COMBINE_TYPE_SAME;
                step->exec_nodes = nodes;
                step->sql_statement = pstrdup(queryString);
-               step->force_autocommit = force_autocommit;
                step->exec_type = exec_type;
                ExecRemoteUtility(step);
                pfree(step->sql_statement);
index 4e8aacf6c48ccce96e01a844eab1905803fe0fc1..5bf80a26a942587909f168e98e396e5828b7d488 100644 (file)
@@ -1235,7 +1235,6 @@ pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query)
 
        }
        plan->sql_statement = query;
-       plan->force_autocommit = false;
        /*
         * We only need the target entry to determine result data type.
         * So create dummy even if real expression is a function.
index a51de7fafdb924d0e26a914f72949f5889205a0a..cc032e496663790d806e76a7c8a63df5b18de857 100644 (file)
@@ -7445,7 +7445,6 @@ set_config_option(const char *name, const char *value,
                step->exec_nodes = NULL;
                step->sql_statement = poolcmd.data;
                /* force_autocommit is actually does not start transaction on nodes */
-               step->force_autocommit = true;
                step->exec_type = EXEC_ON_CURRENT;
                ExecRemoteUtility(step);
                pfree(step);
index 16b3c5a26fa9db0d3e05670ad30e652f3f62ebc5..34c54198e2cc4ee41b924894b5abbe80a0c5c504 100644 (file)
@@ -403,6 +403,7 @@ extern bool IsTransactionOrTransactionBlock(void);
 extern char TransactionBlockStatusCode(void);
 extern void AbortOutOfAnyTransaction(void);
 extern void PreventTransactionChain(bool isTopLevel, const char *stmtType);
+extern void PreventTransactionChainLocal(bool isTopLevel, const char *stmtType);
 extern void RequireTransactionChain(bool isTopLevel, const char *stmtType);
 extern void WarnNoTransactionChain(bool isTopLevel, const char *stmtType);
 extern bool IsInTransactionChain(bool isTopLevel);
@@ -429,6 +430,10 @@ extern void SetSendCommandId(bool status);
 extern bool IsPGXCNodeXactReadOnly(void);
 extern bool IsPGXCNodeXactDatanodeDirect(void);
 extern void TransactionRecordXidWait(TransactionId xid);
+extern void SetRequireRemoteTransactionBlock(void);
+extern bool IsRemoteTransactionBlockRequired(void);
+extern void SetRequireRemoteTransactionAutoCommit(void);
+extern bool IsRemoteTransactionAutoCommit(void);
 #endif
 
 extern int     xactGetCommittedChildren(TransactionId **ptr);
index 095eca4e0089d5519eb5d751ba18d61de081e606..d4223d2ab6425639c9d4129c943ba47240084389 100644 (file)
@@ -88,7 +88,6 @@ typedef struct
        CombineType             combine_type;
        SimpleSort              *sort;
        bool                    read_only;              /* do not use 2PC when committing read only steps */
-       bool                    force_autocommit;       /* some commands like VACUUM require autocommit mode */
        char                    *statement;             /* if specified use it as a PreparedStatement name on Datanodes */
        char                    *cursor;                /* if specified use it as a Portal name on Datanodes */
        int             rq_num_params;      /* number of parameters present in
index fdcffe7d650117d69170ae5d383791dc654f3366..31517d384ea6f9f11fcb44a7f00de1c150abb24c 100644 (file)
@@ -4921,3 +4921,27 @@ BEGIN
 END; $$ LANGUAGE plpgsql;
 
 SELECT * FROM list_partitioned_table() AS t;
+
+-- ensure that all statements in a function are correctly executed in a
+-- transaction block.
+create table plp_mt_tab(a int, b int);
+create function plpgsql_multistmt() returns void as $$
+begin
+       insert into plp_mt_tab(a) values (1);
+       insert into plp_mt_tab(a) values (2);
+       insert into plp_mt_tab(a) values (3/0);
+end
+$$ language plpgsql;
+
+select plpgsql_multistmt();
+select * from plp_mt_tab;
+
+create or replace function plpgsql_multistmt() returns void as $$
+begin
+       insert into plp_mt_tab(a) values (3);
+       update plp_mt_tab set b = 1 where (a / 0) = 0;
+end
+$$ language plpgsql;
+
+select plpgsql_multistmt();
+select * from plp_mt_tab;