First cut. Passes regressions now getsnapshotdata-with-commitseqno
authorHeikki Linnakangas <[email protected]>
Thu, 10 Nov 2011 10:32:29 +0000 (12:32 +0200)
committerHeikki Linnakangas <[email protected]>
Tue, 15 Nov 2011 13:22:25 +0000 (15:22 +0200)
src/backend/access/transam/xact.c
src/backend/storage/ipc/procarray.c
src/backend/storage/lmgr/proc.c
src/include/storage/proc.h
src/include/storage/procarray.h

index c151d3be191361ab84f9d3cf57567ba56a4cfb80..fb291214188f003f37d625030ecc15efde4a814b 100644 (file)
@@ -1651,6 +1651,9 @@ StartTransaction(void)
        TransactionState s;
        VirtualTransactionId vxid;
 
+       if (TransactionIdIsValid(MyProc->xid))
+               ProcArrayCleanupTransaction(MyProc);
+
        /*
         * Let's just make sure the state stack is empty
         */
@@ -2307,6 +2310,11 @@ AbortTransaction(void)
         * Let others know about no transaction in progress by me. Note that this
         * must be done _before_ releasing locks we hold and _after_
         * RecordTransactionAbort.
+        *
+        * XXX: Couldn't we use ProcArrayClearTransction() here? AFAICS marking
+        * an aborted transaction as not-running doesn't need to be interlocked
+        * with GetSnapshotData(), because whether or not a snapshot sees it as
+        * still running, its effects are not visible anyway.
         */
        ProcArrayEndTransaction(MyProc, latestXid);
 
index 1a48485f970b6a3427cc59b446e693b52e577725..5511023d3c7b7781744f952bb974ea7ba0b05b30 100644 (file)
@@ -72,6 +72,10 @@ typedef struct ProcArrayStruct
        int                     headKnownAssignedXids;  /* index of newest element, + 1 */
        slock_t         known_assigned_xids_lck;                /* protects head/tail pointers */
 
+       slock_t         commitseqno_lck;
+       uint64          commitseqno;
+       uint64          commitseqno_removed;
+
        /*
         * Highest subxid that has been removed from KnownAssignedXids array to
         * prevent overflow; or InvalidTransactionId if none.  We track this for
@@ -229,6 +233,10 @@ CreateSharedProcArray(void)
                procArray->headKnownAssignedXids = 0;
                SpinLockInit(&procArray->known_assigned_xids_lck);
                procArray->lastOverflowedXid = InvalidTransactionId;
+
+               SpinLockInit(&procArray->commitseqno_lck);
+               procArray->commitseqno = 0;
+               procArray->commitseqno_removed = 0;
        }
 
        /* Create or attach to the KnownAssignedXids arrays too, if needed */
@@ -333,6 +341,69 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 }
 
 
+/*
+ * Clear out our XID from our PGPROC entry. ProcArrayEndTransaction()
+ * just marks our xid as committed in PGPROC, so before we can start another
+ * transaction, we must clear out the already-committed xid.
+ */
+void
+ProcArrayCleanupTransaction(PGPROC *proc)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile ProcArrayStruct *pArray = procArray;
+
+       if (proc->xid != InvalidTransactionId)
+       {
+               /*
+                * Need to interlock with GetSnapshotData(), so that anyone
+                * who started doing GetSnapshotData() before we set proc->commitlsn,
+                * must be finished before we do this
+                */
+               SpinLockAcquire(&pArray->commitseqno_lck);
+
+               /*
+                * Adjust bookkeeping to note that we have removed this committed
+                * xid from the array. If anyone is running GetSnapshotData() at the
+                * same time, this signals it that if it was supposed to see our XID
+                * in the snapshot, it's no longer there and he needs to start over.
+                */
+               if (pArray->commitseqno_removed < proc->commitseqno)
+                       pArray->commitseqno_removed = proc->commitseqno;
+
+               proc->commitseqno = 0;
+
+               proc->xid = InvalidTransactionId;
+               proc->lxid = InvalidLocalTransactionId;
+               proc->xmin = InvalidTransactionId;
+               /* must be cleared with xid/xmin: */
+               proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+               proc->inCommit = false; /* be sure this is cleared in abort */
+               proc->recoveryConflictPending = false;
+
+               /* Clear the subtransaction-XID cache too while holding the lock */
+               proc->subxids.nxids = 0;
+               proc->subxids.overflowed = false;
+
+               SpinLockRelease(&pArray->commitseqno_lck);
+       }
+       else
+       {
+               /* We're not holding onto an xid, so no need to take the spinlock */
+               Assert(proc->commitseqno == 0);
+
+               proc->lxid = InvalidLocalTransactionId;
+               proc->xmin = InvalidTransactionId;
+               /* must be cleared with xid/xmin: */
+               proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+               proc->inCommit = false; /* be sure this is cleared in abort */
+               proc->recoveryConflictPending = false;
+
+               /* Clear the subtransaction-XID cache too while holding the lock */
+               Assert(proc->subxids.nxids == 0);
+               Assert(proc->subxids.overflowed == false);
+       }
+}
+
 /*
  * ProcArrayEndTransaction -- mark a transaction as no longer running
  *
@@ -349,6 +420,9 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 void
 ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile ProcArrayStruct *pArray = procArray;
+
        if (TransactionIdIsValid(latestXid))
        {
                /*
@@ -358,27 +432,19 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
                 * src/backend/access/transam/README.
                 */
                Assert(TransactionIdIsValid(proc->xid));
+               Assert(proc->commitseqno == 0);
 
-               LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+               SpinLockAcquire(&pArray->commitseqno_lck);
 
-               proc->xid = InvalidTransactionId;
-               proc->lxid = InvalidLocalTransactionId;
-               proc->xmin = InvalidTransactionId;
-               /* must be cleared with xid/xmin: */
-               proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
-               proc->inCommit = false; /* be sure this is cleared in abort */
-               proc->recoveryConflictPending = false;
-
-               /* Clear the subtransaction-XID cache too while holding the lock */
-               proc->subxids.nxids = 0;
-               proc->subxids.overflowed = false;
+               /* Advance commitseqno and get our value */
+               proc->commitseqno = (++pArray->commitseqno);
 
                /* Also advance global latestCompletedXid while holding the lock */
                if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
                                                                  latestXid))
                        ShmemVariableCache->latestCompletedXid = latestXid;
 
-               LWLockRelease(ProcArrayLock);
+               SpinLockRelease(&pArray->commitseqno_lck);
        }
        else
        {
@@ -423,6 +489,7 @@ ProcArrayClearTransaction(PGPROC *proc)
        proc->lxid = InvalidLocalTransactionId;
        proc->xmin = InvalidTransactionId;
        proc->recoveryConflictPending = false;
+       proc->commitseqno = 0;
 
        /* redundant, but just in case */
        proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
@@ -818,6 +885,17 @@ TransactionIdIsInProgress(TransactionId xid)
                if (proc == MyProc)
                        continue;
 
+               if (proc->commitseqno != 0)
+               {
+                       /*
+                        * Perhaps we should check the xid anyway. If it matches, we
+                        * know that this xid just committed, so it's definitely not
+                        * in-progress anymore, and we wouldn't need to go through the
+                        * rest of the procarray.
+                        */
+                       continue;
+               }
+
                /* Fetch xid just once - see GetNewTransactionId */
                pxid = proc->xid;
 
@@ -973,6 +1051,9 @@ TransactionIdIsActive(TransactionId xid)
                if (!TransactionIdIsValid(pxid))
                        continue;
 
+               if (proc->commitseqno != 0)
+                       continue;
+
                if (proc->pid == 0)
                        continue;                       /* ignore prepared transactions */
 
@@ -1065,6 +1146,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
                        continue;
 
+               if (proc->commitseqno != 0)
+                       continue;
+
                if (allDbs ||
                        proc->databaseId == MyDatabaseId ||
                        proc->databaseId == 0)          /* always include WalSender */
@@ -1193,13 +1277,15 @@ Snapshot
 GetSnapshotData(Snapshot snapshot)
 {
        ProcArrayStruct *arrayP = procArray;
+       volatile ProcArrayStruct *pArray = procArray;
        TransactionId xmin;
        TransactionId xmax;
        TransactionId globalxmin;
        int                     index;
-       int                     count = 0;
-       int                     subcount = 0;
-       bool            suboverflowed = false;
+       int                     count;
+       int                     subcount;
+       bool            suboverflowed;
+       uint64          lastcommitseqno;
 
        Assert(snapshot != NULL);
 
@@ -1235,14 +1321,36 @@ GetSnapshotData(Snapshot snapshot)
                                         errmsg("out of memory")));
        }
 
+retry:
+       count = subcount = 0;
+       suboverflowed = false;
+
        /*
         * It is sufficient to get shared lock on ProcArrayLock, even if we are
         * going to set MyProc->xmin.
+        *
+        * Other backends might begin or end transactions while we're busy
+        * getting our snapshot. Anyone who begins after we get latestCompletedXid
+        * below will be ignored, because his xid will be > xmax. Anyone who
+        * commits or aborts while we're getting the snapshot will have
+        * proc->commitseqno set to a value higher than the lastcommitseqno we
+        * get below, so we will still consider them as running. See also
+        * ProcArrayEndTransaction().
+        *
+        * ProcArrayLock is still needed to prevent the number of procs in the
+        * procarray from changing under us.
         */
        LWLockAcquire(ProcArrayLock, LW_SHARED);
 
+       SpinLockAcquire(&pArray->commitseqno_lck);
+
        /* xmax is always latestCompletedXid + 1 */
        xmax = ShmemVariableCache->latestCompletedXid;
+
+       lastcommitseqno = pArray->commitseqno;
+
+       SpinLockRelease(&pArray->commitseqno_lck);
+
        Assert(TransactionIdIsNormal(xmax));
        TransactionIdAdvance(xmax);
 
@@ -1273,11 +1381,21 @@ GetSnapshotData(Snapshot snapshot)
                {
                        volatile PGPROC *proc = arrayP->procs[index];
                        TransactionId xid;
+                       uint64 commitseqno;
 
                        /* Ignore procs running LAZY VACUUM */
                        if (proc->vacuumFlags & PROC_IN_VACUUM)
                                continue;
 
+                       /*
+                        * If this transaction committed before we began taking our
+                        * snapshot, we see it as committed. Otherwise we still
+                        * consider it in-progress.
+                        */
+                       commitseqno = proc->commitseqno; /* fetch just once. XXX: this should be atomic */
+                       if (commitseqno != 0 && commitseqno <= lastcommitseqno)
+                               continue;
+
                        /* Update globalxmin to be the smallest valid xmin */
                        xid = proc->xmin;       /* fetch just once */
                        if (TransactionIdIsNormal(xid) &&
@@ -1372,9 +1490,26 @@ GetSnapshotData(Snapshot snapshot)
                        suboverflowed = true;
        }
 
+       SpinLockAcquire(&pArray->commitseqno_lck);
+
+       if (pArray->commitseqno_removed > lastcommitseqno)
+       {
+               /*
+                * Oops, someone cleaned up an xid that we should still see as
+                * visible, while we were getting the snapshot. Our snapshot might be
+                * incomplete. Have to retry; hopefully this doesn't happen very
+                * often in practice.
+                */
+               SpinLockRelease(&pArray->commitseqno_lck);
+               LWLockRelease(ProcArrayLock);
+               goto retry;
+       }
+
        if (!TransactionIdIsValid(MyProc->xmin))
                MyProc->xmin = TransactionXmin = xmin;
 
+       SpinLockRelease(&pArray->commitseqno_lck);
+
        LWLockRelease(ProcArrayLock);
 
        /*
@@ -1576,6 +1711,9 @@ GetRunningTransactionData(void)
                if (!TransactionIdIsValid(xid))
                        continue;
 
+               if (proc->commitseqno != 0)
+                       continue;
+
                xids[count++] = xid;
 
                if (TransactionIdPrecedes(xid, oldestRunningXid))
@@ -1662,6 +1800,9 @@ GetOldestActiveTransactionId(void)
                if (!TransactionIdIsNormal(xid))
                        continue;
 
+               if (proc->commitseqno != 0)
+                       continue;
+
                if (TransactionIdPrecedes(xid, oldestRunningXid))
                        oldestRunningXid = xid;
 
@@ -1835,6 +1976,9 @@ BackendXidGetPid(TransactionId xid)
        {
                volatile PGPROC *proc = arrayP->procs[index];
 
+               if (proc->commitseqno != 0)
+                       continue;
+
                if (proc->xid == xid)
                {
                        result = proc->pid;
@@ -2126,6 +2270,8 @@ MinimumActiveBackends(int min)
                        continue;                       /* do not count prepared xacts */
                if (proc->xid == InvalidTransactionId)
                        continue;                       /* do not count if no XID assigned */
+               if (proc->commitseqno != 0)
+                       continue;                       /* do not count if already committed */
                if (proc->waitLock != NULL)
                        continue;                       /* do not count if blocked on a lock */
                count++;
index eda3a98a85b0ba14ed82aa1570fe758b895dfa37..bd445241c25948dc2e65131601196e187ec664f1 100644 (file)
@@ -314,6 +314,7 @@ InitProcess(void)
        MyProc->waitStatus = STATUS_OK;
        MyProc->lxid = InvalidLocalTransactionId;
        MyProc->xid = InvalidTransactionId;
+       MyProc->commitseqno = 0;
        MyProc->xmin = InvalidTransactionId;
        MyProc->pid = MyProcPid;
        /* backendId, databaseId and roleId will be filled in later */
@@ -473,6 +474,7 @@ InitAuxiliaryProcess(void)
        MyProc->waitStatus = STATUS_OK;
        MyProc->lxid = InvalidLocalTransactionId;
        MyProc->xid = InvalidTransactionId;
+       MyProc->commitseqno = 0;
        MyProc->xmin = InvalidTransactionId;
        MyProc->backendId = InvalidBackendId;
        MyProc->databaseId = InvalidOid;
index 6e798b1b2d9248e69fe157764b9e3d08365a58a9..428a3e8bc2ddc262303fd05ef2d1ae11a4bc2581 100644 (file)
@@ -95,6 +95,7 @@ struct PGPROC
                                                                 * starting our xact, excluding LAZY VACUUM:
                                                                 * vacuum must not remove tuples deleted by
                                                                 * xid >= xmin ! */
+       uint64          commitseqno;
 
        int                     pid;                    /* Backend's process ID; 0 if prepared xact */
 
index 3e80cc5c075cd97e400dc35b77ba2a2d49e3454e..84b39e50496b40ee1115683e4b35b9b12e7bd95d 100644 (file)
@@ -25,6 +25,7 @@ extern void ProcArrayRemove(PGPROC *proc, TransactionId latestXid);
 
 extern void ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid);
 extern void ProcArrayClearTransaction(PGPROC *proc);
+extern void ProcArrayCleanupTransaction(PGPROC *proc);
 
 extern void ProcArrayApplyRecoveryInfo(RunningTransactions running);
 extern void ProcArrayApplyXidAssignment(TransactionId topxid,