diff --git a/contrib/bloom/blscan.c b/contrib/bloom/blscan.c
index ae937f667101951247390f2cecc839b4503fd07f..e75ed3d6136acf026c153e6e23fbf71b9d457c7c 100644
--- a/contrib/bloom/blscan.c
+++ b/contrib/bloom/blscan.c
@@ -138,7 +138,8 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 									blkno, RBM_NORMAL, bas);
 
 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
-		page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(buffer, scan->xs_snapshot, scan->indexRelation,
+							 BGP_TEST_FOR_OLD_SNAPSHOT);
 
 		if (!BloomPageIsDeleted(page))
 		{
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 2f04702b6e531ba51a911a56e6805ed41034757f..f9ba1487822e9f542961591ab2c0f466c8f8d239 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2041,6 +2041,42 @@ include_dir 'conf.d'
         </para>
        </listitem>
       </varlistentry>
+
+      <varlistentry id="guc-old-snapshot-threshold" xreflabel="old_snapshot_threshold">
+       <term><varname>old_snapshot_threshold</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>old_snapshot_threshold</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the minimum time that a snapshot can be used without risk of a
+         <literal>snapshot too old</> error occurring when using the snapshot.
+         This parameter can only be set at server start.
+        </para>
+
+        <para>
+         Beyond the threshold, old data may be vacuumed away.  This can help
+         prevent bloat in the face of snapshots which remain in use for a
+         long time.  To prevent incorrect results due to cleanup of data which
+         would otherwise be visible to the snapshot, an error is generated
+         when the snapshot is older than this threshold and the snapshot is
+         used to read a page which has been modified since the snapshot was
+         built.
+        </para>
+
+        <para>
+         A value of <literal>-1</> disables this feature, and is the default.
+         Useful values for production work probably range from a small number
+         of hours to a few days.  The setting will be coerced to a granularity
+         of minutes, and small numbers (such as <literal>0</> or
+         <literal>1min</>) are only allowed because they may sometimes be
+         useful for testing.  While a setting as high as <literal>60d</> is
+         allowed, please note that in many workloads extreme bloat or
+         transaction ID wraparound may occur in much shorter time frames.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
     </sect2>
    </sect1>
@@ -3051,6 +3087,10 @@ include_dir 'conf.d'
         You should also consider setting <varname>hot_standby_feedback</>
         on standby server(s) as an alternative to using this parameter.
        </para>
+       <para>
+        This does not prevent cleanup of dead rows which have reached the age
+        specified by <varname>old_snapshot_threshold</>.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -3198,6 +3238,16 @@ include_dir 'conf.d'
         until it eventually reaches the primary.  Standbys make no other use
         of feedback they receive other than to pass upstream.
        </para>
+       <para>
+        This setting does not override the behavior of
+        <varname>old_snapshot_threshold</> on the primary; a snapshot on the
+        standby which exceeds the primary's age threshold can become invalid,
+        resulting in cancellation of transactions on the standby.  This is
+        because <varname>old_snapshot_threshold</> is intended to provide an
+        absolute limit on the time which dead rows can contribute to bloat,
+        which would otherwise be violated because of the configuration of a
+        standby.
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 6f6f1b1b4155e79bc0c0fd8708882b7b5a6b64d6..e64c94d35699aecc4f3b9d33517263e29c75cf84 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -135,7 +135,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
 	MemoryContext tupcxt = NULL;
 	MemoryContext oldcxt = NULL;
 
-	revmap = brinRevmapInitialize(idxRel, &pagesPerRange);
+	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
 
 	for (;;)
 	{
@@ -152,7 +152,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
 		/* normalize the block number to be the first block in the range */
 		heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
 		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
-										 BUFFER_LOCK_SHARE);
+										 BUFFER_LOCK_SHARE, NULL);
 
 		/* if range is unsummarized, there's nothing to do */
 		if (!brtup)
@@ -285,7 +285,8 @@ brinbeginscan(Relation r, int nkeys, int norderbys)
 	scan = RelationGetIndexScan(r, nkeys, norderbys);
 
 	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
-	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
+	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
+											   scan->xs_snapshot);
 	opaque->bo_bdesc = brin_build_desc(r);
 	scan->opaque = opaque;
 
@@ -368,7 +369,8 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 		MemoryContextResetAndDeleteChildren(perRangeCxt);
 
 		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
-									   &off, &size, BUFFER_LOCK_SHARE);
+									   &off, &size, BUFFER_LOCK_SHARE,
+									   scan->xs_snapshot);
 		if (tup)
 		{
 			tup = brin_copy_tuple(tup, size);
@@ -647,7 +649,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	/*
 	 * Initialize our state, including the deformed tuple state.
 	 */
-	revmap = brinRevmapInitialize(index, &pagesPerRange);
+	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);
 
 	/*
@@ -1045,7 +1047,8 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
 		 * the same.)
 		 */
 		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
-										 &offset, &phsz, BUFFER_LOCK_SHARE);
+										 &offset, &phsz, BUFFER_LOCK_SHARE,
+										 NULL);
 		/* the placeholder tuple must exist */
 		if (phtup == NULL)
 			elog(ERROR, "missing placeholder tuple");
@@ -1080,7 +1083,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
 	BlockNumber pagesPerRange;
 	Buffer		buf;
 
-	revmap = brinRevmapInitialize(index, &pagesPerRange);
+	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 
 	/*
 	 * Scan the revmap to find unsummarized items.
@@ -1095,7 +1098,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
 		CHECK_FOR_INTERRUPTS();
 
 		tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
-									   BUFFER_LOCK_SHARE);
+									   BUFFER_LOCK_SHARE, NULL);
 		if (tup == NULL)
 		{
 			/* no revmap entry for this heap range. Summarize it. */
diff --git a/src/backend/access/brin/brin_revmap.c b/src/backend/access/brin/brin_revmap.c
index ce21cbabb7afe9723726457d7d805a6280298949..5ed867cf30bfdfd93dc3f23db25c57f50ed698e1 100644
--- a/src/backend/access/brin/brin_revmap.c
+++ b/src/backend/access/brin/brin_revmap.c
@@ -68,7 +68,8 @@ static void revmap_physical_extend(BrinRevmap *revmap);
  * brinRevmapTerminate when caller is done with it.
  */
 BrinRevmap *
-brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
+brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange,
+					 Snapshot snapshot)
 {
 	BrinRevmap *revmap;
 	Buffer		meta;
@@ -77,7 +78,7 @@ brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
 
 	meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
 	LockBuffer(meta, BUFFER_LOCK_SHARE);
-	page = BufferGetPage(meta, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+	page = BufferGetPage(meta, snapshot, idxrel, BGP_TEST_FOR_OLD_SNAPSHOT);
 	metadata = (BrinMetaPageData *) PageGetContents(page);
 
 	revmap = palloc(sizeof(BrinRevmap));
@@ -187,7 +188,8 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
  */
 BrinTuple *
 brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
-						 Buffer *buf, OffsetNumber *off, Size *size, int mode)
+						 Buffer *buf, OffsetNumber *off, Size *size, int mode,
+						 Snapshot snapshot)
 {
 	Relation	idxRel = revmap->rm_irel;
 	BlockNumber mapBlk;
@@ -264,7 +266,8 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
 			*buf = ReadBuffer(idxRel, blk);
 		}
 		LockBuffer(*buf, mode);
-		page = BufferGetPage(*buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(*buf, snapshot, idxRel,
+							 BGP_TEST_FOR_OLD_SNAPSHOT);
 
 		/* If we land on a revmap page, start over */
 		if (BRIN_IS_REGULAR_PAGE(page))
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index 13258cca0ea798c2a6bab8ca732b1c5b651ae321..e593b2bbe9938ca5b2d160e57a16eb7e1ecaaef8 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -71,7 +71,7 @@ ginTraverseLock(Buffer buffer, bool searchMode)
  * is share-locked, and stack->parent is NULL.
  */
 GinBtreeStack *
-ginFindLeafPage(GinBtree btree, bool searchMode)
+ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot)
 {
 	GinBtreeStack *stack;
 
@@ -89,7 +89,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
 
 		stack->off = InvalidOffsetNumber;
 
-		page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(stack->buffer, snapshot, btree->index,
+							 BGP_TEST_FOR_OLD_SNAPSHOT);
 
 		access = ginTraverseLock(stack->buffer, searchMode);
 
@@ -115,8 +116,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
 
 			stack->buffer = ginStepRight(stack->buffer, btree->index, access);
 			stack->blkno = rightlink;
-			page = BufferGetPage(stack->buffer, NULL, NULL,
-								 BGP_NO_SNAPSHOT_TEST);
+			page = BufferGetPage(stack->buffer, snapshot, btree->index,
+								 BGP_TEST_FOR_OLD_SNAPSHOT);
 
 			if (!searchMode && GinPageIsIncompleteSplit(page))
 				ginFinishSplit(btree, stack, false, NULL);
diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c
index 9c501a1af5f17fb119220587a054ea717b1dfc41..ed3d9174f65f54b8ac453e34bd8132a14300a6b1 100644
--- a/src/backend/access/gin/gindatapage.c
+++ b/src/backend/access/gin/gindatapage.c
@@ -1820,7 +1820,7 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
 	{
 		/* search for the leaf page where the first item should go to */
 		btree.itemptr = insertdata.items[insertdata.curitem];
-		stack = ginFindLeafPage(&btree, false);
+		stack = ginFindLeafPage(&btree, false, NULL);
 
 		ginInsertValue(&btree, stack, &insertdata, buildStats);
 	}
@@ -1830,7 +1830,8 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
  * Starts a new scan on a posting tree.
  */
 GinBtreeStack *
-ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
+ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
+						Snapshot snapshot)
 {
 	GinBtreeStack *stack;
 
@@ -1838,7 +1839,7 @@ ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
 
 	btree->fullScan = TRUE;
 
-	stack = ginFindLeafPage(btree, TRUE);
+	stack = ginFindLeafPage(btree, TRUE, snapshot);
 
 	return stack;
 }
diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c
index 33683278e100135b872d67709d546ea2fc3c3755..b79ba1e62afccb026f48a26d1589f021b231687b 100644
--- a/src/backend/access/gin/ginget.c
+++ b/src/backend/access/gin/ginget.c
@@ -73,7 +73,7 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
 	Page		page;
 
 	/* Descend to the leftmost leaf page */
-	stack = ginScanBeginPostingTree(&btree, index, rootPostingTree);
+	stack = ginScanBeginPostingTree(&btree, index, rootPostingTree, snapshot);
 	buffer = stack->buffer;
 	IncrBufferRefCount(buffer); /* prevent unpin in freeGinBtreeStack */
 
@@ -146,7 +146,8 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
 		if (moveRightIfItNeeded(btree, stack) == false)
 			return true;
 
-		page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(stack->buffer, snapshot, btree->index,
+							 BGP_TEST_FOR_OLD_SNAPSHOT);
 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));
 
 		/*
@@ -320,7 +321,7 @@ restartScanEntry:
 	ginPrepareEntryScan(&btreeEntry, entry->attnum,
 						entry->queryKey, entry->queryCategory,
 						ginstate);
-	stackEntry = ginFindLeafPage(&btreeEntry, true);
+	stackEntry = ginFindLeafPage(&btreeEntry, true, snapshot);
 	page = BufferGetPage(stackEntry->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
 	needUnlock = TRUE;
 
@@ -385,7 +386,7 @@ restartScanEntry:
 			needUnlock = FALSE;
 
 			stack = ginScanBeginPostingTree(&entry->btree, ginstate->index,
-											rootPostingTree);
+											rootPostingTree, snapshot);
 			entry->buffer = stack->buffer;
 
 			/*
@@ -627,7 +628,7 @@ entryLoadMoreItems(GinState *ginstate, GinScanEntry entry,
 			entry->btree.itemptr.ip_posid++;
 		}
 		entry->btree.fullScan = false;
-		stack = ginFindLeafPage(&entry->btree, true);
+		stack = ginFindLeafPage(&entry->btree, true, snapshot);
 
 		/* we don't need the stack, just the buffer. */
 		entry->buffer = stack->buffer;
@@ -1335,8 +1336,8 @@ scanGetCandidate(IndexScanDesc scan, pendingPosition *pos)
 	ItemPointerSetInvalid(&pos->item);
 	for (;;)
 	{
-		page = BufferGetPage(pos->pendingBuffer, NULL,
-							 NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
+							 scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);
 
 		maxoff = PageGetMaxOffsetNumber(page);
 		if (pos->firstOffset > maxoff)
@@ -1516,8 +1517,8 @@ collectMatchesForHeapRow(IndexScanDesc scan, pendingPosition *pos)
 		memset(datumExtracted + pos->firstOffset - 1, 0,
 			   sizeof(bool) * (pos->lastOffset - pos->firstOffset));
 
-		page = BufferGetPage(pos->pendingBuffer, NULL,
-							 NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
+							 scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);
 
 		for (i = 0; i < so->nkeys; i++)
 		{
@@ -1710,7 +1711,8 @@ scanPendingInsert(IndexScanDesc scan, TIDBitmap *tbm, int64 *ntids)
 	*ntids = 0;
 
 	LockBuffer(metabuffer, GIN_SHARE);
-	page = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+	page = BufferGetPage(metabuffer, scan->xs_snapshot, scan->indexRelation,
+						 BGP_TEST_FOR_OLD_SNAPSHOT);
 	blkno = GinPageGetMeta(page)->head;
 
 	/*
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 126501149d25f2b73ec8011fc99b2a7933a86cdf..d4bfed06bcc2a710fb49d2e1d158f6f079fc3f23 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -192,7 +192,7 @@ ginEntryInsert(GinState *ginstate,
 
 	ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
 
-	stack = ginFindLeafPage(&btree, false);
+	stack = ginFindLeafPage(&btree, false, NULL);
 	page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
 
 	if (btree.findItem(&btree, stack))
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 13a039947ba0748bfa31cb121fb524a265f6ff3a..24af868466d822a31b60d77d7d054c35b884b61a 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -336,7 +336,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
 	buffer = ReadBuffer(scan->indexRelation, pageItem->blkno);
 	LockBuffer(buffer, GIST_SHARE);
 	gistcheckpage(scan->indexRelation, buffer);
-	page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+	page = BufferGetPage(buffer, scan->xs_snapshot, r, BGP_TEST_FOR_OLD_SNAPSHOT);
 	opaque = GistPageGetOpaque(page);
 
 	/*
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index a5032e1251df741a3bfaeef8aa8ffe1afdc5fb2c..03cd0b006c3fc02cdb716930aba5ef0db0098202 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -278,7 +278,8 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
 
 		buf = so->hashso_curbuf;
 		Assert(BufferIsValid(buf));
-		page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(buf, scan->xs_snapshot, rel,
+							 BGP_TEST_FOR_OLD_SNAPSHOT);
 		maxoffnum = PageGetMaxOffsetNumber(page);
 		for (offnum = ItemPointerGetOffsetNumber(current);
 			 offnum <= maxoffnum;
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index dd1f464e53addfa00e02a00615f2cfa3c07bc898..4c14362c6fe0238d7d8f906f178a5ade8e84be2d 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -188,8 +188,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 
 	/* Read the metapage */
 	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
-	page = BufferGetPage(metabuf, NULL, NULL,
-						 BGP_NO_SNAPSHOT_TEST);
+	page = BufferGetPage(metabuf, scan->xs_snapshot, rel,
+						 BGP_TEST_FOR_OLD_SNAPSHOT);
 	metap = HashPageGetMeta(page);
 
 	/*
@@ -242,8 +242,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 
 	/* Fetch the primary bucket page for the bucket */
 	buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
-	page = BufferGetPage(buf, NULL, NULL,
-						 BGP_NO_SNAPSHOT_TEST);
+	page = BufferGetPage(buf, scan->xs_snapshot, rel,
+						 BGP_TEST_FOR_OLD_SNAPSHOT);
 	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
 	Assert(opaque->hasho_bucket == bucket);
 
@@ -350,6 +350,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					_hash_readnext(rel, &buf, &page, &opaque);
 					if (BufferIsValid(buf))
 					{
+						TestForOldSnapshot(scan->xs_snapshot, rel, page);
 						maxoff = PageGetMaxOffsetNumber(page);
 						offnum = _hash_binsearch(page, so->hashso_sk_hash);
 					}
@@ -391,6 +392,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					_hash_readprev(rel, &buf, &page, &opaque);
 					if (BufferIsValid(buf))
 					{
+						TestForOldSnapshot(scan->xs_snapshot, rel, page);
 						maxoff = PageGetMaxOffsetNumber(page);
 						offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
 					}
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 66b23540fe29f556a985e2dc3fadf226dad4ddc0..29fd31a819d278c8e052a398c56fd6f4294614e9 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -394,7 +394,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
 	 */
 	LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
-	dp = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+	dp = BufferGetPage(buffer, snapshot, scan->rs_rd,
+					   BGP_TEST_FOR_OLD_SNAPSHOT);
 	lines = PageGetMaxOffsetNumber(dp);
 	ntup = 0;
 
@@ -537,7 +538,7 @@ heapgettup(HeapScanDesc scan,
 
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-		dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd, BGP_TEST_FOR_OLD_SNAPSHOT);
 		lines = PageGetMaxOffsetNumber(dp);
 		/* page and lineoff now reference the physically next tid */
 
@@ -582,7 +583,8 @@ heapgettup(HeapScanDesc scan,
 
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-		dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+						   BGP_TEST_FOR_OLD_SNAPSHOT);
 		lines = PageGetMaxOffsetNumber(dp);
 
 		if (!scan->rs_inited)
@@ -616,7 +618,8 @@ heapgettup(HeapScanDesc scan,
 			heapgetpage(scan, page);
 
 		/* Since the tuple was previously fetched, needn't lock page here */
-		dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+						   BGP_TEST_FOR_OLD_SNAPSHOT);
 		lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
 		lpp = PageGetItemId(dp, lineoff);
 		Assert(ItemIdIsNormal(lpp));
@@ -745,7 +748,8 @@ heapgettup(HeapScanDesc scan,
 
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-		dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+						   BGP_TEST_FOR_OLD_SNAPSHOT);
 		lines = PageGetMaxOffsetNumber((Page) dp);
 		linesleft = lines;
 		if (backward)
@@ -832,7 +836,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 			lineindex = scan->rs_cindex + 1;
 		}
 
-		dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+						   BGP_TEST_FOR_OLD_SNAPSHOT);
 		lines = scan->rs_ntuples;
 		/* page and lineindex now reference the next visible tid */
 
@@ -875,7 +880,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 			page = scan->rs_cblock;		/* current page */
 		}
 
-		dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+						   BGP_TEST_FOR_OLD_SNAPSHOT);
 		lines = scan->rs_ntuples;
 
 		if (!scan->rs_inited)
@@ -908,7 +914,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 			heapgetpage(scan, page);
 
 		/* Since the tuple was previously fetched, needn't lock page here */
-		dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+						   BGP_TEST_FOR_OLD_SNAPSHOT);
 		lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
 		lpp = PageGetItemId(dp, lineoff);
 		Assert(ItemIdIsNormal(lpp));
@@ -1027,7 +1034,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 
 		heapgetpage(scan, page);
 
-		dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+						   BGP_TEST_FOR_OLD_SNAPSHOT);
 		lines = scan->rs_ntuples;
 		linesleft = lines;
 		if (backward)
@@ -1871,7 +1879,7 @@ heap_fetch(Relation relation,
 	 * Need share lock on buffer to examine tuple commit status.
 	 */
 	LockBuffer(buffer, BUFFER_LOCK_SHARE);
-	page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+	page = BufferGetPage(buffer, snapshot, relation, BGP_TEST_FOR_OLD_SNAPSHOT);
 
 	/*
 	 * We'd better check for out-of-range offnum in case of VACUUM since the
@@ -2200,7 +2208,8 @@ heap_get_latest_tid(Relation relation,
 		 */
 		buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
-		page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(buffer, snapshot, relation,
+							 BGP_TEST_FOR_OLD_SNAPSHOT);
 
 		/*
 		 * Check for bogus item number.  This is not treated as an error
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 19201b0bca5c9be7f2a36bb838414345aabe8385..ce073ccdc2359933ee6e8d23a549196dffcf6051 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -92,12 +92,21 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
 	 * need to use the horizon that includes slots, otherwise the data-only
 	 * horizon can be used. Note that the toast relation of user defined
 	 * relations are *not* considered catalog relations.
+	 *
+	 * It is OK to apply the old snapshot limit before acquiring the cleanup
+	 * lock because the worst that can happen is that we are not quite as
+	 * aggressive about the cleanup (by however many transaction IDs are
+	 * consumed between this point and acquiring the lock).  This allows us to
+	 * save significant overhead in the case where the page is found not to be
+	 * prunable.
 	 */
 	if (IsCatalogRelation(relation) ||
 		RelationIsAccessibleInLogicalDecoding(relation))
 		OldestXmin = RecentGlobalXmin;
 	else
-		OldestXmin = RecentGlobalDataXmin;
+		OldestXmin =
+				TransactionIdLimitedForOldSnapshots(RecentGlobalDataXmin,
+													relation);
 
 	Assert(TransactionIdIsValid(OldestXmin));
 
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index bf7a817551771151f49d63dde592d3c2bf9716e4..3796656e177e3e985f8d01d78eb756bd6e94b117 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -119,7 +119,7 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 
 top:
 	/* find the first page containing this key */
-	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
 
 	offset = InvalidOffsetNumber;
 
@@ -135,7 +135,7 @@ top:
 	 * precise description.
 	 */
 	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
-						true, stack, BT_WRITE);
+						true, stack, BT_WRITE, NULL);
 
 	/*
 	 * If we're not allowing duplicates, make sure the key isn't already in
@@ -1682,7 +1682,8 @@ _bt_insert_parent(Relation rel,
 			elog(DEBUG2, "concurrent ROOT page split");
 			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 			/* Find the leftmost page at the next level up */
-			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
+			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
+									NULL);
 			/* Set up a phony stack entry pointing there */
 			stack = &fakestack;
 			stack->bts_blkno = BufferGetBlockNumber(pbuf);
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 36b18047615026e0f9cc7e3286463b4787563aa3..9ba61d5fe130ec5330149da4cb7938d133eb8146 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1255,7 +1255,7 @@ _bt_pagedel(Relation rel, Buffer buf)
 				itup_scankey = _bt_mkscankey(rel, targetkey);
 				/* find the leftmost leaf page containing this key */
 				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
-								   false, &lbuf, BT_READ);
+								   false, &lbuf, BT_READ, NULL);
 				/* don't need a pin on the page */
 				_bt_relbuf(rel, lbuf);
 
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 83f790f79175a543b7f25e11b07bcec51bbbc967..470bab0c521e855dfefc5493d3d8104d48337482 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -79,6 +79,10 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  * address of the leaf-page buffer, which is read-locked and pinned.
  * No locks are held on the parent pages, however!
  *
+ * If the snapshot parameter is not NULL, "old snapshot" checking will take
+ * place during the descent through the tree.  This is not needed when
+ * positioning for an insert or delete, so NULL is used for those cases.
+ *
  * NOTE that the returned buffer is read-locked regardless of the access
  * parameter.  However, access = BT_WRITE will allow an empty root page
  * to be created and returned.  When access = BT_READ, an empty index
@@ -87,7 +91,7 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  */
 BTStack
 _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
-		   Buffer *bufP, int access)
+		   Buffer *bufP, int access, Snapshot snapshot)
 {
 	BTStack		stack_in = NULL;
 
@@ -126,7 +130,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
 		 */
 		*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
 							  (access == BT_WRITE), stack_in,
-							  BT_READ);
+							  BT_READ, snapshot);
 
 		/* if this is a leaf page, we're done */
 		page = BufferGetPage(*bufP, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
@@ -199,6 +203,10 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
  * On entry, we have the buffer pinned and a lock of the type specified by
  * 'access'.  If we move right, we release the buffer and lock and acquire
  * the same on the right sibling.  Return value is the buffer we stop at.
+ *
+ * If the snapshot parameter is not NULL, "old snapshot" checking will take
+ * place during the descent through the tree.  This is not needed when
+ * positioning for an insert or delete, so NULL is used for those cases.
  */
 Buffer
 _bt_moveright(Relation rel,
@@ -208,7 +216,8 @@ _bt_moveright(Relation rel,
 			  bool nextkey,
 			  bool forupdate,
 			  BTStack stack,
-			  int access)
+			  int access,
+			  Snapshot snapshot)
 {
 	Page		page;
 	BTPageOpaque opaque;
@@ -233,7 +242,7 @@ _bt_moveright(Relation rel,
 
 	for (;;)
 	{
-		page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
 		if (P_RIGHTMOST(opaque))
@@ -972,7 +981,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	 * Use the manufactured insertion scan key to descend the tree and
 	 * position ourselves on the target leaf page.
 	 */
-	stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ);
+	stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
+					   scan->xs_snapshot);
 
 	/* don't need to keep the stack around... */
 	_bt_freestack(stack);
@@ -1337,8 +1347,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 			/* step right one page */
 			so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
 			/* check for deleted page */
-			page = BufferGetPage(so->currPos.buf, NULL, NULL,
-								 BGP_NO_SNAPSHOT_TEST);
+			page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel,
+								 BGP_TEST_FOR_OLD_SNAPSHOT);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 			if (!P_IGNORE(opaque))
 			{
@@ -1412,8 +1422,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 			 * it's not half-dead and contains matching tuples. Else loop back
 			 * and do it all again.
 			 */
-			page = BufferGetPage(so->currPos.buf, NULL, NULL,
-								 BGP_NO_SNAPSHOT_TEST);
+			page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel,
+								 BGP_TEST_FOR_OLD_SNAPSHOT);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 			if (!P_IGNORE(opaque))
 			{
@@ -1476,7 +1486,7 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
 		/* check for interrupts while we're not holding any buffer lock */
 		CHECK_FOR_INTERRUPTS();
 		buf = _bt_getbuf(rel, blkno, BT_READ);
-		page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
 		/*
@@ -1502,14 +1512,14 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
 				break;
 			blkno = opaque->btpo_next;
 			buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
-			page = BufferGetPage(buf, NULL, NULL,
-								 BGP_NO_SNAPSHOT_TEST);
+			page = BufferGetPage(buf, snapshot, rel,
+								 BGP_TEST_FOR_OLD_SNAPSHOT);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 		}
 
 		/* Return to the original page to see what's up */
 		buf = _bt_relandgetbuf(rel, buf, obknum, BT_READ);
-		page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 		if (P_ISDELETED(opaque))
 		{
@@ -1526,8 +1536,8 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
 						 RelationGetRelationName(rel));
 				blkno = opaque->btpo_next;
 				buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
-				page = BufferGetPage(buf, NULL, NULL,
-									 BGP_NO_SNAPSHOT_TEST);
+				page = BufferGetPage(buf, snapshot, rel,
+									 BGP_TEST_FOR_OLD_SNAPSHOT);
 				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 				if (!P_ISDELETED(opaque))
 					break;
@@ -1564,7 +1574,8 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
  * The returned buffer is pinned and read-locked.
  */
 Buffer
-_bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
+_bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+				 Snapshot snapshot)
 {
 	Buffer		buf;
 	Page		page;
@@ -1586,7 +1597,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
 	if (!BufferIsValid(buf))
 		return InvalidBuffer;
 
-	page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+	page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
 	for (;;)
@@ -1605,8 +1616,8 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
 				elog(ERROR, "fell off the end of index \"%s\"",
 					 RelationGetRelationName(rel));
 			buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
-			page = BufferGetPage(buf, NULL, NULL,
-								 BGP_NO_SNAPSHOT_TEST);
+			page = BufferGetPage(buf, snapshot, rel,
+								 BGP_TEST_FOR_OLD_SNAPSHOT);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 		}
 
@@ -1659,7 +1670,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
 	 * version of _bt_search().  We don't maintain a stack since we know we
 	 * won't need it.
 	 */
-	buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir));
+	buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot);
 
 	if (!BufferIsValid(buf))
 	{
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index fafdca31f3965f02675e8412961bedfcd0645a26..7acd71a2911e7610478811a944ca6889b5c4b864 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -341,7 +341,7 @@ redirect:
 		}
 		/* else new pointer points to the same page, no work needed */
 
-		page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+		page = BufferGetPage(buffer, snapshot, index, BGP_TEST_FOR_OLD_SNAPSHOT);
 
 		isnull = SpGistPageStoresNulls(page) ? true : false;
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 4cb4acf33a505c9fae6e315423eb9667b95de808..93361a0c99c964a506c87e36878e0f3ce94f787e 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -489,7 +489,8 @@ vacuum_set_xid_limits(Relation rel,
 	 * working on a particular table at any time, and that each vacuum is
 	 * always an independent transaction.
 	 */
-	*oldestXmin = GetOldestXmin(rel, true);
+	*oldestXmin =
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 3f48ef40701ee1e4b5ae82ce487b076f88286fbf..d0e92b336580e1974a87ca17cda834d38dafec36 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -1660,7 +1660,8 @@ should_attempt_truncation(LVRelStats *vacrelstats)
 	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
 	if (possibly_freeable > 0 &&
 		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
-		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
+		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
+		old_snapshot_threshold < 0)
 		return true;
 	else
 		return false;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 9874c3eaa04477fd824c56400d845499ab6d2b1d..c664984d0a11b9f75a71bbcd4612843bbd777efd 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -4114,3 +4114,43 @@ IssuePendingWritebacks(WritebackContext *context)
 
 	context->nr_pending = 0;
 }
+
+
+/*
+ * Check whether the given snapshot is too old to have safely read the given
+ * page from the given table.  If so, throw a "snapshot too old" error.
+ *
+ * This test generally needs to be performed after every BufferGetPage() call
+ * that is executed as part of a scan.  It is not needed for calls made for
+ * modifying the page (for example, to position to the right place to insert a
+ * new index tuple or for vacuuming).  To minimize errors of omission, the
+ * BufferGetPage() macro accepts parameters to specify whether the test should
+ * be run, and supply the necessary snapshot and relation parameters.  See the
+ * declaration of BufferGetPage() for more details.
+ *
+ * Note that a NULL snapshot argument is allowed and causes a fast return
+ * without error; this is to support call sites which can be called from
+ * either scans or index modification areas.
+ *
+ * For best performance, keep the tests that are fastest and/or most likely to
+ * exclude a page from old snapshot testing near the front.
+ */
+extern Page
+TestForOldSnapshot(Snapshot snapshot, Relation relation, Page page)
+{
+	Assert(relation != NULL);
+
+	if (old_snapshot_threshold >= 0
+	 && (snapshot) != NULL
+	 && (snapshot)->satisfies == HeapTupleSatisfiesMVCC
+	 && !XLogRecPtrIsInvalid((snapshot)->lsn)
+	 && PageGetLSN(page) > (snapshot)->lsn
+	 && !IsCatalogRelation(relation)
+	 && !RelationIsAccessibleInLogicalDecoding(relation)
+	 && (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp())
+		ereport(ERROR,
+				(errcode(ERRCODE_SNAPSHOT_TOO_OLD),
+				 errmsg("snapshot too old")));
+
+	return page;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 36a04fc57089370f9bc9d3ede266b2e0cf95580e..c04b17fa8ead59f8190a3fa7d880197e9cff9235 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -43,6 +43,7 @@
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
+#include "utils/snapmgr.h"
 
 
 shmem_startup_hook_type shmem_startup_hook = NULL;
@@ -136,6 +137,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, ReplicationOriginShmemSize());
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
+		size = add_size(size, SnapMgrShmemSize());
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
@@ -247,6 +249,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	/*
 	 * Set up other modules that need some shared memory space
 	 */
+	SnapMgrInit();
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 01cfa9d5f90ddc403af4f4a8b04dd1f37788a1f9..5bc9fd6595e1e2ca29e97585a8e35c1f483c58e4 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1759,6 +1759,15 @@ GetSnapshotData(Snapshot snapshot)
 	snapshot->regd_count = 0;
 	snapshot->copied = false;
 
+	/*
+	 * Capture the current time and WAL stream location in case this snapshot
+	 * becomes old enough to need to fall back on the special "old snapshot"
+	 * logic.
+	 */
+	snapshot->lsn = GetXLogInsertRecPtr();
+	snapshot->whenTaken = GetSnapshotCurrentTimestamp();
+	MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
+
 	return snapshot;
 }
 
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index c557cb68d0b3255777af88ac2dfcd8bb11029689..f8996cd21a552089d08978936cd7418f9232aee9 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -46,3 +46,4 @@ CommitTsControlLock					38
 CommitTsLock						39
 ReplicationOriginLock				40
 MultiXactTruncationLock				41
+OldSnapshotTimeMapLock				42
diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt
index 49494f9cd31510e3716145a0c0bedbf40592fc15..be924d58bd56b7beda10c38c19efc12290a87d96 100644
--- a/src/backend/utils/errcodes.txt
+++ b/src/backend/utils/errcodes.txt
@@ -417,6 +417,10 @@ Section: Class 58 - System Error (errors external to PostgreSQL itself)
 58P01    E    ERRCODE_UNDEFINED_FILE                                         undefined_file
 58P02    E    ERRCODE_DUPLICATE_FILE                                         duplicate_file
 
+Section: Class 72 - Snapshot Failure
+# (class borrowed from Oracle)
+72000    E    ERRCODE_SNAPSHOT_TOO_OLD                                       snapshot_too_old
+
 Section: Class F0 - Configuration File Error
 
 # (PostgreSQL-specific error class)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f7ed167d7f87dc196f8ccd5731cf8dbcc46fa05c..fb091bc4a0bdde429fcc3662c4e4b658cac8bf1c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2677,6 +2677,17 @@ static struct config_int ConfigureNamesInt[] =
 		check_autovacuum_work_mem, NULL, NULL
 	},
 
+	{
+		{"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Time before a snapshot is too old to read pages changed after the snapshot was taken."),
+			gettext_noop("A value of -1 disables this feature."),
+			GUC_UNIT_MIN
+		},
+		&old_snapshot_threshold,
+		-1, -1, MINS_PER_HOUR * HOURS_PER_DAY * 60,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
 			gettext_noop("Time between issuing TCP keepalives."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index bcc86e29d27743e48c43505999837f539f5f53cf..d4dd285ef0a6b52333d630cf0cc5d010df8aa730 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -166,6 +166,8 @@
 #effective_io_concurrency = 1		# 1-1000; 0 disables prefetching
 #max_worker_processes = 8
 #max_parallel_degree = 0		# max number of worker processes per node
+#old_snapshot_threshold = -1		# 1min-60d; -1 disables; 0 is immediate
+									# (change requires restart)
 
 
 #------------------------------------------------------------------------------
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index b88e01200419b114309f01ab7dd8ab4e5c9cc3b3..19504c3598747db0dd121bc29e8af9c82a20738c 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -46,20 +46,82 @@
 
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/sinval.h"
+#include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/resowner_private.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
 
 
+/*
+ * GUC parameters
+ */
+int			old_snapshot_threshold;		/* number of minutes, -1 disables */
+
+/*
+ * Structure for dealing with old_snapshot_threshold implementation.
+ */
+typedef struct OldSnapshotControlData
+{
+	/*
+	 * Variables for old snapshot handling are shared among processes and are
+	 * only allowed to move forward.
+	 */
+	slock_t		mutex_current;			/* protect current timestamp */
+	int64		current_timestamp;		/* latest snapshot timestamp */
+	slock_t		mutex_latest_xmin;		/* protect latest snapshot xmin */
+	TransactionId latest_xmin;			/* latest snapshot xmin */
+	slock_t		mutex_threshold;		/* protect threshold fields */
+	int64		threshold_timestamp;	/* earlier snapshot is old */
+	TransactionId threshold_xid;		/* earlier xid may be gone */
+
+	/*
+	 * Keep one xid per minute for old snapshot error handling.
+	 *
+	 * Use a circular buffer with a head offset, a count of entries currently
+	 * used, and a timestamp corresponding to the xid at the head offset.  A
+	 * count_used value of zero means that there are no times stored; a
+	 * count_used value of old_snapshot_threshold means that the buffer is
+	 * full and the head must be advanced to add new entries.  Use timestamps
+	 * aligned to minute boundaries, since that seems less surprising than
+	 * aligning based on the first usage timestamp.
+	 *
+	 * It is OK if the xid for a given time slot is from earlier than
+	 * calculated by adding the number of minutes corresponding to the
+	 * (possibly wrapped) distance from the head offset to the time of the
+	 * head entry, since that just results in the vacuuming of old tuples
+	 * being slightly less aggressive.  It would not be OK for it to be off in
+	 * the other direction, since it might result in vacuuming tuples that are
+	 * still expected to be there.
+	 *
+	 * Use of an SLRU was considered but not chosen because it is more
+	 * heavyweight than is needed for this, and would probably not be any less
+	 * code to implement.
+	 *
+	 * Persistence is not needed.
+	 */
+	int			head_offset;		/* subscript of oldest tracked time */
+	int64		head_timestamp;		/* time corresponding to head xid */
+	int			count_used;			/* how many slots are in use */
+	TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
+}	OldSnapshotControlData;
+
+typedef struct OldSnapshotControlData *OldSnapshotControl;
+
+static volatile OldSnapshotControl oldSnapshotControl;
+
+
 /*
  * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
  * mode, and to the latest one taken in a read-committed transaction.
@@ -153,6 +215,7 @@ static Snapshot FirstXactSnapshot = NULL;
 static List *exportedSnapshots = NIL;
 
 /* Prototypes for local functions */
+static int64 AlignTimestampToMinuteBoundary(int64 ts);
 static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
@@ -174,6 +237,49 @@ typedef struct SerializedSnapshotData
 	CommandId	curcid;
 } SerializedSnapshotData;
 
+Size
+SnapMgrShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(OldSnapshotControlData, xid_by_minute);
+	if (old_snapshot_threshold > 0)
+		size = add_size(size, mul_size(sizeof(TransactionId),
+									   old_snapshot_threshold));
+
+	return size;
+}
+
+/*
+ * Initialize for managing old snapshot detection.
+ */
+void
+SnapMgrInit(void)
+{
+	bool		found;
+
+	/*
+	 * Create or attach to the OldSnapshotControl structure.
+	 */
+	oldSnapshotControl = (OldSnapshotControl)
+		ShmemInitStruct("OldSnapshotControlData",
+						SnapMgrShmemSize(), &found);
+
+	if (!found)
+	{
+		SpinLockInit(&oldSnapshotControl->mutex_current);
+		oldSnapshotControl->current_timestamp = 0;
+		SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
+		oldSnapshotControl->latest_xmin = InvalidTransactionId;
+		SpinLockInit(&oldSnapshotControl->mutex_threshold);
+		oldSnapshotControl->threshold_timestamp = 0;
+		oldSnapshotControl->threshold_xid = InvalidTransactionId;
+		oldSnapshotControl->head_offset = 0;
+		oldSnapshotControl->head_timestamp = 0;
+		oldSnapshotControl->count_used = 0;
+	}
+}
+
 /*
  * GetTransactionSnapshot
  *		Get the appropriate snapshot for a new query in a transaction.
@@ -1405,6 +1511,304 @@ ThereAreNoPriorRegisteredSnapshots(void)
 	return false;
 }
 
+
+/*
+ * Return an int64 timestamp which is exactly on a minute boundary.
+ *
+ * If the argument is already aligned, return that value, otherwise move to
+ * the next minute boundary following the given time.
+ */
+static int64
+AlignTimestampToMinuteBoundary(int64 ts)
+{
+	int64		retval = ts + (USECS_PER_MINUTE - 1);
+
+	return retval - (retval % USECS_PER_MINUTE);
+}
+
+/*
+ * Get current timestamp for snapshots as int64 that never moves backward.
+ */
+int64
+GetSnapshotCurrentTimestamp(void)
+{
+	int64		now = GetCurrentIntegerTimestamp();
+
+	/*
+	 * Don't let time move backward; if it hasn't advanced, use the old value.
+	 */
+	SpinLockAcquire(&oldSnapshotControl->mutex_current);
+	if (now <= oldSnapshotControl->current_timestamp)
+		now = oldSnapshotControl->current_timestamp;
+	else
+		oldSnapshotControl->current_timestamp = now;
+	SpinLockRelease(&oldSnapshotControl->mutex_current);
+
+	return now;
+}
+
+/*
+ * Get timestamp through which vacuum may have processed based on last stored
+ * value for threshold_timestamp.
+ *
+ * XXX: So far, we never trust that a 64-bit value can be read atomically; if
+ * that ever changes, we could get rid of the spinlock here.
+ */
+int64
+GetOldSnapshotThresholdTimestamp(void)
+{
+	int64		threshold_timestamp;
+
+	SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+	threshold_timestamp = oldSnapshotControl->threshold_timestamp;
+	SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+	return threshold_timestamp;
+}
+
+static void
+SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit)
+{
+	SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+	oldSnapshotControl->threshold_timestamp = ts;
+	oldSnapshotControl->threshold_xid = xlimit;
+	SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+}
+
+/*
+ * TransactionIdLimitedForOldSnapshots
+ *
+ * Apply old snapshot limit, if any.  This is intended to be called for page
+ * pruning and table vacuuming, to allow old_snapshot_threshold to override
+ * the normal global xmin value.  Actual testing for snapshot too old will be
+ * based on whether a snapshot timestamp is prior to the threshold timestamp
+ * set in this function.
+ */
+TransactionId
+TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+									Relation relation)
+{
+	if (TransactionIdIsNormal(recentXmin)
+		&& old_snapshot_threshold >= 0
+		&& RelationNeedsWAL(relation)
+		&& !IsCatalogRelation(relation)
+		&& !RelationIsAccessibleInLogicalDecoding(relation))
+	{
+		int64		ts = GetSnapshotCurrentTimestamp();
+		TransactionId xlimit = recentXmin;
+		TransactionId latest_xmin = oldSnapshotControl->latest_xmin;
+		bool		same_ts_as_threshold = false;
+
+		/*
+		 * Zero threshold always overrides to latest xmin, if valid.  Without
+		 * some heuristic it will find its own snapshot too old on, for
+		 * example, a simple UPDATE -- which would make it useless for most
+		 * testing, but there is no principled way to ensure that it doesn't
+		 * fail in this way.  Use a five-second delay to try to get useful
+		 * testing behavior, but this may need adjustment.
+		 */
+		if (old_snapshot_threshold == 0)
+		{
+			if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
+				&& TransactionIdFollows(latest_xmin, xlimit))
+				xlimit = latest_xmin;
+
+			ts -= 5 * USECS_PER_SEC;
+			SetOldSnapshotThresholdTimestamp(ts, xlimit);
+
+			return xlimit;
+		}
+
+		ts = AlignTimestampToMinuteBoundary(ts)
+			 - (old_snapshot_threshold * USECS_PER_MINUTE);
+
+		/* Check for fast exit without LW locking. */
+		SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+		if (ts == oldSnapshotControl->threshold_timestamp)
+		{
+			xlimit = oldSnapshotControl->threshold_xid;
+			same_ts_as_threshold = true;
+		}
+		SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+		if (!same_ts_as_threshold)
+		{
+			LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
+
+			if (oldSnapshotControl->count_used > 0
+				&& ts >= oldSnapshotControl->head_timestamp)
+			{
+				int		offset;
+
+				offset = ((ts - oldSnapshotControl->head_timestamp)
+						  / USECS_PER_MINUTE);
+				if (offset > oldSnapshotControl->count_used - 1)
+					offset = oldSnapshotControl->count_used - 1;
+				offset = (oldSnapshotControl->head_offset + offset)
+						% old_snapshot_threshold;
+				xlimit = oldSnapshotControl->xid_by_minute[offset];
+
+				if (NormalTransactionIdFollows(xlimit, recentXmin))
+					SetOldSnapshotThresholdTimestamp(ts, xlimit);
+			}
+
+			LWLockRelease(OldSnapshotTimeMapLock);
+		}
+
+		/*
+		 * Failsafe protection against vacuuming work of active transaction.
+		 *
+		 * This is not an assertion because we avoid the spinlock for
+		 * performance, leaving open the possibility that xlimit could advance
+		 * and be more current; but it seems prudent to apply this limit.  It
+		 * might make pruning a tiny bit less agressive than it could be, but
+		 * protects against data loss bugs.
+		 */
+		if (TransactionIdIsNormal(latest_xmin)
+			&& TransactionIdPrecedes(latest_xmin, xlimit))
+			xlimit = latest_xmin;
+
+		if (NormalTransactionIdFollows(xlimit, recentXmin))
+			return xlimit;
+	}
+
+	return recentXmin;
+}
+
+/*
+ * Take care of the circular buffer that maps time to xid.
+ */
+void
+MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
+{
+	int64		ts;
+
+	/* Fast exit when old_snapshot_threshold is not used. */
+	if (old_snapshot_threshold < 0)
+		return;
+
+	/* Keep track of the latest xmin seen by any process. */
+	SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
+	if (TransactionIdFollows(xmin, oldSnapshotControl->latest_xmin))
+		oldSnapshotControl->latest_xmin = xmin;
+	SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
+
+	/* No further tracking needed for 0 (used for testing). */
+	if (old_snapshot_threshold == 0)
+		return;
+
+	/*
+	 * We don't want to do something stupid with unusual values, but we don't
+	 * want to litter the log with warnings or break otherwise normal
+	 * processing for this feature; so if something seems unreasonable, just
+	 * log at DEBUG level and return without doing anything.
+	 */
+	if (whenTaken < 0)
+	{
+		elog(DEBUG1,
+			 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
+			 (long) whenTaken);
+		return;
+	}
+	if (!TransactionIdIsNormal(xmin))
+	{
+		elog(DEBUG1,
+			 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
+			 (unsigned long) xmin);
+		return;
+	}
+
+	ts = AlignTimestampToMinuteBoundary(whenTaken);
+
+	LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
+
+	Assert(oldSnapshotControl->head_offset >= 0);
+	Assert(oldSnapshotControl->head_offset < old_snapshot_threshold);
+	Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
+	Assert(oldSnapshotControl->count_used >= 0);
+	Assert(oldSnapshotControl->count_used <= old_snapshot_threshold);
+
+	if (oldSnapshotControl->count_used == 0)
+	{
+		/* set up first entry for empty mapping */
+		oldSnapshotControl->head_offset = 0;
+		oldSnapshotControl->head_timestamp = ts;
+		oldSnapshotControl->count_used = 1;
+		oldSnapshotControl->xid_by_minute[0] = xmin;
+	}
+	else if (ts < oldSnapshotControl->head_timestamp)
+	{
+		/* old ts; log it at DEBUG */
+		LWLockRelease(OldSnapshotTimeMapLock);
+		elog(DEBUG1,
+			 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
+			 (long) whenTaken);
+		return;
+	}
+	else if (ts <= (oldSnapshotControl->head_timestamp +
+					((oldSnapshotControl->count_used - 1)
+					 * USECS_PER_MINUTE)))
+	{
+		/* existing mapping; advance xid if possible */
+		int		bucket = (oldSnapshotControl->head_offset
+						  + ((ts - oldSnapshotControl->head_timestamp)
+							 / USECS_PER_MINUTE))
+						 % old_snapshot_threshold;
+
+		if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
+			oldSnapshotControl->xid_by_minute[bucket] = xmin;
+	}
+	else
+	{
+		/* We need a new bucket, but it might not be the very next one. */
+		int		advance = ((ts - oldSnapshotControl->head_timestamp)
+						   / USECS_PER_MINUTE);
+
+		oldSnapshotControl->head_timestamp = ts;
+
+		if (advance >= old_snapshot_threshold)
+		{
+			/* Advance is so far that all old data is junk; start over. */
+			oldSnapshotControl->head_offset = 0;
+			oldSnapshotControl->count_used = 1;
+			oldSnapshotControl->xid_by_minute[0] = xmin;
+		}
+		else
+		{
+			/* Store the new value in one or more buckets. */
+			int i;
+
+			for (i = 0; i < advance; i++)
+			{
+				if (oldSnapshotControl->count_used == old_snapshot_threshold)
+				{
+					/* Map full and new value replaces old head. */
+					int		old_head = oldSnapshotControl->head_offset;
+
+					if (old_head == (old_snapshot_threshold - 1))
+						oldSnapshotControl->head_offset = 0;
+					else
+						oldSnapshotControl->head_offset = old_head + 1;
+					oldSnapshotControl->xid_by_minute[old_head] = xmin;
+				}
+				else
+				{
+					/* Extend map to unused entry. */
+					int		new_tail = (oldSnapshotControl->head_offset
+										+ oldSnapshotControl->count_used)
+									   % old_snapshot_threshold;
+
+					oldSnapshotControl->count_used++;
+					oldSnapshotControl->xid_by_minute[new_tail] = xmin;
+				}
+			}
+		}
+	}
+
+	LWLockRelease(OldSnapshotTimeMapLock);
+}
+
+
 /*
  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
  * access to behave just like it did at a certain point in the past.
diff --git a/src/include/access/brin_revmap.h b/src/include/access/brin_revmap.h
index 19528bf79ed2cf3f4764c43760472efd09c4ef2c..89054e01281636b50af6cba74299ffc2a6c40fc9 100644
--- a/src/include/access/brin_revmap.h
+++ b/src/include/access/brin_revmap.h
@@ -18,12 +18,13 @@
 #include "storage/itemptr.h"
 #include "storage/off.h"
 #include "utils/relcache.h"
+#include "utils/snapshot.h"
 
 /* struct definition lives in brin_revmap.c */
 typedef struct BrinRevmap BrinRevmap;
 
 extern BrinRevmap *brinRevmapInitialize(Relation idxrel,
-					 BlockNumber *pagesPerRange);
+					 BlockNumber *pagesPerRange, Snapshot snapshot);
 extern void brinRevmapTerminate(BrinRevmap *revmap);
 
 extern void brinRevmapExtend(BrinRevmap *revmap,
@@ -34,6 +35,6 @@ extern void brinSetHeapBlockItemptr(Buffer rmbuf, BlockNumber pagesPerRange,
 						BlockNumber heapBlk, ItemPointerData tid);
 extern BrinTuple *brinGetTupleForHeapBlock(BrinRevmap *revmap,
 						 BlockNumber heapBlk, Buffer *buf, OffsetNumber *off,
-						 Size *size, int mode);
+						 Size *size, int mode, Snapshot snapshot);
 
 #endif   /* BRIN_REVMAP_H */
diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h
index e212c9ff1e0504271bce4147333884e6f19c6728..a7d4a90d4b1d9ce55a24b3351a5b3143c6bc249c 100644
--- a/src/include/access/gin_private.h
+++ b/src/include/access/gin_private.h
@@ -703,7 +703,7 @@ typedef struct
  * PostingItem
  */
 
-extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode);
+extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot);
 extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
 extern void freeGinBtreeStack(GinBtreeStack *stack);
 extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
@@ -731,7 +731,7 @@ extern void GinPageDeletePostingItem(Page page, OffsetNumber offset);
 extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
 					  ItemPointerData *items, uint32 nitem,
 					  GinStatsData *buildStats);
-extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno);
+extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno, Snapshot snapshot);
 extern void ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
 extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
 
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 9046b166bd9a56262f9cd02fd22b5a365307b64b..ca5034907dd6b27633aa75513fa63924804a70b9 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -710,17 +710,18 @@ extern int	_bt_pagedel(Relation rel, Buffer buf);
  */
 extern BTStack _bt_search(Relation rel,
 		   int keysz, ScanKey scankey, bool nextkey,
-		   Buffer *bufP, int access);
+		   Buffer *bufP, int access, Snapshot snapshot);
 extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
 			  ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
-			  int access);
+			  int access, Snapshot snapshot);
 extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
 			ScanKey scankey, bool nextkey);
 extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
 			Page page, OffsetNumber offnum);
 extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
 extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
-extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
+extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+							   Snapshot snapshot);
 
 /*
  * prototypes for functions in nbtutils.c
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 4c15934f36b95ba20272d7828299c0894ea502dc..6fea1bc13dcb93520e3b6c812cf10b378cdd9b72 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -180,11 +180,26 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 /*
  * BufferGetPage
  *		Returns the page associated with a buffer.
+ *
+ * agetest will normally be a literal, so use a macro at the outer level to
+ * give the compiler a chance to optimize away the runtime code to check it.
+ *
+ * TestForOldSnapshot(), if it doesn't throw an error, will return the page
+ * argument it is passed, so the same result will go back to this macro's
+ * caller for either agetest value; it is a matter of whether to call the
+ * function to perform the test.  For call sites where the check is not needed
+ * (which is the vast majority of them), the snapshot and relation parameters
+ * can, and generally should, be NULL.
  */
 #define BufferGetPage(buffer, snapshot, relation, agetest) \
 ( \
-	AssertMacro((agetest) == BGP_NO_SNAPSHOT_TEST), \
-	((Page)BufferGetBlock(buffer)) \
+	( \
+		AssertMacro((agetest) == BGP_NO_SNAPSHOT_TEST || (agetest) == BGP_TEST_FOR_OLD_SNAPSHOT), \
+		((agetest) == BGP_NO_SNAPSHOT_TEST) \
+	) ? \
+		((Page)BufferGetBlock(buffer)) \
+	: \
+		(TestForOldSnapshot(snapshot, relation, (Page)BufferGetBlock(buffer))) \
 )
 
 /*
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index c7582c2a11ca27ab476a2f652e8a6bf3c8b366dc..b5d82d60042650b628a3bb4087b8c76aa280d099 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -15,6 +15,7 @@
 #define REL_H
 
 #include "access/tupdesc.h"
+#include "access/xlog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "fmgr.h"
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index a9e9066dd2958eed32dd49d741f65fcd6a5ae13e..371042a6077069f51a1a0d39fe0391ba78d46dcc 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -14,10 +14,20 @@
 #define SNAPMGR_H
 
 #include "fmgr.h"
+#include "utils/relcache.h"
 #include "utils/resowner.h"
 #include "utils/snapshot.h"
 
 
+/* GUC variables */
+extern int	old_snapshot_threshold;
+
+
+extern Size SnapMgrShmemSize(void);
+extern void SnapMgrInit(void);
+extern int64 GetSnapshotCurrentTimestamp(void);
+extern int64 GetOldSnapshotThresholdTimestamp(void);
+
 extern bool FirstSnapshotSet;
 
 extern TransactionId TransactionXmin;
@@ -54,6 +64,9 @@ extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
 extern bool ThereAreNoPriorRegisteredSnapshots(void);
+extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+														 Relation relation);
+extern void MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin);
 
 extern char *ExportSnapshot(Snapshot snapshot);
 
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 2a563633d2bfa7fd23a6a3d787c74075d216edeb..998e2e593d06ea07eb6b6459332e70d6d7dd3820 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -14,6 +14,7 @@
 #define SNAPSHOT_H
 
 #include "access/htup.h"
+#include "access/xlogdefs.h"
 #include "lib/pairingheap.h"
 #include "storage/buf.h"
 
@@ -105,6 +106,9 @@ typedef struct SnapshotData
 	uint32		active_count;	/* refcount on ActiveSnapshot stack */
 	uint32		regd_count;		/* refcount on RegisteredSnapshots */
 	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
+
+	int64		whenTaken;		/* timestamp when snapshot was taken */
+	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
 } SnapshotData;
 
 /*
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index ebdcdc8c2ae9d633e78e65e931ade78020e89830..d086163fabcc427dbdf85fec1ca0a2d56658d0db 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -8,6 +8,7 @@ SUBDIRS = \
 		  brin \
 		  commit_ts \
 		  dummy_seclabel \
+		  snapshot_too_old \
 		  test_ddl_deparse \
 		  test_extensions \
 		  test_parser \
diff --git a/src/test/modules/snapshot_too_old/Makefile b/src/test/modules/snapshot_too_old/Makefile
new file mode 100644
index 0000000000000000000000000000000000000000..16339f0366799eec7bb52f3e795189add1151f4f
--- /dev/null
+++ b/src/test/modules/snapshot_too_old/Makefile
@@ -0,0 +1,47 @@
+# src/test/modules/snapshot_too_old/Makefile
+
+EXTRA_CLEAN = ./isolation_output
+
+ISOLATIONCHECKS=sto_using_cursor sto_using_select
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/snapshot_too_old
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+# Disabled because these tests require "old_snapshot_threshold" >= 0, which
+# typical installcheck users do not have (e.g. buildfarm clients).
+installcheck:;
+
+# But it can nonetheless be very helpful to run tests on preexisting
+# installation, allow to do so, but only if requested explicitly.
+installcheck-force: isolationcheck-install-force
+
+check: isolationcheck
+
+submake-isolation:
+	$(MAKE) -C $(top_builddir)/src/test/isolation all
+
+submake-test_snapshot_too_old:
+	$(MAKE) -C $(top_builddir)/src/test/modules/snapshot_too_old
+
+isolationcheck: | submake-isolation submake-test_snapshot_too_old temp-install
+	$(MKDIR_P) isolation_output
+	$(pg_isolation_regress_check) \
+	    --temp-config $(top_srcdir)/src/test/modules/snapshot_too_old/sto.conf \
+	    --outputdir=./isolation_output \
+	    $(ISOLATIONCHECKS)
+
+isolationcheck-install-force: all | submake-isolation submake-test_snapshot_too_old temp-install
+	$(pg_isolation_regress_installcheck) \
+	    $(ISOLATIONCHECKS)
+
+.PHONY: check submake-test_snapshot_too_old isolationcheck isolationcheck-install-force
+
+temp-install: EXTRA_INSTALL=src/test/modules/snapshot_too_old
diff --git a/src/test/modules/snapshot_too_old/expected/sto_using_cursor.out b/src/test/modules/snapshot_too_old/expected/sto_using_cursor.out
new file mode 100644
index 0000000000000000000000000000000000000000..8cc29ec82f2b1f81500e02b7b4feba8ad7d7201e
--- /dev/null
+++ b/src/test/modules/snapshot_too_old/expected/sto_using_cursor.out
@@ -0,0 +1,73 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1decl s1f1 s1sleep s1f2 s2u
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+
+starting permutation: s1decl s1f1 s1sleep s2u s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR:  snapshot too old
+
+starting permutation: s1decl s1f1 s2u s1sleep s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR:  snapshot too old
+
+starting permutation: s1decl s2u s1f1 s1sleep s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR:  snapshot too old
+
+starting permutation: s2u s1decl s1f1 s1sleep s1f2
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+2              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR:  snapshot too old
diff --git a/src/test/modules/snapshot_too_old/expected/sto_using_select.out b/src/test/modules/snapshot_too_old/expected/sto_using_select.out
new file mode 100644
index 0000000000000000000000000000000000000000..eb15bc23bf9195fd43262c7ff0319b80e2a497f6
--- /dev/null
+++ b/src/test/modules/snapshot_too_old/expected/sto_using_select.out
@@ -0,0 +1,55 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1f1 s1sleep s1f2 s2u
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+1              
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+
+starting permutation: s1f1 s1sleep s2u s1f2
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR:  snapshot too old
+
+starting permutation: s1f1 s2u s1sleep s1f2
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+1              
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR:  snapshot too old
+
+starting permutation: s2u s1f1 s1sleep s1f2
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+2              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR:  snapshot too old
diff --git a/src/test/modules/snapshot_too_old/specs/sto_using_cursor.spec b/src/test/modules/snapshot_too_old/specs/sto_using_cursor.spec
new file mode 100644
index 0000000000000000000000000000000000000000..eac18ca5b904534dca31040aff3a69b03eb95312
--- /dev/null
+++ b/src/test/modules/snapshot_too_old/specs/sto_using_cursor.spec
@@ -0,0 +1,37 @@
+# This test provokes a "snapshot too old" error using a cursor.
+#
+# The sleep is needed because with a threshold of zero a statement could error
+# on changes it made.  With more normal settings no external delay is needed,
+# but we don't want these tests to run long enough to see that, since
+# granularity is in minutes.
+#
+# Since results depend on the value of old_snapshot_threshold, sneak that into
+# the line generated by the sleep, so that a surprising values isn't so hard
+# to identify.
+
+setup
+{
+    CREATE TABLE sto1 (c int NOT NULL);
+    INSERT INTO sto1 SELECT generate_series(1, 1000);
+    CREATE TABLE sto2 (c int NOT NULL);
+}
+setup
+{
+    VACUUM ANALYZE sto1;
+}
+
+teardown
+{
+    DROP TABLE sto1, sto2;
+}
+
+session "s1"
+setup			{ BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1decl"	{ DECLARE cursor1 CURSOR FOR SELECT c FROM sto1; }
+step "s1f1"		{ FETCH FIRST FROM cursor1; }
+step "s1sleep"	{ SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold'; }
+step "s1f2"		{ FETCH FIRST FROM cursor1; }
+teardown		{ COMMIT; }
+
+session "s2"
+step "s2u"		{ UPDATE sto1 SET c = 1001 WHERE c = 1; }
diff --git a/src/test/modules/snapshot_too_old/specs/sto_using_select.spec b/src/test/modules/snapshot_too_old/specs/sto_using_select.spec
new file mode 100644
index 0000000000000000000000000000000000000000..d7c34f3d89419cf1cc582253d862fc265a914651
--- /dev/null
+++ b/src/test/modules/snapshot_too_old/specs/sto_using_select.spec
@@ -0,0 +1,36 @@
+# This test provokes a "snapshot too old" error using SELECT statements.
+#
+# The sleep is needed because with a threshold of zero a statement could error
+# on changes it made.  With more normal settings no external delay is needed,
+# but we don't want these tests to run long enough to see that, since
+# granularity is in minutes.
+#
+# Since results depend on the value of old_snapshot_threshold, sneak that into
+# the line generated by the sleep, so that a surprising values isn't so hard
+# to identify.
+
+setup
+{
+    CREATE TABLE sto1 (c int NOT NULL);
+    INSERT INTO sto1 SELECT generate_series(1, 1000);
+    CREATE TABLE sto2 (c int NOT NULL);
+}
+setup
+{
+    VACUUM ANALYZE sto1;
+}
+
+teardown
+{
+    DROP TABLE sto1, sto2;
+}
+
+session "s1"
+setup			{ BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1f1"		{ SELECT c FROM sto1 ORDER BY c LIMIT 1; }
+step "s1sleep"	{ SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold'; }
+step "s1f2"		{ SELECT c FROM sto1 ORDER BY c LIMIT 1; }
+teardown		{ COMMIT; }
+
+session "s2"
+step "s2u"		{ UPDATE sto1 SET c = 1001 WHERE c = 1; }
diff --git a/src/test/modules/snapshot_too_old/sto.conf b/src/test/modules/snapshot_too_old/sto.conf
new file mode 100644
index 0000000000000000000000000000000000000000..ce8048f198b3d5b0313fb825cb9885374c1e37c0
--- /dev/null
+++ b/src/test/modules/snapshot_too_old/sto.conf
@@ -0,0 +1,3 @@
+autovacuum = off
+old_snapshot_threshold = 0
+