diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e16ac0cb9171fc8f6b1476edca4042f72efeeaac..14490a918e7983b14254947267453c4882216ca6 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.217 2005/08/22 00:41:28 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.218 2005/08/22 23:59:04 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -70,19 +70,19 @@ * default method. We assume that fsync() is always available, and that * configure determined whether fdatasync() is. */ -#ifdef O_SYNC -#define CMP_OPEN_SYNC_FLAG O_SYNC +#if defined(O_SYNC) +#define BARE_OPEN_SYNC_FLAG O_SYNC #elif defined(O_FSYNC) -#define CMP_OPEN_SYNC_FLAG O_FSYNC +#define BARE_OPEN_SYNC_FLAG O_FSYNC #endif -#ifdef CMP_OPEN_SYNC_FLAG -#define OPEN_SYNC_FLAG (CMP_OPEN_SYNC_FLAG | PG_O_DIRECT) +#ifdef BARE_OPEN_SYNC_FLAG +#define OPEN_SYNC_FLAG (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT) #endif -#ifdef O_DSYNC -#ifdef OPEN_SYNC_FLAG +#if defined(O_DSYNC) +#if defined(OPEN_SYNC_FLAG) /* O_DSYNC is distinct? */ -#if O_DSYNC != CMP_OPEN_SYNC_FLAG +#if O_DSYNC != BARE_OPEN_SYNC_FLAG #define OPEN_DATASYNC_FLAG (O_DSYNC | PG_O_DIRECT) #endif #else /* !defined(OPEN_SYNC_FLAG) */ @@ -91,7 +91,7 @@ #endif #endif -#ifdef OPEN_DATASYNC_FLAG +#if defined(OPEN_DATASYNC_FLAG) #define DEFAULT_SYNC_METHOD_STR "open_datasync" #define DEFAULT_SYNC_METHOD SYNC_METHOD_OPEN #define DEFAULT_SYNC_FLAGBIT OPEN_DATASYNC_FLAG @@ -469,7 +469,7 @@ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis); static bool XLogCheckBuffer(XLogRecData *rdata, XLogRecPtr *lsn, BkpBlock *bkpb); static bool AdvanceXLInsertBuffer(void); -static void XLogWrite(XLogwrtRqst WriteRqst); +static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock); static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, @@ -497,18 +497,6 @@ static void ReadControlFile(void); static char *str_time(time_t tnow); static void issue_xlog_fsync(void); -/* XLog gather-write stuff */ -typedef struct XLogPages -{ - char *head; /* Start of first page to write */ - Size size; /* Total bytes to write == count(pages) * BLCKSZ */ - uint32 offset; /* Starting offset in xlog segment file */ -} XLogPages; - -static void XLogPageReset(XLogPages *pages); -static void XLogPageWrite(XLogPages *pages, int index); -static void XLogPageFlush(XLogPages *pages, int index); - #ifdef WAL_DEBUG static void xlog_outrec(char *buf, XLogRecord *record); #endif @@ -726,9 +714,17 @@ begin:; { if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE)) { + /* + * Since the amount of data we write here is completely optional + * anyway, tell XLogWrite it can be "flexible" and stop at a + * convenient boundary. This allows writes triggered by this + * mechanism to synchronize with the cache boundaries, so that + * in a long transaction we'll basically dump alternating halves + * of the buffer array. + */ LogwrtResult = XLogCtl->Write.LogwrtResult; if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write)) - XLogWrite(LogwrtRqst); + XLogWrite(LogwrtRqst, true); LWLockRelease(WALWriteLock); } } @@ -1219,7 +1215,7 @@ AdvanceXLInsertBuffer(void) WriteRqst.Write = OldPageRqstPtr; WriteRqst.Flush.xlogid = 0; WriteRqst.Flush.xrecoff = 0; - XLogWrite(WriteRqst); + XLogWrite(WriteRqst, false); LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; } @@ -1279,16 +1275,24 @@ AdvanceXLInsertBuffer(void) /* * Write and/or fsync the log at least as far as WriteRqst indicates. * + * If flexible == TRUE, we don't have to write as far as WriteRqst, but + * may stop at any convenient boundary (such as a cache or logfile boundary). + * This option allows us to avoid uselessly issuing multiple writes when a + * single one would do. + * * Must be called with WALWriteLock held. */ static void -XLogWrite(XLogwrtRqst WriteRqst) +XLogWrite(XLogwrtRqst WriteRqst, bool flexible) { XLogCtlWrite *Write = &XLogCtl->Write; bool ispartialpage; + bool finishing_seg; bool use_existent; - int currentIndex = Write->curridx; - XLogPages pages; + int curridx; + int npages; + int startidx; + uint32 startoffset; /* We should always be inside a critical section here */ Assert(CritSectionCount > 0); @@ -1299,7 +1303,27 @@ XLogWrite(XLogwrtRqst WriteRqst) */ LogwrtResult = Write->LogwrtResult; - XLogPageReset(&pages); + /* + * Since successive pages in the xlog cache are consecutively allocated, + * we can usually gather multiple pages together and issue just one + * write() call. npages is the number of pages we have determined can + * be written together; startidx is the cache block index of the first + * one, and startoffset is the file offset at which it should go. + * The latter two variables are only valid when npages > 0, but we must + * initialize all of them to keep the compiler quiet. + */ + npages = 0; + startidx = 0; + startoffset = 0; + + /* + * Within the loop, curridx is the cache block index of the page to + * consider writing. We advance Write->curridx only after successfully + * writing pages. (Right now, this refinement is useless since we are + * going to PANIC if any error occurs anyway; but someday it may come + * in useful.) + */ + curridx = Write->curridx; while (XLByteLT(LogwrtResult.Write, WriteRqst.Write)) { @@ -1309,22 +1333,23 @@ XLogWrite(XLogwrtRqst WriteRqst) * end of the last page that's been initialized by * AdvanceXLInsertBuffer. */ - if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex])) + if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx])) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, - XLogCtl->xlblocks[currentIndex].xlogid, - XLogCtl->xlblocks[currentIndex].xrecoff); + XLogCtl->xlblocks[curridx].xlogid, + XLogCtl->xlblocks[curridx].xrecoff); /* Advance LogwrtResult.Write to end of current buffer page */ - LogwrtResult.Write = XLogCtl->xlblocks[currentIndex]; + LogwrtResult.Write = XLogCtl->xlblocks[curridx]; ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write); if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) { /* - * Switch to new logfile segment. + * Switch to new logfile segment. We cannot have any pending + * pages here (since we dump what we have at segment end). */ - XLogPageFlush(&pages, currentIndex); + Assert(npages == 0); if (openLogFile >= 0) { if (close(openLogFile)) @@ -1391,6 +1416,7 @@ XLogWrite(XLogwrtRqst WriteRqst) LWLockRelease(ControlFileLock); } + /* Make sure we have the current logfile open */ if (openLogFile < 0) { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); @@ -1398,27 +1424,83 @@ XLogWrite(XLogwrtRqst WriteRqst) openLogOff = 0; } - /* Add a page to buffer */ - XLogPageWrite(&pages, currentIndex); + /* Add current page to the set of pending pages-to-dump */ + if (npages == 0) + { + /* first of group */ + startidx = curridx; + startoffset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize; + } + npages++; /* - * If we just wrote the whole last page of a logfile segment, - * fsync the segment immediately. This avoids having to go back - * and re-open prior segments when an fsync request comes along - * later. Doing it here ensures that one and only one backend will - * perform this fsync. - * - * This is also the right place to notify the Archiver that the - * segment is ready to copy to archival storage. + * Dump the set if this will be the last loop iteration, or if + * we are at the last page of the cache area (since the next page + * won't be contiguous in memory), or if we are at the end of the + * logfile segment. */ - if (openLogOff + pages.size >= XLogSegSize && !ispartialpage) + finishing_seg = !ispartialpage && + (startoffset + npages * BLCKSZ) >= XLogSegSize; + + if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) || + curridx == XLogCtl->XLogCacheBlck || + finishing_seg) { - XLogPageFlush(&pages, currentIndex); - issue_xlog_fsync(); - LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */ + char *from; + Size nbytes; - if (XLogArchivingActive()) - XLogArchiveNotifySeg(openLogId, openLogSeg); + /* Need to seek in the file? */ + if (openLogOff != startoffset) + { + if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, " + "segment %u to offset %u: %m", + openLogId, openLogSeg, startoffset))); + openLogOff = startoffset; + } + + /* OK to write the page(s) */ + from = XLogCtl->pages + startidx * (Size) BLCKSZ; + nbytes = npages * (Size) BLCKSZ; + errno = 0; + if (write(openLogFile, from, nbytes) != nbytes) + { + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log file %u, segment %u " + "at offset %u length %lu: %m", + openLogId, openLogSeg, + openLogOff, (unsigned long) nbytes))); + } + + /* Update state for write */ + openLogOff += nbytes; + Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx); + npages = 0; + + /* + * If we just wrote the whole last page of a logfile segment, + * fsync the segment immediately. This avoids having to go back + * and re-open prior segments when an fsync request comes along + * later. Doing it here ensures that one and only one backend will + * perform this fsync. + * + * This is also the right place to notify the Archiver that the + * segment is ready to copy to archival storage. + */ + if (finishing_seg) + { + issue_xlog_fsync(); + LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ + + if (XLogArchivingActive()) + XLogArchiveNotifySeg(openLogId, openLogSeg); + } } if (ispartialpage) @@ -1427,9 +1509,15 @@ XLogWrite(XLogwrtRqst WriteRqst) LogwrtResult.Write = WriteRqst.Write; break; } - currentIndex = NextBufIdx(currentIndex); + curridx = NextBufIdx(curridx); + + /* If flexible, break out of loop as soon as we wrote something */ + if (flexible && npages == 0) + break; } - XLogPageFlush(&pages, currentIndex); + + Assert(npages == 0); + Assert(curridx == Write->curridx); /* * If asked to flush, do so @@ -1572,7 +1660,7 @@ XLogFlush(XLogRecPtr record) WriteRqst.Write = WriteRqstPtr; WriteRqst.Flush = record; } - XLogWrite(WriteRqst); + XLogWrite(WriteRqst, false); } LWLockRelease(WALWriteLock); } @@ -5898,72 +5986,3 @@ remove_backup_label(void) errmsg("could not remove file \"%s\": %m", BACKUP_LABEL_FILE))); } - - -/* XLog gather-write stuff */ - -static void -XLogPageReset(XLogPages *pages) -{ - memset(pages, 0, sizeof(*pages)); -} - -static void -XLogPageWrite(XLogPages *pages, int index) -{ - char *page = XLogCtl->pages + index * (Size) BLCKSZ; - Size size = BLCKSZ; - uint32 offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize; - - if (pages->head + pages->size == page && - pages->offset + pages->size == offset) - { /* Pages are continuous. Append new page. */ - pages->size += size; - } - else - { /* Pages are not continuous. Flush and clear. */ - XLogPageFlush(pages, PrevBufIdx(index)); - pages->head = page; - pages->size = size; - pages->offset = offset; - } -} - -static void -XLogPageFlush(XLogPages *pages, int index) -{ - if (!pages->head) - { /* Nothing to write */ - XLogCtl->Write.curridx = index; - return; - } - - /* Need to seek in the file? */ - if (openLogOff != pages->offset) - { - openLogOff = pages->offset; - if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not seek in log file %u, segment %u to offset %u: %m", - openLogId, openLogSeg, openLogOff))); - } - - /* OK to write the page */ - errno = 0; - if (write(openLogFile, pages->head, pages->size) != pages->size) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write to log file %u, segment %u length %u at offset %u: %m", - openLogId, openLogSeg, - (unsigned int) pages->size, openLogOff))); - } - - openLogOff += pages->size; - XLogCtl->Write.curridx = index; - XLogPageReset(pages); -}