From 9052537325158c2e377a6217a894e5631cb92f4a Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Mon, 22 Aug 2005 23:59:04 +0000
Subject: [PATCH] Rewrite gather-write patch into something less obviously
 bolted on after the fact.  Fix bug with incorrect test for whether we are at
 end of logfile segment.  Arrange for writes triggered by XLogInsert's
 is-cache-more-than-half-full test to synchronize with the cache boundaries,
 so that in long transactions we tend to write alternating halves of the cache
 rather than randomly chosen portions of it; this saves one more write syscall
 per cache load.

---
 src/backend/access/transam/xlog.c | 265 ++++++++++++++++--------------
 1 file changed, 142 insertions(+), 123 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e16ac0cb917..14490a918e7 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.217 2005/08/22 00:41:28 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.218 2005/08/22 23:59:04 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -70,19 +70,19 @@
  * default method.	We assume that fsync() is always available, and that
  * configure determined whether fdatasync() is.
  */
-#ifdef O_SYNC
-#define CMP_OPEN_SYNC_FLAG		O_SYNC
+#if defined(O_SYNC)
+#define BARE_OPEN_SYNC_FLAG		O_SYNC
 #elif defined(O_FSYNC)
-#define CMP_OPEN_SYNC_FLAG		O_FSYNC
+#define BARE_OPEN_SYNC_FLAG		O_FSYNC
 #endif
-#ifdef CMP_OPEN_SYNC_FLAG
-#define OPEN_SYNC_FLAG			(CMP_OPEN_SYNC_FLAG | PG_O_DIRECT)
+#ifdef BARE_OPEN_SYNC_FLAG
+#define OPEN_SYNC_FLAG			(BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
 #endif
 
-#ifdef O_DSYNC
-#ifdef OPEN_SYNC_FLAG
+#if defined(O_DSYNC)
+#if defined(OPEN_SYNC_FLAG)
 /* O_DSYNC is distinct? */
-#if O_DSYNC != CMP_OPEN_SYNC_FLAG
+#if O_DSYNC != BARE_OPEN_SYNC_FLAG
 #define OPEN_DATASYNC_FLAG		(O_DSYNC | PG_O_DIRECT)
 #endif
 #else /* !defined(OPEN_SYNC_FLAG) */
@@ -91,7 +91,7 @@
 #endif
 #endif
 
-#ifdef OPEN_DATASYNC_FLAG
+#if defined(OPEN_DATASYNC_FLAG)
 #define DEFAULT_SYNC_METHOD_STR	"open_datasync"
 #define DEFAULT_SYNC_METHOD		SYNC_METHOD_OPEN
 #define DEFAULT_SYNC_FLAGBIT	OPEN_DATASYNC_FLAG
@@ -469,7 +469,7 @@ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
 static bool XLogCheckBuffer(XLogRecData *rdata,
 							XLogRecPtr *lsn, BkpBlock *bkpb);
 static bool AdvanceXLInsertBuffer(void);
-static void XLogWrite(XLogwrtRqst WriteRqst);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static int XLogFileInit(uint32 log, uint32 seg,
 			 bool *use_existent, bool use_lock);
 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
@@ -497,18 +497,6 @@ static void ReadControlFile(void);
 static char *str_time(time_t tnow);
 static void issue_xlog_fsync(void);
 
-/* XLog gather-write stuff */
-typedef struct XLogPages
-{
-	char	*head;		/* Start of first page to write */
-	Size	 size;		/* Total bytes to write == count(pages) * BLCKSZ */
-	uint32	 offset;	/* Starting offset in xlog segment file */
-} XLogPages;
-
-static void XLogPageReset(XLogPages *pages);
-static void XLogPageWrite(XLogPages *pages, int index);
-static void XLogPageFlush(XLogPages *pages, int index);
-
 #ifdef WAL_DEBUG
 static void xlog_outrec(char *buf, XLogRecord *record);
 #endif
@@ -726,9 +714,17 @@ begin:;
 	{
 		if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
 		{
+			/*
+			 * Since the amount of data we write here is completely optional
+			 * anyway, tell XLogWrite it can be "flexible" and stop at a
+			 * convenient boundary.  This allows writes triggered by this
+			 * mechanism to synchronize with the cache boundaries, so that
+			 * in a long transaction we'll basically dump alternating halves
+			 * of the buffer array.
+			 */
 			LogwrtResult = XLogCtl->Write.LogwrtResult;
 			if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
-				XLogWrite(LogwrtRqst);
+				XLogWrite(LogwrtRqst, true);
 			LWLockRelease(WALWriteLock);
 		}
 	}
@@ -1219,7 +1215,7 @@ AdvanceXLInsertBuffer(void)
 				WriteRqst.Write = OldPageRqstPtr;
 				WriteRqst.Flush.xlogid = 0;
 				WriteRqst.Flush.xrecoff = 0;
-				XLogWrite(WriteRqst);
+				XLogWrite(WriteRqst, false);
 				LWLockRelease(WALWriteLock);
 				Insert->LogwrtResult = LogwrtResult;
 			}
@@ -1279,16 +1275,24 @@ AdvanceXLInsertBuffer(void)
 /*
  * Write and/or fsync the log at least as far as WriteRqst indicates.
  *
+ * If flexible == TRUE, we don't have to write as far as WriteRqst, but
+ * may stop at any convenient boundary (such as a cache or logfile boundary).
+ * This option allows us to avoid uselessly issuing multiple writes when a
+ * single one would do.
+ *
  * Must be called with WALWriteLock held.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
 	XLogCtlWrite *Write = &XLogCtl->Write;
 	bool		ispartialpage;
+	bool		finishing_seg;
 	bool		use_existent;
-	int			currentIndex = Write->curridx;
-	XLogPages	pages;
+	int			curridx;
+	int			npages;
+	int			startidx;
+	uint32		startoffset;
 
 	/* We should always be inside a critical section here */
 	Assert(CritSectionCount > 0);
@@ -1299,7 +1303,27 @@ XLogWrite(XLogwrtRqst WriteRqst)
 	 */
 	LogwrtResult = Write->LogwrtResult;
 
-	XLogPageReset(&pages);
+	/*
+	 * Since successive pages in the xlog cache are consecutively allocated,
+	 * we can usually gather multiple pages together and issue just one
+	 * write() call.  npages is the number of pages we have determined can
+	 * be written together; startidx is the cache block index of the first
+	 * one, and startoffset is the file offset at which it should go.
+	 * The latter two variables are only valid when npages > 0, but we must
+	 * initialize all of them to keep the compiler quiet.
+	 */
+	npages = 0;
+	startidx = 0;
+	startoffset = 0;
+
+	/*
+	 * Within the loop, curridx is the cache block index of the page to
+	 * consider writing.  We advance Write->curridx only after successfully
+	 * writing pages.  (Right now, this refinement is useless since we are
+	 * going to PANIC if any error occurs anyway; but someday it may come
+	 * in useful.)
+	 */
+	curridx = Write->curridx;
 
 	while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
 	{
@@ -1309,22 +1333,23 @@ XLogWrite(XLogwrtRqst WriteRqst)
 		 * end of the last page that's been initialized by
 		 * AdvanceXLInsertBuffer.
 		 */
-		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex]))
+		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
 			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
 				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
-				 XLogCtl->xlblocks[currentIndex].xlogid,
-				 XLogCtl->xlblocks[currentIndex].xrecoff);
+				 XLogCtl->xlblocks[curridx].xlogid,
+				 XLogCtl->xlblocks[curridx].xrecoff);
 
 		/* Advance LogwrtResult.Write to end of current buffer page */
-		LogwrtResult.Write = XLogCtl->xlblocks[currentIndex];
+		LogwrtResult.Write = XLogCtl->xlblocks[curridx];
 		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
 
 		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
 		{
 			/*
-			 * Switch to new logfile segment.
+			 * Switch to new logfile segment.  We cannot have any pending
+			 * pages here (since we dump what we have at segment end).
 			 */
-			XLogPageFlush(&pages, currentIndex);
+			Assert(npages == 0);
 			if (openLogFile >= 0)
 			{
 				if (close(openLogFile))
@@ -1391,6 +1416,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
 			LWLockRelease(ControlFileLock);
 		}
 
+		/* Make sure we have the current logfile open */
 		if (openLogFile < 0)
 		{
 			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
@@ -1398,27 +1424,83 @@ XLogWrite(XLogwrtRqst WriteRqst)
 			openLogOff = 0;
 		}
 
-		/* Add a page to buffer */
-		XLogPageWrite(&pages, currentIndex);
+		/* Add current page to the set of pending pages-to-dump */
+		if (npages == 0)
+		{
+			/* first of group */
+			startidx = curridx;
+			startoffset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
+		}
+		npages++;
 
 		/*
-		 * If we just wrote the whole last page of a logfile segment,
-		 * fsync the segment immediately.  This avoids having to go back
-		 * and re-open prior segments when an fsync request comes along
-		 * later. Doing it here ensures that one and only one backend will
-		 * perform this fsync.
-		 *
-		 * This is also the right place to notify the Archiver that the
-		 * segment is ready to copy to archival storage.
+		 * Dump the set if this will be the last loop iteration, or if
+		 * we are at the last page of the cache area (since the next page
+		 * won't be contiguous in memory), or if we are at the end of the
+		 * logfile segment.
 		 */
-		if (openLogOff + pages.size >= XLogSegSize && !ispartialpage)
+		finishing_seg = !ispartialpage &&
+			(startoffset + npages * BLCKSZ) >= XLogSegSize;
+
+		if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
+			curridx == XLogCtl->XLogCacheBlck ||
+			finishing_seg)
 		{
-			XLogPageFlush(&pages, currentIndex);
-			issue_xlog_fsync();
-			LogwrtResult.Flush = LogwrtResult.Write;	/* end of current page */
+			char	   *from;
+			Size		nbytes;
 
-			if (XLogArchivingActive())
-				XLogArchiveNotifySeg(openLogId, openLogSeg);
+			/* Need to seek in the file? */
+			if (openLogOff != startoffset)
+			{
+				if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
+					ereport(PANIC,
+							(errcode_for_file_access(),
+							 errmsg("could not seek in log file %u, "
+									"segment %u to offset %u: %m",
+									openLogId, openLogSeg, startoffset)));
+				openLogOff = startoffset;
+			}
+
+			/* OK to write the page(s) */
+			from = XLogCtl->pages + startidx * (Size) BLCKSZ;
+			nbytes = npages * (Size) BLCKSZ;
+			errno = 0;
+			if (write(openLogFile, from, nbytes) != nbytes)
+			{
+				/* if write didn't set errno, assume no disk space */
+				if (errno == 0)
+					errno = ENOSPC;
+				ereport(PANIC,
+						(errcode_for_file_access(),
+						 errmsg("could not write to log file %u, segment %u "
+								"at offset %u length %lu: %m",
+								openLogId, openLogSeg,
+								openLogOff, (unsigned long) nbytes)));
+			}
+
+			/* Update state for write */
+			openLogOff += nbytes;
+			Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
+			npages = 0;
+
+			/*
+			 * If we just wrote the whole last page of a logfile segment,
+			 * fsync the segment immediately.  This avoids having to go back
+			 * and re-open prior segments when an fsync request comes along
+			 * later. Doing it here ensures that one and only one backend will
+			 * perform this fsync.
+			 *
+			 * This is also the right place to notify the Archiver that the
+			 * segment is ready to copy to archival storage.
+			 */
+			if (finishing_seg)
+			{
+				issue_xlog_fsync();
+				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
+
+				if (XLogArchivingActive())
+					XLogArchiveNotifySeg(openLogId, openLogSeg);
+			}
 		}
 
 		if (ispartialpage)
@@ -1427,9 +1509,15 @@ XLogWrite(XLogwrtRqst WriteRqst)
 			LogwrtResult.Write = WriteRqst.Write;
 			break;
 		}
-		currentIndex = NextBufIdx(currentIndex);
+		curridx = NextBufIdx(curridx);
+
+		/* If flexible, break out of loop as soon as we wrote something */
+		if (flexible && npages == 0)
+			break;
 	}
-	XLogPageFlush(&pages, currentIndex);
+
+	Assert(npages == 0);
+	Assert(curridx == Write->curridx);
 
 	/*
 	 * If asked to flush, do so
@@ -1572,7 +1660,7 @@ XLogFlush(XLogRecPtr record)
 				WriteRqst.Write = WriteRqstPtr;
 				WriteRqst.Flush = record;
 			}
-			XLogWrite(WriteRqst);
+			XLogWrite(WriteRqst, false);
 		}
 		LWLockRelease(WALWriteLock);
 	}
@@ -5898,72 +5986,3 @@ remove_backup_label(void)
 					 errmsg("could not remove file \"%s\": %m",
 							BACKUP_LABEL_FILE)));
 }
-
-
-/* XLog gather-write stuff */
-
-static void
-XLogPageReset(XLogPages *pages)
-{
-	memset(pages, 0, sizeof(*pages));
-}
-
-static void
-XLogPageWrite(XLogPages *pages, int index)
-{
-	char *page = XLogCtl->pages + index * (Size) BLCKSZ;
-	Size size = BLCKSZ;
-	uint32 offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
-
-	if (pages->head + pages->size == page &&
-		pages->offset + pages->size == offset)
-	{	/* Pages are continuous. Append new page. */
-		pages->size += size;
-	}
-	else
-	{	/* Pages are not continuous. Flush and clear. */
-		XLogPageFlush(pages, PrevBufIdx(index));
-		pages->head = page;
-		pages->size = size;
-		pages->offset = offset;
-	}
-}
-
-static void
-XLogPageFlush(XLogPages *pages, int index)
-{
-	if (!pages->head)
-	{	/* Nothing to write */
-		XLogCtl->Write.curridx = index;
-		return;
-	}
-
-	/* Need to seek in the file? */
-	if (openLogOff != pages->offset)
-	{
-		openLogOff = pages->offset;
-		if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
-			ereport(PANIC,
-					(errcode_for_file_access(),
-					 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-							openLogId, openLogSeg, openLogOff)));
-	}
-
-	/* OK to write the page */
-	errno = 0;
-	if (write(openLogFile, pages->head, pages->size) != pages->size)
-	{
-		/* if write didn't set errno, assume problem is no disk space */
-		if (errno == 0)
-			errno = ENOSPC;
-		ereport(PANIC,
-				(errcode_for_file_access(),
-				 errmsg("could not write to log file %u, segment %u length %u at offset %u: %m",
-						openLogId, openLogSeg,
-						(unsigned int) pages->size, openLogOff)));
-	}
-
-	openLogOff += pages->size;
-	XLogCtl->Write.curridx = index;
-	XLogPageReset(pages);
-}
-- 
GitLab