diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c index 24b8291083546a1a9ca8d51892ba2e2d38e6867d..b21a31345f76d44fadf1f3e69f9c4906fd30ae48 100644 --- a/src/backend/access/transam/commit_ts.c +++ b/src/backend/access/transam/commit_ts.c @@ -78,13 +78,21 @@ static SlruCtlData CommitTsCtlData; #define CommitTsCtl (&CommitTsCtlData) /* - * We keep a cache of the last value set in shared memory. This is protected - * by CommitTsLock. + * We keep a cache of the last value set in shared memory. + * + * This is also good place to keep the activation status. We keep this + * separate from the GUC so that the standby can activate the module if the + * primary has it active independently of the value of the GUC. + * + * This is protected by CommitTsLock. In some places, we use commitTsActive + * without acquiring the lock; where this happens, a comment explains the + * rationale for it. */ typedef struct CommitTimestampShared { TransactionId xidLastCommit; CommitTimestampEntry dataLastCommit; + bool commitTsActive; } CommitTimestampShared; CommitTimestampShared *commitTsShared; @@ -93,14 +101,6 @@ CommitTimestampShared *commitTsShared; /* GUC variable */ bool track_commit_timestamp; -/* - * When this is set, commit_ts is force-enabled during recovery. This is so - * that a standby can replay WAL records coming from a master with the setting - * enabled. (Note that this doesn't enable SQL access to the data; it's - * effectively write-only until the GUC itself is enabled.) - */ -static bool enable_during_recovery; - static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno); @@ -109,7 +109,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, static int ZeroCommitTsPage(int pageno, bool writeXlog); static bool CommitTsPagePrecedes(int page1, int page2); static void ActivateCommitTs(void); -static void DeactivateCommitTs(bool do_wal); +static void DeactivateCommitTs(void); static void WriteZeroPageXlogRec(int pageno); static void WriteTruncateXlogRec(int pageno); static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, @@ -149,10 +149,14 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId newestXact; /* - * No-op if the module is not enabled, but allow writes in a standby - * during recovery. + * No-op if the module is not active. + * + * An unlocked read here is fine, because in a standby (the only place + * where the flag can change in flight) this routine is only called by + * the recovery process, which is also the only process which can change + * the flag. */ - if (!track_commit_timestamp && !enable_during_recovery) + if (!commitTsShared->commitTsActive) return; /* @@ -283,30 +287,45 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, TransactionId oldestCommitTs; TransactionId newestCommitTs; + /* error if the given Xid doesn't normally commit */ + if (!TransactionIdIsNormal(xid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot retrieve commit timestamp for transaction %u", xid))); + + LWLockAcquire(CommitTsLock, LW_SHARED); + /* Error if module not enabled */ - if (!track_commit_timestamp) + if (!commitTsShared->commitTsActive) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not get commit timestamp data"), errhint("Make sure the configuration parameter \"%s\" is set.", "track_commit_timestamp"))); - /* error if the given Xid doesn't normally commit */ - if (!TransactionIdIsNormal(xid)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot retrieve commit timestamp for transaction %u", xid))); - /* - * Return empty if the requested value is outside our valid range. + * If we're asked for the cached value, return that. Otherwise, fall + * through to read from SLRU. */ - LWLockAcquire(CommitTsLock, LW_SHARED); + if (commitTsShared->xidLastCommit == xid) + { + *ts = commitTsShared->dataLastCommit.time; + if (nodeid) + *nodeid = commitTsShared->dataLastCommit.nodeid; + + LWLockRelease(CommitTsLock); + return *ts != 0; + } + oldestCommitTs = ShmemVariableCache->oldestCommitTs; newestCommitTs = ShmemVariableCache->newestCommitTs; /* neither is invalid, or both are */ Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs)); LWLockRelease(CommitTsLock); + /* + * Return empty if the requested value is outside our valid range. + */ if (!TransactionIdIsValid(oldestCommitTs) || TransactionIdPrecedes(xid, oldestCommitTs) || TransactionIdPrecedes(newestCommitTs, xid)) @@ -317,27 +336,6 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, return false; } - /* - * Use an unlocked atomic read on our cached value in shared memory; if - * it's a hit, acquire a lock and read the data, after verifying that it's - * still what we initially read. Otherwise, fall through to read from - * SLRU. - */ - if (commitTsShared->xidLastCommit == xid) - { - LWLockAcquire(CommitTsLock, LW_SHARED); - if (commitTsShared->xidLastCommit == xid) - { - *ts = commitTsShared->dataLastCommit.time; - if (nodeid) - *nodeid = commitTsShared->dataLastCommit.nodeid; - - LWLockRelease(CommitTsLock); - return *ts != 0; - } - LWLockRelease(CommitTsLock); - } - /* lock is acquired by SimpleLruReadPage_ReadOnly */ slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid); memcpy(&entry, @@ -366,15 +364,16 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid) { TransactionId xid; + LWLockAcquire(CommitTsLock, LW_SHARED); + /* Error if module not enabled */ - if (!track_commit_timestamp) + if (!commitTsShared->commitTsActive) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not get commit timestamp data"), errhint("Make sure the configuration parameter \"%s\" is set.", "track_commit_timestamp"))); - LWLockAcquire(CommitTsLock, LW_SHARED); xid = commitTsShared->xidLastCommit; if (ts) *ts = commitTsShared->dataLastCommit.time; @@ -493,6 +492,7 @@ CommitTsShmemInit(void) commitTsShared->xidLastCommit = InvalidTransactionId; TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; + commitTsShared->commitTsActive = false; } else Assert(found); @@ -566,7 +566,7 @@ CompleteCommitTsInitialization(void) * any leftover data. */ if (!track_commit_timestamp) - DeactivateCommitTs(true); + DeactivateCommitTs(); } /* @@ -588,11 +588,11 @@ CommitTsParameterChange(bool newvalue, bool oldvalue) */ if (newvalue) { - if (!track_commit_timestamp && !oldvalue) + if (!commitTsShared->commitTsActive) ActivateCommitTs(); } - else if (!track_commit_timestamp && oldvalue) - DeactivateCommitTs(false); + else if (commitTsShared->commitTsActive) + DeactivateCommitTs(); } /* @@ -645,7 +645,7 @@ ActivateCommitTs(void) } LWLockRelease(CommitTsLock); - /* Finally, create the current segment file, if necessary */ + /* Create the current segment file, if necessary */ if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) { int slotno; @@ -657,8 +657,10 @@ ActivateCommitTs(void) LWLockRelease(CommitTsControlLock); } - /* We can now replay xlog records from this module */ - enable_during_recovery = true; + /* Change the activation status in shared memory. */ + LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); + commitTsShared->commitTsActive = true; + LWLockRelease(CommitTsLock); } /* @@ -672,21 +674,25 @@ ActivateCommitTs(void) * possibly-invalid data; also removes segments of old data. */ static void -DeactivateCommitTs(bool do_wal) +DeactivateCommitTs(void) { - TransactionId xid = ShmemVariableCache->nextXid; - int pageno = TransactionIdToCTsPage(xid); - /* - * Re-Initialize our idea of the latest page number. + * Cleanup the status in the shared memory. + * + * We reset everything in the commitTsShared record to prevent user from + * getting confusing data about last committed transaction on the standby + * when the module was activated repeatedly on the primary. */ - LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); - CommitTsCtl->shared->latest_page_number = pageno; - LWLockRelease(CommitTsControlLock); - LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); + + commitTsShared->commitTsActive = false; + commitTsShared->xidLastCommit = InvalidTransactionId; + TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); + commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; + ShmemVariableCache->oldestCommitTs = InvalidTransactionId; ShmemVariableCache->newestCommitTs = InvalidTransactionId; + LWLockRelease(CommitTsLock); /* @@ -697,10 +703,9 @@ DeactivateCommitTs(bool do_wal) * be overwritten anyway when we wrap around, but it seems better to be * tidy.) */ + LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL); - - /* No longer enabled on recovery */ - enable_during_recovery = false; + LWLockRelease(CommitTsControlLock); } /* @@ -739,8 +744,13 @@ ExtendCommitTs(TransactionId newestXact) { int pageno; - /* nothing to do if module not enabled */ - if (!track_commit_timestamp && !enable_during_recovery) + /* + * Nothing to do if module not enabled. Note we do an unlocked read of the + * flag here, which is okay because this routine is only called from + * GetNewTransactionId, which is never called in a standby. + */ + Assert(!InRecovery); + if (!commitTsShared->commitTsActive) return; /* @@ -768,7 +778,7 @@ ExtendCommitTs(TransactionId newestXact) * Note that we don't need to flush XLOG here. */ void -TruncateCommitTs(TransactionId oldestXact, bool do_wal) +TruncateCommitTs(TransactionId oldestXact) { int cutoffPage; @@ -784,8 +794,7 @@ TruncateCommitTs(TransactionId oldestXact, bool do_wal) return; /* nothing to remove */ /* Write XLOG record */ - if (do_wal) - WriteTruncateXlogRec(cutoffPage); + WriteTruncateXlogRec(cutoffPage); /* Now we can remove the old CommitTs segment(s) */ SimpleLruTruncate(CommitTsCtl, cutoffPage); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 6d55148edd4a72824f042cbea839bb22a55ac6dc..7c4ef58129e650bd62d20deba0e8df8201f9bdd1 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -1140,7 +1140,7 @@ vac_truncate_clog(TransactionId frozenXID, * Truncate CLOG, multixact and CommitTs to the oldest computed value. */ TruncateCLOG(frozenXID); - TruncateCommitTs(frozenXID, true); + TruncateCommitTs(frozenXID); TruncateMultiXact(minMulti, minmulti_datoid); /* diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h index 1b95b5837ea31dcc6433d371a1fb39b384b374c2..3844bb30ff252f0708c4ad63a18d297bfbcfe47c 100644 --- a/src/include/access/commit_ts.h +++ b/src/include/access/commit_ts.h @@ -40,7 +40,7 @@ extern void CompleteCommitTsInitialization(void); extern void ShutdownCommitTs(void); extern void CheckPointCommitTs(void); extern void ExtendCommitTs(TransactionId newestXact); -extern void TruncateCommitTs(TransactionId oldestXact, bool do_wal); +extern void TruncateCommitTs(TransactionId oldestXact); extern void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact); extern void AdvanceOldestCommitTs(TransactionId oldestXact);