diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index cd2e57867c062acc8132abc8f91656481aaa35c2..65a9e6c81ce82960f9b6837e18a378623fc898e7 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -159,41 +159,41 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, /* * Poll connection until we have OK or FAILED status. * - * Note that the initial state after PQconnectStartParams is - * PGRES_POLLING_WRITING. + * Per spec for PQconnectPoll, first wait till socket is write-ready. */ - for (status = PGRES_POLLING_WRITING; - status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED; - status = PQconnectPoll(conn->streamConn)) + status = PGRES_POLLING_WRITING; + do { - /* Sleep a bit if waiting for socket. */ - if (status == PGRES_POLLING_READING || - status == PGRES_POLLING_WRITING) + /* Wait for socket ready and/or other events. */ + int io_flag; + int rc; + + io_flag = (status == PGRES_POLLING_READING + ? WL_SOCKET_READABLE + : WL_SOCKET_WRITEABLE); + + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_POSTMASTER_DEATH | + WL_LATCH_SET | io_flag, + PQsocket(conn->streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER); + + /* Emergency bailout? */ + if (rc & WL_POSTMASTER_DEATH) + exit(1); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) { - int extra_flag; - int rc; - - extra_flag = (status == PGRES_POLLING_READING - ? WL_SOCKET_READABLE - : WL_SOCKET_WRITEABLE); - ResetLatch(&MyProc->procLatch); - rc = WaitLatchOrSocket(&MyProc->procLatch, - WL_POSTMASTER_DEATH | - WL_LATCH_SET | extra_flag, - PQsocket(conn->streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER); - - /* Emergency bailout. */ - if (rc & WL_POSTMASTER_DEATH) - exit(1); - - /* Interrupted. */ - if (rc & WL_LATCH_SET) - CHECK_FOR_INTERRUPTS(); + CHECK_FOR_INTERRUPTS(); } - } + + /* If socket is ready, advance the libpq state machine */ + if (rc & io_flag) + status = PQconnectPoll(conn->streamConn); + } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); if (PQstatus(conn->streamConn) != CONNECTION_OK) {