diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 62f72b40ae0e4bf906a7052af756075a27f424f5..e8342858c9d11bce470107591e2cb48267021cac 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -912,10 +912,9 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' </para> <para> - Promoting a cascading standby terminates the immediate downstream replication - connections which it serves. This is because the timeline becomes different - between standbys, and they can no longer continue replication. The - affected standby(s) may reconnect to reestablish streaming replication. + If an upstream standby server is promoted to become the new master, downstream + servers will continue to stream from the new master if + <varname>recovery_target_timeline</> is set to <literal>'latest'</>. </para> <para> diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index f87020c9099b92e697099a2db2d26aeab38fa92c..e14627c201ebc5ac63271afd3d79ddf900a60451 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1018,14 +1018,21 @@ </para> <para> - There is another Copy-related mode called Copy-both, which allows + There is another Copy-related mode called copy-both, which allows high-speed bulk data transfer to <emphasis>and</> from the server. Copy-both mode is initiated when a backend in walsender mode executes a <command>START_REPLICATION</command> statement. The backend sends a CopyBothResponse message to the frontend. Both the backend and the frontend may then send CopyData messages - until the connection is terminated. See <xref - linkend="protocol-replication">. + until either end sends a CopyDone message. After the client + sends a CopyDone message, the connection goes from copy-both mode to + copy-out mode, and the client may not send any more CopyData messages. 
+ Similarly, when the server sends a CopyDone message, the connection + goes into copy-in mode, and the server may not send any more CopyData + messages. After both sides have sent a CopyDone message, the copy mode + is terminated, and the backend reverts to the command-processing mode. + See <xref linkend="protocol-replication"> for more information on the + subprotocol transmitted over copy-both mode. </para> <para> @@ -1350,19 +1357,69 @@ The commands accepted in walsender mode are: </varlistentry> <varlistentry> - <term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</></term> + <term>TIMELINE_HISTORY <replaceable class="parameter">tli</replaceable></term> + <listitem> + <para> + Requests the server to send over the timeline history file for timeline + <replaceable class="parameter">tli</replaceable>. Server replies with a + result set of a single row, containing two fields: + </para> + + <para> + <variablelist> + <varlistentry> + <term> + filename + </term> + <listitem> + <para> + Filename of the timeline history file, e.g. 00000002.history. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + content + </term> + <listitem> + <para> + Contents of the timeline history file. + </para> + </listitem> + </varlistentry> + + </variablelist> + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>START_REPLICATION <replaceable class="parameter">XXX/XXX</> TIMELINE <replaceable class="parameter">tli</></term> <listitem> <para> Instructs server to start streaming WAL, starting at - WAL position <replaceable>XXX</>/<replaceable>XXX</>. + WAL position <replaceable class="parameter">XXX/XXX</> on timeline + <replaceable class="parameter">tli</>. The server can reply with an error, e.g. if the requested section of WAL has already been recycled. On success, server responds with a CopyBothResponse message, and then starts to stream WAL to the frontend. 
- WAL will continue to be streamed until the connection is broken; - no further commands will be accepted. If the WAL sender process is - terminated normally (during postmaster shutdown), it will send a - CommandComplete message before exiting. This might not happen during an - abnormal shutdown, of course. + </para> + + <para> + If the client requests a timeline that's not the latest, but is part of + the history of the server, the server will stream all the WAL on that + timeline starting from the requested startpoint, up to the point where + the server switched to another timeline. If the client requests + streaming at exactly the end of an old timeline, the server responds + immediately with CommandComplete without entering COPY mode. + </para> + + <para> + After streaming all the WAL on a timeline that is not the latest one, + the server will end streaming by exiting the COPY mode. When the client + acknowledges this by also exiting COPY mode, the server responds with a + CommandComplete message, and is ready to accept a new command. </para> <para> diff --git a/src/backend/access/transam/timeline.c b/src/backend/access/transam/timeline.c index 0681944ae5e1905daba9adaee486fce6367517c0..b33d230c7011857b03129cefddd4e2443451cbde 100644 --- a/src/backend/access/transam/timeline.c +++ b/src/backend/access/transam/timeline.c @@ -410,6 +410,89 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, XLogArchiveNotify(histfname); } +/* + * Writes a history file for given timeline and contents. + * + * Currently this is only used in the walreceiver process, and so there are + * no locking considerations. But we should be just as tense as XLogFileInit + * to avoid emplacing a bogus file. + */ +void +writeTimeLineHistoryFile(TimeLineID tli, char *content, int size) +{ + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + int fd; + + /* + * Write into a temp file name. 
+ */ + snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid()); + + unlink(tmppath); + + /* do not use get_sync_bit() here --- want to fsync only at end of fill */ + fd = OpenTransientFile(tmppath, O_RDWR | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", tmppath))); + + errno = 0; + if ((int) write(fd, content, size) != size) + { + int save_errno = errno; + + /* + * If we fail to make the file, delete it to release disk space + */ + unlink(tmppath); + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", tmppath))); + } + + if (pg_fsync(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", tmppath))); + + if (CloseTransientFile(fd)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", tmppath))); + + + /* + * Now move the completed history file into place with its final name. + */ + TLHistoryFilePath(path, tli); + + /* + * Prefer link() to rename() here just to be really sure that we don't + * overwrite an existing logfile. However, there shouldn't be one, so + * rename() is an acceptable substitute except for the truly paranoid. 
+ */ +#if HAVE_WORKING_LINK + if (link(tmppath, path) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not link file \"%s\" to \"%s\": %m", + tmppath, path))); + unlink(tmppath); +#else + if (rename(tmppath, path) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + tmppath, path))); +#endif +} + /* * Returns true if 'expectedTLEs' contains a timeline with id 'tli' */ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 2d4b62aa84055ec0df2c3c38fd10ef182ba87d4e..2deb7e5d89bf1177b2d32c16ff6550aae6928778 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -153,6 +153,7 @@ static XLogRecPtr LastRec; /* Local copy of WalRcv->receivedUpto */ static XLogRecPtr receivedUpto = 0; +static TimeLineID receiveTLI = 0; /* * During recovery, lastFullPageWrites keeps track of full_page_writes that @@ -6366,6 +6367,12 @@ StartupXLOG(void) xlogctl->SharedRecoveryInProgress = false; SpinLockRelease(&xlogctl->info_lck); } + + /* + * If there were cascading standby servers connected to us, nudge any + * wal sender processes to notice that we've been promoted. + */ + WalSndWakeup(); } /* @@ -7626,7 +7633,7 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; /* Get the current (or recent) end of xlog */ - endptr = GetStandbyFlushRecPtr(NULL); + endptr = GetStandbyFlushRecPtr(); KeepLogSeg(endptr, &_logSegNo); _logSegNo--; @@ -9087,13 +9094,10 @@ do_pg_abort_backup(void) /* * Get latest redo apply position. * - * Optionally, returns the current recovery target timeline. Callers not - * interested in that may pass NULL for targetTLI. - * * Exported to allow WALReceiver to read the pointer directly. 
*/ XLogRecPtr -GetXLogReplayRecPtr(TimeLineID *targetTLI) +GetXLogReplayRecPtr(void) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; @@ -9101,8 +9105,6 @@ GetXLogReplayRecPtr(TimeLineID *targetTLI) SpinLockAcquire(&xlogctl->info_lck); recptr = xlogctl->lastReplayedEndRecPtr; - if (targetTLI) - *targetTLI = xlogctl->RecoveryTargetTLI; SpinLockRelease(&xlogctl->info_lck); return recptr; @@ -9111,18 +9113,15 @@ GetXLogReplayRecPtr(TimeLineID *targetTLI) /* * Get current standby flush position, ie, the last WAL position * known to be fsync'd to disk in standby. - * - * If 'targetTLI' is not NULL, it's set to the current recovery target - * timeline. */ XLogRecPtr -GetStandbyFlushRecPtr(TimeLineID *targetTLI) +GetStandbyFlushRecPtr(void) { XLogRecPtr receivePtr; XLogRecPtr replayPtr; - receivePtr = GetWalRcvWriteRecPtr(NULL); - replayPtr = GetXLogReplayRecPtr(targetTLI); + receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); + replayPtr = GetXLogReplayRecPtr(); if (XLByteLT(receivePtr, replayPtr)) return replayPtr; @@ -9611,7 +9610,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * archive and pg_xlog before failover. */ if (CheckForStandbyTrigger()) + { + ShutdownWalRcv(); return false; + } /* * If primary_conninfo is set, launch walreceiver to try to @@ -9626,8 +9628,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (PrimaryConnInfo) { XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr; - - RequestXLogStreaming(ptr, PrimaryConnInfo); + TimeLineID tli = tliOfPointInHistory(ptr, expectedTLEs); + + if (curFileTLI > 0 && tli < curFileTLI) + elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u", + (uint32) (ptr >> 32), (uint32) ptr, + tli, curFileTLI); + curFileTLI = tli; + RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo); } /* * Move to XLOG_FROM_STREAM state in either case. 
We'll get @@ -9653,10 +9661,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ /* * Before we leave XLOG_FROM_STREAM state, make sure that - * walreceiver is not running, so that it won't overwrite - * any WAL that we restore from archive. + * walreceiver is not active, so that it won't overwrite + * WAL that we restore from archive. */ - if (WalRcvInProgress()) + if (WalRcvStreaming()) ShutdownWalRcv(); /* @@ -9749,7 +9757,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, /* * Check if WAL receiver is still active. */ - if (!WalRcvInProgress()) + if (!WalRcvStreaming()) { lastSourceFailed = true; break; @@ -9772,8 +9780,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, { XLogRecPtr latestChunkStart; - receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); - if (XLByteLT(RecPtr, receivedUpto)) + receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); + if (XLByteLT(RecPtr, receivedUpto) && receiveTLI == curFileTLI) { havedata = true; if (!XLByteLT(RecPtr, latestChunkStart)) @@ -9888,8 +9896,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr) /* * Check to see whether the user-specified trigger file exists and whether a - * promote request has arrived. If either condition holds, request postmaster - * to shut down walreceiver, wait for it to exit, and return true. + * promote request has arrived. If either condition holds, return true. 
*/ static bool CheckForStandbyTrigger(void) @@ -9904,7 +9911,6 @@ CheckForStandbyTrigger(void) { ereport(LOG, (errmsg("received promote request"))); - ShutdownWalRcv(); ResetPromoteTriggered(); triggered = true; return true; @@ -9917,7 +9923,6 @@ CheckForStandbyTrigger(void) { ereport(LOG, (errmsg("trigger file found: %s", TriggerFile))); - ShutdownWalRcv(); unlink(TriggerFile); triggered = true; return true; diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index 40c0bd67b57a1b11fa7d47e5224a41f3a75abef9..e91bdc3f4af93b6b56367a0d2010651f49d95c4c 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -226,7 +226,7 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS) XLogRecPtr recptr; char location[MAXFNAMELEN]; - recptr = GetWalRcvWriteRecPtr(NULL); + recptr = GetWalRcvWriteRecPtr(NULL, NULL); if (recptr == 0) PG_RETURN_NULL(); @@ -248,7 +248,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS) XLogRecPtr recptr; char location[MAXFNAMELEN]; - recptr = GetXLogReplayRecPtr(NULL); + recptr = GetXLogReplayRecPtr(); if (recptr == 0) PG_RETURN_NULL(); diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index a492c60b46a925a3c2cd980689623541fe3c1e8c..8f39aec1808e7c8b59feea4a503c3f54fa0deec3 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -2563,27 +2563,6 @@ reaper(SIGNAL_ARGS) ReachedNormalRunning = true; pmState = PM_RUN; - /* - * Kill any walsenders to force the downstream standby(s) to - * reread the timeline history file, adjust their timelines and - * establish replication connections again. This is required - * because the timeline of cascading standby is not consistent - * with that of cascaded one just after failover. We LOG this - * message since we need to leave a record to explain this - * disconnection. - * - * XXX should avoid the need for disconnection. 
When we do, - * am_cascading_walsender should be replaced with - * RecoveryInProgress() - */ - if (max_wal_senders > 0 && CountChildren(BACKEND_TYPE_WALSND) > 0) - { - ereport(LOG, - (errmsg("terminating all walsender processes to force cascaded " - "standby(s) to update timeline and reconnect"))); - SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND); - } - /* * Crank up the background tasks, if we didn't do that already * when we entered consistent recovery state. It doesn't matter diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c index ab4d1645f246aa9c56e14bf0374e69dfb4a950c3..70b75c7292730c3f559d87c6042a680cd601b518 100644 --- a/src/backend/postmaster/startup.c +++ b/src/backend/postmaster/startup.c @@ -5,6 +5,8 @@ * The Startup process initialises the server and performs any recovery * actions that have been specified. Notice that there is no "main loop" * since the Startup process ends as soon as initialisation is complete. + * (in standby mode, one can think of the replay loop as a main loop, + * though.) * * * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 04681f4196299ce74739cc4cccf5ca305cffae97..65200c129aafb62ca3926136fe9befba65f64fd7 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -56,6 +56,9 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir); static void parse_basebackup_options(List *options, basebackup_options *opt); static void SendXlogRecPtrResult(XLogRecPtr ptr); +/* Was the backup currently in-progress initiated in recovery mode? */ +static bool backup_started_in_recovery = false; + /* * Size of each block sent into the tar stream for larger files. 
* @@ -94,6 +97,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) XLogRecPtr endptr; char *labelfile; + backup_started_in_recovery = RecoveryInProgress(); + startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile); SendXlogRecPtrResult(startptr); @@ -261,7 +266,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) * http://lists.apple.com/archives/xcode-users/2003/Dec//msg000 * 51.html */ - XLogRead(buf, ptr, TAR_SEND_SIZE); + XLogRead(buf, ThisTimeLineID, ptr, TAR_SEND_SIZE); if (pq_putmessage('d', buf, TAR_SEND_SIZE)) ereport(ERROR, (errmsg("base backup could not send data, aborting backup"))); @@ -592,11 +597,19 @@ sendDir(char *path, int basepathlen, bool sizeonly) /* * Check if the postmaster has signaled us to exit, and abort with an * error in that case. The error handler further up will call - * do_pg_abort_backup() for us. + * do_pg_abort_backup() for us. Also check that if the backup was + * started while still in recovery, the server wasn't promoted. + * dp_pg_stop_backup() will check that too, but it's better to stop + * the backup early than continue to the end and fail there. */ - if (ProcDiePending || walsender_ready_to_stop) + CHECK_FOR_INTERRUPTS(); + if (RecoveryInProgress() != backup_started_in_recovery) ereport(ERROR, - (errmsg("shutdown requested, aborting active base backup"))); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("the standby was promoted during online backup"), + errhint("This means that the backup being taken is corrupt " + "and should not be used. 
" + "Try taking another online backup."))); snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index bfaebeae84286908ff2dd0df240943ec70e084d7..180d96b6a694222df1c2b86af7bdf4f89e90e3ca 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -46,9 +46,12 @@ static PGconn *streamConn = NULL; static char *recvBuf = NULL; /* Prototypes for interface functions */ -static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); -static bool libpqrcv_receive(int timeout, unsigned char *type, - char **buffer, int *len); +static void libpqrcv_connect(char *conninfo); +static void libpqrcv_identify_system(TimeLineID *primary_tli); +static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); +static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint); +static void libpqrcv_endstreaming(void); +static int libpqrcv_receive(int timeout, char **buffer); static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); @@ -63,10 +66,17 @@ void _PG_init(void) { /* Tell walreceiver how to reach us */ - if (walrcv_connect != NULL || walrcv_receive != NULL || - walrcv_send != NULL || walrcv_disconnect != NULL) + if (walrcv_connect != NULL || walrcv_identify_system != NULL || + walrcv_readtimelinehistoryfile != NULL || + walrcv_startstreaming != NULL || walrcv_endstreaming != NULL || + walrcv_receive != NULL || walrcv_send != NULL || + walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; + walrcv_identify_system = libpqrcv_identify_system; + walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile; + walrcv_startstreaming = libpqrcv_startstreaming; + walrcv_endstreaming = libpqrcv_endstreaming; 
walrcv_receive = libpqrcv_receive; walrcv_send = libpqrcv_send; walrcv_disconnect = libpqrcv_disconnect; @@ -75,16 +85,10 @@ _PG_init(void) /* * Establish the connection to the primary server for XLOG streaming */ -static bool -libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) +static void +libpqrcv_connect(char *conninfo) { char conninfo_repl[MAXCONNINFO + 75]; - char *primary_sysid; - char standby_sysid[32]; - TimeLineID primary_tli; - TimeLineID standby_tli; - PGresult *res; - char cmd[64]; /* * Connect using deliberately undocumented parameter: replication. The @@ -100,6 +104,18 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) ereport(ERROR, (errmsg("could not connect to the primary server: %s", PQerrorMessage(streamConn)))); +} + +/* + * Check that primary's system identifier matches ours, and fetch the current + * timeline ID of the primary. + */ +static void +libpqrcv_identify_system(TimeLineID *primary_tli) +{ + PGresult *res; + char *primary_sysid; + char standby_sysid[32]; /* * Get the system identifier and timeline ID as a DataRow message from the @@ -126,7 +142,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) ntuples, nfields))); } primary_sysid = PQgetvalue(res, 0, 0); - primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); + *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); /* * Confirm that the system identifier of the primary is the same as ours. @@ -141,24 +157,37 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) errdetail("The primary's identifier is %s, the standby's identifier is %s.", primary_sysid, standby_sysid))); } - - /* - * Confirm that the current timeline of the primary is the same as the - * recovery target timeline. 
- */ - standby_tli = GetRecoveryTargetTLI(); PQclear(res); - if (primary_tli != standby_tli) - ereport(ERROR, - (errmsg("timeline %u of the primary does not match recovery target timeline %u", - primary_tli, standby_tli))); - ThisTimeLineID = primary_tli; +} + +/* + * Start streaming WAL data from given startpoint and timeline. + * + * Returns true if we switched successfully to copy-both mode. False + * means the server received the command and executed it successfully, but + * didn't switch to copy-mode. That means that there was no WAL on the + * requested timeline and starting point, because the server switched to + * another timeline at or before the requested starting point. On failure, + * throws an ERROR. + */ +static bool +libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint) +{ + char cmd[64]; + PGresult *res; /* Start streaming from the point requested by startup process */ - snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", - (uint32) (startpoint >> 32), (uint32) startpoint); + snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + tli); res = libpqrcv_PQexec(cmd); - if (PQresultStatus(res) != PGRES_COPY_BOTH) + + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + return false; + } + else if (PQresultStatus(res) != PGRES_COPY_BOTH) { PQclear(res); ereport(ERROR, @@ -166,11 +195,81 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) PQerrorMessage(streamConn)))); } PQclear(res); + return true; +} + +/* + * Stop streaming WAL data. 
+ */ +static void +libpqrcv_endstreaming(void) +{ + PGresult *res; + + if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn)) + ereport(ERROR, + (errmsg("could not send end-of-streaming message to primary: %s", + PQerrorMessage(streamConn)))); - ereport(LOG, - (errmsg("streaming replication successfully connected to primary"))); + /* Read the command result after COPY is finished */ - return true; + while ((res = PQgetResult(streamConn)) != NULL) + { + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errmsg("error reading result of streaming command: %s", + PQerrorMessage(streamConn)))); + /* + * If we had not yet received CopyDone from the backend, PGRES_COPY_IN + * is also possible. However, at the moment this function is only + * called after receiving CopyDone from the backend - the walreceiver + * never terminates replication on its own initiative. + */ + + PQclear(res); + } +} + +/* + * Fetch the timeline history file for 'tli' from primary. + */ +static void +libpqrcv_readtimelinehistoryfile(TimeLineID tli, + char **filename, char **content, int *len) +{ + PGresult *res; + char cmd[64]; + + /* + * Request the primary to send over the history file for given timeline. 
+ */ + snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); + res = libpqrcv_PQexec(cmd); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive timeline history file from " + "the primary server: %s", + PQerrorMessage(streamConn)))); + } + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + int ntuples = PQntuples(res); + int nfields = PQnfields(res); + + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.", + ntuples, nfields))); + } + *filename = pstrdup(PQgetvalue(res, 0, 0)); + + *len = PQgetlength(res, 0, 1); + *content = palloc(*len); + memcpy(*content, PQgetvalue(res, 0, 1), *len); + PQclear(res); } /* @@ -327,20 +426,19 @@ libpqrcv_disconnect(void) * * Returns: * - * True if data was received. *type, *buffer and *len are set to - * the type of the received data, buffer holding it, and length, - * respectively. + * If data was received, returns the length of the data. *buffer is set to + * point to a buffer holding the received message. The buffer is only valid + * until the next libpqrcv_* call. * - * False if no data was available within timeout, or wait was interrupted + * 0 if no data was available within timeout, or wait was interrupted * by signal. * - * The buffer returned is only valid until the next call of this function or - * libpq_connect/disconnect. + * -1 if the server ended the COPY. * * ereports on error. 
*/ -static bool -libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) +static int +libpqrcv_receive(int timeout, char **buffer) { int rawlen; @@ -359,7 +457,7 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) if (timeout > 0) { if (!libpq_select(timeout)) - return false; + return 0; } if (PQconsumeInput(streamConn) == 0) @@ -370,23 +468,26 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) /* Now that we've consumed some input, try again */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); if (rawlen == 0) - return false; + return 0; } if (rawlen == -1) /* end-of-streaming or error */ { PGresult *res; res = PQgetResult(streamConn); - if (PQresultStatus(res) == PGRES_COMMAND_OK) + if (PQresultStatus(res) == PGRES_COMMAND_OK || + PQresultStatus(res) == PGRES_COPY_IN) + { + PQclear(res); + return -1; + } + else { PQclear(res); ereport(ERROR, - (errmsg("replication terminated by primary server"))); + (errmsg("could not receive data from WAL stream: %s", + PQerrorMessage(streamConn)))); } - PQclear(res); - ereport(ERROR, - (errmsg("could not receive data from WAL stream: %s", - PQerrorMessage(streamConn)))); } if (rawlen < -1) ereport(ERROR, @@ -394,11 +495,8 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) PQerrorMessage(streamConn)))); /* Return received messages to caller */ - *type = *((unsigned char *) recvBuf); - *buffer = recvBuf + sizeof(*type); - *len = rawlen - sizeof(*type); - - return true; + *buffer = recvBuf; + return rawlen; } /* diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index b6cfdac1b66e09875b3368d1f2f9dec15bb3173e..ff4cff16e4b50495659daf53d844963590dbfff7 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -56,6 +56,7 @@ Node *replication_parse_result; %union { char *str; bool boolval; + int32 intval; XLogRecPtr recptr; Node *node; @@ -65,22 +66,26 @@ Node 
*replication_parse_result; /* Non-keyword tokens */ %token <str> SCONST +%token <intval> ICONST %token <recptr> RECPTR /* Keyword tokens. */ %token K_BASE_BACKUP %token K_IDENTIFY_SYSTEM +%token K_START_REPLICATION +%token K_TIMELINE_HISTORY %token K_LABEL %token K_PROGRESS %token K_FAST %token K_NOWAIT %token K_WAL -%token K_START_REPLICATION +%token K_TIMELINE %type <node> command -%type <node> base_backup start_replication identify_system +%type <node> base_backup start_replication identify_system timeline_history %type <list> base_backup_opt_list %type <defelt> base_backup_opt +%type <intval> opt_timeline %% firstcmd: command opt_semicolon @@ -97,6 +102,7 @@ command: identify_system | base_backup | start_replication + | timeline_history ; /* @@ -153,15 +159,48 @@ base_backup_opt: ; /* - * START_REPLICATION %X/%X + * START_REPLICATION %X/%X [TIMELINE %d] */ start_replication: - K_START_REPLICATION RECPTR + K_START_REPLICATION RECPTR opt_timeline { StartReplicationCmd *cmd; cmd = makeNode(StartReplicationCmd); cmd->startpoint = $2; + cmd->timeline = $3; + + $$ = (Node *) cmd; + } + ; + +opt_timeline: + K_TIMELINE ICONST + { + if ($2 <= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + (errmsg("invalid timeline %d", $2)))); + $$ = $2; + } + | /* nothing */ { $$ = 0; } + ; + +/* + * TIMELINE_HISTORY %d + */ +timeline_history: + K_TIMELINE_HISTORY ICONST + { + TimeLineHistoryCmd *cmd; + + if ($2 <= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + (errmsg("invalid timeline %d", $2)))); + + cmd = makeNode(TimeLineHistoryCmd); + cmd->timeline = $2; $$ = (Node *) cmd; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 51f381da000803a71a5a95cfc109a3d17e70356c..122da29016be14acebef68f31dbeacd6eab3c1b4 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -15,6 +15,8 @@ */ #include "postgres.h" +#include "utils/builtins.h" + /* Avoid exit() on fatal scanner errors 
(a bit ugly -- see yy_fatal_error) */ #undef fprintf #define fprintf(file, fmt, msg) ereport(ERROR, (errmsg_internal("%s", msg))) @@ -49,6 +51,7 @@ xqstart {quote} xqdouble {quote}{quote} xqinside [^']+ +digit [0-9]+ hexdigit [0-9A-Za-z]+ quote ' @@ -63,7 +66,9 @@ LABEL { return K_LABEL; } NOWAIT { return K_NOWAIT; } PROGRESS { return K_PROGRESS; } WAL { return K_WAL; } +TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } +TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } "," { return ','; } ";" { return ';'; } @@ -71,6 +76,11 @@ START_REPLICATION { return K_START_REPLICATION; } [\t] ; " " ; +{digit}+ { + yylval.intval = pg_atoi(yytext, sizeof(int32), 0); + return ICONST; + } + {hexdigit}+\/{hexdigit}+ { uint32 hi, lo; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 62135037f104837b154cf7293d51418c08e1f660..303edb75a32061c0d268c1e5fe6334be4f1f9049 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -15,6 +15,14 @@ * WalRcv->receivedUpto variable in shared memory, to inform the startup * process of how far it can proceed with XLOG replay. * + * If the primary server ends streaming, but doesn't disconnect, walreceiver + * goes into "waiting" mode, and waits for the startup process to give new + * instructions. The startup process will treat that the same as + * disconnection, and will rescan the archive/pg_xlog directory. But when the + * startup process wants to try streaming replication again, it will just + * nudge the existing walreceiver process that's waiting, instead of launching + * a new one. + * * Normal termination is by SIGTERM, which instructs the walreceiver to * exit(0). Emergency termination is by SIGQUIT; like any postmaster child * process, the walreceiver will simply abort and exit on SIGQUIT. 
A close @@ -38,6 +46,7 @@ #include <signal.h> #include <unistd.h> +#include "access/timeline.h" #include "access/xlog_internal.h" #include "libpq/pqformat.h" #include "libpq/pqsignal.h" @@ -60,6 +69,10 @@ bool hot_standby_feedback; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; +walrcv_identify_system_type walrcv_identify_system = NULL; +walrcv_startstreaming_type walrcv_startstreaming = NULL; +walrcv_endstreaming_type walrcv_endstreaming = NULL; +walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL; walrcv_receive_type walrcv_receive = NULL; walrcv_send_type walrcv_send = NULL; walrcv_disconnect_type walrcv_disconnect = NULL; @@ -118,6 +131,8 @@ static volatile bool WalRcvImmediateInterruptOK = false; static void ProcessWalRcvInterrupts(void); static void EnableWalRcvImmediateExit(void); static void DisableWalRcvImmediateExit(void); +static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); +static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); @@ -128,6 +143,7 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); +static void WalRcvSigUsr1Handler(SIGNAL_ARGS); static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); @@ -171,6 +187,10 @@ WalReceiverMain(void) { char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; + TimeLineID startpointTLI; + TimeLineID primaryTLI; + bool first_stream; + /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; TimestampTz last_recv_timestamp; @@ -207,17 +227,21 @@ WalReceiverMain(void) /* The usual case */ break; - case WALRCV_RUNNING: + case WALRCV_WAITING: 
+ case WALRCV_STREAMING: + case WALRCV_RESTARTING: + default: /* Shouldn't happen */ elog(PANIC, "walreceiver still running according to shared memory state"); } /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; - walrcv->walRcvState = WALRCV_RUNNING; + walrcv->walRcvState = WALRCV_STREAMING; /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); startpoint = walrcv->receiveStart; + startpointTLI = walrcv->receiveStartTLI; /* Initialise to a sanish value */ walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp(); @@ -227,6 +251,8 @@ WalReceiverMain(void) /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); + OwnLatch(&walrcv->latch); + /* * If possible, make this process a group leader, so that the postmaster * can signal any child processes too. (walreceiver probably never has @@ -246,7 +272,7 @@ WalReceiverMain(void) pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR1, WalRcvSigUsr1Handler); pqsignal(SIGUSR2, SIG_IGN); /* Reset some signals that are accepted by postmaster but not here */ @@ -261,8 +287,12 @@ WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - if (walrcv_connect == NULL || walrcv_receive == NULL || - walrcv_send == NULL || walrcv_disconnect == NULL) + if (walrcv_connect == NULL || walrcv_startstreaming == NULL || + walrcv_endstreaming == NULL || + walrcv_identify_system == NULL || + walrcv_readtimelinehistoryfile == NULL || + walrcv_receive == NULL || walrcv_send == NULL || + walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* @@ -276,122 +306,360 @@ WalReceiverMain(void) /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); - 
walrcv_connect(conninfo, startpoint); + walrcv_connect(conninfo); DisableWalRcvImmediateExit(); - /* Initialize LogstreamResult and buffers for processing messages */ - LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); - initStringInfo(&reply_message); - initStringInfo(&incoming_message); - - /* Initialize the last recv timestamp */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; - - /* Loop until end-of-streaming or error */ + first_stream = true; for (;;) { - unsigned char type; - char *buf; - int len; - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. + * Check that we're connected to a valid server using the + * IDENTIFY_SYSTEM replication command, */ - if (!PostmasterIsAlive()) - exit(1); + EnableWalRcvImmediateExit(); + walrcv_identify_system(&primaryTLI); + DisableWalRcvImmediateExit(); /* - * Exit walreceiver if we're not in recovery. This should not happen, - * but cross-check the status here. + * Confirm that the current timeline of the primary is the same or + * ahead of ours. */ - if (!RecoveryInProgress()) - ereport(FATAL, - (errmsg("cannot continue WAL streaming, recovery has already ended"))); - - /* Process any requests or signals received recently */ - ProcessWalRcvInterrupts(); + if (primaryTLI < startpointTLI) + ereport(ERROR, + (errmsg("highest timeline %u of the primary is behind recovery timeline %u", + primaryTLI, startpointTLI))); - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } + /* + * Get any missing history files. We do this always, even when we're + * not interested in that timeline, so that if we're promoted to become + * the master later on, we don't select the same timeline that was + * already used in the current master. 
This isn't bullet-proof - you'll + * need some external software to manage your cluster if you need to + * ensure that a unique timeline id is chosen in every case, but let's + * avoid the confusion of timeline id collisions where we can. + */ + WalRcvFetchTimeLineHistoryFiles(startpointTLI + 1, primaryTLI); - /* Wait a while for data to arrive */ - if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) + /* + * Start streaming. + * + * We'll try to start at the requested starting point and timeline, + * even if it's different from the server's latest timeline. In case + * we've already reached the end of the old timeline, the server will + * finish the streaming immediately, and we will go back to await + * orders from the startup process. If recovery_target_timeline is + * 'latest', the startup process will scan pg_xlog and find the new + * history file, bump recovery target timeline, and ask us to restart + * on the new timeline. + */ + ThisTimeLineID = startpointTLI; + if (walrcv_startstreaming(startpointTLI, startpoint)) { - /* Something was received from master, so reset timeout */ + bool endofwal = false; + + if (first_stream) + ereport(LOG, + (errmsg("started streaming WAL from primary at %X/%X on timeline %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + startpointTLI))); + else + ereport(LOG, + (errmsg("restarted WAL streaming at %X/%X on timeline %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + startpointTLI))); + first_stream = false; + + /* Initialize LogstreamResult and buffers for processing messages */ + LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(); + initStringInfo(&reply_message); + initStringInfo(&incoming_message); + + /* Initialize the last recv timestamp */ last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; - /* Accept the received data, and process it */ - XLogWalRcvProcessMsg(type, buf, len); - - /* Receive any more data we can without sleeping */ - while (walrcv_receive(0, &type, 
&buf, &len)) + /* Loop until end-of-streaming or error */ + while (!endofwal) { - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; - XLogWalRcvProcessMsg(type, buf, len); - } + char *buf; + int len; - /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + /* + * Emergency bailout if postmaster has died. This is to avoid + * the necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive()) + exit(1); + + /* + * Exit walreceiver if we're not in recovery. This should not + * happen, but cross-check the status here. + */ + if (!RecoveryInProgress()) + ereport(FATAL, + (errmsg("cannot continue WAL streaming, recovery has already ended"))); + + /* Process any requests or signals received recently */ + ProcessWalRcvInterrupts(); + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* Wait a while for data to arrive */ + len = walrcv_receive(NAPTIME_PER_CYCLE, &buf); + if (len != 0) + { + /* + * Process the received data, and any subsequent data we + * can read without blocking. + */ + for (;;) + { + if (len > 0) + { + /* Something was received from master, so reset timeout */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); + } + else if (len == 0) + break; + else if (len < 0) + { + ereport(LOG, + (errmsg("replication terminated by primary server"), + errdetail("End of WAL reached on timeline %u", startpointTLI))); + endofwal = true; + break; + } + len = walrcv_receive(0, &buf); + } + + /* Let the master know that we received some data. */ + XLogWalRcvSendReply(false, false); + + /* + * If we've written some records, flush them to disk and + * let the startup process and primary server know about + * them. + */ + XLogWalRcvFlush(false); + } + else + { + /* + * We didn't receive anything new. 
If we haven't heard + * anything from the server for more than + * wal_receiver_timeout / 2, ping the server. Also, if it's + * been longer than wal_receiver_status_interval since the + * last update we sent, send a status update to the master + * anyway, to report any progress in applying WAL. + */ + bool requestReply = false; + + /* + * Check if time since last receive from standby has + * reached the configured limit. + */ + if (wal_receiver_timeout > 0) + { + TimestampTz now = GetCurrentTimestamp(); + TimestampTz timeout; + + timeout = + TimestampTzPlusMilliseconds(last_recv_timestamp, + wal_receiver_timeout); + + if (now >= timeout) + ereport(ERROR, + (errmsg("terminating walreceiver due to timeout"))); + + /* + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server. + */ + if (!ping_sent) + { + timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, + (wal_receiver_timeout/2)); + if (now >= timeout) + { + requestReply = true; + ping_sent = true; + } + } + } + + XLogWalRcvSendReply(requestReply, requestReply); + XLogWalRcvSendHSFeedback(); + } + } /* - * If we've written some records, flush them to disk and let the - * startup process and primary server know about them. + * The backend finished streaming. Exit streaming COPY-mode from + * our side, too. */ + EnableWalRcvImmediateExit(); + walrcv_endstreaming(); + DisableWalRcvImmediateExit(); + } + else + ereport(LOG, + (errmsg("primary server contains no more WAL on requested timeline %u", + startpointTLI))); + + /* + * End of WAL reached on the requested timeline. Close the last + * segment, and await for new orders from the startup process. 
+ */ + if (recvFile >= 0) + { XLogWalRcvFlush(false); + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + XLogFileNameP(recvFileTLI, recvSegNo)))); } + recvFile = -1; + + elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); + WalRcvWaitForStartPosition(&startpoint, &startpointTLI); + } + /* not reached */ +} + +/* + * Wait for startup process to set receiveStart and receiveStartTLI. + */ +static void +WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + int state; + + SpinLockAcquire(&walrcv->mutex); + state = walrcv->walRcvState; + if (state != WALRCV_STREAMING) + { + SpinLockRelease(&walrcv->mutex); + if (state == WALRCV_STOPPING) + proc_exit(0); else + elog(FATAL, "unexpected walreceiver state"); + } + walrcv->walRcvState = WALRCV_WAITING; + walrcv->receiveStart = InvalidXLogRecPtr; + walrcv->receiveStartTLI = 0; + SpinLockRelease(&walrcv->mutex); + + if (update_process_title) + set_ps_display("idle", false); + + /* + * nudge startup process to notice that we've stopped streaming and are + * now waiting for instructions. + */ + WakeupRecovery(); + for (;;) + { + ResetLatch(&walrcv->latch); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. 
+ */ + if (!PostmasterIsAlive()) + exit(1); + + ProcessWalRcvInterrupts(); + + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->walRcvState == WALRCV_RESTARTING || + walrcv->walRcvState == WALRCV_WAITING || + walrcv->walRcvState == WALRCV_STOPPING); + if (walrcv->walRcvState == WALRCV_RESTARTING) + { + /* we don't expect primary_conninfo to change */ + *startpoint = walrcv->receiveStart; + *startpointTLI = walrcv->receiveStartTLI; + walrcv->walRcvState = WALRCV_STREAMING; + SpinLockRelease(&walrcv->mutex); + break; + } + if (walrcv->walRcvState == WALRCV_STOPPING) { /* - * We didn't receive anything new. If we haven't heard anything - * from the server for more than wal_receiver_timeout / 2, - * ping the server. Also, if it's been longer than - * wal_receiver_status_interval since the last update we sent, - * send a status update to the master anyway, to report any - * progress in applying WAL. + * We should've received SIGTERM if the startup process wants + * us to die, but might as well check it here too. */ - bool requestReply = false; + SpinLockRelease(&walrcv->mutex); + exit(1); + } + SpinLockRelease(&walrcv->mutex); - /* - * Check if time since last receive from standby has reached the - * configured limit. - */ - if (wal_receiver_timeout > 0) - { - TimestampTz now = GetCurrentTimestamp(); - TimestampTz timeout; + WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0); + } + + if (update_process_title) + { + char activitymsg[50]; - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); + snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X", + (uint32) (*startpoint >> 32), + (uint32) *startpoint); + set_ps_display(activitymsg, false); + } +} - if (now >= timeout) - ereport(ERROR, - (errmsg("terminating walreceiver due to timeout"))); +/* + * Fetch any missing timeline history files between 'first' and 'last' + * (inclusive) from the server. 
+ */ +static void +WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) +{ + TimeLineID tli; - /* - * We didn't receive anything new, for half of receiver - * replication timeout. Ping the server. - */ - if (!ping_sent) - { - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout/2)); - if (now >= timeout) - { - requestReply = true; - ping_sent = true; - } - } - } + for (tli = first; tli <= last; tli++) + { + if (!existsTimeLineHistory(tli)) + { + char *fname; + char *content; + int len; + char expectedfname[MAXFNAMELEN]; - XLogWalRcvSendReply(requestReply, requestReply); - XLogWalRcvSendHSFeedback(); + ereport(LOG, + (errmsg("fetching timeline history file for timeline %u from primary server", + tli))); + + EnableWalRcvImmediateExit(); + walrcv_readtimelinehistoryfile(tli, &fname, &content, &len); + DisableWalRcvImmediateExit(); + + /* + * Check that the filename on the master matches what we calculated + * ourselves. This is just a sanity check, it should always match. + */ + TLHistoryFileName(expectedfname, tli); + if (strcmp(fname, expectedfname) != 0) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("primary reported unexpected filename for timeline history file of timeline %u", + tli))); + + /* + * Write the file to pg_xlog. 
+ */ + writeTimeLineHistoryFile(tli, content, len); + + pfree(fname); + pfree(content); } } } @@ -408,9 +676,15 @@ WalRcvDie(int code, Datum arg) /* Ensure that all WAL records received are flushed to disk */ XLogWalRcvFlush(true); + DisownLatch(&walrcv->latch); + SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->walRcvState == WALRCV_RUNNING || + Assert(walrcv->walRcvState == WALRCV_STREAMING || + walrcv->walRcvState == WALRCV_RESTARTING || + walrcv->walRcvState == WALRCV_STARTING || + walrcv->walRcvState == WALRCV_WAITING || walrcv->walRcvState == WALRCV_STOPPING); + Assert(walrcv->pid == MyProcPid); walrcv->walRcvState = WALRCV_STOPPED; walrcv->pid = 0; SpinLockRelease(&walrcv->mutex); @@ -418,6 +692,9 @@ WalRcvDie(int code, Datum arg) /* Terminate the connection gracefully. */ if (walrcv_disconnect != NULL) walrcv_disconnect(); + + /* Wake up the startup process to notice promptly that we're gone */ + WakeupRecovery(); } /* SIGHUP: set flag to re-read config file at next convenient time */ @@ -427,6 +704,14 @@ WalRcvSigHupHandler(SIGNAL_ARGS) got_SIGHUP = true; } + +/* SIGUSR1: used by latch mechanism */ +static void +WalRcvSigUsr1Handler(SIGNAL_ARGS) +{ + latch_sigusr1_handler(); +} + /* SIGTERM: set flag for main loop, or shutdown immediately if safe */ static void WalRcvShutdownHandler(SIGNAL_ARGS) @@ -435,6 +720,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS) got_SIGTERM = true; + SetLatch(&WalRcv->latch); + /* Don't joggle the elbow of proc_exit */ if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) ProcessWalRcvInterrupts(); @@ -661,6 +948,7 @@ XLogWalRcvFlush(bool dying) { walrcv->latestChunkStart = walrcv->receivedUpto; walrcv->receivedUpto = LogstreamResult.Flush; + walrcv->receivedTLI = ThisTimeLineID; } SpinLockRelease(&walrcv->mutex); @@ -738,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyPtr 
= GetXLogReplayRecPtr(); resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 9eba180f04973df6c4f0bc5a73686f0aca5dcb14..a8ccfc66398bab64b90c8443182a0a5f1ca10d43 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -64,12 +64,13 @@ WalRcvShmemInit(void) MemSet(WalRcv, 0, WalRcvShmemSize()); WalRcv->walRcvState = WALRCV_STOPPED; SpinLockInit(&WalRcv->mutex); + InitSharedLatch(&WalRcv->latch); } } -/* Is walreceiver in progress (or starting up)? */ +/* Is walreceiver running (or starting up)? */ bool -WalRcvInProgress(void) +WalRcvRunning(void) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; @@ -110,6 +111,53 @@ WalRcvInProgress(void) return false; } +/* + * Is walreceiver running and streaming (or at least attempting to connect, + * or starting up)? + */ +bool +WalRcvStreaming(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + WalRcvState state; + pg_time_t startTime; + + SpinLockAcquire(&walrcv->mutex); + + state = walrcv->walRcvState; + startTime = walrcv->startTime; + + SpinLockRelease(&walrcv->mutex); + + /* + * If it has taken too long for walreceiver to start up, give up. Setting + * the state to STOPPED ensures that if walreceiver later does start up + * after all, it will see that it's not supposed to be running and die + * without doing anything. 
+ */ + if (state == WALRCV_STARTING) + { + pg_time_t now = (pg_time_t) time(NULL); + + if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) + { + SpinLockAcquire(&walrcv->mutex); + + if (walrcv->walRcvState == WALRCV_STARTING) + state = walrcv->walRcvState = WALRCV_STOPPED; + + SpinLockRelease(&walrcv->mutex); + } + } + + if (state == WALRCV_STREAMING || state == WALRCV_STARTING || + state == WALRCV_RESTARTING) + return true; + else + return false; +} + /* * Stop walreceiver (if running) and wait for it to die. * Executed by the Startup process. @@ -135,7 +183,9 @@ ShutdownWalRcv(void) walrcv->walRcvState = WALRCV_STOPPED; break; - case WALRCV_RUNNING: + case WALRCV_STREAMING: + case WALRCV_WAITING: + case WALRCV_RESTARTING: walrcv->walRcvState = WALRCV_STOPPING; /* fall through */ case WALRCV_STOPPING: @@ -154,7 +204,7 @@ ShutdownWalRcv(void) * Wait for walreceiver to acknowledge its death by setting state to * WALRCV_STOPPED. */ - while (WalRcvInProgress()) + while (WalRcvRunning()) { /* * This possibly-long loop needs to handle interrupts of startup @@ -173,10 +223,11 @@ ShutdownWalRcv(void) * is a libpq connection string to use. 
*/ void -RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) +RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + bool launch = false; pg_time_t now = (pg_time_t) time(NULL); /* @@ -190,14 +241,22 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) SpinLockAcquire(&walrcv->mutex); - /* It better be stopped before we try to restart it */ - Assert(walrcv->walRcvState == WALRCV_STOPPED); + /* It better be stopped if we try to restart it */ + Assert(walrcv->walRcvState == WALRCV_STOPPED || + walrcv->walRcvState == WALRCV_WAITING); if (conninfo != NULL) strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); else walrcv->conninfo[0] = '\0'; - walrcv->walRcvState = WALRCV_STARTING; + + if (walrcv->walRcvState == WALRCV_STOPPED) + { + launch = true; + walrcv->walRcvState = WALRCV_STARTING; + } + else + walrcv->walRcvState = WALRCV_RESTARTING; walrcv->startTime = now; /* @@ -210,10 +269,14 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) walrcv->latestChunkStart = recptr; } walrcv->receiveStart = recptr; + walrcv->receiveStartTLI = tli; SpinLockRelease(&walrcv->mutex); - SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); + if (launch) + SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); + else + SetLatch(&walrcv->latch); } /* @@ -221,10 +284,11 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) * * Optionally, returns the previous chunk start, that is the first byte * written in the most recent walreceiver flush cycle. Callers not - * interested in that value may pass NULL for latestChunkStart. + * interested in that value may pass NULL for latestChunkStart. Same for + * receiveTLI. 
*/ XLogRecPtr -GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart) +GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; @@ -234,6 +298,8 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart) recptr = walrcv->receivedUpto; if (latestChunkStart) *latestChunkStart = walrcv->latestChunkStart; + if (receiveTLI) + *receiveTLI = walrcv->receivedTLI; SpinLockRelease(&walrcv->mutex); return recptr; @@ -258,7 +324,7 @@ GetReplicationApplyDelay(void) receivePtr = walrcv->receivedUpto; SpinLockRelease(&walrcv->mutex); - replayPtr = GetXLogReplayRecPtr(NULL); + replayPtr = GetXLogReplayRecPtr(); if (XLByteEQ(receivePtr, replayPtr)) return 0; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 8774d7e8229edc34634bf4bcf7e4b5469e3fe5ad..aec57f5535fc42f17330fc066c1e10175663f963 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -7,10 +7,15 @@ * (Note that there can be more than one walsender process concurrently.) * It is started by the postmaster when the walreceiver of a standby server * connects to the primary server and requests XLOG streaming replication. - * It attempts to keep reading XLOG records from the disk and sending them - * to the standby server, as long as the connection is alive (i.e., like - * any backend, there is a one-to-one relationship between a connection - * and a walsender process). + * + * A walsender is similar to a regular backend, ie. there is a one-to-one + * relationship between a connection and a walsender process, but instead + * of processing SQL queries, it understands a small set of special + * replication-mode commands. The START_REPLICATION command begins streaming + * WAL to the client. 
While streaming, the walsender keeps reading XLOG + * records from the disk and sends them to the standby server over the + * COPY protocol, until either side ends the replication by exiting COPY + * mode (or until the connection is closed). * * Normal termination is by SIGTERM, which instructs the walsender to * close the connection and exit(0) at next convenient moment. Emergency @@ -37,6 +42,7 @@ #include <signal.h> #include <unistd.h> +#include "access/timeline.h" #include "access/transam.h" #include "access/xlog_internal.h" #include "catalog/pg_type.h" @@ -87,8 +93,6 @@ bool am_walsender = false; /* Am I a walsender process ? */ bool am_cascading_walsender = false; /* Am I cascading WAL to * another standby ? */ -static bool replication_started = false; /* Started streaming yet? */ - /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int wal_sender_timeout = 60 * 1000; /* maximum time to send one @@ -106,6 +110,16 @@ static int sendFile = -1; static XLogSegNo sendSegNo = 0; static uint32 sendOff = 0; +/* + * These variables keep track of the state of the timeline we're currently + * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric, + * the timeline is not the latest timeline on this server, and the server's + * history forked off from that timeline at sendTimeLineValidUpto. + */ +static TimeLineID sendTimeLine = 0; +static bool sendTimeLineIsHistoric = false; +static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; + /* * How far have we sent WAL already? This is also advertised in * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) @@ -124,9 +138,26 @@ static TimestampTz last_reply_timestamp; /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool ping_sent = false; +/* + * While streaming WAL in Copy mode, streamingDoneSending is set to true + * after we have sent CopyDone.
We should not send any more CopyData messages + * after that. streamingDoneReceiving is set to true when we receive CopyDone + * from the other end. When both become true, it's time to exit Copy mode. + */ +static bool streamingDoneSending; +static bool streamingDoneReceiving; + /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; -volatile sig_atomic_t walsender_ready_to_stop = false; +static volatile sig_atomic_t walsender_ready_to_stop = false; + +/* + * This is set while we are streaming. When not set, SIGUSR2 signal will be + * handled like SIGTERM. When set, the main loop is responsible for checking + * walsender_ready_to_stop and terminating when it's set (after streaming any + * remaining WAL). + */ +static volatile sig_atomic_t replication_active = false; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); @@ -134,7 +165,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static void WalSndLoop(void) __attribute__((noreturn)); +static void WalSndLoop(void); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); static void XLogSend(bool *caughtup); @@ -164,6 +195,16 @@ InitWalSender(void) */ if (am_cascading_walsender) ThisTimeLineID = GetRecoveryTargetTLI(); + + /* + * Let postmaster know that we're a WAL sender. Once we've declared us as + * a WAL sender process, postmaster will let us outlive the bgwriter and + * kill us last in the shutdown sequence, so we get a chance to stream all + * remaining WAL at shutdown, including the shutdown checkpoint. Note that + * there's no going back, and we mustn't write any WAL records after this. 
+ */ + MarkPostmasterChildWalSender(); + SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); } /* @@ -182,17 +223,16 @@ WalSndErrorCleanup() sendFile = -1; } - /* - * Don't return back to the command loop after we've started replicating. - * We've already marked us as an actively streaming WAL sender in the - * PMSignal slot, and there's currently no way to undo that. - */ - if (replication_started) + replication_active = false; + if (walsender_ready_to_stop) proc_exit(0); + + /* Revert back to startup state */ + WalSndSetState(WALSNDSTATE_STARTUP); } /* - * IDENTIFY_SYSTEM + * Handle the IDENTIFY_SYSTEM command. */ static void IdentifySystem(void) @@ -210,9 +250,17 @@ IdentifySystem(void) snprintf(sysid, sizeof(sysid), UINT64_FORMAT, GetSystemIdentifier()); - snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr(); + am_cascading_walsender = RecoveryInProgress(); + if (am_cascading_walsender) + { + logptr = GetStandbyFlushRecPtr(); + ThisTimeLineID = GetRecoveryTargetTLI(); + } + else + logptr = GetInsertRecPtr(); + + snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); @@ -261,56 +309,106 @@ IdentifySystem(void) pq_endmessage(&buf); } + /* - * Handle START_REPLICATION command. - * - * At the moment, this never returns, but an ereport(ERROR) will take us back - * to the main loop. + * Handle TIMELINE_HISTORY command. */ static void -StartReplication(StartReplicationCmd *cmd) +SendTimeLineHistory(TimeLineHistoryCmd *cmd) { StringInfoData buf; + char histfname[MAXFNAMELEN]; + char path[MAXPGPATH]; + int fd; + size_t histfilelen; + size_t bytesleft; /* - * Let postmaster know that we're streaming. 
Once we've declared us as a - * WAL sender process, postmaster will let us outlive the bgwriter and - * kill us last in the shutdown sequence, so we get a chance to stream all - * remaining WAL at shutdown, including the shutdown checkpoint. Note that - * there's no going back, and we mustn't write any WAL records after this. + * Reply with a result set with one row, and two columns. The first col + * is the name of the history file, 2nd is the contents. */ - MarkPostmasterChildWalSender(); - SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); - replication_started = true; - /* - * When promoting a cascading standby, postmaster sends SIGUSR2 to any - * cascading walsenders to kill them. But there is a corner-case where - * such walsender fails to receive SIGUSR2 and survives a standby - * promotion unexpectedly. This happens when postmaster sends SIGUSR2 - * before the walsender marks itself as a WAL sender, because postmaster - * sends SIGUSR2 to only the processes marked as a WAL sender. - * - * To avoid this corner-case, if recovery is NOT in progress even though - * the walsender is cascading one, we do the same thing as SIGUSR2 signal - * handler does, i.e., set walsender_ready_to_stop to true. Which causes - * the walsender to end later. - * - * When terminating cascading walsenders, usually postmaster writes the - * log message announcing the terminations. But there is a race condition - * here. If there is no walsender except this process before reaching - * here, postmaster thinks that there is no walsender and suppresses that - * log message. To handle this case, we always emit that log message here. - * This might cause duplicate log messages, but which is less likely to - * happen, so it's not worth writing some code to suppress them. 
- */ - if (am_cascading_walsender && !RecoveryInProgress()) + TLHistoryFileName(histfname, cmd->timeline); + TLHistoryFilePath(path, cmd->timeline); + + /* Send a RowDescription message */ + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 2, 2); /* 2 fields */ + + /* first field */ + pq_sendstring(&buf, "filename"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* second field */ + pq_sendstring(&buf, "content"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, BYTEAOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 2, 2); /* # of columns */ + pq_sendint(&buf, strlen(histfname), 4); /* col1 len */ + pq_sendbytes(&buf, histfname, strlen(histfname)); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + /* Determine file length and send it to client */ + histfilelen = lseek(fd, 0, SEEK_END); + if (histfilelen < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to end of file \"%s\": %m", path))); + if (lseek(fd, 0, SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to beginning of file \"%s\": %m", path))); + + pq_sendint(&buf, histfilelen, 4); /* col2 len */ + + bytesleft = histfilelen; + while (bytesleft > 0) { - ereport(LOG, - (errmsg("terminating walsender process to force cascaded standby " - "to update timeline and reconnect"))); - walsender_ready_to_stop = true; + char rbuf[BLCKSZ]; + int nread; 
+ + nread = read(fd, rbuf, sizeof(rbuf)); + if (nread <= 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + path))); + pq_sendbytes(&buf, rbuf, nread); + bytesleft -= nread; } + CloseTransientFile(fd); + + pq_endmessage(&buf); +} + +/* + * Handle START_REPLICATION command. + * + * At the moment, this never returns, but an ereport(ERROR) will take us back + * to the main loop. + */ +static void +StartReplication(StartReplicationCmd *cmd) +{ + StringInfoData buf; /* * We assume here that we're logging enough information in the WAL for @@ -322,42 +420,144 @@ StartReplication(StartReplicationCmd *cmd) */ /* - * When we first start replication the standby will be behind the primary. - * For some applications, for example, synchronous replication, it is - * important to have a clear state for this initial catchup mode, so we - * can trigger actions when we change streaming state later. We may stay - * in this state for a long time, which is exactly why we want to be able - * to monitor whether or not we are still here. + * Select the timeline. If it was given explicitly by the client, use + * that. Otherwise use the current ThisTimeLineID. 
 */ - WalSndSetState(WALSNDSTATE_CATCHUP); + if (cmd->timeline != 0) + { + XLogRecPtr switchpoint; - /* Send a CopyBothResponse message, and start streaming */ - pq_beginmessage(&buf, 'W'); - pq_sendbyte(&buf, 0); - pq_sendint(&buf, 0, 2); - pq_endmessage(&buf); - pq_flush(); + sendTimeLine = cmd->timeline; + if (sendTimeLine == ThisTimeLineID) + { + sendTimeLineIsHistoric = false; + sendTimeLineValidUpto = InvalidXLogRecPtr; + } + else + { + List *timeLineHistory; - /* - * Initialize position to the received one, then the xlog records begin to - * be shipped from that position - */ - sentPtr = cmd->startpoint; + sendTimeLineIsHistoric = true; - /* Also update the start position status in shared memory */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = MyWalSnd; + /* + * Check that the timeline the client requested exists, and the + * requested start location is on that timeline. + */ + timeLineHistory = readTimeLineHistory(ThisTimeLineID); + switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory); + list_free_deep(timeLineHistory); - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); + /* + * Found the requested timeline in the history. Check that + * requested startpoint is on that timeline in our history. + * + * This is quite loose on purpose. We only check that we didn't + * fork off the requested timeline before the switchpoint. We don't + * check that we switched *to* it before the requested starting + * point. This is because the client can legitimately request to + * start replication from the beginning of the WAL segment that + * contains switchpoint, but on the new timeline, so that it + * doesn't end up with a partial segment. If you ask for a too old + * starting point, you'll get an error later when we fail to find + * the requested WAL segment in pg_xlog. 
+ * + * XXX: we could be more strict here and only allow a startpoint + * that's older than the switchpoint, if it's still in the same + * WAL segment. + */ + if (!XLogRecPtrIsInvalid(switchpoint) && + XLByteLT(switchpoint, cmd->startpoint)) + { + ereport(ERROR, + (errmsg("requested starting point %X/%X on timeline %u is not in this server's history", + (uint32) (cmd->startpoint >> 32), + (uint32) (cmd->startpoint), + cmd->timeline), + errdetail("This server's history forked from timeline %u at %X/%X", + cmd->timeline, + (uint32) (switchpoint >> 32), + (uint32) (switchpoint)))); + } + sendTimeLineValidUpto = switchpoint; + } + } + else + { + sendTimeLine = ThisTimeLineID; + sendTimeLineValidUpto = InvalidXLogRecPtr; + sendTimeLineIsHistoric = false; } - SyncRepInitConfig(); + streamingDoneSending = streamingDoneReceiving = false; + + /* If there is nothing to stream, don't even enter COPY mode */ + if (!sendTimeLineIsHistoric || + XLByteLT(cmd->startpoint, sendTimeLineValidUpto)) + { + XLogRecPtr FlushPtr; + /* + * When we first start replication the standby will be behind the primary. + * For some applications, for example, synchronous replication, it is + * important to have a clear state for this initial catchup mode, so we + * can trigger actions when we change streaming state later. We may stay + * in this state for a long time, which is exactly why we want to be able + * to monitor whether or not we are still here. + */ + WalSndSetState(WALSNDSTATE_CATCHUP); + + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); + pq_sendbyte(&buf, 0); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + pq_flush(); + + /* + * Don't allow a request to stream from a future point in WAL that + * hasn't been flushed to disk in this server yet. 
+ */ + if (am_cascading_walsender) + FlushPtr = GetStandbyFlushRecPtr(); + else + FlushPtr = GetFlushRecPtr(); + if (XLByteLT(FlushPtr, cmd->startpoint)) + { + ereport(ERROR, + (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X", + (uint32) (cmd->startpoint >> 32), + (uint32) (cmd->startpoint), + (uint32) (FlushPtr >> 32), + (uint32) (FlushPtr)))); + } + + /* Start streaming from the requested point */ + sentPtr = cmd->startpoint; - /* Main loop of walsender */ - WalSndLoop(); + /* Initialize shared memory status, too */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + + SyncRepInitConfig(); + + /* Main loop of walsender */ + replication_active = true; + + WalSndLoop(); + + replication_active = false; + if (walsender_ready_to_stop) + proc_exit(0); + WalSndSetState(WALSNDSTATE_STARTUP); + } + + /* Get out of COPY mode (CommandComplete). */ + EndCommand("COPY 0", DestRemote); } /* @@ -406,10 +606,13 @@ exec_replication_command(const char *cmd_string) SendBaseBackup((BaseBackupCmd *) cmd_node); break; + case T_TimeLineHistoryCmd: + SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); + break; + default: - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby query string: %s", cmd_string))); + elog(ERROR, "unrecognized replication command node tag: %u", + cmd_node->type); } /* done */ @@ -421,7 +624,8 @@ exec_replication_command(const char *cmd_string) } /* - * Check if the remote end has closed the connection. + * Process any incoming messages while streaming. Also checks if the remote + * end has closed the connection. 
 */ static void ProcessRepliesIfAny(void) @@ -430,7 +634,12 @@ ProcessRepliesIfAny(void) int r; bool received = false; - for (;;) + /* + * If we already received a CopyDone from the frontend, any subsequent + * message is the beginning of a new command, and should be processed in + * the main processing loop. + */ + while (!streamingDoneReceiving) { r = pq_getbyte_if_available(&firstchar); if (r < 0) @@ -458,6 +667,31 @@ ProcessRepliesIfAny(void) received = true; break; + /* + * CopyDone means the standby requested to finish streaming. + * Reply with CopyDone, if we had not sent that already. + */ + case 'c': + if (!streamingDoneSending) + { + pq_putmessage_noblock('c', NULL, 0); + streamingDoneSending = true; + } + + /* consume the CopyDone message */ + resetStringInfo(&reply_message); + if (pq_getmessage(&reply_message, 0)) + { + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + proc_exit(0); + } + + streamingDoneReceiving = true; + received = true; + break; + /* * 'X' means that the standby is closing down the socket. */ @@ -666,7 +900,10 @@ WalSndLoop(void) last_reply_timestamp = GetCurrentTimestamp(); ping_sent = false; - /* Loop forever, unless we get an error */ + /* + * Loop until we reach the end of this timeline or the client requests + * to stop streaming. + */ for (;;) { /* Clear any already-pending wakeups */ @@ -692,6 +929,14 @@ WalSndLoop(void) /* Check for input from the client */ ProcessRepliesIfAny(); + /* + * If we have received CopyDone from the client, sent CopyDone + * ourselves, and the output buffer is empty, it's time to exit + * streaming. + */ + if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving) + break; + /* * If we don't have any pending data in the output buffer, try to send * some more. 
If there is some, we don't bother to call XLogSend @@ -705,7 +950,7 @@ WalSndLoop(void) /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) - break; + goto send_failure; /* If nothing remains to be sent right now ... */ if (caughtup && !pq_is_send_pending()) @@ -739,7 +984,7 @@ WalSndLoop(void) if (caughtup && !pq_is_send_pending()) { /* Inform the standby that XLOG streaming is done */ - pq_puttextmessage('C', "COPY 0"); + EndCommand("COPY 0", DestRemote); pq_flush(); proc_exit(0); @@ -754,14 +999,16 @@ WalSndLoop(void) * loaded a subset of the available data but then pq_flush_if_writable * flushed it all --- we should immediately try to send more. */ - if (caughtup || pq_is_send_pending()) + if ((caughtup && !streamingDoneSending) || pq_is_send_pending()) { TimestampTz timeout = 0; long sleeptime = 10000; /* 10 s */ int wakeEvents; - wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | - WL_SOCKET_READABLE | WL_TIMEOUT; + wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT; + + if (!streamingDoneReceiving) + wakeEvents |= WL_SOCKET_READABLE; if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; @@ -813,11 +1060,13 @@ WalSndLoop(void) */ ereport(COMMERROR, (errmsg("terminating walsender process due to replication timeout"))); - break; + goto send_failure; } } } + return; +send_failure: /* * Get here on send failure. Clean up and exit. * @@ -916,7 +1165,7 @@ WalSndKill(int code, Datum arg) * more than one. 
*/ void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) { char *p; XLogRecPtr recptr; @@ -937,7 +1186,7 @@ retry: startoff = recptr % XLogSegSize; - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || sendTimeLine != tli) { char path[MAXPGPATH]; @@ -945,8 +1194,9 @@ retry: if (sendFile >= 0) close(sendFile); + sendTimeLine = tli; XLByteToSeg(recptr, sendSegNo); - XLogFilePath(path, ThisTimeLineID, sendSegNo); + XLogFilePath(path, sendTimeLine, sendSegNo); sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (sendFile < 0) @@ -960,7 +1210,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(ThisTimeLineID, sendSegNo)))); + XLogFileNameP(sendTimeLine, sendSegNo)))); else ereport(ERROR, (errcode_for_file_access(), @@ -977,7 +1227,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(ThisTimeLineID, sendSegNo), + XLogFileNameP(sendTimeLine, sendSegNo), startoff))); sendOff = startoff; } @@ -994,7 +1244,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u, length %lu: %m", - XLogFileNameP(ThisTimeLineID, sendSegNo), + XLogFileNameP(sendTimeLine, sendSegNo), sendOff, (unsigned long) segbytes))); } @@ -1019,7 +1269,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(ThisTimeLineID, segno)))); + XLogFileNameP(sendTimeLine, segno)))); /* * During recovery, the currently-open WAL file might be replaced with the @@ -1060,10 +1310,17 @@ static void XLogSend(bool *caughtup) { XLogRecPtr SendRqstPtr; + XLogRecPtr FlushPtr; XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; + if (streamingDoneSending) + { + *caughtup = true; + return; + } + /* * Attempt 
to send all data that's already been written out and fsync'd to * disk. We cannot go further than what's been written out given the @@ -1073,32 +1330,103 @@ XLogSend(bool *caughtup) * that gets lost on the master. */ if (am_cascading_walsender) + FlushPtr = GetStandbyFlushRecPtr(); + else + FlushPtr = GetFlushRecPtr(); + + /* + * In a cascading standby, the current recovery target timeline can + * change, or we can be promoted. In either case, the current timeline + * becomes historic. We need to detect that so that we don't try to stream + * past the point where we switched to another timeline. It's checked + * after calculating FlushPtr, to avoid a race condition: if the timeline + * becomes historic just after we checked that it was still current, it + * should still be OK to stream it up to the FlushPtr that was calculated + * before it became historic. + */ + if (!sendTimeLineIsHistoric && am_cascading_walsender) { - TimeLineID currentTargetTLI; - SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI); + bool becameHistoric = false; + TimeLineID targetTLI; - /* - * If the recovery target timeline changed, bail out. It's a bit - * unfortunate that we have to just disconnect, but there is no way - * to tell the client that the timeline changed. We also don't know - * exactly where the switch happened, so we cannot safely try to send - * up to the switchover point before disconnecting. - */ - if (currentTargetTLI != ThisTimeLineID) + if (!RecoveryInProgress()) { - if (!walsender_ready_to_stop) - ereport(LOG, - (errmsg("terminating walsender process to force cascaded standby " - "to update timeline and reconnect"))); - walsender_ready_to_stop = true; - *caughtup = true; - return; + /* + * We have been promoted. RecoveryInProgress() updated + * ThisTimeLineID to the new current timeline. + */ + targetTLI = ThisTimeLineID; + am_cascading_walsender = false; + becameHistoric = true; + } + else + { + /* + * Still a cascading standby. 
But is the timeline we're sending + * still the recovery target timeline? + */ + targetTLI = GetRecoveryTargetTLI(); + + if (targetTLI != sendTimeLine) + becameHistoric = true; + } + + if (becameHistoric) + { + /* + * The timeline we were sending has become historic. Read the + * timeline history file of the new timeline to see where exactly + * we forked off from the timeline we were sending. + */ + List *history; + + history = readTimeLineHistory(targetTLI); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history); + Assert(XLByteLE(sentPtr, sendTimeLineValidUpto)); + list_free_deep(history); + + /* the switchpoint should be >= current send pointer */ + if (!XLByteLE(sentPtr, sendTimeLineValidUpto)) + elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X", + sendTimeLine, + (uint32) (sendTimeLineValidUpto >> 32), + (uint32) sendTimeLineValidUpto, + (uint32) (sentPtr >> 32), + (uint32) sentPtr); + + sendTimeLineIsHistoric = true; } } + + /* + * If this is a historic timeline and we've reached the point where we + * forked to the next timeline, stop streaming. + */ + if (sendTimeLineIsHistoric && XLByteLE(sendTimeLineValidUpto, sentPtr)) + { + /* close the current file. */ + if (sendFile >= 0) + close(sendFile); + sendFile = -1; + + /* Send CopyDone */ + pq_putmessage_noblock('c', NULL, 0); + streamingDoneSending = true; + + *caughtup = true; + return; + } + + /* + * Stream up to the point known to be flushed to disk, or to the end of + * this timeline, whichever comes first. 
+ */ + if (sendTimeLineIsHistoric && XLByteLT(sendTimeLineValidUpto, FlushPtr)) + SendRqstPtr = sendTimeLineValidUpto; else - SendRqstPtr = GetFlushRecPtr(); + SendRqstPtr = FlushPtr; - /* Quick exit if nothing to do */ + Assert(XLByteLE(sentPtr, SendRqstPtr)); if (XLByteLE(SendRqstPtr, sentPtr)) { *caughtup = true; @@ -1124,7 +1452,10 @@ XLogSend(bool *caughtup) if (XLByteLE(SendRqstPtr, endptr)) { endptr = SendRqstPtr; - *caughtup = true; + if (sendTimeLineIsHistoric) + *caughtup = false; + else + *caughtup = true; } else { @@ -1151,7 +1482,7 @@ XLogSend(bool *caughtup) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(&output_message.data[output_message.len], startptr, nbytes); + XLogRead(&output_message.data[output_message.len], sendTimeLine, startptr, nbytes); output_message.len += nbytes; output_message.data[output_message.len] = '\0'; @@ -1242,6 +1573,14 @@ WalSndLastCycleHandler(SIGNAL_ARGS) { int save_errno = errno; + /* + * If replication has not yet started, die like with SIGTERM. If + * replication is active, only set a flag and wake up the main loop. It + * will send any outstanding WAL, and then exit gracefully. 
+ */ + if (!replication_active) + kill(MyProcPid, SIGTERM); + walsender_ready_to_stop = true; if (MyWalSnd) SetLatch(&MyWalSnd->latch); diff --git a/src/include/access/timeline.h b/src/include/access/timeline.h index 785195bd36a9caf740a77ed55f3de60869042ac8..08b75f6d79dcada28ca19d0f952fd19d9922b02e 100644 --- a/src/include/access/timeline.h +++ b/src/include/access/timeline.h @@ -34,6 +34,7 @@ extern bool existsTimeLineHistory(TimeLineID probeTLI); extern TimeLineID findNewestTimeLine(TimeLineID startTLI); extern void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, XLogRecPtr switchpoint, char *reason); +extern void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size); extern bool tliInHistory(TimeLineID tli, List *expectedTLIs); extern TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history); extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 32c2e40ac14fe249ce662b6262d4391351ec1d09..c8cd37981c58785bcb3326034fd533051124d820 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -283,8 +283,8 @@ extern bool RecoveryInProgress(void); extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); -extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI); -extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI); +extern XLogRecPtr GetXLogReplayRecPtr(void); +extern XLogRecPtr GetStandbyFlushRecPtr(void); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); extern bool RecoveryIsPaused(void); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 438a1d98630e97bcf0d9f02fdc78e922e02b5a10..5529a8b811cc5e5308e3ae72d4f16662d9ee5192 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -407,6 +407,7 @@ typedef enum NodeTag T_IdentifySystemCmd, T_BaseBackupCmd, 
T_StartReplicationCmd, + T_TimeLineHistoryCmd, /* * TAGS FOR RANDOM OTHER STUFF diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 236a36dd98045b1a7633b7e81451b25a43db78bd..8d686879a9aeaea94fc0cc88677a69b451430e73 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -46,7 +46,19 @@ typedef struct BaseBackupCmd typedef struct StartReplicationCmd { NodeTag type; + TimeLineID timeline; XLogRecPtr startpoint; } StartReplicationCmd; + +/* ---------------------- + * TIMELINE_HISTORY command + * ---------------------- + */ +typedef struct TimeLineHistoryCmd +{ + NodeTag type; + TimeLineID timeline; +} TimeLineHistoryCmd; + #endif /* REPLNODES_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 4cf5a27e6d9cbe9728c83df50a8039a70b76db6c..e62a1db63fe8623d6891877687714ccf246c9c4b 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -14,6 +14,7 @@ #include "access/xlog.h" #include "access/xlogdefs.h" +#include "storage/latch.h" #include "storage/spin.h" #include "pgtime.h" @@ -40,7 +41,9 @@ typedef enum WALRCV_STOPPED, /* stopped and mustn't start up again */ WALRCV_STARTING, /* launched, but the process hasn't * initialized yet */ - WALRCV_RUNNING, /* walreceiver is running */ + WALRCV_STREAMING, /* walreceiver is streaming */ + WALRCV_WAITING, /* stopped streaming, waiting for orders */ + WALRCV_RESTARTING, /* asked to restart streaming */ WALRCV_STOPPING /* requested to stop, but still running */ } WalRcvState; @@ -57,19 +60,23 @@ typedef struct pg_time_t startTime; /* - * receiveStart is the first byte position that will be received. When - * startup process starts the walreceiver, it sets receiveStart to the - * point where it wants the streaming to begin. + * receiveStart and receiveStartTLI indicate the first byte position + * and timeline that will be received. 
When startup process starts the + * walreceiver, it sets these to the point where it wants the streaming + * to begin. */ XLogRecPtr receiveStart; + TimeLineID receiveStartTLI; /* * receivedUpto-1 is the last byte position that has already been - * received. At the first startup of walreceiver, receivedUpto is set to - * receiveStart. After that, walreceiver updates this whenever it flushes - * the received WAL to disk. + * received, and receivedTLI is the timeline it came from. At the first + * startup of walreceiver, these are set to receiveStart and + * receiveStartTLI. After that, walreceiver updates these whenever it + * flushes the received WAL to disk. */ XLogRecPtr receivedUpto; + TimeLineID receivedTLI; /* * latestChunkStart is the starting byte position of the current "batch" @@ -97,16 +104,34 @@ typedef struct char conninfo[MAXCONNINFO]; slock_t mutex; /* locks shared variables shown above */ + + /* + * Latch used by startup process to wake up walreceiver after telling it + * where to start streaming (after setting receiveStart and + * receiveStartTLI). 
+ */ + Latch latch; } WalRcvData; extern WalRcvData *WalRcv; /* libpqwalreceiver hooks */ -typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint); +typedef void (*walrcv_connect_type) (char *conninfo); extern PGDLLIMPORT walrcv_connect_type walrcv_connect; -typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type, - char **buffer, int *len); +typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli); +extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system; + +typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size); +extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile; + +typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint); +extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming; + +typedef void (*walrcv_endstreaming_type) (void); +extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming; + +typedef int (*walrcv_receive_type) (int timeout, char **buffer); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; typedef void (*walrcv_send_type) (const char *buffer, int nbytes); @@ -122,9 +147,10 @@ extern void WalReceiverMain(void) __attribute__((noreturn)); extern Size WalRcvShmemSize(void); extern void WalRcvShmemInit(void); extern void ShutdownWalRcv(void); -extern bool WalRcvInProgress(void); -extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); -extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart); +extern bool WalRcvStreaming(void); +extern bool WalRcvRunning(void); +extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo); +extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h 
index df8e9514780b3f85a21af31541d66eabc59ec781..eabafab33b8219fa2e363c2a04d1d789f4c23b71 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -19,7 +19,6 @@ /* global state */ extern bool am_walsender; extern bool am_cascading_walsender; -extern volatile sig_atomic_t walsender_ready_to_stop; extern bool wake_wal_senders; /* user-settable parameters */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 66234cd8b5925d81b535522983fdc75da1d40c75..5d849d4b0f5dcb50f36dc1254385d5ce09162fe7 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -95,7 +95,7 @@ extern WalSndCtlData *WalSndCtl; extern void WalSndSetState(WalSndState state); -extern void XLogRead(char *buf, XLogRecPtr startptr, Size count); +extern void XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count); /* * Internal functions for parsing the replication grammar, in repl_gram.y and diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 77124efe779656f49f8bb028378d351bfcb0e8a7..4bc5e647153ba55431456cd314b1fc9baa7ddfe6 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -2245,7 +2245,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg) { if (!conn) return -1; - if (conn->asyncStatus != PGASYNC_COPY_IN) + if (conn->asyncStatus != PGASYNC_COPY_IN && + conn->asyncStatus != PGASYNC_COPY_BOTH) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); @@ -2305,7 +2306,10 @@ PQputCopyEnd(PGconn *conn, const char *errormsg) } /* Return to active duty */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->asyncStatus == PGASYNC_COPY_BOTH) + conn->asyncStatus = PGASYNC_COPY_OUT; + else + conn->asyncStatus = PGASYNC_BUSY; resetPQExpBuffer(&conn->errorMessage); /* Try to flush data */ diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c 
index c605bcd734c9fd76d15c893db6d66f727bb6d876..e8f3f337b1a2c6e4be1f2afecbadda383c105d71 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -1484,7 +1484,12 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) * expect the state was already changed. */ if (msgLength == -1) - conn->asyncStatus = PGASYNC_BUSY; + { + if (conn->asyncStatus == PGASYNC_COPY_BOTH) + conn->asyncStatus = PGASYNC_COPY_IN; + else + conn->asyncStatus = PGASYNC_BUSY; + } return msgLength; /* end-of-copy or error */ } if (msgLength == 0)