PostgreSQL Source Code git master
sinvaladt.h File Reference
#include "storage/lock.h"
#include "storage/sinval.h"
Include dependency graph for sinvaladt.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

Size SharedInvalShmemSize (void)
 
void SharedInvalShmemInit (void)
 
void SharedInvalBackendInit (bool sendOnly)
 
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
 
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
 
void SICleanupQueue (bool callerHasWriteLock, int minFree)
 
LocalTransactionId GetNextLocalTransactionId (void)
 

Function Documentation

◆ GetNextLocalTransactionId()

LocalTransactionId GetNextLocalTransactionId ( void  )

Definition at line 701 of file sinvaladt.c.

702{
703 LocalTransactionId result;
704
705 /* loop to avoid returning InvalidLocalTransactionId at wraparound */
706 do
707 {
708 result = nextLocalTransactionId++;
709 } while (!LocalTransactionIdIsValid(result));
710
711 return result;
712}
uint32 LocalTransactionId
Definition: c.h:625
#define LocalTransactionIdIsValid(lxid)
Definition: lock.h:67
static LocalTransactionId nextLocalTransactionId
Definition: sinvaladt.c:209

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

◆ SharedInvalBackendInit()

void SharedInvalBackendInit ( bool  sendOnly)

Definition at line 272 of file sinvaladt.c.

273{
274 ProcState *stateP;
275 pid_t oldPid;
276 SISeg *segP = shmInvalBuffer;
277
278 if (MyProcNumber < 0)
279 elog(ERROR, "MyProcNumber not set");
280 if (MyProcNumber >= NumProcStateSlots)
281 elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
282 MyProcNumber, NumProcStateSlots);
283 stateP = &segP->procState[MyProcNumber];
284
285 /*
286 * This can run in parallel with read operations, but not with write
287 * operations, since SIInsertDataEntries relies on the pgprocnos array to
288 * set hasMessages appropriately.
289 */
290 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
291
292 oldPid = stateP->procPid;
293 if (oldPid != 0)
294 {
295 LWLockRelease(SInvalWriteLock);
296 elog(ERROR, "sinval slot for backend %d is already in use by process %d",
297 MyProcNumber, (int) oldPid);
298 }
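299
300 shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
301
302 /* Fetch next local transaction ID into local memory */
303 nextLocalTransactionId = stateP->nextLXID;
304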
305 /* mark myself active, with all extant messages already read */
306 stateP->procPid = MyProcPid;
307 stateP->nextMsgNum = segP->maxMsgNum;
308 stateP->resetState = false;
309 stateP->signaled = false;
310 stateP->hasMessages = false;
311 stateP->sendOnly = sendOnly;
312
313 LWLockRelease(SInvalWriteLock);
314
315 /* register exit routine to mark my entry inactive at exit */
316 on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
317}
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
int MyProcPid
Definition: globals.c:48
ProcNumber MyProcNumber
Definition: globals.c:91
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_EXCLUSIVE
Definition: lwlock.h:114
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
static SISeg * shmInvalBuffer
Definition: sinvaladt.c:206
#define NumProcStateSlots
Definition: sinvaladt.c:204
static void CleanupInvalidationState(int status, Datum arg)
Definition: sinvaladt.c:328
int nextMsgNum
Definition: sinvaladt.c:142
bool signaled
Definition: sinvaladt.c:144
LocalTransactionId nextLXID
Definition: sinvaladt.c:161
pid_t procPid
Definition: sinvaladt.c:140
bool hasMessages
Definition: sinvaladt.c:145
bool sendOnly
Definition: sinvaladt.c:153
bool resetState
Definition: sinvaladt.c:143
int maxMsgNum
Definition: sinvaladt.c:171
int * pgprocnos
Definition: sinvaladt.c:194
int numProcs
Definition: sinvaladt.c:193
ProcState procState[FLEXIBLE_ARRAY_MEMBER]
Definition: sinvaladt.c:195

References CleanupInvalidationState(), elog, ERROR, ProcState::hasMessages, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MyProcNumber, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::numProcs, NumProcStateSlots, on_shmem_exit(), PANIC, SISeg::pgprocnos, PointerGetDatum(), ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, shmInvalBuffer, and ProcState::signaled.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

◆ SharedInvalShmemInit()

void SharedInvalShmemInit ( void  )

Definition at line 234 of file sinvaladt.c.

235{
236 int i;
237 bool found;
238
239 /* Allocate space in shared memory */
240 shmInvalBuffer = (SISeg *)
241 ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
242 if (found)
243 return;
244
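245 /* Clear message counters, save size of procState array, init spinlock */
246 shmInvalBuffer->minMsgNum = 0;
247 shmInvalBuffer->maxMsgNum = 0;
248 shmInvalBuffer->nextThreshold = CLEANUP_MIN;
249 SpinLockInit(&shmInvalBuffer->msgnumLock);
250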
251 /* The buffer[] array is initially all unused, so we need not fill it */
252
253 /* Mark all backends inactive, and initialize nextLXID */
254 for (i = 0; i < NumProcStateSlots; i++)
255 {
256 shmInvalBuffer->procState[i].procPid = 0; /* inactive */
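257 shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
258 shmInvalBuffer->procState[i].resetState = false;
259 shmInvalBuffer->procState[i].signaled = false;
260 shmInvalBuffer->procState[i].hasMessages = false;
261 shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
262 }
263 shmInvalBuffer->numProcs = 0;
264 shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
265}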
int i
Definition: isn.c:77
#define InvalidLocalTransactionId
Definition: lock.h:66
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
Size SharedInvalShmemSize(void)
Definition: sinvaladt.c:218
#define CLEANUP_MIN
Definition: sinvaladt.c:131
#define SpinLockInit(lock)
Definition: spin.h:57
int minMsgNum
Definition: sinvaladt.c:170
slock_t msgnumLock
Definition: sinvaladt.c:174
int nextThreshold
Definition: sinvaladt.c:172

References CLEANUP_MIN, ProcState::hasMessages, i, InvalidLocalTransactionId, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, SISeg::numProcs, NumProcStateSlots, SISeg::pgprocnos, ProcState::procPid, SISeg::procState, ProcState::resetState, SharedInvalShmemSize(), ShmemInitStruct(), shmInvalBuffer, ProcState::signaled, and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ SharedInvalShmemSize()

Size SharedInvalShmemSize ( void  )

Definition at line 218 of file sinvaladt.c.

219{
220 Size size;
221
222 size = offsetof(SISeg, procState);
223 size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
224 size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
225
226 return size;
227}
size_t Size
Definition: c.h:576
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), mul_size(), and NumProcStateSlots.

Referenced by CalculateShmemSize(), and SharedInvalShmemInit().

◆ SICleanupQueue()

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 577 of file sinvaladt.c.

578{
579 SISeg *segP = shmInvalBuffer;
580 int min,
581 minsig,
582 lowbound,
583 numMsgs,
584 i;
585 ProcState *needSig = NULL;
586
587 /* Lock out all writers and readers */
588 if (!callerHasWriteLock)
589 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
590 LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
591
592 /*
593 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
594 * furthest-back backend that needs signaling (if any), and reset any
595 * backends that are too far back. Note that because we ignore sendOnly
596 * backends here it is possible for them to keep sending messages without
597 * a problem even when they are the only active backend.
598 */
599 min = segP->maxMsgNum;
600 minsig = min - SIG_THRESHOLD;
601 lowbound = min - MAXNUMMESSAGES + minFree;
602
603 for (i = 0; i < segP->numProcs; i++)
604 {
605 ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
606 int n = stateP->nextMsgNum;
607
608 /* Ignore if already in reset state */
609 Assert(stateP->procPid != 0);
610 if (stateP->resetState || stateP->sendOnly)
611 continue;
612
613 /*
614 * If we must free some space and this backend is preventing it, force
615 * him into reset state and then ignore until he catches up.
616 */
617 if (n < lowbound)
618 {
619 stateP->resetState = true;
620 /* no point in signaling him ... */
621 continue;
622 }
623
624 /* Track the global minimum nextMsgNum */
625 if (n < min)
626 min = n;
627
628 /* Also see who's furthest back of the unsignaled backends */
629 if (n < minsig && !stateP->signaled)
630 {
631 minsig = n;
632 needSig = stateP;
633 }
634 }
635 segP->minMsgNum = min;
636
637 /*
638 * When minMsgNum gets really large, decrement all message counters so as
639 * to forestall overflow of the counters. This happens seldom enough that
640 * folding it into the previous loop would be a loser.
641 */
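642 if (min >= MSGNUMWRAPAROUND)
643 {
644 segP->minMsgNum -= MSGNUMWRAPAROUND;
645 segP->maxMsgNum -= MSGNUMWRAPAROUND;
646 for (i = 0; i < segP->numProcs; i++)
647 segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
648 }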
649
650 /*
651 * Determine how many messages are still in the queue, and set the
652 * threshold at which we should repeat SICleanupQueue().
653 */
654 numMsgs = segP->maxMsgNum - segP->minMsgNum;
655 if (numMsgs < CLEANUP_MIN)
656 segP->nextThreshold = CLEANUP_MIN;
657 else
658 segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
659
660 /*
661 * Lastly, signal anyone who needs a catchup interrupt. Since
662 * SendProcSignal() might not be fast, we don't want to hold locks while
663 * executing it.
664 */
665 if (needSig)
666 {
667 pid_t his_pid = needSig->procPid;
668 ProcNumber his_procNumber = (needSig - &segP->procState[0]);
669
670 needSig->signaled = true;
671 LWLockRelease(SInvalReadLock);
672 LWLockRelease(SInvalWriteLock);
673 elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
674 SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
675 if (callerHasWriteLock)
676 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
677 }
678 else
679 {
680 LWLockRelease(SInvalReadLock);
681 if (!callerHasWriteLock)
682 LWLockRelease(SInvalWriteLock);
683 }
684}
#define DEBUG4
Definition: elog.h:27
#define Assert(condition)
int ProcNumber
Definition: procnumber.h:24
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_CATCHUP_INTERRUPT
Definition: procsignal.h:32
#define CLEANUP_QUANTUM
Definition: sinvaladt.c:132
#define MAXNUMMESSAGES
Definition: sinvaladt.c:129
#define MSGNUMWRAPAROUND
Definition: sinvaladt.c:130
#define SIG_THRESHOLD
Definition: sinvaladt.c:133

References Assert(), CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, SISeg::numProcs, SISeg::pgprocnos, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), shmInvalBuffer, SIG_THRESHOLD, and ProcState::signaled.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

◆ SIGetDataEntries()

int SIGetDataEntries ( SharedInvalidationMessage * data,
int  datasize 
)

Definition at line 473 of file sinvaladt.c.

474{
475 SISeg *segP;
476 ProcState *stateP;
477 int max;
478 int n;
479
480 segP = shmInvalBuffer;
481 stateP = &segP->procState[MyProcNumber];
482
483 /*
484 * Before starting to take locks, do a quick, unlocked test to see whether
485 * there can possibly be anything to read. On a multiprocessor system,
486 * it's possible that this load could migrate backwards and occur before
487 * we actually enter this function, so we might miss a sinval message that
488 * was just added by some other processor. But they can't migrate
489 * backwards over a preceding lock acquisition, so it should be OK. If we
490 * haven't acquired a lock preventing against further relevant
491 * invalidations, any such occurrence is not much different than if the
492 * invalidation had arrived slightly later in the first place.
493 */
494 if (!stateP->hasMessages)
495 return 0;
496
497 LWLockAcquire(SInvalReadLock, LW_SHARED);
498
499 /*
500 * We must reset hasMessages before determining how many messages we're
501 * going to read. That way, if new messages arrive after we have
502 * determined how many we're reading, the flag will get reset and we'll
503 * notice those messages part-way through.
504 *
505 * Note that, if we don't end up reading all of the messages, we had
506 * better be certain to reset this flag before exiting!
507 */
508 stateP->hasMessages = false;
509
510 /* Fetch current value of maxMsgNum using spinlock */
511 SpinLockAcquire(&segP->msgnumLock);
512 max = segP->maxMsgNum;
513 SpinLockRelease(&segP->msgnumLock);
514
515 if (stateP->resetState)
516 {
517 /*
518 * Force reset. We can say we have dealt with any messages added
519 * since the reset, as well; and that means we should clear the
520 * signaled flag, too.
521 */
522 stateP->nextMsgNum = max;
523 stateP->resetState = false;
524 stateP->signaled = false;
525 LWLockRelease(SInvalReadLock);
526 return -1;
527 }
528
529 /*
530 * Retrieve messages and advance backend's counter, until data array is
531 * full or there are no more messages.
532 *
533 * There may be other backends that haven't read the message(s), so we
534 * cannot delete them here. SICleanupQueue() will eventually remove them
535 * from the queue.
536 */
537 n = 0;
538 while (n < datasize && stateP->nextMsgNum < max)
539 {
540 data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
541 stateP->nextMsgNum++;
542 }
543
544 /*
545 * If we have caught up completely, reset our "signaled" flag so that
546 * we'll get another signal if we fall behind again.
547 *
548 * If we haven't caught up completely, reset the hasMessages flag so that
549 * we see the remaining messages next time.
550 */
551 if (stateP->nextMsgNum >= max)
552 stateP->signaled = false;
553 else
554 stateP->hasMessages = true;
555
556 LWLockRelease(SInvalReadLock);
557 return n;
558}
@ LW_SHARED
Definition: lwlock.h:115
const void * data
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
SharedInvalidationMessage buffer[MAXNUMMESSAGES]
Definition: sinvaladt.c:179

References SISeg::buffer, data, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::msgnumLock, MyProcNumber, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, shmInvalBuffer, ProcState::signaled, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

◆ SIInsertDataEntries()

void SIInsertDataEntries ( const SharedInvalidationMessage * data,
int  n 
)

Definition at line 370 of file sinvaladt.c.

371{
372 SISeg *segP = shmInvalBuffer;
373
374 /*
375 * N can be arbitrarily large. We divide the work into groups of no more
376 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
377 * an unreasonably long time. (This is not so much because we care about
378 * letting in other writers, as that some just-caught-up backend might be
379 * trying to do SICleanupQueue to pass on its signal, and we don't want it
380 * to have to wait a long time.) Also, we need to consider calling
381 * SICleanupQueue every so often.
382 */
383 while (n > 0)
384 {
385 int nthistime = Min(n, WRITE_QUANTUM);
386 int numMsgs;
387 int max;
388 int i;
389
390 n -= nthistime;
391
392 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
393
394 /*
395 * If the buffer is full, we *must* acquire some space. Clean the
396 * queue and reset anyone who is preventing space from being freed.
397 * Otherwise, clean the queue only when it's exceeded the next
398 * fullness threshold. We have to loop and recheck the buffer state
399 * after any call of SICleanupQueue.
400 */
401 for (;;)
402 {
403 numMsgs = segP->maxMsgNum - segP->minMsgNum;
404 if (numMsgs + nthistime > MAXNUMMESSAGES ||
405 numMsgs >= segP->nextThreshold)
406 SICleanupQueue(true, nthistime);
407 else
408 break;
409 }
410
411 /*
412 * Insert new message(s) into proper slot of circular buffer
413 */
414 max = segP->maxMsgNum;
415 while (nthistime-- > 0)
416 {
417 segP->buffer[max % MAXNUMMESSAGES] = *data++;
418 max++;
419 }
420
421 /* Update current value of maxMsgNum using spinlock */
422 SpinLockAcquire(&segP->msgnumLock);
423 segP->maxMsgNum = max;
424 SpinLockRelease(&segP->msgnumLock);
425
426 /*
427 * Now that the maxMsgNum change is globally visible, we give everyone
428 * a swift kick to make sure they read the newly added messages.
429 * Releasing SInvalWriteLock will enforce a full memory barrier, so
430 * these (unlocked) changes will be committed to memory before we exit
431 * the function.
432 */
433 for (i = 0; i < segP->numProcs; i++)
434 {
435 ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
436
437 stateP->hasMessages = true;
438 }
439
440 LWLockRelease(SInvalWriteLock);
441 }
442}
#define Min(x, y)
Definition: c.h:975
#define WRITE_QUANTUM
Definition: sinvaladt.c:134
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition: sinvaladt.c:577

References SISeg::buffer, data, ProcState::hasMessages, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::numProcs, SISeg::pgprocnos, SISeg::procState, shmInvalBuffer, SICleanupQueue(), SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().