diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out index 2671258f5d9362c88ec5d25f6f7a42bd5691bbdf..be759caa31de8578c5023aef1b740d5be92a92b9 100644 --- a/contrib/test_decoding/expected/decoding_into_rel.out +++ b/contrib/test_decoding/expected/decoding_into_rel.out @@ -1,6 +1,8 @@ -- test that we can insert the result of a get_changes call into a -- logged relation. That's really not a good idea in practical terms, -- but provides a nice test. +-- predictability +SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); ?column? ---------- diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql index 3704821bcc3ef8bd109d0f1ee9b3a0d078b2f443..54670fd39e76ca41135eaee692edbc477c6ab175 100644 --- a/contrib/test_decoding/sql/decoding_into_rel.sql +++ b/contrib/test_decoding/sql/decoding_into_rel.sql @@ -1,6 +1,10 @@ -- test that we can insert the result of a get_changes call into a -- logged relation. That's really not a good idea in practical terms, -- but provides a nice test. + +-- predictability +SET synchronous_commit = on; + SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); -- slot works diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8c318cd4b519c789b5ac456e936d0d3146eb9965..8fd663562672f6b9145023eeb7344f5e37728ee6 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -146,10 +146,19 @@ StartupDecodingContext(List *output_plugin_options, * logical decoding backend which doesn't need to be checked individually * when computing the xmin horizon because the xmin is enforced via * replication slots. + * + * We can only do so if we're outside of a transaction (i.e. the case when + * streaming changes via walsender), otherwise a already setup + * snapshot/xid would end up being ignored. That's not a particularly + * bothersome restriction since the SQL interface can't be used for + * streaming anyway. */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING; - LWLockRelease(ProcArrayLock); + if (!IsTransactionOrTransactionBlock()) + { + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING; + LWLockRelease(ProcArrayLock); + } ctx->slot = slot; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index c23f4da5b606624e03107357b7b5965c29aebb6a..4ad4164927ecd76f3250438b48c1c0e05287b7b9 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -43,7 +43,7 @@ struct XidCache #define PROC_IN_ANALYZE 0x04 /* currently running analyze */ #define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */ #define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical - * decoding */ + * decoding outside xact */ /* flags reset at EOXact */ #define PROC_VACUUM_STATE_MASK \