diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 8cb2f13b27861b533845b85a4ea2bdd21c51ff5a..92ff632e124e679cf5e29470321b29813c6d90bf 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -410,6 +410,8 @@ ReceiveCopyBegin(CopyState cstate)
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 			errmsg("COPY BINARY is not supported to stdout or from stdin")));
 		pq_putemptymessage('G');
+		/* any error in old protocol will make us lose sync */
+		pq_startmsgread();
 		cstate->copy_dest = COPY_OLD_FE;
 	}
 	else
@@ -420,6 +422,8 @@ ReceiveCopyBegin(CopyState cstate)
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 			errmsg("COPY BINARY is not supported to stdout or from stdin")));
 		pq_putemptymessage('D');
+		/* any error in old protocol will make us lose sync */
+		pq_startmsgread();
 		cstate->copy_dest = COPY_OLD_FE;
 	}
 	/* We *must* flush here to ensure FE knows it can send. */
@@ -606,6 +610,8 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
 					int			mtype;
 
 			readmessage:
+					HOLD_CANCEL_INTERRUPTS();
+					pq_startmsgread();
 					mtype = pq_getbyte();
 					if (mtype == EOF)
 						ereport(ERROR,
@@ -615,6 +621,7 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
 						ereport(ERROR,
 								(errcode(ERRCODE_CONNECTION_FAILURE),
 								 errmsg("unexpected EOF on client connection with an open transaction")));
+					RESUME_CANCEL_INTERRUPTS();
 					switch (mtype)
 					{
 						case 'd':		/* CopyData */
@@ -2463,6 +2470,13 @@ CopyFrom(CopyState cstate)
 
 	MemoryContextSwitchTo(oldcontext);
 
+	/*
+	 * In the old protocol, tell pqcomm that we can process normal protocol
+	 * messages again.
+	 */
+	if (cstate->copy_dest == COPY_OLD_FE)
+		pq_endmsgread();
+
 	/* Execute AFTER STATEMENT insertion triggers */
 	ExecASInsertTriggers(estate, resultRelInfo);
 
diff --git a/src/backend/libpq/auth.c b/src/backend/libpq/auth.c
index e9c8e569e2a354ab086be5578ad14ce3c0f51c29..3f3cf4485ac5ec62c82b33e339264e8bfe680918 100644
--- a/src/backend/libpq/auth.c
+++ b/src/backend/libpq/auth.c
@@ -625,6 +625,7 @@ recv_password_packet(Port *port)
 {
 	StringInfoData buf;
 
+	pq_startmsgread();
 	if (PG_PROTOCOL_MAJOR(port->proto) >= 3)
 	{
 		/* Expect 'p' message type */
@@ -849,6 +850,7 @@ pg_GSS_recvauth(Port *port)
 	 */
 	do
 	{
+		pq_startmsgread();
 		mtype = pq_getbyte();
 		if (mtype != 'p')
 		{
@@ -1083,6 +1085,7 @@ pg_SSPI_recvauth(Port *port)
 	 */
 	do
 	{
+		pq_startmsgread();
 		mtype = pq_getbyte();
 		if (mtype != 'p')
 		{
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index e3efac34ce4e768597c1b3eaac113a0c97e767af..254fd8285b053c98b1e640b3df4886b72aeb152d 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -127,8 +127,9 @@ static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 /*
  * Message status
  */
-static bool PqCommBusy;
-static bool DoingCopyOut;
+static bool PqCommBusy;			/* busy sending data to the client */
+static bool PqCommReadingMsg;	/* in the middle of reading a message */
+static bool DoingCopyOut;		/* in old-protocol COPY OUT processing */
 
 
 /* Internal functions */
@@ -177,6 +178,7 @@ pq_init(void)
 	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
 	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
 	PqCommBusy = false;
+	PqCommReadingMsg = false;
 	DoingCopyOut = false;
 	on_proc_exit(socket_close, 0);
 }
@@ -916,6 +918,8 @@ pq_recvbuf(void)
 int
 pq_getbyte(void)
 {
+	Assert(PqCommReadingMsg);
+
 	while (PqRecvPointer >= PqRecvLength)
 	{
 		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
@@ -954,6 +958,8 @@ pq_getbyte_if_available(unsigned char *c)
 {
 	int			r;
 
+	Assert(PqCommReadingMsg);
+
 	if (PqRecvPointer < PqRecvLength)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
@@ -1006,6 +1012,8 @@ pq_getbytes(char *s, size_t len)
 {
 	size_t		amount;
 
+	Assert(PqCommReadingMsg);
+
 	while (len > 0)
 	{
 		while (PqRecvPointer >= PqRecvLength)
@@ -1038,6 +1046,8 @@ pq_discardbytes(size_t len)
 {
 	size_t		amount;
 
+	Assert(PqCommReadingMsg);
+
 	while (len > 0)
 	{
 		while (PqRecvPointer >= PqRecvLength)
@@ -1074,6 +1084,8 @@ pq_getstring(StringInfo s)
 {
 	int			i;
 
+	Assert(PqCommReadingMsg);
+
 	resetStringInfo(s);
 
 	/* Read until we get the terminating '\0' */
@@ -1105,6 +1117,58 @@ pq_getstring(StringInfo s)
 }
 
 
+/* --------------------------------
+ *		pq_startmsgread	- begin reading a message from the client.
+ *
+ *		This must be called before any of the pq_get* functions.
+ * --------------------------------
+ */
+void
+pq_startmsgread(void)
+{
+	/*
+	 * There shouldn't be a read active already, but let's check just to be
+	 * sure.
+	 */
+	if (PqCommReadingMsg)
+		ereport(FATAL,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("terminating connection because protocol sync was lost")));
+
+	PqCommReadingMsg = true;
+}
+
+
+/* --------------------------------
+ *		pq_endmsgread	- finish reading message.
+ *
+ *		This must be called after reading a V2 protocol message with
+ *		pq_getstring() and friends, to indicate that we have read the whole
+ *		message. In V3 protocol, pq_getmessage() does this implicitly.
+ * --------------------------------
+ */
+void
+pq_endmsgread(void)
+{
+	Assert(PqCommReadingMsg);
+
+	PqCommReadingMsg = false;
+}
+
+/* --------------------------------
+ *		pq_is_reading_msg - are we currently reading a message?
+ *
+ * This is used in error recovery at the outer idle loop to detect if we have
+ * lost protocol sync, and need to terminate the connection. pq_startmsgread()
+ * will check for that too, but it's nicer to detect it earlier.
+ * --------------------------------
+ */
+bool
+pq_is_reading_msg(void)
+{
+	return PqCommReadingMsg;
+}
+
 /* --------------------------------
  *		pq_getmessage	- get a message with length word from connection
  *
@@ -1126,6 +1190,8 @@ pq_getmessage(StringInfo s, int maxlen)
 {
 	int32		len;
 
+	Assert(PqCommReadingMsg);
+
 	resetStringInfo(s);
 
 	/* Read message length word */
@@ -1167,6 +1233,9 @@ pq_getmessage(StringInfo s, int maxlen)
 				ereport(COMMERROR,
 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
 						 errmsg("incomplete message from client")));
+
+			/* we discarded the rest of the message so we're back in sync. */
+			PqCommReadingMsg = false;
 			PG_RE_THROW();
 		}
 		PG_END_TRY();
@@ -1184,6 +1253,9 @@ pq_getmessage(StringInfo s, int maxlen)
 		s->data[len] = '\0';
 	}
 
+	/* finished reading the message. */
+	PqCommReadingMsg = false;
+
 	return 0;
 }
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index fe6316ecbe928143a09b28e51c5930af91d70c60..36b8267fa50d5c07a33494639e2f6c3af00d7b99 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1761,6 +1761,7 @@ ProcessStartupPacket(Port *port, bool SSLdone)
 	ProtocolVersion proto;
 	MemoryContext oldcontext;
 
+	pq_startmsgread();
 	if (pq_getbytes((char *) &len, 4) == EOF)
 	{
 		/*
@@ -1805,6 +1806,7 @@ ProcessStartupPacket(Port *port, bool SSLdone)
 				 errmsg("incomplete startup packet")));
 		return STATUS_ERROR;
 	}
+	pq_endmsgread();
 
 	/*
 	 * The first field is either a protocol version number or a special
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 05d2339b15093701f8bac2ed96560c1fb303cb3b..25fcbca39e9cd0de923065cd53ee905297f5b72b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1357,6 +1357,7 @@ ProcessRepliesIfAny(void)
 
 	for (;;)
 	{
+		pq_startmsgread();
 		r = pq_getbyte_if_available(&firstchar);
 		if (r < 0)
 		{
@@ -1369,9 +1370,20 @@ ProcessRepliesIfAny(void)
 		if (r == 0)
 		{
 			/* no data available without blocking */
+			pq_endmsgread();
 			break;
 		}
 
+		/* Read the message contents */
+		resetStringInfo(&reply_message);
+		if (pq_getmessage(&reply_message, 0))
+		{
+			ereport(COMMERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("unexpected EOF on standby connection")));
+			proc_exit(0);
+		}
+
 		/*
 		 * If we already received a CopyDone from the frontend, the frontend
 		 * should not send us anything until we've closed our end of the COPY.
@@ -1407,16 +1419,6 @@ ProcessRepliesIfAny(void)
 					streamingDoneSending = true;
 				}
 
-				/* consume the CopyData message */
-				resetStringInfo(&reply_message);
-				if (pq_getmessage(&reply_message, 0))
-				{
-					ereport(COMMERROR,
-							(errcode(ERRCODE_PROTOCOL_VIOLATION),
-							 errmsg("unexpected EOF on standby connection")));
-					proc_exit(0);
-				}
-
 				streamingDoneReceiving = true;
 				received = true;
 				break;
@@ -1453,19 +1455,6 @@ ProcessStandbyMessage(void)
 {
 	char		msgtype;
 
-	resetStringInfo(&reply_message);
-
-	/*
-	 * Read the message contents.
-	 */
-	if (pq_getmessage(&reply_message, 0))
-	{
-		ereport(COMMERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg("unexpected EOF on standby connection")));
-		proc_exit(0);
-	}
-
 	/*
 	 * Check message type from the first byte.
 	 */
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 65e8afe457fa9cc120401b2bad48f4d78b37e584..24636223b1dda1253babd9bdeb14d40b69ce9bea 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -655,11 +655,16 @@ LockErrorCleanup(void)
 	LWLock	   *partitionLock;
 	DisableTimeoutParams timeouts[2];
 
+	HOLD_INTERRUPTS();
+
 	AbortStrongLockAcquire();
 
 	/* Nothing to do if we weren't waiting for a lock */
 	if (lockAwaited == NULL)
+	{
+		RESUME_INTERRUPTS();
 		return;
+	}
 
 	/*
 	 * Turn off the deadlock and lock timeout timers, if they are still
@@ -709,6 +714,8 @@ LockErrorCleanup(void)
 	 * wakeup signal isn't harmful, and it seems not worth expending cycles to
 	 * get rid of a signal that most likely isn't there.
 	 */
+
+	RESUME_INTERRUPTS();
 }
 
 
diff --git a/src/backend/tcop/fastpath.c b/src/backend/tcop/fastpath.c
index 042956096e49a1087b6be72f1ad63f36ccbf6a04..ce3b9ebdca36152aa493a0e01a11ec5224f50805 100644
--- a/src/backend/tcop/fastpath.c
+++ b/src/backend/tcop/fastpath.c
@@ -75,7 +75,7 @@ static int16 parse_fcall_arguments_20(StringInfo msgBuf, struct fp_info * fip,
  * The caller should already have initialized buf to empty.
  * ----------------
  */
-static int
+int
 GetOldFunctionMessage(StringInfo buf)
 {
 	int32		ibuf;
@@ -280,33 +280,6 @@ HandleFunctionRequest(StringInfo msgBuf)
 	bool		was_logged = false;
 	char		msec_str[32];
 
-	/*
-	 * Read message contents if not already done.
-	 */
-	if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
-	{
-		if (GetOldFunctionMessage(msgBuf))
-		{
-			if (IsTransactionState())
-				ereport(COMMERROR,
-						(errcode(ERRCODE_CONNECTION_FAILURE),
-						 errmsg("unexpected EOF on client connection with an open transaction")));
-			else
-			{
-				/*
-				 * Can't send DEBUG log messages to client at this point.
-				 * Since we're disconnecting right away, we don't need to
-				 * restore whereToSendOutput.
-				 */
-				whereToSendOutput = DestNone;
-				ereport(DEBUG1,
-						(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
-						 errmsg("unexpected EOF on client connection")));
-			}
-			return EOF;
-		}
-	}
-
 	/*
 	 * Now that we've eaten the input message, check to see if we actually
 	 * want to do the function call or not.  It's now safe to ereport(); we
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8f743536cffdc95171909031de5a76458a9da318..b82c3b333bd00d6e3b4b5cbbe8fa166f41dd28b4 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -331,6 +331,8 @@ SocketBackend(StringInfo inBuf)
 	/*
 	 * Get message type code from the frontend.
 	 */
+	HOLD_CANCEL_INTERRUPTS();
+	pq_startmsgread();
 	qtype = pq_getbyte();
 
 	if (qtype == EOF)			/* frontend disconnected */
@@ -379,7 +381,7 @@ SocketBackend(StringInfo inBuf)
 					{
 						/*
 						 * Can't send DEBUG log messages to client at this
-						 * point.Since we're disconnecting right away, we
+						 * point. Since we're disconnecting right away, we
 						 * don't need to restore whereToSendOutput.
 						 */
 						whereToSendOutput = DestNone;
@@ -393,8 +395,30 @@ SocketBackend(StringInfo inBuf)
 			break;
 
 		case 'F':				/* fastpath function call */
-			/* we let fastpath.c cope with old-style input of this */
 			doing_extended_query_message = false;
+			if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
+			{
+				if (GetOldFunctionMessage(inBuf))
+				{
+					if (IsTransactionState())
+						ereport(COMMERROR,
+								(errcode(ERRCODE_CONNECTION_FAILURE),
+								 errmsg("unexpected EOF on client connection with an open transaction")));
+					else
+					{
+						/*
+						 * Can't send DEBUG log messages to client at this
+						 * point. Since we're disconnecting right away, we
+						 * don't need to restore whereToSendOutput.
+						 */
+						whereToSendOutput = DestNone;
+						ereport(DEBUG1,
+								(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+							 errmsg("unexpected EOF on client connection")));
+					}
+					return EOF;
+				}
+			}
 			break;
 
 		case 'X':				/* terminate */
@@ -462,6 +486,9 @@ SocketBackend(StringInfo inBuf)
 		if (pq_getmessage(inBuf, 0))
 			return EOF;			/* suitable message already logged */
 	}
+	else
+		pq_endmsgread();
+	RESUME_CANCEL_INTERRUPTS();
 
 	return qtype;
 }
@@ -506,7 +533,7 @@ prepare_for_client_read(void)
 		EnableNotifyInterrupt();
 		EnableCatchupInterrupt();
 
-		/* Allow cancel/die interrupts to be processed while waiting */
+		/* Allow die interrupts to be processed while waiting */
 		ImmediateInterruptOK = true;
 
 		/* And don't forget to detect one that already arrived */
@@ -2589,21 +2616,11 @@ die(SIGNAL_ARGS)
 		ProcDiePending = true;
 
 		/*
-		 * If it's safe to interrupt, and we're waiting for input or a lock,
-		 * service the interrupt immediately
+		 * If we're waiting for input or a lock so that it's safe to
+		 * interrupt, service the interrupt immediately
 		 */
-		if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
-			CritSectionCount == 0)
-		{
-			/* bump holdoff count to make ProcessInterrupts() a no-op */
-			/* until we are done getting ready for it */
-			InterruptHoldoffCount++;
-			LockErrorCleanup(); /* prevent CheckDeadLock from running */
-			DisableNotifyInterrupt();
-			DisableCatchupInterrupt();
-			InterruptHoldoffCount--;
+		if (ImmediateInterruptOK)
 			ProcessInterrupts();
-		}
 	}
 
 	/* If we're still here, waken anything waiting on the process latch */
@@ -2630,21 +2647,11 @@ StatementCancelHandler(SIGNAL_ARGS)
 		QueryCancelPending = true;
 
 		/*
-		 * If it's safe to interrupt, and we're waiting for input or a lock,
-		 * service the interrupt immediately
+		 * If we're waiting for input or a lock so that it's safe to
+		 * interrupt, service the interrupt immediately
 		 */
-		if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
-			CritSectionCount == 0)
-		{
-			/* bump holdoff count to make ProcessInterrupts() a no-op */
-			/* until we are done getting ready for it */
-			InterruptHoldoffCount++;
-			LockErrorCleanup(); /* prevent CheckDeadLock from running */
-			DisableNotifyInterrupt();
-			DisableCatchupInterrupt();
-			InterruptHoldoffCount--;
+		if (ImmediateInterruptOK)
 			ProcessInterrupts();
-		}
 	}
 
 	/* If we're still here, waken anything waiting on the process latch */
@@ -2787,21 +2794,11 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 			RecoveryConflictRetryable = false;
 
 		/*
-		 * If it's safe to interrupt, and we're waiting for input or a lock,
-		 * service the interrupt immediately
+		 * If we're waiting for input or a lock so that it's safe to
+		 * interrupt, service the interrupt immediately.
 		 */
-		if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
-			CritSectionCount == 0)
-		{
-			/* bump holdoff count to make ProcessInterrupts() a no-op */
-			/* until we are done getting ready for it */
-			InterruptHoldoffCount++;
-			LockErrorCleanup(); /* prevent CheckDeadLock from running */
-			DisableNotifyInterrupt();
-			DisableCatchupInterrupt();
-			InterruptHoldoffCount--;
+		if (ImmediateInterruptOK)
 			ProcessInterrupts();
-		}
 	}
 
 	/*
@@ -2826,15 +2823,17 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 void
 ProcessInterrupts(void)
 {
-	/* OK to accept interrupt now? */
+	/* OK to accept any interrupts now? */
 	if (InterruptHoldoffCount != 0 || CritSectionCount != 0)
 		return;
 	InterruptPending = false;
+
 	if (ProcDiePending)
 	{
 		ProcDiePending = false;
 		QueryCancelPending = false;		/* ProcDie trumps QueryCancel */
 		ImmediateInterruptOK = false;	/* not idle anymore */
+		LockErrorCleanup();
 		DisableNotifyInterrupt();
 		DisableCatchupInterrupt();
 		/* As in quickdie, don't risk sending to client during auth */
@@ -2871,6 +2870,7 @@ ProcessInterrupts(void)
 	{
 		QueryCancelPending = false;		/* lost connection trumps QueryCancel */
 		ImmediateInterruptOK = false;	/* not idle anymore */
+		LockErrorCleanup();
 		DisableNotifyInterrupt();
 		DisableCatchupInterrupt();
 		/* don't send to client, we already know the connection to be dead. */
@@ -2879,12 +2879,53 @@ ProcessInterrupts(void)
 				(errcode(ERRCODE_CONNECTION_FAILURE),
 				 errmsg("connection to client lost")));
 	}
+
+	/*
+	 * If a recovery conflict happens while we are waiting for input from the
+	 * client, the client is presumably just sitting idle in a transaction,
+	 * preventing recovery from making progress.  Terminate the connection to
+	 * dislodge it.
+	 */
+	if (RecoveryConflictPending && DoingCommandRead)
+	{
+		QueryCancelPending = false;			/* this trumps QueryCancel */
+		ImmediateInterruptOK = false;		/* not idle anymore */
+		RecoveryConflictPending = false;
+		LockErrorCleanup();
+		DisableNotifyInterrupt();
+		DisableCatchupInterrupt();
+		pgstat_report_recovery_conflict(RecoveryConflictReason);
+		ereport(FATAL,
+				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+				 errmsg("terminating connection due to conflict with recovery"),
+				 errdetail_recovery_conflict(),
+				 errhint("In a moment you should be able to reconnect to the"
+						 " database and repeat your command.")));
+	}
+
 	if (QueryCancelPending)
 	{
+		/*
+		 * Don't allow query cancel interrupts while reading input from the
+		 * client, because we might lose sync in the FE/BE protocol.  (Die
+		 * interrupts are OK, because we won't read any further messages from
+		 * the client in that case.)
+		 */
+		if (QueryCancelHoldoffCount != 0)
+		{
+			/*
+			 * Re-arm InterruptPending so that we process the cancel request
+			 * as soon as we're done reading the message.
+			 */
+			InterruptPending = true;
+			return;
+		}
+
 		QueryCancelPending = false;
 		if (ClientAuthInProgress)
 		{
 			ImmediateInterruptOK = false;		/* not idle anymore */
+			LockErrorCleanup();
 			DisableNotifyInterrupt();
 			DisableCatchupInterrupt();
 			/* As in quickdie, don't risk sending to client during auth */
@@ -2903,6 +2944,7 @@ ProcessInterrupts(void)
 		{
 			ImmediateInterruptOK = false;		/* not idle anymore */
 			(void) get_timeout_indicator(STATEMENT_TIMEOUT, true);
+			LockErrorCleanup();
 			DisableNotifyInterrupt();
 			DisableCatchupInterrupt();
 			ereport(ERROR,
@@ -2912,6 +2954,7 @@ ProcessInterrupts(void)
 		if (get_timeout_indicator(STATEMENT_TIMEOUT, true))
 		{
 			ImmediateInterruptOK = false;		/* not idle anymore */
+			LockErrorCleanup();
 			DisableNotifyInterrupt();
 			DisableCatchupInterrupt();
 			ereport(ERROR,
@@ -2921,6 +2964,7 @@ ProcessInterrupts(void)
 		if (IsAutoVacuumWorkerProcess())
 		{
 			ImmediateInterruptOK = false;		/* not idle anymore */
+			LockErrorCleanup();
 			DisableNotifyInterrupt();
 			DisableCatchupInterrupt();
 			ereport(ERROR,
@@ -2931,21 +2975,14 @@ ProcessInterrupts(void)
 		{
 			ImmediateInterruptOK = false;		/* not idle anymore */
 			RecoveryConflictPending = false;
+			LockErrorCleanup();
 			DisableNotifyInterrupt();
 			DisableCatchupInterrupt();
 			pgstat_report_recovery_conflict(RecoveryConflictReason);
-			if (DoingCommandRead)
-				ereport(FATAL,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("terminating connection due to conflict with recovery"),
-						 errdetail_recovery_conflict(),
-				 errhint("In a moment you should be able to reconnect to the"
-						 " database and repeat your command.")));
-			else
-				ereport(ERROR,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+			ereport(ERROR,
+					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 				 errmsg("canceling statement due to conflict with recovery"),
-						 errdetail_recovery_conflict()));
+					 errdetail_recovery_conflict()));
 		}
 
 		/*
@@ -2956,6 +2993,7 @@ ProcessInterrupts(void)
 		if (!DoingCommandRead)
 		{
 			ImmediateInterruptOK = false;		/* not idle anymore */
+			LockErrorCleanup();
 			DisableNotifyInterrupt();
 			DisableCatchupInterrupt();
 			ereport(ERROR,
@@ -3862,6 +3900,19 @@ PostgresMain(int argc, char *argv[],
 		/* We don't have a transaction command open anymore */
 		xact_started = false;
 
+		/*
+		 * If an error occurred while we were reading a message from the
+		 * client, we have potentially lost track of where the previous
+		 * message ends and the next one begins.  Even though we have
+		 * otherwise recovered from the error, we cannot safely read any more
+		 * messages from the client, so there isn't much we can do with the
+		 * connection anymore.
+		 */
+		if (pq_is_reading_msg())
+			ereport(FATAL,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("terminating connection because protocol sync was lost")));
+
 		/* Now we can allow interrupts again */
 		RESUME_INTERRUPTS();
 	}
@@ -3946,7 +3997,14 @@ PostgresMain(int argc, char *argv[],
 
 		/*
 		 * (4) disable async signal conditions again.
+		 *
+		 * Query cancel is supposed to be a no-op when there is no query in
+		 * progress, so if a query cancel arrived while we were idle, just
+		 * reset QueryCancelPending. ProcessInterrupts() has that effect when
+		 * it's called when DoingCommandRead is set, so check for interrupts
+		 * before resetting DoingCommandRead.
 		 */
+		CHECK_FOR_INTERRUPTS();
 		DoingCommandRead = false;
 
 		/*
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 310c5bbffa05f87f65263b6be58abd214f7b3525..0f7aa1993a4fed9f6100eae51f596b1287fefa34 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -469,6 +469,7 @@ errfinish(int dummy,...)
 		 * while doing error cleanup.
 		 */
 		InterruptHoldoffCount = 0;
+		QueryCancelHoldoffCount = 0;
 
 		CritSectionCount = 0;	/* should be unnecessary, but... */
 
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index c35867bcfe972ba116d5e54a315337326910cab4..8cf2ead392591e3f0cbad5de474ce514efb975a0 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -32,6 +32,7 @@ volatile bool ProcDiePending = false;
 volatile bool ClientConnectionLost = false;
 volatile bool ImmediateInterruptOK = false;
 volatile uint32 InterruptHoldoffCount = 0;
+volatile uint32 QueryCancelHoldoffCount = 0;
 volatile uint32 CritSectionCount = 0;
 
 int			MyProcPid;
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 50dd1f081af4325a4329b94fb23c142b03875816..af4ba2ab07cd01f274240d4e59286beecae596c4 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -78,6 +78,9 @@ extern void TouchSocketFiles(void);
 extern void pq_init(void);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
+extern void pq_startmsgread(void);
+extern void pq_endmsgread(void);
+extern bool pq_is_reading_msg(void);
 extern int	pq_getmessage(StringInfo s, int maxlen);
 extern int	pq_getbyte(void);
 extern int	pq_peekbyte(void);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 6e33a1721223c8b46fbc5a99258667290fddd403..6c68da5f64fb98a5e2b2205b53f16ca6b94fdf15 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -52,6 +52,10 @@
  * will be held off until CHECK_FOR_INTERRUPTS() is done outside any
  * HOLD_INTERRUPTS() ... RESUME_INTERRUPTS() section.
  *
+ * There is also a mechanism to prevent query cancel interrupts, while still
+ * allowing die interrupts: HOLD_CANCEL_INTERRUPTS() and
+ * RESUME_CANCEL_INTERRUPTS().
+ *
  * Special mechanisms are used to let an interrupt be accepted when we are
  * waiting for a lock or when we are waiting for command input (but, of
  * course, only if the interrupt holdoff counter is zero).  See the
@@ -82,6 +86,7 @@ extern volatile bool ClientConnectionLost;
 /* these are marked volatile because they are examined by signal handlers: */
 extern PGDLLIMPORT volatile bool ImmediateInterruptOK;
 extern PGDLLIMPORT volatile uint32 InterruptHoldoffCount;
+extern PGDLLIMPORT volatile uint32 QueryCancelHoldoffCount;
 extern PGDLLIMPORT volatile uint32 CritSectionCount;
 
 /* in tcop/postgres.c */
@@ -114,6 +119,14 @@ do { \
 	InterruptHoldoffCount--; \
 } while(0)
 
+#define HOLD_CANCEL_INTERRUPTS()  (QueryCancelHoldoffCount++)
+
+#define RESUME_CANCEL_INTERRUPTS() \
+do { \
+	Assert(QueryCancelHoldoffCount > 0); \
+	QueryCancelHoldoffCount--; \
+} while(0)
+
 #define START_CRIT_SECTION()  (CritSectionCount++)
 
 #define END_CRIT_SECTION() \
diff --git a/src/include/tcop/fastpath.h b/src/include/tcop/fastpath.h
index 6286c0a687f7fedcf9dcabe94ef1c0dffd56775b..47028cb113ce14eeff550afddbffb02f517803d9 100644
--- a/src/include/tcop/fastpath.h
+++ b/src/include/tcop/fastpath.h
@@ -15,6 +15,7 @@
 
 #include "lib/stringinfo.h"
 
+extern int GetOldFunctionMessage(StringInfo buf);
 extern int	HandleFunctionRequest(StringInfo msgBuf);
 
 #endif   /* FASTPATH_H */