From 0eab92c0e674815d3b33868c14886845b68d7cfd Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 29 Jun 2001 20:14:27 +0000
Subject: [PATCH] Fix VACUUM so that it can use pages as move targets even if
 they do not have any newly-dead tuples on them.  This is a longstanding
 deficiency that prevents VACUUM from compacting a file as much as one would
 expect. Change requires fixing repair_frag to not assume that fraged_pages is
 a subset of vacuum_pages. Also make some further cleanups of places that
 assumed page numbers fit in int and tuple counts fit in uint32.

---
 src/backend/commands/vacuum.c | 376 +++++++++++++++++++---------------
 1 file changed, 212 insertions(+), 164 deletions(-)

diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index a5d6778b93f..9c5c8d9085d 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.199 2001/06/29 16:34:30 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.200 2001/06/29 20:14:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -71,18 +71,18 @@ typedef struct VacPageData
 	Size		free;			/* FreeSpace on this Page */
 	uint16		offsets_used;	/* Number of OffNums used by vacuum */
 	uint16		offsets_free;	/* Number of OffNums free or to be free */
-	OffsetNumber offsets[1];	/* Array of its OffNums */
+	OffsetNumber offsets[1];	/* Array of free OffNums */
 } VacPageData;
 
 typedef VacPageData *VacPage;
 
 typedef struct VacPageListData
 {
-	int			empty_end_pages;/* Number of "empty" end-pages */
-	int			num_pages;		/* Number of pages in pagedesc */
+	BlockNumber	empty_end_pages;	/* Number of "empty" end-pages */
+	int			num_pages;			/* Number of pages in pagedesc */
 	int			num_allocated_pages;	/* Number of allocated pages in
 										 * pagedesc */
-	VacPage    *pagedesc;		/* Descriptions of pages */
+	VacPage    *pagedesc;			/* Descriptions of pages */
 } VacPageListData;
 
 typedef VacPageListData *VacPageList;
@@ -146,7 +146,7 @@ static void vacuum_index(VacPageList vacpagelist, Relation indrel,
 						 double num_tuples, int keep_tuples);
 static void scan_index(Relation indrel, double num_tuples);
 static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
-static void reap_page(VacPageList vacpagelist, VacPage vacpage);
+static VacPage copy_vac_page(VacPage vacpage);
 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
 static void get_indices(Relation relation, int *nindices, Relation **Irel);
 static void close_indices(int nindices, Relation *Irel);
@@ -610,10 +610,10 @@ vacuum_rel(Oid relid)
 /*
  *	scan_heap() -- scan an open heap relation
  *
- *		This routine sets commit times, constructs vacuum_pages list of
- *		empty/uninitialized pages and pages with dead tuples and
- *		~LP_USED line pointers, constructs fraged_pages list of pages
- *		appropriate for purposes of shrinking and maintains statistics
+ *		This routine sets commit status bits, constructs vacuum_pages (list
+ *		of pages we need to compact free space on and/or clean indexes of
+ *		deleted tuples), constructs fraged_pages (list of pages with free
+ *		space that tuples could be moved into), and calculates statistics
  *		on the number of live tuples in a heap.
  */
 static void
@@ -625,8 +625,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 	ItemId		itemid;
 	Buffer		buf;
 	HeapTupleData tuple;
-	Page		page,
-				tempPage = NULL;
 	OffsetNumber offnum,
 				maxoff;
 	bool		pgchanged,
@@ -634,21 +632,21 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 				notup;
 	char	   *relname;
 	VacPage		vacpage,
-				vp;
-	double		num_tuples;
-	uint32		tups_vacuumed,
-				nkeep,
-				nunused,
-				ncrash,
-				empty_pages,
+				vacpagecopy;
+	BlockNumber	empty_pages,
 				new_pages,
 				changed_pages,
 				empty_end_pages;
-	Size		free_size,
+	double		num_tuples,
+				tups_vacuumed,
+				nkeep,
+				nunused,
+				ncrash;
+	double		free_size,
 				usable_free_size;
 	Size		min_tlen = MaxTupleSize;
 	Size		max_tlen = 0;
-	int32		i;
+	int			i;
 	bool		do_shrinking = true;
 	VTupleLink	vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
 	int			num_vtlinks = 0;
@@ -660,21 +658,30 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 	relname = RelationGetRelationName(onerel);
 	elog(MESSAGE_LEVEL, "--Relation %s--", relname);
 
-	tups_vacuumed = nkeep = nunused = ncrash = empty_pages =
-		new_pages = changed_pages = empty_end_pages = 0;
-	num_tuples = 0;
-	free_size = usable_free_size = 0;
+	empty_pages = new_pages = changed_pages = empty_end_pages = 0;
+	num_tuples = tups_vacuumed = nkeep = nunused = ncrash = 0;
+	free_size = 0;
 
 	nblocks = RelationGetNumberOfBlocks(onerel);
 
+	/*
+	 * We initially create each VacPage item in a maximal-sized workspace,
+	 * then copy the workspace into a just-large-enough copy.
+	 */
 	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
-	vacpage->offsets_used = 0;
 
 	for (blkno = 0; blkno < nblocks; blkno++)
 	{
+		Page		page,
+					tempPage = NULL;
+		bool		do_reap,
+					do_frag;
+
 		buf = ReadBuffer(onerel, blkno);
 		page = BufferGetPage(buf);
+
 		vacpage->blkno = blkno;
+		vacpage->offsets_used = 0;
 		vacpage->offsets_free = 0;
 
 		if (PageIsNew(page))
@@ -686,7 +693,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 			free_size += (vacpage->free - sizeof(ItemIdData));
 			new_pages++;
 			empty_end_pages++;
-			reap_page(vacuum_pages, vacpage);
+			vacpagecopy = copy_vac_page(vacpage);
+			vpage_insert(vacuum_pages, vacpagecopy);
+			vpage_insert(fraged_pages, vacpagecopy);
 			WriteBuffer(buf);
 			continue;
 		}
@@ -697,7 +706,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 			free_size += (vacpage->free - sizeof(ItemIdData));
 			empty_pages++;
 			empty_end_pages++;
-			reap_page(vacuum_pages, vacpage);
+			vacpagecopy = copy_vac_page(vacpage);
+			vpage_insert(vacuum_pages, vacpagecopy);
+			vpage_insert(fraged_pages, vacpagecopy);
 			ReleaseBuffer(buf);
 			continue;
 		}
@@ -718,7 +729,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 			if (!ItemIdIsUsed(itemid))
 			{
 				vacpage->offsets[vacpage->offsets_free++] = offnum;
-				nunused++;
+				nunused += 1;
 				continue;
 			}
 
@@ -777,7 +788,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 						 * Not Aborted, Not Committed, Not in Progress -
 						 * so it's from crashed process. - vadim 11/26/96
 						 */
-						ncrash++;
+						ncrash += 1;
 						tupgone = true;
 					}
 					else
@@ -852,7 +863,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 					tuple.t_data->t_xmax >= XmaxRecent)
 				{
 					tupgone = false;
-					nkeep++;
+					nkeep += 1;
 					if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED))
 					{
 						tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
@@ -909,7 +920,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
 					pageSize = PageGetPageSize(page);
 					tempPage = (Page) palloc(pageSize);
-					memmove(tempPage, page, pageSize);
+					memcpy(tempPage, page, pageSize);
 				}
 
 				/* mark it unused on the temp page */
@@ -917,7 +928,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 				lpp->lp_flags &= ~LP_USED;
 
 				vacpage->offsets[vacpage->offsets_free++] = offnum;
-				tups_vacuumed++;
+				tups_vacuumed += 1;
 			}
 			else
 			{
@@ -931,33 +942,51 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 		}
 
 		if (tempPage != (Page) NULL)
-		{						/* Some tuples are gone */
+		{
+			/* Some tuples are removable; figure free space after removal */
 			PageRepairFragmentation(tempPage, NULL);
 			vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
-			free_size += vacpage->free;
-			reap_page(vacuum_pages, vacpage);
 			pfree(tempPage);
-			tempPage = (Page) NULL;
+			do_reap = true;
 		}
-		else if (vacpage->offsets_free > 0)
-		{						/* there are only ~LP_USED line pointers */
+		else
+		{
+			/* Just use current available space */
 			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
-			free_size += vacpage->free;
-			reap_page(vacuum_pages, vacpage);
+			/* Need to reap the page if it has ~LP_USED line pointers */
+			do_reap = (vacpage->offsets_free > 0);
 		}
 
-		if (pgchanged)
+		free_size += vacpage->free;
+		/*
+		 * Add the page to fraged_pages if it has a useful amount of free
+		 * space.  "Useful" means enough for a minimal-sized tuple.
+		 * But we don't know that accurately near the start of the relation,
+		 * so add pages unconditionally if they have >= BLCKSZ/10 free space.
+		 */
+		do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ/10);
+
+		if (do_reap || do_frag)
 		{
-			WriteBuffer(buf);
-			changed_pages++;
+			vacpagecopy = copy_vac_page(vacpage);
+			if (do_reap)
+				vpage_insert(vacuum_pages, vacpagecopy);
+			if (do_frag)
+				vpage_insert(fraged_pages, vacpagecopy);
 		}
-		else
-			ReleaseBuffer(buf);
 
 		if (notup)
 			empty_end_pages++;
 		else
 			empty_end_pages = 0;
+
+		if (pgchanged)
+		{
+			WriteBuffer(buf);
+			changed_pages++;
+		}
+		else
+			ReleaseBuffer(buf);
 	}
 
 	pfree(vacpage);
@@ -974,26 +1003,22 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 	fraged_pages->empty_end_pages = empty_end_pages;
 
 	/*
-	 * Try to make fraged_pages keeping in mind that we can't use free
-	 * space of "empty" end-pages and last page if it reaped.
+	 * Clear the fraged_pages list if we found we couldn't shrink.
+	 * Else, remove any "empty" end-pages from the list, and compute
+	 * usable free space = free space in remaining pages.
 	 */
-	if (do_shrinking && vacuum_pages->num_pages - empty_end_pages > 0)
+	if (do_shrinking)
 	{
-		int			nusf;		/* blocks usefull for re-using */
-
-		nusf = vacuum_pages->num_pages - empty_end_pages;
-		if ((vacuum_pages->pagedesc[nusf - 1])->blkno == nblocks - empty_end_pages - 1)
-			nusf--;
-
-		for (i = 0; i < nusf; i++)
-		{
-			vp = vacuum_pages->pagedesc[i];
-			if (enough_space(vp, min_tlen))
-			{
-				vpage_insert(fraged_pages, vp);
-				usable_free_size += vp->free;
-			}
-		}
+		Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
+		fraged_pages->num_pages -= empty_end_pages;
+		usable_free_size = 0;
+		for (i = 0; i < fraged_pages->num_pages; i++)
+			usable_free_size += fraged_pages->pagedesc[i]->free;
+	}
+	else
+	{
+		fraged_pages->num_pages = 0;
+		usable_free_size = 0;
 	}
 
 	if (usable_free_size > 0 && num_vtlinks > 0)
@@ -1011,13 +1036,13 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 	}
 
 	elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
-Tup %.0f: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %lu, MaxLen %lu; \
-Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s",
+Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, Crash %.0f, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
+Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u. %s",
 		 nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
 		 new_pages, num_tuples, tups_vacuumed,
 		 nkeep, vacrelstats->num_vtlinks, ncrash,
 		 nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
-		 (unsigned long) free_size, (unsigned long) usable_free_size,
+		 free_size, usable_free_size,
 		 empty_end_pages, fraged_pages->num_pages,
 		 show_rusage(&ru0));
 
@@ -1043,7 +1068,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 	CommandId	myCID;
 	Buffer		buf,
 				cur_buffer;
-	int			nblocks,
+	BlockNumber	nblocks,
 				blkno;
 	BlockNumber	last_move_dest_block = 0,
 				last_vacuum_block;
@@ -1093,10 +1118,19 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
 	Nvacpagelist.num_pages = 0;
 	num_fraged_pages = fraged_pages->num_pages;
-	Assert(vacuum_pages->num_pages > vacuum_pages->empty_end_pages);
+	Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
 	vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
-	last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
-	last_vacuum_block = last_vacuum_page->blkno;
+	if (vacuumed_pages > 0)
+	{
+		/* get last reaped page from vacuum_pages */
+		last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
+		last_vacuum_block = last_vacuum_page->blkno;
+	}
+	else
+	{
+		last_vacuum_page = NULL;
+		last_vacuum_block = InvalidBlockNumber;
+	}
 	cur_buffer = InvalidBuffer;
 	num_moved = 0;
 
@@ -1106,19 +1140,34 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 	/*
 	 * Scan pages backwards from the last nonempty page, trying to move
 	 * tuples down to lower pages.	Quit when we reach a page that we have
-	 * moved any tuples onto.  Note that if a page is still in the
-	 * fraged_pages list (list of candidate move-target pages) when we
-	 * reach it, we will remove it from the list.  This ensures we never
-	 * move a tuple up to a higher page number.
+	 * moved any tuples onto, or the first page if we haven't moved anything,
+	 * or when we find a page we cannot completely empty (this last condition
+	 * is handled by "break" statements within the loop).
 	 *
 	 * NB: this code depends on the vacuum_pages and fraged_pages lists being
-	 * in order, and on fraged_pages being a subset of vacuum_pages.
+	 * in order by blkno.
 	 */
 	nblocks = vacrelstats->rel_pages;
 	for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
 		 blkno > last_move_dest_block;
 		 blkno--)
 	{
+		/*
+		 * Forget fraged_pages pages at or after this one; they're no longer
+		 * useful as move targets, since we only want to move down.  Note
+		 * that since we stop the outer loop at last_move_dest_block, pages
+		 * removed here cannot have had anything moved onto them already.
+		 */
+		while (num_fraged_pages > 0 &&
+			   fraged_pages->pagedesc[num_fraged_pages-1]->blkno >= blkno)
+		{
+			Assert(fraged_pages->pagedesc[num_fraged_pages-1]->offsets_used == 0);
+			--num_fraged_pages;
+		}
+
+		/*
+		 * Process this page of relation.
+		 */
 		buf = ReadBuffer(onerel, blkno);
 		page = BufferGetPage(buf);
 
@@ -1127,10 +1176,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 		isempty = PageIsEmpty(page);
 
 		dowrite = false;
-		if (blkno == last_vacuum_block) /* it's reaped page */
+
+		/* Is the page in the vacuum_pages list? */
+		if (blkno == last_vacuum_block)
 		{
-			if (last_vacuum_page->offsets_free > 0)		/* there are dead tuples */
-			{					/* on this page - clean */
+			if (last_vacuum_page->offsets_free > 0)
+			{
+				/* there are dead tuples on this page - clean them */
 				Assert(!isempty);
 				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 				vacuum_page(onerel, buf, last_vacuum_page);
@@ -1151,12 +1203,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 				last_vacuum_page = NULL;
 				last_vacuum_block = InvalidBlockNumber;
 			}
-			if (num_fraged_pages > 0 &&
-				fraged_pages->pagedesc[num_fraged_pages - 1]->blkno == blkno)
-			{
-				/* page is in fraged_pages too; remove it */
-				--num_fraged_pages;
-			}
 			if (isempty)
 			{
 				ReleaseBuffer(buf);
@@ -1305,29 +1351,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 					if (to_vacpage == NULL ||
 						!enough_space(to_vacpage, tlen))
 					{
-
-						/*
-						 * if to_vacpage no longer has enough free space
-						 * to be useful, remove it from fraged_pages list
-						 */
-						if (to_vacpage != NULL &&
-						!enough_space(to_vacpage, vacrelstats->min_tlen))
-						{
-							Assert(num_fraged_pages > to_item);
-							memmove(fraged_pages->pagedesc + to_item,
-									fraged_pages->pagedesc + to_item + 1,
-									sizeof(VacPage) * (num_fraged_pages - to_item - 1));
-							num_fraged_pages--;
-						}
 						for (i = 0; i < num_fraged_pages; i++)
 						{
 							if (enough_space(fraged_pages->pagedesc[i], tlen))
 								break;
 						}
 
-						/* can't move item anywhere */
 						if (i == num_fraged_pages)
 						{
+							/* can't move item anywhere */
 							for (i = 0; i < num_vtmove; i++)
 							{
 								Assert(vtmove[i].vacpage->offsets_used > 0);
@@ -1649,19 +1681,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 				{
 					WriteBuffer(cur_buffer);
 					cur_buffer = InvalidBuffer;
-
-					/*
-					 * If previous target page is now too full to add *any*
-					 * tuple to it, remove it from fraged_pages.
-					 */
-					if (!enough_space(cur_page, vacrelstats->min_tlen))
-					{
-						Assert(num_fraged_pages > cur_item);
-						memmove(fraged_pages->pagedesc + cur_item,
-								fraged_pages->pagedesc + cur_item + 1,
-								sizeof(VacPage) * (num_fraged_pages - cur_item - 1));
-						num_fraged_pages--;
-					}
 				}
 				for (i = 0; i < num_fraged_pages; i++)
 				{
@@ -1835,7 +1854,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 				qsort((char *) (vacpage->offsets), vacpage->offsets_free,
 					  sizeof(OffsetNumber), vac_cmp_offno);
 			}
-			reap_page(&Nvacpagelist, vacpage);
+			vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
 			WriteBuffer(buf);
 		}
 		else if (dowrite)
@@ -1858,7 +1877,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
 	if (num_moved > 0)
 	{
-
 		/*
 		 * We have to commit our tuple movings before we truncate the
 		 * relation.  Ideally we should do Commit/StartTransactionCommand
@@ -1872,55 +1890,84 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 	}
 
 	/*
-	 * Clean uncleaned reaped pages from vacuum_pages list list and set
-	 * xmin committed for inserted tuples
+	 * We are not going to move any more tuples across pages, but we still
+	 * need to apply vacuum_page to compact free space in the remaining
+	 * pages in vacuum_pages list.  Note that some of these pages may also
+	 * be in the fraged_pages list, and may have had tuples moved onto them;
+	 * if so, we already did vacuum_page and needn't do it again.
 	 */
-	checked_moved = 0;
-	for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++)
+	for (i = 0, curpage = vacuum_pages->pagedesc;
+		 i < vacuumed_pages;
+		 i++, curpage++)
 	{
 		Assert((*curpage)->blkno < blkno);
-		buf = ReadBuffer(onerel, (*curpage)->blkno);
-		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-		page = BufferGetPage(buf);
-		if ((*curpage)->offsets_used == 0)		/* this page was not used */
+		if ((*curpage)->offsets_used == 0)
 		{
+			/* this page was not used as a move target, so must clean it */
+			buf = ReadBuffer(onerel, (*curpage)->blkno);
+			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+			page = BufferGetPage(buf);
 			if (!PageIsEmpty(page))
 				vacuum_page(onerel, buf, *curpage);
+			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+			WriteBuffer(buf);
 		}
-		else
-/* this page was used */
+	}
+
+	/*
+	 * Now scan all the pages that we moved tuples onto and update
+	 * tuple status bits.  This is not really necessary, but will save time
+	 * for future transactions examining these tuples.
+	 *
+	 * XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from
+	 * pages that were move source pages but not move dest pages.  One also
+	 * wonders whether it wouldn't be better to skip this step and let the
+	 * tuple status updates happen someplace that's not holding an exclusive
+	 * lock on the relation.
+	 */
+	checked_moved = 0;
+	for (i = 0, curpage = fraged_pages->pagedesc;
+		 i < num_fraged_pages;
+		 i++, curpage++)
+	{
+		Assert((*curpage)->blkno < blkno);
+		if ((*curpage)->blkno > last_move_dest_block)
+			break;				/* no need to scan any further */
+		if ((*curpage)->offsets_used == 0)
+			continue;			/* this page was never used as a move dest */
+		buf = ReadBuffer(onerel, (*curpage)->blkno);
+		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+		page = BufferGetPage(buf);
+		num_tuples = 0;
+		max_offset = PageGetMaxOffsetNumber(page);
+		for (newoff = FirstOffsetNumber;
+			 newoff <= max_offset;
+			 newoff = OffsetNumberNext(newoff))
 		{
-			num_tuples = 0;
-			max_offset = PageGetMaxOffsetNumber(page);
-			for (newoff = FirstOffsetNumber;
-				 newoff <= max_offset;
-				 newoff = OffsetNumberNext(newoff))
+			itemid = PageGetItemId(page, newoff);
+			if (!ItemIdIsUsed(itemid))
+				continue;
+			tuple.t_datamcxt = NULL;
+			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+			if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
 			{
-				itemid = PageGetItemId(page, newoff);
-				if (!ItemIdIsUsed(itemid))
-					continue;
-				tuple.t_datamcxt = NULL;
-				tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
-				if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+				if ((TransactionId) tuple.t_data->t_cmin != myXID)
+					elog(ERROR, "Invalid XID in t_cmin (2)");
+				if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
 				{
-					if ((TransactionId) tuple.t_data->t_cmin != myXID)
-						elog(ERROR, "Invalid XID in t_cmin (2)");
-					if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-					{
-						tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
-						num_tuples++;
-					}
-					else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-						tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
-					else
-						elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+					tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
+					num_tuples++;
 				}
+				else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
+					tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
+				else
+					elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
 			}
-			Assert((*curpage)->offsets_used == num_tuples);
-			checked_moved += num_tuples;
 		}
 		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 		WriteBuffer(buf);
+		Assert((*curpage)->offsets_used == num_tuples);
+		checked_moved += num_tuples;
 	}
 	Assert(num_moved == checked_moved);
 
@@ -2083,7 +2130,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 	 * tuples have correct on-row commit status on disk (see bufmgr.c's
 	 * comments for FlushRelationBuffers()).
 	 */
-	Assert(vacrelstats->rel_pages >= (BlockNumber) vacuum_pages->empty_end_pages);
+	Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
 	relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
 
 	i = FlushRelationBuffers(onerel, relblocks);
@@ -2367,34 +2414,35 @@ vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
 	heap_close(rd, RowExclusiveLock);
 }
 
-/*
- *	reap_page() -- save a page on the array of reaped pages.
- *
- *		As a side effect of the way that the vacuuming loop for a given
- *		relation works, higher pages come after lower pages in the array
- *		(and highest tid on a page is last).
- */
-static void
-reap_page(VacPageList vacpagelist, VacPage vacpage)
+/* Copy a VacPage structure */
+static VacPage
+copy_vac_page(VacPage vacpage)
 {
 	VacPage		newvacpage;
 
 	/* allocate a VacPageData entry */
-	newvacpage = (VacPage) palloc(sizeof(VacPageData) + vacpage->offsets_free * sizeof(OffsetNumber));
+	newvacpage = (VacPage) palloc(sizeof(VacPageData) +
+								  vacpage->offsets_free * sizeof(OffsetNumber));
 
 	/* fill it in */
 	if (vacpage->offsets_free > 0)
-		memmove(newvacpage->offsets, vacpage->offsets, vacpage->offsets_free * sizeof(OffsetNumber));
+		memcpy(newvacpage->offsets, vacpage->offsets,
+			   vacpage->offsets_free * sizeof(OffsetNumber));
 	newvacpage->blkno = vacpage->blkno;
 	newvacpage->free = vacpage->free;
 	newvacpage->offsets_used = vacpage->offsets_used;
 	newvacpage->offsets_free = vacpage->offsets_free;
 
-	/* insert this page into vacpagelist list */
-	vpage_insert(vacpagelist, newvacpage);
-
+	return newvacpage;
 }
 
+/*
+ * Add a VacPage pointer to a VacPageList.
+ *
+ *		As a side effect of the way that scan_heap works,
+ *		higher pages come after lower pages in the array
+ *		(and highest tid on a page is last).
+ */
 static void
 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
 {
-- 
GitLab