diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 247efc8e6ecb92b8d542b8c2389af2be844f7aeb..1868bf5f9ee3e5f8ca027eef4dd631286ab12b70 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -461,7 +461,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) if (stmt->drop_slot) PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT"); - rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + /* + * Lock pg_subscription with AccessExclusiveLock to ensure + * that the launcher doesn't restart new worker during dropping + * the subscription + */ + rel = heap_open(SubscriptionRelationId, AccessExclusiveLock); tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId, CStringGetDatum(stmt->subname)); @@ -528,14 +533,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); - /* Protect against launcher restarting the worker. */ - LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE); - /* Kill the apply worker so that the slot becomes accessible. */ logicalrep_worker_stop(subid); - LWLockRelease(LogicalRepLauncherLock); - /* Remove the origin tracking if exists. */ snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 39530f96a3fc60e2e417d00ab7f98c2a991ec5cc..afdadc17d6d9dd36d409431b6654a40eb349ea82 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -305,17 +305,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) /* * Stop the logical replication worker and wait until it detaches from the * slot. - * - * The caller must hold LogicalRepLauncherLock to ensure that new workers are - * not being started during this function call. */ void logicalrep_worker_stop(Oid subid) { LogicalRepWorker *worker; - Assert(LWLockHeldByMe(LogicalRepLauncherLock)); - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(subid); @@ -602,9 +597,6 @@ ApplyLauncherMain(Datum main_arg) ALLOCSET_DEFAULT_MAXSIZE); oldctx = MemoryContextSwitchTo(subctx); - /* Block any concurrent DROP SUBSCRIPTION. */ - LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE); - /* search for subscriptions to start or stop. */ sublist = get_subscription_list(); @@ -628,8 +620,6 @@ ApplyLauncherMain(Datum main_arg) } } - LWLockRelease(LogicalRepLauncherLock); - /* Switch back to original memory context. */ MemoryContextSwitchTo(oldctx); /* Clean the temporary memory. */ diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index c95ca5b2e1a0f061ef6d0ec17d474927cb816ac0..cd8b08f50dae72791c4cbc8a7fb06fd8dd75917c 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -48,5 +48,4 @@ ReplicationOriginLock 40 MultiXactTruncationLock 41 OldSnapshotTimeMapLock 42 BackendRandomLock 43 -LogicalRepLauncherLock 44 -LogicalRepWorkerLock 45 +LogicalRepWorkerLock 44