Fix replication of in-progress transactions in tablesync worker.
authorAmit Kapila <[email protected]>
Fri, 27 Nov 2020 02:13:34 +0000 (07:43 +0530)
committerAmit Kapila <[email protected]>
Fri, 27 Nov 2020 02:13:34 +0000 (07:43 +0530)
Tablesync worker runs under a single transaction but in streaming mode, we
were committing the transaction on stream_stop, stream_abort, and
stream_commit. We need to avoid committing the transaction in a streaming
mode in tablesync worker.

In passing move the call to process_syncing_tables in
apply_handle_stream_commit after clean up of stream files. This will
allow clean up of files to happen before the exit of tablesync worker
which would otherwise be handled by one of the proc exit routines.

Author: Dilip Kumar
Reviewed-by: Amit Kapila and Peter Smith
Tested-by: Peter Smith
Discussion: https://postgr.es/m/CAHut+Pt4PyKQCwqzQ=EFF=bpKKJD7XKt_S23F6L20ayQNxg77A@mail.gmail.com

src/backend/replication/logical/worker.c

index 4e8e2965b82b1f0560e6e2077a8b56f1f9dac800..8c7fad8f74108d6a9f6540f07207c15d12eb57c3 100644 (file)
@@ -224,6 +224,8 @@ static void maybe_reread_subscription(void);
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
+static void apply_handle_commit_internal(StringInfo s,
+                                        LogicalRepCommitData* commit_data);
 static void apply_handle_insert_internal(ResultRelInfo *relinfo,
                                         EState *estate, TupleTableSlot *remoteslot);
 static void apply_handle_update_internal(ResultRelInfo *relinfo,
@@ -709,29 +711,7 @@ apply_handle_commit(StringInfo s)
 
    Assert(commit_data.commit_lsn == remote_final_lsn);
 
-   /* The synchronization worker runs in single transaction. */
-   if (IsTransactionState() && !am_tablesync_worker())
-   {
-       /*
-        * Update origin state so we can restart streaming from correct
-        * position in case of crash.
-        */
-       replorigin_session_origin_lsn = commit_data.end_lsn;
-       replorigin_session_origin_timestamp = commit_data.committime;
-
-       CommitTransactionCommand();
-       pgstat_report_stat(false);
-
-       store_flush_position(commit_data.end_lsn);
-   }
-   else
-   {
-       /* Process any invalidation messages that might have accumulated. */
-       AcceptInvalidationMessages();
-       maybe_reread_subscription();
-   }
-
-   in_remote_transaction = false;
+   apply_handle_commit_internal(s, &commit_data);
 
    /* Process any tables that are being synchronized in parallel. */
    process_syncing_tables(commit_data.end_lsn);
@@ -772,8 +752,10 @@ apply_handle_stream_start(StringInfo s)
 
    /*
     * Start a transaction on stream start, this transaction will be committed
-    * on the stream stop. We need the transaction for handling the buffile,
-    * used for serializing the streaming data and subxact info.
+    * on the stream stop unless it is a tablesync worker in which case it will
+    * be committed after processing all the messages. We need the transaction
+    * for handling the buffile, used for serializing the streaming data and
+    * subxact info.
     */
    ensure_transaction();
 
@@ -825,8 +807,12 @@ apply_handle_stream_stop(StringInfo s)
    /* We must be in a valid transaction state */
    Assert(IsTransactionState());
 
-   /* Commit the per-stream transaction */
-   CommitTransactionCommand();
+   /* The synchronization worker runs in single transaction. */
+   if (!am_tablesync_worker())
+   {
+       /* Commit the per-stream transaction */
+       CommitTransactionCommand();
+   }
 
    in_streamed_transaction = false;
 
@@ -902,7 +888,10 @@ apply_handle_stream_abort(StringInfo s)
        {
            /* Cleanup the subxact info */
            cleanup_subxact_info();
-           CommitTransactionCommand();
+
+           /* The synchronization worker runs in single transaction */
+           if (!am_tablesync_worker())
+               CommitTransactionCommand();
            return;
        }
 
@@ -928,7 +917,9 @@ apply_handle_stream_abort(StringInfo s)
 
        /* write the updated subxact list */
        subxact_info_write(MyLogicalRepWorker->subid, xid);
-       CommitTransactionCommand();
+
+       if (!am_tablesync_worker())
+           CommitTransactionCommand();
    }
 }
 
@@ -1048,35 +1039,54 @@ apply_handle_stream_commit(StringInfo s)
 
    BufFileClose(fd);
 
-   /*
-    * Update origin state so we can restart streaming from correct position
-    * in case of crash.
-    */
-   replorigin_session_origin_lsn = commit_data.end_lsn;
-   replorigin_session_origin_timestamp = commit_data.committime;
-
    pfree(buffer);
    pfree(s2.data);
 
-   CommitTransactionCommand();
-   pgstat_report_stat(false);
-
-   store_flush_position(commit_data.end_lsn);
-
    elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
         nchanges, path);
 
-   in_remote_transaction = false;
-
-   /* Process any tables that are being synchronized in parallel. */
-   process_syncing_tables(commit_data.end_lsn);
+   apply_handle_commit_internal(s, &commit_data);
 
    /* unlink the files with serialized changes and subxact info */
    stream_cleanup_files(MyLogicalRepWorker->subid, xid);
 
+   /* Process any tables that are being synchronized in parallel. */
+   process_syncing_tables(commit_data.end_lsn);
+
    pgstat_report_activity(STATE_IDLE, NULL);
 }
 
+/*
+ * Helper function for apply_handle_commit and apply_handle_stream_commit.
+ */
+static void
+apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
+{
+   /* The synchronization worker runs in single transaction. */
+   if (IsTransactionState() && !am_tablesync_worker())
+   {
+       /*
+        * Update origin state so we can restart streaming from correct
+        * position in case of crash.
+        */
+       replorigin_session_origin_lsn = commit_data->end_lsn;
+       replorigin_session_origin_timestamp = commit_data->committime;
+
+       CommitTransactionCommand();
+       pgstat_report_stat(false);
+
+       store_flush_position(commit_data->end_lsn);
+   }
+   else
+   {
+       /* Process any invalidation messages that might have accumulated. */
+       AcceptInvalidationMessages();
+       maybe_reread_subscription();
+   }
+
+   in_remote_transaction = false;
+}
+
 /*
  * Handle RELATION message.
  *