#include "port/pg_lfind.h"
#include "storage/proc.h"
#include "storage/procarray.h"
-#include "storage/spin.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/rel.h"
int numKnownAssignedXids; /* current # of valid entries */
int tailKnownAssignedXids; /* index of oldest valid element */
int headKnownAssignedXids; /* index of newest element, + 1 */
- slock_t known_assigned_xids_lck; /* protects head/tail pointers */
/*
* Highest subxid that has been removed from KnownAssignedXids array to
procArray->numKnownAssignedXids = 0;
procArray->tailKnownAssignedXids = 0;
procArray->headKnownAssignedXids = 0;
- SpinLockInit(&procArray->known_assigned_xids_lck);
procArray->lastOverflowedXid = InvalidTransactionId;
procArray->replication_slot_xmin = InvalidTransactionId;
procArray->replication_slot_catalog_xmin = InvalidTransactionId;
* during normal running). Compressing unused entries out of the array
* likewise requires exclusive lock. To add XIDs to the array, we just insert
* them into slots to the right of the head pointer and then advance the head
- * pointer. This wouldn't require any lock at all, except that on machines
- * with weak memory ordering we need to be careful that other processors
- * see the array element changes before they see the head pointer change.
- * We handle this by using a spinlock to protect reads and writes of the
- * head/tail pointers. (We could dispense with the spinlock if we were to
- * create suitable memory access barrier primitives and use those instead.)
- * The spinlock must be taken to read or write the head/tail pointers unless
- * the caller holds ProcArrayLock exclusively.
+ * pointer. This doesn't require any lock at all, but on machines with weak
+ * memory ordering, we need to be careful that other processors see the array
+ * element changes before they see the head pointer change. We handle this by
+ * using memory barriers when reading or writing the head/tail pointers (unless
+ * the caller holds ProcArrayLock exclusively).
*
* Algorithmic analysis:
*
* If we have a maximum of M slots, with N XIDs currently spread across
* S elements then we have N <= S <= M always.
*
- * * Adding a new XID is O(1) and needs little locking (unless compression
- * must happen)
+ * * Adding a new XID is O(1) and needs no lock (unless compression must
+ * happen)
* * Compressing the array is O(S) and requires exclusive lock
* * Removing an XID is O(logS) and requires exclusive lock
* * Taking a snapshot is O(S) and requires shared lock
pArray->numKnownAssignedXids += nxids;
/*
- * Now update the head pointer. We use a spinlock to protect this
- * pointer, not because the update is likely to be non-atomic, but to
- * ensure that other processors see the above array updates before they
- * see the head pointer change.
- *
- * If we're holding ProcArrayLock exclusively, there's no need to take the
- * spinlock.
+ * Now update the head pointer. We use a write barrier to ensure that
+ * other processors see the above array updates before they see the head
+ * pointer change. The barrier isn't required if we're holding
+ * ProcArrayLock exclusively.
*/
- if (exclusive_lock)
- pArray->headKnownAssignedXids = head;
- else
- {
- SpinLockAcquire(&pArray->known_assigned_xids_lck);
- pArray->headKnownAssignedXids = head;
- SpinLockRelease(&pArray->known_assigned_xids_lck);
- }
+ if (!exclusive_lock)
+ pg_write_barrier();
+
+ pArray->headKnownAssignedXids = head;
}
/*
int tail;
int result_index = -1;
- if (remove)
- {
- /* we hold ProcArrayLock exclusively, so no need for spinlock */
- tail = pArray->tailKnownAssignedXids;
- head = pArray->headKnownAssignedXids;
- }
- else
- {
- /* take spinlock to ensure we see up-to-date array contents */
- SpinLockAcquire(&pArray->known_assigned_xids_lck);
- tail = pArray->tailKnownAssignedXids;
- head = pArray->headKnownAssignedXids;
- SpinLockRelease(&pArray->known_assigned_xids_lck);
- }
+ tail = pArray->tailKnownAssignedXids;
+ head = pArray->headKnownAssignedXids;
+
+ /*
+ * Only the startup process removes entries, so we don't need the read
+ * barrier in that case.
+ */
+ if (!remove)
+ pg_read_barrier(); /* pairs with KnownAssignedXidsAdd */
/*
* Standard binary search. Note we can ignore the KnownAssignedXidsValid
* cannot enter and then leave the array while we hold ProcArrayLock. We
* might miss newly-added xids, but they should be >= xmax so irrelevant
* anyway.
- *
- * Must take spinlock to ensure we see up-to-date array contents.
*/
- SpinLockAcquire(&procArray->known_assigned_xids_lck);
tail = procArray->tailKnownAssignedXids;
head = procArray->headKnownAssignedXids;
- SpinLockRelease(&procArray->known_assigned_xids_lck);
+
+ pg_read_barrier(); /* pairs with KnownAssignedXidsAdd */
for (i = tail; i < head; i++)
{
/*
* Fetch head just once, since it may change while we loop.
*/
- SpinLockAcquire(&procArray->known_assigned_xids_lck);
tail = procArray->tailKnownAssignedXids;
head = procArray->headKnownAssignedXids;
- SpinLockRelease(&procArray->known_assigned_xids_lck);
+
+ pg_read_barrier(); /* pairs with KnownAssignedXidsAdd */
for (i = tail; i < head; i++)
{