From 5b0740d3fcd55f6e545e8bd577fe8ccba2be4987 Mon Sep 17 00:00:00 2001
From: "Vadim B. Mikheev" <vadim4o@yahoo.com>
Date: Sat, 28 Oct 2000 16:21:00 +0000
Subject: [PATCH] WAL

---
 src/backend/access/transam/transsup.c      |   10 +-
 src/backend/access/transam/varsup.c        |    6 +-
 src/backend/access/transam/xact.c          |   13 +-
 src/backend/access/transam/xlog.c          |   83 +-
 src/backend/access/transam/xlogutils.c     |  106 +-
 src/backend/commands/dbcommands.c          |    5 +-
 src/backend/commands/vacuum.c              |    7 +-
 src/backend/storage/buffer/bufmgr.c        |   11 +-
 src/backend/storage/buffer/localbuf.c      |   20 +-
 src/backend/storage/buffer/xlog_bufmgr.c   | 2205 ++++++++++++++++++++
 src/backend/storage/buffer/xlog_localbuf.c |  274 +++
 src/backend/storage/file/fd.c              |    4 +-
 src/backend/storage/smgr/md.c              |   71 +-
 src/backend/storage/smgr/smgr.c            |  112 +-
 src/backend/utils/cache/relcache.c         |   57 +-
 src/backend/utils/init/postinit.c          |    9 +-
 src/include/access/transam.h               |   13 +-
 src/include/access/xact.h                  |    4 +-
 src/include/access/xlog.h                  |   14 +-
 src/include/access/xlogdefs.h              |   24 +
 src/include/access/xlogutils.h             |    4 +-
 src/include/storage/buf_internals.h        |    6 +-
 src/include/storage/bufmgr.h               |    9 +-
 src/include/storage/bufpage.h              |    5 +-
 src/include/storage/smgr.h                 |   27 +-
 25 files changed, 2828 insertions(+), 271 deletions(-)
 create mode 100644 src/backend/storage/buffer/xlog_bufmgr.c
 create mode 100644 src/backend/storage/buffer/xlog_localbuf.c
 create mode 100644 src/include/access/xlogdefs.h

diff --git a/src/backend/access/transam/transsup.c b/src/backend/access/transam/transsup.c
index d219f8b6841..74e8c39eae0 100644
--- a/src/backend/access/transam/transsup.c
+++ b/src/backend/access/transam/transsup.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/transam/Attic/transsup.c,v 1.25 2000/01/26 05:56:04 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/transam/Attic/transsup.c,v 1.26 2000/10/28 16:20:53 vadim Exp $
  *
  * NOTES
  *	  This file contains support functions for the high
@@ -186,6 +186,10 @@ TransBlockGetXidStatus(Block tblock,
 	bits8		bit2;
 	BitIndex	offset;
 
+#ifdef XLOG
+	tblock = (Block) ((char*) tblock + sizeof(XLogRecPtr));
+#endif
+
 	/* ----------------
 	 *	calculate the index into the transaction data where
 	 *	our transaction status is located
@@ -227,6 +231,10 @@ TransBlockSetXidStatus(Block tblock,
 	Index		index;
 	BitIndex	offset;
 
+#ifdef XLOG
+	tblock = (Block) ((char*) tblock + sizeof(XLogRecPtr));
+#endif
+
 	/* ----------------
 	 *	calculate the index into the transaction data where
 	 *	we sould store our transaction status.
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 029da1d72ca..49c82b55700 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.29 2000/07/25 20:18:19 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.30 2000/10/28 16:20:53 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -125,7 +125,11 @@ VariableRelationPutNextXid(TransactionId xid)
 
 	TransactionIdStore(xid, &(var->nextXidData));
 
+#ifdef XLOG
+	WriteBuffer(buf);	/* temp */
+#else
 	FlushBuffer(buf, TRUE);
+#endif
 }
 
 /* --------------------------------
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index a0476d97cff..6040b262b90 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.77 2000/10/24 20:06:39 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.78 2000/10/28 16:20:53 vadim Exp $
  *
  * NOTES
  *		Transaction aborts can now occur two ways:
@@ -176,6 +176,8 @@
 
 extern bool SharedBufferChanged;
 
+void RecordTransactionCommit(void);
+
 static void AbortTransaction(void);
 static void AtAbort_Cache(void);
 static void AtAbort_Locks(void);
@@ -191,7 +193,6 @@ static void AtStart_Memory(void);
 static void CleanupTransaction(void);
 static void CommitTransaction(void);
 static void RecordTransactionAbort(void);
-static void RecordTransactionCommit(void);
 static void StartTransaction(void);
 
 /* ----------------
@@ -220,7 +221,7 @@ int			XactIsoLevel;
 #ifdef XLOG
 #include "access/xlogutils.h"
 
-int			CommitDelay = 100;
+int			CommitDelay = 5;	/* 1/200 sec */
 
 void		xact_redo(XLogRecPtr lsn, XLogRecord *record);
 void		xact_undo(XLogRecPtr lsn, XLogRecord *record);
@@ -658,8 +659,8 @@ AtStart_Memory(void)
  *			  -cim 3/18/90
  * --------------------------------
  */
-static void
-RecordTransactionCommit(void)
+void
+RecordTransactionCommit()
 {
 	TransactionId xid;
 	int			leak;
@@ -683,6 +684,8 @@ RecordTransactionCommit(void)
 		struct timeval	delay;
 		XLogRecPtr		recptr;
 
+		BufmgrCommit();
+
 		xlrec.xtime = time(NULL);
 		/*
 		 * MUST SAVE ARRAY OF RELFILENODE-s TO DROP
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1343743c096..aa952a42ab4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.21 2000/10/24 09:56:09 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.22 2000/10/28 16:20:54 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -220,6 +220,8 @@ static uint32 readOff = 0;
 static char readBuf[BLCKSZ];
 static XLogRecord *nextRecord = NULL;
 
+static bool InRedo = false;
+
 XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
 {
@@ -481,6 +483,19 @@ XLogFlush(XLogRecPtr record)
 	unsigned	i = 0;
 	bool		force_lgwr = false;
 
+	if (XLOG_DEBUG)
+	{
+		fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n",
+			(IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
+			(InRedo) ? "(redo)" : "",
+			record.xlogid, record.xrecoff,
+			LgwrResult.Write.xlogid, LgwrResult.Write.xrecoff,
+			LgwrResult.Flush.xlogid, LgwrResult.Flush.xrecoff);
+		fflush(stderr);
+	}
+
+	if (IsBootstrapProcessingMode() || InRedo)
+		return;
 	if (XLByteLE(record, LgwrResult.Flush))
 		return;
 	WriteRqst = LgwrRqst.Write;
@@ -894,7 +909,7 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer)
 	record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
 
 got_record:;
-	if (record->xl_len == 0 || record->xl_len >
+	if (record->xl_len >
 		(BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord))
 	{
 		elog(emode, "ReadRecord: invalid record len %u in (%u, %u)",
@@ -1259,7 +1274,6 @@ StartupXLOG()
 				LastRec;
 	XLogRecord *record;
 	char		buffer[MAXLOGRECSZ + SizeOfXLogRecord];
-	int			recovery = 0;
 	bool		sie_saved = false;
 
 #endif
@@ -1380,16 +1394,15 @@ StartupXLOG()
 			elog(STOP, "Invalid Redo/Undo record in shutdown checkpoint");
 		if (ControlFile->state == DB_SHUTDOWNED)
 			elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
-		recovery = 1;
+		InRecovery = true;
 	}
 	else if (ControlFile->state != DB_SHUTDOWNED)
 	{
-		if (checkPoint.Shutdown)
-			elog(STOP, "Invalid state in control file");
-		recovery = 1;
+		InRecovery = true;
 	}
 
-	if (recovery)
+	/* REDO */
+	if (InRecovery)
 	{
 		elog(LOG, "The DataBase system was not properly shut down\n"
 			 "\tAutomatic recovery is in progress...");
@@ -1401,6 +1414,7 @@ StartupXLOG()
 		StopIfError = true;
 
 		XLogOpenLogRelation();	/* open pg_log */
+		XLogInitRelationCache();
 
 		/* Is REDO required ? */
 		if (XLByteLT(checkPoint.redo, RecPtr))
@@ -1409,9 +1423,9 @@ StartupXLOG()
 /* read past CheckPoint record */
 			record = ReadRecord(NULL, buffer);
 
-		/* REDO */
 		if (record->xl_len != 0)
 		{
+			InRedo = true;
 			elog(LOG, "Redo starts at (%u, %u)",
 				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
 			do
@@ -1441,12 +1455,40 @@ StartupXLOG()
 			elog(LOG, "Redo done at (%u, %u)",
 				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
 			LastRec = ReadRecPtr;
+			InRedo = false;
 		}
 		else
 			elog(LOG, "Redo is not required");
+	}
+
+	/* Init xlog buffer cache */
+	record = ReadRecord(&LastRec, buffer);
+	logId = EndRecPtr.xlogid;
+	logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
+	logOff = 0;
+	logFile = XLogFileOpen(logId, logSeg, false);
+	XLogCtl->xlblocks[0].xlogid = logId;
+	XLogCtl->xlblocks[0].xrecoff =
+		((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
+	Insert = &XLogCtl->Insert;
+	memcpy((char *) (Insert->currpage), readBuf, BLCKSZ);
+	Insert->currpos = ((char *) Insert->currpage) +
+		(EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
+	Insert->PrevRecord = LastRec;
+
+	LgwrRqst.Write = LgwrRqst.Flush =
+	LgwrResult.Write = LgwrResult.Flush = EndRecPtr;
+
+	XLogCtl->Write.LgwrResult = LgwrResult;
+	Insert->LgwrResult = LgwrResult;
+
+	XLogCtl->LgwrRqst = LgwrRqst;
+	XLogCtl->LgwrResult = LgwrResult;
 
 #ifdef NOT_USED
-		/* UNDO */
+	/* UNDO */
+	if (InRecovery)
+	{
 		RecPtr = ReadRecPtr;
 		if (XLByteLT(checkPoint.undo, RecPtr))
 		{
@@ -1465,29 +1507,16 @@ StartupXLOG()
 		}
 		else
 			elog(LOG, "Undo is not required");
-#endif
 	}
+#endif
 
-	/* Init xlog buffer cache */
-	record = ReadRecord(&LastRec, buffer);
-	logId = EndRecPtr.xlogid;
-	logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
-	logOff = 0;
-	logFile = XLogFileOpen(logId, logSeg, false);
-	XLogCtl->xlblocks[0].xlogid = logId;
-	XLogCtl->xlblocks[0].xrecoff =
-		((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
-	Insert = &XLogCtl->Insert;
-	memcpy((char *) (Insert->currpage), readBuf, BLCKSZ);
-	Insert->currpos = ((char *) Insert->currpage) +
-		(EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
-	Insert->PrevRecord = ControlFile->checkPoint;
-
-	if (recovery)
+	if (InRecovery)
 	{
 		CreateCheckPoint(true);
 		StopIfError = sie_saved;
+		XLogCloseRelationCache();
 	}
+	InRecovery = false;
 
 #endif	 /* XLOG */
 
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 2800ff0316f..3d15033b940 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -22,6 +22,7 @@
 #include "access/htup.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_database.h"
+#include "lib/hasht.h"
 
 /*
  * ---------------------------------------------------------------
@@ -240,32 +241,10 @@ static int					_xlcnt = 0;
 #define	_XLOG_INITRELCACHESIZE	32
 #define	_XLOG_MAXRELCACHESIZE	512
 
-void
-XLogCloseRelationCache(void)
-{
-	int i;
-
-	if (!_xlrelarr)
-		return;
-
-	for (i = 1; i < _xlast; i++)
-	{
-		Relation	reln = &(_xlrelarr[i].reldata);
-		if (reln->rd_fd >= 0)
-			smgrclose(DEFAULT_SMGR, reln);
-	}
-
-	free(_xlrelarr);
-	free(_xlpgcarr);
-
-	hash_destroy(_xlrelcache);
-	_xlrelarr = NULL;
-}
-
 static void
 _xl_init_rel_cache(void)
 {
-	HASHCTL	ctl;
+	HASHCTL			ctl;
 
 	_xlcnt = _XLOG_INITRELCACHESIZE;
 	_xlast = 0;
@@ -286,6 +265,35 @@ _xl_init_rel_cache(void)
 								HASH_ELEM | HASH_FUNCTION);
 }
 
+static void
+_xl_remove_hash_entry(XLogRelDesc **edata, int dummy)
+{
+	XLogRelCacheEntry	   *hentry;
+	bool					found;
+	XLogRelDesc			   *rdesc = *edata;
+	Form_pg_class			tpgc = rdesc->reldata.rd_rel;
+
+	rdesc->lessRecently->moreRecently = rdesc->moreRecently;
+	rdesc->moreRecently->lessRecently = rdesc->lessRecently;
+
+	hentry = (XLogRelCacheEntry*) hash_search(_xlrelcache, 
+		(char*)&(rdesc->reldata.rd_node), HASH_REMOVE, &found);
+
+	if (hentry == NULL)
+		elog(STOP, "_xl_remove_hash_entry: can't delete from cache");
+	if (!found)
+		elog(STOP, "_xl_remove_hash_entry: file was not found in cache");
+
+	if (rdesc->reldata.rd_fd >= 0)
+		smgrclose(DEFAULT_SMGR, &(rdesc->reldata));
+
+	memset(rdesc, 0, sizeof(XLogRelDesc));
+	memset(tpgc, 0, sizeof(FormData_pg_class));
+	rdesc->reldata.rd_rel = tpgc;
+
+	return;
+}
+
 static XLogRelDesc*
 _xl_new_reldesc(void)
 {
@@ -310,32 +318,41 @@ _xl_new_reldesc(void)
 	}
 	else /* reuse */
 	{
-		XLogRelCacheEntry	   *hentry;
-		bool					found;
-		XLogRelDesc			   *res = _xlrelarr[0].moreRecently;
-		Form_pg_class			tpgc = res->reldata.rd_rel;
+		XLogRelDesc	   *res = _xlrelarr[0].moreRecently;
 
-		res->lessRecently->moreRecently = res->moreRecently;
-		res->moreRecently->lessRecently = res->lessRecently;
+		_xl_remove_hash_entry(&res, 0);
 
-		hentry = (XLogRelCacheEntry*) hash_search(_xlrelcache, 
-			(char*)&(res->reldata.rd_node), HASH_REMOVE, &found);
+		_xlast--;
+		return(res);
+	}
+}
 
-		if (hentry == NULL)
-			elog(STOP, "XLogOpenRelation: can't delete from cache");
-		if (!found)
-			elog(STOP, "XLogOpenRelation: file was not found in cache");
+extern void CreateDummyCaches(void);
+extern void DestroyDummyCaches(void);
 
-		if (res->reldata.rd_fd >= 0)
-			smgrclose(DEFAULT_SMGR, &(res->reldata));
+void
+XLogInitRelationCache(void)
+{
+	CreateDummyCaches();
+	_xl_init_rel_cache();
+}
 
-		memset(res, 0, sizeof(XLogRelDesc));
-		memset(tpgc, 0, sizeof(FormData_pg_class));
-		res->reldata.rd_rel = tpgc;
+void
+XLogCloseRelationCache(void)
+{
 
-		_xlast--;
-		return(res);
-	}
+	DestroyDummyCaches();
+
+	if (!_xlrelarr)
+		return;
+
+	HashTableWalk(_xlrelcache, (HashtFunc)_xl_remove_hash_entry, 0);
+	hash_destroy(_xlrelcache);
+
+	free(_xlrelarr);
+	free(_xlpgcarr);
+
+	_xlrelarr = NULL;
 }
 
 Relation
@@ -345,9 +362,6 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode)
 	XLogRelCacheEntry	   *hentry;
 	bool					found;
 
-	if (!_xlrelarr)
-		_xl_init_rel_cache();
-
 	hentry = (XLogRelCacheEntry*) 
 			hash_search(_xlrelcache, (char*)&rnode, HASH_FIND, &found);
 
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index d68033d8975..802e6867015 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.62 2000/10/22 17:55:36 pjw Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.63 2000/10/28 16:20:54 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -193,6 +193,9 @@ createdb(const char *dbname, const char *dbpath, int encoding)
 			elog(ERROR, "CREATE DATABASE: Could not initialize database directory. Delete failed as well");
 	}
 
+#ifdef XLOG
+	BufferSync();
+#endif
 }
 
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 0905f60b807..3976cb1ab50 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.170 2000/10/24 09:56:15 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.171 2000/10/28 16:20:54 vadim Exp $
  *
 
  *-------------------------------------------------------------------------
@@ -1787,7 +1787,9 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
 
 	if (num_moved > 0)
 	{
-
+#ifdef XLOG
+		RecordTransactionCommit();
+#else
 		/*
 		 * We have to commit our tuple' movings before we'll truncate
 		 * relation, but we shouldn't lose our locks. And so - quick hack:
@@ -1797,6 +1799,7 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
 		FlushBufferPool();
 		TransactionIdCommit(myXID);
 		FlushBufferPool();
+#endif
 	}
 
 	/*
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index c0a320986ce..9c9bda5035c 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.91 2000/10/23 04:10:06 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.92 2000/10/28 16:20:55 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -42,6 +42,13 @@
  *		freelist.c -- chooses victim for buffer replacement
  *		buf_table.c -- manages the buffer lookup table
  */
+
+#ifdef XLOG
+
+#include "xlog_bufmgr.c"
+
+#else
+
 #include <sys/types.h>
 #include <sys/file.h>
 #include <math.h>
@@ -2512,3 +2519,5 @@ MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer))
 	SpinRelease(BufMgrLock);
 	return;
 }
+
+#endif	/* ! XLOG */
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 1d6a416e48e..faa3304b4f6 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -16,10 +16,17 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.32 2000/10/23 04:10:06 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.33 2000/10/28 16:20:56 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
+   
+#ifdef XLOG
+
+#include "xlog_localbuf.c"
+
+#else
+
 #include <sys/types.h>
 #include <sys/file.h>
 #include <math.h>
@@ -247,10 +254,11 @@ InitLocalBuffer(void)
 }
 
 /*
- * LocalBufferSync -
- *	  flush all dirty buffers in the local buffer cache. Since the buffer
- *	  cache is only used for keeping relations visible during a transaction,
- *	  we will not need these buffers again.
+ * LocalBufferSync
+ *
+ * Flush all dirty buffers in the local buffer cache at commit time.
+ * Since the buffer cache is only used for keeping relations visible
+ * during a transaction, we will not need these buffers again.
  */
 void
 LocalBufferSync(void)
@@ -303,3 +311,5 @@ ResetLocalBufferPool(void)
 	MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
 	nextFreeLocalBuf = 0;
 }
+
+#endif	/* XLOG */
diff --git a/src/backend/storage/buffer/xlog_bufmgr.c b/src/backend/storage/buffer/xlog_bufmgr.c
new file mode 100644
index 00000000000..dcd377b7eb3
--- /dev/null
+++ b/src/backend/storage/buffer/xlog_bufmgr.c
@@ -0,0 +1,2205 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmgr.c
+ *	  buffer manager interface routines
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.1 2000/10/28 16:20:56 vadim Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ *
+ * BufferAlloc() -- lookup a buffer in the buffer table.  If
+ *		it isn't there add it, but do not read data into memory.
+ *		This is used when we are about to reinitialize the
+ *		buffer so don't care what the current disk contents are.
+ *		BufferAlloc() also pins the new buffer in memory.
+ *
+ * ReadBuffer() -- like BufferAlloc() but reads the data
+ *		on a buffer cache miss.
+ *
+ * ReleaseBuffer() -- unpin the buffer
+ *
+ * WriteNoReleaseBuffer() -- mark the buffer contents as "dirty"
+ *		but don't unpin.  The disk IO is delayed until buffer
+ *		replacement.
+ *
+ * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
+ *
+ * BufferSync() -- flush all dirty buffers in the buffer pool.
+ *
+ * InitBufferPool() -- Init the buffer module.
+ *
+ * See other files:
+ *		freelist.c -- chooses victim for buffer replacement
+ *		buf_table.c -- manages the buffer lookup table
+ */
+#include <sys/types.h>
+#include <sys/file.h>
+#include <math.h>
+#include <signal.h>
+
+#include "postgres.h"
+#include "executor/execdebug.h"
+#include "miscadmin.h"
+#include "storage/s_lock.h"
+#include "storage/smgr.h"
+#include "utils/relcache.h"
+
+#ifdef XLOG
+#include "catalog/pg_database.h"
+#endif
+
+#define BufferGetLSN(bufHdr)	\
+	(*((XLogRecPtr*)MAKE_PTR((bufHdr)->data)))
+
+
+extern SPINLOCK BufMgrLock;
+extern long int ReadBufferCount;
+extern long int ReadLocalBufferCount;
+extern long int BufferHitCount;
+extern long int LocalBufferHitCount;
+extern long int BufferFlushCount;
+extern long int LocalBufferFlushCount;
+
+/*
+ * It's used to avoid disk writes for read-only transactions
+ * (i.e. when no one shared buffer was changed by transaction).
+ * We set it to true in WriteBuffer/WriteNoReleaseBuffer when
+ * marking shared buffer as dirty. We set it to false in xact.c
+ * after transaction is committed/aborted.
+ */
+bool		SharedBufferChanged = false;
+
+static void WaitIO(BufferDesc *buf, SPINLOCK spinlock);
+static void StartBufferIO(BufferDesc *buf, bool forInput);
+static void TerminateBufferIO(BufferDesc *buf);
+static void ContinueBufferIO(BufferDesc *buf, bool forInput);
+extern void AbortBufferIO(void);
+
+/*
+ * Macro : BUFFER_IS_BROKEN
+ *		Note that write error doesn't mean the buffer broken
+*/
+#define BUFFER_IS_BROKEN(buf) ((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY))
+
+#ifndef HAS_TEST_AND_SET
+static void SignalIO(BufferDesc *buf);
+extern long *NWaitIOBackendP;	/* defined in buf_init.c */
+
+#endif	 /* HAS_TEST_AND_SET */
+
+static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
+						 bool bufferLockHeld);
+static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
+			bool *foundPtr, bool bufferLockHeld);
+static int	BufferReplace(BufferDesc *bufHdr);
+void		PrintBufferDescs(void);
+
+/* ---------------------------------------------------
+ * RelationGetBufferWithBuffer
+ *		see if the given buffer is what we want
+ *		if yes, we don't need to bother the buffer manager
+ * ---------------------------------------------------
+ */
+Buffer
+RelationGetBufferWithBuffer(Relation relation,
+							BlockNumber blockNumber,
+							Buffer buffer)
+{
+	BufferDesc *bufHdr;
+
+	if (BufferIsValid(buffer))
+	{
+		if (!BufferIsLocal(buffer))
+		{
+			bufHdr = &BufferDescriptors[buffer - 1];
+			SpinAcquire(BufMgrLock);
+			if (bufHdr->tag.blockNum == blockNumber &&
+				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
+			{
+				SpinRelease(BufMgrLock);
+				return buffer;
+			}
+			return ReadBufferWithBufferLock(relation, blockNumber, true);
+		}
+		else
+		{
+			bufHdr = &LocalBufferDescriptors[-buffer - 1];
+			if (bufHdr->tag.blockNum == blockNumber &&
+				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
+				return buffer;
+		}
+	}
+	return ReadBuffer(relation, blockNumber);
+}
+
+/*
+ * ReadBuffer -- returns a buffer containing the requested
+ *		block of the requested relation.  If the blknum
+ *		requested is P_NEW, extend the relation file and
+ *		allocate a new block.
+ *
+ * Returns: the buffer number for the buffer containing
+ *		the block read or NULL on an error.
+ *
+ * Assume when this function is called, that reln has been
+ *		opened already.
+ */
+
+#undef ReadBuffer				/* conflicts with macro when BUFMGR_DEBUG
+								 * defined */
+
+/*
+ * ReadBuffer
+ *
+ */
+Buffer
+ReadBuffer(Relation reln, BlockNumber blockNum)
+{
+	return ReadBufferWithBufferLock(reln, blockNum, false);
+}
+
+/*
+ * ReadBufferWithBufferLock -- does the work of
+ *		ReadBuffer() but with the possibility that
+ *		the buffer lock has already been held. this
+ *		is yet another effort to reduce the number of
+ *		semops in the system.
+ */
+static Buffer
+ReadBufferWithBufferLock(Relation reln,
+						 BlockNumber blockNum,
+						 bool bufferLockHeld)
+{
+	BufferDesc *bufHdr;
+	int			extend;			/* extending the file by one block */
+	int			status;
+	bool		found;
+	bool		isLocalBuf;
+
+	extend = (blockNum == P_NEW);
+	isLocalBuf = reln->rd_myxactonly;
+
+	if (isLocalBuf)
+	{
+		ReadLocalBufferCount++;
+		bufHdr = LocalBufferAlloc(reln, blockNum, &found);
+		if (found)
+			LocalBufferHitCount++;
+	}
+	else
+	{
+		ReadBufferCount++;
+
+		/*
+		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested
+		 * block is not currently in memory.
+		 */
+		bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
+		if (found)
+			BufferHitCount++;
+	}
+
+	if (!bufHdr)
+		return InvalidBuffer;
+
+	/* if it's already in the buffer pool, we're done */
+	if (found)
+	{
+
+		/*
+		 * This happens when a bogus buffer was returned previously and is
+		 * floating around in the buffer pool.	A routine calling this
+		 * would want this extended.
+		 */
+		if (extend)
+		{
+			/* new buffers are zero-filled */
+			MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
+			smgrextend(DEFAULT_SMGR, reln,
+					   (char *) MAKE_PTR(bufHdr->data));
+		}
+		return BufferDescriptorGetBuffer(bufHdr);
+
+	}
+
+	/*
+	 * if we have gotten to this point, the reln pointer must be ok and
+	 * the relation file must be open.
+	 */
+	if (extend)
+	{
+		/* new buffers are zero-filled */
+		MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
+		status = smgrextend(DEFAULT_SMGR, reln,
+							(char *) MAKE_PTR(bufHdr->data));
+	}
+	else
+	{
+		status = smgrread(DEFAULT_SMGR, reln, blockNum,
+						  (char *) MAKE_PTR(bufHdr->data));
+	}
+
+	if (isLocalBuf)
+		return BufferDescriptorGetBuffer(bufHdr);
+
+	/* lock buffer manager again to update IO IN PROGRESS */
+	SpinAcquire(BufMgrLock);
+
+	if (status == SM_FAIL)
+	{
+		/* IO Failed.  cleanup the data structures and go home */
+
+		if (!BufTableDelete(bufHdr))
+		{
+			SpinRelease(BufMgrLock);
+			elog(FATAL, "BufRead: buffer table broken after IO error\n");
+		}
+		/* remember that BufferAlloc() pinned the buffer */
+		UnpinBuffer(bufHdr);
+
+		/*
+		 * Have to reset the flag so that anyone waiting for the buffer
+		 * can tell that the contents are invalid.
+		 */
+		bufHdr->flags |= BM_IO_ERROR;
+		bufHdr->flags &= ~BM_IO_IN_PROGRESS;
+	}
+	else
+	{
+		/* IO Succeeded.  clear the flags, finish buffer update */
+
+		bufHdr->flags &= ~(BM_IO_ERROR | BM_IO_IN_PROGRESS);
+	}
+
+	/* If anyone was waiting for IO to complete, wake them up now */
+	TerminateBufferIO(bufHdr);
+
+	SpinRelease(BufMgrLock);
+
+	if (status == SM_FAIL)
+		return InvalidBuffer;
+
+	return BufferDescriptorGetBuffer(bufHdr);
+}
+
+/*
+ * BufferAlloc -- Get a buffer from the buffer pool but dont
+ *		read it.
+ *
+ * Returns: descriptor for buffer
+ *
+ * When this routine returns, the BufMgrLock is guaranteed NOT be held.
+ */
+static BufferDesc *
+BufferAlloc(Relation reln,
+			BlockNumber blockNum,
+			bool *foundPtr,
+			bool bufferLockHeld)
+{
+	BufferDesc *buf,
+			   *buf2;
+	BufferTag	newTag;			/* identity of requested block */
+	bool		inProgress;		/* buffer undergoing IO */
+	bool		newblock = FALSE;
+
+	/* create a new tag so we can lookup the buffer */
+	/* assume that the relation is already open */
+	if (blockNum == P_NEW)
+	{
+		newblock = TRUE;
+		blockNum = smgrnblocks(DEFAULT_SMGR, reln);
+	}
+
+	INIT_BUFFERTAG(&newTag, reln, blockNum);
+
+	if (!bufferLockHeld)
+		SpinAcquire(BufMgrLock);
+
+	/* see if the block is in the buffer pool already */
+	buf = BufTableLookup(&newTag);
+	if (buf != NULL)
+	{
+
+		/*
+		 * Found it.  Now, (a) pin the buffer so no one steals it from the
+		 * buffer pool, (b) check IO_IN_PROGRESS, someone may be faulting
+		 * the buffer into the buffer pool.
+		 */
+
+		PinBuffer(buf);
+		inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+
+		*foundPtr = TRUE;
+		if (inProgress)			/* confirm end of IO */
+		{
+			WaitIO(buf, BufMgrLock);
+			inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+		}
+		if (BUFFER_IS_BROKEN(buf))
+		{
+
+			/*
+			 * I couldn't understand the following old comment. If there's
+			 * no IO for the buffer and the buffer is BROKEN,it should be
+			 * read again. So start a new buffer IO here.
+			 *
+			 * wierd race condition:
+			 *
+			 * We were waiting for someone else to read the buffer. While we
+			 * were waiting, the reader boof'd in some way, so the
+			 * contents of the buffer are still invalid.  By saying that
+			 * we didn't find it, we can make the caller reinitialize the
+			 * buffer.	If two processes are waiting for this block, both
+			 * will read the block.  The second one to finish may
+			 * overwrite any updates made by the first.  (Assume higher
+			 * level synchronization prevents this from happening).
+			 *
+			 * This is never going to happen, don't worry about it.
+			 */
+			*foundPtr = FALSE;
+		}
+#ifdef BMTRACE
+		_bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCFND);
+#endif	 /* BMTRACE */
+
+		if (!(*foundPtr))
+			StartBufferIO(buf, true);
+		SpinRelease(BufMgrLock);
+
+		return buf;
+	}
+
+	*foundPtr = FALSE;
+
+	/*
+	 * Didn't find it in the buffer pool.  We'll have to initialize a new
+	 * buffer.	First, grab one from the free list.  If it's dirty, flush
+	 * it to disk. Remember to unlock BufMgr spinlock while doing the IOs.
+	 */
+	inProgress = FALSE;
+	for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
+	{
+		buf = GetFreeBuffer();
+
+		/* GetFreeBuffer will abort if it can't find a free buffer */
+		Assert(buf);
+
+		/*
+		 * There should be exactly one pin on the buffer after it is
+		 * allocated -- ours.  If it had a pin it wouldn't have been on
+		 * the free list.  No one else could have pinned it between
+		 * GetFreeBuffer and here because we have the BufMgrLock.
+		 */
+		Assert(buf->refcount == 0);
+		buf->refcount = 1;
+		PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;
+
+		if (buf->flags & BM_DIRTY || buf->cntxDirty)
+		{
+			bool		smok;
+
+			/*
+			 *	skip write error buffers 
+			 */
+			if ((buf->flags & BM_IO_ERROR) != 0)
+			{
+				PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+				buf->refcount--;
+				buf = (BufferDesc *) NULL;
+				continue;
+			}
+			/*
+			 * Set BM_IO_IN_PROGRESS to keep anyone from doing anything
+			 * with the contents of the buffer while we write it out. We
+			 * don't really care if they try to read it, but if they can
+			 * complete a BufferAlloc on it they can then scribble into
+			 * it, and we'd really like to avoid that while we are
+			 * flushing the buffer.  Setting this flag should block them
+			 * in WaitIO until we're done.
+			 */
+			inProgress = TRUE;
+
+			/*
+			 * All code paths that acquire this lock pin the buffer first;
+			 * since no one had it pinned (it just came off the free
+			 * list), no one else can have this lock.
+			 */
+			StartBufferIO(buf, false);
+
+			/*
+			 * Write the buffer out, being careful to release BufMgrLock
+			 * before starting the I/O.
+			 */
+			smok = BufferReplace(buf);
+
+			if (smok == FALSE)
+			{
+				elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
+				buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
+				inProgress = FALSE;
+				buf->flags |= BM_IO_ERROR;
+				buf->flags &= ~BM_IO_IN_PROGRESS;
+				TerminateBufferIO(buf);
+				PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+				Assert(buf->refcount > 0);
+				buf->refcount--;
+				if (buf->refcount == 0)
+				{
+					AddBufferToFreelist(buf);
+					buf->flags |= BM_FREE;
+				}
+				buf = (BufferDesc *) NULL;
+			}
+			else
+			{
+				/*
+				 * BM_JUST_DIRTIED cleared by BufferReplace and shouldn't
+				 * be setted by anyone.		- vadim 01/17/97
+				 */
+				if (buf->flags & BM_JUST_DIRTIED)
+				{
+					elog(STOP, "BufferAlloc: content of block %u (%s) changed while flushing",
+						 buf->tag.blockNum, buf->blind.relname);
+				}
+				else
+					buf->flags &= ~BM_DIRTY;
+				buf->cntxDirty = false;
+			}
+
+			/*
+			 * Somebody could have pinned the buffer while we were doing
+			 * the I/O and had given up the BufMgrLock (though they would
+			 * be waiting for us to clear the BM_IO_IN_PROGRESS flag).
+			 * That's why this is a loop -- if so, we need to clear the
+			 * I/O flags, remove our pin and start all over again.
+			 *
+			 * People may be making buffers free at any time, so there's no
+			 * reason to think that we have an immediate disaster on our
+			 * hands.
+			 */
+			if (buf && buf->refcount > 1)
+			{
+				inProgress = FALSE;
+				buf->flags &= ~BM_IO_IN_PROGRESS;
+				TerminateBufferIO(buf);
+				PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+				buf->refcount--;
+				buf = (BufferDesc *) NULL;
+			}
+
+			/*
+			 * Somebody could have allocated another buffer for the same
+			 * block we are about to read in. (While we flush out the
+			 * dirty buffer, we don't hold the lock and someone could have
+			 * allocated another buffer for the same block. The problem is
+			 * we haven't gotten around to insert the new tag into the
+			 * buffer table. So we need to check here.		-ay 3/95
+			 */
+			buf2 = BufTableLookup(&newTag);
+			if (buf2 != NULL)
+			{
+
+				/*
+				 * Found it. Someone has already done what we're about to
+				 * do. We'll just handle this as if it were found in the
+				 * buffer pool in the first place.
+				 */
+				if (buf != NULL)
+				{
+					buf->flags &= ~BM_IO_IN_PROGRESS;
+					TerminateBufferIO(buf);
+					/* give up the buffer since we don't need it any more */
+					PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+					Assert(buf->refcount > 0);
+					buf->refcount--;
+					if (buf->refcount == 0)
+					{
+						AddBufferToFreelist(buf);
+						buf->flags |= BM_FREE;
+					}
+				}
+
+				PinBuffer(buf2);
+				inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
+
+				*foundPtr = TRUE;
+				if (inProgress)
+				{
+					WaitIO(buf2, BufMgrLock);
+					inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
+				}
+				if (BUFFER_IS_BROKEN(buf2))
+					*foundPtr = FALSE;
+
+				if (!(*foundPtr))
+					StartBufferIO(buf2, true);
+				SpinRelease(BufMgrLock);
+
+				return buf2;
+			}
+		}
+	}
+
+	/*
+	 * At this point we should have the sole pin on a non-dirty buffer and
+	 * we may or may not already have the BM_IO_IN_PROGRESS flag set.
+	 */
+
+	/*
+	 * Change the name of the buffer in the lookup table:
+	 *
+	 * Need to update the lookup table before the read starts. If someone
+	 * comes along looking for the buffer while we are reading it in, we
+	 * don't want them to allocate a new buffer.  For the same reason, we
+	 * didn't want to erase the buf table entry for the buffer we were
+	 * writing back until now, either.
+	 */
+
+	if (!BufTableDelete(buf))
+	{
+		SpinRelease(BufMgrLock);
+		elog(FATAL, "buffer wasn't in the buffer table\n");
+	}
+
+	/* record the database name and relation name for this buffer */
+	strcpy(buf->blind.dbname, (DatabaseName) ? DatabaseName : "Recovery");
+	strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln));
+
+	INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
+	if (!BufTableInsert(buf))
+	{
+		SpinRelease(BufMgrLock);
+		elog(FATAL, "Buffer in lookup table twice \n");
+	}
+
+	/*
+	 * Buffer contents are currently invalid.  Have to mark IO IN PROGRESS
+	 * so no one fiddles with them until the read completes.  If this
+	 * routine has been called simply to allocate a buffer, no io will be
+	 * attempted, so the flag isnt set.
+	 */
+	if (!inProgress)
+		StartBufferIO(buf, true);
+	else
+		ContinueBufferIO(buf, true);
+
+#ifdef BMTRACE
+	_bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCNOTFND);
+#endif	 /* BMTRACE */
+
+	SpinRelease(BufMgrLock);
+
+	return buf;
+}
+
+/*
+ * WriteBuffer
+ *
+ *		Marks buffer contents as dirty (actual write happens later).
+ *
+ * Assume that buffer is pinned.  Assume that reln is
+ *		valid.
+ *
+ * Side Effects:
+ *		Pin count is decremented.
+ */
+
+#undef WriteBuffer
+
+int
+WriteBuffer(Buffer buffer)
+{
+	BufferDesc *bufHdr;
+
+	if (BufferIsLocal(buffer))
+		return WriteLocalBuffer(buffer, TRUE);
+
+	if (BAD_BUFFER_ID(buffer))
+		return FALSE;
+
+	bufHdr = &BufferDescriptors[buffer - 1];
+
+	SharedBufferChanged = true;
+
+	SpinAcquire(BufMgrLock);
+	Assert(bufHdr->refcount > 0);
+
+	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+
+	UnpinBuffer(bufHdr);
+	SpinRelease(BufMgrLock);
+
+	return TRUE;
+}
+
+/*
+ * WriteNoReleaseBuffer -- like WriteBuffer, but do not unpin the buffer
+ *						   when the operation is complete.
+ */
+int
+WriteNoReleaseBuffer(Buffer buffer)
+{
+	BufferDesc *bufHdr;
+
+	if (BufferIsLocal(buffer))
+		return WriteLocalBuffer(buffer, FALSE);
+
+	if (BAD_BUFFER_ID(buffer))
+		return STATUS_ERROR;
+
+	bufHdr = &BufferDescriptors[buffer - 1];
+
+	SharedBufferChanged = true;
+
+	SpinAcquire(BufMgrLock);
+	Assert(bufHdr->refcount > 0);
+
+	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+
+	SpinRelease(BufMgrLock);
+
+	return STATUS_OK;
+}
+
+
+#undef ReleaseAndReadBuffer
+/*
+ * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
+ *		so that only one semop needs to be called.
+ *
+ */
+Buffer
+ReleaseAndReadBuffer(Buffer buffer,
+					 Relation relation,
+					 BlockNumber blockNum)
+{
+	BufferDesc *bufHdr;
+	Buffer		retbuf;
+
+	if (BufferIsLocal(buffer))
+	{
+		Assert(LocalRefCount[-buffer - 1] > 0);
+		LocalRefCount[-buffer - 1]--;
+	}
+	else
+	{
+		if (BufferIsValid(buffer))
+		{
+			bufHdr = &BufferDescriptors[buffer - 1];
+			Assert(PrivateRefCount[buffer - 1] > 0);
+			PrivateRefCount[buffer - 1]--;
+			if (PrivateRefCount[buffer - 1] == 0)
+			{
+				SpinAcquire(BufMgrLock);
+				Assert(bufHdr->refcount > 0);
+				bufHdr->refcount--;
+				if (bufHdr->refcount == 0)
+				{
+					AddBufferToFreelist(bufHdr);
+					bufHdr->flags |= BM_FREE;
+				}
+				retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
+				return retbuf;
+			}
+		}
+	}
+
+	return ReadBuffer(relation, blockNum);
+}
+
+/*
+ * BufferSync -- Write all dirty buffers in the pool.
+ *
+ * This is called at checkpoint time and write out all dirty buffers.
+ */
+void
+BufferSync()
+{
+	int			i;
+	BufferDesc *bufHdr;
+	Buffer		buffer;
+	int			status;
+	RelFileNode	rnode;
+	XLogRecPtr	recptr;
+	Relation	reln = NULL;
+
+	for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
+	{
+
+		SpinAcquire(BufMgrLock);
+
+		if (!(bufHdr->flags & BM_VALID))
+		{
+			SpinRelease(BufMgrLock);
+			continue;
+		}
+
+		/*
+		 * Pin buffer and ensure that no one reads it from disk
+		 */
+		PinBuffer(bufHdr);
+		/* Synchronize with BufferAlloc */
+		if (bufHdr->flags & BM_IO_IN_PROGRESS)
+			WaitIO(bufHdr, BufMgrLock);
+
+		buffer = BufferDescriptorGetBuffer(bufHdr);
+		rnode = bufHdr->tag.rnode;
+
+		SpinRelease(BufMgrLock);
+
+		/*
+		 * Try to find relation for buffer
+		 */
+		reln = RelationNodeCacheGetRelation(rnode);
+
+		/*
+		 * Protect buffer content against concurrent update
+		 */
+		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+		/*
+		 * Force XLOG flush for buffer' LSN
+		 */
+		recptr = BufferGetLSN(bufHdr);
+		XLogFlush(recptr);
+
+		/*
+		 * Now it's safe to write buffer to disk
+		 * (if needed at all -:))
+		 */
+
+		SpinAcquire(BufMgrLock);
+		if (bufHdr->flags & BM_IO_IN_PROGRESS)
+			WaitIO(bufHdr, BufMgrLock);
+
+		if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+		{
+			bufHdr->flags &= ~BM_JUST_DIRTIED;
+			StartBufferIO(bufHdr, false);		/* output IO start */
+
+			SpinRelease(BufMgrLock);
+
+			if (reln == (Relation) NULL)
+			{
+				status = smgrblindwrt(DEFAULT_SMGR,
+									bufHdr->tag.rnode,
+									bufHdr->tag.blockNum,
+									(char *) MAKE_PTR(bufHdr->data),
+									true);	/* must fsync */
+			}
+			else
+			{
+				status = smgrwrite(DEFAULT_SMGR, reln,
+								bufHdr->tag.blockNum,
+								(char *) MAKE_PTR(bufHdr->data));
+			}
+
+			if (status == SM_FAIL)	/* disk failure ?! */
+				elog(STOP, "BufferSync: cannot write %u for %s",
+					 bufHdr->tag.blockNum, bufHdr->blind.relname);
+
+			/*
+			 * Note that it's safe to change cntxDirty here because of
+			 * we protect it from upper writers by share lock and from
+			 * other bufmgr routines by BM_IO_IN_PROGRESS
+			 */
+			bufHdr->cntxDirty = false;
+
+			/*
+			 * Release the per-buffer readlock, reacquire BufMgrLock.
+			 */
+			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+			BufferFlushCount++;
+
+			SpinAcquire(BufMgrLock);
+
+			bufHdr->flags &= ~BM_IO_IN_PROGRESS;	/* mark IO finished */
+			TerminateBufferIO(bufHdr);				/* Sync IO finished */
+
+			/*
+			 * If this buffer was marked by someone as DIRTY while
+			 * we were flushing it out we must not clear DIRTY
+			 * flag - vadim 01/17/97
+			 */
+			if (!(bufHdr->flags & BM_JUST_DIRTIED))
+				bufHdr->flags &= ~BM_DIRTY;
+		}
+		else
+			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+		UnpinBuffer(bufHdr);
+
+		SpinRelease(BufMgrLock);
+
+		/* drop refcnt obtained by RelationIdCacheGetRelation */
+		if (reln != (Relation) NULL)
+		{
+			RelationDecrementReferenceCount(reln);
+			reln = NULL;
+		}
+	}
+
+}
+
+/*
+ * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
+ *
+ * Should be entered with buffer manager spinlock held; releases it before
+ * waiting and re-acquires it afterwards.
+ *
+ * OLD NOTES:
+ *		Because IO_IN_PROGRESS conflicts are
+ *		expected to be rare, there is only one BufferIO
+ *		lock in the entire system.	All processes block
+ *		on this semaphore when they try to use a buffer
+ *		that someone else is faulting in.  Whenever a
+ *		process finishes an IO and someone is waiting for
+ *		the buffer, BufferIO is signaled (SignalIO).  All
+ *		waiting processes then wake up and check to see
+ *		if their buffer is now ready.  This implementation
+ *		is simple, but efficient enough if WaitIO is
+ *		rarely called by multiple processes simultaneously.
+ *
+ * NEW NOTES:
+ *		The above is true only on machines without test-and-set
+ *		semaphores (which we hope are few, these days).  On better
+ *		hardware, each buffer has a spinlock that we can wait on.
+ */
+#ifdef HAS_TEST_AND_SET
+
+static void
+WaitIO(BufferDesc *buf, SPINLOCK spinlock)
+{
+
+	/*
+	 * Changed to wait until there's no IO - Inoue 01/13/2000
+	 */
+	while ((buf->flags & BM_IO_IN_PROGRESS) != 0)
+	{
+		SpinRelease(spinlock);
+		S_LOCK(&(buf->io_in_progress_lock));
+		S_UNLOCK(&(buf->io_in_progress_lock));
+		SpinAcquire(spinlock);
+	}
+}
+
+#else							/* !HAS_TEST_AND_SET */
+
+IpcSemaphoreId WaitIOSemId;
+IpcSemaphoreId WaitCLSemId;
+
+static void
+WaitIO(BufferDesc *buf, SPINLOCK spinlock)
+{
+	bool		inProgress;
+
+	for (;;)
+	{
+
+		/* wait until someone releases IO lock */
+		(*NWaitIOBackendP)++;
+		SpinRelease(spinlock);
+		IpcSemaphoreLock(WaitIOSemId, 0, 1);
+		SpinAcquire(spinlock);
+		inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+		if (!inProgress)
+			break;
+	}
+}
+
+/*
+ * SignalIO
+ */
+static void
+SignalIO(BufferDesc *buf)
+{
+	/* somebody better be waiting. */
+	Assert(buf->refcount > 1);
+	IpcSemaphoreUnlock(WaitIOSemId, 0, *NWaitIOBackendP);
+	*NWaitIOBackendP = 0;
+}
+
+#endif	 /* HAS_TEST_AND_SET */
+
+long		NDirectFileRead;	/* some I/O's are direct file access.
+								 * bypass bufmgr */
+long		NDirectFileWrite;	/* e.g., I/O in psort and hashjoin.					*/
+
+void
+PrintBufferUsage(FILE *statfp)
+{
+	float		hitrate;
+	float		localhitrate;
+
+	if (ReadBufferCount == 0)
+		hitrate = 0.0;
+	else
+		hitrate = (float) BufferHitCount *100.0 / ReadBufferCount;
+
+	if (ReadLocalBufferCount == 0)
+		localhitrate = 0.0;
+	else
+		localhitrate = (float) LocalBufferHitCount *100.0 / ReadLocalBufferCount;
+
+	fprintf(statfp, "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
+			ReadBufferCount - BufferHitCount, BufferFlushCount, hitrate);
+	fprintf(statfp, "!\tLocal  blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
+			ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localhitrate);
+	fprintf(statfp, "!\tDirect blocks: %10ld read, %10ld written\n",
+			NDirectFileRead, NDirectFileWrite);
+}
+
+void
+ResetBufferUsage()
+{
+	BufferHitCount = 0;
+	ReadBufferCount = 0;
+	BufferFlushCount = 0;
+	LocalBufferHitCount = 0;
+	ReadLocalBufferCount = 0;
+	LocalBufferFlushCount = 0;
+	NDirectFileRead = 0;
+	NDirectFileWrite = 0;
+}
+
+/* ----------------------------------------------
+ *		ResetBufferPool
+ *
+ *		This routine is supposed to be called when a transaction aborts.
+ *		it will release all the buffer pins held by the transaction.
+ *		Currently, we also call it during commit if BufferPoolCheckLeak
+ *		detected a problem --- in that case, isCommit is TRUE, and we
+ *		only clean up buffer pin counts.
+ *
+ * During abort, we also forget any pending fsync requests.  Dirtied buffers
+ * will still get written, eventually, but there will be no fsync for them.
+ *
+ * ----------------------------------------------
+ */
+void
+ResetBufferPool(bool isCommit)
+{
+	int			i;
+
+	for (i = 0; i < NBuffers; i++)
+	{
+		if (PrivateRefCount[i] != 0)
+		{
+			BufferDesc *buf = &BufferDescriptors[i];
+
+			SpinAcquire(BufMgrLock);
+			Assert(buf->refcount > 0);
+			buf->refcount--;
+			if (buf->refcount == 0)
+			{
+				AddBufferToFreelist(buf);
+				buf->flags |= BM_FREE;
+			}
+			SpinRelease(BufMgrLock);
+		}
+		PrivateRefCount[i] = 0;
+	}
+
+	ResetLocalBufferPool();
+
+	if (!isCommit)
+		smgrabort();
+}
+
+/* -----------------------------------------------
+ *		BufferPoolCheckLeak
+ *
+ *		check if there is buffer leak
+ *
+ * -----------------------------------------------
+ */
+int
+BufferPoolCheckLeak()
+{
+	int			i;
+	int			result = 0;
+
+	for (i = 1; i <= NBuffers; i++)
+	{
+		if (PrivateRefCount[i - 1] != 0)
+		{
+			BufferDesc *buf = &(BufferDescriptors[i - 1]);
+
+			elog(NOTICE,
+				 "Buffer Leak: [%03d] (freeNext=%ld, freePrev=%ld, \
+relname=%s, blockNum=%d, flags=0x%x, refcount=%d %ld)",
+				 i - 1, buf->freeNext, buf->freePrev,
+				 buf->blind.relname, buf->tag.blockNum, buf->flags,
+				 buf->refcount, PrivateRefCount[i - 1]);
+			result = 1;
+		}
+	}
+	return result;
+}
+
+/* ------------------------------------------------
+ * FlushBufferPool
+ *
+ * Flush all dirty blocks in buffer pool to disk
+ * at the checkpoint time
+ * ------------------------------------------------
+ */
+void
+FlushBufferPool(void)
+{
+	BufferSync();
+	smgrsync();
+}
+
+/*
+ * At the commit time we have to flush local buffer pool only
+ */
+void
+BufmgrCommit(void)
+{
+	LocalBufferSync();
+	smgrcommit();
+}
+
+/*
+ * BufferGetBlockNumber
+ *		Returns the block number associated with a buffer.
+ *
+ * Note:
+ *		Assumes that the buffer is valid.
+ */
+BlockNumber
+BufferGetBlockNumber(Buffer buffer)
+{
+	Assert(BufferIsValid(buffer));
+
+	/* XXX should be a critical section */
+	if (BufferIsLocal(buffer))
+		return LocalBufferDescriptors[-buffer - 1].tag.blockNum;
+	else
+		return BufferDescriptors[buffer - 1].tag.blockNum;
+}
+
+/*
+ * BufferReplace
+ *
+ * Write out the buffer corresponding to 'bufHdr'
+ *
+ * BufMgrLock must be held at entry, and the buffer must be pinned.
+ */
+static int
+BufferReplace(BufferDesc *bufHdr)
+{
+	Relation	reln;
+	XLogRecPtr	recptr;
+	int			status;
+
+	/* To check if block content changed while flushing. - vadim 01/17/97 */
+	bufHdr->flags &= ~BM_JUST_DIRTIED;
+
+	SpinRelease(BufMgrLock);
+
+	/*
+	 * No need to lock buffer context - no one should be able to
+	 * end ReadBuffer
+	 */
+	recptr = BufferGetLSN(bufHdr);
+	XLogFlush(recptr);
+
+	reln = RelationNodeCacheGetRelation(bufHdr->tag.rnode);
+
+	if (reln != (Relation) NULL)
+	{
+		status = smgrwrite(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
+						   (char *) MAKE_PTR(bufHdr->data));
+	}
+	else
+	{
+		status = smgrblindwrt(DEFAULT_SMGR, bufHdr->tag.rnode,
+							  bufHdr->tag.blockNum,
+							  (char *) MAKE_PTR(bufHdr->data),
+							  false);	/* no fsync */
+	}
+
+	/* drop relcache refcnt incremented by RelationIdCacheGetRelation */
+	if (reln != (Relation) NULL)
+		RelationDecrementReferenceCount(reln);
+
+	SpinAcquire(BufMgrLock);
+
+	if (status == SM_FAIL)
+		return FALSE;
+
+	BufferFlushCount++;
+
+	return TRUE;
+}
+
+/*
+ * RelationGetNumberOfBlocks
+ *		Returns the buffer descriptor associated with a page in a relation.
+ *
+ * Note:
+ *		XXX may fail for huge relations.
+ *		XXX should be elsewhere.
+ *		XXX maybe should be hidden
+ */
+BlockNumber
+RelationGetNumberOfBlocks(Relation relation)
+{
+	return ((relation->rd_myxactonly) ? relation->rd_nblocks :
+			smgrnblocks(DEFAULT_SMGR, relation));
+}
+
+/* ---------------------------------------------------------------------
+ *		ReleaseRelationBuffers
+ *
+ *		This function removes all the buffered pages for a relation
+ *		from the buffer pool.  Dirty pages are simply dropped, without
+ *		bothering to write them out first.  This is used when the
+ *		relation is about to be deleted.  We assume that the caller
+ *		holds an exclusive lock on the relation, which should assure
+ *		that no new buffers will be acquired for the rel meanwhile.
+ *
+ *		XXX currently it sequentially searches the buffer pool, should be
+ *		changed to more clever ways of searching.
+ * --------------------------------------------------------------------
+ */
+void
+ReleaseRelationBuffers(Relation rel)
+{
+	int			i;
+	BufferDesc *bufHdr;
+
+	if (rel->rd_myxactonly)
+	{
+		for (i = 0; i < NLocBuffer; i++)
+		{
+			bufHdr = &LocalBufferDescriptors[i];
+			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+			{
+				bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+				bufHdr->cntxDirty = false;
+				LocalRefCount[i] = 0;
+				bufHdr->tag.rnode.relNode = InvalidOid;
+			}
+		}
+		return;
+	}
+
+	SpinAcquire(BufMgrLock);
+	for (i = 1; i <= NBuffers; i++)
+	{
+		bufHdr = &BufferDescriptors[i - 1];
+recheck:
+		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+		{
+
+			/*
+			 * If there is I/O in progress, better wait till it's done;
+			 * don't want to delete the relation out from under someone
+			 * who's just trying to flush the buffer!
+			 */
+			if (bufHdr->flags & BM_IO_IN_PROGRESS)
+			{
+				WaitIO(bufHdr, BufMgrLock);
+
+				/*
+				 * By now, the buffer very possibly belongs to some other
+				 * rel, so check again before proceeding.
+				 */
+				goto recheck;
+			}
+			/* Now we can do what we came for */
+			bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+			bufHdr->cntxDirty = false;
+
+			/*
+			 * Release any refcount we may have.
+			 *
+			 * This is very probably dead code, and if it isn't then it's
+			 * probably wrong.	I added the Assert to find out --- tgl
+			 * 11/99.
+			 */
+			if (!(bufHdr->flags & BM_FREE))
+			{
+				/* Assert checks that buffer will actually get freed! */
+				Assert(PrivateRefCount[i - 1] == 1 &&
+					   bufHdr->refcount == 1);
+				/* ReleaseBuffer expects we do not hold the lock at entry */
+				SpinRelease(BufMgrLock);
+				ReleaseBuffer(i);
+				SpinAcquire(BufMgrLock);
+			}
+			/*
+			 * And mark the buffer as no longer occupied by this rel.
+			 */
+			BufTableDelete(bufHdr);
+		}
+	}
+
+	SpinRelease(BufMgrLock);
+}
+
+/* ---------------------------------------------------------------------
+ *		DropBuffers
+ *
+ *		This function removes all the buffers in the buffer cache for a
+ *		particular database.  Dirty pages are simply dropped, without
+ *		bothering to write them out first.  This is used when we destroy a
+ *		database, to avoid trying to flush data to disk when the directory
+ *		tree no longer exists.	Implementation is pretty similar to
+ *		ReleaseRelationBuffers() which is for destroying just one relation.
+ * --------------------------------------------------------------------
+ */
+void
+DropBuffers(Oid dbid)
+{
+	int			i;
+	BufferDesc *bufHdr;
+
+	SpinAcquire(BufMgrLock);
+	for (i = 1; i <= NBuffers; i++)
+	{
+		bufHdr = &BufferDescriptors[i - 1];
+recheck:
+		/*
+		 * We know that currently database OID is tblNode but
+		 * this probably will be changed in future and this
+		 * func will be used to drop tablespace buffers.
+		 */
+		if (bufHdr->tag.rnode.tblNode == dbid)
+		{
+
+			/*
+			 * If there is I/O in progress, better wait till it's done;
+			 * don't want to delete the database out from under someone
+			 * who's just trying to flush the buffer!
+			 */
+			if (bufHdr->flags & BM_IO_IN_PROGRESS)
+			{
+				WaitIO(bufHdr, BufMgrLock);
+
+				/*
+				 * By now, the buffer very possibly belongs to some other
+				 * DB, so check again before proceeding.
+				 */
+				goto recheck;
+			}
+			/* Now we can do what we came for */
+			bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+			bufHdr->cntxDirty = false;
+
+			/*
+			 * The thing should be free, if caller has checked that no
+			 * backends are running in that database.
+			 */
+			Assert(bufHdr->flags & BM_FREE);
+			/*
+			 * And mark the buffer as no longer occupied by this page.
+			 */
+			BufTableDelete(bufHdr);
+		}
+	}
+	SpinRelease(BufMgrLock);
+}
+
+/* -----------------------------------------------------------------
+ *		PrintBufferDescs
+ *
+ *		this function prints all the buffer descriptors, for debugging
+ *		use only.
+ * -----------------------------------------------------------------
+ */
+void
+PrintBufferDescs()
+{
+	int			i;
+	BufferDesc *buf = BufferDescriptors;
+
+	if (IsUnderPostmaster)
+	{
+		SpinAcquire(BufMgrLock);
+		for (i = 0; i < NBuffers; ++i, ++buf)
+		{
+			elog(DEBUG, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
+blockNum=%d, flags=0x%x, refcount=%d %ld)",
+				 i, buf->freeNext, buf->freePrev,
+				 buf->blind.relname, buf->tag.blockNum, buf->flags,
+				 buf->refcount, PrivateRefCount[i]);
+		}
+		SpinRelease(BufMgrLock);
+	}
+	else
+	{
+		/* interactive backend */
+		for (i = 0; i < NBuffers; ++i, ++buf)
+		{
+			printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
+					i, buf->blind.relname, buf->tag.blockNum,
+					buf->flags, buf->refcount, PrivateRefCount[i]);
+		}
+	}
+}
+
+void
+PrintPinnedBufs()
+{
+	int			i;
+	BufferDesc *buf = BufferDescriptors;
+
+	SpinAcquire(BufMgrLock);
+	for (i = 0; i < NBuffers; ++i, ++buf)
+	{
+		if (PrivateRefCount[i] > 0)
+			elog(NOTICE, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
+blockNum=%d, flags=0x%x, refcount=%d %ld)\n",
+				 i, buf->freeNext, buf->freePrev, buf->blind.relname,
+				 buf->tag.blockNum, buf->flags,
+				 buf->refcount, PrivateRefCount[i]);
+	}
+	SpinRelease(BufMgrLock);
+}
+
+/*
+ * BufferPoolBlowaway
+ *
+ * this routine is solely for the purpose of experiments -- sometimes
+ * you may want to blowaway whatever is left from the past in buffer
+ * pool and start measuring some performance with a clean empty buffer
+ * pool.
+ */
+#ifdef NOT_USED
+void
+BufferPoolBlowaway()
+{
+	int			i;
+
+	BufferSync();
+	for (i = 1; i <= NBuffers; i++)
+	{
+		if (BufferIsValid(i))
+		{
+			while (BufferIsValid(i))
+				ReleaseBuffer(i);
+		}
+		BufTableDelete(&BufferDescriptors[i - 1]);
+	}
+}
+
+#endif
+
+/* ---------------------------------------------------------------------
+ *		FlushRelationBuffers
+ *
+ *		This function flushes all dirty pages of a relation out to disk.
+ *		Furthermore, pages that have blocknumber >= firstDelBlock are
+ *		actually removed from the buffer pool.  An error code is returned
+ *		if we fail to dump a dirty buffer or if we find one of
+ *		the target pages is pinned into the cache.
+ *
+ *		This is used by VACUUM before truncating the relation to the given
+ *		number of blocks.  (TRUNCATE TABLE also uses it in the same way.)
+ *		It might seem unnecessary to flush dirty pages before firstDelBlock,
+ *		since VACUUM should already have committed its changes.  However,
+ *		it is possible for there still to be dirty pages: if some page
+ *		had unwritten on-row tuple status updates from a prior transaction,
+ *		and VACUUM had no additional changes to make to that page, then
+ *		VACUUM won't have written it.  This is harmless in most cases but
+ *		will break pg_upgrade, which relies on VACUUM to ensure that *all*
+ *		tuples have correct on-row status.  So, we check and flush all
+ *		dirty pages of the rel regardless of block number.
+ *
+ *		This is also used by RENAME TABLE (with firstDelBlock = 0)
+ *		to clear out the buffer cache before renaming the physical files of
+ *		a relation.  Without that, some other backend might try to do a
+ *		blind write of a buffer page (relying on the BlindId of the buffer)
+ *		and fail because it's not got the right filename anymore.
+ *
+ *		In all cases, the caller should be holding AccessExclusiveLock on
+ *		the target relation to ensure that no other backend is busy reading
+ *		more blocks of the relation.
+ *
+ *		Formerly, we considered it an error condition if we found dirty
+ *		buffers here.	However, since BufferSync no longer forces out all
+ *		dirty buffers at every xact commit, it's possible for dirty buffers
+ *		to still be present in the cache due to failure of an earlier
+ *		transaction.  So, must flush dirty buffers without complaint.
+ *
+ *		Returns: 0 - Ok, -1 - FAILED TO WRITE DIRTY BUFFER, -2 - PINNED
+ *
+ *		XXX currently it sequentially searches the buffer pool, should be
+ *		changed to more clever ways of searching.
+ * --------------------------------------------------------------------
+ */
+int
+FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
+{
+	int			i;
+	BufferDesc *bufHdr;
+	XLogRecPtr	recptr;
+	int			status;
+
+	if (rel->rd_myxactonly)
+	{
+		for (i = 0; i < NLocBuffer; i++)
+		{
+			bufHdr = &LocalBufferDescriptors[i];
+			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+			{
+				if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+				{
+					status = smgrwrite(DEFAULT_SMGR, rel, 
+								bufHdr->tag.blockNum,
+								(char *) MAKE_PTR(bufHdr->data));
+					if (status == SM_FAIL)
+					{
+						elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
+							 RelationGetRelationName(rel), firstDelBlock,
+							 bufHdr->tag.blockNum);
+						return(-1);
+					}
+					bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+					bufHdr->cntxDirty = false;
+				}
+				if (LocalRefCount[i] > 0)
+				{
+					elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is referenced (%ld)",
+						 RelationGetRelationName(rel), firstDelBlock,
+						 bufHdr->tag.blockNum, LocalRefCount[i]);
+					return(-2);
+				}
+				if (bufHdr->tag.blockNum >= firstDelBlock)
+				{
+					bufHdr->tag.rnode.relNode = InvalidOid;
+				}
+			}
+		}
+		return 0;
+	}
+
+	SpinAcquire(BufMgrLock);
+	for (i = 0; i < NBuffers; i++)
+	{
+		bufHdr = &BufferDescriptors[i];
+		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+		{
+			if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+			{
+				PinBuffer(bufHdr);
+				if (bufHdr->flags & BM_IO_IN_PROGRESS)
+					WaitIO(bufHdr, BufMgrLock);
+				SpinRelease(BufMgrLock);
+
+				/*
+				 * Force XLOG flush for buffer' LSN
+				 */
+				recptr = BufferGetLSN(bufHdr);
+				XLogFlush(recptr);
+
+				/*
+				 * Now it's safe to write buffer to disk
+				 */
+
+				SpinAcquire(BufMgrLock);
+				if (bufHdr->flags & BM_IO_IN_PROGRESS)
+					WaitIO(bufHdr, BufMgrLock);
+
+				if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+				{
+					bufHdr->flags &= ~BM_JUST_DIRTIED;
+					StartBufferIO(bufHdr, false);		/* output IO start */
+
+					SpinRelease(BufMgrLock);
+
+					status = smgrwrite(DEFAULT_SMGR, rel,
+									bufHdr->tag.blockNum,
+									(char *) MAKE_PTR(bufHdr->data));
+
+					if (status == SM_FAIL)	/* disk failure ?! */
+						elog(STOP, "FlushRelationBuffers: cannot write %u for %s",
+							 bufHdr->tag.blockNum, bufHdr->blind.relname);
+
+					BufferFlushCount++;
+
+					SpinAcquire(BufMgrLock);
+					bufHdr->flags &= ~BM_IO_IN_PROGRESS;
+					TerminateBufferIO(bufHdr);
+					Assert(!(bufHdr->flags & BM_JUST_DIRTIED));
+					bufHdr->flags &= ~BM_DIRTY;
+					/*
+					 * Note that it's safe to change cntxDirty here because
+					 * of we protect it from upper writers by
+					 * AccessExclusiveLock and from other bufmgr routines
+					 * by BM_IO_IN_PROGRESS
+					 */
+					bufHdr->cntxDirty = false;
+				}
+				UnpinBuffer(bufHdr);
+			}
+			if (!(bufHdr->flags & BM_FREE))
+			{
+				SpinRelease(BufMgrLock);
+				elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is referenced (private %ld, global %d)",
+					 RelationGetRelationName(rel), firstDelBlock,
+					 bufHdr->tag.blockNum,
+					 PrivateRefCount[i], bufHdr->refcount);
+				return -2;
+			}
+			if (bufHdr->tag.blockNum >= firstDelBlock)
+			{
+				BufTableDelete(bufHdr);
+			}
+		}
+	}
+	SpinRelease(BufMgrLock);
+	return 0;
+}
+
+#undef ReleaseBuffer
+
+/*
+ * ReleaseBuffer -- remove the pin on a buffer without
+ *		marking it dirty.
+ *
+ */
+int
+ReleaseBuffer(Buffer buffer)
+{
+	BufferDesc *bufHdr;
+
+	if (BufferIsLocal(buffer))
+	{
+		Assert(LocalRefCount[-buffer - 1] > 0);
+		LocalRefCount[-buffer - 1]--;
+		return STATUS_OK;
+	}
+
+	if (BAD_BUFFER_ID(buffer))
+		return STATUS_ERROR;
+
+	bufHdr = &BufferDescriptors[buffer - 1];
+
+	Assert(PrivateRefCount[buffer - 1] > 0);
+	PrivateRefCount[buffer - 1]--;
+	if (PrivateRefCount[buffer - 1] == 0)
+	{
+		SpinAcquire(BufMgrLock);
+		Assert(bufHdr->refcount > 0);
+		bufHdr->refcount--;
+		if (bufHdr->refcount == 0)
+		{
+			AddBufferToFreelist(bufHdr);
+			bufHdr->flags |= BM_FREE;
+		}
+		SpinRelease(BufMgrLock);
+	}
+
+	return STATUS_OK;
+}
+
+#ifdef NOT_USED
+void
+IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
+{
+	IncrBufferRefCount(buffer);
+	if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
+	{
+		BufferDesc *buf = &BufferDescriptors[buffer - 1];
+
+		fprintf(stderr, "PIN(Incr) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+				buffer, buf->blind.relname, buf->tag.blockNum,
+				PrivateRefCount[buffer - 1], file, line);
+	}
+}
+
+#endif
+
+#ifdef NOT_USED
+void
+ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
+{
+	ReleaseBuffer(buffer);
+	if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
+	{
+		BufferDesc *buf = &BufferDescriptors[buffer - 1];
+
+		fprintf(stderr, "UNPIN(Rel) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+				buffer, buf->blind.relname, buf->tag.blockNum,
+				PrivateRefCount[buffer - 1], file, line);
+	}
+}
+
+#endif
+
+#ifdef NOT_USED
+int
+ReleaseAndReadBuffer_Debug(char *file,
+						   int line,
+						   Buffer buffer,
+						   Relation relation,
+						   BlockNumber blockNum)
+{
+	bool		bufferValid;
+	Buffer		b;
+
+	bufferValid = BufferIsValid(buffer);
+	b = ReleaseAndReadBuffer(buffer, relation, blockNum);
+	if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
+		&& is_userbuffer(buffer))
+	{
+		BufferDesc *buf = &BufferDescriptors[buffer - 1];
+
+		fprintf(stderr, "UNPIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+				buffer, buf->blind.relname, buf->tag.blockNum,
+				PrivateRefCount[buffer - 1], file, line);
+	}
+	if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
+	{
+		BufferDesc *buf = &BufferDescriptors[b - 1];
+
+		fprintf(stderr, "PIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+				b, buf->blind.relname, buf->tag.blockNum,
+				PrivateRefCount[b - 1], file, line);
+	}
+	return b;
+}
+
+#endif
+
+#ifdef BMTRACE
+
+/*
+ *	trace allocations and deallocations in a circular buffer in
+ *	shared memory.	check the buffer before doing the allocation,
+ *	and die if there's anything fishy.
+ */
+
+_bm_trace(Oid dbId, Oid relId, int blkNo, int bufNo, int allocType)
+{
+	long		start,
+				cur;
+	bmtrace    *tb;
+
+	start = *CurTraceBuf;
+
+	if (start > 0)
+		cur = start - 1;
+	else
+		cur = BMT_LIMIT - 1;
+
+	for (;;)
+	{
+		tb = &TraceBuf[cur];
+		if (tb->bmt_op != BMT_NOTUSED)
+		{
+			if (tb->bmt_buf == bufNo)
+			{
+				if ((tb->bmt_op == BMT_DEALLOC)
+					|| (tb->bmt_dbid == dbId && tb->bmt_relid == relId
+						&& tb->bmt_blkno == blkNo))
+					goto okay;
+
+				/* die holding the buffer lock */
+				_bm_die(dbId, relId, blkNo, bufNo, allocType, start, cur);
+			}
+		}
+
+		if (cur == start)
+			goto okay;
+
+		if (cur == 0)
+			cur = BMT_LIMIT - 1;
+		else
+			cur--;
+	}
+
+okay:
+	tb = &TraceBuf[start];
+	tb->bmt_pid = MyProcPid;
+	tb->bmt_buf = bufNo;
+	tb->bmt_dbid = dbId;
+	tb->bmt_relid = relId;
+	tb->bmt_blkno = blkNo;
+	tb->bmt_op = allocType;
+
+	*CurTraceBuf = (start + 1) % BMT_LIMIT;
+}
+
+_bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
+		int allocType, long start, long cur)
+{
+	FILE	   *fp;
+	bmtrace    *tb;
+	int			i;
+
+	tb = &TraceBuf[cur];
+
+	if ((fp = AllocateFile("/tmp/death_notice", "w")) == NULL)
+		elog(FATAL, "buffer alloc trace error and can't open log file");
+
+	fprintf(fp, "buffer alloc trace detected the following error:\n\n");
+	fprintf(fp, "    buffer %d being %s inconsistently with a previous %s\n\n",
+		 bufNo, (allocType == BMT_DEALLOC ? "deallocated" : "allocated"),
+			(tb->bmt_op == BMT_DEALLOC ? "deallocation" : "allocation"));
+
+	fprintf(fp, "the trace buffer contains:\n");
+
+	i = start;
+	for (;;)
+	{
+		tb = &TraceBuf[i];
+		if (tb->bmt_op != BMT_NOTUSED)
+		{
+			fprintf(fp, "     [%3d]%spid %d buf %2d for <%d,%u,%d> ",
+					i, (i == cur ? " ---> " : "\t"),
+					tb->bmt_pid, tb->bmt_buf,
+					tb->bmt_dbid, tb->bmt_relid, tb->bmt_blkno);
+
+			switch (tb->bmt_op)
+			{
+				case BMT_ALLOCFND:
+					fprintf(fp, "allocate (found)\n");
+					break;
+
+				case BMT_ALLOCNOTFND:
+					fprintf(fp, "allocate (not found)\n");
+					break;
+
+				case BMT_DEALLOC:
+					fprintf(fp, "deallocate\n");
+					break;
+
+				default:
+					fprintf(fp, "unknown op type %d\n", tb->bmt_op);
+					break;
+			}
+		}
+
+		i = (i + 1) % BMT_LIMIT;
+		if (i == start)
+			break;
+	}
+
+	fprintf(fp, "\noperation causing error:\n");
+	fprintf(fp, "\tpid %d buf %d for <%d,%u,%d> ",
+			getpid(), bufNo, dbId, relId, blkNo);
+
+	switch (allocType)
+	{
+		case BMT_ALLOCFND:
+			fprintf(fp, "allocate (found)\n");
+			break;
+
+		case BMT_ALLOCNOTFND:
+			fprintf(fp, "allocate (not found)\n");
+			break;
+
+		case BMT_DEALLOC:
+			fprintf(fp, "deallocate\n");
+			break;
+
+		default:
+			fprintf(fp, "unknown op type %d\n", allocType);
+			break;
+	}
+
+	FreeFile(fp);
+
+	kill(getpid(), SIGILL);
+}
+
+#endif	 /* BMTRACE */
+
+/*
+ * SetBufferCommitInfoNeedsSave
+ *
+ *	Mark a buffer dirty when we have updated tuple commit-status bits in it.
+ *
+ * This is similar to WriteNoReleaseBuffer, except that we do not set
+ * SharedBufferChanged or BufferDirtiedByMe, because we have not made a
+ * critical change that has to be flushed to disk before xact commit --- the
+ * status-bit update could be redone by someone else just as easily.  The
+ * buffer will be marked dirty, but it will not be written to disk until
+ * there is another reason to write it.
+ *
+ * This routine might get called many times on the same page, if we are making
+ * the first scan after commit of an xact that added/deleted many tuples.
+ * So, be as quick as we can if the buffer is already dirty.
+ */
+void
+SetBufferCommitInfoNeedsSave(Buffer buffer)
+{
+	BufferDesc *bufHdr;
+
+	if (BufferIsLocal(buffer))
+		return;
+
+	if (BAD_BUFFER_ID(buffer))
+		return;
+
+	bufHdr = &BufferDescriptors[buffer - 1];
+
+	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+		(BM_DIRTY | BM_JUST_DIRTIED))
+	{
+		SpinAcquire(BufMgrLock);
+		Assert(bufHdr->refcount > 0);
+		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+		SpinRelease(BufMgrLock);
+	}
+}
+
+void
+UnlockBuffers()
+{
+	BufferDesc *buf;
+	int			i;
+
+	for (i = 0; i < NBuffers; i++)
+	{
+		if (BufferLocks[i] == 0)
+			continue;
+
+		Assert(BufferIsValid(i + 1));
+		buf = &(BufferDescriptors[i]);
+
+#ifdef HAS_TEST_AND_SET
+		S_LOCK(&(buf->cntx_lock));
+#else
+		IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+		if (BufferLocks[i] & BL_R_LOCK)
+		{
+			Assert(buf->r_locks > 0);
+			(buf->r_locks)--;
+		}
+		if (BufferLocks[i] & BL_RI_LOCK)
+		{
+
+			/*
+			 * Someone else could remove our RI lock when acquiring W
+			 * lock. This is possible if we came here from elog(ERROR)
+			 * from IpcSemaphore{Lock|Unlock}(WaitCLSemId). And so we
+			 * don't do Assert(buf->ri_lock) here.
+			 */
+			buf->ri_lock = false;
+		}
+		if (BufferLocks[i] & BL_W_LOCK)
+		{
+			Assert(buf->w_lock);
+			buf->w_lock = false;
+		}
+#ifdef HAS_TEST_AND_SET
+		S_UNLOCK(&(buf->cntx_lock));
+#else
+		IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+		BufferLocks[i] = 0;
+	}
+}
+
+void
+LockBuffer(Buffer buffer, int mode)
+{
+	BufferDesc *buf;
+	bits8	   *buflock;
+
+	Assert(BufferIsValid(buffer));
+	if (BufferIsLocal(buffer))
+		return;
+
+	buf = &(BufferDescriptors[buffer - 1]);
+	buflock = &(BufferLocks[buffer - 1]);
+
+#ifdef HAS_TEST_AND_SET
+	S_LOCK(&(buf->cntx_lock));
+#else
+	IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+	if (mode == BUFFER_LOCK_UNLOCK)
+	{
+		if (*buflock & BL_R_LOCK)
+		{
+			Assert(buf->r_locks > 0);
+			Assert(!(buf->w_lock));
+			Assert(!(*buflock & (BL_W_LOCK | BL_RI_LOCK)));
+			(buf->r_locks)--;
+			*buflock &= ~BL_R_LOCK;
+		}
+		else if (*buflock & BL_W_LOCK)
+		{
+			Assert(buf->w_lock);
+			Assert(buf->r_locks == 0);
+			Assert(!(*buflock & (BL_R_LOCK | BL_RI_LOCK)));
+			buf->w_lock = false;
+			*buflock &= ~BL_W_LOCK;
+		}
+		else
+			elog(ERROR, "UNLockBuffer: buffer %lu is not locked", buffer);
+	}
+	else if (mode == BUFFER_LOCK_SHARE)
+	{
+		unsigned	i = 0;
+
+		Assert(!(*buflock & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
+		while (buf->ri_lock || buf->w_lock)
+		{
+#ifdef HAS_TEST_AND_SET
+			S_UNLOCK(&(buf->cntx_lock));
+			s_lock_sleep(i++);
+			S_LOCK(&(buf->cntx_lock));
+#else
+			IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+			s_lock_sleep(i++);
+			IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+		}
+		(buf->r_locks)++;
+		*buflock |= BL_R_LOCK;
+	}
+	else if (mode == BUFFER_LOCK_EXCLUSIVE)
+	{
+		unsigned	i = 0;
+
+		Assert(!(*buflock & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
+		while (buf->r_locks > 0 || buf->w_lock)
+		{
+			if (buf->r_locks > 3 || (*buflock & BL_RI_LOCK))
+			{
+
+				/*
+				 * Our RI lock might be removed by concurrent W lock
+				 * acquiring (see what we do with RI locks below when our
+				 * own W acquiring succeeded) and so we set RI lock again
+				 * if we already did this.
+				 */
+				*buflock |= BL_RI_LOCK;
+				buf->ri_lock = true;
+			}
+#ifdef HAS_TEST_AND_SET
+			S_UNLOCK(&(buf->cntx_lock));
+			s_lock_sleep(i++);
+			S_LOCK(&(buf->cntx_lock));
+#else
+			IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+			s_lock_sleep(i++);
+			IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+		}
+		buf->w_lock = true;
+		*buflock |= BL_W_LOCK;
+
+		buf->cntxDirty = true;
+
+		if (*buflock & BL_RI_LOCK)
+		{
+
+			/*
+			 * It's possible to remove RI locks acquired by another W
+			 * lockers here, but they'll take care about it.
+			 */
+			buf->ri_lock = false;
+			*buflock &= ~BL_RI_LOCK;
+		}
+	}
+	else
+		elog(ERROR, "LockBuffer: unknown lock mode %d", mode);
+
+#ifdef HAS_TEST_AND_SET
+	S_UNLOCK(&(buf->cntx_lock));
+#else
+	IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+}
+
+/*
+ *	Functions for IO error handling
+ *
+ *	Note : We assume that nested buffer IO never occur.
+ *	i.e at most one io_in_progress spinlock is held
+ *	per proc.
+*/
+static BufferDesc *InProgressBuf = (BufferDesc *) NULL;
+static bool IsForInput;
+
+/*
+ * Function:StartBufferIO
+ *	(Assumptions)
+ *	My process is executing no IO
+ *	BufMgrLock is held
+ *	BM_IO_IN_PROGRESS mask is not set for the buffer
+ *	The buffer is Pinned
+ *
+*/
+static void
+StartBufferIO(BufferDesc *buf, bool forInput)
+{
+	Assert(!InProgressBuf);
+	Assert(!(buf->flags & BM_IO_IN_PROGRESS));
+	buf->flags |= BM_IO_IN_PROGRESS;
+#ifdef	HAS_TEST_AND_SET
+
+	/*
+	 * There used to be
+	 *
+	 * Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
+	 *
+	 * here, but that's wrong because of the way WaitIO works: someone else
+	 * waiting for the I/O to complete will succeed in grabbing the lock
+	 * for a few instructions, and if we context-swap back to here the
+	 * Assert could fail.  Tiny window for failure, but I've seen it
+	 * happen -- tgl
+	 */
+	S_LOCK(&(buf->io_in_progress_lock));
+#endif	 /* HAS_TEST_AND_SET */
+	InProgressBuf = buf;
+	IsForInput = forInput;
+}
+
+/*
+ * Function:TerminateBufferIO
+ *	(Assumptions)
+ *	My process is executing IO for the buffer
+ *	BufMgrLock is held
+ *	The buffer is Pinned
+ *
+*/
+static void
+TerminateBufferIO(BufferDesc *buf)
+{
+	Assert(buf == InProgressBuf);
+#ifdef	HAS_TEST_AND_SET
+	S_UNLOCK(&(buf->io_in_progress_lock));
+#else
+	if (buf->refcount > 1)
+		SignalIO(buf);
+#endif	 /* HAS_TEST_AND_SET */
+	InProgressBuf = (BufferDesc *) 0;
+}
+
+/*
+ * Function:ContinueBufferIO
+ *	(Assumptions)
+ *	My process is executing IO for the buffer
+ *	BufMgrLock is held
+ *	The buffer is Pinned
+ *
+*/
+static void
+ContinueBufferIO(BufferDesc *buf, bool forInput)
+{
+	Assert(buf == InProgressBuf);
+	Assert(buf->flags & BM_IO_IN_PROGRESS);
+	IsForInput = forInput;
+}
+
+#ifdef NOT_USED
+void
+InitBufferIO(void)
+{
+	InProgressBuf = (BufferDesc *) 0;
+}
+#endif
+
+/*
+ *	This function is called from ProcReleaseSpins().
+ *	BufMgrLock isn't held when this function is called.
+ *	BM_IO_ERROR is always set. If BM_IO_ERROR was already
+ *	set in case of output,this routine would kill all
+ *	backends and reset postmaster.
+ */
+void
+AbortBufferIO(void)
+{
+	BufferDesc *buf = InProgressBuf;
+
+	if (buf)
+	{
+		Assert(buf->flags & BM_IO_IN_PROGRESS);
+		SpinAcquire(BufMgrLock);
+		if (IsForInput)
+			Assert(!(buf->flags & BM_DIRTY) && !(buf->cntxDirty));
+		else
+		{
+			Assert(buf->flags & BM_DIRTY || buf->cntxDirty);
+			if (buf->flags & BM_IO_ERROR)
+			{
+				elog(NOTICE, "write error may be permanent: cannot write block %u for %s/%s",
+				buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
+			}
+			buf->flags |= BM_DIRTY;
+		}
+		buf->flags |= BM_IO_ERROR;
+		buf->flags &= ~BM_IO_IN_PROGRESS;
+		TerminateBufferIO(buf);
+		SpinRelease(BufMgrLock);
+	}
+}
+
+/*
+ * Cleanup buffer or mark it for cleanup. Buffer may be cleaned
+ * up if it's pinned only once.
+ *
+ * NOTE: buffer must be excl locked.
+ */
+void
+MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer))
+{
+	BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
+
+	Assert(PrivateRefCount[buffer - 1] > 0);
+
+	if (PrivateRefCount[buffer - 1] > 1)
+	{
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+		PrivateRefCount[buffer - 1]--;
+		SpinAcquire(BufMgrLock);
+		Assert(bufHdr->refcount > 0);
+		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+		bufHdr->CleanupFunc = CleanupFunc;
+		SpinRelease(BufMgrLock);
+		return;
+	}
+
+	SpinAcquire(BufMgrLock);
+	Assert(bufHdr->refcount > 0);
+	if (bufHdr->refcount == 1)
+	{
+		SpinRelease(BufMgrLock);
+		CleanupFunc(buffer);
+		CleanupFunc = NULL;
+	}
+	else
+		SpinRelease(BufMgrLock);
+
+	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+	PrivateRefCount[buffer - 1]--;
+
+	SpinAcquire(BufMgrLock);
+	Assert(bufHdr->refcount > 0);
+	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+	bufHdr->CleanupFunc = CleanupFunc;
+	bufHdr->refcount--;
+	if (bufHdr->refcount == 0)
+	{
+		AddBufferToFreelist(bufHdr);
+		bufHdr->flags |= BM_FREE;
+	}
+	SpinRelease(BufMgrLock);
+	return;
+}
diff --git a/src/backend/storage/buffer/xlog_localbuf.c b/src/backend/storage/buffer/xlog_localbuf.c
new file mode 100644
index 00000000000..cb14a32ed23
--- /dev/null
+++ b/src/backend/storage/buffer/xlog_localbuf.c
@@ -0,0 +1,274 @@
+/*-------------------------------------------------------------------------
+ *
+ * localbuf.c
+ *	  local buffer manager. Fast buffer manager for temporary tables
+ *	  or special cases when the operation is not visible to other backends.
+ *
+ *	  When a relation is being created, the descriptor will have rd_islocal
+ *	  set to indicate that the local buffer manager should be used. During
+ *	  the same transaction the relation is being created, any inserts or
+ *	  selects from the newly created relation will use the local buffer
+ *	  pool. rd_islocal is reset at the end of a transaction (commit/abort).
+ *	  This is useful for queries like SELECT INTO TABLE and create index.
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994-5, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_localbuf.c,v 1.1 2000/10/28 16:20:56 vadim Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include <sys/types.h>
+#include <sys/file.h>
+#include <math.h>
+#include <signal.h>
+
+#include "postgres.h"
+
+#include "executor/execdebug.h"
+#include "storage/smgr.h"
+#include "utils/relcache.h"
+
+extern long int LocalBufferFlushCount;
+
+int			NLocBuffer = 64;
+BufferDesc *LocalBufferDescriptors = NULL;
+long	   *LocalRefCount = NULL;
+
+static int	nextFreeLocalBuf = 0;
+
+/*#define LBDEBUG*/
+
+/*
+ * LocalBufferAlloc -
+ *	  allocate a local buffer. We do round robin allocation for now.
+ */
+BufferDesc *
+LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
+{
+	int			i;
+	BufferDesc *bufHdr = (BufferDesc *) NULL;
+
+	if (blockNum == P_NEW)
+	{
+		blockNum = reln->rd_nblocks;
+		reln->rd_nblocks++;
+	}
+
+	/* a low tech search for now -- not optimized for scans */
+	for (i = 0; i < NLocBuffer; i++)
+	{
+		if (LocalBufferDescriptors[i].tag.rnode.relNode == 
+			reln->rd_node.relNode &&
+			LocalBufferDescriptors[i].tag.blockNum == blockNum)
+		{
+
+#ifdef LBDEBUG
+			fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
+					RelationGetRelid(reln), blockNum, -i - 1);
+#endif
+			LocalRefCount[i]++;
+			*foundPtr = TRUE;
+			return &LocalBufferDescriptors[i];
+		}
+	}
+
+#ifdef LBDEBUG
+	fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
+			RelationGetRelid(reln), blockNum, -nextFreeLocalBuf - 1);
+#endif
+
+	/* need to get a new buffer (round robin for now) */
+	for (i = 0; i < NLocBuffer; i++)
+	{
+		int			b = (nextFreeLocalBuf + i) % NLocBuffer;
+
+		if (LocalRefCount[b] == 0)
+		{
+			bufHdr = &LocalBufferDescriptors[b];
+			LocalRefCount[b]++;
+			nextFreeLocalBuf = (b + 1) % NLocBuffer;
+			break;
+		}
+	}
+	if (bufHdr == NULL)
+		elog(ERROR, "no empty local buffer.");
+
+	/*
+	 * this buffer is not referenced but it might still be dirty (the last
+	 * transaction to touch it doesn't need its contents but has not
+	 * flushed it).  if that's the case, write it out before reusing it!
+	 */
+	if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+	{
+		Relation	bufrel = RelationNodeCacheGetRelation(bufHdr->tag.rnode);
+
+		Assert(bufrel != NULL);
+
+		/* flush this page */
+		smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
+				  (char *) MAKE_PTR(bufHdr->data));
+		LocalBufferFlushCount++;
+
+		/*
+		 * drop relcache refcount incremented by
+		 * RelationIdCacheGetRelation
+		 */
+		RelationDecrementReferenceCount(bufrel);
+	}
+
+	/*
+	 * it's all ours now.
+	 *
+	 * We need not in tblNode currently but will in future I think,
+	 * when we'll give up rel->rd_fd to fmgr cache.
+	 */
+	bufHdr->tag.rnode = reln->rd_node;
+	bufHdr->tag.blockNum = blockNum;
+	bufHdr->flags &= ~BM_DIRTY;
+	bufHdr->cntxDirty = false;
+
+	/*
+	 * lazy memory allocation. (see MAKE_PTR for why we need to do
+	 * MAKE_OFFSET.)
+	 */
+	if (bufHdr->data == (SHMEM_OFFSET) 0)
+	{
+		char	   *data = (char *) malloc(BLCKSZ);
+
+		bufHdr->data = MAKE_OFFSET(data);
+	}
+
+	*foundPtr = FALSE;
+	return bufHdr;
+}
+
+/*
+ * WriteLocalBuffer -
+ *	  writes out a local buffer
+ */
+int
+WriteLocalBuffer(Buffer buffer, bool release)
+{
+	int			bufid;
+
+	Assert(BufferIsLocal(buffer));
+
+#ifdef LBDEBUG
+	fprintf(stderr, "LB WRITE %d\n", buffer);
+#endif
+
+	bufid = -(buffer + 1);
+	LocalBufferDescriptors[bufid].flags |= BM_DIRTY;
+
+	if (release)
+	{
+		Assert(LocalRefCount[bufid] > 0);
+		LocalRefCount[bufid]--;
+	}
+
+	return true;
+}
+
+/*
+ * InitLocalBuffer -
+ *	  init the local buffer cache. Since most queries (esp. multi-user ones)
+ *	  don't involve local buffers, we delay allocating memory for actual the
+ *	  buffer until we need it.
+ */
+void
+InitLocalBuffer(void)
+{
+	int			i;
+
+	/*
+	 * these aren't going away. I'm not gonna use palloc.
+	 */
+	LocalBufferDescriptors =
+		(BufferDesc *) malloc(sizeof(BufferDesc) * NLocBuffer);
+	MemSet(LocalBufferDescriptors, 0, sizeof(BufferDesc) * NLocBuffer);
+	nextFreeLocalBuf = 0;
+
+	for (i = 0; i < NLocBuffer; i++)
+	{
+		BufferDesc *buf = &LocalBufferDescriptors[i];
+
+		/*
+		 * negative to indicate local buffer. This is tricky: shared
+		 * buffers start with 0. We have to start with -2. (Note that the
+		 * routine BufferDescriptorGetBuffer adds 1 to buf_id so our first
+		 * buffer id is -1.)
+		 */
+		buf->buf_id = -i - 2;
+	}
+
+	LocalRefCount = (long *) malloc(sizeof(long) * NLocBuffer);
+	MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
+}
+
+/*
+ * LocalBufferSync
+ *
+ * Flush all dirty buffers in the local buffer cache at commit time.
+ * Since the buffer cache is only used for keeping relations visible
+ * during a transaction, we will not need these buffers again.
+ *
+ * Note that we have to *flush* local buffers because of them are not
+ * visible to checkpoint makers. But we can skip XLOG flush check.
+ */
+void
+LocalBufferSync(void)
+{
+	int			i;
+
+	for (i = 0; i < NLocBuffer; i++)
+	{
+		BufferDesc *buf = &LocalBufferDescriptors[i];
+		Relation	bufrel;
+
+		if (buf->flags & BM_DIRTY || buf->cntxDirty)
+		{
+#ifdef LBDEBUG
+			fprintf(stderr, "LB SYNC %d\n", -i - 1);
+#endif
+			bufrel = RelationNodeCacheGetRelation(buf->tag.rnode);
+
+			Assert(bufrel != NULL);
+
+			smgrwrite(DEFAULT_SMGR, bufrel, buf->tag.blockNum,
+						(char *) MAKE_PTR(buf->data));
+			smgrmarkdirty(DEFAULT_SMGR, bufrel, buf->tag.blockNum);
+			LocalBufferFlushCount++;
+
+			/* drop relcache refcount from RelationIdCacheGetRelation */
+			RelationDecrementReferenceCount(bufrel);
+
+			buf->flags &= ~BM_DIRTY;
+			buf->cntxDirty = false;
+		}
+	}
+
+	MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
+	nextFreeLocalBuf = 0;
+}
+
+void
+ResetLocalBufferPool(void)
+{
+	int			i;
+
+	for (i = 0; i < NLocBuffer; i++)
+	{
+		BufferDesc *buf = &LocalBufferDescriptors[i];
+
+		buf->tag.rnode.relNode = InvalidOid;
+		buf->flags &= ~BM_DIRTY;
+		buf->cntxDirty = false;
+		buf->buf_id = -i - 2;
+	}
+
+	MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
+	nextFreeLocalBuf = 0;
+}
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 128c49989a0..84c4e76c09d 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.64 2000/10/02 19:42:47 petere Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.65 2000/10/28 16:20:56 vadim Exp $
  *
  * NOTES:
  *
@@ -823,8 +823,10 @@ FileWrite(File file, char *buffer, int amount)
 	if (returnCode > 0)
 	{
 		VfdCache[file].seekPos += returnCode;
+#ifndef XLOG
 		/* mark the file as needing fsync */
 		VfdCache[file].fdstate |= FD_DIRTY;
+#endif
 	}
 	else
 		VfdCache[file].seekPos = FileUnknownPos;
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index ff8b4ce52fe..da466afe9f8 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.76 2000/10/20 11:01:11 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.77 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -76,12 +76,7 @@ static int	_mdfd_getrelnfd(Relation reln);
 static MdfdVec *_mdfd_openseg(Relation reln, int segno, int oflags);
 static MdfdVec *_mdfd_getseg(Relation reln, int blkno);
 
-#ifdef OLD_FILE_NAMING
-static int _mdfd_blind_getseg(char *dbname, char *relname,
-				   Oid dbid, Oid relid, int blkno);
-#else
 static int _mdfd_blind_getseg(RelFileNode rnode, int blkno);
-#endif
 
 static int	_fdvec_alloc(void);
 static void _fdvec_free(int);
@@ -134,11 +129,7 @@ mdcreate(Relation reln)
 
 	Assert(reln->rd_unlinked && reln->rd_fd < 0);
 
-#ifdef OLD_FILE_NAMING
-	path = relpath(RelationGetPhysicalRelationName(reln));
-#else
 	path = relpath(reln->rd_node);
-#endif
 	fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
 
 	/*
@@ -336,11 +327,7 @@ mdopen(Relation reln)
 	int			vfd;
 
 	Assert(reln->rd_fd < 0);
-#ifdef OLD_FILE_NAMING
-	path = relpath(RelationGetPhysicalRelationName(reln));
-#else
 	path = relpath(reln->rd_node);
-#endif
 
 	fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
 	if (fd < 0)
@@ -579,30 +566,16 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer)
  *		the file, making it more like mdflush().
  */
 int
-#ifdef OLD_FILE_NAMING
-mdblindwrt(char *dbname,
-		   char *relname,
-		   Oid dbid,
-		   Oid relid,
-		   BlockNumber blkno,
-		   char *buffer,
-		   bool dofsync)
-#else
 mdblindwrt(RelFileNode rnode,
 		   BlockNumber blkno,
 		   char *buffer,
 		   bool dofsync)
-#endif
 {
 	int			status;
 	long		seekpos;
 	int			fd;
 
-#ifdef OLD_FILE_NAMING
-	fd = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
-#else
 	fd = _mdfd_blind_getseg(rnode, blkno);
-#endif
 
 	if (fd < 0)
 		return SM_FAIL;
@@ -676,25 +649,13 @@ mdmarkdirty(Relation reln, BlockNumber blkno)
  *		rather than building md/fd datastructures to postpone it till later.
  */
 int
-#ifdef OLD_FILE_NAMING
-mdblindmarkdirty(char *dbname,
-				 char *relname,
-				 Oid dbid,
-				 Oid relid,
-				 BlockNumber blkno)
-#else
 mdblindmarkdirty(RelFileNode rnode,
 				 BlockNumber blkno)
-#endif
 {
 	int			status;
 	int			fd;
 
-#ifdef OLD_FILE_NAMING
-	fd = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
-#else
 	fd = _mdfd_blind_getseg(rnode, blkno);
-#endif
 
 	if (fd < 0)
 		return SM_FAIL;
@@ -915,6 +876,22 @@ mdabort()
 	return SM_SUCCESS;
 }
 
+#ifdef XLOG
+/*
+ *	mdsync() -- Sync storage.
+ *
+ */
+int
+mdsync()
+{
+	sync();
+	if (IsUnderPostmaster)
+		sleep(2);
+	sync();
+	return SM_SUCCESS;
+}
+#endif
+
 /*
  *	_fdvec_alloc () -- grab a free (or new) md file descriptor vector.
  *
@@ -996,11 +973,7 @@ _mdfd_openseg(Relation reln, int segno, int oflags)
 			   *fullpath;
 
 	/* be sure we have enough space for the '.segno', if any */
-#ifdef OLD_FILE_NAMING
-	path = relpath(RelationGetPhysicalRelationName(reln));
-#else
 	path = relpath(reln->rd_node);
-#endif
 
 	if (segno > 0)
 	{
@@ -1115,12 +1088,7 @@ _mdfd_getseg(Relation reln, int blkno)
  */
 
 static int
-#ifdef OLD_FILE_NAMING
-_mdfd_blind_getseg(char *dbname, char *relname, Oid dbid, Oid relid,
-				   int blkno)
-#else
 _mdfd_blind_getseg(RelFileNode rnode, int blkno)
-#endif
 {
 	char	   *path;
 	int			fd;
@@ -1130,12 +1098,7 @@ _mdfd_blind_getseg(RelFileNode rnode, int blkno)
 
 #endif
 
-#ifdef OLD_FILE_NAMING
-	/* construct the path to the relation */
-	path = relpath_blind(dbname, relname, dbid, relid);
-#else
 	path = relpath(rnode);
-#endif
 
 #ifndef LET_OS_MANAGE_FILESIZE
 	/* append the '.segno', if needed */
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 65bc5595a85..d2a940a76e5 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.41 2000/10/21 15:43:31 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.42 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,27 +36,17 @@ typedef struct f_smgr
 										   char *buffer);
 	int			(*smgr_flush) (Relation reln, BlockNumber blocknum,
 										   char *buffer);
-#ifdef OLD_FILE_NAMING
-	int			(*smgr_blindwrt) (char *dbname, char *relname,
-											  Oid dbid, Oid relid,
-										 BlockNumber blkno, char *buffer,
-											  bool dofsync);
-#else
 	int			(*smgr_blindwrt) (RelFileNode rnode, BlockNumber blkno, 
 										char *buffer, bool dofsync);
-#endif
 	int			(*smgr_markdirty) (Relation reln, BlockNumber blkno);
-#ifdef OLD_FILE_NAMING
-	int			(*smgr_blindmarkdirty) (char *dbname, char *relname,
-													Oid dbid, Oid relid,
-													BlockNumber blkno);
-#else
 	int			(*smgr_blindmarkdirty) (RelFileNode, BlockNumber blkno);
-#endif
 	int			(*smgr_nblocks) (Relation reln);
 	int			(*smgr_truncate) (Relation reln, int nblocks);
 	int			(*smgr_commit) (void);	/* may be NULL */
 	int			(*smgr_abort) (void);	/* may be NULL */
+#ifdef XLOG
+	int			(*smgr_sync) (void);
+#endif
 } f_smgr;
 
 /*
@@ -69,7 +59,11 @@ static f_smgr smgrsw[] = {
 	/* magnetic disk */
 	{mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
 		mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
+#ifdef XLOG
+	mdnblocks, mdtruncate, mdcommit, mdabort, mdsync},
+#else
 	mdnblocks, mdtruncate, mdcommit, mdabort},
+#endif
 
 #ifdef STABLE_MEMORY_STORAGE
 	/* main memory */
@@ -310,40 +304,6 @@ smgrflush(int16 which, Relation reln, BlockNumber blocknum, char *buffer)
  *		this page down to stable storage in this circumstance.	The
  *		write should be synchronous if dofsync is true.
  */
-#ifdef OLD_FILE_NAMING
-int
-smgrblindwrt(int16 which,
-			 char *dbname,
-			 char *relname,
-			 Oid dbid,
-			 Oid relid,
-			 BlockNumber blkno,
-			 char *buffer,
-			 bool dofsync)
-{
-	char	   *dbstr;
-	char	   *relstr;
-	int			status;
-
-	/* strdup here is probably redundant */
-	dbstr = pstrdup(dbname);
-	relstr = pstrdup(relname);
-
-	status = (*(smgrsw[which].smgr_blindwrt)) (dbstr, relstr, dbid, relid,
-											   blkno, buffer, dofsync);
-
-	if (status == SM_FAIL)
-		elog(ERROR, "cannot write block %d of %s [%s] blind: %m",
-			 blkno, relstr, dbstr);
-
-	pfree(dbstr);
-	pfree(relstr);
-
-	return status;
-}
-
-#else
-
 int
 smgrblindwrt(int16 which,
 			 RelFileNode rnode,
@@ -361,7 +321,6 @@ smgrblindwrt(int16 which,
 
 	return status;
 }
-#endif
 
 /*
  *	smgrmarkdirty() -- Mark a page dirty (needs fsync).
@@ -394,39 +353,6 @@ smgrmarkdirty(int16 which,
  *
  *		Just like smgrmarkdirty, except we don't have a reldesc.
  */
-#ifdef OLD_FILE_NAMING
-int
-smgrblindmarkdirty(int16 which,
-				   char *dbname,
-				   char *relname,
-				   Oid dbid,
-				   Oid relid,
-				   BlockNumber blkno)
-{
-	char	   *dbstr;
-	char	   *relstr;
-	int			status;
-
-	/* strdup here is probably redundant */
-	dbstr = pstrdup(dbname);
-	relstr = pstrdup(relname);
-
-	status = (*(smgrsw[which].smgr_blindmarkdirty)) (dbstr, relstr,
-													 dbid, relid,
-													 blkno);
-
-	if (status == SM_FAIL)
-		elog(ERROR, "cannot mark block %d of %s [%s] blind: %m",
-			 blkno, relstr, dbstr);
-
-	pfree(dbstr);
-	pfree(relstr);
-
-	return status;
-}
-
-#else
-
 int
 smgrblindmarkdirty(int16 which,
 				   RelFileNode rnode,
@@ -442,7 +368,6 @@ smgrblindmarkdirty(int16 which,
 
 	return status;
 }
-#endif
 
 /*
  *	smgrnblocks() -- Calculate the number of POSTGRES blocks in the
@@ -528,6 +453,27 @@ smgrabort()
 	return SM_SUCCESS;
 }
 
+#ifdef XLOG
+int
+smgrsync()
+{
+	int			i;
+
+	for (i = 0; i < NSmgr; i++)
+	{
+		if (smgrsw[i].smgr_sync)
+		{
+			if ((*(smgrsw[i].smgr_sync)) () == SM_FAIL)
+				elog(STOP, "storage sync failed on %s: %m",
+					 DatumGetCString(DirectFunctionCall1(smgrout,
+														 Int16GetDatum(i))));
+		}
+	}
+
+	return SM_SUCCESS;
+}
+#endif
+
 #ifdef NOT_USED
 bool
 smgriswo(int16 smgrno)
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index de3e3c4a8d3..ea7a8d0212c 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.113 2000/10/23 04:10:08 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.114 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -2064,7 +2064,62 @@ RelationCacheInitializePhase2(void)
 	}
 }
 
+#ifdef XLOG		/* used by XLogInitCache */
 
+void CreateDummyCaches(void);
+void DestroyDummyCaches(void);
+
+void
+CreateDummyCaches(void)
+{
+	MemoryContext	oldcxt;
+	HASHCTL			ctl;
+
+	if (!CacheMemoryContext)
+		CreateCacheMemoryContext();
+
+	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+	MemSet(&ctl, 0, (int) sizeof(ctl));
+	ctl.keysize = sizeof(NameData);
+	ctl.datasize = sizeof(Relation);
+	RelationNameCache = hash_create(INITRELCACHESIZE, &ctl, HASH_ELEM);
+
+	ctl.keysize = sizeof(Oid);
+	ctl.hash = tag_hash;
+	RelationIdCache = hash_create(INITRELCACHESIZE, &ctl,
+								  HASH_ELEM | HASH_FUNCTION);
+
+	ctl.keysize = sizeof(RelFileNode);
+	ctl.hash = tag_hash;
+	RelationNodeCache = hash_create(INITRELCACHESIZE, &ctl,
+								  HASH_ELEM | HASH_FUNCTION);
+	MemoryContextSwitchTo(oldcxt);
+}
+
+void
+DestroyDummyCaches(void)
+{
+	MemoryContext	oldcxt;
+
+	if (!CacheMemoryContext)
+		return;
+
+	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+	if (RelationNameCache)
+		hash_destroy(RelationNameCache);
+	if (RelationIdCache)
+		hash_destroy(RelationIdCache);
+	if (RelationNodeCache)
+		hash_destroy(RelationNodeCache);
+
+	RelationNameCache = RelationIdCache = RelationNodeCache = NULL;
+
+	MemoryContextSwitchTo(oldcxt);
+}
+
+#endif	/* XLOG */
 
 static void
 AttrDefaultFetch(Relation relation)
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index cee8dfaac90..fbc9cc2ab2c 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/utils/init/postinit.c,v 1.68 2000/10/16 14:52:15 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/utils/init/postinit.c,v 1.69 2000/10/28 16:20:58 vadim Exp $
  *
  *
  *-------------------------------------------------------------------------
@@ -231,9 +231,6 @@ InitPostgres(const char *dbname, const char *username)
 {
 	bool		bootstrap = IsBootstrapProcessingMode();
 
-	/* initialize the local buffer manager */
-	InitLocalBuffer();
-
 #ifndef XLOG
 	if (!TransactionFlushEnabled())
 		on_shmem_exit(FlushBufferPool, 0);
@@ -414,4 +411,8 @@ BaseInit(void)
 	smgrinit();
 
 	EnablePortalManager();		/* memory for portal/transaction stuff */
+
+	/* initialize the local buffer manager */
+	InitLocalBuffer();
+
 }
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 752682ca969..415ad56b959 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: transam.h,v 1.24 2000/01/26 05:57:51 momjian Exp $
+ * $Id: transam.h,v 1.25 2000/10/28 16:20:59 vadim Exp $
  *
  *	 NOTES
  *		Transaction System Version 101 now support proper oid
@@ -67,7 +67,11 @@ typedef unsigned char XidStatus;/* (2 bits) */
  *		transaction page definitions
  * ----------------
  */
+#ifdef XLOG
+#define TP_DataSize				(BLCKSZ - sizeof(XLogRecPtr))
+#else
 #define TP_DataSize				BLCKSZ
+#endif
 #define TP_NumXidStatusPerBlock (TP_DataSize * 4)
 
 /* ----------------
@@ -84,6 +88,10 @@ typedef unsigned char XidStatus;/* (2 bits) */
  */
 typedef struct LogRelationContentsData
 {
+#ifdef XLOG
+	XLogRecPtr	LSN;		/* temp hack: LSN is member of any block */
+							/* so should be described in bufmgr */
+#endif
 	int			TransSystemVersion;
 } LogRelationContentsData;
 
@@ -107,6 +115,9 @@ typedef LogRelationContentsData *LogRelationContents;
  */
 typedef struct VariableRelationContentsData
 {
+#ifdef XLOG
+	XLogRecPtr	LSN;
+#endif
 	int			TransSystemVersion;
 	TransactionId nextXidData;
 	TransactionId lastXidData;	/* unused */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 712e88b6005..18ca96f3d83 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xact.h,v 1.28 2000/10/20 11:01:14 vadim Exp $
+ * $Id: xact.h,v 1.29 2000/10/28 16:20:59 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -135,6 +135,8 @@ extern bool IsTransactionBlock(void);
 extern void UserAbortTransactionBlock(void);
 extern void AbortOutOfAnyTransaction(void);
 
+extern void RecordTransactionCommit(void);
+
 extern TransactionId DisabledTransactionId;
 
 extern void XactPushRollback(void (*func) (void *), void* data);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c77c1cac02a..02998755c32 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -10,12 +10,7 @@
 
 #include "access/rmgr.h"
 #include "access/transam.h"
-
-typedef struct XLogRecPtr
-{
-	uint32		xlogid;			/* log file #, 0 based */
-	uint32		xrecoff;		/* offset of record in log file */
-} XLogRecPtr;
+#include "access/xlogdefs.h"
 
 typedef struct XLogRecord
 {
@@ -83,12 +78,7 @@ typedef XLogPageHeaderData *XLogPageHeader;
 #define XLByteEQ(left, right)		\
 			(right.xlogid == left.xlogid && right.xrecoff ==  left.xrecoff)
 
-/*
- * StartUpID (SUI) - system startups counter.
- * It's to allow removing pg_log after shutdown.
- */
-typedef	uint32		StartUpID;
-extern	StartUpID	ThisStartUpID;
+extern	StartUpID	ThisStartUpID;	/* current SUI */
 extern	bool		InRecovery;
 extern	XLogRecPtr	MyLastRecPtr;
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
new file mode 100644
index 00000000000..ce1b3ef8cf6
--- /dev/null
+++ b/src/include/access/xlogdefs.h
@@ -0,0 +1,24 @@
+/*
+ *
+ * xlogdefs.h
+ *
+ * Postgres transaction log manager record pointer and
+ * system stratup number definitions
+ *
+ */
+#ifndef XLOG_DEFS_H
+#define XLOG_DEFS_H
+
+typedef struct XLogRecPtr
+{
+	uint32		xlogid;			/* log file #, 0 based */
+	uint32		xrecoff;		/* offset of record in log file */
+} XLogRecPtr;
+
+/*
+ * StartUpID (SUI) - system startups counter. It's to allow removing
+ * pg_log after shutdown, in future.
+ */
+typedef	uint32		StartUpID;
+
+#endif	 /* XLOG_DEFS_H */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index f62f726d831..b8fa3549f42 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -9,8 +9,10 @@ extern bool XLogIsValidTuple(RelFileNode hnode, ItemPointer iptr);
 
 extern void XLogOpenLogRelation(void);
 
-extern Buffer XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno);
+extern void XLogInitRelationCache(void);
 extern void XLogCloseRelationCache(void);
+
 extern Relation XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode);
+extern Buffer XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno);
 
 #endif
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 65abe9b8ceb..80aca7c57e9 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: buf_internals.h,v 1.41 2000/10/23 04:10:14 vadim Exp $
+ * $Id: buf_internals.h,v 1.42 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -109,6 +109,10 @@ typedef struct sbufdesc
 	bool		ri_lock;		/* read-intent lock */
 	bool		w_lock;			/* context exclusively locked */
 
+#ifdef XLOG
+	bool		cntxDirty;		/* new way to mark block as dirty */
+#endif
+
 	BufferBlindId blind;		/* was used to support blind write */
 
 	/*
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 551f98e75f9..0ed4837305d 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufmgr.h,v 1.41 2000/10/20 11:01:21 vadim Exp $
+ * $Id: bufmgr.h,v 1.42 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -15,7 +15,7 @@
 #define BUFMGR_H
 
 #include "storage/buf_internals.h"
-
+#include "access/xlogdefs.h"
 
 typedef void *Block;
 
@@ -177,4 +177,9 @@ extern void AbortBufferIO(void);
 extern bool BufferIsUpdatable(Buffer buffer);
 extern void MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer));
 
+#ifdef XLOG
+extern void BufmgrCommit(void);
+extern void BufferSync(void);
+#endif
+
 #endif
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index d547f71b736..78b22f392cf 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufpage.h,v 1.34 2000/10/21 15:43:36 vadim Exp $
+ * $Id: bufpage.h,v 1.35 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -118,7 +118,8 @@ typedef OpaqueData *Opaque;
  */
 typedef struct PageHeaderData
 {
-#ifdef XLOG
+#ifdef XLOG						/* XXX LSN is member of *any* block, not */
+								/* only page-organized - 'll change later */
 	XLogRecPtr	pd_lsn;			/* LSN: next byte after last byte of xlog */
 								/* record for last change of this page */
 	StartUpID	pd_sui;			/* SUI of last changes (currently it's */
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 7caac813e9a..49a2e3e5e92 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: smgr.h,v 1.22 2000/10/16 14:52:28 vadim Exp $
+ * $Id: smgr.h,v 1.23 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,26 +36,19 @@ extern int smgrwrite(int16 which, Relation reln, BlockNumber blocknum,
 		  char *buffer);
 extern int smgrflush(int16 which, Relation reln, BlockNumber blocknum,
 		  char *buffer);
-#ifdef OLD_FILE_NAMING
-extern int smgrblindwrt(int16 which, char *dbname, char *relname,
-			 Oid dbid, Oid relid,
-			 BlockNumber blkno, char *buffer,
-			 bool dofsync);
-extern int smgrblindmarkdirty(int16 which, char *dbname, char *relname,
-				   Oid dbid, Oid relid,
-				   BlockNumber blkno);
-#else
 extern int smgrblindwrt(int16 which, RelFileNode rnode,
 						BlockNumber blkno, char *buffer, bool dofsync);
 extern int smgrblindmarkdirty(int16 which, RelFileNode rnode,
 						BlockNumber blkno);
-#endif
 extern int	smgrmarkdirty(int16 which, Relation reln, BlockNumber blkno);
 extern int	smgrnblocks(int16 which, Relation reln);
 extern int	smgrtruncate(int16 which, Relation reln, int nblocks);
 extern int	smgrcommit(void);
 extern int	smgrabort(void);
 
+#ifdef XLOG
+extern int	smgrsync(void);
+#endif
 
 
 /* internals: move me elsewhere -- ay 7/94 */
@@ -71,22 +64,18 @@ extern int	mdread(Relation reln, BlockNumber blocknum, char *buffer);
 extern int	mdwrite(Relation reln, BlockNumber blocknum, char *buffer);
 extern int	mdflush(Relation reln, BlockNumber blocknum, char *buffer);
 extern int	mdmarkdirty(Relation reln, BlockNumber blkno);
-#ifdef OLD_FILE_NAMING
-extern int mdblindwrt(char *dbname, char *relname, Oid dbid, Oid relid,
-		   BlockNumber blkno, char *buffer,
-		   bool dofsync);
-extern int mdblindmarkdirty(char *dbname, char *relname, Oid dbid, Oid relid,
-				 BlockNumber blkno);
-#else
 extern int mdblindwrt(RelFileNode rnode, BlockNumber blkno,
 						char *buffer, bool dofsync);
 extern int mdblindmarkdirty(RelFileNode rnode, BlockNumber blkno);
-#endif
 extern int	mdnblocks(Relation reln);
 extern int	mdtruncate(Relation reln, int nblocks);
 extern int	mdcommit(void);
 extern int	mdabort(void);
 
+#ifdef XLOG
+extern int	mdsync(void);
+#endif
+
 /* mm.c */
 extern SPINLOCK MMCacheLock;
 
-- 
GitLab