From 2095206de13b3fc9643cac7eef8c0f51b56cb556 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Wed, 2 Jun 2004 17:28:18 +0000
Subject: [PATCH] Adjust btree index build to not use shared buffers, thereby
 avoiding the locking conflict with concurrent CHECKPOINT that was
 discussed a few weeks ago.  Also, if not using WAL archiving (which is always
 true ATM but won't be if PITR makes it into this release), there's no need to
 WAL-log the index build process; it's sufficient to force-fsync the completed
 index before commit.  This seems to gain about a factor of 2 in my tests,
 which is consistent with writing half as much data.  I did not try it with
 WAL on a separate drive though --- probably the gain would be a lot less in
 that scenario.

---
 src/backend/access/nbtree/nbtpage.c | 121 ++++---------
 src/backend/access/nbtree/nbtree.c  |  11 +-
 src/backend/access/nbtree/nbtsort.c | 260 +++++++++++++++++++---------
 src/backend/access/nbtree/nbtxlog.c |  42 ++---
 src/backend/storage/smgr/md.c       |  36 +++-
 src/backend/storage/smgr/smgr.c     |  34 +++-
 src/include/access/nbtree.h         |  10 +-
 src/include/storage/smgr.h          |   4 +-
 8 files changed, 304 insertions(+), 214 deletions(-)
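
For orientation before the per-file diffs: the shape of the new write path,
condensed.  This is an illustrative sketch, not code from the patch; index
and blkno stand in for the target relation and block number, the tuplesort
and page-splitting machinery of nbtsort.c is elided, smgrimmedsync() is the
new entry point added below, and only the common (no WAL archiving) case is
shown.

    /* Build each finished index page in backend-local memory, never
     * in shared buffers, so no buffer locks are held that a
     * concurrent CHECKPOINT could block on. */
    Page    page = (Page) palloc(BLCKSZ);

    _bt_pageinit(page, BLCKSZ);
    /* ... fill the page with sorted index tuples ... */

    /* Write it straight through the storage manager.  Passing
     * isTemp = true keeps md.c from remembering a per-page fsync
     * request, since the whole file is synced once at the end. */
    smgrwrite(index->rd_smgr, blkno, (char *) page, true);
    pfree(page);

    /* Before the creating transaction commits, force the completed
     * index down to disk; this substitutes for WAL-logging as far
     * as crash recovery is concerned. */
    smgrimmedsync(index->rd_smgr);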

diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index a4c6c0b974c..002fb018dba 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.75 2004/04/21 18:24:25 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.76 2004/06/02 17:28:17 tgl Exp $
  *
  *	NOTES
  *	   Postgres btree pages look like ordinary relation pages.	The opaque
@@ -31,8 +31,9 @@
 /*
  *	_bt_metapinit() -- Initialize the metadata page of a new btree.
  *
- * If markvalid is true, the index is immediately marked valid, else it
- * will be invalid until _bt_metaproot() is called.
+ * Note: this is actually not used for standard btree index building;
+ * nbtsort.c prefers not to make the metadata page valid until completion
+ * of build.
  *
  * Note: there's no real need for any locking here.  Since the transaction
  * creating the index hasn't committed yet, no one else can even see the index
@@ -40,12 +41,11 @@
  * not true, but we assume the caller holds sufficient locks on the index.)
  */
 void
-_bt_metapinit(Relation rel, bool markvalid)
+_bt_metapinit(Relation rel)
 {
 	Buffer		buf;
 	Page		pg;
 	BTMetaPageData *metad;
-	BTPageOpaque op;
 
 	if (RelationGetNumberOfBlocks(rel) != 0)
 		elog(ERROR, "cannot initialize non-empty btree index \"%s\"",
@@ -55,21 +55,11 @@ _bt_metapinit(Relation rel, bool markvalid)
 	Assert(BufferGetBlockNumber(buf) == BTREE_METAPAGE);
 	pg = BufferGetPage(buf);
 
-	/* NO ELOG(ERROR) from here till newmeta op is logged */
-	START_CRIT_SECTION();
-
-	_bt_pageinit(pg, BufferGetPageSize(buf));
-
+	_bt_initmetapage(pg, P_NONE, 0);
 	metad = BTPageGetMeta(pg);
-	metad->btm_magic = markvalid ? BTREE_MAGIC : 0;
-	metad->btm_version = BTREE_VERSION;
-	metad->btm_root = P_NONE;
-	metad->btm_level = 0;
-	metad->btm_fastroot = P_NONE;
-	metad->btm_fastlevel = 0;
 
-	op = (BTPageOpaque) PageGetSpecialPointer(pg);
-	op->btpo_flags = BTP_META;
+	/* NO ELOG(ERROR) from here till newmeta op is logged */
+	START_CRIT_SECTION();
 
 	/* XLOG stuff */
 	if (!rel->rd_istemp)
@@ -90,7 +80,7 @@ _bt_metapinit(Relation rel, bool markvalid)
 		rdata[0].next = NULL;
 
 		recptr = XLogInsert(RM_BTREE_ID,
-							markvalid ? XLOG_BTREE_NEWMETA : XLOG_BTREE_INVALIDMETA,
+							XLOG_BTREE_NEWMETA,
 							rdata);
 
 		PageSetLSN(pg, recptr);
@@ -102,6 +92,29 @@ _bt_metapinit(Relation rel, bool markvalid)
 	WriteBuffer(buf);
 }
 
+/*
+ *	_bt_initmetapage() -- Fill a page buffer with a correct metapage image
+ */
+void
+_bt_initmetapage(Page page, BlockNumber rootbknum, uint32 level)
+{
+	BTMetaPageData *metad;
+	BTPageOpaque metaopaque;
+
+	_bt_pageinit(page, BLCKSZ);
+
+	metad = BTPageGetMeta(page);
+	metad->btm_magic = BTREE_MAGIC;
+	metad->btm_version = BTREE_VERSION;
+	metad->btm_root = rootbknum;
+	metad->btm_level = level;
+	metad->btm_fastroot = rootbknum;
+	metad->btm_fastlevel = level;
+
+	metaopaque = (BTPageOpaque) PageGetSpecialPointer(page);
+	metaopaque->btpo_flags = BTP_META;
+}
+
 /*
  *	_bt_getroot() -- Get the root page of the btree.
  *
@@ -609,76 +622,6 @@ _bt_page_recyclable(Page page)
 	return false;
 }
 
-/*
- *	_bt_metaproot() -- Change the root page of the btree.
- *
- *		Lehman and Yao require that the root page move around in order to
- *		guarantee deadlock-free short-term, fine-granularity locking.  When
- *		we split the root page, we record the new parent in the metadata page
- *		for the relation.  This routine does the work.
- *
- *		No direct preconditions, but if you don't have the write lock on
- *		at least the old root page when you call this, you're making a big
- *		mistake.  On exit, metapage data is correct and we no longer have
- *		a pin or lock on the metapage.
- *
- * Actually this is not used for splitting on-the-fly anymore.	It's only used
- * in nbtsort.c at the completion of btree building, where we know we have
- * sole access to the index anyway.
- */
-void
-_bt_metaproot(Relation rel, BlockNumber rootbknum, uint32 level)
-{
-	Buffer		metabuf;
-	Page		metap;
-	BTPageOpaque metaopaque;
-	BTMetaPageData *metad;
-
-	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
-	metap = BufferGetPage(metabuf);
-	metaopaque = (BTPageOpaque) PageGetSpecialPointer(metap);
-	Assert(metaopaque->btpo_flags & BTP_META);
-
-	/* NO ELOG(ERROR) from here till newmeta op is logged */
-	START_CRIT_SECTION();
-
-	metad = BTPageGetMeta(metap);
-	Assert(metad->btm_magic == BTREE_MAGIC || metad->btm_magic == 0);
-	metad->btm_magic = BTREE_MAGIC;		/* it's valid now for sure */
-	metad->btm_root = rootbknum;
-	metad->btm_level = level;
-	metad->btm_fastroot = rootbknum;
-	metad->btm_fastlevel = level;
-
-	/* XLOG stuff */
-	if (!rel->rd_istemp)
-	{
-		xl_btree_newmeta xlrec;
-		XLogRecPtr	recptr;
-		XLogRecData rdata[1];
-
-		xlrec.node = rel->rd_node;
-		xlrec.meta.root = metad->btm_root;
-		xlrec.meta.level = metad->btm_level;
-		xlrec.meta.fastroot = metad->btm_fastroot;
-		xlrec.meta.fastlevel = metad->btm_fastlevel;
-
-		rdata[0].buffer = InvalidBuffer;
-		rdata[0].data = (char *) &xlrec;
-		rdata[0].len = SizeOfBtreeNewmeta;
-		rdata[0].next = NULL;
-
-		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWMETA, rdata);
-
-		PageSetLSN(metap, recptr);
-		PageSetSUI(metap, ThisStartUpID);
-	}
-
-	END_CRIT_SECTION();
-
-	_bt_wrtbuf(rel, metabuf);
-}
-
 /*
  * Delete item(s) from a btree page.
  *
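
The nbtpage.c changes boil down to this: metapage construction is factored
into _bt_initmetapage(), which fills any page-sized buffer, so
_bt_metapinit() can apply it to a shared buffer while nbtsort.c (below)
applies it to plain palloc'd memory, and _bt_metaproot() goes away because
the sorted build now writes a fresh metapage instead of updating one in
place.  A minimal sketch of the private-memory usage, mirroring
_bt_uppershutdown() in the next file (wstate, rootblkno, and rootlevel are
that function's variables, and _bt_blwritepage() is nbtsort.c's
page-emitting routine):

    Page    metapage = (Page) palloc(BLCKSZ);

    /* Construct a complete, valid metapage image pointing at the
     * root; writing this page is what flips the index to the
     * "valid" state, so it is done last. */
    _bt_initmetapage(metapage, rootblkno, rootlevel);

    /* Emit it like any other finished page, at its reserved block. */
    _bt_blwritepage(wstate, metapage, BTREE_METAPAGE);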
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index f89ec91caf3..7a1d8372d73 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtree.c,v 1.116 2004/05/31 19:24:04 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtree.c,v 1.117 2004/06/02 17:28:17 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -112,10 +112,6 @@ btbuild(PG_FUNCTION_ARGS)
 		elog(ERROR, "index \"%s\" already contains data",
 			 RelationGetRelationName(index));
 
-	/* initialize the btree index metadata page */
-	/* mark it valid right away only if using slow build */
-	_bt_metapinit(index, !buildstate.usefast);
-
 	if (buildstate.usefast)
 	{
 		buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique, false);
@@ -127,6 +123,11 @@ btbuild(PG_FUNCTION_ARGS)
 		if (indexInfo->ii_Unique)
 			buildstate.spool2 = _bt_spoolinit(index, false, true);
 	}
+	else
+	{
+		/* if using slow build, initialize the btree index metadata page */
+		_bt_metapinit(index);
+	}
 
 	/* do the heap scan */
 	reltuples = IndexBuildHeapScan(heap, index, indexInfo,
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 08be20a0271..ba0383c0bcd 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -31,12 +31,32 @@
  * (there aren't many upper pages if the keys are reasonable-size) without
  * incurring a lot of cascading splits during early insertions.
  *
+ * Formerly the index pages being built were kept in shared buffers, but
+ * that is of no value (since other backends have no interest in them yet)
+ * and it created locking problems for CHECKPOINT, because the upper-level
+ * pages were held exclusive-locked for long periods.  Now we just build
+ * the pages in local memory and smgrwrite() them as we finish them.  They
+ * will need to be re-read into shared buffers on first use after the build
+ * finishes.
+ *
+ * Since the index will never be used unless it is completely built,
+ * from a crash-recovery point of view there is no need to WAL-log the
+ * steps of the build.  After completing the index build, we can just sync
+ * the whole file to disk using smgrimmedsync() before exiting this module.
+ * This can be seen to be sufficient for crash recovery by considering that
+ * it's effectively equivalent to what would happen if a CHECKPOINT occurred
+ * just after the index build.  However, it is clearly not sufficient if the
+ * DBA is using the WAL log for PITR or replication purposes, since another
+ * machine would not be able to reconstruct the index from WAL.  Therefore,
+ * we log the completed index pages to WAL if and only if WAL archiving is
+ * active.
+ *
  *
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsort.c,v 1.81 2004/02/03 17:34:02 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsort.c,v 1.82 2004/06/02 17:28:17 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -45,11 +65,14 @@
 
 #include "access/nbtree.h"
 #include "miscadmin.h"
+#include "storage/smgr.h"
 #include "utils/tuplesort.h"
 
 
 /*
- * Status record for spooling.
+ * Status record for spooling/sorting phase.  (Note we may have two of
+ * these due to the special requirements for uniqueness-checking with
+ * dead tuples.)
  */
 struct BTSpool
 {
@@ -73,8 +96,8 @@ struct BTSpool
  */
 typedef struct BTPageState
 {
-	Buffer		btps_buf;		/* current buffer & page */
-	Page		btps_page;
+	Page		btps_page;		/* workspace for page building */
+	BlockNumber	btps_blkno;		/* block # to write this page at */
 	BTItem		btps_minkey;	/* copy of minimum key (first item) on
 								 * page */
 	OffsetNumber btps_lastoff;	/* last item offset loaded */
@@ -84,6 +107,18 @@ typedef struct BTPageState
 	struct BTPageState *btps_next;		/* link to parent level, if any */
 } BTPageState;
 
+/*
+ * Overall status record for index writing phase.
+ */
+typedef struct BTWriteState
+{
+	Relation	index;
+	bool		btws_use_wal;		/* dump pages to WAL? */
+	BlockNumber	btws_pages_alloced;	/* # pages allocated */
+	BlockNumber	btws_pages_written;	/* # pages written out */
+	Page		btws_zeropage;		/* workspace for filling zeroes */
+} BTWriteState;
+
 
 #define BTITEMSZ(btitem) \
 	((btitem) ? \
@@ -92,15 +127,15 @@ typedef struct BTPageState
 	 0)
 
 
-static void _bt_blnewpage(Relation index, Buffer *buf, Page *page,
-			  uint32 level);
-static BTPageState *_bt_pagestate(Relation index, uint32 level);
-static void _bt_slideleft(Relation index, Buffer buf, Page page);
+static Page _bt_blnewpage(uint32 level);
+static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
+static void _bt_slideleft(Page page);
 static void _bt_sortaddtup(Page page, Size itemsize,
 			   BTItem btitem, OffsetNumber itup_off);
-static void _bt_buildadd(Relation index, BTPageState *state, BTItem bti);
-static void _bt_uppershutdown(Relation index, BTPageState *state);
-static void _bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2);
+static void _bt_buildadd(BTWriteState *wstate, BTPageState *state, BTItem bti);
+static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state);
+static void _bt_load(BTWriteState *wstate,
+					 BTSpool *btspool, BTSpool *btspool2);
 
 
 /*
@@ -169,6 +204,8 @@ _bt_spool(BTItem btitem, BTSpool *btspool)
 void
 _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 {
+	BTWriteState	wstate;
+
 #ifdef BTREE_BUILD_STATS
 	if (log_btree_build_stats)
 	{
@@ -180,7 +217,26 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 	tuplesort_performsort(btspool->sortstate);
 	if (btspool2)
 		tuplesort_performsort(btspool2->sortstate);
-	_bt_load(btspool->index, btspool, btspool2);
+
+	wstate.index = btspool->index;
+	/*
+	 * We need to log index creation in WAL iff WAL archiving is enabled
+	 * AND it's not a temp index.
+	 *
+	 * XXX when WAL archiving is actually supported, this test will likely
+	 * need to change; and the hardwired extern is cruddy anyway ...
+	 */
+	{
+		extern char XLOG_archive_dir[];
+
+		wstate.btws_use_wal = XLOG_archive_dir[0] && !wstate.index->rd_istemp;
+	}
+	/* reserve the metapage */
+	wstate.btws_pages_alloced = BTREE_METAPAGE + 1;
+	wstate.btws_pages_written = 0;
+	wstate.btws_zeropage = NULL;	 /* until needed */
+
+	_bt_load(&wstate, btspool, btspool2);
 }
 
 
@@ -190,70 +246,101 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 
 
 /*
- * allocate a new, clean btree page, not linked to any siblings.
+ * allocate workspace for a new, clean btree page, not linked to any siblings.
  */
-static void
-_bt_blnewpage(Relation index, Buffer *buf, Page *page, uint32 level)
+static Page
+_bt_blnewpage(uint32 level)
 {
+	Page	page;
 	BTPageOpaque opaque;
 
-	*buf = _bt_getbuf(index, P_NEW, BT_WRITE);
-	*page = BufferGetPage(*buf);
+	page = (Page) palloc(BLCKSZ);
 
 	/* Zero the page and set up standard page header info */
-	_bt_pageinit(*page, BufferGetPageSize(*buf));
+	_bt_pageinit(page, BLCKSZ);
 
 	/* Initialize BT opaque state */
-	opaque = (BTPageOpaque) PageGetSpecialPointer(*page);
+	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 	opaque->btpo_prev = opaque->btpo_next = P_NONE;
 	opaque->btpo.level = level;
 	opaque->btpo_flags = (level > 0) ? 0 : BTP_LEAF;
 
 	/* Make the P_HIKEY line pointer appear allocated */
-	((PageHeader) *page)->pd_lower += sizeof(ItemIdData);
+	((PageHeader) page)->pd_lower += sizeof(ItemIdData);
+
+	return page;
 }
 
 /*
- * emit a completed btree page, and release the lock and pin on it.
- * This is essentially _bt_wrtbuf except we also emit a WAL record.
+ * emit a completed btree page, and release the working storage.
  */
 static void
-_bt_blwritepage(Relation index, Buffer buf)
+_bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 {
-	Page		pg = BufferGetPage(buf);
-
-	/* NO ELOG(ERROR) from here till newpage op is logged */
-	START_CRIT_SECTION();
-
 	/* XLOG stuff */
-	if (!index->rd_istemp)
+	if (wstate->btws_use_wal)
 	{
 		xl_btree_newpage xlrec;
 		XLogRecPtr	recptr;
 		XLogRecData rdata[2];
 
-		xlrec.node = index->rd_node;
-		xlrec.blkno = BufferGetBlockNumber(buf);
+		/* NO ELOG(ERROR) from here till newpage op is logged */
+		START_CRIT_SECTION();
+
+		xlrec.node = wstate->index->rd_node;
+		xlrec.blkno = blkno;
 
 		rdata[0].buffer = InvalidBuffer;
 		rdata[0].data = (char *) &xlrec;
 		rdata[0].len = SizeOfBtreeNewpage;
 		rdata[0].next = &(rdata[1]);
 
-		rdata[1].buffer = buf;
-		rdata[1].data = (char *) pg;
+		rdata[1].buffer = InvalidBuffer;
+		rdata[1].data = (char *) page;
 		rdata[1].len = BLCKSZ;
 		rdata[1].next = NULL;
 
 		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWPAGE, rdata);
 
-		PageSetLSN(pg, recptr);
-		PageSetSUI(pg, ThisStartUpID);
+		PageSetLSN(page, recptr);
+		PageSetSUI(page, ThisStartUpID);
+
+		END_CRIT_SECTION();
+	}
+	else
+	{
+		/* Leave the page LSN zero if not WAL-logged, but set SUI anyway */
+		PageSetSUI(page, ThisStartUpID);
 	}
 
-	END_CRIT_SECTION();
+	/*
+	 * If we have to write pages nonsequentially, fill in the space with
+	 * zeroes until we come back and overwrite.  This is not logically
+	 * necessary on standard Unix filesystems (unwritten space will read
+	 * as zeroes anyway), but it should help to avoid fragmentation.
+	 * The dummy pages aren't WAL-logged though.
+	 */
+	while (blkno > wstate->btws_pages_written)
+	{
+		if (!wstate->btws_zeropage)
+			wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
+		smgrwrite(wstate->index->rd_smgr, wstate->btws_pages_written++,
+				  (char *) wstate->btws_zeropage,
+				  !wstate->btws_use_wal);
+	}
 
-	_bt_wrtbuf(index, buf);
+	/*
+	 * Now write the page.  If not using WAL, say isTemp = true, to suppress
+	 * duplicate fsync.  If we are using WAL, it surely isn't a temp index,
+	 * so !use_wal is a sufficient condition.
+	 */
+	smgrwrite(wstate->index->rd_smgr, blkno, (char *) page,
+			  !wstate->btws_use_wal);
+
+	if (blkno == wstate->btws_pages_written)
+		wstate->btws_pages_written++;
+
+	pfree(page);
 }
 
 /*
@@ -261,12 +348,15 @@ _bt_blwritepage(Relation index, Buffer buf)
  * is suitable for immediate use by _bt_buildadd.
  */
 static BTPageState *
-_bt_pagestate(Relation index, uint32 level)
+_bt_pagestate(BTWriteState *wstate, uint32 level)
 {
 	BTPageState *state = (BTPageState *) palloc0(sizeof(BTPageState));
 
-	/* create initial page */
-	_bt_blnewpage(index, &(state->btps_buf), &(state->btps_page), level);
+	/* create initial page for level */
+	state->btps_page = _bt_blnewpage(level);
+
+	/* and assign it a page position */
+	state->btps_blkno = wstate->btws_pages_alloced++;
 
 	state->btps_minkey = NULL;
 	/* initialize lastoff so first item goes into P_FIRSTKEY */
@@ -290,7 +380,7 @@ _bt_pagestate(Relation index, uint32 level)
  * P_RIGHTMOST page.
  */
 static void
-_bt_slideleft(Relation index, Buffer buf, Page page)
+_bt_slideleft(Page page)
 {
 	OffsetNumber off;
 	OffsetNumber maxoff;
@@ -380,16 +470,16 @@ _bt_sortaddtup(Page page,
  *----------
  */
 static void
-_bt_buildadd(Relation index, BTPageState *state, BTItem bti)
+_bt_buildadd(BTWriteState *wstate, BTPageState *state, BTItem bti)
 {
-	Buffer		nbuf;
 	Page		npage;
+	BlockNumber	nblkno;
 	OffsetNumber last_off;
 	Size		pgspc;
 	Size		btisz;
 
-	nbuf = state->btps_buf;
 	npage = state->btps_page;
+	nblkno = state->btps_blkno;
 	last_off = state->btps_lastoff;
 
 	pgspc = PageGetFreeSpace(npage);
@@ -420,14 +510,17 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti)
 		 * Item won't fit on this page, or we feel the page is full enough
 		 * already.  Finish off the page and write it out.
 		 */
-		Buffer		obuf = nbuf;
 		Page		opage = npage;
+		BlockNumber	oblkno = nblkno;
 		ItemId		ii;
 		ItemId		hii;
 		BTItem		obti;
 
-		/* Create new page on same level */
-		_bt_blnewpage(index, &nbuf, &npage, state->btps_level);
+		/* Create new page of same level */
+		npage = _bt_blnewpage(state->btps_level);
+
+		/* and assign it a page position */
+		nblkno = wstate->btws_pages_alloced++;
 
 		/*
 		 * We copy the last item on the page into the new page, and then
@@ -451,17 +544,17 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti)
 		((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
 
 		/*
-		 * Link the old buffer into its parent, using its minimum key. If
+		 * Link the old page into its parent, using its minimum key. If
 		 * we don't have a parent, we have to create one; this adds a new
 		 * btree level.
 		 */
 		if (state->btps_next == NULL)
-			state->btps_next = _bt_pagestate(index, state->btps_level + 1);
+			state->btps_next = _bt_pagestate(wstate, state->btps_level + 1);
 
 		Assert(state->btps_minkey != NULL);
 		ItemPointerSet(&(state->btps_minkey->bti_itup.t_tid),
-					   BufferGetBlockNumber(obuf), P_HIKEY);
-		_bt_buildadd(index, state->btps_next, state->btps_minkey);
+					   oblkno, P_HIKEY);
+		_bt_buildadd(wstate, state->btps_next, state->btps_minkey);
 		pfree((void *) state->btps_minkey);
 
 		/*
@@ -478,16 +571,16 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti)
 			BTPageOpaque oopaque = (BTPageOpaque) PageGetSpecialPointer(opage);
 			BTPageOpaque nopaque = (BTPageOpaque) PageGetSpecialPointer(npage);
 
-			oopaque->btpo_next = BufferGetBlockNumber(nbuf);
-			nopaque->btpo_prev = BufferGetBlockNumber(obuf);
+			oopaque->btpo_next = nblkno;
+			nopaque->btpo_prev = oblkno;
 			nopaque->btpo_next = P_NONE;		/* redundant */
 		}
 
 		/*
-		 * Write out the old page.	We never want to see it again, so we
-		 * can give up our lock.
+		 * Write out the old page.	We never need to touch it again,
+		 * so we can free the opage workspace too.
 		 */
-		_bt_blwritepage(index, obuf);
+		_bt_blwritepage(wstate, opage, oblkno);
 
 		/*
 		 * Reset last_off to point to new page
@@ -513,8 +606,8 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti)
 	last_off = OffsetNumberNext(last_off);
 	_bt_sortaddtup(npage, btisz, bti, last_off);
 
-	state->btps_buf = nbuf;
 	state->btps_page = npage;
+	state->btps_blkno = nblkno;
 	state->btps_lastoff = last_off;
 }
 
@@ -522,11 +615,12 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti)
  * Finish writing out the completed btree.
  */
 static void
-_bt_uppershutdown(Relation index, BTPageState *state)
+_bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 {
 	BTPageState *s;
 	BlockNumber	rootblkno = P_NONE;
 	uint32		rootlevel = 0;
+	Page		metapage;
 
 	/*
 	 * Each iteration of this loop completes one more level of the tree.
@@ -536,7 +630,7 @@ _bt_uppershutdown(Relation index, BTPageState *state)
 		BlockNumber blkno;
 		BTPageOpaque opaque;
 
-		blkno = BufferGetBlockNumber(s->btps_buf);
+		blkno = s->btps_blkno;
 		opaque = (BTPageOpaque) PageGetSpecialPointer(s->btps_page);
 
 		/*
@@ -558,7 +652,7 @@ _bt_uppershutdown(Relation index, BTPageState *state)
 			Assert(s->btps_minkey != NULL);
 			ItemPointerSet(&(s->btps_minkey->bti_itup.t_tid),
 						   blkno, P_HIKEY);
-			_bt_buildadd(index, s->btps_next, s->btps_minkey);
+			_bt_buildadd(wstate, s->btps_next, s->btps_minkey);
 			pfree((void *) s->btps_minkey);
 			s->btps_minkey = NULL;
 		}
@@ -567,17 +661,20 @@ _bt_uppershutdown(Relation index, BTPageState *state)
 		 * This is the rightmost page, so the ItemId array needs to be
 		 * slid back one slot.	Then we can dump out the page.
 		 */
-		_bt_slideleft(index, s->btps_buf, s->btps_page);
-		_bt_blwritepage(index, s->btps_buf);
+		_bt_slideleft(s->btps_page);
+		_bt_blwritepage(wstate, s->btps_page, s->btps_blkno);
+		s->btps_page = NULL;	/* writepage freed the workspace */
 	}
 
 	/*
-	 * As the last step in the process, update the metapage to point to
-	 * the new root (unless we had no data at all, in which case it's
-	 * left pointing to "P_NONE").  This changes the index to the "valid"
-	 * state by updating its magic number.
+	 * As the last step in the process, construct the metapage and make it
+	 * point to the new root (unless we had no data at all, in which case it's
+	 * set to point to "P_NONE").  This changes the index to the "valid"
+	 * state by filling in a valid magic number in the metapage.
 	 */
-	_bt_metaproot(index, rootblkno, rootlevel);
+	metapage = (Page) palloc(BLCKSZ);
+	_bt_initmetapage(metapage, rootblkno, rootlevel);
+	_bt_blwritepage(wstate, metapage, BTREE_METAPAGE);
 }
 
 /*
@@ -585,7 +682,7 @@ _bt_uppershutdown(Relation index, BTPageState *state)
  * btree leaves.
  */
 static void
-_bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2)
+_bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 {
 	BTPageState *state = NULL;
 	bool		merge = (btspool2 != NULL);
@@ -594,9 +691,9 @@ _bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2)
 	bool		should_free,
 				should_free2,
 				load1;
-	TupleDesc	tupdes = RelationGetDescr(index);
+	TupleDesc	tupdes = RelationGetDescr(wstate->index);
 	int			i,
-				keysz = RelationGetNumberOfAttributes(index);
+				keysz = RelationGetNumberOfAttributes(wstate->index);
 	ScanKey		indexScanKey = NULL;
 
 	if (merge)
@@ -611,7 +708,7 @@ _bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2)
 											   true, &should_free);
 		bti2 = (BTItem) tuplesort_getindextuple(btspool2->sortstate,
 												true, &should_free2);
-		indexScanKey = _bt_mkscankey_nodata(index);
+		indexScanKey = _bt_mkscankey_nodata(wstate->index);
 
 		for (;;)
 		{
@@ -668,11 +765,11 @@ _bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2)
 
 			/* When we see first tuple, create first index page */
 			if (state == NULL)
-				state = _bt_pagestate(index, 0);
+				state = _bt_pagestate(wstate, 0);
 
 			if (load1)
 			{
-				_bt_buildadd(index, state, bti);
+				_bt_buildadd(wstate, state, bti);
 				if (should_free)
 					pfree((void *) bti);
 				bti = (BTItem) tuplesort_getindextuple(btspool->sortstate,
@@ -680,7 +777,7 @@ _bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2)
 			}
 			else
 			{
-				_bt_buildadd(index, state, bti2);
+				_bt_buildadd(wstate, state, bti2);
 				if (should_free2)
 					pfree((void *) bti2);
 				bti2 = (BTItem) tuplesort_getindextuple(btspool2->sortstate,
@@ -697,14 +794,21 @@ _bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2)
 		{
 			/* When we see first tuple, create first index page */
 			if (state == NULL)
-				state = _bt_pagestate(index, 0);
+				state = _bt_pagestate(wstate, 0);
 
-			_bt_buildadd(index, state, bti);
+			_bt_buildadd(wstate, state, bti);
 			if (should_free)
 				pfree((void *) bti);
 		}
 	}
 
-	/* Close down final pages and rewrite the metapage */
-	_bt_uppershutdown(index, state);
+	/* Close down final pages and write the metapage */
+	_bt_uppershutdown(wstate, state);
+
+	/*
+	 * If we weren't using WAL, and the index isn't temp, we must fsync it
+	 * down to disk before it's safe to commit the transaction.
+	 */
+	if (!wstate->btws_use_wal && !wstate->index->rd_istemp)
+		smgrimmedsync(wstate->index->rd_smgr);
 }
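
A note on why _bt_blwritepage()'s zero-fill loop is needed at all: block
numbers are assigned eagerly from btws_pages_alloced (the metapage's block 0
is reserved up front, and each upper level claims a block as soon as the
level's BTPageState is created), but those pages are only written much
later, when they fill up or at _bt_uppershutdown().  So the sequential write
frontier, btws_pages_written, can jump past still-unwritten slots.  The
loop, repeated from above with explanatory comments added for this
description:

    /* Plug any gap between the write frontier and the target block
     * with zero pages, so the file always grows sequentially at the
     * filesystem level.  The zero pages are placeholders only: they
     * are never WAL-logged, and each is later overwritten with the
     * real contents of the page that reserved that block number. */
    while (blkno > wstate->btws_pages_written)
    {
        if (!wstate->btws_zeropage)
            wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
        smgrwrite(wstate->index->rd_smgr, wstate->btws_pages_written++,
                  (char *) wstate->btws_zeropage,
                  !wstate->btws_use_wal);
    }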
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index ed47dba2bac..ed398b32da5 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.12 2004/05/30 23:40:25 neilc Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.13 2004/06/02 17:28:17 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -110,8 +110,7 @@ _bt_restore_page(Page page, char *from, int len)
 static void
 _bt_restore_meta(Relation reln, XLogRecPtr lsn,
 				 BlockNumber root, uint32 level,
-				 BlockNumber fastroot, uint32 fastlevel,
-				 bool markvalid)
+				 BlockNumber fastroot, uint32 fastlevel)
 {
 	Buffer		metabuf;
 	Page		metapg;
@@ -126,7 +125,7 @@ _bt_restore_meta(Relation reln, XLogRecPtr lsn,
 	_bt_pageinit(metapg, BufferGetPageSize(metabuf));
 
 	md = BTPageGetMeta(metapg);
-	md->btm_magic = markvalid ? BTREE_MAGIC : 0;
+	md->btm_magic = BTREE_MAGIC;
 	md->btm_version = BTREE_VERSION;
 	md->btm_root = root;
 	md->btm_level = level;
@@ -223,8 +222,7 @@ btree_xlog_insert(bool redo, bool isleaf, bool ismeta,
 		if (ismeta)
 			_bt_restore_meta(reln, lsn,
 							 md.root, md.level,
-							 md.fastroot, md.fastlevel,
-							 true);
+							 md.fastroot, md.fastlevel);
 	}
 
 	/* Forget any split this insertion completes */
@@ -594,8 +592,7 @@ btree_xlog_delete_page(bool redo, bool ismeta,
 				   sizeof(xl_btree_metadata));
 			_bt_restore_meta(reln, lsn,
 							 md.root, md.level,
-							 md.fastroot, md.fastlevel,
-							 true);
+							 md.fastroot, md.fastlevel);
 		}
 	}
 }
@@ -641,8 +638,7 @@ btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
 
 	_bt_restore_meta(reln, lsn,
 					 xlrec->rootblk, xlrec->level,
-					 xlrec->rootblk, xlrec->level,
-					 true);
+					 xlrec->rootblk, xlrec->level);
 
 	/* Check to see if this satisfies any incomplete insertions */
 	if (record->xl_len > SizeOfBtreeNewroot &&
@@ -656,8 +652,7 @@ btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
 }
 
 static void
-btree_xlog_newmeta(bool redo, XLogRecPtr lsn, XLogRecord *record,
-				   bool markvalid)
+btree_xlog_newmeta(bool redo, XLogRecPtr lsn, XLogRecord *record)
 {
 	xl_btree_newmeta *xlrec = (xl_btree_newmeta *) XLogRecGetData(record);
 	Relation	reln;
@@ -671,8 +666,7 @@ btree_xlog_newmeta(bool redo, XLogRecPtr lsn, XLogRecord *record,
 
 	_bt_restore_meta(reln, lsn,
 					 xlrec->meta.root, xlrec->meta.level,
-					 xlrec->meta.fastroot, xlrec->meta.fastlevel,
-					 markvalid);
+					 xlrec->meta.fastroot, xlrec->meta.fastlevel);
 }
 
 static void
@@ -745,14 +739,11 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record)
 			btree_xlog_newroot(true, lsn, record);
 			break;
 		case XLOG_BTREE_NEWMETA:
-			btree_xlog_newmeta(true, lsn, record, true);
+			btree_xlog_newmeta(true, lsn, record);
 			break;
 		case XLOG_BTREE_NEWPAGE:
 			btree_xlog_newpage(true, lsn, record);
 			break;
-		case XLOG_BTREE_INVALIDMETA:
-			btree_xlog_newmeta(true, lsn, record, false);
-			break;
 		default:
 			elog(PANIC, "btree_redo: unknown op code %u", info);
 	}
@@ -799,14 +790,11 @@ btree_undo(XLogRecPtr lsn, XLogRecord *record)
 			btree_xlog_newroot(false, lsn, record);
 			break;
 		case XLOG_BTREE_NEWMETA:
-			btree_xlog_newmeta(false, lsn, record, true);
+			btree_xlog_newmeta(false, lsn, record);
 			break;
 		case XLOG_BTREE_NEWPAGE:
 			btree_xlog_newpage(false, lsn, record);
 			break;
-		case XLOG_BTREE_INVALIDMETA:
-			btree_xlog_newmeta(false, lsn, record, false);
-			break;
 		default:
 			elog(PANIC, "btree_undo: unknown op code %u", info);
 	}
@@ -939,16 +927,6 @@ btree_desc(char *buf, uint8 xl_info, char *rec)
 						xlrec->blkno);
 				break;
 			}
-		case XLOG_BTREE_INVALIDMETA:
-			{
-				xl_btree_newmeta *xlrec = (xl_btree_newmeta *) rec;
-
-				sprintf(buf + strlen(buf), "invalidmeta: node %u/%u; root %u lev %u fast %u lev %u",
-						xlrec->node.tblNode, xlrec->node.relNode,
-						xlrec->meta.root, xlrec->meta.level,
-						xlrec->meta.fastroot, xlrec->meta.fastlevel);
-				break;
-			}
 		default:
 			strcat(buf, "UNKNOWN");
 			break;
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 1a0218c4e58..4f0d241215d 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/storage/smgr/md.c,v 1.106 2004/05/31 20:31:33 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/storage/smgr/md.c,v 1.107 2004/06/02 17:28:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -661,6 +661,40 @@ mdtruncate(SMgrRelation reln, BlockNumber nblocks, bool isTemp)
 	return nblocks;
 }
 
+/*
+ *	mdimmedsync() -- Immediately sync a relation to stable storage.
+ */
+bool
+mdimmedsync(SMgrRelation reln)
+{
+	MdfdVec    *v;
+	BlockNumber curnblk;
+
+	/*
+	 * NOTE: mdnblocks makes sure we have opened all existing segments, so
+	 * that the fsync loop will get them all!
+	 */
+	curnblk = mdnblocks(reln);
+	if (curnblk == InvalidBlockNumber)
+		return false;			/* mdnblocks failed */
+
+	v = mdopen(reln, false);
+
+#ifndef LET_OS_MANAGE_FILESIZE
+	while (v != NULL)
+	{
+		if (FileSync(v->mdfd_vfd) < 0)
+			return false;
+		v = v->mdfd_chain;
+	}
+#else
+	if (FileSync(v->mdfd_vfd) < 0)
+		return false;
+#endif
+
+	return true;
+}
+
 /*
  *	mdsync() -- Sync previous writes to stable storage.
  *
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 5320532be4b..8977f026e4f 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/storage/smgr/smgr.c,v 1.72 2004/05/31 20:31:33 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/storage/smgr/smgr.c,v 1.73 2004/06/02 17:28:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -48,6 +48,7 @@ typedef struct f_smgr
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln);
 	BlockNumber (*smgr_truncate) (SMgrRelation reln, BlockNumber nblocks,
 								  bool isTemp);
+	bool		(*smgr_immedsync) (SMgrRelation reln);
 	bool		(*smgr_commit) (void);			/* may be NULL */
 	bool		(*smgr_abort) (void);			/* may be NULL */
 	bool		(*smgr_sync) (void);			/* may be NULL */
@@ -57,7 +58,8 @@ typedef struct f_smgr
 static const f_smgr smgrsw[] = {
 	/* magnetic disk */
 	{mdinit, NULL, mdclose, mdcreate, mdunlink, mdextend,
-	 mdread, mdwrite, mdnblocks, mdtruncate, NULL, NULL, mdsync
+	 mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync,
+	 NULL, NULL, mdsync
 	}
 };
 
@@ -582,6 +584,34 @@ smgrtruncate(SMgrRelation reln, BlockNumber nblocks, bool isTemp)
 	return newblks;
 }
 
+/*
+ *	smgrimmedsync() -- Force the specified relation to stable storage.
+ *
+ *		Synchronously force all of the specified relation down to disk.
+ *
+ *		This is really only useful for non-WAL-logged index building:
+ *		instead of incrementally WAL-logging the index build steps,
+ *		we can just write completed index pages to disk with smgrwrite
+ *		or smgrextend, and then fsync the completed index file before
+ *		committing the transaction.  (This is sufficient for purposes of
+ *		crash recovery, since it effectively duplicates forcing a checkpoint
+ *		for the completed index.  But it is *not* workable if one wishes
+ *		to use the WAL log for PITR or replication purposes.)
+ *
+ *		The preceding writes should specify isTemp = true to avoid
+ *		duplicative fsyncs.
+ */
+void
+smgrimmedsync(SMgrRelation reln)
+{
+	if (! (*(smgrsw[reln->smgr_which].smgr_immedsync)) (reln))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not sync relation %u/%u: %m",
+						reln->smgr_rnode.tblNode,
+						reln->smgr_rnode.relNode)));
+}
+
 /*
  *	smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
  */
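
Putting the two new entry points together, the calling convention spelled
out in smgrimmedsync()'s comment comes down to the following pattern.  A
sketch only: reln, nblocks, and the page_image() helper are placeholders for
this illustration, not patch code.

    BlockNumber blkno;

    /* Every write passes isTemp = true, so md.c does not remember
     * an fsync request for checkpoint time; durability comes from
     * the single synchronous pass below instead. */
    for (blkno = 0; blkno < nblocks; blkno++)
        smgrwrite(reln, blkno, page_image(blkno), true);

    /* One fsync over every segment of the file, before the
     * transaction that created it is allowed to commit. */
    smgrimmedsync(reln);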
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index bf64b70ebcf..e8e4c46a820 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/nbtree.h,v 1.77 2004/04/21 18:24:26 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/nbtree.h,v 1.78 2004/06/02 17:28:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -198,7 +198,6 @@ typedef BTItemData *BTItem;
 #define XLOG_BTREE_NEWROOT		0xA0	/* new root page */
 #define XLOG_BTREE_NEWMETA		0xB0	/* update metadata page */
 #define XLOG_BTREE_NEWPAGE		0xC0	/* new index page during build */
-#define XLOG_BTREE_INVALIDMETA	0xD0	/* new metadata, temp. invalid */
 
 /*
  * All that we need to find changed index tuple
@@ -315,8 +314,7 @@ typedef struct xl_btree_newroot
 
 /*
  * New metapage log record.  This is not issued during routine operations;
- * it's only used when initializing an empty index and at completion of
- * index build.
+ * it's only used when initializing an empty index.
  */
 typedef struct xl_btree_newmeta
 {
@@ -442,7 +440,8 @@ extern void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
 /*
  * prototypes for functions in nbtpage.c
  */
-extern void _bt_metapinit(Relation rel, bool markvalid);
+extern void _bt_metapinit(Relation rel);
+extern void _bt_initmetapage(Page page, BlockNumber rootbknum, uint32 level);
 extern Buffer _bt_getroot(Relation rel, int access);
 extern Buffer _bt_gettrueroot(Relation rel);
 extern Buffer _bt_getbuf(Relation rel, BlockNumber blkno, int access);
@@ -453,7 +452,6 @@ extern void _bt_wrtbuf(Relation rel, Buffer buf);
 extern void _bt_wrtnorelbuf(Relation rel, Buffer buf);
 extern void _bt_pageinit(Page page, Size size);
 extern bool _bt_page_recyclable(Page page);
-extern void _bt_metaproot(Relation rel, BlockNumber rootbknum, uint32 level);
 extern void _bt_delitems(Relation rel, Buffer buf,
 			 OffsetNumber *itemnos, int nitems);
 extern int	_bt_pagedel(Relation rel, Buffer buf, bool vacuum_full);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 6999635dc46..52040432dcc 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/smgr.h,v 1.43 2004/05/31 20:31:33 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/smgr.h,v 1.44 2004/06/02 17:28:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -63,6 +63,7 @@ extern void smgrwrite(SMgrRelation reln, BlockNumber blocknum, char *buffer,
 extern BlockNumber smgrnblocks(SMgrRelation reln);
 extern BlockNumber smgrtruncate(SMgrRelation reln, BlockNumber nblocks,
 								bool isTemp);
+extern void smgrimmedsync(SMgrRelation reln);
 extern void smgrDoPendingDeletes(bool isCommit);
 extern int	smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
 extern void smgrcommit(void);
@@ -89,6 +90,7 @@ extern bool mdwrite(SMgrRelation reln, BlockNumber blocknum, char *buffer,
 extern BlockNumber mdnblocks(SMgrRelation reln);
 extern BlockNumber mdtruncate(SMgrRelation reln, BlockNumber nblocks,
 							  bool isTemp);
+extern bool mdimmedsync(SMgrRelation reln);
 extern bool mdsync(void);
 
 extern void RememberFsyncRequest(RelFileNode rnode, BlockNumber segno);
-- 
GitLab