From bbd8550bce146f86e5e883f1232292a975c314fb Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Wed, 1 Feb 2017 13:42:41 -0500
Subject: [PATCH] Refactor other replication commands to use DestRemoteSimple.

Commit a84069d9350400c860d5e932b50dfd337aa407b0 added a new type of
DestReceiver to avoid duplicating the existing code for the SHOW
command, but it turns out we can leverage that new DestReceiver
type in a few more places, saving some code.

Michael Paquier, reviewed by Andres Freund and by me.

Discussion: http://postgr.es/m/CAB7nPqSdFOQC0evc0r1nJeQyGBqjBrR41MC4rcMqUUpoJaZbtQ%40mail.gmail.com
Discussion: http://postgr.es/m/CAB7nPqT2K4XFT1JgqufFBjsOc-NUKXg5qBDucHPMbk6Xi1kYaA@mail.gmail.com
---
 src/backend/access/common/printsimple.c |  21 ++
 src/backend/access/common/tupdesc.c     |   8 +
 src/backend/replication/walsender.c     | 247 ++++++++----------------
 3 files changed, 114 insertions(+), 162 deletions(-)

diff --git a/src/backend/access/common/printsimple.c b/src/backend/access/common/printsimple.c
index 420de65e207..5fe1c72da83 100644
--- a/src/backend/access/common/printsimple.c
+++ b/src/backend/access/common/printsimple.c
@@ -22,6 +22,7 @@
 #include "catalog/pg_type.h"
 #include "fmgr.h"
 #include "libpq/pqformat.h"
+#include "utils/builtins.h"
 
 /*
  * At startup time, send a RowDescription message.
@@ -99,6 +100,26 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
 				}
 				break;
 
+			case INT4OID:
+				{
+					int32	num = DatumGetInt32(value);
+					char	str[12];	/* sign, 10 digits and '\0' */
+
+					pg_ltoa(num, str);
+					pq_sendcountedtext(&buf, str, strlen(str), false);
+				}
+				break;
+
+			case INT8OID:
+				{
+					int64	num = DatumGetInt64(value);
+					char	str[23];	/* sign, 21 digits and '\0' */
+
+					pg_lltoa(num, str);
+					pq_sendcountedtext(&buf, str, strlen(str), false);
+				}
+				break;
+
 			default:
 				elog(ERROR, "unsupported type OID: %u", attr->atttypid);
 		}
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 083c0303dcf..4e2ebe1ae7e 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -629,6 +629,14 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
 			att->attstorage = 'p';
 			att->attcollation = InvalidOid;
 			break;
+
+		case INT8OID:
+			att->attlen = 8;
+			att->attbyval = FLOAT8PASSBYVAL;
+			att->attalign = 'd';
+			att->attstorage = 'p';
+			att->attcollation = InvalidOid;
+			break;
 	}
 }
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5909b7dd8c5..76f09fbdbf2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -302,13 +302,15 @@ WalSndShutdown(void)
 static void
 IdentifySystem(void)
 {
-	StringInfoData buf;
 	char		sysid[32];
-	char		tli[11];
 	char		xpos[MAXFNAMELEN];
 	XLogRecPtr	logptr;
 	char	   *dbname = NULL;
-	Size		len;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[4];
+	bool		nulls[4];
 
 	/*
 	 * Reply with a result set with one row, four columns. First col is system
@@ -328,8 +330,6 @@ IdentifySystem(void)
 	else
 		logptr = GetFlushRecPtr();
 
-	snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
-
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
 	if (MyDatabaseId != InvalidOid)
@@ -346,79 +346,42 @@ IdentifySystem(void)
 		MemoryContextSwitchTo(cur);
 	}
 
-	/* Send a RowDescription message */
-	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 4, 2);		/* 4 fields */
-
-	/* first field */
-	pq_sendstring(&buf, "systemid");	/* col name */
-	pq_sendint(&buf, 0, 4);		/* table oid */
-	pq_sendint(&buf, 0, 2);		/* attnum */
-	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
-	pq_sendint(&buf, -1, 2);	/* typlen */
-	pq_sendint(&buf, 0, 4);		/* typmod */
-	pq_sendint(&buf, 0, 2);		/* format code */
-
-	/* second field */
-	pq_sendstring(&buf, "timeline");	/* col name */
-	pq_sendint(&buf, 0, 4);		/* table oid */
-	pq_sendint(&buf, 0, 2);		/* attnum */
-	pq_sendint(&buf, INT4OID, 4);		/* type oid */
-	pq_sendint(&buf, 4, 2);		/* typlen */
-	pq_sendint(&buf, 0, 4);		/* typmod */
-	pq_sendint(&buf, 0, 2);		/* format code */
-
-	/* third field */
-	pq_sendstring(&buf, "xlogpos");		/* col name */
-	pq_sendint(&buf, 0, 4);		/* table oid */
-	pq_sendint(&buf, 0, 2);		/* attnum */
-	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
-	pq_sendint(&buf, -1, 2);	/* typlen */
-	pq_sendint(&buf, 0, 4);		/* typmod */
-	pq_sendint(&buf, 0, 2);		/* format code */
+	dest = CreateDestReceiver(DestRemoteSimple);
+	MemSet(nulls, false, sizeof(nulls));
 
-	/* fourth field */
-	pq_sendstring(&buf, "dbname");		/* col name */
-	pq_sendint(&buf, 0, 4);		/* table oid */
-	pq_sendint(&buf, 0, 2);		/* attnum */
-	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
-	pq_sendint(&buf, -1, 2);	/* typlen */
-	pq_sendint(&buf, 0, 4);		/* typmod */
-	pq_sendint(&buf, 0, 2);		/* format code */
-	pq_endmessage(&buf);
+	/* need a tuple descriptor representing four columns */
+	tupdesc = CreateTemplateTupleDesc(4, false);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
+							  TEXTOID, -1, 0);
 
-	/* Send a DataRow message */
-	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 4, 2);		/* # of columns */
+	/* prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc);
 
 	/* column 1: system identifier */
-	len = strlen(sysid);
-	pq_sendint(&buf, len, 4);
-	pq_sendbytes(&buf, (char *) &sysid, len);
+	values[0] = CStringGetTextDatum(sysid);
 
 	/* column 2: timeline */
-	len = strlen(tli);
-	pq_sendint(&buf, len, 4);
-	pq_sendbytes(&buf, (char *) tli, len);
+	values[1] = Int32GetDatum(ThisTimeLineID);
 
 	/* column 3: xlog position */
-	len = strlen(xpos);
-	pq_sendint(&buf, len, 4);
-	pq_sendbytes(&buf, (char *) xpos, len);
+	values[2] = CStringGetTextDatum(xpos);
 
 	/* column 4: database name, or NULL if none */
 	if (dbname)
-	{
-		len = strlen(dbname);
-		pq_sendint(&buf, len, 4);
-		pq_sendbytes(&buf, (char *) dbname, len);
-	}
+		values[3] = CStringGetTextDatum(dbname);
 	else
-	{
-		pq_sendint(&buf, -1, 4);
-	}
+		nulls[3] = true;
 
-	pq_endmessage(&buf);
+	/* send it to dest */
+	do_tup_output(tstate, values, nulls);
+
+	end_tup_output(tstate);
 }
 
 
@@ -695,54 +658,41 @@ StartReplication(StartReplicationCmd *cmd)
 	 */
 	if (sendTimeLineIsHistoric)
 	{
-		char		tli_str[11];
 		char		startpos_str[8 + 1 + 8 + 1];
-		Size		len;
+		DestReceiver *dest;
+		TupOutputState *tstate;
+		TupleDesc	tupdesc;
+		Datum		values[2];
+		bool		nulls[2];
 
-		snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
 		snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
 				 (uint32) (sendTimeLineValidUpto >> 32),
 				 (uint32) sendTimeLineValidUpto);
 
-		pq_beginmessage(&buf, 'T');		/* RowDescription */
-		pq_sendint(&buf, 2, 2); /* 2 fields */
-
-		/* Field header */
-		pq_sendstring(&buf, "next_tli");
-		pq_sendint(&buf, 0, 4); /* table oid */
-		pq_sendint(&buf, 0, 2); /* attnum */
+		dest = CreateDestReceiver(DestRemoteSimple);
+		MemSet(nulls, false, sizeof(nulls));
 
 		/*
+		 * Need a tuple descriptor representing two columns.
 		 * int8 may seem like a surprising data type for this, but in theory
 		 * int4 would not be wide enough for this, as TimeLineID is unsigned.
 		 */
-		pq_sendint(&buf, INT8OID, 4);	/* type oid */
-		pq_sendint(&buf, -1, 2);
-		pq_sendint(&buf, 0, 4);
-		pq_sendint(&buf, 0, 2);
-
-		pq_sendstring(&buf, "next_tli_startpos");
-		pq_sendint(&buf, 0, 4); /* table oid */
-		pq_sendint(&buf, 0, 2); /* attnum */
-		pq_sendint(&buf, TEXTOID, 4);	/* type oid */
-		pq_sendint(&buf, -1, 2);
-		pq_sendint(&buf, 0, 4);
-		pq_sendint(&buf, 0, 2);
-		pq_endmessage(&buf);
+		tupdesc = CreateTemplateTupleDesc(2, false);
+		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
+								  INT8OID, -1, 0);
+		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
+								  TEXTOID, -1, 0);
 
-		/* Data row */
-		pq_beginmessage(&buf, 'D');
-		pq_sendint(&buf, 2, 2); /* number of columns */
+		/* prepare for projection of tuple */
+		tstate = begin_tup_output_tupdesc(dest, tupdesc);
 
-		len = strlen(tli_str);
-		pq_sendint(&buf, len, 4);		/* length */
-		pq_sendbytes(&buf, tli_str, len);
+		values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
+		values[1] = CStringGetTextDatum(startpos_str);
 
-		len = strlen(startpos_str);
-		pq_sendint(&buf, len, 4);		/* length */
-		pq_sendbytes(&buf, startpos_str, len);
+		/* send it to dest */
+		do_tup_output(tstate, values, nulls);
 
-		pq_endmessage(&buf);
+		end_tup_output(tstate);
 	}
 
 	/* Send CommandComplete message */
@@ -790,8 +740,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 {
 	const char *snapshot_name = NULL;
 	char		xpos[MAXFNAMELEN];
-	StringInfoData buf;
-	Size		len;
+	char	   *slot_name;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[4];
+	bool		nulls[4];
 
 	Assert(!MyReplicationSlot);
 
@@ -868,82 +822,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 			 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
 			 (uint32) MyReplicationSlot->data.confirmed_flush);
 
-	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 4, 2);		/* 4 fields */
-
-	/* first field: slot name */
-	pq_sendstring(&buf, "slot_name");	/* col name */
-	pq_sendint(&buf, 0, 4);		/* table oid */
-	pq_sendint(&buf, 0, 2);		/* attnum */
-	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
-	pq_sendint(&buf, -1, 2);	/* typlen */
-	pq_sendint(&buf, 0, 4);		/* typmod */
-	pq_sendint(&buf, 0, 2);		/* format code */
-
-	/* second field: LSN at which we became consistent */
-	pq_sendstring(&buf, "consistent_point");	/* col name */
-	pq_sendint(&buf, 0, 4);		/* table oid */
-	pq_sendint(&buf, 0, 2);		/* attnum */
-	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
-	pq_sendint(&buf, -1, 2);	/* typlen */
-	pq_sendint(&buf, 0, 4);		/* typmod */
-	pq_sendint(&buf, 0, 2);		/* format code */
-
-	/* third field: exported snapshot's name */
-	pq_sendstring(&buf, "snapshot_name");		/* col name */
-	pq_sendint(&buf, 0, 4);		/* table oid */
-	pq_sendint(&buf, 0, 2);		/* attnum */
-	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
-	pq_sendint(&buf, -1, 2);	/* typlen */
-	pq_sendint(&buf, 0, 4);		/* typmod */
-	pq_sendint(&buf, 0, 2);		/* format code */
-
-	/* fourth field: output plugin */
-	pq_sendstring(&buf, "output_plugin");		/* col name */
-	pq_sendint(&buf, 0, 4);		/* table oid */
-	pq_sendint(&buf, 0, 2);		/* attnum */
-	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
-	pq_sendint(&buf, -1, 2);	/* typlen */
-	pq_sendint(&buf, 0, 4);		/* typmod */
-	pq_sendint(&buf, 0, 2);		/* format code */
+	dest = CreateDestReceiver(DestRemoteSimple);
+	MemSet(nulls, false, sizeof(nulls));
 
-	pq_endmessage(&buf);
-
-	/* Send a DataRow message */
-	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 4, 2);		/* # of columns */
+	/*
+	 * Need a tuple descriptor representing four columns:
+	 * - first field: the slot name
+	 * - second field: LSN at which we became consistent
+	 * - third field: exported snapshot's name
+	 * - fourth field: output plugin
+	 */
+	tupdesc = CreateTemplateTupleDesc(4, false);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
+							  TEXTOID, -1, 0);
+
+	/* prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc);
 
 	/* slot_name */
-	len = strlen(NameStr(MyReplicationSlot->data.name));
-	pq_sendint(&buf, len, 4);	/* col1 len */
-	pq_sendbytes(&buf, NameStr(MyReplicationSlot->data.name), len);
+	slot_name = NameStr(MyReplicationSlot->data.name);
+	values[0] = CStringGetTextDatum(slot_name);
 
 	/* consistent wal location */
-	len = strlen(xpos);
-	pq_sendint(&buf, len, 4);
-	pq_sendbytes(&buf, xpos, len);
+	values[1] = CStringGetTextDatum(xpos);
 
 	/* snapshot name, or NULL if none */
 	if (snapshot_name != NULL)
-	{
-		len = strlen(snapshot_name);
-		pq_sendint(&buf, len, 4);
-		pq_sendbytes(&buf, snapshot_name, len);
-	}
+		values[2] = CStringGetTextDatum(snapshot_name);
 	else
-		pq_sendint(&buf, -1, 4);
+		nulls[2] = true;
 
 	/* plugin, or NULL if none */
 	if (cmd->plugin != NULL)
-	{
-		len = strlen(cmd->plugin);
-		pq_sendint(&buf, len, 4);
-		pq_sendbytes(&buf, cmd->plugin, len);
-	}
+		values[3] = CStringGetTextDatum(cmd->plugin);
 	else
-		pq_sendint(&buf, -1, 4);
+		nulls[3] = true;
 
-	pq_endmessage(&buf);
+	/* send it to dest */
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
 
 	ReplicationSlotRelease();
 }
-- 
GitLab