/*-------------------------------------------------------------------------
 *
 * gistxlog.c
 *	  WAL replay logic for GiST.
 *
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.27 2008/01/01 19:45:46 momjian Exp $
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist_private.h"
#include "access/heapam.h"
#include "miscadmin.h"
#include "utils/memutils.h"


typedef struct
{
	gistxlogPageUpdate *data;
	int			len;
	IndexTuple *itup;
	OffsetNumber *todelete;
} PageUpdateRecord;

typedef struct
{
	gistxlogPage *header;
	IndexTuple *itup;
} NewPage;

typedef struct
{
	gistxlogPageSplit *data;
	NewPage    *page;
} PageSplitRecord;

/* track for incomplete inserts, idea was taken from nbtxlog.c */

typedef struct gistIncompleteInsert
{
	RelFileNode node;
	BlockNumber origblkno;		/* for splits */
	ItemPointerData key;
	int			lenblk;
	BlockNumber *blkno;
	XLogRecPtr	lsn;
	BlockNumber *path;
	int			pathlen;
} gistIncompleteInsert;


static MemoryContext opCtx;		/* working memory for operations */
static MemoryContext insertCtx; /* holds incomplete_inserts list */
static List *incomplete_inserts;


#define ItemPointerEQ(a, b) \
	( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
	  ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )


static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
					 BlockNumber *blkno, int lenblk,
					 PageSplitRecord *xlinfo /* to extract blkno info */ )
{
	MemoryContext oldCxt;
	gistIncompleteInsert *ninsert;

	if (!ItemPointerIsValid(&key))

		/*
		 * if key is null then we should not store insertion as incomplete,
		 * because it's a vacuum operation..
		 */
		return;

	oldCxt = MemoryContextSwitchTo(insertCtx);
	ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));

	ninsert->node = node;
	ninsert->key = key;
	ninsert->lsn = lsn;

	if (lenblk && blkno)
	{
		ninsert->lenblk = lenblk;
		ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
		memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
		ninsert->origblkno = *blkno;
	}
	else
	{
		int			i;

		Assert(xlinfo);
		ninsert->lenblk = xlinfo->data->npage;
		ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
		for (i = 0; i < ninsert->lenblk; i++)
			ninsert->blkno[i] = xlinfo->page[i].header->blkno;
		ninsert->origblkno = xlinfo->data->origblkno;
	}
	Assert(ninsert->lenblk > 0);

	/*
	 * Stick the new incomplete insert onto the front of the list, not the
	 * back.  This is so that gist_xlog_cleanup will process incompletions in
	 * last-in-first-out order.
	 */
	incomplete_inserts = lcons(ninsert, incomplete_inserts);

	MemoryContextSwitchTo(oldCxt);
}

static void
forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
{
	ListCell   *l;

	if (!ItemPointerIsValid(&key))
		return;

	if (incomplete_inserts == NIL)
		return;

	foreach(l, incomplete_inserts)
	{
		gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);

		if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
		{
			/* found */
			incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
			pfree(insert->blkno);
			pfree(insert);
			break;
		}
	}
}

static void
decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
{
	char	   *begin = XLogRecGetData(record),
			   *ptr;
	int			i = 0,
				addpath = 0;

	decoded->data = (gistxlogPageUpdate *) begin;

	if (decoded->data->ntodelete)
	{
		decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
		addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
	}
	else
		decoded->todelete = NULL;

	decoded->len = 0;
	ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
	while (ptr - begin < record->xl_len)
	{
		decoded->len++;
		ptr += IndexTupleSize((IndexTuple) ptr);
	}

	decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);

	ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
	while (ptr - begin < record->xl_len)
	{
		decoded->itup[i] = (IndexTuple) ptr;
		ptr += IndexTupleSize(decoded->itup[i]);
		i++;
	}
}

/*
 * redo any page update (except page split)
 */
static void
gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
{
	gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
	PageUpdateRecord xlrec;
	Relation	reln;
	Buffer		buffer;
	Page		page;

	/* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
	forgetIncompleteInsert(xldata->node, xldata->key);

	if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
		/* operation with root always finalizes insertion */
		pushIncompleteInsert(xldata->node, lsn, xldata->key,
							 &(xldata->blkno), 1,
							 NULL);

	/* nothing else to do if page was backed up (and no info to do it with) */
	if (record->xl_info & XLR_BKP_BLOCK_1)
		return;

	decodePageUpdateRecord(&xlrec, record);

	reln = XLogOpenRelation(xlrec.data->node);
	buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
	if (!BufferIsValid(buffer))
		return;
	page = (Page) BufferGetPage(buffer);

	if (XLByteLE(lsn, PageGetLSN(page)))
	{
		UnlockReleaseBuffer(buffer);
		return;
	}

	if (isnewroot)
		GISTInitBuffer(buffer, 0);
	else if (xlrec.data->ntodelete)
	{
		int			i;

		for (i = 0; i < xlrec.data->ntodelete; i++)
			PageIndexTupleDelete(page, xlrec.todelete[i]);
		if (GistPageIsLeaf(page))
			GistMarkTuplesDeleted(page);
	}

	/* add tuples */
	if (xlrec.len > 0)
		gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);

	/*
	 * special case: leafpage, nothing to insert, nothing to delete, then
	 * vacuum marks page
	 */
	if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
		GistClearTuplesDeleted(page);

	if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)

		/*
		 * all links on non-leaf root page was deleted by vacuum full, so root
		 * page becomes a leaf
		 */
		GistPageSetLeaf(page);

	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}

static void
gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
{
	gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;

	/* nothing else to do if page was backed up (and no info to do it with) */
	if (record->xl_info & XLR_BKP_BLOCK_1)
		return;

	reln = XLogOpenRelation(xldata->node);
	buffer = XLogReadBuffer(reln, xldata->blkno, false);
	if (!BufferIsValid(buffer))
		return;

	page = (Page) BufferGetPage(buffer);
	GistPageSetDeleted(page);

	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}

static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{
	char	   *begin = XLogRecGetData(record),
			   *ptr;
	int			j,
				i = 0;

	decoded->data = (gistxlogPageSplit *) begin;
	decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);

	ptr = begin + sizeof(gistxlogPageSplit);
	for (i = 0; i < decoded->data->npage; i++)
	{
		Assert(ptr - begin < record->xl_len);
		decoded->page[i].header = (gistxlogPage *) ptr;
		ptr += sizeof(gistxlogPage);

		decoded->page[i].itup = (IndexTuple *)
			palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
		j = 0;
		while (j < decoded->page[i].header->num)
		{
			Assert(ptr - begin < record->xl_len);
			decoded->page[i].itup[j] = (IndexTuple) ptr;
			ptr += IndexTupleSize((IndexTuple) ptr);
			j++;
		}
	}
}

static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
	PageSplitRecord xlrec;
	Relation	reln;
	Buffer		buffer;
	Page		page;
	int			i;
	int			flags;

	decodePageSplitRecord(&xlrec, record);
	reln = XLogOpenRelation(xlrec.data->node);
	flags = xlrec.data->origleaf ? F_LEAF : 0;

	/* loop around all pages */
	for (i = 0; i < xlrec.data->npage; i++)
	{
		NewPage    *newpage = xlrec.page + i;

		buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
		Assert(BufferIsValid(buffer));
		page = (Page) BufferGetPage(buffer);

		/* ok, clear buffer */
		GISTInitBuffer(buffer, flags);

		/* and fill it */
		gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber);

		PageSetLSN(page, lsn);
		PageSetTLI(page, ThisTimeLineID);
		MarkBufferDirty(buffer);
		UnlockReleaseBuffer(buffer);
	}

	forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);

	pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
						 NULL, 0,
						 &xlrec);
}

static void
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
{
	RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;

	reln = XLogOpenRelation(*node);
	buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true);
	Assert(BufferIsValid(buffer));
	page = (Page) BufferGetPage(buffer);

	GISTInitBuffer(buffer, F_LEAF);

	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);

	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}

static void
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
{
	char	   *begin = XLogRecGetData(record),
			   *ptr;
	gistxlogInsertComplete *xlrec;

	xlrec = (gistxlogInsertComplete *) begin;

	ptr = begin + sizeof(gistxlogInsertComplete);
	while (ptr - begin < record->xl_len)
	{
		Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
		forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
		ptr += sizeof(ItemPointerData);
	}
}

void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8		info = record->xl_info & ~XLR_INFO_MASK;

	MemoryContext oldCxt;

	oldCxt = MemoryContextSwitchTo(opCtx);
	switch (info)
	{
		case XLOG_GIST_PAGE_UPDATE:
			gistRedoPageUpdateRecord(lsn, record, false);
			break;
		case XLOG_GIST_PAGE_DELETE:
			gistRedoPageDeleteRecord(lsn, record);
			break;
		case XLOG_GIST_NEW_ROOT:
			gistRedoPageUpdateRecord(lsn, record, true);
			break;
		case XLOG_GIST_PAGE_SPLIT:
			gistRedoPageSplitRecord(lsn, record);
			break;
		case XLOG_GIST_CREATE_INDEX:
			gistRedoCreateIndex(lsn, record);
			break;
		case XLOG_GIST_INSERT_COMPLETE:
			gistRedoCompleteInsert(lsn, record);
			break;
		default:
			elog(PANIC, "gist_redo: unknown op code %u", info);
	}

	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(opCtx);
}

static void
out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
{
	appendStringInfo(buf, "rel %u/%u/%u",
					 node.spcNode, node.dbNode, node.relNode);
	if (ItemPointerIsValid(&key))
		appendStringInfo(buf, "; tid %u/%u",
						 ItemPointerGetBlockNumber(&key),
						 ItemPointerGetOffsetNumber(&key));
}

static void
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
{
	out_target(buf, xlrec->node, xlrec->key);
	appendStringInfo(buf, "; block number %u", xlrec->blkno);
}

static void
out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
{
	appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
				xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
					 xlrec->blkno);
}

static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
	appendStringInfo(buf, "page_split: ");
	out_target(buf, xlrec->node, xlrec->key);
	appendStringInfo(buf, "; block number %u splits to %d pages",
					 xlrec->origblkno, xlrec->npage);
}

void
gist_desc(StringInfo buf, uint8 xl_info, char *rec)
{
	uint8		info = xl_info & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_GIST_PAGE_UPDATE:
			appendStringInfo(buf, "page_update: ");
			out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
			break;
		case XLOG_GIST_PAGE_DELETE:
			out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
			break;
		case XLOG_GIST_NEW_ROOT:
			appendStringInfo(buf, "new_root: ");
			out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
			break;
		case XLOG_GIST_PAGE_SPLIT:
			out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
			break;
		case XLOG_GIST_CREATE_INDEX:
			appendStringInfo(buf, "create_index: rel %u/%u/%u",
							 ((RelFileNode *) rec)->spcNode,
							 ((RelFileNode *) rec)->dbNode,
							 ((RelFileNode *) rec)->relNode);
			break;
		case XLOG_GIST_INSERT_COMPLETE:
			appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
							 ((gistxlogInsertComplete *) rec)->node.spcNode,
							 ((gistxlogInsertComplete *) rec)->node.dbNode,
							 ((gistxlogInsertComplete *) rec)->node.relNode);
			break;
		default:
			appendStringInfo(buf, "unknown gist op code %u", info);
			break;
	}
}

IndexTuple
gist_form_invalid_tuple(BlockNumber blkno)
{
	/*
	 * we don't alloc space for null's bitmap, this is invalid tuple, be
	 * carefull in read and write code
	 */
	Size		size = IndexInfoFindDataOffset(0);
	IndexTuple	tuple = (IndexTuple) palloc0(size);

	tuple->t_info |= size;

	ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
	GistTupleSetInvalid(tuple);

	return tuple;
}


static void
gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
{
	GISTInsertStack *top;

	insert->pathlen = 0;
	insert->path = NULL;

	if ((top = gistFindPath(index, insert->origblkno)) != NULL)
	{
		int			i;
		GISTInsertStack *ptr;

		for (ptr = top; ptr; ptr = ptr->parent)
			insert->pathlen++;

		insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);

		i = 0;
		for (ptr = top; ptr; ptr = ptr->parent)
			insert->path[i++] = ptr->blkno;
	}
	else
		elog(ERROR, "lost parent for block %u", insert->origblkno);
}

static SplitedPageLayout *
gistMakePageLayout(Buffer *buffers, int nbuffers)
{
	SplitedPageLayout *res = NULL,
			   *resptr;

	while (nbuffers-- > 0)
	{
		Page		page = BufferGetPage(buffers[nbuffers]);
		IndexTuple *vec;
		int			veclen;

		resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));

		resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
		resptr->block.num = PageGetMaxOffsetNumber(page);

		vec = gistextractpage(page, &veclen);
		resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));

		resptr->next = res;
		res = resptr;
	}

	return res;
}

/*
 * Continue insert after crash.  In normal situations, there aren't any
 * incomplete inserts, but if a crash occurs partway through an insertion
 * sequence, we'll need to finish making the index valid at the end of WAL
 * replay.
 *
 * Note that we assume the index is now in a valid state, except for the
 * unfinished insertion.  In particular it's safe to invoke gistFindPath();
 * there shouldn't be any garbage pages for it to run into.
 *
 * To complete insert we can't use basic insertion algorithm because
 * during insertion we can't call user-defined support functions of opclass.
 * So, we insert 'invalid' tuples without real key and do it by separate algorithm.
 * 'invalid' tuple should be updated by vacuum full.
 */
static void
gistContinueInsert(gistIncompleteInsert *insert)
{
	IndexTuple *itup;
	int			i,
				lenitup;
	Relation	index;

	index = XLogOpenRelation(insert->node);

	/*
	 * needed vector itup never will be more than initial lenblkno+2, because
	 * during this processing Indextuple can be only smaller
	 */
	lenitup = insert->lenblk;
	itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));

	for (i = 0; i < insert->lenblk; i++)
		itup[i] = gist_form_invalid_tuple(insert->blkno[i]);

	/*
	 * any insertion of itup[] should make LOG message about
	 */

	if (insert->origblkno == GIST_ROOT_BLKNO)
	{
		/*
		 * it was split root, so we should only make new root. it can't be
		 * simple insert into root, we should replace all content of root.
		 */
		Buffer		buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);

		gistnewroot(index, buffer, itup, lenitup, NULL);
		UnlockReleaseBuffer(buffer);
	}
	else
	{
		Buffer	   *buffers;
		Page	   *pages;
		int			numbuffer;
		OffsetNumber *todelete;

		/* construct path */
		gistxlogFindPath(index, insert);

		Assert(insert->pathlen > 0);

		buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
		pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
		todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));

		for (i = 0; i < insert->pathlen; i++)
		{
			int			j,
						k,
						pituplen = 0;
			XLogRecData *rdata;
			XLogRecPtr	recptr;
			Buffer		tempbuffer = InvalidBuffer;
			int			ntodelete = 0;

			numbuffer = 1;
			buffers[0] = ReadBuffer(index, insert->path[i]);
			LockBuffer(buffers[0], GIST_EXCLUSIVE);

			/*
			 * we check buffer, because we restored page earlier
			 */
			gistcheckpage(index, buffers[0]);

			pages[0] = BufferGetPage(buffers[0]);
			Assert(!GistPageIsLeaf(pages[0]));

			pituplen = PageGetMaxOffsetNumber(pages[0]);

			/* find remove old IndexTuples to remove */
			for (j = 0; j < pituplen && ntodelete < lenitup; j++)
			{
				BlockNumber blkno;
				ItemId		iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
				IndexTuple	idxtup = (IndexTuple) PageGetItem(pages[0], iid);

				blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));

				for (k = 0; k < lenitup; k++)
					if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
					{
						todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
						ntodelete++;
						break;
					}
			}

			if (ntodelete == 0)
				elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");

			/*
			 * we check space with subtraction only first tuple to delete,
			 * hope, that wiil be enough space....
			 */

			if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
			{

				/* no space left on page, so we must split */
				buffers[numbuffer] = ReadBuffer(index, P_NEW);
				LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
				GISTInitBuffer(buffers[numbuffer], 0);
				pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
				gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
				numbuffer++;

				if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
				{
					Buffer		tmp;

					/*
					 * we split root, just copy content from root to new page
					 */

					/* sanity check */
					if (i + 1 != insert->pathlen)
						elog(PANIC, "unexpected pathlen in index \"%s\"",
							 RelationGetRelationName(index));

					/* fill new page, root will be changed later */
					tempbuffer = ReadBuffer(index, P_NEW);
					LockBuffer(tempbuffer, GIST_EXCLUSIVE);
					memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));

					/* swap buffers[0] (was root) and temp buffer */
					tmp = buffers[0];
					buffers[0] = tempbuffer;
					tempbuffer = tmp;	/* now in tempbuffer GIST_ROOT_BLKNO,
										 * it is still unchanged */

					pages[0] = BufferGetPage(buffers[0]);
				}

				START_CRIT_SECTION();

				for (j = 0; j < ntodelete; j++)
					PageIndexTupleDelete(pages[0], todelete[j]);

				rdata = formSplitRdata(index->rd_node, insert->path[i],
									   false, &(insert->key),
									 gistMakePageLayout(buffers, numbuffer));

			}
			else
			{
				START_CRIT_SECTION();

				for (j = 0; j < ntodelete; j++)
					PageIndexTupleDelete(pages[0], todelete[j]);
				gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber);

				rdata = formUpdateRdata(index->rd_node, buffers[0],
										todelete, ntodelete,
										itup, lenitup, &(insert->key));
			}

			/*
			 * use insert->key as mark for completion of insert (form*Rdata()
			 * above) for following possible replays
			 */

			/* write pages, we should mark it dirty befor XLogInsert() */
			for (j = 0; j < numbuffer; j++)
			{
				GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
				MarkBufferDirty(buffers[j]);
			}
			recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
			for (j = 0; j < numbuffer; j++)
			{
				PageSetLSN(pages[j], recptr);
				PageSetTLI(pages[j], ThisTimeLineID);
			}

			END_CRIT_SECTION();

			lenitup = numbuffer;
			for (j = 0; j < numbuffer; j++)
			{
				itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
				UnlockReleaseBuffer(buffers[j]);
			}

			if (tempbuffer != InvalidBuffer)
			{
				/*
				 * it was a root split, so fill it by new values
				 */
				gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
				UnlockReleaseBuffer(tempbuffer);
			}
		}
	}

	ereport(LOG,
			(errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
			insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
		   errdetail("Incomplete insertion detected during crash replay.")));
}

void
gist_xlog_startup(void)
{
	incomplete_inserts = NIL;
	insertCtx = AllocSetContextCreate(CurrentMemoryContext,
									  "GiST recovery temporary context",
									  ALLOCSET_DEFAULT_MINSIZE,
									  ALLOCSET_DEFAULT_INITSIZE,
									  ALLOCSET_DEFAULT_MAXSIZE);
	opCtx = createTempGistContext();
}

void
gist_xlog_cleanup(void)
{
	ListCell   *l;
	MemoryContext oldCxt;

	oldCxt = MemoryContextSwitchTo(opCtx);

	foreach(l, incomplete_inserts)
	{
		gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);

		gistContinueInsert(insert);
		MemoryContextReset(opCtx);
	}
	MemoryContextSwitchTo(oldCxt);

	MemoryContextDelete(opCtx);
	MemoryContextDelete(insertCtx);
}

bool
gist_safe_restartpoint(void)
{
	if (incomplete_inserts)
		return false;
	return true;
}


XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
			   ItemPointer key, SplitedPageLayout *dist)
{
	XLogRecData *rdata;
	gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
	SplitedPageLayout *ptr;
	int			npage = 0,
				cur = 1;

	ptr = dist;
	while (ptr)
	{
		npage++;
		ptr = ptr->next;
	}

	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));

	xlrec->node = node;
	xlrec->origblkno = blkno;
	xlrec->origleaf = page_is_leaf;
	xlrec->npage = (uint16) npage;
	if (key)
		xlrec->key = *key;
	else
		ItemPointerSetInvalid(&(xlrec->key));

	rdata[0].buffer = InvalidBuffer;
	rdata[0].data = (char *) xlrec;
	rdata[0].len = sizeof(gistxlogPageSplit);
	rdata[0].next = NULL;

	ptr = dist;
	while (ptr)
	{
		rdata[cur].buffer = InvalidBuffer;
		rdata[cur].data = (char *) &(ptr->block);
		rdata[cur].len = sizeof(gistxlogPage);
		rdata[cur - 1].next = &(rdata[cur]);
		cur++;

		rdata[cur].buffer = InvalidBuffer;
		rdata[cur].data = (char *) (ptr->list);
		rdata[cur].len = ptr->lenlist;
		rdata[cur - 1].next = &(rdata[cur]);
		rdata[cur].next = NULL;
		cur++;
		ptr = ptr->next;
	}

	return rdata;
}

/*
 * Construct the rdata array for an XLOG record describing a page update
 * (deletion and/or insertion of tuples on a single index page).
 *
 * Note that both the todelete array and the tuples are marked as belonging
 * to the target buffer; they need not be stored in XLOG if XLogInsert decides
 * to log the whole buffer contents instead.  Also, we take care that there's
 * at least one rdata item referencing the buffer, even when ntodelete and
 * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
 */
XLogRecData *
formUpdateRdata(RelFileNode node, Buffer buffer,
				OffsetNumber *todelete, int ntodelete,
				IndexTuple *itup, int ituplen, ItemPointer key)
{
	XLogRecData *rdata;
	gistxlogPageUpdate *xlrec;
	int			cur,
				i;

	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
	xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));

	xlrec->node = node;
	xlrec->blkno = BufferGetBlockNumber(buffer);
	xlrec->ntodelete = ntodelete;

	if (key)
		xlrec->key = *key;
	else
		ItemPointerSetInvalid(&(xlrec->key));

	rdata[0].buffer = buffer;
	rdata[0].buffer_std = true;
	rdata[0].data = NULL;
	rdata[0].len = 0;
	rdata[0].next = &(rdata[1]);

	rdata[1].data = (char *) xlrec;
	rdata[1].len = sizeof(gistxlogPageUpdate);
	rdata[1].buffer = InvalidBuffer;
	rdata[1].next = &(rdata[2]);

	rdata[2].data = (char *) todelete;
	rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
	rdata[2].buffer = buffer;
	rdata[2].buffer_std = true;
	rdata[2].next = NULL;

	/* new tuples */
	cur = 3;
	for (i = 0; i < ituplen; i++)
	{
		rdata[cur - 1].next = &(rdata[cur]);
		rdata[cur].data = (char *) (itup[i]);
		rdata[cur].len = IndexTupleSize(itup[i]);
		rdata[cur].buffer = buffer;
		rdata[cur].buffer_std = true;
		rdata[cur].next = NULL;
		cur++;
	}

	return rdata;
}

XLogRecPtr
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
{
	gistxlogInsertComplete xlrec;
	XLogRecData rdata[2];
	XLogRecPtr	recptr;

	Assert(len > 0);
	xlrec.node = node;

	rdata[0].buffer = InvalidBuffer;
	rdata[0].data = (char *) &xlrec;
	rdata[0].len = sizeof(gistxlogInsertComplete);
	rdata[0].next = &(rdata[1]);

	rdata[1].buffer = InvalidBuffer;
	rdata[1].data = (char *) keys;
	rdata[1].len = sizeof(ItemPointerData) * len;
	rdata[1].next = NULL;

	START_CRIT_SECTION();

	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);

	END_CRIT_SECTION();

	return recptr;
}