Skip to content
Snippets Groups Projects
gistsplit.c 16.54 KiB
/*-------------------------------------------------------------------------
 *
 * gistsplit.c
 *	  Split page algorithm
 *
 *
 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/gist/gistsplit.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist_private.h"
#include "utils/rel.h"

/*
 * Work descriptor for computing the union keys of one side of a split.
 * gistunionsubkey() fills one of these per side and hands it to
 * gistunionsubkeyvec().
 */
typedef struct
{
	OffsetNumber *entries;		/* offsets of the tuples on this side; used
								 * as 1-based indexes into the tuple vector */
	int			len;			/* number of elements in entries[] */
	Datum	   *attr;			/* output: union datums, passed to
								 * gistMakeUnionItVec() */
	bool	   *isnull;			/* output: null flags matching attr[] */
	bool	   *equiv;			/* optional flags indexed by offset; flagged
								 * tuples are left out of the union.  NULL
								 * means "use every tuple" */
} GistSplitUnion;


/*
 * Compute the union keys for one side of a split.
 *
 * Every tuple whose offset is listed in gsvp->entries[] takes part, except
 * those flagged in gsvp->equiv[] (when that array is present).  The result
 * is written to gsvp->attr[] / gsvp->isnull[] by gistMakeUnionItVec().
 */
static void
gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
				   GistSplitUnion *gsvp)
{
	IndexTuple *kept;
	int			nkept = 0;
	int			pos;

	/* worst case: nothing is filtered out */
	kept = (IndexTuple *) palloc(sizeof(IndexTuple) * gsvp->len);

	for (pos = 0; pos < gsvp->len; pos++)
	{
		/* offsets are 1-based, itvec[] is 0-based */
		if (!gsvp->equiv || !gsvp->equiv[gsvp->entries[pos]])
			kept[nkept++] = itvec[gsvp->entries[pos] - 1];
	}

	gistMakeUnionItVec(giststate, kept, nkept,
					   gsvp->attr, gsvp->isnull);

	pfree(kept);
}

/*
 * Rebuild the left and right union keys of a split from the tuples that
 * are currently assigned to each side.  Tuples flagged in spl->spl_equiv[]
 * (if it is set) are not included in either union.
 */
static void
gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GistSplitVector *spl)
{
	GistSplitUnion leftSide;
	GistSplitUnion rightSide;

	/* left page */
	leftSide.equiv = spl->spl_equiv;
	leftSide.entries = spl->splitVector.spl_left;
	leftSide.len = spl->splitVector.spl_nleft;
	leftSide.attr = spl->spl_lattr;
	leftSide.isnull = spl->spl_lisnull;
	gistunionsubkeyvec(giststate, itvec, &leftSide);

	/* right page */
	rightSide.equiv = spl->spl_equiv;
	rightSide.entries = spl->splitVector.spl_right;
	rightSide.len = spl->splitVector.spl_nright;
	rightSide.attr = spl->spl_rattr;
	rightSide.isnull = spl->spl_risnull;
	gistunionsubkeyvec(giststate, itvec, &rightSide);
}

/*
 * Find the tuples that could be moved to the other page for free.
 *
 * A tuple on the left is flagged in spl->spl_equiv[] when adding it to the
 * right union has zero penalty, and vice versa for tuples on the right.
 * Returns the number of tuples flagged.
 */
static int
gistfindgroup(Relation r, GISTSTATE *giststate, GISTENTRY *valvec, GistSplitVector *spl, int attno)
{
	GISTENTRY	otherUnion;
	int			nfound = 0;
	int			pos;

	/*
	 * The attno key is never null here (see gistSplitByKey), so no null
	 * checks are needed.
	 */

	/* left-side tuples are compared against the right union */
	gistentryinit(otherUnion, spl->splitVector.spl_rdatum, r, NULL, (OffsetNumber) 0, FALSE);
	for (pos = 0; pos < spl->splitVector.spl_nleft; pos++)
	{
		float		cost;

		cost = gistpenalty(giststate, attno, &otherUnion, false,
						   &valvec[spl->splitVector.spl_left[pos]], false);
		if (cost == 0.0)
		{
			spl->spl_equiv[spl->splitVector.spl_left[pos]] = true;
			nfound++;
		}
	}

	/* right-side tuples are compared against the left union */
	gistentryinit(otherUnion, spl->splitVector.spl_ldatum, r, NULL, (OffsetNumber) 0, FALSE);
	for (pos = 0; pos < spl->splitVector.spl_nright; pos++)
	{
		float		cost;

		cost = gistpenalty(giststate, attno, &otherUnion, false,
						   &valvec[spl->splitVector.spl_right[pos]], false);
		if (cost == 0.0)
		{
			spl->spl_equiv[spl->splitVector.spl_right[pos]] = true;
			nfound++;
		}
	}

	return nfound;
}

/*
 * Compact the offset array a[] in place, dropping every offset that is
 * flagged in equiv[].  *len is updated to the new length.
 *
 * The array is never emptied: if the only entry that would remain is
 * itself flagged, its flag is cleared, it is kept, and *LenEquiv is
 * decremented to reflect that it no longer counts as equivalent.
 */
static void
cleanupOffsets(OffsetNumber *a, int *len, bool *equiv, int *LenEquiv)
{
	int			remaining = *len;
	int			rd;
	OffsetNumber *dst = a;

	for (rd = 0; rd < *len; rd++)
	{
		if (equiv[a[rd]] != FALSE)
		{
			if (remaining != 1)
			{
				/* flagged, and something else is still left: drop it */
				remaining--;
				continue;
			}

			/* corner case: this is the last one, so un-flag and keep it */
			equiv[a[rd]] = FALSE;
			*LenEquiv -= 1;
		}

		*dst = a[rd];
		dst++;
	}

	*len = remaining;
}

/*
 * Assign a single tuple (at offset "off") to the left or right side.
 *
 * Starting at column attno, the penalty of adding the tuple to the left
 * and to the right union is compared column by column; the first column
 * where the penalties differ decides, with the cheaper side winning.  If
 * no column differs the tuple goes to the left.
 */
static void
placeOne(Relation r, GISTSTATE *giststate, GistSplitVector *v, IndexTuple itup, OffsetNumber off, int attno)
{
	GISTENTRY	identry[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	bool		goRight = false;
	int			col;

	gistDeCompressAtt(giststate, r, itup, NULL, (OffsetNumber) 0, identry, isnull);

	for (col = attno; col < giststate->tupdesc->natts; col++)
	{
		GISTENTRY	unionEntry;
		float		costLeft,
					costRight;

		gistentryinit(unionEntry, v->spl_lattr[col], r, NULL, 0, FALSE);
		costLeft = gistpenalty(giststate, col, &unionEntry, v->spl_lisnull[col], identry + col, isnull[col]);
		gistentryinit(unionEntry, v->spl_rattr[col], r, NULL, 0, FALSE);
		costRight = gistpenalty(giststate, col, &unionEntry, v->spl_risnull[col], identry + col, isnull[col]);

		if (costLeft == costRight)
			continue;			/* tie, look at the next column */

		goRight = (costLeft > costRight);
		break;
	}

	if (goRight)
		v->splitVector.spl_right[v->splitVector.spl_nright++] = off;
	else
		v->splitVector.spl_left[v->splitVector.spl_nleft++] = off;
}

/*
 * Exchange the values of s and d, using t as scratch storage.
 *
 * t must be an lvalue able to hold the values being swapped.  s and d are
 * each evaluated twice, so they should not have side effects.
 */
#define SWAPVAR( s, d, t ) \
do {	\
	(t) = (s); \
	(s) = (d); \
	(d) = (t); \
} while(0)

/*
 * Adjust the left and right unions of a split so that they take into
 * account the unions that already exist from a split on earlier columns.
 * This is called only when the picksplit result did not honor the
 * pre-existing unions (sv->spl_ldatum_exists / sv->spl_rdatum_exists are
 * still set).
 *
 * oldL and oldR are the pre-existing left and right union datums for column
 * attno.  The function decides, by penalty, whether the new left/right
 * groups should be swapped to match the old ones better, swaps them if so,
 * and then merges each old union into the corresponding new one.  On return
 * both *_exists flags are cleared.
 */
static void
supportSecondarySplit(Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC *sv, Datum oldL, Datum oldR)
{
	bool		leaveOnLeft = true,
				tmpBool;
	GISTENTRY	entryL,
				entryR,
				entrySL,
				entrySR;

	gistentryinit(entryL, oldL, r, NULL, 0, FALSE);
	gistentryinit(entryR, oldR, r, NULL, 0, FALSE);
	gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
	gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);

	if (sv->spl_ldatum_exists && sv->spl_rdatum_exists)
	{
		float		penalty1,
					penalty2;

		/* cost of keeping the sides as they are vs. cost of swapping them */
		penalty1 = gistpenalty(giststate, attno, &entryL, false, &entrySL, false) +
			gistpenalty(giststate, attno, &entryR, false, &entrySR, false);
		penalty2 = gistpenalty(giststate, attno, &entryL, false, &entrySR, false) +
			gistpenalty(giststate, attno, &entryR, false, &entrySL, false);

		if (penalty1 > penalty2)
			leaveOnLeft = false;
	}
	else
	{
		GISTENTRY  *entry1 = (sv->spl_ldatum_exists) ? &entryL : &entryR;
		float		penalty1,
					penalty2;

		/*
		 * There is only one previously defined union, so we just choose
		 * whether to swap by the lowest penalty.
		 */
		penalty1 = gistpenalty(giststate, attno, entry1, false, &entrySL, false);
		penalty2 = gistpenalty(giststate, attno, entry1, false, &entrySR, false);

		if (penalty1 < penalty2)
			leaveOnLeft = (sv->spl_ldatum_exists) ? true : false;
		else
			leaveOnLeft = (sv->spl_rdatum_exists) ? true : false;
	}

	if (leaveOnLeft == false)
	{
		/*
		 * Swap left and right.  The counters are ints (this file takes
		 * their address as int *), so swap them through an int temporary
		 * rather than an OffsetNumber, which would narrow the value.
		 */
		OffsetNumber *off;
		int			ntmp;
		Datum		datum;

		SWAPVAR(sv->spl_left, sv->spl_right, off);
		SWAPVAR(sv->spl_nleft, sv->spl_nright, ntmp);
		SWAPVAR(sv->spl_ldatum, sv->spl_rdatum, datum);
		gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
		gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
	}

	/* fold each pre-existing union into the matching new union */
	if (sv->spl_ldatum_exists)
		gistMakeUnionKey(giststate, attno, &entryL, false, &entrySL, false,
						 &sv->spl_ldatum, &tmpBool);

	if (sv->spl_rdatum_exists)
		gistMakeUnionKey(giststate, attno, &entryR, false, &entrySR, false,
						 &sv->spl_rdatum, &tmpBool);

	sv->spl_ldatum_exists = sv->spl_rdatum_exists = false;
}

/*
 * Trivial picksplit implementation.  It is used only when the
 * user-defined picksplit put all keys onto one page.  That is a bug in
 * the user-defined picksplit, but we would like to "fix" it here.
 *
 * The entries are divided by position: the first half goes left, the rest
 * goes right.  The union function for column attno is then called on each
 * half to produce v->spl_ldatum and v->spl_rdatum.
 */
static void
genericPickSplit(GISTSTATE *giststate, GistEntryVector *entryvec, GIST_SPLITVEC *v, int attno)
{
	OffsetNumber off,
				last;
	int			nbytes;
	int			half;
	GistEntryVector *scratch;

	last = entryvec->n - 1;

	nbytes = (last + 2) * sizeof(OffsetNumber);

	v->spl_left = (OffsetNumber *) palloc(nbytes);
	v->spl_right = (OffsetNumber *) palloc(nbytes);
	v->spl_nleft = v->spl_nright = 0;

	/* offsets up to and including "half" go left, the remainder right */
	half = (last - FirstOffsetNumber + 1) / 2;
	for (off = FirstOffsetNumber; off <= last; off = OffsetNumberNext(off))
	{
		if (off <= half)
			v->spl_left[v->spl_nleft++] = off;
		else
			v->spl_right[v->spl_nright++] = off;
	}

	/*
	 * Form the union of each page.  The same scratch vector is reused for
	 * both sides; nbytes doubles as the size output argument of the union
	 * function.
	 */
	scratch = palloc(sizeof(GISTENTRY) * entryvec->n + GEVHDRSZ);

	scratch->n = v->spl_nleft;
	memcpy(scratch->vector, &entryvec->vector[FirstOffsetNumber],
		   sizeof(GISTENTRY) * scratch->n);
	v->spl_ldatum = FunctionCall2Coll(&giststate->unionFn[attno],
									  giststate->supportCollation[attno],
									  PointerGetDatum(scratch),
									  PointerGetDatum(&nbytes));

	scratch->n = v->spl_nright;
	memcpy(scratch->vector, &entryvec->vector[FirstOffsetNumber + v->spl_nleft],
		   sizeof(GISTENTRY) * scratch->n);
	v->spl_rdatum = FunctionCall2Coll(&giststate->unionFn[attno],
									  giststate->supportCollation[attno],
									  PointerGetDatum(scratch),
									  PointerGetDatum(&nbytes));
}

/*
 * Call the user-defined picksplit method for column attno to divide the
 * entry vector in two, then try to use the following columns to refine
 * the result.
 *
 * Returns true with v->spl_equiv == NULL if the left and right unions for
 * column attno are equal, so the caller may find a better split using the
 * following columns.
 * Returns true with v->spl_equiv != NULL if more than one tuple can be
 * moved freely between the two sides; those tuples are flagged in
 * v->spl_equiv[] and have been removed from the split vectors.
 * Returns false otherwise; the caller does not need to look at
 * v->spl_equiv in that case.
 */
static bool
gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVector *v,
				  IndexTuple *itup, int len, GISTSTATE *giststate)
{
	GIST_SPLITVEC *sv = &v->splitVector;
	int			nequiv;
	OffsetNumber toMove;

	/*
	 * Let the user-defined picksplit function set up the split vector.
	 * There are no null values in entryvec.  Unions left over from earlier
	 * columns are advertised through the *_exists flags.
	 */
	sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
	sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
	sv->spl_ldatum = v->spl_lattr[attno];
	sv->spl_rdatum = v->spl_rattr[attno];

	FunctionCall2Coll(&giststate->picksplitFn[attno],
					  giststate->supportCollation[attno],
					  PointerGetDatum(entryvec),
					  PointerGetDatum(sv));

	if (sv->spl_nleft != 0 && sv->spl_nright != 0)
	{
		/* compatibility with old code */
		if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
			sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
		if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
			sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);

		/* flags still set: picksplit ignored the pre-existing unions */
		if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
		{
			elog(DEBUG1, "picksplit method for column %d of index \"%s\" doesn't support secondary split",
				 attno + 1, RelationGetRelationName(r));

			supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
		}
	}
	else
	{
		/* picksplit left one side empty: fall back to the generic split */
		ereport(DEBUG1,
				(errcode(ERRCODE_INTERNAL_ERROR),
			  errmsg("picksplit method for column %d of index \"%s\" failed",
					 attno + 1, RelationGetRelationName(r)),
				 errhint("The index is not optimal. To optimize it, contact a developer, or try to use the column as the second one in the CREATE INDEX command.")));

		/*
		 * Reinitialize the GIST_SPLITVEC.  genericPickSplit() does not use
		 * these fields, but they are needed for the processing below.
		 */
		sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
		sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
		sv->spl_ldatum = v->spl_lattr[attno];
		sv->spl_rdatum = v->spl_rattr[attno];

		genericPickSplit(giststate, entryvec, sv, attno);

		if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
			supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
	}

	/* publish the unions for this column */
	v->spl_lattr[attno] = sv->spl_ldatum;
	v->spl_rattr[attno] = sv->spl_rdatum;
	v->spl_lisnull[attno] = false;
	v->spl_risnull[attno] = false;

	v->spl_equiv = NULL;

	/*
	 * Only a multikey index with columns after attno gives us anything to
	 * refine; otherwise we are done.
	 */
	if (giststate->tupdesc->natts <= 1 || attno + 1 == giststate->tupdesc->natts)
		return false;

	/*
	 * If the left and right unions are equal, a better split can be found
	 * using the following columns.  The unions for column attno are already
	 * in place.
	 */
	if (gistKeyIsEQ(giststate, attno, sv->spl_ldatum, sv->spl_rdatum))
		return true;

	/* look for tuples that fit equally well on either side */
	v->spl_equiv = (bool *) palloc0(sizeof(bool) * (entryvec->n + 1));
	nequiv = gistfindgroup(r, giststate, entryvec->vector, v, attno);

	if (nequiv == 0)
	{
		gistunionsubkey(giststate, itup, v);
		return false;
	}

	/* take the movable tuples out of both sides, then rebuild the unions */
	cleanupOffsets(sv->spl_left, &sv->spl_nleft, v->spl_equiv, &nequiv);
	cleanupOffsets(sv->spl_right, &sv->spl_nright, v->spl_equiv, &nequiv);

	gistunionsubkey(giststate, itup, v);

	if (nequiv > 1)
		return true;
	if (nequiv != 1)
		return false;

	/*
	 * Exactly one movable tuple: just choose its side by penalty.  This
	 * keeps the requirements on the user-defined picksplit simple.
	 */
	toMove = FirstOffsetNumber;
	while (toMove < entryvec->n && !v->spl_equiv[toMove])
		toMove++;
	Assert(toMove < entryvec->n);

	placeOne(r, giststate, v, itup[toMove - 1], toMove, attno + 1);

	/*
	 * Redo gistunionsubkey() with every tuple included.  This happens
	 * rarely, so it does not hurt performance.
	 */
	v->spl_equiv = NULL;
	gistunionsubkey(giststate, itup, v);

	return false;
}

/*
 * Simple page split: divide the offsets 1..len by position.
 *
 * Offsets 1 .. len/2 go to the right side and the remaining ones to the
 * left side.  For any len >= 2 both sides are therefore non-empty.  (The
 * previous test, "i < len / 2", produced an empty right side for len of 2
 * or 3, i.e. a split that made no progress.)
 *
 * Only the offset arrays and counters of *v are set; no union keys are
 * computed here.
 */
static void
gistSplitHalf(GIST_SPLITVEC *v, int len)
{
	int			i;
	int			nright = len / 2;

	v->spl_nright = v->spl_nleft = 0;
	v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
	v->spl_right = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
	for (i = 1; i <= len; i++)
	{
		if (i <= nright)
			v->spl_right[v->spl_nright++] = i;
		else
			v->spl_left[v->spl_nleft++] = i;
	}
}

/*
 * Try to split a page by the attno key; tuples whose attno key is null are
 * moved to a separate page.
 *
 * itup[0 .. len-1] are the tuples to distribute; offsets stored in the
 * split vector are 1-based indexes into itup[].  entryvec is caller-supplied
 * work space and is filled at positions 1 .. len.  The result is returned
 * in *v.  The function recurses on attno + 1 when column attno cannot
 * decide the split by itself.
 */
void
gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
			   GistSplitVector *v, GistEntryVector *entryvec, int attno)
{
	int			i;
	OffsetNumber *offNullTuples;
	int			nOffNullTuples = 0;

	/*
	 * Use a per-call array rather than a static buffer: this function is
	 * recursive, and in the mixed-null case below the array is handed back
	 * to the caller as v->splitVector.spl_right, so it must not be shared
	 * between invocations.  It is deliberately not freed here.
	 */
	offNullTuples = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));

	/* build the entry vector and remember which tuples have a null key */
	for (i = 1; i <= len; i++)
	{
		Datum		datum;
		bool		IsNull;

		datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
		gistdentryinit(giststate, attno, &(entryvec->vector[i]),
					   datum, r, page, i,
					   FALSE, IsNull);
		if (IsNull)
			offNullTuples[nOffNullTuples++] = i;
	}

	if (nOffNullTuples == len)
	{
		/*
		 * Corner case: all keys in the attno column are null, so we should
		 * try to split by the keys in the next column.  If all keys in all
		 * columns are null, just split the page half by half.
		 */
		v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;

		if (attno + 1 == r->rd_att->natts)
			gistSplitHalf(&v->splitVector, len);
		else
			gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
	}
	else if (nOffNullTuples > 0)
	{
		int			j = 0;

		/*
		 * We don't want to mix null and non-null keys on one page, so move
		 * the nulls to the right page.
		 */
		v->splitVector.spl_right = offNullTuples;
		v->splitVector.spl_nright = nOffNullTuples;
		v->spl_risnull[attno] = TRUE;

		/* everything not in offNullTuples[] (which is ascending) goes left */
		v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
		v->splitVector.spl_nleft = 0;
		for (i = 1; i <= len; i++)
			if (j < v->splitVector.spl_nright && offNullTuples[j] == i)
				j++;
			else
				v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;

		v->spl_equiv = NULL;
		gistunionsubkey(giststate, itup, v);
	}
	else
	{
		/*
		 * All keys are non-null.
		 */
		entryvec->n = len + 1;

		if (gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno + 1 != r->rd_att->natts)
		{
			/*
			 * Splitting on the attno column is not optimal: there are tuples
			 * that can go freely to the left or the right page, so we try to
			 * split them by the following columns.
			 */
			if (v->spl_equiv == NULL)
			{
				/*
				 * Simple case: the left and right keys for the attno column
				 * are equal.
				 */
				gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
			}
			else
			{
				/* we should clean up vector from already distributed tuples */
				IndexTuple *newitup = (IndexTuple *) palloc((len + 1) * sizeof(IndexTuple));
				OffsetNumber *map = (OffsetNumber *) palloc((len + 1) * sizeof(OffsetNumber));
				int			newlen = 0;
				GIST_SPLITVEC backupSplit = v->splitVector;

				/*
				 * Collect the freely movable tuples; map[] translates a
				 * position in newitup[] back to the offset in itup[].
				 */
				for (i = 0; i < len; i++)
					if (v->spl_equiv[i + 1])
					{
						map[newlen] = i + 1;
						newitup[newlen++] = itup[i];
					}

				Assert(newlen > 0);

				/*
				 * Keep a private copy of the already decided part of the
				 * split, with room for all len tuples, because the recursive
				 * call overwrites v->splitVector.
				 */
				backupSplit.spl_left = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
				memcpy(backupSplit.spl_left, v->splitVector.spl_left, sizeof(OffsetNumber) * v->splitVector.spl_nleft);
				backupSplit.spl_right = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
				memcpy(backupSplit.spl_right, v->splitVector.spl_right, sizeof(OffsetNumber) * v->splitVector.spl_nright);

				gistSplitByKey(r, page, newitup, newlen, giststate, v, entryvec, attno + 1);

				/* merge result of subsplit */
				for (i = 0; i < v->splitVector.spl_nleft; i++)
					backupSplit.spl_left[backupSplit.spl_nleft++] = map[v->splitVector.spl_left[i] - 1];
				for (i = 0; i < v->splitVector.spl_nright; i++)
					backupSplit.spl_right[backupSplit.spl_nright++] = map[v->splitVector.spl_right[i] - 1];

				v->splitVector = backupSplit;

				/*
				 * Recompute the left and right unions over ALL tuples.
				 * Every tuple is now assigned to a side, so none may be
				 * skipped.  Whatever v->spl_equiv holds at this point is
				 * stale: it is either our own array or one set up by the
				 * recursive call, which is indexed by newitup[] positions
				 * rather than by our offsets.
				 */
				v->spl_equiv = NULL;
				gistunionsubkey(giststate, itup, v);
			}
		}
	}
}