From 5433b4838006ffa4da80e5cdf64452bccd2aabdc Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sat, 2 Jun 2001 19:01:53 +0000
Subject: [PATCH] Tweak sorting so that nulls appear at the front of a
 descending sort (vs. at the end of a normal sort).  This ensures that
 explicit sorts yield the same ordering as a btree index scan.  To be really
 sure that that equivalence holds, we use the btree entries in pg_amop to
 decide whether we are looking at a '<' or '>' operator.  For a sort operator
 that has no btree association, we put the nulls at the front if the operator
 is named '>' ... pretty grotty, but it does the right thing in simple ASC and
 DESC cases, and at least there's no possibility of getting a different answer
 depending on the plan type chosen.

---
 src/backend/commands/analyze.c     |  27 +---
 src/backend/utils/sort/tuplesort.c | 231 ++++++++++++++++-------------
 src/include/utils/tuplesort.h      |  13 +-
 3 files changed, 144 insertions(+), 127 deletions(-)

diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 24cc7a8b254..c5f2799022a 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.17 2001/05/07 00:43:17 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.18 2001/06/02 19:01:53 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1439,27 +1439,12 @@ compare_scalars(const void *a, const void *b)
 	int			ta = ((ScalarItem *) a)->tupno;
 	Datum		db = ((ScalarItem *) b)->value;
 	int			tb = ((ScalarItem *) b)->tupno;
+	int32		compare;
 
-	if (datumCmpFnKind == SORTFUNC_LT)
-	{
-		if (DatumGetBool(FunctionCall2(datumCmpFn, da, db)))
-			return -1;			/* a < b */
-		if (DatumGetBool(FunctionCall2(datumCmpFn, db, da)))
-			return 1;			/* a > b */
-	}
-	else
-	{
-		/* sort function is CMP or REVCMP */
-		int32	compare;
-
-		compare = DatumGetInt32(FunctionCall2(datumCmpFn, da, db));
-		if (compare != 0)
-		{
-			if (datumCmpFnKind == SORTFUNC_REVCMP)
-				compare = -compare;
-			return compare;
-		}
-	}
+	compare = ApplySortFunction(datumCmpFn, datumCmpFnKind,
+								da, false, db, false);
+	if (compare != 0)
+		return compare;
 
 	/*
 	 * The two datums are equal, so update datumCmpTupnoLink[].
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 5a77c47c200..1391ea88c2e 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -78,7 +78,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.16 2001/05/07 00:43:24 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.17 2001/06/02 19:01:52 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -90,10 +90,12 @@
 #include "catalog/catname.h"
 #include "catalog/pg_amop.h"
 #include "catalog/pg_amproc.h"
+#include "catalog/pg_operator.h"
 #include "miscadmin.h"
 #include "utils/fmgroids.h"
 #include "utils/logtape.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 #include "utils/tuplesort.h"
 
 /*
@@ -1732,53 +1734,25 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
 	for (nkey = 0; nkey < state->nKeys; nkey++)
 	{
 		ScanKey		scanKey = state->scanKeys + nkey;
-		SortFunctionKind fnKind = state->sortFnKinds[nkey];
 		AttrNumber	attno = scanKey->sk_attno;
-		Datum		lattr,
-					rattr;
+		Datum		datum1,
+					datum2;
 		bool		isnull1,
 					isnull2;
+		int32		compare;
 
-		lattr = heap_getattr(ltup, attno, tupDesc, &isnull1);
-		rattr = heap_getattr(rtup, attno, tupDesc, &isnull2);
+		datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
+		datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
 
-		if (isnull1)
-		{
-			if (!isnull2)
-				return 1;		/* NULL sorts after non-NULL */
-		}
-		else if (isnull2)
-			return -1;
-		else
+		compare = ApplySortFunction(&scanKey->sk_func,
+									state->sortFnKinds[nkey],
+									datum1, isnull1, datum2, isnull2);
+		if (compare != 0)
 		{
-			int32		compare;
-
-			if (fnKind == SORTFUNC_LT)
-			{
-				if (DatumGetBool(FunctionCall2(&scanKey->sk_func,
-											   lattr, rattr)))
-					compare = -1;	/* a < b */
-				else if (DatumGetBool(FunctionCall2(&scanKey->sk_func,
-													rattr, lattr)))
-					compare = 1;	/* a > b */
-				else
-					compare = 0;
-			}
-			else
-			{
-				/* sort function is CMP or REVCMP */
-				compare = DatumGetInt32(FunctionCall2(&scanKey->sk_func,
-													  lattr, rattr));
-				if (fnKind == SORTFUNC_REVCMP)
-					compare = -compare;
-			}
-
-			if (compare != 0)
-			{
-				if (scanKey->sk_flags & SK_COMMUTE)
-					compare = -compare;
-				return compare;
-			}
+			/* dead code? SK_COMMUTE can't actually be set here, can it? */
+			if (scanKey->sk_flags & SK_COMMUTE)
+				compare = -compare;
+			return compare;
 		}
 	}
 
@@ -1861,7 +1835,6 @@ tuplesize_heap(Tuplesortstate *state, void *tup)
 static int
 comparetup_index(Tuplesortstate *state, const void *a, const void *b)
 {
-
 	/*
 	 * This is almost the same as _bt_tuplecompare(), but we need to keep
 	 * track of whether any null fields are present.
@@ -1880,41 +1853,28 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
 	for (i = 1; i <= keysz; i++)
 	{
 		ScanKey		entry = &scankey[i - 1];
-		Datum		attrDatum1,
-					attrDatum2;
-		bool		isFirstNull,
-					isSecondNull;
+		Datum		datum1,
+					datum2;
+		bool		isnull1,
+					isnull2;
 		int32		compare;
 
-		attrDatum1 = index_getattr(tuple1, i, tupDes, &isFirstNull);
-		attrDatum2 = index_getattr(tuple2, i, tupDes, &isSecondNull);
+		datum1 = index_getattr(tuple1, i, tupDes, &isnull1);
+		datum2 = index_getattr(tuple2, i, tupDes, &isnull2);
 
 		/* see comments about NULLs handling in btbuild */
-		if (isFirstNull)		/* attr in tuple1 is NULL */
-		{
-			if (isSecondNull)	/* attr in tuple2 is NULL too */
-			{
-				compare = 0;
-				equal_hasnull = true;
-			}
-			else
-				compare = 1;	/* NULL ">" not-NULL */
-		}
-		else if (isSecondNull)	/* attr in tuple1 is NOT_NULL and */
-		{						/* attr in tuple2 is NULL */
-			compare = -1;		/* not-NULL "<" NULL */
-		}
-		else
-		{
-			/* the comparison function is always of CMP type */
-			compare = DatumGetInt32(FunctionCall2(&entry->sk_func,
-												  attrDatum1,
-												  attrDatum2));
-		}
+
+		/* the comparison function is always of CMP type */
+		compare = ApplySortFunction(&entry->sk_func, SORTFUNC_CMP,
+									datum1, isnull1, datum2, isnull2);
 
 		if (compare != 0)
 			return (int) compare;		/* done when we find unequal
 										 * attributes */
+
+		/* they are equal, so we only need to examine one null flag */
+		if (isnull1)
+			equal_hasnull = true;
 	}
 
 	/*
@@ -2002,35 +1962,9 @@ comparetup_datum(Tuplesortstate *state, const void *a, const void *b)
 	DatumTuple *ltup = (DatumTuple *) a;
 	DatumTuple *rtup = (DatumTuple *) b;
 
-	if (ltup->isNull)
-	{
-		if (!rtup->isNull)
-			return 1;			/* NULL sorts after non-NULL */
-		return 0;
-	}
-	else if (rtup->isNull)
-		return -1;
-	else if (state->sortFnKind == SORTFUNC_LT)
-	{
-		if (DatumGetBool(FunctionCall2(&state->sortOpFn,
-									   ltup->val, rtup->val)))
-			return -1;			/* a < b */
-		if (DatumGetBool(FunctionCall2(&state->sortOpFn,
-									   rtup->val, ltup->val)))
-			return 1;			/* a > b */
-		return 0;
-	}
-	else
-	{
-		/* sort function is CMP or REVCMP */
-		int32	compare;
-
-		compare = DatumGetInt32(FunctionCall2(&state->sortOpFn,
-											  ltup->val, rtup->val));
-		if (state->sortFnKind == SORTFUNC_REVCMP)
-			compare = -compare;
-		return compare;
-	}
+	return ApplySortFunction(&state->sortOpFn, state->sortFnKind,
+							 ltup->val, ltup->isNull,
+							 rtup->val, rtup->isNull);
 }
 
 static void *
@@ -2123,6 +2057,7 @@ SelectSortFunction(Oid sortOperator,
 	HeapScanDesc scan;
 	ScanKeyData skey[3];
 	HeapTuple	tuple;
+	Form_pg_operator optup;
 	Oid			opclass = InvalidOid;
 
 	/*
@@ -2207,11 +2142,97 @@ SelectSortFunction(Oid sortOperator,
 			return;
 	}
 
-	/* Can't find a comparator, so use the operator as-is */
-
-	*kind = SORTFUNC_LT;
-	*sortFunction = get_opcode(sortOperator);
-	if (!RegProcedureIsValid(*sortFunction))
-		elog(ERROR, "SelectSortFunction: operator %u has no implementation",
+	/*
+	 * Can't find a comparator, so use the operator as-is.  Decide whether
+	 * it is forward or reverse sort by looking at its name (grotty, but
+	 * this only matters for deciding which end NULLs should get sorted to).
+	 */
+	tuple = SearchSysCache(OPEROID,
+						   ObjectIdGetDatum(sortOperator),
+						   0, 0, 0);
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "SelectSortFunction: cache lookup failed for operator %u",
 			 sortOperator);
+	optup = (Form_pg_operator) GETSTRUCT(tuple);
+	if (strcmp(NameStr(optup->oprname), ">") == 0)
+		*kind = SORTFUNC_REVLT;
+	else
+		*kind = SORTFUNC_LT;
+	*sortFunction =  optup->oprcode;
+	ReleaseSysCache(tuple);
+
+	Assert(RegProcedureIsValid(*sortFunction));
+}
+
+/*
+ * Apply a sort function (by now converted to fmgr lookup form)
+ * and return a 3-way comparison result.  This takes care of handling
+ * NULLs and sort ordering direction properly.
+ */
+int32
+ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
+				  Datum datum1, bool isNull1,
+				  Datum datum2, bool isNull2)
+{
+	switch (kind)
+	{
+		case SORTFUNC_LT:
+			if (isNull1)
+			{
+				if (isNull2)
+					return 0;
+				return 1;		/* NULL sorts after non-NULL */
+			}
+			if (isNull2)
+				return -1;
+			if (DatumGetBool(FunctionCall2(sortFunction, datum1, datum2)))
+				return -1;		/* a < b */
+			if (DatumGetBool(FunctionCall2(sortFunction, datum2, datum1)))
+				return 1;		/* a > b */
+			return 0;
+
+		case SORTFUNC_REVLT:
+			/* We reverse the ordering of NULLs, but not the operator */
+			if (isNull1)
+			{
+				if (isNull2)
+					return 0;
+				return -1;		/* NULL sorts before non-NULL */
+			}
+			if (isNull2)
+				return 1;
+			if (DatumGetBool(FunctionCall2(sortFunction, datum1, datum2)))
+				return -1;		/* a < b */
+			if (DatumGetBool(FunctionCall2(sortFunction, datum2, datum1)))
+				return 1;		/* a > b */
+			return 0;
+
+		case SORTFUNC_CMP:
+			if (isNull1)
+			{
+				if (isNull2)
+					return 0;
+				return 1;		/* NULL sorts after non-NULL */
+			}
+			if (isNull2)
+				return -1;
+			return DatumGetInt32(FunctionCall2(sortFunction,
+											   datum1, datum2));
+
+		case SORTFUNC_REVCMP:
+			if (isNull1)
+			{
+				if (isNull2)
+					return 0;
+				return -1;		/* NULL sorts before non-NULL */
+			}
+			if (isNull2)
+				return 1;
+			return - DatumGetInt32(FunctionCall2(sortFunction,
+												 datum1, datum2));
+
+		default:
+			elog(ERROR, "Invalid SortFunctionKind %d", (int) kind);
+			return 0;			/* can't get here, but keep compiler quiet */
+	}
 }
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 001761796e2..a2c4f879624 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: tuplesort.h,v 1.7 2001/05/07 00:43:26 tgl Exp $
+ * $Id: tuplesort.h,v 1.8 2001/06/02 19:01:52 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -22,6 +22,7 @@
 
 #include "access/htup.h"
 #include "access/itup.h"
+#include "fmgr.h"
 
 /* Tuplesortstate is an opaque type whose details are not known outside tuplesort.c. */
 
@@ -83,6 +84,7 @@ extern void tuplesort_restorepos(Tuplesortstate *state);
 typedef enum
 {
 	SORTFUNC_LT,				/* raw "<" operator */
+	SORTFUNC_REVLT,				/* raw "<" operator, but reverse NULLs */
 	SORTFUNC_CMP,				/* -1 / 0 / 1 three-way comparator */
 	SORTFUNC_REVCMP				/* 1 / 0 / -1 (reversed) 3-way comparator */
 } SortFunctionKind;
@@ -91,4 +93,13 @@ extern void SelectSortFunction(Oid sortOperator,
 							   RegProcedure *sortFunction,
 							   SortFunctionKind *kind);
 
+/*
+ * Apply a sort function (by now converted to fmgr lookup form)
+ * and return a 3-way comparison result.  This takes care of handling
+ * NULLs and sort ordering direction properly.
+ */
+extern int32 ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
+							   Datum datum1, bool isNull1,
+							   Datum datum2, bool isNull2);
+
 #endif	 /* TUPLESORT_H */
-- 
GitLab