Skip to content
Snippets Groups Projects
Commit 4beb25c6 authored by Robert Haas's avatar Robert Haas
Browse files

Add subtransaction handling for table synchronization workers.

Since the old logic was completely unaware of subtransactions, a
change made in a subsequently-aborted subtransaction would still cause
workers to be stopped at toplevel transaction commit.  Fix that by
managing a stack of worker lists rather than just one.

Amit Khandekar and Robert Haas

Discussion: http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com
parent 0bb28ca3
No related branches found
Tags
No related merge requests found
...@@ -4542,6 +4542,7 @@ CommitSubTransaction(void) ...@@ -4542,6 +4542,7 @@ CommitSubTransaction(void)
AtEOSubXact_HashTables(true, s->nestingLevel); AtEOSubXact_HashTables(true, s->nestingLevel);
AtEOSubXact_PgStat(true, s->nestingLevel); AtEOSubXact_PgStat(true, s->nestingLevel);
AtSubCommit_Snapshot(s->nestingLevel); AtSubCommit_Snapshot(s->nestingLevel);
AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
/* /*
* We need to restore the upper transaction's read-only state, in case the * We need to restore the upper transaction's read-only state, in case the
...@@ -4695,6 +4696,7 @@ AbortSubTransaction(void) ...@@ -4695,6 +4696,7 @@ AbortSubTransaction(void)
AtEOSubXact_HashTables(false, s->nestingLevel); AtEOSubXact_HashTables(false, s->nestingLevel);
AtEOSubXact_PgStat(false, s->nestingLevel); AtEOSubXact_PgStat(false, s->nestingLevel);
AtSubAbort_Snapshot(s->nestingLevel); AtSubAbort_Snapshot(s->nestingLevel);
AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
} }
/* /*
......
...@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId ...@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
Oid relid; Oid relid;
} LogicalRepWorkerId; } LogicalRepWorkerId;
static List *on_commit_stop_workers = NIL; typedef struct StopWorkersData
{
int nestDepth; /* Sub-transaction nest level */
List *workers; /* List of LogicalRepWorkerId */
struct StopWorkersData *parent; /* This need not be an immediate
* subtransaction parent */
} StopWorkersData;
/*
* Stack of StopWorkersData elements. Each stack element contains the workers
* to be stopped for that subtransaction.
*/
static StopWorkersData *on_commit_stop_workers = NULL;
static void ApplyLauncherWakeup(void); static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_launcher_onexit(int code, Datum arg);
...@@ -558,17 +570,41 @@ logicalrep_worker_stop(Oid subid, Oid relid) ...@@ -558,17 +570,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
void void
logicalrep_worker_stop_at_commit(Oid subid, Oid relid) logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
{ {
int nestDepth = GetCurrentTransactionNestLevel();
LogicalRepWorkerId *wid; LogicalRepWorkerId *wid;
MemoryContext oldctx; MemoryContext oldctx;
/* Make sure we store the info in context that survives until commit. */ /* Make sure we store the info in context that survives until commit. */
oldctx = MemoryContextSwitchTo(TopTransactionContext); oldctx = MemoryContextSwitchTo(TopTransactionContext);
/* Check that previous transactions were properly cleaned up. */
Assert(on_commit_stop_workers == NULL ||
nestDepth >= on_commit_stop_workers->nestDepth);
/*
* Push a new stack element if we don't already have one for the current
* nestDepth.
*/
if (on_commit_stop_workers == NULL ||
nestDepth > on_commit_stop_workers->nestDepth)
{
StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
newdata->nestDepth = nestDepth;
newdata->workers = NIL;
newdata->parent = on_commit_stop_workers;
on_commit_stop_workers = newdata;
}
/*
* Finally add a new worker into the worker list of the current
* subtransaction.
*/
wid = palloc(sizeof(LogicalRepWorkerId)); wid = palloc(sizeof(LogicalRepWorkerId));
wid->subid = subid; wid->subid = subid;
wid->relid = relid; wid->relid = relid;
on_commit_stop_workers->workers =
on_commit_stop_workers = lappend(on_commit_stop_workers, wid); lappend(on_commit_stop_workers->workers, wid);
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
} }
...@@ -820,7 +856,7 @@ ApplyLauncherShmemInit(void) ...@@ -820,7 +856,7 @@ ApplyLauncherShmemInit(void)
bool bool
XactManipulatesLogicalReplicationWorkers(void) XactManipulatesLogicalReplicationWorkers(void)
{ {
return (on_commit_stop_workers != NIL); return (on_commit_stop_workers != NULL);
} }
/* /*
...@@ -829,16 +865,26 @@ XactManipulatesLogicalReplicationWorkers(void) ...@@ -829,16 +865,26 @@ XactManipulatesLogicalReplicationWorkers(void)
void void
AtEOXact_ApplyLauncher(bool isCommit) AtEOXact_ApplyLauncher(bool isCommit)
{ {
Assert(on_commit_stop_workers == NULL ||
(on_commit_stop_workers->nestDepth == 1 &&
on_commit_stop_workers->parent == NULL));
if (isCommit) if (isCommit)
{ {
ListCell *lc; ListCell *lc;
foreach(lc, on_commit_stop_workers) if (on_commit_stop_workers != NULL)
{
List *workers = on_commit_stop_workers->workers;
foreach(lc, workers)
{ {
LogicalRepWorkerId *wid = lfirst(lc); LogicalRepWorkerId *wid = lfirst(lc);
logicalrep_worker_stop(wid->subid, wid->relid); logicalrep_worker_stop(wid->subid, wid->relid);
} }
}
if (on_commit_launcher_wakeup) if (on_commit_launcher_wakeup)
ApplyLauncherWakeup(); ApplyLauncherWakeup();
...@@ -848,10 +894,64 @@ AtEOXact_ApplyLauncher(bool isCommit) ...@@ -848,10 +894,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
* No need to pfree on_commit_stop_workers. It was allocated in * No need to pfree on_commit_stop_workers. It was allocated in
* transaction memory context, which is going to be cleaned soon. * transaction memory context, which is going to be cleaned soon.
*/ */
on_commit_stop_workers = NIL; on_commit_stop_workers = NULL;
on_commit_launcher_wakeup = false; on_commit_launcher_wakeup = false;
} }
/*
* On commit, merge the current on_commit_stop_workers list into the
* immediate parent, if present.
* On rollback, discard the current on_commit_stop_workers list.
* Pop out the stack.
*/
void
AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
{
StopWorkersData *parent;
/* Exit immediately if there's no work to do at this level. */
if (on_commit_stop_workers == NULL ||
on_commit_stop_workers->nestDepth < nestDepth)
return;
Assert(on_commit_stop_workers->nestDepth == nestDepth);
parent = on_commit_stop_workers->parent;
if (isCommit)
{
/*
* If the upper stack element is not an immediate parent
* subtransaction, just decrement the notional nesting depth without
* doing any real work. Else, we need to merge the current workers
* list into the parent.
*/
if (!parent || parent->nestDepth < nestDepth - 1)
{
on_commit_stop_workers->nestDepth--;
return;
}
parent->workers =
list_concat(parent->workers, on_commit_stop_workers->workers);
}
else
{
/*
* Abandon everything that was done at this nesting level. Explicitly
* free memory to avoid a transaction-lifespan leak.
*/
list_free_deep(on_commit_stop_workers->workers);
}
/*
* We have taken care of the current subtransaction workers list for both
* abort or commit. So we are ready to pop the stack.
*/
pfree(on_commit_stop_workers);
on_commit_stop_workers = parent;
}
/* /*
* Request wakeup of the launcher on commit of the transaction. * Request wakeup of the launcher on commit of the transaction.
* *
......
...@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void); ...@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void); extern void ApplyLauncherWakeupAtCommit(void);
extern bool XactManipulatesLogicalReplicationWorkers(void); extern bool XactManipulatesLogicalReplicationWorkers(void);
extern void AtEOXact_ApplyLauncher(bool isCommit); extern void AtEOXact_ApplyLauncher(bool isCommit);
extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
extern bool IsLogicalLauncher(void); extern bool IsLogicalLauncher(void);
......
...@@ -2112,6 +2112,7 @@ StdAnalyzeData ...@@ -2112,6 +2112,7 @@ StdAnalyzeData
StdRdOptions StdRdOptions
Step Step
StopList StopList
StopWorkersData
StrategyNumber StrategyNumber
StreamCtl StreamCtl
StringInfo StringInfo
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment