diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index 2240564fa25b4986f5f7f445a5d84667bf79a992..5297fde36dcff0b5da385d9fe4a974cebe1535a6 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -19,13 +19,16 @@ * disk space as soon as each block is read from its "tape". * * We do not form the initial runs using Knuth's recommended replacement - * selection method (Algorithm 5.4.1R), because it uses a fixed number of - * records in memory at all times. Since we are dealing with tuples that - * may vary considerably in size, we want to be able to vary the number of - * records kept in memory to ensure full utilization of the allowed sort - * memory space. This is easily done by keeping a variable-size heap in - * which the records of the current run are stored, plus a variable-size - * unsorted array holding records that must go into the next run. + * selection data structure (Algorithm 5.4.1R), because it uses a fixed + * number of records in memory at all times. Since we are dealing with + * tuples that may vary considerably in size, we want to be able to vary + * the number of records kept in memory to ensure full utilization of the + * allowed sort memory space. So, we keep the tuples in a variable-size + * heap, with the next record to go out at the top of the heap. Like + * Algorithm 5.4.1R, each record is stored with the run number that it + * must go into, and we use (run number, key) as the ordering key for the + * heap. When the run number at the top of the heap changes, we know that + * no more records of the prior run are left in the heap. * * The (approximate) amount of memory allowed for any one sort operation * is given in kilobytes by the external variable SortMem. Initially, @@ -35,13 +38,32 @@ * tuples just by scanning the tuple array sequentially. 
If we do exceed * SortMem, we construct a heap using Algorithm H and begin to emit tuples * into sorted runs in temporary tapes, emitting just enough tuples at each - * step to get back within the SortMem limit. New tuples are added to the - * heap if they can go into the current run, else they are temporarily added - * to the unsorted array. Whenever the heap empties, we construct a new heap - * from the current contents of the unsorted array, and begin a new run with a - * new output tape (selected per Algorithm D). After the end of the input - * is reached, we dump out remaining tuples in memory into a final run - * (or two), then merge the runs using Algorithm D. + * step to get back within the SortMem limit. Whenever the run number at + * the top of the heap changes, we begin a new run with a new output tape + * (selected per Algorithm D). After the end of the input is reached, + * we dump out remaining tuples in memory into a final run (or two), + * then merge the runs using Algorithm D. + * + * When merging runs, we use a heap containing just the frontmost tuple from + * each source run; we repeatedly output the smallest tuple and insert the + * next tuple from its source tape (if any). When the heap empties, the merge + * is complete. The basic merge algorithm thus needs very little memory --- + * only M tuples for an M-way merge, and M is at most six in the present code. + * However, we can still make good use of our full SortMem allocation by + * pre-reading additional tuples from each source tape. Without prereading, + * our access pattern to the temporary file would be very erratic; on average + * we'd read one block from each of M source tapes during the same time that + * we're writing M blocks to the output tape, so there is no sequentiality of + * access at all, defeating the read-ahead methods used by most Unix kernels. 
+ * Worse, the output tape gets written into a very random sequence of blocks + * of the temp file, ensuring that things will be even worse when it comes + * time to read that tape. A straightforward merge pass thus ends up doing a + * lot of waiting for disk seeks. We can improve matters by prereading from + * each source tape sequentially, loading about SortMem/M bytes from each tape + * in turn. Then we run the merge algorithm, writing but not reading until + * one of the preloaded tuple series runs out. Then we switch back to preread + * mode, fill memory again, and repeat. This approach helps to localize both + * read and write accesses. * * When the caller requests random access to the sort result, we form * the final sorted run on a logical tape which is then "frozen", so @@ -55,7 +77,7 @@ * Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.1 1999/10/17 22:15:05 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.2 1999/10/30 17:27:15 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -129,30 +151,65 @@ struct Tuplesortstate * of memory space consumed. */ void * (*readtup) (Tuplesortstate *state, int tapenum, unsigned int len); + /* + * Obtain memory space occupied by a stored tuple. (This routine is + * only needed in the FINALMERGE case, since copytup, writetup, and + * readtup are expected to adjust availMem appropriately.) + */ + unsigned int (*tuplesize) (Tuplesortstate *state, void *tup); /* - * This array holds "unsorted" tuples during the input phases. - * If we are able to complete the sort in memory, it holds the - * final sorted result as well. + * This array holds pointers to tuples in sort memory. 
If we are in + * state INITIAL, the tuples are in no particular order; if we are in + * state SORTEDINMEM, the tuples are in final sorted order; in states + * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order + * per Algorithm H. (Note that memtupcount only counts the tuples that + * are part of the heap --- during merge passes, memtuples[] entries + * beyond TAPERANGE are never in the heap and are used to hold + * pre-read tuples.) In state SORTEDONTAPE, the array is not used. */ void **memtuples; /* array of pointers to palloc'd tuples */ int memtupcount; /* number of tuples currently present */ int memtupsize; /* allocated length of memtuples array */ /* - * This array holds the partially-sorted "heap" of tuples that will go - * out in the current run during BUILDRUNS state. While completing - * the sort, we use it to merge runs of tuples from input tapes. - * It is never allocated unless we need to use tapes. + * While building initial runs, this array holds the run number for each + * tuple in memtuples[]. During merge passes, we re-use it to hold the + * input tape number that each tuple in the heap was read from, or to hold + * the index of the next tuple pre-read from the same tape in the case of + * pre-read entries. This array is never allocated unless we need to use + * tapes. Whenever it is allocated, it has the same length as + * memtuples[]. */ - void **heaptuples; /* array of pointers to palloc'd tuples */ - int heaptupcount; /* number of tuples currently present */ - int heaptupsize; /* allocated length of heaptuples array */ + int *memtupindex; /* index value associated with memtuples[i] */ + + /* + * While building initial runs, this is the current output run number + * (starting at 0). Afterwards, it is the number of initial runs we made. + */ + int currentRun; + /* - * While merging, this array holds the actual number of the input tape - * that each tuple in heaptuples[] came from. 
+ * These variables are only used during merge passes. mergeactive[i] + * is true if we are reading an input run from (actual) tape number i + * and have not yet exhausted that run. mergenext[i] is the memtuples + * index of the next pre-read tuple (next to be loaded into the heap) + * for tape i, or 0 if we are out of pre-read tuples. mergelast[i] + * similarly points to the last pre-read tuple from each tape. + * mergeavailmem[i] is the amount of unused space allocated for tape i. + * mergefreelist and mergefirstfree keep track of unused locations + * in the memtuples[] array. memtupindex[] links together pre-read + * tuples for each tape as well as recycled locations in mergefreelist. + * It is OK to use 0 as a null link in these lists, because memtuples[0] + * is part of the merge heap and is never a pre-read tuple. */ - int *heapsrctapes; + bool mergeactive[MAXTAPES]; /* Active input run source? */ + int mergenext[MAXTAPES]; /* first preread tuple for each source */ + int mergelast[MAXTAPES]; /* last preread tuple for each source */ + long mergeavailmem[MAXTAPES]; /* availMem for prereading tapes */ + long spacePerTape; /* actual per-tape target usage */ + int mergefreelist; /* head of freelist of recycled slots */ + int mergefirstfree; /* first slot never used in this merge */ /* * Variables for Algorithm D. Note that destTape is a "logical" tape @@ -166,8 +223,6 @@ struct Tuplesortstate int tp_dummy[MAXTAPES]; /* # of dummy runs for each tape (D[]) */ int tp_tapenum[MAXTAPES]; /* Actual tape numbers (TAPE[]) */ - bool multipleRuns; /* T if we have created more than 1 run */ - /* * These variables are used after completion of sorting to keep track * of the next tuple to return. 
(In the tape case, the tape's current @@ -202,6 +257,7 @@ struct Tuplesortstate #define COPYTUP(state,tup) ((*(state)->copytup) (state, tup)) #define WRITETUP(state,tape,tup) ((*(state)->writetup) (state, tape, tup)) #define READTUP(state,tape,len) ((*(state)->readtup) (state, tape, len)) +#define TUPLESIZE(state,tup) ((*(state)->tuplesize) (state, tup)) #define LACKMEM(state) ((state)->availMem < 0) #define USEMEM(state,amt) ((state)->availMem -= (amt)) #define FREEMEM(state,amt) ((state)->availMem += (amt)) @@ -239,7 +295,7 @@ struct Tuplesortstate * * We count space requested for tuples against the SortMem limit. * Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not - * counted, nor do we count the variable-size memtuples and heaptuples + * counted, nor do we count the variable-size memtuples and memtupindex * arrays. (Even though those could grow pretty large, they should be * small compared to the tuples proper, so this is not unreasonable.) * @@ -271,11 +327,11 @@ static void selectnewtape(Tuplesortstate *state); static void mergeruns(Tuplesortstate *state); static void mergeonerun(Tuplesortstate *state); static void beginmerge(Tuplesortstate *state); -static void beginrun(Tuplesortstate *state); +static void mergepreread(Tuplesortstate *state); static void dumptuples(Tuplesortstate *state, bool alltuples); static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple, - int tapenum); -static void tuplesort_heap_siftup(Tuplesortstate *state); + int tupleindex, bool checkIndex); +static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex); static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); static void markrunend(Tuplesortstate *state, int tapenum); static int qsort_comparetup(const void *a, const void *b); @@ -285,12 +341,14 @@ static void *copytup_heap(Tuplesortstate *state, void *tup); static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup); static void *readtup_heap(Tuplesortstate 
*state, int tapenum, unsigned int len); +static unsigned int tuplesize_heap(Tuplesortstate *state, void *tup); static int comparetup_index(Tuplesortstate *state, const void *a, const void *b); static void *copytup_index(Tuplesortstate *state, void *tup); static void writetup_index(Tuplesortstate *state, int tapenum, void *tup); static void *readtup_index(Tuplesortstate *state, int tapenum, unsigned int len); +static unsigned int tuplesize_index(Tuplesortstate *state, void *tup); /* * Since qsort(3) will not pass any context info to qsort_comparetup(), @@ -332,10 +390,9 @@ tuplesort_begin_common(bool randomAccess) state->memtupsize = 1024; /* initial guess */ state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *)); - state->heaptuples = NULL; /* until and unless needed */ - state->heaptupcount = 0; - state->heaptupsize = 0; - state->heapsrctapes = NULL; + state->memtupindex = NULL; /* until and unless needed */ + + state->currentRun = 0; /* Algorithm D variables will be initialized by inittapes, if needed */ @@ -359,6 +416,7 @@ tuplesort_begin_heap(TupleDesc tupDesc, state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; + state->tuplesize = tuplesize_heap; state->tupDesc = tupDesc; state->nKeys = nkeys; @@ -378,6 +436,7 @@ tuplesort_begin_index(Relation indexRel, state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; + state->tuplesize = tuplesize_index; state->indexRel = indexRel; state->enforceUnique = enforceUnique; @@ -403,14 +462,8 @@ tuplesort_end(Tuplesortstate *state) pfree(state->memtuples[i]); pfree(state->memtuples); } - if (state->heaptuples) - { - for (i = 0; i < state->heaptupcount; i++) - pfree(state->heaptuples[i]); - pfree(state->heaptuples); - } - if (state->heapsrctapes) - pfree(state->heapsrctapes); + if (state->memtupindex) + pfree(state->memtupindex); } /* @@ -450,7 +503,6 @@ tuplesort_puttuple(Tuplesortstate *state, void *tuple) * Nope; time to 
switch to tape-based operation. */ inittapes(state); - beginrun(state); /* * Dump tuples until we are back under the limit. */ @@ -458,36 +510,23 @@ tuplesort_puttuple(Tuplesortstate *state, void *tuple) break; case TSS_BUILDRUNS: /* - * Insert the copied tuple into the heap if it can go into the - * current run; otherwise add it to the unsorted array, whence - * it will go into the next run. - * - * The tuple can go into the current run if it is >= the first - * not-yet-output tuple. (Actually, it could go into the current - * run if it is >= the most recently output tuple ... but that - * would require keeping around the tuple we last output, and - * it's simplest to let writetup free the tuple when written.) + * Insert the copied tuple into the heap, with run number + * currentRun if it can go into the current run, else run + * number currentRun+1. The tuple can go into the current run + * if it is >= the first not-yet-output tuple. (Actually, + * it could go into the current run if it is >= the most recently + * output tuple ... but that would require keeping around the + * tuple we last output, and it's simplest to let writetup free + * each tuple as soon as it's written.) * * Note there will always be at least one tuple in the heap * at this point; see dumptuples. */ - Assert(state->heaptupcount > 0); - if (COMPARETUP(state, tuple, state->heaptuples[0]) >= 0) - { - tuplesort_heap_insert(state, tuple, 0); - } + Assert(state->memtupcount > 0); + if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0) + tuplesort_heap_insert(state, tuple, state->currentRun, true); else - { - if (state->memtupcount >= state->memtupsize) - { - /* Grow the unsorted array as needed. 
*/ - state->memtupsize *= 2; - state->memtuples = (void **) - repalloc(state->memtuples, - state->memtupsize * sizeof(void *)); - } - state->memtuples[state->memtupcount++] = tuple; - } + tuplesort_heap_insert(state, tuple, state->currentRun+1, true); /* * If we are over the memory limit, dump tuples till we're under. */ @@ -529,7 +568,7 @@ tuplesort_performsort(Tuplesortstate *state) * Finish tape-based sort. First, flush all tuples remaining * in memory out to tape; then merge until we have a single * remaining run (or, if !randomAccess, one run per tape). - * Note that mergeruns sets the correct status. + * Note that mergeruns sets the correct state->status. */ dumptuples(state, true); mergeruns(state); @@ -675,17 +714,35 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward, /* * This code should match the inner loop of mergeonerun(). */ - if (state->heaptupcount > 0) + if (state->memtupcount > 0) { - int srcTape = state->heapsrctapes[0]; - - tup = state->heaptuples[0]; - tuplesort_heap_siftup(state); - if ((tuplen = getlen(state, srcTape, true)) != 0) + int srcTape = state->memtupindex[0]; + unsigned int tuplen; + int tupIndex; + void *newtup; + + tup = state->memtuples[0]; + /* returned tuple is no longer counted in our memory space */ + tuplen = TUPLESIZE(state, tup); + state->availMem += tuplen; + state->mergeavailmem[srcTape] += tuplen; + tuplesort_heap_siftup(state, false); + if ((tupIndex = state->mergenext[srcTape]) == 0) { - void *newtup = READTUP(state, srcTape, tuplen); - tuplesort_heap_insert(state, newtup, srcTape); + /* out of preloaded data on this tape, try to read more */ + mergepreread(state); + /* if still no data, we've reached end of run on this tape */ + if ((tupIndex = state->mergenext[srcTape]) == 0) + return tup; } + /* pull next preread tuple from list, insert in heap */ + newtup = state->memtuples[tupIndex]; + state->mergenext[srcTape] = state->memtupindex[tupIndex]; + if (state->mergenext[srcTape] == 0) + 
state->mergelast[srcTape] = 0; + state->memtupindex[tupIndex] = state->mergefreelist; + state->mergefreelist = tupIndex; + tuplesort_heap_insert(state, newtup, srcTape, false); return tup; } return NULL; @@ -704,18 +761,31 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward, static void inittapes(Tuplesortstate *state) { - int j; + int ntuples, + j; state->tapeset = LogicalTapeSetCreate(MAXTAPES); /* - * Initialize heaptuples array slightly larger than current memtuples - * usage; memtupcount is probably a good guess at how many tuples we - * will be able to have in the heap at once. + * Allocate the memtupindex array, same size as memtuples. + */ + state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int)); + + /* + * Convert the unsorted contents of memtuples[] into a heap. + * Each tuple is marked as belonging to run number zero. + * + * NOTE: we pass false for checkIndex since there's no point in + * comparing indexes in this step, even though we do intend the + * indexes to be part of the sort key... */ - state->heaptupcount = 0; - state->heaptupsize = state->memtupcount + state->memtupcount / 4; - state->heaptuples = (void **) palloc(state->heaptupsize * sizeof(void *)); + ntuples = state->memtupcount; + state->memtupcount = 0; /* make the heap empty */ + for (j = 0; j < ntuples; j++) + tuplesort_heap_insert(state, state->memtuples[j], 0, false); + Assert(state->memtupcount == ntuples); + + state->currentRun = 0; /* * Initialize variables of Algorithm D (step D1). 
@@ -733,8 +803,6 @@ inittapes(Tuplesortstate *state) state->Level = 1; state->destTape = 0; - state->multipleRuns = false; - state->status = TSS_BUILDRUNS; } @@ -750,9 +818,6 @@ selectnewtape(Tuplesortstate *state) int j; int a; - /* We now have at least two initial runs */ - state->multipleRuns = true; - /* Step D3: advance j (destTape) */ if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape+1]) { @@ -791,13 +856,13 @@ mergeruns(Tuplesortstate *state) svDummy; Assert(state->status == TSS_BUILDRUNS); - Assert(state->memtupcount == 0 && state->heaptupcount == 0); + Assert(state->memtupcount == 0); /* * If we produced only one initial run (quite likely if the total * data volume is between 1X and 2X SortMem), we can just use that * tape as the finished output, rather than doing a useless merge. */ - if (! state->multipleRuns) + if (state->currentRun == 1) { state->result_tape = state->tp_tapenum[state->destTape]; /* must freeze and rewind the finished output tape */ @@ -896,8 +961,10 @@ mergeonerun(Tuplesortstate *state) { int destTape = state->tp_tapenum[TAPERANGE]; int srcTape; - unsigned int tuplen; + int tupIndex; void *tup; + long priorAvail, + spaceFreed; /* * Start the merge by loading one tuple from each active source tape @@ -910,18 +977,34 @@ mergeonerun(Tuplesortstate *state) * writing it out, and replacing it with next tuple from same tape * (if there is another one). 
*/ - while (state->heaptupcount > 0) + while (state->memtupcount > 0) { - WRITETUP(state, destTape, state->heaptuples[0]); - srcTape = state->heapsrctapes[0]; - tuplesort_heap_siftup(state); - if ((tuplen = getlen(state, srcTape, true)) != 0) + /* write the tuple to destTape */ + priorAvail = state->availMem; + srcTape = state->memtupindex[0]; + WRITETUP(state, destTape, state->memtuples[0]); + /* writetup adjusted total free space, now fix per-tape space */ + spaceFreed = state->availMem - priorAvail; + state->mergeavailmem[srcTape] += spaceFreed; + /* compact the heap */ + tuplesort_heap_siftup(state, false); + if ((tupIndex = state->mergenext[srcTape]) == 0) { - tup = READTUP(state, srcTape, tuplen); - tuplesort_heap_insert(state, tup, srcTape); + /* out of preloaded data on this tape, try to read more */ + mergepreread(state); + /* if still no data, we've reached end of run on this tape */ + if ((tupIndex = state->mergenext[srcTape]) == 0) + continue; } + /* pull next preread tuple from list, insert in heap */ + tup = state->memtuples[tupIndex]; + state->mergenext[srcTape] = state->memtupindex[tupIndex]; + if (state->mergenext[srcTape] == 0) + state->mergelast[srcTape] = 0; + state->memtupindex[tupIndex] = state->mergefreelist; + state->mergefreelist = tupIndex; + tuplesort_heap_insert(state, tup, srcTape, false); } - /* * When the heap empties, we're done. Write an end-of-run marker * on the output tape, and increment its count of real runs. @@ -933,21 +1016,31 @@ mergeonerun(Tuplesortstate *state) /* * beginmerge - initialize for a merge pass * - * We load the first tuple from each nondummy input run into the heap. - * We also decrease the counts of real and dummy runs for each tape. + * We decrease the counts of real and dummy runs for each tape, and mark + * which tapes contain active input runs in mergeactive[]. 
Then, load + * as many tuples as we can from each active input tape, and finally + * fill the merge heap with the first tuple from each active tape. */ static void beginmerge(Tuplesortstate *state) { + int activeTapes; int tapenum; int srcTape; - unsigned int tuplen; - void *tup; - Assert(state->heaptuples != NULL && state->heaptupcount == 0); - if (state->heapsrctapes == NULL) - state->heapsrctapes = (int *) palloc(MAXTAPES * sizeof(int)); + /* Heap should be empty here */ + Assert(state->memtupcount == 0); + + /* Clear merge-pass state variables */ + memset(state->mergeactive, 0, sizeof(state->mergeactive)); + memset(state->mergenext, 0, sizeof(state->mergenext)); + memset(state->mergelast, 0, sizeof(state->mergelast)); + memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem)); + state->mergefreelist = 0; /* nothing in the freelist */ + state->mergefirstfree = MAXTAPES; /* first slot available for preread */ + /* Adjust run counts and mark the active tapes */ + activeTapes = 0; for (tapenum = 0; tapenum < TAPERANGE; tapenum++) { if (state->tp_dummy[tapenum] > 0) @@ -959,34 +1052,135 @@ beginmerge(Tuplesortstate *state) Assert(state->tp_runs[tapenum] > 0); state->tp_runs[tapenum]--; srcTape = state->tp_tapenum[tapenum]; - tuplen = getlen(state, srcTape, false); - tup = READTUP(state, srcTape, tuplen); - tuplesort_heap_insert(state, tup, srcTape); + state->mergeactive[srcTape] = true; + activeTapes++; } } + /* + * Initialize space allocation to let each active input tape have + * an equal share of preread space. 
+ */ + Assert(activeTapes > 0); + state->spacePerTape = state->availMem / activeTapes; + for (srcTape = 0; srcTape < MAXTAPES; srcTape++) + { + if (state->mergeactive[srcTape]) + state->mergeavailmem[srcTape] = state->spacePerTape; + } + + /* + * Preread as many tuples as possible (and at least one) from each + * active tape + */ + mergepreread(state); + + /* Load the merge heap with the first tuple from each input tape */ + for (srcTape = 0; srcTape < MAXTAPES; srcTape++) + { + int tupIndex = state->mergenext[srcTape]; + void *tup; + + if (tupIndex) + { + tup = state->memtuples[tupIndex]; + state->mergenext[srcTape] = state->memtupindex[tupIndex]; + if (state->mergenext[srcTape] == 0) + state->mergelast[srcTape] = 0; + state->memtupindex[tupIndex] = state->mergefreelist; + state->mergefreelist = tupIndex; + tuplesort_heap_insert(state, tup, srcTape, false); + } + } } /* - * beginrun - start a new initial run + * mergepreread - load tuples from merge input tapes * - * The tuples presently in the unsorted memory array are moved into - * the heap. + * This routine exists to improve sequentiality of reads during a merge pass, + * as explained in the header comments of this file. Load tuples from each + * active source tape until the tape's run is exhausted or it has used up + * its fair share of available memory. In any case, we guarantee that there + * is at least one preread tuple available from each unexhausted input tape. */ static void -beginrun(Tuplesortstate *state) +mergepreread(Tuplesortstate *state) { - int i; + int srcTape; + unsigned int tuplen; + void *tup; + int tupIndex; + long priorAvail, + spaceUsed; - Assert(state->heaptupcount == 0 && state->memtupcount > 0); - for (i = 0; i < state->memtupcount; i++) - tuplesort_heap_insert(state, state->memtuples[i], 0); - state->memtupcount = 0; + for (srcTape = 0; srcTape < MAXTAPES; srcTape++) + { + if (! 
state->mergeactive[srcTape]) + continue; + /* + * Skip reading from any tape that still has at least half + * of its target memory filled with tuples (threshold fraction + * may need adjustment?). This avoids reading just a few tuples + * when the incoming runs are not being consumed evenly. + */ + if (state->mergenext[srcTape] != 0 && + state->mergeavailmem[srcTape] <= state->spacePerTape / 2) + continue; + /* + * Read tuples from this tape until it has used up its free memory, + * but ensure that we have at least one. + */ + priorAvail = state->availMem; + state->availMem = state->mergeavailmem[srcTape]; + while (! LACKMEM(state) || state->mergenext[srcTape] == 0) + { + /* read next tuple, if any */ + if ((tuplen = getlen(state, srcTape, true)) == 0) + { + state->mergeactive[srcTape] = false; + break; + } + tup = READTUP(state, srcTape, tuplen); + /* find or make a free slot in memtuples[] for it */ + tupIndex = state->mergefreelist; + if (tupIndex) + state->mergefreelist = state->memtupindex[tupIndex]; + else + { + tupIndex = state->mergefirstfree++; + /* Might need to enlarge arrays! 
*/ + if (tupIndex >= state->memtupsize) + { + state->memtupsize *= 2; + state->memtuples = (void **) + repalloc(state->memtuples, + state->memtupsize * sizeof(void *)); + state->memtupindex = (int *) + repalloc(state->memtupindex, + state->memtupsize * sizeof(int)); + } + } + /* store tuple, append to list for its tape */ + state->memtuples[tupIndex] = tup; + state->memtupindex[tupIndex] = 0; + if (state->mergelast[srcTape]) + state->memtupindex[state->mergelast[srcTape]] = tupIndex; + else + state->mergenext[srcTape] = tupIndex; + state->mergelast[srcTape] = tupIndex; + } + /* update per-tape and global availmem counts */ + spaceUsed = state->mergeavailmem[srcTape] - state->availMem; + state->mergeavailmem[srcTape] = state->availMem; + state->availMem = priorAvail - spaceUsed; + } } /* * dumptuples - remove tuples from heap and write to tape * + * This is used during initial-run building, but not during merging. + * * When alltuples = false, dump only enough tuples to get under the * availMem limit (and leave at least one tuple in the heap in any case, * since puttuple assumes it always has a tuple to compare to). @@ -994,37 +1188,42 @@ beginrun(Tuplesortstate *state) * When alltuples = true, dump everything currently in memory. * (This case is only used at end of input data.) * - * If we empty the heap, then start a new run using the tuples that - * have accumulated in memtuples[] (if any). + * If we empty the heap, close out the current run and return (this should + * only happen at end of input data). If we see that the tuple run number + * at the top of the heap has changed, start a new run. */ static void dumptuples(Tuplesortstate *state, bool alltuples) { while (alltuples || - (LACKMEM(state) && - (state->heaptupcount > 0 || state->memtupcount > 0))) + (LACKMEM(state) && state->memtupcount > 1)) { /* * Dump the heap's frontmost entry, and sift up to remove it * from the heap. 
*/ - Assert(state->heaptupcount > 0); + Assert(state->memtupcount > 0); WRITETUP(state, state->tp_tapenum[state->destTape], - state->heaptuples[0]); - tuplesort_heap_siftup(state); + state->memtuples[0]); + tuplesort_heap_siftup(state, true); /* - * If the heap is now empty, we've finished a run. + * If the heap is empty *or* top run number has changed, + * we've finished the current run. */ - if (state->heaptupcount == 0) + if (state->memtupcount == 0 || + state->currentRun != state->memtupindex[0]) { markrunend(state, state->tp_tapenum[state->destTape]); + state->currentRun++; state->tp_runs[state->destTape]++; state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ + /* + * Done if heap is empty, else prepare for new run. + */ if (state->memtupcount == 0) - break; /* all input data has been written to tape */ - /* Select new output tape and start a new run */ + break; + Assert(state->currentRun == state->memtupindex[0]); selectnewtape(state); - beginrun(state); } } } @@ -1119,88 +1318,102 @@ tuplesort_restorepos(Tuplesortstate *state) /* * Heap manipulation routines, per Knuth's Algorithm 5.2.3H. + * + * The heap lives in state->memtuples[], with parallel data storage + * for indexes in state->memtupindex[]. If checkIndex is true, use + * the tuple index as the front of the sort key; otherwise, no. */ +#define HEAPCOMPARE(tup1,index1,tup2,index2) \ + (checkIndex && (index1 != index2) ? index1 - index2 : \ + COMPARETUP(state, tup1, tup2)) + /* * Insert a new tuple into an empty or existing heap, maintaining the - * heap invariant. The heap lives in state->heaptuples[]. Also, if - * state->heapsrctapes is not NULL, we store each tuple's source tapenum - * in the corresponding element of state->heapsrctapes[]. + * heap invariant. */ static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple, - int tapenum) + int tupleindex, bool checkIndex) { + void **memtuples; + int *memtupindex; int j; /* - * Make sure heaptuples[] can handle another entry. 
- * NOTE: we do not enlarge heapsrctapes[]; it's supposed - * to be big enough when created. + * Make sure memtuples[] can handle another entry. */ - if (state->heaptupcount >= state->heaptupsize) + if (state->memtupcount >= state->memtupsize) { - /* Grow the unsorted array as needed. */ - state->heaptupsize *= 2; - state->heaptuples = (void **) - repalloc(state->heaptuples, - state->heaptupsize * sizeof(void *)); + state->memtupsize *= 2; + state->memtuples = (void **) + repalloc(state->memtuples, + state->memtupsize * sizeof(void *)); + state->memtupindex = (int *) + repalloc(state->memtupindex, + state->memtupsize * sizeof(int)); } + memtuples = state->memtuples; + memtupindex = state->memtupindex; /* * Sift-up the new entry, per Knuth 5.2.3 exercise 16. * Note that Knuth is using 1-based array indexes, not 0-based. */ - j = state->heaptupcount++; - while (j > 0) { + j = state->memtupcount++; + while (j > 0) + { int i = (j-1) >> 1; - if (COMPARETUP(state, tuple, state->heaptuples[i]) >= 0) + if (HEAPCOMPARE(tuple, tupleindex, + memtuples[i], memtupindex[i]) >= 0) break; - state->heaptuples[j] = state->heaptuples[i]; - if (state->heapsrctapes) - state->heapsrctapes[j] = state->heapsrctapes[i]; + memtuples[j] = memtuples[i]; + memtupindex[j] = memtupindex[i]; j = i; } - state->heaptuples[j] = tuple; - if (state->heapsrctapes) - state->heapsrctapes[j] = tapenum; + memtuples[j] = tuple; + memtupindex[j] = tupleindex; } /* - * The tuple at state->heaptuples[0] has been removed from the heap. - * Decrement heaptupcount, and sift up to maintain the heap invariant. + * The tuple at state->memtuples[0] has been removed from the heap. + * Decrement memtupcount, and sift up to maintain the heap invariant. 
*/ static void -tuplesort_heap_siftup(Tuplesortstate *state) +tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex) { - void **heaptuples = state->heaptuples; + void **memtuples = state->memtuples; + int *memtupindex = state->memtupindex; void *tuple; - int i, + int tupindex, + i, n; - if (--state->heaptupcount <= 0) + if (--state->memtupcount <= 0) return; - n = state->heaptupcount; - tuple = heaptuples[n]; /* tuple that must be reinserted */ + n = state->memtupcount; + tuple = memtuples[n]; /* tuple that must be reinserted */ + tupindex = memtupindex[n]; i = 0; /* i is where the "hole" is */ - for (;;) { + for (;;) + { int j = 2*i + 1; if (j >= n) break; if (j+1 < n && - COMPARETUP(state, heaptuples[j], heaptuples[j+1]) > 0) + HEAPCOMPARE(memtuples[j], memtupindex[j], + memtuples[j+1], memtupindex[j+1]) > 0) j++; - if (COMPARETUP(state, tuple, heaptuples[j]) <= 0) + if (HEAPCOMPARE(tuple, tupindex, + memtuples[j], memtupindex[j]) <= 0) break; - heaptuples[i] = heaptuples[j]; - if (state->heapsrctapes) - state->heapsrctapes[i] = state->heapsrctapes[j]; + memtuples[i] = memtuples[j]; + memtupindex[i] = memtupindex[j]; i = j; } - heaptuples[i] = tuple; - if (state->heapsrctapes) - state->heapsrctapes[i] = state->heapsrctapes[n]; + memtuples[i] = tuple; + memtupindex[i] = tupindex; } @@ -1252,6 +1465,7 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b) { HeapTuple ltup = (HeapTuple) a; HeapTuple rtup = (HeapTuple) b; + TupleDesc tupDesc = state->tupDesc; int nkey; for (nkey = 0; nkey < state->nKeys; nkey++) @@ -1265,11 +1479,11 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b) lattr = heap_getattr(ltup, scanKey->sk_attno, - state->tupDesc, + tupDesc, &isnull1); rattr = heap_getattr(rtup, scanKey->sk_attno, - state->tupDesc, + tupDesc, &isnull2); if (isnull1) { @@ -1351,6 +1565,14 @@ readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len) return (void *) tuple; } +static unsigned int 
+tuplesize_heap(Tuplesortstate *state, void *tup) +{ + HeapTuple tuple = (HeapTuple) tup; + + return HEAPTUPLESIZE + tuple->t_len; +} + /* * Routines specialized for IndexTuple case @@ -1368,16 +1590,17 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b) IndexTuple rtup = (IndexTuple) b; TupleDesc itdesc = state->indexRel->rd_att; bool equal_isnull = false; - Datum lattr, - rattr; - bool isnull1, - isnull2; int i; - for (i = 0; i < itdesc->natts; i++) + for (i = 1; i <= itdesc->natts; i++) { - lattr = index_getattr(ltup, i + 1, itdesc, &isnull1); - rattr = index_getattr(rtup, i + 1, itdesc, &isnull2); + Datum lattr, + rattr; + bool isnull1, + isnull2; + + lattr = index_getattr(ltup, i, itdesc, &isnull1); + rattr = index_getattr(rtup, i, itdesc, &isnull2); if (isnull1) { @@ -1389,11 +1612,11 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b) else if (isnull2) return -1; - if (_bt_invokestrat(state->indexRel, i + 1, + if (_bt_invokestrat(state->indexRel, i, BTGreaterStrategyNumber, lattr, rattr)) return 1; - if (_bt_invokestrat(state->indexRel, i + 1, + if (_bt_invokestrat(state->indexRel, i, BTGreaterStrategyNumber, rattr, lattr)) return -1; @@ -1463,3 +1686,12 @@ readtup_index(Tuplesortstate *state, int tapenum, unsigned int len) elog(ERROR, "tuplesort: unexpected end of data"); return (void *) tuple; } + +static unsigned int +tuplesize_index(Tuplesortstate *state, void *tup) +{ + IndexTuple tuple = (IndexTuple) tup; + unsigned int tuplen = IndexTupleSize(tuple); + + return tuplen; +}