diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index bcaebc060290c00413e955fdb686bdf77063420d..d34c5e8d53e5a9354fc859e4489732b0dff00159 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -395,8 +395,8 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
 		ResolveRecoveryConflictWithVirtualXIDs(backends,
 											 PROCSIG_RECOVERY_CONFLICT_LOCK);
 
-		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
-			!= LOCKACQUIRE_NOT_AVAIL)
+		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true,
+								false, NULL) != LOCKACQUIRE_NOT_AVAIL)
 			lock_acquired = true;
 	}
 }
@@ -631,8 +631,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 	 */
 	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
 
-	if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
-		== LOCKACQUIRE_NOT_AVAIL)
+	if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true,
+							false, NULL) == LOCKACQUIRE_NOT_AVAIL)
 		ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
 }
 
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index a6bbb7ecf52b0777e628aed4653340025e3fc9e7..13640f52871e9b5df5129405776a86baf0a632a4 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -72,11 +72,12 @@ void
 LockRelationOid(Oid relid, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SetLocktagRelationOid(&tag, relid);
 
-	res = LockAcquire(&tag, lockmode, false, false);
+	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages, so that we
@@ -87,9 +88,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
 	 * relcache entry in an undesirable way.  (In the case where our own xact
 	 * modifies the rel, the relcache update happens via
 	 * CommandCounterIncrement, not here.)
+	 *
+	 * However, in corner cases where code acts on tables (usually catalogs)
+	 * recursively, we might get here while still processing invalidation
+	 * messages in some outer execution of this function or a sibling.  The
+	 * "cleared" status of the lock tells us whether we really are done
+	 * absorbing relevant inval messages.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 }
 
 /*
@@ -105,11 +115,12 @@ bool
 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SetLocktagRelationOid(&tag, relid);
 
-	res = LockAcquire(&tag, lockmode, false, true);
+	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -118,8 +129,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 
 	return true;
 }
@@ -166,20 +180,24 @@ void
 LockRelation(Relation relation, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SET_LOCKTAG_RELATION(tag,
 						 relation->rd_lockInfo.lockRelId.dbId,
 						 relation->rd_lockInfo.lockRelId.relId);
 
-	res = LockAcquire(&tag, lockmode, false, false);
+	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 }
 
 /*
@@ -193,13 +211,14 @@ bool
 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SET_LOCKTAG_RELATION(tag,
 						 relation->rd_lockInfo.lockRelId.dbId,
 						 relation->rd_lockInfo.lockRelId.relId);
 
-	res = LockAcquire(&tag, lockmode, false, true);
+	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -208,8 +227,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 
 	return true;
 }
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 76d6eea55dd1c71a470fe41604e5a23384b48dbe..f56056254114fee3c1422df164433930bedbc8e6 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -658,6 +658,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
  *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
  *		LOCKACQUIRE_OK				lock successfully acquired
  *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
+ *		LOCKACQUIRE_ALREADY_CLEAR	incremented count for lock already clear
  *
  * In the normal case where dontWait=false and the caller doesn't need to
  * distinguish a freshly acquired lock from one already taken earlier in
@@ -674,7 +675,8 @@ LockAcquire(const LOCKTAG *locktag,
 			bool sessionLock,
 			bool dontWait)
 {
-	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
+							   true, NULL);
 }
 
 /*
@@ -685,13 +687,17 @@ LockAcquire(const LOCKTAG *locktag,
  * caller to note that the lock table is full and then begin taking
  * extreme action to reduce the number of other lock holders before
  * retrying the action.
+ *
+ * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
+ * table entry if a lock is successfully acquired, or NULL if not.
  */
 LockAcquireResult
 LockAcquireExtended(const LOCKTAG *locktag,
 					LOCKMODE lockmode,
 					bool sessionLock,
 					bool dontWait,
-					bool reportMemoryError)
+					bool reportMemoryError,
+					LOCALLOCK **locallockp)
 {
 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
 	LockMethod	lockMethodTable;
@@ -758,6 +764,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
 		locallock->numLockOwners = 0;
 		locallock->maxLockOwners = 8;
 		locallock->holdsStrongLockCount = FALSE;
+		locallock->lockCleared = false;
 		locallock->lockOwners = NULL;	/* in case next line fails */
 		locallock->lockOwners = (LOCALLOCKOWNER *)
 			MemoryContextAlloc(TopMemoryContext,
@@ -778,13 +785,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	}
 	hashcode = locallock->hashcode;
 
+	if (locallockp)
+		*locallockp = locallock;
+
 	/*
 	 * If we already hold the lock, we can just increase the count locally.
+	 *
+	 * If lockCleared is already set, caller need not worry about absorbing
+	 * sinval messages related to the lock's object.
 	 */
 	if (locallock->nLocks > 0)
 	{
 		GrantLockLocal(locallock, owner);
-		return LOCKACQUIRE_ALREADY_HELD;
+		if (locallock->lockCleared)
+			return LOCKACQUIRE_ALREADY_CLEAR;
+		else
+			return LOCKACQUIRE_ALREADY_HELD;
 	}
 
 	/*
@@ -866,6 +882,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 										   hashcode))
 		{
 			AbortStrongLockAcquire();
+			if (locallock->nLocks == 0)
+				RemoveLocalLock(locallock);
+			if (locallockp)
+				*locallockp = NULL;
 			if (reportMemoryError)
 				ereport(ERROR,
 						(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -900,6 +920,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	{
 		AbortStrongLockAcquire();
 		LWLockRelease(partitionLock);
+		if (locallock->nLocks == 0)
+			RemoveLocalLock(locallock);
+		if (locallockp)
+			*locallockp = NULL;
 		if (reportMemoryError)
 			ereport(ERROR,
 					(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -965,6 +989,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
 			LWLockRelease(partitionLock);
 			if (locallock->nLocks == 0)
 				RemoveLocalLock(locallock);
+			if (locallockp)
+				*locallockp = NULL;
 			return LOCKACQUIRE_NOT_AVAIL;
 		}
 
@@ -1567,6 +1593,20 @@ GrantAwaitedLock(void)
 	GrantLockLocal(awaitedLock, awaitedOwner);
 }
 
+/*
+ * MarkLockClear -- mark an acquired lock as "clear"
+ *
+ * This means that we know we have absorbed all sinval messages that other
+ * sessions generated before we acquired this lock, and so we can confidently
+ * assume we know about any catalog changes protected by this lock.
+ */
+void
+MarkLockClear(LOCALLOCK *locallock)
+{
+	Assert(locallock->nLocks > 0);
+	locallock->lockCleared = true;
+}
+
 /*
  * WaitOnLock -- wait to acquire a lock
  *
@@ -1834,6 +1874,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	if (locallock->nLocks > 0)
 		return TRUE;
 
+	/*
+	 * At this point we can no longer suppose we are clear of invalidation
+	 * messages related to this lock.  Although we'll delete the LOCALLOCK
+	 * object before any intentional return from this routine, it seems worth
+	 * the trouble to explicitly reset lockCleared right now, just in case
+	 * some error prevents us from deleting the LOCALLOCK.
+	 */
+	locallock->lockCleared = false;
+
 	/* Attempt fast release of any lock eligible for the fast path. */
 	if (EligibleForRelationFastPath(locktag, lockmode) &&
 		FastPathLocalUseCount > 0)
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index e22423cd6a6e3fa927a79a0c25721cf6bc545b4c..85dd5d761ebd6cf533217ba10ba8c93365f24843 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -425,6 +425,7 @@ typedef struct LOCALLOCK
 	int			numLockOwners;	/* # of relevant ResourceOwners */
 	int			maxLockOwners;	/* allocated size of array */
 	bool		holdsStrongLockCount;	/* bumped FastPathStrongRelationLocks */
+	bool		lockCleared;	/* we read all sinval msgs for lock */
 	LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
 } LOCALLOCK;
 
@@ -459,7 +460,8 @@ typedef enum
 {
 	LOCKACQUIRE_NOT_AVAIL,		/* lock not available, and dontWait=true */
 	LOCKACQUIRE_OK,				/* lock successfully acquired */
-	LOCKACQUIRE_ALREADY_HELD	/* incremented count for lock already held */
+	LOCKACQUIRE_ALREADY_HELD,	/* incremented count for lock already held */
+	LOCKACQUIRE_ALREADY_CLEAR	/* incremented count for lock already clear */
 } LockAcquireResult;
 
 /* Deadlock states identified by DeadLockCheck() */
@@ -501,8 +503,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
 					LOCKMODE lockmode,
 					bool sessionLock,
 					bool dontWait,
-					bool report_memory_error);
+					bool reportMemoryError,
+					LOCALLOCK **locallockp);
 extern void AbortStrongLockAcquire(void);
+extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
 			LOCKMODE lockmode, bool sessionLock);
 extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);