From 056eb1412c864c61b26eb8d540fd92636795f67d Mon Sep 17 00:00:00 2001
From: Joe Conway <mail@joeconway.com>
Date: Tue, 18 Oct 2005 02:55:49 +0000
Subject: [PATCH] When a cursor is opened using dblink_open, only start a
 transaction if there isn't one already open. Upon dblink_close, only commit
 the open transaction if it was started by dblink_open, and only then when all
 cursors opened by dblink_open are closed. The transaction accounting is done
 individually for all named connections, plus the persistent unnamed
 connection.

---
 contrib/dblink/dblink.c            | 114 +++++++++++++++++++----------
 contrib/dblink/expected/dblink.out |  82 +++++++++++++++++++++
 contrib/dblink/sql/dblink.sql      |  36 +++++++++
 3 files changed, 193 insertions(+), 39 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index c11b15860a3..54e787bb206 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -60,9 +60,9 @@
 
 typedef struct remoteConn
 {
-	PGconn	   *conn;			/* Hold the remote connection */
-	int			autoXactCursors;/* Indicates the number of open cursors,
-								 * non-zero means we opened the xact ourselves */
+	PGconn	   *conn;				/* Hold the remote connection */
+	int			openCursorCount;	/* The number of open cursors */
+	bool		newXactForCursor;	/* Opened a transaction for a cursor */
 }	remoteConn;
 
 /*
@@ -84,10 +84,8 @@ static Oid	get_relid_from_relname(text *relname_text);
 static char *generate_relation_name(Oid relid);
 
 /* Global */
-List	   *res_id = NIL;
-int			res_id_index = 0;
-PGconn	   *persistent_conn = NULL;
-static HTAB *remoteConnHash = NULL;
+static remoteConn	   *pconn = NULL;
+static HTAB			   *remoteConnHash = NULL;
 
 /*
  *	Following is list that holds multiple remote connections.
@@ -184,6 +182,16 @@ typedef struct remoteConnHashEnt
 			} \
 	} while (0)
 
+#define DBLINK_INIT \
+	do { \
+			if (!pconn) \
+			{ \
+				pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+				pconn->conn = NULL; \
+				pconn->openCursorCount = 0; \
+				pconn->newXactForCursor = FALSE; \
+			} \
+	} while (0)
 
 /*
  * Create a persistent connection to another database
@@ -199,6 +207,8 @@ dblink_connect(PG_FUNCTION_ARGS)
 	PGconn	   *conn = NULL;
 	remoteConn *rconn = NULL;
 
+	DBLINK_INIT;
+
 	if (PG_NARGS() == 2)
 	{
 		connstr = GET_STR(PG_GETARG_TEXT_P(1));
@@ -234,7 +244,7 @@ dblink_connect(PG_FUNCTION_ARGS)
 		createNewConnection(connname, rconn);
 	}
 	else
-		persistent_conn = conn;
+		pconn->conn = conn;
 
 	PG_RETURN_TEXT_P(GET_TEXT("OK"));
 }
@@ -250,6 +260,8 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	PGconn	   *conn = NULL;
 
+	DBLINK_INIT;
+
 	if (PG_NARGS() == 1)
 	{
 		conname = GET_STR(PG_GETARG_TEXT_P(0));
@@ -258,7 +270,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 			conn = rconn->conn;
 	}
 	else
-		conn = persistent_conn;
+		conn = pconn->conn;
 
 	if (!conn)
 		DBLINK_CONN_NOT_AVAIL;
@@ -270,7 +282,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 		pfree(rconn);
 	}
 	else
-		persistent_conn = NULL;
+		pconn->conn = NULL;
 
 	PG_RETURN_TEXT_P(GET_TEXT("OK"));
 }
@@ -292,12 +304,14 @@ dblink_open(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
+	DBLINK_INIT;
+
 	if (PG_NARGS() == 2)
 	{
 		/* text,text */
 		curname = GET_STR(PG_GETARG_TEXT_P(0));
 		sql = GET_STR(PG_GETARG_TEXT_P(1));
-		conn = persistent_conn;
+		rconn = pconn;
 	}
 	else if (PG_NARGS() == 3)
 	{
@@ -307,7 +321,7 @@ dblink_open(PG_FUNCTION_ARGS)
 			curname = GET_STR(PG_GETARG_TEXT_P(0));
 			sql = GET_STR(PG_GETARG_TEXT_P(1));
 			fail = PG_GETARG_BOOL(2);
-			conn = persistent_conn;
+			rconn = pconn;
 		}
 		else
 		{
@@ -315,8 +329,6 @@ dblink_open(PG_FUNCTION_ARGS)
 			curname = GET_STR(PG_GETARG_TEXT_P(1));
 			sql = GET_STR(PG_GETARG_TEXT_P(2));
 			rconn = getConnectionByName(conname);
-			if (rconn)
-				conn = rconn->conn;
 		}
 	}
 	else if (PG_NARGS() == 4)
@@ -327,18 +339,26 @@ dblink_open(PG_FUNCTION_ARGS)
 		sql = GET_STR(PG_GETARG_TEXT_P(2));
 		fail = PG_GETARG_BOOL(3);
 		rconn = getConnectionByName(conname);
-		if (rconn)
-			conn = rconn->conn;
 	}
 
-	if (!conn)
+	if (!rconn || !rconn->conn)
 		DBLINK_CONN_NOT_AVAIL;
+	else
+		conn = rconn->conn;
 
-	res = PQexec(conn, "BEGIN");
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		DBLINK_RES_INTERNALERROR("begin error");
+	/*	If we are not in a transaction, start one */
+	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
+	{
+		res = PQexec(conn, "BEGIN");
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			DBLINK_RES_INTERNALERROR("begin error");
+		PQclear(res);
+		rconn->newXactForCursor = TRUE;
+	}
 
-	PQclear(res);
+	/* if we started a transaction, increment cursor count */
+	if (rconn->newXactForCursor)
+		(rconn->openCursorCount)++;
 
 	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
 	res = PQexec(conn, str->data);
@@ -373,11 +393,13 @@ dblink_close(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
+	DBLINK_INIT;
+
 	if (PG_NARGS() == 1)
 	{
 		/* text */
 		curname = GET_STR(PG_GETARG_TEXT_P(0));
-		conn = persistent_conn;
+		rconn = pconn;
 	}
 	else if (PG_NARGS() == 2)
 	{
@@ -386,15 +408,13 @@ dblink_close(PG_FUNCTION_ARGS)
 		{
 			curname = GET_STR(PG_GETARG_TEXT_P(0));
 			fail = PG_GETARG_BOOL(1);
-			conn = persistent_conn;
+			rconn = pconn;
 		}
 		else
 		{
 			conname = GET_STR(PG_GETARG_TEXT_P(0));
 			curname = GET_STR(PG_GETARG_TEXT_P(1));
 			rconn = getConnectionByName(conname);
-			if (rconn)
-				conn = rconn->conn;
 		}
 	}
 	if (PG_NARGS() == 3)
@@ -404,12 +424,12 @@ dblink_close(PG_FUNCTION_ARGS)
 		curname = GET_STR(PG_GETARG_TEXT_P(1));
 		fail = PG_GETARG_BOOL(2);
 		rconn = getConnectionByName(conname);
-		if (rconn)
-			conn = rconn->conn;
 	}
 
-	if (!conn)
+	if (!rconn || !rconn->conn)
 		DBLINK_CONN_NOT_AVAIL;
+	else
+		conn = rconn->conn;
 
 	appendStringInfo(str, "CLOSE %s", curname);
 
@@ -428,12 +448,22 @@ dblink_close(PG_FUNCTION_ARGS)
 
 	PQclear(res);
 
-	/* commit the transaction */
-	res = PQexec(conn, "COMMIT");
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		DBLINK_RES_INTERNALERROR("commit error");
+	/* if we started a transaction, decrement cursor count */
+	if (rconn->newXactForCursor)
+	{
+		(rconn->openCursorCount)--;
 
-	PQclear(res);
+		/* if count is zero, commit the transaction */
+		if (rconn->openCursorCount == 0)
+		{
+			rconn->newXactForCursor = FALSE;
+
+			res = PQexec(conn, "COMMIT");
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+				DBLINK_RES_INTERNALERROR("commit error");
+			PQclear(res);
+		}
+	}
 
 	PG_RETURN_TEXT_P(GET_TEXT("OK"));
 }
@@ -456,6 +486,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	char	   *conname = NULL;
 	remoteConn *rconn = NULL;
 
+	DBLINK_INIT;
+
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
@@ -485,7 +517,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 				curname = GET_STR(PG_GETARG_TEXT_P(0));
 				howmany = PG_GETARG_INT32(1);
 				fail = PG_GETARG_BOOL(2);
-				conn = persistent_conn;
+				conn = pconn->conn;
 			}
 			else
 			{
@@ -503,7 +535,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 			/* text,int */
 			curname = GET_STR(PG_GETARG_TEXT_P(0));
 			howmany = PG_GETARG_INT32(1);
-			conn = persistent_conn;
+			conn = pconn->conn;
 		}
 
 		if (!conn)
@@ -648,6 +680,8 @@ dblink_record(PG_FUNCTION_ARGS)
 	MemoryContext oldcontext;
 	bool		freeconn = false;
 
+	DBLINK_INIT;
+
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
@@ -678,7 +712,7 @@ dblink_record(PG_FUNCTION_ARGS)
 			/* text,text or text,bool */
 			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
 			{
-				conn = persistent_conn;
+				conn = pconn->conn;
 				sql = GET_STR(PG_GETARG_TEXT_P(0));
 				fail = PG_GETARG_BOOL(1);
 			}
@@ -691,7 +725,7 @@ dblink_record(PG_FUNCTION_ARGS)
 		else if (PG_NARGS() == 1)
 		{
 			/* text */
-			conn = persistent_conn;
+			conn = pconn->conn;
 			sql = GET_STR(PG_GETARG_TEXT_P(0));
 		}
 		else
@@ -857,6 +891,8 @@ dblink_exec(PG_FUNCTION_ARGS)
 	bool		freeconn = false;
 	bool		fail = true;	/* default to backward compatible behavior */
 
+	DBLINK_INIT;
+
 	if (PG_NARGS() == 3)
 	{
 		/* must be text,text,bool */
@@ -869,7 +905,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 		/* might be text,text or text,bool */
 		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
 		{
-			conn = persistent_conn;
+			conn = pconn->conn;
 			sql = GET_STR(PG_GETARG_TEXT_P(0));
 			fail = PG_GETARG_BOOL(1);
 		}
@@ -882,7 +918,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 	else if (PG_NARGS() == 1)
 	{
 		/* must be single text argument */
-		conn = persistent_conn;
+		conn = pconn->conn;
 		sql = GET_STR(PG_GETARG_TEXT_P(0));
 	}
 	else
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index cd2d4b18450..55d2b9e7c4c 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -436,6 +436,88 @@ SELECT dblink_exec('myconn','ABORT');
  ROLLBACK
 (1 row)
 
+-- test opening cursor in a transaction
+SELECT dblink_exec('myconn','BEGIN');
+ dblink_exec 
+-------------
+ BEGIN
+(1 row)
+
+-- an open transaction will prevent dblink_open() from opening its own
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open 
+-------------
+ OK
+(1 row)
+
+-- this should not commit the transaction because the client opened it
+SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close 
+--------------
+ OK
+(1 row)
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+  dblink_exec   
+----------------
+ DECLARE CURSOR
+(1 row)
+
+-- commit remote transaction
+SELECT dblink_exec('myconn','COMMIT');
+ dblink_exec 
+-------------
+ COMMIT
+(1 row)
+
+-- test automatic transactions for multiple cursor opens
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open 
+-------------
+ OK
+(1 row)
+
+-- the second cursor
+SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+ dblink_open 
+-------------
+ OK
+(1 row)
+
+-- this should not commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor2');
+ dblink_close 
+--------------
+ OK
+(1 row)
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+  dblink_exec   
+----------------
+ DECLARE CURSOR
+(1 row)
+
+-- this should commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close 
+--------------
+ OK
+(1 row)
+
+-- this should fail because there is no open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ERROR:  sql error
+DETAIL:  ERROR:  cursor "xact_test" already exists
+
+-- reset remote transaction state
+SELECT dblink_exec('myconn','ABORT');
+ dblink_exec 
+-------------
+ ROLLBACK
+(1 row)
+
 -- open a cursor
 SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
  dblink_open 
diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql
index db9dd6582fd..66e2607cfee 100644
--- a/contrib/dblink/sql/dblink.sql
+++ b/contrib/dblink/sql/dblink.sql
@@ -217,6 +217,42 @@ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foobar',false);
 -- reset remote transaction state
 SELECT dblink_exec('myconn','ABORT');
 
+-- test opening cursor in a transaction
+SELECT dblink_exec('myconn','BEGIN');
+
+-- an open transaction will prevent dblink_open() from opening its own
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+-- this should not commit the transaction because the client opened it
+SELECT dblink_close('myconn','rmt_foo_cursor');
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- commit remote transaction
+SELECT dblink_exec('myconn','COMMIT');
+
+-- test automatic transactions for multiple cursor opens
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+-- the second cursor
+SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+
+-- this should not commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor2');
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- this should commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor');
+
+-- this should fail because there is no open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- reset remote transaction state
+SELECT dblink_exec('myconn','ABORT');
+
 -- open a cursor
 SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
 
-- 
GitLab