diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 534f052c319b7ac85cbdfbad1be96dd46df62843..685cb40d089685f7a761b4befa3b9d96b8a6e4ab 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1172,9 +1172,8 @@ WalSndWaitForWal(XLogRecPtr loc) int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; - /* - * Fast path to avoid acquiring the spinlock in the we already know we + * Fast path to avoid acquiring the spinlock in case we already know we * have enough WAL available. This is particularly interesting if we're * far behind. */ @@ -2489,6 +2488,7 @@ XLogSendLogical(void) { XLogRecord *record; char *errm; + XLogRecPtr flushPtr; /* * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to @@ -2505,40 +2505,29 @@ XLogSendLogical(void) if (errm != NULL) elog(ERROR, "%s", errm); + /* + * We'll use the current flush point to determine whether we've caught up. + */ + flushPtr = GetFlushRecPtr(); + if (record != NULL) { - /* XXX: Note that logical decoding cannot be used while in recovery */ - XLogRecPtr flushPtr = GetFlushRecPtr(); - LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); sentPtr = logical_decoding_ctx->reader->EndRecPtr; - - /* - * If we have sent a record that is at or beyond the flushed point, we - * have caught up. - */ - if (sentPtr >= flushPtr) - WalSndCaughtUp = true; } - else - { - /* - * If the record we just wanted read is at or beyond the flushed - * point, then we're caught up. - */ - if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr()) - { - WalSndCaughtUp = true; - /* - * Have WalSndLoop() terminate the connection in an orderly - * manner, after writing out all the pending data. - */ - if (got_STOPPING) - got_SIGUSR2 = true; - } - } + /* Set flag if we're caught up. */ + if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + WalSndCaughtUp = true; + + /* + * If we're caught up and have been requested to stop, have WalSndLoop() + * terminate the connection in an orderly manner, after writing out all + * the pending data. + */ + if (WalSndCaughtUp && got_STOPPING) + got_SIGUSR2 = true; /* Update shared memory status */ {