From bf5ccf382c606b09e17dbcefc101d62417c4db33 Mon Sep 17 00:00:00 2001
From: Teodor Sigaev <teodor@sigaev.ru>
Date: Mon, 29 Oct 2007 19:26:57 +0000
Subject: [PATCH] - Add check of already changed page while replay WAL. This
 touches only ginRedoInsert(), because other ginRedo* functions rewrite whole
 page or make changes which could be applied several times without
 consistent's loss

- Remove check of identifying of corresponding split record:
it's possible that replaying of WAL starts after actual page split, but before
removing of that split from incomplete splits list. In this case, that check
cause FATAL error.

Per stress test which reproduces bug reported by Craig McElroy
<craig.mcelroy@contegix.com>
---
 src/backend/access/gin/ginxlog.c | 112 +++++++++++++++++--------------
 1 file changed, 60 insertions(+), 52 deletions(-)

diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index bf2174c37c7..79fbb496b54 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *			 $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.9 2007/09/20 17:56:30 tgl Exp $
+ *			 $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.10 2007/10/29 19:26:57 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -53,7 +53,6 @@ static void
 forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
 {
 	ListCell   *l;
-	bool		found = false;
 
 	foreach(l, incomplete_splits)
 	{
@@ -62,16 +61,9 @@ forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updat
 		if (RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno)
 		{
 			incomplete_splits = list_delete_ptr(incomplete_splits, split);
-			found = true;
 			break;
 		}
 	}
-
-	if (!found)
-	{
-		elog(ERROR, "failed to identify corresponding split record for %u/%u/%u",
-					node.relNode, leftBlkno, updateBlkno);
-	}
 }
 
 static void
@@ -129,7 +121,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 	Buffer		buffer;
 	Page		page;
 
-	/* nothing else to do if page was backed up (and no info to do it with) */
+	/* nothing else to do if page was backed up */
 	if (record->xl_info & XLR_BKP_BLOCK_1)
 		return;
 
@@ -143,37 +135,44 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 		Assert(data->isDelete == FALSE);
 		Assert(GinPageIsData(page));
 
-		if (data->isLeaf)
+		if ( ! XLByteLE(lsn, PageGetLSN(page)) )
 		{
-			OffsetNumber i;
-			ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+			if (data->isLeaf)
+			{
+				OffsetNumber i;
+				ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
 
-			Assert(GinPageIsLeaf(page));
-			Assert(data->updateBlkno == InvalidBlockNumber);
+				Assert(GinPageIsLeaf(page));
+				Assert(data->updateBlkno == InvalidBlockNumber);
 
-			for (i = 0; i < data->nitem; i++)
-				GinDataPageAddItem(page, items + i, data->offset + i);
-		}
-		else
-		{
-			PostingItem *pitem;
+				for (i = 0; i < data->nitem; i++)
+					GinDataPageAddItem(page, items + i, data->offset + i);
+			}
+			else
+			{
+				PostingItem *pitem;
 
-			Assert(!GinPageIsLeaf(page));
+				Assert(!GinPageIsLeaf(page));
 
-			if (data->updateBlkno != InvalidBlockNumber)
-			{
-				/* update link to right page after split */
-				pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
-				PostingItemSetBlockNumber(pitem, data->updateBlkno);
-			}
+				if (data->updateBlkno != InvalidBlockNumber)
+				{
+					/* update link to right page after split */
+					pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
+					PostingItemSetBlockNumber(pitem, data->updateBlkno);
+				}
 
-			pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+				pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
 
-			GinDataPageAddItem(page, pitem, data->offset);
+				GinDataPageAddItem(page, pitem, data->offset);
+			}
+		}
 
-			if (data->updateBlkno != InvalidBlockNumber)
-				forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
+		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+		{
+			PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+			forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
 		}
+
 	}
 	else
 	{
@@ -181,36 +180,45 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 
 		Assert(!GinPageIsData(page));
 
-		if (data->updateBlkno != InvalidBlockNumber)
+		if ( ! XLByteLE(lsn, PageGetLSN(page)) )
 		{
-			/* update link to right page after split */
-			Assert(!GinPageIsLeaf(page));
-			Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
-			itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
-			ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
-		}
+			if (data->updateBlkno != InvalidBlockNumber)
+			{
+				/* update link to right page after split */
+				Assert(!GinPageIsLeaf(page));
+				Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
+				itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
+				ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
+			}
 
-		if (data->isDelete)
-		{
-			Assert(GinPageIsLeaf(page));
-			Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
-			PageIndexTupleDelete(page, data->offset);
-		}
+			if (data->isDelete)
+			{
+				Assert(GinPageIsLeaf(page));
+				Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
+				PageIndexTupleDelete(page, data->offset);
+			}
 
-		itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+			itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
 
-		if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
-			elog(ERROR, "failed to add item to index page in %u/%u/%u",
-				 data->node.spcNode, data->node.dbNode, data->node.relNode);
+			if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
+				elog(ERROR, "failed to add item to index page in %u/%u/%u",
+					 data->node.spcNode, data->node.dbNode, data->node.relNode);
+		}
 
 		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+		{
+			itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
 			forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
+		}
 	}
 
-	PageSetLSN(page, lsn);
-	PageSetTLI(page, ThisTimeLineID);
+	if ( ! XLByteLE(lsn, PageGetLSN(page)) ) 
+	{
+		PageSetLSN(page, lsn);
+		PageSetTLI(page, ThisTimeLineID);
 
-	MarkBufferDirty(buffer);
+		MarkBufferDirty(buffer);
+	}
 	UnlockReleaseBuffer(buffer);
 }
 
-- 
GitLab