From 862927f4433bc759b0e8ccd57e778b1259c20b81 Mon Sep 17 00:00:00 2001
From: Bruce Momjian <bruce@momjian.us>
Date: Tue, 27 Jan 1998 03:00:43 +0000
Subject: [PATCH] Real deadlock detection.

---
 src/backend/storage/lmgr/lock.c | 92 +++++++++++++++++++++++++++++----
 src/backend/storage/lmgr/proc.c | 67 +++++++++++++-----------
 src/include/config.h.in         | 16 +++---
 src/include/storage/lock.h      |  4 +-
 4 files changed, 130 insertions(+), 49 deletions(-)

diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index efc4c991ff4..286824e27f6 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.21 1998/01/25 05:14:02 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.22 1998/01/27 03:00:28 momjian Exp $
  *
  * NOTES
  *	  Outside modules can create a lock table and acquire/release
@@ -1201,7 +1201,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
 		done = (xidLook->queue.next == end);
 		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-		LOCK_PRINT("ReleaseAll", (&lock->tag), 0);
+		LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);
 
 #ifdef USER_LOCKS
 
@@ -1307,11 +1307,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
 			|| !found)
 		{
 			SpinRelease(masterLock);
-#ifdef USER_LOCKS
 			elog(NOTICE, "LockReleaseAll: xid table corrupted");
-#else
-			elog(NOTICE, "LockReplace: xid table corrupted");
-#endif
 			return (FALSE);
 		}
 
@@ -1329,11 +1325,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
 			if ((!lock) || (!found))
 			{
 				SpinRelease(masterLock);
-#ifdef USER_LOCKS
 				elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
-#else
-				elog(NOTICE, "LockReplace: cannot remove lock from HTAB");
-#endif
 				return (FALSE);
 			}
 		}
@@ -1415,6 +1407,86 @@ LockingDisabled()
 	return LockingIsDisabled;
 }
 
+/*
+ * DeadLockCheck -- Checks for deadlocks for a given process
+ *
+ * We can't block on user locks, so there is no sense testing them for
+ * deadlock: there is no blocking, and no timer for the block.
+ *
+ * Walks the locks in lockQueue.  With skip_check false, returns true when
+ * findlock is in the queue.  With skip_check true, findlock is skipped and
+ * the lockQueue of each process (other than MyProc) waiting on the other
+ * locks is searched one level deep for findlock.  Returns false otherwise.
+ * We have already locked the master lock before being called.
+ */
+bool
+DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
+{
+	int			done;			/* current entry links back to queue head */
+	XIDLookupEnt *xidLook = NULL;
+	XIDLookupEnt *tmp = NULL;
+	SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);	/* queue head marks the end */
+	LOCK	   *lock;
+
+	if (SHMQueueEmpty(lockQueue))
+		return false;			/* empty queue, so nothing to check */
+
+	SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
+
+	XID_PRINT("DeadLockCheck", xidLook);
+
+	for (;;)
+	{
+		/* ---------------------------
+		 * XXX Here we assume the shared memory queue is circular and
+		 * that we know its internal structure.  Should have some sort of
+		 * macros to allow one to walk it.	mer 20 July 1991
+		 * ---------------------------
+		 */
+		done = (xidLook->queue.next == end);
+		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+
+		LOCK_PRINT("DeadLockCheck", (&lock->tag), 0);
+
+		/*
+		 * This is our only check to see if we found the lock we want.
+		 *
+		 * The lock we are waiting for is already in MyProc->lockQueue
+		 * so we need to skip it here.  We are trying to find it in
+		 * someone else's lockQueue.
+		 */
+		if (lock == findlock && !skip_check)
+			return true;
+		else if (lock != findlock || !skip_check)
+		{
+			PROC_QUEUE  *waitQueue = &(lock->waitProcs);
+			PROC		*proc;
+			int			i;
+			
+			proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+			for (i = 0; i < waitQueue->size; i++)
+			{
+				/* only the top-level call recurses; never into MyProc */
+				if (proc != MyProc && skip_check)
+				{
+					/* If we found a deadlock, we can stop right now */
+					if (DeadLockCheck(&(proc->lockQueue), findlock, false))
+						return true;
+				}
+				proc = (PROC *) MAKE_PTR(proc->links.prev);
+			}
+		}
+			
+		if (done)
+			break;
+		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
+		xidLook = tmp;
+	}
+
+	/* if we got here, no deadlock */
+	return false;
+}
+
 #ifdef DEADLOCK_DEBUG
 /*
  * Dump all locks. Must have already acquired the masterLock.
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index deca35c8d42..893ea41833d 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.28 1998/01/25 05:14:09 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.29 1998/01/27 03:00:29 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *		This is so that we can support more backends. (system-wide semaphore
  *		sets run out pretty fast.)				  -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.28 1998/01/25 05:14:09 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.29 1998/01/27 03:00:29 momjian Exp $
  */
 #include <sys/time.h>
 #include <unistd.h>
@@ -442,7 +442,7 @@ ProcQueueInit(PROC_QUEUE *queue)
  * NOTES: The process queue is now a priority queue for locking.
  */
 int
-ProcSleep(PROC_QUEUE *queue,
+ProcSleep(PROC_QUEUE *waitQueue,
 		  SPINLOCK spinlock,
 		  int token,
 		  int prio,
@@ -453,8 +453,8 @@ ProcSleep(PROC_QUEUE *queue,
 	struct itimerval timeval,
 				dummy;
 
-	proc = (PROC *) MAKE_PTR(queue->links.prev);
-	for (i = 0; i < queue->size; i++)
+	proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+	for (i = 0; i < waitQueue->size; i++)
 	{
 		if (proc->prio >= prio)
 			proc = (PROC *) MAKE_PTR(proc->links.prev);
@@ -478,36 +478,38 @@ ProcSleep(PROC_QUEUE *queue,
 	 * -------------------
 	 */
 	SHMQueueInsertTL(&(proc->links), &(MyProc->links));
-	queue->size++;
+	waitQueue->size++;
 
 	SpinRelease(spinlock);
 
 	/* --------------
-	 * Postgres does not have any deadlock detection code and for this
-	 * reason we must set a timer to wake up the process in the event of
-	 * a deadlock.	For now the timer is set for 1 minute and we assume that
-	 * any process which sleeps for this amount of time is deadlocked and will
-	 * receive a SIGALRM signal.  The handler should release the processes
-	 * semaphore and abort the current transaction.
+	 * We set a timer so we can wake up periodically and check for a deadlock.
+	 * If a deadlock is detected, the handler releases the process's
+	 * semaphore and aborts the current transaction.
 	 *
 	 * Need to zero out struct to set the interval and the micro seconds fields
 	 * to 0.
 	 * --------------
 	 */
 	MemSet(&timeval, 0, sizeof(struct itimerval));
-	timeval.it_value.tv_sec = DEADLOCK_TIMEOUT;
+	timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER;
 
-	if (setitimer(ITIMER_REAL, &timeval, &dummy))
-		elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
+	do
+	{
+		MyProc->errType = NO_ERROR; /* reset flag after deadlock check */
 
-	/* --------------
-	 * if someone wakes us between SpinRelease and IpcSemaphoreLock,
-	 * IpcSemaphoreLock will not block.  The wakeup is "saved" by
-	 * the semaphore implementation.
-	 * --------------
-	 */
-	IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+		if (setitimer(ITIMER_REAL, &timeval, &dummy))
+			elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
 
+		/* --------------
+		 * if someone wakes us between SpinRelease and IpcSemaphoreLock,
+		 * IpcSemaphoreLock will not block.  The wakeup is "saved" by
+		 * the semaphore implementation.
+		 * --------------
+		 */
+		IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+	} while (MyProc->errType == STATUS_NOT_FOUND); /* sleep after deadlock check */
+	
 	/* ---------------
 	 * We were awoken before a timeout - now disable the timer
 	 * ---------------
@@ -615,10 +617,9 @@ ProcAddLock(SHM_QUEUE *elem)
 }
 
 /* --------------------
- * We only get to this routine if we got SIGALRM after DEADLOCK_TIMEOUT
- * while waiting for a lock to be released by some other process.  After
- * the one minute deadline we assume we have a deadlock and must abort
- * this transaction.  We must also indicate that I'm no longer waiting
+ * We only get to this routine if we got SIGALRM after DEADLOCK_CHECK_TIMER
+ * while waiting for a lock to be released by some other process.  If we have
+ * a real deadlock, we must indicate that I'm no longer waiting
  * on a lock so that other processes don't try to wake me up and screw
  * up my semaphore.
  * --------------------
@@ -665,12 +666,19 @@ HandleDeadLock(int sig)
 		return;
 	}
 
-	mywaitlock = MyProc->waitLock;
-
 #ifdef DEADLOCK_DEBUG
 	DumpLocks();
 #endif
 
+	if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
+	{
+		UnlockLockTable();
+		MyProc->errType = STATUS_NOT_FOUND;
+		return;
+	}
+
+	mywaitlock = MyProc->waitLock;
+
 	/* ------------------------
 	 * Get this process off the lock's wait queue
 	 * ------------------------
@@ -701,8 +709,7 @@ HandleDeadLock(int sig)
 	 */
 	UnlockLockTable();
 
-	elog(NOTICE, "Timeout interval reached -- possible deadlock.");
-	elog(NOTICE, "See the lock(l) manual page for a possible cause.");
+	elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause.");
 	return;
 }
 
diff --git a/src/include/config.h.in b/src/include/config.h.in
index 74f84894db7..9c0b1829c1f 100644
--- a/src/include/config.h.in
+++ b/src/include/config.h.in
@@ -210,14 +210,16 @@ extern void srandom(int seed);
    code seems broken without it, Bruce Momjian */
 /* #define LOARRAY */
 
-/* This is the time, in seconds, at which a given backend server
- * will wait on a lock before deciding to abort the transaction
- * (this is what we do in lieu of deadlock detection).
- *
- * Low numbers are not recommended as they will tend to cause
- * false aborts if many transactions are long-lived.
+/*
+ * As soon as the backend blocks on a lock, it waits this number of seconds
+ * before checking for a deadlock.  If no deadlock is found, it keeps
+ * re-checking at intervals of this many seconds.
+ * We don't check for deadlocks just before sleeping because a deadlock is
+ * a rare event, and checking is an expensive operation.
+ * We only detect deadlocks between two processes, not three or more, but
+ * two-process deadlocks are the most common.
  */
-#define DEADLOCK_TIMEOUT 60
+#define DEADLOCK_CHECK_TIMER 60
 
 /*
  * This flag enables the use of idexes in plans generated for function
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index e2a45e91203..98804eb1f08 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.9 1998/01/24 22:50:11 momjian Exp $
+ * $Id: lock.h,v 1.10 1998/01/27 03:00:43 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -214,7 +214,7 @@ extern void GrantLock(LOCK *lock, LOCKT lockt);
 extern bool LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue);
 extern int	LockShmemSize(void);
 extern bool LockingDisabled(void);
-
+extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check);
 #ifdef DEADLOCK_DEBUG
 extern void DumpLocks(void);
 
-- 
GitLab