Instead, pass the TLI around explicitly, as a function parameter.
Since this calls a few xlog.c functions that used ThisTimeLineID,
it was necessary to also change those functions to take a
TimeLineID as a parameter.
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
- bool find_free, XLogSegNo max_segno);
+ bool find_free, XLogSegNo max_segno,
+ TimeLineID tli);
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
wal_segment_size);
/* create/use new log file */
- openLogFile = XLogFileInit(openLogSegNo);
+ openLogFile = XLogFileInit(openLogSegNo, ThisTimeLineID);
ReserveExternalFD();
}
*/
if (finishing_seg)
{
- issue_xlog_fsync(openLogFile, openLogSegNo);
+ issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
/* signal that we need to wakeup walsenders later */
WalSndWakeupRequest();
ReserveExternalFD();
}
- issue_xlog_fsync(openLogFile, openLogSegNo);
+ issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
}
/* signal that we need to wakeup walsenders later */
* succeed. (This is weird, but it's efficient for the callers.)
*/
static int
-XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
+XLogFileInitInternal(XLogSegNo logsegno, TimeLineID logtli,
+ bool *added, char *path)
{
char tmppath[MAXPGPATH];
PGAlignedXLogBlock zbuffer;
int fd;
int save_errno;
- XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size);
+ Assert(logtli != 0);
+
+ XLogFilePath(path, logtli, logsegno, wal_segment_size);
/*
* Try to use existent file (checkpoint maker may have created it already)
* CheckPointSegments.
*/
max_segno = logsegno + CheckPointSegments;
- if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno))
+ if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno,
+ logtli))
{
*added = true;
elog(DEBUG2, "done creating and filling new WAL file");
* in a critical section.
*/
int
-XLogFileInit(XLogSegNo logsegno)
+XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
{
bool ignore_added;
char path[MAXPGPATH];
int fd;
- fd = XLogFileInitInternal(logsegno, &ignore_added, path);
+ Assert(logtli != 0);
+
+ fd = XLogFileInitInternal(logsegno, logtli, &ignore_added, path);
if (fd >= 0)
return fd;
/*
* Now move the segment into place with its final name.
*/
- if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0))
+ if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, ThisTimeLineID))
elog(ERROR, "InstallXLogFileSegment should not have failed");
}
* free slot is found between *segno and max_segno. (Ignored when find_free
* is false.)
*
+ * tli: The timeline on which the new segment should be installed.
+ *
* Returns true if the file was installed successfully. false indicates that
* max_segno limit was exceeded, the startup process has disabled this
* function for now, or an error occurred while renaming the file into place.
*/
static bool
InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
- bool find_free, XLogSegNo max_segno)
+ bool find_free, XLogSegNo max_segno, TimeLineID tli)
{
char path[MAXPGPATH];
struct stat stat_buf;
- XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
+ Assert(tli != 0);
+
+ XLogFilePath(path, tli, *segno, wal_segment_size);
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (!XLogCtl->InstallXLogFileSegmentActive)
return false;
}
(*segno)++;
- XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
+ XLogFilePath(path, tli, *segno, wal_segment_size);
}
}
if (offset >= (uint32) (0.75 * wal_segment_size))
{
_logSegNo++;
- lf = XLogFileInitInternal(_logSegNo, &added, path);
+ lf = XLogFileInitInternal(_logSegNo, ThisTimeLineID, &added, path);
if (lf >= 0)
close(lf);
if (added)
XLogCtl->InstallXLogFileSegmentActive && /* callee rechecks this */
lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
InstallXLogFileSegment(endlogSegNo, path,
- true, recycleSegNo))
+ true, recycleSegNo, ThisTimeLineID))
{
ereport(DEBUG2,
(errmsg_internal("recycled write-ahead log file \"%s\"",
record->xl_crc = crc;
/* Create first XLOG segment file */
- openLogFile = XLogFileInit(1);
+ openLogFile = XLogFileInit(1, ThisTimeLineID);
/*
* We needn't bother with Reserve/ReleaseExternalFD here, since we'll
*/
int fd;
- fd = XLogFileInit(startLogSegNo);
+ fd = XLogFileInit(startLogSegNo, ThisTimeLineID);
if (close(fd) != 0)
{
* 'segno' is for error reporting purposes.
*/
void
-issue_xlog_fsync(int fd, XLogSegNo segno)
+issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
{
char *msg = NULL;
instr_time start;
+ Assert(tli != 0);
+
/*
* Quick exit if fsync is disabled or write() has already synced the WAL
* file.
char xlogfname[MAXFNAMELEN];
int save_errno = errno;
- XLogFileName(xlogfname, ThisTimeLineID, segno,
- wal_segment_size);
+ XLogFileName(xlogfname, tli, segno, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
-static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvClose(XLogRecPtr recptr);
+static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
+ TimeLineID tli);
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
+ TimeLineID tli);
+static void XLogWalRcvFlush(bool dying, TimeLineID tli);
+static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
/* Arrange to clean up at walreceiver exit */
- on_shmem_exit(WalRcvDie, 0);
+ on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
/* Properly accept or ignore signals the postmaster might send us */
pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
options.startpoint = startpoint;
options.slotname = slotname[0] != '\0' ? slotname : NULL;
options.proto.physical.startpointTLI = startpointTLI;
- ThisTimeLineID = startpointTLI;
if (walrcv_startstreaming(wrconn, &options))
{
if (first_stream)
*/
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
- XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+ XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
+ startpointTLI);
}
else if (len == 0)
break;
* let the startup process and primary server know about
* them.
*/
- XLogWalRcvFlush(false);
+ XLogWalRcvFlush(false, startpointTLI);
}
/* Check if we need to exit the streaming loop. */
{
char xlogfname[MAXFNAMELEN];
- XLogWalRcvFlush(false);
+ XLogWalRcvFlush(false, startpointTLI);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (close(recvFile) != 0)
ereport(PANIC,
WalRcvDie(int code, Datum arg)
{
WalRcvData *walrcv = WalRcv;
+ TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
+
+ Assert(*startpointTLI_p != 0);
/* Ensure that all WAL records received are flushed to disk */
- XLogWalRcvFlush(true);
+ XLogWalRcvFlush(true, *startpointTLI_p);
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
* Accept the message from XLOG stream, and process it.
*/
static void
-XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
{
int hdrlen;
XLogRecPtr dataStart;
buf += hdrlen;
len -= hdrlen;
- XLogWalRcvWrite(buf, len, dataStart);
+ XLogWalRcvWrite(buf, len, dataStart, tli);
break;
}
case 'k': /* Keepalive */
* Write XLOG data to disk.
*/
static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
{
int startoff;
int byteswritten;
+ Assert(tli != 0);
+
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
- XLogWalRcvClose(recptr);
+ XLogWalRcvClose(recptr, tli);
if (recvFile < 0)
{
/* Create/use new log file */
XLByteToSeg(recptr, recvSegNo, wal_segment_size);
- recvFile = XLogFileInit(recvSegNo);
- recvFileTLI = ThisTimeLineID;
+ recvFile = XLogFileInit(recvSegNo, tli);
+ recvFileTLI = tli;
}
/* Calculate the start offset of the received logs */
* segment is received and written.
*/
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
- XLogWalRcvClose(recptr);
+ XLogWalRcvClose(recptr, tli);
}
/*
* an error, so we skip sending a reply in that case.
*/
static void
-XLogWalRcvFlush(bool dying)
+XLogWalRcvFlush(bool dying, TimeLineID tli)
{
+ Assert(tli != 0);
+
if (LogstreamResult.Flush < LogstreamResult.Write)
{
WalRcvData *walrcv = WalRcv;
- issue_xlog_fsync(recvFile, recvSegNo);
+ issue_xlog_fsync(recvFile, recvSegNo, tli);
LogstreamResult.Flush = LogstreamResult.Write;
{
walrcv->latestChunkStart = walrcv->flushedUpto;
walrcv->flushedUpto = LogstreamResult.Flush;
- walrcv->receivedTLI = ThisTimeLineID;
+ walrcv->receivedTLI = tli;
}
SpinLockRelease(&walrcv->mutex);
* Create an archive notification file since the segment is known completed.
*/
static void
-XLogWalRcvClose(XLogRecPtr recptr)
+XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
{
char xlogfname[MAXFNAMELEN];
Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
+ Assert(tli != 0);
/*
* fsync() and close current file before we switch to next one. We would
* otherwise have to reopen this file to fsync it later
*/
- XLogWalRcvFlush(false);
+ XLogWalRcvFlush(false, tli);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
extern void XLogFlush(XLogRecPtr RecPtr);
extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
-extern int XLogFileInit(XLogSegNo segno);
+extern int XLogFileInit(XLogSegNo segno, TimeLineID tli);
extern int XLogFileOpen(XLogSegNo segno);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern void xlog_desc(StringInfo buf, XLogReaderState *record);
extern const char *xlog_identify(uint8 info);
-extern void issue_xlog_fsync(int fd, XLogSegNo segno);
+extern void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli);
extern bool RecoveryInProgress(void);
extern RecoveryState GetRecoveryState(void);