diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3e4da46ac24a8817b8f4cf03053e74a81aea5c90..17ceda3b1ad311776e108307a3f4cc2f1247a2b9 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -157,6 +157,9 @@ HotStandbyState standbyState = STANDBY_DISABLED; static XLogRecPtr LastRec; +/* Local copy of WalRcv->receivedUpto */ +static XLogRecPtr receivedUpto = 0; + /* * During recovery, lastFullPageWrites keeps track of full_page_writes that * the replayed WAL records indicate. It's initialized with full_page_writes @@ -538,6 +541,7 @@ static int readFile = -1; static XLogSegNo readSegNo = 0; static uint32 readOff = 0; static uint32 readLen = 0; +static bool readFileHeaderValidated = false; static int readSource = 0; /* XLOG_FROM_* code */ /* @@ -628,6 +632,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources); static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, bool randAccess); +static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, + bool fetching_ckpt); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); static void PreallocXlogFiles(XLogRecPtr endptr); @@ -2685,6 +2691,9 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, if (source != XLOG_FROM_STREAM) XLogReceiptTime = GetCurrentTimestamp(); + /* The file header needs to be validated on first access */ + readFileHeaderValidated = false; + return fd; } if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ @@ -9233,12 +9242,9 @@ static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, bool randAccess) { - static XLogRecPtr receivedUpto = 0; - bool switched_segment = false; uint32 targetPageOff; uint32 targetRecOff; XLogSegNo targetSegNo; - static pg_time_t last_fail_time = 0; XLByteToSeg(*RecPtr, targetSegNo); targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; @@ -9283,208 +9289,9 @@ retry: { if (StandbyMode) { - /* - * In standby mode, wait for the requested record to become - * available, either via restore_command succeeding to restore the - * segment, or via walreceiver having streamed the record. - */ - for (;;) - { - if (WalRcvInProgress()) - { - bool havedata; - - /* - * If we find an invalid record in the WAL streamed from - * master, something is seriously wrong. There's little - * chance that the problem will just go away, but PANIC is - * not good for availability either, especially in hot - * standby mode. Disconnect, and retry from - * archive/pg_xlog again. The WAL in the archive should be - * identical to what was streamed, so it's unlikely that - * it helps, but one can hope... - */ - if (failedSources & XLOG_FROM_STREAM) - { - ShutdownWalRcv(); - continue; - } - - /* - * Walreceiver is active, so see if new data has arrived. - * - * We only advance XLogReceiptTime when we obtain fresh - * WAL from walreceiver and observe that we had already - * processed everything before the most recent "chunk" - * that it flushed to disk. In steady state where we are - * keeping up with the incoming data, XLogReceiptTime will - * be updated on each cycle. When we are behind, - * XLogReceiptTime will not advance, so the grace time - * alloted to conflicting queries will decrease. - */ - if (XLByteLT(*RecPtr, receivedUpto)) - havedata = true; - else - { - XLogRecPtr latestChunkStart; - - receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); - if (XLByteLT(*RecPtr, receivedUpto)) - { - havedata = true; - if (!XLByteLT(*RecPtr, latestChunkStart)) - { - XLogReceiptTime = GetCurrentTimestamp(); - SetCurrentChunkStartTime(XLogReceiptTime); - } - } - else - havedata = false; - } - if (havedata) - { - /* - * Great, streamed far enough. Open the file if it's - * not open already. Use XLOG_FROM_STREAM so that - * source info is set correctly and XLogReceiptTime - * isn't changed. - */ - if (readFile < 0) - { - readFile = - XLogFileRead(readSegNo, PANIC, - recoveryTargetTLI, - XLOG_FROM_STREAM, false); - Assert(readFile >= 0); - switched_segment = true; - } - else - { - /* just make sure source info is correct... */ - readSource = XLOG_FROM_STREAM; - XLogReceiptSource = XLOG_FROM_STREAM; - } - break; - } - - /* - * Data not here yet, so check for trigger then sleep for - * five seconds like in the WAL file polling case below. - */ - if (CheckForStandbyTrigger()) - goto retry; - - /* - * Wait for more WAL to arrive, or timeout to be reached - */ - WaitLatch(&XLogCtl->recoveryWakeupLatch, - WL_LATCH_SET | WL_TIMEOUT, - 5000L); - ResetLatch(&XLogCtl->recoveryWakeupLatch); - } - else - { - int sources; - pg_time_t now; - - /* - * Until walreceiver manages to reconnect, poll the - * archive. - */ - if (readFile >= 0) - { - close(readFile); - readFile = -1; - } - /* Reset curFileTLI if random fetch. */ - if (randAccess) - curFileTLI = 0; - - /* - * Try to restore the file from archive, or read an - * existing file from pg_xlog. - */ - sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; - if (!(sources & ~failedSources)) - { - /* - * We've exhausted all options for retrieving the - * file. Retry. - */ - failedSources = 0; - - /* - * Before we sleep, re-scan for possible new timelines - * if we were requested to recover to the latest - * timeline. - */ - if (recoveryTargetIsLatest) - { - if (rescanLatestTimeLine()) - continue; - } - - /* - * If it hasn't been long since last attempt, sleep to - * avoid busy-waiting. - */ - now = (pg_time_t) time(NULL); - if ((now - last_fail_time) < 5) - { - pg_usleep(1000000L * (5 - (now - last_fail_time))); - now = (pg_time_t) time(NULL); - } - last_fail_time = now; - - /* - * If primary_conninfo is set, launch walreceiver to - * try to stream the missing WAL, before retrying to - * restore from archive/pg_xlog. - * - * If fetching_ckpt is TRUE, RecPtr points to the - * initial checkpoint location. In that case, we use - * RedoStartLSN as the streaming start position - * instead of RecPtr, so that when we later jump - * backwards to start redo at RedoStartLSN, we will - * have the logs streamed already. - */ - if (PrimaryConnInfo) - { - RequestXLogStreaming( - fetching_ckpt ? RedoStartLSN : *RecPtr, - PrimaryConnInfo); - continue; - } - } - /* Don't try to read from a source that just failed */ - sources &= ~failedSources; - readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, - sources); - switched_segment = true; - if (readFile >= 0) - break; - - /* - * Nope, not found in archive and/or pg_xlog. - */ - failedSources |= sources; - - /* - * Check to see if the trigger file exists. Note that we - * do this only after failure, so when you create the - * trigger file, we still finish replaying as much as we - * can from archive and pg_xlog before failover. - */ - if (CheckForStandbyTrigger()) - goto triggered; - } - - /* - * This possibly-long loop needs to handle interrupts of - * startup process. - */ - HandleStartupProcInterrupts(); - } + if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess, + fetching_ckpt)) + goto triggered; } else { @@ -9502,7 +9309,6 @@ retry: sources |= XLOG_FROM_ARCHIVE; readFile = XLogFileReadAnyTLI(readSegNo, emode, sources); - switched_segment = true; if (readFile < 0) return false; } @@ -9533,7 +9339,7 @@ retry: else readLen = XLOG_BLCKSZ; - if (switched_segment && targetPageOff != 0) + if (!readFileHeaderValidated && targetPageOff != 0) { /* * Whenever switching to a new WAL segment, we read the first page of @@ -9582,6 +9388,8 @@ retry: if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode)) goto next_record_is_invalid; + readFileHeaderValidated = true; + Assert(targetSegNo == readSegNo); Assert(targetPageOff == readOff); Assert(targetRecOff < readLen); @@ -9613,6 +9421,213 @@ triggered: return false; } +/* + * In standby mode, wait for the requested record to become available, either + * via restore_command succeeding to restore the segment, or via walreceiver + * having streamed the record (or via someone copying the segment directly to + * pg_xlog, but that is not documented or recommended). + * + * When the requested record becomes available, the function opens the file + * containing it (if not open already), and returns true. When end of standby + * mode is triggered by the user, and there is no more WAL available, returns + * false. + */ +static bool +WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, + bool fetching_ckpt) +{ + static pg_time_t last_fail_time = 0; + + for (;;) + { + if (WalRcvInProgress()) + { + bool havedata; + + /* + * If we find an invalid record in the WAL streamed from master, + * something is seriously wrong. There's little chance that the + * problem will just go away, but PANIC is not good for + * availability either, especially in hot standby mode. + * Disconnect, and retry from archive/pg_xlog again. The WAL in + * the archive should be identical to what was streamed, so it's + * unlikely that it helps, but one can hope... + */ + if (failedSources & XLOG_FROM_STREAM) + { + ShutdownWalRcv(); + continue; + } + + /* + * Walreceiver is active, so see if new data has arrived. + * + * We only advance XLogReceiptTime when we obtain fresh WAL from + * walreceiver and observe that we had already processed + * everything before the most recent "chunk" that it flushed to + * disk. In steady state where we are keeping up with the + * incoming data, XLogReceiptTime will be updated on each cycle. + * When we are behind, XLogReceiptTime will not advance, so the + * grace time allotted to conflicting queries will decrease. + */ + if (XLByteLT(RecPtr, receivedUpto)) + havedata = true; + else + { + XLogRecPtr latestChunkStart; + + receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); + if (XLByteLT(RecPtr, receivedUpto)) + { + havedata = true; + if (!XLByteLT(RecPtr, latestChunkStart)) + { + XLogReceiptTime = GetCurrentTimestamp(); + SetCurrentChunkStartTime(XLogReceiptTime); + } + } + else + havedata = false; + } + if (havedata) + { + /* + * Great, streamed far enough. Open the file if it's not open + * already. Use XLOG_FROM_STREAM so that source info is set + * correctly and XLogReceiptTime isn't changed. + */ + if (readFile < 0) + { + readFile = XLogFileRead(readSegNo, PANIC, + curFileTLI, + XLOG_FROM_STREAM, false); + Assert(readFile >= 0); + } + else + { + /* just make sure source info is correct... */ + readSource = XLOG_FROM_STREAM; + XLogReceiptSource = XLOG_FROM_STREAM; + } + break; + } + + /* + * Data not here yet, so check for trigger then sleep for five + * seconds like in the WAL file polling case below. + */ + if (CheckForStandbyTrigger()) + return false; + + /* + * Wait for more WAL to arrive, or timeout to be reached + */ + WaitLatch(&XLogCtl->recoveryWakeupLatch, + WL_LATCH_SET | WL_TIMEOUT, + 5000L); + ResetLatch(&XLogCtl->recoveryWakeupLatch); + } + else + { + /* + * WAL receiver is not active. Poll the archive. + */ + int sources; + pg_time_t now; + + if (readFile >= 0) + { + close(readFile); + readFile = -1; + } + /* Reset curFileTLI if random fetch. */ + if (randAccess) + curFileTLI = 0; + + /* + * Try to restore the file from archive, or read an existing file + * from pg_xlog. + */ + sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; + if (!(sources & ~failedSources)) + { + /* + * We've exhausted all options for retrieving the file. Retry. + */ + failedSources = 0; + + /* + * Before we sleep, re-scan for possible new timelines if we + * were requested to recover to the latest timeline. + */ + if (recoveryTargetIsLatest) + { + if (rescanLatestTimeLine()) + continue; + } + + /* + * If it hasn't been long since last attempt, sleep to avoid + * busy-waiting. + */ + now = (pg_time_t) time(NULL); + if ((now - last_fail_time) < 5) + { + pg_usleep(1000000L * (5 - (now - last_fail_time))); + now = (pg_time_t) time(NULL); + } + last_fail_time = now; + + /* + * If primary_conninfo is set, launch walreceiver to try to + * stream the missing WAL, before retrying to restore from + * archive/pg_xlog. + * + * If fetching_ckpt is TRUE, RecPtr points to the initial + * checkpoint location. In that case, we use RedoStartLSN as + * the streaming start position instead of RecPtr, so that + * when we later jump backwards to start redo at RedoStartLSN, + * we will have the logs streamed already. + */ + if (PrimaryConnInfo) + { + XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr; + + RequestXLogStreaming(ptr, PrimaryConnInfo); + continue; + } + } + /* Don't try to read from a source that just failed */ + sources &= ~failedSources; + readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources); + if (readFile >= 0) + break; + + /* + * Nope, not found in archive and/or pg_xlog. + */ + failedSources |= sources; + + /* + * Check to see if the trigger file exists. Note that we do this + * only after failure, so when you create the trigger file, we + * still finish replaying as much as we can from archive and + * pg_xlog before failover. + */ + if (CheckForStandbyTrigger()) + return false; + } + + /* + * This possibly-long loop needs to handle interrupts of startup + * process. + */ + HandleStartupProcInterrupts(); + } + + return true; +} + /* * Determine what log level should be used to report a corrupt WAL record * in the current WAL page, previously read by XLogPageRead().