From b4a0223d008d7c2c9824d846e22b664b2f09cf6e Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Thu, 20 Oct 2011 19:43:31 -0400
Subject: [PATCH] Simplify and improve ProcessStandbyHSFeedbackMessage logic.

There's no need to clamp the standby's xmin to be greater than
GetOldestXmin's result; if there were any such need this logic would be
hopelessly inadequate anyway, because it fails to account for
within-database versus cluster-wide values of GetOldestXmin.  So get rid of
that, and just rely on sanity-checking that the xmin is not wrapped around
relative to the nextXid counter.  Also, don't reset the walsender's xmin if
the current feedback xmin is indeed out of range; that just creates more
problems than we already had.  Lastly, don't bother to take the
ProcArrayLock; there's no need to do that to set xmin.

Also improve the comments about this in GetOldestXmin itself.
---
 src/backend/replication/walsender.c | 105 +++++++++++++---------------
 src/backend/storage/ipc/procarray.c |  50 ++++++++-----
 2 files changed, 79 insertions(+), 76 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c8fd165dcb3..dd2d6ee2184 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -642,76 +642,67 @@ static void
 ProcessStandbyHSFeedbackMessage(void)
 {
 	StandbyHSFeedbackMessage reply;
-	TransactionId newxmin = InvalidTransactionId;
+	TransactionId nextXid;
+	uint32		nextEpoch;
 
-	pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+	/* Decipher the reply message */
+	pq_copymsgbytes(&reply_message, (char *) &reply,
+					sizeof(StandbyHSFeedbackMessage));
 
 	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
 		 reply.xmin,
 		 reply.epoch);
 
+	/* Ignore invalid xmin (can't actually happen with current walreceiver) */
+	if (!TransactionIdIsNormal(reply.xmin))
+		return;
+
 	/*
-	 * Update the WalSender's proc xmin to allow it to be visible to
-	 * snapshots. This will hold back the removal of dead rows and thereby
-	 * prevent the generation of cleanup conflicts on the standby server.
+	 * Check that the provided xmin/epoch are sane, that is, not in the future
+	 * and not so far back as to be already wrapped around.  Ignore if not.
+	 *
+	 * Epoch of nextXid should be same as standby, or if the counter has
+	 * wrapped, then one greater than standby.
 	 */
-	if (TransactionIdIsValid(reply.xmin))
-	{
-		TransactionId nextXid;
-		uint32		nextEpoch;
-		bool		epochOK = false;
-
-		GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-		/*
-		 * Epoch of oldestXmin should be same as standby or if the counter has
-		 * wrapped, then one less than reply.
-		 */
-		if (reply.xmin <= nextXid)
-		{
-			if (reply.epoch == nextEpoch)
-				epochOK = true;
-		}
-		else
-		{
-			if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
-				epochOK = true;
-		}
-
-		/*
-		 * Feedback from standby must not go backwards, nor should it go
-		 * forwards further than our most recent xid.
-		 */
-		if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
-		{
-			if (!TransactionIdIsValid(MyProc->xmin))
-			{
-				TransactionId oldestXmin = GetOldestXmin(true, true);
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
 
-				if (TransactionIdPrecedes(oldestXmin, reply.xmin))
-					newxmin = reply.xmin;
-				else
-					newxmin = oldestXmin;
-			}
-			else
-			{
-				if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
-					newxmin = reply.xmin;
-				else
-					newxmin = MyProc->xmin;		/* stay the same */
-			}
-		}
+	if (reply.xmin <= nextXid)
+	{
+		if (reply.epoch != nextEpoch)
+			return;
 	}
+	else
+	{
+		if (reply.epoch + 1 != nextEpoch)
+			return;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+		return;					/* epoch OK, but it's wrapped around */
 
 	/*
-	 * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
+	 * the xmin will be taken into account by GetOldestXmin.  This will hold
+	 * back the removal of dead rows and thereby prevent the generation of
+	 * cleanup conflicts on the standby server.
+	 *
+	 * There is a small window for a race condition here: although we just
+	 * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+	 * advanced between our fetching it and applying the xmin below, perhaps
+	 * far enough to make reply.xmin wrap around.  In that case the xmin we
+	 * set here would be "in the future" and have no effect.  No point in
+	 * worrying about this since it's too late to save the desired data
+	 * anyway.  Assuming that the standby sends us an increasing sequence of
+	 * xmins, this could only happen during the first reply cycle, else our
+	 * own xmin would prevent nextXid from advancing so far.
+	 *
+	 * We don't bother taking the ProcArrayLock here.  Setting the xmin field
+	 * is assumed atomic, and there's no real need to prevent a concurrent
+	 * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
+	 * safe, and if we're moving it backwards, well, the data is at risk
+	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 */
-	if (MyProc->xmin != newxmin)
-	{
-		LWLockAcquire(ProcArrayLock, LW_SHARED);
-		MyProc->xmin = newxmin;
-		LWLockRelease(ProcArrayLock);
-	}
+	MyProc->xmin = reply.xmin;
 }
 
 /* Main loop of walsender process */
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 9489012a187..7d44a34d025 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -997,22 +997,32 @@ TransactionIdIsActive(TransactionId xid)
  * This is also used to determine where to truncate pg_subtrans.  allDbs
  * must be TRUE for that case, and ignoreVacuum FALSE.
  *
- * Note: it's possible for the calculated value to move backwards on repeated
- * calls. The calculated value is conservative, so that anything older is
- * definitely not considered as running by anyone anymore, but the exact
- * value calculated depends on a number of things. For example, if allDbs is
- * TRUE and there are no transactions running in the current database,
- * GetOldestXmin() returns latestCompletedXid. If a transaction begins after
- * that, its xmin will include in-progress transactions in other databases
- * that started earlier, so another call will return an lower value. The
- * return value is also adjusted with vacuum_defer_cleanup_age, so increasing
- * that setting on the fly is an easy way to have GetOldestXmin() move
- * backwards.
- *
  * Note: we include all currently running xids in the set of considered xids.
  * This ensures that if a just-started xact has not yet set its snapshot,
  * when it does set the snapshot it cannot set xmin less than what we compute.
  * See notes in src/backend/access/transam/README.
+ *
+ * Note: despite the above, it's possible for the calculated value to move
+ * backwards on repeated calls. The calculated value is conservative, so that
+ * anything older is definitely not considered as running by anyone anymore,
+ * but the exact value calculated depends on a number of things. For example,
+ * if allDbs is FALSE and there are no transactions running in the current
+ * database, GetOldestXmin() returns latestCompletedXid. If a transaction
+ * begins after that, its xmin will include in-progress transactions in other
+ * databases that started earlier, so another call will return a lower value.
+ * Nonetheless it is safe to vacuum a table in the current database with the
+ * first result.  There are also replication-related effects: a walsender
+ * process can set its xmin based on transactions that are no longer running
+ * in the master but are still being replayed on the standby, thus possibly
+ * making the GetOldestXmin reading go backwards.  In this case there is a
+ * possibility that we lose data that the standby would like to have, but
+ * there is little we can do about that --- data is only protected if the
+ * walsender runs continuously while queries are executed on the standby.
+ * (The Hot Standby code deals with such cases by failing standby queries
+ * that needed to access already-removed data, so there's no integrity bug.)
+ * The return value is also adjusted with vacuum_defer_cleanup_age, so
+ * increasing that setting on the fly is another easy way to make
+ * GetOldestXmin() move backwards, with no consequences for data integrity.
  */
 TransactionId
 GetOldestXmin(bool allDbs, bool ignoreVacuum)
@@ -1045,7 +1055,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
 
 		if (allDbs ||
 			proc->databaseId == MyDatabaseId ||
-			proc->databaseId == 0)		/* include WalSender */
+			proc->databaseId == 0)		/* always include WalSender */
 		{
 			/* Fetch xid just once - see GetNewTransactionId */
 			TransactionId xid = proc->xid;
@@ -1091,16 +1101,18 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
 		LWLockRelease(ProcArrayLock);
 
 		/*
-		 * Compute the cutoff XID, being careful not to generate a "permanent"
-		 * XID. We need do this only on the primary, never on standby.
+		 * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
+		 * being careful not to generate a "permanent" XID.
 		 *
 		 * vacuum_defer_cleanup_age provides some additional "slop" for the
 		 * benefit of hot standby queries on slave servers.  This is quick and
 		 * dirty, and perhaps not all that useful unless the master has a
-		 * predictable transaction rate, but it's what we've got.  Note that
-		 * we are assuming vacuum_defer_cleanup_age isn't large enough to
-		 * cause wraparound --- so guc.c should limit it to no more than the
-		 * xidStopLimit threshold in varsup.c.
+		 * predictable transaction rate, but it offers some protection when
+		 * there's no walsender connection.  Note that we are assuming
+		 * vacuum_defer_cleanup_age isn't large enough to cause wraparound ---
+		 * so guc.c should limit it to no more than the xidStopLimit threshold
+		 * in varsup.c.  Also note that we intentionally don't apply
+		 * vacuum_defer_cleanup_age on standby servers.
 		 */
 		result -= vacuum_defer_cleanup_age;
 		if (!TransactionIdIsNormal(result))
-- 
GitLab