diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 4b83c9c0d16ac3a0dc65f9d15843f49d43aa56ba..b35008a3bb8afc84d393eddbd4d8e3a07c7ade8e 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.32 1998/06/30 02:33:31 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.33 1998/08/25 21:20:26 scrappy Exp $
  *
  * NOTES
  *	  Outside modules can create a lock table and acquire/release
@@ -18,11 +18,9 @@
  *
  *	Interface:
  *
- *	LockAcquire(), LockRelease(), LockMethodTableInit().
- *
- *	LockReplace() is called only within this module and by the
- *		lkchain module.  It releases a lock without looking
- *		the lock up in the lock table.
+ *	LockAcquire(), LockRelease(), LockMethodTableInit(),
+ *	LockMethodTableRename(), LockReleaseAll, LockOwners()
+ *	LockResolveConflicts(), GrantLock()
  *
  *	NOTE: This module is used to define new lock tables.  The
  *		multi-level lock table (multi.c) used by the heap
@@ -35,6 +33,7 @@
 #include <string.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <signal.h>
 
 #include "postgres.h"
 #include "miscadmin.h"
@@ -49,25 +48,110 @@
 #include "utils/palloc.h"
 #include "access/xact.h"
 #include "access/transam.h"
+#include "utils/trace.h"
+#include "utils/ps_status.h"
 
-static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
-
-/*#define LOCK_MGR_DEBUG*/
-
-#ifndef LOCK_MGR_DEBUG
+static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
+					  TransactionId xid);
 
-#define LOCK_PRINT(where,tag,type)
-#define LOCK_DUMP(where,lock,type)
-#define LOCK_DUMP_AUX(where,lock,type)
+/*
+ * lockDebugRelation can be used to trace unconditionally a single relation,
+ * for example pg_listener, if you suspect there are locking problems.
+ *
+ * lockDebugOidMin is is used to avoid tracing postgres relations, which
+ * would produce a lot of output. Unfortunately most system relations are
+ * created after bootstrap and have oid greater than BootstrapObjectIdData.
+ * If you are using tprintf you should specify a value greater than the max
+ * oid of system relations, which can be found with the following query:
+ *
+ *   select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_';
+ *
+ * To get a useful lock trace you can use the following pg_options:
+ *
+ *   -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500"
+ */
+#define LOCKDEBUG(lockmethod)	(pg_options[TRACE_SHORTLOCKS+lockmethod])
+#define lockDebugRelation		(pg_options[TRACE_LOCKRELATION])
+#define lockDebugOidMin			(pg_options[TRACE_LOCKOIDMIN])
+#define lockReadPriority		(pg_options[OPT_LOCKREADPRIORITY])
+
+#ifdef LOCK_MGR_DEBUG
+#define LOCK_PRINT(where,lock,type) \
+    if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
+	     && (lock->tag.relId >= lockDebugOidMin)) \
+	    || (lock->tag.relId == lockDebugRelation)) \
+	    LOCK_PRINT_AUX(where,lock,type)
+
+#define LOCK_PRINT_AUX(where,lock,type) \
+	TPRINTF(TRACE_ALL, \
+		 "%s: lock(%x) tbl(%d) rel(%d) db(%d) tid(%d,%d) mask(%x) " \
+		 "hold(%d,%d,%d,%d,%d)=%d " \
+		 "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
+		 where, \
+		 MAKE_OFFSET(lock), \
+		 lock->tag.lockmethod, \
+		 lock->tag.relId, \
+		 lock->tag.dbId, \
+		 ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+ \
+		  lock->tag.tupleId.ip_blkid.bi_lo), \
+		 lock->tag.tupleId.ip_posid, \
+		 lock->mask, \
+		 lock->holders[1], \
+		 lock->holders[2], \
+		 lock->holders[3], \
+		 lock->holders[4], \
+		 lock->holders[5], \
+		 lock->nHolding, \
+		 lock->activeHolders[1], \
+		 lock->activeHolders[2], \
+		 lock->activeHolders[3], \
+		 lock->activeHolders[4], \
+		 lock->activeHolders[5], \
+		 lock->nActive, \
+		 lock->waitProcs.size, \
+		 lock_types[type])
+
+#define XID_PRINT(where,xidentP) \
+	if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1) \
+	     && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
+			 >= lockDebugOidMin)) \
+	    || (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
+			== lockDebugRelation)) \
+	    XID_PRINT_AUX(where,xidentP)
+
+#define XID_PRINT_AUX(where,xidentP) \
+	TPRINTF(TRACE_ALL, \
+		 "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
+		 "hold(%d,%d,%d,%d,%d)=%d", \
+		 where, \
+		 MAKE_OFFSET(xidentP), \
+		 xidentP->tag.lock, \
+		 XIDENT_LOCKMETHOD(*(xidentP)), \
+		 xidentP->tag.pid, \
+		 xidentP->tag.xid, \
+		 xidentP->holders[1], \
+		 xidentP->holders[2], \
+		 xidentP->holders[3], \
+		 xidentP->holders[4], \
+		 xidentP->holders[5], \
+		 xidentP->nHolding)
+
+#define LOCK_TPRINTF(lock, args...) \
+    if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
+	     && (lock->tag.relId >= lockDebugOidMin)) \
+	    || (lock->tag.relId == lockDebugRelation)) \
+	    TPRINTF(TRACE_ALL, args)
+
+#else	/* !LOCK_MGR_DEBUG */
+#define LOCK_PRINT(where,lock,type)
+#define LOCK_PRINT_AUX(where,lock,type)
 #define XID_PRINT(where,xidentP)
+#define XID_PRINT_AUX(where,xidentP)
+#define LOCK_TPRINTF(lock, args...)
+#endif	/* !LOCK_MGR_DEBUG */
 
-#else							/* LOCK_MGR_DEBUG */
-
-int			lockDebug = 0;
-unsigned int lock_debug_oid_min = BootstrapObjectIdData;
 static char *lock_types[] = {
-	"NONE",
+	"",
 	"WRITE",
 	"READ",
 	"WRITE INTENT",
@@ -75,59 +159,6 @@ static char *lock_types[] = {
 	"EXTEND"
 };
 
-#define LOCK_PRINT(where,tag,type)\
-	if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
-		elog(DEBUG, \
-			 "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
-			 MyProcPid,\
-			 tag->relId, tag->dbId, \
-			 ((tag->tupleId.ip_blkid.bi_hi<<16)+\
-			  tag->tupleId.ip_blkid.bi_lo),\
-			 tag->tupleId.ip_posid, \
-			 lock_types[type])
-
-#define LOCK_DUMP(where,lock,type)\
-	if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
-		LOCK_DUMP_AUX(where,lock,type)
-
-#define LOCK_DUMP_AUX(where,lock,type)\
-		elog(DEBUG, \
-			 "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
-			 "holders (%d,%d,%d,%d,%d) type (%s)",where, \
-			 MyProcPid,\
-			 lock->tag.relId, lock->tag.dbId, \
-			 ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
-			  lock->tag.tupleId.ip_blkid.bi_lo),\
-			 lock->tag.tupleId.ip_posid, \
-			 lock->nHolding,\
-			 lock->holders[1],\
-			 lock->holders[2],\
-			 lock->holders[3],\
-			 lock->holders[4],\
-			 lock->holders[5],\
-			 lock_types[type])
-
-#define XID_PRINT(where,xidentP)\
-	if ((lockDebug >= 2) && \
-		(((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
-		 >= lock_debug_oid_min)) \
-		elog(DEBUG,\
-			 "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
-			 "holders (%d,%d,%d,%d,%d)",\
-			 where,\
-			 MyProcPid,\
-			 xidentP->tag.xid,\
-			 xidentP->tag.pid,\
-			 xidentP->tag.lock,\
-			 xidentP->nHolding,\
-			 xidentP->holders[1],\
-			 xidentP->holders[2],\
-			 xidentP->holders[3],\
-			 xidentP->holders[4],\
-			 xidentP->holders[5])
-
-#endif							/* LOCK_MGR_DEBUG */
-
 SPINLOCK	LockMgrLock;		/* in Shmem or created in
 								 * CreateSpinlocks() */
 
@@ -171,6 +202,16 @@ InitLocks()
 		BITS_ON[i] = bit;
 		BITS_OFF[i] = ~bit;
 	}
+
+#ifdef LOCK_MGR_DEBUG
+	/*
+	 * If lockDebugOidMin value has not been specified
+	 * in pg_options set a default value.
+	 */
+	if (!lockDebugOidMin) {
+		lockDebugOidMin = BootstrapObjectIdData;
+	}
+#endif
 }
 
 /* -------------------
@@ -234,7 +275,7 @@ LockMethodTableInit(char *tabName,
 	{
 		elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
 			 numModes, MAX_LOCKMODES);
-		return (INVALID_TABLEID);
+		return (INVALID_LOCKMETHOD);
 	}
 
 	/* allocate a string for the shmem index table lookup */
@@ -242,7 +283,7 @@ LockMethodTableInit(char *tabName,
 	if (!shmemName)
 	{
 		elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
-		return (INVALID_TABLEID);
+		return (INVALID_LOCKMETHOD);
 	}
 
 	/* each lock table has a non-shared header */
@@ -251,7 +292,7 @@ LockMethodTableInit(char *tabName,
 	{
 		elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
 		pfree(shmemName);
-		return (INVALID_TABLEID);
+		return (INVALID_LOCKMETHOD);
 	}
 
 	/* ------------------------
@@ -354,7 +395,7 @@ LockMethodTableInit(char *tabName,
 	if (status)
 		return (lockMethodTable->ctl->lockmethod);
 	else
-		return (INVALID_TABLEID);
+		return (INVALID_LOCKMETHOD);
 }
 
 /*
@@ -369,16 +410,16 @@ LockMethodTableInit(char *tabName,
  *		client to use different lockmethods when acquiring/releasing
  *		short term and long term locks.
  */
-#ifdef NOT_USED
+
 LOCKMETHOD
 LockMethodTableRename(LOCKMETHOD lockmethod)
 {
 	LOCKMETHOD newLockMethod;
 
 	if (NumLockMethods >= MAX_LOCK_METHODS)
-		return (INVALID_TABLEID);
-	if (LockMethodTable[lockmethod] == INVALID_TABLEID)
-		return (INVALID_TABLEID);
+		return (INVALID_LOCKMETHOD);
+	if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
+		return (INVALID_LOCKMETHOD);
 
 	/* other modules refer to the lock table by a lockmethod */
 	newLockMethod = NumLockMethods;
@@ -387,7 +428,6 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
 	LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
 	return (newLockMethod);
 }
-#endif
 
 /*
  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
@@ -398,8 +438,11 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  * Side Effects: The lock is always acquired.  No way to abort
  *		a lock acquisition other than aborting the transaction.
  *		Lock is recorded in the lkchain.
+ *
 #ifdef USER_LOCKS
+ *
  * Note on User Locks:
+ *
  *		User locks are handled totally on the application side as
  *		long term cooperative locks which extend beyond the normal
  *		transaction boundaries.  Their purpose is to indicate to an
@@ -421,24 +464,24 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  *		acquired if already held by another process.  They must be
  *		released explicitly by the application but they are released
  *		automatically when a backend terminates.
- *		They are indicated by a dummy lockmethod 0 which doesn't have
- *		any table allocated but uses the normal lock table, and are
- *		distinguished from normal locks for the following differences:
+ *		They are indicated by a lockmethod 2 which is an alias for the
+ *		normal lock table, and are distinguished from normal locks
+ *		for the following differences:
  *
  *										normal lock		user lock
  *
- *		lockmethod						1				0
+ *		lockmethod						1				2
  *		tag.relId						rel oid			0
  *		tag.ItemPointerData.ip_blkid	block id		lock id2
  *		tag.ItemPointerData.ip_posid	tuple offset	lock id1
  *		xid.pid							0				backend pid
- *		xid.xid							current xid		0
+ *		xid.xid							xid or 0		0
  *		persistence						transaction		user or backend
  *
  *		The lockmode parameter can have the same values for normal locks
  *		although probably only WRITE_LOCK can have some practical use.
  *
- *														DZ - 4 Oct 1996
+ *														DZ - 22 Nov 1997
 #endif
  */
 
@@ -453,25 +496,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	SPINLOCK	masterLock;
 	LOCKMETHODTABLE    *lockMethodTable;
 	int			status;
-	TransactionId myXid;
+	TransactionId xid;
 
 #ifdef USER_LOCKS
 	int			is_user_lock;
 
-	is_user_lock = (lockmethod == 0);
+	is_user_lock = (lockmethod == USER_LOCKMETHOD);
 	if (is_user_lock)
 	{
-		lockmethod = 1;
 #ifdef USER_LOCKS_DEBUG
-		elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
-			 locktag->tupleId.ip_posid,
-			 ((locktag->tupleId.ip_blkid.bi_hi << 16) +
-			  locktag->tupleId.ip_blkid.bi_lo),
-			 lockmode);
+		TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u,%u] %s",
+				locktag->tupleId.ip_posid,
+				((locktag->tupleId.ip_blkid.bi_hi << 16) +
+				 locktag->tupleId.ip_blkid.bi_lo),
+				lock_types[lockmode]);
 #endif
 	}
 #endif
 
+	/* ???????? This must be changed when short term locks will be used */
+	locktag->lockmethod = lockmethod;
+
 	Assert(lockmethod < NumLockMethods);
 	lockMethodTable = LockMethodTable[lockmethod];
 	if (!lockMethodTable)
@@ -483,14 +528,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	if (LockingIsDisabled)
 		return (TRUE);
 
-	LOCK_PRINT("Acquire", locktag, lockmode);
 	masterLock = lockMethodTable->ctl->masterLock;
 
 	SpinAcquire(masterLock);
 
+	/*
+	 * Find or create a lock with this tag
+	 */
 	Assert(lockMethodTable->lockHash->hash == tag_hash);
-	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_ENTER, &found);
-
+	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+								HASH_ENTER, &found);
 	if (!lock)
 	{
 		SpinRelease(masterLock);
@@ -505,15 +552,19 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	if (!found)
 	{
 		lock->mask = 0;
-		ProcQueueInit(&(lock->waitProcs));
-		MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
-		MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
 		lock->nHolding = 0;
 		lock->nActive = 0;
-
+		MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
+		MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
+		ProcQueueInit(&(lock->waitProcs));
 		Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
 							 &(locktag->tupleId.ip_blkid)));
-
+		LOCK_PRINT("LockAcquire: new", lock, lockmode);
+	} else {
+		LOCK_PRINT("LockAcquire: found", lock, lockmode);
+		Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+		Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
+		Assert(lock->nActive <= lock->nHolding);
 	}
 
 	/* ------------------
@@ -522,7 +573,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	 * ------------------
 	 */
 	xidTable = lockMethodTable->xidHash;
-	myXid = GetCurrentTransactionId();
 
 	/* ------------------
 	 * Zero out all of the tag bytes (this clears the padding bytes for long
@@ -530,36 +580,48 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	 * ------------------
 	 */
 	MemSet(&item, 0, XID_TAGSIZE);		/* must clear padding, needed */
-	TransactionIdStore(myXid, &item.tag.xid);
 	item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-	item.tag.pid = MyPid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+	item.tag.lockmethod = lockmethod;
 #endif
-
 #ifdef USER_LOCKS
 	if (is_user_lock)
 	{
 		item.tag.pid = MyProcPid;
-		item.tag.xid = myXid = 0;
-#ifdef USER_LOCKS_DEBUG
-		elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
-			 item.tag.lock, item.tag.pid, item.tag.xid);
-#endif
+		item.tag.xid = xid = 0;
+	} else {
+		xid = GetCurrentTransactionId();
+		TransactionIdStore(xid, &item.tag.xid);
 	}
+#else
+	xid = GetCurrentTransactionId();
+	TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-	result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
+	/*
+	 * Find or create an xid entry with this tag
+	 */
+	result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+										  HASH_ENTER, &found);
 	if (!result)
 	{
 		elog(NOTICE, "LockAcquire: xid table corrupted");
 		return (STATUS_ERROR);
 	}
+
+	/*
+	 * If not found initialize the new entry
+	 */
 	if (!found)
 	{
-		XID_PRINT("LockAcquire: queueing XidEnt", result);
-		ProcAddLock(&result->queue);
 		result->nHolding = 0;
 		MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
+		ProcAddLock(&result->queue);
+		XID_PRINT("LockAcquire: new", result);
+	} else {
+		XID_PRINT("LockAcquire: found", result);
+		Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
+		Assert(result->nHolding <= lock->nActive);
 	}
 
 	/* ----------------
@@ -570,6 +632,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	 */
 	lock->nHolding++;
 	lock->holders[lockmode]++;
+	Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
 
 	/* --------------------
 	 * If I'm the only one holding a lock, then there
@@ -582,43 +645,59 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	{
 		result->holders[lockmode]++;
 		result->nHolding++;
+		XID_PRINT("LockAcquire: owning", result);
+		Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
 		GrantLock(lock, lockmode);
 		SpinRelease(masterLock);
 		return (TRUE);
 	}
 
-	Assert(result->nHolding <= lock->nActive);
-
-	status = LockResolveConflicts(lockmethod, lock, lockmode, myXid);
-
+	status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
 	if (status == STATUS_OK)
 		GrantLock(lock, lockmode);
 	else if (status == STATUS_FOUND)
 	{
 #ifdef USER_LOCKS
-
 		/*
-		 * User locks are non blocking. If we can't acquire a lock remove
-		 * the xid entry and return FALSE without waiting.
+		 * User locks are non blocking. If we can't acquire a lock we must
+		 * remove the xid entry and return FALSE without waiting.
 		 */
 		if (is_user_lock)
 		{
 			if (!result->nHolding)
 			{
 				SHMQueueDelete(&result->queue);
-				hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
+				result = (XIDLookupEnt *) hash_search(xidTable,
+													  (Pointer) &result,
+													  HASH_REMOVE, &found);
+				if (!result || !found) {
+					elog(NOTICE, "LockAcquire: remove xid, table corrupted");
+				}
+			} else {
+				XID_PRINT_AUX("LockAcquire: NHOLDING", result);
 			}
 			lock->nHolding--;
 			lock->holders[lockmode]--;
+			LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
+			Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+			Assert(lock->nActive <= lock->nHolding);
 			SpinRelease(masterLock);
-#ifdef USER_LOCKS_DEBUG
-			elog(NOTICE, "LockAcquire: user lock failed");
-#endif
 			return (FALSE);
 		}
 #endif
-		status = WaitOnLock(lockmethod, lock, lockmode);
-		XID_PRINT("Someone granted me the lock", result);
+		status = WaitOnLock(lockmethod, lock, lockmode, xid);
+		/*
+		 * Check the xid entry status, in case something in the
+		 * ipc communication doesn't work correctly.
+		 */
+		if (!((result->nHolding > 0) && (result->holders[lockmode] > 0))) {
+			XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result);
+			LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode);
+			/* Should we retry ? */
+			return (FALSE);
+		}
+		XID_PRINT("LockAcquire: granted", result);
+		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
 	}
 
 	SpinRelease(masterLock);
@@ -646,7 +725,8 @@ int
 LockResolveConflicts(LOCKMETHOD lockmethod,
 					 LOCK *lock,
 					 LOCKMODE lockmode,
-					 TransactionId xid)
+					 TransactionId xid,
+					 XIDLookupEnt *xidentP)		/* xident ptr or NULL */
 {
 	XIDLookupEnt *result,
 				item;
@@ -657,44 +737,81 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
 	int			bitmask;
 	int			i,
 				tmpMask;
+#ifdef USER_LOCKS
+	int			is_user_lock;
+#endif
 
 	numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
 	xidTable = LockMethodTable[lockmethod]->xidHash;
 
-	/* ---------------------
-	 * read my own statistics from the xid table.  If there
-	 * isn't an entry, then we'll just add one.
-	 *
-	 * Zero out the tag, this clears the padding bytes for long
-	 * word alignment and ensures hashing consistency.
-	 * ------------------
-	 */
-	MemSet(&item, 0, XID_TAGSIZE);
-	TransactionIdStore(xid, &item.tag.xid);
-	item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-	item.tag.pid = pid;
+	if (xidentP) {
+		/*
+		 * A pointer to the xid entry was supplied from the caller.
+		 * Actually only LockAcquire can do it.
+		 */
+		result = xidentP;
+	} else {
+		/* ---------------------
+		 * read my own statistics from the xid table.  If there
+		 * isn't an entry, then we'll just add one.
+		 *
+		 * Zero out the tag, this clears the padding bytes for long
+		 * word alignment and ensures hashing consistency.
+		 * ------------------
+		 */
+		MemSet(&item, 0, XID_TAGSIZE);
+		item.tag.lock = MAKE_OFFSET(lock);
+#ifdef USE_XIDTAG_LOCKMETHOD
+		item.tag.lockmethod = lockmethod;
+#endif
+#ifdef USER_LOCKS
+		is_user_lock = (lockmethod == 2);
+		if (is_user_lock) {
+			item.tag.pid = MyProcPid;
+			item.tag.xid = 0;
+		} else {
+			TransactionIdStore(xid, &item.tag.xid);
+		}
+#else
+		TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-	if (!(result = (XIDLookupEnt *)
-		  hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
-	{
-		elog(NOTICE, "LockResolveConflicts: xid table corrupted");
-		return (STATUS_ERROR);
-	}
-	myHolders = result->holders;
+		/*
+		 * Find or create an xid entry with this tag
+		 */
+		result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+											  HASH_ENTER, &found);
+		if (!result)
+		{
+			elog(NOTICE, "LockResolveConflicts: xid table corrupted");
+			return (STATUS_ERROR);
+		}
 
-	if (!found)
-	{
-		/* ---------------
-		 * we're not holding any type of lock yet.  Clear
-		 * the lock stats.
-		 * ---------------
+		/*
+		 * If not found initialize the new entry. THIS SHOULD NEVER HAPPEN,
+		 * if we are trying to resolve a conflict we must already have
+		 * allocated an xid entry for this lock.	dz 21-11-1997
 		 */
-		MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
-		result->nHolding = 0;
+		if (!found)
+		{
+			/* ---------------
+			 * we're not holding any type of lock yet.  Clear
+			 * the lock stats.
+			 * ---------------
+			 */
+			MemSet(result->holders,0, numLockModes * sizeof(*(lock->holders)));
+			result->nHolding = 0;
+			XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result);
+		} else {
+			XID_PRINT("LockResolveConflicts: found", result);
+		}
 	}
+	Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
 
+	/*
+	 * We can control runtime this option. Default is lockReadPriority=0
+	 */
+	if (!lockReadPriority)
 	{
 		/* ------------------------
 		 * If someone with a greater priority is waiting for the lock,
@@ -705,8 +822,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
 		PROC_QUEUE *waitQueue = &(lock->waitProcs);
 		PROC	   *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
-		if (waitQueue->size && topproc->prio > myprio)
+		if (waitQueue->size && topproc->prio > myprio) {
+			XID_PRINT("LockResolveConflicts: higher priority proc waiting",
+					  result);
 			return STATUS_FOUND;
+		}
 	}
 
 	/* ----------------------------
@@ -721,12 +841,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
 	 */
 	if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
 	{
-
 		result->holders[lockmode]++;
 		result->nHolding++;
-
-		XID_PRINT("Conflict Resolved: updated xid entry stats", result);
-
+		XID_PRINT("LockResolveConflicts: no conflict", result);
+		Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
 		return (STATUS_OK);
 	}
 
@@ -736,6 +854,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
 	 * that does not reflect our own locks.
 	 * ------------------------
 	 */
+	myHolders = result->holders;
 	bitmask = 0;
 	tmpMask = 2;
 	for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
@@ -753,27 +872,42 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
 	 */
 	if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
 	{
-
 		/* no conflict. Get the lock and go on */
-
 		result->holders[lockmode]++;
 		result->nHolding++;
-
-		XID_PRINT("Conflict Resolved: updated xid entry stats", result);
-
+		XID_PRINT("LockResolveConflicts: resolved", result);
+		Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
 		return (STATUS_OK);
-
 	}
 
+	XID_PRINT("LockResolveConflicts: conflicting", result);
 	return (STATUS_FOUND);
 }
 
+/*
+ * GrantLock -- update the lock data structure to show
+ *		the new lock holder.
+ */
+void
+GrantLock(LOCK *lock, LOCKMODE lockmode)
+{
+	lock->nActive++;
+	lock->activeHolders[lockmode]++;
+	lock->mask |= BITS_ON[lockmode];
+	LOCK_PRINT("GrantLock", lock, lockmode);
+	Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
+	Assert(lock->nActive <= lock->nHolding);
+}
+
 static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
+WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
+		   TransactionId xid)
 {
 	PROC_QUEUE *waitQueue = &(lock->waitProcs);
 	LOCKMETHODTABLE    *lockMethodTable = LockMethodTable[lockmethod];
 	int			prio = lockMethodTable->ctl->prio[lockmode];
+	char		old_status[64],
+				new_status[64];
 
 	Assert(lockmethod < NumLockMethods);
 
@@ -785,27 +919,38 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
 	 * synchronization for this queue.	That will not be true if/when
 	 * people can be deleted from the queue by a SIGINT or something.
 	 */
-	LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
+	LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
+	strcpy(old_status, PS_STATUS);
+	strcpy(new_status, PS_STATUS);
+	strcat(new_status, " waiting");
+	PS_SET_STATUS(new_status);
 	if (ProcSleep(waitQueue,
 				  lockMethodTable->ctl->masterLock,
 				  lockmode,
 				  prio,
-				  lock) != NO_ERROR)
+				  lock,
+				  xid) != NO_ERROR)
 	{
 		/* -------------------
-		 * This could have happend as a result of a deadlock, see HandleDeadLock()
-		 * Decrement the lock nHolding and holders fields as we are no longer
-		 * waiting on this lock.
+		 * This could have happend as a result of a deadlock,
+		 * see HandleDeadLock().
+		 * Decrement the lock nHolding and holders fields as
+		 * we are no longer waiting on this lock.
 		 * -------------------
 		 */
 		lock->nHolding--;
 		lock->holders[lockmode]--;
-		LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockmode);
+		LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
+		Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
+		Assert(lock->nActive <= lock->nHolding);
 		SpinRelease(lockMethodTable->ctl->masterLock);
 		elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
+
+		/* not reached */
 	}
 
-	LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
+	PS_SET_STATUS(old_status);
+	LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
 	return (STATUS_OK);
 }
 
@@ -829,25 +974,34 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	XIDLookupEnt *result,
 				item;
 	HTAB	   *xidTable;
+	TransactionId xid;
 	bool		wakeupNeeded = true;
+	int			trace_flag;
 
 #ifdef USER_LOCKS
 	int			is_user_lock;
 
-	is_user_lock = (lockmethod == 0);
+	is_user_lock = (lockmethod == USER_LOCKMETHOD);
 	if (is_user_lock)
 	{
-		lockmethod = 1;
-#ifdef USER_LOCKS_DEBUG
-		elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
-			 locktag->tupleId.ip_posid,
-			 ((locktag->tupleId.ip_blkid.bi_hi << 16) +
-			  locktag->tupleId.ip_blkid.bi_lo),
-			 lockmode);
-#endif
+		TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u,%u] %d",
+				locktag->tupleId.ip_posid,
+				((locktag->tupleId.ip_blkid.bi_hi << 16) +
+				 locktag->tupleId.ip_blkid.bi_lo),
+				lockmode);
 	}
 #endif
 
+	/* ???????? This must be changed when short term locks will be used */
+	locktag->lockmethod = lockmethod;
+
+#ifdef USER_LOCKS
+	trace_flag = \
+		(lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS;
+#else
+	trace_flag = TRACE_LOCKS;
+#endif
+
 	Assert(lockmethod < NumLockMethods);
 	lockMethodTable = LockMethodTable[lockmethod];
 	if (!lockMethodTable)
@@ -859,34 +1013,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	if (LockingIsDisabled)
 		return (TRUE);
 
-	LOCK_PRINT("Release", locktag, lockmode);
-
 	masterLock = lockMethodTable->ctl->masterLock;
-	xidTable = lockMethodTable->xidHash;
-
 	SpinAcquire(masterLock);
 
-	Assert(lockMethodTable->lockHash->hash == tag_hash);
-	lock = (LOCK *)
-		hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND_SAVE, &found);
-
-#ifdef USER_LOCKS
-
 	/*
-	 * If the entry is not found hash_search returns TRUE instead of NULL,
-	 * so we must check it explicitly.
+	 * Find a lock with this tag
 	 */
-	if ((is_user_lock) && (lock == (LOCK *) TRUE))
-	{
-		SpinRelease(masterLock);
-		elog(NOTICE, "LockRelease: there are no locks with this tag");
-		return (FALSE);
-	}
-#endif
+	Assert(lockMethodTable->lockHash->hash == tag_hash);
+	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+								HASH_FIND, &found);
 
 	/*
-	 * let the caller print its own error message, too. Do not
-	 * elog(ERROR).
+	 * let the caller print its own error message, too. Do not elog(ERROR).
 	 */
 	if (!lock)
 	{
@@ -898,52 +1036,20 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	if (!found)
 	{
 		SpinRelease(masterLock);
+#ifdef USER_LOCKS
+		if (is_user_lock) {
+			TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
+			return (FALSE);
+		}
+#endif
 		elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
 		return (FALSE);
 	}
+	LOCK_PRINT("LockRelease: found", lock, lockmode);
+	Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+	Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
+	Assert(lock->nActive <= lock->nHolding);
 
-	Assert(lock->nHolding > 0);
-
-#ifdef USER_LOCKS
-
-	/*
-	 * If this is an user lock it can be removed only after checking that
-	 * it was acquired by the current process, so this code is skipped and
-	 * executed later.
-	 */
-	if (!is_user_lock)
-	{
-#endif
-
-		/*
-		 * fix the general lock stats
-		 */
-		lock->nHolding--;
-		lock->holders[lockmode]--;
-		lock->nActive--;
-		lock->activeHolders[lockmode]--;
-
-		Assert(lock->nActive >= 0);
-
-		if (!lock->nHolding)
-		{
-			/* ------------------
-			 * if there's no one waiting in the queue,
-			 * we just released the last lock.
-			 * Delete it from the lock table.
-			 * ------------------
-			 */
-			Assert(lockMethodTable->lockHash->hash == tag_hash);
-			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-										(Pointer) &(lock->tag),
-										HASH_REMOVE_SAVED,
-										&found);
-			Assert(lock && found);
-			wakeupNeeded = false;
-		}
-#ifdef USER_LOCKS
-	}
-#endif
 
 	/* ------------------
 	 * Zero out all of the tag bytes (this clears the padding bytes for long
@@ -951,42 +1057,102 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	 * ------------------
 	 */
 	MemSet(&item, 0, XID_TAGSIZE);
-
-	TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
 	item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-	item.tag.pid = MyPid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+	item.tag.lockmethod = lockmethod;
 #endif
-
 #ifdef USER_LOCKS
-	if (is_user_lock)
-	{
+	if (is_user_lock) {
 		item.tag.pid = MyProcPid;
-		item.tag.xid = 0;
-#ifdef USER_LOCKS_DEBUG
-		elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
-			 item.tag.lock, item.tag.pid, item.tag.xid);
-#endif
+		item.tag.xid = xid = 0;
+	} else {
+		xid = GetCurrentTransactionId();
+		TransactionIdStore(xid, &item.tag.xid);
 	}
+#else
+	xid = GetCurrentTransactionId();
+	TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-	if (!(result = (XIDLookupEnt *) hash_search(xidTable,
-												(Pointer) &item,
-												HASH_FIND_SAVE,
-												&found))
-		|| !found)
+	/*
+	 * Find an xid entry with this tag
+	 */
+	xidTable = lockMethodTable->xidHash;
+	result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+										  HASH_FIND_SAVE, &found);
+	if (!result || !found)
 	{
 		SpinRelease(masterLock);
 #ifdef USER_LOCKS
-		if ((is_user_lock) && (result))
-			elog(NOTICE, "LockRelease: you don't have a lock on this tag");
-		else
-			elog(NOTICE, "LockRelease: find xid, table corrupted");
-#else
-		elog(NOTICE, "LockReplace: xid table corrupted");
+		if (!found && is_user_lock) {
+			TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
+		} else
 #endif
+		elog(NOTICE, "LockReplace: xid table corrupted");
 		return (FALSE);
 	}
+	XID_PRINT("LockRelease: found", result);
+	Assert(result->tag.lock == MAKE_OFFSET(lock));
+
+	/*
+	 * Check that we are actually holding a lock of the type we want
+	 * to release.
+	 */
+	if (!(result->holders[lockmode] > 0)) {
+		SpinRelease(masterLock);
+		XID_PRINT_AUX("LockAcquire: WRONGTYPE", result);
+		elog(NOTICE, "LockRelease: you don't own a lock of type %s",
+			 lock_types[lockmode]);
+		Assert(result->holders[lockmode] >= 0);
+		return (FALSE);
+	}
+	Assert(result->nHolding > 0);
+
+	/*
+	 * fix the general lock stats
+	 */
+	lock->nHolding--;
+	lock->holders[lockmode]--;
+	lock->nActive--;
+    lock->activeHolders[lockmode]--;
+
+	/* --------------------------
+	 * If there are still active locks of the type I just released, no one
+	 * should be woken up.	Whoever is asleep will still conflict
+	 * with the remaining locks.
+	 * --------------------------
+	 */
+	if (lock->activeHolders[lockmode])
+	{
+		wakeupNeeded = false;
+	}
+	else
+	{
+		/* change the conflict mask.  No more of this lock type. */
+		lock->mask &= BITS_OFF[lockmode];
+	}
+
+	LOCK_PRINT("LockRelease: updated", lock, lockmode);
+	Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
+	Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
+	Assert(lock->nActive <= lock->nHolding);
+
+	if (!lock->nHolding)
+	{
+		/* ------------------
+		 * if there's no one waiting in the queue,
+		 * we just released the last lock.
+		 * Delete it from the lock table.
+		 * ------------------
+		 */
+		Assert(lockMethodTable->lockHash->hash == tag_hash);
+		lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+									(Pointer) &(lock->tag),
+									HASH_REMOVE,
+									&found);
+		Assert(lock && found);
+		wakeupNeeded = false;
+	}
 
 	/*
 	 * now check to see if I have any private locks.  If I do, decrement
@@ -994,8 +1160,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	 */
 	result->holders[lockmode]--;
 	result->nHolding--;
-
-	XID_PRINT("LockRelease updated xid stats", result);
+	XID_PRINT("LockRelease: updated", result);
+	Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
 
 	/*
 	 * If this was my last hold on this lock, delete my entry in the XID
@@ -1003,78 +1169,25 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 	 */
 	if (!result->nHolding)
 	{
-#ifdef USER_LOCKS
-		if (result->queue.prev == INVALID_OFFSET)
+		if (result->queue.prev == INVALID_OFFSET) {
 			elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
-		if (result->queue.next == INVALID_OFFSET)
+		}
+		if (result->queue.next == INVALID_OFFSET) {
 			elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
-#endif
+		}
 		if (result->queue.next != INVALID_OFFSET)
 			SHMQueueDelete(&result->queue);
-		if (!(result = (XIDLookupEnt *)
-			  hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
-			!found)
+		XID_PRINT("LockRelease: deleting", result);
+		result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
+											  HASH_REMOVE_SAVED, &found);
+		if (!result || !found)
 		{
 			SpinRelease(masterLock);
-#ifdef USER_LOCKS
 			elog(NOTICE, "LockRelease: remove xid, table corrupted");
-#else
-			elog(NOTICE, "LockReplace: xid table corrupted");
-#endif
 			return (FALSE);
 		}
 	}
 
-#ifdef USER_LOCKS
-
-	/*
-	 * If this is an user lock remove it now, after the corresponding xid
-	 * entry has been found and deleted.
-	 */
-	if (is_user_lock)
-	{
-
-		/*
-		 * fix the general lock stats
-		 */
-		lock->nHolding--;
-		lock->holders[lockmode]--;
-		lock->nActive--;
-		lock->activeHolders[lockmode]--;
-
-		Assert(lock->nActive >= 0);
-
-		if (!lock->nHolding)
-		{
-			/* ------------------
-			 * if there's no one waiting in the queue,
-			 * we just released the last lock.
-			 * Delete it from the lock table.
-			 * ------------------
-			 */
-			Assert(lockMethodTable->lockHash->hash == tag_hash);
-			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-										(Pointer) &(lock->tag),
-										HASH_REMOVE,
-										&found);
-			Assert(lock && found);
-			wakeupNeeded = false;
-		}
-	}
-#endif
-
-	/* --------------------------
-	 * If there are still active locks of the type I just released, no one
-	 * should be woken up.	Whoever is asleep will still conflict
-	 * with the remaining locks.
-	 * --------------------------
-	 */
-	if (!(lock->activeHolders[lockmode]))
-	{
-		/* change the conflict mask.  No more of this lock type. */
-		lock->mask &= BITS_OFF[lockmode];
-	}
-
 	if (wakeupNeeded)
 	{
 		/* --------------------------
@@ -1085,35 +1198,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 		 */
 		ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
 	}
+	else
+	{
+		LOCK_TPRINTF(lock, "LockRelease: no wakeup needed");
+	}
 
 	SpinRelease(masterLock);
 	return (TRUE);
 }
 
-/*
- * GrantLock -- update the lock data structure to show
- *		the new lock holder.
- */
-void
-GrantLock(LOCK *lock, LOCKMODE lockmode)
-{
-	lock->nActive++;
-	lock->activeHolders[lockmode]++;
-	lock->mask |= BITS_ON[lockmode];
-}
-
-#ifdef USER_LOCKS
 /*
  * LockReleaseAll -- Release all locks in a process lock queue.
- *
- * Note: This code is a little complicated by the presence in the
- *		 same queue of user locks which can't be removed from the
- *		 normal lock queue at the end of a transaction. They must
- *		 however be removed when the backend exits.
- *		 A dummy lockmethod 0 is used to indicate that we are releasing
- *		 the user locks, from the code added to ProcKill().
  */
-#endif
 bool
 LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 {
@@ -1121,6 +1217,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 	int			done;
 	XIDLookupEnt *xidLook = NULL;
 	XIDLookupEnt *tmp = NULL;
+	XIDLookupEnt *result;
 	SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
 	SPINLOCK	masterLock;
 	LOCKMETHODTABLE    *lockMethodTable;
@@ -1128,45 +1225,52 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 				numLockModes;
 	LOCK	   *lock;
 	bool		found;
+	int			trace_flag;
+	int			xidtag_lockmethod;
 
 #ifdef USER_LOCKS
 	int			is_user_lock_table,
 				count,
-				nskip;
+				nleft;
 
-	is_user_lock_table = (lockmethod == 0);
-#ifdef USER_LOCKS_DEBUG
-	elog(NOTICE, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid);
-#endif
-	if (is_user_lock_table)
-		lockmethod = 1;
+	count = nleft = 0;
+
+	is_user_lock_table = (lockmethod == USER_LOCKMETHOD);
+	trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS;
+#else
+	trace_flag = TRACE_LOCKS;
 #endif
+	TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d",
+			lockmethod, MyProcPid);
 
 	Assert(lockmethod < NumLockMethods);
 	lockMethodTable = LockMethodTable[lockmethod];
-	if (!lockMethodTable)
+	if (!lockMethodTable) {
+		elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
 		return (FALSE);
-
-	numLockModes = lockMethodTable->ctl->numLockModes;
-	masterLock = lockMethodTable->ctl->masterLock;
+	}
 
 	if (SHMQueueEmpty(lockQueue))
 		return TRUE;
 
-#ifdef USER_LOCKS
+	numLockModes = lockMethodTable->ctl->numLockModes;
+	masterLock = lockMethodTable->ctl->masterLock;
+
 	SpinAcquire(masterLock);
-#endif
 	SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
 
-	XID_PRINT("LockReleaseAll", xidLook);
-
-#ifndef USER_LOCKS
-	SpinAcquire(masterLock);
-#else
-	count = nskip = 0;
-#endif
 	for (;;)
 	{
+		/*
+		 * Sometimes the queue appears to be messed up.
+		 */
+		if (count++ > 1000)
+		{
+			elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
+			nleft = 0;
+			break;
+		}
+
 		/* ---------------------------
 		 * XXX Here we assume the shared memory queue is circular and
 		 * that we know its internal structure.  Should have some sort of
@@ -1176,72 +1280,73 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 		done = (xidLook->queue.next == end);
 		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-		LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);
-
-#ifdef USER_LOCKS
+		xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
+		if ((xidtag_lockmethod == lockmethod) || (trace_flag >= 2)) {
+			XID_PRINT("LockReleaseAll", xidLook);
+			LOCK_PRINT("LockReleaseAll", lock, 0);
+		}
 
-		/*
-		 * Sometimes the queue appears to be messed up.
-		 */
-		if (count++ > 2000)
-		{
-			elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
-			nskip = 0;
-			break;
+#ifdef USE_XIDTAG_LOCKMETHOD
+		if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
+			elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
+				 xidtag_lockmethod, lock->tag.lockmethod);
+#endif
+		if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2)) {
+			TPRINTF(trace_flag, "LockReleaseAll: skipping other table");
+			nleft++;
+			goto next_item;
 		}
+
+		Assert(lock->nHolding > 0);
+		Assert(lock->nActive > 0);
+		Assert(lock->nActive <= lock->nHolding);
+		Assert(xidLook->nHolding >= 0);
+		Assert(xidLook->nHolding <= lock->nHolding);
+
+#ifdef USER_LOCKS
 		if (is_user_lock_table)
 		{
 			if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
 			{
-#ifdef USER_LOCKS_DEBUG
-				elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
-				  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-				nskip++;
+				TPRINTF(TRACE_USERLOCKS,
+						"LockReleaseAll: skiping normal lock [%d,%d,%d]",
+						xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+				nleft++;
 				goto next_item;
 			}
 			if (xidLook->tag.pid != MyProcPid)
 			{
-				/* This should never happen */
-#ifdef USER_LOCKS_DEBUG
+				/* Should never happen */
 				elog(NOTICE,
-					 "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
+					 "LockReleaseAll: INVALID PID: [%u,%u] [%d,%d,%d]",
 					 lock->tag.tupleId.ip_posid,
 					 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
 					  lock->tag.tupleId.ip_blkid.bi_lo),
-				  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-				nskip++;
+					 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+				nleft++;
 				goto next_item;
 			}
-#ifdef USER_LOCKS_DEBUG
-			elog(NOTICE,
-				 "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
-				 lock->tag.tupleId.ip_posid,
-				 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
-				  lock->tag.tupleId.ip_blkid.bi_lo),
-				 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
+			TPRINTF(TRACE_USERLOCKS,
+					"LockReleaseAll: releasing user lock [%u,%u] [%d,%d,%d]",
+					lock->tag.tupleId.ip_posid,
+					((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
+					 lock->tag.tupleId.ip_blkid.bi_lo),
+					xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
 		}
 		else
 		{
-			if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
+			/* Can't check xidLook->tag.xid, can be 0 also for normal locks */
+			if (xidLook->tag.pid != 0)
 			{
-#ifdef USER_LOCKS_DEBUG
-				elog(NOTICE,
-					 "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
-					 lock->tag.tupleId.ip_posid,
-					 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
-					  lock->tag.tupleId.ip_blkid.bi_lo),
-				  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-				nskip++;
+				TPRINTF(TRACE_LOCKS,
+						"LockReleaseAll: skiping user lock [%u,%u] [%d,%d,%d]",
+						lock->tag.tupleId.ip_posid,
+						((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
+						 lock->tag.tupleId.ip_blkid.bi_lo),
+						xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+				nleft++;
 				goto next_item;
 			}
-#ifdef USER_LOCKS_DEBUG
-			elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
-				 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
 		}
 #endif
 
@@ -1251,16 +1356,20 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 		 */
 		if (lock->nHolding != xidLook->nHolding)
 		{
-			lock->nHolding -= xidLook->nHolding;
-			lock->nActive -= xidLook->nHolding;
-			Assert(lock->nActive >= 0);
 			for (i = 1; i <= numLockModes; i++)
 			{
+				Assert(xidLook->holders[i] >= 0);
 				lock->holders[i] -= xidLook->holders[i];
 				lock->activeHolders[i] -= xidLook->holders[i];
+				Assert((lock->holders[i] >= 0) \
+					   && (lock->activeHolders[i] >= 0));
 				if (!lock->activeHolders[i])
 					lock->mask &= BITS_OFF[i];
 			}
+			lock->nHolding -= xidLook->nHolding;
+			lock->nActive -= xidLook->nHolding;
+			Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
+			Assert(lock->nActive <= lock->nHolding);
 		}
 		else
 		{
@@ -1270,16 +1379,30 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 			 * --------------
 			 */
 			lock->nHolding = 0;
+			/* Fix the lock status, just for next LOCK_PRINT message. */
+			for (i=1; i<=numLockModes; i++) {
+				Assert(lock->holders[i] == lock->activeHolders[i]);
+				lock->holders[i] = lock->activeHolders[i] = 0;
+			}
 		}
+		LOCK_PRINT("LockReleaseAll: updated", lock, 0);
+
+		/*
+		 * Remove the xid from the process lock queue
+		 */
+		SHMQueueDelete(&xidLook->queue);
+
 		/* ----------------
 		 * always remove the xidLookup entry, we're done with it now
 		 * ----------------
 		 */
-#ifdef USER_LOCKS
-		SHMQueueDelete(&xidLook->queue);
-#endif
-		if ((!hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
-			|| !found)
+
+		XID_PRINT("LockReleaseAll: deleting", xidLook);
+		result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
+											  (Pointer) xidLook, 
+											  HASH_REMOVE,
+											  &found);
+		if (!result || !found)
 		{
 			SpinRelease(masterLock);
 			elog(NOTICE, "LockReleaseAll: xid table corrupted");
@@ -1293,10 +1416,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 			 * the last lock.
 			 * --------------------
 			 */
-
+			LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
 			Assert(lockMethodTable->lockHash->hash == tag_hash);
-			lock = (LOCK *)
-				hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
+			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+										(Pointer) &(lock->tag),
+										HASH_REMOVE, &found);
 			if ((!lock) || (!found))
 			{
 				SpinRelease(masterLock);
@@ -1324,15 +1448,18 @@ next_item:
 		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
 		xidLook = tmp;
 	}
-	SpinRelease(masterLock);
-#ifdef USER_LOCKS
 
 	/*
 	 * Reinitialize the queue only if nothing has been left in.
 	 */
-	if (nskip == 0)
-#endif
+	if (nleft == 0) {
+		TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue");
 		SHMQueueInit(lockQueue);
+	}
+
+	SpinRelease(masterLock);
+	TPRINTF(trace_flag, "LockReleaseAll: done");
+
 	return TRUE;
 }
 
@@ -1420,7 +1547,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 		checked_procs[0] = MyProc;
 		nprocs = 1;
 
-		lockMethodTable = LockMethodTable[1];
+		lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD];
 		xidTable = lockMethodTable->xidHash;
 
 		MemSet(&item, 0, XID_TAGSIZE);
@@ -1456,7 +1583,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 		done = (xidLook->queue.next == end);
 		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-		LOCK_PRINT("DeadLockCheck", (&lock->tag), 0);
+		LOCK_PRINT("DeadLockCheck", lock, 0);
 
 		/*
 		 * This is our only check to see if we found the lock we want.
@@ -1560,9 +1687,190 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 	return false;
 }
 
+/*
+ * Return an array with the pids of all processes owning a lock.
+ * This works only for user locks because normal locks have no
+ * pid information in the corresponding XIDLookupEnt.
+ */
+ArrayType *
+LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag)
+{
+	XIDLookupEnt *xidLook = NULL;
+	SPINLOCK	masterLock;
+	LOCK		*lock;
+	SHMEM_OFFSET lock_offset;
+	int 		count = 0;
+	LOCKMETHODTABLE    *lockMethodTable;
+	HTAB 		*xidTable;
+	bool		found;
+	int			ndims,
+				nitems,
+				hdrlen,
+				size;
+	int			lbounds[1],
+				hbounds[1];
+	ArrayType	*array;
+	int			*data_ptr;
+
+	/* Assume that no one will modify the result */
+	static int	empty_array[] = { 20, 1, 0, 0, 0 };
+
+#ifdef USER_LOCKS
+	int			is_user_lock;
+
+	is_user_lock = (lockmethod == USER_LOCKMETHOD);
+	if (is_user_lock)
+	{
+		TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u,%u]",
+				locktag->tupleId.ip_posid,
+				((locktag->tupleId.ip_blkid.bi_hi << 16) +
+				 locktag->tupleId.ip_blkid.bi_lo));
+	}
+#endif
+
+	/* This must be changed when short term locks will be used */
+	locktag->lockmethod = lockmethod;
+
+	Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods));
+	lockMethodTable = LockMethodTable[lockmethod];
+	if (!lockMethodTable)
+	{
+		elog(NOTICE, "lockMethodTable is null in LockOwners");
+		return ((ArrayType *) &empty_array);
+	}
+
+	if (LockingIsDisabled)
+	{
+		return ((ArrayType *) &empty_array);
+	}
+
+	masterLock = lockMethodTable->ctl->masterLock;
+	SpinAcquire(masterLock);
+
+	/*
+	 * Find a lock with this tag
+	 */
+	Assert(lockMethodTable->lockHash->hash == tag_hash);
+	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+								HASH_FIND, &found);
+
+	/*
+	 * let the caller print its own error message, too. Do not elog(WARN).
+	 */
+	if (!lock)
+	{
+		SpinRelease(masterLock);
+		elog(NOTICE, "LockOwners: locktable corrupted");
+		return ((ArrayType *) &empty_array);
+	}
+
+	if (!found)
+	{
+		SpinRelease(masterLock);
+#ifdef USER_LOCKS
+		if (is_user_lock) {
+			TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag");
+			return ((ArrayType *) &empty_array);
+		}
+#endif
+		elog(NOTICE, "LockOwners: locktable lookup failed, no lock");
+		return ((ArrayType *) &empty_array);
+	}
+	LOCK_PRINT("LockOwners: found", lock, 0);
+	Assert((lock->nHolding > 0) && (lock->nActive > 0));
+	Assert(lock->nActive <= lock->nHolding);
+	lock_offset = MAKE_OFFSET(lock);
+
+	/* Construct a 1-dimensional array */
+	ndims = 1;
+	hdrlen = ARR_OVERHEAD(ndims);
+	lbounds[0] = 0;
+	hbounds[0] = lock->nActive;
+	size = hdrlen + sizeof(int) * hbounds[0];
+	array = (ArrayType *) palloc(size);
+	MemSet(array, 0, size);
+	memmove((char *) array, (char *) &size, sizeof(int));
+	memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int));
+	memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
+	memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int));
+	SET_LO_FLAG(false, array);
+	data_ptr = (int *) ARR_DATA_PTR(array);
+
+	xidTable = lockMethodTable->xidHash;
+	hash_seq(NULL);
+	nitems = 0;
+	while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
+		   (xidLook != (XIDLookupEnt *)TRUE)) {
+		if (count++ > 1000) {
+			elog(NOTICE,"LockOwners: possible loop, giving up");
+			break;
+		}
+
+		if (xidLook->tag.pid == 0) {
+			XID_PRINT("LockOwners: no pid", xidLook);
+			continue;
+		}
+
+		if (!xidLook->tag.lock) {
+			XID_PRINT("LockOwners: NULL LOCK", xidLook);
+			continue;
+		}
+
+		if (xidLook->tag.lock != lock_offset) {
+			XID_PRINT("LockOwners: different lock", xidLook);
+			continue;
+		}
+
+		if (LOCK_LOCKMETHOD(*lock) != lockmethod) {
+			XID_PRINT("LockOwners: other table", xidLook);
+			continue;
+		}
+
+		if (xidLook->nHolding <= 0) {
+			XID_PRINT("LockOwners: not holding", xidLook);
+			continue;
+		}
+
+		if (nitems >= hbounds[0]) {
+			elog(NOTICE,"LockOwners: array size exceeded");
+			break;
+		}
+
+		/*
+		 * Check that the holding process is still alive by sending
+		 * him an unused (ignored) signal. If the kill fails the
+		 * process is not alive.
+		 */
+		if ((xidLook->tag.pid != MyProcPid) \
+			&& (kill(xidLook->tag.pid, SIGCHLD)) != 0)
+		{
+			/* Return a negative pid to signal that process is dead */
+			data_ptr[nitems++] = - (xidLook->tag.pid);
+			XID_PRINT("LockOwners: not alive", xidLook);
+			/* XXX - TODO: remove this entry and update lock stats */
+			continue;
+		}
+
+		/* Found a process holding the lock */
+		XID_PRINT("LockOwners: holding", xidLook);
+		data_ptr[nitems++] = xidLook->tag.pid;
+	}
+
+	SpinRelease(masterLock);
+
+	/* Adjust the actual size of the array */
+	hbounds[0] = nitems;
+	size = hdrlen + sizeof(int) * hbounds[0];
+	memmove((char *) array, (char *) &size, sizeof(int));
+	memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
+
+	return (array);
+}
+
 #ifdef DEADLOCK_DEBUG
 /*
- * Dump all locks. Must have already acquired the masterLock.
+ * Dump all locks in the proc->lockQueue. Must have already acquired
+ * the masterLock.
  */
 void
 DumpLocks()
@@ -1577,9 +1885,8 @@ DumpLocks()
 	SPINLOCK	masterLock;
 	int			numLockModes;
 	LOCK	   *lock;
-
-	count;
-	int			lockmethod = 1;
+	int			count = 0;
+	int			lockmethod = DEFAULT_LOCKMETHOD;
 	LOCKMETHODTABLE    *lockMethodTable;
 
 	ShmemPIDLookup(MyProcPid, &location);
@@ -1604,11 +1911,18 @@ DumpLocks()
 	SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
 	end = MAKE_OFFSET(lockQueue);
 
-	LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
-	XID_PRINT("DumpLocks", xidLook);
+	if (MyProc->waitLock) {
+		LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0);
+	}
 
-	for (count = 0;;)
+	for (;;)
 	{
+		if (count++ > 2000)
+		{
+			elog(NOTICE, "DumpLocks: xid loop detected, giving up");
+			break;
+		}
+
 		/* ---------------------------
 		 * XXX Here we assume the shared memory queue is circular and
 		 * that we know its internal structure.  Should have some sort of
@@ -1618,19 +1932,68 @@ DumpLocks()
 		done = (xidLook->queue.next == end);
 		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-		LOCK_DUMP("DumpLocks", lock, 0);
-
-		if (count++ > 2000)
-		{
-			elog(NOTICE, "DumpLocks: xid loop detected, giving up");
-			break;
-		}
+		XID_PRINT_AUX("DumpLocks", xidLook);
+		LOCK_PRINT_AUX("DumpLocks", lock, 0);
 
 		if (done)
 			break;
+
 		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
 		xidLook = tmp;
 	}
 }
 
+/*
+ * Dump all postgres locks. Must have already acquired the masterLock.
+ */
+void
+DumpAllLocks()
+{
+	SHMEM_OFFSET location;
+	PROC 		*proc;
+	XIDLookupEnt *xidLook = NULL;
+	LOCK		*lock;
+	int 		pid;
+	int 		count = 0;
+	int			lockmethod = DEFAULT_LOCKMETHOD;
+	LOCKMETHODTABLE 	*lockMethodTable;
+	HTAB 		*xidTable;
+
+	pid = getpid();
+	ShmemPIDLookup(pid,&location);
+	if (location == INVALID_OFFSET)
+		return;
+	proc = (PROC *) MAKE_PTR(location);
+	if (proc != MyProc)
+		return;
+
+	Assert(lockmethod < NumLockMethods);
+	lockMethodTable = LockMethodTable[lockmethod];
+	if (!lockMethodTable)
+		return;
+
+	xidTable = lockMethodTable->xidHash;
+
+	if (MyProc->waitLock) {
+		LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock,0);
+	}
+
+	hash_seq(NULL);
+	while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
+		   (xidLook != (XIDLookupEnt *)TRUE)) {
+		XID_PRINT_AUX("DumpAllLocks", xidLook);
+
+		if (xidLook->tag.lock) {
+			lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+			LOCK_PRINT_AUX("DumpAllLocks", lock, 0);
+		} else {
+			elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
+		}
+
+		if (count++ > 2000) {
+			elog(NOTICE,"DumpAllLocks: possible loop, giving up");
+			break;
+		}
+	}
+}
 #endif
diff --git a/src/backend/storage/lmgr/multi.c b/src/backend/storage/lmgr/multi.c
index 0998389ffe91ec373a01760c722c3b68fb9f493b..3887a8e37d16b7051cb739a8342b511d15db13d2 100644
--- a/src/backend/storage/lmgr/multi.c
+++ b/src/backend/storage/lmgr/multi.c
@@ -12,7 +12,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.22 1998/08/19 02:02:44 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.23 1998/08/25 21:20:28 scrappy Exp $
  *
  * NOTES:
  *	 (1) The lock.c module assumes that the caller here is doing
@@ -29,12 +29,10 @@
 #include "utils/rel.h"
 #include "miscadmin.h"			/* MyDatabaseId */
 
-static bool
-MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
-			 PG_LOCK_LEVEL level);
-static bool
-MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
-			 PG_LOCK_LEVEL level);
+static bool MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag,
+						 LOCKMODE lockmode, PG_LOCK_LEVEL level);
+static bool MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag,
+						 LOCKMODE lockmode, PG_LOCK_LEVEL level);
 
 #ifdef LowLevelLocking
 
@@ -130,6 +128,7 @@ static int	MultiPrios[] = {
  * lock table is ONE lock table, not three.
  */
 LOCKMETHOD MultiTableId = (LOCKMETHOD) NULL;
+LOCKMETHOD LongTermTableId = (LOCKMETHOD) NULL;
 #ifdef NOT_USED
 LOCKMETHOD ShortTermTableId = (LOCKMETHOD) NULL;
 #endif
@@ -150,12 +149,25 @@ InitMultiLevelLocks()
 	/* -----------------------
 	 * No short term lock table for now.  -Jeff 15 July 1991
 	 *
-	 * ShortTermTableId = LockTableRename(lockmethod);
+	 * ShortTermTableId = LockMethodTableRename(lockmethod);
 	 * if (! (ShortTermTableId)) {
 	 *	 elog(ERROR,"InitMultiLocks: couldnt rename lock table");
 	 * }
 	 * -----------------------
 	 */
+
+#ifdef USER_LOCKS
+	/*
+	 * Allocate another tableId for long-term locks
+	 */
+	LongTermTableId = LockMethodTableRename(MultiTableId);
+	if (!(LongTermTableId))
+	{
+		elog(ERROR,
+			 "InitMultiLevelLocks: couldn't rename long-term lock table");
+	}
+#endif
+
 	return MultiTableId;
 }
 
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index fb052582e79d7fc0ba43338270620225fdf4bc64..4cd9602a8b51dae5ecf3beb29407761da02427fb 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *		This is so that we can support more backends. (system-wide semaphore
  *		sets run out pretty fast.)				  -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
  */
 #include <sys/time.h>
 #include <unistd.h>
@@ -75,10 +75,13 @@
 #include "storage/shmem.h"
 #include "storage/spin.h"
 #include "storage/proc.h"
+#include "utils/trace.h"
 
 static void HandleDeadLock(int sig);
 static PROC *ProcWakeup(PROC *proc, int errType);
 
+#define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
+
 /* --------------------
  * Spin lock for manipulating the shared process data structure:
  * ProcGlobal.... Adding an extra spin lock seemed like the smallest
@@ -247,10 +250,7 @@ InitProcess(IPCKey key)
 	 */
 	SpinRelease(ProcStructLock);
 
-	MyProc->pid = 0;
-#if 0
 	MyProc->pid = MyProcPid;
-#endif
 	MyProc->xid = InvalidTransactionId;
 #ifdef LowLevelLocking
 	MyProc->xmin = InvalidTransactionId;
@@ -361,10 +361,13 @@ ProcKill(int exitStatus, int pid)
 	 * ---------------
 	 */
 	ProcReleaseSpins(proc);
-	LockReleaseAll(1, &proc->lockQueue);
+	LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
 
 #ifdef USER_LOCKS
-	LockReleaseAll(0, &proc->lockQueue);
+	/*
+	 * Assume we have a second lock table.
+	 */
+	LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
 #endif
 
 	/* ----------------
@@ -437,11 +440,12 @@ ProcQueueInit(PROC_QUEUE *queue)
  * NOTES: The process queue is now a priority queue for locking.
  */
 int
-ProcSleep(PROC_QUEUE *waitQueue,
+ProcSleep(PROC_QUEUE *waitQueue,		/* lock->waitProcs */
 		  SPINLOCK spinlock,
-		  int token,
+		  int token,					/* lockmode */
 		  int prio,
-		  LOCK *lock)
+		  LOCK *lock,
+		  TransactionId xid)		    /* needed by user locks, see below */
 {
 	int			i;
 	PROC	   *proc;
@@ -470,7 +474,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
 	proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
 	/* If we are a reader, and they are writers, skip past them */
-
 	for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
 		proc = (PROC *) MAKE_PTR(proc->links.prev);
 
@@ -482,12 +485,22 @@ ProcSleep(PROC_QUEUE *waitQueue,
 	MyProc->token = token;
 	MyProc->waitLock = lock;
 
+#ifdef USER_LOCKS
+	/* -------------------
+	 * Currently, we only need this for the ProcWakeup routines.
+	 * This must be 0 for user lock, so we can't just use the value
+	 * from GetCurrentTransactionId().
+	 * -------------------
+	 */
+	TransactionIdStore(xid, &MyProc->xid);
+#else
 #ifndef LowLevelLocking
 	/* -------------------
 	 * currently, we only need this for the ProcWakeup routines
 	 * -------------------
 	 */
 	TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
+#endif
 #endif
 
 	/* -------------------
@@ -510,7 +523,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
 	 * --------------
 	 */
 	MemSet(&timeval, 0, sizeof(struct itimerval));
-	timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER;
+	timeval.it_value.tv_sec = \
+		(DeadlockCheckTimer ? DeadlockCheckTimer : DEADLOCK_CHECK_TIMER);
 
 	do
 	{
@@ -525,7 +539,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
 		 * the semaphore implementation.
 		 * --------------
 		 */
-		IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+		IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum,
+						 IpcExclusiveLock);
 	} while (MyProc->errType == STATUS_NOT_FOUND);		/* sleep after deadlock
 														 * check */
 
@@ -534,8 +549,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
 	 * ---------------
 	 */
 	timeval.it_value.tv_sec = 0;
-
-
 	if (setitimer(ITIMER_REAL, &timeval, &dummy))
 		elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup");
 
@@ -546,6 +559,11 @@ ProcSleep(PROC_QUEUE *waitQueue,
 	 */
 	SpinAcquire(spinlock);
 
+#ifdef LOCK_MGR_DEBUG
+	/* Just to get meaningful debug messages from DumpLocks() */
+	MyProc->waitLock = (LOCK *)NULL;
+#endif
+
 	return (MyProc->errType);
 }
 
@@ -589,17 +607,39 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
 {
 	PROC	   *proc;
 	int			count;
+	int			trace_flag;
+	int			last_locktype = -1;
+	int			queue_size = queue->size;
+
+	Assert(queue->size >= 0);
 
 	if (!queue->size)
 		return (STATUS_NOT_FOUND);
 
 	proc = (PROC *) MAKE_PTR(queue->links.prev);
 	count = 0;
-	while ((LockResolveConflicts(lockmethod,
+	while ((queue_size--) && (proc))
+	{
+		/*
+		 * This proc will conflict as the previous one did, don't even try.
+		 */
+		if (proc->token == last_locktype)
+		{
+			continue;
+		}
+
+		/*
+		 * This proc conflicts with locks held by others, ignored.
+		 */
+		if (LockResolveConflicts(lockmethod,
 								 lock,
 								 proc->token,
-								 proc->xid) == STATUS_OK))
-	{
+								 proc->xid,
+								 (XIDLookupEnt *) NULL) != STATUS_OK)
+		{
+			last_locktype = proc->token;
+			continue;
+		}
 
 		/*
 		 * there was a waiting process, grant it the lock before waking it
@@ -608,24 +648,34 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
 		 * time that the awoken process begins executing again.
 		 */
 		GrantLock(lock, proc->token);
-		queue->size--;
 
 		/*
 		 * ProcWakeup removes proc from the lock waiting process queue and
 		 * returns the next proc in chain.
 		 */
-		proc = ProcWakeup(proc, NO_ERROR);
 
 		count++;
-		if (!proc || queue->size == 0)
-			break;
+		queue->size--;
+		proc = ProcWakeup(proc, NO_ERROR);
 	}
 
+	Assert(queue->size >= 0);
+
 	if (count)
 		return (STATUS_OK);
-	else
+	else {
 		/* Something is still blocking us.	May have deadlocked. */
+		trace_flag = (lock->tag.lockmethod == USER_LOCKMETHOD) ? \
+			TRACE_USERLOCKS : TRACE_LOCKS;
+		TPRINTF(trace_flag,
+				"ProcLockWakeup: lock(%x) can't wake up any process",
+				MAKE_OFFSET(lock));
+#ifdef DEADLOCK_DEBUG
+		if (pg_options[trace_flag] >= 2)
+			DumpAllLocks();
+#endif
 		return (STATUS_NOT_FOUND);
+	}
 }
 
 void
@@ -685,7 +735,7 @@ HandleDeadLock(int sig)
 	}
 
 #ifdef DEADLOCK_DEBUG
-	DumpLocks();
+	DumpAllLocks();
 #endif
 
 	if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
@@ -711,7 +761,8 @@ HandleDeadLock(int sig)
 	 * I was awoken by a signal, not by someone unlocking my semaphore.
 	 * ------------------
 	 */
-	IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+	IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum,
+					   IpcExclusiveLock);
 
 	/* -------------
 	 * Set MyProc->errType to STATUS_ERROR so that we abort after
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index fd1f95aab988368eef0b1b5d5bed73f5d541384c..e989e57b4404ce8e47c8bf0b14bfbfb594cdd625 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.16 1998/08/01 15:26:37 vadim Exp $
+ * $Id: lock.h,v 1.17 1998/08/25 21:20:31 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -15,6 +15,8 @@
 
 #include <storage/shmem.h>
 #include <storage/itemptr.h>
+#include <storage/sinvaladt.h>
+#include <utils/array.h>
 
 extern SPINLOCK LockMgrLock;
 typedef int MASK;
@@ -32,7 +34,7 @@ typedef int MASK;
  * NLOCKENTS - The maximum number of lock entries in the lock table.
  * ----------------------
  */
-#define NBACKENDS 50
+#define NBACKENDS MaxBackendId
 #define NLOCKS_PER_XACT 40
 #define NLOCKENTS NLOCKS_PER_XACT*NBACKENDS
 
@@ -51,9 +53,14 @@ typedef int LOCKMETHOD;
  * CreateSpinLocks() or the number of shared memory locations allocated
  * for lock table spin locks in the case of machines with TAS instructions.
  */
-#define MAX_LOCK_METHODS 2
+#define MAX_LOCK_METHODS 	3
 
-#define INVALID_TABLEID 0
+#define INVALID_TABLEID		0
+
+#define INVALID_LOCKMETHOD	INVALID_TABLEID
+#define DEFAULT_LOCKMETHOD	1
+#define USER_LOCKMETHOD		2
+#define MIN_LOCKMETHOD		DEFAULT_LOCKMETHOD
 
 /*typedef struct LOCK LOCK; */
 
@@ -63,9 +70,11 @@ typedef struct LTAG
 	Oid			relId;
 	Oid			dbId;
 	ItemPointerData tupleId;
+	uint16		lockmethod;				/* needed by user locks */
 } LOCKTAG;
 
 #define TAGSIZE (sizeof(LOCKTAG))
+#define LOCKTAG_LOCKMETHOD(locktag) ((locktag).lockmethod)
 
 /* This is the control structure for a lock table.	It
  * lives in shared memory:
@@ -143,8 +152,18 @@ typedef struct XIDTAG
 	SHMEM_OFFSET lock;
 	int			pid;
 	TransactionId xid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+	uint16		lockmethod;				/* for debug or consistency checking */
+#endif
 } XIDTAG;
 
+#ifdef USE_XIDTAG_LOCKMETHOD
+#define XIDTAG_LOCKMETHOD(xidtag) ((xidtag).lockmethod)
+#else
+#define XIDTAG_LOCKMETHOD(xidtag) \
+		(((LOCK*) MAKE_PTR((xidtag).lock))->tag.lockmethod)
+#endif
+
 typedef struct XIDLookupEnt
 {
 	/* tag */
@@ -157,6 +176,7 @@ typedef struct XIDLookupEnt
 } XIDLookupEnt;
 
 #define XID_TAGSIZE (sizeof(XIDTAG))
+#define XIDENT_LOCKMETHOD(xident) (XIDTAG_LOCKMETHOD((xident).tag))
 
 /* originally in procq.h */
 typedef struct PROC_QUEUE
@@ -191,14 +211,16 @@ typedef struct LOCK
 	int			nActive;
 } LOCK;
 
-#define LockGetLock_nHolders(l) l->nHolders
+#define LOCK_LOCKMETHOD(lock) (LOCKTAG_LOCKMETHOD((lock).tag))
 
+#define LockGetLock_nHolders(l) l->nHolders
+#ifdef NOT_USED
 #define LockDecrWaitHolders(lock, lockmode) \
 ( \
   lock->nHolding--, \
   lock->holders[lockmode]-- \
 )
-
+#endif
 #define LockLockTable() SpinAcquire(LockMgrLock);
 #define UnlockLockTable() SpinRelease(LockMgrLock);
 
@@ -209,23 +231,27 @@ extern SPINLOCK LockMgrLock;
  */
 extern void InitLocks(void);
 extern void LockDisable(int status);
-extern LOCKMETHOD
-LockMethodTableInit(char *tabName, MASK *conflictsP, int *prioP,
-			int numModes);
-extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode);
-extern int
-LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
-					 TransactionId xid);
-extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode);
+extern LOCKMETHOD LockMethodTableInit(char *tabName, MASK *conflictsP,
+									  int *prioP, int numModes);
+extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
+extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+						LOCKMODE lockmode);
+extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock,
+								LOCKMODE lockmode, TransactionId xid,
+								XIDLookupEnt *xidentP);
+extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+						LOCKMODE lockmode);
 extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
 extern int	LockShmemSize(void);
 extern bool LockingDisabled(void);
-extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check);
+extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock,
+						  bool skip_check);
+ArrayType* LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag);
 
 #ifdef DEADLOCK_DEBUG
 extern void DumpLocks(void);
-
+extern void DumpAllLocks(void);
 #endif
 
 #endif							/* LOCK_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 8b8b7e28488daea7a1956b5d886a9a3115dd0c67..50f7b03ef633d37b4208e0af2ee6593609c063d7 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.13 1998/07/27 19:38:38 vadim Exp $
+ * $Id: proc.h,v 1.14 1998/08/25 21:20:32 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -105,10 +105,10 @@ extern bool ProcRemove(int pid);
 /* make static in storage/lmgr/proc.c -- jolly */
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int
-ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
-		  int prio, LOCK *lock);
-extern int	ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock);
+extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
+					 int prio, LOCK *lock, TransactionId xid);
+extern int	ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
+						   LOCK *lock);
 extern void ProcAddLock(SHM_QUEUE *elem);
 extern void ProcReleaseSpins(PROC *proc);
 extern void ProcFreeAllSemaphores(void);