From 7326a7d63801673d3569def65cb7955d6486f254 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 1 Aug 2018 17:39:07 -0400
Subject: [PATCH] Fix logical replication slot initialization

This was broken in commit 9c7d06d60680, which inadvertently gave the
wrong value to fast_forward in one StartupDecodingContext call.  Fix by
flipping the value.  Add a test for the obvious error, namely trying to
initialize a replication slot with an nonexistent output plugin.

While at it, move the CreateDecodingContext call earlier, so that any
errors are reported before sending the CopyBoth message.

Author: Dave Cramer <davecramer@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CADK3HHLVkeRe1v4P02-5hj55H3_yJg3AEtpXyEY5T3wuzO2jSg@mail.gmail.com
---
 contrib/test_decoding/expected/slot.out   |  2 ++
 contrib/test_decoding/sql/slot.sql        |  2 ++
 src/backend/replication/logical/logical.c |  2 +-
 src/backend/replication/walsender.c       | 23 +++++++++++++----------
 4 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 2737a8a301b..523621a705d 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -30,6 +30,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'tes
  init
 (1 row)
 
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+ERROR:  could not access file "nonexistent": No such file or directory
 -- here we want to start a new session and wait till old one is gone
 select pg_backend_pid() as oldpid \gset
 \c -
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 24cdf7155d7..c8d08f85417 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -9,6 +9,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test
 
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true);
 
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+
 -- here we want to start a new session and wait till old one is gone
 select pg_backend_pid() as oldpid \gset
 \c -
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c9bbdcda74d..6fc762e318d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -312,7 +312,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 need_full_snapshot, true,
+								 need_full_snapshot, false,
 								 read_page, prepare_write, do_write,
 								 update_progress);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3a0106bc933..15becc11ee2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1061,6 +1061,19 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		got_STOPPING = true;
 	}
 
+	/*
+	 * Create our decoding context, making it start at the previously ack'ed
+	 * position.
+	 *
+	 * Do this before sending CopyBoth, so that any errors are reported early.
+	 */
+	logical_decoding_ctx =
+		CreateDecodingContext(cmd->startpoint, cmd->options, false,
+							  logical_read_xlog_page,
+							  WalSndPrepareWrite, WalSndWriteData,
+							  WalSndUpdateProgress);
+
+
 	WalSndSetState(WALSNDSTATE_CATCHUP);
 
 	/* Send a CopyBothResponse message, and start streaming */
@@ -1070,16 +1083,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	pq_endmessage(&buf);
 	pq_flush();
 
-	/*
-	 * Initialize position to the last ack'ed one, then the xlog records begin
-	 * to be shipped from that position.
-	 */
-	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
-												 false,
-												 logical_read_xlog_page,
-												 WalSndPrepareWrite,
-												 WalSndWriteData,
-												 WalSndUpdateProgress);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
-- 
GitLab