From 98e2c298c2f5fef158a6bf0d543a84098671ae7d Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Thu, 12 Jul 2018 10:20:27 +0900
Subject: [PATCH] Make logical WAL sender report streaming state appropriately

WAL senders sending logically-decoded data fail to properly report the
"streaming" state when starting up.  As long as one extra record has not
been replayed, such WAL senders remain in the "catchup" state, which is
inconsistent with their physical counterparts.

This can be easily reproduced by, for example, using pg_recvlogical and
restarting the upstream server.  The TAP tests have been slightly
modified to detect the failure, and strengthened so that future tests
also make sure that a node is in streaming state when waiting for it to
catch up.

Backpatch down to 9.4 where this code has been introduced.

Reported-by: Sawada Masahiko
Author: Simon Riggs, Sawada Masahiko
Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran
Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
---
 src/backend/replication/walsender.c | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3a06a4a307a..8f720521abf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1914,8 +1914,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
 			{
 				ereport(DEBUG1,
-					 (errmsg("standby \"%s\" has now caught up with primary",
-							 application_name)));
+						(errmsg("\"%s\" has now caught up with upstream server",
+								application_name)));
 				WalSndSetState(WALSNDSTATE_STREAMING);
 			}
 
@@ -2482,10 +2482,10 @@ XLogSendLogical(void)
 	char	   *errm;
 
 	/*
-	 * Don't know whether we've caught up yet. We'll set it to true in
-	 * WalSndWaitForWal, if we're actually waiting. We also set to true if
-	 * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
-	 * i.e. when we're shutting down.
+	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
+	 * true in WalSndWaitForWal, if we're actually waiting. We also set it to
+	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
+	 * didn't wait - i.e. when we're shutting down.
 	 */
 	WalSndCaughtUp = false;
 
@@ -2498,9 +2498,19 @@ XLogSendLogical(void)
 
 	if (record != NULL)
 	{
+		/* XXX: Note that logical decoding cannot be used while in recovery */
+		XLogRecPtr	flushPtr = GetFlushRecPtr();
+
 		LogicalDecodingProcessRecord(logical_decoding_ctx, record);
 
 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+
+		/*
+		 * If we have sent a record that is at or beyond the flushed point, we
+		 * have caught up.
+		 */
+		if (sentPtr >= flushPtr)
+			WalSndCaughtUp = true;
 	}
 	else
 	{
-- 
GitLab