diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8505e11437b5d154d8a87b1ce22b946fb615b7f3..0b750e72a7d53408b95a4cf5728f6ac720b65892 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.203 2005/11/22 18:17:06 momjian Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.204 2005/11/26 03:03:07 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -78,12 +78,17 @@ initscan(HeapScanDesc scan, ScanKey key)
 	 */
 	scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
 
+	scan->rs_inited = false;
 	scan->rs_ctup.t_data = NULL;
+	ItemPointerSetInvalid(&scan->rs_ctup.t_self);
 	scan->rs_cbuf = InvalidBuffer;
+	scan->rs_cblock = InvalidBlockNumber;
 
 	/* we don't have a marked position... */
 	ItemPointerSetInvalid(&(scan->rs_mctid));
 
+	/* page-at-a-time fields are always invalid when not rs_inited */
+
 	/*
 	 * copy the scan key, if appropriate
 	 */
@@ -93,79 +98,128 @@ initscan(HeapScanDesc scan, ScanKey key)
 	pgstat_count_heap_scan(&scan->rs_pgstat_info);
 }
 
-/* ----------------
- *		heapgettup - fetch next heap tuple
- *
- *		routine used by heap_getnext() which does most of the
- *		real work in scanning tuples.
+/*
+ * heapgetpage - subroutine for heapgettup() and heapgettup_pagemode()
  *
- *		The passed-in *buffer must be either InvalidBuffer or the pinned
- *		current page of the scan.  If we have to move to another page,
- *		we will unpin this buffer (if valid).  On return, *buffer is either
- *		InvalidBuffer or the ID of a pinned buffer.
- * ----------------
+ * This routine reads and pins the specified page of the relation.
+ * In page-at-a-time mode it performs additional work, namely determining
+ * which tuples on the page are visible.
  */
 static void
-heapgettup(Relation relation,
-		   int dir,
-		   HeapTuple tuple,
-		   Buffer *buffer,
-		   Snapshot snapshot,
-		   int nkeys,
-		   ScanKey key,
-		   BlockNumber pages)
+heapgetpage(HeapScanDesc scan, BlockNumber page)
 {
-	ItemId		lpp;
+	Buffer		buffer;
+	Snapshot	snapshot;
 	Page		dp;
-	BlockNumber page;
 	int			lines;
+	int			ntup;
 	OffsetNumber lineoff;
-	int			linesleft;
-	ItemPointer tid;
+	ItemId		lpp;
+
+	Assert(page < scan->rs_nblocks);
 
-	tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
+	scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf,
+										 scan->rs_rd,
+										 page);
+	scan->rs_cblock = page;
+
+	if (!scan->rs_pageatatime)
+		return;
+
+	buffer = scan->rs_cbuf;
+	snapshot = scan->rs_snapshot;
 
 	/*
-	 * debugging stuff
-	 *
-	 * check validity of arguments, here and for other functions too
-	 *
-	 * Note: no locking manipulations needed--this is a local function
+	 * We must hold share lock on the buffer content while examining
+	 * tuple visibility.  Afterwards, however, the tuples we have found
+	 * to be visible are guaranteed good as long as we hold the buffer pin.
 	 */
-#ifdef	HEAPDEBUGALL
-	if (ItemPointerIsValid(tid))
-		elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
-			 RelationGetRelationName(relation), tid, tid->ip_blkid,
-			 tid->ip_posid, dir);
-	else
-		elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
-			 RelationGetRelationName(relation), tid, dir);
-
-	elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
+	LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
-	elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
-		 relation->rd_rel->relkind, RelationGetRelationName(relation),
-		 snapshot);
-#endif   /* HEAPDEBUGALL */
+	dp = (Page) BufferGetPage(buffer);
+	lines = PageGetMaxOffsetNumber(dp);
+	ntup = 0;
 
-	if (!ItemPointerIsValid(tid))
+	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
+		 lineoff <= lines;
+		 lineoff++, lpp++)
 	{
-		Assert(!PointerIsValid(tid));
-		tid = NULL;
+		if (ItemIdIsUsed(lpp))
+		{
+			HeapTupleData loctup;
+			bool		valid;
+
+			loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+			loctup.t_len = ItemIdGetLength(lpp);
+			ItemPointerSet(&(loctup.t_self), page, lineoff);
+
+			valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
+			if (valid)
+				scan->rs_vistuples[ntup++] = lineoff;
+		}
 	}
 
-	tuple->t_tableOid = RelationGetRelid(relation);
+	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
-	/*
-	 * return null immediately if relation is empty
-	 */
-	if (pages == 0)
+	Assert(ntup <= MaxHeapTuplesPerPage);
+	scan->rs_ntuples = ntup;
+}
+
+/* ----------------
+ *		heapgettup - fetch next heap tuple
+ *
+ *		Initialize the scan if not already done; then advance to the next
+ *		tuple as indicated by "dir"; return the next tuple in scan->rs_ctup,
+ *		or set scan->rs_ctup.t_data = NULL if no more tuples.
+ *
+ * dir == 0 means "re-fetch the tuple indicated by scan->rs_ctup".
+ *
+ * Note: the reason nkeys/key are passed separately, even though they are
+ * kept in the scan descriptor, is that the caller may not want us to check
+ * the scankeys.
+ *
+ * Note: when we fall off the end of the scan in either direction, we
+ * reset rs_inited.  This means that a further request with the same
+ * scan direction will restart the scan, which is a bit odd, but a
+ * request with the opposite scan direction will start a fresh scan
+ * in the proper direction.  The latter is required behavior for cursors,
+ * while the former case is generally undefined behavior in Postgres
+ * so we don't care too much.
+ * ----------------
+ */
+static void
+heapgettup(HeapScanDesc scan,
+		   int dir,
+		   int nkeys,
+		   ScanKey key)
+{
+	HeapTuple	tuple = &(scan->rs_ctup);
+	ItemPointer tid = &(tuple->t_self);
+	Snapshot	snapshot = scan->rs_snapshot;
+	BlockNumber	pages = scan->rs_nblocks;
+	BlockNumber page;
+	Page		dp;
+	int			lines;
+	OffsetNumber lineoff;
+	int			linesleft;
+	ItemId		lpp;
+
+	if (!scan->rs_inited)
 	{
-		if (BufferIsValid(*buffer))
-			ReleaseBuffer(*buffer);
-		*buffer = InvalidBuffer;
-		tuple->t_data = NULL;
-		return;
+		/*
+		 * return null immediately if relation is empty
+		 */
+		if (pages == 0)
+		{
+			Assert(!BufferIsValid(scan->rs_cbuf));
+			tuple->t_data = NULL;
+			return;
+		}
+	}
+	else
+	{
+		/* resuming scan from tuple indicated by scan->rs_ctup.t_self */
+		Assert(ItemPointerIsValid(tid));
 	}
 
 	/*
@@ -174,30 +228,26 @@ heapgettup(Relation relation,
 	if (dir == 0)
 	{
 		/*
-		 * ``no movement'' scan direction: refetch same tuple
+		 * ``no movement'' scan direction: refetch prior tuple
 		 */
-		if (tid == NULL)
+		if (!scan->rs_inited)
 		{
-			if (BufferIsValid(*buffer))
-				ReleaseBuffer(*buffer);
-			*buffer = InvalidBuffer;
+			Assert(!BufferIsValid(scan->rs_cbuf));
 			tuple->t_data = NULL;
 			return;
 		}
 
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   ItemPointerGetBlockNumber(tid));
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+		page = ItemPointerGetBlockNumber(tid);
+		if (page != scan->rs_cblock)
+			heapgetpage(scan, page);
 
-		dp = (Page) BufferGetPage(*buffer);
+		/* Since the tuple was previously fetched, needn't lock page here */
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
 		lineoff = ItemPointerGetOffsetNumber(tid);
 		lpp = PageGetItemId(dp, lineoff);
 
 		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
 		tuple->t_len = ItemIdGetLength(lpp);
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
 
 		return;
 	}
@@ -206,28 +256,23 @@ heapgettup(Relation relation,
 		/*
 		 * reverse scan direction
 		 */
-		if (tid == NULL)
-		{
+		if (!scan->rs_inited)
 			page = pages - 1;	/* final page */
-		}
 		else
-		{
 			page = ItemPointerGetBlockNumber(tid);		/* current page */
-		}
-
-		Assert(page < pages);
 
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
+		if (page != scan->rs_cblock)
+			heapgetpage(scan, page);
 
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-		dp = (Page) BufferGetPage(*buffer);
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
 		lines = PageGetMaxOffsetNumber(dp);
-		if (tid == NULL)
+
+		if (!scan->rs_inited)
 		{
 			lineoff = lines;	/* final offnum */
+			scan->rs_inited = true;
 		}
 		else
 		{
@@ -241,10 +286,11 @@ heapgettup(Relation relation,
 		/*
 		 * forward scan direction
 		 */
-		if (tid == NULL)
+		if (!scan->rs_inited)
 		{
 			page = 0;			/* first page */
 			lineoff = FirstOffsetNumber;		/* first offnum */
+			scan->rs_inited = true;
 		}
 		else
 		{
@@ -253,15 +299,12 @@ heapgettup(Relation relation,
 				OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
 		}
 
-		Assert(page < pages);
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
+		if (page != scan->rs_cblock)
+			heapgetpage(scan, page);
 
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-		dp = (Page) BufferGetPage(*buffer);
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
 		lines = PageGetMaxOffsetNumber(dp);
 		/* page and lineoff now reference the physically next tid */
 	}
@@ -269,22 +312,21 @@ heapgettup(Relation relation,
 	/* 'dir' is now non-zero */
 
 	/*
-	 * calculate line pointer and number of remaining items to check on this
-	 * page.
+	 * calculate number of remaining items to check on this page
 	 */
-	lpp = PageGetItemId(dp, lineoff);
 	if (dir < 0)
-		linesleft = lineoff - 1;
+		linesleft = lineoff;
 	else
-		linesleft = lines - lineoff;
+		linesleft = lines - lineoff + 1;
 
 	/*
 	 * advance the scan until we find a qualifying tuple or run out of stuff
 	 * to scan
 	 */
+	lpp = PageGetItemId(dp, lineoff);
 	for (;;)
 	{
-		while (linesleft >= 0)
+		while (linesleft > 0)
 		{
 			if (ItemIdIsUsed(lpp))
 			{
@@ -297,11 +339,17 @@ heapgettup(Relation relation,
 				/*
 				 * if current tuple qualifies, return it.
 				 */
-				HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
-								   snapshot, nkeys, key, valid);
+				valid = HeapTupleSatisfiesVisibility(tuple,
+													 snapshot,
+													 scan->rs_cbuf);
+
+				if (valid && key != NULL)
+					HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+								nkeys, key, valid);
+
 				if (valid)
 				{
-					LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+					LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
 					return;
 				}
 			}
@@ -326,32 +374,31 @@ heapgettup(Relation relation,
 		 * if we get here, it means we've exhausted the items on this page and
 		 * it's time to move to the next.
 		 */
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
 
 		/*
 		 * return NULL if we've exhausted all the pages
 		 */
 		if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
 		{
-			if (BufferIsValid(*buffer))
-				ReleaseBuffer(*buffer);
-			*buffer = InvalidBuffer;
+			if (BufferIsValid(scan->rs_cbuf))
+				ReleaseBuffer(scan->rs_cbuf);
+			scan->rs_cbuf = InvalidBuffer;
+			scan->rs_cblock = InvalidBlockNumber;
 			tuple->t_data = NULL;
+			scan->rs_inited = false;
 			return;
 		}
 
 		page = (dir < 0) ? (page - 1) : (page + 1);
 
-		Assert(page < pages);
+		heapgetpage(scan, page);
 
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
+		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-		dp = (Page) BufferGetPage(*buffer);
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
 		lines = PageGetMaxOffsetNumber((Page) dp);
-		linesleft = lines - 1;
+		linesleft = lines;
 		if (dir < 0)
 		{
 			lineoff = lines;
@@ -365,6 +412,233 @@ heapgettup(Relation relation,
 	}
 }
 
+/* ----------------
+ *		heapgettup_pagemode - fetch next heap tuple in page-at-a-time mode
+ *
+ *		Same API as heapgettup: dir < 0 scans backward, dir > 0 forward, and
+ *		dir == 0 re-fetches the tuple at scan->rs_ctup.t_self.  The result is
+ *		left in scan->rs_ctup; t_data is set to NULL when no tuple is found.
+ *
+ * Differences from heapgettup: we do not take the buffer content lock (that
+ * only needs to happen inside heapgetpage), and we iterate through just the
+ * tuples listed in rs_vistuples[] rather than all tuples on the page.  Note
+ * that lineindex is 0-based, whereas heapgettup's lineoff is 1-based.
+ * ----------------
+ */
+static void
+heapgettup_pagemode(HeapScanDesc scan,
+					int dir,
+					int nkeys,
+					ScanKey key)
+{
+	HeapTuple	tuple = &(scan->rs_ctup);
+	ItemPointer tid = &(tuple->t_self);
+	BlockNumber	pages = scan->rs_nblocks;
+	BlockNumber page;
+	Page		dp;
+	int			lines;			/* rs_ntuples of the current page */
+	int			lineindex;		/* index into rs_vistuples[] */
+	OffsetNumber lineoff;		/* item offset taken from rs_vistuples[] */
+	int			linesleft;		/* rs_vistuples[] entries still to examine */
+	ItemId		lpp;
+
+	if (!scan->rs_inited)
+	{
+		/*
+		 * scan not started yet: return null immediately if relation is empty
+		 */
+		if (pages == 0)
+		{
+			Assert(!BufferIsValid(scan->rs_cbuf));
+			tuple->t_data = NULL;
+			return;
+		}
+	}
+	else
+	{
+		/* resuming scan from tuple indicated by scan->rs_ctup.t_self */
+		Assert(ItemPointerIsValid(tid));
+	}
+
+	/*
+	 * calculate next starting page and lineindex, given scan direction
+	 */
+	if (dir == 0)
+	{
+		/*
+		 * ``no movement'' scan direction: refetch tuple at rs_ctup.t_self
+		 */
+		if (!scan->rs_inited)
+		{
+			Assert(!BufferIsValid(scan->rs_cbuf));
+			tuple->t_data = NULL;
+			return;
+		}
+
+		page = ItemPointerGetBlockNumber(tid);
+		if (page != scan->rs_cblock)
+			heapgetpage(scan, page);
+
+		/* Since the tuple was previously fetched, needn't lock page here */
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		lineoff = ItemPointerGetOffsetNumber(tid);
+		lpp = PageGetItemId(dp, lineoff);
+
+		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+		tuple->t_len = ItemIdGetLength(lpp);
+
+		/* rs_cindex must still select this tuple's rs_vistuples[] entry */
+		Assert(scan->rs_cindex < scan->rs_ntuples);
+		Assert(lineoff == scan->rs_vistuples[scan->rs_cindex]);
+
+		return;
+	}
+	else if (dir < 0)
+	{
+		/*
+		 * reverse scan direction
+		 */
+		if (!scan->rs_inited)
+			page = pages - 1;	/* final page */
+		else
+			page = ItemPointerGetBlockNumber(tid);		/* current page */
+
+		if (page != scan->rs_cblock)
+			heapgetpage(scan, page);
+
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		lines = scan->rs_ntuples;
+
+		if (!scan->rs_inited)
+		{
+			lineindex = lines - 1;		/* last rs_vistuples[] entry */
+			scan->rs_inited = true;
+		}
+		else
+		{
+			lineindex = scan->rs_cindex - 1;
+		}
+		/* page and lineindex now reference the previous visible tid */
+	}
+	else
+	{
+		/*
+		 * forward scan direction
+		 */
+		if (!scan->rs_inited)
+		{
+			page = 0;			/* first page */
+			lineindex = 0;		/* first rs_vistuples[] entry */
+			scan->rs_inited = true;
+		}
+		else
+		{
+			page = ItemPointerGetBlockNumber(tid);		/* current page */
+			lineindex = scan->rs_cindex + 1;
+		}
+
+		if (page != scan->rs_cblock)
+			heapgetpage(scan, page);
+
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		lines = scan->rs_ntuples;
+		/* page and lineindex now reference the next visible tid */
+	}
+
+	/* 'dir' is now non-zero */
+
+	/*
+	 * calculate number of remaining items to check on this page
+	 */
+	if (dir < 0)
+		linesleft = lineindex + 1;		/* entries 0 .. lineindex */
+	else
+		linesleft = lines - lineindex;	/* entries lineindex .. lines-1 */
+
+	/*
+	 * advance the scan until we find a qualifying tuple or run out of stuff
+	 * to scan
+	 */
+	for (;;)
+	{
+		while (linesleft > 0)
+		{
+			lineoff = scan->rs_vistuples[lineindex];
+			lpp = PageGetItemId(dp, lineoff);
+			Assert(ItemIdIsUsed(lpp));
+
+			tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+			tuple->t_len = ItemIdGetLength(lpp);
+			ItemPointerSet(&(tuple->t_self), page, lineoff);
+
+			/*
+			 * return the tuple if it passes the scan keys (when any are given)
+			 */
+			if (key != NULL)
+			{
+				bool		valid;
+
+				HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+							nkeys, key, valid);
+				if (valid)
+				{
+					scan->rs_cindex = lineindex;	/* remember position */
+					return;
+				}
+			}
+			else
+			{
+				scan->rs_cindex = lineindex;	/* remember position */
+				return;
+			}
+
+			/*
+			 * otherwise move to the next item on the page
+			 */
+			--linesleft;
+			if (dir < 0)
+			{
+				--lineindex;
+			}
+			else
+			{
+				++lineindex;
+			}
+		}
+
+		/*
+		 * if we get here, it means we've exhausted the items on this page and
+		 * it's time to move to the next.
+		 */
+
+		/*
+		 * out of pages: drop the pin, return no tuple, and clear rs_inited
+		 */
+		if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
+		{
+			if (BufferIsValid(scan->rs_cbuf))
+				ReleaseBuffer(scan->rs_cbuf);
+			scan->rs_cbuf = InvalidBuffer;
+			scan->rs_cblock = InvalidBlockNumber;
+			tuple->t_data = NULL;
+			scan->rs_inited = false;
+			return;
+		}
+
+		page = (dir < 0) ? (page - 1) : (page + 1);
+
+		heapgetpage(scan, page);
+
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		lines = scan->rs_ntuples;
+		linesleft = lines;
+		if (dir < 0)
+			lineindex = lines - 1;		/* start from the last entry */
+		else
+			lineindex = 0;				/* start from the first entry */
+	}
+}
+
 
 #if defined(DISABLE_COMPLEX_MACRO)
 /*
@@ -642,6 +916,14 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_snapshot = snapshot;
 	scan->rs_nkeys = nkeys;
 
+	/*
+	 * we can use page-at-a-time mode if it's an MVCC-safe snapshot
+	 */
+	scan->rs_pageatatime = IsMVCCSnapshot(snapshot);
+
+	/* we only need to set this up once */
+	scan->rs_ctup.t_tableOid = RelationGetRelid(relation);
+
 	/*
 	 * we do this here instead of in initscan() because heap_rescan also calls
 	 * initscan() and we don't want to allocate memory again
@@ -741,16 +1023,14 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
 	/*
 	 * Note: we depend here on the -1/0/1 encoding of ScanDirection.
 	 */
-	heapgettup(scan->rs_rd,
-			   (int) direction,
-			   &(scan->rs_ctup),
-			   &(scan->rs_cbuf),
-			   scan->rs_snapshot,
-			   scan->rs_nkeys,
-			   scan->rs_key,
-			   scan->rs_nblocks);
+	if (scan->rs_pageatatime)
+		heapgettup_pagemode(scan, (int) direction,
+							scan->rs_nkeys, scan->rs_key);
+	else
+		heapgettup(scan, (int) direction,
+				   scan->rs_nkeys, scan->rs_key);
 
-	if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
+	if (scan->rs_ctup.t_data == NULL)
 	{
 		HEAPDEBUG_2;			/* heap_getnext returning EOS */
 		return NULL;
@@ -760,13 +1040,11 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
 	 * if we get here it means we have a new current scan tuple, so point to
 	 * the proper return buffer and return the tuple.
 	 */
-
 	HEAPDEBUG_3;				/* heap_getnext returning tuple */
 
-	if (scan->rs_ctup.t_data != NULL)
-		pgstat_count_heap_getnext(&scan->rs_pgstat_info);
+	pgstat_count_heap_getnext(&scan->rs_pgstat_info);
 
-	return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
+	return &(scan->rs_ctup);
 }
 
 /*
@@ -903,8 +1181,7 @@ heap_release_fetch(Relation relation,
 	/*
 	 * check time qualification of tuple, then release lock
 	 */
-	HeapTupleSatisfies(tuple, relation, buffer, dp,
-					   snapshot, 0, NULL, valid);
+	valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
 
 	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
@@ -1038,8 +1315,7 @@ heap_get_latest_tid(Relation relation,
 		 * Check time qualification of tuple; if visible, set it as the new
 		 * result candidate.
 		 */
-		HeapTupleSatisfies(&tp, relation, buffer, dp,
-						   snapshot, 0, NULL, valid);
+		valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
 		if (valid)
 			*tid = ctid;
 
@@ -2439,7 +2715,11 @@ heap_markpos(HeapScanDesc scan)
 	/* Note: no locking manipulations needed */
 
 	if (scan->rs_ctup.t_data != NULL)
+	{
 		scan->rs_mctid = scan->rs_ctup.t_self;
+		if (scan->rs_pageatatime)
+			scan->rs_mindex = scan->rs_cindex;
+	}
 	else
 		ItemPointerSetInvalid(&scan->rs_mctid);
 }
@@ -2453,31 +2733,38 @@ heap_restrpos(HeapScanDesc scan)
 {
 	/* XXX no amrestrpos checking that ammarkpos called */
 
-	/* Note: no locking manipulations needed */
-
-	/*
-	 * unpin scan buffers
-	 */
-	if (BufferIsValid(scan->rs_cbuf))
-		ReleaseBuffer(scan->rs_cbuf);
-	scan->rs_cbuf = InvalidBuffer;
-
 	if (!ItemPointerIsValid(&scan->rs_mctid))
 	{
 		scan->rs_ctup.t_data = NULL;
+		/*
+		 * unpin scan buffers
+		 */
+		if (BufferIsValid(scan->rs_cbuf))
+			ReleaseBuffer(scan->rs_cbuf);
+		scan->rs_cbuf = InvalidBuffer;
+		scan->rs_cblock = InvalidBlockNumber;
 	}
 	else
 	{
+		/*
+		 * If we reached end of scan, rs_inited will now be false.  We must
+		 * reset it to true to keep heapgettup from doing the wrong thing.
+		 */
+		scan->rs_inited = true;
 		scan->rs_ctup.t_self = scan->rs_mctid;
-		scan->rs_ctup.t_data = (HeapTupleHeader) 0x1;	/* for heapgettup */
-		heapgettup(scan->rs_rd,
-				   0,
-				   &(scan->rs_ctup),
-				   &(scan->rs_cbuf),
-				   scan->rs_snapshot,
-				   0,
-				   NULL,
-				   scan->rs_nblocks);
+		if (scan->rs_pageatatime)
+		{
+			scan->rs_cindex = scan->rs_mindex;
+			heapgettup_pagemode(scan,
+								0,			/* "no movement" */
+								0,			/* needn't recheck scan keys */
+								NULL);
+		}
+		else
+			heapgettup(scan,
+					   0,					/* "no movement" */
+					   0,					/* needn't recheck scan keys */
+					   NULL);
 	}
 }
 
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 3d4f7d38392a5525a9b790ff2ff3a8733e889d1b..16b0453023f5e9c286ce2fe4f4c24ea7fca4e0de 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -21,7 +21,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/executor/nodeBitmapHeapscan.c,v 1.5 2005/11/25 04:24:48 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/executor/nodeBitmapHeapscan.c,v 1.6 2005/11/26 03:03:07 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -44,6 +44,7 @@
 
 
 static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node);
+static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres);
 
 
 /* ----------------------------------------------------------------
@@ -57,7 +58,7 @@ BitmapHeapNext(BitmapHeapScanState *node)
 {
 	EState	   *estate;
 	ExprContext *econtext;
-	HeapScanDesc scandesc;
+	HeapScanDesc scan;
 	Index		scanrelid;
 	TIDBitmap  *tbm;
 	TBMIterateResult *tbmres;
@@ -70,7 +71,7 @@ BitmapHeapNext(BitmapHeapScanState *node)
 	estate = node->ss.ps.state;
 	econtext = node->ss.ps.ps_ExprContext;
 	slot = node->ss.ss_ScanTupleSlot;
-	scandesc = node->ss.ss_currentScanDesc;
+	scan = node->ss.ss_currentScanDesc;
 	scanrelid = ((BitmapHeapScan *) node->ss.ps.plan)->scan.scanrelid;
 	tbm = node->tbm;
 	tbmres = node->tbmres;
@@ -123,6 +124,9 @@ BitmapHeapNext(BitmapHeapScanState *node)
 
 	for (;;)
 	{
+		Page		dp;
+		ItemId		lp;
+
 		/*
 		 * Get next page of results if needed
 		 */
@@ -141,134 +145,199 @@ BitmapHeapNext(BitmapHeapScanState *node)
 			 * AccessShareLock before performing any of the indexscans, but
 			 * let's be safe.)
 			 */
-			if (tbmres->blockno >= scandesc->rs_nblocks)
+			if (tbmres->blockno >= scan->rs_nblocks)
 			{
 				node->tbmres = tbmres = NULL;
 				continue;
 			}
 
 			/*
-			 * Acquire pin on the current heap page.  We'll hold the pin until
-			 * done looking at the page.  We trade in any pin we held before.
+			 * Fetch the current heap page and identify candidate tuples.
 			 */
-			scandesc->rs_cbuf = ReleaseAndReadBuffer(scandesc->rs_cbuf,
-													 scandesc->rs_rd,
-													 tbmres->blockno);
-
-			/*
-			 * Determine how many entries we need to look at on this page. If
-			 * the bitmap is lossy then we need to look at each physical item
-			 * pointer; otherwise we just look through the offsets listed in
-			 * tbmres.
-			 */
-			if (tbmres->ntuples >= 0)
-			{
-				/* non-lossy case */
-				node->minslot = 0;
-				node->maxslot = tbmres->ntuples - 1;
-			}
-			else
-			{
-				/* lossy case */
-				Page		dp;
-
-				LockBuffer(scandesc->rs_cbuf, BUFFER_LOCK_SHARE);
-				dp = (Page) BufferGetPage(scandesc->rs_cbuf);
-
-				node->minslot = FirstOffsetNumber;
-				node->maxslot = PageGetMaxOffsetNumber(dp);
-
-				LockBuffer(scandesc->rs_cbuf, BUFFER_LOCK_UNLOCK);
-			}
+			bitgetpage(scan, tbmres);
 
 			/*
-			 * Set curslot to first slot to examine
+			 * Set rs_cindex to first slot to examine
 			 */
-			node->curslot = node->minslot;
+			scan->rs_cindex = 0;
 		}
 		else
 		{
 			/*
-			 * Continuing in previously obtained page; advance curslot
+			 * Continuing in previously obtained page; advance rs_cindex
 			 */
-			node->curslot++;
+			scan->rs_cindex++;
 		}
 
 		/*
 		 * Out of range?  If so, nothing more to look at on this page
 		 */
-		if (node->curslot < node->minslot || node->curslot > node->maxslot)
+		if (scan->rs_cindex < 0 || scan->rs_cindex >= scan->rs_ntuples)
 		{
 			node->tbmres = tbmres = NULL;
 			continue;
 		}
 
 		/*
-		 * Okay to try to fetch the tuple
+		 * Okay to fetch the tuple
+		 */
+		targoffset = scan->rs_vistuples[scan->rs_cindex];
+		dp = (Page) BufferGetPage(scan->rs_cbuf);
+		lp = PageGetItemId(dp, targoffset);
+		Assert(ItemIdIsUsed(lp));
+
+		scan->rs_ctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
+		scan->rs_ctup.t_len = ItemIdGetLength(lp);
+		ItemPointerSet(&scan->rs_ctup.t_self, tbmres->blockno, targoffset);
+
+		pgstat_count_heap_fetch(&scan->rs_pgstat_info);
+
+		/*
+		 * Set up the result slot to point to this tuple. Note that the
+		 * slot acquires a pin on the buffer.
 		 */
+		ExecStoreTuple(&scan->rs_ctup,
+					   slot,
+					   scan->rs_cbuf,
+					   false);
+
+		/*
+		 * If we are using lossy info, we have to recheck the qual
+		 * conditions at every tuple.
+		 */
+		if (tbmres->ntuples < 0)
+		{
+			econtext->ecxt_scantuple = slot;
+			ResetExprContext(econtext);
+
+			if (!ExecQual(node->bitmapqualorig, econtext, false))
+			{
+				/* Fails recheck, so drop it and loop back for another */
+				ExecClearTuple(slot);
+				continue;
+			}
+		}
+
+		/* OK to return this tuple */
+		return slot;
+	}
+
+	/*
+	 * if we get here it means we are at the end of the scan.
+	 */
+	return ExecClearTuple(slot);
+}
+
+/*
+ * bitgetpage - subroutine for BitmapHeapNext()
+ *
+ * This routine reads and pins the specified page of the relation, then
+ * builds an array indicating which tuples on the page are both potentially
+ * interesting according to the bitmap, and visible according to the snapshot.
+ */
+static void
+bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres)
+{
+	BlockNumber	page = tbmres->blockno;
+	Buffer		buffer;
+	Snapshot	snapshot;
+	Page		dp;
+	int			ntup;
+	int			curslot;
+	int			minslot;
+	int			maxslot;
+	int			maxoff;
+
+	/*
+	 * Acquire pin on the target heap page, trading in any pin we held before.
+	 */
+	Assert(page < scan->rs_nblocks);
+
+	scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf,
+										 scan->rs_rd,
+										 page);
+	buffer = scan->rs_cbuf;
+	snapshot = scan->rs_snapshot;
+
+	/*
+	 * We must hold share lock on the buffer content while examining
+	 * tuple visibility.  Afterwards, however, the tuples we have found
+	 * to be visible are guaranteed good as long as we hold the buffer pin.
+	 */
+	LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+	dp = (Page) BufferGetPage(buffer);
+	maxoff = PageGetMaxOffsetNumber(dp);
+
+	/*
+	 * Determine how many entries we need to look at on this page. If
+	 * the bitmap is lossy then we need to look at each physical item
+	 * pointer; otherwise we just look through the offsets listed in
+	 * tbmres.
+	 */
+	if (tbmres->ntuples >= 0)
+	{
+		/* non-lossy case */
+		minslot = 0;
+		maxslot = tbmres->ntuples - 1;
+	}
+	else
+	{
+		/* lossy case */
+		minslot = FirstOffsetNumber;
+		maxslot = maxoff;
+	}
+
+	ntup = 0;
+	for (curslot = minslot; curslot <= maxslot; curslot++)
+	{
+		OffsetNumber targoffset;
+		ItemId		lp;
+		HeapTupleData loctup;
+		bool		valid;
+
 		if (tbmres->ntuples >= 0)
 		{
 			/* non-lossy case */
-			targoffset = tbmres->offsets[node->curslot];
+			targoffset = tbmres->offsets[curslot];
 		}
 		else
 		{
 			/* lossy case */
-			targoffset = (OffsetNumber) node->curslot;
+			targoffset = (OffsetNumber) curslot;
 		}
 
-		ItemPointerSet(&scandesc->rs_ctup.t_self, tbmres->blockno, targoffset);
-
 		/*
-		 * Fetch the heap tuple and see if it matches the snapshot. We use
-		 * heap_release_fetch to avoid useless bufmgr traffic.
+		 * We'd better check for out-of-range offnum, in case VACUUM has run
+		 * since the TID was obtained.
 		 */
-		if (heap_release_fetch(scandesc->rs_rd,
-							   scandesc->rs_snapshot,
-							   &scandesc->rs_ctup,
-							   &scandesc->rs_cbuf,
-							   true,
-							   &scandesc->rs_pgstat_info))
-		{
-			/*
-			 * Set up the result slot to point to this tuple. Note that the
-			 * slot acquires a pin on the buffer.
-			 */
-			ExecStoreTuple(&scandesc->rs_ctup,
-						   slot,
-						   scandesc->rs_cbuf,
-						   false);
+		if (targoffset < FirstOffsetNumber || targoffset > maxoff)
+			continue;
 
-			/*
-			 * If we are using lossy info, we have to recheck the qual
-			 * conditions at every tuple.
-			 */
-			if (tbmres->ntuples < 0)
-			{
-				econtext->ecxt_scantuple = slot;
-				ResetExprContext(econtext);
-
-				if (!ExecQual(node->bitmapqualorig, econtext, false))
-				{
-					/* Fails recheck, so drop it and loop back for another */
-					ExecClearTuple(slot);
-					continue;
-				}
-			}
+		lp = PageGetItemId(dp, targoffset);
 
-			/* OK to return this tuple */
-			return slot;
-		}
+		/*
+		 * Must check for deleted tuple.
+		 */
+		if (!ItemIdIsUsed(lp))
+			continue;
 
 		/*
-		 * Failed the snap, so loop back and try again.
+		 * check time qualification of tuple, remember it if valid
 		 */
+		loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
+		loctup.t_len = ItemIdGetLength(lp);
+		ItemPointerSet(&(loctup.t_self), page, targoffset);
+
+		valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
+		if (valid)
+			scan->rs_vistuples[ntup++] = targoffset;
 	}
 
-	/*
-	 * if we get here it means we are at the end of the scan..
-	 */
-	return ExecClearTuple(slot);
+	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+	Assert(ntup <= MaxHeapTuplesPerPage);
+	scan->rs_ntuples = ntup;
 }
 
 /* ----------------------------------------------------------------
@@ -403,6 +472,12 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate)
 	Oid			reloid;
 	Relation	currentRelation;
 
+	/*
+	 * Assert caller didn't ask for an unsafe snapshot --- see comments
+	 * at head of file.
+	 */
+	Assert(IsMVCCSnapshot(estate->es_snapshot));
+
 	/*
 	 * create state structure
 	 */
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 88f4078d24b6faafb0d05b11476ad1b0b350da73..c0b7c92cd53dd7ff1ab115091c16fac21a4e4225 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/relscan.h,v 1.41 2005/10/15 02:49:42 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/access/relscan.h,v 1.42 2005/11/26 03:03:07 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -26,14 +26,23 @@ typedef struct HeapScanDescData
 	int			rs_nkeys;		/* number of scan keys */
 	ScanKey		rs_key;			/* array of scan key descriptors */
 	BlockNumber rs_nblocks;		/* number of blocks to scan */
+	bool		rs_pageatatime;	/* verify visibility page-at-a-time? */
 
 	/* scan current state */
+	bool		rs_inited;		/* false = scan not init'd yet */
 	HeapTupleData rs_ctup;		/* current tuple in scan, if any */
+	BlockNumber	rs_cblock;		/* current block # in scan, if any */
 	Buffer		rs_cbuf;		/* current buffer in scan, if any */
 	/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
 	ItemPointerData rs_mctid;	/* marked scan position, if any */
 
 	PgStat_Info rs_pgstat_info; /* statistics collector hook */
+
+	/* these fields only used in page-at-a-time mode */
+	int			rs_cindex;		/* current tuple's index in vistuples */
+	int			rs_mindex;		/* marked tuple's saved index */
+	int			rs_ntuples;		/* number of visible tuples on page */
+	OffsetNumber rs_vistuples[MaxHeapTuplesPerPage];	/* their offsets */
 } HeapScanDescData;
 
 typedef HeapScanDescData *HeapScanDesc;
diff --git a/src/include/access/valid.h b/src/include/access/valid.h
index 8309c6991782a840f26fa6ea87439d514d171e84..cb9c6369a12b11fbcc6049bf6b63ac5c3321969a 100644
--- a/src/include/access/valid.h
+++ b/src/include/access/valid.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/valid.h,v 1.36 2004/12/31 22:03:21 pgsql Exp $
+ * $PostgreSQL: pgsql/src/include/access/valid.h,v 1.37 2005/11/26 03:03:07 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -65,37 +65,4 @@ do \
 	} \
 } while (0)
 
-/*
- *		HeapTupleSatisfies
- *
- *	res is set TRUE if the HeapTuple satisfies the timequal and keytest,
- *	otherwise it is set FALSE.	Note that the hint bits in the HeapTuple's
- *	t_infomask may be updated as a side effect.
- *
- *	on 8/21/92 mao says:  i rearranged the tests here to do keytest before
- *	SatisfiesTimeQual.	profiling indicated that even for vacuumed relations,
- *	time qual checking was more expensive than key testing.  time qual is
- *	least likely to fail, too.	we should really add the time qual test to
- *	the restriction and optimize it in the normal way.	this has interactions
- *	with joey's expensive function work.
- */
-#define HeapTupleSatisfies(tuple, \
-						   relation, \
-						   buffer, \
-						   disk_page, \
-						   snapshot, \
-						   nKeys, \
-						   key, \
-						   res) \
-do \
-{ \
-	if ((key) != NULL) \
-		HeapKeyTest(tuple, RelationGetDescr(relation), nKeys, key, res); \
-	else \
-		(res) = true; \
- \
-	if ((res) && (relation)->rd_rel->relkind != RELKIND_UNCATALOGED) \
-		(res) = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer); \
-} while (0)
-
 #endif   /* VALID_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index f70847798e662bbcb16e5898de554cdfafa8ba2a..82f182c3497f06dad4cc6514ae926613b4eaa086 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.142 2005/11/25 19:47:50 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.143 2005/11/26 03:03:07 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -958,9 +958,6 @@ typedef struct BitmapIndexScanState
  *		bitmapqualorig	   execution state for bitmapqualorig expressions
  *		tbm				   bitmap obtained from child index scan(s)
  *		tbmres			   current-page data
- *		curslot			   current tbmres index or tuple offset on page
- *		minslot			   lowest tbmres index or tuple offset to try
- *		maxslot			   highest tbmres index or tuple offset to try
  * ----------------
  */
 typedef struct BitmapHeapScanState
@@ -969,9 +966,6 @@ typedef struct BitmapHeapScanState
 	List	   *bitmapqualorig;
 	TIDBitmap  *tbm;
 	TBMIterateResult *tbmres;
-	int			curslot;
-	int			minslot;
-	int			maxslot;
 } BitmapHeapScanState;
 
 /* ----------------
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index bfd51cb72e6e1b5070059aa28eaf5b9ba7bf2079..5e3efa17967cc4039b2dc283d17c8cde1071630d 100644
--- a/src/include/utils/tqual.h
+++ b/src/include/utils/tqual.h
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/tqual.h,v 1.59 2005/10/15 02:49:46 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/utils/tqual.h,v 1.60 2005/11/26 03:03:07 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -55,6 +55,15 @@ typedef SnapshotData *Snapshot;
 
 extern DLLIMPORT Snapshot SnapshotDirty;
 
+/* Encodes which snapshots are MVCC-safe; evaluates snapshot more than once */
+#define IsMVCCSnapshot(snapshot)  \
+	((snapshot) != SnapshotNow && \
+	 (snapshot) != SnapshotSelf && \
+	 (snapshot) != SnapshotAny && \
+	 (snapshot) != SnapshotToast && \
+	 (snapshot) != SnapshotDirty)
+
+
 extern DLLIMPORT Snapshot SerializableSnapshot;
 extern DLLIMPORT Snapshot LatestSnapshot;
 extern DLLIMPORT Snapshot ActiveSnapshot;
@@ -69,8 +78,9 @@ extern TransactionId RecentGlobalXmin;
  *		True iff heap tuple satisfies a time qual.
  *
  * Notes:
- *		Assumes heap tuple is valid.
- *		Beware of multiple evaluations of snapshot argument.
+ *	Assumes heap tuple is valid.
+ *	Beware of multiple evaluations of snapshot argument.
+ *	Hint bits in the HeapTuple's t_infomask may be updated as a side effect.
  */
 #define HeapTupleSatisfiesVisibility(tuple, snapshot, buffer) \
 ((snapshot) == SnapshotNow ? \