/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);
+static void apply_handle_commit_internal(StringInfo s,
+ LogicalRepCommitData* commit_data);
static void apply_handle_insert_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ResultRelInfo *relinfo,
Assert(commit_data.commit_lsn == remote_final_lsn);
- /* The synchronization worker runs in single transaction. */
- if (IsTransactionState() && !am_tablesync_worker())
- {
- /*
- * Update origin state so we can restart streaming from correct
- * position in case of crash.
- */
- replorigin_session_origin_lsn = commit_data.end_lsn;
- replorigin_session_origin_timestamp = commit_data.committime;
-
- CommitTransactionCommand();
- pgstat_report_stat(false);
-
- store_flush_position(commit_data.end_lsn);
- }
- else
- {
- /* Process any invalidation messages that might have accumulated. */
- AcceptInvalidationMessages();
- maybe_reread_subscription();
- }
-
- in_remote_transaction = false;
+ apply_handle_commit_internal(s, &commit_data);
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
/*
* Start a transaction on stream start, this transaction will be committed
- * on the stream stop. We need the transaction for handling the buffile,
- * used for serializing the streaming data and subxact info.
+ * on the stream stop unless it is a tablesync worker in which case it will
+ * be committed after processing all the messages. We need the transaction
+ * for handling the buffile, used for serializing the streaming data and
+ * subxact info.
*/
ensure_transaction();
/* We must be in a valid transaction state */
Assert(IsTransactionState());
- /* Commit the per-stream transaction */
- CommitTransactionCommand();
+ /* The synchronization worker runs in single transaction. */
+ if (!am_tablesync_worker())
+ {
+ /* Commit the per-stream transaction */
+ CommitTransactionCommand();
+ }
in_streamed_transaction = false;
{
/* Cleanup the subxact info */
cleanup_subxact_info();
- CommitTransactionCommand();
+
+ /* The synchronization worker runs in single transaction */
+ if (!am_tablesync_worker())
+ CommitTransactionCommand();
return;
}
/* write the updated subxact list */
subxact_info_write(MyLogicalRepWorker->subid, xid);
- CommitTransactionCommand();
+
+ if (!am_tablesync_worker())
+ CommitTransactionCommand();
}
}
BufFileClose(fd);
- /*
- * Update origin state so we can restart streaming from correct position
- * in case of crash.
- */
- replorigin_session_origin_lsn = commit_data.end_lsn;
- replorigin_session_origin_timestamp = commit_data.committime;
-
pfree(buffer);
pfree(s2.data);
- CommitTransactionCommand();
- pgstat_report_stat(false);
-
- store_flush_position(commit_data.end_lsn);
-
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);
- in_remote_transaction = false;
-
- /* Process any tables that are being synchronized in parallel. */
- process_syncing_tables(commit_data.end_lsn);
+ apply_handle_commit_internal(s, &commit_data);
/* unlink the files with serialized changes and subxact info */
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data.end_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
}
+/*
+ * Helper function for apply_handle_commit and apply_handle_stream_commit.
+ */
+static void
+apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
+{
+ /* The synchronization worker runs in single transaction. */
+ if (IsTransactionState() && !am_tablesync_worker())
+ {
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data->end_lsn;
+ replorigin_session_origin_timestamp = commit_data->committime;
+
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(commit_data->end_lsn);
+ }
+ else
+ {
+ /* Process any invalidation messages that might have accumulated. */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+ }
+
+ in_remote_transaction = false;
+}
+
/*
* Handle RELATION message.
*