walreceiver.c: Don't depend on ThisTimeLineID.
authorRobert Haas <[email protected]>
Fri, 29 Oct 2021 17:01:09 +0000 (13:01 -0400)
committerRobert Haas <[email protected]>
Fri, 29 Oct 2021 17:38:50 +0000 (13:38 -0400)
Instead, pass the TLI around explicitly, as a function parameter.
Since this calls a few xlog.c functions that used ThisTimeLineID,
it was necessary to also change those functions to take a
TimeLineID as a parameter.

src/backend/access/transam/xlog.c
src/backend/replication/walreceiver.c
src/include/access/xlog.h

index ab9cb2093ca4afd2843de594d759be611b4e4f66..23a3d49f77091de40c3a08d095b804937a1ba366 100644 (file)
@@ -927,7 +927,8 @@ static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
-                                                                  bool find_free, XLogSegNo max_segno);
+                                                                  bool find_free, XLogSegNo max_segno,
+                                                                  TimeLineID tli);
 static int     XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                                                 XLogSource source, bool notfoundOk);
 static int     XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
@@ -2517,7 +2518,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                                                        wal_segment_size);
 
                        /* create/use new log file */
-                       openLogFile = XLogFileInit(openLogSegNo);
+                       openLogFile = XLogFileInit(openLogSegNo, ThisTimeLineID);
                        ReserveExternalFD();
                }
 
@@ -2632,7 +2633,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                         */
                        if (finishing_seg)
                        {
-                               issue_xlog_fsync(openLogFile, openLogSegNo);
+                               issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
 
                                /* signal that we need to wakeup walsenders later */
                                WalSndWakeupRequest();
@@ -2703,7 +2704,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                                ReserveExternalFD();
                        }
 
-                       issue_xlog_fsync(openLogFile, openLogSegNo);
+                       issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
                }
 
                /* signal that we need to wakeup walsenders later */
@@ -3295,7 +3296,8 @@ XLogNeedsFlush(XLogRecPtr record)
  * succeed.  (This is weird, but it's efficient for the callers.)
  */
 static int
-XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
+XLogFileInitInternal(XLogSegNo logsegno, TimeLineID logtli,
+                                        bool *added, char *path)
 {
        char            tmppath[MAXPGPATH];
        PGAlignedXLogBlock zbuffer;
@@ -3304,7 +3306,9 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
        int                     fd;
        int                     save_errno;
 
-       XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size);
+       Assert(logtli != 0);
+
+       XLogFilePath(path, logtli, logsegno, wal_segment_size);
 
        /*
         * Try to use existent file (checkpoint maker may have created it already)
@@ -3448,7 +3452,8 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
         * CheckPointSegments.
         */
        max_segno = logsegno + CheckPointSegments;
-       if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno))
+       if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno,
+                                                          logtli))
        {
                *added = true;
                elog(DEBUG2, "done creating and filling new WAL file");
@@ -3480,13 +3485,15 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
  * in a critical section.
  */
 int
-XLogFileInit(XLogSegNo logsegno)
+XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
 {
        bool            ignore_added;
        char            path[MAXPGPATH];
        int                     fd;
 
-       fd = XLogFileInitInternal(logsegno, &ignore_added, path);
+       Assert(logtli != 0);
+
+       fd = XLogFileInitInternal(logsegno, logtli, &ignore_added, path);
        if (fd >= 0)
                return fd;
 
@@ -3628,7 +3635,7 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
        /*
         * Now move the segment into place with its final name.
         */
-       if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0))
+       if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, ThisTimeLineID))
                elog(ERROR, "InstallXLogFileSegment should not have failed");
 }
 
@@ -3652,18 +3659,22 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
  * free slot is found between *segno and max_segno. (Ignored when find_free
  * is false.)
  *
+ * tli: The timeline on which the new segment should be installed.
+ *
  * Returns true if the file was installed successfully.  false indicates that
  * max_segno limit was exceeded, the startup process has disabled this
  * function for now, or an error occurred while renaming the file into place.
  */
 static bool
 InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
-                                          bool find_free, XLogSegNo max_segno)
+                                          bool find_free, XLogSegNo max_segno, TimeLineID tli)
 {
        char            path[MAXPGPATH];
        struct stat stat_buf;
 
-       XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
+       Assert(tli != 0);
+
+       XLogFilePath(path, tli, *segno, wal_segment_size);
 
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        if (!XLogCtl->InstallXLogFileSegmentActive)
@@ -3689,7 +3700,7 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
                                return false;
                        }
                        (*segno)++;
-                       XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
+                       XLogFilePath(path, tli, *segno, wal_segment_size);
                }
        }
 
@@ -3986,7 +3997,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
        if (offset >= (uint32) (0.75 * wal_segment_size))
        {
                _logSegNo++;
-               lf = XLogFileInitInternal(_logSegNo, &added, path);
+               lf = XLogFileInitInternal(_logSegNo, ThisTimeLineID, &added, path);
                if (lf >= 0)
                        close(lf);
                if (added)
@@ -4265,7 +4276,7 @@ RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
                XLogCtl->InstallXLogFileSegmentActive &&        /* callee rechecks this */
                lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
                InstallXLogFileSegment(endlogSegNo, path,
-                                                          true, recycleSegNo))
+                                                          true, recycleSegNo, ThisTimeLineID))
        {
                ereport(DEBUG2,
                                (errmsg_internal("recycled write-ahead log file \"%s\"",
@@ -5400,7 +5411,7 @@ BootStrapXLOG(void)
        record->xl_crc = crc;
 
        /* Create first XLOG segment file */
-       openLogFile = XLogFileInit(1);
+       openLogFile = XLogFileInit(1, ThisTimeLineID);
 
        /*
         * We needn't bother with Reserve/ReleaseExternalFD here, since we'll
@@ -5708,7 +5719,7 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
                 */
                int                     fd;
 
-               fd = XLogFileInit(startLogSegNo);
+               fd = XLogFileInit(startLogSegNo, ThisTimeLineID);
 
                if (close(fd) != 0)
                {
@@ -10864,11 +10875,13 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
  * 'segno' is for error reporting purposes.
  */
 void
-issue_xlog_fsync(int fd, XLogSegNo segno)
+issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
 {
        char       *msg = NULL;
        instr_time      start;
 
+       Assert(tli != 0);
+
        /*
         * Quick exit if fsync is disabled or write() has already synced the WAL
         * file.
@@ -10917,8 +10930,7 @@ issue_xlog_fsync(int fd, XLogSegNo segno)
                char            xlogfname[MAXFNAMELEN];
                int                     save_errno = errno;
 
-               XLogFileName(xlogfname, ThisTimeLineID, segno,
-                                        wal_segment_size);
+               XLogFileName(xlogfname, tli, segno, wal_segment_size);
                errno = save_errno;
                ereport(PANIC,
                                (errcode_for_file_access(),
index b90e5ca98ea5e76e7912100175af793fc34a67ca..7a7eb3784e7c35acb7294b54315ce7ff1a6aa9e7 100644 (file)
@@ -122,10 +122,12 @@ static StringInfoData incoming_message;
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
-static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvClose(XLogRecPtr recptr);
+static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
+                                                                TimeLineID tli);
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
+                                                       TimeLineID tli);
+static void XLogWalRcvFlush(bool dying, TimeLineID tli);
+static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -255,7 +257,7 @@ WalReceiverMain(void)
        pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
 
        /* Arrange to clean up at walreceiver exit */
-       on_shmem_exit(WalRcvDie, 0);
+       on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
 
        /* Properly accept or ignore signals the postmaster might send us */
        pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@@ -394,7 +396,6 @@ WalReceiverMain(void)
                options.startpoint = startpoint;
                options.slotname = slotname[0] != '\0' ? slotname : NULL;
                options.proto.physical.startpointTLI = startpointTLI;
-               ThisTimeLineID = startpointTLI;
                if (walrcv_startstreaming(wrconn, &options))
                {
                        if (first_stream)
@@ -462,7 +463,8 @@ WalReceiverMain(void)
                                                         */
                                                        last_recv_timestamp = GetCurrentTimestamp();
                                                        ping_sent = false;
-                                                       XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+                                                       XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
+                                                                                                startpointTLI);
                                                }
                                                else if (len == 0)
                                                        break;
@@ -487,7 +489,7 @@ WalReceiverMain(void)
                                         * let the startup process and primary server know about
                                         * them.
                                         */
-                                       XLogWalRcvFlush(false);
+                                       XLogWalRcvFlush(false, startpointTLI);
                                }
 
                                /* Check if we need to exit the streaming loop. */
@@ -608,7 +610,7 @@ WalReceiverMain(void)
                {
                        char            xlogfname[MAXFNAMELEN];
 
-                       XLogWalRcvFlush(false);
+                       XLogWalRcvFlush(false, startpointTLI);
                        XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
                        if (close(recvFile) != 0)
                                ereport(PANIC,
@@ -776,9 +778,12 @@ static void
 WalRcvDie(int code, Datum arg)
 {
        WalRcvData *walrcv = WalRcv;
+       TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
+
+       Assert(*startpointTLI_p != 0);
 
        /* Ensure that all WAL records received are flushed to disk */
-       XLogWalRcvFlush(true);
+       XLogWalRcvFlush(true, *startpointTLI_p);
 
        /* Mark ourselves inactive in shared memory */
        SpinLockAcquire(&walrcv->mutex);
@@ -808,7 +813,7 @@ WalRcvDie(int code, Datum arg)
  * Accept the message from XLOG stream, and process it.
  */
 static void
-XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
 {
        int                     hdrlen;
        XLogRecPtr      dataStart;
@@ -838,7 +843,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 
                                buf += hdrlen;
                                len -= hdrlen;
-                               XLogWalRcvWrite(buf, len, dataStart);
+                               XLogWalRcvWrite(buf, len, dataStart, tli);
                                break;
                        }
                case 'k':                               /* Keepalive */
@@ -875,25 +880,27 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
  * Write XLOG data to disk.
  */
 static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 {
        int                     startoff;
        int                     byteswritten;
 
+       Assert(tli != 0);
+
        while (nbytes > 0)
        {
                int                     segbytes;
 
                /* Close the current segment if it's completed */
                if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-                       XLogWalRcvClose(recptr);
+                       XLogWalRcvClose(recptr, tli);
 
                if (recvFile < 0)
                {
                        /* Create/use new log file */
                        XLByteToSeg(recptr, recvSegNo, wal_segment_size);
-                       recvFile = XLogFileInit(recvSegNo);
-                       recvFileTLI = ThisTimeLineID;
+                       recvFile = XLogFileInit(recvSegNo, tli);
+                       recvFileTLI = tli;
                }
 
                /* Calculate the start offset of the received logs */
@@ -946,7 +953,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
         * segment is received and written.
         */
        if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-               XLogWalRcvClose(recptr);
+               XLogWalRcvClose(recptr, tli);
 }
 
 /*
@@ -956,13 +963,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
  * an error, so we skip sending a reply in that case.
  */
 static void
-XLogWalRcvFlush(bool dying)
+XLogWalRcvFlush(bool dying, TimeLineID tli)
 {
+       Assert(tli != 0);
+
        if (LogstreamResult.Flush < LogstreamResult.Write)
        {
                WalRcvData *walrcv = WalRcv;
 
-               issue_xlog_fsync(recvFile, recvSegNo);
+               issue_xlog_fsync(recvFile, recvSegNo, tli);
 
                LogstreamResult.Flush = LogstreamResult.Write;
 
@@ -972,7 +981,7 @@ XLogWalRcvFlush(bool dying)
                {
                        walrcv->latestChunkStart = walrcv->flushedUpto;
                        walrcv->flushedUpto = LogstreamResult.Flush;
-                       walrcv->receivedTLI = ThisTimeLineID;
+                       walrcv->receivedTLI = tli;
                }
                SpinLockRelease(&walrcv->mutex);
 
@@ -1009,17 +1018,18 @@ XLogWalRcvFlush(bool dying)
  * Create an archive notification file since the segment is known completed.
  */
 static void
-XLogWalRcvClose(XLogRecPtr recptr)
+XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
 {
        char            xlogfname[MAXFNAMELEN];
 
        Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
+       Assert(tli != 0);
 
        /*
         * fsync() and close current file before we switch to next one. We would
         * otherwise have to reopen this file to fsync it later
         */
-       XLogWalRcvFlush(false);
+       XLogWalRcvFlush(false, tli);
 
        XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 
index 2941265017fc3d0cd44864e7cd2b2e02bf647de0..c36d688401bed17ac7e1b63c7d7aeb9b8e18e714 100644 (file)
@@ -262,7 +262,7 @@ extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
-extern int     XLogFileInit(XLogSegNo segno);
+extern int     XLogFileInit(XLogSegNo segno, TimeLineID tli);
 extern int     XLogFileOpen(XLogSegNo segno);
 
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
@@ -274,7 +274,7 @@ extern void xlog_redo(XLogReaderState *record);
 extern void xlog_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xlog_identify(uint8 info);
 
-extern void issue_xlog_fsync(int fd, XLogSegNo segno);
+extern void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli);
 
 extern bool RecoveryInProgress(void);
 extern RecoveryState GetRecoveryState(void);