diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a67326c60b2f03eaae88e0bb697f619535a90538..255ffcff25ed1c01c0192d2976aa6ccbf06d0fa7 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/executor/nodeSort.c,v 1.52 2005/11/23 20:27:57 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/executor/nodeSort.c,v 1.53 2006/02/26 22:58:12 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -225,6 +225,8 @@ ExecEndSort(SortState *node)
 	 * clean out the tuple table
 	 */
 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 
 	/*
 	 * Release tuplesort resources
@@ -292,6 +294,7 @@ ExecReScanSort(SortState *node, ExprContext *exprCtxt)
 	if (!node->sort_Done)
 		return;
 
+	/* must drop pointer to sort result tuple */
 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 
 	/*
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 12d471d952b969d39de29eb466641f7df495f3e7..0f680d0e2c6970acea46e6fbe33284017463dbb9 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -91,7 +91,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.59 2006/02/19 19:59:53 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.60 2006/02/26 22:58:12 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -119,6 +119,40 @@ bool		trace_sort = false;
 #endif
 
 
+/*
+ * The objects we actually sort are SortTuple structs.  These contain
+ * a pointer to the tuple proper (might be a HeapTuple or IndexTuple),
+ * which is a separate palloc chunk --- we assume it is just one chunk and
+ * can be freed by a simple pfree().  SortTuples also contain the tuple's
+ * first key column in Datum/nullflag format, and an index integer.
+ *
+ * Storing the first key column lets us save heap_getattr or index_getattr
+ * calls during tuple comparisons.  We could extract and save all the key
+ * columns not just the first, but this would increase code complexity and
+ * overhead, and wouldn't actually save any comparison cycles in the common
+ * case where the first key determines the comparison result.  Note that
+ * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
+ *
+ * When sorting single Datums, the data value is represented directly by
+ * datum1/isnull1.  If the datatype is pass-by-reference and isnull1 is false,
+ * then datum1 points to a separately palloc'd data value that is also pointed
+ * to by the "tuple" pointer; otherwise "tuple" is NULL.
+ *
+ * While building initial runs, tupindex holds the tuple's run number.  During
+ * merge passes, we re-use it to hold the input tape number that each tuple in
+ * the heap was read from, or to hold the index of the next tuple pre-read
+ * from the same tape in the case of pre-read entries.  tupindex goes unused
+ * if the sort occurs entirely in memory.
+ */
+typedef struct
+{
+	void	   *tuple;			/* the tuple proper */
+	Datum		datum1;			/* value of first key column */
+	bool		isnull1;		/* is first key column NULL? */
+	int			tupindex;		/* see notes above */
+} SortTuple;
+
+
 /*
  * Possible states of a Tuplesort object.  These denote the states that
  * persist between calls of Tuplesort routines.
@@ -133,16 +167,17 @@ typedef enum
 } TupSortStatus;
 
 /*
- * Parameters for calculation of number of tapes to use --- see inittapes().
+ * Parameters for calculation of number of tapes to use --- see inittapes()
+ * and tuplesort_merge_order().
  *
  * In this calculation we assume that each tape will cost us about 3 blocks
  * worth of buffer space (which is an underestimate for very large data
  * volumes, but it's probably close enough --- see logtape.c).
  *
- * MERGE_BUFFER_SIZE is how much data we'd like to read from each
+ * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
  * tape during a preread cycle (see discussion at top of file).
  */
-#define MINTAPES		7		/* minimum number of tapes */
+#define MINORDER		6		/* minimum merge order */
 #define TAPE_BUFFER_OVERHEAD		(BLCKSZ * 3)
 #define MERGE_BUFFER_SIZE			(BLCKSZ * 32)
 
@@ -152,11 +187,13 @@ typedef enum
 struct Tuplesortstate
 {
 	TupSortStatus status;		/* enumerated value as shown above */
+	int			nKeys;			/* number of columns in sort key */
 	bool		randomAccess;	/* did caller request random access? */
 	long		availMem;		/* remaining memory available, in bytes */
 	long		allowedMem;		/* total memory allowed, in bytes */
 	int			maxTapes;		/* number of tapes (Knuth's T) */
 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
+	MemoryContext sortcontext;	/* memory context holding all sort data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
 
 	/*
@@ -168,35 +205,38 @@ struct Tuplesortstate
 	 *
 	 * <0, 0, >0 according as a<b, a=b, a>b.
 	 */
-	int			(*comparetup) (Tuplesortstate *state, const void *a, const void *b);
+	int			(*comparetup) (Tuplesortstate *state,
+							   const SortTuple *a, const SortTuple *b);
 
 	/*
-	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
-	 * assume that a single pfree() is enough to release the tuple later, so
-	 * the representation must be "flat" in one palloc chunk.) state->availMem
-	 * must be decreased by the amount of space used.
+	 * Function to copy a supplied input tuple into palloc'd space and set up
+	 * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
+	 * state->availMem must be decreased by the amount of space used for the
+	 * tuple copy (note the SortTuple struct itself is not counted).
 	 */
-	void	   *(*copytup) (Tuplesortstate *state, void *tup);
+	void		(*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
 
 	/*
 	 * Function to write a stored tuple onto tape.	The representation of the
 	 * tuple on tape need not be the same as it is in memory; requirements on
 	 * the tape representation are given below.  After writing the tuple,
-	 * pfree() it, and increase state->availMem by the amount of memory space
-	 * thereby released.
+	 * pfree() the out-of-line data (not the SortTuple struct!), and increase
+	 * state->availMem by the amount of memory space thereby released.
 	 */
-	void		(*writetup) (Tuplesortstate *state, int tapenum, void *tup);
+	void		(*writetup) (Tuplesortstate *state, int tapenum,
+							 SortTuple *stup);
 
 	/*
 	 * Function to read a stored tuple from tape back into memory. 'len' is
-	 * the already-read length of the stored tuple.  Create and return a
-	 * palloc'd copy, and decrease state->availMem by the amount of memory
-	 * space consumed.
+	 * the already-read length of the stored tuple.  Create a palloc'd copy,
+	 * initialize tuple/datum1/isnull1 in the target SortTuple struct,
+	 * and decrease state->availMem by the amount of memory space consumed.
 	 */
-	void	   *(*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);
+	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
+							int tapenum, unsigned int len);
 
 	/*
-	 * This array holds pointers to tuples in sort memory.	If we are in state
+	 * This array holds the tuples now in sort memory.  If we are in state
 	 * INITIAL, the tuples are in no particular order; if we are in state
 	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
 	 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
@@ -205,21 +245,10 @@ struct Tuplesortstate
 	 * never in the heap and are used to hold pre-read tuples.)  In state
 	 * SORTEDONTAPE, the array is not used.
 	 */
-	void	  **memtuples;		/* array of pointers to palloc'd tuples */
+	SortTuple  *memtuples;		/* array of SortTuple structs */
 	int			memtupcount;	/* number of tuples currently present */
 	int			memtupsize;		/* allocated length of memtuples array */
 
-	/*
-	 * While building initial runs, this array holds the run number for each
-	 * tuple in memtuples[].  During merge passes, we re-use it to hold the
-	 * input tape number that each tuple in the heap was read from, or to hold
-	 * the index of the next tuple pre-read from the same tape in the case of
-	 * pre-read entries.  This array is never allocated unless we need to use
-	 * tapes.  Whenever it is allocated, it has the same length as
-	 * memtuples[].
-	 */
-	int		   *memtupindex;	/* index value associated with memtuples[i] */
-
 	/*
 	 * While building initial runs, this is the current output run number
 	 * (starting at 0).  Afterwards, it is the number of initial runs we made.
@@ -237,13 +266,15 @@ struct Tuplesortstate
 	 * have not yet exhausted that run.  mergenext[i] is the memtuples index
 	 * of the next pre-read tuple (next to be loaded into the heap) for tape
 	 * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
-	 * points to the last pre-read tuple from each tape. mergeavailmem[i] is
-	 * the amount of unused space allocated for tape i. mergefreelist and
+	 * points to the last pre-read tuple from each tape.  mergeavailmem[i] is
+	 * the amount of unused space allocated for tape i.  mergefreelist and
 	 * mergefirstfree keep track of unused locations in the memtuples[] array.
-	 * memtupindex[] links together pre-read tuples for each tape as well as
-	 * recycled locations in mergefreelist. It is OK to use 0 as a null link
-	 * in these lists, because memtuples[0] is part of the merge heap and is
-	 * never a pre-read tuple.
+	 * The memtuples[].tupindex fields link together pre-read tuples for each
+	 * tape as well as recycled locations in mergefreelist. It is OK to use 0
+	 * as a null link in these lists, because memtuples[0] is part of the
+	 * merge heap and is never a pre-read tuple.  mergeslotsfree counts the
+	 * total number of free memtuples[] slots, both those in the freelist and
+	 * those beyond mergefirstfree.
 	 */
 	bool	   *mergeactive;	/* Active input run source? */
 	int		   *mergenext;		/* first preread tuple for each source */
@@ -251,7 +282,8 @@ struct Tuplesortstate
 	long	   *mergeavailmem;	/* availMem for prereading tapes */
 	long		spacePerTape;	/* actual per-tape target usage */
 	int			mergefreelist;	/* head of freelist of recycled slots */
-	int			mergefirstfree; /* first slot never used in this merge */
+	int			mergefirstfree;	/* first slot never used in this merge */
+	int			mergeslotsfree;	/* number of free slots during merge */
 
 	/*
 	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
@@ -284,7 +316,6 @@ struct Tuplesortstate
 	 * tuplesort_begin_heap and used only by the HeapTuple routines.
 	 */
 	TupleDesc	tupDesc;
-	int			nKeys;
 	ScanKey		scanKeys;			/* array of length nKeys */
 	SortFunctionKind *sortFnKinds;	/* array of length nKeys */
 
@@ -317,9 +348,9 @@ struct Tuplesortstate
 };
 
 #define COMPARETUP(state,a,b)	((*(state)->comparetup) (state, a, b))
-#define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
-#define WRITETUP(state,tape,tup)	((*(state)->writetup) (state, tape, tup))
-#define READTUP(state,tape,len) ((*(state)->readtup) (state, tape, len))
+#define COPYTUP(state,stup,tup)	((*(state)->copytup) (state, stup, tup))
+#define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
+#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
 #define LACKMEM(state)		((state)->availMem < 0)
 #define USEMEM(state,amt)	((state)->availMem -= (amt))
 #define FREEMEM(state,amt)	((state)->availMem += (amt))
@@ -355,8 +386,8 @@ struct Tuplesortstate
  * NOTES about memory consumption calculations:
  *
  * We count space allocated for tuples against the workMem limit, plus
- * the space used by the variable-size arrays memtuples and memtupindex.
- * Fixed-size space is not counted; it's small enough to not be interesting.
+ * the space used by the variable-size memtuples array.  Fixed-size space
+ * is not counted; it's small enough to not be interesting.
  *
  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
  * rather than the originally-requested size.  This is important since
@@ -365,22 +396,9 @@ struct Tuplesortstate
  * a lot better than what we were doing before 7.3.
  */
 
-/*
- * For sorting single Datums, we build "pseudo tuples" that just carry
- * the datum's value and null flag.  For pass-by-reference data types,
- * the actual data value appears after the DatumTupleHeader (MAXALIGNed,
- * of course), and the value field in the header is just a pointer to it.
- */
-
-typedef struct
-{
-	Datum		val;
-	bool		isNull;
-} DatumTuple;
-
 
 static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
-static void puttuple_common(Tuplesortstate *state, void *tuple);
+static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
 static void inittapes(Tuplesortstate *state);
 static void selectnewtape(Tuplesortstate *state);
 static void mergeruns(Tuplesortstate *state);
@@ -388,30 +406,33 @@ static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
 static void mergepreread(Tuplesortstate *state);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
-static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
+static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
 					  int tupleindex, bool checkIndex);
 static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
 static int	qsort_comparetup(const void *a, const void *b);
 static int comparetup_heap(Tuplesortstate *state,
-				const void *a, const void *b);
-static void *copytup_heap(Tuplesortstate *state, void *tup);
-static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
-static void *readtup_heap(Tuplesortstate *state, int tapenum,
-			 unsigned int len);
+				const SortTuple *a, const SortTuple *b);
+static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
+static void writetup_heap(Tuplesortstate *state, int tapenum,
+						  SortTuple *stup);
+static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
+						 int tapenum, unsigned int len);
 static int comparetup_index(Tuplesortstate *state,
-				 const void *a, const void *b);
-static void *copytup_index(Tuplesortstate *state, void *tup);
-static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
-static void *readtup_index(Tuplesortstate *state, int tapenum,
-			  unsigned int len);
+				 const SortTuple *a, const SortTuple *b);
+static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
+static void writetup_index(Tuplesortstate *state, int tapenum,
+						   SortTuple *stup);
+static void readtup_index(Tuplesortstate *state, SortTuple *stup,
+						  int tapenum, unsigned int len);
 static int comparetup_datum(Tuplesortstate *state,
-				 const void *a, const void *b);
-static void *copytup_datum(Tuplesortstate *state, void *tup);
-static void writetup_datum(Tuplesortstate *state, int tapenum, void *tup);
-static void *readtup_datum(Tuplesortstate *state, int tapenum,
-			  unsigned int len);
+				 const SortTuple *a, const SortTuple *b);
+static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
+static void writetup_datum(Tuplesortstate *state, int tapenum,
+						   SortTuple *stup);
+static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
+						  int tapenum, unsigned int len);
 
 /*
  * Since qsort(3) will not pass any context info to qsort_comparetup(),
@@ -446,6 +467,24 @@ static Tuplesortstate *
 tuplesort_begin_common(int workMem, bool randomAccess)
 {
 	Tuplesortstate *state;
+	MemoryContext sortcontext;
+	MemoryContext oldcontext;
+
+	/*
+	 * Create a working memory context for this sort operation.
+	 * All data needed by the sort will live inside this context.
+	 */
+	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
+										"TupleSort",
+										ALLOCSET_DEFAULT_MINSIZE,
+										ALLOCSET_DEFAULT_INITSIZE,
+										ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * Make the Tuplesortstate within the per-sort context.  This way,
+	 * we don't need a separate pfree() operation for it at shutdown.
+	 */
+	oldcontext = MemoryContextSwitchTo(sortcontext);
 
 	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
 
@@ -458,16 +497,19 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	state->randomAccess = randomAccess;
 	state->allowedMem = workMem * 1024L;
 	state->availMem = state->allowedMem;
+	state->sortcontext = sortcontext;
 	state->tapeset = NULL;
 
 	state->memtupcount = 0;
 	state->memtupsize = 1024;	/* initial guess */
-	state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
-
-	state->memtupindex = NULL;	/* until and unless needed */
+	state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
 
 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 
+	/* workMem must be large enough for the minimal memtuples array */
+	if (LACKMEM(state))
+		elog(ERROR, "insufficient memory allowed for sort");
+
 	state->currentRun = 0;
 
 	/*
@@ -477,6 +519,8 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 
 	state->result_tape = -1;	/* flag that result tape has not been formed */
 
+	MemoryContextSwitchTo(oldcontext);
+
 	return state;
 }
 
@@ -487,8 +531,11 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 					 int workMem, bool randomAccess)
 {
 	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+	MemoryContext oldcontext;
 	int			i;
 
+	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
 	AssertArg(nkeys > 0);
 
 #ifdef TRACE_SORT
@@ -498,13 +545,14 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 			 nkeys, workMem, randomAccess ? 't' : 'f');
 #endif
 
+	state->nKeys = nkeys;
+
 	state->comparetup = comparetup_heap;
 	state->copytup = copytup_heap;
 	state->writetup = writetup_heap;
 	state->readtup = readtup_heap;
 
-	state->tupDesc = tupDesc;
-	state->nKeys = nkeys;
+	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
 	state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
 	state->sortFnKinds = (SortFunctionKind *)
 		palloc0(nkeys * sizeof(SortFunctionKind));
@@ -531,6 +579,8 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 					(Datum) 0);
 	}
 
+	MemoryContextSwitchTo(oldcontext);
+
 	return state;
 }
 
@@ -540,6 +590,9 @@ tuplesort_begin_index(Relation indexRel,
 					  int workMem, bool randomAccess)
 {
 	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->sortcontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -549,6 +602,8 @@ tuplesort_begin_index(Relation indexRel,
 			 workMem, randomAccess ? 't' : 'f');
 #endif
 
+	state->nKeys = RelationGetNumberOfAttributes(indexRel);
+
 	state->comparetup = comparetup_index;
 	state->copytup = copytup_index;
 	state->writetup = writetup_index;
@@ -559,6 +614,8 @@ tuplesort_begin_index(Relation indexRel,
 	state->indexScanKey = _bt_mkscankey_nodata(indexRel);
 	state->enforceUnique = enforceUnique;
 
+	MemoryContextSwitchTo(oldcontext);
+
 	return state;
 }
 
@@ -568,10 +625,13 @@ tuplesort_begin_datum(Oid datumType,
 					  int workMem, bool randomAccess)
 {
 	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+	MemoryContext oldcontext;
 	RegProcedure sortFunction;
 	int16		typlen;
 	bool		typbyval;
 
+	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
 #ifdef TRACE_SORT
 	if (trace_sort)
 		elog(LOG,
@@ -579,6 +639,8 @@ tuplesort_begin_datum(Oid datumType,
 			 workMem, randomAccess ? 't' : 'f');
 #endif
 
+	state->nKeys = 1;			/* always a one-column sort */
+
 	state->comparetup = comparetup_datum;
 	state->copytup = copytup_datum;
 	state->writetup = writetup_datum;
@@ -597,6 +659,8 @@ tuplesort_begin_datum(Oid datumType,
 	state->datumTypeLen = typlen;
 	state->datumTypeByVal = typbyval;
 
+	MemoryContextSwitchTo(oldcontext);
+
 	return state;
 }
 
@@ -604,46 +668,34 @@ tuplesort_begin_datum(Oid datumType,
  * tuplesort_end
  *
  *	Release resources and clean up.
+ *
+ * NOTE: after calling this, any tuple pointers returned by tuplesort_gettuple
+ * or datum pointers returned by tuplesort_getdatum are pointing to garbage.
+ * Be careful not to attempt to use or free such pointers afterwards!
  */
 void
 tuplesort_end(Tuplesortstate *state)
 {
-	int			i;
+	/* context swap probably not needed, but let's be safe */
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
 
 #ifdef TRACE_SORT
 	long		spaceUsed;
-#endif
 
 	if (state->tapeset)
-	{
-#ifdef TRACE_SORT
 		spaceUsed = LogicalTapeSetBlocks(state->tapeset);
-#endif
-		LogicalTapeSetClose(state->tapeset);
-	}
 	else
-	{
-#ifdef TRACE_SORT
 		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
 #endif
-	}
-
-	if (state->memtuples)
-	{
-		for (i = 0; i < state->memtupcount; i++)
-			pfree(state->memtuples[i]);
-		pfree(state->memtuples);
-	}
-	if (state->memtupindex)
-		pfree(state->memtupindex);
 
 	/*
-	 * this stuff might better belong in a variant-specific shutdown routine
+	 * Delete temporary "tape" files, if any.
+	 *
+	 * Note: want to include this in reported total cost of sort, hence
+	 * need for two #ifdef TRACE_SORT sections.
 	 */
-	if (state->scanKeys)
-		pfree(state->scanKeys);
-	if (state->sortFnKinds)
-		pfree(state->sortFnKinds);
+	if (state->tapeset)
+		LogicalTapeSetClose(state->tapeset);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -657,7 +709,47 @@ tuplesort_end(Tuplesortstate *state)
 	}
 #endif
 
-	pfree(state);
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * Free the per-sort memory context, thereby releasing all working
+	 * memory, including the Tuplesortstate struct itself.
+	 */
+	MemoryContextDelete(state->sortcontext);
+}
+
+/*
+ * Grow the memtuples[] array, if possible within our memory constraint.
+ * Return TRUE if able to enlarge the array, FALSE if not.
+ *
+ * At each increment we double the size of the array.  When we are short
+ * on memory we could consider smaller increases, but because availMem
+ * moves around with tuple addition/removal, this might result in thrashing.
+ * Small increases in the array size are likely to be pretty inefficient.
+ */
+static bool
+grow_memtuples(Tuplesortstate *state)
+{
+	/*
+	 * We need to be sure that we do not cause LACKMEM to become true, else
+	 * the space management algorithm will go nuts.  We assume here that
+	 * the memory chunk overhead associated with the memtuples array is
+	 * constant and so there will be no unexpected addition to what we ask
+	 * for.  (The minimum array size established in tuplesort_begin_common
+	 * is large enough to force palloc to treat it as a separate chunk, so
+	 * this assumption should be good.  But let's check it.)
+	 */
+	if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple)))
+		return false;
+	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+	state->memtupsize *= 2;
+	state->memtuples = (SortTuple *)
+		repalloc(state->memtuples,
+				 state->memtupsize * sizeof(SortTuple));
+	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+	if (LACKMEM(state))
+		elog(ERROR, "unexpected out-of-memory situation during sort");
+	return true;
 }
 
 /*
@@ -668,13 +760,18 @@ tuplesort_end(Tuplesortstate *state)
 void
 tuplesort_puttuple(Tuplesortstate *state, void *tuple)
 {
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	SortTuple	stup;
+
 	/*
 	 * Copy the given tuple into memory we control, and decrease availMem.
 	 * Then call the code shared with the Datum case.
 	 */
-	tuple = COPYTUP(state, tuple);
+	COPYTUP(state, &stup, tuple);
+
+	puttuple_common(state, &stup);
 
-	puttuple_common(state, tuple);
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -685,66 +782,60 @@ tuplesort_puttuple(Tuplesortstate *state, void *tuple)
 void
 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 {
-	DatumTuple *tuple;
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	SortTuple	stup;
 
 	/*
-	 * Build pseudo-tuple carrying the datum, and decrease availMem.
+	 * If it's a pass-by-reference value, copy it into memory we control,
+	 * and decrease availMem.  Then call the code shared with the tuple case.
 	 */
 	if (isNull || state->datumTypeByVal)
 	{
-		tuple = (DatumTuple *) palloc(sizeof(DatumTuple));
-		tuple->val = val;
-		tuple->isNull = isNull;
+		stup.datum1 = val;
+		stup.isnull1 = isNull;
+		stup.tuple = NULL;		/* no separate storage */
 	}
 	else
 	{
-		Size		datalen;
-		Size		tuplelen;
-		char	   *newVal;
-
-		datalen = datumGetSize(val, false, state->datumTypeLen);
-		tuplelen = datalen + MAXALIGN(sizeof(DatumTuple));
-		tuple = (DatumTuple *) palloc(tuplelen);
-		newVal = ((char *) tuple) + MAXALIGN(sizeof(DatumTuple));
-		memcpy(newVal, DatumGetPointer(val), datalen);
-		tuple->val = PointerGetDatum(newVal);
-		tuple->isNull = false;
+		stup.datum1 = datumCopy(val, false, state->datumTypeLen);
+		stup.isnull1 = false;
+		stup.tuple = DatumGetPointer(stup.datum1);
+		USEMEM(state, GetMemoryChunkSpace(stup.tuple));
 	}
 
-	USEMEM(state, GetMemoryChunkSpace(tuple));
+	puttuple_common(state, &stup);
 
-	puttuple_common(state, (void *) tuple);
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
  * Shared code for tuple and datum cases.
  */
 static void
-puttuple_common(Tuplesortstate *state, void *tuple)
+puttuple_common(Tuplesortstate *state, SortTuple *tuple)
 {
 	switch (state->status)
 	{
 		case TSS_INITIAL:
 
 			/*
-			 * Save the copied tuple into the unsorted array.
+			 * Save the tuple into the unsorted array.  First, grow the
+			 * array as needed.  Note that we try to grow the array when there
+			 * is still one free slot remaining --- if we fail, there'll still
+			 * be room to store the incoming tuple, and then we'll switch to
+			 * tape-based operation.
 			 */
-			if (state->memtupcount >= state->memtupsize)
+			if (state->memtupcount >= state->memtupsize - 1)
 			{
-				/* Grow the unsorted array as needed. */
-				FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
-				state->memtupsize *= 2;
-				state->memtuples = (void **)
-					repalloc(state->memtuples,
-							 state->memtupsize * sizeof(void *));
-				USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+				(void) grow_memtuples(state);
+				Assert(state->memtupcount < state->memtupsize);
 			}
-			state->memtuples[state->memtupcount++] = tuple;
+			state->memtuples[state->memtupcount++] = *tuple;
 
 			/*
-			 * Done if we still fit in available memory.
+			 * Done if we still fit in available memory and have array slots.
 			 */
-			if (!LACKMEM(state))
+			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
 				return;
 
 			/*
@@ -760,7 +851,7 @@ puttuple_common(Tuplesortstate *state, void *tuple)
 		case TSS_BUILDRUNS:
 
 			/*
-			 * Insert the copied tuple into the heap, with run number
+			 * Insert the tuple into the heap, with run number
 			 * currentRun if it can go into the current run, else run number
 			 * currentRun+1.  The tuple can go into the current run if it is
 			 * >= the first not-yet-output tuple.  (Actually, it could go into
@@ -773,7 +864,7 @@ puttuple_common(Tuplesortstate *state, void *tuple)
 			 * this point; see dumptuples.
 			 */
 			Assert(state->memtupcount > 0);
-			if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
+			if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
 				tuplesort_heap_insert(state, tuple, state->currentRun, true);
 			else
 				tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
@@ -795,6 +886,8 @@ puttuple_common(Tuplesortstate *state, void *tuple)
 void
 tuplesort_performsort(Tuplesortstate *state)
 {
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
 #ifdef TRACE_SORT
 	if (trace_sort)
 		elog(LOG, "performsort starting: %s",
@@ -813,7 +906,7 @@ tuplesort_performsort(Tuplesortstate *state)
 			{
 				qsort_tuplesortstate = state;
 				qsort((void *) state->memtuples, state->memtupcount,
-					  sizeof(void *), qsort_comparetup);
+					  sizeof(SortTuple), qsort_comparetup);
 			}
 			state->current = 0;
 			state->eof_reached = false;
@@ -847,19 +940,20 @@ tuplesort_performsort(Tuplesortstate *state)
 			 (state->status == TSS_FINALMERGE) ? " (except final merge)" : "",
 			 pg_rusage_show(&state->ru_start));
 #endif
+
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
- * Fetch the next tuple in either forward or back direction.
- * Returns NULL if no more tuples.	If should_free is set, the
- * caller must pfree the returned tuple when done with it.
+ * Internal routine to fetch the next tuple in either forward or back
+ * direction into *stup.  Returns FALSE if no more tuples.
+ * If *should_free is set, the caller must pfree stup.tuple when done with it.
  */
-void *
-tuplesort_gettuple(Tuplesortstate *state, bool forward,
-				   bool *should_free)
+static bool
+tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
+						  SortTuple *stup, bool *should_free)
 {
 	unsigned int tuplen;
-	void	   *tup;
 
 	switch (state->status)
 	{
@@ -869,14 +963,17 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 			if (forward)
 			{
 				if (state->current < state->memtupcount)
-					return state->memtuples[state->current++];
+				{
+					*stup = state->memtuples[state->current++];
+					return true;
+				}
 				state->eof_reached = true;
-				return NULL;
+				return false;
 			}
 			else
 			{
 				if (state->current <= 0)
-					return NULL;
+					return false;
 
 				/*
 				 * if all tuples are fetched already then we return last
@@ -888,9 +985,10 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 				{
 					state->current--;	/* last returned tuple */
 					if (state->current <= 0)
-						return NULL;
+						return false;
 				}
-				return state->memtuples[state->current - 1];
+				*stup = state->memtuples[state->current - 1];
+				return true;
 			}
 			break;
 
@@ -900,16 +998,16 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 			if (forward)
 			{
 				if (state->eof_reached)
-					return NULL;
+					return false;
 				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
 				{
-					tup = READTUP(state, state->result_tape, tuplen);
-					return tup;
+					READTUP(state, stup, state->result_tape, tuplen);
+					return true;
 				}
 				else
 				{
 					state->eof_reached = true;
-					return NULL;
+					return false;
 				}
 			}
 
@@ -929,7 +1027,7 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 				if (!LogicalTapeBackspace(state->tapeset,
 										  state->result_tape,
 										  2 * sizeof(unsigned int)))
-					return NULL;
+					return false;
 				state->eof_reached = false;
 			}
 			else
@@ -941,7 +1039,7 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 				if (!LogicalTapeBackspace(state->tapeset,
 										  state->result_tape,
 										  sizeof(unsigned int)))
-					return NULL;
+					return false;
 				tuplen = getlen(state, state->result_tape, false);
 
 				/*
@@ -961,7 +1059,7 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 											  state->result_tape,
 											  tuplen + sizeof(unsigned int)))
 						elog(ERROR, "bogus tuple length in backward scan");
-					return NULL;
+					return false;
 				}
 			}
 
@@ -976,8 +1074,8 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 									  state->result_tape,
 									  tuplen))
 				elog(ERROR, "bogus tuple length in backward scan");
-			tup = READTUP(state, state->result_tape, tuplen);
-			return tup;
+			READTUP(state, stup, state->result_tape, tuplen);
+			return true;
 
 		case TSS_FINALMERGE:
 			Assert(forward);
@@ -988,16 +1086,19 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 			 */
 			if (state->memtupcount > 0)
 			{
-				int			srcTape = state->memtupindex[0];
+				int			srcTape = state->memtuples[0].tupindex;
 				Size		tuplen;
 				int			tupIndex;
-				void	   *newtup;
+				SortTuple  *newtup;
 
-				tup = state->memtuples[0];
+				*stup = state->memtuples[0];
 				/* returned tuple is no longer counted in our memory space */
-				tuplen = GetMemoryChunkSpace(tup);
-				state->availMem += tuplen;
-				state->mergeavailmem[srcTape] += tuplen;
+				if (stup->tuple)
+				{
+					tuplen = GetMemoryChunkSpace(stup->tuple);
+					state->availMem += tuplen;
+					state->mergeavailmem[srcTape] += tuplen;
+				}
 				tuplesort_heap_siftup(state, false);
 				if ((tupIndex = state->mergenext[srcTape]) == 0)
 				{
@@ -1010,26 +1111,48 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 					 * if still no data, we've reached end of run on this tape
 					 */
 					if ((tupIndex = state->mergenext[srcTape]) == 0)
-						return tup;
+						return true;
 				}
 				/* pull next preread tuple from list, insert in heap */
-				newtup = state->memtuples[tupIndex];
-				state->mergenext[srcTape] = state->memtupindex[tupIndex];
+				newtup = &state->memtuples[tupIndex];
+				state->mergenext[srcTape] = newtup->tupindex;
 				if (state->mergenext[srcTape] == 0)
 					state->mergelast[srcTape] = 0;
-				state->memtupindex[tupIndex] = state->mergefreelist;
-				state->mergefreelist = tupIndex;
 				tuplesort_heap_insert(state, newtup, srcTape, false);
-				return tup;
+				/* put the now-unused memtuples entry on the freelist */
+				newtup->tupindex = state->mergefreelist;
+				state->mergefreelist = tupIndex;
+				state->mergeslotsfree++;
+				return true;
 			}
-			return NULL;
+			return false;
 
 		default:
 			elog(ERROR, "invalid tuplesort state");
-			return NULL;		/* keep compiler quiet */
+			return false;		/* keep compiler quiet */
 	}
 }
 
+/*
+ * Fetch the next tuple in either forward or back direction.
+ * Returns NULL if no more tuples.	If *should_free is set, the
+ * caller must pfree the returned tuple when done with it.
+ */
+void *
+tuplesort_gettuple(Tuplesortstate *state, bool forward,
+				   bool *should_free)
+{
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	SortTuple	stup;
+
+	if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+		stup.tuple = NULL;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return stup.tuple;
+}
+
 /*
  * Fetch the next Datum in either forward or back direction.
  * Returns FALSE if no more datums.
@@ -1041,53 +1164,63 @@ bool
 tuplesort_getdatum(Tuplesortstate *state, bool forward,
 				   Datum *val, bool *isNull)
 {
-	DatumTuple *tuple;
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	SortTuple	stup;
 	bool		should_free;
 
-	tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free);
-
-	if (tuple == NULL)
+	if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+	{
+		MemoryContextSwitchTo(oldcontext);
 		return false;
+	}
 
-	if (tuple->isNull || state->datumTypeByVal)
+	if (stup.isnull1 || state->datumTypeByVal)
 	{
-		*val = tuple->val;
-		*isNull = tuple->isNull;
+		*val = stup.datum1;
+		*isNull = stup.isnull1;
 	}
 	else
 	{
-		*val = datumCopy(tuple->val, false, state->datumTypeLen);
+		if (should_free)
+			*val = stup.datum1;
+		else
+			*val = datumCopy(stup.datum1, false, state->datumTypeLen);
 		*isNull = false;
 	}
 
-	if (should_free)
-		pfree(tuple);
+	MemoryContextSwitchTo(oldcontext);
 
 	return true;
 }
 
 /*
  * tuplesort_merge_order - report merge order we'll use for given memory
+ * (note: "merge order" just means the number of input tapes in the merge).
  *
  * This is exported for use by the planner.  allowedMem is in bytes.
- *
- * This must match the calculation in inittapes.  The only reason we
- * don't fold the code together is that inittapes wants to know if the
- * MINTAPES limitation applies or not.
  */
 int
 tuplesort_merge_order(long allowedMem)
 {
-	int			maxTapes;
+	int			mOrder;
 
-	/* see inittapes for comments */
-	maxTapes = (int) ((allowedMem - TAPE_BUFFER_OVERHEAD) /
-					  (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD)) + 1;
+	/*
+	 * We need one tape for each merge input, plus another one for the
+	 * output, and each of these tapes needs buffer space.  In addition
+	 * we want MERGE_BUFFER_SIZE workspace per input tape (but the output
+	 * tape doesn't count).
+	 *
+	 * Note: you might be thinking we need to account for the memtuples[]
+	 * array in this calculation, but we effectively treat that as part of
+	 * the MERGE_BUFFER_SIZE workspace.
+	 */
+	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
+		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
 
-	maxTapes = Max(maxTapes, MINTAPES);
+	/* Even in minimum memory, use at least a MINORDER merge */
+	mOrder = Max(mOrder, MINORDER);
 
-	/* The merge order is one less than the number of tapes */
-	return maxTapes - 1;
+	return mOrder;
 }
 
 /*
@@ -1101,39 +1234,18 @@ inittapes(Tuplesortstate *state)
 	int			maxTapes,
 				ntuples,
 				j;
+	long		tapeSpace;
 
-	/*
-	 * Determine the number of tapes to use based on allowed memory.
-	 *
-	 * We need T+1 tapes to do a T-way merge, and we want MERGE_BUFFER_SIZE
-	 * tuple workspace for each input tape of the merge.  The output tape
-	 * doesn't account for tuple workspace but it does need tape buffer space.
-	 *
-	 * Keep this code in sync with tuplesort_merge_order!
-	 */
-	maxTapes = (int) ((state->allowedMem - TAPE_BUFFER_OVERHEAD) /
-					  (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD)) + 1;
+	/* Compute number of tapes to use: merge order plus 1 */
+	maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
 
 	/*
-	 * We will use at least MINTAPES regardless, but otherwise we decrease
-	 * availMem to reflect the space that goes into buffers.
+	 * We must have at least 2*maxTapes slots in the memtuples[] array, else
+	 * we'd not have room for merge heap plus preread.  It seems unlikely
+	 * that this case would ever occur, but be safe.
 	 */
-	if (maxTapes >= MINTAPES)
-	{
-		/* maxTapes is OK, adjust availMem */
-		USEMEM(state, maxTapes * TAPE_BUFFER_OVERHEAD);
-	}
-	else
-	{
-		/*
-		 * Force minimum tape count.  In this path we ignore the tape buffers
-		 * in our space calculation, to avoid driving availMem permanently
-		 * negative if allowedMem is really tiny.  (This matches the pre-8.2
-		 * behavior which was to ignore the tape buffers always, on the
-		 * grounds that they were fixed-size overhead.)
-		 */
-		maxTapes = MINTAPES;
-	}
+	maxTapes = Min(maxTapes, state->memtupsize / 2);
+
 	state->maxTapes = maxTapes;
 	state->tapeRange = maxTapes - 1;
 
@@ -1143,6 +1255,19 @@ inittapes(Tuplesortstate *state)
 			 maxTapes, pg_rusage_show(&state->ru_start));
 #endif
 
+	/*
+	 * Decrease availMem to reflect the space needed for tape buffers; but
+	 * don't decrease it to the point that we have no room for tuples.
+	 * (That case is only likely to occur if sorting pass-by-value Datums;
+	 * in all other scenarios the memtuples[] array is unlikely to occupy
+	 * more than half of allowedMem.  In the pass-by-value case it's not
+	 * important to account for tuple space, so we don't care if LACKMEM
+	 * becomes inaccurate.)
+	 */
+	tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD;
+	if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
+		USEMEM(state, tapeSpace);
+
 	/*
 	 * Create the tape set and allocate the per-tape data arrays.
 	 */
@@ -1157,13 +1282,6 @@ inittapes(Tuplesortstate *state)
 	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
 	state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
 
-	/*
-	 * Allocate the memtupindex array, same size as memtuples.
-	 */
-	state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));
-
-	USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
-
 	/*
 	 * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
 	 * marked as belonging to run number zero.
@@ -1175,7 +1293,12 @@ inittapes(Tuplesortstate *state)
 	ntuples = state->memtupcount;
 	state->memtupcount = 0;		/* make the heap empty */
 	for (j = 0; j < ntuples; j++)
-		tuplesort_heap_insert(state, state->memtuples[j], 0, false);
+	{
+		/* Must copy source tuple to avoid possible overwrite */
+		SortTuple	stup = state->memtuples[j];
+
+		tuplesort_heap_insert(state, &stup, 0, false);
+	}
 	Assert(state->memtupcount == ntuples);
 
 	state->currentRun = 0;
@@ -1360,7 +1483,7 @@ mergeonerun(Tuplesortstate *state)
 	int			destTape = state->tp_tapenum[state->tapeRange];
 	int			srcTape;
 	int			tupIndex;
-	void	   *tup;
+	SortTuple  *tup;
 	long		priorAvail,
 				spaceFreed;
 
@@ -1380,8 +1503,8 @@ mergeonerun(Tuplesortstate *state)
 		CHECK_FOR_INTERRUPTS();
 		/* write the tuple to destTape */
 		priorAvail = state->availMem;
-		srcTape = state->memtupindex[0];
-		WRITETUP(state, destTape, state->memtuples[0]);
+		srcTape = state->memtuples[0].tupindex;
+		WRITETUP(state, destTape, &state->memtuples[0]);
 		/* writetup adjusted total free space, now fix per-tape space */
 		spaceFreed = state->availMem - priorAvail;
 		state->mergeavailmem[srcTape] += spaceFreed;
@@ -1396,13 +1519,15 @@ mergeonerun(Tuplesortstate *state)
 				continue;
 		}
 		/* pull next preread tuple from list, insert in heap */
-		tup = state->memtuples[tupIndex];
-		state->mergenext[srcTape] = state->memtupindex[tupIndex];
+		tup = &state->memtuples[tupIndex];
+		state->mergenext[srcTape] = tup->tupindex;
 		if (state->mergenext[srcTape] == 0)
 			state->mergelast[srcTape] = 0;
-		state->memtupindex[tupIndex] = state->mergefreelist;
-		state->mergefreelist = tupIndex;
 		tuplesort_heap_insert(state, tup, srcTape, false);
+		/* put the now-unused memtuples entry on the freelist */
+		tup->tupindex = state->mergefreelist;
+		state->mergefreelist = tupIndex;
+		state->mergeslotsfree++;
 	}
 
 	/*
@@ -1444,6 +1569,8 @@ beginmerge(Tuplesortstate *state)
 	memset(state->mergeavailmem, 0, state->maxTapes * sizeof(*state->mergeavailmem));
 	state->mergefreelist = 0;	/* nothing in the freelist */
 	state->mergefirstfree = state->maxTapes;  /* 1st slot avail for preread */
+	state->mergeslotsfree = state->memtupsize - state->mergefirstfree;
+	Assert(state->mergeslotsfree >= state->maxTapes);
 
 	/* Adjust run counts and mark the active tapes */
 	activeTapes = 0;
@@ -1483,17 +1610,19 @@ beginmerge(Tuplesortstate *state)
 	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
 	{
 		int			tupIndex = state->mergenext[srcTape];
-		void	   *tup;
+		SortTuple  *tup;
 
 		if (tupIndex)
 		{
-			tup = state->memtuples[tupIndex];
-			state->mergenext[srcTape] = state->memtupindex[tupIndex];
+			tup = &state->memtuples[tupIndex];
+			state->mergenext[srcTape] = tup->tupindex;
 			if (state->mergenext[srcTape] == 0)
 				state->mergelast[srcTape] = 0;
-			state->memtupindex[tupIndex] = state->mergefreelist;
-			state->mergefreelist = tupIndex;
 			tuplesort_heap_insert(state, tup, srcTape, false);
+			/* put the now-unused memtuples entry on the freelist */
+			tup->tupindex = state->mergefreelist;
+			state->mergefreelist = tupIndex;
+			state->mergeslotsfree++;
 		}
 	}
 }
@@ -1512,7 +1641,7 @@ mergepreread(Tuplesortstate *state)
 {
 	int			srcTape;
 	unsigned int tuplen;
-	void	   *tup;
+	SortTuple	stup;
 	int			tupIndex;
 	long		priorAvail,
 				spaceUsed;
@@ -1534,11 +1663,13 @@ mergepreread(Tuplesortstate *state)
 
 		/*
 		 * Read tuples from this tape until it has used up its free memory,
-		 * but ensure that we have at least one.
+		 * or we are low on memtuples slots; but ensure that we have at least
+		 * one tuple.
 		 */
 		priorAvail = state->availMem;
 		state->availMem = state->mergeavailmem[srcTape];
-		while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
+		while ((!LACKMEM(state) && state->mergeslotsfree > state->tapeRange) ||
+			   state->mergenext[srcTape] == 0)
 		{
 			/* read next tuple, if any */
 			if ((tuplen = getlen(state, srcTape, true)) == 0)
@@ -1546,35 +1677,22 @@ mergepreread(Tuplesortstate *state)
 				state->mergeactive[srcTape] = false;
 				break;
 			}
-			tup = READTUP(state, srcTape, tuplen);
-			/* find or make a free slot in memtuples[] for it */
+			READTUP(state, &stup, srcTape, tuplen);
+			/* find a free slot in memtuples[] for it */
 			tupIndex = state->mergefreelist;
 			if (tupIndex)
-				state->mergefreelist = state->memtupindex[tupIndex];
+				state->mergefreelist = state->memtuples[tupIndex].tupindex;
 			else
 			{
 				tupIndex = state->mergefirstfree++;
-				/* Might need to enlarge arrays! */
-				if (tupIndex >= state->memtupsize)
-				{
-					FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
-					FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
-					state->memtupsize *= 2;
-					state->memtuples = (void **)
-						repalloc(state->memtuples,
-								 state->memtupsize * sizeof(void *));
-					state->memtupindex = (int *)
-						repalloc(state->memtupindex,
-								 state->memtupsize * sizeof(int));
-					USEMEM(state, GetMemoryChunkSpace(state->memtuples));
-					USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
-				}
+				Assert(tupIndex < state->memtupsize);
 			}
+			state->mergeslotsfree--;
 			/* store tuple, append to list for its tape */
-			state->memtuples[tupIndex] = tup;
-			state->memtupindex[tupIndex] = 0;
+			stup.tupindex = 0;
+			state->memtuples[tupIndex] = stup;
 			if (state->mergelast[srcTape])
-				state->memtupindex[state->mergelast[srcTape]] = tupIndex;
+				state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
 			else
 				state->mergenext[srcTape] = tupIndex;
 			state->mergelast[srcTape] = tupIndex;
@@ -1593,7 +1711,8 @@ mergepreread(Tuplesortstate *state)
  *
  * When alltuples = false, dump only enough tuples to get under the
  * availMem limit (and leave at least one tuple in the heap in any case,
- * since puttuple assumes it always has a tuple to compare to).
+ * since puttuple assumes it always has a tuple to compare to).  We also
+ * insist there be at least one free slot in the memtuples[] array.
  *
  * When alltuples = true, dump everything currently in memory.
  * (This case is only used at end of input data.)
@@ -1606,7 +1725,8 @@ static void
 dumptuples(Tuplesortstate *state, bool alltuples)
 {
 	while (alltuples ||
-		   (LACKMEM(state) && state->memtupcount > 1))
+		   (LACKMEM(state) && state->memtupcount > 1) ||
+		   state->memtupcount >= state->memtupsize)
 	{
 		/*
 		 * Dump the heap's frontmost entry, and sift up to remove it from the
@@ -1614,7 +1734,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 		 */
 		Assert(state->memtupcount > 0);
 		WRITETUP(state, state->tp_tapenum[state->destTape],
-				 state->memtuples[0]);
+				 &state->memtuples[0]);
 		tuplesort_heap_siftup(state, true);
 
 		/*
@@ -1622,7 +1742,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 		 * finished the current run.
 		 */
 		if (state->memtupcount == 0 ||
-			state->currentRun != state->memtupindex[0])
+			state->currentRun != state->memtuples[0].tupindex)
 		{
 			markrunend(state, state->tp_tapenum[state->destTape]);
 			state->currentRun++;
@@ -1642,7 +1762,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 			 */
 			if (state->memtupcount == 0)
 				break;
-			Assert(state->currentRun == state->memtupindex[0]);
+			Assert(state->currentRun == state->memtuples[0].tupindex);
 			selectnewtape(state);
 		}
 	}
@@ -1654,6 +1774,8 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 void
 tuplesort_rescan(Tuplesortstate *state)
 {
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
 	Assert(state->randomAccess);
 
 	switch (state->status)
@@ -1677,6 +1799,8 @@ tuplesort_rescan(Tuplesortstate *state)
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
+
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -1685,6 +1809,8 @@ tuplesort_rescan(Tuplesortstate *state)
 void
 tuplesort_markpos(Tuplesortstate *state)
 {
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
 	Assert(state->randomAccess);
 
 	switch (state->status)
@@ -1704,6 +1830,8 @@ tuplesort_markpos(Tuplesortstate *state)
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
+
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -1713,6 +1841,8 @@ tuplesort_markpos(Tuplesortstate *state)
 void
 tuplesort_restorepos(Tuplesortstate *state)
 {
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
 	Assert(state->randomAccess);
 
 	switch (state->status)
@@ -1733,52 +1863,50 @@ tuplesort_restorepos(Tuplesortstate *state)
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
+
+	MemoryContextSwitchTo(oldcontext);
 }
 
 
 /*
  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
  *
- * The heap lives in state->memtuples[], with parallel data storage
- * for indexes in state->memtupindex[].  If checkIndex is true, use
- * the tuple index as the front of the sort key; otherwise, no.
+ * Compare two SortTuples.  If checkIndex is true, use the tuple index
+ * as the front of the sort key; otherwise, no.
  */
 
-#define HEAPCOMPARE(tup1,index1,tup2,index2) \
-	(checkIndex && (index1 != index2) ? (index1) - (index2) : \
+#define HEAPCOMPARE(tup1,tup2) \
+	(checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
+	 ((tup1)->tupindex) - ((tup2)->tupindex) : \
 	 COMPARETUP(state, tup1, tup2))
 
 /*
  * Insert a new tuple into an empty or existing heap, maintaining the
- * heap invariant.
+ * heap invariant.  Caller is responsible for ensuring there's room.
+ *
+ * Note: we assume *tuple is a temporary variable that can be scribbled on.
+ * For some callers, tuple actually points to a memtuples[] entry above the
+ * end of the heap.  This is safe as long as it's not immediately adjacent
+ * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
+ * is, it might get overwritten before being moved into the heap!
  */
 static void
-tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
+tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
 					  int tupleindex, bool checkIndex)
 {
-	void	  **memtuples;
-	int		   *memtupindex;
+	SortTuple  *memtuples;
 	int			j;
 
 	/*
-	 * Make sure memtuples[] can handle another entry.
+	 * Save the tupleindex --- see notes above about writing on *tuple.
+	 * It's a historical artifact that tupleindex is passed as a separate
+	 * argument and not in *tuple, but it's notationally convenient so
+	 * let's leave it that way.
 	 */
-	if (state->memtupcount >= state->memtupsize)
-	{
-		FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
-		FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
-		state->memtupsize *= 2;
-		state->memtuples = (void **)
-			repalloc(state->memtuples,
-					 state->memtupsize * sizeof(void *));
-		state->memtupindex = (int *)
-			repalloc(state->memtupindex,
-					 state->memtupsize * sizeof(int));
-		USEMEM(state, GetMemoryChunkSpace(state->memtuples));
-		USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
-	}
+	tuple->tupindex = tupleindex;
+
 	memtuples = state->memtuples;
-	memtupindex = state->memtupindex;
+	Assert(state->memtupcount < state->memtupsize);
 
 	/*
 	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
@@ -1789,15 +1917,12 @@ tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
 	{
 		int			i = (j - 1) >> 1;
 
-		if (HEAPCOMPARE(tuple, tupleindex,
-						memtuples[i], memtupindex[i]) >= 0)
+		if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
 			break;
 		memtuples[j] = memtuples[i];
-		memtupindex[j] = memtupindex[i];
 		j = i;
 	}
-	memtuples[j] = tuple;
-	memtupindex[j] = tupleindex;
+	memtuples[j] = *tuple;
 }
 
 /*
@@ -1807,18 +1932,15 @@ tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
 static void
 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
 {
-	void	  **memtuples = state->memtuples;
-	int		   *memtupindex = state->memtupindex;
-	void	   *tuple;
-	int			tupindex,
-				i,
+	SortTuple  *memtuples = state->memtuples;
+	SortTuple  *tuple;
+	int			i,
 				n;
 
 	if (--state->memtupcount <= 0)
 		return;
 	n = state->memtupcount;
-	tuple = memtuples[n];		/* tuple that must be reinserted */
-	tupindex = memtupindex[n];
+	tuple = &memtuples[n];		/* tuple that must be reinserted */
 	i = 0;						/* i is where the "hole" is */
 	for (;;)
 	{
@@ -1827,18 +1949,14 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
 		if (j >= n)
 			break;
 		if (j + 1 < n &&
-			HEAPCOMPARE(memtuples[j], memtupindex[j],
-						memtuples[j + 1], memtupindex[j + 1]) > 0)
+			HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
 			j++;
-		if (HEAPCOMPARE(tuple, tupindex,
-						memtuples[j], memtupindex[j]) <= 0)
+		if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
 			break;
 		memtuples[i] = memtuples[j];
-		memtupindex[i] = memtupindex[j];
 		i = j;
 	}
-	memtuples[i] = tuple;
-	memtupindex[i] = tupindex;
+	memtuples[i] = *tuple;
 }
 
 
@@ -1875,9 +1993,10 @@ markrunend(Tuplesortstate *state, int tapenum)
 static int
 qsort_comparetup(const void *a, const void *b)
 {
-	/* The passed pointers are pointers to void * ... */
-
-	return COMPARETUP(qsort_tuplesortstate, *(void **) a, *(void **) b);
+	/* The passed pointers are pointers to SortTuple ... */
+	return COMPARETUP(qsort_tuplesortstate,
+					  (const SortTuple *) a,
+					  (const SortTuple *) b);
 }
 
 
@@ -2095,22 +2214,35 @@ ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
  */
 
 static int
-comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
+comparetup_heap(Tuplesortstate *state, const SortTuple *a, const SortTuple *b)
 {
-	HeapTuple	ltup = (HeapTuple) a;
-	HeapTuple	rtup = (HeapTuple) b;
-	TupleDesc	tupDesc = state->tupDesc;
+	ScanKey		scanKey = state->scanKeys;
+	HeapTuple	ltup;
+	HeapTuple	rtup;
+	TupleDesc	tupDesc;
 	int			nkey;
-
-	for (nkey = 0; nkey < state->nKeys; nkey++)
+	int32		compare;
+
+	/* Compare the leading sort key */
+	compare = inlineApplySortFunction(&scanKey->sk_func,
+									  state->sortFnKinds[0],
+									  a->datum1, a->isnull1,
+									  b->datum1, b->isnull1);
+	if (compare != 0)
+		return compare;
+
+	/* Compare additional sort keys */
+	ltup = (HeapTuple) a->tuple;
+	rtup = (HeapTuple) b->tuple;
+	tupDesc = state->tupDesc;
+	scanKey++;
+	for (nkey = 1; nkey < state->nKeys; nkey++, scanKey++)
 	{
-		ScanKey		scanKey = state->scanKeys + nkey;
 		AttrNumber	attno = scanKey->sk_attno;
 		Datum		datum1,
 					datum2;
 		bool		isnull1,
 					isnull2;
-		int32		compare;
 
 		datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
 		datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
@@ -2126,14 +2258,19 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
 	return 0;
 }
 
-static void *
-copytup_heap(Tuplesortstate *state, void *tup)
+static void
+copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
 {
 	HeapTuple	tuple = (HeapTuple) tup;
 
-	tuple = heap_copytuple(tuple);
-	USEMEM(state, GetMemoryChunkSpace(tuple));
-	return (void *) tuple;
+	/* copy the tuple into sort storage */
+	stup->tuple = (void *) heap_copytuple(tuple);
+	USEMEM(state, GetMemoryChunkSpace(stup->tuple));
+	/* set up first-column key value */
+	stup->datum1 = heap_getattr((HeapTuple) stup->tuple,
+								state->scanKeys[0].sk_attno,
+								state->tupDesc,
+								&stup->isnull1);
 }
 
 /*
@@ -2141,9 +2278,9 @@ copytup_heap(Tuplesortstate *state, void *tup)
  */
 
 static void
-writetup_heap(Tuplesortstate *state, int tapenum, void *tup)
+writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
 {
-	HeapTuple	tuple = (HeapTuple) tup;
+	HeapTuple	tuple = (HeapTuple) stup->tuple;
 	unsigned int tuplen;
 
 	tuplen = tuple->t_len + sizeof(tuplen);
@@ -2159,8 +2296,9 @@ writetup_heap(Tuplesortstate *state, int tapenum, void *tup)
 	heap_freetuple(tuple);
 }
 
-static void *
-readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
+static void
+readtup_heap(Tuplesortstate *state, SortTuple *stup,
+			 int tapenum, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
 	HeapTuple	tuple = (HeapTuple) palloc(tuplen);
@@ -2169,6 +2307,7 @@ readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
 	/* reconstruct the HeapTupleData portion */
 	tuple->t_len = len - sizeof(unsigned int);
 	ItemPointerSetInvalid(&(tuple->t_self));
+	tuple->t_tableOid = InvalidOid;
 	tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
 	/* read in the tuple proper */
 	if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple->t_data,
@@ -2178,7 +2317,12 @@ readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
 		if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
 							sizeof(tuplen)) != sizeof(tuplen))
 			elog(ERROR, "unexpected end of data");
-	return (void *) tuple;
+	stup->tuple = (void *) tuple;
+	/* set up first-column key value */
+	stup->datum1 = heap_getattr(tuple,
+								state->scanKeys[0].sk_attno,
+								state->tupDesc,
+								&stup->isnull1);
 }
 
 
@@ -2192,46 +2336,61 @@ readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
  */
 
 static int
-comparetup_index(Tuplesortstate *state, const void *a, const void *b)
+comparetup_index(Tuplesortstate *state, const SortTuple *a, const SortTuple *b)
 {
 	/*
-	 * This is almost the same as _bt_tuplecompare(), but we need to keep
-	 * track of whether any null fields are present.  Also see the special
-	 * treatment for equal keys at the end.
+	 * This is similar to _bt_tuplecompare(), but we have already done the
+	 * index_getattr calls for the first column, and we need to keep track
+	 * of whether any null fields are present.  Also see the special treatment
+	 * for equal keys at the end.
 	 */
-	IndexTuple	tuple1 = (IndexTuple) a;
-	IndexTuple	tuple2 = (IndexTuple) b;
-	Relation	rel = state->indexRel;
-	int			keysz = RelationGetNumberOfAttributes(rel);
-	ScanKey		scankey = state->indexScanKey;
+	ScanKey		scanKey = state->indexScanKey;
+	IndexTuple	tuple1;
+	IndexTuple	tuple2;
+	int			keysz;
 	TupleDesc	tupDes;
-	int			i;
 	bool		equal_hasnull = false;
-
-	tupDes = RelationGetDescr(rel);
-
-	for (i = 1; i <= keysz; i++)
+	int			nkey;
+	int32		compare;
+
+	/* Compare the leading sort key */
+	compare = inlineApplySortFunction(&scanKey->sk_func,
+									  SORTFUNC_CMP,
+									  a->datum1, a->isnull1,
+									  b->datum1, b->isnull1);
+	if (compare != 0)
+		return compare;
+
+	/* they are equal, so we only need to examine one null flag */
+	if (a->isnull1)
+		equal_hasnull = true;
+
+	/* Compare additional sort keys */
+	tuple1 = (IndexTuple) a->tuple;
+	tuple2 = (IndexTuple) b->tuple;
+	keysz = state->nKeys;
+	tupDes = RelationGetDescr(state->indexRel);
+	scanKey++;
+	for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
 	{
-		ScanKey		entry = &scankey[i - 1];
 		Datum		datum1,
 					datum2;
 		bool		isnull1,
 					isnull2;
-		int32		compare;
 
-		datum1 = index_getattr(tuple1, i, tupDes, &isnull1);
-		datum2 = index_getattr(tuple2, i, tupDes, &isnull2);
+		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
+		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
 
 		/* see comments about NULLs handling in btbuild */
 
 		/* the comparison function is always of CMP type */
-		compare = inlineApplySortFunction(&entry->sk_func, SORTFUNC_CMP,
+		compare = inlineApplySortFunction(&scanKey->sk_func,
+										  SORTFUNC_CMP,
 										  datum1, isnull1,
 										  datum2, isnull2);
 
 		if (compare != 0)
-			return (int) compare;		/* done when we find unequal
-										 * attributes */
+			return compare;		/* done when we find unequal attributes */
 
 		/* they are equal, so we only need to examine one null flag */
 		if (isnull1)
@@ -2249,8 +2408,9 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
 	 *
 	 * Some rather brain-dead implementations of qsort will sometimes call the
 	 * comparison routine to compare a value to itself.  (At this writing only
-	 * QNX 4 is known to do such silly things.)  Don't raise a bogus error in
-	 * that case.   Update: The QNX port is gone.
+	 * QNX 4 is known to do such silly things; we don't support QNX anymore,
+	 * but perhaps the behavior still exists elsewhere.)  Don't raise a bogus
+	 * error in that case.
 	 */
 	if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
 		ereport(ERROR,
@@ -2282,25 +2442,29 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
 	return 0;
 }
 
-static void *
-copytup_index(Tuplesortstate *state, void *tup)
+static void
+copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
 {
 	IndexTuple	tuple = (IndexTuple) tup;
 	unsigned int tuplen = IndexTupleSize(tuple);
 	IndexTuple	newtuple;
 
+	/* copy the tuple into sort storage */
 	newtuple = (IndexTuple) palloc(tuplen);
-	USEMEM(state, GetMemoryChunkSpace(newtuple));
-
 	memcpy(newtuple, tuple, tuplen);
-
-	return (void *) newtuple;
+	USEMEM(state, GetMemoryChunkSpace(newtuple));
+	stup->tuple = (void *) newtuple;
+	/* set up first-column key value */
+	stup->datum1 = index_getattr(newtuple,
+								 1,
+								 RelationGetDescr(state->indexRel),
+								 &stup->isnull1);
 }
 
 static void
-writetup_index(Tuplesortstate *state, int tapenum, void *tup)
+writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
 {
-	IndexTuple	tuple = (IndexTuple) tup;
+	IndexTuple	tuple = (IndexTuple) stup->tuple;
 	unsigned int tuplen;
 
 	tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
@@ -2316,8 +2480,9 @@ writetup_index(Tuplesortstate *state, int tapenum, void *tup)
 	pfree(tuple);
 }
 
-static void *
-readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
+static void
+readtup_index(Tuplesortstate *state, SortTuple *stup,
+			  int tapenum, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
 	IndexTuple	tuple = (IndexTuple) palloc(tuplen);
@@ -2330,7 +2495,12 @@ readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
 		if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
 							sizeof(tuplen)) != sizeof(tuplen))
 			elog(ERROR, "unexpected end of data");
-	return (void *) tuple;
+	stup->tuple = (void *) tuple;
+	/* set up first-column key value */
+	stup->datum1 = index_getattr(tuple,
+								 1,
+								 RelationGetDescr(state->indexRel),
+								 &stup->isnull1);
 }
 
 
@@ -2339,39 +2509,42 @@ readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
  */
 
 static int
-comparetup_datum(Tuplesortstate *state, const void *a, const void *b)
+comparetup_datum(Tuplesortstate *state, const SortTuple *a, const SortTuple *b)
 {
-	DatumTuple *ltup = (DatumTuple *) a;
-	DatumTuple *rtup = (DatumTuple *) b;
-
 	return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
-								   ltup->val, ltup->isNull,
-								   rtup->val, rtup->isNull);
+								   a->datum1, a->isnull1,
+								   b->datum1, b->isnull1);
 }
 
-static void *
-copytup_datum(Tuplesortstate *state, void *tup)
+static void
+copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
 {
 	/* Not currently needed */
 	elog(ERROR, "copytup_datum() should not be called");
-	return NULL;
 }
 
 static void
-writetup_datum(Tuplesortstate *state, int tapenum, void *tup)
+writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 {
-	DatumTuple *tuple = (DatumTuple *) tup;
+	void	   *waddr;
 	unsigned int tuplen;
 	unsigned int writtenlen;
 
-	if (tuple->isNull || state->datumTypeByVal)
-		tuplen = sizeof(DatumTuple);
+	if (stup->isnull1)
+	{
+		waddr = NULL;
+		tuplen = 0;
+	}
+	else if (state->datumTypeByVal)
+	{
+		waddr = &stup->datum1;
+		tuplen = sizeof(Datum);
+	}
 	else
 	{
-		Size		datalen;
-
-		datalen = datumGetSize(tuple->val, false, state->datumTypeLen);
-		tuplen = datalen + MAXALIGN(sizeof(DatumTuple));
+		waddr = DatumGetPointer(stup->datum1);
+		tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
+		Assert(tuplen != 0);
 	}
 
 	writtenlen = tuplen + sizeof(unsigned int);
@@ -2379,33 +2552,55 @@ writetup_datum(Tuplesortstate *state, int tapenum, void *tup)
 	LogicalTapeWrite(state->tapeset, tapenum,
 					 (void *) &writtenlen, sizeof(writtenlen));
 	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) tuple, tuplen);
+					 waddr, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
 		LogicalTapeWrite(state->tapeset, tapenum,
 						 (void *) &writtenlen, sizeof(writtenlen));
 
-	FREEMEM(state, GetMemoryChunkSpace(tuple));
-	pfree(tuple);
+	if (stup->tuple)
+	{
+		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
+		pfree(stup->tuple);
+	}
 }
 
-static void *
-readtup_datum(Tuplesortstate *state, int tapenum, unsigned int len)
+static void
+readtup_datum(Tuplesortstate *state, SortTuple *stup,
+			  int tapenum, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
-	DatumTuple *tuple = (DatumTuple *) palloc(tuplen);
 
-	USEMEM(state, GetMemoryChunkSpace(tuple));
-	if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
-						tuplen) != tuplen)
-		elog(ERROR, "unexpected end of data");
+	if (tuplen == 0)
+	{
+		/* it's NULL */
+		stup->datum1 = (Datum) 0;
+		stup->isnull1 = true;
+		stup->tuple = NULL;
+	}
+	else if (state->datumTypeByVal)
+	{
+		Assert(tuplen == sizeof(Datum));
+		if (LogicalTapeRead(state->tapeset, tapenum, (void *) &stup->datum1,
+							tuplen) != tuplen)
+			elog(ERROR, "unexpected end of data");
+		stup->isnull1 = false;
+		stup->tuple = NULL;
+	}
+	else
+	{
+		void   *raddr = palloc(tuplen);
+
+		if (LogicalTapeRead(state->tapeset, tapenum, raddr,
+							tuplen) != tuplen)
+			elog(ERROR, "unexpected end of data");
+		stup->datum1 = PointerGetDatum(raddr);
+		stup->isnull1 = false;
+		stup->tuple = raddr;
+		USEMEM(state, GetMemoryChunkSpace(raddr));
+	}
+
 	if (state->randomAccess)	/* need trailing length word? */
 		if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
 							sizeof(tuplen)) != sizeof(tuplen))
 			elog(ERROR, "unexpected end of data");
-
-	/* if pass-by-ref data type, must recompute the Datum pointer */
-	if (!tuple->isNull && !state->datumTypeByVal)
-		tuple->val = PointerGetDatum(((char *) tuple) +
-									 MAXALIGN(sizeof(DatumTuple)));
-	return (void *) tuple;
 }