From 9aefd56669131bfe8dd234544fb87e848a781c97 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Thu, 28 Dec 2006 23:16:39 +0000
Subject: [PATCH] Fix up btree's initial scankey processing to be able to
 detect redundant or contradictory keys even in cross-data-type scenarios. 
 This is another benefit of the opfamily rewrite: we can find the needed
 comparison operators now.

---
 src/backend/access/nbtree/README      |  11 +-
 src/backend/access/nbtree/nbtsearch.c |  24 ++-
 src/backend/access/nbtree/nbtutils.c  | 272 ++++++++++++++++++--------
 3 files changed, 208 insertions(+), 99 deletions(-)

diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index ccc03e7116f..53fda66572f 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -1,4 +1,4 @@
-$PostgreSQL: pgsql/src/backend/access/nbtree/README,v 1.14 2006/11/01 19:43:17 tgl Exp $
+$PostgreSQL: pgsql/src/backend/access/nbtree/README,v 1.15 2006/12/28 23:16:39 tgl Exp $
 
 This directory contains a correct implementation of Lehman and Yao's
 high-concurrency B-tree management algorithm (P. Lehman and S. Yao,
@@ -480,7 +480,8 @@ than key.
 Notes to operator class implementors
 ------------------------------------
 
-With this implementation, we require each supported datatype to supply
-us with a comparison procedure via pg_amproc.  This procedure must take
-two nonnull values A and B and return an int32 < 0, 0, or > 0 if A < B,
-A = B, or A > B, respectively.  See nbtcompare.c for examples.
+With this implementation, we require each supported combination of
+datatypes to supply us with a comparison procedure via pg_amproc.
+This procedure must take two nonnull values A and B and return an int32 < 0,
+0, or > 0 if A < B, A = B, or A > B, respectively.  See nbtcompare.c for
+examples.
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 284b92e0d28..27cff387e65 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.108 2006/12/23 00:43:09 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.109 2006/12/28 23:16:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -476,16 +476,16 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	 * attributes to its right, because it would break our simplistic notion
 	 * of what initial positioning strategy to use.
 	 *
-	 * When the scan keys include non-default operators, _bt_preprocess_keys
+	 * When the scan keys include cross-type operators, _bt_preprocess_keys
 	 * may not be able to eliminate redundant keys; in such cases we will
 	 * arbitrarily pick a usable one for each attribute.  This is correct
 	 * but possibly not optimal behavior.  (For example, with keys like
 	 * "x >= 4 AND x >= 5" we would elect to scan starting at x=4 when
-	 * x=5 would be more efficient.)  Since the situation only arises in
-	 * hokily-worded queries, live with it.
+	 * x=5 would be more efficient.)  Since the situation only arises given
+	 * a poorly-worded query plus an incomplete opfamily, live with it.
 	 *
 	 * When both equality and inequality keys appear for a single attribute
-	 * (again, only possible when non-default operators appear), we *must*
+	 * (again, only possible when cross-type operators appear), we *must*
 	 * select one of the equality keys for the starting point, because
 	 * _bt_checkkeys() will stop the scan as soon as an equality qual fails.
 	 * For example, if we have keys like "x >= 4 AND x = 10" and we elect to
@@ -658,11 +658,15 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 			 * to an insertion scan key by replacing the sk_func with the
 			 * appropriate btree comparison function.
 			 *
-			 * If scankey operator is of the default type for the index, we
-			 * can use the cached comparison function; otherwise gotta look it
-			 * up in the catalogs.  Also, we support the convention that
-			 * sk_subtype == 0 means the default type; this is a hack to
-			 * simplify life for ScanKeyInit().
+			 * If scankey operator is not a cross-type comparison, we can use
+			 * the cached comparison function; otherwise gotta look it up in
+			 * the catalogs.  (That can't lead to infinite recursion, since no
+			 * indexscan initiated by syscache lookup will use cross-data-type
+			 * operators.)
+			 *
+			 * We support the convention that sk_subtype == InvalidOid means
+			 * the opclass input type; this is a hack to simplify life for
+			 * ScanKeyInit().
 			 */
 			if (cur->sk_subtype == rel->rd_opcintype[i] ||
 				cur->sk_subtype == InvalidOid)
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index a562ee6cbd2..c138fdad3a2 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.79 2006/10/04 00:29:49 momjian Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.80 2006/12/28 23:16:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -24,8 +24,12 @@
 #include "miscadmin.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
+#include "utils/lsyscache.h"
 
 
+static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
+						 ScanKey leftarg, ScanKey rightarg,
+						 bool *result);
 static void _bt_mark_scankey_required(ScanKey skey);
 static bool _bt_check_rowcompare(ScanKey skey,
 					 IndexTuple tuple, TupleDesc tupdesc,
@@ -145,7 +149,7 @@ _bt_freestack(BTStack stack)
 }
 
 
-/*----------
+/*
  *	_bt_preprocess_keys() -- Preprocess scan keys
  *
  * The caller-supplied search-type keys (in scan->keyData[]) are copied to
@@ -155,9 +159,9 @@ _bt_freestack(BTStack stack)
  *
  * The primary purpose of this routine is to discover how many scan keys
  * must be satisfied to continue the scan.	It also attempts to eliminate
- * redundant keys and detect contradictory keys.  At present, redundant and
- * contradictory keys can only be detected for same-data-type comparisons,
- * but that's the usual case so it seems worth doing.
+ * redundant keys and detect contradictory keys.  (If the index opfamily
+ * provides incomplete sets of cross-type operators, we may fail to detect
+ * redundant or contradictory keys, but we can survive that.)
  *
  * The output keys must be sorted by index attribute.  Presently we expect
  * (but verify) that the input keys are already so sorted --- this is done
@@ -184,25 +188,23 @@ _bt_freestack(BTStack stack)
  * If possible, redundant keys are eliminated: we keep only the tightest
  * >/>= bound and the tightest </<= bound, and if there's an = key then
  * that's the only one returned.  (So, we return either a single = key,
- * or one or two boundary-condition keys for each attr.)  However, we can
- * only detect redundant keys when the right-hand datatypes are all equal
- * to the index datatype, because we do not know suitable operators for
- * comparing right-hand values of two different datatypes.	(In theory
- * we could handle comparison of a RHS of the index datatype with a RHS of
- * another type, but that seems too much pain for too little gain.)  So,
- * keys whose operator has a nondefault subtype (ie, its RHS is not of the
- * index datatype) are ignored here, except for noting whether they include
- * an "=" condition or not.  The logic about required keys still works if
- * we don't eliminate redundant keys.
+ * or one or two boundary-condition keys for each attr.)  However, if we
+ * cannot compare two keys for lack of a suitable cross-type operator,
+ * we cannot eliminate either.  If there are two such keys of the same
+ * operator strategy, the second one is just pushed into the output array
+ * without further processing here.  We may also emit both >/>= or both
+ * </<= keys if we can't compare them.  The logic about required keys still
+ * works if we don't eliminate redundant keys.
  *
  * As a byproduct of this work, we can detect contradictory quals such
- * as "x = 1 AND x > 2".  If we see that, we return so->quals_ok = FALSE,
+ * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = FALSE,
  * indicating the scan need not be run at all since no tuples can match.
- * Again though, only keys with RHS datatype equal to the index datatype
- * can be checked for contradictions.
+ * (In this case we do not bother completing the output key array!)
+ * Again, missing cross-type operators might cause us to fail to prove the
+ * quals contradictory when they really are, but the scan will work correctly.
  *
- * Row comparison keys are treated the same as comparisons to nondefault
- * datatypes: we just transfer them into the preprocessed array without any
+ * Row comparison keys are currently also treated without any smarts:
+ * we just transfer them into the preprocessed array without any
  * editorialization.  We can treat them the same as an ordinary inequality
  * comparison on the row's first index column, for the purposes of the logic
  * about required keys.
@@ -211,7 +213,6 @@ _bt_freestack(BTStack stack)
  * storage is that we are modifying the array based on comparisons of the
  * key argument values, which could change on a rescan.  Therefore we can't
  * overwrite the caller's data structure.
- *----------
  */
 void
 _bt_preprocess_keys(IndexScanDesc scan)
@@ -224,8 +225,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
 	ScanKey		outkeys;
 	ScanKey		cur;
 	ScanKey		xform[BTMaxStrategyNumber];
-	bool		hasOtherTypeEqual;
-	Datum		test;
+	bool		test_result;
 	int			i,
 				j;
 	AttrNumber	attno;
@@ -271,15 +271,11 @@ _bt_preprocess_keys(IndexScanDesc scan)
 	/*
 	 * Initialize for processing of keys for attr 1.
 	 *
-	 * xform[i] points to the currently best scan key of strategy type i+1, if
-	 * any is found with a default operator subtype; it is NULL if we haven't
-	 * yet found such a key for this attr.	Scan keys of nondefault subtypes
-	 * are transferred to the output with no processing except for noting if
-	 * they are of "=" type.
+	 * xform[i] points to the currently best scan key of strategy type i+1;
+	 * it is NULL if we haven't yet found such a key for this attr.
 	 */
 	attno = 1;
 	memset(xform, 0, sizeof(xform));
-	hasOtherTypeEqual = false;
 
 	/*
 	 * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
@@ -317,9 +313,9 @@ _bt_preprocess_keys(IndexScanDesc scan)
 				elog(ERROR, "btree index keys must be ordered by attribute");
 
 			/*
-			 * If = has been specified, no other key will be used. In case of
-			 * key > 2 && key == 1 and so on we have to set qual_ok to false
-			 * before discarding the other keys.
+			 * If = has been specified, all other keys can be eliminated as
+			 * redundant.  In case of key > 2 && key == 1 we can set qual_ok
+			 * to false and abandon further processing.
 			 */
 			if (xform[BTEqualStrategyNumber - 1])
 			{
@@ -331,59 +327,56 @@ _bt_preprocess_keys(IndexScanDesc scan)
 
 					if (!chk || j == (BTEqualStrategyNumber - 1))
 						continue;
-					test = FunctionCall2(&chk->sk_func,
-										 eq->sk_argument,
-										 chk->sk_argument);
-					if (!DatumGetBool(test))
+					if (_bt_compare_scankey_args(scan, chk, eq, chk,
+												 &test_result))
 					{
-						so->qual_ok = false;
-						break;
+						if (!test_result)
+						{
+							/* keys proven mutually contradictory */
+							so->qual_ok = false;
+							return;
+						}
+						/* else discard the redundant non-equality key */
+						xform[j] = NULL;
 					}
+					/* else, cannot determine redundancy, keep both keys */
 				}
-				xform[BTLessStrategyNumber - 1] = NULL;
-				xform[BTLessEqualStrategyNumber - 1] = NULL;
-				xform[BTGreaterEqualStrategyNumber - 1] = NULL;
-				xform[BTGreaterStrategyNumber - 1] = NULL;
 				/* track number of attrs for which we have "=" keys */
 				numberOfEqualCols++;
 			}
-			else
-			{
-				/* track number of attrs for which we have "=" keys */
-				if (hasOtherTypeEqual)
-					numberOfEqualCols++;
-			}
 
-			/* keep only one of <, <= */
+			/* try to keep only one of <, <= */
 			if (xform[BTLessStrategyNumber - 1]
 				&& xform[BTLessEqualStrategyNumber - 1])
 			{
 				ScanKey		lt = xform[BTLessStrategyNumber - 1];
 				ScanKey		le = xform[BTLessEqualStrategyNumber - 1];
 
-				test = FunctionCall2(&le->sk_func,
-									 lt->sk_argument,
-									 le->sk_argument);
-				if (DatumGetBool(test))
-					xform[BTLessEqualStrategyNumber - 1] = NULL;
-				else
-					xform[BTLessStrategyNumber - 1] = NULL;
+				if (_bt_compare_scankey_args(scan, le, lt, le,
+											 &test_result))
+				{
+					if (test_result)
+						xform[BTLessEqualStrategyNumber - 1] = NULL;
+					else
+						xform[BTLessStrategyNumber - 1] = NULL;
+				}
 			}
 
-			/* keep only one of >, >= */
+			/* try to keep only one of >, >= */
 			if (xform[BTGreaterStrategyNumber - 1]
 				&& xform[BTGreaterEqualStrategyNumber - 1])
 			{
 				ScanKey		gt = xform[BTGreaterStrategyNumber - 1];
 				ScanKey		ge = xform[BTGreaterEqualStrategyNumber - 1];
 
-				test = FunctionCall2(&ge->sk_func,
-									 gt->sk_argument,
-									 ge->sk_argument);
-				if (DatumGetBool(test))
-					xform[BTGreaterEqualStrategyNumber - 1] = NULL;
-				else
-					xform[BTGreaterStrategyNumber - 1] = NULL;
+				if (_bt_compare_scankey_args(scan, ge, gt, ge,
+											 &test_result))
+				{
+					if (test_result)
+						xform[BTGreaterEqualStrategyNumber - 1] = NULL;
+					else
+						xform[BTGreaterStrategyNumber - 1] = NULL;
+				}
 			}
 
 			/*
@@ -412,51 +405,162 @@ _bt_preprocess_keys(IndexScanDesc scan)
 			/* Re-initialize for new attno */
 			attno = cur->sk_attno;
 			memset(xform, 0, sizeof(xform));
-			hasOtherTypeEqual = false;
 		}
 
 		/* check strategy this key's operator corresponds to */
 		j = cur->sk_strategy - 1;
 
-		/* if row comparison or wrong RHS data type, punt */
-		if ((cur->sk_flags & SK_ROW_HEADER) || cur->sk_subtype != InvalidOid)
+		/* if row comparison, push it directly to the output array */
+		if (cur->sk_flags & SK_ROW_HEADER)
 		{
 			ScanKey		outkey = &outkeys[new_numberOfKeys++];
 
 			memcpy(outkey, cur, sizeof(ScanKeyData));
 			if (numberOfEqualCols == attno - 1)
 				_bt_mark_scankey_required(outkey);
-			if (j == (BTEqualStrategyNumber - 1))
-				hasOtherTypeEqual = true;
+			/*
+			 * We don't support RowCompare using equality; such a qual would
+			 * mess up the numberOfEqualCols tracking.
+			 */
+			Assert(j != (BTEqualStrategyNumber - 1));
 			continue;
 		}
 
 		/* have we seen one of these before? */
-		if (xform[j])
+		if (xform[j] == NULL)
 		{
-			/* yup, keep the more restrictive key */
-			test = FunctionCall2(&cur->sk_func,
-								 cur->sk_argument,
-								 xform[j]->sk_argument);
-			if (DatumGetBool(test))
-				xform[j] = cur;
-			else if (j == (BTEqualStrategyNumber - 1))
-			{
-				/* key == a && key == b, but a != b */
-				so->qual_ok = false;
-				return;
-			}
+			/* nope, so remember this scankey */
+			xform[j] = cur;
 		}
 		else
 		{
-			/* nope, so remember this scankey */
-			xform[j] = cur;
+			/* yup, keep only the more restrictive key */
+			if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
+										 &test_result))
+			{
+				if (test_result)
+					xform[j] = cur;
+				else if (j == (BTEqualStrategyNumber - 1))
+				{
+					/* key == a && key == b, but a != b */
+					so->qual_ok = false;
+					return;
+				}
+				/* else old key is more restrictive, keep it */
+			}
+			else
+			{
+				/*
+				 * We can't determine which key is more restrictive.  Keep
+				 * the previous one in xform[j] and push this one directly
+				 * to the output array.
+				 */
+				ScanKey		outkey = &outkeys[new_numberOfKeys++];
+
+				memcpy(outkey, cur, sizeof(ScanKeyData));
+				if (numberOfEqualCols == attno - 1)
+					_bt_mark_scankey_required(outkey);
+			}
 		}
 	}
 
 	so->numberOfKeys = new_numberOfKeys;
 }
 
+/*
+ * Compare two scankey values using a specified operator.  Both values
+ * must be already known non-NULL.
+ *
+ * The test we want to perform is logically "leftarg op rightarg", where
+ * leftarg and rightarg are the sk_argument values in those ScanKeys, and
+ * the comparison operator is the one in the op ScanKey.  However, in
+ * cross-data-type situations we may need to look up the correct operator in
+ * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
+ * and amoplefttype/amoprighttype equal to the two argument datatypes.
+ *
+ * If the opfamily doesn't supply a complete set of cross-type operators we
+ * may not be able to make the comparison.  If we can make the comparison
+ * we store the operator result in *result and return TRUE.  We return FALSE
+ * if the comparison could not be made.
+ *
+ * Note: op always points at the same ScanKey as either leftarg or rightarg.
+ * Since we don't scribble on the scankeys, this aliasing should cause no
+ * trouble.
+ */
+static bool
+_bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
+						 ScanKey leftarg, ScanKey rightarg,
+						 bool *result)
+{
+	Relation	rel = scan->indexRelation;
+	Oid			lefttype,
+				righttype,
+				optype,
+				opcintype,
+				cmp_op;
+
+	/*
+	 * The opfamily we need to worry about is identified by the index column.
+	 */
+	Assert(leftarg->sk_attno == rightarg->sk_attno);
+
+	opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
+
+	/*
+	 * Determine the actual datatypes of the ScanKey arguments.  We have to
+	 * support the convention that sk_subtype == InvalidOid means the opclass
+	 * input type; this is a hack to simplify life for ScanKeyInit().
+	 */
+	lefttype = leftarg->sk_subtype;
+	if (lefttype == InvalidOid)
+		lefttype = opcintype;
+	righttype = rightarg->sk_subtype;
+	if (righttype == InvalidOid)
+		righttype = opcintype;
+	optype = op->sk_subtype;
+	if (optype == InvalidOid)
+		optype = opcintype;
+
+	/*
+	 * If leftarg and rightarg match the types expected for the "op" scankey,
+	 * we can use its already-looked-up comparison function.
+	 */
+	if (lefttype == opcintype && righttype == optype)
+	{
+		*result = DatumGetBool(FunctionCall2(&op->sk_func,
+											 leftarg->sk_argument,
+											 rightarg->sk_argument));
+		return true;
+	}
+
+	/*
+	 * Otherwise, we need to go to the syscache to find the appropriate
+	 * operator.  (This cannot result in infinite recursion, since no
+	 * indexscan initiated by syscache lookup will use cross-data-type
+	 * operators.)
+	 */
+	cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
+								 lefttype,
+								 righttype,
+								 op->sk_strategy);
+	if (OidIsValid(cmp_op))
+	{
+		RegProcedure cmp_proc = get_opcode(cmp_op);
+
+		if (RegProcedureIsValid(cmp_proc))
+		{
+			*result = DatumGetBool(OidFunctionCall2(cmp_proc,
+													leftarg->sk_argument,
+													rightarg->sk_argument));
+			return true;
+		}
+	}
+
+	/* Can't make the comparison */
+	*result = false;			/* suppress compiler warnings */
+	return false;
+}
+
 /*
  * Mark a scankey as "required to continue the scan".
  *
-- 
GitLab