diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index f23ec8969c27..b9ee9426c734 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -56,7 +56,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
-#include "storage/spin.h"
+#include "storage/rwoptspin.h"
 #include "utils/datetime.h"
 #include "utils/fmgrprotos.h"
 #include "utils/guc_hooks.h"
@@ -364,7 +364,7 @@ typedef struct XLogRecoveryCtlData
 	RecoveryPauseState recoveryPauseState;
 	ConditionVariable recoveryNotPausedCV;
 
-	slock_t		info_lck;		/* locks shared variables shown above */
+	RWOptSpin	info_lck;		/* locks shared variables shown above */
 } XLogRecoveryCtlData;
 
 static XLogRecoveryCtlData *XLogRecoveryCtl = NULL;
@@ -471,7 +471,7 @@ XLogRecoveryShmemInit(void)
 		return;
 	memset(XLogRecoveryCtl, 0, sizeof(XLogRecoveryCtlData));
 
-	SpinLockInit(&XLogRecoveryCtl->info_lck);
+	RWOptSpinInit(&XLogRecoveryCtl->info_lck);
 	InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
 	ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
 }
@@ -1669,7 +1669,7 @@ PerformWalRecovery(void)
 	 * we had just replayed the record before the REDO location (or the
 	 * checkpoint record itself, if it's a shutdown checkpoint).
 	 */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 	if (RedoStartLSN < CheckPointLoc)
 	{
 		XLogRecoveryCtl->lastReplayedReadRecPtr = InvalidXLogRecPtr;
@@ -1687,7 +1687,7 @@ PerformWalRecovery(void)
 	XLogRecoveryCtl->recoveryLastXTime = 0;
 	XLogRecoveryCtl->currentChunkStartTime = 0;
 	XLogRecoveryCtl->recoveryPauseState = RECOVERY_NOT_PAUSED;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 
 	/* Also ensure XLogReceiptTime has a sane value */
 	XLogReceiptTime = GetCurrentTimestamp();
@@ -1978,10 +1978,10 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 * Update shared replayEndRecPtr before replaying this record, so that
 	 * XLogFlush will update minRecoveryPoint correctly.
 	 */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->replayEndRecPtr = xlogreader->EndRecPtr;
 	XLogRecoveryCtl->replayEndTLI = *replayTLI;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 
 	/*
 	 * If we are attempting to enter Hot Standby mode, process XIDs we see
@@ -2015,11 +2015,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 * Update lastReplayedEndRecPtr after this record has been successfully
 	 * replayed.
 	 */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr;
 	XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 
 	/* ------
 	 * Wakeup walsenders:
@@ -2270,9 +2270,9 @@ CheckRecoveryConsistency(void)
 		reachedConsistency &&
 		IsUnderPostmaster)
 	{
-		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 		XLogRecoveryCtl->SharedHotStandbyActive = true;
-		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 
 		LocalHotStandbyActive = true;
 
@@ -3083,9 +3083,9 @@ GetRecoveryPauseState(void)
 {
 	RecoveryPauseState state;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadDo(&XLogRecoveryCtl->info_lck);
 	state = XLogRecoveryCtl->recoveryPauseState;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck);
 
 	return state;
 }
@@ -3101,14 +3101,14 @@ GetRecoveryPauseState(void)
 void
 SetRecoveryPause(bool recoveryPause)
 {
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 
 	if (!recoveryPause)
 		XLogRecoveryCtl->recoveryPauseState = RECOVERY_NOT_PAUSED;
 	else if (XLogRecoveryCtl->recoveryPauseState == RECOVERY_NOT_PAUSED)
 		XLogRecoveryCtl->recoveryPauseState = RECOVERY_PAUSE_REQUESTED;
 
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 
 	if (!recoveryPause)
 		ConditionVariableBroadcast(&XLogRecoveryCtl->recoveryNotPausedCV);
@@ -3122,10 +3122,10 @@
 static void
 ConfirmRecoveryPaused(void)
 {
 	/* If recovery pause is requested then set it paused */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 	if (XLogRecoveryCtl->recoveryPauseState == RECOVERY_PAUSE_REQUESTED)
 		XLogRecoveryCtl->recoveryPauseState = RECOVERY_PAUSED;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 }
 
@@ -4422,9 +4422,9 @@ PromoteIsTriggered(void)
 	if (LocalPromoteIsTriggered)
 		return true;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadDo(&XLogRecoveryCtl->info_lck);
 	LocalPromoteIsTriggered = XLogRecoveryCtl->SharedPromoteIsTriggered;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck);
 
 	return LocalPromoteIsTriggered;
 }
@@ -4432,9 +4432,9 @@ PromoteIsTriggered(void)
 static void
 SetPromoteIsTriggered(void)
 {
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->SharedPromoteIsTriggered = true;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 
 	/*
 	 * Mark the recovery pause state as 'not paused' because the paused state
@@ -4532,9 +4532,9 @@ HotStandbyActive(void)
 	else
 	{
 		/* spinlock is essential on machines with weak memory ordering! */
-		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		RWOptSpinReadDo(&XLogRecoveryCtl->info_lck);
 		LocalHotStandbyActive = XLogRecoveryCtl->SharedHotStandbyActive;
-		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck);
 
 		return LocalHotStandbyActive;
 	}
@@ -4562,10 +4562,10 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
 	XLogRecPtr	recptr;
 	TimeLineID	tli;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadDo(&XLogRecoveryCtl->info_lck);
 	recptr = XLogRecoveryCtl->lastReplayedEndRecPtr;
 	tli = XLogRecoveryCtl->lastReplayedTLI;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck);
 
 	if (replayTLI)
 		*replayTLI = tli;
@@ -4585,10 +4585,10 @@ GetCurrentReplayRecPtr(TimeLineID *replayEndTLI)
 	XLogRecPtr	recptr;
 	TimeLineID	tli;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadDo(&XLogRecoveryCtl->info_lck);
 	recptr = XLogRecoveryCtl->replayEndRecPtr;
 	tli = XLogRecoveryCtl->replayEndTLI;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck);
 
 	if (replayEndTLI)
 		*replayEndTLI = tli;
@@ -4605,9 +4605,9 @@ GetCurrentReplayRecPtr(TimeLineID *replayEndTLI)
 static void
 SetLatestXTime(TimestampTz xtime)
 {
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->recoveryLastXTime = xtime;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 }
 
 /*
@@ -4618,9 +4618,9 @@ GetLatestXTime(void)
 {
 	TimestampTz xtime;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadDo(&XLogRecoveryCtl->info_lck);
 	xtime = XLogRecoveryCtl->recoveryLastXTime;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck);
 
 	return xtime;
 }
@@ -4634,9 +4634,9 @@ GetLatestXTime(void)
 static void
 SetCurrentChunkStartTime(TimestampTz xtime)
 {
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinAcquire(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->currentChunkStartTime = xtime;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinRelease(&XLogRecoveryCtl->info_lck);
 }
 
 /*
@@ -4648,9 +4648,9 @@ GetCurrentChunkReplayStartTime(void)
 {
 	TimestampTz xtime;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadDo(&XLogRecoveryCtl->info_lck);
 	xtime = XLogRecoveryCtl->currentChunkStartTime;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWOptSpinReadWhile(&XLogRecoveryCtl->info_lck);
 
 	return xtime;
 }
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index c5748b690f40..cbc21667f265 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -24,7 +24,7 @@
 #include "storage/procsignal.h"
 #include "storage/shmem.h"
 #include "storage/sinvaladt.h"
-#include "storage/spin.h"
+#include "storage/rwoptspin.h"
 
 /*
  * Conceptually, the shared cache invalidation messages are stored in an
@@ -171,7 +171,7 @@ typedef struct SISeg
 	int			maxMsgNum;		/* next message number to be assigned */
 	int			nextThreshold;	/* # of messages to call SICleanupQueue */
 
-	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */
+	RWOptSpin	msgnumLock;		/* spinlock protecting maxMsgNum */
 
 	/*
 	 * Circular buffer holding shared-inval messages
@@ -246,7 +246,7 @@ SharedInvalShmemInit(void)
 	shmInvalBuffer->minMsgNum = 0;
 	shmInvalBuffer->maxMsgNum = 0;
 	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
-	SpinLockInit(&shmInvalBuffer->msgnumLock);
+	RWOptSpinInit(&shmInvalBuffer->msgnumLock);
 
 	/* The buffer[] array is initially all unused, so we need not fill it */
 
@@ -419,9 +419,9 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 		}
 
 		/* Update current value of maxMsgNum using spinlock */
-		SpinLockAcquire(&segP->msgnumLock);
+		RWOptSpinAcquire(&segP->msgnumLock);
 		segP->maxMsgNum = max;
-		SpinLockRelease(&segP->msgnumLock);
+		RWOptSpinRelease(&segP->msgnumLock);
 
 		/*
 		 * Now that the maxMsgNum change is globally visible, we give everyone
@@ -508,9 +508,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 	stateP->hasMessages = false;
 
 	/* Fetch current value of maxMsgNum using spinlock */
-	SpinLockAcquire(&segP->msgnumLock);
+	RWOptSpinReadDo(&segP->msgnumLock);
 	max = segP->maxMsgNum;
-	SpinLockRelease(&segP->msgnumLock);
+	RWOptSpinReadWhile(&segP->msgnumLock);
 
 	if (stateP->resetState)
 	{
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index 6cbaf23b855f..89892835b97d 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -21,6 +21,7 @@ OBJS = \
 	predicate.o \
 	proc.o \
 	s_lock.o \
+	rwoptspin.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/meson.build b/src/backend/storage/lmgr/meson.build
index a5490c1047fa..2f940931961e 100644
--- a/src/backend/storage/lmgr/meson.build
+++ b/src/backend/storage/lmgr/meson.build
@@ -9,4 +9,5 @@ backend_sources += files(
   'predicate.c',
   'proc.c',
   's_lock.c',
+  'rwoptspin.c',
 )
diff --git a/src/backend/storage/lmgr/rwoptspin.c b/src/backend/storage/lmgr/rwoptspin.c
new file mode 100644
index 000000000000..7a8e7065d07c
--- /dev/null
+++ b/src/backend/storage/lmgr/rwoptspin.c
@@ -0,0 +1,33 @@
+#include "storage/rwoptspin.h"
+
+#if PG_RWOPTSPIN_NATIVE
+
+#include "storage/s_lock.h"
+void
+RWOptSpinAcquire_slowpath(RWOptSpin *spin, const char *file, int line, const char *func)
+{
+	SpinDelayStatus delay;
+
+	init_spin_delay(&delay, file, line, func);
+	do
+	{
+		perform_spin_delay(&delay);
+	} while ((pg_atomic_read_u64(spin) & 1) != 0 ||
+			 ((pg_atomic_fetch_or_u64(spin, 1) & 1) != 0));
+	finish_spin_delay(&delay);
+}
+
+void
+RWOptSpinRead_wait(RWOptSpin *spin, uint64 *version, const char *file, int line, const char *func)
+{
+	SpinDelayStatus delay;
+
+	init_spin_delay(&delay, file, line, func);
+	do
+	{
+		perform_spin_delay(&delay);
+		*version = pg_atomic_read_u64(spin);
+	} while (*version & 1);
+	finish_spin_delay(&delay);
+}
+#endif
diff --git a/src/include/storage/rwoptspin.h b/src/include/storage/rwoptspin.h
new file mode 100644
index 000000000000..d806ea82a4d4
--- /dev/null
+++ b/src/include/storage/rwoptspin.h
@@ -0,0 +1,139 @@
+/*-------------------------------------------------------------
+ *
+ * rwoptspin.h
+ *	  Read-Write optimistic spin lock.
+ *
+ * It works best when there are few writers and a lot of simultaneous
+ * readers.
+ *
+ * The synchronization primitive relies on a lock version counter:
+ * - to acquire the write lock, the writer first makes the version odd with an
+ *   atomic operation (and spins if it is already odd),
+ * - to release the write lock, the writer increments the version again, so the
+ *   version becomes even,
+ * - a reader ensures the version is even (spin-waiting otherwise), then reads
+ *   the protected values "lockless".
+ * - after the read, the version must be rechecked.  If it changed, the read
+ *   must be retried.
+ *
+ * A reader may perform only simple read operations.  It must not perform any
+ * write operation inside its read loop, nor complex reads that could be
+ * invalid in the face of concurrent changes.
+ */
+
+#ifndef RWSPIN_H
+#define RWSPIN_H
+
+#include "c.h"
+#include "port/atomics.h"
+
+#if defined(PG_HAVE_ATOMIC_U64_SUPPORT) && !defined(PG_HAVE_ATOMIC_U64_SIMULATION)
+#define PG_RWOPTSPIN_NATIVE 1
+
+typedef pg_atomic_uint64 RWOptSpin;
+
+/* Initialize RWOptSpin. */
+static inline void RWOptSpinInit(RWOptSpin *spin);
+
+/* Acquire RWOptSpin for a write operation. */
+#define RWOptSpinAcquire(spin) RWOptSpinAcquire_impl((spin), __FILE__, __LINE__, __func__)
+
+/* Release the write lock of RWOptSpin. */
+static inline void RWOptSpinRelease(RWOptSpin *spin);
+
+/*-----------------------------------------
+ * Syntactic sugar for read operations.
+ * Example usage:
+ *
+ * RWOptSpinReadDo(rwspin);
+ * val1 = read_protected_value1();
+ * val2 = read_protected_value2();
+ * RWOptSpinReadWhile(rwspin);
+ */
+#define RWOptSpinReadDo(rwspin) \
+	{ uint64 rwopt_version = RWOptSpinReadStart((rwspin), __FILE__, __LINE__, __func__); do {
+#define RWOptSpinReadWhile(rwspin) \
+	} while (RWOptSpinRetryRead((rwspin), &rwopt_version, __FILE__, __LINE__, __func__)); }
+
+/* Implementation */
+
+static inline void
+RWOptSpinInit(RWOptSpin *spin)
+{
+	pg_atomic_init_u64(spin, 0);
+}
+
+extern void RWOptSpinAcquire_slowpath(RWOptSpin *spin, const char *file, int line, const char *func);
+
+static inline void
+RWOptSpinAcquire_impl(RWOptSpin *spin, const char *file, int line, const char *func)
+{
+	if (likely((pg_atomic_fetch_or_u64(spin, 1) & 1) == 0))
+		return;
+	RWOptSpinAcquire_slowpath(spin, file, line, func);
+}
+
+static inline void
+RWOptSpinRelease(RWOptSpin *spin)
+{
+	pg_atomic_add_fetch_u64(spin, 1);
+}
+
+extern void RWOptSpinRead_wait(RWOptSpin *spin, uint64 *version, const char *file, int line, const char *func);
+
+static inline uint64
+RWOptSpinReadStart(RWOptSpin *spin, const char *file, int line, const char *func)
+{
+	uint64		version = pg_atomic_read_u64(spin);
+
+	/* If it is write-locked, wait until the writer has finished its work */
+	if (version & 1)
+		RWOptSpinRead_wait(spin, &version, file, line, func);
+
+	/*
+	 * The memory barrier provides acquire+release semantics and acts as a
+	 * read barrier between the version read and the actual reads.
+	 */
+	pg_memory_barrier();
+
+	return version;
+}
+
+static inline bool
+RWOptSpinRetryRead(RWOptSpin *spin, uint64 *version, const char *file, int line, const char *func)
+{
+	uint64		cur;
+
+	pg_read_barrier();
+	cur = pg_atomic_read_u64(spin);
+	if (cur == *version)
+		return false;
+
+	*version = cur;
+
+	/* If it is write-locked, wait until the writer has finished its work */
+	if (*version & 1)
+		RWOptSpinRead_wait(spin, version, file, line, func);
+
+	pg_read_barrier();
+	return true;
+}
+
+#else							/* defined(PG_HAVE_ATOMIC_U64_SUPPORT) &&
+								 * !defined(PG_HAVE_ATOMIC_U64_SIMULATION) */
+
+#include "storage/spin.h"
+
+typedef slock_t RWOptSpin;
+
+#define RWOptSpinInit(lock) SpinLockInit(lock)
+#define RWOptSpinAcquire(lock) SpinLockAcquire(lock)
+#define RWOptSpinRelease(lock) SpinLockRelease(lock)
+
+#define RWOptSpinReadDo(lock) SpinLockAcquire(lock)
+#define RWOptSpinReadWhile(lock) SpinLockRelease(lock)
+
+#endif							/* defined(PG_HAVE_ATOMIC_U64_SUPPORT) &&
+								 * !defined(PG_HAVE_ATOMIC_U64_SIMULATION) */
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e6f2e93b2d6f..bce98e8b270f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2411,6 +2411,7 @@ RTEPermissionInfo
 RWConflict
 RWConflictData
 RWConflictPoolHeader
+RWOptSpin
 Range
 RangeBound
 RangeBox
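
For illustration, here is a minimal usage sketch of the new API (not part of the patch; the struct and field names are hypothetical).  The writer side takes the lock exclusively with RWOptSpinAcquire()/RWOptSpinRelease(), while the reader side wraps plain loads in the RWOptSpinReadDo()/RWOptSpinReadWhile() pair and must tolerate the body re-executing if a writer bumped the version underneath it:

    #include "storage/rwoptspin.h"

    typedef struct ProgressShared
    {
        RWOptSpin   lck;        /* protects the two fields below */
        uint64      applied;
        uint64      flushed;
    } ProgressShared;

    static void
    ProgressUpdate(ProgressShared *p, uint64 applied, uint64 flushed)
    {
        /* writer: exclusive acquire, update, release (version becomes even again) */
        RWOptSpinAcquire(&p->lck);
        p->applied = applied;
        p->flushed = flushed;
        RWOptSpinRelease(&p->lck);
    }

    static uint64
    ProgressGetApplied(ProgressShared *p)
    {
        uint64      result;

        /*
         * reader: only simple loads between ReadDo and ReadWhile; the block
         * is retried when the lock's version changed concurrently.
         */
        RWOptSpinReadDo(&p->lck);
        result = p->applied;
        RWOptSpinReadWhile(&p->lck);

        return result;
    }

The lock would be initialized once with RWOptSpinInit() at shared-memory setup time, as the patch does for info_lck and msgnumLock.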