From 0392ea5097e5d7b8068b64722a34b7b75791a8f5 Mon Sep 17 00:00:00 2001
From: Teodor Sigaev <teodor@sigaev.ru>
Date: Fri, 7 Sep 2007 17:04:26 +0000
Subject: [PATCH] Improve page split in rtree emulation. Now if splitted result
 has big misalignement, then it tries to split page basing on distribution of
 boxe's centers.

Per report from  Dolafi, Tom <dolafit@janelia.hhmi.org>

 Backpatch is needed, change doesn't affect on-disk storage.
---
 src/backend/access/gist/gistproc.c | 79 +++++++++++++-----------------
 1 file changed, 34 insertions(+), 45 deletions(-)

diff --git a/src/backend/access/gist/gistproc.c b/src/backend/access/gist/gistproc.c
index 19868a127e7..590be9133fb 100644
--- a/src/backend/access/gist/gistproc.c
+++ b/src/backend/access/gist/gistproc.c
@@ -10,7 +10,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	$PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.10 2007/01/05 22:19:22 momjian Exp $
+ *	$PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.11 2007/09/07 17:04:26 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -21,13 +21,6 @@
 #include "utils/geo_decls.h"
 
 
-typedef struct
-{
-	BOX		   *key;
-	int			pos;
-} KBsort;
-
-static int	compare_KB(const void *a, const void *b);
 static bool gist_box_leaf_consistent(BOX *key, BOX *query,
 						 StrategyNumber strategy);
 static double size_box(Datum dbox);
@@ -194,22 +187,6 @@ gist_box_penalty(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(result);
 }
 
-/*
- * qsort comparator for box areas
- */
-static int
-compare_KB(const void *a, const void *b)
-{
-	BOX		   *abox = ((const KBsort *) a)->key;
-	BOX		   *bbox = ((const KBsort *) b)->key;
-	double		sa = (abox->high.x - abox->low.x) * (abox->high.y - abox->low.y);
-	double		sb = (bbox->high.x - bbox->low.x) * (bbox->high.y - bbox->low.y);
-
-	if (sa == sb)
-		return 0;
-	return (sa > sb) ? 1 : -1;
-}
-
 static void
 chooseLR(GIST_SPLITVEC *v,
 		 OffsetNumber *list1, int nlist1, BOX *union1,
@@ -417,44 +394,56 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
 			ADDLIST(listT, unionT, posT, i);
 	}
 
-	/* bad disposition, sort by ascending and resplit */
-	if ((posR == 0 || posL == 0) && (posT == 0 || posB == 0))
+#define LIMIT_RATIO	0.1
+#define _IS_BADRATIO(x,y)	( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
+#define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
+	/* bad disposition, try to split by centers of boxes  */
+	if ( IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB) )
 	{
-		KBsort	   *arr = (KBsort *) palloc(sizeof(KBsort) * maxoff);
+		double	avgCenterX=0.0, avgCenterY=0.0;
+		double	CenterX, CenterY;
 
-		posL = posR = posB = posT = 0;
 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
 		{
-			arr[i - 1].key = DatumGetBoxP(entryvec->vector[i].key);
-			arr[i - 1].pos = i;
+			cur = DatumGetBoxP(entryvec->vector[i].key);
+			avgCenterX +=  ((double)cur->high.x + (double)cur->low.x)/2.0;
+			avgCenterY +=  ((double)cur->high.y + (double)cur->low.y)/2.0;
 		}
-		qsort(arr, maxoff, sizeof(KBsort), compare_KB);
+
+		avgCenterX /= maxoff;
+		avgCenterY /= maxoff;
+
+		posL = posR = posB = posT = 0;
 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
 		{
-			cur = arr[i - 1].key;
-			if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
-				ADDLIST(listL, unionL, posL, arr[i - 1].pos);
-			else if (cur->low.x - pageunion.low.x == pageunion.high.x - cur->high.x)
+			cur = DatumGetBoxP(entryvec->vector[i].key);
+			
+			CenterX =  ((double)cur->high.x + (double)cur->low.x)/2.0;
+			CenterY =  ((double)cur->high.y + (double)cur->low.y)/2.0;
+
+			if (CenterX < avgCenterX) 
+				ADDLIST(listL, unionL, posL, i);
+			else if (CenterX == avgCenterX)
 			{
 				if (posL > posR)
-					ADDLIST(listR, unionR, posR, arr[i - 1].pos);
+					ADDLIST(listR, unionR, posR, i);
 				else
-					ADDLIST(listL, unionL, posL, arr[i - 1].pos);
+					ADDLIST(listL, unionL, posL, i);
 			}
 			else
-				ADDLIST(listR, unionR, posR, arr[i - 1].pos);
+				ADDLIST(listR, unionR, posR, i);
 
-			if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
-				ADDLIST(listB, unionB, posB, arr[i - 1].pos);
-			else if (cur->low.y - pageunion.low.y == pageunion.high.y - cur->high.y)
+			if (CenterY < avgCenterY)
+				ADDLIST(listB, unionB, posB, i);
+			else if (CenterY == avgCenterY)
 			{
 				if (posB > posT)
-					ADDLIST(listT, unionT, posT, arr[i - 1].pos);
+					ADDLIST(listT, unionT, posT, i);
 				else
-					ADDLIST(listB, unionB, posB, arr[i - 1].pos);
+					ADDLIST(listB, unionB, posB, i);
 			}
-			else
-				ADDLIST(listT, unionT, posT, arr[i - 1].pos);
+			else 
+				ADDLIST(listT, unionT, posT, i);
 		}
 	}
 
-- 
GitLab