From d9bae5317300cf983dd9f01cc2e561c0eecd109a Mon Sep 17 00:00:00 2001 From: Magnus Hagander <magnus@hagander.net> Date: Wed, 26 Oct 2011 20:13:33 +0200 Subject: [PATCH] Implement streaming xlog for backup tools Add option for parallel streaming of the transaction log while a base backup is running, to get the logfiles before the server has removed them. Also add a tool called pg_receivexlog, which streams the transaction log into files, creating a log archive without having to wait for segments to complete, thus decreasing the window of data loss without having to waste space using archive_timeout. This works best in combination with archive_command - suggested usage docs etc coming later. --- doc/src/sgml/ref/allfiles.sgml | 1 + doc/src/sgml/ref/pg_basebackup.sgml | 65 ++- doc/src/sgml/ref/pg_receivexlog.sgml | 270 +++++++++++++ doc/src/sgml/reference.sgml | 1 + src/bin/pg_basebackup/.gitignore | 1 + src/bin/pg_basebackup/Makefile | 15 +- src/bin/pg_basebackup/pg_basebackup.c | 538 ++++++++++++++++++------- src/bin/pg_basebackup/pg_receivexlog.c | 465 +++++++++++++++++++++ src/bin/pg_basebackup/receivelog.c | 398 ++++++++++++++++++ src/bin/pg_basebackup/receivelog.h | 22 + src/bin/pg_basebackup/streamutil.c | 165 ++++++++ src/bin/pg_basebackup/streamutil.h | 22 + src/tools/msvc/Mkvcbuild.pm | 7 + 13 files changed, 1805 insertions(+), 165 deletions(-) create mode 100644 doc/src/sgml/ref/pg_receivexlog.sgml create mode 100644 src/bin/pg_basebackup/pg_receivexlog.c create mode 100644 src/bin/pg_basebackup/receivelog.c create mode 100644 src/bin/pg_basebackup/receivelog.h create mode 100644 src/bin/pg_basebackup/streamutil.c create mode 100644 src/bin/pg_basebackup/streamutil.h diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index 8a8616b0008..382d297bdb2 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory. 
<!ENTITY pgCtl SYSTEM "pg_ctl-ref.sgml"> <!ENTITY pgDump SYSTEM "pg_dump.sgml"> <!ENTITY pgDumpall SYSTEM "pg_dumpall.sgml"> +<!ENTITY pgReceivexlog SYSTEM "pg_receivexlog.sgml"> <!ENTITY pgResetxlog SYSTEM "pg_resetxlog.sgml"> <!ENTITY pgRestore SYSTEM "pg_restore.sgml"> <!ENTITY postgres SYSTEM "postgres-ref.sgml"> diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index 25280045412..8c8c78f0d15 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -143,8 +143,8 @@ PostgreSQL documentation </varlistentry> <varlistentry> - <term><option>-x</option></term> - <term><option>--xlog</option></term> + <term><option>-x <replaceable class="parameter">method</replaceable></option></term> + <term><option>--xlog=<replaceable class="parameter">method</replaceable></option></term> <listitem> <para> Includes the required transaction log files (WAL files) in the @@ -154,16 +154,43 @@ PostgreSQL documentation to consult the log archive, thus making this a completely standalone backup. </para> - <note> - <para> - The transaction log files are collected at the end of the backup. - Therefore, it is necessary for the - <xref linkend="guc-wal-keep-segments"> parameter to be set high - enough that the log is not removed before the end of the backup. - If the log has been rotated when it's time to transfer it, the - backup will fail and be unusable. - </para> - </note> + <para> + The following methods for collecting the transaction logs are + supported: + + <variablelist> + <varlistentry> + <term><literal>f</literal></term> + <term><literal>fetch</literal></term> + <listitem> + <para> + The transaction log files are collected at the end of the backup. + Therefore, it is necessary for the + <xref linkend="guc-wal-keep-segments"> parameter to be set high + enough that the log is not removed before the end of the backup. 
+ If the log has been rotated when it's time to transfer it, the + backup will fail and be unusable. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>s</literal></term> + <term><literal>stream</literal></term> + <listitem> + <para> + Stream the transaction log while the backup is created. This will + open a second connection to the server and start streaming the + transaction log in parallel while running the backup. Therefore, + it will use up two slots configured by the + <xref linkend="guc-max-wal-senders"> parameter. As long as the + client can keep up with transaction log received, using this mode + requires no extra transaction logs to be saved on the master. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> </listitem> </varlistentry> @@ -260,6 +287,20 @@ PostgreSQL documentation The following command-line options control the database connection parameters. <variablelist> + <varlistentry> + <term><option>-s <replaceable class="parameter">interval</replaceable></option></term> + <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term> + <listitem> + <para> + Specifies the number of seconds between status packets sent back to the + server. This is required when streaming the transaction log (using + <literal>--xlog=stream</literal>) if replication timeout is configured + on the server, and allows for easier monitoring. The default value is + 10 seconds. 
+ </para> + </listitem> + </varlistentry> + <varlistentry> <term><option>-h <replaceable class="parameter">host</replaceable></option></term> <term><option>--host=<replaceable class="parameter">host</replaceable></option></term> diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml new file mode 100644 index 00000000000..9a2a24ba2e3 --- /dev/null +++ b/doc/src/sgml/ref/pg_receivexlog.sgml @@ -0,0 +1,270 @@ +<!-- +doc/src/sgml/ref/pg_receivexlog.sgml +PostgreSQL documentation +--> + +<refentry id="app-pgreceivexlog"> + <refmeta> + <refentrytitle>pg_receivexlog</refentrytitle> + <manvolnum>1</manvolnum> + <refmiscinfo>Application</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>pg_receivexlog</refname> + <refpurpose>streams transaction logs from a <productname>PostgreSQL</productname> cluster</refpurpose> + </refnamediv> + + <indexterm zone="app-pgreceivexlog"> + <primary>pg_receivexlog</primary> + </indexterm> + + <refsynopsisdiv> + <cmdsynopsis> + <command>pg_receivexlog</command> + <arg rep="repeat"><replaceable>option</></arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title> + Description + </title> + <para> + <application>pg_receivexlog</application> is used to stream transaction log + from a running <productname>PostgreSQL</productname> cluster. The transaction + log is streamed using the streaming replication protocol, and is written + to a local directory of files. This directory can be used as the archive + location for doing a restore using point-in-time recovery (see + <xref linkend="continuous-archiving">). + </para> + + <para> + <application>pg_receivexlog</application> streams the transaction + log in real time as it's being generated on the server, and does not wait + for segments to complete like <xref linkend="guc-archive-command"> does. + For this reason, it is not necessary to set + <xref linkend="guc-archive-timeout"> when using + <application>pg_receivexlog</application>. 
+ </para> + + <para> + The transaction log is streamed over a regular + <productname>PostgreSQL</productname> connection, and uses the + replication protocol. The connection must be + made with a user having <literal>REPLICATION</literal> permissions (see + <xref linkend="role-attributes">), and the user must be granted explicit + permissions in <filename>pg_hba.conf</filename>. The server must also + be configured with <xref linkend="guc-max-wal-senders"> set high enough + to leave at least one session available for the stream. + </para> + </refsect1> + + <refsect1> + <title>Options</title> + + <para> + The following command-line options control the location and format of the + output. + + <variablelist> + <varlistentry> + <term><option>-D <replaceable class="parameter">directory</replaceable></option></term> + <term><option>--dir=<replaceable class="parameter">directory</replaceable></option></term> + <listitem> + <para> + Directory to write the output to. + </para> + <para> + This parameter is required. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + <para> + The following command-line options control the running of the program. + + <variablelist> + <varlistentry> + <term><option>-v</option></term> + <term><option>--verbose</option></term> + <listitem> + <para> + Enables verbose mode. + </para> + </listitem> + </varlistentry> + + </variablelist> + </para> + + <para> + The following command-line options control the database connection parameters. + + <variablelist> + <varlistentry> + <term><option>-s <replaceable class="parameter">interval</replaceable></option></term> + <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term> + <listitem> + <para> + Specifies the number of seconds between status packets sent back to the + server. This is required if replication timeout is configured on the + server, and allows for easier monitoring. The default value is + 10 seconds. 
+ </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-h <replaceable class="parameter">host</replaceable></option></term> + <term><option>--host=<replaceable class="parameter">host</replaceable></option></term> + <listitem> + <para> + Specifies the host name of the machine on which the server is + running. If the value begins with a slash, it is used as the + directory for the Unix domain socket. The default is taken + from the <envar>PGHOST</envar> environment variable, if set, + else a Unix domain socket connection is attempted. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-p <replaceable class="parameter">port</replaceable></option></term> + <term><option>--port=<replaceable class="parameter">port</replaceable></option></term> + <listitem> + <para> + Specifies the TCP port or local Unix domain socket file + extension on which the server is listening for connections. + Defaults to the <envar>PGPORT</envar> environment variable, if + set, or a compiled-in default. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-U <replaceable>username</replaceable></option></term> + <term><option>--username=<replaceable class="parameter">username</replaceable></option></term> + <listitem> + <para> + User name to connect as. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-w</></term> + <term><option>--no-password</></term> + <listitem> + <para> + Never issue a password prompt. If the server requires + password authentication and a password is not available by + other means such as a <filename>.pgpass</filename> file, the + connection attempt will fail. This option can be useful in + batch jobs and scripts where no user is present to enter a + password. 
+ </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-W</option></term> + <term><option>--password</option></term> + <listitem> + <para> + Force <application>pg_receivexlog</application> to prompt for a + password before connecting to a database. + </para> + + <para> + This option is never essential, since + <application>pg_receivexlog</application> will automatically prompt + for a password if the server demands password authentication. + However, <application>pg_receivexlog</application> will waste a + connection attempt finding out that the server wants a password. + In some cases it is worth typing <option>-W</> to avoid the extra + connection attempt. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + + <para> + Other, less commonly used, parameters are also available: + + <variablelist> + <varlistentry> + <term><option>-V</></term> + <term><option>--version</></term> + <listitem> + <para> + Print the <application>pg_receivexlog</application> version and exit. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-?</></term> + <term><option>--help</></term> + <listitem> + <para> + Show help about <application>pg_receivexlog</application> command line + arguments, and exit. + </para> + </listitem> + </varlistentry> + + </variablelist> + </para> + + </refsect1> + + <refsect1> + <title>Environment</title> + + <para> + This utility, like most other <productname>PostgreSQL</> utilities, + uses the environment variables supported by <application>libpq</> + (see <xref linkend="libpq-envars">). + </para> + + </refsect1> + + <refsect1> + <title>Notes</title> + + <para> + When using <application>pg_receivexlog</application> instead of + <xref linkend="guc-archive-command">, the server will continue to + recycle transaction log files even if the backups are not properly + archived, since there is no command that fails. 
This can be worked + around by having an <xref linkend="guc-archive-command"> that fails + when the file has not been properly archived yet. + </para> + + </refsect1> + + <refsect1> + <title>Examples</title> + + <para> + To stream the transaction log from the server at + <literal>mydbserver</literal> and store it in the local directory + <filename>/usr/local/pgsql/archive</filename>: + <screen> + <prompt>$</prompt> <userinput>pg_receivexlog -h mydbserver -D /usr/local/pgsql/archive</userinput> + </screen> + </para> + </refsect1> + + <refsect1> + <title>See Also</title> + + <simplelist type="inline"> + <member><xref linkend="APP-PGBASEBACKUP"></member> + </simplelist> + </refsect1> + +</refentry> diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index 5fd6410991d..7326519708e 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -220,6 +220,7 @@ &pgConfig; &pgDump; &pgDumpall; + &pgReceivexlog; &pgRestore; &psqlRef; &reindexdb; diff --git a/src/bin/pg_basebackup/.gitignore b/src/bin/pg_basebackup/.gitignore index a2510fbd826..49f503a4556 100644 --- a/src/bin/pg_basebackup/.gitignore +++ b/src/bin/pg_basebackup/.gitignore @@ -1 +1,2 @@ /pg_basebackup +/pg_receivexlog \ No newline at end of file diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index ccb15025ef5..464506560d6 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -18,21 +18,26 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) -OBJS= pg_basebackup.o $(WIN32RES) +OBJS=receivelog.o streamutil.o $(WIN32RES) -all: pg_basebackup +all: pg_basebackup pg_receivexlog -pg_basebackup: $(OBJS) | submake-libpq submake-libpgport - $(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) +pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport + $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) 
$(LIBS) -o $@$(X) + +pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport + $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) install: all installdirs $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)' + $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)' installdirs: $(MKDIR_P) '$(DESTDIR)$(bindir)' uninstall: rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)' + rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)' clean distclean maintainer-clean: - rm -f pg_basebackup$(X) $(OBJS) + rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 5c62be576ee..68e40f478ff 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -11,12 +11,20 @@ *------------------------------------------------------------------------- */ -#include "postgres_fe.h" +/* + * We have to use postgres.h not postgres_fe.h here, because there's so much + * backend-only stuff in the XLOG include files we need. But we need a + * frontend-ish environment otherwise. Hence this ugly hack. 
+ */ +#define FRONTEND 1 +#include "postgres.h" #include "libpq-fe.h" #include <unistd.h> #include <dirent.h> #include <sys/stat.h> +#include <sys/types.h> +#include <sys/wait.h> #ifdef HAVE_LIBZ #include <zlib.h> @@ -24,9 +32,11 @@ #include "getopt_long.h" +#include "receivelog.h" +#include "streamutil.h" + /* Global options */ -static const char *progname; char *basedir = NULL; char format = 'p'; /* p(lain)/t(ar) */ char *label = "pg_basebackup base backup"; @@ -34,38 +44,38 @@ bool showprogress = false; int verbose = 0; int compresslevel = 0; bool includewal = false; +bool streamwal = false; bool fastcheckpoint = false; -char *dbhost = NULL; -char *dbuser = NULL; -char *dbport = NULL; -int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ +int standby_message_timeout = 10; /* 10 sec = default */ /* Progress counters */ static uint64 totalsize; static uint64 totaldone; static int tablespacecount; -/* Connection kept global so we can disconnect easily */ -static PGconn *conn = NULL; +/* Pipe to communicate with background wal receiver process */ +#ifndef WIN32 +static int bgpipe[2] = {-1, -1}; +#endif -#define disconnect_and_exit(code) \ - { \ - if (conn != NULL) PQfinish(conn); \ - exit(code); \ - } +/* Handle to child process */ +static pid_t bgchild = -1; + +/* End position for xlog streaming, empty string if unknown yet */ +static XLogRecPtr xlogendptr; +static int has_xlogendptr = 0; /* Function headers */ -static char *xstrdup(const char *s); -static void *xmalloc0(int size); static void usage(void); static void verify_dir_is_empty_or_create(char *dirname); static void progress_report(int tablespacenum, const char *filename); -static PGconn *GetConnection(void); static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); +static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); + #ifdef HAVE_LIBZ static const char * 
get_gz_error(gzFile *gzf) @@ -81,39 +91,6 @@ get_gz_error(gzFile *gzf) } #endif -/* - * strdup() and malloc() replacements that prints an error and exits - * if something goes wrong. Can never return NULL. - */ -static char * -xstrdup(const char *s) -{ - char *result; - - result = strdup(s); - if (!result) - { - fprintf(stderr, _("%s: out of memory\n"), progname); - exit(1); - } - return result; -} - -static void * -xmalloc0(int size) -{ - void *result; - - result = malloc(size); - if (!result) - { - fprintf(stderr, _("%s: out of memory\n"), progname); - exit(1); - } - MemSet(result, 0, size); - return result; -} - static void usage(void) @@ -125,7 +102,7 @@ usage(void) printf(_("\nOptions controlling the output:\n")); printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n")); printf(_(" -F, --format=p|t output format (plain, tar)\n")); - printf(_(" -x, --xlog include required WAL files in backup\n")); + printf(_(" -x, --xlog=fetch|stream include required WAL files in backup\n")); printf(_(" -z, --gzip compress tar output\n")); printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n")); printf(_("\nGeneral options:\n")); @@ -137,6 +114,7 @@ usage(void) printf(_(" --help show this help, then exit\n")); printf(_(" --version output version information, then exit\n")); printf(_("\nConnection options:\n")); + printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -U, --username=NAME connect as specified database user\n")); @@ -146,6 +124,199 @@ usage(void) } +/* + * Called in the background process whenever a complete segment of WAL + * has been received. + * On Unix, we check to see if there is any data on our pipe + * (which would mean we have a stop position), and if it is, check if + * it is time to stop. 
+ * On Windows, we are in a single process, so we can just check if it's + * time to stop. + */ +static bool +segment_callback(XLogRecPtr segendpos, uint32 timeline) +{ + if (!has_xlogendptr) + { +#ifndef WIN32 + fd_set fds; + struct timeval tv; + int r; + + /* + * Don't have the end pointer yet - check our pipe to see if it has + * been sent yet. + */ + FD_ZERO(&fds); + FD_SET(bgpipe[0], &fds); + + MemSet(&tv, 0, sizeof(tv)); + + r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv); + if (r == 1) + { + char xlogend[64]; + + MemSet(xlogend, 0, sizeof(xlogend)); + r = piperead(bgpipe[0], xlogend, sizeof(xlogend)); + if (r < 0) + { + fprintf(stderr, _("%s: could not read from ready pipe: %s\n"), + progname, strerror(errno)); + exit(1); + } + + if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"), + progname, xlogend); + exit(1); + } + has_xlogendptr = 1; + + /* + * Fall through to check if we've reached the point further + * already. + */ + } + else + { + /* + * No data received on the pipe means we don't know the end + * position yet - so just say it's not time to stop yet. + */ + return false; + } +#else + + /* + * On win32, has_xlogendptr is set by the main thread, so if it's not + * set here, we just go back and wait until it shows up. + */ + return false; +#endif + } + + /* + * At this point we have an end pointer, so compare it to the current + * position to figure out if it's time to stop. + */ + if (segendpos.xlogid > xlogendptr.xlogid || + (segendpos.xlogid == xlogendptr.xlogid && + segendpos.xrecoff >= xlogendptr.xrecoff)) + return true; + + /* + * Have end pointer, but haven't reached it yet - so tell the caller to + * keep streaming. 
+ */ + return false; +} + +typedef struct +{ + PGconn *bgconn; + XLogRecPtr startptr; + char xlogdir[MAXPGPATH]; + char *sysidentifier; + int timeline; +} logstreamer_param; + +static int +LogStreamerMain(logstreamer_param * param) +{ + if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, + param->sysidentifier, param->xlogdir, + segment_callback, NULL, standby_message_timeout)) + + /* + * Any errors will already have been reported in the function process, + * but we need to tell the parent that we didn't shutdown in a nice + * way. + */ + return 1; + + PQfinish(param->bgconn); + return 0; +} + +/* + * Initiate background process for receiving xlog during the backup. + * The background stream will use its own database connection so we can + * stream the logfile in parallel with the backups. + */ +static void +StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) +{ + logstreamer_param *param; + + param = xmalloc0(sizeof(logstreamer_param)); + param->timeline = timeline; + param->sysidentifier = sysidentifier; + + /* Convert the starting position */ + if (sscanf(startpos, "%X/%X", &param->startptr.xlogid, &param->startptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: invalid format of xlog location: %s\n"), + progname, startpos); + disconnect_and_exit(1); + } + /* Round off to even segment position */ + param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE; + +#ifndef WIN32 + /* Create our background pipe */ + if (pgpipe(bgpipe) < 0) + { + fprintf(stderr, _("%s: could not create pipe for background process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } +#endif + + /* Get a second connection */ + param->bgconn = GetConnection(); + + /* + * Always in plain format, so we can write to basedir/pg_xlog. But the + * directory entry in the tar file may arrive later, so make sure it's + * created before we start. 
+ */ + snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir); + verify_dir_is_empty_or_create(param->xlogdir); + + /* + * Start a child process and tell it to start streaming. On Unix, this is + * a fork(). On Windows, we create a thread. + */ +#ifndef WIN32 + bgchild = fork(); + if (bgchild == 0) + { + /* in child process */ + exit(LogStreamerMain(param)); + } + else if (bgchild < 0) + { + fprintf(stderr, _("%s: could not create background process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + + /* + * Else we are in the parent process and all is well. + */ +#else /* WIN32 */ + bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL); + if (bgchild == 0) + { + fprintf(stderr, _("%s: could not create background thread: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } +#endif +} + /* * Verify that the given directory exists and is empty. If it does not * exist, it is created. If it exists but is not empty, an error will @@ -502,11 +673,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) else strcpy(current_path, PQgetvalue(res, rownum, 1)); - /* - * Make sure we're unpacking into an empty directory - */ - verify_dir_is_empty_or_create(current_path); - /* * Get the COPY data */ @@ -597,13 +763,21 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) /* * Directory */ - filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ if (mkdir(filename, S_IRWXU) != 0) { - fprintf(stderr, + /* + * When streaming WAL, pg_xlog will have been created + * by the wal receiver process, so just ignore failure + * on that. 
+ */ + if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0) + { + fprintf(stderr, _("%s: could not create directory \"%s\": %s\n"), - progname, filename, strerror(errno)); - disconnect_and_exit(1); + progname, filename, strerror(errno)); + disconnect_and_exit(1); + } } #ifndef WIN32 if (chmod(filename, (mode_t) filemode)) @@ -616,12 +790,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) /* * Symbolic link */ - filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ if (symlink(&copybuf[157], filename) != 0) { fprintf(stderr, _("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"), - progname, filename, &copybuf[157], strerror(errno)); + progname, filename, &copybuf[157], strerror(errno)); disconnect_and_exit(1); } } @@ -714,94 +888,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) } -static PGconn * -GetConnection(void) -{ - PGconn *tmpconn; - int argcount = 4; /* dbname, replication, fallback_app_name, - * password */ - int i; - const char **keywords; - const char **values; - char *password = NULL; - - if (dbhost) - argcount++; - if (dbuser) - argcount++; - if (dbport) - argcount++; - - keywords = xmalloc0((argcount + 1) * sizeof(*keywords)); - values = xmalloc0((argcount + 1) * sizeof(*values)); - - keywords[0] = "dbname"; - values[0] = "replication"; - keywords[1] = "replication"; - values[1] = "true"; - keywords[2] = "fallback_application_name"; - values[2] = progname; - i = 3; - if (dbhost) - { - keywords[i] = "host"; - values[i] = dbhost; - i++; - } - if (dbuser) - { - keywords[i] = "user"; - values[i] = dbuser; - i++; - } - if (dbport) - { - keywords[i] = "port"; - values[i] = dbport; - i++; - } - - while (true) - { - if (dbgetpassword == 1) - { - /* Prompt for a password */ - password = simple_prompt(_("Password: "), 100, false); - keywords[argcount - 1] = "password"; - values[argcount - 1] = password; - } - - 
tmpconn = PQconnectdbParams(keywords, values, true); - if (password) - free(password); - - if (PQstatus(tmpconn) == CONNECTION_BAD && - PQconnectionNeedsPassword(tmpconn) && - dbgetpassword != -1) - { - dbgetpassword = 1; /* ask for password next time */ - PQfinish(tmpconn); - continue; - } - - if (PQstatus(tmpconn) != CONNECTION_OK) - { - fprintf(stderr, _("%s: could not connect to server: %s"), - progname, PQerrorMessage(tmpconn)); - exit(1); - } - - /* Connection ok! */ - free(values); - free(keywords); - return tmpconn; - } -} - static void BaseBackup(void) { PGresult *res; + char *sysidentifier; + uint32 timeline; char current_path[MAXPGPATH]; char escaped_label[MAXPGPATH]; int i; @@ -813,6 +905,26 @@ BaseBackup(void) */ conn = GetConnection(); + /* + * Run IDENTIFY_SYSTEM so we can get the timeline + */ + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not identify system: %s\n"), + progname, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + if (PQntuples(res) != 1) + { + fprintf(stderr, _("%s: could not identify system, got %i rows\n"), + progname, PQntuples(res)); + disconnect_and_exit(1); + } + sysidentifier = strdup(PQgetvalue(res, 0, 0)); + timeline = atoi(PQgetvalue(res, 0, 1)); + PQclear(res); + /* * Start the actual backup */ @@ -820,7 +932,7 @@ BaseBackup(void) snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s", escaped_label, showprogress ? "PROGRESS" : "", - includewal ? "WAL" : "", + includewal && !streamwal ? "WAL" : "", fastcheckpoint ? "FAST" : "", includewal ? "NOWAIT" : ""); @@ -898,6 +1010,18 @@ BaseBackup(void) disconnect_and_exit(1); } + /* + * If we're streaming WAL, start the streaming session before we start + * receiving the actual data chunks. 
+ */ + if (streamwal) + { + if (verbose) + fprintf(stderr, _("%s: starting background WAL receiver\n"), + progname); + StartLogStreamer(xlogstart, timeline, sysidentifier); + } + /* * Start receiving chunks */ @@ -945,6 +1069,92 @@ BaseBackup(void) disconnect_and_exit(1); } + if (bgchild > 0) + { + int status; + +#ifndef WIN32 + int r; +#endif + + if (verbose) + fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname); + +#ifndef WIN32 + if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend)) + { + fprintf(stderr, _("%s: could not send command to background pipe: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + + /* Just wait for the background process to exit */ + r = waitpid(bgchild, &status, 0); + if (r == -1) + { + fprintf(stderr, _("%s: could not wait for child process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (r != bgchild) + { + fprintf(stderr, _("%s: child %i died, expected %i\n"), + progname, r, bgchild); + disconnect_and_exit(1); + } + if (!WIFEXITED(status)) + { + fprintf(stderr, _("%s: child process did not exit normally\n"), + progname); + disconnect_and_exit(1); + } + if (WEXITSTATUS(status) != 0) + { + fprintf(stderr, _("%s: child process exited with error %i\n"), + progname, WEXITSTATUS(status)); + disconnect_and_exit(1); + } + /* Exited normally, we're happy! */ +#else /* WIN32 */ + + /* + * On Windows, since we are in the same process, we can just store the + * value directly in the variable, and then set the flag that says + * it's there. 
+ */ + if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"), + progname, xlogend); + exit(1); + } + InterlockedIncrement(&has_xlogendptr); + + /* First wait for the thread to exit */ + if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0) + { + _dosmaperr(GetLastError()); + fprintf(stderr, _("%s: could not wait for child thread: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (GetExitCodeThread((HANDLE) bgchild, &status) == 0) + { + _dosmaperr(GetLastError()); + fprintf(stderr, _("%s: could not get child thread exit status: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (status != 0) + { + fprintf(stderr, _("%s: child thread exited with error %u\n"), + progname, status); + disconnect_and_exit(1); + } + /* Exited normally, we're happy */ +#endif + } + /* * End of copy data. Final result is already checked inside the loop. 
*/ @@ -964,7 +1174,7 @@ main(int argc, char **argv) {"pgdata", required_argument, NULL, 'D'}, {"format", required_argument, NULL, 'F'}, {"checkpoint", required_argument, NULL, 'c'}, - {"xlog", no_argument, NULL, 'x'}, + {"xlog", required_argument, NULL, 'x'}, {"gzip", no_argument, NULL, 'z'}, {"compress", required_argument, NULL, 'Z'}, {"label", required_argument, NULL, 'l'}, @@ -973,6 +1183,7 @@ main(int argc, char **argv) {"username", required_argument, NULL, 'U'}, {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, + {"statusint", required_argument, NULL, 's'}, {"verbose", no_argument, NULL, 'v'}, {"progress", no_argument, NULL, 'P'}, {NULL, 0, NULL, 0} @@ -999,7 +1210,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP", + while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP", long_options, &option_index)) != -1) { switch (c) @@ -1021,6 +1232,18 @@ main(int argc, char **argv) break; case 'x': includewal = true; + if (strcmp(optarg, "f") == 0 || + strcmp(optarg, "fetch") == 0) + streamwal = false; + else if (strcmp(optarg, "s") == 0 || + strcmp(optarg, "stream") == 0) + streamwal = true; + else + { + fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"), + progname, optarg); + exit(1); + } break; case 'l': label = xstrdup(optarg); @@ -1068,6 +1291,15 @@ main(int argc, char **argv) case 'W': dbgetpassword = 1; break; + case 's': + standby_message_timeout = atoi(optarg); + if (standby_message_timeout < 0) + { + fprintf(stderr, _("%s: invalid status interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; case 'v': verbose++; break; @@ -1122,6 +1354,16 @@ main(int argc, char **argv) exit(1); } + if (format != 'p' && streamwal) + { + fprintf(stderr, + _("%s: wal streaming can only be used in plain mode\n"), + progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + #ifndef HAVE_LIBZ if 
(compresslevel != 0)
 	{
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
new file mode 100644
index 00000000000..ba533d35978
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -0,0 +1,465 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ *					  to a local file.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "access/xlog_internal.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include <dirent.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "getopt_long.h"
+
+/* Global options */
+char	   *basedir = NULL;
+int			verbose = 0;
+int			standby_message_timeout = 10;		/* 10 sec = default */
+
+/*
+ * Set from the SIGINT handler and polled by continue_streaming().
+ * sig_atomic_t is the only object type the C standard lets a signal
+ * handler store to safely.
+ */
+volatile sig_atomic_t time_to_abort = false;
+
+
+static void usage(void);
+static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+static void StreamLog(void);
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+static bool continue_streaming(void);
+
+/*
+ * Print the command-line help text to stdout.
+ */
+static void
+usage(void)
+{
+	printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_(" %s [OPTION]...\n"), progname);
+	printf(_("\nOptions controlling the output:\n"));
+	printf(_(" -D, --dir=directory receive xlog files into this directory\n"));
+	printf(_("\nGeneral options:\n"));
+	printf(_(" -v, --verbose output verbose messages\n"));
+	printf(_(" -?, --help show this help, then exit\n"));
+	printf(_(" -V, --version output version information, then exit\n"));
+	printf(_("\nConnection options:\n"));
+	printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
+	printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
+	printf(_(" -p, --port=PORT database server port number\n"));
+	printf(_(" -U, --username=NAME connect as specified database user\n"));
+	printf(_(" -w, --no-password never prompt for password\n"));
+	printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+/*
+ * Segment-finished callback handed to ReceiveXlogStream().
+ *
+ * segendpos is the position just past the segment that was completed.
+ * Removes a leftover "<segment>.partial" file for that segment, if any.
+ * Always returns false, i.e. never asks the receiver to stop.
+ */
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+	char		fn[MAXPGPATH];
+	struct stat statbuf;
+
+	if (verbose)
+		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+	/*
+	 * Check if there is a partial file for the name we just finished, and if
+	 * there is, remove it under the assumption that we have now got all the
+	 * data we need.
+	 */
+	segendpos.xrecoff /= XLOG_SEG_SIZE;
+	PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+			 basedir, timeline,
+			 segendpos.xlogid,
+			 segendpos.xrecoff);
+	if (stat(fn, &statbuf) == 0)
+	{
+		/* File existed, get rid of it */
+		if (verbose)
+			fprintf(stderr, _("%s: removing file \"%s\"\n"),
+					progname, fn);
+
+		/* Best effort: report a failed removal, but keep streaming */
+		if (unlink(fn) != 0)
+			fprintf(stderr, _("%s: could not remove file \"%s\": %s\n"),
+					progname, fn, strerror(errno));
+	}
+
+	/*
+	 * Never abort from this - we handle all aborting in continue_streaming()
+	 */
+	return false;
+}
+
+/*
+ * Stream-continue callback handed to ReceiveXlogStream().
+ *
+ * Returns true (stop streaming) once the SIGINT handler has set
+ * time_to_abort, false otherwise.
+ */
+static bool
+continue_streaming(void)
+{
+	if (time_to_abort)
+	{
+		fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
+				progname);
+		return true;
+	}
+	return false;
+}
+
+/*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2.
If the last one is a partial segment, rename it and start over, since
+ *	  we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ *	  WAL segment.
+ *
+ * currentpos is the server's current position (used for case 3) and
+ * currenttimeline selects which files in basedir are considered.
+ * Exits the program on any directory, stat or rename failure.
+ */
+static XLogRecPtr
+FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+{
+	DIR		   *dir;
+	struct dirent *dirent;
+	int			i;
+	bool		b;
+	uint32		high_log = 0;
+	uint32		high_seg = 0;
+	bool		found = false;
+
+	dir = opendir(basedir);
+	if (dir == NULL)
+	{
+		fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+				progname, basedir, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	while ((dirent = readdir(dir)) != NULL)
+	{
+		char		fullpath[MAXPGPATH];
+		struct stat statbuf;
+		uint32		tli,
+					log,
+					seg;
+		uint32		start_log,
+					start_seg;
+
+		if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
+			continue;
+
+		/* xlog files are always 24 characters */
+		if (strlen(dirent->d_name) != 24)
+			continue;
+
+		/* Filenames are always made out of 0-9 and A-F */
+		b = false;
+		for (i = 0; i < 24; i++)
+		{
+			if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
+				!(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
+			{
+				b = true;
+				break;
+			}
+		}
+		if (b)
+			continue;
+
+		/*
+		 * Looks like an xlog file. Parse its position.
+		 */
+		if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+		{
+			fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+					progname, dirent->d_name);
+			disconnect_and_exit(1);
+		}
+
+		/* Ignore any files that are for another timeline */
+		if (tli != currenttimeline)
+			continue;
+
+		/* Check if this is a completed segment or not */
+		snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+		if (stat(fullpath, &statbuf) != 0)
+		{
+			fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+					progname, fullpath, strerror(errno));
+			disconnect_and_exit(1);
+		}
+
+		/* Position streaming would have to restart at because of this file */
+		start_log = log;
+		start_seg = seg;
+
+		if (statbuf.st_size == XLOG_SEG_SIZE)
+		{
+			/* Completed segment: we can resume with the one after it */
+			NextLogSeg(start_log, start_seg);
+		}
+		else
+		{
+			/*
+			 * This is a partial file. Rename it out of the way; the segment
+			 * itself has to be streamed again from its beginning.
+			 */
+			char		newfn[MAXPGPATH];
+
+			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+					progname, dirent->d_name, dirent->d_name);
+
+			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+					 basedir, dirent->d_name);
+
+			if (stat(newfn, &statbuf) == 0)
+			{
+				/*
+				 * XXX: perhaps we should only error out if the existing file
+				 * is larger?
+				 */
+				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+						progname, newfn);
+				disconnect_and_exit(1);
+			}
+			if (rename(fullpath, newfn) != 0)
+			{
+				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+						progname, fullpath, newfn, strerror(errno));
+				disconnect_and_exit(1);
+			}
+		}
+
+		/*
+		 * Keep the highest restart position seen. readdir() returns entries
+		 * in no particular order, so we must look at every file instead of
+		 * stopping at the first partial one.
+		 */
+		if (!found ||
+			start_log > high_log ||
+			(start_log == high_log && start_seg > high_seg))
+		{
+			high_log = start_log;
+			high_seg = start_seg;
+			found = true;
+		}
+	}
+
+	closedir(dir);
+
+	if (found)
+	{
+		XLogRecPtr	high_ptr;
+
+		/*
+		 * high_log/high_seg already name the first segment we do not have a
+		 * complete copy of: either the one following the last completed
+		 * segment, or the partial segment that was just renamed away. Either
+		 * way no file of that name exists any more, so it can be created.
+		 */
+		high_ptr.xlogid = high_log;
+		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
+
+		return high_ptr;
+	}
+	else
+		return currentpos;
+}
+
+/*
+ * Start the log streaming
+ *
+ * Connects to the server, determines timeline and start position and then
+ * streams until interrupted or the server ends the stream. Exits the
+ * program with status 1 on any failure.
+ */
+static void
+StreamLog(void)
+{
+	PGresult   *res;
+	uint32		timeline;
+	XLogRecPtr	startpos;
+
+	/*
+	 * Connect in replication mode to the server
+	 */
+	conn = GetConnection();
+
+	/*
+	 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+	 * position.
+	 */
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not identify system: %s\n"),
+				progname, PQerrorMessage(conn));
+		disconnect_and_exit(1);
+	}
+	if (PQntuples(res) != 1)
+	{
+		fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+				progname, PQntuples(res));
+		disconnect_and_exit(1);
+	}
+	timeline = atoi(PQgetvalue(res, 0, 1));
+	if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+	{
+		fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+				progname, PQgetvalue(res, 0, 2));
+		disconnect_and_exit(1);
+	}
+	PQclear(res);
+
+	/*
+	 * Figure out where to start streaming.
+	 */
+	startpos = FindStreamingStart(startpos, timeline);
+
+	/*
+	 * Always start streaming at the beginning of a segment
+	 */
+	startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+	/*
+	 * Start the replication
+	 */
+	if (verbose)
+		fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+				progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+	/*
+	 * ReceiveXlogStream() has already reported the reason when it returns
+	 * false; make sure the failure is visible in our exit status too.
+	 */
+	if (!ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
+						   segment_callback, continue_streaming,
+						   standby_message_timeout))
+	{
+		disconnect_and_exit(1);
+	}
+}
+
+/*
+ * When sigint is called, just tell the system to exit at the next possible
+ * moment.
+ */
+static void
+sigint_handler(int signum)
+{
+	time_to_abort = true;
+}
+
+/*
+ * Entry point: parse the command line, install the SIGINT handler and
+ * stream transaction log into the directory given with -D.
+ */
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] = {
+		{"help", no_argument, NULL, '?'},
+		{"version", no_argument, NULL, 'V'},
+		{"dir", required_argument, NULL, 'D'},
+		{"host", required_argument, NULL, 'h'},
+		{"port", required_argument, NULL, 'p'},
+		{"username", required_argument, NULL, 'U'},
+		{"no-password", no_argument, NULL, 'w'},
+		{"password", no_argument, NULL, 'W'},
+		{"statusint", required_argument, NULL, 's'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+	int			c;
+
+	int			option_index;
+
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				basedir = xstrdup(optarg);
+				break;
+			case 'h':
+				dbhost = xstrdup(optarg);
+				break;
+			case 'p':
+				if (atoi(optarg) <= 0)
+				{
+					fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				dbport = xstrdup(optarg);
+				break;
+			case 'U':
+				dbuser = xstrdup(optarg);
+				break;
+			case 'w':
+				dbgetpassword = -1;
+				break;
+			case 'W':
+				dbgetpassword = 1;
+				break;
+			case 's':
+				{
+					char	   *endptr;
+					long		interval;
+
+					/*
+					 * Parse strictly. atoi() would turn garbage such as
+					 * "abc" into 0, which silently disables status packets.
+					 */
+					errno = 0;
+					interval = strtol(optarg, &endptr, 10);
+					if (endptr == optarg || *endptr != '\0' ||
+						errno == ERANGE ||
+						interval < 0 ||
+						interval != (long) (int) interval)
+					{
+						fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+								progname, optarg);
+						exit(1);
+					}
+					standby_message_timeout = (int) interval;
+				}
+				break;
+			case 'v':
+				verbose++;
+				break;
+			default:
+
+				/*
+				 * getopt_long already emitted a complaint
+				 */
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+						progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments?
+	 */
+	if (optind < argc)
+	{
+		fprintf(stderr,
+				_("%s: too many command-line arguments (first is \"%s\")\n"),
+				progname, argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments
+	 */
+	if (basedir == NULL)
+	{
+		fprintf(stderr, _("%s: no target directory specified\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+#ifndef WIN32
+	pqsignal(SIGINT, sigint_handler);
+#endif
+
+	StreamLog();
+
+	exit(0);
+}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
new file mode 100644
index 00000000000..0ca30c425f3
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -0,0 +1,398 @@
+/*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ *				  replication protocol.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "access/xlog_internal.h"
+#include "replication/walprotocol.h"
+#include "utils/datetime.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+
+/* Size of the streaming replication protocol header */
+#define STREAMING_HEADER_SIZE (1+8+8+8)
+
+const XLogRecPtr InvalidXLogRecPtr = {0, 0};
+
+/*
+ * Open a new WAL file in the specified directory.
Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ *
+ * The file must not exist yet (O_EXCL). It is created readable and
+ * writable by the owner only, since WAL holds the contents of the whole
+ * database cluster.
+ *
+ * Returns the file descriptor, or -1 after printing an error message.
+ */
+static int
+open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+{
+	int			f;
+	char		fn[MAXPGPATH];
+
+	XLogFileName(namebuf, timeline, startpoint.xlogid,
+				 startpoint.xrecoff / XLOG_SEG_SIZE);
+
+	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+	f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0600);
+	if (f == -1)
+		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+				progname, namebuf, strerror(errno));
+	return f;
+}
+
+/*
+ * Local version of GetCurrentTimestamp(), since we are not linked with
+ * backend code.
+ *
+ * The result counts from the PostgreSQL epoch: microseconds when
+ * HAVE_INT64_TIMESTAMP is defined, (fractional) seconds otherwise.
+ */
+static TimestampTz
+localGetCurrentTimestamp(void)
+{
+	TimestampTz result;
+	struct timeval tp;
+
+	gettimeofday(&tp, NULL);
+
+	/* Shift from the Unix epoch to the PostgreSQL epoch, still in seconds */
+	result = (TimestampTz) tp.tv_sec -
+		((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+#ifdef HAVE_INT64_TIMESTAMP
+	result = (result * USECS_PER_SEC) + tp.tv_usec;
+#else
+	result = result + (tp.tv_usec / 1000000.0);
+#endif
+
+	return result;
+}
+
+/*
+ * Receive a log stream starting at the specified position.
+ *
+ * If sysidentifier is specified, validate that both the system
+ * identifier and the timeline matches the specified ones
+ * (by sending an extra IDENTIFY_SYSTEM command)
+ *
+ * All received segments will be written to the directory
+ * specified by basedir.
+ *
+ * The segment_finish callback will be called after each segment
+ * has been finished, and the stream_continue callback will be
+ * called every time data is received. If either of these callbacks
+ * return true, the streaming will stop and the function
+ * return. As long as they return false, streaming will continue
+ * indefinitely.
+ *
+ * standby_message_timeout controls how often we send a message
+ * back to the master letting it know our progress, in seconds.
+ * This message will only contain the write location, and never
+ * flush or replay.
+ *
+ * Note: The log position *must* be at a log segment start!
+ *
+ * Returns true when streaming was stopped by one of the callbacks or the
+ * server ended the stream normally, false after reporting an error.
+ */
+bool
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+{
+	char		query[128];
+	char		current_walfile_name[MAXPGPATH];
+	PGresult   *res;
+	char	   *copybuf = NULL;
+	int			walfile = -1;
+	int64		last_status = -1;
+	XLogRecPtr	blockpos = InvalidXLogRecPtr;
+
+	/*
+	 * Status interval in microseconds. Computed in 64 bits, because
+	 * seconds * 1000000 overflows an int for intervals above ~35 minutes.
+	 */
+	int64		status_interval_us = ((int64) standby_message_timeout) * 1000000;
+
+	if (sysidentifier != NULL)
+	{
+		/* Validate system identifier and timeline hasn't changed */
+		res = PQexec(conn, "IDENTIFY_SYSTEM");
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			fprintf(stderr, _("%s: could not identify system: %s\n"),
+					progname, PQerrorMessage(conn));
+			PQclear(res);
+			return false;
+		}
+		/* PQgetvalue() returns NULL for a row that does not exist */
+		if (PQntuples(res) != 1)
+		{
+			fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+					progname, PQntuples(res));
+			PQclear(res);
+			return false;
+		}
+		if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
+		{
+			fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), progname);
+			PQclear(res);
+			return false;
+		}
+		if (timeline != atoi(PQgetvalue(res, 0, 1)))
+		{
+			fprintf(stderr, _("%s: timeline does not match between base backup and streaming connection\n"), progname);
+			PQclear(res);
+			return false;
+		}
+		PQclear(res);
+	}
+
+	/* Initiate the replication stream at specified location */
+	snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_COPY_BOTH)
+	{
+		fprintf(stderr, _("%s: could not start replication: %s\n"),
+				progname, PQresultErrorMessage(res));
+		PQclear(res);
+		return false;
+	}
+	PQclear(res);
+
+	/*
+	 * Receive the actual xlog data
+	 */
+	while (1)
+	{
+		int			r;
+		int			xlogoff;
+		int			bytes_left;
+		int			bytes_written;
+		int64		now;
+
+		if (copybuf != NULL)
+		{
+			PQfreemem(copybuf);
+			copybuf = NULL;
+		}
+
+		/*
+		 * Check if we should continue streaming, or abort at this point.
+		 */
+		if (stream_continue && stream_continue())
+		{
+			if (walfile != -1)
+			{
+				fsync(walfile);
+				close(walfile);
+			}
+			return true;
+		}
+
+		/*
+		 * Potentially send a status message to the master
+		 */
+		now = localGetCurrentTimestamp();
+		if (standby_message_timeout > 0 &&
+			last_status < now - status_interval_us)
+		{
+			/* Time to send feedback! */
+			char		replybuf[sizeof(StandbyReplyMessage) + 1];
+			StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1);
+
+			replymsg->write = blockpos;
+			replymsg->flush = InvalidXLogRecPtr;
+			replymsg->apply = InvalidXLogRecPtr;
+			replymsg->sendTime = now;
+			replybuf[0] = 'r';
+
+			if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
+				PQflush(conn))
+			{
+				fprintf(stderr, _("%s: could not send feedback packet: %s"),
+						progname, PQerrorMessage(conn));
+				return false;
+			}
+
+			last_status = now;
+		}
+
+		r = PQgetCopyData(conn, &copybuf, 1);
+		if (r == 0)
+		{
+			/*
+			 * In async mode, and no data available. We block on reading but
+			 * not more than the specified timeout, so that we can send a
+			 * response back to the server.
+			 */
+			fd_set		input_mask;
+			struct timeval timeout;
+			struct timeval *timeoutptr;
+
+			FD_ZERO(&input_mask);
+			FD_SET(PQsocket(conn), &input_mask);
+			if (standby_message_timeout)
+			{
+				/*
+				 * Sleep until one second before the next status message is
+				 * due. last_status and now are in microseconds, so the
+				 * interval has to be in microseconds as well before the
+				 * difference is turned back into seconds.
+				 */
+				int64		remaining_us;
+
+				remaining_us = last_status + status_interval_us - 1000000 - now;
+				timeout.tv_sec = (long) (remaining_us / 1000000);
+				if (timeout.tv_sec <= 0)
+					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+				timeout.tv_usec = 0;
+				timeoutptr = &timeout;
+			}
+			else
+				timeoutptr = NULL;
+
+			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+			if (r == 0 || (r < 0 && errno == EINTR))
+			{
+				/*
+				 * Got a timeout or signal. Continue the loop and either
+				 * deliver a status packet to the server or just go back into
+				 * blocking.
+				 */
+				continue;
+			}
+			else if (r < 0)
+			{
+				/* %m is a glibc extension, so spell out strerror() */
+				fprintf(stderr, _("%s: select() failed: %s\n"),
+						progname, strerror(errno));
+				return false;
+			}
+			/* Else there is actually data on the socket */
+			if (PQconsumeInput(conn) == 0)
+			{
+				fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
+						progname, PQerrorMessage(conn));
+				return false;
+			}
+			continue;
+		}
+		if (r == -1)
+			/* End of copy stream */
+			break;
+		if (r == -2)
+		{
+			fprintf(stderr, _("%s: could not read copy data: %s\n"),
+					progname, PQerrorMessage(conn));
+			return false;
+		}
+		if (r < STREAMING_HEADER_SIZE + 1)
+		{
+			fprintf(stderr, _("%s: streaming header too small: %i\n"),
+					progname, r);
+			return false;
+		}
+		if (copybuf[0] != 'w')
+		{
+			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+					progname, copybuf[0]);
+			return false;
+		}
+
+		/* Extract WAL location for this block */
+		memcpy(&blockpos, copybuf + 1, 8);
+		xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE;
+
+		/*
+		 * Verify that the initial location in the stream matches where we
+		 * think we are.
+		 */
+		if (walfile == -1)
+		{
+			/* No file open yet */
+			if (xlogoff != 0)
+			{
+				fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+						progname, xlogoff);
+				return false;
+			}
+		}
+		else
+		{
+			/* More data in existing segment */
+			/* XXX: store seek value don't reseek all the time */
+			if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+			{
+				fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
+						progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+				return false;
+			}
+		}
+
+		bytes_left = r - STREAMING_HEADER_SIZE;
+		bytes_written = 0;
+
+		while (bytes_left)
+		{
+			int			bytes_to_write;
+
+			/*
+			 * If crossing a WAL boundary, only write up until we reach
+			 * XLOG_SEG_SIZE.
+			 */
+			if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+				bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+			else
+				bytes_to_write = bytes_left;
+
+			if (walfile == -1)
+			{
+				walfile = open_walfile(blockpos, timeline,
+									   basedir, current_walfile_name);
+				if (walfile == -1)
+					/* Error logged by open_walfile */
+					return false;
+			}
+
+			if (write(walfile,
+					  copybuf + STREAMING_HEADER_SIZE + bytes_written,
+					  bytes_to_write) != bytes_to_write)
+			{
+				fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+						progname,
+						bytes_to_write,
+						current_walfile_name,
+						strerror(errno));
+				return false;
+			}
+
+			/* Write was successful, advance our position */
+			bytes_written += bytes_to_write;
+			bytes_left -= bytes_to_write;
+			XLByteAdvance(blockpos, bytes_to_write);
+			xlogoff += bytes_to_write;
+
+			/* Did we reach the end of a WAL segment? */
+			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
+			{
+				fsync(walfile);
+				close(walfile);
+				walfile = -1;
+				xlogoff = 0;
+
+				if (segment_finish != NULL)
+				{
+					/*
+					 * Callback when the segment finished, and return if it
+					 * told us to.
+					 */
+					if (segment_finish(blockpos, timeline))
+						return true;
+				}
+			}
+		}
+		/* No more data left to write, start receiving next copy packet */
+	}
+
+	/*
+	 * The only way to get out of the loop is if the server shut down the
+	 * replication stream. If it's a controlled shutdown, the server will
+	 * complete the command successfully and we report success; anything
+	 * else is reported as an error.
+	 */
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+				progname, PQresultErrorMessage(res));
+		PQclear(res);
+		return false;
+	}
+	PQclear(res);
+	return true;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
new file mode 100644
index 00000000000..1c61ea8ac1d
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -0,0 +1,22 @@
+#include "access/xlogdefs.h"
+
+/*
+ * Called whenever a segment is finished, return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+/*
+ * Called before trying to read more data. Return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*stream_continue_callback)(void);
+
+extern bool ReceiveXlogStream(PGconn *conn,
+							  XLogRecPtr startpos,
+							  uint32 timeline,
+							  char *sysidentifier,
+							  char *basedir,
+							  segment_finish_callback segment_finish,
+							  stream_continue_callback stream_continue,
+							  int standby_message_timeout);
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
new file mode 100644
index 00000000000..812343118c5
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -0,0 +1,165 @@
+/*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivexlog
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "streamutil.h"
+
+#include <stdio.h>
+#include <string.h>
+
+const char *progname;
+char	   *dbhost = NULL;
+char	   *dbuser = NULL;
+char	   *dbport = NULL;
+int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
+static char *dbpassword = NULL;
+PGconn	   *conn = NULL;
+
+/*
+ * strdup() and malloc() replacements that prints an error and exits
+ * if something goes wrong. Can never return NULL.
+ */
+char *
+xstrdup(const char *s)
+{
+	char	   *result;
+
+	result = strdup(s);
+	if (!result)
+	{
+		fprintf(stderr, _("%s: out of memory\n"), progname);
+		exit(1);
+	}
+	return result;
+}
+
+/* Like malloc(), but exits on failure and returns zero-filled memory. */
+void *
+xmalloc0(int size)
+{
+	void	   *result;
+
+	result = malloc(size);
+	if (!result)
+	{
+		fprintf(stderr, _("%s: out of memory\n"), progname);
+		exit(1);
+	}
+	MemSet(result, 0, size);
+	return result;
+}
+
+
+/*
+ * Open a replication connection using the global dbhost/dbuser/dbport
+ * settings, prompting for a password when required (unless dbgetpassword
+ * is -1). A password that led to a successful connection is remembered
+ * and reused by later calls.
+ *
+ * Returns the established connection; exits the program if the
+ * connection cannot be made.
+ */
+PGconn *
+GetConnection(void)
+{
+	PGconn	   *tmpconn;
+	int			argcount = 4;	/* dbname, replication, fallback_app_name,
+								 * password */
+	int			i;
+	const char **keywords;
+	const char **values;
+	char	   *password = NULL;
+
+	if (dbhost)
+		argcount++;
+	if (dbuser)
+		argcount++;
+	if (dbport)
+		argcount++;
+
+	/* One extra, zeroed slot terminates the keyword/value arrays */
+	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+	values = xmalloc0((argcount + 1) * sizeof(*values));
+
+	keywords[0] = "dbname";
+	values[0] = "replication";
+	keywords[1] = "replication";
+	values[1] = "true";
+	keywords[2] = "fallback_application_name";
+	values[2] = progname;
+	i = 3;
+	if (dbhost)
+	{
+		keywords[i] = "host";
+		values[i] = dbhost;
+		i++;
+	}
+	if (dbuser)
+	{
+		keywords[i] = "user";
+		values[i] = dbuser;
+		i++;
+	}
+	if (dbport)
+	{
+		keywords[i] = "port";
+		values[i] = dbport;
+		i++;
+	}
+
+	while (true)
+	{
+		if (password)
+			free(password);
+
+		if (dbpassword)
+		{
+			/*
+			 * We've saved a password when a previous connection succeeded,
+			 * meaning this is the call for a second session to the same
+			 * database, so just forcibly reuse that password.
+			 */
+			keywords[argcount - 1] = "password";
+			values[argcount - 1] = dbpassword;
+			dbgetpassword = -1; /* Don't try again if this fails */
+		}
+		else if (dbgetpassword == 1)
+		{
+			password = simple_prompt(_("Password: "), 100, false);
+			keywords[argcount - 1] = "password";
+			values[argcount - 1] = password;
+		}
+
+		tmpconn = PQconnectdbParams(keywords, values, true);
+
+		if (PQstatus(tmpconn) == CONNECTION_BAD &&
+			PQconnectionNeedsPassword(tmpconn) &&
+			dbgetpassword != -1)
+		{
+			dbgetpassword = 1;	/* ask for password next time */
+			PQfinish(tmpconn);
+			continue;
+		}
+
+		if (PQstatus(tmpconn) != CONNECTION_OK)
+		{
+			fprintf(stderr, _("%s: could not connect to server: %s\n"),
+					progname, PQerrorMessage(tmpconn));
+			exit(1);
+		}
+
+		/* Connection ok! */
+		free(values);
+		free(keywords);
+
+		/* Store the password for next run */
+		if (password)
+			dbpassword = password;
+		return tmpconn;
+	}
+}
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
new file mode 100644
index 00000000000..baba5eb04fb
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -0,0 +1,22 @@
+#include "libpq-fe.h"
+
+extern const char *progname;
+extern char *dbhost;
+extern char *dbuser;
+extern char *dbport;
+extern int	dbgetpassword;
+
+/* Connection kept global so we can disconnect easily */
+extern PGconn *conn;
+
+/*
+ * Close the global connection (if any) and exit with the given code.
+ * Wrapped in do { } while (0) so that "disconnect_and_exit(1);" is a single
+ * statement and stays correct inside an unbraced if/else.
+ */
+#define disconnect_and_exit(code)				\
+	do {										\
+		if (conn != NULL) PQfinish(conn);		\
+		exit(code);								\
+	} while (0)
+
+
+char	   *xstrdup(const char *s);
+void	   *xmalloc0(int size);
+
+PGconn	   *GetConnection(void);
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 3d71c881cfb..e2ae0a15781 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -305,6 +305,13 @@ sub mkvcbuild
 	$initdb->AddLibrary('ws2_32.lib');
 
 	my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
+	$pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
+	$pgbasebackup->AddLibrary('ws2_32.lib');
+
+	my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
+	$pgreceivexlog->{name} = 'pg_receivexlog';
+	$pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
+	$pgreceivexlog->AddLibrary('ws2_32.lib');
 
 	my $pgconfig = AddSimpleFrontend('pg_config');
 
-- 
GitLab