Skip to content
Snippets Groups Projects
Select Git revision
  • benchmark-tools
  • postgres-lambda
  • master default
  • REL9_4_25
  • REL9_5_20
  • REL9_6_16
  • REL_10_11
  • REL_11_6
  • REL_12_1
  • REL_12_0
  • REL_12_RC1
  • REL_12_BETA4
  • REL9_4_24
  • REL9_5_19
  • REL9_6_15
  • REL_10_10
  • REL_11_5
  • REL_12_BETA3
  • REL9_4_23
  • REL9_5_18
  • REL9_6_14
  • REL_10_9
  • REL_11_4
23 results

nodeMergeAppend.c

Blame
    • Peter Eisentraut's avatar
      414c5a2e
      Per-column collation support · 414c5a2e
      Peter Eisentraut authored
      This adds collation support for columns and domains, a COLLATE clause
      to override it per expression, and B-tree index support.
      
      Peter Eisentraut
      reviewed by Pavel Stehule, Itagaki Takahiro, Robert Haas, Noah Misch
      414c5a2e
      History
      Per-column collation support
      Peter Eisentraut authored
      This adds collation support for columns and domains, a COLLATE clause
      to override it per expression, and B-tree index support.
      
      Peter Eisentraut
      reviewed by Pavel Stehule, Itagaki Takahiro, Robert Haas, Noah Misch
    nodeMergeAppend.c 9.61 KiB
    /*-------------------------------------------------------------------------
     *
     * nodeMergeAppend.c
     *	  routines to handle MergeAppend nodes.
     *
     * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
     * Portions Copyright (c) 1994, Regents of the University of California
     *
     *
     * IDENTIFICATION
     *	  src/backend/executor/nodeMergeAppend.c
     *
     *-------------------------------------------------------------------------
     */
    /* INTERFACE ROUTINES
     *		ExecInitMergeAppend		- initialize the MergeAppend node
     *		ExecMergeAppend			- retrieve the next tuple from the node
     *		ExecEndMergeAppend		- shut down the MergeAppend node
     *		ExecReScanMergeAppend	- rescan the MergeAppend node
     *
     *	 NOTES
     *		A MergeAppend node contains a list of one or more subplans.
     *		These are each expected to deliver tuples that are sorted according
     *		to a common sort key.  The MergeAppend node merges these streams
     *		to produce output sorted the same way.
     *
     *		MergeAppend nodes don't make use of their left and right
     *		subtrees, rather they maintain a list of subplans so
     *		a typical MergeAppend node looks like this in the plan tree:
     *
     *				   ...
     *				   /
     *				MergeAppend---+------+------+--- nil
     *				/	\		  |		 |		|
     *			  nil	nil		 ...	...    ...
     *								 subplans
     */
    
    #include "postgres.h"
    
    #include "access/nbtree.h"
    #include "executor/execdebug.h"
    #include "executor/nodeMergeAppend.h"
    #include "utils/lsyscache.h"
    
    /*
     * It gets quite confusing having a heap array (indexed by integers) which
     * contains integers which index into the slots array. These typedefs try to
     * clear it up, but they're only documentation.
     */
    typedef int		SlotNumber;
    typedef int		HeapPosition;
    
    static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
    static void heap_siftup_slot(MergeAppendState *node);
    static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
    
    
    /* ----------------------------------------------------------------
     *		ExecInitMergeAppend
     *
     *		Begin all of the subscans of the MergeAppend node.
     * ----------------------------------------------------------------
     */
    MergeAppendState *
    ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
    {
    	MergeAppendState *mergestate = makeNode(MergeAppendState);
    	PlanState **mergeplanstates;
    	int			nplans;
    	int			i;
    	ListCell   *lc;
    
    	/* check for unsupported flags */
    	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
    
    	/*
    	 * Set up empty vector of subplan states
    	 */
    	nplans = list_length(node->mergeplans);
    
    	mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
    
    	/*
    	 * create new MergeAppendState for our node
    	 */
    	mergestate->ps.plan = (Plan *) node;
    	mergestate->ps.state = estate;
    	mergestate->mergeplans = mergeplanstates;
    	mergestate->ms_nplans = nplans;
    
    	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
    	mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
    
    	/*
    	 * Miscellaneous initialization
    	 *
    	 * MergeAppend plans don't have expression contexts because they never
    	 * call ExecQual or ExecProject.
    	 */
    
    	/*
    	 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
    	 * so we have to initialize them.
    	 */
    	ExecInitResultTupleSlot(estate, &mergestate->ps);
    
    	/*
    	 * call ExecInitNode on each of the plans to be executed and save the
    	 * results into the array "mergeplans".
    	 */
    	i = 0;
    	foreach(lc, node->mergeplans)
    	{
    		Plan	   *initNode = (Plan *) lfirst(lc);
    
    		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
    		i++;
    	}
    
    	/*
    	 * initialize output tuple type
    	 */
    	ExecAssignResultTypeFromTL(&mergestate->ps);
    	mergestate->ps.ps_ProjInfo = NULL;
    
    	/*
    	 * initialize sort-key information
    	 */
    	mergestate->ms_nkeys = node->numCols;
    	mergestate->ms_scankeys = palloc0(sizeof(ScanKeyData) *  node->numCols);
    
    	for (i = 0; i < node->numCols; i++)
    	{
    		Oid		sortFunction;
    		bool	reverse;
    
    		if (!get_compare_function_for_ordering_op(node->sortOperators[i],
    												  &sortFunction, &reverse))
    			elog(ERROR, "operator %u is not a valid ordering operator",
    				 node->sortOperators[i]);
    
    		/*
    		 * We needn't fill in sk_strategy or sk_subtype since these scankeys
    		 * will never be passed to an index.
    		 */
    		ScanKeyInit(&mergestate->ms_scankeys[i],
    					node->sortColIdx[i],
    					InvalidStrategy,
    					sortFunction,
    					(Datum) 0);
    
    		ScanKeyEntryInitializeCollation(&mergestate->ms_scankeys[i],
    										node->collations[i]);
    
    		/* However, we use btree's conventions for encoding directionality */
    		if (reverse)
    			mergestate->ms_scankeys[i].sk_flags |= SK_BT_DESC;
    		if (node->nullsFirst[i])
    			mergestate->ms_scankeys[i].sk_flags |= SK_BT_NULLS_FIRST;
    	}
    
    	/*
    	 * initialize to show we have not run the subplans yet
    	 */
    	mergestate->ms_heap_size = 0;
    	mergestate->ms_initialized = false;
    	mergestate->ms_last_slot = -1;
    
    	return mergestate;
    }
    
    /* ----------------------------------------------------------------
     *	   ExecMergeAppend
     *
     *		Handles iteration over multiple subplans.
     * ----------------------------------------------------------------
     */
    TupleTableSlot *
    ExecMergeAppend(MergeAppendState *node)
    {
    	TupleTableSlot *result;
    	SlotNumber	i;
    
    	if (!node->ms_initialized)
    	{
    		/*
    		 * First time through: pull the first tuple from each subplan,
    		 * and set up the heap.
    		 */
    		for (i = 0; i < node->ms_nplans; i++)
    		{
    			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
    			if (!TupIsNull(node->ms_slots[i]))
    				heap_insert_slot(node, i);
    		}
    		node->ms_initialized = true;
    	}
    	else
    	{
    		/*
    		 * Otherwise, pull the next tuple from whichever subplan we returned
    		 * from last time, and insert it into the heap.  (We could simplify
    		 * the logic a bit by doing this before returning from the prior call,
    		 * but it's better to not pull tuples until necessary.)
    		 */
    		i = node->ms_last_slot;
    		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
    		if (!TupIsNull(node->ms_slots[i]))
    			heap_insert_slot(node, i);
    	}
    
    	if (node->ms_heap_size > 0)
    	{
    		/* Return the topmost heap node, and sift up the remaining nodes */
    		i = node->ms_heap[0];
    		result = node->ms_slots[i];
    		node->ms_last_slot = i;
    		heap_siftup_slot(node);
    	}
    	else
    	{
    		/* All the subplans are exhausted, and so is the heap */
    		result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
    	}
    
    	return result;
    }
    
    /*
     * Insert a new slot into the heap.  The slot must contain a valid tuple.
     */
    static void
    heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
    {
    	SlotNumber *heap = node->ms_heap;
    	HeapPosition j;
    
    	Assert(!TupIsNull(node->ms_slots[new_slot]));
    
    	j = node->ms_heap_size++;	/* j is where the "hole" is */
    	while (j > 0)
    	{
    		int		i = (j-1)/2;
    
    		if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
    			break;
    		heap[j] = heap[i];
    		j = i;
    	}
    	heap[j] = new_slot;
    }
    
    /*
     * Delete the heap top (the slot in heap[0]), and sift up.
     */
    static void
    heap_siftup_slot(MergeAppendState *node)
    {
    	SlotNumber *heap = node->ms_heap;
    	HeapPosition i,
    				n;
    
    	if (--node->ms_heap_size <= 0)
    		return;
    	n = node->ms_heap_size;		/* heap[n] needs to be reinserted */
    	i = 0;						/* i is where the "hole" is */
    	for (;;)
    	{
    		int		j = 2 * i + 1;
    
    		if (j >= n)
    			break;
    		if (j+1 < n && heap_compare_slots(node, heap[j], heap[j+1]) > 0)
    			j++;
    		if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
    			break;
    		heap[i] = heap[j];
    		i = j;
    	}
    	heap[i] = heap[n];
    }
    
    /*
     * Compare the tuples in the two given slots.
     */
    static int32
    heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
    {
    	TupleTableSlot *s1 = node->ms_slots[slot1];
    	TupleTableSlot *s2 = node->ms_slots[slot2];
    	int			nkey;
    
    	Assert(!TupIsNull(s1));
    	Assert(!TupIsNull(s2));
    
    	for (nkey = 0; nkey < node->ms_nkeys; nkey++)
    	{
    		ScanKey scankey = node->ms_scankeys + nkey;
    		AttrNumber attno = scankey->sk_attno;
    		Datum	datum1,
    				datum2;
    		bool	isNull1,
    				isNull2;
    		int32	compare;
    
    		datum1 = slot_getattr(s1, attno, &isNull1);
    		datum2 = slot_getattr(s2, attno, &isNull2);
    
    		if (isNull1)
    		{
    			if (isNull2)
    				continue;		/* NULL "=" NULL */
    			else if (scankey->sk_flags & SK_BT_NULLS_FIRST)
    				return -1;		/* NULL "<" NOT_NULL */
    			else
    				return 1;		/* NULL ">" NOT_NULL */
    		}
    		else if (isNull2)
    		{
    			if (scankey->sk_flags & SK_BT_NULLS_FIRST)
    				return 1;		/* NOT_NULL ">" NULL */
    			else
    				return -1;		/* NOT_NULL "<" NULL */
    		}
    		else
    		{
    			compare = DatumGetInt32(FunctionCall2(&scankey->sk_func,
    												  datum1, datum2));
    			if (compare != 0)
    			{
    				if (scankey->sk_flags & SK_BT_DESC)
    					compare = -compare;
    				return compare;
    			}
    		}
    	}
    	return 0;
    }
    
    /* ----------------------------------------------------------------
     *		ExecEndMergeAppend
     *
     *		Shuts down the subscans of the MergeAppend node.
     *
     *		Returns nothing of interest.
     * ----------------------------------------------------------------
     */
    void
    ExecEndMergeAppend(MergeAppendState *node)
    {
    	PlanState **mergeplans;
    	int			nplans;
    	int			i;
    
    	/*
    	 * get information from the node
    	 */
    	mergeplans = node->mergeplans;
    	nplans = node->ms_nplans;
    
    	/*
    	 * shut down each of the subscans
    	 */
    	for (i = 0; i < nplans; i++)
    		ExecEndNode(mergeplans[i]);
    }
    
    void
    ExecReScanMergeAppend(MergeAppendState *node)
    {
    	int			i;
    
    	for (i = 0; i < node->ms_nplans; i++)
    	{
    		PlanState  *subnode = node->mergeplans[i];
    
    		/*
    		 * ExecReScan doesn't know about my subplans, so I have to do
    		 * changed-parameter signaling myself.
    		 */
    		if (node->ps.chgParam != NULL)
    			UpdateChangedParamSet(subnode, node->ps.chgParam);
    
    		/*
    		 * If chgParam of subnode is not null then plan will be re-scanned by
    		 * first ExecProcNode.
    		 */
    		if (subnode->chgParam == NULL)
    			ExecReScan(subnode);
    	}
    	node->ms_heap_size = 0;
    	node->ms_initialized = false;
    	node->ms_last_slot = -1;
    }