From a05eae029aa7648c8a5f4a5edb3709a2e867098e Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Thu, 25 Jan 2001 03:31:16 +0000
Subject: [PATCH] Re-implement deadlock detection and resolution, per design
 notes posted to pghackers on 18-Jan-01.

---
 src/backend/storage/lmgr/Makefile   |   4 +-
 src/backend/storage/lmgr/README     |  11 +-
 src/backend/storage/lmgr/deadlock.c | 734 ++++++++++++++++++++++++++++
 src/backend/storage/lmgr/lock.c     | 518 ++++++--------------
 src/backend/storage/lmgr/proc.c     | 291 +++++------
 src/include/storage/lock.h          |  14 +-
 src/include/storage/proc.h          |  15 +-
 7 files changed, 1015 insertions(+), 572 deletions(-)
 create mode 100644 src/backend/storage/lmgr/deadlock.c

diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index fca9a24a573..37cf81b1aa7 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -4,7 +4,7 @@
 #    Makefile for storage/lmgr
 #
 # IDENTIFICATION
-#    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.14 2000/08/31 16:10:36 petere Exp $
+#    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.15 2001/01/25 03:31:16 tgl Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -12,7 +12,7 @@ subdir = src/backend/storage/lmgr
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = lmgr.o lock.o proc.o
+OBJS = lmgr.o lock.o proc.o deadlock.o
 
 all: SUBSYS.o
 
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index af9fbc8421b..72e0d16f128 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -1,4 +1,4 @@
-$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.6 2001/01/22 22:30:06 tgl Exp $
+$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.7 2001/01/25 03:31:16 tgl Exp $
 
 There are two fundamental lock structures: the per-lockable-object LOCK
 struct, and the per-lock-holder HOLDER struct.  A LOCK object exists
@@ -373,7 +373,8 @@ time with "C before B", which won't move C far enough up.  So we look for
 soft edges outgoing from C starting at the front of the wait queue.
 
 5. The working data structures needed by the deadlock detection code can
-be proven not to need more than MAXBACKENDS entries.  Therefore the
-working storage can be statically allocated instead of depending on
-palloc().  This is a good thing, since if the deadlock detector could
-fail for extraneous reasons, all the above safety proofs fall down.
+be limited to numbers of entries computed from MaxBackends.  Therefore,
+we can allocate the worst-case space needed during backend startup.
+This seems a safer approach than trying to allocate workspace on the fly;
+we don't want to risk having the deadlock detector run out of memory,
+else we really have no guarantees at all that deadlock will be detected.
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
new file mode 100644
index 00000000000..aae635a6ccc
--- /dev/null
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -0,0 +1,734 @@
+/*-------------------------------------------------------------------------
+ *
+ * deadlock.c
+ *	  POSTGRES deadlock detection code
+ *
+ * See src/backend/storage/lmgr/README for a description of the deadlock
+ * detection and resolution algorithms.
+ *
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.1 2001/01/25 03:31:16 tgl Exp $
+ *
+ *	Interface:
+ *
+ *	DeadLockCheck()
+ *	InitDeadLockChecking()
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "utils/memutils.h"
+
+
+/* One edge in the waits-for graph */
+typedef struct {
+	PROC   *waiter;				/* the waiting process */
+	PROC   *blocker;			/* the process it is waiting for */
+	int		pred;				/* workspace for TopoSort */
+	int		link;				/* workspace for TopoSort */
+} EDGE;
+
+/* One potential reordering of a lock's wait queue */
+typedef struct {
+	LOCK   *lock;				/* the lock whose wait queue is described */
+	PROC  **procs;				/* array of PROC *'s in new wait order */
+	int		nProcs;
+} WAIT_ORDER;
+
+
+static bool DeadLockCheckRecurse(PROC *proc);
+static bool TestConfiguration(PROC *startProc);
+static bool FindLockCycle(PROC *checkProc,
+						  EDGE *softEdges, int *nSoftEdges);
+static bool FindLockCycleRecurse(PROC *checkProc,
+								 EDGE *softEdges, int *nSoftEdges);
+static bool ExpandConstraints(EDGE *constraints, int nConstraints);
+static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
+					 PROC **ordering);
+#ifdef DEBUG_DEADLOCK
+static void PrintLockQueue(LOCK *lock, const char *info);
+#endif
+
+
+/*
+ * Working space for the deadlock detector
+ */
+
+/* Workspace for FindLockCycle */
+static PROC **visitedProcs;		/* Array of visited procs */
+static int nVisitedProcs;
+/* Workspace for TopoSort */
+static PROC **topoProcs;		/* Array of not-yet-output procs */
+static int *beforeConstraints;	/* Counts of remaining before-constraints */
+static int *afterConstraints;	/* List head for after-constraints */
+/* Output area for ExpandConstraints */
+static WAIT_ORDER *waitOrders;	/* Array of proposed queue rearrangements */
+static int nWaitOrders;
+static PROC **waitOrderProcs;	/* Space for waitOrders queue contents */
+/* Current list of constraints being considered */
+static EDGE *curConstraints;
+static int nCurConstraints;
+static int maxCurConstraints;
+/* Storage space for results from FindLockCycle */
+static EDGE *possibleConstraints;
+static int nPossibleConstraints;
+static int maxPossibleConstraints;
+
+
+/*
+ * InitDeadLockChecking -- initialize deadlock checker during backend startup
+ *
+ * This does per-backend initialization of the deadlock checker; primarily,
+ * allocation of working memory for DeadLockCheck.  We do this per-backend
+ * since there's no percentage in making the kernel do copy-on-write
+ * inheritance of workspace from the postmaster.  We want to allocate the
+ * space at startup because the deadlock checker might be invoked when there's
+ * no free memory left.
+ */
+void
+InitDeadLockChecking(void)
+{
+	MemoryContext	oldcxt;
+
+	/* Make sure allocations are permanent */
+	oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+
+	/*
+	 * FindLockCycle needs at most MaxBackends entries in visitedProcs[]
+	 */
+	visitedProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
+
+	/*
+	 * TopoSort needs to consider at most MaxBackends wait-queue entries,
+	 * and it needn't run concurrently with FindLockCycle.
+	 */
+	topoProcs = visitedProcs;	/* re-use this space */
+	beforeConstraints = (int *) palloc(MaxBackends * sizeof(int));
+	afterConstraints = (int *) palloc(MaxBackends * sizeof(int));
+
+	/*
+	 * We need to consider rearranging at most MaxBackends/2 wait queues
+	 * (since it takes at least two waiters in a queue to create a soft edge),
+	 * and the expanded form of the wait queues can't involve more than
+	 * MaxBackends total waiters.
+	 */
+	waitOrders = (WAIT_ORDER *) palloc((MaxBackends/2) * sizeof(WAIT_ORDER));
+	waitOrderProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
+
+	/*
+	 * Allow at most MaxBackends distinct constraints in a configuration.
+	 * (Is this enough?  In practice it seems it should be, but I don't quite
+	 * see how to prove it.  If we run out, we might fail to find a workable
+	 * wait queue rearrangement even though one exists.)  NOTE that this
+	 * number limits the maximum recursion depth of DeadLockCheckRecurse.
+	 * Making it really big might potentially allow a stack-overflow problem.
+	 */
+	maxCurConstraints = MaxBackends;
+	curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));
+
+	/*
+	 * Allow up to 3*MaxBackends constraints to be saved without having to
+	 * re-run TestConfiguration.  (This is probably more than enough, but
+	 * we can survive if we run low on space by doing excess runs of
+	 * TestConfiguration to re-compute constraint lists each time needed.)
+	 * The last MaxBackends entries in possibleConstraints[] are reserved as
+	 * output workspace for FindLockCycle.
+	 */
+	maxPossibleConstraints = MaxBackends * 4;
+	possibleConstraints =
+		(EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE));
+
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * DeadLockCheck -- Checks for deadlocks for a given process
+ *
+ * This code looks for deadlocks involving the given process.  If any
+ * are found, it tries to rearrange lock wait queues to resolve the
+ * deadlock.  If resolution is impossible, return TRUE --- the caller
+ * is then expected to abort the given proc's transaction.
+ *
+ * We can't block on user locks, so no sense testing for deadlock
+ * because there is no blocking, and no timer for the block.  So,
+ * only look at regular locks.
+ *
+ * We must have already locked the master lock before being called.
+ * NOTE: although the lockctl structure appears to allow each lock
+ * table to have a different spinlock, all locks that can block had
+ * better use the same spinlock, else this code will not be adequately
+ * interlocked!
+ */
+bool
+DeadLockCheck(PROC *proc)
+{
+	int			i,
+				j;
+
+	/* Initialize to "no constraints" */
+	nCurConstraints = 0;
+	nPossibleConstraints = 0;
+	nWaitOrders = 0;
+
+	/* Search for deadlocks and possible fixes */
+	if (DeadLockCheckRecurse(proc))
+		return true;			/* cannot find a non-deadlocked state */
+
+	/* Apply any needed rearrangements of wait queues */
+	for (i = 0; i < nWaitOrders; i++)
+	{
+		LOCK   *lock = waitOrders[i].lock;
+		PROC  **procs = waitOrders[i].procs;
+		int		nProcs = waitOrders[i].nProcs;
+		PROC_QUEUE *waitQueue = &(lock->waitProcs);
+
+		Assert(nProcs == waitQueue->size);
+
+#ifdef DEBUG_DEADLOCK
+		PrintLockQueue(lock, "DeadLockCheck:");
+#endif
+
+		/* Reset the queue and re-add procs in the desired order */
+		ProcQueueInit(waitQueue);
+		for (j = 0; j < nProcs; j++)
+		{
+			SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
+			waitQueue->size++;
+		}
+
+#ifdef DEBUG_DEADLOCK
+		PrintLockQueue(lock, "rearranged to:");
+#endif
+	}
+	return false;
+}
+
+/*
+ * DeadLockCheckRecurse -- recursively search for valid orderings
+ *
+ * curConstraints[] holds the current set of constraints being considered
+ * by an outer level of recursion.  Add to this each possible solution
+ * constraint for any cycle detected at this level.
+ *
+ * Returns TRUE if no solution exists.  Returns FALSE if a deadlock-free
+ * state is attainable, in which case waitOrders[] shows the required
+ * rearrangements of lock wait queues (if any).
+ */
+static bool
+DeadLockCheckRecurse(PROC *proc)
+{
+	int			nEdges;
+	int			oldPossibleConstraints;
+	bool		savedList;
+	int			i;
+
+	nEdges = TestConfiguration(proc);
+	if (nEdges < 0)
+		return true;			/* hard deadlock --- no solution */
+	if (nEdges == 0)
+		return false;			/* good configuration found */
+	if (nCurConstraints >= maxCurConstraints)
+		return true;			/* out of room for active constraints? */
+	oldPossibleConstraints = nPossibleConstraints;
+	if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints)
+	{
+		/* We can save the edge list in possibleConstraints[] */
+		nPossibleConstraints += nEdges;
+		savedList = true;
+	}
+	else
+	{
+		/* Not room; will need to regenerate the edges on-the-fly */
+		savedList = false;
+	}
+	/*
+	 * Try each available soft edge as an addition to the configuration.
+	 */
+	for (i = 0; i < nEdges; i++)
+	{
+		if (!savedList && i > 0)
+		{
+			/* Regenerate the list of possible added constraints */
+			if (nEdges != TestConfiguration(proc))
+				elog(FATAL, "DeadLockCheckRecurse: inconsistent results");
+		}
+		curConstraints[nCurConstraints] =
+			possibleConstraints[oldPossibleConstraints+i];
+		nCurConstraints++;
+		if (!DeadLockCheckRecurse(proc))
+			return false;		/* found a valid solution! */
+		/* give up on that added constraint, try again */
+		nCurConstraints--;
+	}
+	nPossibleConstraints = oldPossibleConstraints;
+	return true;				/* no solution found */
+}
+
+
+/*--------------------
+ * Test a configuration (current set of constraints) for validity.
+ *
+ * Returns:
+ *		0: the configuration is good (no deadlocks)
+ *	   -1: the configuration has a hard deadlock or is not self-consistent
+ *		>0: the configuration has one or more soft deadlocks
+ *
+ * In the soft-deadlock case, one of the soft cycles is chosen arbitrarily
+ * and a list of its soft edges is returned beginning at
+ * possibleConstraints+nPossibleConstraints.  The return value is the
+ * number of soft edges.
+ *--------------------
+ */
+static bool
+TestConfiguration(PROC *startProc)
+{
+	int		softFound = 0;
+	EDGE   *softEdges = possibleConstraints + nPossibleConstraints;
+	int		nSoftEdges;
+	int		i;
+
+	/*
+	 * Make sure we have room for FindLockCycle's output.
+	 */
+	if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
+		return -1;
+	/*
+	 * Expand current constraint set into wait orderings.  Fail if the
+	 * constraint set is not self-consistent.
+	 */
+	if (!ExpandConstraints(curConstraints, nCurConstraints))
+		return -1;
+	/*
+	 * Check for cycles involving startProc or any of the procs mentioned
+	 * in constraints.  We check startProc last because if it has a soft
+	 * cycle still to be dealt with, we want to deal with that first.
+	 */
+	for (i = 0; i < nCurConstraints; i++)
+	{
+		if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges))
+		{
+			if (nSoftEdges == 0)
+				return -1;		/* hard deadlock detected */
+			softFound = nSoftEdges;
+		}
+		if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges))
+		{
+			if (nSoftEdges == 0)
+				return -1;		/* hard deadlock detected */
+			softFound = nSoftEdges;
+		}
+	}
+	if (FindLockCycle(startProc, softEdges, &nSoftEdges))
+	{
+		if (nSoftEdges == 0)
+			return -1;			/* hard deadlock detected */
+		softFound = nSoftEdges;
+	}
+	return softFound;
+}
+
+
+/*
+ * FindLockCycle -- basic check for deadlock cycles
+ *
+ * Scan outward from the given proc to see if there is a cycle in the
+ * waits-for graph that includes this proc.  Return TRUE if a cycle
+ * is found, else FALSE.  If a cycle is found, we also return a list of
+ * the "soft edges", if any, included in the cycle.  These edges could
+ * potentially be eliminated by rearranging wait queues.
+ *
+ * Since we need to be able to check hypothetical configurations that would
+ * exist after wait queue rearrangement, the routine pays attention to the
+ * table of hypothetical queue orders in waitOrders[].  These orders will
+ * be believed in preference to the actual ordering seen in the locktable.
+ */
+static bool
+FindLockCycle(PROC *checkProc,
+			  EDGE *softEdges,	/* output argument */
+			  int *nSoftEdges)	/* output argument */
+{
+	nVisitedProcs = 0;
+	*nSoftEdges = 0;
+	return FindLockCycleRecurse(checkProc, softEdges, nSoftEdges);
+}
+
+static bool
+FindLockCycleRecurse(PROC *checkProc,
+					 EDGE *softEdges,	/* output argument */
+					 int *nSoftEdges)	/* output argument */
+{
+	PROC	   *proc;
+	LOCK	   *lock;
+	HOLDER	   *holder;
+	SHM_QUEUE  *lockHolders;
+	LOCKMETHODTABLE *lockMethodTable;
+	LOCKMETHODCTL *lockctl;
+	PROC_QUEUE *waitQueue;
+	int			queue_size;
+	int			conflictMask;
+	int			i;
+	int			numLockModes,
+				lm;
+
+	/*
+	 * Have we already seen this proc?
+	 */
+	for (i = 0; i < nVisitedProcs; i++)
+	{
+		if (visitedProcs[i] == checkProc)
+		{
+			/* If we return to starting point, we have a deadlock cycle */
+			if (i == 0)
+				return true;
+			/*
+			 * Otherwise, we have a cycle but it does not include the start
+			 * point, so say "no deadlock".
+			 */
+			return false;
+		}
+	}
+	/* Mark proc as seen */
+	Assert(nVisitedProcs < MaxBackends);
+	visitedProcs[nVisitedProcs++] = checkProc;
+	/*
+	 * If the proc is not waiting, we have no outgoing waits-for edges.
+	 */
+	if (checkProc->links.next == INVALID_OFFSET)
+		return false;
+	lock = checkProc->waitLock;
+	if (lock == NULL)
+		return false;
+	lockMethodTable = GetLocksMethodTable(lock);
+	lockctl = lockMethodTable->ctl;
+	numLockModes = lockctl->numLockModes;
+	conflictMask = lockctl->conflictTab[checkProc->waitLockMode];
+	/*
+	 * Scan for procs that already hold conflicting locks.  These are
+	 * "hard" edges in the waits-for graph.
+	 */
+	lockHolders = &(lock->lockHolders);
+
+	holder = (HOLDER *) SHMQueueNext(lockHolders, lockHolders,
+									 offsetof(HOLDER, lockLink));
+
+	while (holder)
+	{
+		proc = (PROC *) MAKE_PTR(holder->tag.proc);
+
+		/* A proc never blocks itself */
+		if (proc != checkProc)
+		{
+			for (lm = 1; lm <= numLockModes; lm++)
+			{
+				if (holder->holding[lm] > 0 &&
+					((1 << lm) & conflictMask) != 0)
+				{
+					/* This proc hard-blocks checkProc */
+					if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+						return true;
+					/* If no deadlock, we're done looking at this holder */
+					break;
+				}
+			}
+		}
+
+		holder = (HOLDER *) SHMQueueNext(lockHolders, &holder->lockLink,
+										 offsetof(HOLDER, lockLink));
+	}
+
+	/*
+	 * Scan for procs that are ahead of this one in the lock's wait queue.
+	 * Those that have conflicting requests soft-block this one.  This must
+	 * be done after the hard-block search, since if another proc both
+	 * hard- and soft-blocks this one, we want to call it a hard edge.
+	 *
+	 * If there is a proposed re-ordering of the lock's wait order,
+	 * use that rather than the current wait order.
+	 */
+	for (i = 0; i < nWaitOrders; i++)
+	{
+		if (waitOrders[i].lock == lock)
+			break;
+	}
+
+	if (i < nWaitOrders)
+	{
+		/* Use the given hypothetical wait queue order */
+		PROC  **procs = waitOrders[i].procs;
+
+		queue_size = waitOrders[i].nProcs;
+
+		for (i = 0; i < queue_size; i++)
+		{
+			proc = procs[i];
+
+			/* Done when we reach the target proc */
+			if (proc == checkProc)
+				break;
+
+			/* Is there a conflict with this guy's request? */
+			if (((1 << proc->waitLockMode) & conflictMask) != 0)
+			{
+				/* This proc soft-blocks checkProc */
+				if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+				{
+					/* Add this edge to the list of soft edges in the cycle */
+					Assert(*nSoftEdges < MaxBackends);
+					softEdges[*nSoftEdges].waiter = checkProc;
+					softEdges[*nSoftEdges].blocker = proc;
+					(*nSoftEdges)++;
+					return true;
+				}
+			}
+		}
+	}
+	else
+	{
+		/* Use the true lock wait queue order */
+		waitQueue = &(lock->waitProcs);
+		queue_size = waitQueue->size;
+
+		proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+
+		while (queue_size-- > 0)
+		{
+			/* Done when we reach the target proc */
+			if (proc == checkProc)
+				break;
+
+			/* Is there a conflict with this guy's request? */
+			if (((1 << proc->waitLockMode) & conflictMask) != 0)
+			{
+				/* This proc soft-blocks checkProc */
+				if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+				{
+					/* Add this edge to the list of soft edges in the cycle */
+					Assert(*nSoftEdges < MaxBackends);
+					softEdges[*nSoftEdges].waiter = checkProc;
+					softEdges[*nSoftEdges].blocker = proc;
+					(*nSoftEdges)++;
+					return true;
+				}
+			}
+
+			proc = (PROC *) MAKE_PTR(proc->links.next);
+		}
+	}
+
+	/*
+	 * No conflict detected here.
+	 */
+	return false;
+}
+
+
+/*
+ * ExpandConstraints -- expand a list of constraints into a set of
+ *		specific new orderings for affected wait queues
+ *
+ * Input is a list of soft edges to be reversed.  The output is a list
+ * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PROC array
+ * workspace in waitOrderProcs[].
+ *
+ * Returns TRUE if able to build an ordering that satisfies all the
+ * constraints, FALSE if not (there are contradictory constraints).
+ */
+static bool
+ExpandConstraints(EDGE *constraints,
+				  int nConstraints)
+{
+	int			nWaitOrderProcs = 0;
+	int			i,
+				j;
+
+	nWaitOrders = 0;
+	/*
+	 * Scan constraint list backwards.  This is because the last-added
+	 * constraint is the only one that could fail, and so we want to test
+	 * it for inconsistency first.
+	 */
+	for (i = nConstraints; --i >= 0; )
+	{
+		PROC   *proc = constraints[i].waiter;
+		LOCK   *lock = proc->waitLock;
+
+		/* Did we already make a list for this lock? */
+		for (j = nWaitOrders; --j >= 0; )
+		{
+			if (waitOrders[j].lock == lock)
+				break;
+		}
+		if (j >= 0)
+			continue;
+		/* No, so allocate a new list */
+		waitOrders[nWaitOrders].lock = lock;
+		waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
+		waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
+		nWaitOrderProcs += lock->waitProcs.size;
+		Assert(nWaitOrderProcs <= MaxBackends);
+		/*
+		 * Do the topo sort.  TopoSort need not examine constraints after
+		 * this one, since they must be for different locks.
+		 */
+		if (!TopoSort(lock, constraints, i+1,
+					  waitOrders[nWaitOrders].procs))
+			return false;
+		nWaitOrders++;
+	}
+	return true;
+}
+
+
+/*
+ * TopoSort -- topological sort of a wait queue
+ *
+ * Generate a re-ordering of a lock's wait queue that satisfies given
+ * constraints about certain procs preceding others.  (Each such constraint
+ * is a fact of a partial ordering.)  Minimize rearrangement of the queue
+ * not needed to achieve the partial ordering.
+ *
+ * This is a lot simpler and slower than, for example, the topological sort
+ * algorithm shown in Knuth's Volume 1.  However, Knuth's method doesn't
+ * try to minimize the damage to the existing order.  In practice we are
+ * not likely to be working with more than a few constraints, so the apparent
+ * slowness of the algorithm won't really matter.
+ *
+ * The initial queue ordering is taken directly from the lock's wait queue.
+ * The output is an array of PROC pointers, of length equal to the lock's
+ * wait queue length (the caller is responsible for providing this space).
+ * The partial order is specified by an array of EDGE structs.  Each EDGE
+ * is one that we need to reverse, therefore the "waiter" must appear before
+ * the "blocker" in the output array.  The EDGE array may well contain
+ * edges associated with other locks; these should be ignored.
+ *
+ * Returns TRUE if able to build an ordering that satisfies all the
+ * constraints, FALSE if not (there are contradictory constraints).
+ */
+static bool
+TopoSort(LOCK *lock,
+		 EDGE *constraints,
+		 int nConstraints,
+		 PROC **ordering)		/* output argument */
+{
+	PROC_QUEUE *waitQueue = &(lock->waitProcs);
+	int			queue_size = waitQueue->size;
+	PROC	   *proc;
+	int			i,
+				j,
+				k,
+				last;
+
+	/* First, fill topoProcs[] array with the procs in their current order */
+	proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+	for (i = 0; i < queue_size; i++)
+	{
+		topoProcs[i] = proc;
+		proc = (PROC *) MAKE_PTR(proc->links.next);
+	}
+
+	/*
+	 * Scan the constraints, and for each proc in the array, generate a count
+	 * of the number of constraints that say it must be before something else,
+	 * plus a list of the constraints that say it must be after something else.
+	 * The count for the j'th proc is stored in beforeConstraints[j], and the
+	 * head of its list in afterConstraints[j].  Each constraint stores its
+	 * list link in constraints[i].link (note any constraint will be in
+	 * just one list).  The array index for the before-proc of the i'th
+	 * constraint is remembered in constraints[i].pred.
+	 */
+	MemSet(beforeConstraints, 0, queue_size * sizeof(int));
+	MemSet(afterConstraints, 0, queue_size * sizeof(int));
+	for (i = 0; i < nConstraints; i++)
+	{
+		proc = constraints[i].waiter;
+		/* Ignore constraint if not for this lock */
+		if (proc->waitLock != lock)
+			continue;
+		/* Find the waiter proc in the array */
+		for (j = queue_size; --j >= 0; )
+		{
+			if (topoProcs[j] == proc)
+				break;
+		}
+		Assert(j >= 0);			/* should have found a match */
+		/* Find the blocker proc in the array */
+		proc = constraints[i].blocker;
+		for (k = queue_size; --k >= 0; )
+		{
+			if (topoProcs[k] == proc)
+				break;
+		}
+		Assert(k >= 0);			/* should have found a match */
+		beforeConstraints[j]++;	/* waiter must come before */
+		/* add this constraint to list of after-constraints for blocker */
+		constraints[i].pred = j;
+		constraints[i].link = afterConstraints[k];
+		afterConstraints[k] = i+1;
+	}
+	/*--------------------
+	 * Now scan the topoProcs array backwards.  At each step, output the
+	 * last proc that has no remaining before-constraints, and decrease
+	 * the beforeConstraints count of each of the procs it was constrained
+	 * against.
+	 * i = index of ordering[] entry we want to output this time
+	 * j = search index for topoProcs[]
+	 * k = temp for scanning constraint list for proc j
+	 * last = last non-null index in topoProcs (avoid redundant searches)
+	 *--------------------
+	 */
+	last = queue_size-1;
+	for (i = queue_size; --i >= 0; )
+	{
+		/* Find next candidate to output */
+		while (topoProcs[last] == NULL)
+			last--;
+		for (j = last; j >= 0; j--)
+		{
+			if (topoProcs[j] != NULL && beforeConstraints[j] == 0)
+				break;
+		}
+		/* If no available candidate, topological sort fails */
+		if (j < 0)
+			return false;
+		/* Output candidate, and mark it done by zeroing topoProcs[] entry */
+		ordering[i] = topoProcs[j];
+		topoProcs[j] = NULL;
+		/* Update beforeConstraints counts of its predecessors */
+		for (k = afterConstraints[j]; k > 0; k = constraints[k-1].link)
+		{
+			beforeConstraints[constraints[k-1].pred]--;
+		}
+	}
+
+	/* Done */
+	return true;
+}
+
+#ifdef DEBUG_DEADLOCK
+static void
+PrintLockQueue(LOCK *lock, const char *info)
+{
+	PROC_QUEUE *waitQueue = &(lock->waitProcs);
+	int			queue_size = waitQueue->size;
+	PROC	   *proc;
+	int			i;
+
+	printf("%s lock %lx queue ", info, MAKE_OFFSET(lock));
+	proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+	for (i = 0; i < queue_size; i++)
+	{
+		printf(" %d", proc->pid);
+		proc = (PROC *) MAKE_PTR(proc->links.next);
+	}
+	printf("\n");
+	fflush(stdout);
+}
+#endif
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 3d77ab2b4d1..08e023718eb 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.80 2001/01/24 19:43:08 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.81 2001/01/25 03:31:16 tgl Exp $
  *
  * NOTES
  *	  Outside modules can create a lock table and acquire/release
@@ -24,16 +24,16 @@
  *
  *	LockAcquire(), LockRelease(), LockMethodTableInit(),
  *	LockMethodTableRename(), LockReleaseAll,
- *	LockResolveConflicts(), GrantLock()
+ *	LockCheckConflicts(), GrantLock()
  *
  *-------------------------------------------------------------------------
  */
+#include "postgres.h"
+
 #include <sys/types.h>
 #include <unistd.h>
 #include <signal.h>
 
-#include "postgres.h"
-
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "storage/proc.h"
@@ -44,7 +44,6 @@ static int	WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
 					   LOCK *lock, HOLDER *holder);
 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
 							 int *myHolding);
-static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
 
 static char *lock_types[] =
 {
@@ -211,6 +210,18 @@ LockingDisabled(void)
 	return LockingIsDisabled;
 }
 
+/*
+ * Fetch the lock method table associated with a given lock
+ */
+LOCKMETHODTABLE *
+GetLocksMethodTable(LOCK *lock)
+{
+	LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock);
+
+	Assert(lockmethod > 0 && lockmethod < NumLockMethods);
+	return LockMethodTable[lockmethod];
+}
+
 
 /*
  * LockMethodInit -- initialize the lock table's lock type
@@ -559,7 +570,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 	if (!holder)
 	{
 		SpinRelease(masterLock);
-		elog(NOTICE, "LockAcquire: holder table corrupted");
+		elog(FATAL, "LockAcquire: holder table corrupted");
 		return FALSE;
 	}
 
@@ -623,11 +634,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
 	/* --------------------
-	 * If I'm the only one holding any lock on this object, then there
-	 * cannot be a conflict. The same is true if I already hold this lock.
+	 * If I already hold one or more locks of the requested type,
+	 * just grant myself another one without blocking.
 	 * --------------------
 	 */
-	if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0)
+	if (holder->holding[lockmode] > 0)
 	{
 		GrantLock(lock, holder, lockmode);
 		HOLDER_PRINT("LockAcquire: owning", holder);
@@ -637,11 +648,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 
 	/* --------------------
 	 * If this process (under any XID) is a holder of the lock,
-	 * then there is no conflict, either.
+	 * also grant myself another one without blocking.
 	 * --------------------
 	 */
 	LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
-	if (myHolding[lockmode] != 0)
+	if (myHolding[lockmode] > 0)
 	{
 		GrantLock(lock, holder, lockmode);
 		HOLDER_PRINT("LockAcquire: my other XID owning", holder);
@@ -649,42 +660,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 		return TRUE;
 	}
 
-	/*
-	 * If lock requested conflicts with locks requested by waiters...
+	/* --------------------
+	 * If lock requested conflicts with locks requested by waiters,
+	 * must join wait queue.  Otherwise, check for conflict with
+	 * already-held locks.  (That's last because most complex check.)
+	 * --------------------
 	 */
 	if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
-	{
-		/*
-		 * If my process doesn't hold any locks that conflict with waiters
-		 * then force to sleep, so that prior waiters get first chance.
-		 */
-		for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
-		{
-			if (myHolding[i] > 0 &&
-				lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
-				break;			/* yes, there is a conflict */
-		}
-
-		if (i > lockMethodTable->ctl->numLockModes)
-		{
-			HOLDER_PRINT("LockAcquire: another proc already waiting",
-						 holder);
-			status = STATUS_FOUND;
-		}
-		else
-			status = LockResolveConflicts(lockmethod, lockmode,
-										  lock, holder,
-										  MyProc, myHolding);
-	}
+		status = STATUS_FOUND;
 	else
-		status = LockResolveConflicts(lockmethod, lockmode,
-									  lock, holder,
-									  MyProc, myHolding);
+		status = LockCheckConflicts(lockMethodTable, lockmode,
+									lock, holder,
+									MyProc, myHolding);
 
 	if (status == STATUS_OK)
+	{
+		/* No conflict with held or previously requested locks */
 		GrantLock(lock, holder, lockmode);
-	else if (status == STATUS_FOUND)
+	}
+	else
 	{
+		Assert(status == STATUS_FOUND);
 #ifdef USER_LOCKS
 
 		/*
@@ -765,49 +761,50 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 }
 
 /* ----------------------------
- * LockResolveConflicts -- test for lock conflicts
+ * LockCheckConflicts -- test whether requested lock conflicts
+ *		with those already granted
+ *
+ * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
  *
  * NOTES:
- *		Here's what makes this complicated: one transaction's
- * locks don't conflict with one another.  When many processes
- * hold locks, each has to subtract off the other's locks when
- * determining whether or not any new lock acquired conflicts with
- * the old ones.
+ *		Here's what makes this complicated: one process's locks don't
+ * conflict with one another, even if they are held under different
+ * transaction IDs (eg, session and xact locks do not conflict).
+ * So, we must subtract off our own locks when determining whether the
+ * requested new lock conflicts with those already held.
  *
  * The caller can optionally pass the process's total holding counts, if
  * known.  If NULL is passed then these values will be computed internally.
  * ----------------------------
  */
 int
-LockResolveConflicts(LOCKMETHOD lockmethod,
-					 LOCKMODE lockmode,
-					 LOCK *lock,
-					 HOLDER *holder,
-					 PROC *proc,
-					 int *myHolding)		/* myHolding[] array or NULL */
+LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+				   LOCKMODE lockmode,
+				   LOCK *lock,
+				   HOLDER *holder,
+				   PROC *proc,
+				   int *myHolding)		/* myHolding[] array or NULL */
 {
-	LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
+	LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
 	int			numLockModes = lockctl->numLockModes;
 	int			bitmask;
 	int			i,
 				tmpMask;
 	int			localHolding[MAX_LOCKMODES];
 
-	Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
-
 	/* ----------------------------
 	 * first check for global conflicts: If no locks conflict
-	 * with mine, then I get the lock.
+	 * with my request, then I get the lock.
 	 *
 	 * Checking for conflict: lock->grantMask represents the types of
 	 * currently held locks.  conflictTable[lockmode] has a bit
-	 * set for each type of lock that conflicts with mine.	Bitwise
+	 * set for each type of lock that conflicts with request.	Bitwise
 	 * compare tells if there is a conflict.
 	 * ----------------------------
 	 */
 	if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
 	{
-		HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
+		HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
 		return STATUS_OK;
 	}
 
@@ -844,11 +841,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
 	if (!(lockctl->conflictTab[lockmode] & bitmask))
 	{
 		/* no conflict. OK to get the lock */
-		HOLDER_PRINT("LockResolveConflicts: resolved", holder);
+		HOLDER_PRINT("LockCheckConflicts: resolved", holder);
 		return STATUS_OK;
 	}
 
-	HOLDER_PRINT("LockResolveConflicts: conflicting", holder);
+	HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
 	return STATUS_FOUND;
 }
 
@@ -889,33 +886,12 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
 	}
 }
 
-/*
- * LockGetMyHeldLocks -- compute bitmask of lock types held by a process
- *		for a given lockable object.
- */
-static int
-LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
-{
-	int			myHolding[MAX_LOCKMODES];
-	int			heldLocks = 0;
-	int			i,
-				tmpMask;
-
-	LockCountMyLocks(lockOffset, proc, myHolding);
-
-	for (i = 1, tmpMask = 2;
-		 i < MAX_LOCKMODES;
-		 i++, tmpMask <<= 1)
-	{
-		if (myHolding[i] > 0)
-			heldLocks |= tmpMask;
-	}
-	return heldLocks;
-}
-
 /*
  * GrantLock -- update the lock and holder data structures to show
  *		the lock request has been granted.
+ *
+ * NOTE: if proc was blocked, it also needs to be removed from the wait list
+ * and have its waitLock/waitHolder fields cleared.  That's not done here.
  */
 void
 GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
@@ -936,6 +912,9 @@ GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
 /*
  * WaitOnLock -- wait to acquire a lock
  *
+ * Caller must have set MyProc->heldLocks to reflect locks already held
+ * on the lockable object by this process (under all XIDs).
+ *
  * The locktable spinlock must be held at entry.
  */
 static int
@@ -956,7 +935,7 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
 	strcat(new_status, " waiting");
 	set_ps_display(new_status);
 
-	/*
+	/* -------------------
 	 * NOTE: Think not to put any lock state cleanup after the call to
 	 * ProcSleep, in either the normal or failure path.  The lock state
 	 * must be fully set by the lock grantor, or by HandleDeadLock if we
@@ -965,12 +944,13 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
 	 * after someone else grants us the lock, but before we've noticed it.
 	 * Hence, after granting, the locktable state must fully reflect the
 	 * fact that we own the lock; we can't do additional work on return.
+	 * -------------------
 	 */
 
-	if (ProcSleep(lockMethodTable->ctl,
+	if (ProcSleep(lockMethodTable,
 				  lockmode,
 				  lock,
-				  holder) != NO_ERROR)
+				  holder) != STATUS_OK)
 	{
 		/* -------------------
 		 * We failed as a result of a deadlock, see HandleDeadLock().
@@ -992,14 +972,60 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
 	return STATUS_OK;
 }
 
+/*--------------------
+ * Remove a proc from the wait-queue it is on
+ * (caller must know it is on one).
+ *
+ * Locktable lock must be held by caller.
+ *
+ * NB: this does not remove the process' holder object, nor the lock object,
+ * even though their counts might now have gone to zero.  That will happen
+ * during a subsequent LockReleaseAll call, which we expect will happen
+ * during transaction cleanup.  (Removal of a proc from its wait queue by
+ * this routine can only happen if we are aborting the transaction.)
+ *--------------------
+ */
+void
+RemoveFromWaitQueue(PROC *proc)
+{
+	LOCK   *waitLock = proc->waitLock;
+	LOCKMODE lockmode = proc->waitLockMode;
+
+	/* Make sure proc is waiting */
+	Assert(proc->links.next != INVALID_OFFSET);
+	Assert(waitLock);
+	Assert(waitLock->waitProcs.size > 0);
+
+	/* Remove proc from lock's wait queue */
+	SHMQueueDelete(&(proc->links));
+	waitLock->waitProcs.size--;
+
+	/* Undo increments of request counts by waiting process */
+	Assert(waitLock->nRequested > 0);
+	Assert(waitLock->nRequested > proc->waitLock->nGranted);
+	waitLock->nRequested--;
+	Assert(waitLock->requested[lockmode] > 0);
+	waitLock->requested[lockmode]--;
+	/* don't forget to clear waitMask bit if appropriate */
+	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
+		waitLock->waitMask &= BITS_OFF[lockmode];
+
+	/* Clean up the proc's own state */
+	proc->waitLock = NULL;
+	proc->waitHolder = NULL;
+
+	/* See if any other waiters for the lock can be woken up now */
+	ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
+}
+
 /*
  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
- *		release it.
+ *		release one 'lockmode' lock on it.
  *
- * Side Effects: if the lock no longer conflicts with the highest
- *		priority waiting process, that process is granted the lock
- *		and awoken. (We have to grant the lock here to avoid a
- *		race between the waking process and any new process to
+ * Side Effects: find any waiting processes that are now wakable,
+ *		grant them their requested locks and awaken them.
+ *		(We have to grant the lock here to avoid a race between
+ *		the waking process and any new process to
  *		come along and request the lock.)
  */
 bool
@@ -1013,7 +1039,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 	HOLDER	   *holder;
 	HOLDERTAG	holdertag;
 	HTAB	   *holderTable;
-	bool		wakeupNeeded = true;
+	bool		wakeupNeeded = false;
 
 #ifdef LOCK_DEBUG
 	if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
@@ -1086,7 +1112,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 		return FALSE;
 	}
 	HOLDER_PRINT("LockRelease: found", holder);
-	Assert(holder->tag.lock == MAKE_OFFSET(lock));
 
 	/*
 	 * Check that we are actually holding a lock of the type we want to
@@ -1094,11 +1119,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 	 */
 	if (!(holder->holding[lockmode] > 0))
 	{
-		SpinRelease(masterLock);
 		HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
+		Assert(holder->holding[lockmode] >= 0);
+		SpinRelease(masterLock);
 		elog(NOTICE, "LockRelease: you don't own a lock of type %s",
 			 lock_types[lockmode]);
-		Assert(holder->holding[lockmode] >= 0);
 		return FALSE;
 	}
 	Assert(holder->nHolding > 0);
@@ -1120,34 +1145,24 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 		lock->grantMask &= BITS_OFF[lockmode];
 	}
 
-#ifdef NOT_USED
+	LOCK_PRINT("LockRelease: updated", lock, lockmode);
+	Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
+	Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
+	Assert(lock->nGranted <= lock->nRequested);
+
 	/* --------------------------
-	 * If there are still active locks of the type I just released, no one
-	 * should be woken up.	Whoever is asleep will still conflict
-	 * with the remaining locks.
+	 * We need only run ProcLockWakeup if the released lock conflicts with
+	 * at least one of the lock types requested by waiter(s).  Otherwise
+	 * whatever conflict made them wait must still exist.  NOTE: before MVCC,
+	 * we could skip wakeup if lock->granted[lockmode] was still positive.
+	 * But that's not true anymore, because the remaining granted locks might
+	 * belong to some waiter, who could now be awakened because he doesn't
+	 * conflict with his own locks.
 	 * --------------------------
 	 */
-	if (lock->granted[lockmode])
-		wakeupNeeded = false;
-	else
-#endif
-
-		/*
-		 * Above is not valid any more (due to MVCC lock modes). Actually
-		 * we should compare granted[lockmode] with number of
-		 * waiters holding lock of this type and try to wakeup only if
-		 * these numbers are equal (and lock released conflicts with locks
-		 * requested by waiters). For the moment we only check the last
-		 * condition.
-		 */
 	if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
 		wakeupNeeded = true;
 
-	LOCK_PRINT("LockRelease: updated", lock, lockmode);
-	Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
-	Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
-	Assert(lock->nGranted <= lock->nRequested);
-
 	if (lock->nRequested == 0)
 	{
 		/* ------------------
@@ -1161,8 +1176,13 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 									(Pointer) &(lock->tag),
 									HASH_REMOVE,
 									&found);
-		Assert(lock && found);
-		wakeupNeeded = false;
+		if (!lock || !found)
+		{
+			SpinRelease(masterLock);
+			elog(NOTICE, "LockRelease: remove lock, table corrupted");
+			return FALSE;
+		}
+		wakeupNeeded = false;	/* should be false, but make sure */
 	}
 
 	/*
@@ -1192,12 +1212,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 		}
 	}
 
+	/*
+	 * Wake up waiters if needed.
+	 */
 	if (wakeupNeeded)
-		ProcLockWakeup(lockmethod, lock);
-#ifdef LOCK_DEBUG
-	else if (LOCK_DEBUG_ENABLED(lock))
-        elog(DEBUG, "LockRelease: no wakeup needed");
-#endif
+		ProcLockWakeup(lockMethodTable, lock);
 
 	SpinRelease(masterLock);
 	return TRUE;
@@ -1310,8 +1329,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
 		else
 		{
 			/* --------------
-			 * set nRequested to zero so that we can garbage collect the lock
-			 * down below...
+			 * This holder accounts for all the requested locks on the object,
+			 * so we can be lazy and just zero things out.
 			 * --------------
 			 */
 			lock->nRequested = 0;
@@ -1347,7 +1366,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
 			return FALSE;
 		}
 
-		if (!lock->nRequested)
+		if (lock->nRequested == 0)
 		{
 			/* --------------------
 			 * We've just released the last lock, so garbage-collect the
@@ -1359,7 +1378,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
 			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
 										(Pointer) &(lock->tag),
 										HASH_REMOVE, &found);
-			if ((!lock) || (!found))
+			if (!lock || !found)
 			{
 				SpinRelease(masterLock);
 				elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
@@ -1367,7 +1386,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
 			}
 		}
 		else if (wakeupNeeded)
-			ProcLockWakeup(lockmethod, lock);
+			ProcLockWakeup(lockMethodTable, lock);
 
 next_item:
 		holder = nextHolder;
@@ -1412,245 +1431,6 @@ LockShmemSize(int maxBackends)
 	return size;
 }
 
-/*
- * DeadLockCheck -- Checks for deadlocks for a given process
- *
- * This code takes a list of locks a process holds, and the lock that
- * the process is sleeping on, and tries to find if any of the processes
- * waiting on its locks hold the lock it is waiting for.  If no deadlock
- * is found, it goes on to look at all the processes waiting on their locks.
- *
- * We can't block on user locks, so no sense testing for deadlock
- * because there is no blocking, and no timer for the block.  So,
- * only look at regular locks.
- *
- * We have already locked the master lock before being called.
- */
-bool
-DeadLockCheck(PROC *thisProc, LOCK *findlock)
-{
-	PROC	   *waitProc;
-	PROC_QUEUE *waitQueue;
-	SHM_QUEUE  *procHolders = &(thisProc->procHolders);
-	HOLDER	   *holder;
-	HOLDER	   *nextHolder;
-	LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
-	LOCK	   *lock;
-	int			i,
-				j;
-	bool		first_run = (thisProc == MyProc);
-
-	static PROC *checked_procs[MAXBACKENDS];
-	static int	nprocs;
-
-	/* initialize at start of recursion */
-	if (first_run)
-	{
-		checked_procs[0] = thisProc;
-		nprocs = 1;
-	}
-
-	/*
-	 * Scan over all the locks held/awaited by thisProc.
-	 */
-	holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
-									 offsetof(HOLDER, procLink));
-
-	while (holder)
-	{
-		/* Get link first, since we may unlink/delete this holder */
-		nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
-											 offsetof(HOLDER, procLink));
-
-		Assert(holder->tag.proc == MAKE_OFFSET(thisProc));
-
-		lock = (LOCK *) MAKE_PTR(holder->tag.lock);
-
-		/* Ignore user locks */
-		if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD)
-			goto nxtl;
-
-		HOLDER_PRINT("DeadLockCheck", holder);
-		LOCK_PRINT("DeadLockCheck", lock, 0);
-
-		/*
-		 * waitLock is always in procHolders of waiting proc, if !first_run
-		 * then upper caller will handle waitProcs queue of waitLock.
-		 */
-		if (thisProc->waitLock == lock && !first_run)
-			goto nxtl;
-
-		/*
-		 * If we found proc holding findlock and sleeping on some my other
-		 * lock then we have to check does it block me or another waiters.
-		 */
-		if (lock == findlock && !first_run)
-		{
-			int			lm;
-
-			Assert(holder->nHolding > 0);
-			for (lm = 1; lm <= lockctl->numLockModes; lm++)
-			{
-				if (holder->holding[lm] > 0 &&
-					lockctl->conflictTab[lm] & findlock->waitMask)
-					return true;
-			}
-
-			/*
-			 * Else - get the next lock from thisProc's procHolders
-			 */
-			goto nxtl;
-		}
-
-		waitQueue = &(lock->waitProcs);
-		waitProc = (PROC *) MAKE_PTR(waitQueue->links.next);
-
-		/*
-		 * Inner loop scans over all processes waiting for this lock.
-		 *
-		 * NOTE: loop must count down because we want to examine each item
-		 * in the queue even if waitQueue->size decreases due to waking up
-		 * some of the processes.
-		 */
-		for (i = waitQueue->size; --i >= 0; )
-		{
-			Assert(waitProc->waitLock == lock);
-			if (waitProc == thisProc)
-			{
-				/* This should only happen at first level */
-				Assert(waitProc == MyProc);
-				goto nextWaitProc;
-			}
-			if (lock == findlock)		/* first_run also true */
-			{
-				/*
-				 * If I'm blocked by his heldLocks...
-				 */
-				if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks)
-				{
-					/* and he blocked by me -> deadlock */
-					if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
-						return true;
-					/* we shouldn't look at procHolders of our blockers */
-					goto nextWaitProc;
-				}
-
-				/*
-				 * If he isn't blocked by me and we request
-				 * non-conflicting lock modes - no deadlock here because
-				 * he isn't blocked by me in any sense (explicitly or
-				 * implicitly). Note that we don't do like test if
-				 * !first_run (when thisProc is holder and non-waiter on
-				 * lock) and so we call DeadLockCheck below for every
-				 * waitProc in thisProc->procHolders, even for waitProc-s
-				 * un-blocked by thisProc. Should we? This could save us
-				 * some time...
-				 */
-				if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) &&
-					!(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
-					goto nextWaitProc;
-			}
-
-			/*
-			 * Skip this waiter if already checked.
-			 */
-			for (j = 0; j < nprocs; j++)
-			{
-				if (checked_procs[j] == waitProc)
-					goto nextWaitProc;
-			}
-
-			/* Recursively check this process's procHolders. */
-			Assert(nprocs < MAXBACKENDS);
-			checked_procs[nprocs++] = waitProc;
-
-			if (DeadLockCheck(waitProc, findlock))
-			{
-				int			heldLocks;
-
-				/*
-				 * Ok, but is waitProc waiting for me (thisProc) ?
-				 */
-				if (thisProc->waitLock == lock)
-				{
-					Assert(first_run);
-					heldLocks = thisProc->heldLocks;
-				}
-				else
-				{
-					/* should we cache heldLocks to speed this up? */
-					heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc);
-					Assert(heldLocks != 0);
-				}
-				if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks)
-				{
-					/*
-					 * Last attempt to avoid deadlock: try to wakeup myself.
-					 */
-					if (first_run)
-					{
-						if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
-												 MyProc->waitLockMode,
-												 MyProc->waitLock,
-												 MyProc->waitHolder,
-												 MyProc,
-												 NULL) == STATUS_OK)
-						{
-							GrantLock(MyProc->waitLock,
-									  MyProc->waitHolder,
-									  MyProc->waitLockMode);
-							ProcWakeup(MyProc, NO_ERROR);
-							return false;
-						}
-					}
-					return true;
-				}
-
-				/*
-				 * Hell! Is he blocked by any (other) holder ?
-				 */
-				if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
-										 waitProc->waitLockMode,
-										 lock,
-										 waitProc->waitHolder,
-										 waitProc,
-										 NULL) != STATUS_OK)
-				{
-					/*
-					 * Blocked by others - no deadlock...
-					 */
-					LOCK_PRINT("DeadLockCheck: blocked by others",
-							   lock, waitProc->waitLockMode);
-					goto nextWaitProc;
-				}
-
-				/*
-				 * Well - wakeup this guy! This is the case of
-				 * implicit blocking: thisProc blocked someone who
-				 * blocked waitProc by the fact that he/someone is
-				 * already waiting for lock.  We do this for
-				 * anti-starving.
-				 */
-				GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode);
-				waitProc = ProcWakeup(waitProc, NO_ERROR);
-				/*
-				 * Use next-proc link returned by ProcWakeup, since this
-				 * proc's own links field is now cleared.
-				 */
-				continue;
-			}
-
-nextWaitProc:
-			waitProc = (PROC *) MAKE_PTR(waitProc->links.next);
-		}
-
-nxtl:
-		holder = nextHolder;
-	}
-
-	/* if we got here, no deadlock */
-	return false;
-}
 
 #ifdef LOCK_DEBUG
 /*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 377e9dbeb58..fd4c4b14856 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.97 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -18,7 +18,7 @@
  *
  *
  * Interface (a):
- *		ProcSleep(), ProcWakeup(), ProcWakeupNext(),
+ *		ProcSleep(), ProcWakeup(),
  *		ProcQueueAlloc() -- create a shm queue for sleeping processes
  *		ProcQueueInit() -- create a queue without allocing memory
  *
@@ -47,8 +47,6 @@
  *		shared among backends (we keep a few sets of semaphores around).
  *		This is so that we can support more backends. (system-wide semaphore
  *		sets run out pretty fast.)				  -ay 4/95
- *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
  */
 #include "postgres.h"
 
@@ -257,7 +255,7 @@ InitProcess(void)
 	}
 
 	SHMQueueElemInit(&(MyProc->links));
-	MyProc->errType = NO_ERROR;
+	MyProc->errType = STATUS_OK;
 	MyProc->pid = MyProcPid;
 	MyProc->databaseId = MyDatabaseId;
 	MyProc->xid = InvalidTransactionId;
@@ -284,7 +282,16 @@ InitProcess(void)
 		(location != MAKE_OFFSET(MyProc)))
 		elog(STOP, "InitProcess: ShmemPID table broken");
 
+	/*
+	 * Arrange to clean up at backend exit.
+	 */
 	on_shmem_exit(ProcKill, 0);
+
+	/*
+	 * Now that we have a PROC, we could try to acquire locks,
+	 * so initialize the deadlock checker.
+	 */
+	InitDeadLockChecking();
 }
 
 /*
@@ -304,50 +311,6 @@ ZeroProcSemaphore(PROC *proc)
 	}
 }
 
-/*
- * Remove a proc from the wait-queue it is on
- * (caller must know it is on one).
- * Locktable lock must be held by caller.
- *
- * NB: this does not remove the process' holder object, nor the lock object,
- * even though their counts might now have gone to zero.  That will happen
- * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup.  (Removal of a proc from its wait queue by
- * this routine can only happen if we are aborting the transaction.)
- */
-static void
-RemoveFromWaitQueue(PROC *proc)
-{
-	LOCK   *waitLock = proc->waitLock;
-	LOCKMODE lockmode = proc->waitLockMode;
-
-	/* Make sure proc is waiting */
-	Assert(proc->links.next != INVALID_OFFSET);
-	Assert(waitLock);
-	Assert(waitLock->waitProcs.size > 0);
-
-	/* Remove proc from lock's wait queue */
-	SHMQueueDelete(&(proc->links));
-	waitLock->waitProcs.size--;
-
-	/* Undo increments of request counts by waiting process */
-	Assert(waitLock->nRequested > 0);
-	Assert(waitLock->nRequested > proc->waitLock->nGranted);
-	waitLock->nRequested--;
-	Assert(waitLock->requested[lockmode] > 0);
-	waitLock->requested[lockmode]--;
-	/* don't forget to clear waitMask bit if appropriate */
-	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
-		waitLock->waitMask &= ~(1 << lockmode);
-
-	/* Clean up the proc's own state */
-	proc->waitLock = NULL;
-	proc->waitHolder = NULL;
-
-	/* See if any other waiters for the lock can be woken up now */
-	ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock);
-}
-
 /*
  * Cancel any pending wait for lock, when aborting a transaction.
  *
@@ -529,34 +492,34 @@ ProcQueueInit(PROC_QUEUE *queue)
 /*
  * ProcSleep -- put a process to sleep
  *
- * P() on the semaphore should put us to sleep.  The process
- * semaphore is normally zero, so when we try to acquire it, we sleep.
+ * Caller must have set MyProc->heldLocks to reflect locks already held
+ * on the lockable object by this process (under all XIDs).
  *
  * Locktable's spinlock must be held at entry, and will be held
  * at exit.
  *
- * Result is NO_ERROR if we acquired the lock, STATUS_ERROR if not (deadlock).
+ * Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).
  *
  * ASSUME: that no one will fiddle with the queue until after
  *		we release the spin lock.
  *
  * NOTES: The process queue is now a priority queue for locking.
+ *
+ * P() on the semaphore should put us to sleep.  The process
+ * semaphore is normally zero, so when we try to acquire it, we sleep.
  */
 int
-ProcSleep(LOCKMETHODCTL *lockctl,
+ProcSleep(LOCKMETHODTABLE *lockMethodTable,
 		  LOCKMODE lockmode,
 		  LOCK *lock,
 		  HOLDER *holder)
 {
-	PROC_QUEUE *waitQueue = &(lock->waitProcs);
+	LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
 	SPINLOCK	spinlock = lockctl->masterLock;
-	int			myMask = (1 << lockmode);
-	int			waitMask = lock->waitMask;
+	PROC_QUEUE *waitQueue = &(lock->waitProcs);
+	int			myHeldLocks = MyProc->heldLocks;
 	PROC	   *proc;
 	int			i;
-	int			aheadGranted[MAX_LOCKMODES];
-	bool		selfConflict = (lockctl->conflictTab[lockmode] & myMask),
-				prevSame = false;
 #ifndef __BEOS__
 	struct itimerval timeval,
 				dummy;
@@ -564,64 +527,63 @@ ProcSleep(LOCKMETHODCTL *lockctl,
     bigtime_t time_interval;
 #endif
 
-	proc = (PROC *) MAKE_PTR(waitQueue->links.next);
-
-	/* if we don't conflict with any waiter - be first in queue */
-	if (!(lockctl->conflictTab[lockmode] & waitMask))
-		goto ins;
-
-	/* otherwise, determine where we should go into the queue */
-	for (i = 1; i < MAX_LOCKMODES; i++)
-		aheadGranted[i] = lock->granted[i];
-	(aheadGranted[lockmode])++;
-
-	for (i = 0; i < waitQueue->size; i++)
+	/* ----------------------
+	 * Determine where to add myself in the wait queue.
+	 *
+	 * Normally I should go at the end of the queue.  However, if I already
+	 * hold locks that conflict with the request of any previous waiter,
+	 * put myself in the queue just in front of the first such waiter.
+	 * This is not a necessary step, since deadlock detection would move
+	 * me to before that waiter anyway; but it's relatively cheap to detect
+	 * such a conflict immediately, and avoid delaying till deadlock timeout.
+	 *
+	 * Special case: if I find I should go in front of the first waiter,
+	 * and I do not conflict with already-held locks, then just grant myself
+	 * the requested lock immediately.
+	 * ----------------------
+	 */
+	if (myHeldLocks != 0)
 	{
-		LOCKMODE	procWaitMode = proc->waitLockMode;
-
-		/* must I wait for him ? */
-		if (lockctl->conflictTab[lockmode] & proc->heldLocks)
+		proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+		for (i = 0; i < waitQueue->size; i++)
 		{
-			/* is he waiting for me ? */
-			if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
+			/* Must he wait for me? */
+			if (lockctl->conflictTab[proc->waitLockMode] & myHeldLocks)
 			{
-				/* Yes, report deadlock failure */
-				MyProc->errType = STATUS_ERROR;
-				return STATUS_ERROR;
-			}
-			/* I must go after him in queue - so continue loop */
-		}
-		/* if he waits for me, go before him in queue */
-		else if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
-			break;
-		/* if conflicting locks requested */
-		else if (lockctl->conflictTab[procWaitMode] & myMask)
-		{
-
-			/*
-			 * If I request non self-conflicting lock and there are others
-			 * requesting the same lock just before this guy - stop here.
-			 */
-			if (!selfConflict && prevSame)
+				/* Must I wait for him ? */
+				if (lockctl->conflictTab[lockmode] & proc->heldLocks)
+				{
+					/* Yes, can report deadlock failure immediately */
+					MyProc->errType = STATUS_ERROR;
+					return STATUS_ERROR;
+				}
+				if (i == 0)
+				{
+					/* I must go before first waiter.  Check special case. */
+					if (LockCheckConflicts(lockMethodTable,
+										   lockmode,
+										   lock,
+										   holder,
+										   MyProc,
+										   NULL) == STATUS_OK)
+					{
+						/* Skip the wait and just grant myself the lock. */
+						GrantLock(lock, holder, lockmode);
+						return STATUS_OK;
+					}
+				}
+				/* Break out of loop to put myself before him */
 				break;
+			}
+			proc = (PROC *) MAKE_PTR(proc->links.next);
 		}
-
-		/*
-		 * Last attempt to not move any further to the back of the queue:
-		 * if we don't conflict with remaining waiters, stop here.
-		 */
-		else if (!(lockctl->conflictTab[lockmode] & waitMask))
-			break;
-
-		/* Move past this guy, and update state accordingly */
-		prevSame = (procWaitMode == lockmode);
-		(aheadGranted[procWaitMode])++;
-		if (aheadGranted[procWaitMode] == lock->requested[procWaitMode])
-			waitMask &= ~(1 << procWaitMode);
-		proc = (PROC *) MAKE_PTR(proc->links.next);
+	}
+	else
+	{
+		/* I hold no locks, so I can't push in front of anyone. */
+		proc = (PROC *) &(waitQueue->links);
 	}
 
-ins:;
 	/* -------------------
 	 * Insert self into queue, ahead of the given proc (or at tail of queue).
 	 * -------------------
@@ -629,15 +591,14 @@ ins:;
 	SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
 	waitQueue->size++;
 
-	lock->waitMask |= myMask;
+	lock->waitMask |= (1 << lockmode);
 
 	/* Set up wait information in PROC object, too */
 	MyProc->waitLock = lock;
 	MyProc->waitHolder = holder;
 	MyProc->waitLockMode = lockmode;
-	/* We assume the caller set up MyProc->heldLocks */
 
-	MyProc->errType = NO_ERROR;		/* initialize result for success */
+	MyProc->errType = STATUS_OK; /* initialize result for success */
 
 	/* mark that we are waiting for a lock */
 	waitingForLock = true;
@@ -662,7 +623,7 @@ ins:;
 	 * By delaying the check until we've waited for a bit, we can avoid
 	 * running the rather expensive deadlock-check code in most cases.
 	 *
-	 * Need to zero out struct to set the interval and the micro seconds fields
+	 * Need to zero out struct to set the interval and the microseconds fields
 	 * to 0.
 	 * --------------
 	 */
@@ -768,89 +729,59 @@ ProcWakeup(PROC *proc, int errType)
 
 /*
  * ProcLockWakeup -- routine for waking up processes when a lock is
- *		released.
+ *		released (or a prior waiter is aborted).  Scan all waiters
+ *		for lock, waken any that are no longer blocked.
  */
-int
-ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
+void
+ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock)
 {
-	PROC_QUEUE *queue = &(lock->waitProcs);
+	LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
+	PROC_QUEUE *waitQueue = &(lock->waitProcs);
+	int			queue_size = waitQueue->size;
 	PROC	   *proc;
-	int			awoken = 0;
-	LOCKMODE	last_lockmode = 0;
-	int			queue_size = queue->size;
+	int			conflictMask = 0;
 
 	Assert(queue_size >= 0);
 
-	if (!queue_size)
-		return STATUS_NOT_FOUND;
+	if (queue_size == 0)
+		return;
 
-	proc = (PROC *) MAKE_PTR(queue->links.next);
+	proc = (PROC *) MAKE_PTR(waitQueue->links.next);
 
 	while (queue_size-- > 0)
 	{
-		if (proc->waitLockMode == last_lockmode)
-		{
-			/*
-			 * This proc will conflict as the previous one did, don't even
-			 * try.
-			 */
-			goto nextProc;
-		}
+		LOCKMODE lockmode = proc->waitLockMode;
 
 		/*
-		 * Does this proc conflict with locks held by others ?
+		 * Waken if (a) doesn't conflict with requests of earlier waiters,
+		 * and (b) doesn't conflict with already-held locks.
 		 */
-		if (LockResolveConflicts(lockmethod,
-								 proc->waitLockMode,
-								 lock,
-								 proc->waitHolder,
-								 proc,
-								 NULL) != STATUS_OK)
+		if (((1 << lockmode) & conflictMask) == 0 &&
+			LockCheckConflicts(lockMethodTable,
+							   lockmode,
+							   lock,
+							   proc->waitHolder,
+							   proc,
+							   NULL) == STATUS_OK)
 		{
-			/* Yes.  Quit if we already awoke at least one process. */
-			if (awoken != 0)
-				break;
-			/* Otherwise, see if any later waiters can be awoken. */
-			last_lockmode = proc->waitLockMode;
-			goto nextProc;
+			/* OK to waken */
+			GrantLock(lock, proc->waitHolder, lockmode);
+			proc = ProcWakeup(proc, STATUS_OK);
+			/*
+			 * ProcWakeup removes proc from the lock's waiting process queue
+			 * and returns the next proc in chain; don't use proc's next-link,
+			 * because it's been cleared.
+			 */
 		}
-
-		/*
-		 * OK to wake up this sleeping process.
-		 */
-		GrantLock(lock, proc->waitHolder, proc->waitLockMode);
-		proc = ProcWakeup(proc, NO_ERROR);
-		awoken++;
-
-		/*
-		 * ProcWakeup removes proc from the lock's waiting process queue
-		 * and returns the next proc in chain; don't use proc's next-link,
-		 * because it's been cleared.
-		 */
-		continue;
-
-nextProc:
-		proc = (PROC *) MAKE_PTR(proc->links.next);
-	}
-
-	Assert(queue->size >= 0);
-
-	if (awoken)
-		return STATUS_OK;
-	else
-	{
-		/* Something is still blocking us.	May have deadlocked. */
-#ifdef LOCK_DEBUG
-		if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+		else
 		{
-			elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process",
-				 MAKE_OFFSET(lock));
-			if (Debug_deadlocks)
-				DumpAllLocks();
+			/* Cannot wake this guy.  Add his request to conflict mask. */
+			conflictMask |= lockctl->conflictTab[lockmode];
+			proc = (PROC *) MAKE_PTR(proc->links.next);
 		}
-#endif
-		return STATUS_NOT_FOUND;
 	}
+
+	Assert(waitQueue->size >= 0);
 }
 
 /* --------------------
@@ -900,7 +831,7 @@ HandleDeadLock(SIGNAL_ARGS)
         DumpAllLocks();
 #endif
 
-	if (!DeadLockCheck(MyProc, MyProc->waitLock))
+	if (!DeadLockCheck(MyProc))
 	{
 		/* No deadlock, so keep waiting */
 		UnlockLockTable();
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 39eefac0ca6..0ada6eaccfe 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.43 2001/01/24 19:43:27 momjian Exp $
+ * $Id: lock.h,v 1.44 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -247,6 +247,7 @@ typedef struct HOLDER
 extern void InitLocks(void);
 extern void LockDisable(bool status);
 extern bool LockingDisabled(void);
+extern LOCKMETHODTABLE *GetLocksMethodTable(LOCK *lock);
 extern LOCKMETHOD LockMethodTableInit(char *tabName, LOCKMASK *conflictsP,
 					int *prioP, int numModes, int maxBackends);
 extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
@@ -256,12 +257,15 @@ extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 						TransactionId xid, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
 						   bool allxids, TransactionId xid);
-extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCKMODE lockmode,
-								LOCK *lock, HOLDER *holder, PROC *proc,
-								int *myHolding);
+extern int LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+							  LOCKMODE lockmode,
+							  LOCK *lock, HOLDER *holder, PROC *proc,
+							  int *myHolding);
 extern void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode);
+extern void RemoveFromWaitQueue(PROC *proc);
 extern int	LockShmemSize(int maxBackends);
-extern bool DeadLockCheck(PROC *thisProc, LOCK *findlock);
+extern bool DeadLockCheck(PROC *proc);
+extern void InitDeadLockChecking(void);
 
 #ifdef LOCK_DEBUG
 extern void DumpLocks(void);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 3f8902e7d37..4dd5a8c2a67 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.38 2001/01/24 19:43:28 momjian Exp $
+ * $Id: proc.h,v 1.39 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,7 +41,7 @@ struct proc
 	SHM_QUEUE	links;			/* list link if process is in a list */
 
 	SEMA		sem;			/* ONE semaphore to sleep on */
-	int			errType;		/* error code tells why we woke up */
+	int			errType;		/* STATUS_OK or STATUS_ERROR after wakeup */
 
 	TransactionId xid;			/* transaction currently being executed by
 								 * this proc */
@@ -86,13 +86,6 @@ do { \
 	if (MyProc) (MyProc->sLocks[(lock)])--; \
 } while (0)
 
-/*
- * flags explaining why process woke up
- */
-#define NO_ERROR		0
-#define ERR_TIMEOUT		1
-#define ERR_BUFFER_IO	2
-
 
 /*
  * There is one ProcGlobal struct for the whole installation.
@@ -134,10 +127,10 @@ extern void ProcReleaseLocks(bool isCommit);
 extern bool ProcRemove(int pid);
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int ProcSleep(LOCKMETHODCTL *lockctl, LOCKMODE lockmode,
+extern int ProcSleep(LOCKMETHODTABLE *lockMethodTable, LOCKMODE lockmode,
 					 LOCK *lock, HOLDER *holder);
 extern PROC *ProcWakeup(PROC *proc, int errType);
-extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock);
+extern void ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock);
 extern void ProcReleaseSpins(PROC *proc);
 extern bool LockWaitCancel(void);
 extern void HandleDeadLock(SIGNAL_ARGS);
-- 
GitLab