From 420cbff8813dd5b1390a1602331b57fbc4fbb899 Mon Sep 17 00:00:00 2001
From: Teodor Sigaev <teodor@sigaev.ru>
Date: Fri, 19 May 2006 16:15:17 +0000
Subject: [PATCH] Simplify gistSplit() and some refactoring related code.

---
 src/backend/access/gist/gist.c     | 145 +++++++---------------
 src/backend/access/gist/gistutil.c | 185 ++++++++++++++++-------------
 src/backend/access/gist/gistxlog.c |  22 +---
 src/include/access/gist_private.h  |  19 ++-
 4 files changed, 168 insertions(+), 203 deletions(-)

diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index d207b7ecfa7..cb10cbc35bd 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -936,31 +936,6 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
 		gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
 }
 
-static void
-gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
-{
-	int			i;
-
-	for (i = 0; i < len; i++)
-		arr[i] = reasloffset[arr[i]];
-}
-
-static IndexTupleData *
-gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
-	char *ptr, *ret = palloc(BLCKSZ);
-	int i;
-
-	ptr = ret;
-	for (i = 0; i < veclen; i++) {
-		memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
-		ptr += IndexTupleSize(vec[i]);
-	}
-
-	*memlen = ptr - ret;
-	Assert( *memlen < BLCKSZ );
-	return (IndexTupleData*)ret;
-}
-
 /*
  *	gistSplit -- split a page in the tree.
  */
@@ -975,100 +950,70 @@ gistSplit(Relation r,
 			   *rvectup;
 	GIST_SPLITVEC v;
 	GistEntryVector *entryvec;
-	int			i,
-				fakeoffset;
-	OffsetNumber *realoffset;
-	IndexTuple *cleaneditup = itup;
-	int			lencleaneditup = len;
+	int			i;
+	OffsetNumber offInvTuples[ MaxOffsetNumber ];
+	int			 nOffInvTuples = 0;
 	SplitedPageLayout	*res = NULL;
 
 	/* generate the item array */
-	realoffset = palloc((len + 1) * sizeof(OffsetNumber));
 	entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
 	entryvec->n = len + 1;
 
-	fakeoffset = FirstOffsetNumber;
 	for (i = 1; i <= len; i++)
 	{
 		Datum		datum;
 		bool		IsNull;
 
 		if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
-		{
-			entryvec->n--;
 			/* remember position of invalid tuple */
-			realoffset[entryvec->n] = i;
-			continue;
-		}
+			offInvTuples[ nOffInvTuples++ ] = i;			
+
+		if ( nOffInvTuples > 0 )
+			/* once an invalid tuple is seen, the other keys need not be
+			   decompressed, because special processing will be used; keep
+			   scanning only to find the remaining invalid tuples */
+			continue;	
 
 		datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
-		gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
+		gistdentryinit(giststate, 0, &(entryvec->vector[i]),
 					   datum, r, page, i,
 					   ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
 					   FALSE, IsNull);
-		realoffset[fakeoffset] = i;
-		fakeoffset++;
 	}
 
 	/*
-	 * if it was invalid tuple then we need special processing. If it's
-	 * possible, we move all invalid tuples on right page. We should remember,
-	 * that union with invalid tuples is a invalid tuple.
+	 * If there are invalid tuples, we need special processing:
+	 * we move all invalid tuples to the right page.
+	 *
+	 * If there is not enough room on the left page, gistSplit will be called
+	 * one more time for the left page.
+	 *
+	 * Normally this code is never executed, but after crash replay it is
+	 * possible (with low probability) to get 'invalid' tuples.
 	 */
-	if (entryvec->n != len + 1)
+	if (nOffInvTuples > 0)
 	{
-		lencleaneditup = entryvec->n - 1;
-		cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
-		for (i = 1; i < entryvec->n; i++)
-			cleaneditup[i - 1] = itup[realoffset[i] - 1];
-
-		if (!gistfitpage(cleaneditup, lencleaneditup))
-		{
-			/* no space on left to put all good tuples, so picksplit */
-			gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
-			v.spl_leftvalid = true;
-			v.spl_rightvalid = false;
-			gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
-			gistToRealOffset(v.spl_right, v.spl_nright, realoffset);
-		}
-		else
-		{
-			/* we can try to store all valid tuples on one page */
-			v.spl_right = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
-			v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
-
-			if (lencleaneditup == 0)
-			{
-				/* all tuples are invalid, so moves half of its to right */
-				v.spl_leftvalid = v.spl_rightvalid = false;
-				v.spl_nright = 0;
-				v.spl_nleft = 0;
-				for (i = 1; i <= len; i++)
-					if (i - 1 < len / 2)
-						v.spl_left[v.spl_nleft++] = i;
-					else
-						v.spl_right[v.spl_nright++] = i;
-			}
-			else
-			{
-				/*
-				 * we will not call gistUserPicksplit, just put good tuples on
-				 * left and invalid on right
-				 */
-				v.spl_nleft = lencleaneditup;
-				v.spl_nright = 0;
-				for (i = 1; i < entryvec->n; i++)
-					v.spl_left[i - 1] = i;
-				gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
-				v.spl_lattr[0] = v.spl_ldatum = (Datum) 0;
-				v.spl_rattr[0] = v.spl_rdatum = (Datum) 0;
-				v.spl_lisnull[0] = true;
-				v.spl_risnull[0] = true;
-				gistunionsubkey(r, giststate, itup, &v, true);
-				v.spl_leftvalid = true;
-				v.spl_rightvalid = false;
-			}
-		}
+		GistSplitVec    gsvp;
+				
+		v.spl_right = offInvTuples;
+		v.spl_nright = nOffInvTuples;
+		v.spl_rightvalid = false;
+
+		v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
+		v.spl_nleft = 0;
+		for(i = 1; i <= len; i++) 
+			if ( !GistTupleIsInvalid(itup[i - 1]) )
+				v.spl_left[ v.spl_nleft++ ] = i;
+		v.spl_leftvalid = true;
+		
+		gsvp.idgrp = NULL;
+		gsvp.attrsize = v.spl_lattrsize;
+		gsvp.attr = v.spl_lattr;
+		gsvp.len = v.spl_nleft;
+		gsvp.entries = v.spl_left;
+		gsvp.isnull = v.spl_lisnull;
+
+		gistunionsubkeyvec(giststate, itup, &gsvp, true);
 	}
 	else
 	{
@@ -1088,12 +1033,6 @@ gistSplit(Relation r,
 	for (i = 0; i < v.spl_nright; i++)
 		rvectup[i] = itup[v.spl_right[i] - 1];
 
-	/* place invalid tuples on right page if itsn't done yet */
-	for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
-	{
-		rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
-	}
-
 	/* finalyze splitting (may need another split) */
 	if (!gistfitpage(rvectup, v.spl_nright))
 	{
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index ca5a9d652d2..92798a27d30 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *			$PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $
+ *			$PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.13 2006/05/19 16:15:17 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -139,6 +139,30 @@ gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
 	return itvec;
 }
 
+/*
+ * Pack veclen tuples from vec[] contiguously into a new palloc() buffer; sets *memlen to its byte size
+ */
+
+IndexTupleData *
+gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
+	char *ptr, *ret;
+	int i;
+
+	*memlen=0;
+	/* first pass: sum the tuple sizes so the buffer can be allocated exactly */
+	for (i = 0; i < veclen; i++)
+		*memlen += IndexTupleSize(vec[i]);
+
+	ptr = ret = palloc(*memlen);
+	/* second pass: copy each tuple immediately after the previous one */
+	for (i = 0; i < veclen; i++) { 
+		memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
+		ptr += IndexTupleSize(vec[i]);
+	}
+
+	return (IndexTupleData*)ret;
+}
+
 /*
  * Return an IndexTuple containing the result of applying the "union"
  * method to the specified IndexTuple vector.
@@ -313,100 +337,101 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
 	return newtup;
 }
 
-void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall)
-{
-	int			lr;
+void 
+gistunionsubkeyvec(GISTSTATE *giststate,  IndexTuple *itvec, 
+							GistSplitVec *gsvp, bool isall) {
+	int			i;
+	GistEntryVector *evec;
 
-	for (lr = 0; lr < 2; lr++)
-	{
-		OffsetNumber *entries;
-		int			i;
-		Datum	   *attr;
-		int			len,
-				   *attrsize;
-		bool	   *isnull;
-		GistEntryVector *evec;
-
-		if (lr)
-		{
-			attrsize = spl->spl_lattrsize;
-			attr = spl->spl_lattr;
-			len = spl->spl_nleft;
-			entries = spl->spl_left;
-			isnull = spl->spl_lisnull;
-		}
-		else
-		{
-			attrsize = spl->spl_rattrsize;
-			attr = spl->spl_rattr;
-			len = spl->spl_nright;
-			entries = spl->spl_right;
-			isnull = spl->spl_risnull;
-		}
+	evec = palloc(((gsvp->len < 2) ? 2 : gsvp->len) * sizeof(GISTENTRY) + GEVHDRSZ);
 
-		evec = palloc(((len < 2) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+	for (i = (isall) ? 0 : 1; i < giststate->tupdesc->natts; i++)
+	{
+		int			j;
+		Datum		datum;
+		int			datumsize;
+		int			real_len;
 
-		for (i = (isall) ? 0 : 1; i < r->rd_att->natts; i++)
+		real_len = 0;
+		for (j = 0; j < gsvp->len; j++)
 		{
-			int			j;
-			Datum		datum;
-			int			datumsize;
-			int			real_len;
+			bool		IsNull;
 
-			real_len = 0;
-			for (j = 0; j < len; j++)
-			{
-				bool		IsNull;
+			if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[j]])
+				continue;
 
-				if (spl->spl_idgrp[entries[j]])
-					continue;
-				datum = index_getattr(itvec[entries[j] - 1], i + 1,
+			datum = index_getattr(itvec[gsvp->entries[j] - 1], i + 1,
 									  giststate->tupdesc, &IsNull);
-				if (IsNull)
-					continue;
-				gistdentryinit(giststate, i,
-							   &(evec->vector[real_len]),
-							   datum,
-							   NULL, NULL, (OffsetNumber) 0,
+			if (IsNull)
+				continue;
+			gistdentryinit(giststate, i,
+						   &(evec->vector[real_len]),
+						   datum,
+						   NULL, NULL, (OffsetNumber) 0,
 						   ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
-							   FALSE, IsNull);
-				real_len++;
+						   FALSE, IsNull);
+			real_len++;
 
-			}
+		}
 
-			if (real_len == 0)
+		if (real_len == 0)
+		{
+			datum = (Datum) 0;
+			datumsize = 0;
+			gsvp->isnull[i] = true;
+		}
+		else
+		{
+			/*
+			 * evec->vector[0].bytes may be undefined, so form the union
+			 * of the single entry with itself
+			 */
+			if (real_len == 1)
 			{
-				datum = (Datum) 0;
-				datumsize = 0;
-				isnull[i] = true;
+				evec->n = 2;
+				memcpy(&(evec->vector[1]), &(evec->vector[0]),
+					   sizeof(GISTENTRY));
 			}
 			else
-			{
-				/*
-				 * evec->vector[0].bytes may be not defined, so form union
-				 * with itself
-				 */
-				if (real_len == 1)
-				{
-					evec->n = 2;
-					memcpy(&(evec->vector[1]), &(evec->vector[0]),
-						   sizeof(GISTENTRY));
-				}
-				else
-					evec->n = real_len;
-				datum = FunctionCall2(&giststate->unionFn[i],
-									  PointerGetDatum(evec),
-									  PointerGetDatum(&datumsize));
-				isnull[i] = false;
-			}
-
-			attr[i] = datum;
-			attrsize[i] = datumsize;
+				evec->n = real_len;
+			datum = FunctionCall2(&giststate->unionFn[i],
+								  PointerGetDatum(evec),
+								  PointerGetDatum(&datumsize));
+			gsvp->isnull[i] = false;
 		}
+
+		gsvp->attr[i] = datum;
+		gsvp->attrsize[i] = datumsize;
 	}
 }
 
+/*
+ * Form the union keys of all columns except the first, for both halves of a split chosen by user picksplit
+ */
+static void
+gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+{
+	GistSplitVec	gsvp;
+
+	gsvp.idgrp = spl->spl_idgrp;	/* entries with a nonzero group id are skipped by gistunionsubkeyvec() */
+	/* left half: results are written into spl's left attr/attrsize/isnull arrays */
+	gsvp.attrsize = spl->spl_lattrsize;
+	gsvp.attr = spl->spl_lattr;
+	gsvp.len = spl->spl_nleft;
+	gsvp.entries = spl->spl_left;
+	gsvp.isnull = spl->spl_lisnull;
+
+	gistunionsubkeyvec(giststate, itvec, &gsvp, false);	/* isall = false: column 0 is not touched */
+	/* right half: same descriptor re-pointed at spl's right-side arrays (idgrp is kept) */
+	gsvp.attrsize = spl->spl_rattrsize;
+	gsvp.attr = spl->spl_rattr;
+	gsvp.len = spl->spl_nright;
+	gsvp.entries = spl->spl_right;
+	gsvp.isnull = spl->spl_risnull;
+
+	gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+}
+
 /*
  * find group in vector with equal value
  */
@@ -840,7 +865,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
 	 * if index is multikey, then we must to try get smaller bounding box for
 	 * subkey(s)
 	 */
-	if (r->rd_att->natts > 1)
+	if (giststate->tupdesc->natts > 1)
 	{
 		int			MaxGrpId;
 
@@ -851,7 +876,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
 		MaxGrpId = gistfindgroup(giststate, entryvec->vector, v);
 
 		/* form union of sub keys for each page (l,p) */
-		gistunionsubkey(r, giststate, itup, v, false);
+		gistunionsubkey(giststate, itup, v);
 
 		/*
 		 * if possible, we insert equivalent tuples with control by penalty
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 1126727cd97..aef2056a34d 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $
+ *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.19 2006/05/19 16:15:17 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -557,28 +557,16 @@ gistMakePageLayout(Buffer *buffers, int nbuffers) {
 
 	while( nbuffers-- > 0 ) {
 		Page page = BufferGetPage( buffers[ nbuffers ] );
-		IndexTuple	idxtup;
-		OffsetNumber	i;
-		char *ptr;
+		IndexTuple*	vec;
+		int	veclen;
 
 		resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) );
 
 		resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] );
 		resptr->block.num = PageGetMaxOffsetNumber( page );
 
-		for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
-			idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-			resptr->lenlist += IndexTupleSize(idxtup);
-		}
-
-		resptr->list = (IndexTupleData*)palloc( resptr->lenlist );
-		ptr = (char*)(resptr->list);
-
-		for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
-			idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-			memcpy( ptr, idxtup, IndexTupleSize(idxtup) );
-			ptr += IndexTupleSize(idxtup);
-		}
+		vec = gistextractpage( page, &veclen ); 
+		resptr->list = gistfillitupvec( vec, veclen, &(resptr->lenlist) );
 
 		resptr->next = res;
 		res = resptr;
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index a866277fe9f..f08d49dbf90 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.15 2006/05/19 16:15:17 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -283,6 +283,8 @@ extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
 			   IndexTuple *itvec, int *len,
 			   IndexTuple *additvec, int addlen);
+extern IndexTupleData* gistfillitupvec(IndexTuple *vec, int veclen, int *memlen);
+
 extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
 		  int len, GISTSTATE *giststate);
 extern IndexTuple gistgetadjusted(Relation r,
@@ -308,8 +310,19 @@ extern void gistcentryinit(GISTSTATE *giststate, int nkey,
 extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
 				  IndexTuple tuple, Page p, OffsetNumber o,
 				  GISTENTRY *attdata, bool *isnull);
-extern void gistunionsubkey(Relation r, GISTSTATE *giststate,
-				IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall);
+
+typedef struct {	/* describes one set of tuples to be unioned by gistunionsubkeyvec() */
+	int		*attrsize;	/* out: per-column size of the union datum (0 when the column is null) */
+	Datum	*attr;	/* out: per-column union datum ((Datum) 0 when the column is null) */
+	int		len;	/* number of elements in entries[] */
+	OffsetNumber *entries;	/* 1-based positions into the tuple vector (used as itvec[entry - 1]) */
+	bool	*isnull;	/* out: per-column flag, true when no non-null value was found */
+	int		*idgrp;	/* optional, may be NULL; entries whose idgrp[entry] is nonzero are skipped */
+} GistSplitVec;
+
+extern void gistunionsubkeyvec(GISTSTATE *giststate, 
+	IndexTuple *itvec, GistSplitVec *gsvp,  bool isall);
+
 extern void GISTInitBuffer(Buffer b, uint32 f);
 extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
 			   Datum k, Relation r, Page pg, OffsetNumber o,
-- 
GitLab