diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9208bc21d462d51bd975b560a28e27f467fab39e..c8ac97fbf7fac6cddfd7019e19d252da285c501e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -506,12 +506,18 @@ static XLogwrtResult LogwrtResult = {0, 0}; /* * Codes indicating where we got a WAL file from during recovery, or where - * to attempt to get one. These are chosen so that they can be OR'd together - * in a bitmask state variable. + * to attempt to get one. */ -#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */ -#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */ -#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */ +typedef enum +{ + XLOG_FROM_ANY = 0, /* request to read WAL from any source */ + XLOG_FROM_ARCHIVE, /* restored using restore_command */ + XLOG_FROM_PG_XLOG, /* existing file in pg_xlog */ + XLOG_FROM_STREAM, /* streamed from master */ +} XLogSource; + +/* human-readable names for XLogSources, for debugging output */ +static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" }; /* * openLogFile is -1 or a kernel FD for an open log file segment. @@ -536,22 +542,28 @@ static XLogSegNo readSegNo = 0; static uint32 readOff = 0; static uint32 readLen = 0; static bool readFileHeaderValidated = false; -static int readSource = 0; /* XLOG_FROM_* code */ +static XLogSource readSource = 0; /* XLOG_FROM_* code */ /* - * Keeps track of which sources we've tried to read the current WAL - * record from and failed. + * Keeps track of which source we're currently reading from. This is + * different from readSource in that this is always set, even when we don't + * currently have a WAL file open. If lastSourceFailed is set, our last + * attempt to read from currentSource failed, and we should try another source + * next. */ -static int failedSources = 0; /* OR of XLOG_FROM_* codes */ +static XLogSource currentSource = 0; /* XLOG_FROM_* code */ +static bool lastSourceFailed = false; /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as * readSource, but readSource gets reset to zero when we don't have data - * to process right now.) + * to process right now. It is also different from currentSource, which + * also changes when we try to read from a source and fail, while + * XLogReceiptSource tracks where we last successfully read some WAL.) */ static TimestampTz XLogReceiptTime = 0; -static int XLogReceiptSource = 0; /* XLOG_FROM_* code */ +static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */ /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ static char *readBuf = NULL; @@ -605,7 +617,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, bool use_lock); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notexistOk); -static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources); +static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, bool randAccess); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, @@ -2551,7 +2563,7 @@ XLogFileOpen(XLogSegNo segno) /* * Open a logfile segment for reading (during recovery). * - * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive. + * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive. * Otherwise, it's assumed to be already available in pg_xlog. */ static int @@ -2697,7 +2709,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, * This version searches for the segment with any TLI listed in expectedTLIs. */ static int -XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources) +XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source) { char path[MAXPGPATH]; ListCell *cell; @@ -2720,7 +2732,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources) if (tli < curFileTLI) break; /* don't bother looking at too-old TLIs */ - if (sources & XLOG_FROM_ARCHIVE) + if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE) { fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true); if (fd != -1) @@ -2730,7 +2742,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources) } } - if (sources & XLOG_FROM_PG_XLOG) + if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG) { fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true); if (fd != -1) @@ -3332,7 +3344,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) } /* This is the first try to read this page. */ - failedSources = 0; + lastSourceFailed = false; retry: /* Read the page containing the record */ if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) @@ -3545,7 +3557,7 @@ retry: return record; next_record_is_invalid: - failedSources |= readSource; + lastSourceFailed = true; if (readFile >= 0) { @@ -9162,7 +9174,7 @@ CancelBackup(void) * In standby mode, if after a successful return of XLogPageRead() the * caller finds the record it's interested in to be broken, it should * ereport the error with the level determined by - * emode_for_corrupt_record(), and then set "failedSources |= readSource" + * emode_for_corrupt_record(), and then set lastSourceFailed * and call XLogPageRead() again with the same arguments. This lets * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. @@ -9180,7 +9192,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, targetRecOff = (*RecPtr) % XLOG_BLCKSZ; /* Fast exit if we have read the record in the current buffer already */ - if (failedSources == 0 && targetSegNo == readSegNo && + if (!lastSourceFailed && targetSegNo == readSegNo && targetPageOff == readOff && targetRecOff < readLen) return true; @@ -9227,17 +9239,18 @@ retry: /* In archive or crash recovery. */ if (readFile < 0) { - int sources; + int source; /* Reset curFileTLI if random fetch. */ if (randAccess) curFileTLI = 0; - sources = XLOG_FROM_PG_XLOG; if (InArchiveRecovery) - sources |= XLOG_FROM_ARCHIVE; + source = XLOG_FROM_ANY; + else + source = XLOG_FROM_PG_XLOG; - readFile = XLogFileReadAnyTLI(readSegNo, emode, sources); + readFile = XLogFileReadAnyTLI(readSegNo, emode, source); if (readFile < 0) return false; } @@ -9326,7 +9339,7 @@ retry: return true; next_record_is_invalid: - failedSources |= readSource; + lastSourceFailed = true; if (readFile >= 0) close(readFile); @@ -9366,185 +9379,289 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt) { static pg_time_t last_fail_time = 0; + pg_time_t now; + + /*------- + * Standby mode is implemented by a state machine: + * + * 1. Read from archive (XLOG_FROM_ARCHIVE) + * 2. Read from pg_xlog (XLOG_FROM_PG_XLOG) + * 3. Check trigger file + * 4. Read from primary server via walreceiver (XLOG_FROM_STREAM) + * 5. Rescan timelines + * 6. Sleep 5 seconds, and loop back to 1. + * + * Failure to read from the current source advances the state machine to + * the next state. In addition, successfully reading a file from pg_xlog + * moves the state machine from state 2 back to state 1 (we always prefer + * files in the archive over files in pg_xlog). + * + * 'currentSource' indicates the current state. There are no currentSource + * values for "check trigger", "rescan timelines", and "sleep" states, + * those actions are taken when reading from the previous source fails, as + * part of advancing to the next state. + *------- + */ + if (currentSource == 0) + currentSource = XLOG_FROM_ARCHIVE; for (;;) { - if (WalRcvInProgress()) + int oldSource = currentSource; + + /* + * First check if we failed to read from the current source, and + * advance the state machine if so. The failure to read might've + * happened outside this function, e.g when a CRC check fails on a + * record, or within this loop. + */ + if (lastSourceFailed) { - bool havedata; - /* - * If we find an invalid record in the WAL streamed from master, - * something is seriously wrong. There's little chance that the - * problem will just go away, but PANIC is not good for - * availability either, especially in hot standby mode. - * Disconnect, and retry from archive/pg_xlog again. The WAL in - * the archive should be identical to what was streamed, so it's - * unlikely that it helps, but one can hope... - */ - if (failedSources & XLOG_FROM_STREAM) + switch (currentSource) { - ShutdownWalRcv(); - continue; - } + case XLOG_FROM_ARCHIVE: + currentSource = XLOG_FROM_PG_XLOG; + break; - /* - * Walreceiver is active, so see if new data has arrived. - * - * We only advance XLogReceiptTime when we obtain fresh WAL from - * walreceiver and observe that we had already processed - * everything before the most recent "chunk" that it flushed to - * disk. In steady state where we are keeping up with the - * incoming data, XLogReceiptTime will be updated on each cycle. - * When we are behind, XLogReceiptTime will not advance, so the - * grace time allotted to conflicting queries will decrease. - */ - if (XLByteLT(RecPtr, receivedUpto)) - havedata = true; - else - { - XLogRecPtr latestChunkStart; + case XLOG_FROM_PG_XLOG: + /* + * Check to see if the trigger file exists. Note that we do + * this only after failure, so when you create the trigger + * file, we still finish replaying as much as we can from + * archive and pg_xlog before failover. + */ + if (CheckForStandbyTrigger()) + return false; - receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); - if (XLByteLT(RecPtr, receivedUpto)) - { - havedata = true; - if (!XLByteLT(RecPtr, latestChunkStart)) + /* + * If primary_conninfo is set, launch walreceiver to try to + * stream the missing WAL. + * + * If fetching_ckpt is TRUE, RecPtr points to the initial + * checkpoint location. In that case, we use RedoStartLSN + * as the streaming start position instead of RecPtr, so + * that when we later jump backwards to start redo at + * RedoStartLSN, we will have the logs streamed already. + */ + if (PrimaryConnInfo) { - XLogReceiptTime = GetCurrentTimestamp(); - SetCurrentChunkStartTime(XLogReceiptTime); + XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr; + + RequestXLogStreaming(ptr, PrimaryConnInfo); } - } - else - havedata = false; - } - if (havedata) - { - /* - * Great, streamed far enough. Open the file if it's not open - * already. Use XLOG_FROM_STREAM so that source info is set - * correctly and XLogReceiptTime isn't changed. - */ - if (readFile < 0) - { - readFile = XLogFileRead(readSegNo, PANIC, - recoveryTargetTLI, - XLOG_FROM_STREAM, false); - Assert(readFile >= 0); - } - else - { - /* just make sure source info is correct... */ - readSource = XLOG_FROM_STREAM; - XLogReceiptSource = XLOG_FROM_STREAM; - } - break; - } + /* + * Move to XLOG_FROM_STREAM state in either case. We'll get + * immediate failure if we didn't launch walreceiver, and + * move on to the next state. + */ + currentSource = XLOG_FROM_STREAM; + break; - /* - * Data not here yet, so check for trigger then sleep for five - * seconds like in the WAL file polling case below. - */ - if (CheckForStandbyTrigger()) - return false; + case XLOG_FROM_STREAM: + /* + * Failure while streaming. Most likely, we got here because + * streaming replication was terminated, or promotion was + * triggered. But we also get here if we find an invalid + * record in the WAL streamed from master, in which case + * something is seriously wrong. There's little chance that + * the problem will just go away, but PANIC is not good for + * availability either, especially in hot standby mode. So, + * we treat that the same as disconnection, and retry from + * archive/pg_xlog again. The WAL in the archive should be + * identical to what was streamed, so it's unlikely that it + * helps, but one can hope... + */ + /* + * Before we leave XLOG_FROM_STREAM state, make sure that + * walreceiver is not running, so that it won't overwrite + * any WAL that we restore from archive. + */ + if (WalRcvInProgress()) + ShutdownWalRcv(); - /* - * Wait for more WAL to arrive, or timeout to be reached - */ - WaitLatch(&XLogCtl->recoveryWakeupLatch, - WL_LATCH_SET | WL_TIMEOUT, - 5000L); - ResetLatch(&XLogCtl->recoveryWakeupLatch); + /* + * Before we sleep, re-scan for possible new timelines if + * we were requested to recover to the latest timeline. + */ + if (recoveryTargetIsLatest) + { + if (rescanLatestTimeLine()) + { + currentSource = XLOG_FROM_ARCHIVE; + break; + } + } + + /* + * XLOG_FROM_STREAM is the last state in our state machine, + * so we've exhausted all the options for obtaining the + * requested WAL. We're going to loop back and retry from + * the archive, but if it hasn't been long since last + * attempt, sleep 5 seconds to avoid busy-waiting. + */ + now = (pg_time_t) time(NULL); + if ((now - last_fail_time) < 5) + { + pg_usleep(1000000L * (5 - (now - last_fail_time))); + now = (pg_time_t) time(NULL); + } + last_fail_time = now; + currentSource = XLOG_FROM_ARCHIVE; + break; + + default: + elog(ERROR, "unexpected WAL source %d", currentSource); + } } - else + else if (currentSource == XLOG_FROM_PG_XLOG) { /* - * WAL receiver is not active. Poll the archive. + * We just successfully read a file in pg_xlog. We prefer files + * in the archive over ones in pg_xlog, so try the next file + * again from the archive first. */ - int sources; - pg_time_t now; + currentSource = XLOG_FROM_ARCHIVE; + } - if (readFile >= 0) - { - close(readFile); - readFile = -1; - } - /* Reset curFileTLI if random fetch. */ - if (randAccess) - curFileTLI = 0; + if (currentSource != oldSource) + elog(LOG, "switched WAL source from %s to %s after %s", + xlogSourceNames[oldSource], xlogSourceNames[currentSource], + lastSourceFailed ? "failure" : "success"); + + /* + * We've now handled possible failure. Try to read from the chosen + * source. + */ + lastSourceFailed = false; + + switch (currentSource) + { + case XLOG_FROM_ARCHIVE: + case XLOG_FROM_PG_XLOG: + /* Close any old file we might have open. */ + if (readFile >= 0) + { + close(readFile); + readFile = -1; + } + /* Reset curFileTLI if random fetch. */ + if (randAccess) + curFileTLI = 0; - /* - * Try to restore the file from archive, or read an existing file - * from pg_xlog. - */ - sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; - if (!(sources & ~failedSources)) - { /* - * We've exhausted all options for retrieving the file. Retry. + * Try to restore the file from archive, or read an existing + * file from pg_xlog. */ - failedSources = 0; + readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, currentSource); + if (readFile >= 0) + return true; /* success! */ /* - * Before we sleep, re-scan for possible new timelines if we - * were requested to recover to the latest timeline. + * Nope, not found in archive or pg_xlog. */ - if (recoveryTargetIsLatest) - { - if (rescanLatestTimeLine()) - continue; - } + lastSourceFailed = true; + break; + + case XLOG_FROM_STREAM: + { + bool havedata; /* - * If it hasn't been long since last attempt, sleep to avoid - * busy-waiting. + * Check if WAL receiver is still active. */ - now = (pg_time_t) time(NULL); - if ((now - last_fail_time) < 5) + if (!WalRcvInProgress()) { - pg_usleep(1000000L * (5 - (now - last_fail_time))); - now = (pg_time_t) time(NULL); + lastSourceFailed = true; + break; } - last_fail_time = now; /* - * If primary_conninfo is set, launch walreceiver to try to - * stream the missing WAL, before retrying to restore from - * archive/pg_xlog. + * Walreceiver is active, so see if new data has arrived. * - * If fetching_ckpt is TRUE, RecPtr points to the initial - * checkpoint location. In that case, we use RedoStartLSN as - * the streaming start position instead of RecPtr, so that - * when we later jump backwards to start redo at RedoStartLSN, - * we will have the logs streamed already. + * We only advance XLogReceiptTime when we obtain fresh WAL + * from walreceiver and observe that we had already processed + * everything before the most recent "chunk" that it flushed to + * disk. In steady state where we are keeping up with the + * incoming data, XLogReceiptTime will be updated on each cycle. + * When we are behind, XLogReceiptTime will not advance, so the + * grace time allotted to conflicting queries will decrease. */ - if (PrimaryConnInfo) + if (XLByteLT(RecPtr, receivedUpto)) + havedata = true; + else { - XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr; + XLogRecPtr latestChunkStart; - RequestXLogStreaming(ptr, PrimaryConnInfo); - continue; + receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); + if (XLByteLT(RecPtr, receivedUpto)) + { + havedata = true; + if (!XLByteLT(RecPtr, latestChunkStart)) + { + XLogReceiptTime = GetCurrentTimestamp(); + SetCurrentChunkStartTime(XLogReceiptTime); + } + } + else + havedata = false; + } + if (havedata) + { + /* + * Great, streamed far enough. Open the file if it's not + * open already. Use XLOG_FROM_STREAM so that source info + * is set correctly and XLogReceiptTime isn't changed. + */ + if (readFile < 0) + { + readFile = XLogFileRead(readSegNo, PANIC, + recoveryTargetTLI, + XLOG_FROM_STREAM, false); + Assert(readFile >= 0); + } + else + { + /* just make sure source info is correct... */ + readSource = XLOG_FROM_STREAM; + XLogReceiptSource = XLOG_FROM_STREAM; + return true; + } + break; } - } - /* Don't try to read from a source that just failed */ - sources &= ~failedSources; - readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources); - if (readFile >= 0) - break; - /* - * Nope, not found in archive and/or pg_xlog. - */ - failedSources |= sources; + /* + * Data not here yet. Check for trigger, then wait for + * walreceiver to wake us up when new WAL arrives. + */ + if (CheckForStandbyTrigger()) + { + /* + * Note that we don't "return false" immediately here. + * After being triggered, we still want to replay all the + * WAL that was already streamed. It's in pg_xlog now, so + * we just treat this as a failure, and the state machine + * will move on to replay the streamed WAL from pg_xlog, + * and then recheck the trigger and exit replay. + */ + lastSourceFailed = true; + break; + } - /* - * Check to see if the trigger file exists. Note that we do this - * only after failure, so when you create the trigger file, we - * still finish replaying as much as we can from archive and - * pg_xlog before failover. - */ - if (CheckForStandbyTrigger()) - return false; + /* + * Wait for more WAL to arrive. Time out after 5 seconds, like + * when polling the archive, to react to a trigger file + * promptly. + */ + WaitLatch(&XLogCtl->recoveryWakeupLatch, + WL_LATCH_SET | WL_TIMEOUT, + 5000L); + ResetLatch(&XLogCtl->recoveryWakeupLatch); + break; + } + + default: + elog(ERROR, "unexpected WAL source %d", currentSource); } /* @@ -9554,7 +9671,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, HandleStartupProcInterrupts(); } - return true; + return false; /* not reached */ } /*