diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index d7e83a937c991e154a16438c15df55387e0b5119..d808607ecdbb31c90f3a20c14eb01b0e6ca0fb12 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -453,6 +453,7 @@ typedef struct XLogCtlData * replayed, otherwise it's equal to lastReplayedEndRecPtr. */ XLogRecPtr lastReplayedEndRecPtr; + TimeLineID lastReplayedTLI; XLogRecPtr replayEndRecPtr; TimeLineID replayEndTLI; /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ @@ -3829,7 +3830,6 @@ rescanLatestTimeLine(void) TimeLineID newtarget; TimeLineHistoryEntry *currentTle = NULL; /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; newtarget = findNewestTimeLine(recoveryTargetTLI); if (newtarget == recoveryTargetTLI) @@ -3888,20 +3888,10 @@ rescanLatestTimeLine(void) list_free_deep(expectedTLEs); expectedTLEs = newExpectedTLEs; - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->RecoveryTargetTLI = recoveryTargetTLI; - SpinLockRelease(&xlogctl->info_lck); - ereport(LOG, (errmsg("new target timeline is %u", recoveryTargetTLI))); - /* - * Wake up any walsenders to notice that we have a new target timeline. - */ - if (AllowCascadeReplication()) - WalSndWakeup(); - return true; } @@ -5389,11 +5379,9 @@ StartupXLOG(void) ControlFile->minRecoveryPointTLI))); /* - * Save the selected recovery target timeline ID and - * archive_cleanup_command in shared memory so that other processes can - * see them + * Save archive_cleanup_command in shared memory so that other processes + * can see it. */ - XLogCtl->RecoveryTargetTLI = recoveryTargetTLI; strncpy(XLogCtl->archiveCleanupCommand, archiveCleanupCommand ? archiveCleanupCommand : "", sizeof(XLogCtl->archiveCleanupCommand)); @@ -5770,6 +5758,7 @@ StartupXLOG(void) xlogctl->replayEndRecPtr = ReadRecPtr; xlogctl->replayEndTLI = ThisTimeLineID; xlogctl->lastReplayedEndRecPtr = EndRecPtr; + xlogctl->lastReplayedEndRecPtr = ThisTimeLineID; xlogctl->recoveryLastXTime = 0; xlogctl->currentChunkStartTime = 0; xlogctl->recoveryPause = false; @@ -5837,6 +5826,7 @@ StartupXLOG(void) */ do { + bool switchedTLI = false; #ifdef WAL_DEBUG if (XLOG_DEBUG || (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) || @@ -5942,6 +5932,7 @@ StartupXLOG(void) /* Following WAL records should be run with new TLI */ ThisTimeLineID = newTLI; + switchedTLI = true; } } @@ -5974,6 +5965,7 @@ StartupXLOG(void) */ SpinLockAcquire(&xlogctl->info_lck); xlogctl->lastReplayedEndRecPtr = EndRecPtr; + xlogctl->lastReplayedTLI = ThisTimeLineID; SpinLockRelease(&xlogctl->info_lck); /* Remember this record as the last-applied one */ @@ -5982,6 +5974,13 @@ StartupXLOG(void) /* Allow read-only connections if we're consistent now */ CheckRecoveryConsistency(); + /* + * If this record was a timeline switch, wake up any + * walsenders to notice that we are on a new timeline. + */ + if (switchedTLI && AllowCascadeReplication()) + WalSndWakeup(); + /* Exit loop if we reached inclusive recovery target */ if (!recoveryContinue) break; @@ -6822,23 +6821,6 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch) *epoch = ckptXidEpoch; } -/* - * GetRecoveryTargetTLI - get the current recovery target timeline ID - */ -TimeLineID -GetRecoveryTargetTLI(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - TimeLineID result; - - SpinLockAcquire(&xlogctl->info_lck); - result = xlogctl->RecoveryTargetTLI; - SpinLockRelease(&xlogctl->info_lck); - - return result; -} - /* * This must be called ONCE during postmaster or standalone-backend shutdown */ @@ -7642,10 +7624,16 @@ CreateRestartPoint(int flags) */ if (_logSegNo) { + XLogRecPtr receivePtr; + XLogRecPtr replayPtr; XLogRecPtr endptr; - /* Get the current (or recent) end of xlog */ - endptr = GetStandbyFlushRecPtr(); + /* + * Get the current end of xlog replayed or received, whichever is later. + */ + receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); + replayPtr = GetXLogReplayRecPtr(NULL); + endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); _logSegNo--; @@ -9109,38 +9097,23 @@ do_pg_abort_backup(void) * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr -GetXLogReplayRecPtr(void) +GetXLogReplayRecPtr(TimeLineID *replayTLI) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr recptr; + TimeLineID tli; SpinLockAcquire(&xlogctl->info_lck); recptr = xlogctl->lastReplayedEndRecPtr; + tli = xlogctl->lastReplayedTLI; SpinLockRelease(&xlogctl->info_lck); + if (replayTLI) + *replayTLI = tli; return recptr; } -/* - * Get current standby flush position, ie, the last WAL position - * known to be fsync'd to disk in standby. - */ -XLogRecPtr -GetStandbyFlushRecPtr(void) -{ - XLogRecPtr receivePtr; - XLogRecPtr replayPtr; - - receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); - replayPtr = GetXLogReplayRecPtr(); - - if (XLByteLT(receivePtr, replayPtr)) - return replayPtr; - else - return receivePtr; -} - /* * Get latest WAL insert pointer */ diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index e91bdc3f4af93b6b56367a0d2010651f49d95c4c..47624c3e75fdc0b922ad83ff58eda2e60c340233 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -248,7 +248,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS) XLogRecPtr recptr; char location[MAXFNAMELEN]; - recptr = GetXLogReplayRecPtr(); + recptr = GetXLogReplayRecPtr(NULL); if (recptr == 0) PG_RETURN_NULL(); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 303edb75a32061c0d268c1e5fe6334be4f1f9049..a0960f2ceab20bc44fe8b41acdbda30df78fb03e 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -370,7 +370,7 @@ WalReceiverMain(void) first_stream = false; /* Initialize LogstreamResult and buffers for processing messages */ - LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(); + LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); initStringInfo(&incoming_message); @@ -1026,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(); + applyPtr = GetXLogReplayRecPtr(NULL); resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index a8ccfc66398bab64b90c8443182a0a5f1ca10d43..1aaafbb49fc0424ecb8aef6fa514c48e15950189 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -324,7 +324,7 @@ GetReplicationApplyDelay(void) receivePtr = walrcv->receivedUpto; SpinLockRelease(&walrcv->mutex); - replayPtr = GetXLogReplayRecPtr(); + replayPtr = GetXLogReplayRecPtr(NULL); if (XLByteEQ(receivePtr, replayPtr)) return 0; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index aec57f5535fc42f17330fc066c1e10175663f963..29a25eb9035e1234d8473a7ed0b86804e39fb375 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -169,6 +169,7 @@ static void WalSndLoop(void); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); static void XLogSend(bool *caughtup); +static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID currentTLI); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); @@ -190,12 +191,6 @@ InitWalSender(void) /* Set up resource owner */ CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner"); - /* - * Use the recovery target timeline ID during recovery - */ - if (am_cascading_walsender) - ThisTimeLineID = GetRecoveryTargetTLI(); - /* * Let postmaster know that we're a WAL sender. Once we've declared us as * a WAL sender process, postmaster will let us outlive the bgwriter and @@ -254,8 +249,8 @@ IdentifySystem(void) am_cascading_walsender = RecoveryInProgress(); if (am_cascading_walsender) { - logptr = GetStandbyFlushRecPtr(); - ThisTimeLineID = GetRecoveryTargetTLI(); + /* this also updates ThisTimeLineID */ + logptr = GetStandbyFlushRecPtr(0); } else logptr = GetInsertRecPtr(); @@ -409,6 +404,7 @@ static void StartReplication(StartReplicationCmd *cmd) { StringInfoData buf; + XLogRecPtr FlushPtr; /* * We assume here that we're logging enough information in the WAL for @@ -421,8 +417,17 @@ StartReplication(StartReplicationCmd *cmd) /* * Select the timeline. If it was given explicitly by the client, use - * that. Otherwise use the current ThisTimeLineID. + * that. Otherwise use the timeline of the last replayed record, which + * is kept in ThisTimeLineID. */ + if (am_cascading_walsender) + { + /* this also updates ThisTimeLineID */ + FlushPtr = GetStandbyFlushRecPtr(0); + } + else + FlushPtr = GetFlushRecPtr(); + if (cmd->timeline != 0) { XLogRecPtr switchpoint; @@ -494,7 +499,6 @@ StartReplication(StartReplicationCmd *cmd) if (!sendTimeLineIsHistoric || XLByteLT(cmd->startpoint, sendTimeLineValidUpto)) { - XLogRecPtr FlushPtr; /* * When we first start replication the standby will be behind the primary. * For some applications, for example, synchronous replication, it is @@ -516,10 +520,6 @@ StartReplication(StartReplicationCmd *cmd) * Don't allow a request to stream from a future point in WAL that * hasn't been flushed to disk in this server yet. */ - if (am_cascading_walsender) - FlushPtr = GetStandbyFlushRecPtr(); - else - FlushPtr = GetFlushRecPtr(); if (XLByteLT(FlushPtr, cmd->startpoint)) { ereport(ERROR, @@ -1330,7 +1330,7 @@ XLogSend(bool *caughtup) * that gets lost on the master. */ if (am_cascading_walsender) - FlushPtr = GetStandbyFlushRecPtr(); + FlushPtr = GetStandbyFlushRecPtr(sendTimeLine); else FlushPtr = GetFlushRecPtr(); @@ -1347,7 +1347,6 @@ XLogSend(bool *caughtup) if (!sendTimeLineIsHistoric && am_cascading_walsender) { bool becameHistoric = false; - TimeLineID targetTLI; if (!RecoveryInProgress()) { @@ -1355,7 +1354,6 @@ XLogSend(bool *caughtup) * We have been promoted. RecoveryInProgress() updated * ThisTimeLineID to the new current timeline. */ - targetTLI = ThisTimeLineID; am_cascading_walsender = false; becameHistoric = true; } @@ -1363,11 +1361,9 @@ XLogSend(bool *caughtup) { /* * Still a cascading standby. But is the timeline we're sending - * still the recovery target timeline? + * still the one recovery is recovering from? */ - targetTLI = GetRecoveryTargetTLI(); - - if (targetTLI != sendTimeLine) + if (sendTimeLine != ThisTimeLineID) becameHistoric = true; } @@ -1380,7 +1376,7 @@ XLogSend(bool *caughtup) */ List *history; - history = readTimeLineHistory(targetTLI); + history = readTimeLineHistory(ThisTimeLineID); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history); Assert(XLByteLE(sentPtr, sendTimeLineValidUpto)); list_free_deep(history); @@ -1521,6 +1517,48 @@ XLogSend(bool *caughtup) return; } +/* + * Returns the latest point in WAL that has been safely flushed to disk, and + * can be sent to the standby. This should only be called when in recovery, + * ie. we're streaming to a cascaded standby. + * + * If currentTLI is non-zero, the function returns the point that the WAL on + * the given timeline has been flushed upto. If recovery has already switched + * to a different timeline, InvalidXLogRecPtr is returned. + * + * As a side-effect, ThisTimeLineID is updated to the TLI of the last + * replayed WAL record. + */ +static XLogRecPtr +GetStandbyFlushRecPtr(TimeLineID currentTLI) +{ + XLogRecPtr replayPtr; + TimeLineID replayTLI; + XLogRecPtr receivePtr; + TimeLineID receiveTLI; + XLogRecPtr result; + + /* + * We can safely send what's already been replayed. Also, if walreceiver + * is streaming WAL from the same timeline, we can send anything that + * it has streamed, but hasn't been replayed yet. + */ + + receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); + replayPtr = GetXLogReplayRecPtr(&replayTLI); + + ThisTimeLineID = replayTLI; + + if (currentTLI != replayTLI && currentTLI != 0) + return InvalidXLogRecPtr; + + result = replayPtr; + if (receiveTLI == currentTLI && receivePtr > replayPtr) + result = receivePtr; + + return result; +} + /* * Request walsenders to reload the currently-open WAL file */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index c8cd37981c58785bcb3326034fd533051124d820..95d01b974446b27d5501df0d526ceace1b833a2e 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -283,8 +283,7 @@ extern bool RecoveryInProgress(void); extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); -extern XLogRecPtr GetXLogReplayRecPtr(void); -extern XLogRecPtr GetStandbyFlushRecPtr(void); +extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); extern bool RecoveryIsPaused(void);