int headKnownAssignedXids; /* index of newest element, + 1 */
slock_t known_assigned_xids_lck; /* protects head/tail pointers */
+ slock_t commitseqno_lck;
+ uint64 commitseqno;
+ uint64 commitseqno_removed;
+
/*
* Highest subxid that has been removed from KnownAssignedXids array to
* prevent overflow; or InvalidTransactionId if none. We track this for
procArray->headKnownAssignedXids = 0;
SpinLockInit(&procArray->known_assigned_xids_lck);
procArray->lastOverflowedXid = InvalidTransactionId;
+
+ SpinLockInit(&procArray->commitseqno_lck);
+ procArray->commitseqno = 0;
+ procArray->commitseqno_removed = 0;
}
/* Create or attach to the KnownAssignedXids arrays too, if needed */
}
+/*
+ * Clear out our XID from our PGPROC entry. ProcArrayEndTransaction()
+ * just marks our xid as committed in PGPROC, so before we can start another
+ * transaction, we must clear out the already-committed xid.
+ */
+void
+ProcArrayCleanupTransaction(PGPROC *proc)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile ProcArrayStruct *pArray = procArray;
+
+ if (proc->xid != InvalidTransactionId)
+ {
+ /*
+ * Need to interlock with GetSnapshotData(), so that anyone
+ * who started doing GetSnapshotData() before we set proc->commitlsn,
+ * must be finished before we do this
+ */
+ SpinLockAcquire(&pArray->commitseqno_lck);
+
+ /*
+ * Adjust bookkeeping to note that we have removed this committed
+ * xid from the array. If anyone is running GetSnapshotData() at the
+ * same time, this signals it that if it was supposed to see our XID
+ * in the snapshot, it's no longer there and he needs to start over.
+ */
+ if (pArray->commitseqno_removed < proc->commitseqno)
+ pArray->commitseqno_removed = proc->commitseqno;
+
+ proc->commitseqno = 0;
+
+ proc->xid = InvalidTransactionId;
+ proc->lxid = InvalidLocalTransactionId;
+ proc->xmin = InvalidTransactionId;
+ /* must be cleared with xid/xmin: */
+ proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ proc->inCommit = false; /* be sure this is cleared in abort */
+ proc->recoveryConflictPending = false;
+
+ /* Clear the subtransaction-XID cache too while holding the lock */
+ proc->subxids.nxids = 0;
+ proc->subxids.overflowed = false;
+
+ SpinLockRelease(&pArray->commitseqno_lck);
+ }
+ else
+ {
+ /* We're not holding onto an xid, so no need to take the spinlock */
+ Assert(proc->commitseqno == 0);
+
+ proc->lxid = InvalidLocalTransactionId;
+ proc->xmin = InvalidTransactionId;
+ /* must be cleared with xid/xmin: */
+ proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ proc->inCommit = false; /* be sure this is cleared in abort */
+ proc->recoveryConflictPending = false;
+
+ /* Clear the subtransaction-XID cache too while holding the lock */
+ Assert(proc->subxids.nxids == 0);
+ Assert(proc->subxids.overflowed == false);
+ }
+}
+
/*
* ProcArrayEndTransaction -- mark a transaction as no longer running
*
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile ProcArrayStruct *pArray = procArray;
+
if (TransactionIdIsValid(latestXid))
{
/*
* src/backend/access/transam/README.
*/
Assert(TransactionIdIsValid(proc->xid));
+ Assert(proc->commitseqno == 0);
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ SpinLockAcquire(&pArray->commitseqno_lck);
- proc->xid = InvalidTransactionId;
- proc->lxid = InvalidLocalTransactionId;
- proc->xmin = InvalidTransactionId;
- /* must be cleared with xid/xmin: */
- proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- proc->inCommit = false; /* be sure this is cleared in abort */
- proc->recoveryConflictPending = false;
-
- /* Clear the subtransaction-XID cache too while holding the lock */
- proc->subxids.nxids = 0;
- proc->subxids.overflowed = false;
+ /* Advance commitseqno and get our value */
+ proc->commitseqno = (++pArray->commitseqno);
/* Also advance global latestCompletedXid while holding the lock */
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
latestXid))
ShmemVariableCache->latestCompletedXid = latestXid;
- LWLockRelease(ProcArrayLock);
+ SpinLockRelease(&pArray->commitseqno_lck);
}
else
{
proc->lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
proc->recoveryConflictPending = false;
+ proc->commitseqno = 0;
/* redundant, but just in case */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
if (proc == MyProc)
continue;
+ if (proc->commitseqno != 0)
+ {
+ /*
+ * Perhaps we should check the xid anyway. If it matches, we
+ * know that this xid just committed, so it's definitely not
+ * in-progress anymore, and we wouldn't need to go through the
+ * rest of the procarray.
+ */
+ continue;
+ }
+
/* Fetch xid just once - see GetNewTransactionId */
pxid = proc->xid;
if (!TransactionIdIsValid(pxid))
continue;
+ if (proc->commitseqno != 0)
+ continue;
+
if (proc->pid == 0)
continue; /* ignore prepared transactions */
if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
continue;
+ if (proc->commitseqno != 0)
+ continue;
+
if (allDbs ||
proc->databaseId == MyDatabaseId ||
proc->databaseId == 0) /* always include WalSender */
GetSnapshotData(Snapshot snapshot)
{
ProcArrayStruct *arrayP = procArray;
+ volatile ProcArrayStruct *pArray = procArray;
TransactionId xmin;
TransactionId xmax;
TransactionId globalxmin;
int index;
- int count = 0;
- int subcount = 0;
- bool suboverflowed = false;
+ int count;
+ int subcount;
+ bool suboverflowed;
+ uint64 lastcommitseqno;
Assert(snapshot != NULL);
errmsg("out of memory")));
}
+retry:
+ count = subcount = 0;
+ suboverflowed = false;
+
/*
* It is sufficient to get shared lock on ProcArrayLock, even if we are
* going to set MyProc->xmin.
+ *
+ * Other backends might begin or end transactions while we're busy
+ * getting our snapshot. Anyone who begins after we get latestCompletedXid
+ * below will be ignored, because his xid will be > xmax. Anyone who
+ * commits or aborts while we're getting the snapshot will have
+ * proc->commitseqno set to a value higher than the lastcommitseqno we
+ * get below, so we will still consider them as running. See also
+ * ProcArrayEndTransaction().
+ *
+ * ProcArrayLock is still needed to prevent the number of procs in the
+ * procarray from changing under us.
*/
LWLockAcquire(ProcArrayLock, LW_SHARED);
+ SpinLockAcquire(&pArray->commitseqno_lck);
+
/* xmax is always latestCompletedXid + 1 */
xmax = ShmemVariableCache->latestCompletedXid;
+
+ lastcommitseqno = pArray->commitseqno;
+
+ SpinLockRelease(&pArray->commitseqno_lck);
+
Assert(TransactionIdIsNormal(xmax));
TransactionIdAdvance(xmax);
{
volatile PGPROC *proc = arrayP->procs[index];
TransactionId xid;
+ uint64 commitseqno;
/* Ignore procs running LAZY VACUUM */
if (proc->vacuumFlags & PROC_IN_VACUUM)
continue;
+ /*
+ * If this transaction committed before we began taking our
+ * snapshot, we see it as committed. Otherwise we still
+ * consider it in-progress.
+ */
+ commitseqno = proc->commitseqno; /* fetch just once. XXX: this should be atomic */
+ if (commitseqno != 0 && commitseqno <= lastcommitseqno)
+ continue;
+
/* Update globalxmin to be the smallest valid xmin */
xid = proc->xmin; /* fetch just once */
if (TransactionIdIsNormal(xid) &&
suboverflowed = true;
}
+ SpinLockAcquire(&pArray->commitseqno_lck);
+
+ if (pArray->commitseqno_removed > lastcommitseqno)
+ {
+ /*
+ * Oops, someone cleaned up an xid that we should still see as
+ * visible, while we were getting the snapshot. Our snapshot might be
+ * incomplete. Have to retry; hopefully this doesn't happen very
+ * often in practice.
+ */
+ SpinLockRelease(&pArray->commitseqno_lck);
+ LWLockRelease(ProcArrayLock);
+ goto retry;
+ }
+
if (!TransactionIdIsValid(MyProc->xmin))
MyProc->xmin = TransactionXmin = xmin;
+ SpinLockRelease(&pArray->commitseqno_lck);
+
LWLockRelease(ProcArrayLock);
/*
if (!TransactionIdIsValid(xid))
continue;
+ if (proc->commitseqno != 0)
+ continue;
+
xids[count++] = xid;
if (TransactionIdPrecedes(xid, oldestRunningXid))
if (!TransactionIdIsNormal(xid))
continue;
+ if (proc->commitseqno != 0)
+ continue;
+
if (TransactionIdPrecedes(xid, oldestRunningXid))
oldestRunningXid = xid;
{
volatile PGPROC *proc = arrayP->procs[index];
+ if (proc->commitseqno != 0)
+ continue;
+
if (proc->xid == xid)
{
result = proc->pid;
continue; /* do not count prepared xacts */
if (proc->xid == InvalidTransactionId)
continue; /* do not count if no XID assigned */
+ if (proc->commitseqno != 0)
+ continue; /* do not count if already committed */
if (proc->waitLock != NULL)
continue; /* do not count if blocked on a lock */
count++;