diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 1dac14ead2e6ad0285ca51adc84d8c8907adae9a..96f964fb95fc75cf17d70cb532d0ef5d752371b0 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -15,7 +15,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.162 2005/06/28 05:08:53 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.163 2005/06/29 20:34:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -194,8 +194,8 @@ createdb(const CreatedbStmt *stmt)
 
 	if (is_member_of_role(GetUserId(), datdba))
 	{
-		/* creating database for self: can be superuser or createdb */
-		if (!superuser() && !have_createdb_privilege())
+		/* creating database for self: createdb is required */
+		if (!have_createdb_privilege())
 			ereport(ERROR,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 					 errmsg("permission denied to create database")));
@@ -759,7 +759,7 @@ RenameDatabase(const char *oldname, const char *newname)
 					   oldname);
 
 	/* must have createdb rights */
-	if (!superuser() && !have_createdb_privilege())
+	if (!have_createdb_privilege())
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 				 errmsg("permission denied to rename database")));
@@ -1044,6 +1044,10 @@ have_createdb_privilege(void)
 	bool		result = false;
 	HeapTuple	utup;
 
+	/* Superusers can always do everything */
+	if (superuser())
+		return true;
+
 	utup = SearchSysCache(AUTHOID,
 						  ObjectIdGetDatum(GetUserId()),
 						  0, 0, 0);
diff --git a/src/backend/commands/user.c b/src/backend/commands/user.c
index e1e3e16a3500529010ecad1e76c3eba03b8fec2c..982c286cde7e05d1f76defa919e3b0cc3d879b1b 100644
--- a/src/backend/commands/user.c
+++ b/src/backend/commands/user.c
@@ -6,12 +6,13 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/commands/user.c,v 1.154 2005/06/28 22:16:44 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/user.c,v 1.155 2005/06/29 20:34:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/heapam.h"
 #include "catalog/indexing.h"
 #include "catalog/pg_auth_members.h"
@@ -20,8 +21,8 @@
 #include "commands/user.h"
 #include "libpq/crypt.h"
 #include "miscadmin.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
-#include "utils/catcache.h"
 #include "utils/flatfiles.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
@@ -40,6 +41,29 @@ static void DelRoleMems(const char *rolename, Oid roleid,
 						bool admin_opt);
 
 
+/* Check if current user has createrole privileges */
+static bool
+have_createrole_privilege(void)
+{
+	bool		result = false;
+	HeapTuple	utup;
+
+	/* Superusers can always do everything */
+	if (superuser())
+		return true;
+
+	utup = SearchSysCache(AUTHOID,
+						  ObjectIdGetDatum(GetUserId()),
+						  0, 0, 0);
+	if (HeapTupleIsValid(utup))
+	{
+		result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreaterole;
+		ReleaseSysCache(utup);
+	}
+	return result;
+}
+
+
 /*
  * CREATE ROLE
  */
@@ -66,8 +90,9 @@ CreateRole(CreateRoleStmt *stmt)
 	List	   *adminmembers = NIL;		/* roles to be admins of this role */
 	char	   *validUntil = NULL;		/* time the login is valid until */
 	DefElem    *dpassword = NULL;
-	DefElem    *dcreatedb = NULL;
+	DefElem    *dissuper = NULL;
 	DefElem    *dcreaterole = NULL;
+	DefElem    *dcreatedb = NULL;
 	DefElem    *dcanlogin = NULL;
 	DefElem    *daddroleto = NULL;
 	DefElem    *drolemembers = NULL;
@@ -98,6 +123,14 @@ CreateRole(CreateRoleStmt *stmt)
 			ereport(WARNING,
 					(errmsg("SYSID can no longer be specified")));
 		}
+		else if (strcmp(defel->defname, "superuser") == 0)
+		{
+			if (dissuper)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			dissuper = defel;
+		}
 		else if (strcmp(defel->defname, "createrole") == 0)
 		{
 			if (dcreaterole)
@@ -159,32 +192,40 @@ CreateRole(CreateRoleStmt *stmt)
 				 defel->defname);
 	}
 
-	if (dcreatedb)
-		createdb = intVal(dcreatedb->arg) != 0;
+	if (dpassword)
+		password = strVal(dpassword->arg);
+	if (dissuper)
+		issuper = intVal(dissuper->arg) != 0;
 	if (dcreaterole)
-	{
 		createrole = intVal(dcreaterole->arg) != 0;
-		/* XXX issuper is implied by createrole for now */
-		issuper = createrole;
-	}
+	if (dcreatedb)
+		createdb = intVal(dcreatedb->arg) != 0;
 	if (dcanlogin)
 		canlogin = intVal(dcanlogin->arg) != 0;
-	if (dvalidUntil)
-		validUntil = strVal(dvalidUntil->arg);
-	if (dpassword)
-		password = strVal(dpassword->arg);
 	if (daddroleto)
 		addroleto = (List *) daddroleto->arg;
 	if (drolemembers)
 		rolemembers = (List *) drolemembers->arg;
 	if (dadminmembers)
 		adminmembers = (List *) dadminmembers->arg;
+	if (dvalidUntil)
+		validUntil = strVal(dvalidUntil->arg);
 
 	/* Check some permissions first */
-	if (!superuser())
-		ereport(ERROR,
-				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("must be superuser to create roles")));
+	if (issuper)
+	{
+		if (!superuser())
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("must be superuser to create superusers")));
+	}
+	else
+	{
+		if (!have_createrole_privilege())
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied to create role")));
+	}
 
 	if (strcmp(stmt->role, "public") == 0)
 		ereport(ERROR,
@@ -260,11 +301,15 @@ CreateRole(CreateRoleStmt *stmt)
 	 * Insert new record in the pg_authid table
 	 */
 	roleid = simple_heap_insert(pg_authid_rel, tuple);
-	Assert(OidIsValid(roleid));
-
-	/* Update indexes */
 	CatalogUpdateIndexes(pg_authid_rel, tuple);
 
+	/*
+	 * Advance command counter so we can see new record; else tests
+	 * in AddRoleMems may fail.
+	 */
+	if (addroleto || adminmembers || rolemembers)
+		CommandCounterIncrement();
+
 	/*
 	 * Add the new role to the specified existing roles.
 	 */
@@ -327,11 +372,12 @@ AlterRole(AlterRoleStmt *stmt)
 	List	   *rolemembers = NIL;		/* roles to be added/removed */
 	char	   *validUntil = NULL;		/* time the login is valid until */
 	DefElem    *dpassword = NULL;
-	DefElem    *dcreatedb = NULL;
+	DefElem    *dissuper = NULL;
 	DefElem    *dcreaterole = NULL;
+	DefElem    *dcreatedb = NULL;
 	DefElem    *dcanlogin = NULL;
-	DefElem    *dvalidUntil = NULL;
 	DefElem    *drolemembers = NULL;
+	DefElem    *dvalidUntil = NULL;
 	Oid			roleid;
 
 	/* Extract options from the statement node tree */
@@ -353,13 +399,13 @@ AlterRole(AlterRoleStmt *stmt)
 			else if (strcmp(defel->defname, "unencryptedPassword") == 0)
 				encrypt_password = false;
 		}
-		else if (strcmp(defel->defname, "createdb") == 0)
+		else if (strcmp(defel->defname, "superuser") == 0)
 		{
-			if (dcreatedb)
+			if (dissuper)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
-			dcreatedb = defel;
+			dissuper = defel;
 		}
 		else if (strcmp(defel->defname, "createrole") == 0)
 		{
@@ -369,21 +415,21 @@ AlterRole(AlterRoleStmt *stmt)
 						 errmsg("conflicting or redundant options")));
 			dcreaterole = defel;
 		}
-		else if (strcmp(defel->defname, "canlogin") == 0)
+		else if (strcmp(defel->defname, "createdb") == 0)
 		{
-			if (dcanlogin)
+			if (dcreatedb)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
-			dcanlogin = defel;
+			dcreatedb = defel;
 		}
-		else if (strcmp(defel->defname, "validUntil") == 0)
+		else if (strcmp(defel->defname, "canlogin") == 0)
 		{
-			if (dvalidUntil)
+			if (dcanlogin)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
-			dvalidUntil = defel;
+			dcanlogin = defel;
 		}
 		else if (strcmp(defel->defname, "rolemembers") == 0 &&
 				 stmt->action != 0)
@@ -394,41 +440,33 @@ AlterRole(AlterRoleStmt *stmt)
 						 errmsg("conflicting or redundant options")));
 			drolemembers = defel;
 		}
+		else if (strcmp(defel->defname, "validUntil") == 0)
+		{
+			if (dvalidUntil)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			dvalidUntil = defel;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
 	}
 
-	if (dcreatedb)
-		createdb = intVal(dcreatedb->arg);
+	if (dpassword)
+		password = strVal(dpassword->arg);
+	if (dissuper)
+		issuper = intVal(dissuper->arg);
 	if (dcreaterole)
-	{
 		createrole = intVal(dcreaterole->arg);
-		/* XXX createrole implies issuper for now */
-		issuper = createrole;
-	}
+	if (dcreatedb)
+		createdb = intVal(dcreatedb->arg);
 	if (dcanlogin)
 		canlogin = intVal(dcanlogin->arg);
-	if (dvalidUntil)
-		validUntil = strVal(dvalidUntil->arg);
-	if (dpassword)
-		password = strVal(dpassword->arg);
 	if (drolemembers)
 		rolemembers = (List *) drolemembers->arg;
-
-	/* must be superuser or just want to change your own password */
-	if (!superuser() &&
-		!(issuper < 0 &&
-		  createrole < 0 &&
-		  createdb < 0 &&
-		  canlogin < 0 &&
-		  !validUntil &&
-		  !rolemembers &&
-		  password &&
-		  strcmp(GetUserNameFromId(GetUserId()), stmt->role) == 0))
-		ereport(ERROR,
-				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("permission denied")));
+	if (dvalidUntil)
+		validUntil = strVal(dvalidUntil->arg);
 
 	/*
 	 * Scan the pg_authid relation to be certain the user exists. Note we
@@ -447,6 +485,32 @@ AlterRole(AlterRoleStmt *stmt)
 
 	roleid = HeapTupleGetOid(tuple);
 
+	/*
+	 * To mess with a superuser you gotta be superuser; else you need
+	 * createrole, or just want to change your own password
+	 */
+	if (((Form_pg_authid) GETSTRUCT(tuple))->rolsuper || issuper >= 0)
+	{
+		if (!superuser())
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("must be superuser to alter superusers")));
+	}
+	else
+	{
+		if (!have_createrole_privilege() &&
+			!(createrole < 0 &&
+			  createdb < 0 &&
+			  canlogin < 0 &&
+			  !rolemembers &&
+			  !validUntil &&
+			  password &&
+			  roleid == GetUserId()))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied")));
+	}
+
 	/*
 	 * Build an updated tuple, perusing the information just obtained
 	 */
@@ -454,10 +518,6 @@ AlterRole(AlterRoleStmt *stmt)
 	MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
 	MemSet(new_record_repl, ' ', sizeof(new_record_repl));
 
-	new_record[Anum_pg_authid_rolname - 1] = DirectFunctionCall1(namein,
-											CStringGetDatum(stmt->role));
-	new_record_repl[Anum_pg_authid_rolname - 1] = 'r';
-
 	/*
 	 * issuper/createrole/catupdate/etc
 	 *
@@ -532,10 +592,11 @@ AlterRole(AlterRoleStmt *stmt)
 	heap_freetuple(new_tuple);
 
 	/*
-	 * Now we can clean up; but keep lock until commit (to avoid possible
-	 * deadlock when commit code tries to acquire lock).
+	 * Advance command counter so we can see new record; else tests
+	 * in AddRoleMems may fail.
 	 */
-	heap_close(pg_authid_rel, NoLock);
+	if (rolemembers)
+		CommandCounterIncrement();
 
 	if (stmt->action == +1)		/* add members to role */
 		AddRoleMems(stmt->role, roleid,
@@ -546,6 +607,12 @@ AlterRole(AlterRoleStmt *stmt)
 					rolemembers, roleNamesToIds(rolemembers),
 					false);
 
+	/*
+	 * Now we can clean up; but keep lock until commit (to avoid possible
+	 * deadlock when commit code tries to acquire lock).
+	 */
+	heap_close(pg_authid_rel, NoLock);
+
 	/*
 	 * Set flag to update flat auth file at commit.
 	 */
@@ -583,11 +650,25 @@ AlterRoleSet(AlterRoleSetStmt *stmt)
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("role \"%s\" does not exist", stmt->role)));
 
-	if (!(superuser() ||
-		(HeapTupleGetOid(oldtuple) == GetUserId())))
-		ereport(ERROR,
-				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("permission denied")));
+	/*
+	 * To mess with a superuser you gotta be superuser; else you need
+	 * createrole, or just want to change your own settings
+	 */
+	if (((Form_pg_authid) GETSTRUCT(oldtuple))->rolsuper)
+	{
+		if (!superuser())
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("must be superuser to alter superusers")));
+	}
+	else
+	{
+		if (!have_createrole_privilege() &&
+			HeapTupleGetOid(oldtuple) != GetUserId())
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied")));
+	}
 
 	for (i = 0; i < Natts_pg_authid; i++)
 		repl_repl[i] = ' ';
@@ -622,9 +703,10 @@ AlterRoleSet(AlterRoleSetStmt *stmt)
 			repl_null[Anum_pg_authid_rolconfig - 1] = 'n';
 	}
 
-	newtuple = heap_modifytuple(oldtuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
-	simple_heap_update(rel, &oldtuple->t_self, newtuple);
+	newtuple = heap_modifytuple(oldtuple, RelationGetDescr(rel),
+								repl_val, repl_null, repl_repl);
 
+	simple_heap_update(rel, &oldtuple->t_self, newtuple);
 	CatalogUpdateIndexes(rel, newtuple);
 
 	ReleaseSysCache(oldtuple);
@@ -641,18 +723,19 @@ DropRole(DropRoleStmt *stmt)
 	Relation	pg_authid_rel, pg_auth_members_rel;
 	ListCell   *item;
 
-	if (!superuser())
+	if (!have_createrole_privilege())
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("must be superuser to drop roles")));
+				 errmsg("permission denied to drop role")));
 
 	/*
-	 * Scan the pg_authid relation to find the Oid of the role to be
-	 * deleted.  Note we secure exclusive lock, because we need to protect
-	 * our update of the flat auth file.
+	 * Scan the pg_authid relation to find the Oid of the role(s) to be
+	 * deleted.  Note we secure exclusive lock on pg_authid, because we
+	 * need to protect our update of the flat auth file.  A regular
+	 * writer's lock on pg_auth_members is sufficient though.
 	 */
 	pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock);
-	pg_auth_members_rel = heap_open(AuthMemRelationId, ExclusiveLock);
+	pg_auth_members_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
 
 	foreach(item, stmt->roles)
 	{
@@ -663,9 +746,8 @@ DropRole(DropRoleStmt *stmt)
 		TupleDesc	pg_dsc;
 		ScanKeyData scankey;
 		HeapScanDesc scan;
-		CatCList	*auth_mem_list;
+		SysScanDesc sscan;
 		Oid			roleid;
-		int			i;
 
 		tuple = SearchSysCache(AUTHNAME,
 							   PointerGetDatum(role),
@@ -686,6 +768,17 @@ DropRole(DropRoleStmt *stmt)
 					(errcode(ERRCODE_OBJECT_IN_USE),
 					 errmsg("session role cannot be dropped")));
 
+		/*
+		 * For safety's sake, we allow createrole holders to drop ordinary
+		 * roles but not superuser roles.  This is mainly to avoid the
+		 * scenario where you accidentally drop the last superuser.
+		 */
+		if (((Form_pg_authid) GETSTRUCT(tuple))->rolsuper &&
+			!superuser())
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("must be superuser to drop superusers")));
+
 		/*
 		 * Check if role still owns a database. If so, error out.
 		 *
@@ -732,36 +825,55 @@ DropRole(DropRoleStmt *stmt)
 		ReleaseSysCache(tuple);
 
 		/*
-		 * Remove role from roles
+		 * Remove role from the pg_auth_members table.  We have to remove
+		 * all tuples that show it as either a role or a member.
 		 *
-		 * scan pg_auth_members and remove tuples which have 
-		 * roleid == member or roleid == role
+		 * XXX what about grantor entries?  Maybe we should do one heap scan.
 		 */
-		auth_mem_list = SearchSysCacheList(AUTHMEMROLEMEM, 1,
-											ObjectIdGetDatum(roleid),
-											0, 0, 0);
+		ScanKeyInit(&scankey,
+					Anum_pg_auth_members_roleid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(roleid));
+
+		sscan = systable_beginscan(pg_auth_members_rel, AuthMemRoleMemIndexId,
+								   true, SnapshotNow, 1, &scankey);
 
-		for (i = 0; i < auth_mem_list->n_members; i++)
+		while (HeapTupleIsValid(tmp_tuple = systable_getnext(sscan)))
 		{
-			HeapTuple	authmemtup = &auth_mem_list->members[i]->tuple;
-			simple_heap_delete(pg_auth_members_rel, &authmemtup->t_self);
+			simple_heap_delete(pg_auth_members_rel, &tmp_tuple->t_self);
 		}
-		ReleaseSysCacheList(auth_mem_list);
 
-		auth_mem_list = SearchSysCacheList(AUTHMEMMEMROLE, 1,
-											ObjectIdGetDatum(roleid),
-											0, 0, 0);
+		systable_endscan(sscan);
 
-		for (i = 0; i < auth_mem_list->n_members; i++)
+		ScanKeyInit(&scankey,
+					Anum_pg_auth_members_member,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(roleid));
+
+		sscan = systable_beginscan(pg_auth_members_rel, AuthMemMemRoleIndexId,
+								   true, SnapshotNow, 1, &scankey);
+
+		while (HeapTupleIsValid(tmp_tuple = systable_getnext(sscan)))
 		{
-			HeapTuple	authmemtup = &auth_mem_list->members[i]->tuple;
-			simple_heap_delete(pg_auth_members_rel, &authmemtup->t_self);
+			simple_heap_delete(pg_auth_members_rel, &tmp_tuple->t_self);
 		}
-		ReleaseSysCacheList(auth_mem_list);
+
+		systable_endscan(sscan);
+
+		/*
+		 * Advance command counter so that later iterations of this loop
+		 * will see the changes already made.  This is essential if, for
+		 * example, we are trying to drop both a role and one of its
+		 * direct members --- we'll get an error if we try to delete the
+		 * linking pg_auth_members tuple twice.  (We do not need a CCI
+		 * between the two delete loops above, because it's not allowed
+		 * for a role to directly contain itself.)
+		 */
+		CommandCounterIncrement();
 	}
 
 	/*
-	 * Now we can clean up; but keep lock until commit (to avoid possible
+	 * Now we can clean up; but keep locks until commit (to avoid possible
 	 * deadlock when commit code tries to acquire lock).
 	 */
 	heap_close(pg_auth_members_rel, NoLock);
@@ -791,7 +903,7 @@ RenameRole(const char *oldname, const char *newname)
 	int			i;
 	Oid			roleid;
 
-	/* ExclusiveLock because we need to update the password file */
+	/* ExclusiveLock because we need to update the flat auth file */
 	rel = heap_open(AuthIdRelationId, ExclusiveLock);
 	dsc = RelationGetDescr(rel);
 
@@ -825,12 +937,25 @@ RenameRole(const char *oldname, const char *newname)
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("role \"%s\" already exists", newname)));
 
-	/* must be superuser */
-	if (!superuser())
-		ereport(ERROR,
-				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("must be superuser to rename roles")));
+	/*
+	 * createrole is enough privilege unless you want to mess with a superuser
+	 */
+	if (((Form_pg_authid) GETSTRUCT(oldtuple))->rolsuper)
+	{
+		if (!superuser())
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("must be superuser to rename superusers")));
+	}
+	else
+	{
+		if (!have_createrole_privilege())
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied to rename role")));
+	}
 
+	/* OK, construct the modified tuple */
 	for (i = 0; i < Natts_pg_authid; i++)
 		repl_repl[i] = ' ';
 
@@ -873,6 +998,7 @@ RenameRole(const char *oldname, const char *newname)
 void
 GrantRole(GrantRoleStmt *stmt)
 {
+	Relation	pg_authid_rel;
 	Oid			grantor;
 	List	   *grantee_ids;
 	ListCell   *item;
@@ -884,6 +1010,13 @@ GrantRole(GrantRoleStmt *stmt)
 
 	grantee_ids = roleNamesToIds(stmt->grantee_roles);
 
+	/*
+	 * Even though this operation doesn't change pg_authid, we must
+	 * secure exclusive lock on it to protect our update of the flat
+	 * auth file.
+	 */
+	pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock);
+
 	/*
 	 * Step through all of the granted roles and add/remove
 	 * entries for the grantees, or, if admin_opt is set, then
@@ -906,6 +1039,8 @@ GrantRole(GrantRoleStmt *stmt)
 						stmt->admin_opt);
 	}
 
+	heap_close(pg_authid_rel, NoLock);
+
 	/*
 	 * Set flag to update flat auth file at commit.
 	 */
@@ -943,6 +1078,9 @@ roleNamesToIds(List *memberNames)
  * memberIds: OIDs of roles to add
  * grantorId: who is granting the membership
  * admin_opt: granting admin option?
+ *
+ * Note: caller is responsible for holding ExclusiveLock on pg_authid,
+ * and for calling auth_file_update_needed().
  */
 static void
 AddRoleMems(const char *rolename, Oid roleid,
@@ -961,45 +1099,35 @@ AddRoleMems(const char *rolename, Oid roleid,
 		return;
 
 	/*
-	 * Check permissions: must be superuser or have admin option on the
-	 * role to be changed.
-	 *
-	 * XXX: The admin option is not considered to be inherited through
-	 * multiple roles, unlike normal 'is_member_of_role' privilege checks.
+	 * Check permissions: must have createrole or admin option on the
+	 * role to be changed.  To mess with a superuser role, you gotta
+	 * be superuser.
 	 */
-	if (!superuser()) 
+	if (superuser_arg(roleid))
 	{
-		HeapTuple authmem_chk_tuple;
-		Form_pg_auth_members authmem_chk;
-
-		if (grantorId != GetUserId())
-			ereport(ERROR,
-					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-					 errmsg("must be superuser to set grantor ID")));
-
-		authmem_chk_tuple = SearchSysCache(AUTHMEMROLEMEM,
-										   ObjectIdGetDatum(roleid),
-										   ObjectIdGetDatum(grantorId),
-										   0, 0);
-		if (!HeapTupleIsValid(authmem_chk_tuple))
+		if (!superuser())
 			ereport(ERROR,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-					 errmsg("must be superuser or have admin option on role \"%s\"",
-							rolename)));
-
-		authmem_chk = (Form_pg_auth_members) GETSTRUCT(authmem_chk_tuple);
-		if (!authmem_chk->admin_option)
+					 errmsg("must be superuser to alter superusers")));
+	}
+	else
+	{
+		if (!have_createrole_privilege() &&
+			!is_admin_of_role(grantorId, roleid))
 			ereport(ERROR,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-					 errmsg("must be superuser or have admin option on role \"%s\"",
+					 errmsg("must have admin option on role \"%s\"",
 							rolename)));
-		ReleaseSysCache(authmem_chk_tuple);
 	}
 
-	/*
-	 * Secure exclusive lock to protect our update of the flat auth file.
-	 */
-	pg_authmem_rel = heap_open(AuthMemRelationId, ExclusiveLock);
+	/* XXX not sure about this check */
+	if (grantorId != GetUserId() && !superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("must be superuser to set grantor ID")));
+
+	/* We need only regular writer's lock on pg_auth_members */
+	pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
 	pg_authmem_dsc = RelationGetDescr(pg_authmem_rel);
 
 	forboth(nameitem, memberNames, iditem, memberIds)
@@ -1012,6 +1140,18 @@ AddRoleMems(const char *rolename, Oid roleid,
 		char    new_record_nulls[Natts_pg_auth_members];
 		char    new_record_repl[Natts_pg_auth_members];
 
+		/*
+		 * Refuse creation of membership loops, including the trivial case
+		 * where a role is made a member of itself.  We do this by checking
+		 * to see if the target role is already a member of the proposed
+		 * member role.
+		 */
+		if (is_member_of_role(roleid, memberid))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_GRANT_OPERATION),
+					(errmsg("role \"%s\" is a member of role \"%s\"",
+							rolename, membername))));
+
 		/*
 		 * Check if entry for this role/member already exists;
 		 * if so, give warning unless we are adding admin option.
@@ -1020,7 +1160,9 @@ AddRoleMems(const char *rolename, Oid roleid,
 									   ObjectIdGetDatum(roleid),
 									   ObjectIdGetDatum(memberid),
 									   0, 0);
-		if (HeapTupleIsValid(authmem_tuple) && !admin_opt)
+		if (HeapTupleIsValid(authmem_tuple) &&
+			(!admin_opt || 
+			 ((Form_pg_auth_members) GETSTRUCT(authmem_tuple))->admin_option))
 		{
 			ereport(NOTICE,
 					(errmsg("role \"%s\" is already a member of role \"%s\"",
@@ -1057,6 +1199,9 @@ AddRoleMems(const char *rolename, Oid roleid,
 			simple_heap_insert(pg_authmem_rel, tuple);
 			CatalogUpdateIndexes(pg_authmem_rel, tuple);
 		}
+
+		/* CCI after each change, in case there are duplicates in list */
+		CommandCounterIncrement();
 	}
 
 	/*
@@ -1074,6 +1219,9 @@ AddRoleMems(const char *rolename, Oid roleid,
  * memberNames: list of names of roles to del (used only for error messages)
  * memberIds: OIDs of roles to del
  * admin_opt: remove admin option only?
+ *
+ * Note: caller is responsible for holding ExclusiveLock on pg_authid,
+ * and for calling auth_file_update_needed().
  */
 static void
 DelRoleMems(const char *rolename, Oid roleid,
@@ -1092,40 +1240,29 @@ DelRoleMems(const char *rolename, Oid roleid,
 		return;
 
 	/*
-	 * Check permissions: must be superuser or have admin option on the
-	 * role to be changed.
-	 *
-	 * XXX: The admin option is not considered to be inherited through
-	 * multiple roles, unlike normal 'is_member_of_role' privilege checks.
+	 * Check permissions: must have createrole or admin option on the
+	 * role to be changed.  To mess with a superuser role, you gotta
+	 * be superuser.
 	 */
-	if (!superuser()) 
+	if (superuser_arg(roleid))
 	{
-		HeapTuple authmem_chk_tuple;
-		Form_pg_auth_members authmem_chk;
-
-		authmem_chk_tuple = SearchSysCache(AUTHMEMROLEMEM,
-										   ObjectIdGetDatum(roleid),
-										   ObjectIdGetDatum(GetUserId()),
-										   0, 0);
-		if (!HeapTupleIsValid(authmem_chk_tuple))
+		if (!superuser())
 			ereport(ERROR,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-					 errmsg("must be superuser or have admin option on role \"%s\"",
-							rolename)));
-
-		authmem_chk = (Form_pg_auth_members) GETSTRUCT(authmem_chk_tuple);
-		if (!authmem_chk->admin_option)
+					 errmsg("must be superuser to alter superusers")));
+	}
+	else
+	{
+		if (!have_createrole_privilege() &&
+			!is_admin_of_role(GetUserId(), roleid))
 			ereport(ERROR,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-					 errmsg("must be superuser or have admin option on role \"%s\"",
+					 errmsg("must have admin option on role \"%s\"",
 							rolename)));
-		ReleaseSysCache(authmem_chk_tuple);
 	}
 
-	/*
-	 * Secure exclusive lock to protect our update of the flat auth file.
-	 */
-	pg_authmem_rel = heap_open(AuthMemRelationId, ExclusiveLock);
+	/* We need only regular writer's lock on pg_auth_members */
+	pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
 	pg_authmem_dsc = RelationGetDescr(pg_authmem_rel);
 
 	forboth(nameitem, memberNames, iditem, memberIds)
@@ -1178,6 +1315,9 @@ DelRoleMems(const char *rolename, Oid roleid,
 		}
 
 		ReleaseSysCache(authmem_tuple);
+
+		/* CCI after each change, in case there are duplicates in list */
+		CommandCounterIncrement();
 	}
 
 	/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7f18b12b92b0308536bf7a2bb80958dadcf47b8b..8afc948a07a88ccf049837124ef7cb4246f7784a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.500 2005/06/28 19:51:22 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.501 2005/06/29 20:34:13 tgl Exp $
  *
  * HISTORY
  *	  AUTHOR			DATE			MAJOR EVENT
@@ -376,7 +376,7 @@ static void doNegateFloat(Value *v);
 	MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE
 
 	NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NOCREATEDB
-	NOCREATEROLE NOCREATEUSER NOLOGIN_P NONE NOT NOTHING NOTIFY
+	NOCREATEROLE NOCREATEUSER NOLOGIN_P NONE NOSUPERUSER NOT NOTHING NOTIFY
 	NOTNULL NOWAIT NULL_P NULLIF NUMERIC
 
 	OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OR
@@ -395,7 +395,7 @@ static void doNegateFloat(Value *v);
 	SAVEPOINT SCHEMA SCROLL SECOND_P SECURITY SELECT SEQUENCE
 	SERIALIZABLE SESSION SESSION_USER SET SETOF SHARE
 	SHOW SIMILAR SIMPLE SMALLINT SOME STABLE START STATEMENT
-	STATISTICS STDIN STDOUT STORAGE STRICT_P SUBSTRING SYMMETRIC
+	STATISTICS STDIN STDOUT STORAGE STRICT_P SUBSTRING SUPERUSER_P SYMMETRIC
 	SYSID SYSTEM_P
 
 	TABLE TABLESPACE TEMP TEMPLATE TEMPORARY THEN TIME TIMESTAMP
@@ -622,6 +622,14 @@ OptRoleElem:
 				{
 					$$ = makeDefElem("sysid", (Node *)makeInteger($2));
 				}
+			| SUPERUSER_P
+				{
+					$$ = makeDefElem("superuser", (Node *)makeInteger(TRUE));
+				}
+			| NOSUPERUSER
+				{
+					$$ = makeDefElem("superuser", (Node *)makeInteger(FALSE));
+				}
 			| CREATEDB
 				{
 					$$ = makeDefElem("createdb", (Node *)makeInteger(TRUE));
@@ -634,21 +642,22 @@ OptRoleElem:
 				{
 					$$ = makeDefElem("createrole", (Node *)makeInteger(TRUE));
 				}
-			| CREATEUSER
+			| NOCREATEROLE
 				{
-					$$ = makeDefElem("createrole", (Node *)makeInteger(TRUE));
+					$$ = makeDefElem("createrole", (Node *)makeInteger(FALSE));
 				}
-			| LOGIN_P
+			| CREATEUSER
 				{
-					$$ = makeDefElem("canlogin", (Node *)makeInteger(TRUE));
+					/* For backwards compatibility, synonym for SUPERUSER */
+					$$ = makeDefElem("superuser", (Node *)makeInteger(TRUE));
 				}
-			| NOCREATEROLE
+			| NOCREATEUSER
 				{
-					$$ = makeDefElem("createrole", (Node *)makeInteger(FALSE));
+					$$ = makeDefElem("superuser", (Node *)makeInteger(FALSE));
 				}
-			| NOCREATEUSER
+			| LOGIN_P
 				{
-					$$ = makeDefElem("createrole", (Node *)makeInteger(FALSE));
+					$$ = makeDefElem("canlogin", (Node *)makeInteger(TRUE));
 				}
 			| NOLOGIN_P
 				{
@@ -8013,6 +8022,7 @@ unreserved_keyword:
 			| NOCREATEROLE
 			| NOCREATEUSER
 			| NOLOGIN_P
+			| NOSUPERUSER
 			| NOTHING
 			| NOTIFY
 			| NOWAIT
@@ -8068,6 +8078,7 @@ unreserved_keyword:
 			| STDIN
 			| STDOUT
 			| STORAGE
+			| SUPERUSER_P
 			| SYSID
 			| SYSTEM_P
 			| STRICT_P
diff --git a/src/backend/parser/keywords.c b/src/backend/parser/keywords.c
index 6fcb97675f629f98c40c791c83811b190cee07d3..726e7fc01e36744d89b721f5a696f29d066d08c8 100644
--- a/src/backend/parser/keywords.c
+++ b/src/backend/parser/keywords.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/parser/keywords.c,v 1.161 2005/06/28 19:51:22 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/parser/keywords.c,v 1.162 2005/06/29 20:34:14 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -221,6 +221,7 @@ static const ScanKeyword ScanKeywords[] = {
 	{"nocreateuser", NOCREATEUSER},
 	{"nologin", NOLOGIN_P},
 	{"none", NONE},
+	{"nosuperuser", NOSUPERUSER},
 	{"not", NOT},
 	{"nothing", NOTHING},
 	{"notify", NOTIFY},
@@ -308,6 +309,7 @@ static const ScanKeyword ScanKeywords[] = {
 	{"storage", STORAGE},
 	{"strict", STRICT_P},
 	{"substring", SUBSTRING},
+	{"superuser", SUPERUSER_P},
 	{"symmetric", SYMMETRIC},
 	{"sysid", SYSID},
 	{"system", SYSTEM_P},
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index 2fe7fd39bcac787135d3c34fc7fefc620d890039..5b65099696e70c3c9880b22c21852b3449d9054c 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/adt/acl.c,v 1.116 2005/06/28 19:51:23 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/adt/acl.c,v 1.117 2005/06/29 20:34:14 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -32,18 +32,23 @@
 #include "utils/syscache.h"
 
 
-#define ACL_IDTYPE_ROLE_KEYWORD	"role"
-
-/* The rolmemcache is a possibly-empty list of role OIDs.
- * rolmemRole is the Role for which the cache was generated.
- * In the event of a Role change the cache will be regenerated.
+/*
+ * We frequently need to test whether a given role is a member of some other
+ * role.  In most of these tests the "given role" is the same, namely the
+ * active current user.  So we can optimize it by keeping a cached list of
+ * all the roles the "given role" is a member of, directly or indirectly.
+ * The cache is flushed whenever we detect a change in pg_auth_members.
+ *
+ * Possibly this mechanism should be generalized to allow caching membership
+ * info for more than one role?
+ *
+ * cached_role is the role OID the cache is for.
+ * cached_memberships is an OID list of roles that cached_role is a member of.
+ * The cache is valid if cached_role is not InvalidOid.
  */
-static List	*rolmemcache = NIL;
-static Oid	rolmemRole = InvalidOid;
+static Oid	cached_role = InvalidOid;
+static List	*cached_memberships = NIL;
 
-/* rolmemcache and rolmemRole only valid when 
- * rolmemcacheValid is true */
-static bool rolmemcacheValid = false;
 
 static const char *getid(const char *s, char *n);
 static void putid(char *p, const char *s);
@@ -70,8 +75,7 @@ static AclMode convert_schema_priv_string(text *priv_type_text);
 static Oid	convert_tablespace_name(text *tablespacename);
 static AclMode convert_tablespace_priv_string(text *priv_type_text);
 
-static void RolMemCacheCallback(Datum arg, Oid relid);
-static void recomputeRolMemCache(Oid roleid);
+static void RoleMembershipCacheCallback(Datum arg, Oid relid);
 
 
 /*
@@ -134,7 +138,7 @@ getid(const char *s, char *n)
 }
 
 /*
- * Write a user or group Name at *p, adding double quotes if needed.
+ * Write a role name at *p, adding double quotes if needed.
  * There must be at least (2*NAMEDATALEN)+2 bytes available at *p.
  * This needs to be kept in sync with copyAclUserName in pg_dump/dumputils.c
  */
@@ -175,6 +179,9 @@ putid(char *p, const char *s)
  *		between the optional id type keyword (group|user) and the actual
  *		ACL specification.
  *
+ *		The group|user decoration is unnecessary in the roles world,
+ *		but we still accept it for backward compatibility.
+ *
  *		This routine is called by the parser as well as aclitemin(), hence
  *		the added generality.
  *
@@ -202,17 +209,17 @@ aclparse(const char *s, AclItem *aip)
 	if (*s != '=')
 	{
 		/* we just read a keyword, not a name */
-		if (strcmp(name, ACL_IDTYPE_ROLE_KEYWORD) != 0)
+		if (strcmp(name, "group") != 0 && strcmp(name, "user") != 0)
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
 					 errmsg("unrecognized key word: \"%s\"", name),
-				errhint("ACL key word must be \"role\".")));
+				errhint("ACL key word must be \"group\" or \"user\".")));
 		s = getid(s, name);		/* move s to the name beyond the keyword */
 		if (name[0] == '\0')
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
 					 errmsg("missing name"),
-					 errhint("A name must follow the \"role\" key word.")));
+					 errhint("A name must follow the \"group\" or \"user\" key word.")));
 	}
 
 	if (*s != '=')
@@ -378,7 +385,7 @@ aclitemout(PG_FUNCTION_ARGS)
 	HeapTuple	htup;
 	unsigned	i;
 
-	out = palloc(strlen("group =/") +
+	out = palloc(strlen("=/") +
 				 2 * N_ACL_RIGHTS +
 				 2 * (2 * NAMEDATALEN + 2) +
 				 1);
@@ -432,10 +439,6 @@ aclitemout(PG_FUNCTION_ARGS)
 		sprintf(p, "%u", aip->ai_grantor);
 	}
 
-	while (*p)
-		++p;
-	*p = '\0';
-
 	PG_RETURN_CSTRING(out);
 }
 
@@ -968,13 +971,13 @@ restart:
  *
  * To determine exactly which of a set of privileges are held:
  *		heldprivs = aclmask(acl, roleid, ownerId, privs, ACLMASK_ALL);
- *
  */
 AclMode
 aclmask(const Acl *acl, Oid roleid, Oid ownerId,
 		AclMode mask, AclMaskHow how)
 {
 	AclMode		result;
+	AclMode		remaining;
 	AclItem    *aidat;
 	int			i,
 				num;
@@ -993,7 +996,7 @@ aclmask(const Acl *acl, Oid roleid, Oid ownerId,
 	result = 0;
 
 	/* Owner always implicitly has all grant options */
-	if (is_member_of_role(roleid,ownerId))
+	if (is_member_of_role(roleid, ownerId))
 	{
 		result = mask & ACLITEM_ALL_GOPTION_BITS;
 		if (result == mask)
@@ -1004,15 +1007,14 @@ aclmask(const Acl *acl, Oid roleid, Oid ownerId,
 	aidat = ACL_DAT(acl);
 
 	/*
-	 * Check privileges granted directly to role, indirectly
-	 * via role membership or to public
+	 * Check privileges granted directly to user or to public
 	 */
 	for (i = 0; i < num; i++)
 	{
 		AclItem    *aidata = &aidat[i];
 
 		if (aidata->ai_grantee == ACL_ID_PUBLIC ||
-			is_member_of_role(roleid, aidata->ai_grantee))
+			aidata->ai_grantee == roleid)
 		{
 			result |= (aidata->ai_privs & mask);
 			if ((how == ACLMASK_ALL) ? (result == mask) : (result != 0))
@@ -1020,24 +1022,33 @@ aclmask(const Acl *acl, Oid roleid, Oid ownerId,
 		}
 	}
 
-	return result;
-}
-
+	/*
+	 * Check privileges granted indirectly via roles.
+	 * We do this in a separate pass to minimize expensive indirect
+	 * membership tests.  In particular, it's worth testing whether
+	 * a given ACL entry grants any privileges still of interest before
+	 * we perform the is_member test.
+	 */
+	remaining = (mask & ~result);
+	for (i = 0; i < num; i++)
+	{
+		AclItem    *aidata = &aidat[i];
 
-/*
- * Is member a member of role?
- * relmemcache includes the role itself too
- */
-bool
-is_member_of_role(Oid member, Oid role)
-{
-	/* Fast path for simple case */
-	if (member == role)
-		return true;
+		if (aidata->ai_grantee == ACL_ID_PUBLIC ||
+			aidata->ai_grantee == roleid)
+			continue;			/* already checked it */
 
-	recomputeRolMemCache(member);
+		if ((aidata->ai_privs & remaining) &&
+			is_member_of_role(roleid, aidata->ai_grantee))
+		{
+			result |= (aidata->ai_privs & mask);
+			if ((how == ACLMASK_ALL) ? (result == mask) : (result != 0))
+				return result;
+			remaining = (mask & ~result);
+		}
+	}
 
-	return list_member_oid(rolmemcache, role);
+	return result;
 }
 
 
@@ -1092,20 +1103,20 @@ makeaclitem(PG_FUNCTION_ARGS)
 	Oid 		grantor = PG_GETARG_OID(1);
 	text	   *privtext = PG_GETARG_TEXT_P(2);
 	bool		goption = PG_GETARG_BOOL(3);
-	AclItem    *aclitem;
+	AclItem    *result;
 	AclMode		priv;
 
 	priv = convert_priv_string(privtext);
 
-	aclitem = (AclItem *) palloc(sizeof(AclItem));
+	result = (AclItem *) palloc(sizeof(AclItem));
 
-	aclitem->ai_grantee = grantee;
-	aclitem->ai_grantor = grantor;
+	result->ai_grantee = grantee;
+	result->ai_grantor = grantor;
 
-	ACLITEM_SET_PRIVS_GOPTIONS(*aclitem, priv,
+	ACLITEM_SET_PRIVS_GOPTIONS(*result, priv,
 							   (goption ? priv : ACL_NO_RIGHTS));
 
-	PG_RETURN_ACLITEM_P(aclitem);
+	PG_RETURN_ACLITEM_P(result);
 }
 
 static AclMode
@@ -1175,7 +1186,6 @@ has_table_privilege_name_name(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*rolename));
-
 	tableoid = convert_table_name(tablename);
 	mode = convert_table_priv_string(priv_type_text);
 
@@ -1225,7 +1235,6 @@ has_table_privilege_name_id(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	mode = convert_table_priv_string(priv_type_text);
 
 	aclresult = pg_class_aclcheck(tableoid, roleid, mode);
@@ -1401,7 +1410,6 @@ has_database_privilege_name_name(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	databaseoid = convert_database_name(databasename);
 	mode = convert_database_priv_string(priv_type_text);
 
@@ -1451,7 +1459,6 @@ has_database_privilege_name_id(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	mode = convert_database_priv_string(priv_type_text);
 
 	aclresult = pg_database_aclcheck(databaseoid, roleid, mode);
@@ -1615,7 +1622,6 @@ has_function_privilege_name_name(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	functionoid = convert_function_name(functionname);
 	mode = convert_function_priv_string(priv_type_text);
 
@@ -1665,7 +1671,6 @@ has_function_privilege_name_id(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	mode = convert_function_priv_string(priv_type_text);
 
 	aclresult = pg_proc_aclcheck(functionoid, roleid, mode);
@@ -1821,7 +1826,6 @@ has_language_privilege_name_name(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	languageoid = convert_language_name(languagename);
 	mode = convert_language_priv_string(priv_type_text);
 
@@ -1871,7 +1875,6 @@ has_language_privilege_name_id(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	mode = convert_language_priv_string(priv_type_text);
 
 	aclresult = pg_language_aclcheck(languageoid, roleid, mode);
@@ -2027,7 +2030,6 @@ has_schema_privilege_name_name(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	schemaoid = convert_schema_name(schemaname);
 	mode = convert_schema_priv_string(priv_type_text);
 
@@ -2077,7 +2079,6 @@ has_schema_privilege_name_id(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	mode = convert_schema_priv_string(priv_type_text);
 
 	aclresult = pg_namespace_aclcheck(schemaoid, roleid, mode);
@@ -2237,7 +2238,6 @@ has_tablespace_privilege_name_name(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	tablespaceoid = convert_tablespace_name(tablespacename);
 	mode = convert_tablespace_priv_string(priv_type_text);
 
@@ -2287,7 +2287,6 @@ has_tablespace_privilege_name_id(PG_FUNCTION_ARGS)
 	AclResult	aclresult;
 
 	roleid = get_roleid_checked(NameStr(*username));
-
 	mode = convert_tablespace_priv_string(priv_type_text);
 
 	aclresult = pg_tablespace_aclcheck(tablespaceoid, roleid, mode);
@@ -2413,6 +2412,9 @@ convert_tablespace_priv_string(text *priv_type_text)
 	return ACL_NO_RIGHTS;		/* keep compiler quiet */
 }
 
+/*
+ * initialization function (called by InitPostgres)
+ */
 void
 initialize_acl(void)
 {
@@ -2423,99 +2425,158 @@ initialize_acl(void)
 		 * invalidation of pg_auth_members rows
 		 */
 		CacheRegisterSyscacheCallback(AUTHMEMROLEMEM,
-									  RolMemCacheCallback,
+									  RoleMembershipCacheCallback,
 									  (Datum) 0);
-
-	    /* Force role/member cache to be recomputed on next use */
-	    rolmemcacheValid = false;
 	}
 }
 
 /*
- * RolMemCacheCallback
+ * RoleMembershipCacheCallback
  * 		Syscache inval callback function
  */
 static void
-RolMemCacheCallback(Datum arg, Oid relid)
+RoleMembershipCacheCallback(Datum arg, Oid relid)
 {
-	    /* Force role/member cache to be recomputed on next use */
-	    rolmemcacheValid = false;
+	/* Force membership cache to be recomputed on next use */
+	cached_role = InvalidOid;
 }
 
 
-/* 
- * recomputeRolMemCache - recompute the role/member cache if needed 
+/*
+ * Is member a member of role (directly or indirectly)?
+ *
+ * Since indirect membership testing is relatively expensive, we cache
+ * a list of memberships.
  */
-static void
-recomputeRolMemCache(Oid roleid)
-{
-	int 		i;
-	Oid			memberOid;
-	List 		*roles_list_hunt = NIL;
-	List		*roles_list = NIL;
-	List		*newrolmemcache;
-	CatCList	*memlist;
+bool
+is_member_of_role(Oid member, Oid role)
+{
+	List		*roles_list;
+	ListCell	*l;
+	List		*new_cached_memberships;
 	MemoryContext	oldctx;
 
-	/* Do nothing if rolmemcache is already valid */
-	if (rolmemcacheValid && rolmemRole == roleid)
-		return;
+	/* Fast path for simple case */
+	if (member == role)
+		return true;
 
-	if (rolmemRole != roleid)
-		rolmemcacheValid = false;
+	/* If cache is already valid, just use the list */
+	if (OidIsValid(cached_role) && cached_role == member)
+		return list_member_oid(cached_memberships, role);
 
 	/* 
-	 * Find all the roles which this role is a member of,
-	 * including multi-level recursion
+	 * Find all the roles that member is a member of,
+	 * including multi-level recursion.  The role itself will always
+	 * be the first element of the resulting list.
+	 *
+	 * Each element of the list is scanned to see if it adds any indirect
+	 * memberships.  We can use a single list as both the record of
+	 * already-found memberships and the agenda of roles yet to be scanned.
+	 * This is a bit tricky but works because the foreach() macro doesn't
+	 * fetch the next list element until the bottom of the loop.
 	 */
+	roles_list = list_make1_oid(member);
 
-	/*
-	 * Include the current role itself to simplify checks
-	 * later on, also should be at the head so lookup should
-	 * be fast.
-	 */
-	roles_list = lappend_oid(roles_list, roleid);
-	roles_list_hunt = lappend_oid(roles_list_hunt, roleid);
-	
-	while (roles_list_hunt)
+	foreach(l, roles_list)
 	{
-		memberOid = linitial_oid(roles_list_hunt);
+		Oid		memberid = lfirst_oid(l);
+		CatCList	*memlist;
+		int 		i;
+
+		/* Find roles that memberid is directly a member of */
 		memlist = SearchSysCacheList(AUTHMEMMEMROLE, 1,
-									 ObjectIdGetDatum(memberOid),
+									 ObjectIdGetDatum(memberid),
 									 0, 0, 0);
-		for (i = 0; i < memlist->n_members; i++) {
-			HeapTuple roletup = &memlist->members[i]->tuple;
-			Form_pg_auth_members rolemem = (Form_pg_auth_members) GETSTRUCT(roletup);
-
-			if (!list_member_oid(roles_list,rolemem->roleid)) {
-				roles_list = lappend_oid(roles_list,rolemem->roleid);
-				roles_list_hunt = lappend_oid(roles_list_hunt,rolemem->roleid);
-			}
+		for (i = 0; i < memlist->n_members; i++)
+		{
+			HeapTuple	tup = &memlist->members[i]->tuple;
+			Oid		otherid = ((Form_pg_auth_members) GETSTRUCT(tup))->roleid;
+
+			/*
+			 * Even though there shouldn't be any loops in the membership
+			 * graph, we must test for having already seen this role.
+			 * It is legal for instance to have both A->B and A->C->B.
+			 */
+			if (!list_member_oid(roles_list, otherid))
+				roles_list = lappend_oid(roles_list, otherid);
 		}
-		roles_list_hunt = list_delete_oid(roles_list_hunt, memberOid);
 		ReleaseSysCacheList(memlist);
 	}
 
 	/*
-	 * Now that we've built the list of role Oids this
-	 * role is a member of, save it in permanent storage
+	 * Copy the completed list into TopMemoryContext so it will persist.
 	 */
 	oldctx = MemoryContextSwitchTo(TopMemoryContext);
-	newrolmemcache = list_copy(roles_list);
+	new_cached_memberships = list_copy(roles_list);
 	MemoryContextSwitchTo(oldctx);
+	list_free(roles_list);
 
 	/*
 	 * Now safe to assign to state variable
 	 */
-	list_free(rolmemcache);
-	rolmemcache = newrolmemcache;
+	cached_role = InvalidOid;	/* just paranoia */
+	list_free(cached_memberships);
+	cached_memberships = new_cached_memberships;
+	cached_role = member;
 
-	/*
-	 * Mark as valid
+	/* And now we can return the answer */
+	return list_member_oid(cached_memberships, role);
+}
+
+
+/*
+ * Is member an admin of role (directly or indirectly)?  That is, is it
+ * a member WITH ADMIN OPTION?
+ *
+ * We could cache the result as for is_member_of_role, but currently this
+ * is not used in any performance-critical paths, so we don't.
+ */
+bool
+is_admin_of_role(Oid member, Oid role)
+{
+	bool		result = false;
+	List		*roles_list;
+	ListCell	*l;
+
+	/* 
+	 * Find all the roles that member is a member of,
+	 * including multi-level recursion.  We build a list in the same way
+	 * that is_member_of_role does to track visited and unvisited roles.
 	 */
-	rolmemRole = roleid;
-	rolmemcacheValid = true;
+	roles_list = list_make1_oid(member);
+
+	foreach(l, roles_list)
+	{
+		Oid		memberid = lfirst_oid(l);
+		CatCList	*memlist;
+		int 		i;
+
+		/* Find roles that memberid is directly a member of */
+		memlist = SearchSysCacheList(AUTHMEMMEMROLE, 1,
+									 ObjectIdGetDatum(memberid),
+									 0, 0, 0);
+		for (i = 0; i < memlist->n_members; i++)
+		{
+			HeapTuple	tup = &memlist->members[i]->tuple;
+			Oid		otherid = ((Form_pg_auth_members) GETSTRUCT(tup))->roleid;
+
+			if (otherid == role &&
+				((Form_pg_auth_members) GETSTRUCT(tup))->admin_option)
+			{
+				/* Found what we came for, so can stop searching */
+				result = true;
+				break;
+			}
+
+			if (!list_member_oid(roles_list, otherid))
+				roles_list = lappend_oid(roles_list, otherid);
+		}
+		ReleaseSysCacheList(memlist);
+		if (result)
+			break;
+	}
 
-	/* Clean up */
 	list_free(roles_list);
+
+	return result;
 }
diff --git a/src/backend/utils/init/flatfiles.c b/src/backend/utils/init/flatfiles.c
index 19d6b69d9833bb96474b0e75b2bc4ae1b530e530..2343c01b54882155e5af1910afec7fa90334b846 100644
--- a/src/backend/utils/init/flatfiles.c
+++ b/src/backend/utils/init/flatfiles.c
@@ -23,7 +23,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/utils/init/flatfiles.c,v 1.10 2005/06/28 22:16:45 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/init/flatfiles.c,v 1.11 2005/06/29 20:34:15 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -295,7 +295,7 @@ write_database_file(Relation drel)
  *		"rolename" "password" "validuntil" "memberof" "memberof" ...
  * Only roles that are marked rolcanlogin are entered into the auth file.
  * Each role's line lists all the roles (groups) of which it is directly
- * or indirectly a member.
+ * or indirectly a member, except for itself.
  *
  * The postmaster expects the file to be sorted by rolename.  There is not
  * any special ordering of the membership lists.
@@ -538,28 +538,31 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
 		qsort(auth_info, total_roles, sizeof(auth_entry), oid_compar);
 		qsort(authmem_info, total_mem, sizeof(authmem_entry), mem_compar);
 		/*
-		 * For each role, find what it belongs to.  We can skip this for
-		 * non-login roles.
+		 * For each role, find what it belongs to.
 		 */
 		for (curr_role = 0; curr_role < total_roles; curr_role++)
 		{
-			List	*roles_list = NIL;
+			List	*roles_list;
 			List	*roles_names_list = NIL;
-			List	*roles_list_hunt;
 			ListCell *mem;
 
+			/* We can skip this for non-login roles */
 			if (!auth_info[curr_role].rolcanlogin)
 				continue;
 
-			roles_list_hunt = list_make1_oid(auth_info[curr_role].roleid);
-			while (roles_list_hunt)
+			/*
+			 * This search algorithm is the same as in is_member_of_role;
+			 * we are just working with a different input data structure.
+			 */
+			roles_list = list_make1_oid(auth_info[curr_role].roleid);
+
+			foreach(mem, roles_list)
 			{
 				authmem_entry key;
 				authmem_entry *found_mem;
-				int		first_found, last_found, curr_mem;
+				int		first_found, last_found, i;
 
-				key.memberid = linitial_oid(roles_list_hunt);
-				roles_list_hunt = list_delete_first(roles_list_hunt);
+				key.memberid = lfirst_oid(mem);
 				found_mem = bsearch(&key, authmem_info, total_mem,
 									sizeof(authmem_entry), mem_compar);
 				if (!found_mem)
@@ -577,26 +580,25 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
 					   mem_compar(&key, &authmem_info[last_found + 1]) == 0)
 					last_found++;
 				/*
-				 * Now add all the new roles to roles_list, as well
-				 * as to our list of what remains to be searched.
+				 * Now add all the new roles to roles_list.
 				 */
-				for (curr_mem = first_found; curr_mem <= last_found; curr_mem++)
+				for (i = first_found; i <= last_found; i++)
 				{
-					Oid	rolid = authmem_info[curr_mem].roleid;
+					Oid	rolid = authmem_info[i].roleid;
 
 					if (!list_member_oid(roles_list, rolid))
-					{
 						roles_list = lappend_oid(roles_list, rolid);
-						roles_list_hunt = lappend_oid(roles_list_hunt, rolid);
-					}
 				}
 			}
 
 			/*
 			 * Convert list of role Oids to list of role names.
 			 * We must do this before re-sorting auth_info.
+			 *
+			 * We skip the first list element (curr_role itself) since there
+			 * is no point in writing that a role is a member of itself.
 			 */
-			foreach(mem, roles_list)
+			for_each_cell(mem, lnext(list_head(roles_list)))
 			{
 				auth_entry key_auth;
 				auth_entry *found_role;
@@ -604,10 +606,12 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
 				key_auth.roleid = lfirst_oid(mem);
 				found_role = bsearch(&key_auth, auth_info, total_roles,
 									 sizeof(auth_entry), oid_compar);
-				roles_names_list = lappend(roles_names_list,
-										   found_role->rolname);
+				if (found_role)			/* paranoia */
+					roles_names_list = lappend(roles_names_list,
+											   found_role->rolname);
 			}
 			auth_info[curr_role].member_of = roles_names_list;
+			list_free(roles_list);
 		}
 	}
 
diff --git a/src/include/utils/acl.h b/src/include/utils/acl.h
index 82e004794bcfd380bad4a9f66fa5b316580d932b..0b560e062c13836b98847e747740f2329124d42a 100644
--- a/src/include/utils/acl.h
+++ b/src/include/utils/acl.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/acl.h,v 1.79 2005/06/28 19:51:25 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/utils/acl.h,v 1.80 2005/06/29 20:34:15 tgl Exp $
  *
  * NOTES
  *	  An ACL array is simply an array of AclItems, representing the union
@@ -210,6 +210,7 @@ extern AclMode aclmask(const Acl *acl, Oid roleid, Oid ownerId,
 		AclMode mask, AclMaskHow how);
 
 extern bool is_member_of_role(Oid member, Oid role);
+extern bool is_admin_of_role(Oid member, Oid role);
 
 extern void initialize_acl(void);