diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 503684936b3b7d186b7c8e167c1b50db2aa95b17..fc18b27427b646a3528aa61c428c416b1f60e225 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -48,6 +48,7 @@
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
@@ -69,6 +70,21 @@
 #define REL_TRUNCATE_MINIMUM	1000
 #define REL_TRUNCATE_FRACTION	16
 
+/*
+ * Timing parameters for truncate locking heuristics.
+ *
+ * These were not exposed as user-tunable GUC values because it didn't seem
+ * that the potential for improvement was great enough to merit the cost of
+ * supporting them.
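+ *
+ * With the values below, the truncate code retries the conditional lock
+ * request in 50 ms naps for up to 5 seconds (100 attempts) before giving
+ * up, and checks for conflicting lock requests at most once every 20 ms.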
+ */
+#define AUTOVACUUM_TRUNCATE_LOCK_CHECK_INTERVAL		20	/* ms */
+#define AUTOVACUUM_TRUNCATE_LOCK_WAIT_INTERVAL		50	/* ms */
+#define AUTOVACUUM_TRUNCATE_LOCK_TIMEOUT			5000		/* ms */
+
 /*
  * Guesstimation of number of dead tuples per page.  This is used to
  * provide an upper limit to memory allocated when vacuuming small
@@ -103,6 +115,7 @@ typedef struct LVRelStats
 	ItemPointer dead_tuples;	/* array of ItemPointerData */
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
+	bool		lock_waiter_detected;	/* gave up lock early in truncate? */
 } LVRelStats;
 
 
@@ -193,6 +206,8 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
 	vacrelstats->old_rel_pages = onerel->rd_rel->relpages;
 	vacrelstats->old_rel_tuples = onerel->rd_rel->reltuples;
 	vacrelstats->num_index_scans = 0;
+	vacrelstats->pages_removed = 0;
+	vacrelstats->lock_waiter_detected = false;
 
 	/* Open all indexes of the relation */
 	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
@@ -259,10 +274,17 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
 						vacrelstats->hasindex,
 						new_frozen_xid);
 
-	/* report results to the stats collector, too */
-	pgstat_report_vacuum(RelationGetRelid(onerel),
-						 onerel->rd_rel->relisshared,
-						 new_rel_tuples);
+	/*
+	 * Report results to the stats collector, too. A lazy_truncate_heap
+	 * attempt that was terminated early by a conflicting lock request
+	 * suppresses the report and also cancels a requested ANALYZE.
+	 */
+	if (!vacrelstats->lock_waiter_detected)
+		pgstat_report_vacuum(RelationGetRelid(onerel),
+							 onerel->rd_rel->relisshared,
+							 new_rel_tuples);
+	else
+		vacstmt->options &= ~VACOPT_ANALYZE;
 
 	/* and log the action if appropriate */
 	if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
@@ -1257,80 +1279,131 @@ lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
 	BlockNumber old_rel_pages = vacrelstats->rel_pages;
 	BlockNumber new_rel_pages;
 	PGRUsage	ru0;
+	int			lock_retry;
 
 	pg_rusage_init(&ru0);
 
 	/*
-	 * We need full exclusive lock on the relation in order to do truncation.
-	 * If we can't get it, give up rather than waiting --- we don't want to
-	 * block other backends, and we don't want to deadlock (which is quite
-	 * possible considering we already hold a lower-grade lock).
-	 */
-	if (!ConditionalLockRelation(onerel, AccessExclusiveLock))
-		return;
-
-	/*
-	 * Now that we have exclusive lock, look to see if the rel has grown
-	 * whilst we were vacuuming with non-exclusive lock.  If so, give up; the
-	 * newly added pages presumably contain non-deletable tuples.
+	 * Loop until no more truncating can be done.
 	 */
-	new_rel_pages = RelationGetNumberOfBlocks(onerel);
-	if (new_rel_pages != old_rel_pages)
+	do
 	{
 		/*
-		 * Note: we intentionally don't update vacrelstats->rel_pages with the
-		 * new rel size here.  If we did, it would amount to assuming that the
-		 * new pages are empty, which is unlikely.	Leaving the numbers alone
-		 * amounts to assuming that the new pages have the same tuple density
-		 * as existing ones, which is less unlikely.
+		 * We need full exclusive lock on the relation in order to do
+		 * truncation. If we can't get it, give up rather than waiting --- we
+		 * don't want to block other backends, and we don't want to deadlock
+		 * (which is quite possible considering we already hold a lower-grade
+		 * lock).
 		 */
-		UnlockRelation(onerel, AccessExclusiveLock);
-		return;
-	}
+		vacrelstats->lock_waiter_detected = false;
+		lock_retry = 0;
+		while (true)
+		{
+			if (ConditionalLockRelation(onerel, AccessExclusiveLock))
+				break;
 
-	/*
-	 * Scan backwards from the end to verify that the end pages actually
-	 * contain no tuples.  This is *necessary*, not optional, because other
-	 * backends could have added tuples to these pages whilst we were
-	 * vacuuming.
-	 */
-	new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
+			/*
+			 * Check for interrupts while trying to (re-)acquire the exclusive
+			 * lock.
+			 */
+			CHECK_FOR_INTERRUPTS();
 
-	if (new_rel_pages >= old_rel_pages)
-	{
-		/* can't do anything after all */
-		UnlockRelation(onerel, AccessExclusiveLock);
-		return;
-	}
+			if (++lock_retry > (AUTOVACUUM_TRUNCATE_LOCK_TIMEOUT /
+								AUTOVACUUM_TRUNCATE_LOCK_WAIT_INTERVAL))
+			{
+				/*
+				 * We failed to establish the lock in the specified number of
+				 * retries, so give up truncating. Also suppress the ANALYZE
+				 * step: an ANALYZE now would reset the dead_tuple_count in
+				 * the stats collector, and the autovacuum launcher would
+				 * then not call us again to retry the truncate.
+				 */
+				vacrelstats->lock_waiter_detected = true;
+				ereport(LOG,
+						(errmsg("automatic vacuum of table \"%s.%s.%s\": "
+								"could not (re)acquire exclusive "
+								"lock for truncate scan",
+								get_database_name(MyDatabaseId),
+							get_namespace_name(RelationGetNamespace(onerel)),
+								RelationGetRelationName(onerel))));
+				return;
+			}
 
-	/*
-	 * Okay to truncate.
-	 */
-	RelationTruncate(onerel, new_rel_pages);
+			/* the wait interval is in ms; pg_usleep() expects microseconds */
+			pg_usleep(AUTOVACUUM_TRUNCATE_LOCK_WAIT_INTERVAL * 1000L);
+		}
 
-	/*
-	 * We can release the exclusive lock as soon as we have truncated.	Other
-	 * backends can't safely access the relation until they have processed the
-	 * smgr invalidation that smgrtruncate sent out ... but that should happen
-	 * as part of standard invalidation processing once they acquire lock on
-	 * the relation.
-	 */
-	UnlockRelation(onerel, AccessExclusiveLock);
+		/*
+		 * Now that we have exclusive lock, look to see if the rel has grown
+		 * whilst we were vacuuming with non-exclusive lock.  If so, give up;
+		 * the newly added pages presumably contain non-deletable tuples.
+		 */
+		new_rel_pages = RelationGetNumberOfBlocks(onerel);
+		if (new_rel_pages != old_rel_pages)
+		{
+			/*
+			 * Note: we intentionally don't update vacrelstats->rel_pages with
+			 * the new rel size here.  If we did, it would amount to assuming
+			 * that the new pages are empty, which is unlikely. Leaving the
+			 * numbers alone amounts to assuming that the new pages have the
+			 * same tuple density as existing ones, which is less unlikely.
+			 */
+			UnlockRelation(onerel, AccessExclusiveLock);
+			return;
+		}
 
-	/*
-	 * Update statistics.  Here, it *is* correct to adjust rel_pages without
-	 * also touching reltuples, since the tuple count wasn't changed by the
-	 * truncation.
-	 */
-	vacrelstats->rel_pages = new_rel_pages;
-	vacrelstats->pages_removed = old_rel_pages - new_rel_pages;
+		/*
+		 * Scan backwards from the end to verify that the end pages actually
+		 * contain no tuples.  This is *necessary*, not optional, because
+		 * other backends could have added tuples to these pages whilst we
+		 * were vacuuming.
+		 */
+		new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
 
-	ereport(elevel,
-			(errmsg("\"%s\": truncated %u to %u pages",
-					RelationGetRelationName(onerel),
-					old_rel_pages, new_rel_pages),
-			 errdetail("%s.",
-					   pg_rusage_show(&ru0))));
+		if (new_rel_pages >= old_rel_pages)
+		{
+			/* can't do anything after all */
+			UnlockRelation(onerel, AccessExclusiveLock);
+			return;
+		}
+
+		/*
+		 * Okay to truncate.
+		 */
+		RelationTruncate(onerel, new_rel_pages);
+
+		/*
+		 * We can release the exclusive lock as soon as we have truncated.
+		 * Other backends can't safely access the relation until they have
+		 * processed the smgr invalidation that smgrtruncate sent out ... but
+		 * that should happen as part of standard invalidation processing once
+		 * they acquire lock on the relation.
+		 */
+		UnlockRelation(onerel, AccessExclusiveLock);
+
+		/*
+		 * Update statistics.  Here, it *is* correct to adjust rel_pages
+		 * without also touching reltuples, since the tuple count wasn't
+		 * changed by the truncation.
+		 */
+		vacrelstats->pages_removed += old_rel_pages - new_rel_pages;
+		vacrelstats->rel_pages = new_rel_pages;
+
+		ereport(elevel,
+				(errmsg("\"%s\": truncated %u to %u pages",
+						RelationGetRelationName(onerel),
+						old_rel_pages, new_rel_pages),
+				 errdetail("%s.",
+						   pg_rusage_show(&ru0))));
+		old_rel_pages = new_rel_pages;
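+
+		/*
+		 * Retry only if the backward scan stopped early because of a
+		 * conflicting lock request and empty pages may remain at the
+		 * end of the relation.
+		 */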
+	} while (new_rel_pages > vacrelstats->nonempty_pages &&
+			 vacrelstats->lock_waiter_detected);
 }
 
 /*
@@ -1342,6 +1408,12 @@ static BlockNumber
 count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 {
 	BlockNumber blkno;
+	instr_time	starttime;
+	instr_time	currenttime;
+	instr_time	elapsed;
+
+	/* Record the start time, to rate-limit the conflicting-lock checks */
+	INSTR_TIME_SET_CURRENT(starttime);
 
 	/* Strange coding of loop control is needed because blkno is unsigned */
 	blkno = vacrelstats->rel_pages;
@@ -1353,6 +1425,36 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 					maxoff;
 		bool		hastup;
 
+		/*
+		 * Check if another process requests a lock on our relation. We are
+		 * holding an AccessExclusiveLock here, so they will be waiting. We
+		 * only do this once per AUTOVACUUM_TRUNCATE_LOCK_CHECK_INTERVAL
+		 * milliseconds, and we only check whether that interval has elapsed
+		 * once every 32 blocks to keep the number of system calls and actual
+		 * shared lock table lookups to a minimum.
+		 */
+		if ((blkno % 32) == 0)
+		{
+			INSTR_TIME_SET_CURRENT(currenttime);
+			elapsed = currenttime;
+			INSTR_TIME_SUBTRACT(elapsed, starttime);
+			if ((INSTR_TIME_GET_MICROSEC(elapsed) / 1000)
+				>= AUTOVACUUM_TRUNCATE_LOCK_CHECK_INTERVAL)
+			{
+				if (LockHasWaitersRelation(onerel, AccessExclusiveLock))
+				{
+					ereport(elevel,
+							(errmsg("\"%s\": suspending truncate "
+									"due to conflicting lock request",
+									RelationGetRelationName(onerel))));
+
+					vacrelstats->lock_waiter_detected = true;
+					return blkno;
+				}
+				starttime = currenttime;	/* start a new check interval */
+			}
+		}
+
 		/*
 		 * We don't insert a vacuum delay point here, because we have an
 		 * exclusive lock on the table which we want to hold for as short a
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index a7786d0231f9117fa273345a4c34877860dc5a63..e1fa74f9601a2c9958300ed08dad04fc47f71bf0 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -232,6 +232,24 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
 	LockRelease(&tag, lockmode, false);
 }
 
+/*
+ *		LockHasWaitersRelation
+ *
+ * Check whether some other backend is waiting for a lock that conflicts
+ * with the mode we currently hold on the given relation.
+ */
+bool
+LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_RELATION(tag,
+						 relation->rd_lockInfo.lockRelId.dbId,
+						 relation->rd_lockInfo.lockRelId.relId);
+
+	return LockHasWaiters(&tag, lockmode, false);
+}
+
 /*
  *		LockRelationIdForSession
  *
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 01834437467d6428e17f81af74fdbc0dbab96f93..ec4da20a0d4199c38ad3cec9b8b024610f761159 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -538,6 +538,100 @@ ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
 	return lockhash;
 }
 
+/*
+ * LockHasWaiters -- look up 'locktag' and check if releasing this
+ *		lock would wake up other processes waiting for it.
+ */
+bool
+LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
+{
+	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+	LockMethod	lockMethodTable;
+	LOCALLOCKTAG localtag;
+	LOCALLOCK  *locallock;
+	LOCK	   *lock;
+	PROCLOCK   *proclock;
+	LWLockId	partitionLock;
+	bool		hasWaiters = false;
+
+	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
+		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+	lockMethodTable = LockMethods[lockmethodid];
+	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
+		elog(ERROR, "unrecognized lock mode: %d", lockmode);
+
+#ifdef LOCK_DEBUG
+	if (LOCK_DEBUG_ENABLED(locktag))
+		elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
+			 locktag->locktag_field1, locktag->locktag_field2,
+			 lockMethodTable->lockModeNames[lockmode]);
+#endif
+
+	/*
+	 * Find the LOCALLOCK entry for this lock and lockmode
+	 */
+	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
+	localtag.lock = *locktag;
+	localtag.mode = lockmode;
+
+	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
+										  (void *) &localtag,
+										  HASH_FIND, NULL);
+
+	/*
+	 * If we don't hold the lock, emit only a WARNING and let the caller
+	 * print its own error message, too. Do not ereport(ERROR).
+	 */
+	if (!locallock || locallock->nLocks <= 0)
+	{
+		elog(WARNING, "you don't own a lock of type %s",
+			 lockMethodTable->lockModeNames[lockmode]);
+		return false;
+	}
+
+	/*
+	 * Check the shared lock table.
+	 */
+	partitionLock = LockHashPartitionLock(locallock->hashcode);
+
+	LWLockAcquire(partitionLock, LW_SHARED);
+
+	/*
+	 * We don't need to re-find the lock or proclock, since we kept their
+	 * addresses in the locallock table, and they couldn't have been removed
+	 * while we were holding a lock on them.
+	 */
+	lock = locallock->lock;
+	LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
+	proclock = locallock->proclock;
+	PROCLOCK_PRINT("LockHasWaiters: found", proclock);
+
+	/*
+	 * Double-check that we are actually holding a lock of the type we are
+	 * asking about.
+	 */
+	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
+	{
+		PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
+		LWLockRelease(partitionLock);
+		elog(WARNING, "you don't own a lock of type %s",
+			 lockMethodTable->lockModeNames[lockmode]);
+		RemoveLocalLock(locallock);
+		return false;
+	}
+
+	/*
+	 * The lock has waiters if any lock mode someone is currently waiting
+	 * for (lock->waitMask) conflicts with the mode we hold.
+	 */
+	if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
+		hasWaiters = true;
+
+	LWLockRelease(partitionLock);
+
+	return hasWaiters;
+}
+
 
 /*
  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index de340c4973e435610e690f23890503f2652a3dbd..aa79edaaa48b6056bac9ef0254569284a9e5708b 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -31,6 +31,7 @@ extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
 extern void LockRelation(Relation relation, LOCKMODE lockmode);
 extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
 extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
+extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
 
 extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index e01a5c504434bc83591519b0f6b171536575de41..4d0a2f65f564b46529f7d49b4ed09be4620b0c56 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -494,6 +494,8 @@ extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
 extern void LockReleaseSession(LOCKMETHODID lockmethodid);
 extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
 extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
+extern bool LockHasWaiters(const LOCKTAG *locktag,
+			   LOCKMODE lockmode, bool sessionLock);
 extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
 				 LOCKMODE lockmode);
 extern void AtPrepare_Locks(void);