From 56cbb611ec749ba867a4cfc09c8b7df0f4446620 Mon Sep 17 00:00:00 2001
From: Joe Conway <mail@joeconway.com>
Date: Sun, 24 Jan 2010 22:19:38 +0000
Subject: [PATCH] Rewrite dblink_record_internal() and dblink_fetch() to use a
 tuplestore (SFRM_Materialize mode) to return tuples. Since we don't return
 from the dblink function in tuplestore mode, release the PGresult with a
 PG_CATCH block on error. Also rearrange to share the same code to materialize
 the tuplestore. Patch by Takahiro Itagaki.

---
 contrib/dblink/dblink.c | 569 ++++++++++++++++------------------------
 1 file changed, 232 insertions(+), 337 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 3fc6b60c2a5..ded7832f335 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -8,7 +8,7 @@
  * Darko Prenosil <Darko.Prenosil@finteh.hr>
  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
  *
- * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.86 2010/01/02 16:57:32 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.87 2010/01/24 22:19:38 joe Exp $
  * Copyright (c) 2001-2010, PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
@@ -80,6 +80,7 @@ typedef struct remoteConn
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
+static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -504,200 +505,94 @@ PG_FUNCTION_INFO_V1(dblink_fetch);
 Datum
 dblink_fetch(PG_FUNCTION_ARGS)
 {
-	FuncCallContext *funcctx;
-	TupleDesc	tupdesc = NULL;
-	int			call_cntr;
-	int			max_calls;
-	AttInMetadata *attinmeta;
-	PGresult   *res = NULL;
-	MemoryContext oldcontext;
-	char	   *conname = NULL;
-	remoteConn *rconn = NULL;
+	ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	PGresult	   *res = NULL;
+	char		   *conname = NULL;
+	remoteConn	   *rconn = NULL;
+	PGconn		   *conn = NULL;
+	StringInfoData	buf;
+	char		   *curname = NULL;
+	int				howmany = 0;
+	bool			fail = true;	/* default to backward compatible */
 
 	DBLINK_INIT;
 
-	/* stuff done only on the first call of the function */
-	if (SRF_IS_FIRSTCALL())
+	if (PG_NARGS() == 4)
 	{
-		PGconn	   *conn = NULL;
-		StringInfoData buf;
-		char	   *curname = NULL;
-		int			howmany = 0;
-		bool		fail = true;	/* default to backward compatible */
+		/* text,text,int,bool */
+		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+		curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
+		howmany = PG_GETARG_INT32(2);
+		fail = PG_GETARG_BOOL(3);
 
-		if (PG_NARGS() == 4)
+		rconn = getConnectionByName(conname);
+		if (rconn)
+			conn = rconn->conn;
+	}
+	else if (PG_NARGS() == 3)
+	{
+		/* text,text,int or text,int,bool */
+		if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
+		{
+			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+			howmany = PG_GETARG_INT32(1);
+			fail = PG_GETARG_BOOL(2);
+			conn = pconn->conn;
+		}
+		else
 		{
-			/* text,text,int,bool */
 			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			howmany = PG_GETARG_INT32(2);
-			fail = PG_GETARG_BOOL(3);
 
 			rconn = getConnectionByName(conname);
 			if (rconn)
 				conn = rconn->conn;
 		}
-		else if (PG_NARGS() == 3)
-		{
-			/* text,text,int or text,int,bool */
-			if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
-			{
-				curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
-				howmany = PG_GETARG_INT32(1);
-				fail = PG_GETARG_BOOL(2);
-				conn = pconn->conn;
-			}
-			else
-			{
-				conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
-				curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
-				howmany = PG_GETARG_INT32(2);
-
-				rconn = getConnectionByName(conname);
-				if (rconn)
-					conn = rconn->conn;
-			}
-		}
-		else if (PG_NARGS() == 2)
-		{
-			/* text,int */
-			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
-			howmany = PG_GETARG_INT32(1);
-			conn = pconn->conn;
-		}
-
-		if (!conn)
-			DBLINK_CONN_NOT_AVAIL;
-
-		initStringInfo(&buf);
-		appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
-
-		/* create a function context for cross-call persistence */
-		funcctx = SRF_FIRSTCALL_INIT();
-
-		/*
-		 * Try to execute the query.  Note that since libpq uses malloc, the
-		 * PGresult will be long-lived even though we are still in a
-		 * short-lived memory context.
-		 */
-		res = PQexec(conn, buf.data);
-		if (!res ||
-			(PQresultStatus(res) != PGRES_COMMAND_OK &&
-			 PQresultStatus(res) != PGRES_TUPLES_OK))
-		{
-			dblink_res_error(conname, res, "could not fetch from cursor", fail);
-			SRF_RETURN_DONE(funcctx);
-		}
-		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
-		{
-			/* cursor does not exist - closed already or bad name */
-			PQclear(res);
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_CURSOR_NAME),
-					 errmsg("cursor \"%s\" does not exist", curname)));
-		}
-
-		funcctx->max_calls = PQntuples(res);
-
-		/* got results, keep track of them */
-		funcctx->user_fctx = res;
-
-		/* get a tuple descriptor for our result type */
-		switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-		{
-			case TYPEFUNC_COMPOSITE:
-				/* success */
-				break;
-			case TYPEFUNC_RECORD:
-				/* failed to determine actual type of RECORD */
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						 errmsg("function returning record called in context "
-								"that cannot accept type record")));
-				break;
-			default:
-				/* result type isn't composite */
-				elog(ERROR, "return type must be a row type");
-				break;
-		}
-
-		/* check result and tuple descriptor have the same number of columns */
-		if (PQnfields(res) != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
-
-		/*
-		 * fast track when no results.	We could exit earlier, but then we'd
-		 * not report error if the result tuple type is wrong.
-		 */
-		if (funcctx->max_calls < 1)
-		{
-			PQclear(res);
-			SRF_RETURN_DONE(funcctx);
-		}
-
-		/*
-		 * switch to memory context appropriate for multiple function calls,
-		 * so we can make long-lived copy of tupdesc etc
-		 */
-		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
-		/* make sure we have a persistent copy of the tupdesc */
-		tupdesc = CreateTupleDescCopy(tupdesc);
+	}
+	else if (PG_NARGS() == 2)
+	{
+		/* text,int */
+		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+		howmany = PG_GETARG_INT32(1);
+		conn = pconn->conn;
+	}
 
-		/* store needed metadata for subsequent calls */
-		attinmeta = TupleDescGetAttInMetadata(tupdesc);
-		funcctx->attinmeta = attinmeta;
+	if (!conn)
+		DBLINK_CONN_NOT_AVAIL;
 
-		MemoryContextSwitchTo(oldcontext);
-	}
+	/* let the caller know we're sending back a tuplestore */
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = NULL;
+	rsinfo->setDesc = NULL;
 
-	/* stuff done on every call of the function */
-	funcctx = SRF_PERCALL_SETUP();
+	initStringInfo(&buf);
+	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
 	/*
-	 * initialize per-call variables
+	 * Try to execute the query.  Note that since libpq uses malloc, the
+	 * PGresult will be long-lived even though we are still in a
+	 * short-lived memory context.
 	 */
-	call_cntr = funcctx->call_cntr;
-	max_calls = funcctx->max_calls;
-
-	res = (PGresult *) funcctx->user_fctx;
-	attinmeta = funcctx->attinmeta;
-	tupdesc = attinmeta->tupdesc;
-
-	if (call_cntr < max_calls)	/* do when there is more left to send */
+	res = PQexec(conn, buf.data);
+	if (!res ||
+		(PQresultStatus(res) != PGRES_COMMAND_OK &&
+		 PQresultStatus(res) != PGRES_TUPLES_OK))
 	{
-		char	  **values;
-		HeapTuple	tuple;
-		Datum		result;
-		int			i;
-		int			nfields = PQnfields(res);
-
-		values = (char **) palloc(nfields * sizeof(char *));
-		for (i = 0; i < nfields; i++)
-		{
-			if (PQgetisnull(res, call_cntr, i) == 0)
-				values[i] = PQgetvalue(res, call_cntr, i);
-			else
-				values[i] = NULL;
-		}
-
-		/* build the tuple */
-		tuple = BuildTupleFromCStrings(attinmeta, values);
-
-		/* make the tuple into a datum */
-		result = HeapTupleGetDatum(tuple);
-
-		SRF_RETURN_NEXT(funcctx, result);
+		dblink_res_error(conname, res, "could not fetch from cursor", fail);
+		return (Datum) 0;
 	}
-	else
+	else if (PQresultStatus(res) == PGRES_COMMAND_OK)
 	{
-		/* do when there is no more left */
+		/* cursor does not exist - closed already or bad name */
 		PQclear(res);
-		SRF_RETURN_DONE(funcctx);
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_CURSOR_NAME),
+				 errmsg("cursor \"%s\" does not exist", curname)));
 	}
+
+	materializeResult(fcinfo, res);
+	return (Datum) 0;
 }
 
 /*
@@ -749,147 +644,156 @@ dblink_get_result(PG_FUNCTION_ARGS)
 static Datum
 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 {
-	FuncCallContext *funcctx;
-	TupleDesc	tupdesc = NULL;
-	int			call_cntr;
-	int			max_calls;
-	AttInMetadata *attinmeta;
-	char	   *msg;
-	PGresult   *res = NULL;
-	bool		is_sql_cmd = false;
-	char	   *sql_cmd_status = NULL;
-	MemoryContext oldcontext;
-	bool		freeconn = false;
+	ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	char		   *msg;
+	PGresult	   *res = NULL;
+	PGconn		   *conn = NULL;
+	char		   *connstr = NULL;
+	char		   *sql = NULL;
+	char		   *conname = NULL;
+	remoteConn	   *rconn = NULL;
+	bool			fail = true;	/* default to backward compatible */
+	bool			freeconn = false;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
 
 	DBLINK_INIT;
 
-	/* stuff done only on the first call of the function */
-	if (SRF_IS_FIRSTCALL())
+	if (!is_async)
 	{
-		PGconn	   *conn = NULL;
-		char	   *connstr = NULL;
-		char	   *sql = NULL;
-		char	   *conname = NULL;
-		remoteConn *rconn = NULL;
-		bool		fail = true;	/* default to backward compatible */
-
-		/* create a function context for cross-call persistence */
-		funcctx = SRF_FIRSTCALL_INIT();
-
-		/*
-		 * switch to memory context appropriate for multiple function calls
-		 */
-		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
-		if (!is_async)
+		if (PG_NARGS() == 3)
 		{
-			if (PG_NARGS() == 3)
-			{
-				/* text,text,bool */
-				DBLINK_GET_CONN;
-				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-				fail = PG_GETARG_BOOL(2);
-			}
-			else if (PG_NARGS() == 2)
-			{
-				/* text,text or text,bool */
-				if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
-				{
-					conn = pconn->conn;
-					sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
-					fail = PG_GETARG_BOOL(1);
-				}
-				else
-				{
-					DBLINK_GET_CONN;
-					sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-				}
-			}
-			else if (PG_NARGS() == 1)
-			{
-				/* text */
-				conn = pconn->conn;
-				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
-			}
-			else
-				/* shouldn't happen */
-				elog(ERROR, "wrong number of arguments");
+			/* text,text,bool */
+			DBLINK_GET_CONN;
+			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+			fail = PG_GETARG_BOOL(2);
 		}
-		else	/* is_async */
+		else if (PG_NARGS() == 2)
 		{
-			/* get async result */
-			if (PG_NARGS() == 2)
+			/* text,text or text,bool */
+			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
 			{
-				/* text,bool */
-				DBLINK_GET_CONN;
+				conn = pconn->conn;
+				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 				fail = PG_GETARG_BOOL(1);
 			}
-			else if (PG_NARGS() == 1)
+			else
 			{
-				/* text */
 				DBLINK_GET_CONN;
+				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			}
-			else
-				/* shouldn't happen */
-				elog(ERROR, "wrong number of arguments");
 		}
-
-		if (!conn)
-			DBLINK_CONN_NOT_AVAIL;
-
-		/* synchronous query, or async result retrieval */
-		if (!is_async)
-			res = PQexec(conn, sql);
+		else if (PG_NARGS() == 1)
+		{
+			/* text */
+			conn = pconn->conn;
+			sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+		}
 		else
+			/* shouldn't happen */
+			elog(ERROR, "wrong number of arguments");
+	}
+	else	/* is_async */
+	{
+		/* get async result */
+		if (PG_NARGS() == 2)
 		{
-			res = PQgetResult(conn);
-			/* NULL means we're all done with the async results */
-			if (!res)
-			{
-				MemoryContextSwitchTo(oldcontext);
-				SRF_RETURN_DONE(funcctx);
-			}
+			/* text,bool */
+			DBLINK_GET_CONN;
+			fail = PG_GETARG_BOOL(1);
 		}
-
-		if (!res ||
-			(PQresultStatus(res) != PGRES_COMMAND_OK &&
-			 PQresultStatus(res) != PGRES_TUPLES_OK))
+		else if (PG_NARGS() == 1)
 		{
-			if (freeconn)
-				PQfinish(conn);
-			dblink_res_error(conname, res, "could not execute query", fail);
-			MemoryContextSwitchTo(oldcontext);
-			SRF_RETURN_DONE(funcctx);
+			/* text */
+			DBLINK_GET_CONN;
 		}
+		else
+			/* shouldn't happen */
+			elog(ERROR, "wrong number of arguments");
+	}
+
+	if (!conn)
+		DBLINK_CONN_NOT_AVAIL;
+
+	/* let the caller know we're sending back a tuplestore */
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = NULL;
+	rsinfo->setDesc = NULL;
+
+	/* synchronous query, or async result retrieval */
+	if (!is_async)
+		res = PQexec(conn, sql);
+	else
+	{
+		res = PQgetResult(conn);
+		/* NULL means we're all done with the async results */
+		if (!res)
+			return (Datum) 0;
+	}
+
+	/* if needed, close the connection to the database and cleanup */
+	if (freeconn)
+		PQfinish(conn);
+
+	if (!res ||
+		(PQresultStatus(res) != PGRES_COMMAND_OK &&
+		 PQresultStatus(res) != PGRES_TUPLES_OK))
+	{
+		dblink_res_error(conname, res, "could not execute query", fail);
+		return (Datum) 0;
+	}
+
+	materializeResult(fcinfo, res);
+	return (Datum) 0;
+}
+
+/*
+ * Materialize the PGresult to return them as the function result.
+ * The res will be released in this function.
+ */
+static void
+materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+{
+	ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+	Assert(rsinfo->returnMode == SFRM_Materialize);
+
+	PG_TRY();
+	{
+		TupleDesc	tupdesc;
+		bool		is_sql_cmd = false;
+		int			ntuples;
+		int			nfields;
 
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			is_sql_cmd = true;
 
-			/* need a tuple descriptor representing one TEXT column */
+			/*
+			 * need a tuple descriptor representing one TEXT column to
+			 * return the command status string as our result tuple
+			 */
 			tupdesc = CreateTemplateTupleDesc(1, false);
 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
 							   TEXTOID, -1, 0);
-
-			/*
-			 * and save a copy of the command status string to return as our
-			 * result tuple
-			 */
-			sql_cmd_status = PQcmdStatus(res);
-			funcctx->max_calls = 1;
+			ntuples = 1;
+			nfields = 1;
 		}
 		else
-			funcctx->max_calls = PQntuples(res);
-
-		/* got results, keep track of them */
-		funcctx->user_fctx = res;
+		{
+			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
 
-		/* if needed, close the connection to the database and cleanup */
-		if (freeconn)
-			PQfinish(conn);
+			is_sql_cmd = false;
 
-		if (!is_sql_cmd)
-		{
 			/* get a tuple descriptor for our result type */
 			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
 			{
@@ -911,87 +815,78 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 
 			/* make sure we have a persistent copy of the tupdesc */
 			tupdesc = CreateTupleDescCopy(tupdesc);
+			ntuples = PQntuples(res);
+			nfields = PQnfields(res);
 		}
 
 		/*
 		 * check result and tuple descriptor have the same number of columns
 		 */
-		if (PQnfields(res) != tupdesc->natts)
+		if (nfields != tupdesc->natts)
 			ereport(ERROR,
 					(errcode(ERRCODE_DATATYPE_MISMATCH),
 					 errmsg("remote query result rowtype does not match "
 							"the specified FROM clause rowtype")));
 
-		/* fast track when no results */
-		if (funcctx->max_calls < 1)
+		if (ntuples > 0)
 		{
-			if (res)
-				PQclear(res);
+			AttInMetadata	   *attinmeta;
+			Tuplestorestate	   *tupstore;
+			MemoryContext		oldcontext;
+			int					row;
+			char			  **values;
+
+			attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+			oldcontext = MemoryContextSwitchTo(
+								rsinfo->econtext->ecxt_per_query_memory);
+			tupstore = tuplestore_begin_heap(true, false, work_mem);
+			rsinfo->setResult = tupstore;
+			rsinfo->setDesc = tupdesc;
 			MemoryContextSwitchTo(oldcontext);
-			SRF_RETURN_DONE(funcctx);
-		}
-
-		/* store needed metadata for subsequent calls */
-		attinmeta = TupleDescGetAttInMetadata(tupdesc);
-		funcctx->attinmeta = attinmeta;
-
-		MemoryContextSwitchTo(oldcontext);
-
-	}
-
-	/* stuff done on every call of the function */
-	funcctx = SRF_PERCALL_SETUP();
-
-	/*
-	 * initialize per-call variables
-	 */
-	call_cntr = funcctx->call_cntr;
-	max_calls = funcctx->max_calls;
 
-	res = (PGresult *) funcctx->user_fctx;
-	attinmeta = funcctx->attinmeta;
-	tupdesc = attinmeta->tupdesc;
+			values = (char **) palloc(nfields * sizeof(char *));
 
-	if (call_cntr < max_calls)	/* do when there is more left to send */
-	{
-		char	  **values;
-		HeapTuple	tuple;
-		Datum		result;
+			/* put all tuples into the tuplestore */
+			for (row = 0; row < ntuples; row++)
+			{
+				HeapTuple	tuple;
 
-		if (!is_sql_cmd)
-		{
-			int			i;
-			int			nfields = PQnfields(res);
+				if (!is_sql_cmd)
+				{
+					int			i;
 
-			values = (char **) palloc(nfields * sizeof(char *));
-			for (i = 0; i < nfields; i++)
-			{
-				if (PQgetisnull(res, call_cntr, i) == 0)
-					values[i] = PQgetvalue(res, call_cntr, i);
+					for (i = 0; i < nfields; i++)
+					{
+						if (PQgetisnull(res, row, i))
+							values[i] = NULL;
+						else
+							values[i] = PQgetvalue(res, row, i);
+					}
+				}
 				else
-					values[i] = NULL;
-			}
-		}
-		else
-		{
-			values = (char **) palloc(1 * sizeof(char *));
-			values[0] = sql_cmd_status;
-		}
+				{
+					values[0] = PQcmdStatus(res);
+				}
 
-		/* build the tuple */
-		tuple = BuildTupleFromCStrings(attinmeta, values);
+				/* build the tuple and put it into the tuplestore. */
+				tuple = BuildTupleFromCStrings(attinmeta, values);
+				tuplestore_puttuple(tupstore, tuple);
+			}
 
-		/* make the tuple into a datum */
-		result = HeapTupleGetDatum(tuple);
+			/* clean up and return the tuplestore */
+			tuplestore_donestoring(tupstore);
+		}
 
-		SRF_RETURN_NEXT(funcctx, result);
+		PQclear(res);
 	}
-	else
+	PG_CATCH();
 	{
-		/* do when there is no more left */
+		/* be sure to release the libpq result */
 		PQclear(res);
-		SRF_RETURN_DONE(funcctx);
+		PG_RE_THROW();
 	}
+	PG_END_TRY();
 }
 
 /*
-- 
GitLab