From 7d4d5c00f00efaf935e35067375d43d9e72f5e03 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Thu, 19 Jul 2001 02:12:35 +0000
Subject: [PATCH] Arrange to recycle old XLOG log segment files as new segment
 files, rather than deleting them only to have to create more.  Steady state
 is 2*CHECKPOINT_SEGMENTS + WAL_FILES + 1 segment files, which will simply be
 renamed rather than constantly deleted and recreated. To make this safe,
 the current XLOG file/offset number is now stored in the header of each XLOG
 page, so that an un-overwritten page from an old incarnation of a logfile can
 be reliably
 told from a valid page. This change means that if you try to restart
 the postmaster on a CVS-tip database after installing the change, you'll get
 a complaint about a bad XLOG page magic number.  If you don't want to initdb,
 make sure the old postmaster was shut down cleanly and then run
 contrib/pg_resetxlog.

---
 contrib/pg_resetxlog/pg_resetxlog.c |   6 +-
 src/backend/access/transam/xlog.c   | 260 ++++++++++++++++++++--------
 src/include/access/xlog.h           |   5 +-
 3 files changed, 198 insertions(+), 73 deletions(-)

diff --git a/contrib/pg_resetxlog/pg_resetxlog.c b/contrib/pg_resetxlog/pg_resetxlog.c
index f8c81b5c55d..6d32160905d 100644
--- a/contrib/pg_resetxlog/pg_resetxlog.c
+++ b/contrib/pg_resetxlog/pg_resetxlog.c
@@ -23,7 +23,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/contrib/pg_resetxlog/Attic/pg_resetxlog.c,v 1.5 2001/06/06 17:07:38 tgl Exp $
+ * $Header: /cvsroot/pgsql/contrib/pg_resetxlog/Attic/pg_resetxlog.c,v 1.6 2001/07/19 02:12:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -857,6 +857,10 @@ WriteEmptyXLOG(void)
 	page->xlp_magic = XLOG_PAGE_MAGIC;
 	page->xlp_info = 0;
 	page->xlp_sui = ControlFile.checkPointCopy.ThisStartUpID;
+	page->xlp_pageaddr.xlogid =
+		ControlFile.checkPointCopy.redo.xlogid;
+	page->xlp_pageaddr.xrecoff =
+		ControlFile.checkPointCopy.redo.xrecoff - SizeOfXLogPHD;
 	record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
 	record->xl_prev.xlogid = 0;
 	record->xl_prev.xrecoff = 0;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 94ba140b3ad..3251fb2afdb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.70 2001/06/21 19:45:45 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.71 2001/07/19 02:12:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -92,22 +92,35 @@
 /* User-settable parameters */
 int			CheckPointSegments = 3;
 int			XLOGbuffers = 8;
-int			XLOGfiles = 0;		/* how many files to pre-allocate during
-								 * ckpt */
+int			XLOGfiles = 0;		/* # of files to preallocate during ckpt */
 int			XLOG_DEBUG = 0;
 char	   *XLOG_sync_method = NULL;
 const char	XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
 char		XLOG_archive_dir[MAXPGPATH];		/* null string means
 												 * delete 'em */
 
+/*
+ * XLOGfileslop is used in the code as the allowed "fuzz" in the number of 
+ * preallocated XLOG segments --- we try to have at least XLOGfiles advance
+ * segments but no more than XLOGfiles+XLOGfileslop segments.  This could
+ * be made a separate GUC variable, but at present I think it's sufficient
+ * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
+ * checkpoint will free no more than 2*CheckPointSegments log segments, and
+ * we want to recycle all of them; the +1 allows boundary cases to happen
+ * without wasting a delete/create-segment cycle.
+ */
+
+#define XLOGfileslop	(2*CheckPointSegments + 1)
+
+
 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
 static int	sync_method = DEFAULT_SYNC_METHOD;
 static int	open_sync_bit = DEFAULT_SYNC_FLAGBIT;
 
-#define MinXLOGbuffers	4
-
 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
 
+#define MinXLOGbuffers	4
+
 
 /*
  * ThisStartUpID will be same in all backends --- it identifies current
@@ -405,9 +418,12 @@ static bool AdvanceXLInsertBuffer(void);
 static void XLogWrite(XLogwrtRqst WriteRqst);
 static int XLogFileInit(uint32 log, uint32 seg,
 			 bool *use_existent, bool use_lock);
+static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
+								   bool find_free, int max_advance,
+								   bool use_lock);
 static int	XLogFileOpen(uint32 log, uint32 seg, bool econt);
 static void PreallocXlogFiles(XLogRecPtr endptr);
-static void MoveOfflineLogs(uint32 log, uint32 seg);
+static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
@@ -856,6 +872,8 @@ AdvanceXLInsertBuffer(void)
 	bool		update_needed = true;
 	XLogRecPtr	OldPageRqstPtr;
 	XLogwrtRqst WriteRqst;
+	XLogRecPtr	NewPageEndPtr;
+	XLogPageHeader NewPage;
 
 	/* Use Insert->LogwrtResult copy if it's more fresh */
 	if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
@@ -930,32 +948,35 @@ AdvanceXLInsertBuffer(void)
 	 * Now the next buffer slot is free and we can set it up to be the
 	 * next output page.
 	 */
-	if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize)
+	NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
+	if (NewPageEndPtr.xrecoff >= XLogFileSize)
 	{
 		/* crossing a logid boundary */
-		XLogCtl->xlblocks[nextidx].xlogid =
-			XLogCtl->xlblocks[Insert->curridx].xlogid + 1;
-		XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ;
+		NewPageEndPtr.xlogid += 1;
+		NewPageEndPtr.xrecoff = BLCKSZ;
 	}
 	else
 	{
-		XLogCtl->xlblocks[nextidx].xlogid =
-			XLogCtl->xlblocks[Insert->curridx].xlogid;
-		XLogCtl->xlblocks[nextidx].xrecoff =
-			XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ;
+		NewPageEndPtr.xrecoff += BLCKSZ;
 	}
+	XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
+	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
 	Insert->curridx = nextidx;
-	Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
-	Insert->currpos = ((char *) Insert->currpage) + SizeOfXLogPHD;
+	Insert->currpage = NewPage;
+	Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;
 
 	/*
 	 * Be sure to re-zero the buffer so that bytes beyond what we've
 	 * written will look like zeroes and not valid XLOG records...
 	 */
-	MemSet((char *) Insert->currpage, 0, BLCKSZ);
-	Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
-	/* Insert->currpage->xlp_info = 0; *//* done by memset */
-	Insert->currpage->xlp_sui = ThisStartUpID;
+	MemSet((char *) NewPage, 0, BLCKSZ);
+
+	/* And fill the new page's header */
+	NewPage->xlp_magic = XLOG_PAGE_MAGIC;
+	/* NewPage->xlp_info = 0; */			/* done by memset */
+	NewPage->xlp_sui = ThisStartUpID;
+	NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
+	NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
 
 	return update_needed;
 }
@@ -1273,10 +1294,7 @@ XLogFileInit(uint32 log, uint32 seg,
 {
 	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
-	char		targpath[MAXPGPATH];
 	char		zbuffer[BLCKSZ];
-	uint32		targlog,
-				targseg;
 	int			fd;
 	int			nbytes;
 
@@ -1352,32 +1370,96 @@ XLogFileInit(uint32 log, uint32 seg,
 	close(fd);
 
 	/*
-	 * Now move the segment into place with its final name.  We want to be
-	 * sure that only one process does this at a time.
-	 */
-	if (use_lock)
-		SpinAcquire(ControlFileLockId);
-
-	/*
+	 * Now move the segment into place with its final name.
+	 *
 	 * If caller didn't want to use a pre-existing file, get rid of any
 	 * pre-existing file.  Otherwise, cope with possibility that someone
 	 * else has created the file while we were filling ours: if so, use
 	 * ours to pre-create a future log segment.
 	 */
-	targlog = log;
-	targseg = seg;
-	strcpy(targpath, path);
+	if (!InstallXLogFileSegment(log, seg, tmppath,
+								*use_existent, XLOGfiles + XLOGfileslop,
+								use_lock))
+	{
+		/* No need for any more future segments... */
+		unlink(tmppath);
+	}
+
+	/* Set flag to tell caller there was no existent file */
+	*use_existent = false;
+
+	/* Now open original target segment (might not be file I just made) */
+	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
+					   S_IRUSR | S_IWUSR);
+	if (fd < 0)
+		elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
+			 path, log, seg);
+
+	return (fd);
+}
+
+/*
+ * Install a new XLOG segment file as a current or future log segment.
+ *
+ * This is used both to install a newly-created segment (which has a temp
+ * filename while it's being created) and to recycle an old segment.
+ *
+ * log, seg: identify segment to install as (or first possible target).
+ *
+ * tmppath: initial name of file to install.  It will be renamed into place.
+ *
+ * find_free: if TRUE, install the new segment at the first empty log/seg
+ * number at or after the passed numbers.  If FALSE, install the new segment
+ * exactly where specified, deleting any existing segment file there.
+ *
+ * max_advance: maximum number of log/seg slots to advance past the starting
+ * point.  Fail if no free slot is found in this range.  (Irrelevant if
+ * find_free is FALSE.)
+ *
+ * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
+ * place.  This should be TRUE except during bootstrap log creation.  The
+ * caller must *not* hold the spinlock at call.
+ *
+ * Returns TRUE if file installed, FALSE if not installed because of
+ * exceeding max_advance limit.  (Any other kind of failure causes elog().)
+ */
+static bool
+InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
+					   bool find_free, int max_advance,
+					   bool use_lock)
+{
+	char		path[MAXPGPATH];
+	int			fd;
+
+	XLogFileName(path, log, seg);
+
+	/*
+	 * We want to be sure that only one process does this at a time.
+	 */
+	if (use_lock)
+		SpinAcquire(ControlFileLockId);
 
-	if (!*use_existent)
-		unlink(targpath);
+	if (!find_free)
+	{
+		/* Force installation: get rid of any pre-existing segment file */
+		unlink(path);
+	}
 	else
 	{
-		while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
+		/* Find a free slot to put it in */
+		while ((fd = BasicOpenFile(path, O_RDWR | PG_BINARY,
 								   S_IRUSR | S_IWUSR)) >= 0)
 		{
 			close(fd);
-			NextLogSeg(targlog, targseg);
-			XLogFileName(targpath, targlog, targseg);
+			if (--max_advance < 0)
+			{
+				/* Failed to find a free slot within specified range */
+				if (use_lock)
+					SpinRelease(ControlFileLockId);
+				return false;
+			}
+			NextLogSeg(log, seg);
+			XLogFileName(path, log, seg);
 		}
 	}
 
@@ -1387,30 +1469,20 @@ XLogFileInit(uint32 log, uint32 seg,
 	 * rename() is an acceptable substitute except for the truly paranoid.
 	 */
 #ifndef __BEOS__
-	if (link(tmppath, targpath) < 0)
+	if (link(tmppath, path) < 0)
 		elog(STOP, "link from %s to %s (initialization of log file %u, segment %u) failed: %m",
-			 tmppath, targpath, targlog, targseg);
+			 tmppath, path, log, seg);
 	unlink(tmppath);
 #else
-	if (rename(tmppath, targpath) < 0)
+	if (rename(tmppath, path) < 0)
 		elog(STOP, "rename from %s to %s (initialization of log file %u, segment %u) failed: %m",
-			 tmppath, targpath targlog, targseg);
+			 tmppath, path, log, seg);
 #endif
 
 	if (use_lock)
 		SpinRelease(ControlFileLockId);
 
-	/* Set flag to tell caller there was no existent file */
-	*use_existent = false;
-
-	/* Now open original target segment (might not be file I just made) */
-	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
-					   S_IRUSR | S_IWUSR);
-	if (fd < 0)
-		elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
-			 path, log, seg);
-
-	return (fd);
+	return true;
 }
 
 /*
@@ -1477,20 +1549,26 @@ PreallocXlogFiles(XLogRecPtr endptr)
 
 /*
  * Remove or move offline all log files older or equal to passed log/seg#
+ *
+ * endptr is current (or recent) end of xlog; this is used to determine
+ * whether we want to recycle rather than delete no-longer-wanted log files.
  */
 static void
-MoveOfflineLogs(uint32 log, uint32 seg)
+MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
 {
+	uint32		endlogId;
+	uint32		endlogSeg;
 	DIR		   *xldir;
 	struct dirent *xlde;
 	char		lastoff[32];
 	char		path[MAXPGPATH];
 
-	Assert(XLOG_archive_dir[0] == 0);	/* not implemented yet */
+	XLByteToPrevSeg(endptr, endlogId, endlogSeg);
 
 	xldir = opendir(XLogDir);
 	if (xldir == NULL)
-		elog(STOP, "could not open transaction log directory (%s): %m", XLogDir);
+		elog(STOP, "could not open transaction log directory (%s): %m",
+			 XLogDir);
 
 	sprintf(lastoff, "%08X%08X", log, seg);
 
@@ -1501,19 +1579,42 @@ MoveOfflineLogs(uint32 log, uint32 seg)
 			strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
 			strcmp(xlde->d_name, lastoff) <= 0)
 		{
+			sprintf(path, "%s/%s", XLogDir, xlde->d_name);
 			if (XLOG_archive_dir[0])
-				elog(LOG, "archiving transaction log file %s", xlde->d_name);
+			{
+				elog(LOG, "archiving transaction log file %s",
+					 xlde->d_name);
+				elog(NOTICE, "archiving log files is not implemented!");
+			}
 			else
-				elog(LOG, "removing transaction log file %s", xlde->d_name);
-
-			sprintf(path, "%s/%s", XLogDir, xlde->d_name);
-			if (XLOG_archive_dir[0] == 0)
-				unlink(path);
+			{
+				/*
+				 * Before deleting the file, see if it can be recycled as
+				 * a future log segment.  We allow recycling segments up to
+				 * XLOGfiles + XLOGfileslop segments beyond the current
+				 * XLOG location.
+				 */
+				if (InstallXLogFileSegment(endlogId, endlogSeg, path,
+										   true, XLOGfiles + XLOGfileslop,
+										   true))
+				{
+					elog(LOG, "recycled transaction log file %s",
+						 xlde->d_name);
+				}
+				else
+				{
+					/* No need for any more future segments... */
+					elog(LOG, "removing transaction log file %s",
+						 xlde->d_name);
+					unlink(path);
+				}
+			}
 		}
 		errno = 0;
 	}
 	if (errno)
-		elog(STOP, "could not read transaction log directory (%s): %m", XLogDir);
+		elog(STOP, "could not read transaction log directory (%s): %m",
+			 XLogDir);
 	closedir(xldir);
 }
 
@@ -1866,6 +1967,8 @@ next_record_is_invalid:;
 static bool
 ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
 {
+	XLogRecPtr	recaddr;
+
 	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
 	{
 		elog(emode, "ReadRecord: invalid magic number %04X in log file %u, segment %u, offset %u",
@@ -1878,6 +1981,15 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
 			 hdr->xlp_info, readId, readSeg, readOff);
 		return false;
 	}
+	recaddr.xlogid = readId;
+	recaddr.xrecoff = readSeg * XLogSegSize + readOff;
+	if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
+	{
+		elog(emode, "ReadRecord: unexpected pageaddr (%u, %u) in log file %u, segment %u, offset %u",
+			 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
+			 readId, readSeg, readOff);
+		return false;
+	}
 
 	/*
 	 * We disbelieve a SUI less than the previous page's SUI, or more than
@@ -2248,6 +2360,8 @@ BootStrapXLOG(void)
 	page->xlp_magic = XLOG_PAGE_MAGIC;
 	page->xlp_info = 0;
 	page->xlp_sui = checkPoint.ThisStartUpID;
+	page->xlp_pageaddr.xlogid = 0;
+	page->xlp_pageaddr.xrecoff = 0;
 	record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
 	record->xl_prev.xlogid = 0;
 	record->xl_prev.xrecoff = 0;
@@ -2500,23 +2614,29 @@ StartupXLOG(void)
 		EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
 	if (EndOfLog.xrecoff % BLCKSZ == 0)
 	{
-		if (EndOfLog.xrecoff >= XLogFileSize)
+		XLogRecPtr	NewPageEndPtr;
+
+		NewPageEndPtr = EndOfLog;
+		if (NewPageEndPtr.xrecoff >= XLogFileSize)
 		{
-			XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid + 1;
-			XLogCtl->xlblocks[0].xrecoff = BLCKSZ;
+			/* crossing a logid boundary */
+			NewPageEndPtr.xlogid += 1;
+			NewPageEndPtr.xrecoff = BLCKSZ;
 		}
 		else
 		{
-			XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid;
-			XLogCtl->xlblocks[0].xrecoff = EndOfLog.xrecoff + BLCKSZ;
+			NewPageEndPtr.xrecoff += BLCKSZ;
 		}
-		Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
+		XLogCtl->xlblocks[0] = NewPageEndPtr;
 		Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
 		if (InRecovery)
 			Insert->currpage->xlp_sui = ThisStartUpID;
 		else
 			Insert->currpage->xlp_sui = ThisStartUpID + 1;
+		Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
+		Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
 		/* rest of buffer was zeroed in XLOGShmemInit */
+		Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
 	}
 	else
 	{
@@ -2916,7 +3036,7 @@ CreateCheckPoint(bool shutdown)
 	if (_logId || _logSeg)
 	{
 		PrevLogSeg(_logId, _logSeg);
-		MoveOfflineLogs(_logId, _logSeg);
+		MoveOfflineLogs(_logId, _logSeg, recptr);
 	}
 
 	/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 41a8d84dade..73a60b2e0ce 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xlog.h,v 1.23 2001/03/22 04:00:32 momjian Exp $
+ * $Id: xlog.h,v 1.24 2001/07/19 02:12:35 tgl Exp $
  */
 #ifndef XLOG_H
 #define XLOG_H
@@ -109,13 +109,14 @@ typedef struct XLogContRecord
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD058	/* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD059	/* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
 	uint16		xlp_magic;		/* magic value for correctness checks */
 	uint16		xlp_info;		/* flag bits, see below */
 	StartUpID	xlp_sui;		/* StartUpID of first record on page */
+	XLogRecPtr	xlp_pageaddr;	/* XLOG address of this page */
 } XLogPageHeaderData;
 
 #define SizeOfXLogPHD	MAXALIGN(sizeof(XLogPageHeaderData))
-- 
GitLab