From 01edb5c7fc3bcf6aea15f2b3be36189b52ad9d1a Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 1 Sep 2017 17:38:54 -0400
Subject: [PATCH] Improve division of labor between execParallel.c and
 nodeGather[Merge].c.

Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c.  Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.

These changes also eliminate a vestigial memory leak (of the pei->tqueue
array).  It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
---
 src/backend/executor/execParallel.c    | 72 ++++++++++++++++++++++++--
 src/backend/executor/nodeGather.c      | 64 ++++++++---------------
 src/backend/executor/nodeGatherMerge.c | 50 ++++++------------
 src/backend/executor/tqueue.c          |  4 +-
 src/include/executor/execParallel.h    | 18 ++++---
 5 files changed, 119 insertions(+), 89 deletions(-)
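
As a reading aid, this sketch shows roughly what the node-side startup path
looks like once the hunks below are applied.  The helper name
gather_setup_readers() is invented for illustration only; the calls and fields
it uses are taken from the patched nodeGather.c and execParallel.c.

#include "postgres.h"

#include "access/parallel.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/tqueue.h"

/*
 * Hypothetical wrapper: after LaunchParallelWorkers(), execParallel.c owns
 * the TupleQueueReaders (pei->reader); the Gather node keeps only a palloc'd
 * working copy that it can compact as individual workers finish.
 */
static void
gather_setup_readers(GatherState *node, ParallelContext *pcxt)
{
	if (pcxt->nworkers_launched > 0)
	{
		/* One reader per launched worker, created by execParallel.c */
		ExecParallelCreateReaders(node->pei,
								  node->funnel_slot->tts_tupleDescriptor);

		/* Local working copy of the reader array */
		node->nreaders = pcxt->nworkers_launched;
		node->reader = (TupleQueueReader **)
			palloc(node->nreaders * sizeof(TupleQueueReader *));
		memcpy(node->reader, node->pei->reader,
			   node->nreaders * sizeof(TupleQueueReader *));
	}
	else
	{
		/* No workers?  Then never mind. */
		node->nreaders = 0;
		node->reader = NULL;
	}
	node->nextreader = 0;
}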

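Teardown is centralized as well.  The sketch below condenses the new
ExecParallelFinish() from the execParallel.c hunk to show the ordering it
establishes (detach queues, destroy readers, then wait for workers); the
function name is invented, and the NULL checks and trailing instrumentation
handling of the real function are omitted.

#include "postgres.h"

#include "access/parallel.h"
#include "executor/execParallel.h"
#include "executor/instrument.h"
#include "executor/tqueue.h"
#include "storage/shm_mq.h"

static void
parallel_finish_sketch(ParallelExecutorInfo *pei)
{
	int			nworkers = pei->pcxt->nworkers_launched;
	int			i;

	if (pei->finished)
		return;

	/* Detach the queues first, so still-running workers stop sending. */
	for (i = 0; i < nworkers; i++)
		shm_mq_detach(pei->tqueue[i]);
	pfree(pei->tqueue);
	pei->tqueue = NULL;

	/* Readers no longer detach the queue themselves; destroy them now. */
	for (i = 0; i < nworkers; i++)
		DestroyTupleQueueReader(pei->reader[i]);
	pfree(pei->reader);
	pei->reader = NULL;

	/* Only now block until every worker has exited ... */
	WaitForParallelWorkersToFinish(pei->pcxt);

	/* ... because buffer usage is complete only once they are gone. */
	for (i = 0; i < nworkers; i++)
		InstrAccumParallelQuery(&pei->buffer_usage[i]);

	/* (The real function goes on to collect instrumentation and set
	 * pei->finished.) */
}
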
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 2313b4c45cb..7dda399daf3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -498,9 +498,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
 	pei->buffer_usage = bufusage_space;
 
-	/* Set up tuple queues. */
+	/* Set up the tuple queues that the workers will write into. */
 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
+	/* We don't need the TupleQueueReaders yet, though. */
+	pei->reader = NULL;
+
 	/*
 	 * If instrumentation options were supplied, allocate space for the data.
 	 * It only gets partially initialized here; the rest happens during
@@ -567,6 +570,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	return pei;
 }
 
+/*
+ * Set up tuple queue readers to read the results of a parallel subplan.
+ * All the workers are expected to return tuples matching tupDesc.
+ *
+ * This is separate from ExecInitParallelPlan() because we can launch the
+ * worker processes and let them start doing something before we do this.
+ */
+void
+ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+						  TupleDesc tupDesc)
+{
+	int			nworkers = pei->pcxt->nworkers_launched;
+	int			i;
+
+	Assert(pei->reader == NULL);
+
+	if (nworkers > 0)
+	{
+		pei->reader = (TupleQueueReader **)
+			palloc(nworkers * sizeof(TupleQueueReader *));
+
+		for (i = 0; i < nworkers; i++)
+		{
+			shm_mq_set_handle(pei->tqueue[i],
+							  pei->pcxt->worker[i].bgwhandle);
+			pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
+													tupDesc);
+		}
+	}
+}
+
 /*
  * Re-initialize the parallel executor shared memory state before launching
  * a fresh batch of workers.
@@ -580,6 +614,7 @@ ExecParallelReinitialize(PlanState *planstate,
 
 	ReinitializeParallelDSM(pei->pcxt);
 	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+	pei->reader = NULL;
 	pei->finished = false;
 
 	/* Traverse plan tree and let each child node reset associated state. */
@@ -691,16 +726,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
 {
+	int			nworkers = pei->pcxt->nworkers_launched;
 	int			i;
 
+	/* Make this be a no-op if called twice in a row. */
 	if (pei->finished)
 		return;
 
-	/* First, wait for the workers to finish. */
+	/*
+	 * Detach from tuple queues ASAP, so that any still-active workers will
+	 * notice that no further results are wanted.
+	 */
+	if (pei->tqueue != NULL)
+	{
+		for (i = 0; i < nworkers; i++)
+			shm_mq_detach(pei->tqueue[i]);
+		pfree(pei->tqueue);
+		pei->tqueue = NULL;
+	}
+
+	/*
+	 * While we're waiting for the workers to finish, let's get rid of the
+	 * tuple queue readers.  (Any other local cleanup could be done here too.)
+	 */
+	if (pei->reader != NULL)
+	{
+		for (i = 0; i < nworkers; i++)
+			DestroyTupleQueueReader(pei->reader[i]);
+		pfree(pei->reader);
+		pei->reader = NULL;
+	}
+
+	/* Now wait for the workers to finish. */
 	WaitForParallelWorkersToFinish(pei->pcxt);
 
-	/* Next, accumulate buffer usage. */
-	for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
+	/*
+	 * Next, accumulate buffer usage.  (This must wait for the workers to
+	 * finish, or we might get incomplete data.)
+	 */
+	for (i = 0; i < nworkers; i++)
 		InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
 	/* Finally, accumulate instrumentation, if any. */
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 94b6ae5c300..3be735f5c33 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -129,7 +129,6 @@ ExecGather(PlanState *pstate)
 {
 	GatherState *node = castNode(GatherState, pstate);
 	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
 	TupleTableSlot *slot;
 	ExprContext *econtext;
 
@@ -171,33 +170,30 @@ ExecGather(PlanState *pstate)
 			LaunchParallelWorkers(pcxt);
 			/* We save # workers launched for the benefit of EXPLAIN */
 			node->nworkers_launched = pcxt->nworkers_launched;
-			node->nreaders = 0;
-			node->nextreader = 0;
 
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
 			{
-				node->reader = palloc(pcxt->nworkers_launched *
-									  sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers_launched; ++i)
-				{
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-				}
+				ExecParallelCreateReaders(node->pei,
+										  fslot->tts_tupleDescriptor);
+				/* Make a working array showing the active readers */
+				node->nreaders = pcxt->nworkers_launched;
+				node->reader = (TupleQueueReader **)
+					palloc(node->nreaders * sizeof(TupleQueueReader *));
+				memcpy(node->reader, node->pei->reader,
+					   node->nreaders * sizeof(TupleQueueReader *));
 			}
 			else
 			{
 				/* No workers?	Then never mind. */
-				ExecShutdownGatherWorkers(node);
+				node->nreaders = 0;
+				node->reader = NULL;
 			}
+			node->nextreader = 0;
 		}
 
 		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
+		node->need_to_scan_locally = (node->nreaders == 0)
 			|| !gather->single_copy;
 		node->initialized = true;
 	}
@@ -256,11 +252,11 @@ gather_getnext(GatherState *gatherstate)
 	MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
 	HeapTuple	tup;
 
-	while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
+	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		if (gatherstate->reader != NULL)
+		if (gatherstate->nreaders > 0)
 		{
 			MemoryContext oldContext;
 
@@ -317,19 +313,15 @@ gather_readnext(GatherState *gatherstate)
 		tup = TupleQueueReaderNext(reader, true, &readerdone);
 
 		/*
-		 * If this reader is done, remove it, and collapse the array.  If all
-		 * readers are done, clean up remaining worker state.
+		 * If this reader is done, remove it from our working array of active
+		 * readers.  If all readers are done, we're outta here.
 		 */
 		if (readerdone)
 		{
 			Assert(!tup);
-			DestroyTupleQueueReader(reader);
 			--gatherstate->nreaders;
 			if (gatherstate->nreaders == 0)
-			{
-				ExecShutdownGatherWorkers(gatherstate);
 				return NULL;
-			}
 			memmove(&gatherstate->reader[gatherstate->nextreader],
 					&gatherstate->reader[gatherstate->nextreader + 1],
 					sizeof(TupleQueueReader *)
@@ -376,37 +368,25 @@ gather_readnext(GatherState *gatherstate)
 /* ----------------------------------------------------------------
  *		ExecShutdownGatherWorkers
  *
- *		Destroy the parallel workers.  Collect all the stats after
- *		workers are stopped, else some work done by workers won't be
- *		accounted.
+ *		Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherWorkers(GatherState *node)
 {
-	/* Shut down tuple queue readers before shutting down workers. */
-	if (node->reader != NULL)
-	{
-		int			i;
-
-		for (i = 0; i < node->nreaders; ++i)
-			DestroyTupleQueueReader(node->reader[i]);
-
-		pfree(node->reader);
-		node->reader = NULL;
-	}
-
-	/* Now shut down the workers. */
 	if (node->pei != NULL)
 		ExecParallelFinish(node->pei);
+
+	/* Free our local copy of the reader array */
+	if (node->reader)
+		pfree(node->reader);
+	node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
  *		ExecShutdownGather
  *
  *		Destroy the setup for parallel workers including parallel context.
- *		Collect all the stats after workers are stopped, else some work
- *		done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index f67fbe71760..c0c285bd611 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -177,7 +177,6 @@ ExecGatherMerge(PlanState *pstate)
 	GatherMergeState *node = castNode(GatherMergeState, pstate);
 	TupleTableSlot *slot;
 	ExprContext *econtext;
-	int			i;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -212,27 +211,23 @@ ExecGatherMerge(PlanState *pstate)
 			LaunchParallelWorkers(pcxt);
 			/* We save # workers launched for the benefit of EXPLAIN */
 			node->nworkers_launched = pcxt->nworkers_launched;
-			node->nreaders = 0;
 
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
 			{
-				node->reader = palloc(pcxt->nworkers_launched *
-									  sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers_launched; ++i)
-				{
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   node->tupDesc);
-				}
+				ExecParallelCreateReaders(node->pei, node->tupDesc);
+				/* Make a working array showing the active readers */
+				node->nreaders = pcxt->nworkers_launched;
+				node->reader = (TupleQueueReader **)
+					palloc(node->nreaders * sizeof(TupleQueueReader *));
+				memcpy(node->reader, node->pei->reader,
+					   node->nreaders * sizeof(TupleQueueReader *));
 			}
 			else
 			{
 				/* No workers?	Then never mind. */
-				ExecShutdownGatherMergeWorkers(node);
+				node->nreaders = 0;
+				node->reader = NULL;
 			}
 		}
 
@@ -282,8 +277,6 @@ ExecEndGatherMerge(GatherMergeState *node)
  *		ExecShutdownGatherMerge
  *
  *		Destroy the setup for parallel workers including parallel context.
- *		Collect all the stats after workers are stopped, else some work
- *		done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
@@ -302,30 +295,19 @@ ExecShutdownGatherMerge(GatherMergeState *node)
 /* ----------------------------------------------------------------
  *		ExecShutdownGatherMergeWorkers
  *
- *		Destroy the parallel workers.  Collect all the stats after
- *		workers are stopped, else some work done by workers won't be
- *		accounted.
+ *		Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 {
-	/* Shut down tuple queue readers before shutting down workers. */
-	if (node->reader != NULL)
-	{
-		int			i;
-
-		for (i = 0; i < node->nreaders; ++i)
-			if (node->reader[i])
-				DestroyTupleQueueReader(node->reader[i]);
-
-		pfree(node->reader);
-		node->reader = NULL;
-	}
-
-	/* Now shut down the workers. */
 	if (node->pei != NULL)
 		ExecParallelFinish(node->pei);
+
+	/* Free our local copy of the reader array */
+	if (node->reader)
+		pfree(node->reader);
+	node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
@@ -670,8 +652,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	else if (tuple_buffer->done)
 	{
 		/* Reader is known to be exhausted. */
-		DestroyTupleQueueReader(gm_state->reader[reader - 1]);
-		gm_state->reader[reader - 1] = NULL;
 		return false;
 	}
 	else
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 4339203085d..42bf57a2ab8 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -651,11 +651,13 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 
 /*
  * Destroy a tuple queue reader.
+ *
+ * Note: cleaning up the underlying shm_mq is the caller's responsibility.
+ * We won't access it here, as it may be detached already.
  */
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-	shm_mq_detach(reader->queue);
 	if (reader->typmodmap != NULL)
 		hash_destroy(reader->typmodmap);
 	/* Is it worth trying to free substructure of the remap tree? */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index a6512245b15..8b714193c57 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -23,17 +23,21 @@ typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
 
 typedef struct ParallelExecutorInfo
 {
-	PlanState  *planstate;
-	ParallelContext *pcxt;
-	BufferUsage *buffer_usage;
-	SharedExecutorInstrumentation *instrumentation;
-	shm_mq_handle **tqueue;
-	dsa_area   *area;
-	bool		finished;
+	PlanState  *planstate;		/* plan subtree we're running in parallel */
+	ParallelContext *pcxt;		/* parallel context we're using */
+	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
+	SharedExecutorInstrumentation *instrumentation; /* optional */
+	dsa_area   *area;			/* points to DSA area in DSM */
+	bool		finished;		/* set true by ExecParallelFinish */
+	/* These two arrays have pcxt->nworkers_launched entries: */
+	shm_mq_handle **tqueue;		/* tuple queues for worker output */
+	struct TupleQueueReader **reader;	/* readers for the tuple queues */
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 					 EState *estate, int nworkers);
+extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+						  TupleDesc tupDesc);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(PlanState *planstate,
-- 
GitLab