* Helper function for ReorderBufferProcessTXN to handle the concurrent
* abort of the streaming transaction. This resets the TXN such that it
* can be used to stream the remaining data of transaction being processed.
+ * This can happen when the subtransaction is aborted and we still want to
+ * continue processing the main or other subtransactions data.
*/
static void
ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
LogicalDecodingContext *ctx = rb->private_data;
SnapBuild *builder = ctx->snapshot_builder;
+ /* We can't start streaming unless a consistent state is reached. */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
+ return false;
+
/*
* We can't start streaming immediately even if the streaming is enabled
* because we previously decoded this transaction and now just are
*/
if (ReorderBufferCanStream(rb) &&
!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
- {
- /* We must have a consistent snapshot by this time */
- Assert(SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT);
return true;
- }
return false;
}