diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index ba533d35978d91ac07a9f02b81e5dc5575d3cff9..9facc198e4e176638e98993ce1183ead18eeb930 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -71,33 +71,10 @@ usage(void) static bool segment_callback(XLogRecPtr segendpos, uint32 timeline) { - char fn[MAXPGPATH]; - struct stat statbuf; - if (verbose) fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), progname, segendpos.xlogid, segendpos.xrecoff, timeline); - /* - * Check if there is a partial file for the name we just finished, and if - * there is, remove it under the assumption that we have now got all the - * data we need. - */ - segendpos.xrecoff /= XLOG_SEG_SIZE; - PrevLogSeg(segendpos.xlogid, segendpos.xrecoff); - snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial", - basedir, timeline, - segendpos.xlogid, - segendpos.xrecoff); - if (stat(fn, &statbuf) == 0) - { - /* File existed, get rid of it */ - if (verbose) - fprintf(stderr, _("%s: removing file \"%s\"\n"), - progname, fn); - unlink(fn); - } - /* * Never abort from this - we handle all aborting in continue_streaming() */ @@ -119,9 +96,8 @@ continue_streaming(void) /* * Determine starting location for streaming, based on: * 1. If there are existing xlog segments, start at the end of the last one - * 2. If the last one is a partial segment, rename it and start over, since - * we don't sync after every write. - * 3. If no existing xlog exists, start from the beginning of the current + * that is complete (size matches XLogSegSize) + * 2. If no valid xlog exists, start from the beginning of the current * WAL segment. */ static XLogRecPtr @@ -133,7 +109,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) bool b; uint32 high_log = 0; uint32 high_seg = 0; - bool partial = false; dir = opendir(basedir); if (dir == NULL) @@ -195,7 +170,7 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) disconnect_and_exit(1); } - if (statbuf.st_size == 16 * 1024 * 1024) + if (statbuf.st_size == XLOG_SEG_SIZE) { /* Completed segment */ if (log > high_log || @@ -208,37 +183,9 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) } else { - /* - * This is a partial file. Rename it out of the way. - */ - char newfn[MAXPGPATH]; - - fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"), - progname, dirent->d_name, dirent->d_name); - - snprintf(newfn, sizeof(newfn), "%s/%s.partial", - basedir, dirent->d_name); - - if (stat(newfn, &statbuf) == 0) - { - /* - * XXX: perhaps we should only error out if the existing file - * is larger? - */ - fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"), - progname, newfn); - disconnect_and_exit(1); - } - if (rename(fullpath, newfn) != 0) - { - fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"), - progname, fullpath, newfn, strerror(errno)); - disconnect_and_exit(1); - } - - /* Don't continue looking for more, we assume this is the last */ - partial = true; - break; + fprintf(stderr, _("%s: segment file '%s' is incorrect size %d, skipping\n"), + progname, dirent->d_name, (int) statbuf.st_size); + continue; } } @@ -247,17 +194,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) if (high_log > 0 || high_seg > 0) { XLogRecPtr high_ptr; - - if (!partial) - { - /* - * If the segment was partial, the pointer is already at the right - * location since we want to re-transmit that segment. If it was - * not, we need to move it to the next segment, since we are - * tracking the last one that was complete. - */ - NextLogSeg(high_log, high_seg); - } + /* + * Move the starting pointer to the start of the next segment, + * since the highest one we've seen was completed. + */ + NextLogSeg(high_log, high_seg); high_ptr.xlogid = high_log; high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 0ca30c425f32559bd50fe55aa7ee944b3d54d014..dea944beb947d42ef4f75b11cf23cc4ef22b66e0 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -27,6 +27,7 @@ #include "receivelog.h" #include "streamutil.h" +#include <sys/stat.h> #include <sys/time.h> #include <sys/types.h> #include <unistd.h> @@ -41,24 +42,128 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0}; * Open a new WAL file in the specified directory. Store the name * (not including the full directory) in namebuf. Assumes there is * enough room in this buffer... + * + * The file will be padded to 16Mb with zeroes. */ static int open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf) { int f; char fn[MAXPGPATH]; + struct stat statbuf; + char *zerobuf; + int bytes; XLogFileName(namebuf, timeline, startpoint.xlogid, startpoint.xrecoff / XLOG_SEG_SIZE); - snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf); - f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666); + snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf); + f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) + { fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"), - progname, namebuf, strerror(errno)); + progname, fn, strerror(errno)); + return -1; + } + + /* + * Verify that the file is either empty (just created), or a complete + * XLogSegSize segment. Anything in between indicates a corrupt file. + */ + if (fstat(f, &statbuf) != 0) + { + fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"), + progname, fn, strerror(errno)); + close(f); + return -1; + } + if (statbuf.st_size == XLogSegSize) + return f; /* File is open and ready to use */ + if (statbuf.st_size != 0) + { + fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"), + progname, fn, (int) statbuf.st_size, XLogSegSize); + close(f); + return -1; + } + + /* New, empty, file. So pad it to 16Mb with zeroes */ + zerobuf = xmalloc0(XLOG_BLCKSZ); + for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ) + { + if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"), + progname, fn, strerror(errno)); + close(f); + unlink(fn); + return -1; + } + } + free(zerobuf); + + if (lseek(f, SEEK_SET, 0) != 0) + { + fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"), + progname, fn, strerror(errno)); + close(f); + return -1; + } return f; } +static bool +close_walfile(int walfile, char *basedir, char *walname) +{ + off_t currpos = lseek(walfile, 0, SEEK_CUR); + + if (currpos == -1) + { + fprintf(stderr, _("%s: could not get current position in file %s: %s\n"), + progname, walname, strerror(errno)); + return false; + } + + if (fsync(walfile) != 0) + { + fprintf(stderr, _("%s: could not fsync file %s: %s\n"), + progname, walname, strerror(errno)); + return false; + } + + if (close(walfile) != 0) + { + fprintf(stderr, _("%s: could not close file %s: %s\n"), + progname, walname, strerror(errno)); + return false; + } + + /* + * Rename the .partial file only if we've completed writing the + * whole segment. + */ + if (currpos == XLOG_SEG_SIZE) + { + char oldfn[MAXPGPATH]; + char newfn[MAXPGPATH]; + + snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname); + snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname); + if (rename(oldfn, newfn) != 0) + { + fprintf(stderr, _("%s: could not rename file %s: %s\n"), + progname, walname, strerror(errno)); + return false; + } + } + else + fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"), + progname, walname); + + return true; +} + + /* * Local version of GetCurrentTimestamp(), since we are not linked with * backend code. @@ -178,10 +283,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi if (stream_continue && stream_continue()) { if (walfile != -1) - { - fsync(walfile); - close(walfile); - } + /* Potential error message is written by close_walfile */ + return close_walfile(walfile, basedir, current_walfile_name); return true; } @@ -360,8 +463,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi /* Did we reach the end of a WAL segment? */ if (blockpos.xrecoff % XLOG_SEG_SIZE == 0) { - fsync(walfile); - close(walfile); + if (!close_walfile(walfile, basedir, current_walfile_name)) + /* Error message written in close_walfile() */ + return false; + walfile = -1; xlogoff = 0;