From c77a29a14efe69851a087989e3520ed560fa8862 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Tue, 6 Oct 1998 02:40:09 +0000
Subject: [PATCH] Substantial rewrite of async.c to avoid problems with
 non-reentrant stdio and possibly other problems.  Minor changes in xact.c and
 postgres.c's main loop to support new handling of async NOTIFY.

---
 src/backend/access/transam/xact.c |   45 +-
 src/backend/commands/async.c      | 1048 ++++++++++++++++-------------
 src/backend/tcop/postgres.c       |   25 +-
 src/backend/utils/misc/trace.c    |    4 -
 src/include/access/xact.h         |    3 +-
 src/include/commands/async.h      |   27 +-
 src/include/utils/trace.h         |    4 -
 7 files changed, 645 insertions(+), 511 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d082c805cd7..24db5befb9e 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.23 1998/09/01 04:27:19 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.24 1998/10/06 02:39:58 tgl Exp $
  *
  * NOTES
  *		Transaction aborts can now occur two ways:
@@ -901,6 +901,9 @@ CommitTransaction()
 	/* handle commit for large objects [ PA, 7/17/98 ] */
 	_lo_commit();
 
+	/* NOTIFY commit must also come before lower-level cleanup */
+	AtCommit_Notify();
+
 	CloseSequences();
 	DestroyTempRels();
 	AtEOXact_portals();
@@ -916,10 +919,6 @@ CommitTransaction()
 	 * ----------------
 	 */
 	s->state = TRANS_DEFAULT;
-	{							/* want this after commit */
-		if (IsNormalProcessingMode())
-			Async_NotifyAtCommit();
-	}
 
 	/*
 	 * Let others to know about no transaction in progress - vadim
@@ -967,6 +966,7 @@ AbortTransaction()
 	 *	do abort processing
 	 * ----------------
 	 */
+	AtAbort_Notify();
 	CloseSequences();
 	AtEOXact_portals();
 	RecordTransactionAbort();
@@ -982,17 +982,6 @@ AbortTransaction()
 	 * ----------------
 	 */
 	s->state = TRANS_DEFAULT;
-	{
-
-		/*
-		 * We need to do this in case another process notified us while we
-		 * are in the middle of an aborted transaction.  We need to notify
-		 * our frontend after we finish the current transaction. -- jw,
-		 * 1/3/94
-		 */
-		if (IsNormalProcessingMode())
-			Async_NotifyAtAbort();
-	}
 }
 
 /* --------------------------------
@@ -1455,6 +1444,30 @@ UserAbortTransactionBlock()
 	s->blockState = TBLOCK_ENDABORT;
 }
 
+/* --------------------------------
+ *		AbortOutOfAnyTransaction
+ *
+ * This routine is provided for error recovery purposes.  It aborts any
+ * active transaction or transaction block, leaving the system in a known
+ * idle state.
+ * --------------------------------
+ */
+void
+AbortOutOfAnyTransaction()
+{
+	TransactionState s = CurrentTransactionState;
+
+	/*
+	 * Get out of any low-level transaction
+	 */
+	if (s->state != TRANS_DEFAULT)
+		AbortTransaction();
+	/*
+	 * Now reset the high-level state
+	 */
+	s->blockState = TBLOCK_DEFAULT;
+}
+
 bool
 IsTransactionBlock()
 {
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 12121977177..a8c447cb077 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1,38 +1,79 @@
 /*-------------------------------------------------------------------------
  *
  * async.c--
- *	  Asynchronous notification
+ *	  Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.40 1998/09/01 04:27:42 momjian Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.41 1998/10/06 02:39:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
-/* New Async Notification Model:
+
+/*-------------------------------------------------------------------------
+ * New Async Notification Model:
  * 1. Multiple backends on same machine.  Multiple backends listening on
- *	  one relation.
- *
- * 2. One of the backend does a 'notify <relname>'.  For all backends that
- *	  are listening to this relation (all notifications take place at the
- *	  end of commit),
- *	  2.a  If the process is the same as the backend process that issued
- *		   notification (we are notifying something that we are listening),
- *		   signal the corresponding frontend over the comm channel.
- *	  2.b  For all other listening processes, we send kill(SIGUSR2) to wake up
- *		   the listening backend.
- * 3. Upon receiving a kill(SIGUSR2) signal from another backend process
- *	  notifying that one of the relation that we are listening is being
- *	  notified, we can be in either of two following states:
- *	  3.a  We are sleeping, wake up and signal our frontend.
- *	  3.b  We are in middle of another transaction, wait until the end of
- *		   of the current transaction and signal our frontend.
- * 4. Each frontend receives this notification and processes accordingly.
- *
- * -- jw, 12/28/93
- *
+ *	  one relation.  (Note: "listening on a relation" is not really the
+ *	  right way to think about it, since the notify names need not have
+ *	  anything to do with the names of relations actually in the database.
+ *	  But this terminology is all over the code and docs, and I don't feel
+ *	  like trying to replace it.)
+ *
+ * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
+ *	  ie, each relname/listenerPID pair.  The "notification" field of the
+ *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
+ *	  of the originating backend when a cross-backend NOTIFY is pending.
+ *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
+ *	  notification field should never be equal to the listenerPID field.)
+ *
+ * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
+ *	  relname to a list of outstanding NOTIFY requests.  Actual processing
+ *	  happens if and only if we reach transaction commit.  At that time (in
+ *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
+ *    If the listenerPID in a matching tuple is ours, we just send a notify
+ *	  message to our own front end.  If it is not ours, and "notification"
+ *	  is not already nonzero, we set notification to our own PID and send a
+ *	  SIGUSR2 signal to the receiving process (indicated by listenerPID).
+ *	  BTW: if the signal operation fails, we presume that the listener backend
+ *    crashed without removing this tuple, and remove the tuple for it.
+ *
+ * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
+ *	  notify processing immediately if this backend is idle (ie, it is
+ *	  waiting for a frontend command and is not within a transaction block).
+ *    Otherwise the handler may only set a flag, which will cause the
+ *	  processing to occur just before we next go idle.
+ *
+ * 5. Inbound-notify processing consists of scanning pg_listener for tuples
+ *	  matching our own listenerPID and having nonzero notification fields.
+ *	  For each such tuple, we send a message to our frontend and clear the
+ *	  notification field.  BTW: this routine has to start/commit its own
+ *	  transaction, since by assumption it is only called from outside any
+ *	  transaction.
+ *
+ * Note that the system's use of pg_listener is confined to very short
+ * intervals at the end of a transaction that contains NOTIFY statements,
+ * or during the transaction caused by an inbound SIGUSR2.  So the fact that
+ * pg_listener is a global resource shouldn't cause too much performance
+ * problem.  But application authors ought to be discouraged from doing
+ * LISTEN or UNLISTEN near the start of a long transaction --- that would
+ * result in holding the pg_listener write lock for a long time, possibly
+ * blocking unrelated activity.  It could even lead to deadlock against another
+ * transaction that touches the same user tables and then tries to NOTIFY.
+ * Probably best to do LISTEN or UNLISTEN outside of transaction blocks.
+ *
+ * An application that listens on the same relname it notifies will get
+ * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
+ * by comparing be_pid in the NOTIFY message to the application's own backend's
+ * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
+ * frontend during startup.)  The above design guarantees that notifies from
+ * other backends will never be missed by ignoring self-notifies.  Note,
+ * however, that we do *not* guarantee that a separate frontend message will
+ * be sent for every outside NOTIFY.  Since there is only room for one
+ * originating PID in pg_listener, outside notifies occurring at about the
+ * same time may be collapsed into a single message bearing the PID of the
+ * first outside backend to perform the NOTIFY.
+ *-------------------------------------------------------------------------
  */
 
 #include <unistd.h>
@@ -44,90 +85,59 @@
 
 #include "postgres.h"
 
+#include "commands/async.h"
 #include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/xact.h"
 #include "catalog/catname.h"
 #include "catalog/pg_listener.h"
-#include "commands/async.h"
 #include "fmgr.h"
 #include "lib/dllist.h"
 #include "libpq/libpq.h"
 #include "miscadmin.h"
-#include "nodes/memnodes.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "tcop/dest.h"
-#include "utils/mcxt.h"
 #include "utils/syscache.h"
 #include <utils/trace.h>
 #include <utils/ps_status.h>
 
-#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK]
-#define NotifyHack	 pg_options[OPT_NOTIFYHACK]
-
+/* stuff that we really ought not be touching directly :-( */
 extern TransactionState CurrentTransactionState;
 extern CommandDest whereToSendOutput;
 
-GlobalMemory notifyContext = NULL;
-
-static int	notifyFrontEndPending = 0;
-static int	notifyIssued = 0;
+/*
+ * State for outbound notifies consists of a list of all relnames NOTIFYed
+ * in the current transaction.  We do not actually perform a NOTIFY until
+ * and unless the transaction commits.  pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current transaction.
+ */
 static Dllist *pendingNotifies = NULL;
 
-static int	AsyncExistsPendingNotify(char *);
-static void ClearPendingNotify(void);
-static void Async_NotifyFrontEnd(void);
-static void Async_NotifyFrontEnd_Aux(void);
-void		Async_Unlisten(char *relname, int pid);
-static void Async_UnlistenOnExit(int code, char *relname);
-static void Async_UnlistenAll(void);
-
 /*
- *--------------------------------------------------------------
- * Async_NotifyHandler --
- *
- *		This is the signal handler for SIGUSR2.  When the backend
- *		is signaled, the backend can be in two states.
- *		1. If the backend is in the middle of another transaction,
- *		   we set the flag, notifyFrontEndPending, and wait until
- *		   the end of the transaction to notify the front end.
- *		2. If the backend is not in the middle of another transaction,
- *		   we notify the front end immediately.
- *
- *		-- jw, 12/28/93
- * Results:
- *		none
- *
- * Side effects:
- *		none
+ * State for inbound notifies consists of two flags: one saying whether
+ * the signal handler is currently allowed to call ProcessIncomingNotify
+ * directly, and one saying whether the signal has occurred but the handler
+ * was not allowed to call ProcessIncomingNotify at the time.
+ *
+ * NB: the "volatile" on these declarations is critical!  If your compiler
+ * does not grok "volatile", you'd be best advised to compile this file
+ * with all optimization turned off.
  */
-void
-Async_NotifyHandler(SIGNAL_ARGS)
-{
-	TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
+static volatile int	notifyInterruptEnabled = 0;
+static volatile int	notifyInterruptOccurred = 0;
 
-	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-		(CurrentTransactionState->blockState == TRANS_DEFAULT))
-	{
-		TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
-				"waking up sleeping backend process");
-		PS_SET_STATUS("async_notify");
-		Async_NotifyFrontEnd();
-		PS_SET_STATUS("idle");
-	}
-	else
-	{
-		TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
-			 "process in middle of transaction, state=%d, blockstate=%d",
-				CurrentTransactionState->state,
-				CurrentTransactionState->blockState);
-		notifyFrontEndPending = 1;
-		TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
-	}
+/* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
+static int	unlistenExitRegistered = 0;
+
+
+static void Async_UnlistenAll(void);
+static void Async_UnlistenOnExit(void);
+static void ProcessIncomingNotify(void);
+static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
+static int	AsyncExistsPendingNotify(char *relname);
+static void ClearPendingNotifies(void);
 
-	TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
-}
 
 /*
  *--------------------------------------------------------------
@@ -136,253 +146,40 @@ Async_NotifyHandler(SIGNAL_ARGS)
  *		This is executed by the SQL notify command.
  *
  *		Adds the relation to the list of pending notifies.
- *		All notification happens at end of commit.
- *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- *
- *		All notification of backend processes happens here,
- *		then each backend notifies its corresponding front end at
- *		the end of commit.
- *
- *		-- jw, 12/28/93
+ *		Actual notification happens during transaction commit.
+ *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  *
  * Results:
  *		XXX
  *
- * Side effects:
- *		All tuples for relname in pg_listener are updated.
- *
  *--------------------------------------------------------------
  */
 void
 Async_Notify(char *relname)
 {
-
-	HeapTuple	lTuple,
-				rTuple;
-	Relation	lRel;
-	HeapScanDesc sRel;
-	TupleDesc	tdesc;
-	ScanKeyData key;
-	Datum		d,
-				value[3];
-	bool		isnull;
-	char		repl[3],
-				nulls[3];
-
 	char	   *notifyName;
 
 	TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
 
-	if (!pendingNotifies)
-		pendingNotifies = DLNewList();
-
 	/*
-	 * Allocate memory from the global malloc pool because it needs to be
-	 * referenced also when the transaction is finished.  DZ - 26-08-1996
+	 * We allocate list memory from the global malloc pool to ensure that
+	 * it will live until we want to use it.  This is probably not necessary
+	 * any longer, since we will use it before the end of the transaction.
+	 * DLList only knows how to use malloc() anyway, but we could probably
+	 * palloc() the strings...
 	 */
+	if (!pendingNotifies)
+		pendingNotifies = DLNewList();
 	notifyName = strdup(relname);
 	DLAddHead(pendingNotifies, DLNewElem(notifyName));
-
-	ScanKeyEntryInitialize(&key, 0,
-						   Anum_pg_listener_relname,
-						   F_NAMEEQ,
-						   PointerGetDatum(notifyName));
-
-	lRel = heap_openr(ListenerRelationName);
-	tdesc = RelationGetDescr(lRel);
-	RelationSetLockForWrite(lRel);
-	sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
-
-	nulls[0] = nulls[1] = nulls[2] = ' ';
-	repl[0] = repl[1] = repl[2] = ' ';
-	repl[Anum_pg_listener_notify - 1] = 'r';
-	value[0] = value[1] = value[2] = (Datum) 0;
-	value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
-
-	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
-	{
-		d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
-		if (!DatumGetInt32(d))
-		{
-			rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
-			heap_replace(lRel, &lTuple->t_ctid, rTuple);
-			/* notify is really issued only if a tuple has been changed */
-			notifyIssued = 1;
-		}
-	}
-	heap_endscan(sRel);
-
 	/*
-	 * Note: if the write lock is unset we can get multiple tuples with
-	 * same oid if other backends notify the same relation. Use this
-	 * option at your own risk.
+	 * NOTE: we could check to see if pendingNotifies already has an entry
+	 * for relname, and thus avoid making duplicate entries.  However, most
+	 * apps probably don't notify the same name multiple times per transaction,
+	 * so we'd likely just be wasting cycles to make such a check.
+	 * AsyncExistsPendingNotify() doesn't really care whether the list
+	 * contains duplicates...
 	 */
-	if (NotifyUnlock)
-		RelationUnsetLockForWrite(lRel);
-
-	heap_close(lRel);
-
-	TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
-}
-
-/*
- *--------------------------------------------------------------
- * Async_NotifyAtCommit --
- *
- *		This is called at transaction commit.
- *
- *		Signal our corresponding frontend process on relations that
- *		were notified.	Signal all other backend process that
- *		are listening also.
- *
- *		-- jw, 12/28/93
- *
- * Results:
- *		XXX
- *
- * Side effects:
- *		Tuples in pg_listener that has our listenerpid are updated so
- *		that the notification is 0.  We do not want to notify frontend
- *		more than once.
- *
- *		-- jw, 12/28/93
- *
- *--------------------------------------------------------------
- */
-void
-Async_NotifyAtCommit()
-{
-	HeapTuple	lTuple;
-	Relation	lRel;
-	HeapScanDesc sRel;
-	TupleDesc	tdesc;
-	ScanKeyData key;
-	Datum		d;
-	bool		isnull;
-	extern TransactionState CurrentTransactionState;
-
-	if (!pendingNotifies)
-		pendingNotifies = DLNewList();
-
-	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-		(CurrentTransactionState->blockState == TRANS_DEFAULT))
-	{
-		if (notifyIssued)
-		{
-			/* 'notify <relname>' issued by us */
-			notifyIssued = 0;
-			StartTransactionCommand();
-			TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
-			ScanKeyEntryInitialize(&key, 0,
-								   Anum_pg_listener_notify,
-								   F_INT4EQ,
-								   Int32GetDatum(1));
-			lRel = heap_openr(ListenerRelationName);
-			RelationSetLockForWrite(lRel);
-			sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
-			tdesc = RelationGetDescr(lRel);
-
-			while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
-			{
-				d = heap_getattr(lTuple, Anum_pg_listener_relname,
-								 tdesc, &isnull);
-
-				if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
-				{
-					d = heap_getattr(lTuple, Anum_pg_listener_pid,
-									 tdesc, &isnull);
-
-					if (MyProcPid == DatumGetInt32(d))
-					{
-						notifyFrontEndPending = 1;
-						TPRINTF(TRACE_NOTIFY,
-								"Async_NotifyAtCommit: notifying self");
-					}
-					else
-					{
-						TPRINTF(TRACE_NOTIFY,
-								"Async_NotifyAtCommit: notifying pid %d",
-								DatumGetInt32(d));
-#ifdef HAVE_KILL
-						if (kill(DatumGetInt32(d), SIGUSR2) < 0)
-						{
-							if (errno == ESRCH)
-								heap_delete(lRel, &lTuple->t_ctid);
-						}
-#endif
-					}
-				}
-			}
-			heap_endscan(sRel);
-			heap_close(lRel);
-
-			/*
-			 * Notify the frontend inside the current transaction while we
-			 * still have a valid write lock on pg_listeners. This avoid
-			 * waiting until all other backends have finished with
-			 * pg_listener.
-			 */
-			if (notifyFrontEndPending)
-			{
-				/* The aux version is called inside transaction */
-				Async_NotifyFrontEnd_Aux();
-			}
-
-			TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
-			CommitTransactionCommand();
-		}
-		else
-		{
-
-			/*
-			 * No notifies issued by us. If notifyFrontEndPending has been
-			 * set by Async_NotifyHandler notify the frontend of pending
-			 * notifies from other backends.
-			 */
-			if (notifyFrontEndPending)
-				Async_NotifyFrontEnd();
-		}
-
-		ClearPendingNotify();
-	}
-}
-
-/*
- *--------------------------------------------------------------
- * Async_NotifyAtAbort --
- *
- *		This is called at transaction commit.
- *
- *		Gets rid of pending notifies.  List elements are automatically
- *		freed through memory context.
- *
- *
- * Results:
- *		XXX
- *
- * Side effects:
- *		XXX
- *
- *--------------------------------------------------------------
- */
-void
-Async_NotifyAtAbort()
-{
-	if (pendingNotifies)
-	{
-		ClearPendingNotify();
-		DLFreeList(pendingNotifies);
-	}
-	pendingNotifies = DLNewList();
-	notifyIssued = 0;
-
-	if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-		(CurrentTransactionState->blockState == TRANS_DEFAULT))
-	{
-		/* don't forget to notify front end */
-		if (notifyFrontEndPending)
-			Async_NotifyFrontEnd();
-	}
 }
 
 /*
@@ -394,108 +191,94 @@ Async_NotifyAtAbort()
  *		Register a backend (identified by its Unix PID) as listening
  *		on the specified relation.
  *
- *		One listener per relation, pg_listener relation is keyed
- *		on (relname,pid) to provide multiple listeners in future.
- *
  * Results:
- *		pg_listeners is updated.
+ *		XXX
  *
  * Side effects:
- *		XXX
+ *		pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 void
 Async_Listen(char *relname, int pid)
 {
-	Datum		values[Natts_pg_listener];
-	char		nulls[Natts_pg_listener];
+	Relation	lRel;
 	TupleDesc	tdesc;
 	HeapScanDesc scan;
 	HeapTuple	tuple,
 				newtup;
-	Relation	lDesc;
+	Datum		values[Natts_pg_listener];
+	char		nulls[Natts_pg_listener];
 	Datum		d;
 	int			i;
 	bool		isnull;
 	int			alreadyListener = 0;
-	char	   *relnamei;
 	TupleDesc	tupDesc;
 
-	if (whereToSendOutput != Remote)
-	{
-		elog(NOTICE, "Async_Listen: "
-			 "listen not available on interactive sessions");
-		return;
-	}
-
 	TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
-	for (i = 0; i < Natts_pg_listener; i++)
-	{
-		nulls[i] = ' ';
-		values[i] = PointerGetDatum(NULL);
-	}
 
-	i = 0;
-	values[i++] = (Datum) relname;
-	values[i++] = (Datum) pid;
-	values[i++] = (Datum) 0;	/* no notifies pending */
-
-	lDesc = heap_openr(ListenerRelationName);
-	RelationSetLockForWrite(lDesc);
+	lRel = heap_openr(ListenerRelationName);
+	RelationSetLockForWrite(lRel);
+	tdesc = RelationGetDescr(lRel);
 
-	/* is someone already listening.  One listener per relation */
-	tdesc = RelationGetDescr(lDesc);
-	scan = heap_beginscan(lDesc, 0, SnapshotNow, 0, (ScanKey) NULL);
+	/* Detect whether we are already listening on this relname */
+	scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
 	while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
 	{
-		d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc,
-						 &isnull);
-		relnamei = DatumGetPointer(d);
-		if (!strncmp(relnamei, relname, NAMEDATALEN))
+		d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
+		if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
 		{
 			d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
-			pid = DatumGetInt32(d);
-			if (pid == MyProcPid)
+			if (DatumGetInt32(d) == pid)
+			{
 				alreadyListener = 1;
-		}
-		if (alreadyListener)
-		{
-			/* No need to scan the rest of the table */
-			break;
+				/* No need to scan the rest of the table */
+				break;
+			}
 		}
 	}
 	heap_endscan(scan);
 
 	if (alreadyListener)
 	{
-		elog(NOTICE, "Async_Listen: We are already listening on %s",
-			 relname);
-		RelationUnsetLockForWrite(lDesc);
-		heap_close(lDesc);
+		elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
+		RelationUnsetLockForWrite(lRel);
+		heap_close(lRel);
 		return;
 	}
 
-	tupDesc = lDesc->rd_att;
-	newtup = heap_formtuple(tupDesc, values, nulls);
-	heap_insert(lDesc, newtup);
-	pfree(newtup);
-
 	/*
-	 * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
-	 * listener on %s (possibly dead)",relname); }
+	 * OK to insert a new tuple
 	 */
 
-	RelationUnsetLockForWrite(lDesc);
-	heap_close(lDesc);
+	for (i = 0; i < Natts_pg_listener; i++)
+	{
+		nulls[i] = ' ';
+		values[i] = PointerGetDatum(NULL);
+	}
+
+	i = 0;
+	values[i++] = (Datum) relname;
+	values[i++] = (Datum) pid;
+	values[i++] = (Datum) 0;	/* no notifies pending */
+
+	tupDesc = lRel->rd_att;
+	newtup = heap_formtuple(tupDesc, values, nulls);
+	heap_insert(lRel, newtup);
+	pfree(newtup);
+
+	RelationUnsetLockForWrite(lRel);
+	heap_close(lRel);
 
 	/*
-	 * now that we are listening, we should make a note to ourselves to
-	 * unlisten prior to dying.
+	 * now that we are listening, make sure we will unlisten before dying.
 	 */
-	relnamei = malloc(NAMEDATALEN);		/* persists to process exit */
-	StrNCpy(relnamei, relname, NAMEDATALEN);
-	on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
+	if (! unlistenExitRegistered)
+	{
+		if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
+			elog(NOTICE, "Async_Listen: out of shmem_exit slots");
+		unlistenExitRegistered = 1;
+	}
 }
 
 /*
@@ -508,17 +291,17 @@ Async_Listen(char *relname, int pid)
  *		for the specified relation.
  *
  * Results:
- *		pg_listeners is updated.
+ *		XXX
  *
  * Side effects:
- *		XXX
+ *		pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 void
 Async_Unlisten(char *relname, int pid)
 {
-	Relation	lDesc;
+	Relation	lRel;
 	HeapTuple	lTuple;
 
 	/* Handle specially the `unlisten "*"' command */
@@ -530,17 +313,21 @@ Async_Unlisten(char *relname, int pid)
 
 	TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
 
+	/* Note we assume there can be only one matching tuple. */
 	lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
 								 Int32GetDatum(pid),
 								 0, 0);
 	if (lTuple != NULL)
 	{
-		lDesc = heap_openr(ListenerRelationName);
-		RelationSetLockForWrite(lDesc);
-		heap_delete(lDesc, &lTuple->t_ctid);
-		RelationUnsetLockForWrite(lDesc);
-		heap_close(lDesc);
+		lRel = heap_openr(ListenerRelationName);
+		RelationSetLockForWrite(lRel);
+		heap_delete(lRel, &lTuple->t_ctid);
+		RelationUnsetLockForWrite(lRel);
+		heap_close(lRel);
 	}
+	/* We do not complain about unlistening something not being listened;
+	 * should we?
+	 */
 }
 
 /*
@@ -549,187 +336,487 @@ Async_Unlisten(char *relname, int pid)
  *
  *		Unlisten all relations for this backend.
  *
+ *		This is invoked by UNLISTEN "*" command, and also at backend exit.
+ *
  * Results:
- *		pg_listeners is updated.
+ *		XXX
  *
  * Side effects:
- *		XXX
+ *		pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 static void
 Async_UnlistenAll()
 {
-	HeapTuple	lTuple;
 	Relation	lRel;
-	HeapScanDesc sRel;
 	TupleDesc	tdesc;
+	HeapScanDesc sRel;
+	HeapTuple	lTuple;
 	ScanKeyData key[1];
 
 	TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
+
+	lRel = heap_openr(ListenerRelationName);
+	RelationSetLockForWrite(lRel);
+	tdesc = RelationGetDescr(lRel);
+
+	/* Find and delete all entries with my listenerPID */
 	ScanKeyEntryInitialize(&key[0], 0,
 						   Anum_pg_listener_pid,
 						   F_INT4EQ,
 						   Int32GetDatum(MyProcPid));
-	lRel = heap_openr(ListenerRelationName);
-	RelationSetLockForWrite(lRel);
-	tdesc = RelationGetDescr(lRel);
 	sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
 
 	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
 		heap_delete(lRel, &lTuple->t_ctid);
+
 	heap_endscan(sRel);
 	RelationUnsetLockForWrite(lRel);
 	heap_close(lRel);
-	TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
 }
 
 /*
- * --------------------------------------------------------------
+ *--------------------------------------------------------------
  * Async_UnlistenOnExit --
  *
- *		This is called at backend exit for each registered listen.
+ *		Clean up the pg_listener table at backend exit.
+ *
+ *		This is executed if we have done any LISTENs in this backend.
+ *		It might not be necessary anymore, if the user UNLISTENed everything,
+ *		but we don't try to detect that case.
  *
  * Results:
  *		XXX
  *
- * --------------------------------------------------------------
- */
-static void
-Async_UnlistenOnExit(int code,	/* from exitpg */
-					 char *relname)
-{
-	Async_Unlisten((char *) relname, MyProcPid);
-}
-
-/*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd --
- *
- *		This is called outside transactions. The real work is done
- *		by Async_NotifyFrontEnd_Aux().
+ * Side effects:
+ *		pg_listener is updated if necessary.
  *
- * --------------------------------------------------------------
+ *--------------------------------------------------------------
  */
 static void
-Async_NotifyFrontEnd()
+Async_UnlistenOnExit()
 {
+	/*
+	 * We need to start/commit a transaction for the unlisten,
+	 * but if there is already an active transaction we had better
+	 * abort that one first.  Otherwise we'd end up committing changes
+	 * that probably ought to be discarded.
+	 */
+	AbortOutOfAnyTransaction();
+	/* Now we can do the unlisten */
 	StartTransactionCommand();
-	Async_NotifyFrontEnd_Aux();
+	Async_UnlistenAll();
 	CommitTransactionCommand();
 }
 
 /*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd_Aux --
+ *--------------------------------------------------------------
+ * AtCommit_Notify --
  *
- *		This must be called inside a transaction block.
+ *		This is called at transaction commit.
  *
- *		Perform an asynchronous notification to front end over
- *		portal comm channel.  The name of the relation which contains the
- *		data is sent to the front end.
+ *		If there are outbound notify requests in the pendingNotifies list,
+ *		scan pg_listener for matching tuples, and either signal the other
+ *		backend or send a message to our own frontend.
  *
- *		We remove the notification flag from the pg_listener tuple
- *		associated with our process.
+ *		NOTE: we are still inside the current transaction, therefore can
+ *		piggyback on its committing of changes.
  *
  * Results:
  *		XXX
  *
- * --------------------------------------------------------------
+ * Side effects:
+ *		Tuples in pg_listener that have matching relnames and other peoples'
+ *		listenerPIDs are updated with a nonzero notification field.
+ *
+ *--------------------------------------------------------------
  */
-static void
-Async_NotifyFrontEnd_Aux()
+void
+AtCommit_Notify()
 {
-	HeapTuple	lTuple,
-				rTuple;
 	Relation	lRel;
-	HeapScanDesc sRel;
 	TupleDesc	tdesc;
-	ScanKeyData key[2];
+	HeapScanDesc sRel;
+	HeapTuple	lTuple,
+				rTuple;
 	Datum		d,
-				value[3];
-	char		repl[3],
-				nulls[3];
+				value[Natts_pg_listener];
+	char		repl[Natts_pg_listener],
+				nulls[Natts_pg_listener];
 	bool		isnull;
+	char	   *relname;
+	int32		listenerPID;
 
-#define MAX_DONE 64
+	if (!pendingNotifies)
+		return;					/* no NOTIFY statements in this transaction */
 
-	char	   *done[MAX_DONE];
-	int			ndone = 0;
-	int			i;
+	/* NOTIFY is disabled if not normal processing mode.
+	 * This test used to be in xact.c, but it seems cleaner to do it here.
+	 */
+	if (! IsNormalProcessingMode())
+	{
+		ClearPendingNotifies();
+		return;
+	}
 
-	notifyFrontEndPending = 0;
+	TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
 
-	TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
-	StartTransactionCommand();
-	ScanKeyEntryInitialize(&key[0], 0,
-						   Anum_pg_listener_notify,
-						   F_INT4EQ,
-						   Int32GetDatum(1));
-	ScanKeyEntryInitialize(&key[1], 0,
-						   Anum_pg_listener_pid,
-						   F_INT4EQ,
-						   Int32GetDatum(MyProcPid));
 	lRel = heap_openr(ListenerRelationName);
 	RelationSetLockForWrite(lRel);
 	tdesc = RelationGetDescr(lRel);
-	sRel = heap_beginscan(lRel, 0, SnapshotNow, 2, key);
+	sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
 
+	/* preset data to update notify column to MyProcPid */
 	nulls[0] = nulls[1] = nulls[2] = ' ';
 	repl[0] = repl[1] = repl[2] = ' ';
 	repl[Anum_pg_listener_notify - 1] = 'r';
 	value[0] = value[1] = value[2] = (Datum) 0;
-	value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
+	value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
 
 	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
 	{
-		d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc,
-						 &isnull);
+		d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+		relname = (char *) DatumGetPointer(d);
 
-		/*
-		 * This hack deletes duplicate tuples which can be left in the
-		 * table if the NotifyUnlock option is set. I'm further
-		 * investigating this.	-- dz
-		 */
-		if (NotifyHack)
+		if (AsyncExistsPendingNotify(relname))
 		{
-			for (i = 0; i < ndone; i++)
+			d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
+			listenerPID = DatumGetInt32(d);
+
+			if (listenerPID == MyProcPid)
+			{
+				/* Self-notify: no need to bother with table update.
+				 * Indeed, we *must not* clear the notification field in
+				 * this path, or we could lose an outside notify, which'd be
+				 * bad for applications that ignore self-notify messages.
+				 */
+				TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
+				NotifyMyFrontEnd(relname, listenerPID);
+			}
+			else
 			{
-				if (strcmp(DatumGetName(d)->data, done[i]) == 0)
+				TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
+						listenerPID);
+				/*
+				 * If someone has already notified this listener,
+				 * we don't bother modifying the table, but we do still send
+				 * a SIGUSR2 signal, just in case that backend missed the
+				 * earlier signal for some reason.  It's OK to send the signal
+				 * first, because the other guy can't read pg_listener until
+				 * we unlock it.
+				 */
+#ifdef HAVE_KILL
+				if (kill(listenerPID, SIGUSR2) < 0)
 				{
-					TPRINTF(TRACE_NOTIFY,
-							"Async_NotifyFrontEnd: duplicate %s",
-							DatumGetName(d)->data);
+					/* Get rid of pg_listener entry if it refers to a PID
+					 * that no longer exists.  Presumably, that backend
+					 * crashed without deleting its pg_listener entries.
+					 * This code used to only delete the entry if errno==ESRCH,
+					 * but as far as I can see we should just do it for any
+					 * failure (certainly at least for EPERM too...)
+					 */
 					heap_delete(lRel, &lTuple->t_ctid);
-					continue;
 				}
+				else
+#endif
+				{
+					d = heap_getattr(lTuple, Anum_pg_listener_notify,
+									 tdesc, &isnull);
+					if (DatumGetInt32(d) == 0)
+					{
+						rTuple = heap_modifytuple(lTuple, lRel,
+												  value, nulls, repl);
+						heap_replace(lRel, &lTuple->t_ctid, rTuple);
+					}
+				}
+			}
+		}
+	}
+
+	heap_endscan(sRel);
+	/*
+	 * We do not do RelationUnsetLockForWrite(lRel) here, because the
+	 * transaction is about to be committed anyway.
+	 */
+	heap_close(lRel);
+
+	ClearPendingNotifies();
+
+	TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
+}
+
+/*
+ *--------------------------------------------------------------
+ * AtAbort_Notify --
+ *
+ *		This is called at transaction abort.
+ *
+ *		Gets rid of pending outbound notifies that we would have executed
+ *		if the transaction got committed.
+ *
+ * Results:
+ *		XXX
+ *
+ *--------------------------------------------------------------
+ */
+void
+AtAbort_Notify()
+{
+	ClearPendingNotifies();
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_NotifyHandler --
+ *
+ *		This is the signal handler for SIGUSR2.
+ *
+ *		If we are idle (notifyInterruptEnabled is set), we can safely invoke
+ *		ProcessIncomingNotify directly.  Otherwise, just set a flag
+ *		to do it later.
+ *
+ * Results:
+ *		none
+ *
+ * Side effects:
+ *		per above
+ *--------------------------------------------------------------
+ */
+
+void
+Async_NotifyHandler(SIGNAL_ARGS)
+{
+	/*
+	 * Note: this is a SIGNAL HANDLER.  You must be very wary what you do here.
+	 * Some helpful soul had this routine sprinkled with TPRINTFs, which would
+	 * likely lead to corruption of stdio buffers if they were ever turned on.
+	 */
+
+	if (notifyInterruptEnabled)
+	{
+		/* I'm not sure whether some flavors of Unix might allow another
+		 * SIGUSR2 occurrence to recursively interrupt this routine.
+		 * To cope with the possibility, we do the same sort of dance that
+		 * EnableNotifyInterrupt must do --- see that routine for comments.
+		 */
+		notifyInterruptEnabled = 0;		/* disable any recursive signal */
+		notifyInterruptOccurred = 1;	/* do at least one iteration */
+		for (;;)
+		{
+			notifyInterruptEnabled = 1;
+			if (! notifyInterruptOccurred)
+				break;
+			notifyInterruptEnabled = 0;
+			if (notifyInterruptOccurred)
+			{
+				/* Here, it is finally safe to do stuff. */
+				TPRINTF(TRACE_NOTIFY,
+						"Async_NotifyHandler: perform async notify");
+				ProcessIncomingNotify();
+				TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
 			}
-			if (ndone < MAX_DONE)
-				done[ndone++] = pstrdup(DatumGetName(d)->data);
 		}
+	}
+	else
+	{
+		/* In this path it is NOT SAFE to do much of anything, except this: */
+		notifyInterruptOccurred = 1;
+	}
+}
+
+/*
+ * --------------------------------------------------------------
+ * EnableNotifyInterrupt --
+ *
+ *		This is called by the PostgresMain main loop just before waiting
+ *		for a frontend command.  If we are truly idle (ie, *not* inside
+ *		a transaction block), then process any pending inbound notifies,
+ *		and enable the signal handler to process future notifies directly.
+ *
+ *		NOTE: the signal handler starts out disabled, and stays so until
+ *		PostgresMain calls this the first time.
+ * --------------------------------------------------------------
+ */
+
+void
+EnableNotifyInterrupt(void)
+{
+	if (CurrentTransactionState->blockState != TRANS_DEFAULT)
+		return;					/* not really idle */
+
+	/*
+	 * This code is tricky because we are communicating with a signal
+	 * handler that could interrupt us at any point.  If we just checked
+	 * notifyInterruptOccurred and then set notifyInterruptEnabled, we
+	 * could fail to respond promptly to a signal that happens in between
+	 * those two steps.  (A very small time window, perhaps, but Murphy's
+	 * Law says you can hit it...)  Instead, we first set the enable flag,
+	 * then test the occurred flag.  If we see an unserviced interrupt
+	 * has occurred, we re-clear the enable flag before going off to do
+	 * the service work.  (That prevents re-entrant invocation of
+	 * ProcessIncomingNotify() if another interrupt occurs.)
+	 * If an interrupt comes in between the setting and clearing of
+	 * notifyInterruptEnabled, then it will have done the service
+	 * work and left notifyInterruptOccurred zero, so we have to check
+	 * again after clearing enable.  The whole thing has to be in a loop
+	 * in case another interrupt occurs while we're servicing the first.
+	 * Once we get out of the loop, enable is set and we know there is no
+	 * unserviced interrupt.
+	 *
+	 * NB: an overenthusiastic optimizing compiler could easily break this
+	 * code.  Hopefully, they all understand what "volatile" means these days.
+	 */
+	for (;;)
+	{
+		notifyInterruptEnabled = 1;
+		if (! notifyInterruptOccurred)
+			break;
+		notifyInterruptEnabled = 0;
+		if (notifyInterruptOccurred)
+		{
+			TPRINTF(TRACE_NOTIFY,
+					"EnableNotifyInterrupt: perform async notify");
+			ProcessIncomingNotify();
+			TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
+		}
+	}
+}
+
+/*
+ * --------------------------------------------------------------
+ * DisableNotifyInterrupt --
+ *
+ *		This is called by the PostgresMain main loop just after receiving
+ *		a frontend command.  Signal handler execution of inbound notifies
+ *		is disabled until the next EnableNotifyInterrupt call.
+ * --------------------------------------------------------------
+ */
+
+void
+DisableNotifyInterrupt(void)
+{
+	notifyInterruptEnabled = 0;
+}
+
+/*
+ * --------------------------------------------------------------
+ * ProcessIncomingNotify --
+ *
+ *		Deal with arriving NOTIFYs from other backends.
+ *		This is called either directly from the SIGUSR2 signal handler,
+ *		or the next time control reaches the outer idle loop.
+ *		Scan pg_listener for arriving notifies, report them to my front end,
+ *		and clear the notification field in pg_listener until next time.
+ *
+ *		NOTE: since we are outside any transaction, we must create our own.
+ *
+ * Results:
+ *		XXX
+ *
+ * --------------------------------------------------------------
+ */
+static void
+ProcessIncomingNotify(void)
+{
+	Relation	lRel;
+	TupleDesc	tdesc;
+	ScanKeyData key[1];
+	HeapScanDesc sRel;
+	HeapTuple	lTuple,
+				rTuple;
+	Datum		d,
+				value[Natts_pg_listener];
+	char		repl[Natts_pg_listener],
+				nulls[Natts_pg_listener];
+	bool		isnull;
+	char	   *relname;
+	int32		sourcePID;
+
+	TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
+	PS_SET_STATUS("async_notify");
+
+	notifyInterruptOccurred = 0;
+
+	StartTransactionCommand();
 
-		rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
-		heap_replace(lRel, &lTuple->t_ctid, rTuple);
+	lRel = heap_openr(ListenerRelationName);
+	RelationSetLockForWrite(lRel);
+	tdesc = RelationGetDescr(lRel);
+
+	/* Scan only entries with my listenerPID */
+	ScanKeyEntryInitialize(&key[0], 0,
+						   Anum_pg_listener_pid,
+						   F_INT4EQ,
+						   Int32GetDatum(MyProcPid));
+	sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
 
-		/* notifying the front end */
-		TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s",
-				DatumGetName(d)->data);
+	/* Prepare data for rewriting 0 into notification field */
+	nulls[0] = nulls[1] = nulls[2] = ' ';
+	repl[0] = repl[1] = repl[2] = ' ';
+	repl[Anum_pg_listener_notify - 1] = 'r';
+	value[0] = value[1] = value[2] = (Datum) 0;
+	value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
 
-		if (whereToSendOutput == Remote)
+	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+	{
+		d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
+		sourcePID = DatumGetInt32(d);
+		if (sourcePID != 0)
 		{
-			pq_putnchar("A", 1);
-			pq_putint((int32) MyProcPid, sizeof(int32));
-			pq_putstr(DatumGetName(d)->data);
-			pq_flush();
+			d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+			relname = (char *) DatumGetPointer(d);
+			/* Notify the frontend */
+			TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
+					relname, (int) sourcePID);
+			NotifyMyFrontEnd(relname, sourcePID);
+			/* Rewrite the tuple with 0 in notification column */
+			rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
+			heap_replace(lRel, &lTuple->t_ctid, rTuple);
 		}
 	}
 	heap_endscan(sRel);
-	RelationUnsetLockForWrite(lRel);
+	/*
+	 * We do not do RelationUnsetLockForWrite(lRel) here, because the
+	 * transaction is about to be committed anyway.
+	 */
 	heap_close(lRel);
 
-	TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done");
+	CommitTransactionCommand();
+
+	/* Must flush the notify messages to ensure frontend gets them promptly. */
+	pq_flush();
+
+	PS_SET_STATUS("idle");
+	TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
 }
 
+/* Send NOTIFY message to my front end. */
+
+static void
+NotifyMyFrontEnd(char *relname, int32 listenerPID)
+{
+	if (whereToSendOutput == Remote)
+	{
+		pq_putnchar("A", 1);
+		pq_putint(listenerPID, sizeof(int32));
+		pq_putstr(relname);
+		/* NOTE: we do not do pq_flush() here.  For a self-notify, it will
+		 * happen at the end of the transaction, and for incoming notifies
+		 * ProcessIncomingNotify will do it after finding all the notifies.
+		 */
+	}
+	else
+	{
+		elog(NOTICE, "NOTIFY for %s", relname);
+	}
+}
+
+/* Does pendingNotifies include the given relname?
+ *
+ * NB: not called unless pendingNotifies != NULL.
+ */
+
 static int
 AsyncExistsPendingNotify(char *relname)
 {
@@ -747,11 +834,26 @@ AsyncExistsPendingNotify(char *relname)
 	return 0;
 }
 
+/* Clear the pendingNotifies list. */
+
 static void
-ClearPendingNotify()
+ClearPendingNotifies()
 {
 	Dlelem	   *p;
 
-	while ((p = DLRemHead(pendingNotifies)) != NULL)
-		free(DLE_VAL(p));
+	if (pendingNotifies)
+	{
+		/* Since the referenced strings are malloc'd, we have to scan the
+		 * list and delete them individually.  If we used palloc for the
+		 * strings then we could just do DLFreeList to get rid of both
+		 * the list nodes and the list base...
+		 */
+		while ((p = DLRemHead(pendingNotifies)) != NULL)
+		{
+			free(DLE_VAL(p));
+			DLFreeElem(p);
+		}
+		DLFreeList(pendingNotifies);
+		pendingNotifies = NULL;
+	}
 }
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 566b15c5b0d..8394fcdea77 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.90 1998/10/02 01:14:14 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.91 1998/10/06 02:40:01 tgl Exp $
  *
  * NOTES
  *	  this is the "main" module of the postgres backend and
@@ -1511,7 +1511,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
 	if (!IsUnderPostmaster)
 	{
 		puts("\nPOSTGRES backend interactive interface ");
-		puts("$Revision: 1.90 $ $Date: 1998/10/02 01:14:14 $\n");
+		puts("$Revision: 1.91 $ $Date: 1998/10/06 02:40:01 $\n");
 	}
 
 	/* ----------------
@@ -1559,7 +1559,16 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
 		ReadyForQuery(whereToSendOutput);
 
 		/* ----------------
-		 *	 (2) read a command.
+		 *	 (2) deal with pending asynchronous NOTIFY from other backends,
+		 *   and enable async.c's signal handler to execute NOTIFY directly.
+		 * ----------------
+		 */
+		QueryCancel = false;	/* forget any earlier CANCEL signal */
+
+		EnableNotifyInterrupt();
+
+		/* ----------------
+		 *	 (3) read a command.
 		 * ----------------
 		 */
 		MemSet(parser_input, 0, MAX_PARSE_BUFFER);
@@ -1569,7 +1578,13 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
 		QueryCancel = false;	/* forget any earlier CANCEL signal */
 
 		/* ----------------
-		 *	 (3) process the command.
+		 *	 (4) disable async.c's signal handler.
+		 * ----------------
+		 */
+		DisableNotifyInterrupt();
+
+		/* ----------------
+		 *	 (5) process the command.
 		 * ----------------
 		 */
 		switch (firstchar)
@@ -1640,7 +1655,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
 		}
 
 		/* ----------------
-		 *	 (4) commit the current transaction
+		 *	 (6) commit the current transaction
 		 *
 		 *	 Note: if we had an empty input buffer, then we didn't
 		 *	 call pg_exec_query, so we don't bother to commit this transaction.
diff --git a/src/backend/utils/misc/trace.c b/src/backend/utils/misc/trace.c
index fb3289d7bc1..8adbb075be1 100644
--- a/src/backend/utils/misc/trace.c
+++ b/src/backend/utils/misc/trace.c
@@ -70,10 +70,6 @@ static char *opt_names[] = {
 	"syslog",					/* use syslog for error messages */
 	"hostlookup",				/* enable hostname lookup in ps_status */
 	"showportnumber",			/* show port number in ps_status */
-	"notifyunlock",				/* enable unlock of pg_listener after
-								 * notify */
-	"notifyhack"				/* enable notify hack to remove duplicate
-								 * tuples */
 };
 
 /*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a6129100477..d006264ae12 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xact.h,v 1.15 1998/09/01 04:34:35 momjian Exp $
+ * $Id: xact.h,v 1.16 1998/10/06 02:40:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -107,6 +107,7 @@ extern void BeginTransactionBlock(void);
 extern void EndTransactionBlock(void);
 extern bool IsTransactionBlock(void);
 extern void UserAbortTransactionBlock(void);
+extern void AbortOutOfAnyTransaction(void);
 
 extern TransactionId DisabledTransactionId;
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index 2c9d0a348a5..5494b0f6c7e 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -1,27 +1,38 @@
 /*-------------------------------------------------------------------------
  *
  * async.h--
- *
- *
+ *	  Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: async.h,v 1.9 1998/09/01 04:35:22 momjian Exp $
+ * $Id: async.h,v 1.10 1998/10/06 02:40:08 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef ASYNC_H
 #define ASYNC_H
 
-#include <nodes/memnodes.h>
+#include <postgres.h>
 
-extern void Async_NotifyHandler(SIGNAL_ARGS);
+/* notify-related SQL statements */
 extern void Async_Notify(char *relname);
-extern void Async_NotifyAtCommit(void);
-extern void Async_NotifyAtAbort(void);
 extern void Async_Listen(char *relname, int pid);
 extern void Async_Unlisten(char *relname, int pid);
 
-extern GlobalMemory notifyContext;
+/* perform (or cancel) outbound notify processing at transaction commit */
+extern void AtCommit_Notify(void);
+extern void AtAbort_Notify(void);
+
+/* signal handler for inbound notifies (SIGUSR2) */
+extern void Async_NotifyHandler(SIGNAL_ARGS);
+
+/*
+ * enable/disable processing of inbound notifies directly from signal handler.
+ * The enable routine first performs processing of any inbound notifies that
+ * have occurred since the last disable.  These are meant to be called ONLY
+ * from the appropriate places in PostgresMain().
+ */
+extern void EnableNotifyInterrupt(void);
+extern void DisableNotifyInterrupt(void);
 
 #endif	 /* ASYNC_H */
diff --git a/src/include/utils/trace.h b/src/include/utils/trace.h
index 8f716393c67..d978f16ab44 100644
--- a/src/include/utils/trace.h
+++ b/src/include/utils/trace.h
@@ -66,10 +66,6 @@ enum pg_option_enum
 	OPT_SYSLOG,					/* use syslog for error messages */
 	OPT_HOSTLOOKUP,				/* enable hostname lookup in ps_status */
 	OPT_SHOWPORTNUMBER,			/* show port number in ps_status */
-	OPT_NOTIFYUNLOCK,			/* enable unlock of pg_listener after
-								 * notify */
-	OPT_NOTIFYHACK,				/* enable notify hack to remove duplicate
-								 * tuples */
 
 	NUM_PG_OPTIONS				/* must be the last item of enum */
 };
-- 
GitLab