diff --git a/contrib/userlock/user_locks.c b/contrib/userlock/user_locks.c
index f2cd7b2f76de4bf5430937ba2c94c13b6fe2ed57..3dee92ea3168834dd600a6e9bf59cbae1e4c3c61 100644
--- a/contrib/userlock/user_locks.c
+++ b/contrib/userlock/user_locks.c
@@ -75,7 +75,7 @@ user_write_unlock_oid(Oid oid)
 int
 user_unlock_all(void)
 {
-	return LockReleaseAll(USER_LOCKMETHOD, MyProc, true);
+	return LockReleaseAll(USER_LOCKMETHOD, true);
 }
 
 /* end of file */
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index f3949cc72feb9c6e29f987aee90e9cc26ec6c77a..d8489ab12111c74d083ea06faa64853fb3cb9d50 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -1,4 +1,4 @@
-$PostgreSQL: pgsql/src/backend/storage/lmgr/README,v 1.14 2003/11/29 19:51:56 pgsql Exp $
+$PostgreSQL: pgsql/src/backend/storage/lmgr/README,v 1.15 2004/08/27 17:07:41 tgl Exp $
 
 
 LOCKING OVERVIEW
@@ -47,12 +47,6 @@ The rest of this README file discusses the regular lock manager in detail.
 
 LOCK DATA STRUCTURES
 
-There are two fundamental lock structures: the per-lockable-object LOCK
-struct, and the per-lock PROCLOCK struct.  A LOCK object exists
-for each lockable object that currently has locks held or requested on it.
-A PROCLOCK struct exists for each transaction that is holding or requesting
-lock(s) on each LOCK object.
-
 Lock methods describe the overall locking behavior.  Currently there are
 two lock methods: DEFAULT and USER.  (USER locks are non-blocking.)
 
@@ -60,6 +54,20 @@ Lock modes describe the type of the lock (read/write or shared/exclusive).
 See src/tools/backend/index.html and src/include/storage/lock.h for more
 details.
 
+There are two fundamental lock structures in shared memory: the
+per-lockable-object LOCK struct, and the per-lock-and-requestor PROCLOCK
+struct.  A LOCK object exists for each lockable object that currently has
+locks held or requested on it.  A PROCLOCK struct exists for each transaction
+that is holding or requesting lock(s) on each LOCK object.
+
+In addition to these, each backend maintains an unshared LOCALLOCK structure
+for each lockable object and lock mode that it is currently holding or
+requesting.  The shared lock structures only allow a single lock grant to
+be made per lockable object/lock mode/transaction.  Internally to a backend,
+however, the same lock may be requested and perhaps released multiple times
+in a transaction.  The internal request counts are held in LOCALLOCK so that
+the shared LockMgrLock need not be obtained to alter them.
+
 ---------------------------------------------------------------------------
 
 The lock manager's LOCK objects contain:
@@ -103,7 +111,7 @@ waitMask -
     This bitmask shows the types of locks being waited for.  Bit i of waitMask
     is 1 if and only if requested[i] > granted[i].
 
-lockHolders -
+procLocks -
     This is a shared memory queue of all the PROCLOCK structs associated with
     the lock object.  Note that both granted and waiting PROCLOCKs are in this
     list (indeed, the same PROCLOCK might have some already-granted locks and
@@ -120,7 +128,10 @@ nRequested -
     acquired.  The count includes attempts by processes which were put
     to sleep due to conflicts.  It also counts the same backend twice
     if, for example, a backend process first acquires a read and then
-    acquires a write, or acquires a read lock twice.
+    acquires a write, or acquires the lock under two different transaction
+    IDs.  (But multiple acquisitions of the same lock/lock mode under the
+    same transaction ID are not multiply counted here; they are recorded
+    only in the backend's LOCALLOCK structure.)
 
 requested -
     Keeps a count of how many locks of each type have been attempted.  Only
@@ -130,9 +141,8 @@ requested -
 
 nGranted -
     Keeps count of how many times this lock has been successfully acquired.
-    This count does not include attempts that are waiting due to conflicts,
-    but can count the same backend twice (e.g. a read then a write -- since
-    its the same transaction this won't cause a conflict).
+    This count does not include attempts that are waiting due to conflicts.
+    Otherwise the counting rules are the same as for nRequested.
 
 granted -
     Keeps count of how many locks of each type are currently held.  Once again
@@ -164,18 +174,17 @@ tag -
         if the PROCLOCK is for session-level locking.
 
     Note that this structure will support multiple transactions running
-    concurrently in one backend, which may be handy if we someday decide
-    to support nested transactions.  Currently, the XID field is only needed
-    to distinguish per-transaction locks from session locks.  User locks
-    are always session locks, and we also use session locks for multi-
-    transaction operations like VACUUM.
-
-holding -
-    The number of successfully acquired locks of each type for this PROCLOCK.
-    This should be <= the corresponding granted[] value of the lock object!
-
-nHolding -
-    Sum of the holding[] array.
+    concurrently in one backend.  Currently we do not use it for that
+    purpose: subtransactions acquire locks in the name of their top parent
+    transaction, to simplify reassigning lock ownership at subtransaction end.
+    So the XID field is really only needed to distinguish per-transaction
+    locks from session locks.  User locks are always session locks, and we
+    also use session locks for multi-transaction operations like VACUUM.
+
+holdMask -
+    A bitmask for the lock types successfully acquired by this PROCLOCK.
+    This should be a subset of the LOCK object's grantMask, and also a
+    subset of the PGPROC object's heldLocks mask.
 
 lockLink -
     List link for shared memory queue of all the PROCLOCK objects for the
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index edfdbcbee74863edd2ccf1a5703ab0260603c5de..c68602bf8679c5ac4cc5e289fbf65b3e26a6fa86 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -12,7 +12,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/deadlock.c,v 1.29 2004/07/21 22:31:22 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/deadlock.c,v 1.30 2004/08/27 17:07:41 tgl Exp $
  *
  *	Interface:
  *
@@ -427,7 +427,7 @@ FindLockCycleRecurse(PGPROC *checkProc,
 	PGPROC	   *proc;
 	LOCK	   *lock;
 	PROCLOCK   *proclock;
-	SHM_QUEUE  *lockHolders;
+	SHM_QUEUE  *procLocks;
 	LockMethod	lockMethodTable;
 	PROC_QUEUE *waitQueue;
 	int			queue_size;
@@ -483,9 +483,9 @@ FindLockCycleRecurse(PGPROC *checkProc,
 	 * Scan for procs that already hold conflicting locks.	These are
 	 * "hard" edges in the waits-for graph.
 	 */
-	lockHolders = &(lock->lockHolders);
+	procLocks = &(lock->procLocks);
 
-	proclock = (PROCLOCK *) SHMQueueNext(lockHolders, lockHolders,
+	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
 										 offsetof(PROCLOCK, lockLink));
 
 	while (proclock)
@@ -497,8 +497,8 @@ FindLockCycleRecurse(PGPROC *checkProc,
 		{
 			for (lm = 1; lm <= numLockModes; lm++)
 			{
-				if (proclock->holding[lm] > 0 &&
-					((1 << lm) & conflictMask) != 0)
+				if ((proclock->holdMask & LOCKBIT_ON(lm)) &&
+					(conflictMask & LOCKBIT_ON(lm)))
 				{
 					/* This proc hard-blocks checkProc */
 					if (FindLockCycleRecurse(proc, depth + 1,
@@ -519,7 +519,7 @@ FindLockCycleRecurse(PGPROC *checkProc,
 			}
 		}
 
-		proclock = (PROCLOCK *) SHMQueueNext(lockHolders, &proclock->lockLink,
+		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
 										   offsetof(PROCLOCK, lockLink));
 	}
 
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index e61eabb70e9b12d2ff140d5707cc9925347c5b84..2849f877678162c6f603044c08610e5ce1a28b34 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.136 2004/08/26 17:23:30 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.137 2004/08/27 17:07:41 tgl Exp $
  *
  * NOTES
  *	  Outside modules can create a lock table and acquire/release
@@ -47,12 +47,24 @@ int			max_locks_per_xact; /* set by guc.c */
 #define NLOCKENTS(maxBackends)	(max_locks_per_xact * (maxBackends))
 
 
-static int WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode,
-		   LOCK *lock, PROCLOCK *proclock);
-static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
-				 int *myHolding);
+/*
+ * map from lock method id to the lock table data structures
+ */
+static LockMethod LockMethods[MAX_LOCK_METHODS];
+static HTAB	*LockMethodLockHash[MAX_LOCK_METHODS];
+static HTAB	*LockMethodProcLockHash[MAX_LOCK_METHODS];
+static HTAB	*LockMethodLocalHash[MAX_LOCK_METHODS];
+
+/* exported so lmgr.c can initialize it */
+int		NumLockMethods;
+
 
-static char *lock_mode_names[] =
+/* private state for GrantAwaitedLock */
+static LOCALLOCK *awaitedLock;
+static ResourceOwner awaitedOwner;
+
+
+static const char * const lock_mode_names[] =
 {
 	"INVALID",
 	"AccessShareLock",
@@ -134,31 +146,27 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
 		|| (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table))
 		)
 		elog(LOG,
-			 "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
+			 "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%x)",
 			 where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
 			 PROCLOCK_LOCKMETHOD(*(proclockP)),
 			 proclockP->tag.proc, proclockP->tag.xid,
-			 proclockP->holding[1], proclockP->holding[2], proclockP->holding[3],
-			 proclockP->holding[4], proclockP->holding[5], proclockP->holding[6],
-			 proclockP->holding[7], proclockP->nHolding);
+			 (int) proclockP->holdMask);
 }
 
 #else							/* not LOCK_DEBUG */
 
 #define LOCK_PRINT(where, lock, type)
 #define PROCLOCK_PRINT(where, proclockP)
+
 #endif   /* not LOCK_DEBUG */
 
 
-/*
- * map from lock method id to the lock table structure
- */
-static LockMethod LockMethods[MAX_LOCK_METHODS];
-static HTAB*	LockMethodLockHash[MAX_LOCK_METHODS];
-static HTAB*	LockMethodProcLockHash[MAX_LOCK_METHODS];
-
-/* exported so lmgr.c can initialize it */
-int		NumLockMethods;
+static void RemoveLocalLock(LOCALLOCK *locallock);
+static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
+static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
+					  ResourceOwner owner);
+static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
+				 int *myHolding);
 
 
 /*
@@ -220,6 +228,7 @@ LockMethodTableInit(const char *tabName,
 					int maxBackends)
 {
 	LockMethod	newLockMethod;
+	LOCKMETHODID lockmethodid;
 	char	   *shmemName;
 	HASHCTL		info;
 	int			hash_flags;
@@ -254,40 +263,39 @@ LockMethodTableInit(const char *tabName,
 	{
 		MemSet(newLockMethod, 0, sizeof(LockMethodData));
 		newLockMethod->masterLock = LockMgrLock;
-		newLockMethod->lockmethodid = NumLockMethods;
 		LockMethodInit(newLockMethod, conflictsP, numModes);
 	}
 
 	/*
 	 * other modules refer to the lock table by a lockmethod ID
 	 */
-	LockMethods[NumLockMethods] = newLockMethod;
-	NumLockMethods++;
-	Assert(NumLockMethods <= MAX_LOCK_METHODS);
+	Assert(NumLockMethods < MAX_LOCK_METHODS);
+	lockmethodid = NumLockMethods++;
+	LockMethods[lockmethodid] = newLockMethod;
 
 	/*
 	 * allocate a hash table for LOCK structs.	This is used to store
 	 * per-locked-object information.
 	 */
+	MemSet(&info, 0, sizeof(info));
 	info.keysize = sizeof(LOCKTAG);
 	info.entrysize = sizeof(LOCK);
 	info.hash = tag_hash;
 	hash_flags = (HASH_ELEM | HASH_FUNCTION);
 
 	sprintf(shmemName, "%s (lock hash)", tabName);
-	LockMethodLockHash[NumLockMethods-1] = ShmemInitHash(shmemName,
+	LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName,
 											init_table_size,
 											max_table_size,
 											&info,
 											hash_flags);
 
-	if (!LockMethodLockHash[NumLockMethods-1])
+	if (!LockMethodLockHash[lockmethodid])
 		elog(FATAL, "could not initialize lock table \"%s\"", tabName);
-	Assert(LockMethodLockHash[NumLockMethods-1]->hash == tag_hash);
 
 	/*
 	 * allocate a hash table for PROCLOCK structs.	This is used to store
-	 * per-lock-proclock information.
+	 * per-lock-holder information.
 	 */
 	info.keysize = sizeof(PROCLOCKTAG);
 	info.entrysize = sizeof(PROCLOCK);
@@ -295,18 +303,44 @@ LockMethodTableInit(const char *tabName,
 	hash_flags = (HASH_ELEM | HASH_FUNCTION);
 
 	sprintf(shmemName, "%s (proclock hash)", tabName);
-	LockMethodProcLockHash[NumLockMethods-1] = ShmemInitHash(shmemName,
+	LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName,
 												init_table_size,
 												max_table_size,
 												&info,
 												hash_flags);
 
-	if (!LockMethodProcLockHash[NumLockMethods-1])
+	if (!LockMethodProcLockHash[lockmethodid])
+		elog(FATAL, "could not initialize lock table \"%s\"", tabName);
+
+	/*
+	 * allocate a non-shared hash table for LOCALLOCK structs.  This is used
+	 * to store lock counts and resource owner information.
+	 *
+	 * The non-shared table could already exist in this process (this occurs
+	 * when the postmaster is recreating shared memory after a backend crash).
+	 * If so, delete and recreate it.  (We could simply leave it, since it
+	 * ought to be empty in the postmaster, but for safety let's zap it.)
+	 */
+	if (LockMethodLocalHash[lockmethodid])
+		hash_destroy(LockMethodLocalHash[lockmethodid]);
+
+	info.keysize = sizeof(LOCALLOCKTAG);
+	info.entrysize = sizeof(LOCALLOCK);
+	info.hash = tag_hash;
+	hash_flags = (HASH_ELEM | HASH_FUNCTION);
+
+	sprintf(shmemName, "%s (locallock hash)", tabName);
+	LockMethodLocalHash[lockmethodid] = hash_create(shmemName,
+													init_table_size,
+													&info,
+													hash_flags);
+
+	if (!LockMethodLocalHash[lockmethodid])
 		elog(FATAL, "could not initialize lock table \"%s\"", tabName);
 
 	pfree(shmemName);
 
-	return newLockMethod->lockmethodid;
+	return lockmethodid;
 }
 
 /*
@@ -339,6 +373,7 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
 	LockMethods[newLockMethodId] = LockMethods[lockmethodid];
 	LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid];
 	LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid];
+	LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid];
 
 	return newLockMethodId;
 }
@@ -408,11 +443,13 @@ bool
 LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 			TransactionId xid, LOCKMODE lockmode, bool dontWait)
 {
+	LOCALLOCKTAG localtag;
+	LOCALLOCK  *locallock;
+	LOCK	   *lock;
 	PROCLOCK   *proclock;
 	PROCLOCKTAG proclocktag;
-	HTAB	   *proclockTable;
 	bool		found;
-	LOCK	   *lock;
+	ResourceOwner owner;
 	LWLockId	masterLock;
 	LockMethod	lockMethodTable;
 	int			status;
@@ -428,9 +465,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	/* ???????? This must be changed when short term locks will be used */
 	locktag->lockmethodid = lockmethodid;
 
-	/* Prepare to record the lock in the current resource owner */
-	ResourceOwnerEnlargeLocks(CurrentResourceOwner);
-
 	Assert(lockmethodid < NumLockMethods);
 	lockMethodTable = LockMethods[lockmethodid];
 	if (!lockMethodTable)
@@ -439,14 +473,82 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 		return FALSE;
 	}
 
+	/* Session locks and user locks are not transactional */
+	if (xid != InvalidTransactionId &&
+		lockmethodid == DEFAULT_LOCKMETHOD)
+		owner = CurrentResourceOwner;
+	else
+		owner = NULL;
+
+	/*
+	 * Find or create a LOCALLOCK entry for this lock and lockmode
+	 */
+	MemSet(&localtag, 0, sizeof(localtag));	/* must clear padding */
+	localtag.lock = *locktag;
+	localtag.xid = xid;
+	localtag.mode = lockmode;
+
+	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+										  (void *) &localtag,
+										  HASH_ENTER, &found);
+	if (!locallock)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+
+	/*
+	 * if it's a new locallock object, initialize it
+	 */
+	if (!found)
+	{
+		locallock->lock = NULL;
+		locallock->proclock = NULL;
+		locallock->nLocks = 0;
+		locallock->numLockOwners = 0;
+		locallock->maxLockOwners = 8;
+		locallock->lockOwners = NULL;
+		locallock->lockOwners = (LOCALLOCKOWNER *)
+			MemoryContextAlloc(TopMemoryContext,
+							   locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+	}
+	else
+	{
+		/* Make sure there will be room to remember the lock */
+		if (locallock->numLockOwners >= locallock->maxLockOwners)
+		{
+			int	newsize = locallock->maxLockOwners * 2;
+
+			locallock->lockOwners = (LOCALLOCKOWNER *)
+				repalloc(locallock->lockOwners,
+						 newsize * sizeof(LOCALLOCKOWNER));
+			locallock->maxLockOwners = newsize;
+		}
+	}
+
+	/*
+	 * If we already hold the lock, we can just increase the count locally.
+	 */
+	if (locallock->nLocks > 0)
+	{
+		GrantLockLocal(locallock, owner);
+		return TRUE;
+	}
+
+	/*
+	 * Otherwise we've got to mess with the shared lock table.
+	 */
 	masterLock = lockMethodTable->masterLock;
 
 	LWLockAcquire(masterLock, LW_EXCLUSIVE);
 
 	/*
-	 * Find or create a lock with this tag
+	 * Find or create a lock with this tag.
+	 *
+	 * Note: if the locallock object already existed, it might have a pointer
+	 * to the lock already ... but we probably should not assume that that
+	 * pointer is valid, since a lock object with no locks can go away
+	 * anytime.
 	 */
-	Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash);
 	lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
 								(void *) locktag,
 								HASH_ENTER, &found);
@@ -458,6 +560,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 				 errmsg("out of shared memory"),
 				 errhint("You may need to increase max_locks_per_transaction.")));
 	}
+	locallock->lock = lock;
 
 	/*
 	 * if it's a new lock object, initialize it
@@ -466,7 +569,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	{
 		lock->grantMask = 0;
 		lock->waitMask = 0;
-		SHMQueueInit(&(lock->lockHolders));
+		SHMQueueInit(&(lock->procLocks));
 		ProcQueueInit(&(lock->waitProcs));
 		lock->nRequested = 0;
 		lock->nGranted = 0;
@@ -485,8 +588,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	/*
 	 * Create the hash key for the proclock table.
 	 */
-	MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));		/* must clear padding,
-														 * needed */
+	MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));	/* must clear padding */
 	proclocktag.lock = MAKE_OFFSET(lock);
 	proclocktag.proc = MAKE_OFFSET(MyProc);
 	TransactionIdStore(xid, &proclocktag.xid);
@@ -494,8 +596,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	/*
 	 * Find or create a proclock entry with this tag
 	 */
-	proclockTable = LockMethodProcLockHash[lockmethodid];
-	proclock = (PROCLOCK *) hash_search(proclockTable,
+	proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
 										(void *) &proclocktag,
 										HASH_ENTER, &found);
 	if (!proclock)
@@ -506,24 +607,23 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 				 errmsg("out of shared memory"),
 				 errhint("You may need to increase max_locks_per_transaction.")));
 	}
+	locallock->proclock = proclock;
 
 	/*
 	 * If new, initialize the new entry
 	 */
 	if (!found)
 	{
-		proclock->nHolding = 0;
-		MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES);
+		proclock->holdMask = 0;
 		/* Add proclock to appropriate lists */
-		SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink);
-		SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink);
+		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
+		SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
 		PROCLOCK_PRINT("LockAcquire: new", proclock);
 	}
 	else
 	{
 		PROCLOCK_PRINT("LockAcquire: found", proclock);
-		Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
-		Assert(proclock->nHolding <= lock->nGranted);
+		Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
 #ifdef CHECK_DEADLOCK_RISK
 
@@ -544,7 +644,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 		 */
 		for (i = lockMethodTable->numLockModes; i > 0; i--)
 		{
-			if (proclock->holding[i] > 0)
+			if (proclock->holdMask & LOCKBIT_ON(i))
 			{
 				if (i >= (int) lockmode)
 					break;		/* safe: we have a lock >= req level */
@@ -568,29 +668,14 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
 	/*
-	 * If I already hold one or more locks of the requested type, just
-	 * grant myself another one without blocking.
-	 */
-	if (proclock->holding[lockmode] > 0)
-	{
-		GrantLock(lock, proclock, lockmode);
-		ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid,
-								  lockmode);
-		PROCLOCK_PRINT("LockAcquire: owning", proclock);
-		LWLockRelease(masterLock);
-		return TRUE;
-	}
-
-	/*
-	 * If this process (under any XID) is a proclock of the lock, also
+	 * If this process (under any XID) is a holder of the lock, just
 	 * grant myself another one without blocking.
 	 */
 	LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
 	if (myHolding[lockmode] > 0)
 	{
 		GrantLock(lock, proclock, lockmode);
-		ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid,
-								  lockmode);
+		GrantLockLocal(locallock, owner);
 		PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
 		LWLockRelease(masterLock);
 		return TRUE;
@@ -612,8 +697,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	{
 		/* No conflict with held or previously requested locks */
 		GrantLock(lock, proclock, lockmode);
-		ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid,
-								  lockmode);
+		GrantLockLocal(locallock, owner);
 	}
 	else
 	{
@@ -621,29 +705,31 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 
 		/*
 		 * We can't acquire the lock immediately.  If caller specified no
-		 * blocking, remove the proclock entry and return FALSE without
+		 * blocking, remove useless table entries and return FALSE without
 		 * waiting.
 		 */
 		if (dontWait)
 		{
-			if (proclock->nHolding == 0)
+			if (proclock->holdMask == 0)
 			{
 				SHMQueueDelete(&proclock->lockLink);
 				SHMQueueDelete(&proclock->procLink);
-				proclock = (PROCLOCK *) hash_search(proclockTable,
-													(void *) proclock,
+				proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+													(void *) &(proclock->tag),
 													HASH_REMOVE, NULL);
 				if (!proclock)
 					elog(WARNING, "proclock table corrupted");
 			}
 			else
-				PROCLOCK_PRINT("LockAcquire: NHOLDING", proclock);
+				PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
 			lock->nRequested--;
 			lock->requested[lockmode]--;
 			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
 			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
 			Assert(lock->nGranted <= lock->nRequested);
 			LWLockRelease(masterLock);
+			if (locallock->nLocks == 0)
+				RemoveLocalLock(locallock);
 			return FALSE;
 		}
 
@@ -664,7 +750,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 		/*
 		 * Sleep till someone wakes me up.
 		 */
-		status = WaitOnLock(lockmethodid, lockmode, lock, proclock);
+		status = WaitOnLock(lockmethodid, locallock, owner);
 
 		/*
 		 * NOTE: do not do any material change of state between here and
@@ -677,7 +763,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 		 * Check the proclock entry status, in case something in the ipc
 		 * communication doesn't work correctly.
 		 */
-		if (!((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0)))
+		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
 		{
 			PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
 			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
@@ -694,6 +780,23 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	return status == STATUS_OK;
 }
 
+/*
+ * Subroutine to free a locallock entry
+ */
+static void
+RemoveLocalLock(LOCALLOCK *locallock)
+{
+	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
+
+	pfree(locallock->lockOwners);
+	locallock->lockOwners = NULL;
+	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+										  (void *) &(locallock->tag),
+										  HASH_REMOVE, NULL);
+	if (!locallock)
+		elog(WARNING, "locallock table corrupted");
+}
+
 /*
  * LockCheckConflicts -- test whether requested lock conflicts
  *		with those already granted
@@ -788,24 +891,29 @@ LockCheckConflicts(LockMethod lockMethodTable,
 static void
 LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
 {
-	SHM_QUEUE  *procHolders = &(proc->procHolders);
+	SHM_QUEUE  *procLocks = &(proc->procLocks);
 	PROCLOCK   *proclock;
-	int			i;
 
 	MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
 
-	proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
+	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
 										 offsetof(PROCLOCK, procLink));
 
 	while (proclock)
 	{
 		if (lockOffset == proclock->tag.lock)
 		{
+			LOCKMASK	holdMask = proclock->holdMask;
+			int			i;
+
 			for (i = 1; i < MAX_LOCKMODES; i++)
-				myHolding[i] += proclock->holding[i];
+			{
+				if (holdMask & LOCKBIT_ON(i))
+					myHolding[i]++;
+			}
 		}
 
-		proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
 										   offsetof(PROCLOCK, procLink));
 	}
 }
@@ -815,10 +923,11 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
  *		the lock request has been granted.
  *
  * NOTE: if proc was blocked, it also needs to be removed from the wait list
- * and have its waitLock/waitHolder fields cleared.  That's not done here.
+ * and have its waitLock/waitProcLock fields cleared.  That's not done here.
  *
- * NOTE: the lock also has to be recorded in the current ResourceOwner;
- * but since we may be awaking some other process, we can't do that here.
+ * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
+ * table entry; but since we may be awaking some other process, we can't do
+ * that here; it's done by GrantLockLocal, instead.
  */
 void
 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
@@ -828,12 +937,56 @@ GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
 	lock->grantMask |= LOCKBIT_ON(lockmode);
 	if (lock->granted[lockmode] == lock->requested[lockmode])
 		lock->waitMask &= LOCKBIT_OFF(lockmode);
+	proclock->holdMask |= LOCKBIT_ON(lockmode);
 	LOCK_PRINT("GrantLock", lock, lockmode);
 	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
 	Assert(lock->nGranted <= lock->nRequested);
-	proclock->holding[lockmode]++;
-	proclock->nHolding++;
-	Assert((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0));
+}
+
+/*
+ * GrantLockLocal -- update the locallock data structures to show
+ *		the lock request has been granted.
+ *
+ * We expect that LockAcquire made sure there is room to add a new
+ * ResourceOwner entry.
+ */
+static void
+GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
+{
+	LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+	int		i;
+
+	Assert(locallock->numLockOwners < locallock->maxLockOwners);
+	/* Count the total */
+	locallock->nLocks++;
+	/* Count the per-owner lock */
+	for (i = 0; i < locallock->numLockOwners; i++)
+	{
+		if (lockOwners[i].owner == owner)
+		{
+			lockOwners[i].nLocks++;
+			return;
+		}
+	}
+	lockOwners[i].owner = owner;
+	lockOwners[i].nLocks = 1;
+	locallock->numLockOwners++;
+}
+
+/*
+ * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
+ *		WaitOnLock on.
+ *
+ * proc.c needs this for the case where we are booted off the lock by
+ * timeout, but discover that someone granted us the lock anyway.
+ *
+ * We could just export GrantLockLocal, but that would require including
+ * resowner.h in lock.h, which creates circularity.
+ */
+void
+GrantAwaitedLock(void)
+{
+	GrantLockLocal(awaitedLock, awaitedOwner);
 }
 
 /*
@@ -845,8 +998,8 @@ GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
  * The locktable's masterLock must be held at entry.
  */
 static int
-WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode,
-		   LOCK *lock, PROCLOCK *proclock)
+WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
+		   ResourceOwner owner)
 {
 	LockMethod	lockMethodTable = LockMethods[lockmethodid];
 	char	   *new_status,
@@ -854,7 +1007,8 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode,
 
 	Assert(lockmethodid < NumLockMethods);
 
-	LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
+	LOCK_PRINT("WaitOnLock: sleeping on lock",
+			   locallock->lock, locallock->tag.mode);
 
 	old_status = pstrdup(get_ps_display());
 	new_status = (char *) palloc(strlen(old_status) + 10);
@@ -862,6 +1016,9 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode,
 	strcat(new_status, " waiting");
 	set_ps_display(new_status);
 
+	awaitedLock = locallock;
+	awaitedOwner = owner;
+
 	/*
 	 * NOTE: Think not to put any shared-state cleanup after the call to
 	 * ProcSleep, in either the normal or failure path.  The lock state
@@ -877,16 +1034,18 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode,
 	 */
 
 	if (ProcSleep(lockMethodTable,
-				  lockmode,
-				  lock,
-				  proclock) != STATUS_OK)
+				  locallock->tag.mode,
+				  locallock->lock,
+				  locallock->proclock) != STATUS_OK)
 	{
 		/*
 		 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
 		 * now.  Removal of the proclock and lock objects, if no longer
 		 * needed, will happen in xact cleanup (see above for motivation).
 		 */
-		LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
+		awaitedLock = NULL;
+		LOCK_PRINT("WaitOnLock: aborting on lock",
+				   locallock->lock, locallock->tag.mode);
 		LWLockRelease(lockMethodTable->masterLock);
 
 		/*
@@ -897,11 +1056,14 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode,
 		/* not reached */
 	}
 
+	awaitedLock = NULL;
+
 	set_ps_display(old_status);
 	pfree(old_status);
 	pfree(new_status);
 
-	LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
+	LOCK_PRINT("WaitOnLock: wakeup on lock",
+			   locallock->lock, locallock->tag.mode);
 	return STATUS_OK;
 }
 
@@ -944,7 +1106,7 @@ RemoveFromWaitQueue(PGPROC *proc)
 
 	/* Clean up the proc's own state */
 	proc->waitLock = NULL;
-	proc->waitHolder = NULL;
+	proc->waitProcLock = NULL;
 
 	/* See if any other waiters for the lock can be woken up now */
 	ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
@@ -964,12 +1126,12 @@ bool
 LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 			TransactionId xid, LOCKMODE lockmode)
 {
+	LOCALLOCKTAG localtag;
+	LOCALLOCK  *locallock;
 	LOCK	   *lock;
+	PROCLOCK   *proclock;
 	LWLockId	masterLock;
 	LockMethod	lockMethodTable;
-	PROCLOCK   *proclock;
-	PROCLOCKTAG proclocktag;
-	HTAB	   *proclockTable;
 	bool		wakeupNeeded = false;
 
 #ifdef LOCK_DEBUG
@@ -980,9 +1142,6 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	/* ???????? This must be changed when short term locks will be used */
 	locktag->lockmethodid = lockmethodid;
 
-	/* Record release of the lock in the current resource owner */
-	ResourceOwnerForgetLock(CurrentResourceOwner, locktag, xid, lockmode);
-
 	Assert(lockmethodid < NumLockMethods);
 	lockMethodTable = LockMethods[lockmethodid];
 	if (!lockMethodTable)
@@ -991,69 +1150,107 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 		return FALSE;
 	}
 
-	masterLock = lockMethodTable->masterLock;
-	LWLockAcquire(masterLock, LW_EXCLUSIVE);
-
 	/*
-	 * Find a lock with this tag
+	 * Find the LOCALLOCK entry for this lock and lockmode
 	 */
-	Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash);
-	lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
-								(void *) locktag,
-								HASH_FIND, NULL);
+	MemSet(&localtag, 0, sizeof(localtag));	/* must clear padding */
+	localtag.lock = *locktag;
+	localtag.xid = xid;
+	localtag.mode = lockmode;
+
+	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+										  (void *) &localtag,
+										  HASH_FIND, NULL);
 
 	/*
 	 * let the caller print its own error message, too. Do not
 	 * ereport(ERROR).
 	 */
-	if (!lock)
+	if (!locallock || locallock->nLocks <= 0)
 	{
-		LWLockRelease(masterLock);
-		elog(WARNING, "no such lock");
+		elog(WARNING, "you don't own a lock of type %s",
+			 lock_mode_names[lockmode]);
 		return FALSE;
 	}
-	LOCK_PRINT("LockRelease: found", lock, lockmode);
 
 	/*
-	 * Find the proclock entry for this proclock.
+	 * Decrease the count for the resource owner.
 	 */
-	MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));		/* must clear padding,
-														 * needed */
-	proclocktag.lock = MAKE_OFFSET(lock);
-	proclocktag.proc = MAKE_OFFSET(MyProc);
-	TransactionIdStore(xid, &proclocktag.xid);
-
-	proclockTable = LockMethodProcLockHash[lockmethodid];
-	proclock = (PROCLOCK *) hash_search(proclockTable,
-										(void *) &proclocktag,
-										HASH_FIND_SAVE, NULL);
-	if (!proclock)
 	{
-		LWLockRelease(masterLock);
-#ifdef USER_LOCKS
-		if (lockmethodid == USER_LOCKMETHOD)
-			elog(WARNING, "no lock with this tag");
+		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+		ResourceOwner owner;
+		int		i;
+
+		/* Session locks and user locks are not transactional */
+		if (xid != InvalidTransactionId &&
+			lockmethodid == DEFAULT_LOCKMETHOD)
+			owner = CurrentResourceOwner;
 		else
-#endif
-			elog(WARNING, "proclock table corrupted");
-		return FALSE;
+			owner = NULL;
+
+		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		{
+			if (lockOwners[i].owner == owner)
+			{
+				Assert(lockOwners[i].nLocks > 0);
+				if (--lockOwners[i].nLocks == 0)
+				{
+					/* compact out unused slot */
+					locallock->numLockOwners--;
+					if (i < locallock->numLockOwners)
+						lockOwners[i] = lockOwners[locallock->numLockOwners];
+				}
+				break;
+			}
+		}
+		if (i < 0)
+		{
+			/* don't release a lock belonging to another owner */
+			elog(WARNING, "you don't own a lock of type %s",
+				 lock_mode_names[lockmode]);
+			return FALSE;
+		}
 	}
+
+	/*
+	 * Decrease the total local count.  If we're still holding the lock,
+	 * we're done.
+	 */
+	locallock->nLocks--;
+
+	if (locallock->nLocks > 0)
+		return TRUE;
+
+	/*
+	 * Otherwise we've got to mess with the shared lock table.
+	 */
+	masterLock = lockMethodTable->masterLock;
+
+	LWLockAcquire(masterLock, LW_EXCLUSIVE);
+
+	/*
+	 * We don't need to re-find the lock or proclock, since we kept their
+	 * addresses in the locallock table, and they couldn't have been
+	 * removed while we were holding a lock on them.
+	 */
+	lock = locallock->lock;
+	LOCK_PRINT("LockRelease: found", lock, lockmode);
+	proclock = locallock->proclock;
 	PROCLOCK_PRINT("LockRelease: found", proclock);
 
 	/*
-	 * Check that we are actually holding a lock of the type we want to
+	 * Double-check that we are actually holding a lock of the type we want to
 	 * release.
 	 */
-	if (!(proclock->holding[lockmode] > 0))
+	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
 	{
 		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
-		Assert(proclock->holding[lockmode] >= 0);
 		LWLockRelease(masterLock);
 		elog(WARNING, "you don't own a lock of type %s",
 			 lock_mode_names[lockmode]);
+		RemoveLocalLock(locallock);
 		return FALSE;
 	}
-	Assert(proclock->nHolding > 0);
 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
 	Assert(lock->nGranted <= lock->nRequested);
@@ -1089,67 +1286,69 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
 		wakeupNeeded = true;
 
-	if (lock->nRequested == 0)
-	{
-		/*
-		 * if there's no one waiting in the queue, we just released the
-		 * last lock on this object. Delete it from the lock table.
-		 */
-		Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash);
-		lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
-									(void *) &(lock->tag),
-									HASH_REMOVE,
-									NULL);
-		if (!lock)
-		{
-			LWLockRelease(masterLock);
-			elog(WARNING, "lock table corrupted");
-			return FALSE;
-		}
-		wakeupNeeded = false;	/* should be false, but make sure */
-	}
-
 	/*
-	 * Now fix the per-proclock lock stats.
+	 * Now fix the per-proclock state.
 	 */
-	proclock->holding[lockmode]--;
-	proclock->nHolding--;
+	proclock->holdMask &= LOCKBIT_OFF(lockmode);
 	PROCLOCK_PRINT("LockRelease: updated", proclock);
-	Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
 
 	/*
 	 * If this was my last hold on this lock, delete my entry in the
 	 * proclock table.
 	 */
-	if (proclock->nHolding == 0)
+	if (proclock->holdMask == 0)
 	{
 		PROCLOCK_PRINT("LockRelease: deleting", proclock);
 		SHMQueueDelete(&proclock->lockLink);
 		SHMQueueDelete(&proclock->procLink);
-		proclock = (PROCLOCK *) hash_search(proclockTable,
-											(void *) &proclock,
-											HASH_REMOVE_SAVED, NULL);
+		proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+											(void *) &(proclock->tag),
+											HASH_REMOVE, NULL);
 		if (!proclock)
 		{
 			LWLockRelease(masterLock);
 			elog(WARNING, "proclock table corrupted");
+			RemoveLocalLock(locallock);
 			return FALSE;
 		}
 	}
 
-	/*
-	 * Wake up waiters if needed.
-	 */
-	if (wakeupNeeded)
-		ProcLockWakeup(lockMethodTable, lock);
+	if (lock->nRequested == 0)
+	{
+		/*
+		 * We've just released the last lock, so garbage-collect the
+		 * lock object.
+		 */
+		Assert(SHMQueueEmpty(&(lock->procLocks)));
+		lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+									(void *) &(lock->tag),
+									HASH_REMOVE, NULL);
+		if (!lock)
+		{
+			LWLockRelease(masterLock);
+			elog(WARNING, "lock table corrupted");
+			RemoveLocalLock(locallock);
+			return FALSE;
+		}
+	}
+	else
+	{
+		/*
+		 * Wake up waiters if needed.
+		 */
+		if (wakeupNeeded)
+			ProcLockWakeup(lockMethodTable, lock);
+	}
 
 	LWLockRelease(masterLock);
+
+	RemoveLocalLock(locallock);
 	return TRUE;
 }
 
 /*
  * LockReleaseAll -- Release all locks of the specified lock method that
- *		are held by the specified process.
+ *		are held by the current process.
  *
  * Well, not necessarily *all* locks.  The available behaviors are:
  *
@@ -1160,22 +1359,21 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
  * (zero is the Xid used for "session" locks).
  */
 bool
-LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
-			   bool allxids)
+LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
 {
-	SHM_QUEUE  *procHolders = &(proc->procHolders);
-	PROCLOCK   *proclock;
-	PROCLOCK   *nextHolder;
+	HASH_SEQ_STATUS status;
+	SHM_QUEUE  *procLocks = &(MyProc->procLocks);
 	LWLockId	masterLock;
 	LockMethod	lockMethodTable;
 	int			i,
 				numLockModes;
+	LOCALLOCK   *locallock;
+	PROCLOCK   *proclock;
 	LOCK	   *lock;
 
 #ifdef LOCK_DEBUG
 	if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
-		elog(LOG, "LockReleaseAll: lockmethod=%d, pid=%d",
-			 lockmethodid, proc->pid);
+		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
 #endif
 
 	Assert(lockmethodid < NumLockMethods);
@@ -1189,20 +1387,55 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
 	numLockModes = lockMethodTable->numLockModes;
 	masterLock = lockMethodTable->masterLock;
 
+	/*
+	 * First we run through the locallock table and get rid of unwanted
+	 * entries, then we scan the process's proclocks and get rid of those.
+	 * We do this separately because we may have multiple locallock entries
+	 * pointing to the same proclock, and we daren't end up with any
+	 * dangling pointers.
+	 */
+	hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
+
+	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+	{
+		if (locallock->proclock == NULL || locallock->lock == NULL)
+		{
+			/*
+			 * We must've run out of shared memory while trying to set up
+			 * this lock.  Just forget the local entry.
+			 */
+			Assert(locallock->nLocks == 0);
+			RemoveLocalLock(locallock);
+			continue;
+		}
+
+		/* Ignore items that are not of the lockmethod to be removed */
+		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
+			continue;
+
+		/* Ignore locks with Xid=0 unless we are asked to release all locks */
+		if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
+			&& !allxids)
+			continue;
+
+		RemoveLocalLock(locallock);
+	}
+
 	LWLockAcquire(masterLock, LW_EXCLUSIVE);
 
-	proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
+	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
 										 offsetof(PROCLOCK, procLink));
 
 	while (proclock)
 	{
 		bool		wakeupNeeded = false;
+		PROCLOCK   *nextHolder;
 
 		/* Get link first, since we may unlink/delete this proclock */
-		nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+		nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
 										   offsetof(PROCLOCK, procLink));
 
-		Assert(proclock->tag.proc == MAKE_OFFSET(proc));
+		Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
 
 		lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
 
@@ -1220,24 +1453,24 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
 		Assert(lock->nRequested >= 0);
 		Assert(lock->nGranted >= 0);
 		Assert(lock->nGranted <= lock->nRequested);
-		Assert(proclock->nHolding >= 0);
-		Assert(proclock->nHolding <= lock->nRequested);
+		Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
 		/*
 		 * fix the general lock stats
 		 */
-		if (lock->nRequested != proclock->nHolding)
+		if (proclock->holdMask)
 		{
 			for (i = 1; i <= numLockModes; i++)
 			{
-				Assert(proclock->holding[i] >= 0);
-				if (proclock->holding[i] > 0)
+				if (proclock->holdMask & LOCKBIT_ON(i))
 				{
-					lock->requested[i] -= proclock->holding[i];
-					lock->granted[i] -= proclock->holding[i];
+					lock->requested[i]--;
+					lock->granted[i]--;
 					Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
 					if (lock->granted[i] == 0)
 						lock->grantMask &= LOCKBIT_OFF(i);
+					lock->nRequested--;
+					lock->nGranted--;
 
 					/*
 					 * Read comments in LockRelease
@@ -1247,26 +1480,9 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
 						wakeupNeeded = true;
 				}
 			}
-			lock->nRequested -= proclock->nHolding;
-			lock->nGranted -= proclock->nHolding;
-			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
-			Assert(lock->nGranted <= lock->nRequested);
-		}
-		else
-		{
-			/*
-			 * This proclock accounts for all the requested locks on the
-			 * object, so we can be lazy and just zero things out.
-			 */
-			lock->nRequested = 0;
-			lock->nGranted = 0;
-			/* Fix the lock status, just for next LOCK_PRINT message. */
-			for (i = 1; i <= numLockModes; i++)
-			{
-				Assert(lock->requested[i] == lock->granted[i]);
-				lock->requested[i] = lock->granted[i] = 0;
-			}
 		}
+		Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
+		Assert(lock->nGranted <= lock->nRequested);
 		LOCK_PRINT("LockReleaseAll: updated", lock, 0);
 
 		PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
@@ -1281,7 +1497,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
 		 * remove the proclock entry from the hashtable
 		 */
 		proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
-											(void *) proclock,
+											(void *) &(proclock->tag),
 											HASH_REMOVE,
 											NULL);
 		if (!proclock)
@@ -1298,7 +1514,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
 			 * lock object.
 			 */
 			LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
-			Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash);
+			Assert(SHMQueueEmpty(&(lock->procLocks)));
 			lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
 										(void *) &(lock->tag),
 										HASH_REMOVE, NULL);
@@ -1326,6 +1542,130 @@ next_item:
 	return TRUE;
 }
 
+/*
+ * LockReleaseCurrentOwner
+ *		Release all locks belonging to CurrentResourceOwner
+ *
+ * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner.
+ */
+void
+LockReleaseCurrentOwner(void)
+{
+	HASH_SEQ_STATUS status;
+	LOCALLOCK   *locallock;
+	LOCALLOCKOWNER *lockOwners;
+	int			i;
+
+	hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
+
+	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+	{
+		/* Ignore items that must be nontransactional */
+		if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
+			continue;
+		if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
+			continue;
+
+		/* Scan to see if there are any locks belonging to current owner */
+		lockOwners = locallock->lockOwners;
+		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		{
+			if (lockOwners[i].owner == CurrentResourceOwner)
+			{
+				Assert(lockOwners[i].nLocks > 0);
+				if (lockOwners[i].nLocks < locallock->nLocks)
+				{
+					/*
+					 * We will still hold this lock after forgetting this
+					 * ResourceOwner.
+					 */
+					locallock->nLocks -= lockOwners[i].nLocks;
+					/* compact out unused slot */
+					locallock->numLockOwners--;
+					if (i < locallock->numLockOwners)
+						lockOwners[i] = lockOwners[locallock->numLockOwners];
+				}
+				else
+				{
+					Assert(lockOwners[i].nLocks == locallock->nLocks);
+					/* We want to call LockRelease just once */
+					lockOwners[i].nLocks = 1;
+					locallock->nLocks = 1;
+					if (!LockRelease(DEFAULT_LOCKMETHOD,
+									 &locallock->tag.lock,
+									 locallock->tag.xid,
+									 locallock->tag.mode))
+						elog(WARNING, "LockReleaseCurrentOwner: failed??");
+				}
+				break;
+			}
+		}
+	}
+}
+
+/*
+ * LockReassignCurrentOwner
+ *		Reassign all locks belonging to CurrentResourceOwner to belong
+ *		to its parent resource owner
+ */
+void
+LockReassignCurrentOwner(void)
+{
+	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
+	HASH_SEQ_STATUS status;
+	LOCALLOCK   *locallock;
+	LOCALLOCKOWNER *lockOwners;
+
+	Assert(parent != NULL);
+
+	hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
+
+	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+	{
+		int			i;
+		int			ic = -1;
+		int			ip = -1;
+
+		/* Ignore items that must be nontransactional */
+		if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
+			continue;
+		if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
+			continue;
+
+		/*
+		 * Scan to see if there are any locks belonging to current owner
+		 * or its parent
+		 */
+		lockOwners = locallock->lockOwners;
+		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		{
+			if (lockOwners[i].owner == CurrentResourceOwner)
+				ic = i;
+			else if (lockOwners[i].owner == parent)
+				ip = i;
+		}
+
+		if (ic < 0)
+			continue;			/* no current locks */
+
+		if (ip < 0)
+		{
+			/* Parent has no slot, so just give it child's slot */
+			lockOwners[ic].owner = parent;
+		}
+		else
+		{
+			/* Merge child's count with parent's */
+			lockOwners[ip].nLocks += lockOwners[ic].nLocks;
+			/* compact out unused slot */
+			locallock->numLockOwners--;
+			if (ic < locallock->numLockOwners)
+				lockOwners[ic] = lockOwners[locallock->numLockOwners];
+		}
+	}
+}
+
+
 int
 LockShmemSize(int maxBackends)
 {
@@ -1420,7 +1760,7 @@ GetLockmodeName(LOCKMODE mode)
 
 #ifdef LOCK_DEBUG
 /*
- * Dump all locks in the proc->procHolders list.
+ * Dump all locks in the MyProc->procLocks list.
  *
  * Must have already acquired the masterLock.
  */
@@ -1428,7 +1768,7 @@ void
 DumpLocks(void)
 {
 	PGPROC	   *proc;
-	SHM_QUEUE  *procHolders;
+	SHM_QUEUE  *procLocks;
 	PROCLOCK   *proclock;
 	LOCK	   *lock;
 	int			lockmethodid = DEFAULT_LOCKMETHOD;
@@ -1438,7 +1778,7 @@ DumpLocks(void)
 	if (proc == NULL)
 		return;
 
-	procHolders = &proc->procHolders;
+	procLocks = &proc->procLocks;
 
 	Assert(lockmethodid < NumLockMethods);
 	lockMethodTable = LockMethods[lockmethodid];
@@ -1448,7 +1788,7 @@ DumpLocks(void)
 	if (proc->waitLock)
 		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
 
-	proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
+	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
 										 offsetof(PROCLOCK, procLink));
 
 	while (proclock)
@@ -1460,7 +1800,7 @@ DumpLocks(void)
 		PROCLOCK_PRINT("DumpLocks", proclock);
 		LOCK_PRINT("DumpLocks", lock, 0);
 
-		proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
 										   offsetof(PROCLOCK, procLink));
 	}
 }
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 3c7249ffc13e8a0c280ee47691c49b0628c6b311..8a0d30e8672569aa05435cd9d3118db979d0ad7c 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.150 2004/07/17 03:28:51 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.151 2004/08/27 17:07:41 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,11 +46,11 @@
 
 #include "miscadmin.h"
 #include "access/xact.h"
+#include "storage/bufmgr.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/sinval.h"
 #include "storage/spin.h"
-#include "utils/resowner.h"
 
 
 /* GUC variables */
@@ -76,11 +76,6 @@ static PGPROC *DummyProcs = NULL;
 static bool waitingForLock = false;
 static bool waitingForSignal = false;
 
-/* Auxiliary state, valid when waitingForLock is true */
-static LOCKTAG waitingForLockTag;
-static TransactionId waitingForLockXid;
-static LOCKMODE waitingForLockMode;
-
 /* Mark these volatile because they can be changed by signal handler */
 static volatile bool statement_timeout_active = false;
 static volatile bool deadlock_timeout_active = false;
@@ -250,8 +245,8 @@ InitProcess(void)
 	MyProc->lwExclusive = false;
 	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
-	MyProc->waitHolder = NULL;
-	SHMQueueInit(&(MyProc->procHolders));
+	MyProc->waitProcLock = NULL;
+	SHMQueueInit(&(MyProc->procLocks));
 
 	/*
 	 * Arrange to clean up at backend exit.
@@ -323,8 +318,8 @@ InitDummyProcess(int proctype)
 	MyProc->lwExclusive = false;
 	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
-	MyProc->waitHolder = NULL;
-	SHMQueueInit(&(MyProc->procHolders));
+	MyProc->waitProcLock = NULL;
+	SHMQueueInit(&(MyProc->procLocks));
 
 	/*
 	 * Arrange to clean up at process exit.
@@ -372,18 +367,10 @@ LockWaitCancel(void)
 		 * Somebody kicked us off the lock queue already.  Perhaps they
 		 * granted us the lock, or perhaps they detected a deadlock.
 		 * If they did grant us the lock, we'd better remember it in
-		 * CurrentResourceOwner.
-		 *
-		 * Exception: if CurrentResourceOwner is NULL then we can't do
-		 * anything.  This could only happen when we are invoked from ProcKill
-		 * or some similar place, where all our locks are about to be released
-		 * anyway.
+		 * our local lock table.
 		 */
-		if (MyProc->waitStatus == STATUS_OK && CurrentResourceOwner != NULL)
-			ResourceOwnerRememberLock(CurrentResourceOwner,
-									  &waitingForLockTag,
-									  waitingForLockXid,
-									  waitingForLockMode);
+		if (MyProc->waitStatus == STATUS_OK)
+			GrantAwaitedLock();
 	}
 
 	waitingForLock = false;
@@ -433,7 +420,7 @@ ProcReleaseLocks(bool isCommit)
 	/* If waiting, get off wait queue (should only be needed after error) */
 	LockWaitCancel();
 	/* Release locks */
-	LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc, !isCommit);
+	LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
 }
 
 
@@ -466,11 +453,11 @@ ProcKill(int code, Datum arg)
 	LockWaitCancel();
 
 	/* Remove from the standard lock table */
-	LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc, true);
+	LockReleaseAll(DEFAULT_LOCKMETHOD, true);
 
 #ifdef USER_LOCKS
 	/* Remove from the user lock table */
-	LockReleaseAll(USER_LOCKMETHOD, MyProc, true);
+	LockReleaseAll(USER_LOCKMETHOD, true);
 #endif
 
 	SpinLockAcquire(ProcStructLock);
@@ -644,10 +631,7 @@ ProcSleep(LockMethod lockMethodTable,
 				{
 					/* Skip the wait and just grant myself the lock. */
 					GrantLock(lock, proclock, lockmode);
-					ResourceOwnerRememberLock(CurrentResourceOwner,
-											  &lock->tag,
-											  proclock->tag.xid,
-											  lockmode);
+					GrantAwaitedLock();
 					return STATUS_OK;
 				}
 				/* Break out of loop to put myself before him */
@@ -680,7 +664,7 @@ ProcSleep(LockMethod lockMethodTable,
 
 	/* Set up wait information in PGPROC object, too */
 	MyProc->waitLock = lock;
-	MyProc->waitHolder = proclock;
+	MyProc->waitProcLock = proclock;
 	MyProc->waitLockMode = lockmode;
 
 	MyProc->waitStatus = STATUS_ERROR;	/* initialize result for error */
@@ -697,9 +681,6 @@ ProcSleep(LockMethod lockMethodTable,
 	}
 
 	/* mark that we are waiting for a lock */
-	waitingForLockTag = lock->tag;
-	waitingForLockXid = proclock->tag.xid;
-	waitingForLockMode = lockmode;
 	waitingForLock = true;
 
 	/*
@@ -737,8 +718,8 @@ ProcSleep(LockMethod lockMethodTable,
 	 * promise that we don't mind losing control to a cancel/die interrupt
 	 * here.  We don't, because we have no shared-state-change work to do
 	 * after being granted the lock (the grantor did it all).  We do have
-	 * to worry about updating the local CurrentResourceOwner, but if we
-	 * lose control to an error, LockWaitCancel will fix that up.
+	 * to worry about updating the locallock table, but if we lose control
+	 * to an error, LockWaitCancel will fix that up.
 	 */
 	PGSemaphoreLock(&MyProc->sem, true);
 
@@ -751,8 +732,7 @@ ProcSleep(LockMethod lockMethodTable,
 	/*
 	 * Re-acquire the locktable's masterLock.  We have to do this to hold
 	 * off cancel/die interrupts before we can mess with waitingForLock
-	 * (else we might have a missed or duplicated CurrentResourceOwner
-	 * update).
+	 * (else we might have a missed or duplicated locallock update).
 	 */
 	LWLockAcquire(masterLock, LW_EXCLUSIVE);
 
@@ -762,13 +742,10 @@ ProcSleep(LockMethod lockMethodTable,
 	waitingForLock = false;
 
 	/*
-	 * If we got the lock, be sure to remember it in CurrentResourceOwner.
+	 * If we got the lock, be sure to remember it in the locallock table.
 	 */
 	if (MyProc->waitStatus == STATUS_OK)
-		ResourceOwnerRememberLock(CurrentResourceOwner,
-								  &lock->tag,
-								  proclock->tag.xid,
-								  lockmode);
+		GrantAwaitedLock();
 
 	/*
 	 * We don't have to do anything else, because the awaker did all the
@@ -809,7 +786,7 @@ ProcWakeup(PGPROC *proc, int waitStatus)
 
 	/* Clean up process' state and pass it the ok/fail signal */
 	proc->waitLock = NULL;
-	proc->waitHolder = NULL;
+	proc->waitProcLock = NULL;
 	proc->waitStatus = waitStatus;
 
 	/* And awaken it */
@@ -850,12 +827,12 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
 			LockCheckConflicts(lockMethodTable,
 							   lockmode,
 							   lock,
-							   proc->waitHolder,
+							   proc->waitProcLock,
 							   proc,
 							   NULL) == STATUS_OK)
 		{
 			/* OK to waken */
-			GrantLock(lock, proc->waitHolder, lockmode);
+			GrantLock(lock, proc->waitProcLock, lockmode);
 			proc = ProcWakeup(proc, STATUS_OK);
 
 			/*
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index bc4901aa5463c634533340e8e874219e1277c1ec..5afa1a551a316956f621a1d58c6d08a75e076471 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -6,7 +6,7 @@
  * Copyright (c) 2002-2003, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *		$PostgreSQL: pgsql/src/backend/utils/adt/lockfuncs.c,v 1.13 2004/04/01 21:28:45 tgl Exp $
+ *		$PostgreSQL: pgsql/src/backend/utils/adt/lockfuncs.c,v 1.14 2004/08/27 17:07:41 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -92,7 +92,7 @@ pg_lock_status(PG_FUNCTION_ARGS)
 		LOCK	   *lock;
 		PGPROC	   *proc;
 		bool		granted;
-		LOCKMODE	mode;
+		LOCKMODE	mode = 0;
 		Datum		values[6];
 		char		nulls[6];
 		HeapTuple	tuple;
@@ -108,13 +108,16 @@ pg_lock_status(PG_FUNCTION_ARGS)
 		 * report again.
 		 */
 		granted = false;
-		for (mode = 0; mode < MAX_LOCKMODES; mode++)
+		if (proclock->holdMask)
 		{
-			if (proclock->holding[mode] > 0)
+			for (mode = 0; mode < MAX_LOCKMODES; mode++)
 			{
-				granted = true;
-				proclock->holding[mode] = 0;
-				break;
+				if (proclock->holdMask & LOCKBIT_ON(mode))
+				{
+					granted = true;
+					proclock->holdMask &= LOCKBIT_OFF(mode);
+					break;
+				}
 			}
 		}
 
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 05bd9b5ab5cf818085d30ec91637b8d5b6b0e848..77f9cd63f2f509fc429fa861069d1577ccde3877 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -14,7 +14,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/resowner/resowner.c,v 1.3 2004/08/25 18:43:43 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/resowner/resowner.c,v 1.4 2004/08/27 17:07:41 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -30,18 +30,6 @@
 #include "utils/relcache.h"
 
 
-/*
- * Info needed to identify/release a lock
- */
-typedef struct LockIdData
-{
-	/* we assume lockmethodid is part of locktag */
-	LOCKTAG		locktag;
-	TransactionId xid;
-	LOCKMODE	lockmode;
-} LockIdData;
-
-
 /*
  * ResourceOwner objects look like this
  */
@@ -57,11 +45,6 @@ typedef struct ResourceOwnerData
 	Buffer	   *buffers;		/* dynamically allocated array */
 	int			maxbuffers;		/* currently allocated array size */
 
-	/* We have built-in support for remembering owned locks */
-	int			nlocks;			/* number of owned locks */
-	LockIdData *locks;			/* dynamically allocated array */
-	int			maxlocks;		/* currently allocated array size */
-
 	/* We have built-in support for remembering catcache references */
 	int			ncatrefs;		/* number of owned catcache pins */
 	HeapTuple  *catrefs;		/* dynamically allocated array */
@@ -274,44 +257,19 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 			 */
 			if (owner == TopTransactionResourceOwner)
 				ProcReleaseLocks(isCommit);
-			/* Mark object as holding no locks, just for sanity */
-			owner->nlocks = 0;
 		}
 		else
 		{
 			/*
-			 * Release locks retail.  Note that LockRelease will remove
-			 * the lock entry from my list, so I just have to iterate till
-			 * there are none.  Also note that if we are committing a
+			 * Release locks retail.  Note that if we are committing a
 			 * subtransaction, we do NOT release its locks yet, but transfer
 			 * them to the parent.
-			 *
-			 * XXX as above, this is a bit inefficient but probably not worth
-			 * the trouble to optimize more.
 			 */
 			Assert(owner->parent != NULL);
-			while (owner->nlocks > 0)
-			{
-				LockIdData *lockid = &owner->locks[owner->nlocks - 1];
-
-				if (isCommit)
-				{
-					ResourceOwnerEnlargeLocks(owner->parent);
-					ResourceOwnerRememberLock(owner->parent,
-											  &lockid->locktag,
-											  lockid->xid,
-											  lockid->lockmode);
-					owner->nlocks--;
-				}
-				else
-				{
-					LockRelease(lockid->locktag.lockmethodid,
-								&lockid->locktag,
-								lockid->xid,
-								lockid->lockmode);
-					/* LockRelease will have removed the entry from list */
-				}
-			}
+			if (isCommit)
+				LockReassignCurrentOwner();
+			else
+				LockReleaseCurrentOwner();
 		}
 	}
 	else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
@@ -368,7 +326,6 @@ ResourceOwnerDelete(ResourceOwner owner)
 
 	/* And it better not own any resources, either */
 	Assert(owner->nbuffers == 0);
-	Assert(owner->nlocks == 0);
 	Assert(owner->ncatrefs == 0);
 	Assert(owner->ncatlistrefs == 0);
 	Assert(owner->nrelrefs == 0);
@@ -390,8 +347,6 @@ ResourceOwnerDelete(ResourceOwner owner)
 	/* And free the object. */
 	if (owner->buffers)
 		pfree(owner->buffers);
-	if (owner->locks)
-		pfree(owner->locks);
 	if (owner->catrefs)
 		pfree(owner->catrefs);
 	if (owner->catlistrefs)
@@ -402,6 +357,15 @@ ResourceOwnerDelete(ResourceOwner owner)
 	pfree(owner);
 }
 
+/*
+ * Fetch parent of a ResourceOwner (returns NULL if top-level owner)
+ */
+ResourceOwner
+ResourceOwnerGetParent(ResourceOwner owner)
+{
+	return owner->parent;
+}
+
 /*
  * Reassign a ResourceOwner to have a new parent
  */
@@ -581,97 +545,6 @@ ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
 	}
 }
 
-/*
- * Make sure there is room for at least one more entry in a ResourceOwner's
- * lock array.
- *
- * This is separate from actually inserting an entry because if we run out
- * of memory, it's critical to do so *before* acquiring the resource.
- */
-void
-ResourceOwnerEnlargeLocks(ResourceOwner owner)
-{
-	int			newmax;
-
-	if (owner->nlocks < owner->maxlocks)
-		return;					/* nothing to do */
-
-	if (owner->locks == NULL)
-	{
-		newmax = 16;
-		owner->locks = (LockIdData *)
-			MemoryContextAlloc(TopMemoryContext, newmax * sizeof(LockIdData));
-		owner->maxlocks = newmax;
-	}
-	else
-	{
-		newmax = owner->maxlocks * 2;
-		owner->locks = (LockIdData *)
-			repalloc(owner->locks, newmax * sizeof(LockIdData));
-		owner->maxlocks = newmax;
-	}
-}
-
-/*
- * Remember that a lock is owned by a ResourceOwner
- *
- * Caller must have previously done ResourceOwnerEnlargeLocks()
- */
-void
-ResourceOwnerRememberLock(ResourceOwner owner,
-						  LOCKTAG *locktag,
-						  TransactionId xid,
-						  LOCKMODE lockmode)
-{
-	/* Session locks and user locks are not transactional */
-	if (xid != InvalidTransactionId &&
-		locktag->lockmethodid == DEFAULT_LOCKMETHOD)
-	{
-		Assert(owner->nlocks < owner->maxlocks);
-		owner->locks[owner->nlocks].locktag = *locktag;
-		owner->locks[owner->nlocks].xid = xid;
-		owner->locks[owner->nlocks].lockmode = lockmode;
-		owner->nlocks++;
-	}
-}
-
-/*
- * Forget that a lock is owned by a ResourceOwner
- */
-void
-ResourceOwnerForgetLock(ResourceOwner owner,
-						LOCKTAG *locktag,
-						TransactionId xid,
-						LOCKMODE lockmode)
-{
-	/* Session locks and user locks are not transactional */
-	if (xid != InvalidTransactionId &&
-		locktag->lockmethodid == DEFAULT_LOCKMETHOD)
-	{
-		LockIdData *locks = owner->locks;
-		int			nl1 = owner->nlocks - 1;
-		int			i;
-
-		for (i = nl1; i >= 0; i--)
-		{
-			if (memcmp(&locks[i].locktag, locktag, sizeof(LOCKTAG)) == 0 &&
-				locks[i].xid == xid &&
-				locks[i].lockmode == lockmode)
-			{
-				while (i < nl1)
-				{
-					locks[i] = locks[i + 1];
-					i++;
-				}
-				owner->nlocks = nl1;
-				return;
-			}
-		}
-		elog(ERROR, "lock %u/%u/%u is not owned by resource owner %s",
-			 locktag->relId, locktag->dbId, locktag->objId.xid, owner->name);
-	}
-}
-
 /*
  * Make sure there is room for at least one more entry in a ResourceOwner's
  * catcache reference array.
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 0c0c66077f738e6b6983c8c5843af1b313d41f19..eecc51a204a420f72f016d2eb5a96ac1f81e1289 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.80 2004/08/26 17:22:28 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.81 2004/08/27 17:07:42 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -69,15 +69,17 @@ typedef uint16 LOCKMETHODID;
 
 #define LockMethodIsValid(lockmethodid) ((lockmethodid) != INVALID_LOCKMETHOD)
 
+extern int NumLockMethods;
+
+
 /*
  * This is the control structure for a lock table. It lives in shared
  * memory.  Currently, none of these fields change after startup.  In addition
- * to the LockMethodData, a lock table has a "lockHash" table holding
- * per-locked-object lock information, and a "proclockHash" table holding
- * per-lock-holder/waiter lock information.
+ * to the LockMethodData, a lock table has a shared "lockHash" table holding
+ * per-locked-object lock information, and a shared "proclockHash" table
+ * holding per-lock-holder/waiter lock information.
  *
- * lockmethodid -- the handle used by the lock table's clients to
- *		refer to the type of lock table being used.
+ * masterLock -- LWLock used to synchronize access to the table
  *
  * numLockModes -- number of lock types (READ,WRITE,etc) that
  *		are defined on this lock table
@@ -85,16 +87,14 @@ typedef uint16 LOCKMETHODID;
  * conflictTab -- this is an array of bitmasks showing lock
  *		type conflicts. conflictTab[i] is a mask with the j-th bit
  *		turned on if lock types i and j conflict.
- *
- * masterLock -- LWLock used to synchronize access to the table
  */
 typedef struct LockMethodData
 {
-	LOCKMETHODID	lockmethodid;
+	LWLockId		masterLock;
 	int				numLockModes;
 	LOCKMASK		conflictTab[MAX_LOCKMODES];
-	LWLockId		masterLock;
 } LockMethodData;
+
 typedef LockMethodData *LockMethod;
 
 
@@ -113,8 +113,8 @@ typedef struct LOCKTAG
 	}			objId;
 
 	/*
-	 * offnum should be part of objId.tupleId above, but would increase
-	 * sizeof(LOCKTAG) and so moved here; currently used by userlocks
+	 * offnum should be part of objId union above, but doing that would
+	 * increase sizeof(LOCKTAG) due to padding.  Currently used by userlocks
 	 * only.
 	 */
 	OffsetNumber offnum;
@@ -129,7 +129,7 @@ typedef struct LOCKTAG
  * tag -- uniquely identifies the object being locked
  * grantMask -- bitmask for all lock types currently granted on this object.
  * waitMask -- bitmask for all lock types currently awaited on this object.
- * lockHolders -- list of PROCLOCK objects for this lock.
+ * procLocks -- list of PROCLOCK objects for this lock.
  * waitProcs -- queue of processes waiting for this lock.
  * requested -- count of each lock type currently requested on the lock
  *		(includes requests already granted!!).
@@ -145,7 +145,7 @@ typedef struct LOCK
 	/* data */
 	LOCKMASK	grantMask;		/* bitmask for lock types already granted */
 	LOCKMASK	waitMask;		/* bitmask for lock types awaited */
-	SHM_QUEUE	lockHolders;	/* list of PROCLOCK objects assoc. with
+	SHM_QUEUE	procLocks;		/* list of PROCLOCK objects assoc. with
 								 * lock */
 	PROC_QUEUE	waitProcs;		/* list of PGPROC objects waiting on lock */
 	int			requested[MAX_LOCKMODES];		/* counts of requested
@@ -168,7 +168,7 @@ typedef struct LOCK
  * proclock hashtable.	A PROCLOCKTAG value uniquely identifies the combination
  * of a lockable object and a holder/waiter for that object.
  *
- * There are two possible kinds of proclock tags: a transaction (identified
+ * There are two possible kinds of proclock owners: a transaction (identified
  * both by the PGPROC of the backend running it, and the xact's own ID) and
  * a session (identified by backend PGPROC, with XID = InvalidTransactionId).
  *
@@ -177,10 +177,10 @@ typedef struct LOCK
  * under several different XIDs at once (including session locks).  We treat
  * such locks as never conflicting (a backend can never block itself).
  *
- * The holding[] array counts the granted locks (of each type) represented
- * by this proclock. Note that there will be a proclock object, possibly with
- * zero holding[], for any lock that the process is currently waiting on.
- * Otherwise, proclock objects whose counts have gone to zero are recycled
+ * The holdMask field shows the already-granted locks represented by this
+ * proclock.  Note that there will be a proclock object, possibly with
+ * zero holdMask, for any lock that the process is currently waiting on.
+ * Otherwise, proclock objects whose holdMasks are zero are recycled
  * as soon as convenient.
  *
  * Each PROCLOCK object is linked into lists for both the associated LOCK
@@ -202,8 +202,7 @@ typedef struct PROCLOCK
 	PROCLOCKTAG tag;			/* unique identifier of proclock object */
 
 	/* data */
-	int			holding[MAX_LOCKMODES]; /* count of locks currently held */
-	int			nHolding;		/* total of holding[] array */
+	LOCKMASK	holdMask;		/* bitmask for lock types currently held */
 	SHM_QUEUE	lockLink;		/* list link for lock's list of proclocks */
 	SHM_QUEUE	procLink;		/* list link for process's list of
 								 * proclocks */
@@ -212,6 +211,49 @@ typedef struct PROCLOCK
 #define PROCLOCK_LOCKMETHOD(proclock) \
 		(((LOCK *) MAKE_PTR((proclock).tag.lock))->tag.lockmethodid)
 
+/*
+ * Each backend also maintains a local hash table with information about each
+ * lock it is currently interested in.  In particular the local table counts
+ * the number of times that lock has been acquired.  This allows multiple
+ * requests for the same lock to be executed without additional accesses to
+ * shared memory.  We also track the number of lock acquisitions per
+ * ResourceOwner, so that we can release just those locks belonging to a
+ * particular ResourceOwner.
+ */
+typedef struct LOCALLOCKTAG
+{
+	LOCKTAG		lock;			/* identifies the lockable object */
+	TransactionId xid;			/* xact ID, or InvalidTransactionId */
+	LOCKMODE	mode;			/* lock mode for this table entry */
+} LOCALLOCKTAG;
+
+typedef struct LOCALLOCKOWNER
+{
+	/*
+	 * Note: owner can be NULL to indicate a non-transactional lock.
+	 * Must use a forward struct reference to avoid circularity.
+	 */
+	struct ResourceOwnerData *owner;
+	int			nLocks;			/* # of times held by this owner */
+} LOCALLOCKOWNER;
+
+typedef struct LOCALLOCK
+{
+	/* tag */
+	LOCALLOCKTAG tag;			/* unique identifier of locallock entry */
+
+	/* data */
+	LOCK	   *lock;			/* associated LOCK object in shared mem */
+	PROCLOCK   *proclock;		/* associated PROCLOCK object in shmem */
+	int			nLocks;			/* total number of times lock is held */
+	int			numLockOwners;	/* # of relevant ResourceOwners */
+	int			maxLockOwners;	/* allocated size of array */
+	LOCALLOCKOWNER *lockOwners;	/* dynamically resizable array */
+} LOCALLOCK;
+
+#define LOCALLOCK_LOCKMETHOD(llock) ((llock).tag.lock.lockmethodid)
+
+
 /*
  * This struct holds information passed from lmgr internals to the lock
  * listing user-level functions (lockfuncs.c).	For each PROCLOCK in the
@@ -229,7 +271,6 @@ typedef struct
 	LOCK	   *locks;
 } LockData;
 
-extern int NumLockMethods;
 
 /*
  * function prototypes
@@ -244,13 +285,15 @@ extern bool LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 			TransactionId xid, LOCKMODE lockmode, bool dontWait);
 extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
 			TransactionId xid, LOCKMODE lockmode);
-extern bool LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
-						   bool allxids);
+extern bool LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids);
+extern void LockReleaseCurrentOwner(void);
+extern void LockReassignCurrentOwner(void);
 extern int LockCheckConflicts(LockMethod lockMethodTable,
 				   LOCKMODE lockmode,
 				   LOCK *lock, PROCLOCK *proclock, PGPROC *proc,
 				   int *myHolding);
 extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode);
+extern void GrantAwaitedLock(void);
 extern void RemoveFromWaitQueue(PGPROC *proc);
 extern int	LockShmemSize(int maxBackends);
 extern bool DeadLockCheck(PGPROC *proc);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 4f7f39003adc9df3d8cecc08aa92845e8154a175..9369aee4608a03671d06f1cb50500554e2b1e99f 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.72 2004/08/01 17:32:21 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.73 2004/08/27 17:07:42 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -78,14 +78,14 @@ struct PGPROC
 	struct PGPROC *lwWaitLink;	/* next waiter for same LW lock */
 
 	/* Info about lock the process is currently waiting for, if any. */
-	/* waitLock and waitHolder are NULL if not currently waiting. */
+	/* waitLock and waitProcLock are NULL if not currently waiting. */
 	LOCK	   *waitLock;		/* Lock object we're sleeping on ... */
-	PROCLOCK   *waitHolder;		/* Per-holder info for awaited lock */
+	PROCLOCK   *waitProcLock;	/* Per-holder info for awaited lock */
 	LOCKMODE	waitLockMode;	/* type of lock we're waiting for */
 	LOCKMASK	heldLocks;		/* bitmask for lock types already held on
 								 * this lock object by this backend */
 
-	SHM_QUEUE	procHolders;	/* list of PROCLOCK objects for locks held
+	SHM_QUEUE	procLocks;		/* list of PROCLOCK objects for locks held
 								 * or awaited by this backend */
 
 	struct XidCache	subxids;	/* cache for subtransaction XIDs */
diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h
index 6de270d2b0efeb9e2a3b8de883ae6e61e4fa409e..038ed910ce3c8c2ee8a6f41f12715262f36a70ab 100644
--- a/src/include/utils/resowner.h
+++ b/src/include/utils/resowner.h
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/resowner.h,v 1.1 2004/07/17 03:31:47 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/utils/resowner.h,v 1.2 2004/08/27 17:07:42 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -20,7 +20,6 @@
 #define RESOWNER_H
 
 #include "storage/buf.h"
-#include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/rel.h"
 
@@ -76,6 +75,7 @@ extern void ResourceOwnerRelease(ResourceOwner owner,
 								 bool isCommit,
 								 bool isTopLevel);
 extern void ResourceOwnerDelete(ResourceOwner owner);
+extern ResourceOwner ResourceOwnerGetParent(ResourceOwner owner);
 extern void ResourceOwnerNewParent(ResourceOwner owner,
 								   ResourceOwner newparent);
 extern void RegisterResourceReleaseCallback(ResourceReleaseCallback callback,
@@ -88,17 +88,6 @@ extern void ResourceOwnerEnlargeBuffers(ResourceOwner owner);
 extern void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer);
 extern void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer);
 
-/* support for lock management */
-extern void ResourceOwnerEnlargeLocks(ResourceOwner owner);
-extern void ResourceOwnerRememberLock(ResourceOwner owner,
-									  LOCKTAG *locktag,
-									  TransactionId xid,
-									  LOCKMODE lockmode);
-extern void ResourceOwnerForgetLock(ResourceOwner owner,
-									LOCKTAG *locktag,
-									TransactionId xid,
-									LOCKMODE lockmode);
-
 /* support for catcache refcount management */
 extern void ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner);
 extern void ResourceOwnerRememberCatCacheRef(ResourceOwner owner,