diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 08cba6fa2b5f2bfbd444ee5e36d06a1ac305ba58..249534bb92720ac15c7498493eb15aa7a2b68bec 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -24,6 +24,6 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \ nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \ - nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o + nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c new file mode 100644 index 0000000000000000000000000000000000000000..d0edf4e5595bae0d7678797a163b3c81fd3fc92b --- /dev/null +++ b/src/backend/executor/tqueue.c @@ -0,0 +1,262 @@ +/*------------------------------------------------------------------------- + * + * tqueue.c + * Use shm_mq to send & receive tuples between parallel backends + * + * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver + * under the hood, writes tuples from the executor to a shm_mq. + * + * A TupleQueueFunnel helps manage the process of reading tuples from + * one or more shm_mq objects being used as tuple queues. + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/tqueue.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "executor/tqueue.h" +#include "miscadmin.h" + +typedef struct +{ + DestReceiver pub; + shm_mq_handle *handle; +} TQueueDestReceiver; + +struct TupleQueueFunnel +{ + int nqueues; + int maxqueues; + int nextqueue; + shm_mq_handle **queue; +}; + +/* + * Receive a tuple. + */ +static void +tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) +{ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + HeapTuple tuple; + + tuple = ExecMaterializeSlot(slot); + shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); +} + +/* + * Prepare to receive tuples from executor. + */ +static void +tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + /* do nothing */ +} + +/* + * Clean up at end of an executor run + */ +static void +tqueueShutdownReceiver(DestReceiver *self) +{ + /* do nothing */ +} + +/* + * Destroy receiver when done with it + */ +static void +tqueueDestroyReceiver(DestReceiver *self) +{ + pfree(self); +} + +/* + * Create a DestReceiver that writes tuples to a tuple queue. + */ +DestReceiver * +CreateTupleQueueDestReceiver(shm_mq_handle *handle) +{ + TQueueDestReceiver *self; + + self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver)); + + self->pub.receiveSlot = tqueueReceiveSlot; + self->pub.rStartup = tqueueStartupReceiver; + self->pub.rShutdown = tqueueShutdownReceiver; + self->pub.rDestroy = tqueueDestroyReceiver; + self->pub.mydest = DestTupleQueue; + self->handle = handle; + + return (DestReceiver *) self; +} + +/* + * Create a tuple queue funnel. + */ +TupleQueueFunnel * +CreateTupleQueueFunnel(void) +{ + TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel)); + + funnel->maxqueues = 8; + funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *)); + + return funnel; +} + +/* + * Destroy a tuple queue funnel. + */ +void +DestroyTupleQueueFunnel(TupleQueueFunnel *funnel) +{ + int i; + + for (i = 0; i < funnel->nqueues; i++) + shm_mq_detach(shm_mq_get_queue(funnel->queue[i])); + pfree(funnel->queue); + pfree(funnel); +} + +/* + * Remember the shared memory queue handle in funnel. + */ +void +RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle) +{ + if (funnel->nqueues < funnel->maxqueues) + { + funnel->queue[funnel->nqueues++] = handle; + return; + } + + if (funnel->nqueues >= funnel->maxqueues) + { + int newsize = funnel->nqueues * 2; + + Assert(funnel->nqueues == funnel->maxqueues); + + funnel->queue = repalloc(funnel->queue, + newsize * sizeof(shm_mq_handle *)); + funnel->maxqueues = newsize; + } + + funnel->queue[funnel->nqueues++] = handle; +} + +/* + * Fetch a tuple from a tuple queue funnel. + * + * We try to read from the queues in round-robin fashion so as to avoid + * the situation where some workers get their tuples read expediently while + * others are barely ever serviced. + * + * Even when nowait = false, we read from the individual queues in + * non-blocking mode. Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, + * it can still accumulate bytes from a partially-read message, so doing it + * this way should outperform doing a blocking read on each queue in turn. + * + * The return value is NULL if there are no remaining queues or if + * nowait = true and no queue returned a tuple without blocking. *done, if + * not NULL, is set to true when there are no remaining queues and false in + * any other case. + */ +HeapTuple +TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done) +{ + int waitpos = funnel->nextqueue; + + /* Corner case: called before adding any queues, or after all are gone. */ + if (funnel->nqueues == 0) + { + if (done != NULL) + *done = true; + return NULL; + } + + if (done != NULL) + *done = false; + + for (;;) + { + shm_mq_handle *mqh = funnel->queue[funnel->nextqueue]; + shm_mq_result result; + Size nbytes; + void *data; + + /* Attempt to read a message. */ + result = shm_mq_receive(mqh, &nbytes, &data, true); + + /* + * Normally, we advance funnel->nextqueue to the next queue at this + * point, but if we're pointing to a queue that we've just discovered + * is detached, then forget that queue and leave the pointer where it + * is until the number of remaining queues fall below that pointer and + * at that point make the pointer point to the first queue. + */ + if (result != SHM_MQ_DETACHED) + funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues; + else + { + --funnel->nqueues; + if (funnel->nqueues == 0) + { + if (done != NULL) + *done = true; + return NULL; + } + + memmove(&funnel->queue[funnel->nextqueue], + &funnel->queue[funnel->nextqueue + 1], + sizeof(shm_mq_handle *) + * (funnel->nqueues - funnel->nextqueue)); + + if (funnel->nextqueue >= funnel->nqueues) + funnel->nextqueue = 0; + + if (funnel->nextqueue < waitpos) + --waitpos; + + continue; + } + + /* If we got a message, return it. */ + if (result == SHM_MQ_SUCCESS) + { + HeapTupleData htup; + + /* + * The tuple data we just read from the queue is only valid until + * we again attempt to read from it. Copy the tuple into a single + * palloc'd chunk as callers will expect. + */ + ItemPointerSetInvalid(&htup.t_self); + htup.t_tableOid = InvalidOid; + htup.t_len = nbytes; + htup.t_data = data; + return heap_copytuple(&htup); + } + + /* + * If we've visited all of the queues, then we should either give up + * and return NULL (if we're in non-blocking mode) or wait for the + * process latch to be set (otherwise). + */ + if (funnel->nextqueue == waitpos) + { + if (nowait) + return NULL; + WaitLatch(MyLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(MyLatch); + } + } +} diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index 0e60dbcddc82ec14612980372af4a790b1450a73..c78f1650e6af4eaf741be75448766ce7a09ec03c 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -745,6 +745,15 @@ shm_mq_detach(shm_mq *mq) SetLatch(&victim->procLatch); } +/* + * Get the shm_mq from handle. + */ +shm_mq * +shm_mq_get_queue(shm_mq_handle *mqh) +{ + return mqh->mqh_queue; +} + /* * Write bytes into a shared message queue. */ diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index bcf38952a8c2af18e009a1340e7177e6f684ce58..d645751ff580a92dff9b4ad80d5c983b4d1381a7 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -34,6 +34,7 @@ #include "commands/createas.h" #include "commands/matview.h" #include "executor/functions.h" +#include "executor/tqueue.h" #include "executor/tstoreReceiver.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -129,6 +130,9 @@ CreateDestReceiver(CommandDest dest) case DestTransientRel: return CreateTransientRelDestReceiver(InvalidOid); + + case DestTupleQueue: + return CreateTupleQueueDestReceiver(NULL); } /* should never get here */ @@ -162,6 +166,7 @@ EndCommand(const char *commandTag, CommandDest dest) case DestCopyOut: case DestSQLFunction: case DestTransientRel: + case DestTupleQueue: break; } } @@ -204,6 +209,7 @@ NullCommand(CommandDest dest) case DestCopyOut: case DestSQLFunction: case DestTransientRel: + case DestTupleQueue: break; } } @@ -248,6 +254,7 @@ ReadyForQuery(CommandDest dest) case DestCopyOut: case DestSQLFunction: case DestTransientRel: + case DestTupleQueue: break; } } diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h new file mode 100644 index 0000000000000000000000000000000000000000..6f8eb73c9ae155b0a821ca0a7a0bdbb65293ba76 --- /dev/null +++ b/src/include/executor/tqueue.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * tqueue.h + * Use shm_mq to send & receive tuples between parallel backends + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/tqueue.h + * + *------------------------------------------------------------------------- + */ + +#ifndef TQUEUE_H +#define TQUEUE_H + +#include "storage/shm_mq.h" +#include "tcop/dest.h" + +/* Use this to send tuples to a shm_mq. */ +extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle); + +/* Use these to receive tuples from a shm_mq. */ +typedef struct TupleQueueFunnel TupleQueueFunnel; +extern TupleQueueFunnel *CreateTupleQueueFunnel(void); +extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel); +extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *); +extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait, + bool *done); + +#endif /* TQUEUE_H */ diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 1a2ba040cb43abfafbb6fbf8063a04f4247f597b..7621a358ab469a5e70f7e9586ac8406d5b80293b 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -65,6 +65,9 @@ extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *); /* Break connection. */ extern void shm_mq_detach(shm_mq *); +/* Get the shm_mq from handle. */ +extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh); + /* Send or receive messages. */ extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait); diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 5bcca3fbcaacdf9cba7784c977dac2fefe3d6790..b560672fd40f74fac9770d70fa0999602a015e18 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -94,7 +94,8 @@ typedef enum DestIntoRel, /* results sent to relation (SELECT INTO) */ DestCopyOut, /* results sent to COPY TO code */ DestSQLFunction, /* results sent to SQL-language func mgr */ - DestTransientRel /* results sent to transient relation */ + DestTransientRel, /* results sent to transient relation */ + DestTupleQueue /* results sent to tuple queue */ } CommandDest; /* ---------------- diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 4b650d1bde73157e246eb227a1de6d2d4b06d224..a037f818acc95819d47a5c42436391a67b9da34c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2014,6 +2014,7 @@ TupleHashEntry TupleHashEntryData TupleHashIterator TupleHashTable +TupleQueueFunnel TupleTableSlot Tuplesortstate Tuplestorestate