walsender.c: Eliminate remaining uses of ThisTimeLineID.
authorRobert Haas <[email protected]>
Thu, 28 Oct 2021 17:14:24 +0000 (13:14 -0400)
committerRobert Haas <[email protected]>
Fri, 29 Oct 2021 17:38:50 +0000 (13:38 -0400)
IdentifySystem and StartReplication needs to know the current time
line ID, but it does not need to store the value into a global
variable, so don't do that.

logical_read_xlog_page is already relying on the timeline ID not
changing currently, so eliminat the use of the global variable
here, but also add some comments about the potential hazard if we
ever allow logical decoding on standbys.

To make all this work, GetStandbyFlushRecPtr now returns the TLI
via an out parameter.

src/backend/replication/walsender.c

index 2eb0006d92e4d89a6d1c71e5ca70b2c469d32950..854c34e016e6350a275542f4dc5624e8dfa32f28 100644 (file)
@@ -230,7 +230,7 @@ static void WalSndShutdown(void) pg_attribute_noreturn();
 static void XLogSendPhysical(void);
 static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
-static XLogRecPtr GetStandbyFlushRecPtr(void);
+static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
 static void IdentifySystem(void);
 static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
@@ -385,6 +385,7 @@ IdentifySystem(void)
        TupleDesc       tupdesc;
        Datum           values[4];
        bool            nulls[4];
+       TimeLineID      currTLI;
 
        /*
         * Reply with a result set with one row, four columns. First col is system
@@ -397,12 +398,9 @@ IdentifySystem(void)
 
        am_cascading_walsender = RecoveryInProgress();
        if (am_cascading_walsender)
-       {
-               /* this also updates ThisTimeLineID */
-               logptr = GetStandbyFlushRecPtr();
-       }
+               logptr = GetStandbyFlushRecPtr(&currTLI);
        else
-               logptr = GetFlushRecPtr(&ThisTimeLineID);
+               logptr = GetFlushRecPtr(&currTLI);
 
        snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
 
@@ -441,7 +439,7 @@ IdentifySystem(void)
        values[0] = CStringGetTextDatum(sysid);
 
        /* column 2: timeline */
-       values[1] = Int32GetDatum(ThisTimeLineID);
+       values[1] = Int32GetDatum(currTLI);
 
        /* column 3: wal location */
        values[2] = CStringGetTextDatum(xloc);
@@ -671,6 +669,7 @@ StartReplication(StartReplicationCmd *cmd)
 {
        StringInfoData buf;
        XLogRecPtr      FlushPtr;
+       TimeLineID      FlushTLI;
 
        /* create xlogreader for physical replication */
        xlogreader =
@@ -710,24 +709,20 @@ StartReplication(StartReplicationCmd *cmd)
 
        /*
         * Select the timeline. If it was given explicitly by the client, use
-        * that. Otherwise use the timeline of the last replayed record, which is
-        * kept in ThisTimeLineID.
+        * that. Otherwise use the timeline of the last replayed record.
         */
        am_cascading_walsender = RecoveryInProgress();
        if (am_cascading_walsender)
-       {
-               /* this also updates ThisTimeLineID */
-               FlushPtr = GetStandbyFlushRecPtr();
-       }
+               FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
        else
-               FlushPtr = GetFlushRecPtr(&ThisTimeLineID);
+               FlushPtr = GetFlushRecPtr(&FlushTLI);
 
        if (cmd->timeline != 0)
        {
                XLogRecPtr      switchpoint;
 
                sendTimeLine = cmd->timeline;
-               if (sendTimeLine == ThisTimeLineID)
+               if (sendTimeLine == FlushTLI)
                {
                        sendTimeLineIsHistoric = false;
                        sendTimeLineValidUpto = InvalidXLogRecPtr;
@@ -742,7 +737,7 @@ StartReplication(StartReplicationCmd *cmd)
                         * Check that the timeline the client requested exists, and the
                         * requested start location is on that timeline.
                         */
-                       timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+                       timeLineHistory = readTimeLineHistory(FlushTLI);
                        switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                                                                 &sendTimeLineNextTLI);
                        list_free_deep(timeLineHistory);
@@ -781,7 +776,7 @@ StartReplication(StartReplicationCmd *cmd)
        }
        else
        {
-               sendTimeLine = ThisTimeLineID;
+               sendTimeLine = FlushTLI;
                sendTimeLineValidUpto = InvalidXLogRecPtr;
                sendTimeLineIsHistoric = false;
        }
@@ -909,9 +904,16 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        int                     count;
        WALReadError errinfo;
        XLogSegNo       segno;
+       TimeLineID      currTLI = GetCurrentTimeLineID();
 
-       XLogReadDetermineTimeline(state, targetPagePtr, reqLen, ThisTimeLineID);
-       sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+       /*
+        * Since logical decoding is only permitted on a primary server, we know
+        * that the current timeline ID can't be changing any more. If we did this
+        * on a standby, we'd have to worry about the values we compute here
+        * becoming invalid due to a promotion or timeline change.
+        */
+       XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
+       sendTimeLineIsHistoric = (state->currTLI != currTLI);
        sendTimeLine = state->currTLI;
        sendTimeLineValidUpto = state->currTLIValidUntil;
        sendTimeLineNextTLI = state->nextTLI;
@@ -2683,6 +2685,8 @@ XLogSendPhysical(void)
        }
        else if (am_cascading_walsender)
        {
+               TimeLineID      SendRqstTLI;
+
                /*
                 * Streaming the latest timeline on a standby.
                 *
@@ -2702,14 +2706,12 @@ XLogSendPhysical(void)
                 */
                bool            becameHistoric = false;
 
-               SendRqstPtr = GetStandbyFlushRecPtr();
+               SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
 
                if (!RecoveryInProgress())
                {
-                       /*
-                        * We have been promoted. RecoveryInProgress() updated
-                        * ThisTimeLineID to the new current timeline.
-                        */
+                       /* We have been promoted. */
+                       SendRqstTLI = GetCurrentTimeLineID();
                        am_cascading_walsender = false;
                        becameHistoric = true;
                }
@@ -2717,10 +2719,9 @@ XLogSendPhysical(void)
                {
                        /*
                         * Still a cascading standby. But is the timeline we're sending
-                        * still the one recovery is recovering from? ThisTimeLineID was
-                        * updated by the GetStandbyFlushRecPtr() call above.
+                        * still the one recovery is recovering from?
                         */
-                       if (sendTimeLine != ThisTimeLineID)
+                       if (sendTimeLine != SendRqstTLI)
                                becameHistoric = true;
                }
 
@@ -2733,7 +2734,7 @@ XLogSendPhysical(void)
                         */
                        List       *history;
 
-                       history = readTimeLineHistory(ThisTimeLineID);
+                       history = readTimeLineHistory(SendRqstTLI);
                        sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
 
                        Assert(sendTimeLine < sendTimeLineNextTLI);
@@ -3069,11 +3070,11 @@ WalSndDone(WalSndSendDataCallback send_data)
  * can be sent to the standby. This should only be called when in recovery,
  * ie. we're streaming to a cascaded standby.
  *
- * As a side-effect, ThisTimeLineID is updated to the TLI of the last
+ * As a side-effect, *tli is updated to the TLI of the last
  * replayed WAL record.
  */
 static XLogRecPtr
-GetStandbyFlushRecPtr(void)
+GetStandbyFlushRecPtr(TimeLineID *tli)
 {
        XLogRecPtr      replayPtr;
        TimeLineID      replayTLI;
@@ -3090,10 +3091,10 @@ GetStandbyFlushRecPtr(void)
        receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-       ThisTimeLineID = replayTLI;
+       *tli = replayTLI;
 
        result = replayPtr;
-       if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
+       if (receiveTLI == replayTLI && receivePtr > replayPtr)
                result = receivePtr;
 
        return result;