From 721f7bd3cbccaf8c07cad2707826b83f84694832 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Tue, 29 Nov 2016 12:18:31 -0500
Subject: [PATCH] libpq: Add target_session_attrs parameter.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Commit 274bb2b3857cc987cfa21d14775cae9b0dababa5 made it possible to
specify multiple IPs in a connection string, but that's not good
enough for the case where you have a read-write master and a bunch of
read-only standbys and want to connect to whichever server is the
master at the current time.  This commit allows that, by making it
possible to specify target_session_attrs=read-write as a connection
parameter.

There was extensive discussion of the best name for the connection
parameter and its values as well as the best way to distinguish master
and standbys.  For now, adopt the same solution as JDBC: if the user
wants a read-write connection, issue 'show transaction_read_only' and
reject the connection if the result is 'on'.  In the future, we
could add additional values of this new target_session_attrs parameter
that issue different queries; or we might have some way of
distinguishing the server type without resorting to an SQL query; but
right now, we have this, and that's (hopefully) a good start.

Victor Wagner and Mithun Cy.  Design review by Álvaro Herrera, Catalin
Iacob, Takayuki Tsunakawa, and Craig Ringer; code review by me.  I
changed Mithun's patch to skip all remaining IPs for a host if we
reject a connection based on this new parameter, rewrote the
documentation, and did some other cosmetic cleanup.

Discussion: http://postgr.es/m/CAD__OuhqPRGpcsfwPHz_PDqAGkoqS1UvnUnOnAB-LBWBW=wu4A@mail.gmail.com
---
 doc/src/sgml/libpq.sgml           |  29 +++-
 src/interfaces/libpq/fe-connect.c | 242 ++++++++++++++++++++++++------
 src/interfaces/libpq/libpq-fe.h   |   4 +-
 src/interfaces/libpq/libpq-int.h  |   3 +
 4 files changed, 227 insertions(+), 51 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 0f375bf5f25..2620eec033d 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -811,7 +811,7 @@ postgresql://localhost/mydb
 postgresql://user@localhost
 postgresql://user:secret@localhost
 postgresql://other@localhost/otherdb?connect_timeout=10&amp;application_name=myapp
-postgresql://host1:123,host2:456/somedb
+postgresql://host1:123,host2:456/somedb?target_session_attrs=any&amp;application_name=myapp
 </programlisting>
     Components of the hierarchical part of the <acronym>URI</acronym> can also
     be given as parameters.  For example:
@@ -1386,6 +1386,23 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry id="libpq-connect-target-session-attrs" xreflabel="target_session_attrs">
+      <term><literal>target_session_attrs</literal></term>
+      <listitem>
+       <para>
+        If this parameter is set to <literal>read-write</literal>, only a
+        connection in which read-write transactions are accepted by default
+        is considered acceptable.  The query
+        <literal>show transaction_read_only</literal> will be sent upon any
+        successful connection; if it returns <literal>on</>, the connection
+        will be closed.  If multiple hosts were specified in the connection
+        string, any remaining servers will be tried just as if the connection
+        attempt had failed.  The default value of this parameter,
+        <literal>any</>, regards all connections as acceptable.
+      </para>
+      </listitem>
+    </varlistentry>
     </variablelist>
    </para>
   </sect2>
@@ -7069,6 +7086,16 @@ myEventProc(PGEventId evtId, void *evtInfo, void *passThrough)
       linkend="libpq-connect-client-encoding"> connection parameter.
      </para>
     </listitem>
+
+    <listitem>
+     <para>
+      <indexterm>
+       <primary><envar>PGTARGETSESSIONATTRS</envar></primary>
+      </indexterm>
+      <envar>PGTARGETSESSIONATTRS</envar> behaves the same as the <xref
+      linkend="libpq-connect-target-session-attrs"> connection parameter.
+     </para>
+    </listitem>
    </itemizedlist>
   </para>
 
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3e9c45bc406..cd96ddb2f07 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -108,6 +108,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
 #define DefaultOption	""
 #define DefaultAuthtype		  ""
 #define DefaultPassword		  ""
+#define DefaultTargetSessionAttrs	"any"
 #ifdef USE_SSL
 #define DefaultSSLMode "prefer"
 #else
@@ -300,6 +301,11 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"target_session_attrs", "PGTARGETSESSIONATTRS",
+		DefaultTargetSessionAttrs, NULL,
+		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
+	offsetof(struct pg_conn, target_session_attrs)},
+
 	/* Terminating entry --- MUST BE LAST */
 	{NULL, NULL, NULL, NULL,
 	NULL, NULL, 0}
@@ -336,6 +342,8 @@ static PGconn *makeEmptyPGconn(void);
 static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions);
 static void freePGconn(PGconn *conn);
 static void closePGconn(PGconn *conn);
+static void release_all_addrinfo(PGconn *conn);
+static void sendTerminateConn(PGconn *conn);
 static PQconninfoOption *conninfo_init(PQExpBuffer errorMessage);
 static PQconninfoOption *parse_connection_string(const char *conninfo,
 						PQExpBuffer errorMessage, bool use_defaults);
@@ -1025,6 +1033,22 @@ connectOptions2(PGconn *conn)
 			goto oom_error;
 	}
 
+	/*
+	 * Validate target_session_attrs option.
+	 */
+	if (conn->target_session_attrs)
+	{
+		if (strcmp(conn->target_session_attrs, "any") != 0
+			&& strcmp(conn->target_session_attrs, "read-write") != 0)
+		{
+			conn->status = CONNECTION_BAD;
+			printfPQExpBuffer(&conn->errorMessage,
+							libpq_gettext("invalid target_session_attrs value: \"%s\"\n"),
+							  conn->target_session_attrs);
+			return false;
+		}
+	}
+
 	/*
 	 * Only if we get this far is it appropriate to try to connect. (We need a
 	 * state flag, rather than just the boolean result of this function, in
@@ -1814,6 +1838,7 @@ PQconnectPoll(PGconn *conn)
 			/* Special cases: proceed without waiting. */
 		case CONNECTION_SSL_STARTUP:
 		case CONNECTION_NEEDED:
+		case CONNECTION_CHECK_WRITABLE:
 			break;
 
 		default:
@@ -2752,27 +2777,6 @@ keep_going:						/* We will come back to here until there is
 					goto error_return;
 				}
 
-				/* We can release the address lists now. */
-				if (conn->connhost != NULL)
-				{
-					int		i;
-
-					for (i = 0; i < conn->nconnhost; ++i)
-					{
-						int		family = AF_UNSPEC;
-
-#ifdef HAVE_UNIX_SOCKETS
-						if (conn->connhost[i].type == CHT_UNIX_SOCKET)
-							family = AF_UNIX;
-#endif
-
-						pg_freeaddrinfo_all(family,
-											conn->connhost[i].addrlist);
-						conn->connhost[i].addrlist = NULL;
-					}
-				}
-				conn->addr_cur = NULL;
-
 				/* Fire up post-connection housekeeping if needed */
 				if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
 				{
@@ -2782,7 +2786,24 @@ keep_going:						/* We will come back to here until there is
 					return PGRES_POLLING_WRITING;
 				}
 
-				/* Otherwise, we are open for business! */
+				/*
+				 * If a read-write connection is required, see if we have one.
+				 */
+				if (conn->target_session_attrs != NULL &&
+					strcmp(conn->target_session_attrs, "read-write") == 0)
+				{
+					conn->status = CONNECTION_OK;
+					if (!PQsendQuery(conn,
+									 "show transaction_read_only"))
+						goto error_return;
+					conn->status = CONNECTION_CHECK_WRITABLE;
+					return PGRES_POLLING_READING;
+				}
+
+				/* We can release the address lists now. */
+				release_all_addrinfo(conn);
+
+				/* We are open for business! */
 				conn->status = CONNECTION_OK;
 				return PGRES_POLLING_OK;
 			}
@@ -2814,10 +2835,109 @@ keep_going:						/* We will come back to here until there is
 					goto error_return;
 			}
 
+			/*
+			 * If a read-write connection is requested, check for same.
+			 */
+			if (conn->target_session_attrs != NULL &&
+				strcmp(conn->target_session_attrs, "read-write") == 0)
+			{
+				conn->status = CONNECTION_OK;
+				if (!PQsendQuery(conn,
+								 "show transaction_read_only"))
+					goto error_return;
+				conn->status = CONNECTION_CHECK_WRITABLE;
+				return PGRES_POLLING_READING;
+			}
+
+			/* We can release the address lists now. */
+			release_all_addrinfo(conn);
+
 			/* We are open for business! */
 			conn->status = CONNECTION_OK;
 			return PGRES_POLLING_OK;
 
+		case CONNECTION_CHECK_WRITABLE:
+			{
+				conn->status = CONNECTION_OK;
+				if (!PQconsumeInput(conn))
+					goto error_return;
+
+				if (PQisBusy(conn))
+				{
+					conn->status = CONNECTION_CHECK_WRITABLE;
+					return PGRES_POLLING_READING;
+				}
+
+				res = PQgetResult(conn);
+				if (res && (PQresultStatus(res) == PGRES_TUPLES_OK) &&
+					PQntuples(res) == 1)
+				{
+					char	   *val;
+
+					val = PQgetvalue(res, 0, 0);
+					if (strncmp(val, "on", 2) == 0)
+					{
+						PQclear(res);
+
+						/* Not writable; close connection. */
+						appendPQExpBuffer(&conn->errorMessage,
+								   libpq_gettext("could not make a writable "
+												 "connection to server "
+												 "\"%s:%s\"\n"),
+										conn->connhost[conn->whichhost].host,
+									   conn->connhost[conn->whichhost].port);
+						conn->status = CONNECTION_OK;
+						sendTerminateConn(conn);
+						pqDropConnection(conn, true);
+
+						/* Skip any remaining addresses for this host. */
+						conn->addr_cur = NULL;
+						if (conn->whichhost + 1 < conn->nconnhost)
+						{
+							conn->status = CONNECTION_NEEDED;
+							goto keep_going;
+						}
+
+						/* No more addresses to try. So we fail. */
+						goto error_return;
+					}
+					PQclear(res);
+
+					/* We can release the address lists now. */
+					release_all_addrinfo(conn);
+
+					/* We are open for business! */
+					conn->status = CONNECTION_OK;
+					return PGRES_POLLING_OK;
+				}
+
+				/*
+				 * Something went wrong with "show transaction_read_only". We
+				 * should try the next address.
+				 */
+				if (res)
+					PQclear(res);
+				appendPQExpBuffer(&conn->errorMessage,
+				  libpq_gettext("test \"show transaction_read_only\" failed "
+								" on \"%s:%s\" \n"),
+								  conn->connhost[conn->whichhost].host,
+								  conn->connhost[conn->whichhost].port);
+				conn->status = CONNECTION_OK;
+				sendTerminateConn(conn);
+				pqDropConnection(conn, true);
+
+				if (conn->addr_cur->ai_next != NULL ||
+					conn->whichhost + 1 < conn->nconnhost)
+				{
+					conn->addr_cur = conn->addr_cur->ai_next;
+					conn->status = CONNECTION_NEEDED;
+					goto keep_going;
+				}
+
+				/* No more addresses to try. So we fail. */
+				goto error_return;
+			}
+
 		default:
 			appendPQExpBuffer(&conn->errorMessage,
 							  libpq_gettext("invalid connection state %d, "
@@ -3109,6 +3229,8 @@ freePGconn(PGconn *conn)
 		free(conn->outBuffer);
 	if (conn->rowBuf)
 		free(conn->rowBuf);
+	if (conn->target_session_attrs)
+		free(conn->target_session_attrs);
 	termPQExpBuffer(&conn->errorMessage);
 	termPQExpBuffer(&conn->workBuffer);
 
@@ -3120,19 +3242,41 @@ freePGconn(PGconn *conn)
 }
 
 /*
- * closePGconn
- *	 - properly close a connection to the backend
- *
- * This should reset or release all transient state, but NOT the connection
- * parameters.  On exit, the PGconn should be in condition to start a fresh
- * connection with the same parameters (see PQreset()).
+ * release_all_addrinfo
+ *	 - free addrinfo of all connhost elements.
  */
+
 static void
-closePGconn(PGconn *conn)
+release_all_addrinfo(PGconn *conn)
 {
-	PGnotify   *notify;
-	pgParameterStatus *pstatus;
+	if (conn->connhost != NULL)
+	{
+		int			i;
+
+		for (i = 0; i < conn->nconnhost; ++i)
+		{
+			int			family = AF_UNSPEC;
+
+#ifdef HAVE_UNIX_SOCKETS
+			if (conn->connhost[i].type == CHT_UNIX_SOCKET)
+				family = AF_UNIX;
+#endif
 
+			pg_freeaddrinfo_all(family,
+								conn->connhost[i].addrlist);
+			conn->connhost[i].addrlist = NULL;
+		}
+	}
+	conn->addr_cur = NULL;
+}
+
+/*
+ * sendTerminateConn
+ *	 - Send a terminate message to backend.
+ */
+static void
+sendTerminateConn(PGconn *conn)
+{
 	/*
 	 * Note that the protocol doesn't allow us to send Terminate messages
 	 * during the startup phase.
@@ -3147,6 +3291,23 @@ closePGconn(PGconn *conn)
 		pqPutMsgEnd(conn);
 		(void) pqFlush(conn);
 	}
+}
+
+/*
+ * closePGconn
+ *	 - properly close a connection to the backend
+ *
+ * This should reset or release all transient state, but NOT the connection
+ * parameters.  On exit, the PGconn should be in condition to start a fresh
+ * connection with the same parameters (see PQreset()).
+ */
+static void
+closePGconn(PGconn *conn)
+{
+	PGnotify   *notify;
+	pgParameterStatus *pstatus;
+
+	sendTerminateConn(conn);
 
 	/*
 	 * Must reset the blocking status so a possible reconnect will work.
@@ -3165,25 +3326,8 @@ closePGconn(PGconn *conn)
 	conn->asyncStatus = PGASYNC_IDLE;
 	pqClearAsyncResult(conn);	/* deallocate result */
 	resetPQExpBuffer(&conn->errorMessage);
-	if (conn->connhost != NULL)
-	{
-		int		i;
-
-		for (i = 0; i < conn->nconnhost; ++i)
-		{
-			int		family = AF_UNSPEC;
-
-#ifdef HAVE_UNIX_SOCKETS
-			if (conn->connhost[i].type == CHT_UNIX_SOCKET)
-				family = AF_UNIX;
-#endif
+	release_all_addrinfo(conn);
 
-			pg_freeaddrinfo_all(family,
-								conn->connhost[i].addrlist);
-			conn->connhost[i].addrlist = NULL;
-		}
-	}
-	conn->addr_cur = NULL;
 	notify = conn->notifyHead;
 	while (notify != NULL)
 	{
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 9ca0756c4bf..20b7e57de76 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -62,7 +62,9 @@ typedef enum
 								 * backend startup. */
 	CONNECTION_SETENV,			/* Negotiating environment. */
 	CONNECTION_SSL_STARTUP,		/* Negotiating SSL. */
-	CONNECTION_NEEDED			/* Internal state: connect() needed */
+	CONNECTION_NEEDED,			/* Internal state: connect() needed */
+	CONNECTION_CHECK_WRITABLE	/* Check if we could make a writable
+								 * connection. */
 } ConnStatusType;
 
 typedef enum
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 854ec89924b..a2f85895a18 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -361,6 +361,9 @@ struct pg_conn
 	char	   *krbsrvname;		/* Kerberos service name */
 #endif
 
+	char	   *target_session_attrs;	/* Type of connection to make
+										 * Possible values any, read-write. */
+
 	/* Optional file to write trace info to */
 	FILE	   *Pfdebug;
 
-- 
GitLab