From ef5856fd9b77ef9d0d0c31fb314bb61bbfb1d704 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvherre@alvh.no-ip.org> Date: Thu, 27 Feb 2014 18:55:57 -0300 Subject: [PATCH] Allow BASE_BACKUP to be throttled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A new MAX_RATE option allows imposing a limit to the network transfer rate from the server side. This is useful to limit the stress that taking a base backup has on the server. pg_basebackup is now able to specify a value to the server, too. Author: Antonin Houska Patch reviewed by Stefan Radomski, Andres Freund, Zoltán Böszörményi, Fujii Masao, and Álvaro Herrera. --- doc/src/sgml/protocol.sgml | 18 +++- doc/src/sgml/ref/pg_basebackup.sgml | 21 ++++ src/backend/replication/basebackup.c | 137 ++++++++++++++++++++++++- src/backend/replication/repl_gram.y | 8 +- src/backend/replication/repl_scanner.l | 1 + src/bin/pg_basebackup/pg_basebackup.c | 134 +++++++++++++++++++++--- src/include/replication/basebackup.h | 7 ++ 7 files changed, 306 insertions(+), 20 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 832524e95e4..d36f2f3af1f 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1772,7 +1772,7 @@ The commands accepted in walsender mode are: </varlistentry> <varlistentry> - <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term> + <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>] [<literal>MAX_RATE</literal> <replaceable>rate</replaceable>]</term> <listitem> <para> Instructs the server to start streaming a base backup. 
@@ -1840,7 +1840,21 @@ The commands accepted in walsender mode are: the waiting and the warning, leaving the client responsible for ensuring the required log is available. </para> - </listitem> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>MAX_RATE</literal> <replaceable>rate</></term> + <listitem> + <para> + Limit (throttle) the maximum amount of data transferred from server + to client per unit of time. The expected unit is kilobytes per second. + If this option is specified, the value must either be equal to zero + or it must fall within the range from 32 kB through 1 GB (inclusive). + If zero is passed or the option is not specified, no restriction is + imposed on the transfer. + </para> + </listitem> </varlistentry> </variablelist> </para> diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index 84b45ae7a00..ede68db9388 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -188,6 +188,27 @@ PostgreSQL documentation </listitem> </varlistentry> + <varlistentry> + <term><option>-r <replaceable class="parameter">rate</replaceable></option></term> + <term><option>--max-rate=<replaceable class="parameter">rate</replaceable></option></term> + <listitem> + <para> + The maximum transfer rate of data transferred from the server. Values are + in kilobytes per second. Use a suffix of <literal>M</> to indicate megabytes + per second. A suffix of <literal>k</> is also accepted, and has no effect. + Valid values are between 32 kilobytes per second and 1024 megabytes per second. + </para> + <para> + The purpose is to limit the impact of <application>pg_basebackup</application> + on the running server. + </para> + <para> + This option always affects transfer of the data directory. Transfer of + WAL files is only affected if the collection method is <literal>fetch</literal>. 
+ </para> + </listitem> + </varlistentry> + <varlistentry> <term><option>-R</option></term> <term><option>--write-recovery-conf</option></term> diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 2bbe384e351..d68a1533602 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -25,6 +25,7 @@ #include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/pg_list.h" +#include "pgtar.h" #include "pgstat.h" #include "replication/basebackup.h" #include "replication/walsender.h" @@ -34,7 +35,8 @@ #include "utils/builtins.h" #include "utils/elog.h" #include "utils/ps_status.h" -#include "pgtar.h" +#include "utils/timestamp.h" + typedef struct { @@ -43,6 +45,7 @@ typedef struct bool fastcheckpoint; bool nowait; bool includewal; + uint32 maxrate; } basebackup_options; @@ -60,6 +63,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir); static void parse_basebackup_options(List *options, basebackup_options *opt); static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static int compareWalFileNames(const void *a, const void *b); +static void throttle(size_t increment); /* Was the backup currently in-progress initiated in recovery mode? */ static bool backup_started_in_recovery = false; @@ -72,6 +76,23 @@ static char *statrelpath = NULL; */ #define TAR_SEND_SIZE 32768 +/* + * How frequently to throttle, as a fraction of the specified rate-second. + */ +#define THROTTLING_FREQUENCY 8 + +/* The actual number of bytes, transfer of which may cause sleep. */ +static uint64 throttling_sample; + +/* Amount of data already transferred but not yet throttled. */ +static int64 throttling_counter; + +/* The minimum time required to transfer throttling_sample bytes. */ +static int64 elapsed_min_unit; + +/* The last check of the transfer rate. 
*/ +static int64 throttled_last; + typedef struct { char *oid; @@ -203,6 +224,29 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) /* Send tablespace header */ SendBackupHeader(tablespaces); + /* Setup and activate network throttling, if client requested it */ + if (opt->maxrate > 0) + { + throttling_sample = opt->maxrate * 1024 / THROTTLING_FREQUENCY; + + /* + * The minimum amount of time for throttling_sample + * bytes to be transferred. + */ + elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; + + /* Enable throttling. */ + throttling_counter = 0; + + /* The 'real data' starts now (header was ignored). */ + throttled_last = GetCurrentIntegerTimestamp(); + } + else + { + /* Disable throttling. */ + throttling_counter = -1; + } + /* Send off our tablespaces one by one */ foreach(lc, tablespaces) { @@ -430,6 +474,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) (errmsg("base backup could not send data, aborting backup"))); len += cnt; + throttle(cnt); + if (len == XLogSegSize) break; } @@ -500,6 +546,7 @@ parse_basebackup_options(List *options, basebackup_options *opt) bool o_fast = false; bool o_nowait = false; bool o_wal = false; + bool o_maxrate = false; MemSet(opt, 0, sizeof(*opt)); foreach(lopt, options) @@ -551,6 +598,25 @@ parse_basebackup_options(List *options, basebackup_options *opt) opt->includewal = true; o_wal = true; } + else if (strcmp(defel->defname, "max_rate") == 0) + { + long maxrate; + + if (o_maxrate) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("duplicate option \"%s\"", defel->defname))); + + maxrate = intVal(defel->arg); + if (maxrate < MAX_RATE_LOWER || maxrate > MAX_RATE_UPPER) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("%d is outside the valid range for parameter \"%s\" (%d .. 
%d)", + (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER))); + + opt->maxrate = (uint32) maxrate; + o_maxrate = true; + } else elog(ERROR, "option \"%s\" not recognized", defel->defname); @@ -1112,6 +1178,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf, (errmsg("base backup could not send data, aborting backup"))); len += cnt; + throttle(cnt); if (len >= statbuf->st_size) { @@ -1133,10 +1200,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf, cnt = Min(sizeof(buf), statbuf->st_size - len); pq_putmessage('d', buf, cnt); len += cnt; + throttle(cnt); } } - /* Pad to 512 byte boundary, per tar format requirements */ + /* + * Pad to 512 byte boundary, per tar format requirements. (This small + * piece of data is probably not worth throttling.) + */ pad = ((len + 511) & ~511) - len; if (pad > 0) { @@ -1162,3 +1233,65 @@ _tarWriteHeader(const char *filename, const char *linktarget, pq_putmessage('d', h, 512); } + +/* + * Increment the network transfer counter by the given number of bytes, + * and sleep if necessary to comply with the requested network transfer + * rate. + */ +static void +throttle(size_t increment) +{ + int64 elapsed, + elapsed_min, + sleep; + int wait_result; + + if (throttling_counter < 0) + return; + + throttling_counter += increment; + if (throttling_counter < throttling_sample) + return; + + /* Time elapsed since the last measurement (and possible wake up). */ + elapsed = GetCurrentIntegerTimestamp() - throttled_last; + /* How much should have elapsed at minimum? */ + elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample); + sleep = elapsed_min - elapsed; + /* Only sleep if the transfer is faster than it should be. */ + if (sleep > 0) + { + ResetLatch(&MyWalSnd->latch); + + /* + * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be + * the maximum time to sleep. Thus the cast to long is safe. 
+ */ + wait_result = WaitLatch(&MyWalSnd->latch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + (long) (sleep / 1000)); + } + else + { + /* + * The actual transfer rate is below the limit. A negative value would + * distort the adjustment of throttled_last. + */ + wait_result = 0; + sleep = 0; + } + + /* + * Only a whole multiple of throttling_sample was processed. The rest will + * be done during the next call of this function. + */ + throttling_counter %= throttling_sample; + + /* Once the (possible) sleep has ended, new period starts. */ + if (wait_result & WL_TIMEOUT) + throttled_last += elapsed + sleep; + else if (sleep > 0) + /* Sleep was necessary but might have been interrupted. */ + throttled_last = GetCurrentIntegerTimestamp(); +} diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index c3f4a24a8ff..308889b5c9a 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -69,6 +69,7 @@ Node *replication_parse_result; %token K_PROGRESS %token K_FAST %token K_NOWAIT +%token K_MAX_RATE %token K_WAL %token K_TIMELINE %token K_PHYSICAL @@ -113,7 +114,7 @@ identify_system: ; /* - * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] + * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d] */ base_backup: K_BASE_BACKUP base_backup_opt_list @@ -157,6 +158,11 @@ base_backup_opt: $$ = makeDefElem("nowait", (Node *)makeInteger(TRUE)); } + | K_MAX_RATE UCONST + { + $$ = makeDefElem("max_rate", + (Node *)makeInteger($2)); + } ; /* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 24195a59719..ca32aa67ff1 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -86,6 +86,7 @@ IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; } LABEL { return K_LABEL; } NOWAIT { return K_NOWAIT; } PROGRESS { return K_PROGRESS; } +MAX_RATE { return 
K_MAX_RATE; } WAL { return K_WAL; } TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 9d7a1e38add..919805f5cfa 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -12,10 +12,6 @@ */ #include "postgres_fe.h" -#include "libpq-fe.h" -#include "pqexpbuffer.h" -#include "pgtar.h" -#include "pgtime.h" #include <unistd.h> #include <dirent.h> @@ -30,8 +26,12 @@ #endif #include "getopt_long.h" - +#include "libpq-fe.h" +#include "pqexpbuffer.h" +#include "pgtar.h" +#include "pgtime.h" #include "receivelog.h" +#include "replication/basebackup.h" #include "streamutil.h" @@ -65,6 +65,8 @@ static bool fastcheckpoint = false; static bool writerecoveryconf = false; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static pg_time_t last_progress_report = 0; +static int32 maxrate = 0; /* no limit by default */ + /* Progress counters */ static uint64 totalsize; @@ -226,6 +228,7 @@ usage(void) printf(_("\nOptions controlling the output:\n")); printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n")); printf(_(" -F, --format=p|t output format (plain (default), tar)\n")); + printf(_(" -r, --max-rate=RATE maximum transfer rate to transfer data directory\n")); printf(_(" -R, --write-recovery-conf\n" " write recovery.conf after backup\n")); printf(_(" -T, --tablespace-mapping=OLDDIR=NEWDIR\n" @@ -606,6 +609,97 @@ progress_report(int tablespacenum, const char *filename, bool force) fprintf(stderr, "\r"); } +static int32 +parse_max_rate(char *src) +{ + double result; + char *after_num; + char *suffix = NULL; + + errno = 0; + result = strtod(src, &after_num); + if (src == after_num) + { + fprintf(stderr, + _("%s: transfer rate \"%s\" is not a valid value\n"), + progname, src); + exit(1); + } + if (errno != 0) + { + fprintf(stderr, + _("%s: invalid transfer rate \"%s\": %s\n"), + 
progname, src, strerror(errno)); + exit(1); + } + + if (result <= 0) + { + /* + * Reject obviously wrong values here. + */ + fprintf(stderr, _("%s: transfer rate must be greater than zero\n"), + progname); + exit(1); + } + + /* + * Evaluate suffix, after skipping over possible whitespace. + * Lack of suffix means kilobytes. + */ + while (*after_num != '\0' && isspace((unsigned char) *after_num)) + after_num++; + + if (*after_num != '\0') + { + suffix = after_num; + if (*after_num == 'k') + { + /* kilobyte is the expected unit. */ + after_num++; + } + else if (*after_num == 'M') + { + after_num++; + result *= 1024.0; + } + } + + /* The rest can only consist of white space. */ + while (*after_num != '\0' && isspace((unsigned char) *after_num)) + after_num++; + + if (*after_num != '\0') + { + fprintf(stderr, + _("%s: invalid --max-rate units: \"%s\"\n"), + progname, suffix); + exit(1); + } + + /* Valid integer? */ + if ((uint64) result != (uint64) ((uint32) result)) + { + fprintf(stderr, + _("%s: transfer rate \"%s\" exceeds integer range\n"), + progname, src); + exit(1); + } + + /* + * The range is checked on the server side too, but avoid the server + * connection if a nonsensical value was passed. + */ + if (result < MAX_RATE_LOWER || result > MAX_RATE_UPPER) + { + fprintf(stderr, + _("%s: transfer rate \"%s\" is out of range\n"), + progname, src); + exit(1); + } + + return (int32) result; +} /* * Write a piece of tar data @@ -1485,8 +1579,9 @@ BaseBackup(void) char *sysidentifier; uint32 latesttli; uint32 starttli; - char current_path[MAXPGPATH]; + char *basebkp; char escaped_label[MAXPGPATH]; + char *maxrate_clause = NULL; int i; char xlogstart[64]; char xlogend[64]; @@ -1559,15 +1654,20 @@ BaseBackup(void) * Start the actual backup */ PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i); - snprintf(current_path, sizeof(current_path), - "BASE_BACKUP LABEL '%s' %s %s %s %s", - escaped_label, - showprogress ? 
"PROGRESS" : "", - includewal && !streamwal ? "WAL" : "", - fastcheckpoint ? "FAST" : "", - includewal ? "NOWAIT" : ""); - if (PQsendQuery(conn, current_path) == 0) + if (maxrate > 0) + maxrate_clause = psprintf("MAX_RATE %u", maxrate); + + basebkp = + psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s", + escaped_label, + showprogress ? "PROGRESS" : "", + includewal && !streamwal ? "WAL" : "", + fastcheckpoint ? "FAST" : "", + includewal ? "NOWAIT" : "", + maxrate_clause ? maxrate_clause : ""); + + if (PQsendQuery(conn, basebkp) == 0) { fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), progname, "BASE_BACKUP", PQerrorMessage(conn)); @@ -1847,6 +1947,7 @@ main(int argc, char **argv) {"pgdata", required_argument, NULL, 'D'}, {"format", required_argument, NULL, 'F'}, {"checkpoint", required_argument, NULL, 'c'}, + {"max-rate", required_argument, NULL, 'r'}, {"write-recovery-conf", no_argument, NULL, 'R'}, {"tablespace-mapping", required_argument, NULL, 'T'}, {"xlog", no_argument, NULL, 'x'}, @@ -1888,7 +1989,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "D:F:RT:xX:l:zZ:d:c:h:p:U:s:wWvP", + while ((c = getopt_long(argc, argv, "D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:wWvP", long_options, &option_index)) != -1) { switch (c) @@ -1909,6 +2010,9 @@ main(int argc, char **argv) exit(1); } break; + case 'r': + maxrate = parse_max_rate(optarg); + break; case 'R': writerecoveryconf = true; break; diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h index 4e3e5f35c68..3dbc4bc9ef8 100644 --- a/src/include/replication/basebackup.h +++ b/src/include/replication/basebackup.h @@ -14,6 +14,13 @@ #include "nodes/replnodes.h" +/* + * Minimum and maximum values of MAX_RATE option in BASE_BACKUP command. + */ +#define MAX_RATE_LOWER 32 +#define MAX_RATE_UPPER 1048576 + + extern void SendBaseBackup(BaseBackupCmd *cmd); #endif /* _BASEBACKUP_H */ -- GitLab