From af5ced9cfdb1aefd5e64af405d7e582c99a55106 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 29 Jun 2001 21:08:25 +0000
Subject: [PATCH] Further work on connecting the free space map (which is still
 just a stub) into the rest of the system.  Adopt a cleaner approach to
 preventing deadlock in concurrent heap_updates: allow
 RelationGetBufferForTuple to select any page of the rel, and put the onus on
 it to lock both buffers in a consistent order.  Remove no-longer-needed
 isExtend hack from API of ReleaseAndReadBuffer.

---
 src/backend/access/heap/heapam.c          |  34 ++--
 src/backend/access/heap/hio.c             | 231 ++++++++++++++--------
 src/backend/access/transam/xlogutils.c    |   4 +-
 src/backend/catalog/heap.c                |  79 +-------
 src/backend/commands/sequence.c           |  11 +-
 src/backend/storage/buffer/bufmgr.c       |  63 +++---
 src/backend/storage/freespace/freespace.c |  35 +++-
 src/backend/storage/smgr/smgr.c           |  17 +-
 src/backend/utils/cache/relcache.c        | 112 +++++++++--
 src/include/access/hio.h                  |   4 +-
 src/include/storage/bufmgr.h              |   6 +-
 src/include/utils/relcache.h              |  14 +-
 12 files changed, 379 insertions(+), 231 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b86425f7d11..2560d7a10ac 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.120 2001/06/27 23:31:38 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.121 2001/06/29 21:08:23 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -202,8 +202,7 @@ heapgettup(Relation relation,
 
 		*buffer = ReleaseAndReadBuffer(*buffer,
 									   relation,
-									   ItemPointerGetBlockNumber(tid),
-									   false);
+									   ItemPointerGetBlockNumber(tid));
 		if (!BufferIsValid(*buffer))
 			elog(ERROR, "heapgettup: failed ReadBuffer");
 
@@ -238,8 +237,7 @@ heapgettup(Relation relation,
 
 		*buffer = ReleaseAndReadBuffer(*buffer,
 									   relation,
-									   page,
-									   false);
+									   page);
 		if (!BufferIsValid(*buffer))
 			elog(ERROR, "heapgettup: failed ReadBuffer");
 
@@ -280,8 +278,7 @@ heapgettup(Relation relation,
 
 		*buffer = ReleaseAndReadBuffer(*buffer,
 									   relation,
-									   page,
-									   false);
+									   page);
 		if (!BufferIsValid(*buffer))
 			elog(ERROR, "heapgettup: failed ReadBuffer");
 
@@ -374,8 +371,7 @@ heapgettup(Relation relation,
 
 		*buffer = ReleaseAndReadBuffer(*buffer,
 									   relation,
-									   page,
-									   false);
+									   page);
 		if (!BufferIsValid(*buffer))
 			elog(ERROR, "heapgettup: failed ReadBuffer");
 
@@ -1088,8 +1084,8 @@ heap_insert(Relation relation, HeapTuple tup)
 		heap_tuple_toast_attrs(relation, tup, NULL);
 #endif
 
-	/* Find buffer for this tuple */
-	buffer = RelationGetBufferForTuple(relation, tup->t_len, 0);
+	/* Find buffer to insert this tuple into */
+	buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);
 
 	/* NO ELOG(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
@@ -1501,18 +1497,16 @@ l2:
 		 * buffer locks on both old and new pages.  To avoid deadlock against
 		 * some other backend trying to get the same two locks in the other
 		 * order, we must be consistent about the order we get the locks in.
-		 * We use the rule "lock the higher-numbered page of the relation
+		 * We use the rule "lock the lower-numbered page of the relation
 		 * first".  To implement this, we must do RelationGetBufferForTuple
-		 * while not holding the lock on the old page, and we must tell it
-		 * to give us a page beyond the old page.
+		 * while not holding the lock on the old page, and we must rely on it
+		 * to get the locks on both pages in the correct order.
 		 */
 		if (newtupsize > pagefree)
 		{
 			/* Assume there's no chance to put newtup on same page. */
 			newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
-											BufferGetBlockNumber(buffer) + 1);
-			/* Now reacquire lock on old tuple's page. */
-			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+											   buffer);
 		}
 		else
 		{
@@ -1529,8 +1523,7 @@ l2:
 				 */
 				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 				newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
-											BufferGetBlockNumber(buffer) + 1);
-				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+												   buffer);
 			}
 			else
 			{
@@ -1550,7 +1543,8 @@ l2:
 
 	/*
 	 * At this point newbuf and buffer are both pinned and locked,
-	 * and newbuf has enough space for the new tuple.
+	 * and newbuf has enough space for the new tuple.  If they are
+	 * the same buffer, only one pin is held.
 	 */
 
 	/* NO ELOG(ERROR) from here till changes are logged */
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 3a520f2c315..82b1ec50f61 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Id: hio.c,v 1.40 2001/06/27 23:31:38 tgl Exp $
+ *	  $Id: hio.c,v 1.41 2001/06/29 21:08:23 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -17,6 +17,8 @@
 
 #include "access/heapam.h"
 #include "access/hio.h"
+#include "storage/freespace.h"
+
 
 /*
  * RelationPutHeapTuple - place tuple at specified page
@@ -66,16 +68,29 @@ RelationPutHeapTuple(Relation relation,
 /*
  * RelationGetBufferForTuple
  *
- *	Returns exclusive-locked buffer with free space >= given len,
- *	being careful to select only a page at or beyond minblocknum
- *	in the relation.
+ *	Returns pinned and exclusive-locked buffer of a page in given relation
+ *	with free space >= given len.
+ *
+ *	If otherBuffer is not InvalidBuffer, then it references a previously
+ *	pinned buffer of another page in the same relation; on return, this
+ *	buffer will also be exclusive-locked.  (This case is used by heap_update;
+ *	the otherBuffer contains the tuple being updated.)
  *
- *	The minblocknum parameter is needed to prevent deadlock between
- *	concurrent heap_update operations; see heap_update for details.
- *	Pass zero if you don't particularly care which page you get.
+ *	The reason for passing otherBuffer is that if two backends are doing
+ *	concurrent heap_update operations, a deadlock could occur if they try
+ *	to lock the same two buffers in opposite orders.  To ensure that this
+ *	can't happen, we impose the rule that buffers of a relation must be
+ *	locked in increasing page number order.  This is most conveniently done
+ *	by having RelationGetBufferForTuple lock them both, with suitable care
+ *	for ordering.
  *
- *	Note that we use LockPage to lock relation for extension. We can
- *	do this as long as in all other places we use page-level locking
+ *	NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
+ *	same buffer we select for insertion of the new tuple (this could only
+ *	happen if space is freed in that page after heap_update finds there's not
+ *	enough there).  In that case, the page will be pinned and locked only once.
+ *
+ *	Note that we use LockPage(rel, 0) to lock relation for extension.
+ *	We can do this as long as in all other places we use page-level locking
  *	for indices only. Alternatively, we could define pseudo-table as
  *	we do for transactions with XactLockTable.
  *
@@ -84,12 +99,13 @@ RelationPutHeapTuple(Relation relation,
  */
 Buffer
 RelationGetBufferForTuple(Relation relation, Size len,
-						  BlockNumber minblocknum)
+						  Buffer otherBuffer)
 {
 	Buffer		buffer = InvalidBuffer;
 	Page		pageHeader;
-	BlockNumber lastblock,
-				oldnblocks;
+	Size		pageFreeSpace;
+	BlockNumber targetBlock,
+				otherBlock;
 
 	len = MAXALIGN(len);		/* be conservative */
 
@@ -100,36 +116,118 @@ RelationGetBufferForTuple(Relation relation, Size len,
 		elog(ERROR, "Tuple is too big: size %lu, max size %ld",
 			 (unsigned long) len, MaxTupleSize);
 
+	if (otherBuffer != InvalidBuffer)
+		otherBlock = BufferGetBlockNumber(otherBuffer);
+	else
+		otherBlock = InvalidBlockNumber; /* just to keep compiler quiet */
+
 	/*
-	 * First, use relcache's record of table length to guess where the
-	 * last page is, and try to put the tuple there.  This cached value
-	 * may be out of date, in which case we'll be inserting into a non-last
-	 * page, but that should be OK.  Note that in a newly created relcache
-	 * entry, rd_nblocks may be zero; if so, we'll set it correctly below.
+	 * We first try to put the tuple on the same page we last inserted a
+	 * tuple on, as cached in the relcache entry.  If that doesn't work,
+	 * we ask the shared Free Space Map to locate a suitable page.  Since
+	 * the FSM's info might be out of date, we have to be prepared to loop
+	 * around and retry multiple times.  (To insure this isn't an infinite
+	 * loop, we must update the FSM with the correct amount of free space on
+	 * each page that proves not to be suitable.)  If the FSM has no record of
+	 * a page with enough free space, we give up and extend the relation.
 	 */
-	if (relation->rd_nblocks > 0)
+
+	targetBlock = relation->rd_targblock;
+
+	if (targetBlock == InvalidBlockNumber)
+	{
+		/*
+		 * We have no cached target page, so ask the FSM for an initial
+		 * target.
+		 */
+		targetBlock = GetPageWithFreeSpace(&relation->rd_node, len);
+		/*
+		 * If the FSM knows nothing of the rel, try the last page before
+		 * we give up and extend.  This avoids one-tuple-per-page syndrome
+		 * during bootstrapping or in a recently-started system.
+		 */
+		if (targetBlock == InvalidBlockNumber)
+		{
+			BlockNumber	nblocks = RelationGetNumberOfBlocks(relation);
+
+			if (nblocks > 0)
+				targetBlock = nblocks - 1;
+		}
+	}
+
+	while (targetBlock != InvalidBlockNumber)
 	{
-		lastblock = relation->rd_nblocks - 1;
-		if (lastblock >= minblocknum)
+		/*
+		 * Read and exclusive-lock the target block, as well as the
+		 * other block if one was given, taking suitable care with
+		 * lock ordering and the possibility they are the same block.
+		 */
+		if (otherBuffer == InvalidBuffer)
+		{
+			/* easy case */
+			buffer = ReadBuffer(relation, targetBlock);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		}
+		else if (otherBlock == targetBlock)
+		{
+			/* also easy case */
+			buffer = otherBuffer;
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		}
+		else if (otherBlock < targetBlock)
 		{
-			buffer = ReadBuffer(relation, lastblock);
+			/* lock other buffer first */
+			buffer = ReadBuffer(relation, targetBlock);
+			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-			pageHeader = (Page) BufferGetPage(buffer);
-			if (len <= PageGetFreeSpace(pageHeader))
-				return buffer;
-			/*
-			 * Doesn't fit, so we'll have to try someplace else.
-			 */
-			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-			/* buffer release will happen below... */
 		}
+		else
+		{
+			/* lock target buffer first */
+			buffer = ReadBuffer(relation, targetBlock);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+		}
+		/*
+		 * Now we can check to see if there's enough free space here.
+		 * If so, we're done.
+		 */
+		pageHeader = (Page) BufferGetPage(buffer);
+		pageFreeSpace = PageGetFreeSpace(pageHeader);
+		if (len <= pageFreeSpace)
+		{
+			/* use this page as future insert target, too */
+			relation->rd_targblock = targetBlock;
+			return buffer;
+		}
+		/*
+		 * Not enough space, so we must give up our page locks and
+		 * pin (if any) and prepare to look elsewhere.  We don't care
+		 * which order we unlock the two buffers in, so this can be
+		 * slightly simpler than the code above.
+		 */
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+		if (otherBuffer == InvalidBuffer)
+		{
+			ReleaseBuffer(buffer);
+		}
+		else if (otherBlock != targetBlock)
+		{
+			LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
+			ReleaseBuffer(buffer);
+		}
+		/*
+		 * Update FSM as to condition of this page, and ask for another
+		 * page to try.
+		 */
+		targetBlock = RecordAndGetPageWithFreeSpace(&relation->rd_node,
+													targetBlock,
+													pageFreeSpace,
+													len);
 	}
 
 	/*
-	 * Before extending relation, make sure no one else has done
-	 * so more recently than our last rd_nblocks update.  (If we
-	 * blindly extend the relation here, then probably most of the
-	 * page the other guy added will end up going to waste.)
+	 * Have to extend the relation.
 	 *
 	 * We have to use a lock to ensure no one else is extending the
 	 * rel at the same time, else we will both try to initialize the
@@ -138,51 +236,27 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	if (!relation->rd_myxactonly)
 		LockPage(relation, 0, ExclusiveLock);
 
-	oldnblocks = relation->rd_nblocks;
 	/*
 	 * XXX This does an lseek - rather expensive - but at the moment it is
 	 * the only way to accurately determine how many blocks are in a
 	 * relation.  Is it worth keeping an accurate file length in shared
 	 * memory someplace, rather than relying on the kernel to do it for us?
 	 */
-	relation->rd_nblocks = RelationGetNumberOfBlocks(relation);
+	buffer = ReadBuffer(relation, P_NEW);
 
-	if (relation->rd_nblocks > oldnblocks)
-	{
-		/*
-		 * Someone else has indeed extended the relation recently.
-		 * Try to fit our tuple into the new last page.
-		 */
-		lastblock = relation->rd_nblocks - 1;
-		if (lastblock >= minblocknum)
-		{
-			buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false);
-			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-			pageHeader = (Page) BufferGetPage(buffer);
-			if (len <= PageGetFreeSpace(pageHeader))
-			{
-				/* OK, we don't need to extend again. */
-				if (!relation->rd_myxactonly)
-					UnlockPage(relation, 0, ExclusiveLock);
-				return buffer;
-			}
-			/*
-			 * Doesn't fit, so we'll have to extend the relation (again).
-			 */
-			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-			/* buffer release will happen below... */
-		}
-	}
+	/*
+	 * Release the file-extension lock; it's now OK for someone else
+	 * to extend the relation some more.
+	 */
+	if (!relation->rd_myxactonly)
+		UnlockPage(relation, 0, ExclusiveLock);
 
 	/*
-	 * Extend the relation by one page and update rd_nblocks for next time.
-	 *
-	 * Note: at this point minblocknum is ignored; we won't extend by more
-	 * than one block...
+	 * We can be certain that locking the otherBuffer first is OK,
+	 * since it must have a lower page number.
 	 */
-	lastblock = relation->rd_nblocks;
-	buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, true);
-	relation->rd_nblocks = lastblock + 1;
+	if (otherBuffer != InvalidBuffer)
+		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
 
 	/*
 	 * We need to initialize the empty new page.
@@ -192,19 +266,22 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	Assert(PageIsNew((PageHeader) pageHeader));
 	PageInit(pageHeader, BufferGetPageSize(buffer), 0);
 
-	/*
-	 * Release the file-extension lock; it's now OK for someone else
-	 * to extend the relation some more.
-	 */
-	if (!relation->rd_myxactonly)
-		UnlockPage(relation, 0, ExclusiveLock);
-
 	if (len > PageGetFreeSpace(pageHeader))
 	{
 		/* We should not get here given the test at the top */
-		elog(STOP, "Tuple is too big: size %lu",
-			 (unsigned long) len);
+		elog(STOP, "Tuple is too big: size %lu", (unsigned long) len);
 	}
 
+	/*
+	 * Remember the new page as our target for future insertions.
+	 *
+	 * XXX should we enter the new page into the free space map immediately,
+	 * or just keep it for this backend's exclusive use in the short run
+	 * (until VACUUM sees it)?  Seems to depend on whether you expect the
+	 * current backend to make more insertions or not, which is probably a
+	 * good bet most of the time.  So for now, don't add it to FSM yet.
+	 */
+	relation->rd_targblock = BufferGetBlockNumber(buffer);
+
 	return buffer;
 }
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index a3f440ca5f9..39bd86d8a7f 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlogutils.c,v 1.15 2001/03/22 03:59:18 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlogutils.c,v 1.16 2001/06/29 21:08:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -171,6 +171,7 @@ XLogOpenLogRelation(void)
 	sprintf(RelationGetPhysicalRelationName(logRelation), "pg_log");
 	logRelation->rd_node.tblNode = InvalidOid;
 	logRelation->rd_node.relNode = RelOid_pg_log;
+	logRelation->rd_targblock = InvalidBlockNumber;
 	logRelation->rd_fd = -1;
 	logRelation->rd_fd = smgropen(DEFAULT_SMGR, logRelation, false);
 	if (logRelation->rd_fd < 0)
@@ -384,6 +385,7 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode)
 
 		hentry->rdesc = res;
 
+		res->reldata.rd_targblock = InvalidBlockNumber;
 		res->reldata.rd_fd = -1;
 		res->reldata.rd_fd = smgropen(DEFAULT_SMGR, &(res->reldata),
 									  true /* allow failure */ );
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 1f2cdf9131d..1171376bb25 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.169 2001/06/27 23:31:38 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.170 2001/06/29 21:08:24 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -39,7 +39,6 @@
 #include "catalog/pg_attrdef.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_index.h"
-#include "catalog/pg_proc.h"
 #include "catalog/pg_relcheck.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
@@ -50,8 +49,6 @@
 #include "optimizer/planmain.h"
 #include "optimizer/prep.h"
 #include "optimizer/var.h"
-#include "nodes/makefuncs.h"
-#include "parser/parse_clause.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
 #include "parser/parse_target.h"
@@ -59,7 +56,6 @@
 #include "rewrite/rewriteRemove.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
-#include "utils/catcache.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/relcache.h"
@@ -197,25 +193,18 @@ heap_create(char *relname,
 	static unsigned int uniqueId = 0;
 
 	Oid			relid;
-	Relation	rel;
-	bool		nailme = false;
-	int			natts = tupDesc->natts;
-	int			i;
-	MemoryContext oldcxt;
 	Oid			tblNode = MyDatabaseId;
+	bool		nailme = false;
+	Relation	rel;
 
 	/*
 	 * sanity checks
 	 */
-	AssertArg(natts > 0);
-
 	if (relname && !allow_system_table_mods &&
 		IsSystemRelationName(relname) && IsNormalProcessingMode())
-	{
 		elog(ERROR, "Illegal class name '%s'"
 			 "\n\tThe 'pg_' name prefix is reserved for system catalogs",
 			 relname);
-	}
 
 	/*
 	 * Real ugly stuff to assign the proper relid in the relation
@@ -276,70 +265,20 @@ heap_create(char *relname,
 
 	if (istemp)
 	{
-
 		/*
 		 * replace relname of caller with a unique name for a temp
 		 * relation
 		 */
 		snprintf(relname, NAMEDATALEN, "%s_%d_%u",
-				PG_TEMP_REL_PREFIX, (int) MyProcPid, uniqueId++);
+				 PG_TEMP_REL_PREFIX, (int) MyProcPid, uniqueId++);
 	}
 
 	/*
-	 * switch to the cache context to create the relcache entry.
-	 */
-	if (!CacheMemoryContext)
-		CreateCacheMemoryContext();
-
-	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-
-	/*
-	 * allocate a new relation descriptor.
+	 * build the relcache entry.
 	 */
-	rel = (Relation) palloc(sizeof(RelationData));
-	MemSet((char *) rel, 0, sizeof(RelationData));
-	rel->rd_fd = -1;			/* physical file is not open */
-
-	RelationSetReferenceCount(rel, 1);
-
-	/*
-	 * create a new tuple descriptor from the one passed in
-	 */
-	rel->rd_att = CreateTupleDescCopyConstr(tupDesc);
-
-	/*
-	 * nail the reldesc if this is a bootstrap create reln and we may need
-	 * it in the cache later on in the bootstrap process so we don't ever
-	 * want it kicked out.	e.g. pg_attribute!!!
-	 */
-	if (nailme)
-		rel->rd_isnailed = true;
-
-	/*
-	 * initialize the fields of our new relation descriptor
-	 */
-	rel->rd_rel = (Form_pg_class) palloc(sizeof *rel->rd_rel);
-	MemSet((char *) rel->rd_rel, 0, sizeof *rel->rd_rel);
-	strcpy(RelationGetPhysicalRelationName(rel), relname);
-	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
-	rel->rd_rel->relnatts = natts;
-	rel->rd_rel->reltype = InvalidOid;
-	if (tupDesc->constr)
-		rel->rd_rel->relchecks = tupDesc->constr->num_check;
-
-	for (i = 0; i < natts; i++)
-		rel->rd_att->attrs[i]->attrelid = relid;
-
-	RelationGetRelid(rel) = relid;
-
-	rel->rd_node.tblNode = tblNode;
-	rel->rd_node.relNode = relid;
-	rel->rd_rel->relfilenode = relid;
-
-	/*
-	 * done building relcache entry.
-	 */
-	MemoryContextSwitchTo(oldcxt);
+	rel = RelationBuildLocalRelation(relname, tupDesc,
+									 relid, tblNode,
+									 nailme);
 
 	/*
 	 * have the storage manager create the relation.
@@ -347,8 +286,6 @@ heap_create(char *relname,
 	if (storage_create)
 		heap_storage_create(rel);
 
-	RelationRegisterRelation(rel);
-
 	return rel;
 }
 
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 400c60b9b46..1da20e1c098 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.60 2001/06/23 00:07:34 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.61 2001/06/29 21:08:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -168,21 +168,26 @@ DefineSequence(CreateSeqStmt *seq)
 	DefineRelation(stmt, RELKIND_SEQUENCE);
 
 	rel = heap_openr(seq->seqname, AccessExclusiveLock);
-
 	tupDesc = RelationGetDescr(rel);
 
-	Assert(RelationGetNumberOfBlocks(rel) == 0);
+	/* Initialize first page of relation with special magic number */
+
 	buf = ReadBuffer(rel, P_NEW);
 
 	if (!BufferIsValid(buf))
 		elog(ERROR, "DefineSequence: ReadBuffer failed");
 
+	Assert(BufferGetBlockNumber(buf) == 0);
+
 	page = (PageHeader) BufferGetPage(buf);
 
 	PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
 	sm = (sequence_magic *) PageGetSpecialPointer(page);
 	sm->magic = SEQ_MAGIC;
 
+	/* hack: ensure heap_insert will insert on the just-created page */
+	rel->rd_targblock = 0;
+
 	/* Now - form & insert sequence tuple */
 	tuple = heap_formtuple(tupDesc, value, null);
 	heap_insert(rel, tuple);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 8970c907a8b..85afa46ce3b 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.113 2001/06/22 19:16:22 wieck Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.114 2001/06/29 21:08:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -91,7 +91,7 @@ extern void AbortBufferIO(void);
 #define BUFFER_IS_BROKEN(buf) ((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY))
 
 static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum,
-								 bool isExtend, bool bufferLockHeld);
+								 bool bufferLockHeld);
 static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
 							   bool *foundPtr);
 static int	ReleaseBufferWithBufferLock(Buffer buffer);
@@ -103,7 +103,9 @@ void		PrintBufferDescs(void);
  * ReadBuffer -- returns a buffer containing the requested
  *		block of the requested relation.  If the blknum
  *		requested is P_NEW, extend the relation file and
- *		allocate a new block.
+ *		allocate a new block.  (Caller is responsible for
+ *		ensuring that only one backend tries to extend a
+ *		relation at the same time!)
  *
  * Returns: the buffer number for the buffer containing
  *		the block read, or NULL on an error.  If successful,
@@ -111,6 +113,8 @@ void		PrintBufferDescs(void);
  *
  * Assume when this function is called, that reln has been
  *		opened already.
+ *
+ * Note: a side effect of a P_NEW call is to update reln->rd_nblocks.
  */
 
 #undef ReadBuffer				/* conflicts with macro when BUFMGR_DEBUG
@@ -122,28 +126,26 @@ void		PrintBufferDescs(void);
 Buffer
 ReadBuffer(Relation reln, BlockNumber blockNum)
 {
-	return ReadBufferInternal(reln, blockNum, false, false);
+	return ReadBufferInternal(reln, blockNum, false);
 }
 
 /*
  * ReadBufferInternal -- internal version of ReadBuffer with more options
  *
- * isExtend: if true, assume that we are extending the file and the caller
- * is passing the current EOF block number (ie, caller already called
- * smgrnblocks()).
- *
  * bufferLockHeld: if true, caller already acquired the bufmgr spinlock.
  * (This is assumed never to be true if dealing with a local buffer!)
  */
 static Buffer
 ReadBufferInternal(Relation reln, BlockNumber blockNum,
-				   bool isExtend, bool bufferLockHeld)
+				   bool bufferLockHeld)
 {
 	BufferDesc *bufHdr;
 	int			status;
 	bool		found;
+	bool		isExtend;
 	bool		isLocalBuf;
 
+	isExtend = (blockNum == P_NEW);
 	isLocalBuf = reln->rd_myxactonly;
 
 	if (isLocalBuf)
@@ -151,11 +153,10 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
 		ReadLocalBufferCount++;
 		pgstat_count_buffer_read(&reln->pgstat_info, reln);
 		/* Substitute proper block number if caller asked for P_NEW */
-		if (blockNum == P_NEW)
+		if (isExtend)
 		{
 			blockNum = reln->rd_nblocks;
 			reln->rd_nblocks++;
-			isExtend = true;
 		}
 		bufHdr = LocalBufferAlloc(reln, blockNum, &found);
 		if (found)
@@ -169,10 +170,11 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
 		ReadBufferCount++;
 		pgstat_count_buffer_read(&reln->pgstat_info, reln);
 		/* Substitute proper block number if caller asked for P_NEW */
-		if (blockNum == P_NEW)
+		if (isExtend)
 		{
-			blockNum = smgrnblocks(DEFAULT_SMGR, reln);
-			isExtend = true;
+			/* must be sure we have accurate file length! */
+			blockNum = reln->rd_nblocks = smgrnblocks(DEFAULT_SMGR, reln);
+			reln->rd_nblocks++;
 		}
 		/*
 		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested
@@ -649,26 +651,24 @@ WriteNoReleaseBuffer(Buffer buffer)
  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
  *		to save a spinlock release/acquire.
  *
- * An additional frammish of this routine is that the caller may perform
- * file extension (as if blockNum = P_NEW) by passing the actual current
- * EOF block number as blockNum and setting isExtend true.  This hack
- * allows us to avoid calling smgrnblocks() again when the caller has
- * already done it.
- *
- * Note: it is OK to pass buffer = InvalidBuffer, indicating that no old
- * buffer actually needs to be released.  This case is the same as ReadBuffer
- * except for the isExtend option.
- *
  * Also, if the passed buffer is valid and already contains the desired block
  * number, we simply return it without ever acquiring the spinlock at all.
  * Since the passed buffer must be pinned, it's OK to examine its block
  * number without getting the lock first.
+ *
+ * Note: it is OK to pass buffer = InvalidBuffer, indicating that no old
+ * buffer actually needs to be released.  This case is the same as ReadBuffer,
+ * but can save some tests in the caller.
+ *
+ * Also note: while it will work to call this routine with blockNum == P_NEW,
+ * it's best to avoid doing so, since that would result in calling
+ * smgrnblocks() while holding the bufmgr spinlock, hence some loss of
+ * concurrency.
  */
 Buffer
 ReleaseAndReadBuffer(Buffer buffer,
 					 Relation relation,
-					 BlockNumber blockNum,
-					 bool isExtend)
+					 BlockNumber blockNum)
 {
 	BufferDesc *bufHdr;
 
@@ -703,14 +703,12 @@ ReleaseAndReadBuffer(Buffer buffer,
 					AddBufferToFreelist(bufHdr);
 					bufHdr->flags |= BM_FREE;
 				}
-				return ReadBufferInternal(relation, blockNum,
-										  isExtend, true);
+				return ReadBufferInternal(relation, blockNum, true);
 			}
 		}
 	}
 
-	return ReadBufferInternal(relation, blockNum,
-							  isExtend, false);
+	return ReadBufferInternal(relation, blockNum, false);
 }
 
 /*
@@ -1738,14 +1736,13 @@ ReleaseAndReadBuffer_Debug(char *file,
 						   int line,
 						   Buffer buffer,
 						   Relation relation,
-						   BlockNumber blockNum,
-						   bool isExtend)
+						   BlockNumber blockNum)
 {
 	bool		bufferValid;
 	Buffer		b;
 
 	bufferValid = BufferIsValid(buffer);
-	b = ReleaseAndReadBuffer(buffer, relation, blockNum, isExtend);
+	b = ReleaseAndReadBuffer(buffer, relation, blockNum);
 	if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
 		&& is_userbuffer(buffer))
 	{
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 84f7066348e..b4d0a605ed0 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.1 2001/06/27 23:31:39 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.2 2001/06/29 21:08:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -161,10 +161,42 @@ FreeSpaceShmemSize(void)
 	return size;
 }
 
+BlockNumber
+GetPageWithFreeSpace(RelFileNode *rel, Size spaceNeeded)
+{
+	return InvalidBlockNumber;	/* stub */
+}
+
+void
+RecordFreeSpace(RelFileNode *rel, BlockNumber page, Size spaceAvail)
+{
+	/* stub */
+}
+
+BlockNumber
+RecordAndGetPageWithFreeSpace(RelFileNode *rel,
+							  BlockNumber oldPage,
+							  Size oldSpaceAvail,
+							  Size spaceNeeded)
+{
+	return InvalidBlockNumber;	/* stub */
+}
+
+void
+MultiRecordFreeSpace(RelFileNode *rel,
+					 BlockNumber minPage,
+					 BlockNumber maxPage,
+					 int nPages,
+					 BlockNumber *pages,
+					 Size *spaceAvail)
+{
+	/* stub */
+}
 
 void
 FreeSpaceMapForgetRel(RelFileNode *rel)
 {
+	/* stub */
 }
 
 
@@ -178,6 +210,7 @@ FreeSpaceMapForgetRel(RelFileNode *rel)
 void
 DumpFreeSpace(void)
 {
+	/* stub */
 }
 
 #endif	 /* FREESPACE_DEBUG */
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 56edbec60f2..9e33c54d52e 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.50 2001/06/27 23:31:39 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.51 2001/06/29 21:08:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -437,7 +437,20 @@ smgrblindmarkdirty(int16 which,
 BlockNumber
 smgrnblocks(int16 which, Relation reln)
 {
-	return (*(smgrsw[which].smgr_nblocks)) (reln);
+	BlockNumber		nblocks;
+
+	nblocks = (*(smgrsw[which].smgr_nblocks)) (reln);
+	/*
+	 * NOTE: if a relation ever did grow to 2^32-1 blocks, this code would
+	 * fail --- but that's a good thing, because it would stop us from
+	 * extending the rel another block and having a block whose number
+	 * actually is InvalidBlockNumber.
+	 */
+	if (nblocks == InvalidBlockNumber)
+		elog(ERROR, "cannot count blocks for %s: %m",
+			 RelationGetRelationName(reln));
+
+	return nblocks;
 }
 
 /*
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 00b66b2575b..670cdf6ddaa 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.140 2001/06/27 23:31:39 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.141 2001/06/29 21:08:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1928,33 +1928,111 @@ RelationCacheAbortWalker(Relation *relationPtr, Datum dummy)
 }
 
 /*
- *		RelationRegisterRelation -
- *		   register the Relation descriptor of a newly created relation
- *		   with the relation descriptor Cache.
+ *		RelationBuildLocalRelation
+ *			Build a relcache entry for an about-to-be-created relation,
+ *			and enter it into the relcache.
  */
-void
-RelationRegisterRelation(Relation relation)
+Relation
+RelationBuildLocalRelation(const char *relname,
+						   TupleDesc tupDesc,
+						   Oid relid, Oid dbid,
+						   bool nailit)
 {
+	Relation	rel;
 	MemoryContext oldcxt;
+	int			natts = tupDesc->natts;
+	int			i;
 
-	RelationInitLockInfo(relation);
+	AssertArg(natts > 0);
+
+	/*
+	 * switch to the cache context to create the relcache entry.
+	 */
+	if (!CacheMemoryContext)
+		CreateCacheMemoryContext();
 
 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
-	RelationCacheInsert(relation);
+	/*
+	 * allocate a new relation descriptor.
+	 */
+	rel = (Relation) palloc(sizeof(RelationData));
+	MemSet((char *) rel, 0, sizeof(RelationData));
+	rel->rd_targblock = InvalidBlockNumber;
+
+	/* make sure relation is marked as having no open file yet */
+	rel->rd_fd = -1;
+
+	RelationSetReferenceCount(rel, 1);
+
+	/*
+	 * nail the reldesc if this is a bootstrap create reln and we may need
+	 * it in the cache later on in the bootstrap process so we don't ever
+	 * want it kicked out.	e.g. pg_attribute!!!
+	 */
+	if (nailit)
+		rel->rd_isnailed = true;
+
+	/*
+	 * create a new tuple descriptor from the one passed in
+	 * (we do this to copy it into the cache context)
+	 */
+	rel->rd_att = CreateTupleDescCopyConstr(tupDesc);
+
+	/*
+	 * initialize relation tuple form (caller may add/override data later)
+	 */
+	rel->rd_rel = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
+	MemSet((char *) rel->rd_rel, 0, CLASS_TUPLE_SIZE);
+
+	strcpy(RelationGetPhysicalRelationName(rel), relname);
+
+	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
+	rel->rd_rel->relnatts = natts;
+	rel->rd_rel->reltype = InvalidOid;
+	if (tupDesc->constr)
+		rel->rd_rel->relchecks = tupDesc->constr->num_check;
+
+	/*
+	 * Insert relation OID and database/tablespace ID into the right places.
+	 * XXX currently we assume physical tblspace/relnode are same as logical
+	 * dbid/reloid.  Probably should pass an extra pair of parameters.
+	 */
+	rel->rd_rel->relisshared = (dbid == InvalidOid);
+
+	RelationGetRelid(rel) = relid;
+
+	for (i = 0; i < natts; i++)
+		rel->rd_att->attrs[i]->attrelid = relid;
+
+	RelationInitLockInfo(rel);	/* see lmgr.c */
+
+	rel->rd_node.tblNode = dbid;
+	rel->rd_node.relNode = relid;
+	rel->rd_rel->relfilenode = relid;
+
+	/*
+	 * Okay to insert into the relcache hash tables.
+	 */
+	RelationCacheInsert(rel);
 
 	/*
 	 * we've just created the relation. It is invisible to anyone else
 	 * before the transaction is committed. Setting rd_myxactonly allows
 	 * us to use the local buffer manager for select/insert/etc before the
 	 * end of transaction. (We also need to keep track of relations
-	 * created during a transaction and does the necessary clean up at the
+	 * created during a transaction and do the necessary clean up at the
 	 * end of the transaction.)				- ay 3/95
 	 */
-	relation->rd_myxactonly = TRUE;
-	newlyCreatedRelns = lcons(relation, newlyCreatedRelns);
+	rel->rd_myxactonly = true;
+	newlyCreatedRelns = lcons(rel, newlyCreatedRelns);
 
+	/*
+	 * done building relcache entry.
+	 */
 	MemoryContextSwitchTo(oldcxt);
+
+	return rel;
 }
 
 /*
@@ -1972,14 +2050,18 @@ RelationPurgeLocalRelation(bool xactCommitted)
 		List	   *l = newlyCreatedRelns;
 		Relation	reln = lfirst(l);
 
+		newlyCreatedRelns = lnext(newlyCreatedRelns);
+		pfree(l);
+
 		Assert(reln != NULL && reln->rd_myxactonly);
 
 		reln->rd_myxactonly = false;	/* mark it not on list anymore */
 
-		newlyCreatedRelns = lnext(newlyCreatedRelns);
-		pfree(l);
-
-		/* XXX is this step still needed?  If so, why? */
+		/*
+		 * XXX while we clearly must throw out new Relation entries at
+		 * xact abort, it's not clear why we need to do it at commit.
+		 * Could this be improved?
+		 */
 		if (!IsBootstrapProcessingMode())
 			RelationClearRelation(reln, false);
 	}
diff --git a/src/include/access/hio.h b/src/include/access/hio.h
index 8c50a128a20..da4114c0d00 100644
--- a/src/include/access/hio.h
+++ b/src/include/access/hio.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: hio.h,v 1.18 2001/05/16 22:35:12 tgl Exp $
+ * $Id: hio.h,v 1.19 2001/06/29 21:08:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -19,6 +19,6 @@
 extern void RelationPutHeapTuple(Relation relation, Buffer buffer,
 					 HeapTuple tuple);
 extern Buffer RelationGetBufferForTuple(Relation relation, Size len,
-										BlockNumber minblocknum);
+										Buffer otherBuffer);
 
 #endif	 /* HIO_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 9d7f568e193..f06b3bb6cc3 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufmgr.h,v 1.52 2001/06/09 18:16:59 tgl Exp $
+ * $Id: bufmgr.h,v 1.53 2001/06/29 21:08:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -155,10 +155,11 @@ extern long *LocalRefCount;
  * prototypes for functions in bufmgr.c
  */
 extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
+extern int	ReleaseBuffer(Buffer buffer);
 extern int	WriteBuffer(Buffer buffer);
 extern int	WriteNoReleaseBuffer(Buffer buffer);
 extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation,
-								   BlockNumber blockNum, bool isExtend);
+								   BlockNumber blockNum);
 extern int	FlushBuffer(Buffer buffer, bool sync, bool release);
 
 extern void InitBufferPool(void);
@@ -176,7 +177,6 @@ extern void DropRelFileNodeBuffers(RelFileNode rnode);
 extern void DropBuffers(Oid dbid);
 extern void PrintPinnedBufs(void);
 extern int	BufferShmemSize(void);
-extern int	ReleaseBuffer(Buffer buffer);
 extern RelFileNode BufferGetFileNode(Buffer buffer);
 
 extern void SetBufferCommitInfoNeedsSave(Buffer buffer);
diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h
index a9178b8604d..160ee0f47c7 100644
--- a/src/include/utils/relcache.h
+++ b/src/include/utils/relcache.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: relcache.h,v 1.24 2001/01/24 19:43:29 momjian Exp $
+ * $Id: relcache.h,v 1.25 2001/06/29 21:08:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -27,7 +27,6 @@ extern Relation RelationNodeCacheGetRelation(RelFileNode rnode);
 extern Relation RelationIdCacheGetRelation(Oid relationId);
 
 extern void RelationClose(Relation relation);
-extern void RelationForgetRelation(Oid rid);
 
 /*
  * Routines to compute/retrieve additional cached information
@@ -40,14 +39,23 @@ extern List *RelationGetIndexList(Relation relation);
 extern void RelationCacheInitialize(void);
 extern void RelationCacheInitializePhase2(void);
 
+/*
+ * Routine to create a relcache entry for an about-to-be-created relation
+ */
+extern Relation RelationBuildLocalRelation(const char *relname,
+										   TupleDesc tupDesc,
+										   Oid relid, Oid dbid,
+										   bool nailit);
+
 /*
  * Routines for flushing/rebuilding relcache entries in various scenarios
  */
+extern void RelationForgetRelation(Oid rid);
+
 extern void RelationIdInvalidateRelationCacheByRelationId(Oid relationId);
 
 extern void RelationCacheInvalidate(void);
 
-extern void RelationRegisterRelation(Relation relation);
 extern void RelationPurgeLocalRelation(bool xactComitted);
 
 extern void RelationCacheAbort(void);
-- 
GitLab