From 6439bea867306655db3efb0c7174d745d1b94a97 Mon Sep 17 00:00:00 2001
From: Yura Sokolov
Date: Mon, 2 Jun 2025 19:26:20 +0300
Subject: [PATCH] Read-Write optimistic spin lock.

There are a couple of places where a spin lock is used only to separate
readers from writers, with many readers and just one or a few writers.
On huge systems those places become a source of contention, because even
the readers have to take the always-exclusive slock_t.

Mitigate this by introducing a read-write optimistic spin lock.  A read
performs no write operation on the lock itself; its correctness is
verified by comparing the lock version before and after the read.

Use this RWOptSpin lock for sending invalidations and for the recovery
state that walsender notifications rely on.

If no native 64-bit atomic support is present, fall back to a regular
spin lock.
---
 src/backend/access/transam/xlogrecovery.c |  70 +++++------
 src/backend/storage/ipc/sinvaladt.c       |  14 +--
 src/backend/storage/lmgr/Makefile         |   1 +
 src/backend/storage/lmgr/meson.build      |   1 +
 src/backend/storage/lmgr/rwoptspin.c      |  33 +++++
 src/include/storage/rwoptspin.h           | 139 ++++++++++++++++++++++
 src/tools/pgindent/typedefs.list          |   1 +
 7 files changed, 217 insertions(+), 42 deletions(-)
 create mode 100644 src/backend/storage/lmgr/rwoptspin.c
 create mode 100644 src/include/storage/rwoptspin.h

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index f23ec8969c27..b9ee9426c734 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -56,7 +56,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
-#include "storage/spin.h"
+#include "storage/rwoptspin.h"
 #include "utils/datetime.h"
 #include "utils/fmgrprotos.h"
 #include "utils/guc_hooks.h"
@@ -364,7 +364,7 @@ typedef struct XLogRecoveryCtlData
 	RecoveryPauseState recoveryPauseState;
 	ConditionVariable recoveryNotPausedCV;
 
-	slock_t		info_lck;		/* locks shared variables shown above */
+	RWOptSpin	info_lck;		/* locks shared variables shown above */
 } XLogRecoveryCtlData;
 
 static XLogRecoveryCtlData *XLogRecoveryCtl = NULL;
@@ -471,7 +471,7 @@ XLogRecoveryShmemInit(void)
 		return;
 	memset(XLogRecoveryCtl, 0, sizeof(XLogRecoveryCtlData));
 
-	SpinLockInit(&XLogRecoveryCtl->info_lck);
+	RWOptSpinInit(&XLogRecoveryCtl->info_lck);
 	InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
 	ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
 }
@@ -1669,7 +1669,7 @@ PerformWalRecovery(void)
 	 * we had just replayed the record before the REDO location (or the
 	 * checkpoint record itself, if it's a shutdown checkpoint).
 	 */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 	if (RedoStartLSN < CheckPointLoc)
 	{
 		XLogRecoveryCtl->lastReplayedReadRecPtr = InvalidXLogRecPtr;
@@ -1687,7 +1687,7 @@ PerformWalRecovery(void)
 	XLogRecoveryCtl->recoveryLastXTime = 0;
 	XLogRecoveryCtl->currentChunkStartTime = 0;
 	XLogRecoveryCtl->recoveryPauseState = RECOVERY_NOT_PAUSED;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 
 	/* Also ensure XLogReceiptTime has a sane value */
 	XLogReceiptTime = GetCurrentTimestamp();
@@ -1978,10 +1978,10 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 * Update shared replayEndRecPtr before replaying this record, so that
 	 * XLogFlush will update minRecoveryPoint correctly.
*/ - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinAcquire(&XLogRecoveryCtl->info_lck); XLogRecoveryCtl->replayEndRecPtr = xlogreader->EndRecPtr; XLogRecoveryCtl->replayEndTLI = *replayTLI; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinRelease(&XLogRecoveryCtl->info_lck); /* * If we are attempting to enter Hot Standby mode, process XIDs we see @@ -2015,11 +2015,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl * Update lastReplayedEndRecPtr after this record has been successfully * replayed. */ - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinAcquire(&XLogRecoveryCtl->info_lck); XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr; XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr; XLogRecoveryCtl->lastReplayedTLI = *replayTLI; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinRelease(&XLogRecoveryCtl->info_lck); /* ------ * Wakeup walsenders: @@ -2270,9 +2270,9 @@ CheckRecoveryConsistency(void) reachedConsistency && IsUnderPostmaster) { - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinAcquire(&XLogRecoveryCtl->info_lck); XLogRecoveryCtl->SharedHotStandbyActive = true; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinRelease(&XLogRecoveryCtl->info_lck); LocalHotStandbyActive = true; @@ -3083,9 +3083,9 @@ GetRecoveryPauseState(void) { RecoveryPauseState state; - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinReadDo(&XLogRecoveryCtl->info_lck); state = XLogRecoveryCtl->recoveryPauseState; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck); return state; } @@ -3101,14 +3101,14 @@ GetRecoveryPauseState(void) void SetRecoveryPause(bool recoveryPause) { - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinAcquire(&XLogRecoveryCtl->info_lck); if (!recoveryPause) XLogRecoveryCtl->recoveryPauseState = RECOVERY_NOT_PAUSED; else if (XLogRecoveryCtl->recoveryPauseState == RECOVERY_NOT_PAUSED) XLogRecoveryCtl->recoveryPauseState = RECOVERY_PAUSE_REQUESTED; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinRelease(&XLogRecoveryCtl->info_lck); if (!recoveryPause) ConditionVariableBroadcast(&XLogRecoveryCtl->recoveryNotPausedCV); @@ -3122,10 +3122,10 @@ static void ConfirmRecoveryPaused(void) { /* If recovery pause is requested then set it paused */ - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinAcquire(&XLogRecoveryCtl->info_lck); if (XLogRecoveryCtl->recoveryPauseState == RECOVERY_PAUSE_REQUESTED) XLogRecoveryCtl->recoveryPauseState = RECOVERY_PAUSED; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinRelease(&XLogRecoveryCtl->info_lck); } @@ -4422,9 +4422,9 @@ PromoteIsTriggered(void) if (LocalPromoteIsTriggered) return true; - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinReadDo(&XLogRecoveryCtl->info_lck); LocalPromoteIsTriggered = XLogRecoveryCtl->SharedPromoteIsTriggered; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck); return LocalPromoteIsTriggered; } @@ -4432,9 +4432,9 @@ PromoteIsTriggered(void) static void SetPromoteIsTriggered(void) { - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinAcquire(&XLogRecoveryCtl->info_lck); XLogRecoveryCtl->SharedPromoteIsTriggered = true; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinRelease(&XLogRecoveryCtl->info_lck); /* * Mark the recovery pause state as 'not paused' because the paused state @@ -4532,9 +4532,9 @@ HotStandbyActive(void) else { /* spinlock is essential on machines 
with weak memory ordering! */ - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinReadDo(&XLogRecoveryCtl->info_lck); LocalHotStandbyActive = XLogRecoveryCtl->SharedHotStandbyActive; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck); return LocalHotStandbyActive; } @@ -4562,10 +4562,10 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI) XLogRecPtr recptr; TimeLineID tli; - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinReadDo(&XLogRecoveryCtl->info_lck); recptr = XLogRecoveryCtl->lastReplayedEndRecPtr; tli = XLogRecoveryCtl->lastReplayedTLI; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck); if (replayTLI) *replayTLI = tli; @@ -4585,10 +4585,10 @@ GetCurrentReplayRecPtr(TimeLineID *replayEndTLI) XLogRecPtr recptr; TimeLineID tli; - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinReadDo(&XLogRecoveryCtl->info_lck); recptr = XLogRecoveryCtl->replayEndRecPtr; tli = XLogRecoveryCtl->replayEndTLI; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck); if (replayEndTLI) *replayEndTLI = tli; @@ -4605,9 +4605,9 @@ GetCurrentReplayRecPtr(TimeLineID *replayEndTLI) static void SetLatestXTime(TimestampTz xtime) { - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinAcquire(&XLogRecoveryCtl->info_lck); XLogRecoveryCtl->recoveryLastXTime = xtime; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinRelease(&XLogRecoveryCtl->info_lck); } /* @@ -4618,9 +4618,9 @@ GetLatestXTime(void) { TimestampTz xtime; - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinReadDo(&XLogRecoveryCtl->info_lck); xtime = XLogRecoveryCtl->recoveryLastXTime; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck); return xtime; } @@ -4634,9 +4634,9 @@ GetLatestXTime(void) static void SetCurrentChunkStartTime(TimestampTz xtime) { - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinAcquire(&XLogRecoveryCtl->info_lck); XLogRecoveryCtl->currentChunkStartTime = xtime; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinRelease(&XLogRecoveryCtl->info_lck); } /* @@ -4648,9 +4648,9 @@ GetCurrentChunkReplayStartTime(void) { TimestampTz xtime; - SpinLockAcquire(&XLogRecoveryCtl->info_lck); + RWOptSpinReadDo(&XLogRecoveryCtl->info_lck); xtime = XLogRecoveryCtl->currentChunkStartTime; - SpinLockRelease(&XLogRecoveryCtl->info_lck); + RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck); return xtime; } diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index c5748b690f40..cbc21667f265 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -24,7 +24,7 @@ #include "storage/procsignal.h" #include "storage/shmem.h" #include "storage/sinvaladt.h" -#include "storage/spin.h" +#include "storage/rwoptspin.h" /* * Conceptually, the shared cache invalidation messages are stored in an @@ -171,7 +171,7 @@ typedef struct SISeg int maxMsgNum; /* next message number to be assigned */ int nextThreshold; /* # of messages to call SICleanupQueue */ - slock_t msgnumLock; /* spinlock protecting maxMsgNum */ + RWOptSpin msgnumLock; /* spinlock protecting maxMsgNum */ /* * Circular buffer holding shared-inval messages @@ -246,7 +246,7 @@ SharedInvalShmemInit(void) shmInvalBuffer->minMsgNum = 0; shmInvalBuffer->maxMsgNum = 0; shmInvalBuffer->nextThreshold = CLEANUP_MIN; - SpinLockInit(&shmInvalBuffer->msgnumLock); + RWOptSpinInit(&shmInvalBuffer->msgnumLock); /* The buffer[] array is 
initially all unused, so we need not fill it */
@@ -419,9 +419,9 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 	}
 
 	/* Update current value of maxMsgNum using spinlock */
-	SpinLockAcquire(&segP->msgnumLock);
+	RWOptSpinAcquire(&segP->msgnumLock);
 	segP->maxMsgNum = max;
-	SpinLockRelease(&segP->msgnumLock);
+	RWOptSpinRelease(&segP->msgnumLock);
 
 	/*
 	 * Now that the maxMsgNum change is globally visible, we give everyone
@@ -508,9 +508,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 	stateP->hasMessages = false;
 
 	/* Fetch current value of maxMsgNum using spinlock */
-	SpinLockAcquire(&segP->msgnumLock);
+	RWOptSpinReadDo(&segP->msgnumLock);
 	max = segP->maxMsgNum;
-	SpinLockRelease(&segP->msgnumLock);
+	RWOptSpinReadWhile(&segP->msgnumLock);
 
 	if (stateP->resetState)
 	{
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index 6cbaf23b855f..89892835b97d 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -21,6 +21,7 @@ OBJS = \
 	predicate.o \
 	proc.o \
 	s_lock.o \
+	rwoptspin.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/meson.build b/src/backend/storage/lmgr/meson.build
index a5490c1047fa..2f940931961e 100644
--- a/src/backend/storage/lmgr/meson.build
+++ b/src/backend/storage/lmgr/meson.build
@@ -9,4 +9,5 @@ backend_sources += files(
   'predicate.c',
   'proc.c',
   's_lock.c',
+  'rwoptspin.c',
 )
diff --git a/src/backend/storage/lmgr/rwoptspin.c b/src/backend/storage/lmgr/rwoptspin.c
new file mode 100644
index 000000000000..7a8e7065d07c
--- /dev/null
+++ b/src/backend/storage/lmgr/rwoptspin.c
@@ -0,0 +1,33 @@
+#include "storage/rwoptspin.h"
+
+#if PG_RWOPTSPIN_NATIVE
+
+#include "storage/s_lock.h"
+void
+RWOptSpinAcquire_slowpath(RWOptSpin *spin, const char *file, int line, const char *func)
+{
+	SpinDelayStatus delay;
+
+	init_spin_delay(&delay, file, line, func);
+	do
+	{
+		perform_spin_delay(&delay);
+	} while ((pg_atomic_read_u64(spin) & 1) != 0 ||
+			 ((pg_atomic_fetch_or_u64(spin, 1) & 1) != 0));
+	finish_spin_delay(&delay);
+}
+
+void
+RWOptSpinRead_wait(RWOptSpin *spin, uint64 *version, const char *file, int line, const char *func)
+{
+	SpinDelayStatus delay;
+
+	init_spin_delay(&delay, file, line, func);
+	do
+	{
+		perform_spin_delay(&delay);
+		*version = pg_atomic_read_u64(spin);
+	} while (*version & 1);
+	finish_spin_delay(&delay);
+}
+#endif
diff --git a/src/include/storage/rwoptspin.h b/src/include/storage/rwoptspin.h
new file mode 100644
index 000000000000..d806ea82a4d4
--- /dev/null
+++ b/src/include/storage/rwoptspin.h
@@ -0,0 +1,139 @@
+/*-------------------------------------------------------------
+ *
+ * rwoptspin.h
+ *	  Read-Write optimistic spin lock.
+ *
+ * It works best when there are few writers and a lot of simultaneous
+ * readers.
+ *
+ * The synchronization primitive relies on a lock version:
+ * - to acquire the write lock, the writer first makes the version odd with an
+ *   atomic operation (spinning if it is already odd),
+ * - to release the write lock, the writer increments the version, so it
+ *   becomes even again,
+ * - a reader ensures the version is even (spin-waiting otherwise), then reads
+ *   the protected values "lockless",
+ * - after the read the version must be rechecked; if it changed, the read
+ *   must be retried.
+ *
+ * Readers may perform only simple read operations.  They must not perform any
+ * write operation inside the read loop, nor complex reads that could misbehave
+ * when the protected values change concurrently.
+ */
+
+#ifndef RWOPTSPIN_H
+#define RWOPTSPIN_H
+
+#include "c.h"
+#include "port/atomics.h"
+
+#if defined(PG_HAVE_ATOMIC_U64_SUPPORT) && !defined(PG_HAVE_ATOMIC_U64_SIMULATION)
+#define PG_RWOPTSPIN_NATIVE 1
+
+typedef pg_atomic_uint64 RWOptSpin;
+
+/* Initialize RWOptSpin. */
+static inline void RWOptSpinInit(RWOptSpin *spin);
+
+/* Acquire RWOptSpin for a write operation. */
+#define RWOptSpinAcquire(spin) RWOptSpinAcquire_impl((spin), __FILE__, __LINE__, __func__)
+
+/* Release the write lock of RWOptSpin. */
+static inline void RWOptSpinRelease(RWOptSpin *spin);
+
+/*-----------------------------------------
+ * Syntax sugar for read operations.
+ * Example usage:
+ *
+ *	RWOptSpinReadDo(rwspin);
+ *	val1 = read_protected_value1();
+ *	val2 = read_protected_value2();
+ *	RWOptSpinReadWhile(rwspin);
+ */
+#define RWOptSpinReadDo(rwspin) \
+	{ uint64 rwopt_version = RWOptSpinReadStart((rwspin), __FILE__, __LINE__, __func__); do {
+#define RWOptSpinReadWhile(rwspin) \
+	} while (RWOptSpinRetryRead((rwspin), &rwopt_version, __FILE__, __LINE__, __func__)); }
+
+/* Implementation */
+
+static inline void
+RWOptSpinInit(RWOptSpin *spin)
+{
+	pg_atomic_init_u64(spin, 0);
+}
+
+extern void RWOptSpinAcquire_slowpath(RWOptSpin *spin, const char *file, int line, const char *func);
+
+static inline void
+RWOptSpinAcquire_impl(RWOptSpin *spin, const char *file, int line, const char *func)
+{
+	if (likely((pg_atomic_fetch_or_u64(spin, 1) & 1) == 0))
+		return;
+	RWOptSpinAcquire_slowpath(spin, file, line, func);
+}
+
+static inline void
+RWOptSpinRelease(RWOptSpin *spin)
+{
+	pg_atomic_add_fetch_u64(spin, 1);
+}
+
+extern void RWOptSpinRead_wait(RWOptSpin *spin, uint64 *version, const char *file, int line, const char *func);
+
+static inline uint64
+RWOptSpinReadStart(RWOptSpin *spin, const char *file, int line, const char *func)
+{
+	uint64		version = pg_atomic_read_u64(spin);
+
+	/* If it is write-locked, wait until the writer has finished its work */
+	if (version & 1)
+		RWOptSpinRead_wait(spin, &version, file, line, func);
+
+	/*
+	 * The full memory barrier provides acquire+release semantics and a read
+	 * barrier between the version read and the actual data reads.
+	 */
+	pg_memory_barrier();
+
+	return version;
+}
+
+static inline bool
+RWOptSpinRetryRead(RWOptSpin *spin, uint64 *version, const char *file, int line, const char *func)
+{
+	uint64		cur;
+
+	pg_read_barrier();
+	cur = pg_atomic_read_u64(spin);
+	if (cur == *version)
+		return false;
+
+	*version = cur;
+
+	/* If it is write-locked, wait until the writer has finished its work */
+	if (*version & 1)
+		RWOptSpinRead_wait(spin, version, file, line, func);
+
+	pg_read_barrier();
+	return true;
+}
+
+#else							/* defined(PG_HAVE_ATOMIC_U64_SUPPORT) &&
+								 * !defined(PG_HAVE_ATOMIC_U64_SIMULATION) */
+
+#include "storage/spin.h"
+
+typedef slock_t RWOptSpin;
+
+#define RWOptSpinInit(lock) SpinLockInit(lock)
+#define RWOptSpinAcquire(lock) SpinLockAcquire(lock)
+#define RWOptSpinRelease(lock) SpinLockRelease(lock)
+
+#define RWOptSpinReadDo(lock) SpinLockAcquire(lock)
+#define RWOptSpinReadWhile(lock) SpinLockRelease(lock)
+
+#endif							/* defined(PG_HAVE_ATOMIC_U64_SUPPORT) &&
+								 * !defined(PG_HAVE_ATOMIC_U64_SIMULATION) */
+
+#endif							/* RWOPTSPIN_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e6f2e93b2d6f..bce98e8b270f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2411,6 +2411,7 @@ RTEPermissionInfo
 RWConflict
 RWConflictData
 RWConflictPoolHeader
+RWOptSpin
 Range
 RangeBound
 RangeBox
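
-- 
Note for reviewers: for anyone new to this kind of versioned, seqlock-style
primitive, below is a small standalone model of the protocol that rwoptspin.h
implements.  It uses C11 <stdatomic.h> in place of port/atomics.h, and the
demo_* names are invented for this sketch; treat it as an illustration of the
odd/even version scheme, not as the patch's code.

/*
 * rwoptspin_demo.c - standalone sketch of the RWOptSpin protocol.
 * Build: cc -std=c11 rwoptspin_demo.c
 */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

typedef atomic_uint_fast64_t demo_rwoptspin;

typedef struct demo_shared
{
	demo_rwoptspin lock;		/* even = unlocked, odd = write-locked */
	uint64_t	a;
	uint64_t	b;				/* writer-maintained invariant: b == 2 * a */
} demo_shared;

/* Writer: make the version odd, mutate, then bump it back to even. */
static void
demo_write(demo_shared *s, uint64_t v)
{
	/* spin until we are the one flipping the version from even to odd */
	while (atomic_fetch_or_explicit(&s->lock, 1, memory_order_acquire) & 1)
		;						/* the real code backs off with perform_spin_delay() here */

	s->a = v;
	s->b = v * 2;

	/* odd -> even: unlock and publish the new values */
	atomic_fetch_add_explicit(&s->lock, 1, memory_order_release);
}

/* Reader: read optimistically, retry whenever the version moved under us. */
static void
demo_read(demo_shared *s, uint64_t *a, uint64_t *b)
{
	uint64_t	v1,
				v2;

	do
	{
		/* wait until the version is even (no writer in progress) */
		do
		{
			v1 = atomic_load_explicit(&s->lock, memory_order_acquire);
		} while (v1 & 1);

		/* plain reads of the protected values; no store to the lock word */
		*a = s->a;
		*b = s->b;

		/* order the data reads before the version re-check */
		atomic_thread_fence(memory_order_acquire);
		v2 = atomic_load_explicit(&s->lock, memory_order_relaxed);
	} while (v1 != v2);			/* a writer interfered: redo the whole read */
}

int
main(void)
{
	demo_shared s;
	uint64_t	a,
				b;

	atomic_init(&s.lock, 0);
	demo_write(&s, 21);
	demo_read(&s, &a, &b);
	printf("a=%llu b=%llu (b must equal 2*a)\n",
		   (unsigned long long) a, (unsigned long long) b);
	return 0;
}

The demo only exercises the protocol single-threaded; the property of interest
is that demo_read() never stores to the lock word, which is what removes the
reader-side cache-line ping-pong the commit message describes.  In a fully
concurrent C11 program the protected fields themselves would also need atomic
accesses to avoid a formal data race; the patch instead relies on the
pg_read_barrier()/pg_memory_barrier() calls shown above.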