From d811c037cea2e929ee5823aab6e1bca0df8be245 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 21 Apr 2015 11:51:06 +0200
Subject: [PATCH] Add 'active_in' column to pg_replication_slots.

Right now it is visible whether a replication slot is active in any
session, but not in which.  Adding the active_in column, containing the
pid of the backend having acquired the slot, makes it much easier to
associate pg_replication_slots entries with the corresponding
pg_stat_replication/pg_stat_activity row.

This should have been done from the start, but I (Andres) dropped the
ball there somehow.

Author: Craig Ringer, revised by me Discussion:
CAMsr+YFKgZca5_7_ouaMWxA5PneJC9LNViPzpDHusaPhU9pA7g@mail.gmail.com
---
 contrib/test_decoding/expected/ddl.out |  4 ++--
 doc/src/sgml/catalogs.sgml             | 10 +++++++++
 doc/src/sgml/logicaldecoding.sgml      |  8 +++----
 src/backend/catalog/system_views.sql   |  1 +
 src/backend/replication/slot.c         | 30 ++++++++++++++------------
 src/backend/replication/slotfuncs.c    | 13 +++++++----
 src/include/catalog/pg_proc.h          |  2 +-
 src/include/replication/slot.h         |  4 ++--
 src/test/regress/expected/rules.out    |  3 ++-
 9 files changed, 47 insertions(+), 28 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 780120d7314..838483ee5c0 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -603,7 +603,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn 
------------+--------+-----------+--------+----------+--------+------+--------------+-------------
+ slot_name | plugin | slot_type | datoid | database | active | active_in | xmin | catalog_xmin | restart_lsn 
+-----------+--------+-----------+--------+----------+--------+-----------+------+--------------+-------------
 (0 rows)
 
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index d0b78f27827..98847354bd0 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5400,6 +5400,16 @@
       <entry>True if this slot is currently actively being used</entry>
      </row>
 
+     <row>
+      <entry><structfield>active_in</structfield></entry>
+      <entry><type>integer</type></entry>
+      <entry></entry>
+      <entry>The process ID of the session using this slot if the slot
+       is currently actively being used. <literal>NULL</literal> if
+       inactive.
+      </entry>
+     </row>
+
      <row>
       <entry><structfield>xmin</structfield></entry>
       <entry><type>xid</type></entry>
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 36505678526..0810a2d1f97 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', '
  regression_slot | 0/16B1970
 (1 row)
 
-postgres=# SELECT * FROM pg_replication_slots;
-    slot_name    |    plugin     | slot_type | datoid | database | active |  xmin  | catalog_xmin | restart_lsn
------------------+---------------+-----------+--------+----------+--------+--------+--------------+-------------
- regression_slot | test_decoding | logical   |  12052 | postgres | f      |        |          684 | 0/16A4408
+postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;
+    slot_name    |    plugin     | slot_type | database | active | restart_lsn
+-----------------+---------------+-----------+----------+--------+-------------
+ regression_slot | test_decoding | logical   | postgres | f      | 0/16A4408
 (1 row)
 
 postgres=# -- There are no changes to see yet
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a4fd88fa98d..75ff231de55 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -665,6 +665,7 @@ CREATE VIEW pg_replication_slots AS
             L.datoid,
             D.datname AS database,
             L.active,
+            L.active_in,
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 16ea80bc3a0..fa1f07b3f3e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -262,7 +262,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * be doing that.  So it's safe to initialize the slot.
 	 */
 	Assert(!slot->in_use);
-	Assert(!slot->active);
+	Assert(slot->active_pid == 0);
 	slot->data.persistency = persistency;
 	slot->data.xmin = InvalidTransactionId;
 	slot->effective_xmin = InvalidTransactionId;
@@ -291,8 +291,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 		volatile ReplicationSlot *vslot = slot;
 
 		SpinLockAcquire(&slot->mutex);
-		Assert(!vslot->active);
-		vslot->active = true;
+		Assert(vslot->active_pid == 0);
+		vslot->active_pid = MyProcPid;
 		SpinLockRelease(&slot->mutex);
 		MyReplicationSlot = slot;
 	}
@@ -314,7 +314,7 @@ ReplicationSlotAcquire(const char *name)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
-	bool		active = false;
+	int			active_pid = 0;
 
 	Assert(MyReplicationSlot == NULL);
 
@@ -331,8 +331,9 @@ ReplicationSlotAcquire(const char *name)
 			volatile ReplicationSlot *vslot = s;
 
 			SpinLockAcquire(&s->mutex);
-			active = vslot->active;
-			vslot->active = true;
+			active_pid = vslot->active_pid;
+			if (active_pid == 0)
+				vslot->active_pid = MyProcPid;
 			SpinLockRelease(&s->mutex);
 			slot = s;
 			break;
@@ -345,10 +346,11 @@ ReplicationSlotAcquire(const char *name)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
-	if (active)
+	if (active_pid != 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("replication slot \"%s\" is already active", name)));
+				 errmsg("replication slot \"%s\" is already active for pid %d",
+						name, active_pid)));
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
@@ -363,7 +365,7 @@ ReplicationSlotRelease(void)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 
-	Assert(slot != NULL && slot->active);
+	Assert(slot != NULL && slot->active_pid != 0);
 
 	if (slot->data.persistency == RS_EPHEMERAL)
 	{
@@ -380,7 +382,7 @@ ReplicationSlotRelease(void)
 		volatile ReplicationSlot *vslot = slot;
 
 		SpinLockAcquire(&slot->mutex);
-		vslot->active = false;
+		vslot->active_pid = 0;
 		SpinLockRelease(&slot->mutex);
 	}
 
@@ -460,7 +462,7 @@ ReplicationSlotDropAcquired(void)
 		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;
 
 		SpinLockAcquire(&slot->mutex);
-		vslot->active = false;
+		vslot->active_pid = 0;
 		SpinLockRelease(&slot->mutex);
 
 		ereport(fail_softly ? WARNING : ERROR,
@@ -477,7 +479,7 @@ ReplicationSlotDropAcquired(void)
 	 * scanning the array.
 	 */
 	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
-	slot->active = false;
+	slot->active_pid = 0;
 	slot->in_use = false;
 	LWLockRelease(ReplicationSlotControlLock);
 
@@ -749,7 +751,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 		/* count slots with spinlock held */
 		SpinLockAcquire(&s->mutex);
 		(*nslots)++;
-		if (s->active)
+		if (s->active_pid != 0)
 			(*nactive)++;
 		SpinLockRelease(&s->mutex);
 	}
@@ -1227,7 +1229,7 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
 		slot->in_use = true;
-		slot->active = false;
+		slot->active_pid = 0;
 
 		restored = true;
 		break;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f31925d862b..3d9aadbd839 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 8
+#define PG_GET_REPLICATION_SLOTS_COLS 9
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -206,7 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		TransactionId xmin;
 		TransactionId catalog_xmin;
 		XLogRecPtr	restart_lsn;
-		bool		active;
+		pid_t		active_pid;
 		Oid			database;
 		NameData	slot_name;
 		NameData	plugin;
@@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			namecpy(&slot_name, &slot->data.name);
 			namecpy(&plugin, &slot->data.plugin);
 
-			active = slot->active;
+			active_pid = slot->active_pid;
 		}
 		SpinLockRelease(&slot->mutex);
 
@@ -251,7 +251,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			values[i++] = database;
 
-		values[i++] = BoolGetDatum(active);
+		values[i++] = BoolGetDatum(active_pid != 0);
+
+		if (active_pid != 0)
+			values[i++] = Int32GetDatum(active_pid);
+		else
+			nulls[i++] = true;
 
 		if (xmin != InvalidTransactionId)
 			values[i++] = TransactionIdGetDatum(xmin);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 619d9969cab..de18a5ad3ca 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5106,7 +5106,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_in,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a4001360c43..78cff07abf8 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -84,8 +84,8 @@ typedef struct ReplicationSlot
 	/* is this slot defined */
 	bool		in_use;
 
-	/* is somebody streaming out changes for this slot */
-	bool		active;
+	/* Who is streaming out changes for this slot? 0 in unused slots. */
+	pid_t		active_pid;
 
 	/* any outstanding modifications? */
 	bool		just_dirtied;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 71fa44a6bea..1577ff0e297 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1396,10 +1396,11 @@ pg_replication_slots| SELECT l.slot_name,
     l.datoid,
     d.datname AS database,
     l.active,
+    l.active_in,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, xmin, catalog_xmin, restart_lsn)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_in, xmin, catalog_xmin, restart_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
GitLab