From 597a87ccc9a6fa8af7f3cf280b1e24e41807d555 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Wed, 30 Nov 2016 12:00:00 -0500
Subject: [PATCH] Use latch instead of select() in walreceiver

Replace use of poll()/select() by WaitLatchOrSocket(), which is more
portable and flexible.

Also change walreceiver to use its procLatch instead of a custom latch.

From: Petr Jelinek <petr@2ndquadrant.com>
---
 src/backend/postmaster/pgstat.c               |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c       | 101 +++++-------------
 src/backend/replication/walreceiver.c         |  18 ++--
 src/backend/replication/walreceiverfuncs.c    |   6 +-
 src/include/pgstat.h                          |   1 +
 src/include/replication/walreceiver.h         |   3 +-
 6 files changed, 43 insertions(+), 89 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index a3921977c57..c7584cb1d34 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
 		case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
 			event_name = "WalReceiverWaitStart";
 			break;
+		case WAIT_EVENT_LIBPQWALRECEIVER_READ:
+			event_name = "LibPQWalReceiverRead";
+			break;
 		case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
 			event_name = "WalSenderWaitForWAL";
 			break;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f1c843e868c..6c01e7b9918 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -23,19 +23,11 @@
 #include "pqexpbuffer.h"
 #include "access/xlog.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "replication/walreceiver.h"
+#include "storage/proc.h"
 #include "utils/builtins.h"
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
 PG_MODULE_MAGIC;
 
 void		_PG_init(void);
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
-static bool libpq_select(int timeout_ms);
 static PGresult *libpqrcv_PQexec(const char *query);
 
 /*
@@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
 	PQclear(res);
 }
 
-/*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-libpq_select(int timeout_ms)
-{
-	int			ret;
-
-	Assert(streamConn != NULL);
-	if (PQsocket(streamConn) < 0)
-		ereport(ERROR,
-				(errcode_for_socket_access(),
-				 errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
-
-	/* We use poll(2) if available, otherwise select(2) */
-	{
-#ifdef HAVE_POLL
-		struct pollfd input_fd;
-
-		input_fd.fd = PQsocket(streamConn);
-		input_fd.events = POLLIN | POLLERR;
-		input_fd.revents = 0;
-
-		ret = poll(&input_fd, 1, timeout_ms);
-#else							/* !HAVE_POLL */
-
-		fd_set		input_mask;
-		struct timeval timeout;
-		struct timeval *ptr_timeout;
-
-		FD_ZERO(&input_mask);
-		FD_SET(PQsocket(streamConn), &input_mask);
-
-		if (timeout_ms < 0)
-			ptr_timeout = NULL;
-		else
-		{
-			timeout.tv_sec = timeout_ms / 1000;
-			timeout.tv_usec = (timeout_ms % 1000) * 1000;
-			ptr_timeout = &timeout;
-		}
-
-		ret = select(PQsocket(streamConn) + 1, &input_mask,
-					 NULL, NULL, ptr_timeout);
-#endif   /* HAVE_POLL */
-	}
-
-	if (ret == 0 || (ret < 0 && errno == EINTR))
-		return false;
-	if (ret < 0)
-		ereport(ERROR,
-				(errcode_for_socket_access(),
-				 errmsg("select() failed: %m")));
-	return true;
-}
-
 /*
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and the backend version of select().
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
 		 */
 		while (PQisBusy(streamConn))
 		{
+			int			rc;
+
 			/*
 			 * We don't need to break down the sleep into smaller increments,
-			 * and check for interrupts after each nap, since we can just
-			 * elog(FATAL) within SIGTERM signal handler if the signal arrives
-			 * in the middle of establishment of replication connection.
+			 * since we'll get interrupted by signals and can either handle
+			 * interrupts here or elog(FATAL) within SIGTERM signal handler if
+			 * the signal arrives in the middle of establishment of
+			 * replication connection.
 			 */
-			if (!libpq_select(-1))
-				continue;		/* interrupted */
+			ResetLatch(&MyProc->procLatch);
+			rc = WaitLatchOrSocket(&MyProc->procLatch,
+								   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+								   WL_LATCH_SET,
+								   PQsocket(streamConn),
+								   0,
+								   WAIT_EVENT_LIBPQWALRECEIVER_READ);
+			if (rc & WL_POSTMASTER_DEATH)
+				exit(1);
+
+			/* interrupted */
+			if (rc & WL_LATCH_SET)
+			{
+				CHECK_FOR_INTERRUPTS();
+				continue;
+			}
 			if (PQconsumeInput(streamConn) == 0)
 				return NULL;	/* trouble */
 		}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2bb3dce1b1c..8bfb0415608 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -261,7 +261,7 @@ WalReceiverMain(void)
 	/* Arrange to clean up at walreceiver exit */
 	on_shmem_exit(WalRcvDie, 0);
 
-	OwnLatch(&walrcv->latch);
+	walrcv->latch = &MyProc->procLatch;
 
 	/* Properly accept or ignore signals the postmaster might send us */
 	pqsignal(SIGHUP, WalRcvSigHupHandler);		/* set flag to read config
@@ -483,7 +483,7 @@ WalReceiverMain(void)
 				 * avoiding some system calls.
 				 */
 				Assert(wait_fd != PGINVALID_SOCKET);
-				rc = WaitLatchOrSocket(&walrcv->latch,
+				rc = WaitLatchOrSocket(walrcv->latch,
 								   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
 									   WL_TIMEOUT | WL_LATCH_SET,
 									   wait_fd,
@@ -491,7 +491,7 @@ WalReceiverMain(void)
 									   WAIT_EVENT_WAL_RECEIVER_MAIN);
 				if (rc & WL_LATCH_SET)
 				{
-					ResetLatch(&walrcv->latch);
+					ResetLatch(walrcv->latch);
 					if (walrcv->force_reply)
 					{
 						/*
@@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	WakeupRecovery();
 	for (;;)
 	{
-		ResetLatch(&walrcv->latch);
+		ResetLatch(walrcv->latch);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 		}
 		SpinLockRelease(&walrcv->mutex);
 
-		WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
+		WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
 				  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
 	}
 
@@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
 	/* Ensure that all WAL records received are flushed to disk */
 	XLogWalRcvFlush(true);
 
-	DisownLatch(&walrcv->latch);
+	walrcv->latch = NULL;
 
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
 
 	got_SIGTERM = true;
 
-	SetLatch(&WalRcv->latch);
+	if (WalRcv->latch)
+		SetLatch(WalRcv->latch);
 
 	/* Don't joggle the elbow of proc_exit */
 	if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
@@ -1297,7 +1298,8 @@ void
 WalRcvForceReply(void)
 {
 	WalRcv->force_reply = true;
-	SetLatch(&WalRcv->latch);
+	if (WalRcv->latch)
+		SetLatch(WalRcv->latch);
 }
 
 /*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 5f6e423f1f6..01111a4c12b 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,7 +64,7 @@ WalRcvShmemInit(void)
 		MemSet(WalRcv, 0, WalRcvShmemSize());
 		WalRcv->walRcvState = WALRCV_STOPPED;
 		SpinLockInit(&WalRcv->mutex);
-		InitSharedLatch(&WalRcv->latch);
+		WalRcv->latch = NULL;
 	}
 }
 
@@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	if (launch)
 		SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
-	else
-		SetLatch(&walrcv->latch);
+	else if (walrcv->latch)
+		SetLatch(walrcv->latch);
 }
 
 /*
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0b85b7ad3ae..152ff062085 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -763,6 +763,7 @@ typedef enum
 	WAIT_EVENT_CLIENT_WRITE,
 	WAIT_EVENT_SSL_OPEN_SERVER,
 	WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+	WAIT_EVENT_LIBPQWALRECEIVER_READ,
 	WAIT_EVENT_WAL_SENDER_WAIT_WAL,
 	WAIT_EVENT_WAL_SENDER_WRITE_DATA
 } WaitEventClient;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index cd787c92b3f..afbb8d8b954 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -127,8 +127,9 @@ typedef struct
 	 * where to start streaming (after setting receiveStart and
 	 * receiveStartTLI), and also to tell it to send apply feedback to the
 	 * primary whenever specially marked commit records are applied.
+	 * This is normally mapped to procLatch when walreceiver is running.
 	 */
-	Latch		latch;
+	Latch	   *latch;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
-- 
GitLab