From 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Fri, 18 Sep 2015 21:10:08 -0400
Subject: [PATCH] Glue layer to connect the executor to the shm_mq mechanism.

The shm_mq mechanism was built to send error (and notice) messages and
tuples between backends.  However, shm_mq itself only deals in raw
bytes.  Since commit 2bd9e412f92bc6a68f3e8bcb18e04955cc35001d, we have
had infrastructure for one backend to redirect protocol messages to a
queue and for another backend to parse them and do useful things with
them.  This commit introduces a somewhat analogous facility for tuples
by adding a new type of DestReceiver, DestTupleQueue, which writes
each tuple generated by a query into a shm_mq, and a new
TupleQueueFunnel facility which reads raw tuples out of the queue and
reconstructs the HeapTuple format expected by the executor.
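
On the sending side, the usage is just to install the new receiver.  A
sketch only; "mqh" stands for a shm_mq_handle the worker is already
attached to as sender, and the query setup itself is outside this patch:

    DestReceiver *dest = CreateTupleQueueDestReceiver(mqh);

    /* ... run the query with 'dest' as its DestReceiver; each tuple the
     * executor emits is then written to the queue via shm_mq_send() ... */

    (*dest->rDestroy) (dest);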

The TupleQueueFunnel abstraction supports reading from multiple tuple
streams at the same time, but only in round-robin fashion.  Someone
could conceivably want other policies, but this should be good enough
to meet our short-term needs related to parallel query, and we can
always extend it later.
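
On the reading side, a leader would drive the funnel roughly like this
(again only a sketch; mqh[] and nworkers are placeholders for queues the
leader has already attached to as receiver):

    TupleQueueFunnel *funnel = CreateTupleQueueFunnel();
    HeapTuple   tup;
    bool        done = false;
    int         i;

    for (i = 0; i < nworkers; i++)
        RegisterTupleQueueOnFunnel(funnel, mqh[i]);

    while ((tup = TupleQueueFunnelNext(funnel, false, &done)) != NULL)
    {
        /* process tuple; it is a palloc'd copy owned by the caller */
    }
    Assert(done);

    DestroyTupleQueueFunnel(funnel);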

This also makes one minor addition to the shm_mq API that didn't
seem worth breaking out as a separate patch.
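
The addition is a shm_mq_get_queue() accessor, which returns the shm_mq
underlying an shm_mq_handle; the funnel uses it to detach from each
registered queue when it is destroyed:

    shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));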

Extracted from Amit Kapila's parallel sequential scan patch.  This
code was originally written by me, and then it was revised by Amit,
and then it was revised some more by me.
---
 src/backend/executor/Makefile    |   2 +-
 src/backend/executor/tqueue.c    | 262 +++++++++++++++++++++++++++++++
 src/backend/storage/ipc/shm_mq.c |   9 ++
 src/backend/tcop/dest.c          |   7 +
 src/include/executor/tqueue.h    |  31 ++++
 src/include/storage/shm_mq.h     |   3 +
 src/include/tcop/dest.h          |   3 +-
 src/tools/pgindent/typedefs.list |   1 +
 8 files changed, 316 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/executor/tqueue.c
 create mode 100644 src/include/executor/tqueue.h

diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 08cba6fa2b5..249534bb927 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -24,6 +24,6 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
        nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
        nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \
        nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \
-       nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o
+       nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
new file mode 100644
index 00000000000..d0edf4e5595
--- /dev/null
+++ b/src/backend/executor/tqueue.c
@@ -0,0 +1,262 @@
+/*-------------------------------------------------------------------------
+ *
+ * tqueue.c
+ *	  Use shm_mq to send & receive tuples between parallel backends
+ *
+ * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
+ * under the hood, writes tuples from the executor to a shm_mq.
+ *
+ * A TupleQueueFunnel helps manage the process of reading tuples from
+ * one or more shm_mq objects being used as tuple queues.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/tqueue.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/tqueue.h"
+#include "miscadmin.h"
+
+typedef struct
+{
+	DestReceiver pub;
+	shm_mq_handle *handle;
+}	TQueueDestReceiver;
+
+struct TupleQueueFunnel
+{
+	int			nqueues;
+	int			maxqueues;
+	int			nextqueue;
+	shm_mq_handle **queue;
+};
+
+/*
+ * Receive a tuple.
+ */
+static void
+tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
+{
+	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+	HeapTuple	tuple;
+
+	tuple = ExecMaterializeSlot(slot);
+	shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+}
+
+/*
+ * Prepare to receive tuples from executor.
+ */
+static void
+tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+	/* do nothing */
+}
+
+/*
+ * Clean up at end of an executor run
+ */
+static void
+tqueueShutdownReceiver(DestReceiver *self)
+{
+	/* do nothing */
+}
+
+/*
+ * Destroy receiver when done with it
+ */
+static void
+tqueueDestroyReceiver(DestReceiver *self)
+{
+	pfree(self);
+}
+
+/*
+ * Create a DestReceiver that writes tuples to a tuple queue.
+ */
+DestReceiver *
+CreateTupleQueueDestReceiver(shm_mq_handle *handle)
+{
+	TQueueDestReceiver *self;
+
+	self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
+
+	self->pub.receiveSlot = tqueueReceiveSlot;
+	self->pub.rStartup = tqueueStartupReceiver;
+	self->pub.rShutdown = tqueueShutdownReceiver;
+	self->pub.rDestroy = tqueueDestroyReceiver;
+	self->pub.mydest = DestTupleQueue;
+	self->handle = handle;
+
+	return (DestReceiver *) self;
+}
+
+/*
+ * Create a tuple queue funnel.
+ */
+TupleQueueFunnel *
+CreateTupleQueueFunnel(void)
+{
+	TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel));
+
+	funnel->maxqueues = 8;
+	funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));
+
+	return funnel;
+}
+
+/*
+ * Destroy a tuple queue funnel.
+ */
+void
+DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
+{
+	int			i;
+
+	for (i = 0; i < funnel->nqueues; i++)
+		shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
+	pfree(funnel->queue);
+	pfree(funnel);
+}
+
+/*
+ * Remember the shared memory queue handle in funnel.
+ */
+void
+RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
+{
+	if (funnel->nqueues < funnel->maxqueues)
+	{
+		funnel->queue[funnel->nqueues++] = handle;
+		return;
+	}
+
+	if (funnel->nqueues >= funnel->maxqueues)
+	{
+		int			newsize = funnel->nqueues * 2;
+
+		Assert(funnel->nqueues == funnel->maxqueues);
+
+		funnel->queue = repalloc(funnel->queue,
+								 newsize * sizeof(shm_mq_handle *));
+		funnel->maxqueues = newsize;
+	}
+
+	funnel->queue[funnel->nqueues++] = handle;
+}
+
+/*
+ * Fetch a tuple from a tuple queue funnel.
+ *
+ * We try to read from the queues in round-robin fashion so as to avoid
+ * the situation where some workers get their tuples read expeditiously while
+ * others are barely ever serviced.
+ *
+ * Even when nowait = false, we read from the individual queues in
+ * non-blocking mode.  Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK,
+ * it can still accumulate bytes from a partially-read message, so doing it
+ * this way should outperform doing a blocking read on each queue in turn.
+ *
+ * The return value is NULL if there are no remaining queues or if
+ * nowait = true and no queue returned a tuple without blocking.  *done, if
+ * not NULL, is set to true when there are no remaining queues and false in
+ * any other case.
+ */
+HeapTuple
+TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
+{
+	int			waitpos = funnel->nextqueue;
+
+	/* Corner case: called before adding any queues, or after all are gone. */
+	if (funnel->nqueues == 0)
+	{
+		if (done != NULL)
+			*done = true;
+		return NULL;
+	}
+
+	if (done != NULL)
+		*done = false;
+
+	for (;;)
+	{
+		shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
+		shm_mq_result result;
+		Size		nbytes;
+		void	   *data;
+
+		/* Attempt to read a message. */
+		result = shm_mq_receive(mqh, &nbytes, &data, true);
+
+		/*
+		 * Normally, we advance funnel->nextqueue to the next queue at this
+		 * point, but if we're pointing to a queue that we've just discovered
+		 * is detached, then forget that queue and leave the pointer where it
+		 * is until the number of remaining queues falls below that pointer,
+		 * at which point we reset the pointer to the first queue.
+		 */
+		if (result != SHM_MQ_DETACHED)
+			funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
+		else
+		{
+			--funnel->nqueues;
+			if (funnel->nqueues == 0)
+			{
+				if (done != NULL)
+					*done = true;
+				return NULL;
+			}
+
+			memmove(&funnel->queue[funnel->nextqueue],
+					&funnel->queue[funnel->nextqueue + 1],
+					sizeof(shm_mq_handle *)
+					* (funnel->nqueues - funnel->nextqueue));
+
+			if (funnel->nextqueue >= funnel->nqueues)
+				funnel->nextqueue = 0;
+
+			if (funnel->nextqueue < waitpos)
+				--waitpos;
+
+			continue;
+		}
+
+		/* If we got a message, return it. */
+		if (result == SHM_MQ_SUCCESS)
+		{
+			HeapTupleData htup;
+
+			/*
+			 * The tuple data we just read from the queue is only valid until
+			 * we again attempt to read from it.  Copy the tuple into a single
+			 * palloc'd chunk as callers will expect.
+			 */
+			ItemPointerSetInvalid(&htup.t_self);
+			htup.t_tableOid = InvalidOid;
+			htup.t_len = nbytes;
+			htup.t_data = data;
+			return heap_copytuple(&htup);
+		}
+
+		/*
+		 * If we've visited all of the queues, then we should either give up
+		 * and return NULL (if we're in non-blocking mode) or wait for the
+		 * process latch to be set (otherwise).
+		 */
+		if (funnel->nextqueue == waitpos)
+		{
+			if (nowait)
+				return NULL;
+			WaitLatch(MyLatch, WL_LATCH_SET, 0);
+			CHECK_FOR_INTERRUPTS();
+			ResetLatch(MyLatch);
+		}
+	}
+}
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 0e60dbcddc8..c78f1650e6a 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -745,6 +745,15 @@ shm_mq_detach(shm_mq *mq)
 		SetLatch(&victim->procLatch);
 }
 
+/*
+ * Get the shm_mq from handle.
+ */
+shm_mq *
+shm_mq_get_queue(shm_mq_handle *mqh)
+{
+	return mqh->mqh_queue;
+}
+
 /*
  * Write bytes into a shared message queue.
  */
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index bcf38952a8c..d645751ff58 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -34,6 +34,7 @@
 #include "commands/createas.h"
 #include "commands/matview.h"
 #include "executor/functions.h"
+#include "executor/tqueue.h"
 #include "executor/tstoreReceiver.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -129,6 +130,9 @@ CreateDestReceiver(CommandDest dest)
 
 		case DestTransientRel:
 			return CreateTransientRelDestReceiver(InvalidOid);
+
+		case DestTupleQueue:
+			return CreateTupleQueueDestReceiver(NULL);
 	}
 
 	/* should never get here */
@@ -162,6 +166,7 @@ EndCommand(const char *commandTag, CommandDest dest)
 		case DestCopyOut:
 		case DestSQLFunction:
 		case DestTransientRel:
+		case DestTupleQueue:
 			break;
 	}
 }
@@ -204,6 +209,7 @@ NullCommand(CommandDest dest)
 		case DestCopyOut:
 		case DestSQLFunction:
 		case DestTransientRel:
+		case DestTupleQueue:
 			break;
 	}
 }
@@ -248,6 +254,7 @@ ReadyForQuery(CommandDest dest)
 		case DestCopyOut:
 		case DestSQLFunction:
 		case DestTransientRel:
+		case DestTupleQueue:
 			break;
 	}
 }
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
new file mode 100644
index 00000000000..6f8eb73c9ae
--- /dev/null
+++ b/src/include/executor/tqueue.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * tqueue.h
+ *	  Use shm_mq to send & receive tuples between parallel backends
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/tqueue.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef TQUEUE_H
+#define TQUEUE_H
+
+#include "storage/shm_mq.h"
+#include "tcop/dest.h"
+
+/* Use this to send tuples to a shm_mq. */
+extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
+
+/* Use these to receive tuples from a shm_mq. */
+typedef struct TupleQueueFunnel TupleQueueFunnel;
+extern TupleQueueFunnel *CreateTupleQueueFunnel(void);
+extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel);
+extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *);
+extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait,
+					 bool *done);
+
+#endif   /* TQUEUE_H */
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 1a2ba040cb4..7621a358ab4 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -65,6 +65,9 @@ extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
 /* Break connection. */
 extern void shm_mq_detach(shm_mq *);
 
+/* Get the shm_mq from handle. */
+extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
+
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
 			Size nbytes, const void *data, bool nowait);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index 5bcca3fbcaa..b560672fd40 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -94,7 +94,8 @@ typedef enum
 	DestIntoRel,				/* results sent to relation (SELECT INTO) */
 	DestCopyOut,				/* results sent to COPY TO code */
 	DestSQLFunction,			/* results sent to SQL-language func mgr */
-	DestTransientRel			/* results sent to transient relation */
+	DestTransientRel,			/* results sent to transient relation */
+	DestTupleQueue				/* results sent to tuple queue */
 } CommandDest;
 
 /* ----------------
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 4b650d1bde7..a037f818acc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2014,6 +2014,7 @@ TupleHashEntry
 TupleHashEntryData
 TupleHashIterator
 TupleHashTable
+TupleQueueFunnel
 TupleTableSlot
 Tuplesortstate
 Tuplestorestate
-- 
GitLab