From ec543db77b6b72f24d0a637c4a4a419cf8311d0b Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Mon, 1 Dec 2008 17:06:21 +0000
Subject: [PATCH] Ensure that the contents of a holdable cursor don't depend on
 out-of-line toasted values, since those could get dropped once the cursor's
 transaction is over.  Per bug #4553 from Andrew Gierth.

Back-patch as far as 8.1.  The bug actually exists back to 7.4 when holdable
cursors were introduced, but this patch won't work before 8.1 without
significant adjustments.  Given the lack of field complaints, it doesn't seem
worth the work (and risk of introducing new bugs) to try to make a patch for
the older branches.
---
 src/backend/commands/portalcmds.c     |  10 +-
 src/backend/executor/tstoreReceiver.c | 137 ++++++++++++++++++++++++--
 src/backend/tcop/pquery.c             |   5 +-
 src/include/executor/tstoreReceiver.h |   5 +-
 4 files changed, 140 insertions(+), 17 deletions(-)

diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 45776277d88..9b74f1bdc1a 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -14,7 +14,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/commands/portalcmds.c,v 1.76 2008/11/30 20:51:25 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/commands/portalcmds.c,v 1.77 2008/12/01 17:06:21 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -351,11 +351,15 @@ PersistHoldablePortal(Portal portal)
 		 */
 		ExecutorRewind(queryDesc);
 
-		/* Change the destination to output to the tuplestore */
+		/*
+		 * Change the destination to output to the tuplestore.  Note we
+		 * tell the tuplestore receiver to detoast all data passed through it.
+		 */
 		queryDesc->dest = CreateDestReceiver(DestTuplestore);
 		SetTuplestoreDestReceiverParams(queryDesc->dest,
 										portal->holdStore,
-										portal->holdContext);
+										portal->holdContext,
+										true);
 
 		/* Fetch the result set into the tuplestore */
 		ExecutorRun(queryDesc, ForwardScanDirection, 0L);
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index cb78c80b919..9993f856ffa 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -1,46 +1,97 @@
 /*-------------------------------------------------------------------------
  *
  * tstoreReceiver.c
- *	  an implementation of DestReceiver that stores the result tuples in
- *	  a Tuplestore
+ *	  An implementation of DestReceiver that stores the result tuples in
+ *	  a Tuplestore.
+ *
+ * Optionally, we can force detoasting (but not decompression) of out-of-line
+ * toasted values.  This is to support cursors WITH HOLD, which must retain
+ * data even if the underlying table is dropped.
  *
  *
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/executor/tstoreReceiver.c,v 1.20 2008/11/30 20:51:25 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/executor/tstoreReceiver.c,v 1.21 2008/12/01 17:06:21 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include "access/tuptoaster.h"
 #include "executor/tstoreReceiver.h"
 
 
 typedef struct
 {
 	DestReceiver pub;
-	Tuplestorestate *tstore;
-	MemoryContext cxt;
+	/* parameters: */
+	Tuplestorestate *tstore;	/* where to put the data */
+	MemoryContext cxt;			/* context containing tstore */
+	bool		detoast;		/* were we told to detoast? */
+	/* workspace: */
+	Datum	   *outvalues;		/* values array for result tuple */
+	Datum	   *tofree;			/* temp values to be pfree'd */
 } TStoreState;
 
 
+static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
+static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+
+
 /*
  * Prepare to receive tuples from executor.
  */
 static void
 tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
 {
-	/* do nothing */
+	TStoreState *myState = (TStoreState *) self;
+	bool		needtoast = false;
+	Form_pg_attribute *attrs = typeinfo->attrs;
+	int			natts = typeinfo->natts;
+	int			i;
+
+	/* Check if any columns require detoast work */
+	if (myState->detoast)
+	{
+		for (i = 0; i < natts; i++)
+		{
+			if (attrs[i]->attisdropped)
+				continue;
+			if (attrs[i]->attlen == -1)
+			{
+				needtoast = true;
+				break;
+			}
+		}
+	}
+
+	/* Set up appropriate callback */
+	if (needtoast)
+	{
+		myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
+		/* Create workspace */
+		myState->outvalues = (Datum *)
+			MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+		myState->tofree = (Datum *)
+			MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+	}
+	else
+	{
+		myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
+		myState->outvalues = NULL;
+		myState->tofree = NULL;
+	}
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the easy case where we don't have to detoast.
  */
 static void
-tstoreReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
+tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
 {
 	TStoreState *myState = (TStoreState *) self;
 	MemoryContext oldcxt = MemoryContextSwitchTo(myState->cxt);
@@ -50,13 +101,77 @@ tstoreReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 	MemoryContextSwitchTo(oldcxt);
 }
 
+/*
+ * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the case where we have to detoast any toasted values.
+ */
+static void
+tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
+{
+	TStoreState *myState = (TStoreState *) self;
+	TupleDesc	typeinfo = slot->tts_tupleDescriptor;
+	Form_pg_attribute *attrs = typeinfo->attrs;
+	int			natts = typeinfo->natts;
+	int			nfree;
+	int			i;
+	MemoryContext oldcxt;
+
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
+	/*
+	 * Fetch back any out-of-line datums.  We build the new datums array in
+	 * myState->outvalues[] (but we can re-use the slot's isnull array).
+	 * Also, remember the fetched values to free afterwards.
+	 */
+	nfree = 0;
+	for (i = 0; i < natts; i++)
+	{
+		Datum		val = slot->tts_values[i];
+
+		if (!attrs[i]->attisdropped &&
+			attrs[i]->attlen == -1 &&
+			!slot->tts_isnull[i])
+		{
+			if (VARATT_IS_EXTERNAL(DatumGetPointer(val)))
+			{
+				val = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *)
+														DatumGetPointer(val)));
+				myState->tofree[nfree++] = val;
+			}
+		}
+
+		myState->outvalues[i] = val;
+	}
+
+	/*
+	 * Push the modified tuple into the tuplestore.
+	 */
+	oldcxt = MemoryContextSwitchTo(myState->cxt);
+	tuplestore_putvalues(myState->tstore, typeinfo,
+						 myState->outvalues, slot->tts_isnull);
+	MemoryContextSwitchTo(oldcxt);
+
+	/* And release any temporary detoasted values */
+	for (i = 0; i < nfree; i++)
+		pfree(DatumGetPointer(myState->tofree[i]));
+}
+
 /*
  * Clean up at end of an executor run
  */
 static void
 tstoreShutdownReceiver(DestReceiver *self)
 {
-	/* do nothing */
+	TStoreState *myState = (TStoreState *) self;
+
+	/* Release workspace if any */
+	if (myState->outvalues)
+		pfree(myState->outvalues);
+	myState->outvalues = NULL;
+	if (myState->tofree)
+		pfree(myState->tofree);
+	myState->tofree = NULL;
 }
 
 /*
@@ -76,7 +191,7 @@ CreateTuplestoreDestReceiver(void)
 {
 	TStoreState *self = (TStoreState *) palloc0(sizeof(TStoreState));
 
-	self->pub.receiveSlot = tstoreReceiveSlot;
+	self->pub.receiveSlot = tstoreReceiveSlot_notoast;	/* might change */
 	self->pub.rStartup = tstoreStartupReceiver;
 	self->pub.rShutdown = tstoreShutdownReceiver;
 	self->pub.rDestroy = tstoreDestroyReceiver;
@@ -93,11 +208,13 @@ CreateTuplestoreDestReceiver(void)
 void
 SetTuplestoreDestReceiverParams(DestReceiver *self,
 								Tuplestorestate *tStore,
-								MemoryContext tContext)
+								MemoryContext tContext,
+								bool detoast)
 {
 	TStoreState *myState = (TStoreState *) self;
 
 	Assert(myState->pub.mydest == DestTuplestore);
 	myState->tstore = tStore;
 	myState->cxt = tContext;
+	myState->detoast = detoast;
 }
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 2a9764ac8b1..c2a161b0f7f 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/tcop/pquery.c,v 1.126 2008/11/30 20:51:25 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/tcop/pquery.c,v 1.127 2008/12/01 17:06:21 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1036,7 +1036,8 @@ FillPortalStore(Portal portal, bool isTopLevel)
 	treceiver = CreateDestReceiver(DestTuplestore);
 	SetTuplestoreDestReceiverParams(treceiver,
 									portal->holdStore,
-									portal->holdContext);
+									portal->holdContext,
+									false);
 
 	completionTag[0] = '\0';
 
diff --git a/src/include/executor/tstoreReceiver.h b/src/include/executor/tstoreReceiver.h
index c304c1fce90..b2d3890f46a 100644
--- a/src/include/executor/tstoreReceiver.h
+++ b/src/include/executor/tstoreReceiver.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/executor/tstoreReceiver.h,v 1.11 2008/11/30 20:51:25 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/executor/tstoreReceiver.h,v 1.12 2008/12/01 17:06:21 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -23,6 +23,7 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
 
 extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
 											Tuplestorestate *tStore,
-											MemoryContext tContext);
+											MemoryContext tContext,
+											bool detoast);
 
 #endif   /* TSTORE_RECEIVER_H */
-- 
GitLab