From: Andres Freund Date: Fri, 20 Jan 2023 02:50:01 +0000 (-0800) Subject: Use dlists instead of SHM_QUEUE for predicate locking X-Git-Tag: REL_16_BETA1~883 X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=9600371764583c80f3d94957e3d16daa2661154b;p=postgresql.git Use dlists instead of SHM_QUEUE for predicate locking Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely used and have an easier to use interface. Reviewed-by: Thomas Munro (in an older version) Discussion: https://postgr.es/m/20221120055930.t6kl3tyivzhlrzu2@awork3.anarazel.de Discussion: https://postgr.es/m/20200211042229.msv23badgqljrdg2@alap3.anarazel.de --- diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 327adef5d3c..11decb74b2a 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -260,7 +260,7 @@ #define NPREDICATELOCKTARGETENTS() \ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) -#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink))) +#define SxactIsOnFinishedList(sxact) (!dlist_node_is_detached(&(sxact)->finishedLink)) /* * Note that a sxact is marked "prepared" once it has passed @@ -392,7 +392,7 @@ static RWConflictPoolHeader RWConflictPool; static HTAB *SerializableXidHash; static HTAB *PredicateLockTargetHash; static HTAB *PredicateLockHash; -static SHM_QUEUE *FinishedSerializableTransactions; +static dlist_head *FinishedSerializableTransactions; /* * Tag for a dummy entry in PredicateLockTargetHash. By temporarily removing @@ -430,8 +430,6 @@ static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact; static SERIALIZABLEXACT *CreatePredXact(void); static void ReleasePredXact(SERIALIZABLEXACT *sxact); -static SERIALIZABLEXACT *FirstPredXact(void); -static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact); static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer); static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); @@ -579,69 +577,24 @@ SerializationNeededForWrite(Relation relation) static SERIALIZABLEXACT * CreatePredXact(void) { - PredXactListElement ptle; + SERIALIZABLEXACT *sxact; - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->availableList, - &PredXact->availableList, - offsetof(PredXactListElementData, link)); - if (!ptle) + if (dlist_is_empty(&PredXact->availableList)) return NULL; - SHMQueueDelete(&ptle->link); - SHMQueueInsertBefore(&PredXact->activeList, &ptle->link); - return &ptle->sxact; + sxact = dlist_container(SERIALIZABLEXACT, xactLink, + dlist_pop_head_node(&PredXact->availableList)); + dlist_push_tail(&PredXact->activeList, &sxact->xactLink); + return sxact; } static void ReleasePredXact(SERIALIZABLEXACT *sxact) { - PredXactListElement ptle; - Assert(ShmemAddrIsValid(sxact)); - ptle = (PredXactListElement) - (((char *) sxact) - - offsetof(PredXactListElementData, sxact) - + offsetof(PredXactListElementData, link)); - SHMQueueDelete(&ptle->link); - SHMQueueInsertBefore(&PredXact->availableList, &ptle->link); -} - -static SERIALIZABLEXACT * -FirstPredXact(void) -{ - PredXactListElement ptle; - - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->activeList, - &PredXact->activeList, - offsetof(PredXactListElementData, link)); - if (!ptle) - return NULL; - - return &ptle->sxact; -} - -static SERIALIZABLEXACT * -NextPredXact(SERIALIZABLEXACT *sxact) -{ - PredXactListElement ptle; - - Assert(ShmemAddrIsValid(sxact)); - - ptle = (PredXactListElement) - (((char *) sxact) - - offsetof(PredXactListElementData, sxact) - + offsetof(PredXactListElementData, link)); - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->activeList, - &ptle->link, - offsetof(PredXactListElementData, link)); - if (!ptle) - return NULL; - - return &ptle->sxact; + dlist_delete(&sxact->xactLink); + dlist_push_tail(&PredXact->availableList, &sxact->xactLink); } /*------------------------------------------------------------------------*/ @@ -652,30 +605,30 @@ NextPredXact(SERIALIZABLEXACT *sxact) static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer) { - RWConflict conflict; + dlist_iter iter; Assert(reader != writer); /* Check the ends of the purported conflict first. */ if (SxactIsDoomed(reader) || SxactIsDoomed(writer) - || SHMQueueEmpty(&reader->outConflicts) - || SHMQueueEmpty(&writer->inConflicts)) + || dlist_is_empty(&reader->outConflicts) + || dlist_is_empty(&writer->inConflicts)) return false; - /* A conflict is possible; walk the list to find out. */ - conflict = (RWConflict) - SHMQueueNext(&reader->outConflicts, - &reader->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + /* + * A conflict is possible; walk the list to find out. + * + * The unconstify is needed as we have no const version of + * dlist_foreach(). + */ + dlist_foreach(iter, &unconstify(SERIALIZABLEXACT *, reader)->outConflicts) { + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); + if (conflict->sxactIn == writer) return true; - conflict = (RWConflict) - SHMQueueNext(&reader->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); } /* No conflict found. */ @@ -690,22 +643,19 @@ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) Assert(reader != writer); Assert(!RWConflictExists(reader, writer)); - conflict = (RWConflict) - SHMQueueNext(&RWConflictPool->availableList, - &RWConflictPool->availableList, - offsetof(RWConflictData, outLink)); - if (!conflict) + if (dlist_is_empty(&RWConflictPool->availableList)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough elements in RWConflictPool to record a read/write conflict"), errhint("You might need to run fewer transactions at a time or increase max_connections."))); - SHMQueueDelete(&conflict->outLink); + conflict = dlist_head_element(RWConflictData, outLink, &RWConflictPool->availableList); + dlist_delete(&conflict->outLink); conflict->sxactOut = reader; conflict->sxactIn = writer; - SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink); - SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink); + dlist_push_tail(&reader->outConflicts, &conflict->outLink); + dlist_push_tail(&writer->inConflicts, &conflict->inLink); } static void @@ -718,39 +668,33 @@ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, Assert(SxactIsReadOnly(roXact)); Assert(!SxactIsReadOnly(activeXact)); - conflict = (RWConflict) - SHMQueueNext(&RWConflictPool->availableList, - &RWConflictPool->availableList, - offsetof(RWConflictData, outLink)); - if (!conflict) + if (dlist_is_empty(&RWConflictPool->availableList)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"), errhint("You might need to run fewer transactions at a time or increase max_connections."))); - SHMQueueDelete(&conflict->outLink); + conflict = dlist_head_element(RWConflictData, outLink, &RWConflictPool->availableList); + dlist_delete(&conflict->outLink); conflict->sxactOut = activeXact; conflict->sxactIn = roXact; - SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts, - &conflict->outLink); - SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts, - &conflict->inLink); + dlist_push_tail(&activeXact->possibleUnsafeConflicts, &conflict->outLink); + dlist_push_tail(&roXact->possibleUnsafeConflicts, &conflict->inLink); } static void ReleaseRWConflict(RWConflict conflict) { - SHMQueueDelete(&conflict->inLink); - SHMQueueDelete(&conflict->outLink); - SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink); + dlist_delete(&conflict->inLink); + dlist_delete(&conflict->outLink); + dlist_push_tail(&RWConflictPool->availableList, &conflict->outLink); } static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact) { - RWConflict conflict, - nextConflict; + dlist_mutable_iter iter; Assert(SxactIsReadOnly(sxact)); Assert(!SxactIsROSafe(sxact)); @@ -761,23 +705,15 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact) * We know this isn't a safe snapshot, so we can stop looking for other * potential conflicts. */ - conflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &sxact->possibleUnsafeConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); Assert(!SxactIsReadOnly(conflict->sxactOut)); Assert(sxact == conflict->sxactIn); ReleaseRWConflict(conflict); - - conflict = nextConflict; } } @@ -1242,8 +1178,8 @@ InitPredicateLocks(void) { int i; - SHMQueueInit(&PredXact->availableList); - SHMQueueInit(&PredXact->activeList); + dlist_init(&PredXact->availableList); + dlist_init(&PredXact->activeList); PredXact->SxactGlobalXmin = InvalidTransactionId; PredXact->SxactGlobalXminCount = 0; PredXact->WritableSxactCount = 0; @@ -1251,27 +1187,26 @@ InitPredicateLocks(void) PredXact->CanPartialClearThrough = 0; PredXact->HavePartialClearedThrough = 0; requestSize = mul_size((Size) max_table_size, - PredXactListElementDataSize); + sizeof(SERIALIZABLEXACT)); PredXact->element = ShmemAlloc(requestSize); /* Add all elements to available list, clean. */ memset(PredXact->element, 0, requestSize); for (i = 0; i < max_table_size; i++) { - LWLockInitialize(&PredXact->element[i].sxact.perXactPredicateListLock, + LWLockInitialize(&PredXact->element[i].perXactPredicateListLock, LWTRANCHE_PER_XACT_PREDICATE_LIST); - SHMQueueInsertBefore(&(PredXact->availableList), - &(PredXact->element[i].link)); + dlist_push_tail(&PredXact->availableList, &PredXact->element[i].xactLink); } PredXact->OldCommittedSxact = CreatePredXact(); SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid); PredXact->OldCommittedSxact->prepareSeqNo = 0; PredXact->OldCommittedSxact->commitSeqNo = 0; PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0; - SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts); - SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts); - SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks); - SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink); - SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts); + dlist_init(&PredXact->OldCommittedSxact->outConflicts); + dlist_init(&PredXact->OldCommittedSxact->inConflicts); + dlist_init(&PredXact->OldCommittedSxact->predicateLocks); + dlist_node_init(&PredXact->OldCommittedSxact->finishedLink); + dlist_init(&PredXact->OldCommittedSxact->possibleUnsafeConflicts); PredXact->OldCommittedSxact->topXid = InvalidTransactionId; PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId; PredXact->OldCommittedSxact->xmin = InvalidTransactionId; @@ -1317,7 +1252,7 @@ InitPredicateLocks(void) { int i; - SHMQueueInit(&RWConflictPool->availableList); + dlist_init(&RWConflictPool->availableList); requestSize = mul_size((Size) max_table_size, RWConflictDataSize); RWConflictPool->element = ShmemAlloc(requestSize); @@ -1325,8 +1260,8 @@ InitPredicateLocks(void) memset(RWConflictPool->element, 0, requestSize); for (i = 0; i < max_table_size; i++) { - SHMQueueInsertBefore(&(RWConflictPool->availableList), - &(RWConflictPool->element[i].outLink)); + dlist_push_tail(&RWConflictPool->availableList, + &RWConflictPool->element[i].outLink); } } @@ -1334,13 +1269,13 @@ InitPredicateLocks(void) * Create or attach to the header for the list of finished serializable * transactions. */ - FinishedSerializableTransactions = (SHM_QUEUE *) + FinishedSerializableTransactions = (dlist_head *) ShmemInitStruct("FinishedSerializableTransactions", - sizeof(SHM_QUEUE), + sizeof(dlist_head), &found); Assert(found == IsUnderPostmaster); if (!found) - SHMQueueInit(FinishedSerializableTransactions); + dlist_init(FinishedSerializableTransactions); /* * Initialize the SLRU storage for old committed serializable @@ -1379,7 +1314,7 @@ PredicateLockShmemSize(void) max_table_size *= 10; size = add_size(size, PredXactListDataSize); size = add_size(size, mul_size((Size) max_table_size, - PredXactListElementDataSize)); + sizeof(SERIALIZABLEXACT))); /* transaction xid table */ size = add_size(size, hash_estimate_size(max_table_size, @@ -1392,7 +1327,7 @@ PredicateLockShmemSize(void) RWConflictDataSize)); /* Head for list of finished serializable transactions. */ - size = add_size(size, sizeof(SHM_QUEUE)); + size = add_size(size, sizeof(dlist_head)); /* Shared memory structures for SLRU tracking of old committed xids. */ size = add_size(size, sizeof(SerialControlData)); @@ -1515,7 +1450,7 @@ SummarizeOldestCommittedSxact(void) * that case, we have nothing to do here. The caller will find one of the * slots released by the other backend when it retries. */ - if (SHMQueueEmpty(FinishedSerializableTransactions)) + if (dlist_is_empty(FinishedSerializableTransactions)) { LWLockRelease(SerializableFinishedListLock); return; @@ -1525,11 +1460,9 @@ SummarizeOldestCommittedSxact(void) * Grab the first sxact off the finished list -- this will be the earliest * commit. Remove it from the list. */ - sxact = (SERIALIZABLEXACT *) - SHMQueueNext(FinishedSerializableTransactions, - FinishedSerializableTransactions, - offsetof(SERIALIZABLEXACT, finishedLink)); - SHMQueueDelete(&(sxact->finishedLink)); + sxact = dlist_head_element(SERIALIZABLEXACT, finishedLink, + FinishedSerializableTransactions); + dlist_delete_thoroughly(&sxact->finishedLink); /* Add to SLRU summary information. */ if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact)) @@ -1583,7 +1516,7 @@ GetSafeSnapshot(Snapshot origSnapshot) * them marked us as conflicted. */ MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; - while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) || + while (!(dlist_is_empty(&MySerializableXact->possibleUnsafeConflicts) || SxactIsROUnsafe(MySerializableXact))) { LWLockRelease(SerializableXactHashLock); @@ -1629,13 +1562,16 @@ int GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size) { int num_written = 0; - SERIALIZABLEXACT *sxact; + dlist_iter iter; + SERIALIZABLEXACT *sxact = NULL; LWLockAcquire(SerializableXactHashLock, LW_SHARED); /* Find blocked_pid's SERIALIZABLEXACT by linear search. */ - for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact)) + dlist_foreach(iter, &PredXact->activeList) { + sxact = dlist_container(SERIALIZABLEXACT, xactLink, iter.cur); + if (sxact->pid == blocked_pid) break; } @@ -1643,21 +1579,13 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size) /* Did we find it, and is it currently waiting in GetSafeSnapshot? */ if (sxact != NULL && SxactIsDeferrableWaiting(sxact)) { - RWConflict possibleUnsafeConflict; - /* Traverse the list of possible unsafe conflicts collecting PIDs. */ - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &sxact->possibleUnsafeConflicts, - offsetof(RWConflictData, inLink)); - - while (possibleUnsafeConflict != NULL && num_written < output_size) + dlist_foreach(iter, &sxact->possibleUnsafeConflicts) { + RWConflict possibleUnsafeConflict = + dlist_container(RWConflictData, inLink, iter.cur); + output[num_written++] = possibleUnsafeConflict->sxactOut->pid; - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &possibleUnsafeConflict->inLink, - offsetof(RWConflictData, inLink)); } } @@ -1870,19 +1798,21 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo; sxact->prepareSeqNo = InvalidSerCommitSeqNo; sxact->commitSeqNo = InvalidSerCommitSeqNo; - SHMQueueInit(&(sxact->outConflicts)); - SHMQueueInit(&(sxact->inConflicts)); - SHMQueueInit(&(sxact->possibleUnsafeConflicts)); + dlist_init(&(sxact->outConflicts)); + dlist_init(&(sxact->inConflicts)); + dlist_init(&(sxact->possibleUnsafeConflicts)); sxact->topXid = GetTopTransactionIdIfAny(); sxact->finishedBefore = InvalidTransactionId; sxact->xmin = snapshot->xmin; sxact->pid = MyProcPid; sxact->pgprocno = MyProc->pgprocno; - SHMQueueInit(&(sxact->predicateLocks)); - SHMQueueElemInit(&(sxact->finishedLink)); + dlist_init(&sxact->predicateLocks); + dlist_node_init(&sxact->finishedLink); sxact->flags = 0; if (XactReadOnly) { + dlist_iter iter; + sxact->flags |= SXACT_FLAG_READ_ONLY; /* @@ -1891,10 +1821,10 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, * transactions then this snapshot can be deemed safe (and we can run * without tracking predicate locks). */ - for (othersxact = FirstPredXact(); - othersxact != NULL; - othersxact = NextPredXact(othersxact)) + dlist_foreach(iter, &PredXact->activeList) { + othersxact = dlist_container(SERIALIZABLEXACT, xactLink, iter.cur); + if (!SxactIsCommitted(othersxact) && !SxactIsDoomed(othersxact) && !SxactIsReadOnly(othersxact)) @@ -2171,7 +2101,7 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) Assert(LWLockHeldByMe(SerializablePredicateListLock)); /* Can't remove it until no locks at this target. */ - if (!SHMQueueEmpty(&target->predicateLocks)) + if (!dlist_is_empty(&target->predicateLocks)) return; /* Actually remove the target. */ @@ -2199,28 +2129,20 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) { SERIALIZABLEXACT *sxact; PREDICATELOCK *predlock; + dlist_mutable_iter iter; LWLockAcquire(SerializablePredicateListLock, LW_SHARED); sxact = MySerializableXact; if (IsInParallelMode()) LWLockAcquire(&sxact->perXactPredicateListLock, LW_EXCLUSIVE); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + + dlist_foreach_modify(iter, &sxact->predicateLocks) { - SHM_QUEUE *predlocksxactlink; - PREDICATELOCK *nextpredlock; PREDICATELOCKTAG oldlocktag; PREDICATELOCKTARGET *oldtarget; PREDICATELOCKTARGETTAG oldtargettag; - predlocksxactlink = &(predlock->xactLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - predlocksxactlink, - offsetof(PREDICATELOCK, xactLink)); + predlock = dlist_container(PREDICATELOCK, xactLink, iter.cur); oldlocktag = predlock->tag; Assert(oldlocktag.myXact == sxact); @@ -2238,8 +2160,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(predlocksxactlink); - SHMQueueDelete(&(predlock->targetLink)); + dlist_delete(&predlock->xactLink); + dlist_delete(&predlock->targetLink); rmpredlock = hash_search_with_hash_value (PredicateLockHash, &oldlocktag, @@ -2254,8 +2176,6 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) DecrementParentLocks(&oldtargettag); } - - predlock = nextpredlock; } if (IsInParallelMode()) LWLockRelease(&sxact->perXactPredicateListLock); @@ -2472,7 +2392,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, errmsg("out of shared memory"), errhint("You might need to increase max_pred_locks_per_transaction."))); if (!found) - SHMQueueInit(&(target->predicateLocks)); + dlist_init(&target->predicateLocks); /* We've got the sxact and target, make sure they're joined. */ locktag.myTarget = target; @@ -2489,9 +2409,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, if (!found) { - SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink)); - SHMQueueInsertBefore(&(sxact->predicateLocks), - &(lock->xactLink)); + dlist_push_tail(&target->predicateLocks, &lock->targetLink); + dlist_push_tail(&sxact->predicateLocks, &lock->xactLink); lock->commitSeqNo = InvalidSerCommitSeqNo; } @@ -2663,30 +2582,22 @@ PredicateLockTID(Relation relation, ItemPointer tid, Snapshot snapshot, static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) { - PREDICATELOCK *predlock; - SHM_QUEUE *predlocktargetlink; - PREDICATELOCK *nextpredlock; - bool found; + dlist_mutable_iter iter; Assert(LWLockHeldByMeInMode(SerializablePredicateListLock, LW_EXCLUSIVE)); Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash))); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(target->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); - while (predlock) + + dlist_foreach_modify(iter, &target->predicateLocks) { - predlocktargetlink = &(predlock->targetLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - predlocktargetlink, - offsetof(PREDICATELOCK, targetLink)); + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); + bool found; - SHMQueueDelete(&(predlock->xactLink)); - SHMQueueDelete(&(predlock->targetLink)); + dlist_delete(&(predlock->xactLink)); + dlist_delete(&(predlock->targetLink)); hash_search_with_hash_value (PredicateLockHash, @@ -2695,8 +2606,6 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) targettaghash), HASH_REMOVE, &found); Assert(found); - - predlock = nextpredlock; } LWLockRelease(SerializableXactHashLock); @@ -2795,8 +2704,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, if (oldtarget) { PREDICATELOCKTARGET *newtarget; - PREDICATELOCK *oldpredlock; PREDICATELOCKTAG newpredlocktag; + dlist_mutable_iter iter; newtarget = hash_search_with_hash_value(PredicateLockTargetHash, &newtargettag, @@ -2812,7 +2721,7 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, /* If we created a new entry, initialize it */ if (!found) - SHMQueueInit(&(newtarget->predicateLocks)); + dlist_init(&newtarget->predicateLocks); newpredlocktag.myTarget = newtarget; @@ -2820,29 +2729,21 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, * Loop through all the locks on the old target, replacing them with * locks on the new target. */ - oldpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldtarget->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); - while (oldpredlock) + + dlist_foreach_modify(iter, &oldtarget->predicateLocks) { - SHM_QUEUE *predlocktargetlink; - PREDICATELOCK *nextpredlock; + PREDICATELOCK *oldpredlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); PREDICATELOCK *newpredlock; SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo; - predlocktargetlink = &(oldpredlock->targetLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - predlocktargetlink, - offsetof(PREDICATELOCK, targetLink)); newpredlocktag.myXact = oldpredlock->tag.myXact; if (removeOld) { - SHMQueueDelete(&(oldpredlock->xactLink)); - SHMQueueDelete(&(oldpredlock->targetLink)); + dlist_delete(&(oldpredlock->xactLink)); + dlist_delete(&(oldpredlock->targetLink)); hash_search_with_hash_value (PredicateLockHash, @@ -2870,10 +2771,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, } if (!found) { - SHMQueueInsertBefore(&(newtarget->predicateLocks), - &(newpredlock->targetLink)); - SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks), - &(newpredlock->xactLink)); + dlist_push_tail(&(newtarget->predicateLocks), + &(newpredlock->targetLink)); + dlist_push_tail(&(newpredlocktag.myXact->predicateLocks), + &(newpredlock->xactLink)); newpredlock->commitSeqNo = oldCommitSeqNo; } else @@ -2885,14 +2786,12 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, Assert(newpredlock->commitSeqNo != 0); Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo) || (newpredlock->tag.myXact == OldCommittedSxact)); - - oldpredlock = nextpredlock; } LWLockRelease(SerializableXactHashLock); if (removeOld) { - Assert(SHMQueueEmpty(&oldtarget->predicateLocks)); + Assert(dlist_is_empty(&oldtarget->predicateLocks)); RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash); } } @@ -3012,7 +2911,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat))) { - PREDICATELOCK *oldpredlock; + dlist_mutable_iter iter; /* * Check whether this is a target which needs attention. @@ -3047,29 +2946,21 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) heaptargettaghash, HASH_ENTER, &found); if (!found) - SHMQueueInit(&heaptarget->predicateLocks); + dlist_init(&heaptarget->predicateLocks); } /* * Loop through all the locks on the old target, replacing them with * locks on the new target. */ - oldpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldtarget->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); - while (oldpredlock) + dlist_foreach_modify(iter, &oldtarget->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *oldpredlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); PREDICATELOCK *newpredlock; SerCommitSeqNo oldCommitSeqNo; SERIALIZABLEXACT *oldXact; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldpredlock->targetLink), - offsetof(PREDICATELOCK, targetLink)); - /* * Remove the old lock first. This avoids the chance of running * out of lock structure entries for the hash table. @@ -3077,7 +2968,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) oldCommitSeqNo = oldpredlock->commitSeqNo; oldXact = oldpredlock->tag.myXact; - SHMQueueDelete(&(oldpredlock->xactLink)); + dlist_delete(&(oldpredlock->xactLink)); /* * No need for retail delete from oldtarget list, we're removing @@ -3103,10 +2994,10 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) &found); if (!found) { - SHMQueueInsertBefore(&(heaptarget->predicateLocks), - &(newpredlock->targetLink)); - SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks), - &(newpredlock->xactLink)); + dlist_push_tail(&(heaptarget->predicateLocks), + &(newpredlock->targetLink)); + dlist_push_tail(&(newpredlocktag.myXact->predicateLocks), + &(newpredlock->xactLink)); newpredlock->commitSeqNo = oldCommitSeqNo; } else @@ -3119,8 +3010,6 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo) || (newpredlock->tag.myXact == OldCommittedSxact)); } - - oldpredlock = nextpredlock; } hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE, @@ -3275,15 +3164,18 @@ PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, static void SetNewSxactGlobalXmin(void) { - SERIALIZABLEXACT *sxact; + dlist_iter iter; Assert(LWLockHeldByMe(SerializableXactHashLock)); PredXact->SxactGlobalXmin = InvalidTransactionId; PredXact->SxactGlobalXminCount = 0; - for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact)) + dlist_foreach(iter, &PredXact->activeList) { + SERIALIZABLEXACT *sxact = + dlist_container(SERIALIZABLEXACT, xactLink, iter.cur); + if (!SxactIsRolledBack(sxact) && !SxactIsCommitted(sxact) && sxact != OldCommittedSxact) @@ -3334,10 +3226,8 @@ void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) { bool needToClear; - RWConflict conflict, - nextConflict, - possibleUnsafeConflict; SERIALIZABLEXACT *roXact; + dlist_mutable_iter iter; /* * We can't trust XactReadOnly here, because a transaction which started @@ -3525,23 +3415,15 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * make us unsafe. Note that we use 'inLink' for the iteration as * opposed to 'outLink' for the r/w xacts. */ - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &MySerializableXact->possibleUnsafeConflicts, - offsetof(RWConflictData, inLink)); - while (possibleUnsafeConflict) + dlist_foreach_modify(iter, &MySerializableXact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &possibleUnsafeConflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict possibleUnsafeConflict = + dlist_container(RWConflictData, inLink, iter.cur); Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut)); Assert(MySerializableXact == possibleUnsafeConflict->sxactIn); ReleaseRWConflict(possibleUnsafeConflict); - - possibleUnsafeConflict = nextConflict; } } @@ -3564,16 +3446,10 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to * previously committed transactions. */ - conflict = (RWConflict) - SHMQueueNext(&MySerializableXact->outConflicts, - &MySerializableXact->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_foreach_modify(iter, &MySerializableXact->outConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); if (isCommit && !SxactIsReadOnly(MySerializableXact) @@ -3589,31 +3465,21 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) || SxactIsCommitted(conflict->sxactIn) || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo)) ReleaseRWConflict(conflict); - - conflict = nextConflict; } /* * Release all inConflicts from committed and read-only transactions. If * we're rolling back, clear them all. */ - conflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &MySerializableXact->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &MySerializableXact->inConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); if (!isCommit || SxactIsCommitted(conflict->sxactOut) || SxactIsReadOnly(conflict->sxactOut)) ReleaseRWConflict(conflict); - - conflict = nextConflict; } if (!topLevelIsDeclaredReadOnly) @@ -3624,16 +3490,10 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * conflict out. If any are waiting DEFERRABLE transactions, wake them * up if they are known safe or known unsafe. */ - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &MySerializableXact->possibleUnsafeConflicts, - offsetof(RWConflictData, outLink)); - while (possibleUnsafeConflict) + dlist_foreach_modify(iter, &MySerializableXact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &possibleUnsafeConflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict possibleUnsafeConflict = + dlist_container(RWConflictData, outLink, iter.cur); roXact = possibleUnsafeConflict->sxactIn; Assert(MySerializableXact == possibleUnsafeConflict->sxactOut); @@ -3661,7 +3521,7 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * transaction can now safely release its predicate locks (but * that transaction's backend has to do that itself). */ - if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts)) + if (dlist_is_empty(&roXact->possibleUnsafeConflicts)) roXact->flags |= SXACT_FLAG_RO_SAFE; } @@ -3672,8 +3532,6 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) if (SxactIsDeferrableWaiting(roXact) && (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) ProcSendSignal(roXact->pgprocno); - - possibleUnsafeConflict = nextConflict; } } @@ -3701,8 +3559,8 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) /* Add this to the list of transactions to check for later cleanup. */ if (isCommit) - SHMQueueInsertBefore(FinishedSerializableTransactions, - &MySerializableXact->finishedLink); + dlist_push_tail(FinishedSerializableTransactions, + &MySerializableXact->finishedLink); /* * If we're releasing a RO_SAFE transaction in parallel mode, we'll only @@ -3744,27 +3602,19 @@ ReleasePredicateLocksLocal(void) static void ClearOldPredicateLocks(void) { - SERIALIZABLEXACT *finishedSxact; - PREDICATELOCK *predlock; + dlist_mutable_iter iter; /* * Loop through finished transactions. They are in commit order, so we can * stop as soon as we find one that's still interesting. */ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE); - finishedSxact = (SERIALIZABLEXACT *) - SHMQueueNext(FinishedSerializableTransactions, - FinishedSerializableTransactions, - offsetof(SERIALIZABLEXACT, finishedLink)); LWLockAcquire(SerializableXactHashLock, LW_SHARED); - while (finishedSxact) + dlist_foreach_modify(iter, FinishedSerializableTransactions) { - SERIALIZABLEXACT *nextSxact; + SERIALIZABLEXACT *finishedSxact = + dlist_container(SERIALIZABLEXACT, finishedLink, iter.cur); - nextSxact = (SERIALIZABLEXACT *) - SHMQueueNext(FinishedSerializableTransactions, - &(finishedSxact->finishedLink), - offsetof(SERIALIZABLEXACT, finishedLink)); if (!TransactionIdIsValid(PredXact->SxactGlobalXmin) || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore, PredXact->SxactGlobalXmin)) @@ -3774,7 +3624,7 @@ ClearOldPredicateLocks(void) * took its snapshot. It's no longer interesting. */ LWLockRelease(SerializableXactHashLock); - SHMQueueDelete(&(finishedSxact->finishedLink)); + dlist_delete_thoroughly(&finishedSxact->finishedLink); ReleaseOneSerializableXact(finishedSxact, false, false); LWLockAcquire(SerializableXactHashLock, LW_SHARED); } @@ -3791,7 +3641,7 @@ ClearOldPredicateLocks(void) if (SxactIsReadOnly(finishedSxact)) { /* A read-only transaction can be removed entirely */ - SHMQueueDelete(&(finishedSxact->finishedLink)); + dlist_delete_thoroughly(&(finishedSxact->finishedLink)); ReleaseOneSerializableXact(finishedSxact, false, false); } else @@ -3812,7 +3662,6 @@ ClearOldPredicateLocks(void) /* Still interesting. */ break; } - finishedSxact = nextSxact; } LWLockRelease(SerializableXactHashLock); @@ -3820,20 +3669,12 @@ ClearOldPredicateLocks(void) * Loop through predicate locks on dummy transaction for summarized data. */ LWLockAcquire(SerializablePredicateListLock, LW_SHARED); - predlock = (PREDICATELOCK *) - SHMQueueNext(&OldCommittedSxact->predicateLocks, - &OldCommittedSxact->predicateLocks, - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + dlist_foreach_modify(iter, &OldCommittedSxact->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); bool canDoPartialCleanup; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&OldCommittedSxact->predicateLocks, - &predlock->xactLink, - offsetof(PREDICATELOCK, xactLink)); - LWLockAcquire(SerializableXactHashLock, LW_SHARED); Assert(predlock->commitSeqNo != 0); Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo); @@ -3860,8 +3701,8 @@ ClearOldPredicateLocks(void) LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(&(predlock->targetLink)); - SHMQueueDelete(&(predlock->xactLink)); + dlist_delete(&(predlock->targetLink)); + dlist_delete(&(predlock->xactLink)); hash_search_with_hash_value(PredicateLockHash, &tag, PredicateLockHashCodeFromTargetHashCode(&tag, @@ -3871,8 +3712,6 @@ ClearOldPredicateLocks(void) LWLockRelease(partitionLock); } - - predlock = nextpredlock; } LWLockRelease(SerializablePredicateListLock); @@ -3902,10 +3741,8 @@ static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, bool summarize) { - PREDICATELOCK *predlock; SERIALIZABLEXIDTAG sxidtag; - RWConflict conflict, - nextConflict; + dlist_mutable_iter iter; Assert(sxact != NULL); Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact)); @@ -3919,27 +3756,17 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, LWLockAcquire(SerializablePredicateListLock, LW_SHARED); if (IsInParallelMode()) LWLockAcquire(&sxact->perXactPredicateListLock, LW_EXCLUSIVE); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + dlist_foreach_modify(iter, &sxact->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); PREDICATELOCKTAG tag; - SHM_QUEUE *targetLink; PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; LWLock *partitionLock; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(predlock->xactLink), - offsetof(PREDICATELOCK, xactLink)); - tag = predlock->tag; - targetLink = &(predlock->targetLink); target = tag.myTarget; targettag = target->tag; targettaghash = PredicateLockTargetTagHashCode(&targettag); @@ -3947,7 +3774,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(targetLink); + dlist_delete(&predlock->targetLink); hash_search_with_hash_value(PredicateLockHash, &tag, PredicateLockHashCodeFromTargetHashCode(&tag, @@ -3977,10 +3804,10 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, } else { - SHMQueueInsertBefore(&(target->predicateLocks), - &(predlock->targetLink)); - SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks), - &(predlock->xactLink)); + dlist_push_tail(&target->predicateLocks, + &predlock->targetLink); + dlist_push_tail(&OldCommittedSxact->predicateLocks, + &predlock->xactLink); predlock->commitSeqNo = sxact->commitSeqNo; } } @@ -3988,15 +3815,13 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, RemoveTargetIfNoLongerUsed(target, targettaghash); LWLockRelease(partitionLock); - - predlock = nextpredlock; } /* * Rather than retail removal, just re-init the head after we've run * through the list. */ - SHMQueueInit(&sxact->predicateLocks); + dlist_init(&sxact->predicateLocks); if (IsInParallelMode()) LWLockRelease(&sxact->perXactPredicateListLock); @@ -4008,38 +3833,26 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, /* Release all outConflicts (unless 'partial' is true) */ if (!partial) { - conflict = (RWConflict) - SHMQueueNext(&sxact->outConflicts, - &sxact->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->outConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); + if (summarize) conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN; ReleaseRWConflict(conflict); - conflict = nextConflict; } } /* Release all inConflicts. */ - conflict = (RWConflict) - SHMQueueNext(&sxact->inConflicts, - &sxact->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->inConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->inConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); + if (summarize) conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; ReleaseRWConflict(conflict); - conflict = nextConflict; } /* Finally, get rid of the xid and the record of the transaction itself. */ @@ -4165,7 +3978,7 @@ CheckForSerializableConflictOut(Relation relation, TransactionId xid, Snapshot s errhint("The transaction might succeed if retried."))); if (SxactHasSummaryConflictIn(MySerializableXact) - || !SHMQueueEmpty(&MySerializableXact->inConflicts)) + || !dlist_is_empty(&MySerializableXact->inConflicts)) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), @@ -4261,9 +4074,9 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) uint32 targettaghash; LWLock *partitionLock; PREDICATELOCKTARGET *target; - PREDICATELOCK *predlock; PREDICATELOCK *mypredlock = NULL; PREDICATELOCKTAG mypredlocktag; + dlist_mutable_iter iter; Assert(MySerializableXact != InvalidSerializableXact); @@ -4288,24 +4101,14 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) * Each lock for an overlapping transaction represents a conflict: a * rw-dependency in to this transaction. */ - predlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(target->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); LWLockAcquire(SerializableXactHashLock, LW_SHARED); - while (predlock) - { - SHM_QUEUE *predlocktargetlink; - PREDICATELOCK *nextpredlock; - SERIALIZABLEXACT *sxact; - predlocktargetlink = &(predlock->targetLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - predlocktargetlink, - offsetof(PREDICATELOCK, targetLink)); + dlist_foreach_modify(iter, &target->predicateLocks) + { + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); + SERIALIZABLEXACT *sxact = predlock->tag.myXact; - sxact = predlock->tag.myXact; if (sxact == MySerializableXact) { /* @@ -4350,8 +4153,6 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) LWLockRelease(SerializableXactHashLock); LWLockAcquire(SerializableXactHashLock, LW_SHARED); } - - predlock = nextpredlock; } LWLockRelease(SerializableXactHashLock); LWLockRelease(partitionLock); @@ -4391,8 +4192,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) { Assert(rmpredlock == mypredlock); - SHMQueueDelete(&(mypredlock->targetLink)); - SHMQueueDelete(&(mypredlock->xactLink)); + dlist_delete(&(mypredlock->targetLink)); + dlist_delete(&(mypredlock->xactLink)); rmpredlock = (PREDICATELOCK *) hash_search_with_hash_value(PredicateLockHash, @@ -4562,7 +4363,7 @@ CheckTableForSerializableConflictIn(Relation relation) while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat))) { - PREDICATELOCK *predlock; + dlist_mutable_iter iter; /* * Check whether this is a target which needs attention. @@ -4575,26 +4376,16 @@ CheckTableForSerializableConflictIn(Relation relation) /* * Loop through locks for this target and flag conflicts. */ - predlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(target->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); - while (predlock) + dlist_foreach_modify(iter, &target->predicateLocks) { - PREDICATELOCK *nextpredlock; - - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(predlock->targetLink), - offsetof(PREDICATELOCK, targetLink)); + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); if (predlock->tag.myXact != MySerializableXact && !RWConflictExists(predlock->tag.myXact, MySerializableXact)) { FlagRWConflict(predlock->tag.myXact, MySerializableXact); } - - predlock = nextpredlock; } } @@ -4652,7 +4443,6 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) { bool failure; - RWConflict conflict; Assert(LWLockHeldByMe(SerializableXactHashLock)); @@ -4692,20 +4482,16 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, * to abort. *------------------------------------------------------------------------ */ - if (!failure) + if (!failure && SxactHasSummaryConflictOut(writer)) + failure = true; + else if (!failure) { - if (SxactHasSummaryConflictOut(writer)) - { - failure = true; - conflict = NULL; - } - else - conflict = (RWConflict) - SHMQueueNext(&writer->outConflicts, - &writer->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_iter iter; + + dlist_foreach(iter, &writer->outConflicts) { + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); SERIALIZABLEXACT *t2 = conflict->sxactIn; if (SxactIsPrepared(t2) @@ -4719,10 +4505,6 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, failure = true; break; } - conflict = (RWConflict) - SHMQueueNext(&writer->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); } } @@ -4744,30 +4526,31 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, if (SxactHasSummaryConflictIn(reader)) { failure = true; - conflict = NULL; } else - conflict = (RWConflict) - SHMQueueNext(&reader->inConflicts, - &reader->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) { - SERIALIZABLEXACT *t0 = conflict->sxactOut; + dlist_iter iter; - if (!SxactIsDoomed(t0) - && (!SxactIsCommitted(t0) - || t0->commitSeqNo >= writer->prepareSeqNo) - && (!SxactIsReadOnly(t0) - || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo)) + /* + * The unconstify is needed as we have no const version of + * dlist_foreach(). + */ + dlist_foreach(iter, &unconstify(SERIALIZABLEXACT *, reader)->inConflicts) { - failure = true; - break; + const RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); + const SERIALIZABLEXACT *t0 = conflict->sxactOut; + + if (!SxactIsDoomed(t0) + && (!SxactIsCommitted(t0) + || t0->commitSeqNo >= writer->prepareSeqNo) + && (!SxactIsReadOnly(t0) + || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo)) + { + failure = true; + break; + } } - conflict = (RWConflict) - SHMQueueNext(&reader->inConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); } } @@ -4825,7 +4608,7 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, void PreCommit_CheckForSerializationFailure(void) { - RWConflict nearConflict; + dlist_iter near_iter; if (MySerializableXact == InvalidSerializableXact) return; @@ -4846,23 +4629,21 @@ PreCommit_CheckForSerializationFailure(void) errhint("The transaction might succeed if retried."))); } - nearConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &MySerializableXact->inConflicts, - offsetof(RWConflictData, inLink)); - while (nearConflict) + dlist_foreach(near_iter, &MySerializableXact->inConflicts) { + RWConflict nearConflict = + dlist_container(RWConflictData, inLink, near_iter.cur); + if (!SxactIsCommitted(nearConflict->sxactOut) && !SxactIsDoomed(nearConflict->sxactOut)) { - RWConflict farConflict; + dlist_iter far_iter; - farConflict = (RWConflict) - SHMQueueNext(&nearConflict->sxactOut->inConflicts, - &nearConflict->sxactOut->inConflicts, - offsetof(RWConflictData, inLink)); - while (farConflict) + dlist_foreach(far_iter, &nearConflict->sxactOut->inConflicts) { + RWConflict farConflict = + dlist_container(RWConflictData, inLink, far_iter.cur); + if (farConflict->sxactOut == MySerializableXact || (!SxactIsCommitted(farConflict->sxactOut) && !SxactIsReadOnly(farConflict->sxactOut) @@ -4886,17 +4667,8 @@ PreCommit_CheckForSerializationFailure(void) nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED; break; } - farConflict = (RWConflict) - SHMQueueNext(&nearConflict->sxactOut->inConflicts, - &farConflict->inLink, - offsetof(RWConflictData, inLink)); } } - - nearConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &nearConflict->inLink, - offsetof(RWConflictData, inLink)); } MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo); @@ -4919,11 +4691,11 @@ PreCommit_CheckForSerializationFailure(void) void AtPrepare_PredicateLocks(void) { - PREDICATELOCK *predlock; SERIALIZABLEXACT *sxact; TwoPhasePredicateRecord record; TwoPhasePredicateXactRecord *xactRecord; TwoPhasePredicateLockRecord *lockRecord; + dlist_iter iter; sxact = MySerializableXact; xactRecord = &(record.data.xactRecord); @@ -4963,23 +4735,16 @@ AtPrepare_PredicateLocks(void) */ Assert(!IsParallelWorker() && !ParallelContextActive()); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - - while (predlock != NULL) + dlist_foreach(iter, &sxact->predicateLocks) { + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); + record.type = TWOPHASEPREDICATERECORD_LOCK; lockRecord->target = predlock->tag.myTarget->tag; RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0, &record, sizeof(record)); - - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(predlock->xactLink), - offsetof(PREDICATELOCK, xactLink)); } LWLockRelease(SerializablePredicateListLock); @@ -5091,10 +4856,10 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, * recovered xact started are still active, except possibly other * prepared xacts and we don't care whether those are RO_SAFE or not. */ - SHMQueueInit(&(sxact->possibleUnsafeConflicts)); + dlist_init(&(sxact->possibleUnsafeConflicts)); - SHMQueueInit(&(sxact->predicateLocks)); - SHMQueueElemInit(&(sxact->finishedLink)); + dlist_init(&(sxact->predicateLocks)); + dlist_node_init(&sxact->finishedLink); sxact->topXid = xid; sxact->xmin = xactRecord->xmin; @@ -5112,8 +4877,8 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, * we'll conservatively assume that it had both a conflict in and a * conflict out, and represent that with the summary conflict flags. */ - SHMQueueInit(&(sxact->outConflicts)); - SHMQueueInit(&(sxact->inConflicts)); + dlist_init(&(sxact->outConflicts)); + dlist_init(&(sxact->inConflicts)); sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN; sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h index 9e42e1a72c5..142a195d0e0 100644 --- a/src/include/storage/predicate_internals.h +++ b/src/include/storage/predicate_internals.h @@ -14,6 +14,7 @@ #ifndef PREDICATE_INTERNALS_H #define PREDICATE_INTERNALS_H +#include "lib/ilist.h" #include "storage/lock.h" #include "storage/lwlock.h" @@ -84,13 +85,14 @@ typedef struct SERIALIZABLEXACT SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or * no conflict out */ } SeqNo; - SHM_QUEUE outConflicts; /* list of write transactions whose data we + dlist_head outConflicts; /* list of write transactions whose data we * couldn't read. */ - SHM_QUEUE inConflicts; /* list of read transactions which couldn't + dlist_head inConflicts; /* list of read transactions which couldn't * see our write. */ - SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */ - SHM_QUEUE finishedLink; /* list link in + dlist_head predicateLocks; /* list of associated PREDICATELOCK objects */ + dlist_node finishedLink; /* list link in * FinishedSerializableTransactions */ + dlist_node xactLink; /* PredXact->activeList/availableList */ /* * perXactPredicateListLock is only used in parallel queries: it protects @@ -103,7 +105,7 @@ typedef struct SERIALIZABLEXACT * for r/o transactions: list of concurrent r/w transactions that we could * potentially have conflicts with, and vice versa for r/w transactions */ - SHM_QUEUE possibleUnsafeConflicts; + dlist_head possibleUnsafeConflicts; TransactionId topXid; /* top level xid for the transaction, if one * exists; else invalid */ @@ -139,28 +141,10 @@ typedef struct SERIALIZABLEXACT */ #define SXACT_FLAG_PARTIALLY_RELEASED 0x00000800 -/* - * The following types are used to provide an ad hoc list for holding - * SERIALIZABLEXACT objects. An HTAB is overkill, since there is no need to - * access these by key -- there are direct pointers to these objects where - * needed. If a shared memory list is created, these types can probably be - * eliminated in favor of using the general solution. - */ -typedef struct PredXactListElementData -{ - SHM_QUEUE link; - SERIALIZABLEXACT sxact; -} PredXactListElementData; - -typedef struct PredXactListElementData *PredXactListElement; - -#define PredXactListElementDataSize \ - ((Size)MAXALIGN(sizeof(PredXactListElementData))) - typedef struct PredXactListData { - SHM_QUEUE availableList; - SHM_QUEUE activeList; + dlist_head availableList; + dlist_head activeList; /* * These global variables are maintained when registering and cleaning up @@ -187,7 +171,7 @@ typedef struct PredXactListData * seq no */ SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */ - PredXactListElement element; + SERIALIZABLEXACT *element; } PredXactListData; typedef struct PredXactListData *PredXactList; @@ -208,8 +192,8 @@ typedef struct PredXactListData *PredXactList; */ typedef struct RWConflictData { - SHM_QUEUE outLink; /* link for list of conflicts out from a sxact */ - SHM_QUEUE inLink; /* link for list of conflicts in to a sxact */ + dlist_node outLink; /* link for list of conflicts out from a sxact */ + dlist_node inLink; /* link for list of conflicts in to a sxact */ SERIALIZABLEXACT *sxactOut; SERIALIZABLEXACT *sxactIn; } RWConflictData; @@ -221,7 +205,7 @@ typedef struct RWConflictData *RWConflict; typedef struct RWConflictPoolHeaderData { - SHM_QUEUE availableList; + dlist_head availableList; RWConflict element; } RWConflictPoolHeaderData; @@ -303,7 +287,7 @@ typedef struct PREDICATELOCKTARGET PREDICATELOCKTARGETTAG tag; /* unique identifier of lockable object */ /* data */ - SHM_QUEUE predicateLocks; /* list of PREDICATELOCK objects assoc. with + dlist_head predicateLocks; /* list of PREDICATELOCK objects assoc. with * predicate lock target */ } PREDICATELOCKTARGET; @@ -336,9 +320,9 @@ typedef struct PREDICATELOCK PREDICATELOCKTAG tag; /* unique identifier of lock */ /* data */ - SHM_QUEUE targetLink; /* list link in PREDICATELOCKTARGET's list of + dlist_node targetLink; /* list link in PREDICATELOCKTARGET's list of * predicate locks */ - SHM_QUEUE xactLink; /* list link in SERIALIZABLEXACT's list of + dlist_node xactLink; /* list link in SERIALIZABLEXACT's list of * predicate locks */ SerCommitSeqNo commitSeqNo; /* only used for summarized predicate locks */ } PREDICATELOCK;