From 23f10b6473ed58c3699b81ca4677f4ff51878ec0 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 8 Mar 2013 15:03:19 +0200
Subject: [PATCH] SP-GiST support of the range adjacent operator -|-

Alexander Korotkov, reviewed by Jeff Davis.
---
 src/backend/utils/adt/rangetypes.c        | 119 ++++++++++---------
 src/backend/utils/adt/rangetypes_spgist.c | 137 ++++++++++++++++++++++
 src/include/catalog/pg_amop.h             |   1 +
 src/include/utils/rangetypes.h            |   2 +
 src/test/regress/expected/opr_sanity.out  |   3 +-
 5 files changed, 204 insertions(+), 58 deletions(-)

diff --git a/src/backend/utils/adt/rangetypes.c b/src/backend/utils/adt/rangetypes.c
index c440847916b..84a4aca16c0 100644
--- a/src/backend/utils/adt/rangetypes.c
+++ b/src/backend/utils/adt/rangetypes.c
@@ -709,6 +709,64 @@ range_after(PG_FUNCTION_ARGS)
 	PG_RETURN_BOOL(range_after_internal(typcache, r1, r2));
 }
 
+/*
+ * Check if two bounds A and B are "adjacent", where A is an upper bound and B
+ * is a lower bound. For the bounds to be adjacent, each subtype value must
+ * satisfy strictly one of the bounds: there are no values which satisfy both
+ * bounds (i.e. less than A and greater than B); and there are no values which
+ * satisfy neither bound (i.e. greater than A and less than B).
+ *
+ * For discrete ranges, we rely on the canonicalization function to see if A..B
+ * normalizes to empty. (If there is no canonicalization function, it's
+ * impossible for such a range to normalize to empty, so we needn't bother to
+ * try.)
+ *
+ * If A == B, the ranges are adjacent only if the bounds have different
+ * inclusive flags (i.e., exactly one of the ranges includes the common
+ * boundary point).
+ *
+ * And if A > B then the ranges are not adjacent in this order.
+ */
+bool
+bounds_adjacent(TypeCacheEntry *typcache, RangeBound boundA, RangeBound boundB)
+{
+	int			cmp;
+
+	Assert(!boundA.lower && boundB.lower);
+
+	cmp = range_cmp_bound_values(typcache, &boundA, &boundB);
+	if (cmp < 0)
+	{
+		RangeType *r;
+
+		/*
+		 * Bounds do not overlap; see if there are points in between.
+		 */
+
+		/* in a continuous subtype, there are assumed to be points between */
+		if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
+			return false;
+
+		/*
+		 * The bounds are of a discrete range type; so make a range A..B and
+		 * see if it's empty.
+		 */
+
+		/* flip the inclusion flags */
+		boundA.inclusive = !boundA.inclusive;
+		boundB.inclusive = !boundB.inclusive;
+		/* change upper/lower labels to avoid Assert failures */
+		boundA.lower = true;
+		boundB.lower = false;
+		r = make_range(typcache, &boundA, &boundB, false);
+		return RangeIsEmpty(r);
+	}
+	else if (cmp == 0)
+		return boundA.inclusive != boundB.inclusive;
+	else
+		return false;		/* bounds overlap */
+}
+
 /* adjacent to (but not overlapping)? (internal version) */
 bool
 range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
@@ -719,8 +777,6 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
 				upper2;
 	bool		empty1,
 				empty2;
-	RangeType  *r3;
-	int			cmp;
 
 	/* Different types should be prevented by ANYRANGE matching rules */
 	if (RangeTypeGetOid(r1) != RangeTypeGetOid(r2))
@@ -734,62 +790,11 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
 		return false;
 
 	/*
-	 * Given two ranges A..B and C..D, where B < C, the ranges are adjacent if
-	 * and only if the range B..C is empty, where inclusivity of these two
-	 * bounds is inverted compared to the original bounds.	For discrete
-	 * ranges, we have to rely on the canonicalization function to normalize
-	 * B..C to empty if it contains no elements of the subtype.  (If there is
-	 * no canonicalization function, it's impossible for such a range to
-	 * normalize to empty, so we needn't bother to try.)
-	 *
-	 * If B == C, the ranges are adjacent only if these bounds have different
-	 * inclusive flags (i.e., exactly one of the ranges includes the common
-	 * boundary point).
-	 *
-	 * And if B > C then the ranges cannot be adjacent in this order, but we
-	 * must consider the other order (i.e., check D <= A).
+	 * Given two ranges A..B and C..D, the ranges are adjacent if and only if
+	 * B is adjacent to C, or D is adjacent to A.
 	 */
-	cmp = range_cmp_bound_values(typcache, &upper1, &lower2);
-	if (cmp < 0)
-	{
-		/* in a continuous subtype, there are assumed to be points between */
-		if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
-			return (false);
-		/* flip the inclusion flags */
-		upper1.inclusive = !upper1.inclusive;
-		lower2.inclusive = !lower2.inclusive;
-		/* change upper/lower labels to avoid Assert failures */
-		upper1.lower = true;
-		lower2.lower = false;
-		r3 = make_range(typcache, &upper1, &lower2, false);
-		return RangeIsEmpty(r3);
-	}
-	if (cmp == 0)
-	{
-		return (upper1.inclusive != lower2.inclusive);
-	}
-
-	cmp = range_cmp_bound_values(typcache, &upper2, &lower1);
-	if (cmp < 0)
-	{
-		/* in a continuous subtype, there are assumed to be points between */
-		if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
-			return (false);
-		/* flip the inclusion flags */
-		upper2.inclusive = !upper2.inclusive;
-		lower1.inclusive = !lower1.inclusive;
-		/* change upper/lower labels to avoid Assert failures */
-		upper2.lower = true;
-		lower1.lower = false;
-		r3 = make_range(typcache, &upper2, &lower1, false);
-		return RangeIsEmpty(r3);
-	}
-	if (cmp == 0)
-	{
-		return (upper2.inclusive != lower1.inclusive);
-	}
-
-	return false;
+	return (bounds_adjacent(typcache, upper1, lower2) ||
+			bounds_adjacent(typcache, upper2, lower1));
 }
 
 /* adjacent to (but not overlapping)? */
diff --git a/src/backend/utils/adt/rangetypes_spgist.c b/src/backend/utils/adt/rangetypes_spgist.c
index d395ef47180..9a7f20d9f37 100644
--- a/src/backend/utils/adt/rangetypes_spgist.c
+++ b/src/backend/utils/adt/rangetypes_spgist.c
@@ -305,6 +305,16 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
 	int			which;
 	int			i;
 
+	/*
+	 * For adjacent search we need also previous centroid (if any) to improve
+	 * the precision of the consistent check. In this case needPrevious flag is
+	 * set and centroid is passed into reconstructedValues. This is not the
+	 * intended purpose of reconstructedValues (because we already have the
+	 * full value available at the leaf), but it's a convenient place to store
+	 * state while traversing the tree.
+	 */
+	bool		needPrevious = false;
+
 	if (in->allTheSame)
 	{
 		/* Report that all nodes should be visited */
@@ -351,6 +361,7 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
 				case RANGESTRAT_OVERLAPS:
 				case RANGESTRAT_OVERRIGHT:
 				case RANGESTRAT_AFTER:
+				case RANGESTRAT_ADJACENT:
 					/* These strategies return false if any argument is empty */
 					if (empty)
 						which = 0;
@@ -435,6 +446,9 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
 			/* Are the restrictions on range bounds inclusive? */
 			bool		inclusive = true;
 			bool		strictEmpty = true;
+			int			cmp,
+						which1,
+						which2;
 
 			strategy = in->scankeys[i].sk_strategy;
 
@@ -522,6 +536,118 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
 					inclusive = false;
 					break;
 
+				case RANGESTRAT_ADJACENT:
+					if (empty)
+						break;				/* Skip to strictEmpty check. */
+
+					/*
+					 * which1 is bitmask for possibility to be adjacent with
+					 * lower bound of argument. which2 is bitmask for
+					 * possibility to be adjacent with upper bound of argument.
+					 */
+					which1 = which2 = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
+
+					/*
+					 * Previously selected quadrant could exclude possibility
+					 * for lower or upper bounds to be adjacent. Deserialize
+					 * previous centroid range if present for checking this.
+					 */
+					if (in->reconstructedValue != (Datum) 0)
+					{
+						RangeType  *prevCentroid;
+						RangeBound	prevLower,
+									prevUpper;
+						bool		prevEmpty;
+						int			cmp1,
+									cmp2;
+
+						prevCentroid = DatumGetRangeType(in->reconstructedValue);
+						range_deserialize(typcache, prevCentroid,
+										  &prevLower, &prevUpper, &prevEmpty);
+
+						/*
+						 * Check if lower bound of argument is not in a
+						 * quadrant we visited in the previous step.
+						 */
+						cmp1 = range_cmp_bounds(typcache, &lower, &prevUpper);
+						cmp2 = range_cmp_bounds(typcache, &centroidUpper,
+												&prevUpper);
+						if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
+							which1 = 0;
+
+						/*
+						 * Check if upper bound of argument is not in a
+						 * quadrant we visited in the previous step.
+						 */
+						cmp1 = range_cmp_bounds(typcache, &upper, &prevLower);
+						cmp2 = range_cmp_bounds(typcache, &centroidLower,
+												&prevLower);
+						if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
+							which2 = 0;
+					}
+
+					if (which1)
+					{
+						/*
+						 * For a range's upper bound to be adjacent to the
+						 * argument's lower bound, it will be found along the
+						 * line adjacent to (and just below) Y=lower.
+						 * Therefore, if the argument's lower bound is less
+						 * than the centroid's upper bound, the line falls in
+						 * quadrants 2 and 3; if greater, the line falls in
+						 * quadrants 1 and 4.
+						 *
+						 * The above is true even when the argument's lower
+						 * bound is greater and adjacent to the centroid's
+						 * upper bound. If the argument's lower bound is
+						 * greater than the centroid's upper bound, then the
+						 * lowest value that an adjacent range could have is
+						 * that of the centroid's upper bound, which still
+						 * falls in quadrants 1 and 4.
+						 *
+						 * In the edge case, where the argument's lower bound
+						 * is equal to the cetroid's upper bound, there may be
+						 * adjacent ranges in any quadrant.
+						 */
+						cmp = range_cmp_bounds(typcache, &lower,
+											   &centroidUpper);
+						if (cmp < 0)
+							which1 &= (1 << 2) | (1 << 3);
+						else if (cmp > 0)
+							which1 &= (1 << 1) | (1 << 4);
+					}
+
+					if (which2)
+					{
+						/*
+						 * For a range's lower bound to be adjacent to the
+						 * argument's upper bound, it will be found along the
+						 * line adjacent to (and just right of)
+						 * X=upper. Therefore, if the argument's upper bound is
+						 * less than (and not adjacent to) the centroid's upper
+						 * bound, the line falls in quadrants 3 and 4; if
+						 * greater or equal to, the line falls in quadrants 1
+						 * and 2.
+						 *
+						 * The edge case is when the argument's upper bound is
+						 * less than and adjacent to the centroid's lower
+						 * bound. In that case, adjacent ranges may be in any
+						 * quadrant.
+						 */
+						cmp = range_cmp_bounds(typcache, &lower,
+											   &centroidUpper);
+						if (cmp < 0 &&
+							!bounds_adjacent(typcache, upper, centroidLower))
+							which1 &= (1 << 3) | (1 << 4);
+						else if (cmp > 0)
+							which1 &= (1 << 1) | (1 << 2);
+					}
+
+					which &= which1 | which2;
+
+					needPrevious = true;
+					break;
+
 				case RANGESTRAT_CONTAINS:
 					/*
 					 * Non-empty range A contains non-empty range B if lower
@@ -652,11 +778,18 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
 
 	/* We must descend into the quadrant(s) identified by 'which' */
 	out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
+	if (needPrevious)
+		out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
 	out->nNodes = 0;
 	for (i = 1; i <= in->nNodes; i++)
 	{
 		if (which & (1 << i))
+		{
+			/* Save previous prefix if needed */
+			if (needPrevious)
+				out->reconstructedValues[out->nNodes] = in->prefixDatum;
 			out->nodeNumbers[out->nNodes++] = i - 1;
+		}
 	}
 
 	PG_RETURN_VOID();
@@ -713,6 +846,10 @@ spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
 				res = range_after_internal(typcache, leafRange,
 										   DatumGetRangeType(keyDatum));
 				break;
+			case RANGESTRAT_ADJACENT:
+				res = range_adjacent_internal(typcache, leafRange,
+										   DatumGetRangeType(keyDatum));
+				break;
 			case RANGESTRAT_CONTAINS:
 				res = range_contains_internal(typcache, leafRange,
 											  DatumGetRangeType(keyDatum));
diff --git a/src/include/catalog/pg_amop.h b/src/include/catalog/pg_amop.h
index 5d451e36c11..d2003487472 100644
--- a/src/include/catalog/pg_amop.h
+++ b/src/include/catalog/pg_amop.h
@@ -775,6 +775,7 @@ DATA(insert (	3474   3831 3831 2 s	3895 4000 0 ));
 DATA(insert (	3474   3831 3831 3 s	3888 4000 0 ));
 DATA(insert (	3474   3831 3831 4 s	3896 4000 0 ));
 DATA(insert (	3474   3831 3831 5 s	3894 4000 0 ));
+DATA(insert (	3474   3831 3831 6 s	3897 4000 0 ));
 DATA(insert (	3474   3831 3831 7 s	3890 4000 0 ));
 DATA(insert (	3474   3831 3831 8 s	3892 4000 0 ));
 DATA(insert (	3474   3831 2283 16 s	3889 4000 0 ));
diff --git a/src/include/utils/rangetypes.h b/src/include/utils/rangetypes.h
index 67b1a7ce611..f6b941d3acc 100644
--- a/src/include/utils/rangetypes.h
+++ b/src/include/utils/rangetypes.h
@@ -201,6 +201,8 @@ extern int range_cmp_bounds(TypeCacheEntry *typcache, RangeBound *b1,
 				 RangeBound *b2);
 extern int range_cmp_bound_values(TypeCacheEntry *typcache, RangeBound *b1,
 					   RangeBound *b2);
+extern bool bounds_adjacent(TypeCacheEntry *typcache, RangeBound bound1,
+				RangeBound bound2);
 extern RangeType *make_empty_range(TypeCacheEntry *typcache);
 
 /* GiST support (in rangetypes_gist.c) */
diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out
index 6faaf4272f8..256b7196d0d 100644
--- a/src/test/regress/expected/opr_sanity.out
+++ b/src/test/regress/expected/opr_sanity.out
@@ -1076,6 +1076,7 @@ ORDER BY 1, 2, 3;
        4000 |            4 | ~>=~
        4000 |            5 | >>
        4000 |            5 | ~>~
+       4000 |            6 | -|-
        4000 |            6 | ~=
        4000 |            7 | @>
        4000 |            8 | <@
@@ -1087,7 +1088,7 @@ ORDER BY 1, 2, 3;
        4000 |           15 | >
        4000 |           16 | @>
        4000 |           18 | =
-(61 rows)
+(62 rows)
 
 -- Check that all opclass search operators have selectivity estimators.
 -- This is not absolutely required, but it seems a reasonable thing
-- 
GitLab