diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 37b7bbd413b3c629d95d2e27f0e9ea01adfebf04..a55022e0a8029b36537c378e7a6f00a3aa6ac503 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -76,6 +76,7 @@ static void CheckValidRowMarkRel(Relation rel, RowMarkType markType);
 static void ExecPostprocessPlan(EState *estate);
 static void ExecEndPlan(PlanState *planstate, EState *estate);
 static void ExecutePlan(EState *estate, PlanState *planstate,
+			bool use_parallel_mode,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
@@ -243,11 +244,6 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 	if (!(eflags & (EXEC_FLAG_SKIP_TRIGGERS | EXEC_FLAG_EXPLAIN_ONLY)))
 		AfterTriggerBeginQuery();
 
-	/* Enter parallel mode, if required by the query. */
-	if (queryDesc->plannedstmt->parallelModeNeeded &&
-		!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
-		EnterParallelMode();
-
 	MemoryContextSwitchTo(oldcontext);
 }
 
@@ -341,15 +337,13 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 	if (!ScanDirectionIsNoMovement(direction))
 		ExecutePlan(estate,
 					queryDesc->planstate,
+					queryDesc->plannedstmt->parallelModeNeeded,
 					operation,
 					sendTuples,
 					count,
 					direction,
 					dest);
 
-	/* Allow nodes to release or shut down resources. */
-	(void) ExecShutdownNode(queryDesc->planstate);
-
 	/*
 	 * shutdown tuple receiver, if we started it
 	 */
@@ -482,11 +476,6 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
 	 */
 	MemoryContextSwitchTo(oldcontext);
 
-	/* Exit parallel mode, if it was required by the query. */
-	if (queryDesc->plannedstmt->parallelModeNeeded &&
-		!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY))
-		ExitParallelMode();
-
 	/*
 	 * Release EState and per-query memory context. This should release
 	 * everything the executor has allocated.
@@ -1529,6 +1518,7 @@ ExecEndPlan(PlanState *planstate, EState *estate)
 static void
 ExecutePlan(EState *estate,
 			PlanState *planstate,
+			bool use_parallel_mode,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
@@ -1548,6 +1538,20 @@ ExecutePlan(EState *estate,
 	 */
 	estate->es_direction = direction;
 
+	/*
+	 * If a tuple count was supplied, we must force the plan to run without
+	 * parallelism, because we might exit early.
+	 */
+	if (numberTuples != 0)
+		use_parallel_mode = false;
+
+	/*
+	 * Enter parallel mode, if required by the query; the matching call to
+	 * ExitParallelMode is made after the main loop below.
+	 */
+	if (use_parallel_mode)
+		EnterParallelMode();
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
	 */
@@ -1566,7 +1570,11 @@ ExecutePlan(EState *estate,
 		 * process so we just end the loop...
 		 */
 		if (TupIsNull(slot))
+		{
+			/* Allow nodes to release or shut down resources. */
+			(void) ExecShutdownNode(planstate);
 			break;
+		}
 
 		/*
 		 * If we have a junk filter, then project a new tuple with the junk
@@ -1603,6 +1611,9 @@ ExecutePlan(EState *estate,
 		if (numberTuples && numberTuples == current_tuple_count)
 			break;
 	}
+
+	if (use_parallel_mode)
+		ExitParallelMode();
 }
 
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index e6930c1d51c929f4c1306ea38b749a757543929e..3bb820692d26702703013ea107bdfcb1b3852166 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -442,6 +442,23 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 											pei->instrumentation);
 }
 
+/*
+ * Clean up whatever ParallelExecutorInfo resources still exist after
+ * ExecParallelFinish.  We separate these routines because someone might
+ * want to examine the contents of the DSM after ExecParallelFinish and
+ * before calling this routine.
+ */
+void
+ExecParallelCleanup(ParallelExecutorInfo *pei)
+{
+	if (pei->pcxt != NULL)
+	{
+		DestroyParallelContext(pei->pcxt);
+		pei->pcxt = NULL;
+	}
+	pfree(pei);
+}
+
 /*
  * Create a DestReceiver to write tuples we produce to the shm_mq designated
  * for that purpose.
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index c689a4d17a0316b9498338f04b6df1185129ffa1..7e2272f634ba460782adc30f78ea52cf1011dac2 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/relscan.h"
+#include "access/xact.h"
 #include "executor/execdebug.h"
 #include "executor/execParallel.h"
 #include "executor/nodeGather.h"
@@ -45,7 +46,6 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate = makeNode(GatherState);
 	gatherstate->ps.plan = (Plan *) node;
 	gatherstate->ps.state = estate;
-	gatherstate->need_to_scan_workers = false;
 	gatherstate->need_to_scan_locally = !node->single_copy;
 
 	/*
@@ -106,52 +106,57 @@ ExecGather(GatherState *node)
 	 * needs to allocate large dynamic segement, so it is better to do if it
 	 * is really needed.
 	 */
-	if (!node->pei)
+	if (!node->initialized)
 	{
 		EState	   *estate = node->ps.state;
-
-		/* Initialize the workers required to execute Gather node. */
-		node->pei = ExecInitParallelPlan(node->ps.lefttree,
-										 estate,
-										 ((Gather *) (node->ps.plan))->num_workers);
+		Gather	   *gather = (Gather *) node->ps.plan;
 
 		/*
-		 * Register backend workers. If the required number of workers are not
-		 * available then we perform the scan with available workers and if
-		 * there are no more workers available, then the Gather node will just
-		 * scan locally.
+		 * Sometimes we might have to run without parallelism; but if
+		 * parallel mode is active then we can try to fire up some workers.
 		 */
-		LaunchParallelWorkers(node->pei->pcxt);
-
-		node->funnel = CreateTupleQueueFunnel();
-
-		for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+		if (gather->num_workers > 0 && IsInParallelMode())
 		{
-			if (node->pei->pcxt->worker[i].bgwhandle)
+			bool		got_any_worker = false;
+
+			/* Initialize the workers required to execute Gather node. */
+			node->pei = ExecInitParallelPlan(node->ps.lefttree,
+											 estate,
+											 gather->num_workers);
+
+			/*
+			 * Register backend workers. We might not get as many as we
+			 * requested, or indeed any at all.
+			 */
+			LaunchParallelWorkers(node->pei->pcxt);
+
+			/* Set up a tuple queue to collect the results. */
+			node->funnel = CreateTupleQueueFunnel();
+			for (i = 0; i < node->pei->pcxt->nworkers; ++i)
 			{
-				shm_mq_set_handle(node->pei->tqueue[i],
-								  node->pei->pcxt->worker[i].bgwhandle);
-				RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]);
-				node->need_to_scan_workers = true;
+				if (node->pei->pcxt->worker[i].bgwhandle)
+				{
+					shm_mq_set_handle(node->pei->tqueue[i],
+									  node->pei->pcxt->worker[i].bgwhandle);
+					RegisterTupleQueueOnFunnel(node->funnel,
+											   node->pei->tqueue[i]);
+					got_any_worker = true;
+				}
 			}
+
+			/* No workers? Then never mind. */
+			if (!got_any_worker)
+				ExecShutdownGather(node);
 		}
 
-		/* If no workers are available, we must always scan locally. */
-		if (!node->need_to_scan_workers)
-			node->need_to_scan_locally = true;
+		/* Run plan locally if no workers or not single-copy. */
+		node->need_to_scan_locally = (node->funnel == NULL)
+			|| !gather->single_copy;
+		node->initialized = true;
 	}
 
 	slot = gather_getnext(node);
-	if (TupIsNull(slot))
-	{
-		/*
-		 * Destroy the parallel context once we complete fetching all the
-		 * tuples. Otherwise, the DSM and workers will stick around for the
-		 * lifetime of the entire statement.
-		 */
-		ExecShutdownGather(node);
-	}
 
 	return slot;
 }
@@ -194,10 +199,9 @@ gather_getnext(GatherState *gatherstate)
 	 */
 	slot = gatherstate->ps.ps_ProjInfo->pi_slot;
 
-	while (gatherstate->need_to_scan_workers ||
-		   gatherstate->need_to_scan_locally)
+	while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
 	{
-		if (gatherstate->need_to_scan_workers)
+		if (gatherstate->funnel != NULL)
 		{
 			bool		done = false;
 
@@ -206,7 +210,7 @@ gather_getnext(GatherState *gatherstate)
 									   gatherstate->need_to_scan_locally,
 									   &done);
 			if (done)
-				gatherstate->need_to_scan_workers = false;
+				ExecShutdownGather(gatherstate);
 
 			if (HeapTupleIsValid(tup))
 			{
@@ -247,30 +251,20 @@ gather_getnext(GatherState *gatherstate)
 void
 ExecShutdownGather(GatherState *node)
 {
-	Gather	   *gather;
-
-	if (node->pei == NULL || node->pei->pcxt == NULL)
-		return;
-
-	/*
-	 * Ensure all workers have finished before destroying the parallel context
-	 * to ensure a clean exit.
-	 */
-	if (node->funnel)
+	/* Shut down tuple queue funnel before shutting down workers. */
+	if (node->funnel != NULL)
 	{
 		DestroyTupleQueueFunnel(node->funnel);
 		node->funnel = NULL;
 	}
 
-	ExecParallelFinish(node->pei);
-
-	/* destroy parallel context. */
-	DestroyParallelContext(node->pei->pcxt);
-	node->pei->pcxt = NULL;
-
-	gather = (Gather *) node->ps.plan;
-	node->need_to_scan_locally = !gather->single_copy;
-	node->need_to_scan_workers = false;
+	/* Now shut down the workers. */
+	if (node->pei != NULL)
+	{
+		ExecParallelFinish(node->pei);
+		ExecParallelCleanup(node->pei);
+		node->pei = NULL;
+	}
 }
 
 /* ----------------------------------------------------------------
@@ -295,5 +289,7 @@ ExecReScanGather(GatherState *node)
 	 */
 	ExecShutdownGather(node);
 
+	node->initialized = false;
+
 	ExecReScan(node->ps.lefttree);
 }
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 4fc797ad9820600672632aa9d2575a70b930b94c..505500e76b54898bbe12105a51da6a8ef91a7154 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -32,5 +32,6 @@ typedef struct ParallelExecutorInfo
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 					 EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
+extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 
 #endif   /* EXECPARALLEL_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 23670e1ff9bac8689e40ef8bd75e38d15efcf530..4fcdcc4067a09d7afade70b2e7299ee3fbe7ac4e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1961,9 +1961,9 @@ typedef struct UniqueState
 typedef struct GatherState
 {
 	PlanState	ps;				/* its first field is NodeTag */
+	bool		initialized;
 	struct ParallelExecutorInfo *pei;
 	struct TupleQueueFunnel *funnel;
-	bool		need_to_scan_workers;
 	bool		need_to_scan_locally;
 } GatherState;
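
Note on the new execParallel API (illustrative, not part of the patch): the comment on ExecParallelCleanup describes a two-step shutdown, which ExecShutdownGather now follows. Below is a minimal sketch of that calling sequence for a caller that owns a ParallelExecutorInfo obtained from ExecInitParallelPlan; the helper name is hypothetical.

#include "executor/execParallel.h"

/* Hypothetical helper: tear down a parallel plan in the order the patch expects. */
static void
shutdown_parallel_plan(ParallelExecutorInfo *pei)
{
	/* First wait for all workers to finish. */
	ExecParallelFinish(pei);

	/*
	 * At this point the DSM still exists, so its contents could be examined
	 * here; that is why the two routines are kept separate.
	 */

	/* Then destroy the parallel context and free the ParallelExecutorInfo. */
	ExecParallelCleanup(pei);
}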