diff --git a/src/backend/storage/ipc/shmqueue.c b/src/backend/storage/ipc/shmqueue.c index 2cdccd0a55027601da932284238ee7d895b51bf8..ae6950ca6f0779441767172df3ab2b27cb8ffd97 100644 --- a/src/backend/storage/ipc/shmqueue.c +++ b/src/backend/storage/ipc/shmqueue.c @@ -8,32 +8,34 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/ipc/shmqueue.c,v 1.13 2000/01/26 05:56:58 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/ipc/shmqueue.c,v 1.14 2001/01/22 22:30:06 tgl Exp $ * * NOTES * * Package for managing doubly-linked lists in shared memory. * The only tricky thing is that SHM_QUEUE will usually be a field - * in a larger record. SHMQueueGetFirst has to return a pointer + * in a larger record. SHMQueueNext has to return a pointer * to the record itself instead of a pointer to the SHMQueue field - * of the record. It takes an extra pointer and does some extra + * of the record. It takes an extra parameter and does some extra * pointer arithmetic to do this correctly. * * NOTE: These are set up so they can be turned into macros some day. * *------------------------------------------------------------------------- */ - #include "postgres.h" + #include "storage/shmem.h" /*#define SHMQUEUE_DEBUG*/ #ifdef SHMQUEUE_DEBUG -#define SHMQUEUE_DEBUG_DEL /* deletions */ -#define SHMQUEUE_DEBUG_HD /* head inserts */ -#define SHMQUEUE_DEBUG_TL /* tail inserts */ + #define SHMQUEUE_DEBUG_ELOG NOTICE -#endif /* SHMQUEUE_DEBUG */ + +static void dumpQ(SHM_QUEUE *q, char *s); + +#endif + /* * ShmemQueueInit -- make the head of a new queue point @@ -84,76 +86,23 @@ SHMQueueDelete(SHM_QUEUE *queue) Assert(SHM_PTR_VALID(nextElem)); Assert(SHM_PTR_VALID(prevElem)); -#ifdef SHMQUEUE_DEBUG_DEL +#ifdef SHMQUEUE_DEBUG dumpQ(queue, "in SHMQueueDelete: begin"); -#endif /* SHMQUEUE_DEBUG_DEL */ +#endif prevElem->next = (queue)->next; nextElem->prev = (queue)->prev; -#ifdef SHMQUEUE_DEBUG_DEL - dumpQ((SHM_QUEUE *) MAKE_PTR(queue->prev), "in SHMQueueDelete: end"); -#endif /* SHMQUEUE_DEBUG_DEL */ -} - -#ifdef SHMQUEUE_DEBUG -void -dumpQ(SHM_QUEUE *q, char *s) -{ - char elem[NAMEDATALEN]; - char buf[1024]; - SHM_QUEUE *start = q; - int count = 0; - - sprintf(buf, "q prevs: %x", MAKE_OFFSET(q)); - q = (SHM_QUEUE *) MAKE_PTR(q->prev); - while (q != start) - { - sprintf(elem, "--->%x", MAKE_OFFSET(q)); - strcat(buf, elem); - q = (SHM_QUEUE *) MAKE_PTR(q->prev); - if (q->prev == MAKE_OFFSET(q)) - break; - if (count++ > 40) - { - strcat(buf, "BAD PREV QUEUE!!"); - break; - } - } - sprintf(elem, "--->%x", MAKE_OFFSET(q)); - strcat(buf, elem); - elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf); - - sprintf(buf, "q nexts: %x", MAKE_OFFSET(q)); - count = 0; - q = (SHM_QUEUE *) MAKE_PTR(q->next); - while (q != start) - { - sprintf(elem, "--->%x", MAKE_OFFSET(q)); - strcat(buf, elem); - q = (SHM_QUEUE *) MAKE_PTR(q->next); - if (q->next == MAKE_OFFSET(q)) - break; - if (count++ > 10) - { - strcat(buf, "BAD NEXT QUEUE!!"); - break; - } - } - sprintf(elem, "--->%x", MAKE_OFFSET(q)); - strcat(buf, elem); - elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf); + (queue)->prev = (queue)->next = INVALID_OFFSET; } -#endif /* SHMQUEUE_DEBUG */ - /* - * SHMQueueInsertHD -- put elem in queue between the queue head - * and its "prev" element. + * SHMQueueInsertBefore -- put elem in queue before the given queue + * element. Inserting "before" the queue head puts the elem + * at the tail of the queue. 
*/ -#ifdef NOT_USED void -SHMQueueInsertHD(SHM_QUEUE *queue, SHM_QUEUE *elem) +SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem) { SHM_QUEUE *prevPtr = (SHM_QUEUE *) MAKE_PTR((queue)->prev); SHMEM_OFFSET elemOffset = MAKE_OFFSET(elem); @@ -161,24 +110,28 @@ SHMQueueInsertHD(SHM_QUEUE *queue, SHM_QUEUE *elem) Assert(SHM_PTR_VALID(queue)); Assert(SHM_PTR_VALID(elem)); -#ifdef SHMQUEUE_DEBUG_HD - dumpQ(queue, "in SHMQueueInsertHD: begin"); -#endif /* SHMQUEUE_DEBUG_HD */ +#ifdef SHMQUEUE_DEBUG + dumpQ(queue, "in SHMQueueInsertBefore: begin"); +#endif (elem)->next = prevPtr->next; (elem)->prev = queue->prev; (queue)->prev = elemOffset; prevPtr->next = elemOffset; -#ifdef SHMQUEUE_DEBUG_HD - dumpQ(queue, "in SHMQueueInsertHD: end"); -#endif /* SHMQUEUE_DEBUG_HD */ -} - +#ifdef SHMQUEUE_DEBUG + dumpQ(queue, "in SHMQueueInsertBefore: end"); #endif +} +/* + * SHMQueueInsertAfter -- put elem in queue after the given queue + * element. Inserting "after" the queue head puts the elem + * at the head of the queue. + */ +#ifdef NOT_USED void -SHMQueueInsertTL(SHM_QUEUE *queue, SHM_QUEUE *elem) +SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem) { SHM_QUEUE *nextPtr = (SHM_QUEUE *) MAKE_PTR((queue)->next); SHMEM_OFFSET elemOffset = MAKE_OFFSET(elem); @@ -186,58 +139,55 @@ SHMQueueInsertTL(SHM_QUEUE *queue, SHM_QUEUE *elem) Assert(SHM_PTR_VALID(queue)); Assert(SHM_PTR_VALID(elem)); -#ifdef SHMQUEUE_DEBUG_TL - dumpQ(queue, "in SHMQueueInsertTL: begin"); -#endif /* SHMQUEUE_DEBUG_TL */ +#ifdef SHMQUEUE_DEBUG + dumpQ(queue, "in SHMQueueInsertAfter: begin"); +#endif (elem)->prev = nextPtr->prev; (elem)->next = queue->next; (queue)->next = elemOffset; nextPtr->prev = elemOffset; -#ifdef SHMQUEUE_DEBUG_TL - dumpQ(queue, "in SHMQueueInsertTL: end"); -#endif /* SHMQUEUE_DEBUG_TL */ +#ifdef SHMQUEUE_DEBUG + dumpQ(queue, "in SHMQueueInsertAfter: end"); +#endif } +#endif /* NOT_USED */ -/* - * SHMQueueFirst -- Get the first element from a queue +/*-------------------- + * SHMQueueNext -- Get the next element from a queue * - * First element is queue->next. If SHMQueue is part of + * To start the iteration, pass the queue head as both queue and curElem. + * Returns NULL if no more elements. + * + * Next element is at curElem->next. If SHMQueue is part of * a larger structure, we want to return a pointer to the * whole structure rather than a pointer to its SHMQueue field. * I.E. struct { * int stuff; * SHMQueue elem; * } ELEMType; - * when this element is in a queue (queue->next) is struct.elem. - * nextQueue allows us to calculate the offset of the SHMQueue - * field in the structure. - * - * call to SHMQueueFirst should take these parameters: + * When this element is in a queue, (prevElem->next) is struct.elem. + * We subtract linkOffset to get the correct start address of the structure. * - * &(queueHead),&firstElem,&(firstElem->next) + * calls to SHMQueueNext should take these parameters: * - * Note that firstElem may well be uninitialized. if firstElem - * is initially K, &(firstElem->next) will be K+ the offset to - * next. 
+ * &(queueHead), &(queueHead), offsetof(ELEMType, elem) + * or + * &(queueHead), &(curElem->elem), offsetof(ELEMType, elem) + *-------------------- */ -void -SHMQueueFirst(SHM_QUEUE *queue, Pointer *nextPtrPtr, SHM_QUEUE *nextQueue) +Pointer +SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset) { - SHM_QUEUE *elemPtr = (SHM_QUEUE *) MAKE_PTR((queue)->next); + SHM_QUEUE *elemPtr = (SHM_QUEUE *) MAKE_PTR((curElem)->next); - Assert(SHM_PTR_VALID(queue)); - *nextPtrPtr = (Pointer) (((unsigned long) *nextPtrPtr) + - ((unsigned long) elemPtr) - ((unsigned long) nextQueue)); - - /* - * nextPtrPtr a ptr to a structure linked in the queue nextQueue is - * the SHMQueue field of the structure nextPtrPtr - nextQueue is 0 - * minus the offset of the queue field n the record elemPtr + - * (*nextPtrPtr - nexQueue) is the start of the structure containing - * elemPtr. - */ + Assert(SHM_PTR_VALID(curElem)); + + if (elemPtr == queue) /* back to the queue head? */ + return NULL; + + return (Pointer) (((char *) elemPtr) - linkOffset); } /* @@ -255,3 +205,55 @@ SHMQueueEmpty(SHM_QUEUE *queue) } return FALSE; } + +#ifdef SHMQUEUE_DEBUG + +static void +dumpQ(SHM_QUEUE *q, char *s) +{ + char elem[NAMEDATALEN]; + char buf[1024]; + SHM_QUEUE *start = q; + int count = 0; + + sprintf(buf, "q prevs: %lx", MAKE_OFFSET(q)); + q = (SHM_QUEUE *) MAKE_PTR(q->prev); + while (q != start) + { + sprintf(elem, "--->%lx", MAKE_OFFSET(q)); + strcat(buf, elem); + q = (SHM_QUEUE *) MAKE_PTR(q->prev); + if (q->prev == MAKE_OFFSET(q)) + break; + if (count++ > 40) + { + strcat(buf, "BAD PREV QUEUE!!"); + break; + } + } + sprintf(elem, "--->%lx", MAKE_OFFSET(q)); + strcat(buf, elem); + elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf); + + sprintf(buf, "q nexts: %lx", MAKE_OFFSET(q)); + count = 0; + q = (SHM_QUEUE *) MAKE_PTR(q->next); + while (q != start) + { + sprintf(elem, "--->%lx", MAKE_OFFSET(q)); + strcat(buf, elem); + q = (SHM_QUEUE *) MAKE_PTR(q->next); + if (q->next == MAKE_OFFSET(q)) + break; + if (count++ > 10) + { + strcat(buf, "BAD NEXT QUEUE!!"); + break; + } + } + sprintf(elem, "--->%lx", MAKE_OFFSET(q)); + strcat(buf, elem); + elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf); +} + +#endif /* SHMQUEUE_DEBUG */ diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README index 7d881ff6da43e3f0fdbebe0a6c5d3631a435905c..af9fbc8421b1291d81ca8af14c1a54fcf0d0ab23 100644 --- a/src/backend/storage/lmgr/README +++ b/src/backend/storage/lmgr/README @@ -1,4 +1,4 @@ -$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.5 2001/01/16 06:11:34 tgl Exp $ +$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.6 2001/01/22 22:30:06 tgl Exp $ There are two fundamental lock structures: the per-lockable-object LOCK struct, and the per-lock-holder HOLDER struct. A LOCK object exists @@ -15,7 +15,7 @@ details. --------------------------------------------------------------------------- -The lock manager's LOCK: +The lock manager's LOCK objects contain: tag - The key fields that are used for hashing locks in the shared memory @@ -29,10 +29,10 @@ tag - tag.dbId - Uniquely identifies the database in which the relation lives. If - this is a shared system relation (e.g. pg_user) the dbId should be - set to 0. + this is a shared system relation (e.g. pg_database) the dbId must + be set to 0. - tag.tupleId - + tag.objId - Uniquely identifies the block/page within the relation and the tuple within the block. 
If we are setting a table level lock both the blockId and tupleId (in an item pointer this is called @@ -56,6 +56,12 @@ waitMask - This bitmask shows the types of locks being waited for. Bit i of waitMask is 1 if and only if requested[i] > granted[i]. +lockHolders - + This is a shared memory queue of all the HOLDER structs associated with + the lock object. Note that both granted and waiting HOLDERs are in this + list (indeed, the same HOLDER might have some already-granted locks and + be waiting for more!). + waitProcs - This is a shared memory queue of all process structures corresponding to a backend that is waiting (sleeping) until another backend releases this @@ -93,7 +99,7 @@ zero, the lock object is no longer needed and can be freed. --------------------------------------------------------------------------- -The lock manager's HOLDER: +The lock manager's HOLDER objects contain: tag - The key fields that are used for hashing entries in the shared memory @@ -103,8 +109,8 @@ tag - tag.lock SHMEM offset of the LOCK object this holder is for. - tag.pid - PID of backend process that owns this holder. + tag.proc + SHMEM offset of PROC of backend process that owns this holder. tag.xid XID of transaction this holder is for, or InvalidTransactionId @@ -124,6 +130,250 @@ holding - nHolding - Sum of the holding[] array. -queue - +lockLink - + List link for shared memory queue of all the HOLDER objects for the + same LOCK. + +procLink - List link for shared memory queue of all the HOLDER objects for the same backend. + +--------------------------------------------------------------------------- + +The deadlock detection algorithm: + +Since we allow user transactions to request locks in any order, deadlock +is possible. We use a deadlock detection/breaking algorithm that is +fairly standard in essence, but there are many special considerations +needed to deal with Postgres' generalized locking model. + +A key design consideration is that we want to make routine operations +(lock grant and release) run quickly when there is no deadlock, and avoid +the overhead of deadlock handling as much as possible. We do this using +an "optimistic waiting" approach: if a process cannot acquire the lock +it wants immediately, it goes to sleep without any deadlock check. But +it also sets a delay timer, with a delay of DeadlockTimeout milliseconds +(typically set to one second). If the delay expires before the process is +granted the lock it wants, it runs the deadlock detection/breaking code. +Normally this code will determine that there is no deadlock condition, +and then the process will go back to sleep and wait quietly until it is +granted the lock. But if a deadlock condition does exist, it will be +resolved, usually by aborting the detecting process' transaction. In this +way, we avoid deadlock handling overhead whenever the wait time for a lock +is less than DeadlockTimeout, while not imposing an unreasonable delay of +detection when there is an error. + +Lock acquisition (routines LockAcquire and ProcSleep) follows these rules: + +1. A lock request is granted immediately if it does not conflict with any +existing or waiting lock request, or if the process already holds an +instance of the same lock type (eg, there's no penalty to acquire a read +lock twice). Note that a process never conflicts with itself, eg one can +obtain read lock when one already holds exclusive lock. + +2. Otherwise the process joins the lock's wait queue. 
Normally it will be +added to the end of the queue, but there is an exception: if the process +already holds locks on this same lockable object that conflict with the +request of any pending waiter, then the process will be inserted in the +wait queue just ahead of the first such waiter. (If we did not make this +check, the deadlock detection code would adjust the queue order to resolve +the conflict, but it's relatively cheap to make the check in ProcSleep and +avoid a deadlock timeout delay in this case.) Note special case: if the +process holds locks that conflict with the first waiter, so that it would +go at the front of the queue, and its request does not conflict with the +already-granted locks, then the process will be granted the lock without +going to sleep at all. + +When a lock is released, the lock release routine (ProcLockWakeup) scans +the lock object's wait queue. Each waiter is awoken if (a) its request +does not conflict with already-granted locks, and (b) its request does +not conflict with the requests of prior un-wakable waiters. Rule (b) +ensures that conflicting requests are granted in order of arrival. +There are cases where a later waiter must be allowed to go in front of +conflicting earlier waiters to avoid deadlock, but it is not +ProcLockWakeup's responsibility to recognize these cases; instead, the +deadlock detection code re-orders the wait queue when necessary. + +To perform deadlock checking, we use the standard method of viewing the +various processes as nodes in a directed graph (the waits-for graph or +WFG). There is a graph edge leading from process A to process B if A +waits for B, ie, A is waiting for some lock and B holds a conflicting +lock. There is a deadlock condition if and only if the WFG contains +a cycle. We detect cycles by searching outward along waits-for edges +to see if we return to our starting point. There are three possible +outcomes: + +1. All outgoing paths terminate at a running process (which has no +outgoing edge). + +2. A deadlock is detected by looping back to the start point. We resolve +such a deadlock by canceling the start point's lock request and reporting +an error in that transaction, which normally leads to transaction abort +and release of that transaction's held locks. Note that it's sufficient +to cancel one request to remove the cycle; we don't need to kill all the +transactions involved. + +3. Some path(s) loop back to a node other than the start point. This +indicates a deadlock, but one that does not involve our starting process. +We ignore this condition on the grounds that resolving such a deadlock +is the responsibility of the processes involved --- killing our start- +point process would not resolve the deadlock. So, cases 1 and 3 both +report "no deadlock". + +Postgres' situation is a little more complex than the standard discussion +of deadlock detection, for two reasons: + +1. A process can be waiting for more than one other process, since there +might be multiple holders of (nonconflicting) lock types that all conflict +with the waiter's request. This creates no real difficulty however; we +simply need to be prepared to trace more than one outgoing edge. + +2. If a process A is behind a process B in some lock's wait queue, and +their requested locks conflict, then we must say that A waits for B, since +ProcLockWakeup will never awaken A before B. This creates additional +edges in the WFG. We call these "soft" edges, as opposed to the "hard" +edges induced by locks already held. 
Note that if B already holds any +locks conflicting with A's request, then their relationship is a hard edge +not a soft edge. + +A "soft" block, or wait-priority block, has the same potential for +inducing deadlock as a hard block. However, we may be able to resolve +a soft block without aborting the transactions involved: we can instead +rearrange the order of the wait queue. This rearrangement reverses the +direction of the soft edge between two processes with conflicting requests +whose queue order is reversed. If we can find a rearrangement that +eliminates a cycle without creating new ones, then we can avoid an abort. +Checking for such possible rearrangements is the trickiest part of the +algorithm. + +The workhorse of the deadlock detector is a routine FindLockCycle() which +is given a starting point process (which must be a waiting process). +It recursively scans outwards across waits-for edges as discussed above. +If it finds no cycle involving the start point, it returns "false". +(As discussed above, we can ignore cycles not involving the start point.) +When such a cycle is found, FindLockCycle() returns "true", and as it +unwinds it also builds a list of any "soft" edges involved in the cycle. +If the resulting list is empty then there is a hard deadlock and the +configuration cannot succeed. However, if the list is not empty, then +reversing any one of the listed edges through wait-queue rearrangement +will eliminate that cycle. Since such a reversal might create cycles +elsewhere, we may need to try every possibility. Therefore, we need to +be able to invoke FindLockCycle() on hypothetical configurations (wait +orders) as well as the current real order. + +The easiest way to handle this seems to be to have a lookaside table that +shows the proposed new queue order for each wait queue that we are +considering rearranging. This table is passed to FindLockCycle, and it +believes the given queue order rather than the "real" order for each lock +that has an entry in the lookaside table. + +We build a proposed new queue order by doing a "topological sort" of the +existing entries. Each soft edge that we are currently considering +reversing is a property of the partial order that the topological sort +has to enforce. We must use a sort method that preserves the input +ordering as much as possible, so as not to gratuitously break arrival +order for processes not involved in a deadlock. (This is not true of the +tsort method shown in Knuth, for example, but it's easily done by a simple +doubly-nested-loop method that emits the first legal candidate at each +step. Fortunately, we don't need a highly efficient sort algorithm, since +the number of partial order constraints is not likely to be large.) Note +that failure of the topological sort tells us we have conflicting ordering +constraints, and therefore that the last-added soft edge reversal +conflicts with a prior edge reversal. We need to detect this case to +avoid an infinite loop in the case where no possible rearrangement will +work: otherwise, we might try a reversal, find that it still leads to +a cycle, then try to un-reverse the reversal while trying to get rid of +that cycle, etc etc. Topological sort failure tells us the un-reversal +is not a legitimate move in this context.
+ +So, the basic step in our rearrangement method is to take a list of +soft edges in a cycle (as returned by FindLockCycle()) and successively +try the reversal of each one as a topological-sort constraint added to +whatever constraints we are already considering. We recursively search +through all such sets of constraints to see if any one eliminates all +the deadlock cycles at once. Although this might seem impossibly +inefficient, it shouldn't be a big problem in practice, because there +will normally be very few, and not very large, deadlock cycles --- if +any at all. So the combinatorial inefficiency isn't going to hurt us. +Besides, it's better to spend some time to guarantee that we've checked +all possible escape routes than to abort a transaction when we didn't +really have to. + +Each edge reversal constraint can be viewed as requesting that the waiting +process A be moved to before the blocking process B in the wait queue they +are both in. This action will reverse the desired soft edge, as well as +any other soft edges between A and other processes it is advanced over. +No other edges will be affected (note this is actually a constraint on our +topological sort method to not re-order the queue more than necessary.) +Therefore, we can be sure we have not created any new deadlock cycles if +neither FindLockCycle(A) nor FindLockCycle(B) discovers any cycle. Given +the above-defined behavior of FindLockCycle, each of these searches is +necessary as well as sufficient, since FindLockCycle starting at the +original start point will not complain about cycles that include A or B +but not the original start point. + +In short then, a proposed rearrangement of the wait queue(s) is determined +by one or more broken soft edges A->B, fully specified by the output of +topological sorts of each wait queue involved, and then tested by invoking +FindLockCycle() starting at the original start point as well as each of +the mentioned processes (A's and B's). If none of the tests detect a +cycle, then we have a valid configuration and can implement it by +reordering the wait queues per the sort outputs (and then applying +ProcLockWakeup on each reordered queue, in case a waiter has become wakable). +If any test detects a soft cycle, we can try to resolve it by adding each +soft link in that cycle, in turn, to the proposed rearrangement list. +This is repeated recursively until we either find a workable rearrangement +or determine that none exists. In the latter case, the outer level +resolves the deadlock by aborting the original start-point transaction. + +The particular order in which rearrangements are tried depends on the +order FindLockCycle() happens to scan in, so if there are multiple +workable rearrangements of the wait queues, then it is unspecified which +one will be chosen. What's more important is that we guarantee to try +every queue rearrangement that could lead to success. (For example, +if we have A before B before C and the needed order constraints are +C before A and B before C, we would first discover that A before C +doesn't work and try the rearrangement C before A before B. This would +eventually lead to the discovery of the additional constraint B before C.) + +Got that? + +Miscellaneous notes: + +1. It is easily proven that no deadlock will be missed due to our +asynchronous invocation of deadlock checking. 
A deadlock cycle in the WFG +is formed when the last edge in the cycle is added; therefore the last +process in the cycle to wait (the one from which that edge is outgoing) is +certain to detect and resolve the cycle when it later runs HandleDeadLock. +This holds even if that edge addition created multiple cycles; the process +may indeed abort without ever noticing those additional cycles, but we +don't particularly care. The only other possible creation of deadlocks is +during deadlock resolution's rearrangement of wait queues, and we already +saw that that algorithm will prove that it creates no new deadlocks before +it attempts to actually execute any rearrangement. + +2. It is not certain that a deadlock will be resolved by aborting the +last-to-wait process. If earlier waiters in the cycle have not yet run +HandleDeadLock, then the first one to do so will be the victim. + +3. No live (wakable) process can be missed by ProcLockWakeup, since it +examines every member of the wait queue (this was not true in the 7.0 +implementation, BTW). Therefore, if ProcLockWakeup is always invoked +after a lock is released or a wait queue is rearranged, there can be no +failure to wake a wakable process. One should also note that +LockWaitCancel (abort a waiter due to outside factors) must run +ProcLockWakeup, in case the cancelled waiter was soft-blocking other +waiters. + +4. We can minimize excess rearrangement-trial work by being careful to scan +the wait queue from the front when looking for soft edges. For example, +if we have queue order A,B,C and C has deadlock conflicts with both A and B, +we want to generate the "C before A" constraint first, rather than wasting +time with "C before B", which won't move C far enough up. So we look for +soft edges outgoing from C starting at the front of the wait queue. + +5. The working data structures needed by the deadlock detection code can +be proven not to need more than MAXBACKENDS entries. Therefore the +working storage can be statically allocated instead of depending on +palloc(). This is a good thing, since if the deadlock detector could +fail for extraneous reasons, all the above safety proofs fall down. 
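The README text above describes the deadlock search as a walk of the waits-for graph (WFG)
outward from the blocked starting process, reporting a deadlock only when some path returns to
that starting point and deliberately ignoring cycles that do not pass through it. A minimal
standalone sketch of that cycle search follows; it is only an illustration of the graph-walking
idea, and the MAXPROCS constant, the wfg adjacency matrix, and the find_cycle() helper are
hypothetical names rather than PostgreSQL's FindLockCycle(), which works against the real
LOCK/HOLDER/PROC structures and also collects the soft edges needed for wait-queue rearrangement.

/*
 * Standalone sketch of the WFG cycle search described in the README.
 * Not PostgreSQL code: the graph is a plain adjacency matrix and the
 * processes are identified by small integers.
 */
#include <stdbool.h>
#include <stdio.h>

#define MAXPROCS 8

/* wfg[a][b] is true if process a waits for process b */
static bool wfg[MAXPROCS][MAXPROCS];

static bool
find_cycle(int start, int cur, bool *visited)
{
	int			next;

	for (next = 0; next < MAXPROCS; next++)
	{
		if (!wfg[cur][next])
			continue;			/* no waits-for edge cur -> next */
		if (next == start)
			return true;		/* looped back to the start point (outcome 2) */
		if (visited[next])
			continue;			/* cycles not through start are ignored (outcome 3) */
		visited[next] = true;
		if (find_cycle(start, next, visited))
			return true;
	}
	return false;				/* all outgoing paths terminate (outcome 1) */
}

int
main(void)
{
	bool		visited[MAXPROCS] = {false};

	/* proc 0 waits for 1, 1 waits for 2, 2 waits for 0: a deadlock cycle */
	wfg[0][1] = wfg[1][2] = wfg[2][0] = true;

	printf("deadlock involving proc 0: %s\n",
		   find_cycle(0, 0, visited) ? "yes" : "no");
	return 0;
}

With any one of the three edges removed, every path from proc 0 terminates and the search returns
false, corresponding to outcome 1 in the README; the sample as written loops back to its start and
corresponds to outcome 2.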
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index fa2b98cc463bfa50d3e69001fd12c98844ea2759..35e960e800857cbf43042ad28dad22d8e852fd5d 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.78 2001/01/16 06:11:34 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.79 2001/01/22 22:30:06 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -127,10 +127,10 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP) || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table)) ) elog(DEBUG, - "%s: holder(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d", + "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d", where, MAKE_OFFSET(holderP), holderP->tag.lock, HOLDER_LOCKMETHOD(*(holderP)), - holderP->tag.pid, holderP->tag.xid, + holderP->tag.proc, holderP->tag.xid, holderP->holding[1], holderP->holding[2], holderP->holding[3], holderP->holding[4], holderP->holding[5], holderP->holding[6], holderP->holding[7], holderP->nHolding); @@ -455,8 +455,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod) * tag.objId block id lock id2 * or xact id * tag.offnum 0 lock id1 - * xid.pid backend pid backend pid - * xid.xid xid or 0 0 + * holder.xid xid or 0 0 * persistence transaction user or backend * or backend * @@ -526,11 +525,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, { lock->grantMask = 0; lock->waitMask = 0; + SHMQueueInit(&(lock->lockHolders)); + ProcQueueInit(&(lock->waitProcs)); lock->nRequested = 0; lock->nGranted = 0; MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES); MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES); - ProcQueueInit(&(lock->waitProcs)); LOCK_PRINT("LockAcquire: new", lock, lockmode); } else @@ -547,7 +547,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, */ MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */ holdertag.lock = MAKE_OFFSET(lock); - holdertag.pid = MyProcPid; + holdertag.proc = MAKE_OFFSET(MyProc); TransactionIdStore(xid, &holdertag.xid); /* @@ -570,7 +570,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, { holder->nHolding = 0; MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES); - ProcAddLock(&holder->queue); + /* Add holder to appropriate lists */ + SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink); + SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink); HOLDER_PRINT("LockAcquire: new", holder); } else @@ -693,7 +695,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, { if (holder->nHolding == 0) { - SHMQueueDelete(&holder->queue); + SHMQueueDelete(&holder->lockLink); + SHMQueueDelete(&holder->procLink); holder = (HOLDER *) hash_search(holderTable, (Pointer) holder, HASH_REMOVE, &found); @@ -862,33 +865,17 @@ LockResolveConflicts(LOCKMETHOD lockmethod, static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding) { - HOLDER *holder = NULL; - HOLDER *nextHolder = NULL; - SHM_QUEUE *holderQueue = &(proc->holderQueue); - SHMEM_OFFSET end = MAKE_OFFSET(holderQueue); + SHM_QUEUE *procHolders = &(proc->procHolders); + HOLDER *holder; int i; MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int)); - if (SHMQueueEmpty(holderQueue)) - return; - - SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue); + holder = (HOLDER *) 
SHMQueueNext(procHolders, procHolders, + offsetof(HOLDER, procLink)); - do + while (holder) { - /* --------------------------- - * XXX Here we assume the shared memory queue is circular and - * that we know its internal structure. Should have some sort of - * macros to allow one to walk it. mer 20 July 1991 - * --------------------------- - */ - if (holder->queue.next == end) - nextHolder = NULL; - else - SHMQueueFirst(&holder->queue, - (Pointer *) &nextHolder, &nextHolder->queue); - if (lockOffset == holder->tag.lock) { for (i = 1; i < MAX_LOCKMODES; i++) @@ -897,8 +884,9 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding) } } - holder = nextHolder; - } while (holder); + holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink, + offsetof(HOLDER, procLink)); + } } /* @@ -1080,7 +1068,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, */ MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */ holdertag.lock = MAKE_OFFSET(lock); - holdertag.pid = MyProcPid; + holdertag.proc = MAKE_OFFSET(MyProc); TransactionIdStore(xid, &holdertag.xid); holderTable = lockMethodTable->holderHash; @@ -1160,7 +1148,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); - if (!lock->nRequested) + if (lock->nRequested == 0) { /* ------------------ * if there's no one waiting in the queue, @@ -1189,15 +1177,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, * If this was my last hold on this lock, delete my entry in the holder * table. */ - if (!holder->nHolding) + if (holder->nHolding == 0) { - if (holder->queue.prev == INVALID_OFFSET) - elog(NOTICE, "LockRelease: holder.prev == INVALID_OFFSET"); - if (holder->queue.next == INVALID_OFFSET) - elog(NOTICE, "LockRelease: holder.next == INVALID_OFFSET"); - if (holder->queue.next != INVALID_OFFSET) - SHMQueueDelete(&holder->queue); HOLDER_PRINT("LockRelease: deleting", holder); + SHMQueueDelete(&holder->lockLink); + SHMQueueDelete(&holder->procLink); holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder, HASH_REMOVE_SAVED, &found); if (!holder || !found) @@ -1220,7 +1204,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, } /* - * LockReleaseAll -- Release all locks in a process's lock queue. + * LockReleaseAll -- Release all locks in a process's lock list. * * Well, not really *all* locks. * @@ -1234,22 +1218,20 @@ bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, bool allxids, TransactionId xid) { - HOLDER *holder = NULL; - HOLDER *nextHolder = NULL; - SHM_QUEUE *holderQueue = &(proc->holderQueue); - SHMEM_OFFSET end = MAKE_OFFSET(holderQueue); + SHM_QUEUE *procHolders = &(proc->procHolders); + HOLDER *holder; + HOLDER *nextHolder; SPINLOCK masterLock; LOCKMETHODTABLE *lockMethodTable; int i, numLockModes; LOCK *lock; bool found; - int nleft; #ifdef LOCK_DEBUG if (lockmethod == USER_LOCKMETHOD ? 
Trace_userlocks : Trace_locks) elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d", - lockmethod, MyProcPid); + lockmethod, proc->pid); #endif Assert(lockmethod < NumLockMethods); @@ -1260,51 +1242,33 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, return FALSE; } - if (SHMQueueEmpty(holderQueue)) - return TRUE; - numLockModes = lockMethodTable->ctl->numLockModes; masterLock = lockMethodTable->ctl->masterLock; SpinAcquire(masterLock); - SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue); - - nleft = 0; + holder = (HOLDER *) SHMQueueNext(procHolders, procHolders, + offsetof(HOLDER, procLink)); - do + while (holder) { bool wakeupNeeded = false; - /* --------------------------- - * XXX Here we assume the shared memory queue is circular and - * that we know its internal structure. Should have some sort of - * macros to allow one to walk it. mer 20 July 1991 - * --------------------------- - */ - if (holder->queue.next == end) - nextHolder = NULL; - else - SHMQueueFirst(&holder->queue, - (Pointer *) &nextHolder, &nextHolder->queue); + /* Get link first, since we may unlink/delete this holder */ + nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink, + offsetof(HOLDER, procLink)); - Assert(holder->tag.pid == proc->pid); + Assert(holder->tag.proc == MAKE_OFFSET(proc)); lock = (LOCK *) MAKE_PTR(holder->tag.lock); /* Ignore items that are not of the lockmethod to be removed */ if (LOCK_LOCKMETHOD(*lock) != lockmethod) - { - nleft++; goto next_item; - } /* If not allxids, ignore items that are of the wrong xid */ if (!allxids && xid != holder->tag.xid) - { - nleft++; goto next_item; - } HOLDER_PRINT("LockReleaseAll", holder); LOCK_PRINT("LockReleaseAll", lock, 0); @@ -1364,9 +1328,10 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, HOLDER_PRINT("LockReleaseAll: deleting", holder); /* - * Remove the holder entry from the process' lock queue + * Remove the holder entry from the linked lists */ - SHMQueueDelete(&holder->queue); + SHMQueueDelete(&holder->lockLink); + SHMQueueDelete(&holder->procLink); /* * remove the holder entry from the hashtable @@ -1406,18 +1371,6 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, next_item: holder = nextHolder; - } while (holder); - - /* - * Reinitialize the queue only if nothing has been left in. - */ - if (nleft == 0) - { -#ifdef LOCK_DEBUG - if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) - elog(DEBUG, "LockReleaseAll: reinitializing holderQueue"); -#endif - SHMQueueInit(holderQueue); } SpinRelease(masterLock); @@ -1476,12 +1429,11 @@ LockShmemSize(int maxBackends) bool DeadLockCheck(PROC *thisProc, LOCK *findlock) { - HOLDER *holder = NULL; - HOLDER *nextHolder = NULL; PROC *waitProc; PROC_QUEUE *waitQueue; - SHM_QUEUE *holderQueue = &(thisProc->holderQueue); - SHMEM_OFFSET end = MAKE_OFFSET(holderQueue); + SHM_QUEUE *procHolders = &(thisProc->procHolders); + HOLDER *holder; + HOLDER *nextHolder; LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; LOCK *lock; int i, @@ -1501,26 +1453,16 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) /* * Scan over all the locks held/awaited by thisProc. */ - if (SHMQueueEmpty(holderQueue)) - return false; - - SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue); + holder = (HOLDER *) SHMQueueNext(procHolders, procHolders, + offsetof(HOLDER, procLink)); - do + while (holder) { - /* --------------------------- - * XXX Here we assume the shared memory queue is circular and - * that we know its internal structure. 
Should have some sort of - * macros to allow one to walk it. mer 20 July 1991 - * --------------------------- - */ - if (holder->queue.next == end) - nextHolder = NULL; - else - SHMQueueFirst(&holder->queue, - (Pointer *) &nextHolder, &nextHolder->queue); + /* Get link first, since we may unlink/delete this holder */ + nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink, + offsetof(HOLDER, procLink)); - Assert(holder->tag.pid == thisProc->pid); + Assert(holder->tag.proc == MAKE_OFFSET(thisProc)); lock = (LOCK *) MAKE_PTR(holder->tag.lock); @@ -1532,7 +1474,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) LOCK_PRINT("DeadLockCheck", lock, 0); /* - * waitLock is always in holderQueue of waiting proc, if !first_run + * waitLock is always in procHolders of waiting proc, if !first_run * then upper caller will handle waitProcs queue of waitLock. */ if (thisProc->waitLock == lock && !first_run) @@ -1555,13 +1497,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) } /* - * Else - get the next lock from thisProc's holderQueue + * Else - get the next lock from thisProc's procHolders */ goto nxtl; } waitQueue = &(lock->waitProcs); - waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev); + waitProc = (PROC *) MAKE_PTR(waitQueue->links.next); /* * Inner loop scans over all processes waiting for this lock. @@ -1589,7 +1531,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) /* and he blocked by me -> deadlock */ if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) return true; - /* we shouldn't look at holderQueue of our blockers */ + /* we shouldn't look at procHolders of our blockers */ goto nextWaitProc; } @@ -1600,7 +1542,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) * implicitly). Note that we don't do like test if * !first_run (when thisProc is holder and non-waiter on * lock) and so we call DeadLockCheck below for every - * waitProc in thisProc->holderQueue, even for waitProc-s + * waitProc in thisProc->procHolders, even for waitProc-s * un-blocked by thisProc. Should we? This could save us * some time... */ @@ -1618,7 +1560,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) goto nextWaitProc; } - /* Recursively check this process's holderQueue. */ + /* Recursively check this process's procHolders. */ Assert(nprocs < MAXBACKENDS); checked_procs[nprocs++] = waitProc; @@ -1699,12 +1641,12 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) } nextWaitProc: - waitProc = (PROC *) MAKE_PTR(waitProc->links.prev); + waitProc = (PROC *) MAKE_PTR(waitProc->links.next); } nxtl: holder = nextHolder; - } while (holder); + } /* if we got here, no deadlock */ return false; @@ -1712,18 +1654,17 @@ nxtl: #ifdef LOCK_DEBUG /* - * Dump all locks in the proc->holderQueue. Must have already acquired - * the masterLock. + * Dump all locks in the proc->procHolders list. + * + * Must have already acquired the masterLock. 
*/ void DumpLocks(void) { SHMEM_OFFSET location; PROC *proc; - SHM_QUEUE *holderQueue; - HOLDER *holder = NULL; - HOLDER *nextHolder = NULL; - SHMEM_OFFSET end; + SHM_QUEUE *procHolders; + HOLDER *holder; LOCK *lock; int lockmethod = DEFAULT_LOCKMETHOD; LOCKMETHODTABLE *lockMethodTable; @@ -1734,8 +1675,7 @@ DumpLocks(void) proc = (PROC *) MAKE_PTR(location); if (proc != MyProc) return; - holderQueue = &proc->holderQueue; - end = MAKE_OFFSET(holderQueue); + procHolders = &proc->procHolders; Assert(lockmethod < NumLockMethods); lockMethodTable = LockMethodTable[lockmethod]; @@ -1745,34 +1685,21 @@ DumpLocks(void) if (proc->waitLock) LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0); - if (SHMQueueEmpty(holderQueue)) - return; - - SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue); + holder = (HOLDER *) SHMQueueNext(procHolders, procHolders, + offsetof(HOLDER, procLink)); - do + while (holder) { - /* --------------------------- - * XXX Here we assume the shared memory queue is circular and - * that we know its internal structure. Should have some sort of - * macros to allow one to walk it. mer 20 July 1991 - * --------------------------- - */ - if (holder->queue.next == end) - nextHolder = NULL; - else - SHMQueueFirst(&holder->queue, - (Pointer *) &nextHolder, &nextHolder->queue); - - Assert(holder->tag.pid == proc->pid); + Assert(holder->tag.proc == MAKE_OFFSET(proc)); lock = (LOCK *) MAKE_PTR(holder->tag.lock); HOLDER_PRINT("DumpLocks", holder); LOCK_PRINT("DumpLocks", lock, 0); - holder = nextHolder; - } while (holder); + holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink, + offsetof(HOLDER, procLink)); + } } /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index bc461f0f86f369314f3bc74b5fc426be1b14b586..af345e6e825b56441824d65d7c8f5ebaede29ea5 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.94 2001/01/16 20:59:34 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.95 2001/01/22 22:30:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -48,7 +48,7 @@ * This is so that we can support more backends. (system-wide semaphore * sets run out pretty fast.) -ay 4/95 * - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.94 2001/01/16 20:59:34 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.95 2001/01/22 22:30:06 tgl Exp $ */ #include "postgres.h" @@ -228,9 +228,6 @@ InitProcess(void) SpinRelease(ProcStructLock); elog(FATAL, "cannot create new proc: out of memory"); } - - /* this cannot be initialized until after the buffer pool */ - SHMQueueInit(&(MyProc->holderQueue)); } /* @@ -259,10 +256,15 @@ InitProcess(void) MyProc->sem.semNum = -1; } + SHMQueueElemInit(&(MyProc->links)); + MyProc->errType = NO_ERROR; MyProc->pid = MyProcPid; MyProc->databaseId = MyDatabaseId; MyProc->xid = InvalidTransactionId; MyProc->xmin = InvalidTransactionId; + MyProc->waitLock = NULL; + MyProc->waitHolder = NULL; + SHMQueueInit(&(MyProc->procHolders)); /* ---------------------- * Release the lock. 
@@ -282,9 +284,6 @@ InitProcess(void) (location != MAKE_OFFSET(MyProc))) elog(STOP, "InitProcess: ShmemPID table broken"); - MyProc->errType = NO_ERROR; - SHMQueueElemInit(&(MyProc->links)); - on_shmem_exit(ProcKill, 0); } @@ -342,7 +341,6 @@ RemoveFromWaitQueue(PROC *proc) waitLock->waitMask &= ~(1 << lockmode); /* Clean up the proc's own state */ - SHMQueueElemInit(&(proc->links)); proc->waitLock = NULL; proc->waitHolder = NULL; @@ -451,6 +449,7 @@ ProcRemove(int pid) ProcFreeSem(proc->sem.semId, proc->sem.semNum); + /* Add PROC struct to freelist so space can be recycled in future */ proc->links.next = ProcGlobal->freeProcs; ProcGlobal->freeProcs = MAKE_OFFSET(proc); @@ -565,12 +564,7 @@ ProcSleep(LOCKMETHODCTL *lockctl, bigtime_t time_interval; #endif - MyProc->waitLock = lock; - MyProc->waitHolder = holder; - MyProc->waitLockMode = lockmode; - /* We assume the caller set up MyProc->heldLocks */ - - proc = (PROC *) MAKE_PTR(waitQueue->links.prev); + proc = (PROC *) MAKE_PTR(waitQueue->links.next); /* if we don't conflict with any waiter - be first in queue */ if (!(lockctl->conflictTab[lockmode] & waitMask)) @@ -593,7 +587,7 @@ ProcSleep(LOCKMETHODCTL *lockctl, { /* Yes, report deadlock failure */ MyProc->errType = STATUS_ERROR; - goto rt; + return STATUS_ERROR; } /* I must go after him in queue - so continue loop */ } @@ -624,20 +618,25 @@ ProcSleep(LOCKMETHODCTL *lockctl, (aheadGranted[procWaitMode])++; if (aheadGranted[procWaitMode] == lock->requested[procWaitMode]) waitMask &= ~(1 << procWaitMode); - proc = (PROC *) MAKE_PTR(proc->links.prev); + proc = (PROC *) MAKE_PTR(proc->links.next); } ins:; /* ------------------- - * Insert self into queue, ahead of the given proc. - * These operations are atomic (because of the spinlock). + * Insert self into queue, ahead of the given proc (or at tail of queue). * ------------------- */ - SHMQueueInsertTL(&(proc->links), &(MyProc->links)); + SHMQueueInsertBefore(&(proc->links), &(MyProc->links)); waitQueue->size++; lock->waitMask |= myMask; + /* Set up wait information in PROC object, too */ + MyProc->waitLock = lock; + MyProc->waitHolder = holder; + MyProc->waitLockMode = lockmode; + /* We assume the caller set up MyProc->heldLocks */ + MyProc->errType = NO_ERROR; /* initialize result for success */ /* mark that we are waiting for a lock */ @@ -723,11 +722,10 @@ ins:; */ SpinAcquire(spinlock); -rt:; - - MyProc->waitLock = NULL; - MyProc->waitHolder = NULL; - + /* + * We don't have to do anything else, because the awaker did all the + * necessary update of the lock table and MyProc. + */ return MyProc->errType; } @@ -745,18 +743,24 @@ ProcWakeup(PROC *proc, int errType) /* assume that spinlock has been acquired */ + /* Proc should be sleeping ... 
*/ if (proc->links.prev == INVALID_OFFSET || proc->links.next == INVALID_OFFSET) return (PROC *) NULL; - retProc = (PROC *) MAKE_PTR(proc->links.prev); + /* Save next process before we zap the list link */ + retProc = (PROC *) MAKE_PTR(proc->links.next); + /* Remove process from wait queue */ SHMQueueDelete(&(proc->links)); - SHMQueueElemInit(&(proc->links)); (proc->waitLock->waitProcs.size)--; + /* Clean up process' state and pass it the ok/fail signal */ + proc->waitLock = NULL; + proc->waitHolder = NULL; proc->errType = errType; + /* And awaken it */ IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum); return retProc; @@ -780,7 +784,7 @@ ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock) if (!queue_size) return STATUS_NOT_FOUND; - proc = (PROC *) MAKE_PTR(queue->links.prev); + proc = (PROC *) MAKE_PTR(queue->links.next); while (queue_size-- > 0) { @@ -820,12 +824,13 @@ ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock) /* * ProcWakeup removes proc from the lock's waiting process queue - * and returns the next proc in chain; don't use prev link. + * and returns the next proc in chain; don't use proc's next-link, + * because it's been cleared. */ continue; nextProc: - proc = (PROC *) MAKE_PTR(proc->links.prev); + proc = (PROC *) MAKE_PTR(proc->links.next); } Assert(queue->size >= 0); @@ -848,12 +853,6 @@ nextProc: } } -void -ProcAddLock(SHM_QUEUE *elem) -{ - SHMQueueInsertTL(&MyProc->holderQueue, elem); -} - /* -------------------- * We only get to this routine if we got SIGALRM after DeadlockTimeout * while waiting for a lock to be released by some other process. Look diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 85e2f379786eadd381d8d30ee158f9c6bb8bd415..6d84fea806a877227f1df21ecc5c53acfa05f56d 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: lock.h,v 1.41 2001/01/16 06:11:34 tgl Exp $ + * $Id: lock.h,v 1.42 2001/01/22 22:30:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -22,8 +22,8 @@ /* originally in procq.h */ typedef struct PROC_QUEUE { - SHM_QUEUE links; - int size; + SHM_QUEUE links; /* head of list of PROC objects */ + int size; /* number of entries in list */ } PROC_QUEUE; /* struct proc is declared in storage/proc.h, but must forward-reference it */ @@ -59,7 +59,7 @@ typedef int LOCKMASK; typedef int LOCKMODE; typedef int LOCKMETHOD; -/* MAX_LOCKMODES cannot be larger than the bits in LOCKMASK */ +/* MAX_LOCKMODES cannot be larger than the # of bits in LOCKMASK */ #define MAX_LOCKMODES 8 /* @@ -152,6 +152,7 @@ typedef struct LOCKTAG * tag -- uniquely identifies the object being locked * grantMask -- bitmask for all lock types currently granted on this object. * waitMask -- bitmask for all lock types currently awaited on this object. + * lockHolders -- list of HOLDER objects for this lock. * waitProcs -- queue of processes waiting for this lock. * requested -- count of each lock type currently requested on the lock * (includes requests already granted!!). @@ -167,6 +168,7 @@ typedef struct LOCK /* data */ int grantMask; /* bitmask for lock types already granted */ int waitMask; /* bitmask for lock types awaited */ + SHM_QUEUE lockHolders; /* list of HOLDER objects assoc. 
with lock */ PROC_QUEUE waitProcs; /* list of PROC objects waiting on lock */ int requested[MAX_LOCKMODES]; /* counts of requested locks */ int nRequested; /* total of requested[] array */ @@ -189,8 +191,8 @@ typedef struct LOCK * holder hashtable. A HOLDERTAG value uniquely identifies a lock holder. * * There are two possible kinds of holder tags: a transaction (identified - * both by the PID of the backend running it, and the xact's own ID) and - * a session (identified by backend PID, with xid = InvalidTransactionId). + * both by the PROC of the backend running it, and the xact's own ID) and + * a session (identified by backend PROC, with xid = InvalidTransactionId). * * Currently, session holders are used for user locks and for cross-xact * locks obtained for VACUUM. We assume that a session lock never conflicts @@ -201,11 +203,17 @@ typedef struct LOCK * zero holding[], for any lock that the process is currently waiting on. * Otherwise, holder objects whose counts have gone to zero are recycled * as soon as convenient. + * + * Each HOLDER object is linked into lists for both the associated LOCK object + * and the owning PROC object. Note that the HOLDER is entered into these + * lists as soon as it is created, even if no lock has yet been granted. + * A PROC that is waiting for a lock to be granted will also be linked into + * the lock's waitProcs queue. */ typedef struct HOLDERTAG { SHMEM_OFFSET lock; /* link to per-lockable-object information */ - int pid; /* PID of backend */ + SHMEM_OFFSET proc; /* link to PROC of owning backend */ TransactionId xid; /* xact ID, or InvalidTransactionId */ } HOLDERTAG; @@ -217,7 +225,8 @@ typedef struct HOLDER /* data */ int holding[MAX_LOCKMODES]; /* count of locks currently held */ int nHolding; /* total of holding[] array */ - SHM_QUEUE queue; /* list link for process' list of holders */ + SHM_QUEUE lockLink; /* list link for lock's list of holders */ + SHM_QUEUE procLink; /* list link for process's list of holders */ } HOLDER; #define SHMEM_HOLDERTAB_KEYSIZE sizeof(HOLDERTAG) diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 131c3397894f7e5a4f133b3b646041e3c692f093..5fcd7c60ac7b6cc18a554e332d3858625fb26740 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: proc.h,v 1.36 2001/01/16 20:59:34 tgl Exp $ + * $Id: proc.h,v 1.37 2001/01/22 22:30:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -27,9 +27,8 @@ typedef struct } SEMA; /* - * Each backend has a PROC struct in shared memory. There is also a list - * of currently-unused PROC structs that will be reallocated to new backends - * (a fairly pointless optimization, but it's there anyway). + * Each backend has a PROC struct in shared memory. There is also a list of + * currently-unused PROC structs that will be reallocated to new backends. * * links: list link for any list the PROC is in. When waiting for a lock, * the PROC is linked into that lock's waitProcs queue. 
A recycled PROC @@ -37,7 +36,7 @@ typedef struct */ struct proc { - /* proc->links MUST BE THE FIRST ELEMENT OF STRUCT (see ProcWakeup()) */ + /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */ SHM_QUEUE links; /* list link if process is in a list */ @@ -53,7 +52,8 @@ struct proc XLogRecPtr logRec; - /* Info about lock the process is currently waiting for, if any */ + /* Info about lock the process is currently waiting for, if any. */ + /* waitLock and waitHolder are NULL if not currently waiting. */ LOCK *waitLock; /* Lock object we're sleeping on ... */ HOLDER *waitHolder; /* Per-holder info for awaited lock */ LOCKMODE waitLockMode; /* type of lock we're waiting for */ @@ -64,7 +64,7 @@ struct proc Oid databaseId; /* OID of database this backend is using */ short sLocks[MAX_SPINS]; /* Spin lock stats */ - SHM_QUEUE holderQueue; /* list of HOLDER objects for locks held or + SHM_QUEUE procHolders; /* list of HOLDER objects for locks held or * awaited by this backend */ }; @@ -138,7 +138,6 @@ extern int ProcSleep(LOCKMETHODCTL *lockctl, LOCKMODE lockmode, LOCK *lock, HOLDER *holder); extern PROC *ProcWakeup(PROC *proc, int errType); extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock); -extern void ProcAddLock(SHM_QUEUE *elem); extern void ProcReleaseSpins(PROC *proc); extern bool LockWaitCancel(void); extern void HandleDeadLock(SIGNAL_ARGS); diff --git a/src/include/storage/shmem.h b/src/include/storage/shmem.h index 8b2cc4487f0aa400e79275a4ec36190e288535ed..fb76297a89802e3c45dd1110aaa7b2a08f39e18b 100644 --- a/src/include/storage/shmem.h +++ b/src/include/storage/shmem.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: shmem.h,v 1.24 2000/11/28 23:27:57 tgl Exp $ + * $Id: shmem.h,v 1.25 2001/01/22 22:30:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -100,9 +100,9 @@ typedef struct extern void SHMQueueInit(SHM_QUEUE *queue); extern void SHMQueueElemInit(SHM_QUEUE *queue); extern void SHMQueueDelete(SHM_QUEUE *queue); -extern void SHMQueueInsertTL(SHM_QUEUE *queue, SHM_QUEUE *elem); -extern void SHMQueueFirst(SHM_QUEUE *queue, Pointer *nextPtrPtr, - SHM_QUEUE *nextQueue); +extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem); +extern Pointer SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, + Size linkOffset); extern bool SHMQueueEmpty(SHM_QUEUE *queue); #endif /* SHMEM_H */
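For reference, the caller-side idiom for the new SHMQueueNext() interface -- start the scan by
passing the queue head as both the queue and the current element, then on each subsequent call
pass the link field of the element just returned, using offsetof() to recover the enclosing
struct -- is the pattern the lock.c hunks above use in LockCountMyLocks, LockReleaseAll,
DeadLockCheck, and DumpLocks. The fragment below is a self-contained analogue built on ordinary
pointers rather than SHMEM offsets, so the Link and Elem types and the queue_next() and
queue_insert_before() helpers are stand-ins for SHM_QUEUE, HOLDER, SHMQueueNext(), and
SHMQueueInsertBefore(), not the shared-memory implementation itself.

/*
 * Pointer-based analogue of the SHM_QUEUE iteration idiom introduced by
 * this patch.  The real routines store SHMEM offsets and live in shared
 * memory; only the caller-side pattern is reproduced here.
 */
#include <stddef.h>
#include <stdio.h>

typedef struct Link
{
	struct Link *prev;
	struct Link *next;
} Link;

typedef struct Elem
{
	int			id;				/* payload, standing in for HOLDER's fields */
	Link		procLink;		/* embedded list link, like HOLDER.procLink */
} Elem;

/* Analogue of SHMQueueInsertBefore: inserting before the head appends at the tail */
static void
queue_insert_before(Link *queue, Link *elem)
{
	elem->next = queue;
	elem->prev = queue->prev;
	queue->prev->next = elem;
	queue->prev = elem;
}

/* Analogue of SHMQueueNext: returns NULL once the scan wraps back to the head */
static void *
queue_next(Link *head, Link *cur, size_t linkOffset)
{
	Link	   *next = cur->next;

	if (next == head)
		return NULL;
	return (char *) next - linkOffset;
}

int
main(void)
{
	Link		head = {&head, &head};	/* empty circular list points at itself */
	Elem		a = {1, {NULL, NULL}};
	Elem		b = {2, {NULL, NULL}};
	Elem	   *e;

	queue_insert_before(&head, &a.procLink);
	queue_insert_before(&head, &b.procLink);

	/* Start by passing the head as both queue and current element */
	for (e = queue_next(&head, &head, offsetof(Elem, procLink));
		 e != NULL;
		 e = queue_next(&head, &e->procLink, offsetof(Elem, procLink)))
		printf("holder %d\n", e->id);

	return 0;
}

Inserting "before" the circular head appends at the tail, which is how LockAcquire in the patch
above links a new HOLDER onto both the lock's lockHolders list and the backend's procHolders list.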