Replace known_assigned_xids_lck with memory barriers.
authorNathan Bossart <[email protected]>
Tue, 5 Sep 2023 20:59:06 +0000 (13:59 -0700)
committerNathan Bossart <[email protected]>
Tue, 5 Sep 2023 20:59:06 +0000 (13:59 -0700)
This lock was introduced before memory barrier support was added,
and it is only used to guarantee proper memory ordering when
KnownAssignedXidsAdd() appends to the array without a lock.  Now
that such memory barrier support exists, we can remove the lock and
use barriers instead.

Suggested-by: Tom Lane
Author: Michail Nikolaev
Reviewed-by: Robert Haas
Discussion: https://postgr.es/m/CANtu0oh0si%3DjG5z_fLeFtmYcETssQ08kLEa8b6TQqDm_cinroA%40mail.gmail.com

src/backend/storage/ipc/procarray.c

index aa1552e0316f182e51c2056da669b20f7e0c68e7..bfbf7f903f55ab533b99beb172d5198cc1e9c065 100644 (file)
@@ -61,7 +61,6 @@
 #include "port/pg_lfind.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
-#include "storage/spin.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/rel.h"
@@ -82,7 +81,6 @@ typedef struct ProcArrayStruct
    int         numKnownAssignedXids;   /* current # of valid entries */
    int         tailKnownAssignedXids;  /* index of oldest valid element */
    int         headKnownAssignedXids;  /* index of newest element, + 1 */
-   slock_t     known_assigned_xids_lck;    /* protects head/tail pointers */
 
    /*
     * Highest subxid that has been removed from KnownAssignedXids array to
@@ -441,7 +439,6 @@ CreateSharedProcArray(void)
        procArray->numKnownAssignedXids = 0;
        procArray->tailKnownAssignedXids = 0;
        procArray->headKnownAssignedXids = 0;
-       SpinLockInit(&procArray->known_assigned_xids_lck);
        procArray->lastOverflowedXid = InvalidTransactionId;
        procArray->replication_slot_xmin = InvalidTransactionId;
        procArray->replication_slot_catalog_xmin = InvalidTransactionId;
@@ -4533,22 +4530,19 @@ KnownAssignedTransactionIdsIdleMaintenance(void)
  * during normal running).  Compressing unused entries out of the array
  * likewise requires exclusive lock.  To add XIDs to the array, we just insert
  * them into slots to the right of the head pointer and then advance the head
- * pointer.  This wouldn't require any lock at all, except that on machines
- * with weak memory ordering we need to be careful that other processors
- * see the array element changes before they see the head pointer change.
- * We handle this by using a spinlock to protect reads and writes of the
- * head/tail pointers.  (We could dispense with the spinlock if we were to
- * create suitable memory access barrier primitives and use those instead.)
- * The spinlock must be taken to read or write the head/tail pointers unless
- * the caller holds ProcArrayLock exclusively.
+ * pointer.  This doesn't require any lock at all, but on machines with weak
+ * memory ordering, we need to be careful that other processors see the array
+ * element changes before they see the head pointer change.  We handle this by
+ * using memory barriers when reading or writing the head/tail pointers (unless
+ * the caller holds ProcArrayLock exclusively).
  *
  * Algorithmic analysis:
  *
  * If we have a maximum of M slots, with N XIDs currently spread across
  * S elements then we have N <= S <= M always.
  *
- * * Adding a new XID is O(1) and needs little locking (unless compression
- *     must happen)
+ * * Adding a new XID is O(1) and needs no lock (unless compression must
+ *     happen)
  * * Compressing the array is O(S) and requires exclusive lock
  * * Removing an XID is O(logS) and requires exclusive lock
  * * Taking a snapshot is O(S) and requires shared lock
@@ -4778,22 +4772,15 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
    pArray->numKnownAssignedXids += nxids;
 
    /*
-    * Now update the head pointer.  We use a spinlock to protect this
-    * pointer, not because the update is likely to be non-atomic, but to
-    * ensure that other processors see the above array updates before they
-    * see the head pointer change.
-    *
-    * If we're holding ProcArrayLock exclusively, there's no need to take the
-    * spinlock.
+    * Now update the head pointer.  We use a write barrier to ensure that
+    * other processors see the above array updates before they see the head
+    * pointer change.  The barrier isn't required if we're holding
+    * ProcArrayLock exclusively.
     */
-   if (exclusive_lock)
-       pArray->headKnownAssignedXids = head;
-   else
-   {
-       SpinLockAcquire(&pArray->known_assigned_xids_lck);
-       pArray->headKnownAssignedXids = head;
-       SpinLockRelease(&pArray->known_assigned_xids_lck);
-   }
+   if (!exclusive_lock)
+       pg_write_barrier();
+
+   pArray->headKnownAssignedXids = head;
 }
 
 /*
@@ -4815,20 +4802,15 @@ KnownAssignedXidsSearch(TransactionId xid, bool remove)
    int         tail;
    int         result_index = -1;
 
-   if (remove)
-   {
-       /* we hold ProcArrayLock exclusively, so no need for spinlock */
-       tail = pArray->tailKnownAssignedXids;
-       head = pArray->headKnownAssignedXids;
-   }
-   else
-   {
-       /* take spinlock to ensure we see up-to-date array contents */
-       SpinLockAcquire(&pArray->known_assigned_xids_lck);
-       tail = pArray->tailKnownAssignedXids;
-       head = pArray->headKnownAssignedXids;
-       SpinLockRelease(&pArray->known_assigned_xids_lck);
-   }
+   tail = pArray->tailKnownAssignedXids;
+   head = pArray->headKnownAssignedXids;
+
+   /*
+    * Only the startup process removes entries, so we don't need the read
+    * barrier in that case.
+    */
+   if (!remove)
+       pg_read_barrier();      /* pairs with KnownAssignedXidsAdd */
 
    /*
     * Standard binary search.  Note we can ignore the KnownAssignedXidsValid
@@ -5066,13 +5048,11 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
     * cannot enter and then leave the array while we hold ProcArrayLock.  We
     * might miss newly-added xids, but they should be >= xmax so irrelevant
     * anyway.
-    *
-    * Must take spinlock to ensure we see up-to-date array contents.
     */
-   SpinLockAcquire(&procArray->known_assigned_xids_lck);
    tail = procArray->tailKnownAssignedXids;
    head = procArray->headKnownAssignedXids;
-   SpinLockRelease(&procArray->known_assigned_xids_lck);
+
+   pg_read_barrier();          /* pairs with KnownAssignedXidsAdd */
 
    for (i = tail; i < head; i++)
    {
@@ -5119,10 +5099,10 @@ KnownAssignedXidsGetOldestXmin(void)
    /*
     * Fetch head just once, since it may change while we loop.
     */
-   SpinLockAcquire(&procArray->known_assigned_xids_lck);
    tail = procArray->tailKnownAssignedXids;
    head = procArray->headKnownAssignedXids;
-   SpinLockRelease(&procArray->known_assigned_xids_lck);
+
+   pg_read_barrier();          /* pairs with KnownAssignedXidsAdd */
 
    for (i = tail; i < head; i++)
    {