From 763554393a9c816a8b95d068a8137e18e33f824c Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Tue, 4 Sep 2001 21:42:17 +0000
Subject: [PATCH] Fix code so that we recover cleanly if there are no free
 semaphores available in freeSemMap.  As noted by Tatsuo, this is now a likely
 scenario for detecting MaxBackends-exceeded; if MaxBackends is a multiple of
 PROC_NSEMS_PER_SET then we will fail here and not in sinval.c.  The cleanup
 path did not work correctly before, anyway.

---
 src/backend/storage/lmgr/proc.c | 101 +++++++++++++++++++-------------
 1 file changed, 60 insertions(+), 41 deletions(-)

diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index f491bc16f70..71f85cbe58f 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.105 2001/09/04 02:26:57 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.106 2001/09/04 21:42:17 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -186,6 +186,17 @@ InitProcess(void)
 	unsigned long location,
 				myOffset;
 
+	/*
+	 * ProcStructLock protects the freelist of PROC entries and the map
+	 * of free semaphores.  Note that when we acquire it here, we do not
+	 * have a PROC entry and so the ownership of the spinlock is not
+	 * recorded anywhere; even if it was, until we register ProcKill as
+	 * an on_shmem_exit callback, there is no exit hook that will cause
+	 * owned spinlocks to be released.  Upshot: during the first part of
+	 * this routine, be careful to release the lock manually before any
+	 * elog(), else you'll have a stuck spinlock to add to your woes.
+	 */
+
 	SpinAcquire(ProcStructLock);
 
 	/* attach to the ProcGlobal structure */
@@ -194,13 +205,14 @@ InitProcess(void)
 	if (!found)
 	{
 		/* this should not happen. InitProcGlobal() is called before this. */
+		SpinRelease(ProcStructLock);
 		elog(STOP, "InitProcess: Proc Header uninitialized");
 	}
 
 	if (MyProc != NULL)
 	{
 		SpinRelease(ProcStructLock);
-		elog(ERROR, "ProcInit: you already exist");
+		elog(ERROR, "InitProcess: you already exist");
 	}
 
 	/* try to get a proc struct from the free list first */
@@ -214,14 +226,12 @@ InitProcess(void)
 	}
 	else
 	{
-
 		/*
 		 * have to allocate one.  We can't use the normal shmem index
 		 * table mechanism because the proc structure is stored by PID
 		 * instead of by a global name (need to look it up by PID when we
 		 * cleanup dead processes).
 		 */
-
 		MyProc = (PROC *) ShmemAlloc(sizeof(PROC));
 		if (!MyProc)
 		{
@@ -231,44 +241,32 @@ InitProcess(void)
 	}
 
 	/*
-	 * zero out the spin lock counts and set the sLocks field for
-	 * ProcStructLock to 1 as we have acquired this spinlock above but
-	 * didn't record it since we didn't have MyProc until now.
-	 */
-	MemSet(MyProc->sLocks, 0, sizeof(MyProc->sLocks));
-	MyProc->sLocks[ProcStructLock] = 1;
-
-	/*
-	 * Set up a wait-semaphore for the proc.
+	 * Initialize all fields of MyProc.
 	 */
-	if (IsUnderPostmaster)
-	{
-		ProcGetNewSemIdAndNum(&MyProc->sem.semId, &MyProc->sem.semNum);
-
-		/*
-		 * we might be reusing a semaphore that belongs to a dead backend.
-		 * So be careful and reinitialize its value here.
-		 */
-		ZeroProcSemaphore(MyProc);
-	}
-	else
-	{
-		MyProc->sem.semId = -1;
-		MyProc->sem.semNum = -1;
-	}
-
 	SHMQueueElemInit(&(MyProc->links));
+	MyProc->sem.semId = -1;		/* no wait-semaphore acquired yet */
+	MyProc->sem.semNum = -1;
 	MyProc->errType = STATUS_OK;
-	MyProc->pid = MyProcPid;
-	MyProc->databaseId = MyDatabaseId;
 	MyProc->xid = InvalidTransactionId;
 	MyProc->xmin = InvalidTransactionId;
+	MyProc->logRec.xrecoff = 0;
 	MyProc->waitLock = NULL;
 	MyProc->waitHolder = NULL;
+	MyProc->pid = MyProcPid;
+	MyProc->databaseId = MyDatabaseId;
 	SHMQueueInit(&(MyProc->procHolders));
+	/*
+	 * Zero out the spin lock counts and set the sLocks field for
+	 * ProcStructLock to 1 as we have acquired this spinlock above but
+	 * didn't record it since we didn't have MyProc until now.
+	 */
+	MemSet(MyProc->sLocks, 0, sizeof(MyProc->sLocks));
+	MyProc->sLocks[ProcStructLock] = 1;
 
 	/*
-	 * Release the lock.
+	 * Release the lock while accessing shmem index; we still haven't
+	 * installed ProcKill and so we don't want to hold lock if there's
+	 * an error.
 	 */
 	SpinRelease(ProcStructLock);
 
@@ -283,10 +281,28 @@ InitProcess(void)
 		elog(STOP, "InitProcess: ShmemPID table broken");
 
 	/*
-	 * Arrange to clean up at backend exit.
+	 * Arrange to clean up at backend exit.  Once we do this, owned
+	 * spinlocks will be released on exit, and so we can be a lot less
+	 * tense about errors.
 	 */
 	on_shmem_exit(ProcKill, 0);
 
+	/*
+	 * Set up a wait-semaphore for the proc.  (Do this last so that we
+	 * can rely on ProcKill to clean up if it fails.)
+	 */
+	if (IsUnderPostmaster)
+	{
+		SpinAcquire(ProcStructLock);
+		ProcGetNewSemIdAndNum(&MyProc->sem.semId, &MyProc->sem.semNum);
+		SpinRelease(ProcStructLock);
+		/*
+		 * We might be reusing a semaphore that belongs to a dead backend.
+		 * So be careful and reinitialize its value here.
+		 */
+		ZeroProcSemaphore(MyProc);
+	}
+
 	/*
 	 * Now that we have a PROC, we could try to acquire locks, so
 	 * initialize the deadlock checker.
@@ -425,8 +441,9 @@ ProcKill(void)
 
 	SpinAcquire(ProcStructLock);
 
-	/* Free up my wait semaphore */
-	ProcFreeSem(MyProc->sem.semId, MyProc->sem.semNum);
+	/* Free up my wait semaphore, if I got one */
+	if (MyProc->sem.semId >= 0)
+		ProcFreeSem(MyProc->sem.semId, MyProc->sem.semNum);
 
 	/* Add PROC struct to freelist so space can be recycled in future */
 	MyProc->links.next = ProcGlobal->freeProcs;
@@ -993,10 +1010,7 @@ ProcGetNewSemIdAndNum(IpcSemaphoreId *semId, int *semNum)
 		{
 			if ((freeSemMap[i] & mask) == 0)
 			{
-
-				/*
-				 * a free semaphore found. Mark it as allocated.
-				 */
+				/* A free semaphore found. Mark it as allocated. */
 				freeSemMap[i] |= mask;
 
 				*semId = procSemIds[i];
@@ -1007,8 +1021,13 @@ ProcGetNewSemIdAndNum(IpcSemaphoreId *semId, int *semNum)
 		}
 	}
 
-	/* if we reach here, all the semaphores are in use. */
-	elog(ERROR, "ProcGetNewSemIdAndNum: cannot allocate a free semaphore");
+	/*
+	 * If we reach here, all the semaphores are in use.  This is one of the
+	 * possible places to detect "too many backends", so give the standard
+	 * error message.  (Whether we detect it here or in sinval.c depends on
+	 * whether MaxBackends is a multiple of PROC_NSEMS_PER_SET.)
+	 */
+	elog(FATAL, "Sorry, too many clients already");
 }
 
 /*
-- 
GitLab