From e04a9ccd2ccd6e31cc4af6b08257a0a186d0fce8 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Thu, 12 Jun 2014 13:23:46 +0200
Subject: [PATCH] Consistency improvements for slot and decoding code.

Change the order of checks in similar functions to be the same; remove
a parameter that's not needed anymore; rename a memory context and
expand a couple of comments.

Per review comments from Amit Kapila
---
 src/backend/access/transam/xlog.c         |  2 +-
 src/backend/replication/logical/logical.c |  2 +-
 src/backend/replication/slot.c            |  2 +-
 src/backend/replication/slotfuncs.c       | 17 +++++++++++------
 src/backend/replication/walsender.c       |  9 +++++++--
 src/include/replication/slot.h            |  2 +-
 6 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9eca63cbbfa..3f92482b42d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6270,7 +6270,7 @@ StartupXLOG(void)
 	 * Initialize replication slots, before there's a chance to remove
 	 * required resources.
 	 */
-	StartupReplicationSlots(checkPoint.redo);
+	StartupReplicationSlots();
 
 	/*
 	 * Startup logical state, needs to be setup now so we have proper data
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index b82580fbcdf..9eb5cd5ee4d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -125,7 +125,7 @@ StartupDecodingContext(List *output_plugin_options,
 	slot = MyReplicationSlot;
 
 	context = AllocSetContextCreate(CurrentMemoryContext,
-									"Changeset Extraction Context",
+									"Logical Decoding Context",
 									ALLOCSET_DEFAULT_MINSIZE,
 									ALLOCSET_DEFAULT_INITSIZE,
 									ALLOCSET_DEFAULT_MAXSIZE);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index ee0c7c07a97..5671ac1d14f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -829,7 +829,7 @@ CheckPointReplicationSlots(void)
  * needs to be run before we start crash recovery.
  */
 void
-StartupReplicationSlots(XLogRecPtr checkPointRedo)
+StartupReplicationSlots(void)
 {
 	DIR		   *replication_dir;
 	struct dirent *replication_de;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index dc94f504ee2..bd4701f97df 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -46,13 +46,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	HeapTuple	tuple;
 	Datum		result;
 
-	check_permissions();
-
-	CheckSlotRequirements();
+	Assert(!MyReplicationSlot);
 
 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 		elog(ERROR, "return type must be a row type");
 
+	check_permissions();
+
+	CheckSlotRequirements();
+
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
 
@@ -87,6 +89,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	Datum		values[2];
 	bool		nulls[2];
 
+	Assert(!MyReplicationSlot);
+
 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 		elog(ERROR, "return type must be a row type");
 
@@ -94,10 +98,11 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckLogicalDecodingRequirements();
 
-	Assert(!MyReplicationSlot);
-
 	/*
-	 * Acquire a logical decoding slot, this will check for conflicting names.
+	 * Acquire a logical decoding slot, this will check for conflicting
+	 * names. Initially create it as ephemeral - that allows us to nicely
+	 * handle errors during initialization because it'll get dropped if this
+	 * transaction fails. We'll make it persistent at the end.
 	 */
 	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 088ee2c0976..318979342eb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -781,6 +781,11 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	else
 	{
 		CheckLogicalDecodingRequirements();
+		/*
+		 * Initially create the slot as ephemeral - that allows us to nicely
+		 * handle errors during initialization because it'll get dropped if
+		 * this transaction fails. We'll make it persistent at the end.
+		 */
 		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
 	}
 
@@ -1682,8 +1687,8 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * If we're using a replication slot we reserve the xmin via that,
 	 * otherwise via the walsender's PGXACT entry.
 	 *
-	 * XXX: It might make sense to introduce ephemeral slots and always use
-	 * the slot mechanism.
+	 * XXX: It might make sense to generalize the ephemeral slot concept and
+	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
 		PhysicalReplicationSlotNewXmin(feedbackXmin);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 341e829bbb3..c129a4a7718 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -164,7 +164,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 
-extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
-- 
GitLab