diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9e318e669d099bbb375c52580b910fb23c63cb39..d41858e49a201de8975d04514a75cd40189f98ff 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.8 2010/03/21 00:17:58 petere Exp $ + * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $ * *------------------------------------------------------------------------- */ @@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void); /* Prototypes for private functions */ static bool libpq_select(int timeout_ms); +static PGresult *libpqrcv_PQexec(const char *query); /* * Module load callback @@ -97,7 +98,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ - res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + res = libpqrcv_PQexec("IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -149,11 +150,14 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) /* Start streaming from the point requested by startup process */ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", startpoint.xlogid, startpoint.xrecoff); - res = PQexec(streamConn, cmd); + res = libpqrcv_PQexec(cmd); if (PQresultStatus(res) != PGRES_COPY_OUT) + { + PQclear(res); ereport(ERROR, (errmsg("could not start WAL streaming: %s", PQerrorMessage(streamConn)))); + } PQclear(res); justconnected = true; @@ -224,6 +228,84 @@ libpq_select(int timeout_ms) return true; } +/* + * Send a query and wait for the results by using the asynchronous libpq + * functions and the backend version of select(). + * + * We must not use the regular blocking libpq functions like PQexec() + * since they are uninterruptible by signals on some platforms, such as + * Windows. + * + * We must also not use vanilla select() here since it cannot handle the + * signal emulation layer on Windows. + * + * The function is modeled on PQexec() in libpq, but only implements + * those parts that are in use in the walreceiver. + * + * Queries are always executed on the connection in streamConn. + */ +static PGresult * +libpqrcv_PQexec(const char *query) +{ + PGresult *result = NULL; + PGresult *lastResult = NULL; + + /* + * PQexec() silently discards any prior query results on the + * connection. This is not required for walreceiver since it's + * expected that walsender won't generate any such junk results. + */ + + /* + * Submit a query. Since we don't use non-blocking mode, this also + * can block. But its risk is relatively small, so we ignore that + * for now. + */ + if (!PQsendQuery(streamConn, query)) + return NULL; + + for (;;) + { + /* + * Receive data until PQgetResult is ready to get the result + * without blocking. + */ + while (PQisBusy(streamConn)) + { + /* + * We don't need to break down the sleep into smaller increments, + * and check for interrupts after each nap, since we can just + * elog(FATAL) within SIGTERM signal handler if the signal + * arrives in the middle of establishment of replication connection. + */ + if (!libpq_select(-1)) + continue; /* interrupted */ + if (PQconsumeInput(streamConn) == 0) + return NULL; /* trouble */ + } + + /* + * Emulate the PQexec()'s behavior of returning the last result + * when there are many. + * Since walsender will never generate multiple results, we skip + * the concatenation of error messages. + */ + result = PQgetResult(streamConn); + if (result == NULL) + break; /* query is complete */ + + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQstatus(streamConn) == CONNECTION_BAD) + break; + } + + return lastResult; +} + /* * Disconnect connection to primary, if any. */ diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 090111bb1126d6ad48f582d91e7b263885163192..f2694db8733b39af6da53744100b40844da3ca80 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.8 2010/04/13 08:16:09 mha Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $ * *------------------------------------------------------------------------- */ @@ -86,8 +86,8 @@ static void DisableWalRcvImmediateExit(void); * We can't just exit(1) within SIGTERM signal handler, because the signal * might arrive in the middle of some critical operation, like while we're * holding a spinlock. We also can't just set a flag in signal handler and - * check it in the main loop, because we perform some blocking libpq - * operations like PQexec(), which can take a long time to finish. + * check it in the main loop, because we perform some blocking operations + * like libpqrcv_PQexec(), which can take a long time to finish. * * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's * safe for the signal handler to elog(FATAL) immediately. Otherwise it just