From c981d9145deae067bc67bc8f8bcd68b300ece3fe Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 17 Jan 2023 11:28:22 +0530 Subject: [PATCH] Improve the code to decide and process the apply action. The code that decides the apply action failed to handle non-transactional messages and we didn't catch it in our testing as currently such messages are simply ignored by the apply worker. This was introduced by changes in commit 216a784829. While testing this, I noticed that we forgot to reset stream_xid after processing the stream stop message which could also result in the wrong apply action after the fix for non-transactional messages. In passing, change assert to elog for unexpected apply action in some of the routines so as to catch the problems in the production environment, if any. Reported-by: Tomas Vondra Author: Amit Kapila Reviewed-by: Tomas Vondra, Sawada Masahiko, Hou Zhijie Discussion: https://postgr.es/m/984ff689-adde-9977-affe-cd6029e850be@enterprisedb.com Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com --- src/backend/replication/logical/worker.c | 73 +++++++++++++++--------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d8b8a374c62..a0084c7ef69 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg * The action to be taken for the changes in the transaction. * * TRANS_LEADER_APPLY: - * This action means that we are in the leader apply worker and changes of the - * transaction are applied directly by the worker. + * This action means that we are in the leader apply worker or table sync + * worker. 
The changes of the transaction are either directly applied or + * are read from temporary files (for streaming transactions) and then + * applied by the worker. * * TRANS_LEADER_SERIALIZE: * This action means that we are in the leader apply worker or table sync @@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s) { LogicalRepBeginData begin_data; + /* There must not be an active streaming transaction. */ + Assert(!TransactionIdIsValid(stream_xid)); + logicalrep_read_begin(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); @@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); + /* There must not be an active streaming transaction. */ + Assert(!TransactionIdIsValid(stream_xid)); + logicalrep_read_begin_prepare(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); @@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * The transaction has been serialized to file, so replay all the @@ -1384,7 +1392,7 @@ apply_handle_stream_prepare(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } @@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("duplicate STREAM START message"))); + /* There must not be an active streaming transaction. 
*/ + Assert(!TransactionIdIsValid(stream_xid)); + /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; @@ -1589,7 +1600,7 @@ apply_handle_stream_start(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } @@ -1705,11 +1716,12 @@ apply_handle_stream_stop(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } in_streamed_transaction = false; + stream_xid = InvalidTransactionId; /* * The parallel apply worker could be in a transaction in which case we @@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * We are in the leader apply worker and the transaction has been @@ -1957,7 +1969,7 @@ apply_handle_stream_abort(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } @@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * The transaction has been serialized to file, so replay all the @@ -2226,7 +2238,7 @@ apply_handle_stream_commit(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } @@ -4204,7 +4216,6 @@ stream_close_file(void) BufFileClose(stream_fd); - stream_xid = InvalidTransactionId; stream_fd = NULL; } @@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname) } /* - * Return the action to be taken for the given transaction. *winfo is - * assigned to the destination parallel worker info when the leader apply - * worker has to pass all the transaction's changes to the parallel apply - * worker. + * Return the action to be taken for the given transaction. See + * TransApplyAction for information on each of the actions. 
+ * + * *winfo is assigned to the destination parallel worker info when the leader + * apply worker has to pass all the transaction's changes to the parallel + * apply worker. */ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) @@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) { return TRANS_PARALLEL_APPLY; } - else if (in_remote_transaction) - { - return TRANS_LEADER_APPLY; - } /* - * Check if we are processing this transaction using a parallel apply - * worker. + * If we are processing this transaction using a parallel apply worker then + * either we send the changes to the parallel worker or if the worker is busy + * then serialize the changes to the file which will later be processed by + * the parallel worker. */ *winfo = pa_find_worker(xid); - if (!*winfo) + if (*winfo && (*winfo)->serialize_changes) { - return TRANS_LEADER_SERIALIZE; + return TRANS_LEADER_PARTIAL_SERIALIZE; } - else if ((*winfo)->serialize_changes) + else if (*winfo) { - return TRANS_LEADER_PARTIAL_SERIALIZE; + return TRANS_LEADER_SEND_TO_PARALLEL; + } + + /* + * If there is no parallel worker involved to process this transaction then + * we either directly apply the change or serialize it to a file which will + * later be applied when the transaction finish message is processed. + */ + else if (in_streamed_transaction) + { + return TRANS_LEADER_SERIALIZE; } else { - return TRANS_LEADER_SEND_TO_PARALLEL; + return TRANS_LEADER_APPLY; } } -- 2.30.2