XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */
FullTransactionId ckptFullXid; /* nextXid of latest checkpoint */
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
+ XLogRecPtr asyncXactNotifiedLSN; /* */
XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */
XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
{
XLogRecPtr WriteRqstPtr = asyncXactLSN;
+ XLogRecPtr LastNotifiedLSN;
bool sleeping;
SpinLockAcquire(&XLogCtl->info_lck);
sleeping = XLogCtl->WalWriterSleeping;
if (XLogCtl->asyncXactLSN < asyncXactLSN)
XLogCtl->asyncXactLSN = asyncXactLSN;
+ LastNotifiedLSN = XLogCtl->asyncXactNotifiedLSN;
SpinLockRelease(&XLogCtl->info_lck);
/*
WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
/* if we have already flushed that far, we're done */
- if (WriteRqstPtr <= LogwrtResult.FlushDone)
+ if (WriteRqstPtr <= LogwrtResult.FlushInit)
return;
+
+ if (WriteRqstPtr <= LogwrtResult.FlushInit + WalWriterFlushAfter * XLOG_BLCKSZ)
+ return;
+
+ if (WriteRqstPtr <= LastNotifiedLSN)
+ return;
+#if 1
+ SpinLockAcquire(&XLogCtl->info_lck);
+ LastNotifiedLSN = XLogCtl->asyncXactNotifiedLSN;
+ if (LastNotifiedLSN < WriteRqstPtr)
+ XLogCtl->asyncXactNotifiedLSN = WriteRqstPtr;
+ SpinLockRelease(&XLogCtl->info_lck);
+
+ if (WriteRqstPtr <= LastNotifiedLSN)
+ return;
+#endif
}
/*