diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 166fed7233c70470b0807507fa55602c294ab3d2..6ff76b9d9559fa8a2064dcf228289305dd12ecf8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.250 2008/03/04 19:54:06 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.251 2008/03/08 21:57:59 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -3967,11 +3967,13 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
 	Relation	reln;
 	Buffer		buffer;
 	Page		page;
-	OffsetNumber *offnum;
 	OffsetNumber *end;
+	OffsetNumber *redirected;
+	OffsetNumber *nowdead;
+	OffsetNumber *nowunused;
 	int			nredirected;
 	int			ndead;
-	int			i;
+	int			nunused;
 
 	if (record->xl_info & XLR_BKP_BLOCK_1)
 		return;
@@ -3990,61 +3992,24 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
 
 	nredirected = xlrec->nredirected;
 	ndead = xlrec->ndead;
-	offnum = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
 	end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
-
-	/* Update all redirected or moved line pointers */
-	for (i = 0; i < nredirected; i++)
-	{
-		OffsetNumber fromoff = *offnum++;
-		OffsetNumber tooff = *offnum++;
-		ItemId		fromlp = PageGetItemId(page, fromoff);
-
-		if (clean_move)
-		{
-			/* Physically move the "to" item to the "from" slot */
-			ItemId		tolp = PageGetItemId(page, tooff);
-			HeapTupleHeader htup;
-
-			*fromlp = *tolp;
-			ItemIdSetUnused(tolp);
-
-			/* We also have to clear the tuple's heap-only bit */
-			Assert(ItemIdIsNormal(fromlp));
-			htup = (HeapTupleHeader) PageGetItem(page, fromlp);
-			Assert(HeapTupleHeaderIsHeapOnly(htup));
-			HeapTupleHeaderClearHeapOnly(htup);
-		}
-		else
-		{
-			/* Just insert a REDIRECT link at fromoff */
-			ItemIdSetRedirect(fromlp, tooff);
-		}
-	}
-
-	/* Update all now-dead line pointers */
-	for (i = 0; i < ndead; i++)
-	{
-		OffsetNumber off = *offnum++;
-		ItemId		lp = PageGetItemId(page, off);
-
-		ItemIdSetDead(lp);
-	}
-
-	/* Update all now-unused line pointers */
-	while (offnum < end)
-	{
-		OffsetNumber off = *offnum++;
-		ItemId		lp = PageGetItemId(page, off);
-
-		ItemIdSetUnused(lp);
-	}
+	redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
+	nowdead = redirected + (nredirected * 2);
+	nowunused = nowdead + ndead;
+	nunused = (end - nowunused);
+	Assert(nunused >= 0);
+
+	/* Update all item pointers per the record, and repair fragmentation */
+	heap_page_prune_execute(reln, buffer,
+							redirected, nredirected,
+							nowdead, ndead,
+							nowunused, nunused,
+							clean_move);
 
 	/*
-	 * Finally, repair any fragmentation, and update the page's hint bit about
-	 * whether it has free pointers.
+	 * Note: we don't worry about updating the page's prunability hints.
+	 * At worst this will cause an extra prune cycle to occur soon.
 	 */
-	PageRepairFragmentation(page);
 
 	PageSetLSN(page, lsn);
 	PageSetTLI(page, ThisTimeLineID);
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index a5032e861c3b4452ab0a307c8ea9b09901fd50af..09c55ee76155f266b5aa06ef9475d53071941d3e 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/heap/pruneheap.c,v 1.6 2008/01/01 19:45:46 momjian Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/heap/pruneheap.c,v 1.7 2008/03/08 21:57:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -21,22 +21,32 @@
 #include "utils/inval.h"
 
 
+/* Working data for heap_page_prune and subroutines */
+typedef struct
+{
+	TransactionId new_prune_xid;	/* new prune hint value for page */
+	int			nredirected;		/* numbers of entries in arrays below */
+	int			ndead;
+	int			nunused;
+	/* arrays that accumulate indexes of items to be changed */
+	OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
+	OffsetNumber nowdead[MaxHeapTuplesPerPage];
+	OffsetNumber nowunused[MaxHeapTuplesPerPage];
+	/* marked[i] is TRUE if item i is entered in one of the above arrays */
+	bool		marked[MaxHeapTuplesPerPage + 1];
+} PruneState;
+
 /* Local functions */
 static int heap_prune_chain(Relation relation, Buffer buffer,
 				 OffsetNumber rootoffnum,
 				 TransactionId OldestXmin,
-				 OffsetNumber *redirected, int *nredirected,
-				 OffsetNumber *nowdead, int *ndead,
-				 OffsetNumber *nowunused, int *nunused,
+				 PruneState *prstate,
 				 bool redirect_move);
-static void heap_prune_record_redirect(OffsetNumber *redirected,
-						   int *nredirected,
-						   OffsetNumber offnum,
-						   OffsetNumber rdoffnum);
-static void heap_prune_record_dead(OffsetNumber *nowdead, int *ndead,
-					   OffsetNumber offnum);
-static void heap_prune_record_unused(OffsetNumber *nowunused, int *nunused,
-						 OffsetNumber offnum);
+static void heap_prune_record_prunable(PruneState *prstate, TransactionId xid);
+static void heap_prune_record_redirect(PruneState *prstate,
+						   OffsetNumber offnum, OffsetNumber rdoffnum);
+static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum);
+static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum);
 
 
 /*
@@ -77,9 +87,9 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
 	 * Checking free space here is questionable since we aren't holding any
 	 * lock on the buffer; in the worst case we could get a bogus answer. It's
 	 * unlikely to be *seriously* wrong, though, since reading either pd_lower
-	 * or pd_upper is probably atomic.	Avoiding taking a lock seems better
-	 * than sometimes getting a wrong answer in what is after all just a
-	 * heuristic estimate.
+	 * or pd_upper is probably atomic.	Avoiding taking a lock seems more
+	 * important than sometimes getting a wrong answer in what is after all
+	 * just a heuristic estimate.
 	 */
 	minfree = RelationGetTargetPageFreeSpace(relation,
 											 HEAP_DEFAULT_FILLFACTOR);
@@ -140,35 +150,22 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber offnum,
 				maxoff;
-	OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
-	OffsetNumber nowdead[MaxHeapTuplesPerPage];
-	OffsetNumber nowunused[MaxHeapTuplesPerPage];
-	int			nredirected = 0;
-	int			ndead = 0;
-	int			nunused = 0;
-	bool		page_was_full = false;
-	TransactionId save_prune_xid;
-
-	START_CRIT_SECTION();
-
-	/*
-	 * Save the current pd_prune_xid and mark the page as clear of prunable
-	 * tuples. If we find a tuple which may soon become prunable, we shall set
-	 * the hint again.
-	 */
-	save_prune_xid = ((PageHeader) page)->pd_prune_xid;
-	PageClearPrunable(page);
+	PruneState	prstate;
 
 	/*
-	 * Also clear the "page is full" flag if it is set, since there's no point
-	 * in repeating the prune/defrag process until something else happens to
-	 * the page.
+	 * Our strategy is to scan the page and make lists of items to change,
+	 * then apply the changes within a critical section.  This keeps as
+	 * much logic as possible out of the critical section, and also ensures
+	 * that WAL replay will work the same as the normal case.
+	 *
+	 * First, initialize the new pd_prune_xid value to zero (indicating no
+	 * prunable tuples).  If we find any tuples which may soon become
+	 * prunable, we will save the lowest relevant XID in new_prune_xid.
+	 * Also initialize the rest of our working state.
 	 */
-	if (PageIsFull(page))
-	{
-		PageClearFull(page);
-		page_was_full = true;
-	}
+	prstate.new_prune_xid = InvalidTransactionId;
+	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
+	memset(prstate.marked, 0, sizeof(prstate.marked));
 
 	/* Scan the page */
 	maxoff = PageGetMaxOffsetNumber(page);
@@ -176,29 +173,53 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 		 offnum <= maxoff;
 		 offnum = OffsetNumberNext(offnum))
 	{
-		ItemId		itemid = PageGetItemId(page, offnum);
+		ItemId		itemid;
+
+		/* Ignore items already processed as part of an earlier chain */
+		if (prstate.marked[offnum])
+			continue;
 
 		/* Nothing to do if slot is empty or already dead */
+		itemid = PageGetItemId(page, offnum);
 		if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid))
 			continue;
 
 		/* Process this item or chain of items */
 		ndeleted += heap_prune_chain(relation, buffer, offnum,
 									 OldestXmin,
-									 redirected, &nredirected,
-									 nowdead, &ndead,
-									 nowunused, &nunused,
+									 &prstate,
 									 redirect_move);
 	}
 
-	/* Have we pruned any items? */
-	if (nredirected > 0 || ndead > 0 || nunused > 0)
+	/* Any error while applying the changes is critical */
+	START_CRIT_SECTION();
+
+	/* Have we found any prunable items? */
+	if (prstate.nredirected > 0 || prstate.ndead > 0 || prstate.nunused > 0)
 	{
 		/*
-		 * Repair page fragmentation, and update the page's hint bit about
-		 * whether it has free line pointers.
+		 * Apply the planned item changes, then repair page fragmentation,
+		 * and update the page's hint bit about whether it has free line
+		 * pointers.
+		 */
+		heap_page_prune_execute(relation, buffer,
+								prstate.redirected, prstate.nredirected,
+								prstate.nowdead, prstate.ndead,
+								prstate.nowunused, prstate.nunused,
+								redirect_move);
+
+		/*
+		 * Update the page's pd_prune_xid field to either zero, or the lowest
+		 * XID of any soon-prunable tuple.
 		 */
-		PageRepairFragmentation((Page) page);
+		((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
+
+		/*
+		 * Also clear the "page is full" flag, since there's no point in
+		 * repeating the prune/defrag process until something else happens to
+		 * the page.
+		 */
+		PageClearFull(page);
 
 		MarkBufferDirty(buffer);
 
@@ -210,24 +231,33 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 			XLogRecPtr	recptr;
 
 			recptr = log_heap_clean(relation, buffer,
-									redirected, nredirected,
-									nowdead, ndead,
-									nowunused, nunused,
+									prstate.redirected, prstate.nredirected,
+									prstate.nowdead, prstate.ndead,
+									prstate.nowunused, prstate.nunused,
 									redirect_move);
-			PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
+
 			PageSetLSN(BufferGetPage(buffer), recptr);
+			PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
 		}
 	}
 	else
 	{
 		/*
-		 * If we didn't prune anything, but have updated either the
-		 * pd_prune_xid field or the "page is full" flag, mark the buffer
-		 * dirty.  This is treated as a non-WAL-logged hint.
+		 * If we didn't prune anything, but have found a new value for the
+		 * pd_prune_xid field, update it and mark the buffer dirty.
+		 * This is treated as a non-WAL-logged hint.
+		 *
+		 * Also clear the "page is full" flag if it is set, since there's no
+		 * point in repeating the prune/defrag process until something else
+		 * happens to the page.
 		 */
-		if (((PageHeader) page)->pd_prune_xid != save_prune_xid ||
-			page_was_full)
+		if (((PageHeader) page)->pd_prune_xid != prstate.new_prune_xid ||
+			PageIsFull(page))
+		{
+			((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
+			PageClearFull(page);
 			SetBufferCommitInfoNeedsSave(buffer);
+		}
 	}
 
 	END_CRIT_SECTION();
@@ -237,8 +267,8 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	 * ndeleted minus ndead, because we don't want to count a now-DEAD root
 	 * item as a deletion for this purpose.
 	 */
-	if (report_stats && ndeleted > ndead)
-		pgstat_update_heap_dead_tuples(relation, ndeleted - ndead);
+	if (report_stats && ndeleted > prstate.ndead)
+		pgstat_update_heap_dead_tuples(relation, ndeleted - prstate.ndead);
 
 	/*
 	 * XXX Should we update the FSM information of this page ?
@@ -279,21 +309,22 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
  *
  * OldestXmin is the cutoff XID used to identify dead tuples.
  *
- * Redirected items are added to the redirected[] array (two entries per
- * redirection); items set to LP_DEAD state are added to nowdead[]; and
- * items set to LP_UNUSED state are added to nowunused[].  (These arrays
- * will be used to generate a WAL record after all chains are pruned.)
+ * We don't actually change the page here, except perhaps for hint-bit updates
+ * caused by HeapTupleSatisfiesVacuum.  We just add entries to the arrays in
+ * prstate showing the changes to be made.  Items to be redirected are added
+ * to the redirected[] array (two entries per redirection); items to be set to
+ * LP_DEAD state are added to nowdead[]; and items to be set to LP_UNUSED
+ * state are added to nowunused[].
  *
- * If redirect_move is true, we get rid of redirecting line pointers.
+ * If redirect_move is true, we intend to get rid of redirecting line pointers,
+ * not just make redirection entries.
  *
- * Returns the number of tuples deleted from the page.
+ * Returns the number of tuples (to be) deleted from the page.
  */
 static int
 heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 				 TransactionId OldestXmin,
-				 OffsetNumber *redirected, int *nredirected,
-				 OffsetNumber *nowdead, int *ndead,
-				 OffsetNumber *nowunused, int *nunused,
+				 PruneState *prstate,
 				 bool redirect_move)
 {
 	int			ndeleted = 0;
@@ -302,6 +333,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 	ItemId		rootlp;
 	HeapTupleHeader htup;
 	OffsetNumber latestdead = InvalidOffsetNumber,
+				redirect_target = InvalidOffsetNumber,
 				maxoff = PageGetMaxOffsetNumber(dp),
 				offnum;
 	OffsetNumber chainitems[MaxHeapTuplesPerPage];
@@ -327,7 +359,10 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 			 * XMIN_INVALID heap-only tuples.  Those might not be linked to by
 			 * any chain, since the parent tuple might be re-updated before
 			 * any pruning occurs.	So we have to be able to reap them
-			 * separately from chain-pruning.
+			 * separately from chain-pruning.  (Note that
+			 * HeapTupleHeaderIsHotUpdated will never return true for an
+			 * XMIN_INVALID tuple, so this code will work even when there were
+			 * sequential updates within the aborted transaction.)
 			 *
 			 * Note that we might first arrive at a dead heap-only tuple
 			 * either here or while following a chain below.  Whichever path
@@ -336,8 +371,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 			if (HeapTupleSatisfiesVacuum(htup, OldestXmin, buffer)
 				== HEAPTUPLE_DEAD && !HeapTupleHeaderIsHotUpdated(htup))
 			{
-				ItemIdSetUnused(rootlp);
-				heap_prune_record_unused(nowunused, nunused, rootoffnum);
+				heap_prune_record_unused(prstate, rootoffnum);
 				ndeleted++;
 			}
 
@@ -360,8 +394,13 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		if (offnum < FirstOffsetNumber || offnum > maxoff)
 			break;
 
+		/* If item is already processed, stop --- it must not be same chain */
+		if (prstate->marked[offnum])
+			break;
+
 		lp = PageGetItemId(dp, offnum);
 
+		/* Unused item obviously isn't part of the chain */
 		if (!ItemIdIsUsed(lp))
 			break;
 
@@ -420,7 +459,8 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 				 * This tuple may soon become DEAD.  Update the hint field so
 				 * that the page is reconsidered for pruning in future.
 				 */
-				PageSetPrunable(dp, HeapTupleHeaderGetXmax(htup));
+				heap_prune_record_prunable(prstate,
+										   HeapTupleHeaderGetXmax(htup));
 				break;
 
 			case HEAPTUPLE_DELETE_IN_PROGRESS:
@@ -429,7 +469,8 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 				 * This tuple may soon become DEAD.  Update the hint field so
 				 * that the page is reconsidered for pruning in future.
 				 */
-				PageSetPrunable(dp, HeapTupleHeaderGetXmax(htup));
+				heap_prune_record_prunable(prstate,
+										   HeapTupleHeaderGetXmax(htup));
 				break;
 
 			case HEAPTUPLE_LIVE:
@@ -492,10 +533,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 */
 		for (i = 1; (i < nchain) && (chainitems[i - 1] != latestdead); i++)
 		{
-			ItemId		lp = PageGetItemId(dp, chainitems[i]);
-
-			ItemIdSetUnused(lp);
-			heap_prune_record_unused(nowunused, nunused, chainitems[i]);
+			heap_prune_record_unused(prstate, chainitems[i]);
 			ndeleted++;
 		}
 
@@ -513,16 +551,13 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 * redirect the root to the correct chain member.
 		 */
 		if (i >= nchain)
-		{
-			ItemIdSetDead(rootlp);
-			heap_prune_record_dead(nowdead, ndead, rootoffnum);
-		}
+			heap_prune_record_dead(prstate, rootoffnum);
 		else
 		{
-			ItemIdSetRedirect(rootlp, chainitems[i]);
-			heap_prune_record_redirect(redirected, nredirected,
-									   rootoffnum,
-									   chainitems[i]);
+			heap_prune_record_redirect(prstate, rootoffnum, chainitems[i]);
+			/* If the redirection will be a move, need more processing */
+			if (redirect_move)
+				redirect_target = chainitems[i];
 		}
 	}
 	else if (nchain < 2 && ItemIdIsRedirected(rootlp))
@@ -534,19 +569,38 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 * redirect item.  We can clean up by setting the redirect item to
 		 * DEAD state.
 		 */
-		ItemIdSetDead(rootlp);
-		heap_prune_record_dead(nowdead, ndead, rootoffnum);
+		heap_prune_record_dead(prstate, rootoffnum);
+	}
+	else if (redirect_move && ItemIdIsRedirected(rootlp))
+	{
+		/*
+		 * If we desire to eliminate LP_REDIRECT items by moving tuples,
+		 * make a redirection entry for each redirected root item; this
+		 * will cause heap_page_prune_execute to actually do the move.
+		 * (We get here only when there are no DEAD tuples in the chain;
+		 * otherwise the redirection entry was made above.)
+		 */
+		heap_prune_record_redirect(prstate, rootoffnum, chainitems[1]);
+		redirect_target = chainitems[1];
 	}
 
 	/*
-	 * If requested, eliminate LP_REDIRECT items by moving tuples.	Note that
-	 * if the root item is LP_REDIRECT and doesn't point to a valid follow-on
-	 * item, we already killed it above.
+	 * If we are going to implement a redirect by moving tuples, we have
+	 * to issue a cache invalidation against the redirection target tuple,
+	 * because its CTID will be effectively changed by the move.  It is
+	 * safe to do this now, even though we might fail before reaching the
+	 * actual page update, because a useless cache invalidation doesn't
+	 * hurt anything.  Furthermore we really really don't want to issue
+	 * the inval inside the critical section, since it has a nonzero
+	 * chance of causing an error (eg, due to running out of memory).
+	 *
+	 * XXX this is not really right because CacheInvalidateHeapTuple
+	 * expects transactional semantics, and our move isn't transactional.
+	 * FIXME!!
 	 */
-	if (redirect_move && ItemIdIsRedirected(rootlp))
+	if (OffsetNumberIsValid(redirect_target))
 	{
-		OffsetNumber firstoffnum = ItemIdGetRedirect(rootlp);
-		ItemId		firstlp = PageGetItemId(dp, firstoffnum);
+		ItemId		firstlp = PageGetItemId(dp, redirect_target);
 		HeapTupleData firsttup;
 
 		Assert(ItemIdIsNormal(firstlp));
@@ -555,72 +609,145 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		firsttup.t_len = ItemIdGetLength(firstlp);
 		ItemPointerSet(&firsttup.t_self,
 					   BufferGetBlockNumber(buffer),
-					   firstoffnum);
+					   redirect_target);
 		firsttup.t_tableOid = RelationGetRelid(relation);
-
-		/*
-		 * Mark the tuple for invalidation.  Needed because we're changing its
-		 * CTID.
-		 */
 		CacheInvalidateHeapTuple(relation, &firsttup);
-
-		/*
-		 * Change heap-only status of the tuple because after the line pointer
-		 * manipulation, it's no longer a heap-only tuple, but is directly
-		 * pointed to by index entries.
-		 */
-		Assert(HeapTupleIsHeapOnly(&firsttup));
-		HeapTupleClearHeapOnly(&firsttup);
-
-		/* Now move the item pointer */
-		*rootlp = *firstlp;
-		ItemIdSetUnused(firstlp);
-
-		/*
-		 * If latestdead is valid, we have already recorded the redirection
-		 * above.  Otherwise, do it now.
-		 *
-		 * We don't record firstlp in the nowunused[] array, since the
-		 * redirection entry is enough to tell heap_xlog_clean what to do.
-		 */
-		if (!OffsetNumberIsValid(latestdead))
-			heap_prune_record_redirect(redirected, nredirected, rootoffnum,
-									   firstoffnum);
 	}
 
 	return ndeleted;
 }
 
+/* Record lowest soon-prunable XID */
+static void
+heap_prune_record_prunable(PruneState *prstate, TransactionId xid)
+{
+	/*
+	 * This should exactly match the PageSetPrunable macro.  We can't store
+	 * directly into the page header yet, so we update working state.
+	 */
+	Assert(TransactionIdIsNormal(xid));
+	if (!TransactionIdIsValid(prstate->new_prune_xid) ||
+		TransactionIdPrecedes(xid, prstate->new_prune_xid))
+		prstate->new_prune_xid = xid;
+}
 
-/* Record newly-redirected item pointer */
+/* Record item pointer to be redirected */
 static void
-heap_prune_record_redirect(OffsetNumber *redirected, int *nredirected,
+heap_prune_record_redirect(PruneState *prstate,
 						   OffsetNumber offnum, OffsetNumber rdoffnum)
 {
-	Assert(*nredirected < MaxHeapTuplesPerPage);
-	redirected[*nredirected * 2] = offnum;
-	redirected[*nredirected * 2 + 1] = rdoffnum;
-	(*nredirected)++;
+	Assert(prstate->nredirected < MaxHeapTuplesPerPage);
+	prstate->redirected[prstate->nredirected * 2] = offnum;
+	prstate->redirected[prstate->nredirected * 2 + 1] = rdoffnum;
+	prstate->nredirected++;
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+	Assert(!prstate->marked[rdoffnum]);
+	prstate->marked[rdoffnum] = true;
 }
 
-/* Record newly-dead item pointer */
+/* Record item pointer to be marked dead */
 static void
-heap_prune_record_dead(OffsetNumber *nowdead, int *ndead,
-					   OffsetNumber offnum)
+heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum)
 {
-	Assert(*ndead < MaxHeapTuplesPerPage);
-	nowdead[*ndead] = offnum;
-	(*ndead)++;
+	Assert(prstate->ndead < MaxHeapTuplesPerPage);
+	prstate->nowdead[prstate->ndead] = offnum;
+	prstate->ndead++;
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
 }
 
-/* Record newly-unused item pointer */
+/* Record item pointer to be marked unused */
 static void
-heap_prune_record_unused(OffsetNumber *nowunused, int *nunused,
-						 OffsetNumber offnum)
+heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
 {
-	Assert(*nunused < MaxHeapTuplesPerPage);
-	nowunused[*nunused] = offnum;
-	(*nunused)++;
+	Assert(prstate->nunused < MaxHeapTuplesPerPage);
+	prstate->nowunused[prstate->nunused] = offnum;
+	prstate->nunused++;
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+}
+
+
+/*
+ * Perform the actual page changes needed by heap_page_prune.
+ * It is expected that the caller has suitable pin and lock on the
+ * buffer, and is inside a critical section.
+ *
+ * This is split out because it is also used by heap_xlog_clean()
+ * to replay the WAL record when needed after a crash.  Note that the
+ * arguments are identical to those of log_heap_clean().
+ */
+void
+heap_page_prune_execute(Relation reln, Buffer buffer,
+						OffsetNumber *redirected, int nredirected,
+						OffsetNumber *nowdead, int ndead,
+						OffsetNumber *nowunused, int nunused,
+						bool redirect_move)
+{
+	Page		page = (Page) BufferGetPage(buffer);
+	OffsetNumber *offnum;
+	int			i;
+
+	/* Update all redirected or moved line pointers */
+	offnum = redirected;
+	for (i = 0; i < nredirected; i++)
+	{
+		OffsetNumber fromoff = *offnum++;
+		OffsetNumber tooff = *offnum++;
+		ItemId		fromlp = PageGetItemId(page, fromoff);
+
+		if (redirect_move)
+		{
+			/* Physically move the "to" item to the "from" slot */
+			ItemId		tolp = PageGetItemId(page, tooff);
+			HeapTupleHeader htup;
+
+			*fromlp = *tolp;
+			ItemIdSetUnused(tolp);
+
+			/*
+			 * Change heap-only status of the tuple because after the line
+			 * pointer manipulation, it's no longer a heap-only tuple, but is
+			 * directly pointed to by index entries.
+			 */
+			Assert(ItemIdIsNormal(fromlp));
+			htup = (HeapTupleHeader) PageGetItem(page, fromlp);
+			Assert(HeapTupleHeaderIsHeapOnly(htup));
+			HeapTupleHeaderClearHeapOnly(htup);
+		}
+		else
+		{
+			/* Just insert a REDIRECT link at fromoff */
+			ItemIdSetRedirect(fromlp, tooff);
+		}
+	}
+
+	/* Update all now-dead line pointers */
+	offnum = nowdead;
+	for (i = 0; i < ndead; i++)
+	{
+		OffsetNumber off = *offnum++;
+		ItemId		lp = PageGetItemId(page, off);
+
+		ItemIdSetDead(lp);
+	}
+
+	/* Update all now-unused line pointers */
+	offnum = nowunused;
+	for (i = 0; i < nunused; i++)
+	{
+		OffsetNumber off = *offnum++;
+		ItemId		lp = PageGetItemId(page, off);
+
+		ItemIdSetUnused(lp);
+	}
+
+	/*
+	 * Finally, repair any fragmentation, and update the page's hint bit about
+	 * whether it has free pointers.
+	 */
+	PageRepairFragmentation(page);
 }
 
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 78eca9533f773eb557caf36198937c0083ae5eb4..a6dda552f2e9efd85251bf4129932f11bb4575de 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.130 2008/01/14 01:39:09 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.131 2008/03/08 21:57:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -258,6 +258,11 @@ extern void heap_page_prune_opt(Relation relation, Buffer buffer,
 extern int heap_page_prune(Relation relation, Buffer buffer,
 				TransactionId OldestXmin,
 				bool redirect_move, bool report_stats);
+extern void heap_page_prune_execute(Relation reln, Buffer buffer,
+						OffsetNumber *redirected, int nredirected,
+						OffsetNumber *nowdead, int ndead,
+						OffsetNumber *nowunused, int nunused,
+						bool redirect_move);
 extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 
 /* in heap/syncscan.c */