From 8876e37d07baa0041649c5894e948ce65292b714 Mon Sep 17 00:00:00 2001
From: Teodor Sigaev <teodor@sigaev.ru>
Date: Wed, 17 May 2006 16:34:59 +0000
Subject: [PATCH] Reduce size of critical section during vacuum full; critical
 sections are no longer nested. All user-defined functions are now called
 outside critical sections. Small improvements in WAL protocol.

TODO: improve XLOG replay
---
 src/backend/access/gist/gist.c       |   8 +-
 src/backend/access/gist/gistutil.c   |   9 +-
 src/backend/access/gist/gistvacuum.c | 483 +++++++++++++++++----------
 src/backend/access/gist/gistxlog.c   | 131 +++++---
 src/include/access/gist_private.h    |  18 +-
 5 files changed, 403 insertions(+), 246 deletions(-)

diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 4ce461d4463..d207b7ecfa7 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.134 2006/05/10 23:18:38 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -347,7 +347,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 		 * Form index tuples vector to split:
 		 * remove old tuple if t's needed and add new tuples to vector
 		 */
-		itvec = gistextractbuffer(state->stack->buffer, &tlen);
+		itvec = gistextractpage(state->stack->page, &tlen);
 		if ( !is_leaf ) {
 			/* on inner page we should remove old tuple */
 			int pos = state->stack->childoffnum - FirstOffsetNumber;
@@ -501,7 +501,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 			}
 
 			rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
-									offs, noffs, false,
+									offs, noffs, 
 									state->itup, state->ituplen,
 									&(state->key));
 
@@ -1157,7 +1157,7 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
 		XLogRecData *rdata;
 
 		rdata = formUpdateRdata(r->rd_node, buffer,
-								NULL, 0, false,
+								NULL, 0,
 								itup, len, key);
 
 		recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index d5d6405100b..ca5a9d652d2 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *			$PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $
+ *			$PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -112,18 +112,17 @@ gistfitpage(IndexTuple *itvec, int len) {
  * Read buffer into itup vector
  */
 IndexTuple *
-gistextractbuffer(Buffer buffer, int *len /* out */ )
+gistextractpage(Page page, int *len /* out */ )
 {
 	OffsetNumber i,
 				maxoff;
 	IndexTuple *itvec;
-	Page		p = (Page) BufferGetPage(buffer);
 
-	maxoff = PageGetMaxOffsetNumber(p);
+	maxoff = PageGetMaxOffsetNumber(page);
 	*len = maxoff;
 	itvec = palloc(sizeof(IndexTuple) * maxoff);
 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-		itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+		itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
 
 	return itvec;
 }
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index e81c0ebf487..9b32304d1ae 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -47,23 +47,235 @@ typedef struct
 	bool		emptypage;
 } ArrayTuple;
 
+/*
+ * Build a single union key over every tuple on the buffer's page; the result is a palloc'd copy that points at the buffer's block and is marked valid.
+ */
+static IndexTuple
+PageMakeUnionKey(GistVacuum *gv, Buffer buffer) {
+	Page	page = BufferGetPage( buffer );
+	IndexTuple *vec,
+				tmp, res;
+	int			veclen = 0;
+	MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+	vec = gistextractpage(page, &veclen);
+	/* we call gistunion() in a temporary context because user-defined functions called by gistunion()
+	   may not free all of the memory they allocate */
+	tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
+	MemoryContextSwitchTo(oldCtx);
+
+	res = (IndexTuple) palloc(IndexTupleSize(tmp));	/* copy made after switching back, i.e. outside gv->opCtx */
+	memcpy(res, tmp, IndexTupleSize(tmp));
+
+	ItemPointerSetBlockNumber(&(res->t_tid), BufferGetBlockNumber(buffer));
+	GistTupleSetValid(res);
+
+	MemoryContextReset(gv->opCtx);	/* discard vec, tmp and anything else allocated while gv->opCtx was current */
+
+	return res;
+}
+/* Mark page blkno as deleted, after recursively doing the same for every page its tuples point to. */
+static void
+gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) {
+	Buffer  buffer;
+	Page    page;
+
+	buffer = ReadBuffer(gv->index, blkno);
+	LockBuffer(buffer, GIST_EXCLUSIVE);
+	page = (Page) BufferGetPage(buffer);
+
+	if ( !GistPageIsLeaf(page) ) {	/* inner page: delete each child subtree first, while this page stays locked */
+		int     i;
+
+		for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i = OffsetNumberNext(i)) {
+			ItemId iid = PageGetItemId(page, i);
+			IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
+			gistDeleteSubtree(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid))); 
+		}
+	}
+
+	START_CRIT_SECTION();	/* covers only the marking of this page and its WAL record; the recursion above is outside */
+
+	MarkBufferDirty(buffer);
+
+	page = (Page) BufferGetPage(buffer);	/* same buffer as above, fetched again */
+	GistPageSetDeleted(page);
+	gv->result->std.pages_deleted++;
+
+	if (!gv->index->rd_istemp)	/* emit an XLOG_GIST_PAGE_DELETE record carrying the relation node and block number */
+	{
+		XLogRecData rdata;
+		XLogRecPtr	recptr;
+		gistxlogPageDelete	xlrec;
+
+		xlrec.node = gv->index->rd_node;
+		xlrec.blkno = blkno;
+
+		rdata.buffer = InvalidBuffer;
+		rdata.data = (char *) &xlrec;
+		rdata.len = sizeof(gistxlogPageDelete);
+		rdata.next = NULL;
+
+		recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata);
+		PageSetLSN(page, recptr);
+		PageSetTLI(page, ThisTimeLineID);
+	}
+	else	/* rd_istemp set: no WAL record, the page gets the placeholder LSN instead */
+		PageSetLSN(page, XLogRecPtrForTemp);
+	
+	END_CRIT_SECTION();
+
+	UnlockReleaseBuffer(buffer);
+}
+/* Return a palloc'd byte-for-byte copy of page; the copy is PageGetPageSize(page) bytes long. */
+static Page    
+GistPageGetCopyPage( Page page ) {
+	Size    pageSize = PageGetPageSize( page );
+	Page tmppage;
+
+	tmppage=(Page)palloc( pageSize );
+	memcpy( tmppage, page, pageSize );
+
+	return tmppage;
+}
+/* Split the tuples of tempPage plus addon over several pages, WAL-log the split, and return one tuple per resulting page that points at that page's block. */
+static ArrayTuple
+vacuumSplitPage(GistVacuum *gv, Page tempPage, Buffer buffer, IndexTuple *addon, int curlenaddon) {
+	ArrayTuple	res = {NULL, 0, false};
+	IndexTuple *vec;
+	SplitedPageLayout *dist = NULL,
+				   *ptr;
+	int			i, veclen=0;
+	BlockNumber	blkno = BufferGetBlockNumber(buffer);
+	MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);	/* extraction and gistSplit() allocate in gv->opCtx */
+
+	vec = gistextractpage(tempPage, &veclen);
+	vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
+	dist = gistSplit(gv->index, tempPage, vec, veclen, &(gv->giststate));
+
+	MemoryContextSwitchTo(oldCtx);
+
+	if (blkno != GIST_ROOT_BLKNO) {
+		/* non-root split: the first resulting page reuses the original buffer, so no new buffer is allocated for it */
+		dist->buffer = buffer;
+		dist->page = tempPage;
+		/* during vacuum we never split a leaf page, so the page flags are cleared */
+		GistPageGetOpaque(dist->page)->flags = 0;
+	} else	/* root split: the working copy is not used as a result page, so it is freed here */
+		pfree(tempPage);
+
+	res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
+	res.ituplen = 0;
+
+	/* get a new buffer for every page that has none yet, then fill each page with its tuples */
+	for (ptr = dist; ptr; ptr = ptr->next) {
+		char *data;
+
+		if ( ptr->buffer == InvalidBuffer ) {
+			ptr->buffer = gistNewBuffer( gv->index );
+			GISTInitBuffer( ptr->buffer, 0 );
+			ptr->page = BufferGetPage(ptr->buffer);
+		}
+		ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+		data = (char*)(ptr->list);	/* ptr->list holds ptr->block.num tuples stored back to back */
+		for(i=0;i<ptr->block.num;i++) {
+			if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+				elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
+			data += IndexTupleSize((IndexTuple)data);
+		}
+
+		ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+		res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));	/* copy survives the gv->opCtx reset below */
+		memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
+		res.ituplen++;
+	}
+
+	START_CRIT_SECTION();	/* the page filling above, including its elog(ERROR) path, runs before this point */
+
+	for (ptr = dist; ptr; ptr = ptr->next) {
+		MarkBufferDirty(ptr->buffer);
+		GistPageGetOpaque(ptr->page)->rightlink = InvalidBlockNumber;
+	}
+
+	/* install the rebuilt non-root page into its buffer and point dist->page at the buffer's page */
+	if (blkno != GIST_ROOT_BLKNO) {
+		PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
+		dist->page = BufferGetPage( dist->buffer );
+	}
+
+	if (!gv->index->rd_istemp)
+	{
+		XLogRecPtr	recptr;
+		XLogRecData *rdata;
+		ItemPointerData key;		/* set key for incomplete
+									 * insert */
+		char	   *xlinfo;
+
+		ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+		rdata = formSplitRdata(gv->index->rd_node, blkno,
+										   false, &key, dist);
+		xlinfo = rdata->data;
+
+		recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+		for (ptr = dist; ptr; ptr = ptr->next)
+		{
+			PageSetLSN(BufferGetPage(ptr->buffer), recptr);
+			PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+		}
+
+		pfree(xlinfo);
+		pfree(rdata);
+	}
+	else
+	{
+		for (ptr = dist; ptr; ptr = ptr->next)
+			PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+	}
+
+	for (ptr = dist; ptr; ptr = ptr->next)
+	{
+		/* we must keep the buffer pin on the head page */
+		if (BufferGetBlockNumber(ptr->buffer) != blkno)
+			UnlockReleaseBuffer( ptr->buffer );
+	}
+
+	if (blkno == GIST_ROOT_BLKNO)
+	{
+		ItemPointerData key;		/* set key for incomplete
+									 * insert */
+
+		ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+		gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);	/* root buffer is rewritten with the tuples for the new pages */
+	}
+
+	END_CRIT_SECTION();
+
+	MemoryContextReset(gv->opCtx);	/* release what was allocated while gv->opCtx was current (vec, gistSplit() output) */
+
+	return res;
+}
 
 static ArrayTuple
 gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 {
 	ArrayTuple	res = {NULL, 0, false};
 	Buffer		buffer;
-	Page		page;
+	Page		page, tempPage = NULL;
 	OffsetNumber i,
 				maxoff;
 	ItemId		iid;
 	int			lenaddon = 4,
 				curlenaddon = 0,
-				ntodelete = 0;
+				nOffToDelete = 0,
+				nBlkToDelete = 0;
 	IndexTuple	idxtuple,
 			   *addon = NULL;
 	bool		needwrite = false;
-	OffsetNumber todelete[MaxOffsetNumber];
+	OffsetNumber offToDelete[MaxOffsetNumber];
+	BlockNumber  blkToDelete[MaxOffsetNumber];
 	ItemPointerData *completed = NULL;
 	int			ncompleted = 0,
 				lencompleted = 16;
@@ -76,12 +288,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 	page = (Page) BufferGetPage(buffer);
 	maxoff = PageGetMaxOffsetNumber(page);
 
-	/*
-	 * XXX need to reduce scope of changes to page so we can make this
-	 * critical section less extensive
-	 */
-	START_CRIT_SECTION();
-
 	if (GistPageIsLeaf(page))
 	{
 		if (GistTuplesDeleted(page))
@@ -92,13 +298,16 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 		completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted);
 		addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon);
 
+		/* get copy of page to work */
+		tempPage = GistPageGetCopyPage(page);
+
 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
 		{
 			ArrayTuple	chldtuple;
 			bool		needchildunion;
 
-			iid = PageGetItemId(page, i);
-			idxtuple = (IndexTuple) PageGetItem(page, iid);
+			iid = PageGetItemId(tempPage, i);
+			idxtuple = (IndexTuple) PageGetItem(tempPage, iid);
 			needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false;
 
 			if (needchildunion)
@@ -109,14 +318,19 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 										 needchildunion);
 			if (chldtuple.ituplen || chldtuple.emptypage)
 			{
-				PageIndexTupleDelete(page, i);
-				todelete[ntodelete++] = i;
+				/* update tuple or/and inserts new */
+				if ( chldtuple.emptypage )
+					blkToDelete[nBlkToDelete++] = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+				offToDelete[nOffToDelete++] = i;
+				PageIndexTupleDelete(tempPage, i);
 				i--;
 				maxoff--;
 				needwrite = needunion = true;
 
 				if (chldtuple.ituplen)
 				{
+
+					Assert( chldtuple.emptypage == false );
 					while (curlenaddon + chldtuple.ituplen >= lenaddon)
 					{
 						lenaddon *= 2;
@@ -150,200 +364,102 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 				}
 			}
 		}
+		
+		Assert( maxoff == PageGetMaxOffsetNumber(tempPage) );
 
 		if (curlenaddon)
 		{
 			/* insert updated tuples */
-			if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber))
-			{
+			if (gistnospace(tempPage, addon, curlenaddon, InvalidOffsetNumber)) {
 				/* there is no space on page to insert tuples */
-				IndexTuple *vec;
-				SplitedPageLayout *dist = NULL,
-						   *ptr;
-				int			i, veclen=0;
-				MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
-
-				vec = gistextractbuffer(buffer, &veclen);
-				vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
-				dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
-
-				MemoryContextSwitchTo(oldCtx);
-
-				if (blkno != GIST_ROOT_BLKNO) {
-					/* if non-root split then we should not allocate new buffer */
-					dist->buffer = buffer;
-					dist->page = BufferGetPage(dist->buffer);
-					GistPageGetOpaque(dist->page)->flags = 0;
-				}
-
-				res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
-				res.ituplen = 0;
-
-				/* make new pages and fills them */
-				for (ptr = dist; ptr; ptr = ptr->next) {
-					char *data;
-
-					if ( ptr->buffer == InvalidBuffer ) {
-						ptr->buffer = gistNewBuffer( gv->index );
-						GISTInitBuffer( ptr->buffer, 0 );
-						ptr->page = BufferGetPage(ptr->buffer);
-					}
-					ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
-
-					data = (char*)(ptr->list);
-					for(i=0;i<ptr->block.num;i++) {
-						if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
-							elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
-						data += IndexTupleSize((IndexTuple)data);
-					}
-
-					ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
-					res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
-					memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
-					res.ituplen++;
-
-					MarkBufferDirty(ptr->buffer);
-				}
-
-				if (!gv->index->rd_istemp)
-				{
-					XLogRecPtr	recptr;
-					XLogRecData *rdata;
-					ItemPointerData key;		/* set key for incomplete
-												 * insert */
-					char	   *xlinfo;
-
-					ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+				res = vacuumSplitPage(gv, tempPage, buffer, addon, curlenaddon);
+				tempPage=NULL; /* vacuumSplitPage() free tempPage */
+				needwrite = needunion = false;		/* gistSplit already forms unions and writes pages */
+			} else
+				/* enough free space */
+				gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber);
+		}
+	}
 
-					rdata = formSplitRdata(gv->index->rd_node, blkno,
-										   false, &key, dist);
-					xlinfo = rdata->data;
+	/* 
+	 * If page is empty, we should remove pointer to it before
+	 * deleting page (except root)
+	 */
 
-					recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
-					for (ptr = dist; ptr; ptr = ptr->next)
-					{
-						PageSetLSN(BufferGetPage(ptr->buffer), recptr);
-						PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
-					}
+	if ( blkno != GIST_ROOT_BLKNO && ( PageIsEmpty(page) || (tempPage && PageIsEmpty(tempPage)) ) ) {
+		/*
+		 * New version of page is empty, so leave it unchanged,
+		 * upper call will mark our page as deleted.
+		 * In case of page split we never will be here...
+		 *
+		 * If page was empty it can't become non-empty during processing 
+		 */
+		res.emptypage = true;
+		UnlockReleaseBuffer(buffer);
+	} else {
+		/* write the page and remove its children if needed */
 
-					pfree(xlinfo);
-					pfree(rdata);
-				}
-				else
-				{
-					for (ptr = dist; ptr; ptr = ptr->next)
-					{
-						PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
-					}
-				}
+		START_CRIT_SECTION();
 
-				for (ptr = dist; ptr; ptr = ptr->next)
-				{
-					/* we must keep the buffer pin on the head page */
-					if (BufferGetBlockNumber(ptr->buffer) != blkno)
-						UnlockReleaseBuffer( ptr->buffer );
-				}
+		if ( tempPage && needwrite ) {
+			PageRestoreTempPage(tempPage, page);
+			tempPage = NULL;
+		}
 
-				if (blkno == GIST_ROOT_BLKNO)
-				{
-					ItemPointerData key;		/* set key for incomplete
-												 * insert */
+		/* Empty index */ 
+		if (PageIsEmpty(page) && blkno == GIST_ROOT_BLKNO )
+		{
+			needwrite = true;
+			GistPageSetLeaf(page);
+		}
 
-					ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+	
+		if (needwrite)
+		{
+			MarkBufferDirty(buffer);
+			GistClearTuplesDeleted(page);
 
-					oldCtx = MemoryContextSwitchTo(gv->opCtx);
-					gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
-					MemoryContextSwitchTo(oldCtx);
-				}
+			if (!gv->index->rd_istemp)
+			{
+				XLogRecData *rdata;
+				XLogRecPtr	recptr;
+				char	   *xlinfo;
 
-				needwrite = false;
+				rdata = formUpdateRdata(gv->index->rd_node, buffer,
+										offToDelete, nOffToDelete,
+										addon, curlenaddon, NULL);
+				xlinfo = rdata->next->data;
 
-				MemoryContextReset(gv->opCtx);
+				recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+				PageSetLSN(page, recptr);
+				PageSetTLI(page, ThisTimeLineID);
 
-				needunion = false;		/* gistSplit already forms unions */
+				pfree(xlinfo);
+				pfree(rdata);
 			}
 			else
-			{
-				/* enough free space */
-				gistfillbuffer(gv->index, page, addon, curlenaddon, InvalidOffsetNumber);
-			}
+				PageSetLSN(page, XLogRecPtrForTemp);
 		}
-	}
 
-	if (needunion)
-	{
-		/* forms union for page  or check empty */
-		if (PageIsEmpty(page))
-		{
-			if (blkno == GIST_ROOT_BLKNO)
-			{
-				needwrite = true;
-				GistPageSetLeaf(page);
-			}
-			else
-			{
-				needwrite = true;
-				res.emptypage = true;
-				GistPageSetDeleted(page);
-				gv->result->std.pages_deleted++;
-			}
-		}
-		else
-		{
-			IndexTuple *vec,
-						tmp;
-			int			veclen = 0;
-			MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
-
-			vec = gistextractbuffer(buffer, &veclen);
-			tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
-			MemoryContextSwitchTo(oldCtx);
+		END_CRIT_SECTION();
 
+		if ( needunion && !PageIsEmpty(page) )
+		{
 			res.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
 			res.ituplen = 1;
-			res.itup[0] = (IndexTuple) palloc(IndexTupleSize(tmp));
-			memcpy(res.itup[0], tmp, IndexTupleSize(tmp));
-
-			ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno);
-			GistTupleSetValid(res.itup[0]);
-
-			MemoryContextReset(gv->opCtx);
+			res.itup[0] = PageMakeUnionKey(gv, buffer);
 		}
-	}
 
-	if (needwrite)
-	{
-		MarkBufferDirty(buffer);
-		GistClearTuplesDeleted(page);
-
-		if (!gv->index->rd_istemp)
-		{
-			XLogRecData *rdata;
-			XLogRecPtr	recptr;
-			char	   *xlinfo;
-
-			rdata = formUpdateRdata(gv->index->rd_node, buffer,
-									todelete, ntodelete, res.emptypage,
-									addon, curlenaddon, NULL);
-			xlinfo = rdata->data;
+		UnlockReleaseBuffer(buffer);
 
-			recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
-			PageSetLSN(page, recptr);
-			PageSetTLI(page, ThisTimeLineID);
+		/* delete empty children: the page no longer holds any links to their subtrees */
+		for(i=0;i<nBlkToDelete;i++) 
+			gistDeleteSubtree(gv, blkToDelete[i]);
 
-			pfree(xlinfo);
-			pfree(rdata);
-		}
-		else
-			PageSetLSN(page, XLogRecPtrForTemp);
+		if (ncompleted && !gv->index->rd_istemp)
+			gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
 	}
 
-	END_CRIT_SECTION();
-
-	UnlockReleaseBuffer(buffer);
-
-	if (ncompleted && !gv->index->rd_istemp)
-		gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
 
 	for (i = 0; i < curlenaddon; i++)
 		pfree(addon[i]);
@@ -351,6 +467,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 		pfree(addon);
 	if (completed)
 		pfree(completed);
+	if (tempPage)
+		pfree(tempPage);
+
 	return res;
 }
 
@@ -627,10 +746,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 					gistxlogPageUpdate *xlinfo;
 
 					rdata = formUpdateRdata(rel->rd_node, buffer,
-											todelete, ntodelete, false,
+											todelete, ntodelete, 
 											NULL, 0,
 											NULL);
-					xlinfo = (gistxlogPageUpdate *) rdata->data;
+					xlinfo = (gistxlogPageUpdate *) rdata->next->data;
 
 					recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
 					PageSetLSN(page, recptr);
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index a029d8f1ec5..01dab119b2e 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $
+ *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -209,41 +209,33 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
 		return;
 	}
 
-	if (xlrec.data->isemptypage)
+	if (isnewroot)
+		GISTInitBuffer(buffer, 0);
+	else if (xlrec.data->ntodelete)
 	{
-		while (!PageIsEmpty(page))
-			PageIndexTupleDelete(page, FirstOffsetNumber);
+		int			i;
 
-		if (xlrec.data->blkno == GIST_ROOT_BLKNO)
-			GistPageSetLeaf(page);
-		else
-			GistPageSetDeleted(page);
+		for (i = 0; i < xlrec.data->ntodelete; i++)
+			PageIndexTupleDelete(page, xlrec.todelete[i]);
+		if (GistPageIsLeaf(page))
+			GistMarkTuplesDeleted(page);
 	}
-	else
-	{
-		if (isnewroot)
-			GISTInitBuffer(buffer, 0);
-		else if (xlrec.data->ntodelete)
-		{
-			int			i;
 
-			for (i = 0; i < xlrec.data->ntodelete; i++)
-				PageIndexTupleDelete(page, xlrec.todelete[i]);
-			if (GistPageIsLeaf(page))
-				GistMarkTuplesDeleted(page);
-		}
+	/* add tuples */
+	if (xlrec.len > 0)
+		gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
 
-		/* add tuples */
-		if (xlrec.len > 0)
-			gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
+	/*
+	 * special case: leafpage, nothing to insert, nothing to delete, then
+	 * vacuum marks page
+	 */
+	if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
+		GistClearTuplesDeleted(page);
 
-		/*
-		 * special case: leafpage, nothing to insert, nothing to delete, then
-		 * vacuum marks page
-		 */
-		if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
-			GistClearTuplesDeleted(page);
-	}
+	if ( !GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO )
+		/* all links on the non-leaf root page were deleted by vacuum full,
+		   so the root page becomes a leaf */
+		GistPageSetLeaf(page);
 
 	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
 	PageSetLSN(page, lsn);
@@ -252,6 +244,29 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
 	UnlockReleaseBuffer(buffer);
 }
 
+static void
+gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
+{
+	gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
+	Relation	reln;
+	Buffer		buffer;
+	Page		page;
+	/* Replay of an XLOG_GIST_PAGE_DELETE record: re-initialize the logged block and flag it as deleted. */
+	reln = XLogOpenRelation(xldata->node);
+	buffer = XLogReadBuffer(reln, xldata->blkno, false);
+	if (!BufferIsValid(buffer))
+		return;		/* no valid buffer for this block: nothing is replayed */
+
+	GISTInitBuffer( buffer, 0 );	/* page is initialized with flags 0 before the deleted flag is set */
+	page = (Page) BufferGetPage(buffer);
+	GistPageSetDeleted(page);
+
+	PageSetLSN(page, lsn);
+	PageSetTLI(page, ThisTimeLineID);
+	MarkBufferDirty(buffer);
+	UnlockReleaseBuffer(buffer);
+}
+
 static void
 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
 {
@@ -382,6 +397,9 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
 		case XLOG_GIST_PAGE_UPDATE:
 			gistRedoPageUpdateRecord(lsn, record, false);
 			break;
+		case XLOG_GIST_PAGE_DELETE:
+			gistRedoPageDeleteRecord(lsn, record);
+			break;
 		case XLOG_GIST_NEW_ROOT:
 			gistRedoPageUpdateRecord(lsn, record, true);
 			break;
@@ -405,8 +423,10 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
 static void
 out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
 {
-	appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u",
-			node.spcNode, node.dbNode, node.relNode,
+	appendStringInfo(buf, "rel %u/%u/%u",
+			node.spcNode, node.dbNode, node.relNode);
+	if ( ItemPointerIsValid( &key ) )
+		appendStringInfo(buf, "; tid %u/%u",
 			ItemPointerGetBlockNumber(&key),
 			ItemPointerGetOffsetNumber(&key));
 }
@@ -418,6 +438,14 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
 	appendStringInfo(buf, "; block number %u", xlrec->blkno);
 }
 
+static void
+out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
+{	/* append a description of a page-delete record to buf: relation file node and block number */
+	appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
+			xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+			xlrec->blkno);
+}
+
 static void
 out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
 {
@@ -438,6 +466,9 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
 			appendStringInfo(buf, "page_update: ");
 			out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
 			break;
+		case XLOG_GIST_PAGE_DELETE:
+			out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
+			break;
 		case XLOG_GIST_NEW_ROOT:
 			appendStringInfo(buf, "new_root: ");
 			out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
@@ -643,7 +674,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
 					 * we split root, just copy tuples from old root to new
 					 * page
 					 */
-					parentitup = gistextractbuffer(buffers[numbuffer - 1],
+					parentitup = gistextractpage(pages[numbuffer - 1],
 												   &pituplen);
 
 					/* sanity check */
@@ -796,7 +827,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
  */
 XLogRecData *
 formUpdateRdata(RelFileNode node, Buffer buffer,
-				OffsetNumber *todelete, int ntodelete, bool emptypage,
+				OffsetNumber *todelete, int ntodelete,
 				IndexTuple *itup, int ituplen, ItemPointer key)
 {
 	XLogRecData *rdata;
@@ -804,35 +835,37 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
 	int			cur,
 				i;
 
-	/* ugly wart in API: emptypage causes us to ignore other inputs */
-	if (emptypage)
-		ntodelete = ituplen = 0;
-
-	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
+	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
 	xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
 
 	xlrec->node = node;
 	xlrec->blkno = BufferGetBlockNumber(buffer);
 	xlrec->ntodelete = ntodelete;
-	xlrec->isemptypage = emptypage;
+
 	if (key)
 		xlrec->key = *key;
 	else
 		ItemPointerSetInvalid(&(xlrec->key));
 
-	rdata[0].data = (char *) xlrec;
-	rdata[0].len = sizeof(gistxlogPageUpdate);
-	rdata[0].buffer = InvalidBuffer;
+	rdata[0].buffer = buffer;
+	rdata[0].buffer_std = true;
+	rdata[0].data = NULL;
+	rdata[0].len = 0;
 	rdata[0].next = &(rdata[1]);
 
-	rdata[1].data = (char *) todelete;
-	rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
-	rdata[1].buffer = buffer;
-	rdata[1].buffer_std = true;
-	rdata[1].next = NULL;
+	rdata[1].data = (char *) xlrec;
+	rdata[1].len = sizeof(gistxlogPageUpdate);
+	rdata[1].buffer = InvalidBuffer;
+	rdata[1].next = &(rdata[2]);
+
+	rdata[2].data = (char *) todelete;
+	rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+	rdata[2].buffer = buffer;
+	rdata[2].buffer_std = true;
+	rdata[2].next = NULL;
 
 	/* new tuples */
-	cur = 2;
+	cur = 3;
 	for (i = 0; i < ituplen; i++)
 	{
 		rdata[cur - 1].next = &(rdata[cur]);
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 7e9469f000b..a866277fe9f 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -85,20 +85,21 @@ extern const XLogRecPtr XLogRecPtrForTemp;
 #define XLOG_GIST_PAGE_SPLIT		0x30
 #define XLOG_GIST_INSERT_COMPLETE	0x40
 #define XLOG_GIST_CREATE_INDEX		0x50
+#define XLOG_GIST_PAGE_DELETE		0x60
 
 typedef struct gistxlogPageUpdate
 {
 	RelFileNode node;
 	BlockNumber blkno;
 
-	uint16		ntodelete;
-	bool		isemptypage;
-
 	/*
 	 * It used to identify completeness of insert. Sets to leaf itup
 	 */
 	ItemPointerData key;
 
+	/* number of deleted offsets */
+	uint16		ntodelete;
+
 	/*
 	 * follow: 1. todelete OffsetNumbers 2. tuples to insert
 	 */
@@ -131,6 +132,11 @@ typedef struct gistxlogInsertComplete
 	/* follows ItemPointerData key to clean */
 } gistxlogInsertComplete;
 
+typedef struct gistxlogPageDelete	/* payload of an XLOG_GIST_PAGE_DELETE record */
+{
+	RelFileNode node;	/* relation file node of the index the page belongs to */
+	BlockNumber blkno;	/* block number of the page being marked deleted */
+} gistxlogPageDelete;
 
 /* SplitedPageLayout - gistSplit function result */
 typedef struct SplitedPageLayout
@@ -249,7 +255,7 @@ extern void gist_xlog_cleanup(void);
 extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
 
 extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
-				OffsetNumber *todelete, int ntodelete, bool emptypage,
+				OffsetNumber *todelete, int ntodelete, 
 				IndexTuple *itup, int ituplen, ItemPointer key);
 
 extern XLogRecData *formSplitRdata(RelFileNode node,
@@ -273,7 +279,7 @@ extern void gistcheckpage(Relation rel, Buffer buf);
 extern Buffer gistNewBuffer(Relation r);
 extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
 			   int len, OffsetNumber off);
-extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ );
+extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
 			   IndexTuple *itvec, int *len,
 			   IndexTuple *additvec, int addlen);
-- 
GitLab