From 564842a61779c91b7b5c92dceba5361c114cb37c Mon Sep 17 00:00:00 2001
From: Bruce Momjian <bruce@momjian.us>
Date: Thu, 13 May 1999 15:55:45 +0000
Subject: [PATCH] Hi, Bruce!

These are my last changes to lmgr fixing deadlock handling.
Please apply them to cvs...

Vadim
---
 src/backend/storage/lmgr/lock.c | 235 ++++++++++++++++++++++++--------
 src/backend/storage/lmgr/proc.c |  11 +-
 src/include/storage/lock.h      |   5 +-
 src/include/storage/proc.h      |   3 +-
 4 files changed, 189 insertions(+), 65 deletions(-)

diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 6c3437837df..09e85e5e134 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.51 1999/05/10 00:45:43 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.52 1999/05/13 15:55:44 momjian Exp $
  *
  * NOTES
  *	  Outside modules can create a lock table and acquire/release
@@ -83,9 +83,9 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 
 #define LOCK_PRINT_AUX(where,lock,type) \
 	TPRINTF(TRACE_ALL, \
-		 "%s: lock(%x) tbl(%d) rel(%u) db(%d) obj(%u) mask(%x) " \
-		 "hold(%d,%d,%d,%d,%d)=%d " \
-		 "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
+		 "%s: lock(%x) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) " \
+		 "hold(%d,%d,%d,%d,%d,%d,%d)=%d " \
+		 "act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
 		 where, \
 		 MAKE_OFFSET(lock), \
 		 lock->tag.lockmethod, \
@@ -98,12 +98,16 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 		 lock->holders[3], \
 		 lock->holders[4], \
 		 lock->holders[5], \
+		 lock->holders[6], \
+		 lock->holders[7], \
 		 lock->nHolding, \
 		 lock->activeHolders[1], \
 		 lock->activeHolders[2], \
 		 lock->activeHolders[3], \
 		 lock->activeHolders[4], \
 		 lock->activeHolders[5], \
+		 lock->activeHolders[6], \
+		 lock->activeHolders[7], \
 		 lock->nActive, \
 		 lock->waitProcs.size, \
 		 lock_types[type])
@@ -119,8 +123,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 
 #define XID_PRINT_AUX(where,xidentP) \
 	TPRINTF(TRACE_ALL, \
-		 "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
-		 "hold(%d,%d,%d,%d,%d)=%d", \
+		 "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%u) " \
+		 "hold(%d,%d,%d,%d,%d,%d,%d)=%d", \
 		 where, \
 		 MAKE_OFFSET(xidentP), \
 		 xidentP->tag.lock, \
@@ -132,6 +136,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 		 xidentP->holders[3], \
 		 xidentP->holders[4], \
 		 xidentP->holders[5], \
+		 xidentP->holders[6], \
+		 xidentP->holders[7], \
 		 xidentP->nHolding)
 
 #else							/* !LOCK_MGR_DEBUG */
@@ -1561,19 +1567,26 @@ LockingDisabled()
  * We have already locked the master lock before being called.
  */
 bool
-DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
+DeadLockCheck(void *proc, LOCK *findlock)
 {
-	int						done;
 	XIDLookupEnt		   *xidLook = NULL;
 	XIDLookupEnt		   *tmp = NULL;
+	PROC				   *thisProc = (PROC*) proc,
+						   *waitProc;
+	SHM_QUEUE			   *lockQueue = &(thisProc->lockQueue);
 	SHMEM_OFFSET			end = MAKE_OFFSET(lockQueue);
 	LOCK				   *lock;
+	PROC_QUEUE			   *waitQueue;
+	int						i,
+							j;
+	bool					first_run = (thisProc == MyProc),
+							done;
 
 	static PROC			   *checked_procs[MAXBACKENDS];
 	static int				nprocs;
 
 	/* initialize at start of recursion */
-	if (skip_check)
+	if (first_run)
 	{
 		checked_procs[0] = MyProc;
 		nprocs = 1;
@@ -1593,74 +1606,186 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 
 		LOCK_PRINT("DeadLockCheck", lock, 0);
 
+		if (lock->tag.relId == 0)	/* user' lock */
+			goto nxtl;
+
 		/*
-		 * This is our only check to see if we found the lock we want.
-		 *
-		 * The lock we are waiting for is already in MyProc->lockQueue so we
-		 * need to skip it here.  We are trying to find it in someone
-		 * else's lockQueue.   bjm
+		 * waitLock is always in lockQueue of waiting proc,
+		 * if !first_run then upper caller will handle waitProcs
+		 * queue of waitLock.
 		 */
-		if (lock == findlock && !skip_check)
-			return true;
+		if (thisProc->waitLock == lock && !first_run)
+			goto nxtl;
 
+		/*
+		 * If we found proc holding findlock and sleeping on some my 
+		 * other lock then we have to check does it block me or
+		 * another waiters.
+		 */
+		if (lock == findlock && !first_run)
 		{
-			PROC_QUEUE *waitQueue = &(lock->waitProcs);
-			PROC	   *proc;
-			int			i;
-			int			j;
+			LOCKMETHODCTL  *lockctl = 
+							LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+			int				lm;
 
-			proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
-			for (i = 0; i < waitQueue->size; i++)
+			Assert(xidLook->nHolding > 0);
+			for (lm = 1; lm <= lockctl->numLockModes; lm++)
 			{
+				if (xidLook->holders[lm] > 0 && 
+					lockctl->conflictTab[lm] & findlock->waitMask)
+					return true;
+			}
+			/*
+			 * Else - get the next lock from thisProc's lockQueue
+			 */
+			goto nxtl;	
+		}
+
+		waitQueue = &(lock->waitProcs);
+		waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+
+		for (i = 0; i < waitQueue->size; i++)
+		{
+			if (waitProc == thisProc)
+			{
+				Assert(waitProc->waitLock == lock);
+				Assert(waitProc == MyProc);
+				waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+				continue;
+			}
+			if (lock == findlock)	/* first_run also true */
+			{
+				LOCKMETHODCTL  *lockctl = 
+						LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+
 				/*
-				 * If I hold some locks on findlock and another proc 
-				 * waits on it holding locks too - check if we are
-				 * waiting one another.
+				 * If me blocked by his holdlock...
 				 */
-				if (proc != MyProc &&
-					lock == findlock &&	/* skip_check also true */
-					MyProc->holdLock)
+				if (lockctl->conflictTab[MyProc->token] & waitProc->holdLock)
 				{
-					LOCKMETHODCTL  *lockctl = 
-							LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
-
-					Assert(skip_check);
-					if (lockctl->conflictTab[MyProc->token] & proc->holdLock && 
-						lockctl->conflictTab[proc->token] & MyProc->holdLock)
+					/* and he blocked by me -> deadlock */
+					if (lockctl->conflictTab[waitProc->token] & MyProc->holdLock)
 						return true;
+					/* we shouldn't look at lockQueue of our blockers */
+					waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+					continue;
 				}
-
 				/*
-				 * No sense in looking at the wait queue of the lock we
-				 * are looking for. If lock == findlock, and I got here,
-				 * skip_check must be true too.
+				 * If he isn't blocked by me and we request non-conflicting 
+				 * lock modes - no deadlock here because of he isn't
+				 * blocked by me in any sence (explicitle or implicitly). 
+				 * Note that we don't do like test if !first_run
+				 * (when thisProc is holder and non-waiter on lock) and so
+				 * we call DeadLockCheck below for every waitProc in
+				 * thisProc->lockQueue, even for waitProc-s un-blocked
+				 * by thisProc. Should we? This could save us some time...
 				 */
-				if (lock != findlock)
+				if (!(lockctl->conflictTab[waitProc->token] & MyProc->holdLock) && 
+					!(lockctl->conflictTab[waitProc->token] & (1 << MyProc->token)))
 				{
-					for (j = 0; j < nprocs; j++)
-						if (checked_procs[j] == proc)
-							break;
-					if (j >= nprocs && lock != findlock)
-					{
-						Assert(nprocs < MAXBACKENDS);
-						checked_procs[nprocs++] = proc;
+					waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+					continue;
+				}
+			}
 
+			/*
+			 * Look in lockQueue of this waitProc, if didn't do this before.
+			 */
+			for (j = 0; j < nprocs; j++)
+			{
+				if (checked_procs[j] == waitProc)
+					break;
+			}
+			if (j >= nprocs)
+			{
+				Assert(nprocs < MAXBACKENDS);
+				checked_procs[nprocs++] = waitProc;
+
+				if (DeadLockCheck(waitProc, findlock))
+				{
+					LOCKMETHODCTL  *lockctl = 
+							LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+					int				holdLock;
+
+					/*
+					 * Ok, but is waitProc waiting for me (thisProc) ?
+					 */
+					if (thisProc->waitLock == lock)
+					{
+						Assert(first_run);
+						holdLock = thisProc->holdLock;
+					}
+					else	/* should we cache holdLock ? */
+					{
+						int	lm, tmpMask = 2;
+
+						Assert(xidLook->nHolding > 0);
+						for (holdLock = 0, lm = 1; 
+								lm <= lockctl->numLockModes; 
+								lm++, tmpMask <<= 1)
+						{
+							if (xidLook->holders[lm] > 0)
+								holdLock |= tmpMask;
+						}
+						Assert(holdLock != 0);
+					}
+					if (lockctl->conflictTab[waitProc->token] & holdLock)
+					{
 						/*
-						 * For non-MyProc entries, we are looking only
-						 * waiters, not necessarily people who already
-						 * hold locks and are waiting. Now we check for
-						 * cases where we have two or more tables in a
-						 * deadlock.  We do this by continuing to search
-						 * for someone holding a lock       bjm
+						 * Last attempt to avoid deadlock - try to wakeup
+						 * myself.
 						 */
-						if (DeadLockCheck(&(proc->lockQueue), findlock, false))
-							return true;
+						if (first_run)
+						{
+							if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
+													 MyProc->waitLock,
+													 MyProc->token,
+													 MyProc->xid,
+													 NULL) == STATUS_OK)
+							{
+								GrantLock(MyProc->waitLock, MyProc->token);
+								(MyProc->waitLock->waitProcs.size)--;
+								ProcWakeup(MyProc, NO_ERROR);
+								return false;
+							}
+						}
+						return true;
+					}
+					/*
+					 * Hell! Is he blocked by any (other) holder ?
+					 */
+					if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
+											 lock,
+											 waitProc->token,
+											 waitProc->xid,
+											 NULL) != STATUS_OK)
+					{
+						/*
+						 * Blocked by others - no deadlock...
+						 */
+#ifdef DEADLOCK_DEBUG
+						LOCK_PRINT("DeadLockCheck: blocked by others", 
+										lock, waitProc->token);
+#endif
+						waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+						continue;
 					}
+					/*
+					 * Well - wakeup this guy! This is the case of
+					 * implicit blocking: thisProc blocked someone who blocked
+					 * waitProc by the fact that he (someone) is already
+					 * waiting for lock (we do this for anti-starving).
+					 */
+					GrantLock(lock, waitProc->token);
+					waitQueue->size--;
+					waitProc = ProcWakeup(waitProc, NO_ERROR);
+					continue;
 				}
-				proc = (PROC *) MAKE_PTR(proc->links.prev);
 			}
+			waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
 		}
 
+nxtl:;
 		if (done)
 			break;
 		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b80d32e1b44..4eb97b8a6a8 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.55 1999/05/13 15:55:44 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *		This is so that we can support more backends. (system-wide semaphore
  *		sets run out pretty fast.)				  -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.55 1999/05/13 15:55:44 momjian Exp $
  */
 #include <sys/time.h>
 #include <unistd.h>
@@ -78,7 +78,6 @@
 #include "utils/trace.h"
 
 static void HandleDeadLock(int sig);
-static PROC *ProcWakeup(PROC *proc, int errType);
 static void ProcFreeAllSemaphores(void);
 
 #define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
@@ -640,7 +639,7 @@ rt:;
  *	 remove the process from the wait queue and set its links invalid.
  *	 RETURN: the next process in the wait queue.
  */
-static PROC *
+PROC *
 ProcWakeup(PROC *proc, int errType)
 {
 	PROC	   *retProc;
@@ -806,10 +805,10 @@ HandleDeadLock(int sig)
 	DumpAllLocks();
 #endif
 
-	if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
+	MyProc->errType = STATUS_NOT_FOUND;
+	if (!DeadLockCheck(MyProc, MyProc->waitLock))
 	{
 		UnlockLockTable();
-		MyProc->errType = STATUS_NOT_FOUND;
 		return;
 	}
 
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 387f164247c..ab084eb94fd 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.25 1999/05/07 01:23:07 vadim Exp $
+ * $Id: lock.h,v 1.26 1999/05/13 15:55:44 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -249,8 +249,7 @@ extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
 extern int	LockShmemSize(int maxBackends);
 extern bool LockingDisabled(void);
-extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock,
-			  bool skip_check);
+extern bool DeadLockCheck(void *proc, LOCK *findlock);
 
 #ifdef DEADLOCK_DEBUG
 extern void DumpLocks(void);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 53b677858fd..3dcf1e281b5 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.21 1999/05/07 01:23:07 vadim Exp $
+ * $Id: proc.h,v 1.22 1999/05/13 15:55:45 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -119,6 +119,7 @@ extern bool ProcRemove(int pid);
 extern void ProcQueueInit(PROC_QUEUE *queue);
 extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token, 
 					LOCK *lock);
+extern PROC *ProcWakeup(PROC *proc, int errType);
 extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
 			   LOCK *lock);
 extern void ProcAddLock(SHM_QUEUE *elem);
-- 
GitLab