diff --git a/doc/src/sgml/manage-ag.sgml b/doc/src/sgml/manage-ag.sgml
index be9c963c500efd4bb91f6acbc5dd5abd67b9913c..cb0eb7a6dcb01a7cca4eba80f21e07713b68f334 100644
--- a/doc/src/sgml/manage-ag.sgml
+++ b/doc/src/sgml/manage-ag.sgml
@@ -1,4 +1,4 @@
-<!-- $PostgreSQL: pgsql/doc/src/sgml/manage-ag.sgml,v 2.45 2006/03/10 19:10:48 momjian Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/manage-ag.sgml,v 2.46 2006/05/04 16:07:28 tgl Exp $ -->
 
 <chapter id="managing-databases">
  <title>Managing Databases</title>
@@ -166,6 +166,7 @@ CREATE DATABASE <replaceable>dbname</> OWNER <replaceable>rolename</>;
 <programlisting>
 createdb -O <replaceable>rolename</> <replaceable>dbname</>
 </programlisting>
+   from the shell.
    You must be a superuser to be allowed to create a database for
    someone else (that is, for a role you are not a member of).
   </para>
@@ -220,19 +221,15 @@ createdb -T template0 <replaceable>dbname</>
 
   <para>
    It is possible to create additional template databases, and indeed
-   one might copy any database in a cluster by specifying its name
+   one may copy any database in a cluster by specifying its name
    as the template for <command>CREATE DATABASE</>.  It is important to
    understand, however, that this is not (yet) intended as
-   a general-purpose <quote><command>COPY DATABASE</command></quote> facility.  In particular, it is
-   essential that the source database be idle (no data-altering transactions
-   in progress)
-   for the duration of the copying operation.  <command>CREATE DATABASE</>
-   will check
-   that no session (other than itself) is connected to
-   the source database at the start of the operation, but this does not
-   guarantee that changes cannot be made while the copy proceeds, which
-   would result in an inconsistent copied database.  Therefore,
-   we recommend that databases used as templates be treated as read-only.
+   a general-purpose <quote><command>COPY DATABASE</command></quote> facility.
+   The principal limitation is that no other sessions can be connected to
+   the source database while it is being copied.  <command>CREATE
+   DATABASE</> will fail if any other connection exists when it starts;
+   otherwise, new connections to the source database are locked out
+   until <command>CREATE DATABASE</> completes.
   </para>
 
   <para>
diff --git a/doc/src/sgml/ref/create_database.sgml b/doc/src/sgml/ref/create_database.sgml
index b4bd2d57398b9df9879149c493fa727bdef7d0d3..6bf94dbf150a60eb6b2c05a2b1e55808f455af55 100644
--- a/doc/src/sgml/ref/create_database.sgml
+++ b/doc/src/sgml/ref/create_database.sgml
@@ -1,5 +1,5 @@
 <!--
-$PostgreSQL: pgsql/doc/src/sgml/ref/create_database.sgml,v 1.44 2005/07/31 17:19:17 tgl Exp $
+$PostgreSQL: pgsql/doc/src/sgml/ref/create_database.sgml,v 1.45 2006/05/04 16:07:29 tgl Exp $
 PostgreSQL documentation
 -->
 
@@ -45,7 +45,7 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
 
   <para>
    Normally, the creator becomes the owner of the new database.
-   Superusers can create databases owned by other users using the
+   Superusers can create databases owned by other users, by using the
    <literal>OWNER</> clause. They can even create databases owned by
    users with no special privileges. Non-superusers with <literal>CREATEDB</>
    privilege can only create databases owned by themselves.
@@ -104,7 +104,8 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
         Character set encoding to use in the new database.  Specify
         a string constant (e.g., <literal>'SQL_ASCII'</literal>),
         or an integer encoding number, or <literal>DEFAULT</literal>
-        to use the default encoding. The character sets supported by the
+        to use the default encoding (namely, the encoding of the
+        template database). The character sets supported by the
         <productname>PostgreSQL</productname> server are described in
         <xref linkend="multibyte-charset-supported">.
        </para>
@@ -169,7 +170,11 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
    Although it is possible to copy a database other than <literal>template1</>
    by specifying its name as the template, this is not (yet) intended as
    a general-purpose <quote><command>COPY DATABASE</command></quote> facility.
-   We recommend that databases used as templates be treated as read-only.
+   The principal limitation is that no other sessions can be connected to
+   the template database while it is being copied.  <command>CREATE
+   DATABASE</> will fail if any other connection exists when it starts;
+   otherwise, new connections to the template database are locked out
+   until <command>CREATE DATABASE</> completes.
    See <xref linkend="manage-ag-templatedbs"> for more information.
   </para>
 
@@ -220,6 +225,16 @@ CREATE DATABASE music ENCODING 'LATIN1';
    implementation-defined.
   </para>
  </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="sql-alterdatabase" endterm="sql-alterdatabase-title"></member>
+   <member><xref linkend="sql-dropdatabase" endterm="sql-dropdatabase-title"></member>
+  </simplelist>
+ </refsect1>
+
 </refentry>
 
 <!-- Keep this comment at the end of the file
diff --git a/doc/src/sgml/ref/drop_database.sgml b/doc/src/sgml/ref/drop_database.sgml
index ee63c11815aabc30219e22eb6e23c19159cb423f..74e14dc1388ac3f01112d656c107f19110cb25cf 100644
--- a/doc/src/sgml/ref/drop_database.sgml
+++ b/doc/src/sgml/ref/drop_database.sgml
@@ -1,5 +1,5 @@
 <!--
-$PostgreSQL: pgsql/doc/src/sgml/ref/drop_database.sgml,v 1.21 2005/11/22 15:24:17 adunstan Exp $
+$PostgreSQL: pgsql/doc/src/sgml/ref/drop_database.sgml,v 1.22 2006/05/04 16:07:29 tgl Exp $
 PostgreSQL documentation
 -->
 
@@ -86,7 +86,7 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable>
   <title>Compatibility</title>
 
   <para>
-   The is no <command>DROP DATABASE</command> statement in the SQL standard.
+   There is no <command>DROP DATABASE</command> statement in the SQL standard.
   </para>
  </refsect1>
 
diff --git a/src/backend/catalog/pg_shdepend.c b/src/backend/catalog/pg_shdepend.c
index 547111bbf065c4e5414b2291041fc58a17ce713c..deac9a75d816dfe1c62637c3ad26c3bd802d4173 100644
--- a/src/backend/catalog/pg_shdepend.c
+++ b/src/backend/catalog/pg_shdepend.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.8 2006/03/05 15:58:23 momjian Exp $
+ *	  $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.9 2006/05/04 16:07:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -37,7 +37,6 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "utils/fmgroids.h"
-#include "utils/inval.h"
 #include "utils/syscache.h"
 
 
@@ -911,12 +910,6 @@ shdepLockAndCheckObject(Oid classId, Oid objectId)
 	/* AccessShareLock should be OK, since we are not modifying the object */
 	LockSharedObject(classId, objectId, 0, AccessShareLock);
 
-	/*
-	 * We have to recognize sinval updates here, else our local syscache may
-	 * still contain the object even if it was just dropped.
-	 */
-	AcceptInvalidationMessages();
-
 	switch (classId)
 	{
 		case AuthIdRelationId:
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index b21d750394d9fc30759b7447783ad31d1e2aa0fe..518fed819689db16c85d43b9f178dee35a11afba 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -3,19 +3,17 @@
  * dbcommands.c
  *		Database management commands (create/drop database).
  *
- * Note: database creation/destruction commands take ExclusiveLock on
- * pg_database to ensure that no two proceed in parallel.  We must use
- * at least this level of locking to ensure that no two backends try to
- * write the flat-file copy of pg_database at once.  We avoid using
- * AccessExclusiveLock since there's no need to lock out ordinary readers
- * of pg_database.
+ * Note: database creation/destruction commands use exclusive locks on
+ * the database objects (as expressed by LockSharedObject()) to avoid
+ * stepping on each others' toes.  Formerly we used table-level locks
+ * on pg_database, but that's too coarse-grained.
  *
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.180 2006/05/03 22:45:26 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.181 2006/05/04 16:07:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -53,7 +51,8 @@
 
 
 /* non-export function prototypes */
-static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
+static bool get_db_info(const char *name, LOCKMODE lockmode,
+			Oid *dbIdP, Oid *ownerIdP,
 			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
 			Oid *dbLastSysOidP,
 			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
@@ -80,13 +79,12 @@ createdb(const CreatedbStmt *stmt)
 	TransactionId src_frozenxid;
 	Oid			src_deftablespace;
 	volatile Oid dst_deftablespace;
-	volatile Relation pg_database_rel;
+	Relation	pg_database_rel;
 	HeapTuple	tuple;
-	TupleDesc	pg_database_dsc;
 	Datum		new_record[Natts_pg_database];
 	char		new_record_nulls[Natts_pg_database];
 	Oid			dboid;
-	volatile Oid datdba;
+	Oid			datdba;
 	ListCell   *option;
 	DefElem    *dtablespacename = NULL;
 	DefElem    *downer = NULL;
@@ -96,8 +94,8 @@ createdb(const CreatedbStmt *stmt)
 	char	   *dbname = stmt->dbname;
 	char	   *dbowner = NULL;
 	const char *dbtemplate = NULL;
-	volatile int encoding = -1;
-	volatile int dbconnlimit = -1;
+	int			encoding = -1;
+	int			dbconnlimit = -1;
 
 	/* don't call this in a transaction block */
 	PreventTransactionChain((void *) stmt, "CREATE DATABASE");
@@ -216,31 +214,25 @@ createdb(const CreatedbStmt *stmt)
 	check_is_member_of_role(GetUserId(), datdba);
 
 	/*
-	 * Check for db name conflict.	There is a race condition here, since
-	 * another backend could create the same DB name before we commit.
-	 * However, holding an exclusive lock on pg_database for the whole time we
-	 * are copying the source database doesn't seem like a good idea, so
-	 * accept possibility of race to create.  We will check again after we
-	 * grab the exclusive lock.
-	 */
-	if (get_db_info(dbname, NULL, NULL, NULL,
-					NULL, NULL, NULL, NULL, NULL, NULL))
-		ereport(ERROR,
-				(errcode(ERRCODE_DUPLICATE_DATABASE),
-				 errmsg("database \"%s\" already exists", dbname)));
-
-	/*
-	 * Lookup database (template) to be cloned.
+	 * Lookup database (template) to be cloned, and obtain share lock on it.
+	 * ShareLock allows two CREATE DATABASEs to work from the same template
+	 * concurrently, while ensuring no one is busy dropping it in parallel
+	 * (which would be Very Bad since we'd likely get an incomplete copy
+	 * without knowing it).  This also prevents any new connections from being
+	 * made to the source until we finish copying it, so we can be sure it
+	 * won't change underneath us.
 	 */
 	if (!dbtemplate)
 		dbtemplate = "template1";		/* Default template database name */
 
-	if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
+	if (!get_db_info(dbtemplate, ShareLock,
+					 &src_dboid, &src_owner, &src_encoding,
 					 &src_istemplate, &src_allowconn, &src_lastsysoid,
 					 &src_vacuumxid, &src_frozenxid, &src_deftablespace))
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_DATABASE),
-			 errmsg("template database \"%s\" does not exist", dbtemplate)));
+				 errmsg("template database \"%s\" does not exist",
+						dbtemplate)));
 
 	/*
 	 * Permission check: to copy a DB that's not marked datistemplate, you
@@ -258,8 +250,7 @@ createdb(const CreatedbStmt *stmt)
 	/*
 	 * The source DB can't have any active backends, except this one
 	 * (exception is to allow CREATE DB while connected to template1).
-	 * Otherwise we might copy inconsistent data.  This check is not
-	 * bulletproof, since someone might connect while we are copying...
+	 * Otherwise we might copy inconsistent data.
 	 */
 	if (DatabaseHasActiveBackends(src_dboid, true))
 		ereport(ERROR,
@@ -346,14 +337,65 @@ createdb(const CreatedbStmt *stmt)
 		src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
 
 	/*
-	 * Preassign OID for pg_database tuple, so that we can compute db path. We
-	 * have to open pg_database to do this, but we don't want to take
-	 * ExclusiveLock yet, so just do it and close again.
+	 * Check for db name conflict.	This is just to give a more friendly
+	 * error message than "unique index violation".  There's a race condition
+	 * but we're willing to accept the less friendly message in that case.
 	 */
-	pg_database_rel = heap_open(DatabaseRelationId, AccessShareLock);
-	dboid = GetNewOid(pg_database_rel);
-	heap_close(pg_database_rel, AccessShareLock);
-	pg_database_rel = NULL;
+	if (OidIsValid(get_database_oid(dbname)))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_DATABASE),
+				 errmsg("database \"%s\" already exists", dbname)));
+
+	/*
+	 * Insert a new tuple into pg_database.  This establishes our ownership
+	 * of the new database name (anyone else trying to insert the same name
+	 * will block on the unique index, and fail after we commit).  It also
+	 * assigns the OID that the new database will have.
+	 */
+	pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+
+	/* Form tuple */
+	MemSet(new_record, 0, sizeof(new_record));
+	MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
+
+	new_record[Anum_pg_database_datname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(dbname));
+	new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
+	new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
+	new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
+	new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
+	new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
+	new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
+	new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
+	new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
+	new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
+
+	/*
+	 * We deliberately set datconfig and datacl to defaults (NULL), rather
+	 * than copying them from the template database.  Copying datacl would
+	 * be a bad idea when the owner is not the same as the template's
+	 * owner. It's more debatable whether datconfig should be copied.
+	 */
+	new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
+	new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
+
+	tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
+						   new_record, new_record_nulls);
+
+	dboid = simple_heap_insert(pg_database_rel, tuple);
+
+	/* Update indexes */
+	CatalogUpdateIndexes(pg_database_rel, tuple);
+
+	/*
+	 * Now generate additional catalog entries associated with the new DB
+	 */
+
+	/* Register owner dependency */
+	recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
+
+	/* Create pg_shdepend entries for objects within database */
+	copyTemplateDependencies(src_dboid, dboid);
 
 	/*
 	 * Force dirty buffers out to disk, to ensure source database is
@@ -434,64 +476,6 @@ createdb(const CreatedbStmt *stmt)
 		heap_endscan(scan);
 		heap_close(rel, AccessShareLock);
 
-		/*
-		 * Now OK to grab exclusive lock on pg_database.
-		 */
-		pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);
-
-		/* Check to see if someone else created same DB name meanwhile. */
-		if (get_db_info(dbname, NULL, NULL, NULL,
-						NULL, NULL, NULL, NULL, NULL, NULL))
-			ereport(ERROR,
-					(errcode(ERRCODE_DUPLICATE_DATABASE),
-					 errmsg("database \"%s\" already exists", dbname)));
-
-		/*
-		 * Insert a new tuple into pg_database
-		 */
-		pg_database_dsc = RelationGetDescr(pg_database_rel);
-
-		/* Form tuple */
-		MemSet(new_record, 0, sizeof(new_record));
-		MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
-
-		new_record[Anum_pg_database_datname - 1] =
-			DirectFunctionCall1(namein, CStringGetDatum(dbname));
-		new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
-		new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
-		new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
-		new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
-		new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
-		new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
-		new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
-		new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
-		new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
-
-		/*
-		 * We deliberately set datconfig and datacl to defaults (NULL), rather
-		 * than copying them from the template database.  Copying datacl would
-		 * be a bad idea when the owner is not the same as the template's
-		 * owner. It's more debatable whether datconfig should be copied.
-		 */
-		new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
-		new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
-
-		tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
-
-		HeapTupleSetOid(tuple, dboid);	/* override heap_insert's OID
-										 * selection */
-
-		simple_heap_insert(pg_database_rel, tuple);
-
-		/* Update indexes */
-		CatalogUpdateIndexes(pg_database_rel, tuple);
-
-		/* Register owner dependency */
-		recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
-
-		/* Create pg_shdepend entries for objects within database */
-		copyTemplateDependencies(src_dboid, dboid);
-
 		/*
 		 * We force a checkpoint before committing.  This effectively means
 		 * that committed XLOG_DBASE_CREATE operations will never need to be
@@ -523,6 +507,12 @@ createdb(const CreatedbStmt *stmt)
 		 */
 		RequestCheckpoint(true, false);
 
+		/*
+		 * Close pg_database, but keep lock till commit (this is important
+		 * to prevent any risk of deadlock failure while updating flat file)
+		 */
+		heap_close(pg_database_rel, NoLock);
+
 		/*
 		 * Set flag to update flat database file at commit.
 		 */
@@ -530,9 +520,9 @@ createdb(const CreatedbStmt *stmt)
 	}
 	PG_CATCH();
 	{
-		/* Don't hold pg_database lock while doing recursive remove */
-		if (pg_database_rel != NULL)
-			heap_close(pg_database_rel, ExclusiveLock);
+		/* Release lock on source database before doing recursive remove */
+		UnlockSharedObject(DatabaseRelationId, src_dboid, 0,
+						   ShareLock);
 
 		/* Throw away any successfully copied subdirectories */
 		remove_dbtablespaces(dboid);
@@ -540,10 +530,6 @@ createdb(const CreatedbStmt *stmt)
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
-
-	/* Close pg_database, but keep exclusive lock till commit */
-	/* This has to be outside the PG_TRY */
-	heap_close(pg_database_rel, NoLock);
 }
 
 
@@ -568,20 +554,15 @@ dropdb(const char *dbname, bool missing_ok)
 				 errmsg("cannot drop the currently open database")));
 
 	/*
-	 * Obtain exclusive lock on pg_database.  We need this to ensure that no
-	 * new backend starts up in the target database while we are deleting it.
-	 * (Actually, a new backend might still manage to start up, because it
-	 * isn't able to lock pg_database while starting.  But it will detect its
-	 * error in ReverifyMyDatabase and shut down before any serious damage is
-	 * done.  See postinit.c.)
-	 *
-	 * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient since
-	 * ReverifyMyDatabase takes RowShareLock.  This allows ordinary readers of
-	 * pg_database to proceed in parallel.
+	 * Look up the target database's OID, and get exclusive lock on it.
+	 * We need this to ensure that no new backend starts up in the target
+	 * database while we are deleting it (see postinit.c), and that no one is
+	 * using it as a CREATE DATABASE template or trying to delete it for
+	 * themselves.
 	 */
-	pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
+	pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
 
-	if (!get_db_info(dbname, &db_id, NULL, NULL,
+	if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
 					 &db_istemplate, NULL, NULL, NULL, NULL, NULL))
 	{
 		if (!missing_ok)
@@ -592,17 +573,18 @@ dropdb(const char *dbname, bool missing_ok)
 		}
 		else
 		{
-
 			/* Close pg_database, release the lock, since we changed nothing */
-			heap_close(pgdbrel, ExclusiveLock);
+			heap_close(pgdbrel, RowExclusiveLock);
 			ereport(NOTICE,
 					(errmsg("database \"%s\" does not exist, skipping",
 							dbname)));
-
 			return;
 		}
 	}
 
+	/*
+	 * Permission checks
+	 */
 	if (!pg_database_ownercheck(db_id, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
 					   dbname);
@@ -618,7 +600,8 @@ dropdb(const char *dbname, bool missing_ok)
 				 errmsg("cannot drop a template database")));
 
 	/*
-	 * Check for active backends in the target database.
+	 * Check for active backends in the target database.  (Because we hold
+	 * the database lock, no new ones can start after this.)
 	 */
 	if (DatabaseHasActiveBackends(db_id, false))
 		ereport(ERROR,
@@ -640,8 +623,7 @@ dropdb(const char *dbname, bool missing_ok)
 	ReleaseSysCache(tup);
 
 	/*
-	 * Delete any comments associated with the database
-	 *
+	 * Delete any comments associated with the database.
 	 */
 	DeleteSharedComments(db_id, DatabaseRelationId);
 
@@ -675,7 +657,10 @@ dropdb(const char *dbname, bool missing_ok)
 	 */
 	remove_dbtablespaces(db_id);
 
-	/* Close pg_database, but keep exclusive lock till commit */
+	/*
+	 * Close pg_database, but keep lock till commit (this is important
+	 * to prevent any risk of deadlock failure while updating flat file)
+	 */
 	heap_close(pgdbrel, NoLock);
 
 	/*
@@ -691,29 +676,18 @@ dropdb(const char *dbname, bool missing_ok)
 void
 RenameDatabase(const char *oldname, const char *newname)
 {
-	HeapTuple	tup,
-				newtup;
+	Oid			db_id;
+	HeapTuple	newtup;
 	Relation	rel;
-	SysScanDesc scan,
-				scan2;
-	ScanKeyData key,
-				key2;
 
 	/*
-	 * Obtain ExclusiveLock so that no new session gets started while the
-	 * rename is in progress.
+	 * Look up the target database's OID, and get exclusive lock on it.
+	 * We need this for the same reasons as DROP DATABASE.
 	 */
-	rel = heap_open(DatabaseRelationId, ExclusiveLock);
-
-	ScanKeyInit(&key,
-				Anum_pg_database_datname,
-				BTEqualStrategyNumber, F_NAMEEQ,
-				NameGetDatum(oldname));
-	scan = systable_beginscan(rel, DatabaseNameIndexId, true,
-							  SnapshotNow, 1, &key);
+	rel = heap_open(DatabaseRelationId, RowExclusiveLock);
 
-	tup = systable_getnext(scan);
-	if (!HeapTupleIsValid(tup))
+	if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
+					 NULL, NULL, NULL, NULL, NULL, NULL))
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_DATABASE),
 				 errmsg("database \"%s\" does not exist", oldname)));
@@ -724,36 +698,29 @@ RenameDatabase(const char *oldname, const char *newname)
 	 * be an actual problem besides a little confusion, so think about this
 	 * and decide.
 	 */
-	if (HeapTupleGetOid(tup) == MyDatabaseId)
+	if (db_id == MyDatabaseId)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("current database may not be renamed")));
 
 	/*
-	 * Make sure the database does not have active sessions.  Might not be
-	 * necessary, but it's consistent with other database operations.
+	 * Make sure the database does not have active sessions.  This is the
+	 * same concern as above, but applied to other sessions.
 	 */
-	if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
+	if (DatabaseHasActiveBackends(db_id, false))
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
 				 errmsg("database \"%s\" is being accessed by other users",
 						oldname)));
 
 	/* make sure the new name doesn't exist */
-	ScanKeyInit(&key2,
-				Anum_pg_database_datname,
-				BTEqualStrategyNumber, F_NAMEEQ,
-				NameGetDatum(newname));
-	scan2 = systable_beginscan(rel, DatabaseNameIndexId, true,
-							   SnapshotNow, 1, &key2);
-	if (HeapTupleIsValid(systable_getnext(scan2)))
+	if (OidIsValid(get_database_oid(newname)))
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_DATABASE),
 				 errmsg("database \"%s\" already exists", newname)));
-	systable_endscan(scan2);
 
 	/* must be owner */
-	if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+	if (!pg_database_ownercheck(db_id, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
 					   oldname);
 
@@ -764,14 +731,19 @@ RenameDatabase(const char *oldname, const char *newname)
 				 errmsg("permission denied to rename database")));
 
 	/* rename */
-	newtup = heap_copytuple(tup);
+	newtup = SearchSysCacheCopy(DATABASEOID,
+								ObjectIdGetDatum(db_id),
+								0, 0, 0);
+	if (!HeapTupleIsValid(newtup))
+		elog(ERROR, "cache lookup failed for database %u", db_id);
 	namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
 	simple_heap_update(rel, &newtup->t_self, newtup);
 	CatalogUpdateIndexes(rel, newtup);
 
-	systable_endscan(scan);
-
-	/* Close pg_database, but keep exclusive lock till commit */
+	/*
+	 * Close pg_database, but keep lock till commit (this is important
+	 * to prevent any risk of deadlock failure while updating flat file)
+	 */
 	heap_close(rel, NoLock);
 
 	/*
@@ -821,7 +793,9 @@ AlterDatabase(AlterDatabaseStmt *stmt)
 		connlimit = intVal(dconnlimit->arg);
 
 	/*
-	 * We don't need ExclusiveLock since we aren't updating the flat file.
+	 * Get the old tuple.  We don't need a lock on the database per se,
+	 * because we're not going to do anything that would mess up incoming
+	 * connections.
 	 */
 	rel = heap_open(DatabaseRelationId, RowExclusiveLock);
 	ScanKeyInit(&scankey,
@@ -891,7 +865,9 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
 	valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
 
 	/*
-	 * We don't need ExclusiveLock since we aren't updating the flat file.
+	 * Get the old tuple.  We don't need a lock on the database per se,
+	 * because we're not going to do anything that would mess up incoming
+	 * connections.
 	 */
 	rel = heap_open(DatabaseRelationId, RowExclusiveLock);
 	ScanKeyInit(&scankey,
@@ -974,7 +950,9 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 	Form_pg_database datForm;
 
 	/*
-	 * We don't need ExclusiveLock since we aren't updating the flat file.
+	 * Get the old tuple.  We don't need a lock on the database per se,
+	 * because we're not going to do anything that would mess up incoming
+	 * connections.
 	 */
 	rel = heap_open(DatabaseRelationId, RowExclusiveLock);
 	ScanKeyInit(&scankey,
@@ -1077,72 +1055,127 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
  * Helper functions
  */
 
+/*
+ * Look up info about the database named "name".  If the database exists,
+ * obtain the specified lock type on it, fill in any of the remaining
+ * parameters that aren't NULL, and return TRUE.  If no such database,
+ * return FALSE.
+ */
 static bool
-get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
+get_db_info(const char *name, LOCKMODE lockmode,
+			Oid *dbIdP, Oid *ownerIdP,
 			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
 			Oid *dbLastSysOidP,
 			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
 			Oid *dbTablespace)
 {
+	bool		result = false;
 	Relation	relation;
-	ScanKeyData scanKey;
-	SysScanDesc scan;
-	HeapTuple	tuple;
-	bool		gottuple;
 
 	AssertArg(name);
 
 	/* Caller may wish to grab a better lock on pg_database beforehand... */
 	relation = heap_open(DatabaseRelationId, AccessShareLock);
 
-	ScanKeyInit(&scanKey,
-				Anum_pg_database_datname,
-				BTEqualStrategyNumber, F_NAMEEQ,
-				NameGetDatum(name));
+	/*
+	 * Loop covers the rare case where the database is renamed before we
+	 * can lock it.  We try again just in case we can find a new one of
+	 * the same name.
+	 */
+	for (;;)
+	{
+		ScanKeyData scanKey;
+		SysScanDesc scan;
+		HeapTuple	tuple;
+		Oid			dbOid;
 
-	scan = systable_beginscan(relation, DatabaseNameIndexId, true,
-							  SnapshotNow, 1, &scanKey);
+		/*
+		 * there's no syscache for database-indexed-by-name,
+		 * so must do it the hard way
+		 */
+		ScanKeyInit(&scanKey,
+					Anum_pg_database_datname,
+					BTEqualStrategyNumber, F_NAMEEQ,
+					NameGetDatum(name));
 
-	tuple = systable_getnext(scan);
+		scan = systable_beginscan(relation, DatabaseNameIndexId, true,
+								  SnapshotNow, 1, &scanKey);
 
-	gottuple = HeapTupleIsValid(tuple);
-	if (gottuple)
-	{
-		Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
-
-		/* oid of the database */
-		if (dbIdP)
-			*dbIdP = HeapTupleGetOid(tuple);
-		/* oid of the owner */
-		if (ownerIdP)
-			*ownerIdP = dbform->datdba;
-		/* character encoding */
-		if (encodingP)
-			*encodingP = dbform->encoding;
-		/* allowed as template? */
-		if (dbIsTemplateP)
-			*dbIsTemplateP = dbform->datistemplate;
-		/* allowing connections? */
-		if (dbAllowConnP)
-			*dbAllowConnP = dbform->datallowconn;
-		/* last system OID used in database */
-		if (dbLastSysOidP)
-			*dbLastSysOidP = dbform->datlastsysoid;
-		/* limit of vacuumed XIDs */
-		if (dbVacuumXidP)
-			*dbVacuumXidP = dbform->datvacuumxid;
-		/* limit of frozen XIDs */
-		if (dbFrozenXidP)
-			*dbFrozenXidP = dbform->datfrozenxid;
-		/* default tablespace for this database */
-		if (dbTablespace)
-			*dbTablespace = dbform->dattablespace;
+		tuple = systable_getnext(scan);
+
+		if (!HeapTupleIsValid(tuple))
+		{
+			/* definitely no database of that name */
+			systable_endscan(scan);
+			break;
+		}
+
+		dbOid = HeapTupleGetOid(tuple);
+
+		systable_endscan(scan);
+
+		/*
+		 * Now that we have a database OID, we can try to lock the DB.
+		 */
+		if (lockmode != NoLock)
+			LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
+
+		/*
+		 * And now, re-fetch the tuple by OID.  If it's still there and
+		 * still the same name, we win; else, drop the lock and loop
+		 * back to try again.
+		 */
+		tuple = SearchSysCache(DATABASEOID,
+							   ObjectIdGetDatum(dbOid),
+							   0, 0, 0);
+		if (HeapTupleIsValid(tuple))
+		{
+			Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
+
+			if (strcmp(name, NameStr(dbform->datname)) == 0)
+			{
+				/* oid of the database */
+				if (dbIdP)
+					*dbIdP = dbOid;
+				/* oid of the owner */
+				if (ownerIdP)
+					*ownerIdP = dbform->datdba;
+				/* character encoding */
+				if (encodingP)
+					*encodingP = dbform->encoding;
+				/* allowed as template? */
+				if (dbIsTemplateP)
+					*dbIsTemplateP = dbform->datistemplate;
+				/* allowing connections? */
+				if (dbAllowConnP)
+					*dbAllowConnP = dbform->datallowconn;
+				/* last system OID used in database */
+				if (dbLastSysOidP)
+					*dbLastSysOidP = dbform->datlastsysoid;
+				/* limit of vacuumed XIDs */
+				if (dbVacuumXidP)
+					*dbVacuumXidP = dbform->datvacuumxid;
+				/* limit of frozen XIDs */
+				if (dbFrozenXidP)
+					*dbFrozenXidP = dbform->datfrozenxid;
+				/* default tablespace for this database */
+				if (dbTablespace)
+					*dbTablespace = dbform->dattablespace;
+				ReleaseSysCache(tuple);
+				result = true;
+				break;
+			}
+			/* can only get here if it was just renamed */
+			ReleaseSysCache(tuple);
+		}
+
+		if (lockmode != NoLock)
+			UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
 	}
 
-	systable_endscan(scan);
 	heap_close(relation, AccessShareLock);
 
-	return gottuple;
+	return result;
 }
 
 /* Check if current user has createdb privileges */
@@ -1234,8 +1267,6 @@ remove_dbtablespaces(Oid db_id)
  * get_database_oid - given a database name, look up the OID
  *
  * Returns InvalidOid if database name not found.
- *
- * This is not actually used in this file, but is exported for use elsewhere.
  */
 Oid
 get_database_oid(const char *dbname)
@@ -1277,8 +1308,6 @@ get_database_oid(const char *dbname)
  * get_database_name - given a database OID, look up the name
  *
  * Returns a palloc'd string, or NULL if no such database.
- *
- * This is not actually used in this file, but is exported for use elsewhere.
  */
 char *
 get_database_name(Oid dbid)
diff --git a/src/backend/commands/user.c b/src/backend/commands/user.c
index 5fddd80dfdd3cd24c89e3070fe1342bae868a1a8..61956b27706bd1737d8fcbae925dfbb946ce604c 100644
--- a/src/backend/commands/user.c
+++ b/src/backend/commands/user.c
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/commands/user.c,v 1.170 2006/03/05 15:58:25 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/user.c,v 1.171 2006/05/04 16:07:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -274,10 +274,9 @@ CreateRole(CreateRoleStmt *stmt)
 
 	/*
 	 * Check the pg_authid relation to be certain the role doesn't already
-	 * exist.  Note we secure exclusive lock because we need to protect our
-	 * eventual update of the flat auth file.
+	 * exist.
 	 */
-	pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock);
+	pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock);
 	pg_authid_dsc = RelationGetDescr(pg_authid_rel);
 
 	tuple = SearchSysCache(AUTHNAME,
@@ -377,8 +376,8 @@ CreateRole(CreateRoleStmt *stmt)
 				GetUserId(), false);
 
 	/*
-	 * Now we can clean up; but keep lock until commit (to avoid possible
-	 * deadlock when commit code tries to acquire lock).
+	 * Close pg_authid, but keep lock till commit (this is important
+	 * to prevent any risk of deadlock failure while updating flat file)
 	 */
 	heap_close(pg_authid_rel, NoLock);
 
@@ -538,10 +537,9 @@ AlterRole(AlterRoleStmt *stmt)
 		validUntil = strVal(dvalidUntil->arg);
 
 	/*
-	 * Scan the pg_authid relation to be certain the user exists. Note we
-	 * secure exclusive lock to protect our update of the flat auth file.
+	 * Scan the pg_authid relation to be certain the user exists.
 	 */
-	pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock);
+	pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock);
 	pg_authid_dsc = RelationGetDescr(pg_authid_rel);
 
 	tuple = SearchSysCache(AUTHNAME,
@@ -697,8 +695,8 @@ AlterRole(AlterRoleStmt *stmt)
 					false);
 
 	/*
-	 * Now we can clean up; but keep lock until commit (to avoid possible
-	 * deadlock when commit code tries to acquire lock).
+	 * Close pg_authid, but keep lock till commit (this is important
+	 * to prevent any risk of deadlock failure while updating flat file)
 	 */
 	heap_close(pg_authid_rel, NoLock);
 
@@ -726,10 +724,6 @@ AlterRoleSet(AlterRoleSetStmt *stmt)
 
 	valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
 
-	/*
-	 * RowExclusiveLock is sufficient, because we don't need to update the
-	 * flat auth file.
-	 */
 	rel = heap_open(AuthIdRelationId, RowExclusiveLock);
 	oldtuple = SearchSysCache(AUTHNAME,
 							  PointerGetDatum(stmt->role),
@@ -799,6 +793,7 @@ AlterRoleSet(AlterRoleSetStmt *stmt)
 	CatalogUpdateIndexes(rel, newtuple);
 
 	ReleaseSysCache(oldtuple);
+	/* needn't keep lock since we won't be updating the flat file */
 	heap_close(rel, RowExclusiveLock);
 }
 
@@ -820,11 +815,9 @@ DropRole(DropRoleStmt *stmt)
 
 	/*
 	 * Scan the pg_authid relation to find the Oid of the role(s) to be
-	 * deleted.  Note we secure exclusive lock on pg_authid, because we need
-	 * to protect our update of the flat auth file.  A regular writer's lock
-	 * on pg_auth_members is sufficient though.
+	 * deleted.
 	 */
-	pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock);
+	pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock);
 	pg_auth_members_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
 
 	foreach(item, stmt->roles)
@@ -960,7 +953,7 @@ DropRole(DropRoleStmt *stmt)
 
 	/*
 	 * Now we can clean up; but keep locks until commit (to avoid possible
-	 * deadlock when commit code tries to acquire lock).
+	 * deadlock failure while updating flat file)
 	 */
 	heap_close(pg_auth_members_rel, NoLock);
 	heap_close(pg_authid_rel, NoLock);
@@ -989,8 +982,7 @@ RenameRole(const char *oldname, const char *newname)
 	int			i;
 	Oid			roleid;
 
-	/* ExclusiveLock because we need to update the flat auth file */
-	rel = heap_open(AuthIdRelationId, ExclusiveLock);
+	rel = heap_open(AuthIdRelationId, RowExclusiveLock);
 	dsc = RelationGetDescr(rel);
 
 	oldtuple = SearchSysCache(AUTHNAME,
@@ -1080,6 +1072,11 @@ RenameRole(const char *oldname, const char *newname)
 	CatalogUpdateIndexes(rel, newtuple);
 
 	ReleaseSysCache(oldtuple);
+
+	/*
+	 * Close pg_authid, but keep lock till commit (this is important
+	 * to prevent any risk of deadlock failure while updating flat file)
+	 */
 	heap_close(rel, NoLock);
 
 	/*
@@ -1108,11 +1105,8 @@ GrantRole(GrantRoleStmt *stmt)
 
 	grantee_ids = roleNamesToIds(stmt->grantee_roles);
 
-	/*
-	 * Even though this operation doesn't change pg_authid, we must secure
-	 * exclusive lock on it to protect our update of the flat auth file.
-	 */
-	pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock);
+	/* AccessShareLock is enough since we aren't modifying pg_authid */
+	pg_authid_rel = heap_open(AuthIdRelationId, AccessShareLock);
 
 	/*
 	 * Step through all of the granted roles and add/remove entries for the
@@ -1136,6 +1130,10 @@ GrantRole(GrantRoleStmt *stmt)
 						stmt->admin_opt);
 	}
 
+	/*
+	 * Close pg_authid, but keep lock till commit (this is important
+	 * to prevent any risk of deadlock failure while updating flat file)
+	 */
 	heap_close(pg_authid_rel, NoLock);
 
 	/*
@@ -1237,8 +1235,7 @@ roleNamesToIds(List *memberNames)
  * grantorId: who is granting the membership
  * admin_opt: granting admin option?
  *
- * Note: caller is responsible for holding ExclusiveLock on pg_authid,
- * and for calling auth_file_update_needed().
+ * Note: caller is responsible for calling auth_file_update_needed().
  */
 static void
 AddRoleMems(const char *rolename, Oid roleid,
@@ -1283,7 +1280,6 @@ AddRoleMems(const char *rolename, Oid roleid,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 				 errmsg("must be superuser to set grantor")));
 
-	/* We need only regular writer's lock on pg_auth_members */
 	pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
 	pg_authmem_dsc = RelationGetDescr(pg_authmem_rel);
 
@@ -1363,8 +1359,8 @@ AddRoleMems(const char *rolename, Oid roleid,
 	}
 
 	/*
-	 * Now we can clean up; but keep lock until commit (to avoid possible
-	 * deadlock when commit code tries to acquire lock).
+	 * Close pg_authmem, but keep lock till commit (this is important
+	 * to prevent any risk of deadlock failure while updating flat file)
 	 */
 	heap_close(pg_authmem_rel, NoLock);
 }
@@ -1378,8 +1374,7 @@ AddRoleMems(const char *rolename, Oid roleid,
  * memberIds: OIDs of roles to del
  * admin_opt: remove admin option only?
  *
- * Note: caller is responsible for holding ExclusiveLock on pg_authid,
- * and for calling auth_file_update_needed().
+ * Note: caller is responsible for calling auth_file_update_needed().
  */
 static void
 DelRoleMems(const char *rolename, Oid roleid,
@@ -1418,7 +1413,6 @@ DelRoleMems(const char *rolename, Oid roleid,
 							rolename)));
 	}
 
-	/* We need only regular writer's lock on pg_auth_members */
 	pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
 	pg_authmem_dsc = RelationGetDescr(pg_authmem_rel);
 
@@ -1478,8 +1472,8 @@ DelRoleMems(const char *rolename, Oid roleid,
 	}
 
 	/*
-	 * Now we can clean up; but keep lock until commit (to avoid possible
-	 * deadlock when commit code tries to acquire lock).
+	 * Close pg_authmem, but keep lock till commit (this is important
+	 * to prevent any risk of deadlock failure while updating flat file)
 	 */
 	heap_close(pg_authmem_rel, NoLock);
 }
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 76103d690162ceb390f158e62e665160fed7f40a..09e9c8b7e9be1548a92450b3354d8444f97f0181 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.82 2006/03/05 15:58:38 momjian Exp $
+ *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.83 2006/05/04 16:07:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -478,6 +478,9 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
 					   objsubid);
 
 	(void) LockAcquire(&tag, false, lockmode, false, false);
+
+	/* Make sure syscaches are up-to-date with any changes we waited for */
+	AcceptInvalidationMessages();
 }
 
 /*
diff --git a/src/backend/utils/init/flatfiles.c b/src/backend/utils/init/flatfiles.c
index 50aa55ae18503a44523480990d546925a937b2de..79e73af8201fe4b6684caa9069187ffd8164deba 100644
--- a/src/backend/utils/init/flatfiles.c
+++ b/src/backend/utils/init/flatfiles.c
@@ -23,7 +23,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/utils/init/flatfiles.c,v 1.17 2006/03/05 15:58:46 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/init/flatfiles.c,v 1.18 2006/05/04 16:07:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -768,25 +768,49 @@ AtEOXact_UpdateFlatFiles(bool isCommit)
 	CommandCounterIncrement();
 
 	/*
-	 * We use ExclusiveLock to ensure that only one backend writes the flat
-	 * file(s) at a time.  That's sufficient because it's okay to allow plain
-	 * reads of the tables in parallel.  There is some chance of a deadlock
-	 * here (if we were triggered by a user update of one of the tables, which
-	 * likely won't have gotten a strong enough lock), so get the locks we
-	 * need before writing anything.
+	 * Open and lock the needed catalog(s).
 	 *
-	 * For writing the auth file, it's sufficient to ExclusiveLock pg_authid;
-	 * we take just regular AccessShareLock on pg_auth_members.
+	 * Even though we only need AccessShareLock, this could theoretically fail
+	 * due to deadlock.  In practice, however, our transaction already holds
+	 * RowExclusiveLock or better (it couldn't have updated the catalog
+	 * without such a lock).  This implies that dbcommands.c and other places
+	 * that force flat-file updates must not follow the common practice of
+	 * dropping catalog locks before commit.
 	 */
 	if (database_file_update_subid != InvalidSubTransactionId)
-		drel = heap_open(DatabaseRelationId, ExclusiveLock);
+		drel = heap_open(DatabaseRelationId, AccessShareLock);
 
 	if (auth_file_update_subid != InvalidSubTransactionId)
 	{
-		arel = heap_open(AuthIdRelationId, ExclusiveLock);
+		arel = heap_open(AuthIdRelationId, AccessShareLock);
 		mrel = heap_open(AuthMemRelationId, AccessShareLock);
 	}
 
+	/*
+	 * Obtain special locks to ensure that two transactions don't try to write
+	 * the same flat file concurrently.  Quite aside from any direct risks of
+	 * corrupted output, the winning writer probably wouldn't have seen the
+	 * other writer's updates.  By taking a lock and holding it till commit,
+	 * we ensure that whichever updater goes second will see the other
+	 * updater's changes as committed, and thus the final state of the file
+	 * will include all updates.
+	 *
+	 * We use a lock on "database 0" to protect writing the pg_database flat
+	 * file, and a lock on "role 0" to protect the auth file.  This is a bit
+	 * ugly but it's not worth inventing any more-general convention.  (Any
+	 * two locktags that are never used for anything else would do.)
+	 *
+	 * This is safe against deadlock as long as these are the very last locks
+	 * acquired during the transaction.
+	 */
+	if (database_file_update_subid != InvalidSubTransactionId)
+		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
+						 AccessExclusiveLock);
+
+	if (auth_file_update_subid != InvalidSubTransactionId)
+		LockSharedObject(AuthIdRelationId, InvalidOid, 0,
+						 AccessExclusiveLock);
+
 	/* Okay to write the files */
 	if (database_file_update_subid != InvalidSubTransactionId)
 	{
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 4ddc7f712afb03f52c326d8821e99d2d191cc542..0be0fd854388340cffce82ec07b420480b183f93 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.165 2006/05/03 22:45:26 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.166 2006/05/04 16:07:29 tgl Exp $
  *
  *
  *-------------------------------------------------------------------------
@@ -16,14 +16,10 @@
 #include "postgres.h"
 
 #include <fcntl.h>
-#include <sys/file.h>
-#include <math.h>
 #include <unistd.h>
 
-#include "access/genam.h"
 #include "access/heapam.h"
 #include "catalog/catalog.h"
-#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_database.h"
@@ -42,7 +38,6 @@
 #include "storage/smgr.h"
 #include "utils/acl.h"
 #include "utils/flatfiles.h"
-#include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/portal.h"
 #include "utils/relcache.h"
@@ -51,7 +46,7 @@
 
 
 static bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace);
-static void ReverifyMyDatabase(const char *name, bool am_superuser);
+static void CheckMyDatabase(const char *name, bool am_superuser);
 static void InitCommunication(void);
 static void ShutdownPostgres(int code, Datum arg);
 static bool ThereIsAtLeastOneRole(void);
@@ -72,7 +67,7 @@ static bool ThereIsAtLeastOneRole(void);
  * file" copy of pg_database that is helpfully maintained by flatfiles.c.
  * This is subject to various race conditions, so after we have the
  * transaction infrastructure started, we have to recheck the information;
- * see ReverifyMyDatabase.
+ * see InitPostgres.
  */
 static bool
 FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace)
@@ -108,76 +103,35 @@ FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace)
 }
 
 /*
- * ReverifyMyDatabase -- recheck info obtained by FindMyDatabase
- *
- * Since FindMyDatabase cannot lock pg_database, the information it read
- * could be stale; for example we might have attached to a database that's in
- * process of being destroyed by dropdb().	This routine is called after
- * we have all the locking and other infrastructure running --- now we can
- * check that we are really attached to a valid database.
- *
- * In reality, if dropdb() is running in parallel with our startup,
- * it's pretty likely that we will have failed before now, due to being
- * unable to read some of the system tables within the doomed database.
- * This routine just exists to make *sure* we have not started up in an
- * invalid database.  If we quit now, we should have managed to avoid
- * creating any serious problems.
- *
- * This is also a handy place to fetch the database encoding info out
- * of pg_database.
- *
- * To avoid having to read pg_database more times than necessary
- * during session startup, this place is also fitting to check CONNECT
- * privilege and set up any database-specific configuration variables.
+ * CheckMyDatabase -- fetch information from the pg_database entry for our DB
  */
 static void
-ReverifyMyDatabase(const char *name, bool am_superuser)
+CheckMyDatabase(const char *name, bool am_superuser)
 {
-	Relation	pgdbrel;
-	SysScanDesc pgdbscan;
-	ScanKeyData key;
 	HeapTuple	tup;
 	Form_pg_database dbform;
 
-	/*
-	 * Because we grab RowShareLock here, we can be sure that dropdb() is not
-	 * running in parallel with us (any more).
-	 */
-	pgdbrel = heap_open(DatabaseRelationId, RowShareLock);
-
-	ScanKeyInit(&key,
-				Anum_pg_database_datname,
-				BTEqualStrategyNumber, F_NAMEEQ,
-				NameGetDatum(name));
-
-	pgdbscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
-								  SnapshotNow, 1, &key);
-
-	tup = systable_getnext(pgdbscan);
-	if (!HeapTupleIsValid(tup) ||
-		HeapTupleGetOid(tup) != MyDatabaseId)
-	{
-		/* OOPS */
-		heap_close(pgdbrel, RowShareLock);
+	/* Fetch our real pg_database row */
+	tup = SearchSysCache(DATABASEOID,
+						 ObjectIdGetDatum(MyDatabaseId),
+						 0, 0, 0);
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for database %u", MyDatabaseId);
+	dbform = (Form_pg_database) GETSTRUCT(tup);
 
-		/*
-		 * The only real problem I could have created is to load dirty buffers
-		 * for the dead database into shared buffer cache; if I did, some
-		 * other backend will eventually try to write them and die in
-		 * mdblindwrt.	Flush any such pages to forestall trouble.
-		 */
-		DropDatabaseBuffers(MyDatabaseId);
-		/* Now I can commit hara-kiri with a clear conscience... */
+	/* This recheck is strictly paranoia */
+	if (strcmp(name, NameStr(dbform->datname)) != 0)
 		ereport(FATAL,
 				(errcode(ERRCODE_UNDEFINED_DATABASE),
-		  errmsg("database \"%s\", OID %u, has disappeared from pg_database",
-				 name, MyDatabaseId)));
-	}
-
-	dbform = (Form_pg_database) GETSTRUCT(tup);
+				 errmsg("database \"%s\" has disappeared from pg_database",
+						name),
+				 errdetail("Database OID %u now seems to belong to \"%s\".",
+						   MyDatabaseId, NameStr(dbform->datname))));
 
 	/*
-	 * These next checks are not enforced when in standalone mode, so that
+	 * Check permissions to connect to the database.
+	 *
+	 * These checks are not enforced when in standalone mode, so that
 	 * there is a way to recover from disabling all access to all databases,
 	 * for example "UPDATE pg_database SET datallowconn = false;".
 	 *
@@ -246,8 +200,8 @@ ReverifyMyDatabase(const char *name, bool am_superuser)
 		Datum		datum;
 		bool		isnull;
 
-		datum = heap_getattr(tup, Anum_pg_database_datconfig,
-							 RelationGetDescr(pgdbrel), &isnull);
+		datum = SysCacheGetAttr(DATABASEOID, tup, Anum_pg_database_datconfig,
+								&isnull);
 		if (!isnull)
 		{
 			ArrayType  *a = DatumGetArrayTypeP(datum);
@@ -256,8 +210,7 @@ ReverifyMyDatabase(const char *name, bool am_superuser)
 		}
 	}
 
-	systable_endscan(pgdbscan);
-	heap_close(pgdbrel, RowShareLock);
+	ReleaseSysCache(tup);
 }
 
 
@@ -337,9 +290,11 @@ InitPostgres(const char *dbname, const char *username)
 	bool		bootstrap = IsBootstrapProcessingMode();
 	bool		autovacuum = IsAutoVacuumProcess();
 	bool		am_superuser;
+	char	   *fullpath;
 
 	/*
-	 * Set up the global variables holding database id and path.
+	 * Set up the global variables holding database id and path.  But note
+	 * we won't actually try to touch the database just yet.
 	 *
 	 * We take a shortcut in the bootstrap case, otherwise we have to look up
 	 * the db name in pg_database.
@@ -348,55 +303,24 @@ InitPostgres(const char *dbname, const char *username)
 	{
 		MyDatabaseId = TemplateDbOid;
 		MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
-		SetDatabasePath(GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace));
 	}
 	else
 	{
-		char	   *fullpath;
-
-		/*
-		 * Formerly we validated DataDir here, but now that's done earlier.
-		 */
-
 		/*
 		 * Find oid and tablespace of the database we're about to open. Since
 		 * we're not yet up and running we have to use the hackish
-		 * FindMyDatabase.
+		 * FindMyDatabase, which looks in the flat-file copy of pg_database.
 		 */
 		if (!FindMyDatabase(dbname, &MyDatabaseId, &MyDatabaseTableSpace))
 			ereport(FATAL,
 					(errcode(ERRCODE_UNDEFINED_DATABASE),
 					 errmsg("database \"%s\" does not exist",
 							dbname)));
-
-		fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
-
-		/* Verify the database path */
-
-		if (access(fullpath, F_OK) == -1)
-		{
-			if (errno == ENOENT)
-				ereport(FATAL,
-						(errcode(ERRCODE_UNDEFINED_DATABASE),
-						 errmsg("database \"%s\" does not exist",
-								dbname),
-					errdetail("The database subdirectory \"%s\" is missing.",
-							  fullpath)));
-			else
-				ereport(FATAL,
-						(errcode_for_file_access(),
-						 errmsg("could not access directory \"%s\": %m",
-								fullpath)));
-		}
-
-		ValidatePgVersion(fullpath);
-
-		SetDatabasePath(fullpath);
 	}
 
-	/*
-	 * Code after this point assumes we are in the proper directory!
-	 */
+	fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
+
+	SetDatabasePath(fullpath);
 
 	/*
 	 * Finish filling in the PGPROC struct, and add it to the ProcArray.
@@ -459,10 +383,78 @@ InitPostgres(const char *dbname, const char *username)
 	 */
 	on_shmem_exit(ShutdownPostgres, 0);
 
-	/* start a new transaction here before access to db */
+	/*
+	 * Start a new transaction here before first access to db
+	 */
 	if (!bootstrap)
 		StartTransactionCommand();
 
+	/*
+	 * Now that we have a transaction, we can take locks.  Take a writer's
+	 * lock on the database we are trying to connect to.  If there is
+	 * a concurrently running DROP DATABASE on that database, this will
+	 * block us until it finishes (and has updated the flat file copy
+	 * of pg_database).
+	 *
+	 * Note that the lock is not held long, only until the end of this
+	 * startup transaction.  This is OK since we are already advertising
+	 * our use of the database in the PGPROC array; anyone trying a DROP
+	 * DATABASE after this point will see us there.
+	 *
+	 * Note: use of RowExclusiveLock here is reasonable because we envision
+	 * our session as being a concurrent writer of the database.  If we had
+	 * a way of declaring a session as being guaranteed-read-only, we could
+	 * use AccessShareLock for such sessions and thereby not conflict against
+	 * CREATE DATABASE.
+	 */
+	if (!bootstrap)
+		LockSharedObject(DatabaseRelationId, MyDatabaseId, 0,
+						 RowExclusiveLock);
+
+	/*
+	 * Recheck the flat file copy of pg_database to make sure the target
+	 * database hasn't gone away.  If there was a concurrent DROP DATABASE,
+	 * this ensures we will die cleanly without creating a mess.
+	 */
+	if (!bootstrap)
+	{
+		Oid		dbid2;
+		Oid		tsid2;
+
+		if (!FindMyDatabase(dbname, &dbid2, &tsid2) ||
+			dbid2 != MyDatabaseId || tsid2 != MyDatabaseTableSpace)
+			ereport(FATAL,
+					(errcode(ERRCODE_UNDEFINED_DATABASE),
+					 errmsg("database \"%s\" does not exist",
+							dbname),
+				errdetail("It seems to have just been dropped or renamed.")));
+	}
+
+	/*
+	 * Now we should be able to access the database directory safely.
+	 * Verify it's there and looks reasonable.
+	 */
+	if (!bootstrap)
+	{
+		if (access(fullpath, F_OK) == -1)
+		{
+			if (errno == ENOENT)
+				ereport(FATAL,
+						(errcode(ERRCODE_UNDEFINED_DATABASE),
+						 errmsg("database \"%s\" does not exist",
+								dbname),
+						 errdetail("The database subdirectory \"%s\" is missing.",
+								   fullpath)));
+			else
+				ereport(FATAL,
+						(errcode_for_file_access(),
+						 errmsg("could not access directory \"%s\": %m",
+								fullpath)));
+		}
+
+		ValidatePgVersion(fullpath);
+	}
+
 	/*
 	 * It's now possible to do real access to the system catalogs.
 	 *
@@ -499,22 +491,21 @@ InitPostgres(const char *dbname, const char *username)
 		am_superuser = superuser();
 	}
 
-	/* set up ACL framework (so ReverifyMyDatabase can check permissions) */
+	/* set up ACL framework (so CheckMyDatabase can check permissions) */
 	initialize_acl();
 
 	/*
-	 * Unless we are bootstrapping, double-check that InitMyDatabaseInfo() got
-	 * a correct result.  We can't do this until all the database-access
-	 * infrastructure is up.  (Also, it wants to know if the user is a
-	 * superuser, so the above stuff has to happen first.)
+	 * Read the real pg_database row for our database, check permissions
+	 * and set up database-specific GUC settings.  We can't do this until all
+	 * the database-access infrastructure is up.  (Also, it wants to know if
+	 * the user is a superuser, so the above stuff has to happen first.)
 	 */
 	if (!bootstrap)
-		ReverifyMyDatabase(dbname, am_superuser);
+		CheckMyDatabase(dbname, am_superuser);
 
 	/*
 	 * Final phase of relation cache startup: write a new cache file if
-	 * necessary.  This is done after ReverifyMyDatabase to avoid writing a
-	 * cache file into a dead database.
+	 * necessary.  (XXX this could be folded back into Phase2)
 	 */
 	RelationCacheInitializePhase3();
 
@@ -530,7 +521,7 @@ InitPostgres(const char *dbname, const char *username)
 
 	/*
 	 * Initialize various default states that can't be set up until we've
-	 * selected the active user and done ReverifyMyDatabase.
+	 * selected the active user and gotten the right GUC settings.
 	 */
 
 	/* set default namespace search path */
@@ -587,13 +578,13 @@ ThereIsAtLeastOneRole(void)
 	HeapScanDesc scan;
 	bool		result;
 
-	pg_authid_rel = heap_open(AuthIdRelationId, AccessExclusiveLock);
+	pg_authid_rel = heap_open(AuthIdRelationId, AccessShareLock);
 
 	scan = heap_beginscan(pg_authid_rel, SnapshotNow, 0, NULL);
 	result = (heap_getnext(scan, ForwardScanDirection) != NULL);
 
 	heap_endscan(scan);
-	heap_close(pg_authid_rel, AccessExclusiveLock);
+	heap_close(pg_authid_rel, AccessShareLock);
 
 	return result;
 }