From fdf9e21196a6f58c6021c967dc5776a16190f295 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 13 Feb 2013 17:46:23 +0200
Subject: [PATCH] Update visibility map in the second phase of vacuum.

There's a high chance that a page becomes all-visible when the second phase
of vacuum removes all the dead tuples on it, so it makes sense to check for
that. Otherwise the visibility map won't get updated until the next vacuum.

Pavan Deolasee, reviewed by Jeff Janes.
---
 src/backend/commands/vacuumlazy.c | 133 +++++++++++++++++++++++++++++-
 1 file changed, 129 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 5ec65ea41be..069dcb35759 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -142,7 +142,7 @@ static void lazy_cleanup_index(Relation indrel,
 				   IndexBulkDeleteResult *stats,
 				   LVRelStats *vacrelstats);
 static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats);
+				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
@@ -151,6 +151,8 @@ static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
 					   ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
+static bool heap_page_is_all_visible(Buffer buf,
+						 TransactionId *visibility_cutoff_xid);
 
 
 /*
@@ -697,6 +699,11 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		hastup = false;
 		prev_dead_count = vacrelstats->num_dead_tuples;
 		maxoff = PageGetMaxOffsetNumber(page);
+
+		/*
+		 * Note: If you change anything in the loop below, also look at
+		 * heap_page_is_all_visible to see if that needs to be changed.
+		 */
 		for (offnum = FirstOffsetNumber;
 			 offnum <= maxoff;
 			 offnum = OffsetNumberNext(offnum))
@@ -881,7 +888,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			vacrelstats->num_dead_tuples > 0)
 		{
 			/* Remove tuples from heap */
-			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
+			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
 
 			/*
 			 * Forget the now-vacuumed tuples, and press on, but be careful
@@ -1051,6 +1058,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 	int			tupindex;
 	int			npages;
 	PGRUsage	ru0;
+	Buffer		vmbuffer = InvalidBuffer;
 
 	pg_rusage_init(&ru0);
 	npages = 0;
@@ -1074,7 +1082,8 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 			++tupindex;
 			continue;
 		}
-		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
+		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats,
+									&vmbuffer);
 
 		/* Now that we've compacted the page, record its available space */
 		page = BufferGetPage(buf);
@@ -1085,6 +1094,12 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 		npages++;
 	}
 
+	if (BufferIsValid(vmbuffer))
+	{
+		ReleaseBuffer(vmbuffer);
+		vmbuffer = InvalidBuffer;
+	}
+
 	ereport(elevel,
 			(errmsg("\"%s\": removed %d row versions in %d pages",
 					RelationGetRelationName(onerel),
@@ -1105,11 +1120,12 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
  */
 static int
 lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats)
+				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
 {
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxOffsetNumber];
 	int			uncnt = 0;
+	TransactionId	visibility_cutoff_xid;
 
 	START_CRIT_SECTION();
 
@@ -1130,6 +1146,19 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 
 	PageRepairFragmentation(page);
 
+	/*
+	 * Now that we have removed the dead tuples from the page, once again check
+	 * if the page has become all-visible.
+	 */
+	if (!visibilitymap_test(onerel, blkno, vmbuffer) &&
+		heap_page_is_all_visible(buffer, &visibility_cutoff_xid))
+	{
+		Assert(BufferIsValid(*vmbuffer));
+		PageSetAllVisible(page);
+		visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, *vmbuffer,
+				visibility_cutoff_xid);
+	}
+
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
@@ -1633,3 +1662,99 @@ vac_cmp_itemptr(const void *left, const void *right)
 
 	return 0;
 }
+
+/*
+ * Check if every tuple in the given page is visible to all current and future
+ * transactions. Also return the visibility_cutoff_xid which is the highest
+ * xmin amongst the visible tuples.
+ */
+static bool
+heap_page_is_all_visible(Buffer buf, TransactionId *visibility_cutoff_xid)
+{
+	Page		 page = BufferGetPage(buf);
+	OffsetNumber offnum,
+				 maxoff;
+	bool		 all_visible = true;
+
+	*visibility_cutoff_xid = InvalidTransactionId;
+
+	/*
+	 * This is a stripped down version of the line pointer scan in
+	 * lazy_scan_heap(). So if you change anything here, also check that
+	 * code.
+	 */
+	maxoff = PageGetMaxOffsetNumber(page);
+	for (offnum = FirstOffsetNumber;
+			offnum <= maxoff && all_visible;
+			offnum = OffsetNumberNext(offnum))
+	{
+		ItemId			itemid;
+		HeapTupleData	tuple;
+
+		itemid = PageGetItemId(page, offnum);
+
+		/* Unused or redirect line pointers are of no interest */
+		if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
+			continue;
+
+		ItemPointerSet(&(tuple.t_self), BufferGetBlockNumber(buf), offnum);
+
+		/*
+		 * Dead line pointers can have index pointers pointing to them. So they
+		 * can't be treated as visible
+		 */
+		if (ItemIdIsDead(itemid))
+		{
+			all_visible = false;
+			break;
+		}
+
+		Assert(ItemIdIsNormal(itemid));
+
+		tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+
+		switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
+		{
+			case HEAPTUPLE_LIVE:
+				{
+					TransactionId xmin;
+
+					/* Check comments in lazy_scan_heap. */
+					if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+					{
+						all_visible = false;
+						break;
+					}
+
+					/*
+					 * The inserter definitely committed. But is it old
+					 * enough that everyone sees it as committed?
+					 */
+					xmin = HeapTupleHeaderGetXmin(tuple.t_data);
+					if (!TransactionIdPrecedes(xmin, OldestXmin))
+					{
+						all_visible = false;
+						break;
+					}
+
+					/* Track newest xmin on page. */
+					if (TransactionIdFollows(xmin, *visibility_cutoff_xid))
+						*visibility_cutoff_xid = xmin;
+				}
+				break;
+
+			case HEAPTUPLE_DEAD:
+			case HEAPTUPLE_RECENTLY_DEAD:
+			case HEAPTUPLE_INSERT_IN_PROGRESS:
+			case HEAPTUPLE_DELETE_IN_PROGRESS:
+				all_visible = false;
+				break;
+
+			default:
+				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+				break;
+		}
+	}						/* scan along page */
+
+	return all_visible;
+}
-- 
GitLab