From e3ba543525c8be7d8e6f3304fdda1b74d60b14ee Mon Sep 17 00:00:00 2001
From: "Vadim B. Mikheev" <vadim4o@yahoo.com>
Date: Sun, 29 Oct 2000 18:33:41 +0000
Subject: [PATCH] WAL fixes.

---
 src/backend/access/heap/heapam.c   | 136 ++++++++++++++++++++---------
 src/backend/access/nbtree/nbtree.c |  17 ++--
 src/backend/access/transam/xact.c  |   4 +-
 3 files changed, 109 insertions(+), 48 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 3a43f3f772d..e796762ef16 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.91 2000/10/24 09:56:07 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.92 2000/10/29 18:33:39 vadim Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -2057,6 +2057,48 @@ log_heap_move(Relation reln, ItemPointerData from, HeapTuple newtup)
 	return(log_heap_update(reln, from, newtup, true));
 }
 
+static void
+_heap_cleanup_page_(Page page)
+{
+	OffsetNumber	maxoff = PageGetMaxOffsetNumber(page);
+	OffsetNumber	offnum;
+	ItemId			lp;
+	HeapTupleHeader	htup;
+
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		lp = PageGetItemId(page, offnum);
+
+		if (!ItemIdIsUsed(lp))
+			continue;
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		if (!HeapTupleSatisfiesNow(htup))
+			lp->lp_flags &= ~LP_USED;
+	}
+
+	PageRepairFragmentation(page);
+
+}
+
+static OffsetNumber
+_heap_add_tuple_(Page page, HeapTupleHeader htup, uint32 len, OffsetNumber offnum)
+{
+	ItemId	lp = PageGetItemId(page, offnum);
+
+	if (len > PageGetFreeSpace(page) || 
+			lp->lp_flags & LP_USED || lp->lp_len != 0)
+		_heap_cleanup_page_(page);
+
+	offnum = PageAddItem(page, (Item)htup, len, offnum, 
+							LP_USED | OverwritePageMode);
+
+	return(offnum);
+}
+
 static void
 heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
 {
@@ -2097,24 +2139,18 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
 		elog(STOP, "heap_delete_undo: bad page LSN");
 
 	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
-	lp = PageGetItemId(page, offnum);
+	if (PageGetMaxOffsetNumber(page) >= offnum)
+		lp = PageGetItemId(page, offnum);
 
-	if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
+	/* page removed by vacuum ? */
+	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
 	{
-		if (redo)
-			elog(STOP, "heap_delete_redo: unused/deleted target tuple");
-		if (!InRecovery)
-			elog(STOP, "heap_delete_undo: unused/deleted target tuple in rollback");
-		if (ItemIdDeleted(lp))
-		{
-			lp->lp_flags &= ~LP_USED;
-			PageRepairFragmentation(page);
-			UnlockAndWriteBuffer(buffer);
-		}
-		else
-			UnlockAndReleaseBuffer(buffer);
+		PageSetLSN(page, lsn);
+		PageSetSUI(page, ThisStartUpID);
+		UnlockAndWriteBuffer(buffer);
 		return;
 	}
+
 	htup = (HeapTupleHeader) PageGetItem(page, lp);
 
 	if (redo)
@@ -2189,6 +2225,16 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
 			return;
 		}
 
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+		/* page removed by vacuum ? */
+		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+		{
+			PageSetLSN(page, lsn);
+			PageSetSUI(page, ThisStartUpID);
+			UnlockAndWriteBuffer(buffer);
+			return;
+		}
+
 		memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits), 
 			(char*)xlrec + SizeOfHeapInsert, newlen);
 		newlen += offsetof(HeapTupleHeaderData, t_bits);
@@ -2200,9 +2246,7 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
 		htup->t_xmax = htup->t_cmax = 0;
 		htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask;
 		
-		offnum = PageAddItem(page, (Item)htup, newlen, 
-			ItemPointerGetOffsetNumber(&(xlrec->target.tid)), 
-			LP_USED | OverwritePageMode);
+		offnum = _heap_add_tuple_(page, htup, newlen, offnum);
 		if (offnum == InvalidOffsetNumber)
 			elog(STOP, "heap_insert_redo: failed to add tuple");
 		PageSetLSN(page, lsn);
@@ -2258,6 +2302,9 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
 	xl_heap_update *xlrec = (xl_heap_update*) XLogRecGetData(record);
 	Relation		reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
 	Buffer			buffer;
+	bool			samepage = 
+		(ItemPointerGetBlockNumber(&(xlrec->newtid)) == 
+		ItemPointerGetBlockNumber(&(xlrec->target.tid)));
 	Page			page;
 	OffsetNumber	offnum;
 	ItemId			lp;
@@ -2266,13 +2313,6 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
 	if (!RelationIsValid(reln))
 		return;
 
-	/* 
-	 * Currently UPDATE is DELETE + INSERT and so code below are near
-	 * exact sum of code in heap_xlog_delete & heap_xlog_insert. We could
-	 * re-structure code better, but keeping in mind upcoming overwriting
-	 * smgr separate heap_xlog_update code seems to be Good Thing.
-	 */
-
 	/* Deal with old tuple version */
 
 	buffer = XLogReadBuffer(false, reln, 
@@ -2283,6 +2323,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
 	page = (Page) BufferGetPage(buffer);
 	if (PageIsNew((PageHeader) page))
 	{
+		if (samepage)
+			goto newsame;
 		PageInit(page, BufferGetPageSize(buffer), 0);
 		PageSetLSN(page, lsn);
 		PageSetSUI(page, ThisStartUpID);
@@ -2295,6 +2337,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
 		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
 		{
 			UnlockAndReleaseBuffer(buffer);
+			if (samepage)
+				return;
 			goto newt;
 		}
 	}
@@ -2302,22 +2346,17 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
 		elog(STOP, "heap_update_undo: bad old tuple page LSN");
 
 	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
-	lp = PageGetItemId(page, offnum);
+	if (PageGetMaxOffsetNumber(page) >= offnum)
+		lp = PageGetItemId(page, offnum);
 
-	if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
+	/* page removed by vacuum ? */
+	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
 	{
-		if (redo)
-			elog(STOP, "heap_update_redo: unused/deleted old tuple");
-		if (!InRecovery)
-			elog(STOP, "heap_update_undo: unused/deleted old tuple in rollback");
-		if (ItemIdDeleted(lp))
-		{
-			lp->lp_flags &= ~LP_USED;
-			PageRepairFragmentation(page);
-			UnlockAndWriteBuffer(buffer);
-		}
-		else
-			UnlockAndReleaseBuffer(buffer);
+		if (samepage)
+			goto newsame;
+		PageSetLSN(page, lsn);
+		PageSetSUI(page, ThisStartUpID);
+		UnlockAndWriteBuffer(buffer);
 		goto newt;
 	}
 	htup = (HeapTupleHeader) PageGetItem(page, lp);
@@ -2338,6 +2377,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
 			htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
 								HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
 		}
+		if (samepage)
+			goto newsame;
 		PageSetLSN(page, lsn);
 		PageSetSUI(page, ThisStartUpID);
 		UnlockAndWriteBuffer(buffer);
@@ -2377,6 +2418,8 @@ newt:;
 		return;
 
 	page = (Page) BufferGetPage(buffer);
+
+newsame:;
 	if (PageIsNew((PageHeader) page))
 	{
 		PageInit(page, BufferGetPageSize(buffer), 0);
@@ -2401,6 +2444,16 @@ newt:;
 			return;
 		}
 
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
+		/* page removed by vacuum ? */
+		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+		{
+			PageSetLSN(page, lsn);
+			PageSetSUI(page, ThisStartUpID);
+			UnlockAndWriteBuffer(buffer);
+			return;
+		}
+
 		hsize = SizeOfHeapUpdate;
 		if (move)
 			hsize += sizeof(TransactionId);
@@ -2431,9 +2484,8 @@ newt:;
 			htup->t_infomask = HEAP_XMAX_INVALID | xlrec->mask;
 		}
 		
-		offnum = PageAddItem(page, (Item)htup, newlen, 
-			ItemPointerGetOffsetNumber(&(xlrec->newtid)), 
-			LP_USED | OverwritePageMode);
+		offnum = _heap_add_tuple_(page, htup, newlen, 
+			ItemPointerGetOffsetNumber(&(xlrec->newtid)));
 		if (offnum == InvalidOffsetNumber)
 			elog(STOP, "heap_update_redo: failed to add tuple");
 		PageSetLSN(page, lsn);
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 7c715496735..3dbbc78b035 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.67 2000/10/21 15:43:18 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.68 2000/10/29 18:33:40 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -787,6 +787,7 @@ _bt_add_item(Page page, OffsetNumber offno,
 	if (PageAddItem(page, (Item) item, size, offno,	
 			LP_USED) == InvalidOffsetNumber)
 	{
+#ifdef NOT_USED		/* it's not valid code currently */
 		/* ops, not enough space - try to deleted dead tuples */
 		bool		result;
 
@@ -795,6 +796,7 @@ _bt_add_item(Page page, OffsetNumber offno,
 		result = _bt_cleanup_page(page, hnode);
 		if (!result || PageAddItem(page, (Item) item, size, offno,	
 				LP_USED) == InvalidOffsetNumber)
+#endif
 			return(false);
 	}
 
@@ -868,7 +870,7 @@ _bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
 					(sizeof(BTItemData) - sizeof(IndexTupleData));
 		itemsz = MAXALIGN(itemsz);
 
-		if (item + itemsz < (char*)record + record->xl_len)
+		if (item + itemsz < (char*)xlrec + record->xl_len)
 		{
 			previtem = item;
 			item += itemsz;
@@ -1173,6 +1175,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
 			else
 				pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
 
+			pageop->btpo_flags &= ~BTP_ROOT;
+
 			PageSetLSN(page, lsn);
 			PageSetSUI(page, ThisStartUpID);
 			UnlockAndWriteBuffer(buffer);
@@ -1245,7 +1249,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
 			}
 
 			for (item = (char*)xlrec + hsize;
-					item < (char*)record + record->xl_len; )
+					item < (char*)xlrec + record->xl_len; )
 			{
 				memcpy(&btdata, item, sizeof(BTItemData));
 				itemsz = IndexTupleDSize(btdata.bti_itup) +
@@ -1283,7 +1287,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
 
 			item = (char*)xlrec + SizeOfBtreeSplit +
 					sizeof(CommandId) + sizeof(RelFileNode);
-			for (cnt = 0; item < (char*)record + record->xl_len; )
+			for (cnt = 0; item < (char*)xlrec + record->xl_len; )
 			{
 				BTItem	btitem = (BTItem)
 					(tbuf + cnt * (MAXALIGN(sizeof(BTItemData))));
@@ -1306,6 +1310,9 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
 
 	/* Right (next) page */
 	blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
+	if (blkno == P_NONE)
+		return;
+
 	buffer = XLogReadBuffer(false, reln, blkno);
 	if (!BufferIsValid(buffer))
 		elog(STOP, "btree_split_%s: lost next right page", op);
@@ -1385,7 +1392,7 @@ btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
 			char	   *item;
 
 			for (item = (char*)xlrec + SizeOfBtreeNewroot;
-					item < (char*)record + record->xl_len; )
+					item < (char*)xlrec + record->xl_len; )
 			{
 				memcpy(&btdata, item, sizeof(BTItemData));
 				itemsz = IndexTupleDSize(btdata.bti_itup) +
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 6040b262b90..28d3555d438 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.78 2000/10/28 16:20:53 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.79 2000/10/29 18:33:41 vadim Exp $
  *
  * NOTES
  *		Transaction aborts can now occur two ways:
@@ -1807,8 +1807,10 @@ xact_desc(char *buf, uint8 xl_info, char* rec)
 void
 XactPushRollback(void (*func) (void *), void* data)
 {
+#ifdef XLOG_II
 	if (_RollbackFunc != NULL)
 		elog(STOP, "XactPushRollback: already installed");
+#endif
 
 	_RollbackFunc = func;
 	_RollbackData = data;
-- 
GitLab