From 6fcd88511f8e69e38defb1272e0042ef4bab2feb Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 11 Aug 2015 12:34:31 +0200
Subject: [PATCH] Allow pg_create_physical_replication_slot() to reserve WAL.

When creating a physical slot it's often useful to immediately reserve
the current WAL position instead of only doing after the first feedback
message arrives. That e.g. allows slots to guarantee that all the WAL
for a base backup will be available afterwards.

Logical slots already have to reserve WAL during creation, so generalize
that logic into being usable for both physical and logical slots.

Catversion bump because of the new parameter.

Author: Gurjeet Singh
Reviewed-By: Andres Freund
Discussion: CABwTF4Wh_dBCzTU=49pFXR6coR4NW1ynb+vBqT+Po=7fuq5iCw@mail.gmail.com
---
 doc/src/sgml/func.sgml                    |  8 ++-
 src/backend/catalog/system_views.sql      |  7 +++
 src/backend/replication/logical/logical.c | 47 +--------------
 src/backend/replication/slot.c            | 71 +++++++++++++++++++++++
 src/backend/replication/slotfuncs.c       | 21 ++++++-
 src/include/catalog/catversion.h          |  2 +-
 src/include/catalog/pg_proc.h             |  2 +-
 src/include/replication/slot.h            |  1 +
 8 files changed, 107 insertions(+), 52 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 4fcc4fe5aaa..b1df55d7f62 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17211,7 +17211,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_physical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>)</function></literal>
+        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type><optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -17221,7 +17221,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <parameter>slot_name</parameter>. Streaming changes from a physical slot
         is only possible with the streaming-replication protocol - see <xref
         linkend="protocol-replication">. Corresponds to the replication protocol
-        command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
+        command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. The optional
+        second parameter,  when <literal>true</>, specifies that the <acronym>LSN</>
+        for this replication slot be reserved immediately; the <acronym<LSN</>
+        is otherwise reserved on first connection from a streaming replication
+        client.
        </entry>
       </row>
       <row>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3190c7f7e01..ccc030fd7fb 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -917,6 +917,13 @@ LANGUAGE INTERNAL
 VOLATILE ROWS 1000 COST 1000
 AS 'pg_logical_slot_peek_binary_changes';
 
+CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
+    IN slot_name name, IN immediately_reserve boolean DEFAULT false,
+    OUT slot_name name, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+AS 'pg_create_physical_replication_slot';
+
 CREATE OR REPLACE FUNCTION
   make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
                 days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5411e599eb2..5a07e1d9a69 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -250,52 +250,7 @@ CreateInitDecodingContext(char *plugin,
 	StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
 	SpinLockRelease(&slot->mutex);
 
-	/*
-	 * The replication slot mechanism is used to prevent removal of required
-	 * WAL. As there is no interlock between this and checkpoints required WAL
-	 * could be removed before ReplicationSlotsComputeRequiredLSN() has been
-	 * called to prevent that. In the very unlikely case that this happens
-	 * we'll just retry.
-	 */
-	while (true)
-	{
-		XLogSegNo	segno;
-
-		/*
-		 * Let's start with enough information if we can, so log a standby
-		 * snapshot and start decoding at exactly that position.
-		 */
-		if (!RecoveryInProgress())
-		{
-			XLogRecPtr	flushptr;
-
-			/* start at current insert position */
-			slot->data.restart_lsn = GetXLogInsertRecPtr();
-
-			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
-
-			/* and make sure it's fsynced to disk */
-			XLogFlush(flushptr);
-		}
-		else
-			slot->data.restart_lsn = GetRedoRecPtr();
-
-		/* prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
-
-		/*
-		 * If all required WAL is still there, great, otherwise retry. The
-		 * slot should prevent further removal of WAL, unless there's a
-		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
-		 * the new restart_lsn above, so normally we should never need to loop
-		 * more than twice.
-		 */
-		XLByteToSeg(slot->data.restart_lsn, segno);
-		if (XLogGetLastRemovedSegno() < segno)
-			break;
-	}
-
+	ReplicationSlotReserveWal();
 
 	/* ----
 	 * This is a bit tricky: We need to determine a safe xmin horizon to start
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1f013af8871..c66619cda29 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -40,6 +40,7 @@
 #include <sys/stat.h>
 
 #include "access/transam.h"
+#include "access/xlog_internal.h"
 #include "common/string.h"
 #include "miscadmin.h"
 #include "replication/slot.h"
@@ -781,6 +782,76 @@ CheckSlotRequirements(void)
 				 errmsg("replication slots can only be used if wal_level >= archive")));
 }
 
+/*
+ * Reserve WAL for the currently active slot.
+ *
+ * Compute and set restart_lsn in a manner that's appropriate for the type of
+ * the slot and concurrency safe.
+ */
+void
+ReplicationSlotReserveWal(void)
+{
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(slot != NULL);
+	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+
+	/*
+	 * The replication slot mechanism is used to prevent removal of required
+	 * WAL. As there is no interlock between this routine and checkpoints, WAL
+	 * segments could concurrently be removed when a now stale return value of
+	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
+	 * this happens we'll just retry.
+	 */
+	while (true)
+	{
+		XLogSegNo	segno;
+
+		/*
+		 * For logical slots log a standby snapshot and start logical decoding
+		 * at exactly that position. That allows the slot to start up more
+		 * quickly.
+		 *
+		 * That's not needed (or indeed helpful) for physical slots as they'll
+		 * start replay at the last logged checkpoint anyway. Instead return
+		 * the location of the last redo LSN. While that slightly increases
+		 * the chance that we have to retry, it's where a base backup has to
+		 * start replay at.
+		 */
+		if (!RecoveryInProgress() && SlotIsLogical(slot))
+		{
+			XLogRecPtr	flushptr;
+
+			/* start at current insert position */
+			slot->data.restart_lsn = GetXLogInsertRecPtr();
+
+			/* make sure we have enough information to start */
+			flushptr = LogStandbySnapshot();
+
+			/* and make sure it's fsynced to disk */
+			XLogFlush(flushptr);
+		}
+		else
+		{
+			slot->data.restart_lsn = GetRedoRecPtr();
+		}
+
+		/* prevent WAL removal as fast as possible */
+		ReplicationSlotsComputeRequiredLSN();
+
+		/*
+		 * If all required WAL is still there, great, otherwise retry. The
+		 * slot should prevent further removal of WAL, unless there's a
+		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+		 * the new restart_lsn above, so normally we should never need to loop
+		 * more than twice.
+		 */
+		XLByteToSeg(slot->data.restart_lsn, segno);
+		if (XLogGetLastRemovedSegno() < segno)
+			break;
+	}
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ecfcb0754bd..2dc68279900 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -40,6 +40,7 @@ Datum
 pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
+	bool 		immediately_reserve = PG_GETARG_BOOL(1);
 	Datum		values[2];
 	bool		nulls[2];
 	TupleDesc	tupdesc;
@@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-
 	nulls[0] = false;
-	nulls[1] = true;
+
+	if (immediately_reserve)
+	{
+		/* Reserve WAL as the user asked for it */
+		ReplicationSlotReserveWal();
+
+		/* Write this slot to disk */
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+
+		values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
+		nulls[1] = false;
+	}
+	else
+	{
+		values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+		nulls[1] = true;
+	}
 
 	tuple = heap_form_tuple(tupdesc, values, nulls);
 	result = HeapTupleGetDatum(tuple);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 8cd67729879..b58fe464476 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201508101
+#define CATALOG_VERSION_NO	201508111
 
 #endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 51639624a9e..ddf7c673710 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5193,7 +5193,7 @@ DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0
 DESCR("SP-GiST support for quad tree over range");
 
 /* replication slots */
-DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 367ef0a38dc..20dd7a283c9 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -166,6 +166,7 @@ extern void ReplicationSlotMarkDirty(void);
 
 /* misc stuff */
 extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotReserveWal(void);
 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
-- 
GitLab