From deee783052892fc92711de95066f58a129906099 Mon Sep 17 00:00:00 2001
From: "Vadim B. Mikheev" <vadim4o@yahoo.com>
Date: Fri, 13 Oct 2000 12:05:22 +0000
Subject: [PATCH] WAL

---
 src/backend/access/heap/heapam.c      |   8 +-
 src/backend/access/nbtree/nbtinsert.c |  18 ++-
 src/backend/access/nbtree/nbtree.c    | 169 ++++++++++++++++++++++++--
 src/backend/access/transam/Makefile   |   4 +-
 src/include/access/nbtree.h           |  11 +-
 5 files changed, 187 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 3e1de33bfe4..794ba977cc1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.87 2000/10/13 02:02:59 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.88 2000/10/13 12:05:20 vadim Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -2198,7 +2198,8 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
 	HeapTupleHeader	htup = (HeapTupleHeader) PageGetItem(page, lp);
 
 	/* is it our tuple ? */
-	if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
+	if (PageGetSUI(page) != ThisStartUpID || 
+		htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
 	{
 		if (!InRecovery)
 			elog(STOP, "heap_insert_undo: invalid target tuple in rollback");
@@ -2394,7 +2395,8 @@ newt:;
 	htup = (HeapTupleHeader) PageGetItem(page, lp);
 
 	/* is it our tuple ? */
-	if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
+	if (PageGetSUI(page) != ThisStartUpID || 
+		htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
 	{
 		if (!InRecovery)
 			elog(STOP, "heap_update_undo: invalid new tuple in rollback");
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index c72b8ca3df6..0105739b5cd 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.65 2000/10/13 02:03:00 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.66 2000/10/13 12:05:20 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -612,6 +612,10 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
 	OffsetNumber maxoff;
 	OffsetNumber i;
 
+#ifdef XLOG
+	BTItem		lhikey;
+#endif
+
 	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
 	origpage = BufferGetPage(buf);
 	leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
@@ -680,6 +684,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
 		itemsz = ItemIdGetLength(itemid);
 		item = (BTItem) PageGetItem(origpage, itemid);
 	}
+#ifdef XLOG
+	lhikey = item;
+#endif
 	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
 					LP_USED) == InvalidOffsetNumber)
 		elog(STOP, "btree: failed to add hikey to the left sibling");
@@ -793,12 +800,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
 			memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
 			hsize += sizeof(RelFileNode);
 		}
+		else
+		{
+			Size	itemsz = IndexTupleDSize(lhikey->bti_itup) + 
+						(sizeof(BTItemData) - sizeof(IndexTupleData));
+			memcpy(xlbuf + hsize, (char*) lhikey, itemsz);
+			hsize += itemsz;
+		}
 		if (newitemonleft)
 		{
 			/*
 			 * Read comments in _bt_pgaddtup.
 			 * Actually, seems that in non-leaf splits newitem shouldn't
-			 * go to first data key position.
+			 * go to first data key position on left page.
 			 */
 			if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque))
 			{
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 1064c2bb107..a57bac1c810 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.65 2000/10/13 12:05:20 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -837,7 +837,7 @@ static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
 					ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
 					(char*)xlrec + hsize,
 					record->xl_len - hsize,
-					&hnode))
+					hnode))
 				elog(STOP, "btree_insert_redo: failed to add item");
 
 			PageSetLSN(page, lsn);
@@ -908,7 +908,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
 		else
 		{
 			/* Delete items related to new right sibling */
-			_bt_thin_left_page(page, record);
+			_bt_fix_left_page(page, record, onleft);
 
 			if (onleft)
 			{
@@ -924,6 +924,13 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
 					memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + 
 								sizeof(CommandId), sizeof(RelFileNode));
 				}
+				else
+				{
+					memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+					itemsz = IndexTupleDSize(btdata.bti_itup) +
+								(sizeof(BTItemData) - sizeof(IndexTupleData));
+					hsize += itemsz;
+				}
 
 				memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
 				itemsz = IndexTupleDSize(btdata.bti_itup) +
@@ -933,7 +940,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
 						ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
 						(char*)xlrec + hsize,
 						itemsz,
-						&hnode))
+						hnode))
 					elog(STOP, "btree_split_redo: failed to add item");
 			}
 			else
@@ -994,6 +1001,13 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
 				pageop->btpo_flags |= BTP_LEAF;
 				hsize += (sizeof(CommandId) + sizeof(RelFileNode));
 			}
+			else
+			{
+				memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+				itemsz = IndexTupleDSize(btdata.bti_itup) +
+							(sizeof(BTItemData) - sizeof(IndexTupleData));
+				hsize += itemsz;
+			}
 			if (onleft)		/* skip target item */
 			{
 				memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
@@ -1198,17 +1212,28 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
 {
 	char		   *xlrec = (char*) XLogRecGetData(record);
 	Page			page = (Page) BufferGetPage(buffer);
-	BTPageOpaque	pageop;
+	BTPageOpaque	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
 	BlockNumber		blkno;
 	OffsetNumber	offno;
 	ItemId			lp;
+	BTItem			item;
 
 	for ( ; ; )
 	{
-		offno = _bt_find_btitem(page, btitem);
-		if (offno != InvalidOffsetNumber)
+		OffsetNumber	maxoff = PageGetMaxOffsetNumber(page);
+
+		for (offno = P_FIRSTDATAKEY(pageop);
+			 offno <= maxoff;
+			 offno = OffsetNumberNext(offno))
+		{
+			lp = PageGetItemId(page, offno);
+			item = (BTItem) PageGetItem(page, lp);
+			if (BTItemSame(item, btitem))
+				break;
+		}
+		if (offno <= maxoff)
 			break;
-		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+		offno = InvalidOffsetNumber;
 		if (P_RIGHTMOST(pageop))
 			break;
 		blkno = pageop->btpo_next;
@@ -1221,6 +1246,7 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
 		if (PageIsNew((PageHeader) page))
 			elog(STOP, "btree_%s_undo: uninitialized right sibling",
 				(insert) ? "insert" : "split");
+		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
 		if (XLByteLT(PageGetLSN(page), lsn))
 			break;
 	}
@@ -1250,9 +1276,9 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
 
 		memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
 		memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
-		result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid),
+		result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
 					record->xl_xid, cid);
-		if (result <= 0)	/* no tuple or not owner */
+		if (result < 0)	/* not owner */
 		{
 			UnlockAndReleaseBuffer(buffer);
 			return;
@@ -1278,7 +1304,7 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
 
 static bool
 _bt_add_item(Page page, OffsetNumber offno, 
-	char* item, Size size, RelFileNode* hnode)
+	char* item, Size size, RelFileNode hnode)
 {
 	BTPageOpaque	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
@@ -1309,4 +1335,125 @@ _bt_add_item(Page page, OffsetNumber offno,
 	return(true);
 }
 
+/* Delete items whose heap tuple fails XLogIsValidTuple; true if any removed */
+static bool
+_bt_cleanup_page(Page page, RelFileNode hnode)
+{
+	BTPageOpaque	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+	OffsetNumber	maxoff = PageGetMaxOffsetNumber(page);
+	OffsetNumber	offno = P_FIRSTDATAKEY(pageop);
+	bool			result = false;
+
+	while (offno <= maxoff)
+	{
+		BTItem	item = (BTItem) PageGetItem(page, PageGetItemId(page, offno));
+
+		if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid)))
+		{
+			offno = OffsetNumberNext(offno);
+			continue;
+		}
+		/* do not advance: re-check this offset after the delete */
+		PageIndexTupleDelete(page, offno);
+		maxoff = PageGetMaxOffsetNumber(page);
+		result = true;
+	}
+	return(result);
+}
+
+/*
+ * Redo helper for a split: remove from the left sibling the items that
+ * the log record lists for the right sibling, and replace P_HIKEY.
+ * The new hi-key is logged explicitly for non-leaf pages; for leaf pages
+ * it is taken from the logged right sibling items.
+ */
+static void
+_bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
+{
+	char		   *xlrec = (char*) XLogRecGetData(record);
+	char		   *recend = xlrec + record->xl_len;	/* end of record data */
+	BTPageOpaque	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+	Size			hsize = SizeOfBtreeSplit;
+	RelFileNode		hnode;
+	BTItemData		btdata;
+	OffsetNumber	maxoff;
+	OffsetNumber	offno;
+	char		   *item;
+	Size			itemsz;
+	char		   *previtem = NULL;
+	char		   *lhikey = NULL;
+	Size			lhisize = 0;
+
+	if (pageop->btpo_flags & BTP_LEAF)
+	{
+		hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+		memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
+					sizeof(CommandId), sizeof(RelFileNode));
+	}
+	else
+	{
+		/* no heap node is logged here; don't pass garbage by value below */
+		memset(&hnode, 0, sizeof(RelFileNode));
+		lhikey = (char*)xlrec + hsize;
+		memcpy(&btdata, lhikey, sizeof(BTItemData));
+		lhisize = IndexTupleDSize(btdata.bti_itup) +
+					(sizeof(BTItemData) - sizeof(IndexTupleData));
+		hsize += lhisize;
+	}
+
+	if (! P_RIGHTMOST(pageop))
+		PageIndexTupleDelete(page, P_HIKEY);
+	maxoff = PageGetMaxOffsetNumber(page);	/* read after the delete above */
+
+	if (onleft)		/* skip target item */
+	{
+		memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+		itemsz = IndexTupleDSize(btdata.bti_itup) +
+					(sizeof(BTItemData) - sizeof(IndexTupleData));
+		hsize += itemsz;
+	}
+
+	for (item = (char*)xlrec + hsize; ; )
+	{
+		memcpy(&btdata, item, sizeof(BTItemData));
+		for (offno = P_FIRSTDATAKEY(pageop);
+			 offno <= maxoff;
+			 offno = OffsetNumberNext(offno))
+		{
+			ItemId	lp = PageGetItemId(page, offno);
+			BTItem	btitem = (BTItem) PageGetItem(page, lp);
+
+			if (BTItemSame(&btdata, btitem))
+			{
+				PageIndexTupleDelete(page, offno);
+				maxoff = PageGetMaxOffsetNumber(page);
+				break;
+			}
+		}
+
+		itemsz = IndexTupleDSize(btdata.bti_itup) +
+					(sizeof(BTItemData) - sizeof(IndexTupleData));
+		itemsz = MAXALIGN(itemsz);
+
+		if (item + itemsz >= recend)
+			break;
+		previtem = item;
+		item += itemsz;
+	}
+
+	/* time to insert hi-key */
+	if (pageop->btpo_flags & BTP_LEAF)
+	{
+		lhikey = (P_RIGHTMOST(pageop)) ? item : previtem;
+		if (lhikey == NULL)
+			elog(STOP, "btree_split_redo: no hi key for left sibling");
+		memcpy(&btdata, lhikey, sizeof(BTItemData));
+		lhisize = IndexTupleDSize(btdata.bti_itup) +
+					(sizeof(BTItemData) - sizeof(IndexTupleData));
+	}
+
+	if (! _bt_add_item(page, P_HIKEY, lhikey, lhisize, hnode))
+		elog(STOP, "btree_split_redo: failed to add hi key to left sibling");
+}
+
 #endif
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 303936f4553..272a883c521 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -4,7 +4,7 @@
 #    Makefile for access/transam
 #
 # IDENTIFICATION
-#    $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.12 2000/08/31 16:09:46 petere Exp $
+#    $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.13 2000/10/13 12:05:21 vadim Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = transam.o transsup.o varsup.o xact.o xid.o xlog.o rmgr.o
+OBJS = transam.o transsup.o varsup.o xact.o xid.o xlog.o xlogutils.o rmgr.o
 
 all: SUBSYS.o
 
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 4ca61e0c630..02a0bb6f5d8 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: nbtree.h,v 1.44 2000/10/13 02:03:02 vadim Exp $
+ * $Id: nbtree.h,v 1.45 2000/10/13 12:05:22 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -259,8 +259,9 @@ typedef struct xl_btree_insert
 
 /* 
  * This is what we need to know about insert with split - 
- * 22 + [4+8] + [btitem] + right sibling btitems. Note that we need in
- * CommandID and HeapNode (4 + 8 bytes) only for leaf page insert.
+ * 22 + {4 + 8 | left hi-key} + [btitem] + right sibling btitems. Note that
+ * we need CommandId and HeapNode (4 + 8 bytes) for leaf pages
+ * and the left page hi-key for non-leaf ones.
  */
 typedef struct xl_btree_split
 {
@@ -271,8 +272,8 @@ typedef struct xl_btree_split
 	/* 
 	 * We log all btitems from the right sibling. If new btitem goes on
 	 * the left sibling then we log it too and it will be the first
-	 * BTItemData at the end of this struct, but after (for the leaf
-	 * pages) CommandId and HeapNode.
+	 * BTItemData at the end of this struct after CommandId and HeapNode
+	 * on leaf pages, or after the left page hi-key on non-leaf ones.
 	 */
 } xl_btree_split;
 
-- 
GitLab