From 0c013e08cfbebd35ec982c4df15d44b521094d52 Mon Sep 17 00:00:00 2001 From: Andres Freund <andres@anarazel.de> Date: Wed, 1 Oct 2014 17:22:21 +0200 Subject: [PATCH] Refactor replication connection code of various pg_basebackup utilities. Move some more code to manage replication connection command to streamutil.c. A later patch will introduce replication slot via pg_receivexlog and this avoid duplicating relevant code between pg_receivexlog and pg_recvlogical. Author: Michael Paquier, with some editing by me. --- src/bin/pg_basebackup/pg_basebackup.c | 21 +-- src/bin/pg_basebackup/pg_receivexlog.c | 38 +----- src/bin/pg_basebackup/pg_recvlogical.c | 116 ++++------------ src/bin/pg_basebackup/streamutil.c | 177 ++++++++++++++++++++++++- src/bin/pg_basebackup/streamutil.h | 11 ++ 5 files changed, 222 insertions(+), 141 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 8b9acea9f08..0ebda9ae9e0 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -1569,8 +1569,8 @@ BaseBackup(void) { PGresult *res; char *sysidentifier; - uint32 latesttli; - uint32 starttli; + TimeLineID latesttli; + TimeLineID starttli; char *basebkp; char escaped_label[MAXPGPATH]; char *maxrate_clause = NULL; @@ -1624,23 +1624,8 @@ BaseBackup(void) /* * Run IDENTIFY_SYSTEM so we can get the timeline */ - res = PQexec(conn, "IDENTIFY_SYSTEM"); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); + if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL)) disconnect_and_exit(1); - } - if (PQntuples(res) != 1 || PQnfields(res) < 3) - { - fprintf(stderr, - _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); - disconnect_and_exit(1); - } - sysidentifier = pg_strdup(PQgetvalue(res, 0, 0)); - latesttli = atoi(PQgetvalue(res, 0, 1)); - PQclear(res); /* * Start the actual backup diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index a8b9ad3c05f..171cf431f57 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -253,13 +253,8 @@ FindStreamingStart(uint32 *tli) static void StreamLog(void) { - PGresult *res; - XLogRecPtr startpos; - uint32 starttli; - XLogRecPtr serverpos; - uint32 servertli; - uint32 hi, - lo; + XLogRecPtr startpos, serverpos; + TimeLineID starttli, servertli; /* * Connect in replication mode to the server @@ -280,33 +275,12 @@ StreamLog(void) } /* - * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog - * position. + * Identify server, obtaining start LSN position and current timeline ID + * at the same time, necessary if not valid data can be found in the + * existing output directory. */ - res = PQexec(conn, "IDENTIFY_SYSTEM"); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); - disconnect_and_exit(1); - } - if (PQntuples(res) != 1 || PQnfields(res) < 3) - { - fprintf(stderr, - _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); + if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL)) disconnect_and_exit(1); - } - servertli = atoi(PQgetvalue(res, 0, 1)); - if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2) - { - fprintf(stderr, - _("%s: could not parse transaction log location \"%s\"\n"), - progname, PQgetvalue(res, 0, 2)); - disconnect_and_exit(1); - } - serverpos = ((uint64) hi) << 32 | lo; - PQclear(res); /* * Figure out where to start streaming. diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index a88ffacc06d..c48ceccf901 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -596,7 +596,6 @@ sighup_handler(int signum) int main(int argc, char **argv) { - PGresult *res; static struct option long_options[] = { /* general options */ {"file", required_argument, NULL, 'f'}, @@ -628,6 +627,7 @@ main(int argc, char **argv) int option_index; uint32 hi, lo; + char *db_name; progname = get_progname(argv[0]); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical")); @@ -834,124 +834,62 @@ main(int argc, char **argv) #endif /* - * don't really need this but it actually helps to get more precise error - * messages about authentication, required GUCs and such without starting - * to loop around connection attempts lateron. + * Obtain a connection to server. This is not really necessary but it + * helps to get more precise error messages about authentification, + * required GUC parameters and such. */ - { - conn = GetConnection(); - if (!conn) - /* Error message already written in GetConnection() */ - exit(1); + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + exit(1); - /* - * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog - * position. - */ - res = PQexec(conn, "IDENTIFY_SYSTEM"); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); - disconnect_and_exit(1); - } + /* + * Run IDENTIFY_SYSTEM to make sure we connected using a database specific + * replication connection. + */ + if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name)) + disconnect_and_exit(1); - if (PQntuples(res) != 1 || PQnfields(res) < 4) - { - fprintf(stderr, - _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 4); - disconnect_and_exit(1); - } - PQclear(res); + if (db_name == NULL) + { + fprintf(stderr, + _("%s: failed to establish database specific replication connection\n"), + progname); + disconnect_and_exit(1); } - - /* - * drop a replication slot - */ + /* Drop a replication slot. */ if (do_drop_slot) { - char query[256]; - if (verbose) fprintf(stderr, _("%s: dropping replication slot \"%s\"\n"), progname, replication_slot); - snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"", - replication_slot); - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, query, PQerrorMessage(conn)); + if (!DropReplicationSlot(conn, replication_slot)) disconnect_and_exit(1); - } - - if (PQntuples(res) != 0 || PQnfields(res) != 0) - { - fprintf(stderr, - _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, replication_slot, PQntuples(res), PQnfields(res), 0, 0); - disconnect_and_exit(1); - } - - PQclear(res); - disconnect_and_exit(0); } - /* - * create a replication slot - */ + /* Create a replication slot. */ if (do_create_slot) { - char query[256]; - if (verbose) fprintf(stderr, _("%s: creating replication slot \"%s\"\n"), progname, replication_slot); - snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", - replication_slot, plugin); - - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, query, PQerrorMessage(conn)); + if (!CreateReplicationSlot(conn, replication_slot, plugin, + &startpos, false)) disconnect_and_exit(1); - } - - if (PQntuples(res) != 1 || PQnfields(res) != 4) - { - fprintf(stderr, - _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, replication_slot, PQntuples(res), PQnfields(res), 1, 4); - disconnect_and_exit(1); - } - - if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) - { - fprintf(stderr, - _("%s: could not parse transaction log location \"%s\"\n"), - progname, PQgetvalue(res, 0, 1)); - disconnect_and_exit(1); - } - startpos = ((uint64) hi) << 32 | lo; - - replication_slot = strdup(PQgetvalue(res, 0, 0)); - PQclear(res); } - if (!do_start_slot) disconnect_and_exit(0); + /* Stream loop */ while (true) { - StreamLog(); + StreamLogicalLog(); if (time_to_abort) { /* diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 1100260c05a..2f4bac95508 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -27,6 +27,7 @@ #include "receivelog.h" #include "streamutil.h" +#include "pqexpbuffer.h" #include "common/fe_memutils.h" #include "datatype/timestamp.h" @@ -227,11 +228,183 @@ GetConnection(void) return tmpconn; } +/* + * Run IDENTIFY_SYSTEM through a given connection and give back to caller + * some result information if requested: + * - Start LSN position + * - Current timeline ID + * - System identifier + * - Plugin name + */ +bool +RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, + XLogRecPtr *startpos, char **db_name) +{ + PGresult *res; + uint32 hi, lo; + + /* Check connection existence */ + Assert(conn != NULL); + + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); + return false; + } + if (PQntuples(res) != 1 || PQnfields(res) < 3) + { + fprintf(stderr, + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 3); + return false; + } + + /* Get system identifier */ + if (sysid != NULL) + *sysid = pg_strdup(PQgetvalue(res, 0, 0)); + + /* Get timeline ID to start streaming from */ + if (starttli != NULL) + *starttli = atoi(PQgetvalue(res, 0, 1)); + + /* Get LSN start position if necessary */ + if (startpos != NULL) + { + if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse transaction log location \"%s\"\n"), + progname, PQgetvalue(res, 0, 2)); + return false; + } + *startpos = ((uint64) hi) << 32 | lo; + } + + /* Get database name, only available in 9.4 and newer versions */ + if (db_name != NULL) + { + if (PQnfields(res) < 4) + fprintf(stderr, + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 4); + + if (PQgetisnull(res, 0, 3)) + *db_name = NULL; + else + *db_name = pg_strdup(PQgetvalue(res, 0, 3)); + } + + PQclear(res); + return true; +} + +/* + * Create a replication slot for the given connection. This function + * returns true in case of success as well as the start position + * obtained after the slot creation. + */ +bool +CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, + XLogRecPtr *startpos, bool is_physical) +{ + PQExpBuffer query; + PGresult *res; + + query = createPQExpBuffer(); + + Assert((is_physical && plugin == NULL) || + (!is_physical && plugin != NULL)); + Assert(slot_name != NULL); + + /* Build query */ + if (is_physical) + appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL", + slot_name); + else + appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", + slot_name, plugin); + + res = PQexec(conn, query->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query->data, PQerrorMessage(conn)); + return false; + } + + if (PQntuples(res) != 1 || PQnfields(res) != 4) + { + fprintf(stderr, + _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, slot_name, + PQntuples(res), PQnfields(res), 1, 4); + return false; + } + + /* Get LSN start position if necessary */ + if (startpos != NULL) + { + uint32 hi, lo; + + if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse transaction log location \"%s\"\n"), + progname, PQgetvalue(res, 0, 1)); + return false; + } + *startpos = ((uint64) hi) << 32 | lo; + } + + PQclear(res); + return true; +} + +/* + * Drop a replication slot for the given connection. This function + * returns true in case of success. + */ +bool +DropReplicationSlot(PGconn *conn, const char *slot_name) +{ + PQExpBuffer query; + PGresult *res; + + Assert(slot_name != NULL); + + query = createPQExpBuffer(); + + /* Build query */ + appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", + slot_name); + res = PQexec(conn, query->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query->data, PQerrorMessage(conn)); + return false; + } + + if (PQntuples(res) != 0 || PQnfields(res) != 0) + { + fprintf(stderr, + _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, slot_name, + PQntuples(res), PQnfields(res), 0, 0); + return false; + } + + PQclear(res); + return true; +} + /* * Frontend version of GetCurrentTimestamp(), since we are not linked with - * backend code. The protocol always uses integer timestamps, regardless of - * server setting. + * backend code. The replication protocol always uses integer timestamps, + * regardless of the server setting. */ int64 feGetCurrentTimestamp(void) diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 8c6691f9c8c..ac66145c359 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -14,6 +14,8 @@ #include "libpq-fe.h" +#include "access/xlogdefs.h" + extern const char *progname; extern char *connection_string; extern char *dbhost; @@ -28,6 +30,15 @@ extern PGconn *conn; extern PGconn *GetConnection(void); +/* Replication commands */ +extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name, + const char *plugin, XLogRecPtr *startpos, + bool is_physical); +extern bool DropReplicationSlot(PGconn *conn, const char *slot_name); +extern bool RunIdentifySystem(PGconn *conn, char **sysid, + TimeLineID *starttli, + XLogRecPtr *startpos, + char **db_name); extern int64 feGetCurrentTimestamp(void); extern void feTimestampDifference(int64 start_time, int64 stop_time, long *secs, int *microsecs); -- GitLab