Skip to content
Snippets Groups Projects
Select Git revision
  • benchmark-tools
  • postgres-lambda
  • master default
  • REL9_4_25
  • REL9_5_20
  • REL9_6_16
  • REL_10_11
  • REL_11_6
  • REL_12_1
  • REL_12_0
  • REL_12_RC1
  • REL_12_BETA4
  • REL9_4_24
  • REL9_5_19
  • REL9_6_15
  • REL_10_10
  • REL_11_5
  • REL_12_BETA3
  • REL9_4_23
  • REL9_5_18
  • REL9_6_14
  • REL_10_9
  • REL_11_4
23 results

xact.c

Blame
  • xact.c 134.76 KiB
    /*-------------------------------------------------------------------------
     *
     * xact.c
     *	  top level transaction system support routines
     *
     * See src/backend/access/transam/README for more information.
     *
     * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
     * Portions Copyright (c) 1994, Regents of the University of California
     *
     *
     * IDENTIFICATION
     *	  src/backend/access/transam/xact.c
     *
     *-------------------------------------------------------------------------
     */
    
    #include "postgres.h"
    
    #include <time.h>
    #include <unistd.h>
    
    #include "access/multixact.h"
    #include "access/subtrans.h"
    #include "access/transam.h"
    #include "access/twophase.h"
    #include "access/xact.h"
    #include "access/xlogutils.h"
    #include "catalog/catalog.h"
    #include "catalog/namespace.h"
    #include "catalog/storage.h"
    #include "commands/async.h"
    #include "commands/tablecmds.h"
    #include "commands/trigger.h"
    #include "executor/spi.h"
    #include "libpq/be-fsstubs.h"
    #include "miscadmin.h"
    #include "pgstat.h"
    #include "replication/walsender.h"
    #include "replication/syncrep.h"
    #include "storage/lmgr.h"
    #include "storage/predicate.h"
    #include "storage/procarray.h"
    #include "storage/sinvaladt.h"
    #include "storage/smgr.h"
    #include "utils/combocid.h"
    #include "utils/guc.h"
    #include "utils/inval.h"
    #include "utils/memutils.h"
    #include "utils/relmapper.h"
    #include "utils/snapmgr.h"
    #include "utils/timestamp.h"
    #include "pg_trace.h"
    
    
    /*
     *	User-tweakable parameters
     */
    int			DefaultXactIsoLevel = XACT_READ_COMMITTED;
    int			XactIsoLevel;
    
    bool		DefaultXactReadOnly = false;
    bool		XactReadOnly;
    
    bool		DefaultXactDeferrable = false;
    bool		XactDeferrable;
    
    int			synchronous_commit = SYNCHRONOUS_COMMIT_ON;
    
    int			CommitDelay = 0;	/* precommit delay in microseconds */
    int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
    
    /*
     * MyXactAccessedTempRel is set when a temporary relation is accessed.
     * We don't allow PREPARE TRANSACTION in that case.  (This is global
     * so that it can be set from heapam.c.)
     */
    bool		MyXactAccessedTempRel = false;
    
    
    /*
     *	transaction states - transaction state from server perspective
     */
    typedef enum TransState
    {
    	TRANS_DEFAULT,				/* idle */
    	TRANS_START,				/* transaction starting */
    	TRANS_INPROGRESS,			/* inside a valid transaction */
    	TRANS_COMMIT,				/* commit in progress */
    	TRANS_ABORT,				/* abort in progress */
    	TRANS_PREPARE				/* prepare in progress */
    } TransState;
    
    /*
     *	transaction block states - transaction state of client queries
     *
     * Note: the subtransaction states are used only for non-topmost
     * transactions; the others appear only in the topmost transaction.
     */
    typedef enum TBlockState
    {
    	/* not-in-transaction-block states */
    	TBLOCK_DEFAULT,				/* idle */
    	TBLOCK_STARTED,				/* running single-query transaction */
    
    	/* transaction block states */
    	TBLOCK_BEGIN,				/* starting transaction block */
    	TBLOCK_INPROGRESS,			/* live transaction */
    	TBLOCK_END,					/* COMMIT received */
    	TBLOCK_ABORT,				/* failed xact, awaiting ROLLBACK */
    	TBLOCK_ABORT_END,			/* failed xact, ROLLBACK received */
    	TBLOCK_ABORT_PENDING,		/* live xact, ROLLBACK received */
    	TBLOCK_PREPARE,				/* live xact, PREPARE received */
    
    	/* subtransaction states */
    	TBLOCK_SUBBEGIN,			/* starting a subtransaction */
    	TBLOCK_SUBINPROGRESS,		/* live subtransaction */
    	TBLOCK_SUBRELEASE,			/* RELEASE received */
    	TBLOCK_SUBCOMMIT,			/* COMMIT received while TBLOCK_SUBINPROGRESS */
    	TBLOCK_SUBABORT,			/* failed subxact, awaiting ROLLBACK */
    	TBLOCK_SUBABORT_END,		/* failed subxact, ROLLBACK received */
    	TBLOCK_SUBABORT_PENDING,	/* live subxact, ROLLBACK received */
    	TBLOCK_SUBRESTART,			/* live subxact, ROLLBACK TO received */
    	TBLOCK_SUBABORT_RESTART		/* failed subxact, ROLLBACK TO received */
    } TBlockState;
    
    /*
     *	transaction state structure
     */
    typedef struct TransactionStateData
    {
    	TransactionId transactionId;	/* my XID, or Invalid if none */
    	SubTransactionId subTransactionId;	/* my subxact ID */
    	char	   *name;			/* savepoint name, if any */
    	int			savepointLevel; /* savepoint level */
    	TransState	state;			/* low-level state */
    	TBlockState blockState;		/* high-level state */
    	int			nestingLevel;	/* transaction nesting depth */
    	int			gucNestLevel;	/* GUC context nesting depth */
    	MemoryContext curTransactionContext;		/* my xact-lifetime context */
    	ResourceOwner curTransactionOwner;	/* my query resources */
    	TransactionId *childXids;	/* subcommitted child XIDs, in XID order */
    	int			nChildXids;		/* # of subcommitted child XIDs */
    	int			maxChildXids;	/* allocated size of childXids[] */
    	Oid			prevUser;		/* previous CurrentUserId setting */
    	int			prevSecContext; /* previous SecurityRestrictionContext */
    	bool		prevXactReadOnly;		/* entry-time xact r/o state */
    	bool		startedInRecovery;		/* did we start in recovery? */
    	struct TransactionStateData *parent;		/* back link to parent */
    } TransactionStateData;
    
    typedef TransactionStateData *TransactionState;
    
    /*
     * CurrentTransactionState always points to the current transaction state
     * block.  It will point to TopTransactionStateData when not in a
     * transaction at all, or when in a top-level transaction.
     */
    static TransactionStateData TopTransactionStateData = {
    	0,							/* transaction id */
    	0,							/* subtransaction id */
    	NULL,						/* savepoint name */
    	0,							/* savepoint level */
    	TRANS_DEFAULT,				/* transaction state */
    	TBLOCK_DEFAULT,				/* transaction block state from the client
    								 * perspective */
    	0,							/* transaction nesting depth */
    	0,							/* GUC context nesting depth */
    	NULL,						/* cur transaction context */
    	NULL,						/* cur transaction resource owner */
    	NULL,						/* subcommitted child Xids */
    	0,							/* # of subcommitted child Xids */
    	0,							/* allocated size of childXids[] */
    	InvalidOid,					/* previous CurrentUserId setting */
    	0,							/* previous SecurityRestrictionContext */
    	false,						/* entry-time xact r/o state */
    	false,						/* startedInRecovery */
    	NULL						/* link to parent state block */
    };
    
    /*
     * unreportedXids holds XIDs of all subtransactions that have not yet been
     * reported in a XLOG_XACT_ASSIGNMENT record.
     */
    static int	nUnreportedXids;
    static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
    
    static TransactionState CurrentTransactionState = &TopTransactionStateData;
    
    /*
     * The subtransaction ID and command ID assignment counters are global
     * to a whole transaction, so we do not keep them in the state stack.
     */
    static SubTransactionId currentSubTransactionId;
    static CommandId currentCommandId;
    static bool currentCommandIdUsed;
    
    /*
     * xactStartTimestamp is the value of transaction_timestamp().
     * stmtStartTimestamp is the value of statement_timestamp().
     * xactStopTimestamp is the time at which we log a commit or abort WAL record.
     * These do not change as we enter and exit subtransactions, so we don't
     * keep them inside the TransactionState stack.
     */
    static TimestampTz xactStartTimestamp;
    static TimestampTz stmtStartTimestamp;
    static TimestampTz xactStopTimestamp;
    
    /*
     * GID to be used for preparing the current transaction.  This is also
     * global to a whole transaction, so we don't keep it in the state stack.
     */
    static char *prepareGID;
    
    /*
     * Some commands want to force synchronous commit.
     */
    static bool forceSyncCommit = false;
    
    /*
     * Private context for transaction-abort work --- we reserve space for this
     * at startup to ensure that AbortTransaction and AbortSubTransaction can work
     * when we've run out of memory.
     */
    static MemoryContext TransactionAbortContext = NULL;
    
    /*
     * List of add-on start- and end-of-xact callbacks
     */
    typedef struct XactCallbackItem
    {
    	struct XactCallbackItem *next;
    	XactCallback callback;
    	void	   *arg;
    } XactCallbackItem;
    
    static XactCallbackItem *Xact_callbacks = NULL;
    
    /*
     * List of add-on start- and end-of-subxact callbacks
     */
    typedef struct SubXactCallbackItem
    {
    	struct SubXactCallbackItem *next;
    	SubXactCallback callback;
    	void	   *arg;
    } SubXactCallbackItem;
    
    static SubXactCallbackItem *SubXact_callbacks = NULL;
    
    
    /* local function prototypes */
    static void AssignTransactionId(TransactionState s);
    static void AbortTransaction(void);
    static void AtAbort_Memory(void);
    static void AtCleanup_Memory(void);
    static void AtAbort_ResourceOwner(void);
    static void AtCCI_LocalCache(void);
    static void AtCommit_Memory(void);
    static void AtStart_Cache(void);
    static void AtStart_Memory(void);
    static void AtStart_ResourceOwner(void);
    static void CallXactCallbacks(XactEvent event);
    static void CallSubXactCallbacks(SubXactEvent event,
    					 SubTransactionId mySubid,
    					 SubTransactionId parentSubid);
    static void CleanupTransaction(void);
    static void CommitTransaction(void);
    static TransactionId RecordTransactionAbort(bool isSubXact);
    static void StartTransaction(void);
    
    static void StartSubTransaction(void);
    static void CommitSubTransaction(void);
    static void AbortSubTransaction(void);
    static void CleanupSubTransaction(void);
    static void PushTransaction(void);
    static void PopTransaction(void);
    
    static void AtSubAbort_Memory(void);
    static void AtSubCleanup_Memory(void);
    static void AtSubAbort_ResourceOwner(void);
    static void AtSubCommit_Memory(void);
    static void AtSubStart_Memory(void);
    static void AtSubStart_ResourceOwner(void);
    
    static void ShowTransactionState(const char *str);
    static void ShowTransactionStateRec(TransactionState state);
    static const char *BlockStateAsString(TBlockState blockState);
    static const char *TransStateAsString(TransState state);
    
    
    /* ----------------------------------------------------------------
     *	transaction state accessors
     * ----------------------------------------------------------------
     */
    
    /*
     *	IsTransactionState
     *
     *	This returns true if we are inside a valid transaction; that is,
     *	it is safe to initiate database access, take heavyweight locks, etc.
     */
    bool
    IsTransactionState(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	/*
    	 * TRANS_DEFAULT and TRANS_ABORT are obviously unsafe states.  However, we
    	 * also reject the startup/shutdown states TRANS_START, TRANS_COMMIT,
    	 * TRANS_PREPARE since it might be too soon or too late within those
    	 * transition states to do anything interesting.  Hence, the only "valid"
    	 * state is TRANS_INPROGRESS.
    	 */
    	return (s->state == TRANS_INPROGRESS);
    }
    
    /*
     *	IsAbortedTransactionBlockState
     *
     *	This returns true if we are within an aborted transaction block.
     */
    bool
    IsAbortedTransactionBlockState(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (s->blockState == TBLOCK_ABORT ||
    		s->blockState == TBLOCK_SUBABORT)
    		return true;
    
    	return false;
    }
    
    
    /*
     *	GetTopTransactionId
     *
     * This will return the XID of the main transaction, assigning one if
     * it's not yet set.  Be careful to call this only inside a valid xact.
     */
    TransactionId
    GetTopTransactionId(void)
    {
    	if (!TransactionIdIsValid(TopTransactionStateData.transactionId))
    		AssignTransactionId(&TopTransactionStateData);
    	return TopTransactionStateData.transactionId;
    }
    
    /*
     *	GetTopTransactionIdIfAny
     *
     * This will return the XID of the main transaction, if one is assigned.
     * It will return InvalidTransactionId if we are not currently inside a
     * transaction, or inside a transaction that hasn't yet been assigned an XID.
     */
    TransactionId
    GetTopTransactionIdIfAny(void)
    {
    	return TopTransactionStateData.transactionId;
    }
    
    /*
     *	GetCurrentTransactionId
     *
     * This will return the XID of the current transaction (main or sub
     * transaction), assigning one if it's not yet set.  Be careful to call this
     * only inside a valid xact.
     */
    TransactionId
    GetCurrentTransactionId(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (!TransactionIdIsValid(s->transactionId))
    		AssignTransactionId(s);
    	return s->transactionId;
    }
    
    /*
     *	GetCurrentTransactionIdIfAny
     *
     * This will return the XID of the current sub xact, if one is assigned.
     * It will return InvalidTransactionId if we are not currently inside a
     * transaction, or inside a transaction that hasn't been assigned an XID yet.
     */
    TransactionId
    GetCurrentTransactionIdIfAny(void)
    {
    	return CurrentTransactionState->transactionId;
    }
    
    /*
     *	GetStableLatestTransactionId
     *
     * Get the XID once and then return same value for rest of transaction.
     * Acts as a useful reference point for maintenance tasks.
     */
    TransactionId
    GetStableLatestTransactionId(void)
    {
    	static LocalTransactionId lxid = InvalidLocalTransactionId;
    	static TransactionId stablexid = InvalidTransactionId;
    
    	if (lxid != MyProc->lxid)
    	{
    		lxid = MyProc->lxid;
    		stablexid = GetTopTransactionIdIfAny();
    		if (!TransactionIdIsValid(stablexid))
    			stablexid = ReadNewTransactionId();
    	}
    
    	Assert(TransactionIdIsValid(stablexid));
    
    	return stablexid;
    }
    
    /*
     * AssignTransactionId
     *
     * Assigns a new permanent XID to the given TransactionState.
     * We do not assign XIDs to transactions until/unless this is called.
     * Also, any parent TransactionStates that don't yet have XIDs are assigned
     * one; this maintains the invariant that a child transaction has an XID
     * following its parent's.
     */
    static void
    AssignTransactionId(TransactionState s)
    {
    	bool		isSubXact = (s->parent != NULL);
    	ResourceOwner currentOwner;
    
    	/* Assert that caller didn't screw up */
    	Assert(!TransactionIdIsValid(s->transactionId));
    	Assert(s->state == TRANS_INPROGRESS);
    
    	/*
    	 * Ensure parent(s) have XIDs, so that a child always has an XID later
    	 * than its parent.  Musn't recurse here, or we might get a stack overflow
    	 * if we're at the bottom of a huge stack of subtransactions none of which
    	 * have XIDs yet.
    	 */
    	if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
    	{
    		TransactionState p = s->parent;
    		TransactionState *parents;
    		size_t		parentOffset = 0;
    
    		parents = palloc(sizeof(TransactionState) * s->nestingLevel);
    		while (p != NULL && !TransactionIdIsValid(p->transactionId))
    		{
    			parents[parentOffset++] = p;
    			p = p->parent;
    		}
    
    		/*
    		 * This is technically a recursive call, but the recursion will never
    		 * be more than one layer deep.
    		 */
    		while (parentOffset != 0)
    			AssignTransactionId(parents[--parentOffset]);
    
    		pfree(parents);
    	}
    
    	/*
    	 * Generate a new Xid and record it in PG_PROC and pg_subtrans.
    	 *
    	 * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
    	 * shared storage other than PG_PROC; because if there's no room for it in
    	 * PG_PROC, the subtrans entry is needed to ensure that other backends see
    	 * the Xid as "running".  See GetNewTransactionId.
    	 */
    	s->transactionId = GetNewTransactionId(isSubXact);
    
    	if (isSubXact)
    		SubTransSetParent(s->transactionId, s->parent->transactionId, false);
    
    	/*
    	 * If it's a top-level transaction, the predicate locking system needs to
    	 * be told about it too.
    	 */
    	if (!isSubXact)
    		RegisterPredicateLockingXid(s->transactionId);
    
    	/*
    	 * Acquire lock on the transaction XID.  (We assume this cannot block.) We
    	 * have to ensure that the lock is assigned to the transaction's own
    	 * ResourceOwner.
    	 */
    	currentOwner = CurrentResourceOwner;
    	PG_TRY();
    	{
    		CurrentResourceOwner = s->curTransactionOwner;
    		XactLockTableInsert(s->transactionId);
    	}
    	PG_CATCH();
    	{
    		/* Ensure CurrentResourceOwner is restored on error */
    		CurrentResourceOwner = currentOwner;
    		PG_RE_THROW();
    	}
    	PG_END_TRY();
    	CurrentResourceOwner = currentOwner;
    
    	/*
    	 * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each
    	 * top-level transaction we issue a WAL record for the assignment. We
    	 * include the top-level xid and all the subxids that have not yet been
    	 * reported using XLOG_XACT_ASSIGNMENT records.
    	 *
    	 * This is required to limit the amount of shared memory required in a hot
    	 * standby server to keep track of in-progress XIDs. See notes for
    	 * RecordKnownAssignedTransactionIds().
    	 *
    	 * We don't keep track of the immediate parent of each subxid, only the
    	 * top-level transaction that each subxact belongs to. This is correct in
    	 * recovery only because aborted subtransactions are separately WAL
    	 * logged.
    	 */
    	if (isSubXact && XLogStandbyInfoActive())
    	{
    		unreportedXids[nUnreportedXids] = s->transactionId;
    		nUnreportedXids++;
    
    		/*
    		 * ensure this test matches similar one in
    		 * RecoverPreparedTransactions()
    		 */
    		if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS)
    		{
    			XLogRecData rdata[2];
    			xl_xact_assignment xlrec;
    
    			/*
    			 * xtop is always set by now because we recurse up transaction
    			 * stack to the highest unassigned xid and then come back down
    			 */
    			xlrec.xtop = GetTopTransactionId();
    			Assert(TransactionIdIsValid(xlrec.xtop));
    			xlrec.nsubxacts = nUnreportedXids;
    
    			rdata[0].data = (char *) &xlrec;
    			rdata[0].len = MinSizeOfXactAssignment;
    			rdata[0].buffer = InvalidBuffer;
    			rdata[0].next = &rdata[1];
    
    			rdata[1].data = (char *) unreportedXids;
    			rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId);
    			rdata[1].buffer = InvalidBuffer;
    			rdata[1].next = NULL;
    
    			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);
    
    			nUnreportedXids = 0;
    		}
    	}
    }
    
    /*
     *	GetCurrentSubTransactionId
     */
    SubTransactionId
    GetCurrentSubTransactionId(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	return s->subTransactionId;
    }
    
    
    /*
     *	GetCurrentCommandId
     *
     * "used" must be TRUE if the caller intends to use the command ID to mark
     * inserted/updated/deleted tuples.  FALSE means the ID is being fetched
     * for read-only purposes (ie, as a snapshot validity cutoff).	See
     * CommandCounterIncrement() for discussion.
     */
    CommandId
    GetCurrentCommandId(bool used)
    {
    	/* this is global to a transaction, not subtransaction-local */
    	if (used)
    		currentCommandIdUsed = true;
    	return currentCommandId;
    }
    
    /*
     *	GetCurrentTransactionStartTimestamp
     */
    TimestampTz
    GetCurrentTransactionStartTimestamp(void)
    {
    	return xactStartTimestamp;
    }
    
    /*
     *	GetCurrentStatementStartTimestamp
     */
    TimestampTz
    GetCurrentStatementStartTimestamp(void)
    {
    	return stmtStartTimestamp;
    }
    
    /*
     *	GetCurrentTransactionStopTimestamp
     *
     * We return current time if the transaction stop time hasn't been set
     * (which can happen if we decide we don't need to log an XLOG record).
     */
    TimestampTz
    GetCurrentTransactionStopTimestamp(void)
    {
    	if (xactStopTimestamp != 0)
    		return xactStopTimestamp;
    	return GetCurrentTimestamp();
    }
    
    /*
     *	SetCurrentStatementStartTimestamp
     */
    void
    SetCurrentStatementStartTimestamp(void)
    {
    	stmtStartTimestamp = GetCurrentTimestamp();
    }
    
    /*
     *	SetCurrentTransactionStopTimestamp
     */
    static inline void
    SetCurrentTransactionStopTimestamp(void)
    {
    	xactStopTimestamp = GetCurrentTimestamp();
    }
    
    /*
     *	GetCurrentTransactionNestLevel
     *
     * Note: this will return zero when not inside any transaction, one when
     * inside a top-level transaction, etc.
     */
    int
    GetCurrentTransactionNestLevel(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	return s->nestingLevel;
    }
    
    
    /*
     *	TransactionIdIsCurrentTransactionId
     */
    bool
    TransactionIdIsCurrentTransactionId(TransactionId xid)
    {
    	TransactionState s;
    
    	/*
    	 * We always say that BootstrapTransactionId is "not my transaction ID"
    	 * even when it is (ie, during bootstrap).	Along with the fact that
    	 * transam.c always treats BootstrapTransactionId as already committed,
    	 * this causes the tqual.c routines to see all tuples as committed, which
    	 * is what we need during bootstrap.  (Bootstrap mode only inserts tuples,
    	 * it never updates or deletes them, so all tuples can be presumed good
    	 * immediately.)
    	 *
    	 * Likewise, InvalidTransactionId and FrozenTransactionId are certainly
    	 * not my transaction ID, so we can just return "false" immediately for
    	 * any non-normal XID.
    	 */
    	if (!TransactionIdIsNormal(xid))
    		return false;
    
    	/*
    	 * We will return true for the Xid of the current subtransaction, any of
    	 * its subcommitted children, any of its parents, or any of their
    	 * previously subcommitted children.  However, a transaction being aborted
    	 * is no longer "current", even though it may still have an entry on the
    	 * state stack.
    	 */
    	for (s = CurrentTransactionState; s != NULL; s = s->parent)
    	{
    		int			low,
    					high;
    
    		if (s->state == TRANS_ABORT)
    			continue;
    		if (!TransactionIdIsValid(s->transactionId))
    			continue;			/* it can't have any child XIDs either */
    		if (TransactionIdEquals(xid, s->transactionId))
    			return true;
    		/* As the childXids array is ordered, we can use binary search */
    		low = 0;
    		high = s->nChildXids - 1;
    		while (low <= high)
    		{
    			int			middle;
    			TransactionId probe;
    
    			middle = low + (high - low) / 2;
    			probe = s->childXids[middle];
    			if (TransactionIdEquals(probe, xid))
    				return true;
    			else if (TransactionIdPrecedes(probe, xid))
    				low = middle + 1;
    			else
    				high = middle - 1;
    		}
    	}
    
    	return false;
    }
    
    /*
     *	TransactionStartedDuringRecovery
     *
     * Returns true if the current transaction started while recovery was still
     * in progress. Recovery might have ended since so RecoveryInProgress() might
     * return false already.
     */
    bool
    TransactionStartedDuringRecovery(void)
    {
    	return CurrentTransactionState->startedInRecovery;
    }
    
    /*
     *	CommandCounterIncrement
     */
    void
    CommandCounterIncrement(void)
    {
    	/*
    	 * If the current value of the command counter hasn't been "used" to mark
    	 * tuples, we need not increment it, since there's no need to distinguish
    	 * a read-only command from others.  This helps postpone command counter
    	 * overflow, and keeps no-op CommandCounterIncrement operations cheap.
    	 */
    	if (currentCommandIdUsed)
    	{
    		currentCommandId += 1;
    		if (currentCommandId == FirstCommandId) /* check for overflow */
    		{
    			currentCommandId -= 1;
    			ereport(ERROR,
    					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    					 errmsg("cannot have more than 2^32-1 commands in a transaction")));
    		}
    		currentCommandIdUsed = false;
    
    		/* Propagate new command ID into static snapshots */
    		SnapshotSetCommandId(currentCommandId);
    
    		/*
    		 * Make any catalog changes done by the just-completed command visible
    		 * in the local syscache.  We obviously don't need to do this after a
    		 * read-only command.  (But see hacks in inval.c to make real sure we
    		 * don't think a command that queued inval messages was read-only.)
    		 */
    		AtCCI_LocalCache();
    	}
    }
    
    /*
     * ForceSyncCommit
     *
     * Interface routine to allow commands to force a synchronous commit of the
     * current top-level transaction
     */
    void
    ForceSyncCommit(void)
    {
    	forceSyncCommit = true;
    }
    
    
    /* ----------------------------------------------------------------
     *						StartTransaction stuff
     * ----------------------------------------------------------------
     */
    
    /*
     *	AtStart_Cache
     */
    static void
    AtStart_Cache(void)
    {
    	AcceptInvalidationMessages();
    }
    
    /*
     *	AtStart_Memory
     */
    static void
    AtStart_Memory(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	/*
    	 * If this is the first time through, create a private context for
    	 * AbortTransaction to work in.  By reserving some space now, we can
    	 * insulate AbortTransaction from out-of-memory scenarios.	Like
    	 * ErrorContext, we set it up with slow growth rate and a nonzero minimum
    	 * size, so that space will be reserved immediately.
    	 */
    	if (TransactionAbortContext == NULL)
    		TransactionAbortContext =
    			AllocSetContextCreate(TopMemoryContext,
    								  "TransactionAbortContext",
    								  32 * 1024,
    								  32 * 1024,
    								  32 * 1024);
    
    	/*
    	 * We shouldn't have a transaction context already.
    	 */
    	Assert(TopTransactionContext == NULL);
    
    	/*
    	 * Create a toplevel context for the transaction.
    	 */
    	TopTransactionContext =
    		AllocSetContextCreate(TopMemoryContext,
    							  "TopTransactionContext",
    							  ALLOCSET_DEFAULT_MINSIZE,
    							  ALLOCSET_DEFAULT_INITSIZE,
    							  ALLOCSET_DEFAULT_MAXSIZE);
    
    	/*
    	 * In a top-level transaction, CurTransactionContext is the same as
    	 * TopTransactionContext.
    	 */
    	CurTransactionContext = TopTransactionContext;
    	s->curTransactionContext = CurTransactionContext;
    
    	/* Make the CurTransactionContext active. */
    	MemoryContextSwitchTo(CurTransactionContext);
    }
    
    /*
     *	AtStart_ResourceOwner
     */
    static void
    AtStart_ResourceOwner(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	/*
    	 * We shouldn't have a transaction resource owner already.
    	 */
    	Assert(TopTransactionResourceOwner == NULL);
    
    	/*
    	 * Create a toplevel resource owner for the transaction.
    	 */
    	s->curTransactionOwner = ResourceOwnerCreate(NULL, "TopTransaction");
    
    	TopTransactionResourceOwner = s->curTransactionOwner;
    	CurTransactionResourceOwner = s->curTransactionOwner;
    	CurrentResourceOwner = s->curTransactionOwner;
    }
    
    /* ----------------------------------------------------------------
     *						StartSubTransaction stuff
     * ----------------------------------------------------------------
     */
    
    /*
     * AtSubStart_Memory
     */
    static void
    AtSubStart_Memory(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	Assert(CurTransactionContext != NULL);
    
    	/*
    	 * Create a CurTransactionContext, which will be used to hold data that
    	 * survives subtransaction commit but disappears on subtransaction abort.
    	 * We make it a child of the immediate parent's CurTransactionContext.
    	 */
    	CurTransactionContext = AllocSetContextCreate(CurTransactionContext,
    												  "CurTransactionContext",
    												  ALLOCSET_DEFAULT_MINSIZE,
    												  ALLOCSET_DEFAULT_INITSIZE,
    												  ALLOCSET_DEFAULT_MAXSIZE);
    	s->curTransactionContext = CurTransactionContext;
    
    	/* Make the CurTransactionContext active. */
    	MemoryContextSwitchTo(CurTransactionContext);
    }
    
    /*
     * AtSubStart_ResourceOwner
     */
    static void
    AtSubStart_ResourceOwner(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	Assert(s->parent != NULL);
    
    	/*
    	 * Create a resource owner for the subtransaction.	We make it a child of
    	 * the immediate parent's resource owner.
    	 */
    	s->curTransactionOwner =
    		ResourceOwnerCreate(s->parent->curTransactionOwner,
    							"SubTransaction");
    
    	CurTransactionResourceOwner = s->curTransactionOwner;
    	CurrentResourceOwner = s->curTransactionOwner;
    }
    
    /* ----------------------------------------------------------------
     *						CommitTransaction stuff
     * ----------------------------------------------------------------
     */
    
    /*
     *	RecordTransactionCommit
     *
     * Returns latest XID among xact and its children, or InvalidTransactionId
     * if the xact has no XID.	(We compute that here just because it's easier.)
     */
    static TransactionId
    RecordTransactionCommit(void)
    {
    	TransactionId xid = GetTopTransactionIdIfAny();
    	bool		markXidCommitted = TransactionIdIsValid(xid);
    	TransactionId latestXid = InvalidTransactionId;
    	int			nrels;
    	RelFileNode *rels;
    	int			nchildren;
    	TransactionId *children;
    	int			nmsgs = 0;
    	SharedInvalidationMessage *invalMessages = NULL;
    	bool		RelcacheInitFileInval = false;
    	bool		wrote_xlog;
    
    	/* Get data needed for commit record */
    	nrels = smgrGetPendingDeletes(true, &rels);
    	nchildren = xactGetCommittedChildren(&children);
    	if (XLogStandbyInfoActive())
    		nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
    													 &RelcacheInitFileInval);
    	wrote_xlog = (XactLastRecEnd.xrecoff != 0);
    
    	/*
    	 * If we haven't been assigned an XID yet, we neither can, nor do we want
    	 * to write a COMMIT record.
    	 */
    	if (!markXidCommitted)
    	{
    		/*
    		 * We expect that every smgrscheduleunlink is followed by a catalog
    		 * update, and hence XID assignment, so we shouldn't get here with any
    		 * pending deletes.  Use a real test not just an Assert to check this,
    		 * since it's a bit fragile.
    		 */
    		if (nrels != 0)
    			elog(ERROR, "cannot commit a transaction that deleted files but has no xid");
    
    		/* Can't have child XIDs either; AssignTransactionId enforces this */
    		Assert(nchildren == 0);
    
    		/*
    		 * If we didn't create XLOG entries, we're done here; otherwise we
    		 * should flush those entries the same as a commit record.	(An
    		 * example of a possible record that wouldn't cause an XID to be
    		 * assigned is a sequence advance record due to nextval() --- we want
    		 * to flush that to disk before reporting commit.)
    		 */
    		if (!wrote_xlog)
    			goto cleanup;
    	}
    	else
    	{
    		/*
    		 * Begin commit critical section and insert the commit XLOG record.
    		 */
    		/* Tell bufmgr and smgr to prepare for commit */
    		BufmgrCommit();
    
    		/*
    		 * Mark ourselves as within our "commit critical section".	This
    		 * forces any concurrent checkpoint to wait until we've updated
    		 * pg_clog.  Without this, it is possible for the checkpoint to set
    		 * REDO after the XLOG record but fail to flush the pg_clog update to
    		 * disk, leading to loss of the transaction commit if the system
    		 * crashes a little later.
    		 *
    		 * Note: we could, but don't bother to, set this flag in
    		 * RecordTransactionAbort.	That's because loss of a transaction abort
    		 * is noncritical; the presumption would be that it aborted, anyway.
    		 *
    		 * It's safe to change the inCommit flag of our own backend without
    		 * holding the ProcArrayLock, since we're the only one modifying it.
    		 * This makes checkpoint's determination of which xacts are inCommit a
    		 * bit fuzzy, but it doesn't matter.
    		 */
    		START_CRIT_SECTION();
    		MyPgXact->inCommit = true;
    
    		SetCurrentTransactionStopTimestamp();
    
    		/*
    		 * Do we need the long commit record? If not, use the compact format.
    		 */
    		if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
    		{
    			XLogRecData rdata[4];
    			int			lastrdata = 0;
    			xl_xact_commit xlrec;
    			/*
    			 * Set flags required for recovery processing of commits.
    			 */
    			xlrec.xinfo = 0;
    			if (RelcacheInitFileInval)
    				xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
    			if (forceSyncCommit)
    				xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
    
    			xlrec.dbId = MyDatabaseId;
    			xlrec.tsId = MyDatabaseTableSpace;
    
    			xlrec.xact_time = xactStopTimestamp;
    			xlrec.nrels = nrels;
    			xlrec.nsubxacts = nchildren;
    			xlrec.nmsgs = nmsgs;
    			rdata[0].data = (char *) (&xlrec);
    			rdata[0].len = MinSizeOfXactCommit;
    			rdata[0].buffer = InvalidBuffer;
    			/* dump rels to delete */
    			if (nrels > 0)
    			{
    				rdata[0].next = &(rdata[1]);
    				rdata[1].data = (char *) rels;
    				rdata[1].len = nrels * sizeof(RelFileNode);
    				rdata[1].buffer = InvalidBuffer;
    				lastrdata = 1;
    			}
    			/* dump committed child Xids */
    			if (nchildren > 0)
    			{
    				rdata[lastrdata].next = &(rdata[2]);
    				rdata[2].data = (char *) children;
    				rdata[2].len = nchildren * sizeof(TransactionId);
    				rdata[2].buffer = InvalidBuffer;
    				lastrdata = 2;
    			}
    			/* dump shared cache invalidation messages */
    			if (nmsgs > 0)
    			{
    				rdata[lastrdata].next = &(rdata[3]);
    				rdata[3].data = (char *) invalMessages;
    				rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
    				rdata[3].buffer = InvalidBuffer;
    				lastrdata = 3;
    			}
    			rdata[lastrdata].next = NULL;
    
    			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
    		}
    		else
    		{
    			XLogRecData rdata[2];
    			int			lastrdata = 0;
    			xl_xact_commit_compact	xlrec;
    			xlrec.xact_time = xactStopTimestamp;
    			xlrec.nsubxacts = nchildren;
    			rdata[0].data = (char *) (&xlrec);
    			rdata[0].len = MinSizeOfXactCommitCompact;
    			rdata[0].buffer = InvalidBuffer;
    			/* dump committed child Xids */
    			if (nchildren > 0)
    			{
    				rdata[0].next = &(rdata[1]);
    				rdata[1].data = (char *) children;
    				rdata[1].len = nchildren * sizeof(TransactionId);
    				rdata[1].buffer = InvalidBuffer;
    				lastrdata = 1;
    			}
    			rdata[lastrdata].next = NULL;
    
    			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT, rdata);
    		}
    	}
    
    	/*
    	 * Check if we want to commit asynchronously.  We can allow the XLOG flush
    	 * to happen asynchronously if synchronous_commit=off, or if the current
    	 * transaction has not performed any WAL-logged operation.	The latter
    	 * case can arise if the current transaction wrote only to temporary
    	 * and/or unlogged tables.	In case of a crash, the loss of such a
    	 * transaction will be irrelevant since temp tables will be lost anyway,
    	 * and unlogged tables will be truncated.  (Given the foregoing, you might
    	 * think that it would be unnecessary to emit the XLOG record at all in
    	 * this case, but we don't currently try to do that.  It would certainly
    	 * cause problems at least in Hot Standby mode, where the
    	 * KnownAssignedXids machinery requires tracking every XID assignment.	It
    	 * might be OK to skip it only when wal_level < hot_standby, but for now
    	 * we don't.)
    	 *
    	 * However, if we're doing cleanup of any non-temp rels or committing any
    	 * command that wanted to force sync commit, then we must flush XLOG
    	 * immediately.  (We must not allow asynchronous commit if there are any
    	 * non-temp tables to be deleted, because we might delete the files before
    	 * the COMMIT record is flushed to disk.  We do allow asynchronous commit
    	 * if all to-be-deleted tables are temporary though, since they are lost
    	 * anyway if we crash.)
    	 */
    	if ((wrote_xlog && synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
    		forceSyncCommit || nrels > 0)
    	{
    		/*
    		 * Synchronous commit case:
    		 *
    		 * Sleep before flush! So we can flush more than one commit records
    		 * per single fsync.  (The idea is some other backend may do the
    		 * XLogFlush while we're sleeping.  This needs work still, because on
    		 * most Unixen, the minimum select() delay is 10msec or more, which is
    		 * way too long.)
    		 *
    		 * We do not sleep if enableFsync is not turned on, nor if there are
    		 * fewer than CommitSiblings other backends with active transactions.
    		 */
    		if (CommitDelay > 0 && enableFsync &&
    			MinimumActiveBackends(CommitSiblings))
    			pg_usleep(CommitDelay);
    
    		XLogFlush(XactLastRecEnd);
    
    		/*
    		 * Wake up all walsenders to send WAL up to the COMMIT record
    		 * immediately if replication is enabled
    		 */
    		if (max_wal_senders > 0)
    			WalSndWakeup();
    
    		/*
    		 * Now we may update the CLOG, if we wrote a COMMIT record above
    		 */
    		if (markXidCommitted)
    			TransactionIdCommitTree(xid, nchildren, children);
    	}
    	else
    	{
    		/*
    		 * Asynchronous commit case:
    		 *
    		 * This enables possible committed transaction loss in the case of a
    		 * postmaster crash because WAL buffers are left unwritten. Ideally we
    		 * could issue the WAL write without the fsync, but some
    		 * wal_sync_methods do not allow separate write/fsync.
    		 *
    		 * Report the latest async commit LSN, so that the WAL writer knows to
    		 * flush this commit.
    		 */
    		XLogSetAsyncXactLSN(XactLastRecEnd);
    
    		/*
    		 * We must not immediately update the CLOG, since we didn't flush the
    		 * XLOG. Instead, we store the LSN up to which the XLOG must be
    		 * flushed before the CLOG may be updated.
    		 */
    		if (markXidCommitted)
    			TransactionIdAsyncCommitTree(xid, nchildren, children, XactLastRecEnd);
    	}
    
    	/*
    	 * If we entered a commit critical section, leave it now, and let
    	 * checkpoints proceed.
    	 */
    	if (markXidCommitted)
    	{
    		MyPgXact->inCommit = false;
    		END_CRIT_SECTION();
    	}
    
    	/* Compute latestXid while we have the child XIDs handy */
    	latestXid = TransactionIdLatest(xid, nchildren, children);
    
    	/*
    	 * Wait for synchronous replication, if required.
    	 *
    	 * Note that at this stage we have marked clog, but still show as running
    	 * in the procarray and continue to hold locks.
    	 */
    	if (wrote_xlog)
    		SyncRepWaitForLSN(XactLastRecEnd);
    
    	/* Reset XactLastRecEnd until the next transaction writes something */
    	XactLastRecEnd.xrecoff = 0;
    
    cleanup:
    	/* Clean up local data */
    	if (rels)
    		pfree(rels);
    
    	return latestXid;
    }
    
    
    /*
     *	AtCCI_LocalCache
     */
    static void
    AtCCI_LocalCache(void)
    {
    	/*
    	 * Make any pending relation map changes visible.  We must do this before
    	 * processing local sinval messages, so that the map changes will get
    	 * reflected into the relcache when relcache invals are processed.
    	 */
    	AtCCI_RelationMap();
    
    	/*
    	 * Make catalog changes visible to me for the next command.
    	 */
    	CommandEndInvalidationMessages();
    }
    
    /*
     *	AtCommit_Memory
     */
    static void
    AtCommit_Memory(void)
    {
    	/*
    	 * Now that we're "out" of a transaction, have the system allocate things
    	 * in the top memory context instead of per-transaction contexts.
    	 */
    	MemoryContextSwitchTo(TopMemoryContext);
    
    	/*
    	 * Release all transaction-local memory.
    	 */
    	Assert(TopTransactionContext != NULL);
    	MemoryContextDelete(TopTransactionContext);
    	TopTransactionContext = NULL;
    	CurTransactionContext = NULL;
    	CurrentTransactionState->curTransactionContext = NULL;
    }
    
    /* ----------------------------------------------------------------
     *						CommitSubTransaction stuff
     * ----------------------------------------------------------------
     */
    
    /*
     * AtSubCommit_Memory
     */
    static void
    AtSubCommit_Memory(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	Assert(s->parent != NULL);
    
    	/* Return to parent transaction level's memory context. */
    	CurTransactionContext = s->parent->curTransactionContext;
    	MemoryContextSwitchTo(CurTransactionContext);
    
    	/*
    	 * Ordinarily we cannot throw away the child's CurTransactionContext,
    	 * since the data it contains will be needed at upper commit.  However, if
    	 * there isn't actually anything in it, we can throw it away.  This avoids
    	 * a small memory leak in the common case of "trivial" subxacts.
    	 */
    	if (MemoryContextIsEmpty(s->curTransactionContext))
    	{
    		MemoryContextDelete(s->curTransactionContext);
    		s->curTransactionContext = NULL;
    	}
    }
    
    /*
     * AtSubCommit_childXids
     *
     * Pass my own XID and my child XIDs up to my parent as committed children.
     */
    static void
    AtSubCommit_childXids(void)
    {
    	TransactionState s = CurrentTransactionState;
    	int			new_nChildXids;
    
    	Assert(s->parent != NULL);
    
    	/*
    	 * The parent childXids array will need to hold my XID and all my
    	 * childXids, in addition to the XIDs already there.
    	 */
    	new_nChildXids = s->parent->nChildXids + s->nChildXids + 1;
    
    	/* Allocate or enlarge the parent array if necessary */
    	if (s->parent->maxChildXids < new_nChildXids)
    	{
    		int			new_maxChildXids;
    		TransactionId *new_childXids;
    
    		/*
    		 * Make it 2x what's needed right now, to avoid having to enlarge it
    		 * repeatedly. But we can't go above MaxAllocSize.  (The latter limit
    		 * is what ensures that we don't need to worry about integer overflow
    		 * here or in the calculation of new_nChildXids.)
    		 */
    		new_maxChildXids = Min(new_nChildXids * 2,
    							   (int) (MaxAllocSize / sizeof(TransactionId)));
    
    		if (new_maxChildXids < new_nChildXids)
    			ereport(ERROR,
    					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    					 errmsg("maximum number of committed subtransactions (%d) exceeded",
    							(int) (MaxAllocSize / sizeof(TransactionId)))));
    
    		/*
    		 * We keep the child-XID arrays in TopTransactionContext; this avoids
    		 * setting up child-transaction contexts for what might be just a few
    		 * bytes of grandchild XIDs.
    		 */
    		if (s->parent->childXids == NULL)
    			new_childXids =
    				MemoryContextAlloc(TopTransactionContext,
    								   new_maxChildXids * sizeof(TransactionId));
    		else
    			new_childXids = repalloc(s->parent->childXids,
    								   new_maxChildXids * sizeof(TransactionId));
    
    		s->parent->childXids = new_childXids;
    		s->parent->maxChildXids = new_maxChildXids;
    	}
    
    	/*
    	 * Copy all my XIDs to parent's array.
    	 *
    	 * Note: We rely on the fact that the XID of a child always follows that
    	 * of its parent.  By copying the XID of this subtransaction before the
    	 * XIDs of its children, we ensure that the array stays ordered. Likewise,
    	 * all XIDs already in the array belong to subtransactions started and
    	 * subcommitted before us, so their XIDs must precede ours.
    	 */
    	s->parent->childXids[s->parent->nChildXids] = s->transactionId;
    
    	if (s->nChildXids > 0)
    		memcpy(&s->parent->childXids[s->parent->nChildXids + 1],
    			   s->childXids,
    			   s->nChildXids * sizeof(TransactionId));
    
    	s->parent->nChildXids = new_nChildXids;
    
    	/* Release child's array to avoid leakage */
    	if (s->childXids != NULL)
    		pfree(s->childXids);
    	/* We must reset these to avoid double-free if fail later in commit */
    	s->childXids = NULL;
    	s->nChildXids = 0;
    	s->maxChildXids = 0;
    }
    
    /* ----------------------------------------------------------------
     *						AbortTransaction stuff
     * ----------------------------------------------------------------
     */
    
    /*
     *	RecordTransactionAbort
     *
     * Returns latest XID among xact and its children, or InvalidTransactionId
     * if the xact has no XID.	(We compute that here just because it's easier.)
     */
    static TransactionId
    RecordTransactionAbort(bool isSubXact)
    {
    	TransactionId xid = GetCurrentTransactionIdIfAny();
    	TransactionId latestXid;
    	int			nrels;
    	RelFileNode *rels;
    	int			nchildren;
    	TransactionId *children;
    	XLogRecData rdata[3];
    	int			lastrdata = 0;
    	xl_xact_abort xlrec;
    
    	/*
    	 * If we haven't been assigned an XID, nobody will care whether we aborted
    	 * or not.	Hence, we're done in that case.  It does not matter if we have
    	 * rels to delete (note that this routine is not responsible for actually
    	 * deleting 'em).  We cannot have any child XIDs, either.
    	 */
    	if (!TransactionIdIsValid(xid))
    	{
    		/* Reset XactLastRecEnd until the next transaction writes something */
    		if (!isSubXact)
    			XactLastRecEnd.xrecoff = 0;
    		return InvalidTransactionId;
    	}
    
    	/*
    	 * We have a valid XID, so we should write an ABORT record for it.
    	 *
    	 * We do not flush XLOG to disk here, since the default assumption after a
    	 * crash would be that we aborted, anyway.	For the same reason, we don't
    	 * need to worry about interlocking against checkpoint start.
    	 */
    
    	/*
    	 * Check that we haven't aborted halfway through RecordTransactionCommit.
    	 */
    	if (TransactionIdDidCommit(xid))
    		elog(PANIC, "cannot abort transaction %u, it was already committed",
    			 xid);
    
    	/* Fetch the data we need for the abort record */
    	nrels = smgrGetPendingDeletes(false, &rels);
    	nchildren = xactGetCommittedChildren(&children);
    
    	/* XXX do we really need a critical section here? */
    	START_CRIT_SECTION();
    
    	/* Write the ABORT record */
    	if (isSubXact)
    		xlrec.xact_time = GetCurrentTimestamp();
    	else
    	{
    		SetCurrentTransactionStopTimestamp();
    		xlrec.xact_time = xactStopTimestamp;
    	}
    	xlrec.nrels = nrels;
    	xlrec.nsubxacts = nchildren;
    	rdata[0].data = (char *) (&xlrec);
    	rdata[0].len = MinSizeOfXactAbort;
    	rdata[0].buffer = InvalidBuffer;
    	/* dump rels to delete */
    	if (nrels > 0)
    	{
    		rdata[0].next = &(rdata[1]);
    		rdata[1].data = (char *) rels;
    		rdata[1].len = nrels * sizeof(RelFileNode);
    		rdata[1].buffer = InvalidBuffer;
    		lastrdata = 1;
    	}
    	/* dump committed child Xids */
    	if (nchildren > 0)
    	{
    		rdata[lastrdata].next = &(rdata[2]);
    		rdata[2].data = (char *) children;
    		rdata[2].len = nchildren * sizeof(TransactionId);
    		rdata[2].buffer = InvalidBuffer;
    		lastrdata = 2;
    	}
    	rdata[lastrdata].next = NULL;
    
    	(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, rdata);
    
    	/*
    	 * Report the latest async abort LSN, so that the WAL writer knows to
    	 * flush this abort. There's nothing to be gained by delaying this, since
    	 * WALWriter may as well do this when it can. This is important with
    	 * streaming replication because if we don't flush WAL regularly we will
    	 * find that large aborts leave us with a long backlog for when commits
    	 * occur after the abort, increasing our window of data loss should
    	 * problems occur at that point.
    	 */
    	if (!isSubXact)
    		XLogSetAsyncXactLSN(XactLastRecEnd);
    
    	/*
    	 * Mark the transaction aborted in clog.  This is not absolutely necessary
    	 * but we may as well do it while we are here; also, in the subxact case
    	 * it is helpful because XactLockTableWait makes use of it to avoid
    	 * waiting for already-aborted subtransactions.  It is OK to do it without
    	 * having flushed the ABORT record to disk, because in event of a crash
    	 * we'd be assumed to have aborted anyway.
    	 */
    	TransactionIdAbortTree(xid, nchildren, children);
    
    	END_CRIT_SECTION();
    
    	/* Compute latestXid while we have the child XIDs handy */
    	latestXid = TransactionIdLatest(xid, nchildren, children);
    
    	/*
    	 * If we're aborting a subtransaction, we can immediately remove failed
    	 * XIDs from PGPROC's cache of running child XIDs.  We do that here for
    	 * subxacts, because we already have the child XID array at hand.  For
    	 * main xacts, the equivalent happens just after this function returns.
    	 */
    	if (isSubXact)
    		XidCacheRemoveRunningXids(xid, nchildren, children, latestXid);
    
    	/* Reset XactLastRecEnd until the next transaction writes something */
    	if (!isSubXact)
    		XactLastRecEnd.xrecoff = 0;
    
    	/* And clean up local data */
    	if (rels)
    		pfree(rels);
    
    	return latestXid;
    }
    
    /*
     *	AtAbort_Memory
     */
    static void
    AtAbort_Memory(void)
    {
    	/*
    	 * Switch into TransactionAbortContext, which should have some free space
    	 * even if nothing else does.  We'll work in this context until we've
    	 * finished cleaning up.
    	 *
    	 * It is barely possible to get here when we've not been able to create
    	 * TransactionAbortContext yet; if so use TopMemoryContext.
    	 */
    	if (TransactionAbortContext != NULL)
    		MemoryContextSwitchTo(TransactionAbortContext);
    	else
    		MemoryContextSwitchTo(TopMemoryContext);
    }
    
    /*
     * AtSubAbort_Memory
     */
    static void
    AtSubAbort_Memory(void)
    {
    	Assert(TransactionAbortContext != NULL);
    
    	MemoryContextSwitchTo(TransactionAbortContext);
    }
    
    
    /*
     *	AtAbort_ResourceOwner
     */
    static void
    AtAbort_ResourceOwner(void)
    {
    	/*
    	 * Make sure we have a valid ResourceOwner, if possible (else it will be
    	 * NULL, which is OK)
    	 */
    	CurrentResourceOwner = TopTransactionResourceOwner;
    }
    
    /*
     * AtSubAbort_ResourceOwner
     */
    static void
    AtSubAbort_ResourceOwner(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	/* Make sure we have a valid ResourceOwner */
    	CurrentResourceOwner = s->curTransactionOwner;
    }
    
    
    /*
     * AtSubAbort_childXids
     */
    static void
    AtSubAbort_childXids(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	/*
    	 * We keep the child-XID arrays in TopTransactionContext (see
    	 * AtSubCommit_childXids).	This means we'd better free the array
    	 * explicitly at abort to avoid leakage.
    	 */
    	if (s->childXids != NULL)
    		pfree(s->childXids);
    	s->childXids = NULL;
    	s->nChildXids = 0;
    	s->maxChildXids = 0;
    
    	/*
    	 * We could prune the unreportedXids array here. But we don't bother. That
    	 * would potentially reduce number of XLOG_XACT_ASSIGNMENT records but it
    	 * would likely introduce more CPU time into the more common paths, so we
    	 * choose not to do that.
    	 */
    }
    
    /* ----------------------------------------------------------------
     *						CleanupTransaction stuff
     * ----------------------------------------------------------------
     */
    
    /*
     *	AtCleanup_Memory
     */
    static void
    AtCleanup_Memory(void)
    {
    	Assert(CurrentTransactionState->parent == NULL);
    
    	/*
    	 * Now that we're "out" of a transaction, have the system allocate things
    	 * in the top memory context instead of per-transaction contexts.
    	 */
    	MemoryContextSwitchTo(TopMemoryContext);
    
    	/*
    	 * Clear the special abort context for next time.
    	 */
    	if (TransactionAbortContext != NULL)
    		MemoryContextResetAndDeleteChildren(TransactionAbortContext);
    
    	/*
    	 * Release all transaction-local memory.
    	 */
    	if (TopTransactionContext != NULL)
    		MemoryContextDelete(TopTransactionContext);
    	TopTransactionContext = NULL;
    	CurTransactionContext = NULL;
    	CurrentTransactionState->curTransactionContext = NULL;
    }
    
    
    /* ----------------------------------------------------------------
     *						CleanupSubTransaction stuff
     * ----------------------------------------------------------------
     */
    
    /*
     * AtSubCleanup_Memory
     */
    static void
    AtSubCleanup_Memory(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	Assert(s->parent != NULL);
    
    	/* Make sure we're not in an about-to-be-deleted context */
    	MemoryContextSwitchTo(s->parent->curTransactionContext);
    	CurTransactionContext = s->parent->curTransactionContext;
    
    	/*
    	 * Clear the special abort context for next time.
    	 */
    	if (TransactionAbortContext != NULL)
    		MemoryContextResetAndDeleteChildren(TransactionAbortContext);
    
    	/*
    	 * Delete the subxact local memory contexts. Its CurTransactionContext can
    	 * go too (note this also kills CurTransactionContexts from any children
    	 * of the subxact).
    	 */
    	if (s->curTransactionContext)
    		MemoryContextDelete(s->curTransactionContext);
    	s->curTransactionContext = NULL;
    }
    
    /* ----------------------------------------------------------------
     *						interface routines
     * ----------------------------------------------------------------
     */
    
    /*
     *	StartTransaction
     */
    static void
    StartTransaction(void)
    {
    	TransactionState s;
    	VirtualTransactionId vxid;
    
    	/*
    	 * Let's just make sure the state stack is empty
    	 */
    	s = &TopTransactionStateData;
    	CurrentTransactionState = s;
    
    	/*
    	 * check the current transaction state
    	 */
    	if (s->state != TRANS_DEFAULT)
    		elog(WARNING, "StartTransaction while in %s state",
    			 TransStateAsString(s->state));
    
    	/*
    	 * set the current transaction state information appropriately during
    	 * start processing
    	 */
    	s->state = TRANS_START;
    	s->transactionId = InvalidTransactionId;	/* until assigned */
    
    	/*
    	 * Make sure we've reset xact state variables
    	 *
    	 * If recovery is still in progress, mark this transaction as read-only.
    	 * We have lower level defences in XLogInsert and elsewhere to stop us
    	 * from modifying data during recovery, but this gives the normal
    	 * indication to the user that the transaction is read-only.
    	 */
    	if (RecoveryInProgress())
    	{
    		s->startedInRecovery = true;
    		XactReadOnly = true;
    	}
    	else
    	{
    		s->startedInRecovery = false;
    		XactReadOnly = DefaultXactReadOnly;
    	}
    	XactDeferrable = DefaultXactDeferrable;
    	XactIsoLevel = DefaultXactIsoLevel;
    	forceSyncCommit = false;
    	MyXactAccessedTempRel = false;
    
    	/*
    	 * reinitialize within-transaction counters
    	 */
    	s->subTransactionId = TopSubTransactionId;
    	currentSubTransactionId = TopSubTransactionId;
    	currentCommandId = FirstCommandId;
    	currentCommandIdUsed = false;
    
    	/*
    	 * initialize reported xid accounting
    	 */
    	nUnreportedXids = 0;
    
    	/*
    	 * must initialize resource-management stuff first
    	 */
    	AtStart_Memory();
    	AtStart_ResourceOwner();
    
    	/*
    	 * Assign a new LocalTransactionId, and combine it with the backendId to
    	 * form a virtual transaction id.
    	 */
    	vxid.backendId = MyBackendId;
    	vxid.localTransactionId = GetNextLocalTransactionId();
    
    	/*
    	 * Lock the virtual transaction id before we announce it in the proc array
    	 */
    	VirtualXactLockTableInsert(vxid);
    
    	/*
    	 * Advertise it in the proc array.	We assume assignment of
    	 * LocalTransactionID is atomic, and the backendId should be set already.
    	 */
    	Assert(MyProc->backendId == vxid.backendId);
    	MyProc->lxid = vxid.localTransactionId;
    
    	TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);
    
    	/*
    	 * set transaction_timestamp() (a/k/a now()).  We want this to be the same
    	 * as the first command's statement_timestamp(), so don't do a fresh
    	 * GetCurrentTimestamp() call (which'd be expensive anyway).  Also, mark
    	 * xactStopTimestamp as unset.
    	 */
    	xactStartTimestamp = stmtStartTimestamp;
    	xactStopTimestamp = 0;
    	pgstat_report_xact_timestamp(xactStartTimestamp);
    
    	/*
    	 * initialize current transaction state fields
    	 *
    	 * note: prevXactReadOnly is not used at the outermost level
    	 */
    	s->nestingLevel = 1;
    	s->gucNestLevel = 1;
    	s->childXids = NULL;
    	s->nChildXids = 0;
    	s->maxChildXids = 0;
    	GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
    	/* SecurityRestrictionContext should never be set outside a transaction */
    	Assert(s->prevSecContext == 0);
    
    	/*
    	 * initialize other subsystems for new transaction
    	 */
    	AtStart_GUC();
    	AtStart_Inval();
    	AtStart_Cache();
    	AfterTriggerBeginXact();
    
    	/*
    	 * done with start processing, set current transaction state to "in
    	 * progress"
    	 */
    	s->state = TRANS_INPROGRESS;
    
    	ShowTransactionState("StartTransaction");
    }
    
    
    /*
     *	CommitTransaction
     *
     * NB: if you change this routine, better look at PrepareTransaction too!
     */
    static void
    CommitTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    	TransactionId latestXid;
    
    	ShowTransactionState("CommitTransaction");
    
    	/*
    	 * check the current transaction state
    	 */
    	if (s->state != TRANS_INPROGRESS)
    		elog(WARNING, "CommitTransaction while in %s state",
    			 TransStateAsString(s->state));
    	Assert(s->parent == NULL);
    
    	/*
    	 * Do pre-commit processing that involves calling user-defined code, such
    	 * as triggers.  Since closing cursors could queue trigger actions,
    	 * triggers could open cursors, etc, we have to keep looping until there's
    	 * nothing left to do.
    	 */
    	for (;;)
    	{
    		/*
    		 * Fire all currently pending deferred triggers.
    		 */
    		AfterTriggerFireDeferred();
    
    		/*
    		 * Close open portals (converting holdable ones into static portals).
    		 * If there weren't any, we are done ... otherwise loop back to check
    		 * if they queued deferred triggers.  Lather, rinse, repeat.
    		 */
    		if (!PreCommit_Portals(false))
    			break;
    	}
    
    	/*
    	 * The remaining actions cannot call any user-defined code, so it's safe
    	 * to start shutting down within-transaction services.	But note that most
    	 * of this stuff could still throw an error, which would switch us into
    	 * the transaction-abort path.
    	 */
    
    	/* Shut down the deferred-trigger manager */
    	AfterTriggerEndXact(true);
    
    	/*
    	 * Let ON COMMIT management do its thing (must happen after closing
    	 * cursors, to avoid dangling-reference problems)
    	 */
    	PreCommit_on_commit_actions();
    
    	/* close large objects before lower-level cleanup */
    	AtEOXact_LargeObject(true);
    
    	/*
    	 * Mark serializable transaction as complete for predicate locking
    	 * purposes.  This should be done as late as we can put it and still allow
    	 * errors to be raised for failure patterns found at commit.
    	 */
    	PreCommit_CheckForSerializationFailure();
    
    	/*
    	 * Insert notifications sent by NOTIFY commands into the queue.  This
    	 * should be late in the pre-commit sequence to minimize time spent
    	 * holding the notify-insertion lock.
    	 */
    	PreCommit_Notify();
    
    	/* Prevent cancel/die interrupt while cleaning up */
    	HOLD_INTERRUPTS();
    
    	/* Commit updates to the relation map --- do this as late as possible */
    	AtEOXact_RelationMap(true);
    
    	/*
    	 * set the current transaction state information appropriately during
    	 * commit processing
    	 */
    	s->state = TRANS_COMMIT;
    
    	/*
    	 * Here is where we really truly commit.
    	 */
    	latestXid = RecordTransactionCommit();
    
    	TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
    
    	/*
    	 * Let others know about no transaction in progress by me. Note that this
    	 * must be done _before_ releasing locks we hold and _after_
    	 * RecordTransactionCommit.
    	 */
    	ProcArrayEndTransaction(MyProc, latestXid);
    
    	/*
    	 * This is all post-commit cleanup.  Note that if an error is raised here,
    	 * it's too late to abort the transaction.  This should be just
    	 * noncritical resource releasing.
    	 *
    	 * The ordering of operations is not entirely random.  The idea is:
    	 * release resources visible to other backends (eg, files, buffer pins);
    	 * then release locks; then release backend-local resources. We want to
    	 * release locks at the point where any backend waiting for us will see
    	 * our transaction as being fully cleaned up.
    	 *
    	 * Resources that can be associated with individual queries are handled by
    	 * the ResourceOwner mechanism.  The other calls here are for backend-wide
    	 * state.
    	 */
    
    	CallXactCallbacks(XACT_EVENT_COMMIT);
    
    	ResourceOwnerRelease(TopTransactionResourceOwner,
    						 RESOURCE_RELEASE_BEFORE_LOCKS,
    						 true, true);
    
    	/* Check we've released all buffer pins */
    	AtEOXact_Buffers(true);
    
    	/* Clean up the relation cache */
    	AtEOXact_RelationCache(true);
    
    	/*
    	 * Make catalog changes visible to all backends.  This has to happen after
    	 * relcache references are dropped (see comments for
    	 * AtEOXact_RelationCache), but before locks are released (if anyone is
    	 * waiting for lock on a relation we've modified, we want them to know
    	 * about the catalog change before they start using the relation).
    	 */
    	AtEOXact_Inval(true);
    
    	/*
    	 * Likewise, dropping of files deleted during the transaction is best done
    	 * after releasing relcache and buffer pins.  (This is not strictly
    	 * necessary during commit, since such pins should have been released
    	 * already, but this ordering is definitely critical during abort.)
    	 */
    	smgrDoPendingDeletes(true);
    
    	AtEOXact_MultiXact();
    
    	ResourceOwnerRelease(TopTransactionResourceOwner,
    						 RESOURCE_RELEASE_LOCKS,
    						 true, true);
    	ResourceOwnerRelease(TopTransactionResourceOwner,
    						 RESOURCE_RELEASE_AFTER_LOCKS,
    						 true, true);
    
    	/* Check we've released all catcache entries */
    	AtEOXact_CatCache(true);
    
    	AtCommit_Notify();
    	AtEOXact_GUC(true, 1);
    	AtEOXact_SPI(true);
    	AtEOXact_on_commit_actions(true);
    	AtEOXact_Namespace(true);
    	/* smgrcommit already done */
    	AtEOXact_Files();
    	AtEOXact_ComboCid();
    	AtEOXact_HashTables(true);
    	AtEOXact_PgStat(true);
    	AtEOXact_Snapshot(true);
    	pgstat_report_xact_timestamp(0);
    
    	CurrentResourceOwner = NULL;
    	ResourceOwnerDelete(TopTransactionResourceOwner);
    	s->curTransactionOwner = NULL;
    	CurTransactionResourceOwner = NULL;
    	TopTransactionResourceOwner = NULL;
    
    	AtCommit_Memory();
    
    	s->transactionId = InvalidTransactionId;
    	s->subTransactionId = InvalidSubTransactionId;
    	s->nestingLevel = 0;
    	s->gucNestLevel = 0;
    	s->childXids = NULL;
    	s->nChildXids = 0;
    	s->maxChildXids = 0;
    
    	/*
    	 * done with commit processing, set current transaction state back to
    	 * default
    	 */
    	s->state = TRANS_DEFAULT;
    
    	RESUME_INTERRUPTS();
    }
    
    
    /*
     *	PrepareTransaction
     *
     * NB: if you change this routine, better look at CommitTransaction too!
     */
    static void
    PrepareTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    	TransactionId xid = GetCurrentTransactionId();
    	GlobalTransaction gxact;
    	TimestampTz prepared_at;
    
    	ShowTransactionState("PrepareTransaction");
    
    	/*
    	 * check the current transaction state
    	 */
    	if (s->state != TRANS_INPROGRESS)
    		elog(WARNING, "PrepareTransaction while in %s state",
    			 TransStateAsString(s->state));
    	Assert(s->parent == NULL);
    
    	/*
    	 * Do pre-commit processing that involves calling user-defined code, such
    	 * as triggers.  Since closing cursors could queue trigger actions,
    	 * triggers could open cursors, etc, we have to keep looping until there's
    	 * nothing left to do.
    	 */
    	for (;;)
    	{
    		/*
    		 * Fire all currently pending deferred triggers.
    		 */
    		AfterTriggerFireDeferred();
    
    		/*
    		 * Close open portals (converting holdable ones into static portals).
    		 * If there weren't any, we are done ... otherwise loop back to check
    		 * if they queued deferred triggers.  Lather, rinse, repeat.
    		 */
    		if (!PreCommit_Portals(true))
    			break;
    	}
    
    	/*
    	 * The remaining actions cannot call any user-defined code, so it's safe
    	 * to start shutting down within-transaction services.	But note that most
    	 * of this stuff could still throw an error, which would switch us into
    	 * the transaction-abort path.
    	 */
    
    	/* Shut down the deferred-trigger manager */
    	AfterTriggerEndXact(true);
    
    	/*
    	 * Let ON COMMIT management do its thing (must happen after closing
    	 * cursors, to avoid dangling-reference problems)
    	 */
    	PreCommit_on_commit_actions();
    
    	/* close large objects before lower-level cleanup */
    	AtEOXact_LargeObject(true);
    
    	/*
    	 * Mark serializable transaction as complete for predicate locking
    	 * purposes.  This should be done as late as we can put it and still allow
    	 * errors to be raised for failure patterns found at commit.
    	 */
    	PreCommit_CheckForSerializationFailure();
    
    	/* NOTIFY will be handled below */
    
    	/*
    	 * Don't allow PREPARE TRANSACTION if we've accessed a temporary table in
    	 * this transaction.  Having the prepared xact hold locks on another
    	 * backend's temp table seems a bad idea --- for instance it would prevent
    	 * the backend from exiting.  There are other problems too, such as how to
    	 * clean up the source backend's local buffers and ON COMMIT state if the
    	 * prepared xact includes a DROP of a temp table.
    	 *
    	 * We must check this after executing any ON COMMIT actions, because they
    	 * might still access a temp relation.
    	 *
    	 * XXX In principle this could be relaxed to allow some useful special
    	 * cases, such as a temp table created and dropped all within the
    	 * transaction.  That seems to require much more bookkeeping though.
    	 */
    	if (MyXactAccessedTempRel)
    		ereport(ERROR,
    				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    				 errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
    
    	/*
    	 * Likewise, don't allow PREPARE after pg_export_snapshot.  This could be
    	 * supported if we added cleanup logic to twophase.c, but for now it
    	 * doesn't seem worth the trouble.
    	 */
    	if (XactHasExportedSnapshots())
    		ereport(ERROR,
    				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    				 errmsg("cannot PREPARE a transaction that has exported snapshots")));
    
    	/* Prevent cancel/die interrupt while cleaning up */
    	HOLD_INTERRUPTS();
    
    	/*
    	 * set the current transaction state information appropriately during
    	 * prepare processing
    	 */
    	s->state = TRANS_PREPARE;
    
    	prepared_at = GetCurrentTimestamp();
    
    	/* Tell bufmgr and smgr to prepare for commit */
    	BufmgrCommit();
    
    	/*
    	 * Reserve the GID for this transaction. This could fail if the requested
    	 * GID is invalid or already in use.
    	 */
    	gxact = MarkAsPreparing(xid, prepareGID, prepared_at,
    							GetUserId(), MyDatabaseId);
    	prepareGID = NULL;
    
    	/*
    	 * Collect data for the 2PC state file.  Note that in general, no actual
    	 * state change should happen in the called modules during this step,
    	 * since it's still possible to fail before commit, and in that case we
    	 * want transaction abort to be able to clean up.  (In particular, the
    	 * AtPrepare routines may error out if they find cases they cannot
    	 * handle.)  State cleanup should happen in the PostPrepare routines
    	 * below.  However, some modules can go ahead and clear state here because
    	 * they wouldn't do anything with it during abort anyway.
    	 *
    	 * Note: because the 2PC state file records will be replayed in the same
    	 * order they are made, the order of these calls has to match the order in
    	 * which we want things to happen during COMMIT PREPARED or ROLLBACK
    	 * PREPARED; in particular, pay attention to whether things should happen
    	 * before or after releasing the transaction's locks.
    	 */
    	StartPrepare(gxact);
    
    	AtPrepare_Notify();
    	AtPrepare_Locks();
    	AtPrepare_PredicateLocks();
    	AtPrepare_PgStat();
    	AtPrepare_MultiXact();
    	AtPrepare_RelationMap();
    
    	/*
    	 * Here is where we really truly prepare.
    	 *
    	 * We have to record transaction prepares even if we didn't make any
    	 * updates, because the transaction manager might get confused if we lose
    	 * a global transaction.
    	 */
    	EndPrepare(gxact);
    
    	/*
    	 * Now we clean up backend-internal state and release internal resources.
    	 */
    
    	/* Reset XactLastRecEnd until the next transaction writes something */
    	XactLastRecEnd.xrecoff = 0;
    
    	/*
    	 * Let others know about no transaction in progress by me.	This has to be
    	 * done *after* the prepared transaction has been marked valid, else
    	 * someone may think it is unlocked and recyclable.
    	 */
    	ProcArrayClearTransaction(MyProc);
    
    	/*
    	 * This is all post-transaction cleanup.  Note that if an error is raised
    	 * here, it's too late to abort the transaction.  This should be just
    	 * noncritical resource releasing.	See notes in CommitTransaction.
    	 */
    
    	CallXactCallbacks(XACT_EVENT_PREPARE);
    
    	ResourceOwnerRelease(TopTransactionResourceOwner,
    						 RESOURCE_RELEASE_BEFORE_LOCKS,
    						 true, true);
    
    	/* Check we've released all buffer pins */
    	AtEOXact_Buffers(true);
    
    	/* Clean up the relation cache */
    	AtEOXact_RelationCache(true);
    
    	/* notify doesn't need a postprepare call */
    
    	PostPrepare_PgStat();
    
    	PostPrepare_Inval();
    
    	PostPrepare_smgr();
    
    	PostPrepare_MultiXact(xid);
    
    	PostPrepare_Locks(xid);
    	PostPrepare_PredicateLocks(xid);
    
    	ResourceOwnerRelease(TopTransactionResourceOwner,
    						 RESOURCE_RELEASE_LOCKS,
    						 true, true);
    	ResourceOwnerRelease(TopTransactionResourceOwner,
    						 RESOURCE_RELEASE_AFTER_LOCKS,
    						 true, true);
    
    	/* Check we've released all catcache entries */
    	AtEOXact_CatCache(true);
    
    	/* PREPARE acts the same as COMMIT as far as GUC is concerned */
    	AtEOXact_GUC(true, 1);
    	AtEOXact_SPI(true);
    	AtEOXact_on_commit_actions(true);
    	AtEOXact_Namespace(true);
    	/* smgrcommit already done */
    	AtEOXact_Files();
    	AtEOXact_ComboCid();
    	AtEOXact_HashTables(true);
    	/* don't call AtEOXact_PgStat here */
    	AtEOXact_Snapshot(true);
    
    	CurrentResourceOwner = NULL;
    	ResourceOwnerDelete(TopTransactionResourceOwner);
    	s->curTransactionOwner = NULL;
    	CurTransactionResourceOwner = NULL;
    	TopTransactionResourceOwner = NULL;
    
    	AtCommit_Memory();
    
    	s->transactionId = InvalidTransactionId;
    	s->subTransactionId = InvalidSubTransactionId;
    	s->nestingLevel = 0;
    	s->gucNestLevel = 0;
    	s->childXids = NULL;
    	s->nChildXids = 0;
    	s->maxChildXids = 0;
    
    	/*
    	 * done with 1st phase commit processing, set current transaction state
    	 * back to default
    	 */
    	s->state = TRANS_DEFAULT;
    
    	RESUME_INTERRUPTS();
    }
    
    
    /*
     *	AbortTransaction
     */
    static void
    AbortTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    	TransactionId latestXid;
    
    	/* Prevent cancel/die interrupt while cleaning up */
    	HOLD_INTERRUPTS();
    
    	/* Make sure we have a valid memory context and resource owner */
    	AtAbort_Memory();
    	AtAbort_ResourceOwner();
    
    	/*
    	 * Release any LW locks we might be holding as quickly as possible.
    	 * (Regular locks, however, must be held till we finish aborting.)
    	 * Releasing LW locks is critical since we might try to grab them again
    	 * while cleaning up!
    	 */
    	LWLockReleaseAll();
    
    	/* Clean up buffer I/O and buffer context locks, too */
    	AbortBufferIO();
    	UnlockBuffers();
    
    	/*
    	 * Also clean up any open wait for lock, since the lock manager will choke
    	 * if we try to wait for another lock before doing this.
    	 */
    	LockErrorCleanup();
    
    	/*
    	 * check the current transaction state
    	 */
    	if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE)
    		elog(WARNING, "AbortTransaction while in %s state",
    			 TransStateAsString(s->state));
    	Assert(s->parent == NULL);
    
    	/*
    	 * set the current transaction state information appropriately during the
    	 * abort processing
    	 */
    	s->state = TRANS_ABORT;
    
    	/*
    	 * Reset user ID which might have been changed transiently.  We need this
    	 * to clean up in case control escaped out of a SECURITY DEFINER function
    	 * or other local change of CurrentUserId; therefore, the prior value of
    	 * SecurityRestrictionContext also needs to be restored.
    	 *
    	 * (Note: it is not necessary to restore session authorization or role
    	 * settings here because those can only be changed via GUC, and GUC will
    	 * take care of rolling them back if need be.)
    	 */
    	SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
    
    	/*
    	 * do abort processing
    	 */
    	AfterTriggerEndXact(false); /* 'false' means it's abort */
    	AtAbort_Portals();
    	AtEOXact_LargeObject(false);
    	AtAbort_Notify();
    	AtEOXact_RelationMap(false);
    
    	/*
    	 * Advertise the fact that we aborted in pg_clog (assuming that we got as
    	 * far as assigning an XID to advertise).
    	 */
    	latestXid = RecordTransactionAbort(false);
    
    	TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
    
    	/*
    	 * Let others know about no transaction in progress by me. Note that this
    	 * must be done _before_ releasing locks we hold and _after_
    	 * RecordTransactionAbort.
    	 */
    	ProcArrayEndTransaction(MyProc, latestXid);
    
    	/*
    	 * Post-abort cleanup.	See notes in CommitTransaction() concerning
    	 * ordering.  We can skip all of it if the transaction failed before
    	 * creating a resource owner.
    	 */
    	if (TopTransactionResourceOwner != NULL)
    	{
    		CallXactCallbacks(XACT_EVENT_ABORT);
    
    		ResourceOwnerRelease(TopTransactionResourceOwner,
    							 RESOURCE_RELEASE_BEFORE_LOCKS,
    							 false, true);
    		AtEOXact_Buffers(false);
    		AtEOXact_RelationCache(false);
    		AtEOXact_Inval(false);
    		smgrDoPendingDeletes(false);
    		AtEOXact_MultiXact();
    		ResourceOwnerRelease(TopTransactionResourceOwner,
    							 RESOURCE_RELEASE_LOCKS,
    							 false, true);
    		ResourceOwnerRelease(TopTransactionResourceOwner,
    							 RESOURCE_RELEASE_AFTER_LOCKS,
    							 false, true);
    		AtEOXact_CatCache(false);
    
    		AtEOXact_GUC(false, 1);
    		AtEOXact_SPI(false);
    		AtEOXact_on_commit_actions(false);
    		AtEOXact_Namespace(false);
    		AtEOXact_Files();
    		AtEOXact_ComboCid();
    		AtEOXact_HashTables(false);
    		AtEOXact_PgStat(false);
    		pgstat_report_xact_timestamp(0);
    	}
    
    	/*
    	 * State remains TRANS_ABORT until CleanupTransaction().
    	 */
    	RESUME_INTERRUPTS();
    }
    
    /*
     *	CleanupTransaction
     */
    static void
    CleanupTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	/*
    	 * State should still be TRANS_ABORT from AbortTransaction().
    	 */
    	if (s->state != TRANS_ABORT)
    		elog(FATAL, "CleanupTransaction: unexpected state %s",
    			 TransStateAsString(s->state));
    
    	/*
    	 * do abort cleanup processing
    	 */
    	AtCleanup_Portals();		/* now safe to release portal memory */
    	AtEOXact_Snapshot(false);	/* and release the transaction's snapshots */
    
    	CurrentResourceOwner = NULL;	/* and resource owner */
    	if (TopTransactionResourceOwner)
    		ResourceOwnerDelete(TopTransactionResourceOwner);
    	s->curTransactionOwner = NULL;
    	CurTransactionResourceOwner = NULL;
    	TopTransactionResourceOwner = NULL;
    
    	AtCleanup_Memory();			/* and transaction memory */
    
    	s->transactionId = InvalidTransactionId;
    	s->subTransactionId = InvalidSubTransactionId;
    	s->nestingLevel = 0;
    	s->gucNestLevel = 0;
    	s->childXids = NULL;
    	s->nChildXids = 0;
    	s->maxChildXids = 0;
    
    	/*
    	 * done with abort processing, set current transaction state back to
    	 * default
    	 */
    	s->state = TRANS_DEFAULT;
    }
    
    /*
     *	StartTransactionCommand
     */
    void
    StartTransactionCommand(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    			/*
    			 * if we aren't in a transaction block, we just do our usual start
    			 * transaction.
    			 */
    		case TBLOCK_DEFAULT:
    			StartTransaction();
    			s->blockState = TBLOCK_STARTED;
    			break;
    
    			/*
    			 * We are somewhere in a transaction block or subtransaction and
    			 * about to start a new command.  For now we do nothing, but
    			 * someday we may do command-local resource initialization. (Note
    			 * that any needed CommandCounterIncrement was done by the
    			 * previous CommitTransactionCommand.)
    			 */
    		case TBLOCK_INPROGRESS:
    		case TBLOCK_SUBINPROGRESS:
    			break;
    
    			/*
    			 * Here we are in a failed transaction block (one of the commands
    			 * caused an abort) so we do nothing but remain in the abort
    			 * state.  Eventually we will get a ROLLBACK command which will
    			 * get us out of this state.  (It is up to other code to ensure
    			 * that no commands other than ROLLBACK will be processed in these
    			 * states.)
    			 */
    		case TBLOCK_ABORT:
    		case TBLOCK_SUBABORT:
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_STARTED:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    		case TBLOCK_PREPARE:
    			elog(ERROR, "StartTransactionCommand: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    
    	/*
    	 * We must switch to CurTransactionContext before returning. This is
    	 * already done if we called StartTransaction, otherwise not.
    	 */
    	Assert(CurTransactionContext != NULL);
    	MemoryContextSwitchTo(CurTransactionContext);
    }
    
    /*
     *	CommitTransactionCommand
     */
    void
    CommitTransactionCommand(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    			/*
    			 * This shouldn't happen, because it means the previous
    			 * StartTransactionCommand didn't set the STARTED state
    			 * appropriately.
    			 */
    		case TBLOCK_DEFAULT:
    			elog(FATAL, "CommitTransactionCommand: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    
    			/*
    			 * If we aren't in a transaction block, just do our usual
    			 * transaction commit, and return to the idle state.
    			 */
    		case TBLOCK_STARTED:
    			CommitTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * We are completing a "BEGIN TRANSACTION" command, so we change
    			 * to the "transaction block in progress" state and return.  (We
    			 * assume the BEGIN did nothing to the database, so we need no
    			 * CommandCounterIncrement.)
    			 */
    		case TBLOCK_BEGIN:
    			s->blockState = TBLOCK_INPROGRESS;
    			break;
    
    			/*
    			 * This is the case when we have finished executing a command
    			 * someplace within a transaction block.  We increment the command
    			 * counter and return.
    			 */
    		case TBLOCK_INPROGRESS:
    		case TBLOCK_SUBINPROGRESS:
    			CommandCounterIncrement();
    			break;
    
    			/*
    			 * We are completing a "COMMIT" command.  Do it and return to the
    			 * idle state.
    			 */
    		case TBLOCK_END:
    			CommitTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * Here we are in the middle of a transaction block but one of the
    			 * commands caused an abort so we do nothing but remain in the
    			 * abort state.  Eventually we will get a ROLLBACK comand.
    			 */
    		case TBLOCK_ABORT:
    		case TBLOCK_SUBABORT:
    			break;
    
    			/*
    			 * Here we were in an aborted transaction block and we just got
    			 * the ROLLBACK command from the user, so clean up the
    			 * already-aborted transaction and return to the idle state.
    			 */
    		case TBLOCK_ABORT_END:
    			CleanupTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * Here we were in a perfectly good transaction block but the user
    			 * told us to ROLLBACK anyway.	We have to abort the transaction
    			 * and then clean up.
    			 */
    		case TBLOCK_ABORT_PENDING:
    			AbortTransaction();
    			CleanupTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * We are completing a "PREPARE TRANSACTION" command.  Do it and
    			 * return to the idle state.
    			 */
    		case TBLOCK_PREPARE:
    			PrepareTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * We were just issued a SAVEPOINT inside a transaction block.
    			 * Start a subtransaction.	(DefineSavepoint already did
    			 * PushTransaction, so as to have someplace to put the SUBBEGIN
    			 * state.)
    			 */
    		case TBLOCK_SUBBEGIN:
    			StartSubTransaction();
    			s->blockState = TBLOCK_SUBINPROGRESS;
    			break;
    
    			/*
    			 * We were issued a RELEASE command, so we end the
    			 * current subtransaction and return to the parent transaction.
    			 * The parent might be ended too, so repeat till we find an
    			 * INPROGRESS transaction or subtransaction.
    			 */
    		case TBLOCK_SUBRELEASE:
    			do
    			{
    				CommitSubTransaction();
    				s = CurrentTransactionState;	/* changed by pop */
    			} while (s->blockState == TBLOCK_SUBRELEASE);
    
    			Assert(s->blockState == TBLOCK_INPROGRESS ||
    				   s->blockState == TBLOCK_SUBINPROGRESS);
    			break;
    
    			/*
    			 * We were issued a COMMIT, so we end the current subtransaction
    			 * hierarchy and perform final commit. We do this by rolling up
    			 * any subtransactions into their parent, which leads to O(N^2)
    			 * operations with respect to resource owners - this isn't that
    			 * bad until we approach a thousands of savepoints but is necessary
    			 * for correctness should after triggers create new resource
    			 * owners.
    			 */
    		case TBLOCK_SUBCOMMIT:
    			do
    			{
    				CommitSubTransaction();
    				s = CurrentTransactionState;	/* changed by pop */
    			} while (s->blockState == TBLOCK_SUBCOMMIT);
    			/* If we had a COMMIT command, finish off the main xact too */
    			if (s->blockState == TBLOCK_END)
    			{
    				Assert(s->parent == NULL);
    				CommitTransaction();
    				s->blockState = TBLOCK_DEFAULT;
    			}
    			else if (s->blockState == TBLOCK_PREPARE)
    			{
    				Assert(s->parent == NULL);
    				PrepareTransaction();
    				s->blockState = TBLOCK_DEFAULT;
    			}
    			else
    				elog(ERROR, "CommitTransactionCommand: unexpected state %s",
    					 BlockStateAsString(s->blockState));
    			break;
    
    			/*
    			 * The current already-failed subtransaction is ending due to a
    			 * ROLLBACK or ROLLBACK TO command, so pop it and recursively
    			 * examine the parent (which could be in any of several states).
    			 */
    		case TBLOCK_SUBABORT_END:
    			CleanupSubTransaction();
    			CommitTransactionCommand();
    			break;
    
    			/*
    			 * As above, but it's not dead yet, so abort first.
    			 */
    		case TBLOCK_SUBABORT_PENDING:
    			AbortSubTransaction();
    			CleanupSubTransaction();
    			CommitTransactionCommand();
    			break;
    
    			/*
    			 * The current subtransaction is the target of a ROLLBACK TO
    			 * command.  Abort and pop it, then start a new subtransaction
    			 * with the same name.
    			 */
    		case TBLOCK_SUBRESTART:
    			{
    				char	   *name;
    				int			savepointLevel;
    
    				/* save name and keep Cleanup from freeing it */
    				name = s->name;
    				s->name = NULL;
    				savepointLevel = s->savepointLevel;
    
    				AbortSubTransaction();
    				CleanupSubTransaction();
    
    				DefineSavepoint(NULL);
    				s = CurrentTransactionState;	/* changed by push */
    				s->name = name;
    				s->savepointLevel = savepointLevel;
    
    				/* This is the same as TBLOCK_SUBBEGIN case */
    				AssertState(s->blockState == TBLOCK_SUBBEGIN);
    				StartSubTransaction();
    				s->blockState = TBLOCK_SUBINPROGRESS;
    			}
    			break;
    
    			/*
    			 * Same as above, but the subtransaction had already failed, so we
    			 * don't need AbortSubTransaction.
    			 */
    		case TBLOCK_SUBABORT_RESTART:
    			{
    				char	   *name;
    				int			savepointLevel;
    
    				/* save name and keep Cleanup from freeing it */
    				name = s->name;
    				s->name = NULL;
    				savepointLevel = s->savepointLevel;
    
    				CleanupSubTransaction();
    
    				DefineSavepoint(NULL);
    				s = CurrentTransactionState;	/* changed by push */
    				s->name = name;
    				s->savepointLevel = savepointLevel;
    
    				/* This is the same as TBLOCK_SUBBEGIN case */
    				AssertState(s->blockState == TBLOCK_SUBBEGIN);
    				StartSubTransaction();
    				s->blockState = TBLOCK_SUBINPROGRESS;
    			}
    			break;
    	}
    }
    
    /*
     *	AbortCurrentTransaction
     */
    void
    AbortCurrentTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    		case TBLOCK_DEFAULT:
    			if (s->state == TRANS_DEFAULT)
    			{
    				/* we are idle, so nothing to do */
    			}
    			else
    			{
    				/*
    				 * We can get here after an error during transaction start
    				 * (state will be TRANS_START).  Need to clean up the
    				 * incompletely started transaction.  First, adjust the
    				 * low-level state to suppress warning message from
    				 * AbortTransaction.
    				 */
    				if (s->state == TRANS_START)
    					s->state = TRANS_INPROGRESS;
    				AbortTransaction();
    				CleanupTransaction();
    			}
    			break;
    
    			/*
    			 * if we aren't in a transaction block, we just do the basic abort
    			 * & cleanup transaction.
    			 */
    		case TBLOCK_STARTED:
    			AbortTransaction();
    			CleanupTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * If we are in TBLOCK_BEGIN it means something screwed up right
    			 * after reading "BEGIN TRANSACTION".  We assume that the user
    			 * will interpret the error as meaning the BEGIN failed to get him
    			 * into a transaction block, so we should abort and return to idle
    			 * state.
    			 */
    		case TBLOCK_BEGIN:
    			AbortTransaction();
    			CleanupTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * We are somewhere in a transaction block and we've gotten a
    			 * failure, so we abort the transaction and set up the persistent
    			 * ABORT state.  We will stay in ABORT until we get a ROLLBACK.
    			 */
    		case TBLOCK_INPROGRESS:
    			AbortTransaction();
    			s->blockState = TBLOCK_ABORT;
    			/* CleanupTransaction happens when we exit TBLOCK_ABORT_END */
    			break;
    
    			/*
    			 * Here, we failed while trying to COMMIT.	Clean up the
    			 * transaction and return to idle state (we do not want to stay in
    			 * the transaction).
    			 */
    		case TBLOCK_END:
    			AbortTransaction();
    			CleanupTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * Here, we are already in an aborted transaction state and are
    			 * waiting for a ROLLBACK, but for some reason we failed again! So
    			 * we just remain in the abort state.
    			 */
    		case TBLOCK_ABORT:
    		case TBLOCK_SUBABORT:
    			break;
    
    			/*
    			 * We are in a failed transaction and we got the ROLLBACK command.
    			 * We have already aborted, we just need to cleanup and go to idle
    			 * state.
    			 */
    		case TBLOCK_ABORT_END:
    			CleanupTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * We are in a live transaction and we got a ROLLBACK command.
    			 * Abort, cleanup, go to idle state.
    			 */
    		case TBLOCK_ABORT_PENDING:
    			AbortTransaction();
    			CleanupTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * Here, we failed while trying to PREPARE.  Clean up the
    			 * transaction and return to idle state (we do not want to stay in
    			 * the transaction).
    			 */
    		case TBLOCK_PREPARE:
    			AbortTransaction();
    			CleanupTransaction();
    			s->blockState = TBLOCK_DEFAULT;
    			break;
    
    			/*
    			 * We got an error inside a subtransaction.  Abort just the
    			 * subtransaction, and go to the persistent SUBABORT state until
    			 * we get ROLLBACK.
    			 */
    		case TBLOCK_SUBINPROGRESS:
    			AbortSubTransaction();
    			s->blockState = TBLOCK_SUBABORT;
    			break;
    
    			/*
    			 * If we failed while trying to create a subtransaction, clean up
    			 * the broken subtransaction and abort the parent.	The same
    			 * applies if we get a failure while ending a subtransaction.
    			 */
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    			AbortSubTransaction();
    			CleanupSubTransaction();
    			AbortCurrentTransaction();
    			break;
    
    			/*
    			 * Same as above, except the Abort() was already done.
    			 */
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_SUBABORT_RESTART:
    			CleanupSubTransaction();
    			AbortCurrentTransaction();
    			break;
    	}
    }
    
    /*
     *	PreventTransactionChain
     *
     *	This routine is to be called by statements that must not run inside
     *	a transaction block, typically because they have non-rollback-able
     *	side effects or do internal commits.
     *
     *	If we have already started a transaction block, issue an error; also issue
     *	an error if we appear to be running inside a user-defined function (which
     *	could issue more commands and possibly cause a failure after the statement
     *	completes).  Subtransactions are verboten too.
     *
     *	isTopLevel: passed down from ProcessUtility to determine whether we are
     *	inside a function or multi-query querystring.  (We will always fail if
     *	this is false, but it's convenient to centralize the check here instead of
     *	making callers do it.)
     *	stmtType: statement type name, for error messages.
     */
    void
    PreventTransactionChain(bool isTopLevel, const char *stmtType)
    {
    	/*
    	 * xact block already started?
    	 */
    	if (IsTransactionBlock())
    		ereport(ERROR,
    				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
    		/* translator: %s represents an SQL statement name */
    				 errmsg("%s cannot run inside a transaction block",
    						stmtType)));
    
    	/*
    	 * subtransaction?
    	 */
    	if (IsSubTransaction())
    		ereport(ERROR,
    				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
    		/* translator: %s represents an SQL statement name */
    				 errmsg("%s cannot run inside a subtransaction",
    						stmtType)));
    
    	/*
    	 * inside a function call?
    	 */
    	if (!isTopLevel)
    		ereport(ERROR,
    				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
    		/* translator: %s represents an SQL statement name */
    				 errmsg("%s cannot be executed from a function or multi-command string",
    						stmtType)));
    
    	/* If we got past IsTransactionBlock test, should be in default state */
    	if (CurrentTransactionState->blockState != TBLOCK_DEFAULT &&
    		CurrentTransactionState->blockState != TBLOCK_STARTED)
    		elog(FATAL, "cannot prevent transaction chain");
    	/* all okay */
    }
    
    /*
     *	RequireTransactionChain
     *
     *	This routine is to be called by statements that must run inside
     *	a transaction block, because they have no effects that persist past
     *	transaction end (and so calling them outside a transaction block
     *	is presumably an error).  DECLARE CURSOR is an example.
     *
     *	If we appear to be running inside a user-defined function, we do not
     *	issue an error, since the function could issue more commands that make
     *	use of the current statement's results.  Likewise subtransactions.
     *	Thus this is an inverse for PreventTransactionChain.
     *
     *	isTopLevel: passed down from ProcessUtility to determine whether we are
     *	inside a function.
     *	stmtType: statement type name, for error messages.
     */
    void
    RequireTransactionChain(bool isTopLevel, const char *stmtType)
    {
    	/*
    	 * xact block already started?
    	 */
    	if (IsTransactionBlock())
    		return;
    
    	/*
    	 * subtransaction?
    	 */
    	if (IsSubTransaction())
    		return;
    
    	/*
    	 * inside a function call?
    	 */
    	if (!isTopLevel)
    		return;
    
    	ereport(ERROR,
    			(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
    	/* translator: %s represents an SQL statement name */
    			 errmsg("%s can only be used in transaction blocks",
    					stmtType)));
    }
    
    /*
     *	IsInTransactionChain
     *
     *	This routine is for statements that need to behave differently inside
     *	a transaction block than when running as single commands.  ANALYZE is
     *	currently the only example.
     *
     *	isTopLevel: passed down from ProcessUtility to determine whether we are
     *	inside a function.
     */
    bool
    IsInTransactionChain(bool isTopLevel)
    {
    	/*
    	 * Return true on same conditions that would make PreventTransactionChain
    	 * error out
    	 */
    	if (IsTransactionBlock())
    		return true;
    
    	if (IsSubTransaction())
    		return true;
    
    	if (!isTopLevel)
    		return true;
    
    	if (CurrentTransactionState->blockState != TBLOCK_DEFAULT &&
    		CurrentTransactionState->blockState != TBLOCK_STARTED)
    		return true;
    
    	return false;
    }
    
    
    /*
     * Register or deregister callback functions for start- and end-of-xact
     * operations.
     *
     * These functions are intended for use by dynamically loaded modules.
     * For built-in modules we generally just hardwire the appropriate calls
     * (mainly because it's easier to control the order that way, where needed).
     *
     * At transaction end, the callback occurs post-commit or post-abort, so the
     * callback functions can only do noncritical cleanup.
     */
    void
    RegisterXactCallback(XactCallback callback, void *arg)
    {
    	XactCallbackItem *item;
    
    	item = (XactCallbackItem *)
    		MemoryContextAlloc(TopMemoryContext, sizeof(XactCallbackItem));
    	item->callback = callback;
    	item->arg = arg;
    	item->next = Xact_callbacks;
    	Xact_callbacks = item;
    }
    
    void
    UnregisterXactCallback(XactCallback callback, void *arg)
    {
    	XactCallbackItem *item;
    	XactCallbackItem *prev;
    
    	prev = NULL;
    	for (item = Xact_callbacks; item; prev = item, item = item->next)
    	{
    		if (item->callback == callback && item->arg == arg)
    		{
    			if (prev)
    				prev->next = item->next;
    			else
    				Xact_callbacks = item->next;
    			pfree(item);
    			break;
    		}
    	}
    }
    
    static void
    CallXactCallbacks(XactEvent event)
    {
    	XactCallbackItem *item;
    
    	for (item = Xact_callbacks; item; item = item->next)
    		(*item->callback) (event, item->arg);
    }
    
    
    /*
     * Register or deregister callback functions for start- and end-of-subxact
     * operations.
     *
     * Pretty much same as above, but for subtransaction events.
     *
     * At subtransaction end, the callback occurs post-subcommit or post-subabort,
     * so the callback functions can only do noncritical cleanup.  At
     * subtransaction start, the callback is called when the subtransaction has
     * finished initializing.
     */
    void
    RegisterSubXactCallback(SubXactCallback callback, void *arg)
    {
    	SubXactCallbackItem *item;
    
    	item = (SubXactCallbackItem *)
    		MemoryContextAlloc(TopMemoryContext, sizeof(SubXactCallbackItem));
    	item->callback = callback;
    	item->arg = arg;
    	item->next = SubXact_callbacks;
    	SubXact_callbacks = item;
    }
    
    void
    UnregisterSubXactCallback(SubXactCallback callback, void *arg)
    {
    	SubXactCallbackItem *item;
    	SubXactCallbackItem *prev;
    
    	prev = NULL;
    	for (item = SubXact_callbacks; item; prev = item, item = item->next)
    	{
    		if (item->callback == callback && item->arg == arg)
    		{
    			if (prev)
    				prev->next = item->next;
    			else
    				SubXact_callbacks = item->next;
    			pfree(item);
    			break;
    		}
    	}
    }
    
    static void
    CallSubXactCallbacks(SubXactEvent event,
    					 SubTransactionId mySubid,
    					 SubTransactionId parentSubid)
    {
    	SubXactCallbackItem *item;
    
    	for (item = SubXact_callbacks; item; item = item->next)
    		(*item->callback) (event, mySubid, parentSubid, item->arg);
    }
    
    
    /* ----------------------------------------------------------------
     *					   transaction block support
     * ----------------------------------------------------------------
     */
    
    /*
     *	BeginTransactionBlock
     *		This executes a BEGIN command.
     */
    void
    BeginTransactionBlock(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    			/*
    			 * We are not inside a transaction block, so allow one to begin.
    			 */
    		case TBLOCK_STARTED:
    			s->blockState = TBLOCK_BEGIN;
    			break;
    
    			/*
    			 * Already a transaction block in progress.
    			 */
    		case TBLOCK_INPROGRESS:
    		case TBLOCK_SUBINPROGRESS:
    		case TBLOCK_ABORT:
    		case TBLOCK_SUBABORT:
    			ereport(WARNING,
    					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
    					 errmsg("there is already a transaction in progress")));
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_DEFAULT:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    		case TBLOCK_PREPARE:
    			elog(FATAL, "BeginTransactionBlock: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    }
    
    /*
     *	PrepareTransactionBlock
     *		This executes a PREPARE command.
     *
     * Since PREPARE may actually do a ROLLBACK, the result indicates what
     * happened: TRUE for PREPARE, FALSE for ROLLBACK.
     *
     * Note that we don't actually do anything here except change blockState.
     * The real work will be done in the upcoming PrepareTransaction().
     * We do it this way because it's not convenient to change memory context,
     * resource owner, etc while executing inside a Portal.
     */
    bool
    PrepareTransactionBlock(char *gid)
    {
    	TransactionState s;
    	bool		result;
    
    	/* Set up to commit the current transaction */
    	result = EndTransactionBlock();
    
    	/* If successful, change outer tblock state to PREPARE */
    	if (result)
    	{
    		s = CurrentTransactionState;
    
    		while (s->parent != NULL)
    			s = s->parent;
    
    		if (s->blockState == TBLOCK_END)
    		{
    			/* Save GID where PrepareTransaction can find it again */
    			prepareGID = MemoryContextStrdup(TopTransactionContext, gid);
    
    			s->blockState = TBLOCK_PREPARE;
    		}
    		else
    		{
    			/*
    			 * ignore case where we are not in a transaction;
    			 * EndTransactionBlock already issued a warning.
    			 */
    			Assert(s->blockState == TBLOCK_STARTED);
    			/* Don't send back a PREPARE result tag... */
    			result = false;
    		}
    	}
    
    	return result;
    }
    
    /*
     *	EndTransactionBlock
     *		This executes a COMMIT command.
     *
     * Since COMMIT may actually do a ROLLBACK, the result indicates what
     * happened: TRUE for COMMIT, FALSE for ROLLBACK.
     *
     * Note that we don't actually do anything here except change blockState.
     * The real work will be done in the upcoming CommitTransactionCommand().
     * We do it this way because it's not convenient to change memory context,
     * resource owner, etc while executing inside a Portal.
     */
    bool
    EndTransactionBlock(void)
    {
    	TransactionState s = CurrentTransactionState;
    	bool		result = false;
    
    	switch (s->blockState)
    	{
    			/*
    			 * We are in a transaction block, so tell CommitTransactionCommand
    			 * to COMMIT.
    			 */
    		case TBLOCK_INPROGRESS:
    			s->blockState = TBLOCK_END;
    			result = true;
    			break;
    
    			/*
    			 * We are in a failed transaction block.  Tell
    			 * CommitTransactionCommand it's time to exit the block.
    			 */
    		case TBLOCK_ABORT:
    			s->blockState = TBLOCK_ABORT_END;
    			break;
    
    			/*
    			 * We are in a live subtransaction block.  Set up to subcommit all
    			 * open subtransactions and then commit the main transaction.
    			 */
    		case TBLOCK_SUBINPROGRESS:
    			while (s->parent != NULL)
    			{
    				if (s->blockState == TBLOCK_SUBINPROGRESS)
    					s->blockState = TBLOCK_SUBCOMMIT;
    				else
    					elog(FATAL, "EndTransactionBlock: unexpected state %s",
    						 BlockStateAsString(s->blockState));
    				s = s->parent;
    			}
    			if (s->blockState == TBLOCK_INPROGRESS)
    				s->blockState = TBLOCK_END;
    			else
    				elog(FATAL, "EndTransactionBlock: unexpected state %s",
    					 BlockStateAsString(s->blockState));
    			result = true;
    			break;
    
    			/*
    			 * Here we are inside an aborted subtransaction.  Treat the COMMIT
    			 * as ROLLBACK: set up to abort everything and exit the main
    			 * transaction.
    			 */
    		case TBLOCK_SUBABORT:
    			while (s->parent != NULL)
    			{
    				if (s->blockState == TBLOCK_SUBINPROGRESS)
    					s->blockState = TBLOCK_SUBABORT_PENDING;
    				else if (s->blockState == TBLOCK_SUBABORT)
    					s->blockState = TBLOCK_SUBABORT_END;
    				else
    					elog(FATAL, "EndTransactionBlock: unexpected state %s",
    						 BlockStateAsString(s->blockState));
    				s = s->parent;
    			}
    			if (s->blockState == TBLOCK_INPROGRESS)
    				s->blockState = TBLOCK_ABORT_PENDING;
    			else if (s->blockState == TBLOCK_ABORT)
    				s->blockState = TBLOCK_ABORT_END;
    			else
    				elog(FATAL, "EndTransactionBlock: unexpected state %s",
    					 BlockStateAsString(s->blockState));
    			break;
    
    			/*
    			 * The user issued COMMIT when not inside a transaction.  Issue a
    			 * WARNING, staying in TBLOCK_STARTED state.  The upcoming call to
    			 * CommitTransactionCommand() will then close the transaction and
    			 * put us back into the default state.
    			 */
    		case TBLOCK_STARTED:
    			ereport(WARNING,
    					(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
    					 errmsg("there is no transaction in progress")));
    			result = true;
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_DEFAULT:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    		case TBLOCK_PREPARE:
    			elog(FATAL, "EndTransactionBlock: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    
    	return result;
    }
    
    /*
     *	UserAbortTransactionBlock
     *		This executes a ROLLBACK command.
     *
     * As above, we don't actually do anything here except change blockState.
     */
    void
    UserAbortTransactionBlock(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    			/*
    			 * We are inside a transaction block and we got a ROLLBACK command
    			 * from the user, so tell CommitTransactionCommand to abort and
    			 * exit the transaction block.
    			 */
    		case TBLOCK_INPROGRESS:
    			s->blockState = TBLOCK_ABORT_PENDING;
    			break;
    
    			/*
    			 * We are inside a failed transaction block and we got a ROLLBACK
    			 * command from the user.  Abort processing is already done, so
    			 * CommitTransactionCommand just has to cleanup and go back to
    			 * idle state.
    			 */
    		case TBLOCK_ABORT:
    			s->blockState = TBLOCK_ABORT_END;
    			break;
    
    			/*
    			 * We are inside a subtransaction.	Mark everything up to top
    			 * level as exitable.
    			 */
    		case TBLOCK_SUBINPROGRESS:
    		case TBLOCK_SUBABORT:
    			while (s->parent != NULL)
    			{
    				if (s->blockState == TBLOCK_SUBINPROGRESS)
    					s->blockState = TBLOCK_SUBABORT_PENDING;
    				else if (s->blockState == TBLOCK_SUBABORT)
    					s->blockState = TBLOCK_SUBABORT_END;
    				else
    					elog(FATAL, "UserAbortTransactionBlock: unexpected state %s",
    						 BlockStateAsString(s->blockState));
    				s = s->parent;
    			}
    			if (s->blockState == TBLOCK_INPROGRESS)
    				s->blockState = TBLOCK_ABORT_PENDING;
    			else if (s->blockState == TBLOCK_ABORT)
    				s->blockState = TBLOCK_ABORT_END;
    			else
    				elog(FATAL, "UserAbortTransactionBlock: unexpected state %s",
    					 BlockStateAsString(s->blockState));
    			break;
    
    			/*
    			 * The user issued ABORT when not inside a transaction. Issue a
    			 * WARNING and go to abort state.  The upcoming call to
    			 * CommitTransactionCommand() will then put us back into the
    			 * default state.
    			 */
    		case TBLOCK_STARTED:
    			ereport(NOTICE,
    					(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
    					 errmsg("there is no transaction in progress")));
    			s->blockState = TBLOCK_ABORT_PENDING;
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_DEFAULT:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    		case TBLOCK_PREPARE:
    			elog(FATAL, "UserAbortTransactionBlock: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    }
    
    /*
     * DefineSavepoint
     *		This executes a SAVEPOINT command.
     */
    void
    DefineSavepoint(char *name)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    		case TBLOCK_INPROGRESS:
    		case TBLOCK_SUBINPROGRESS:
    			/* Normal subtransaction start */
    			PushTransaction();
    			s = CurrentTransactionState;		/* changed by push */
    
    			/*
    			 * Savepoint names, like the TransactionState block itself, live
    			 * in TopTransactionContext.
    			 */
    			if (name)
    				s->name = MemoryContextStrdup(TopTransactionContext, name);
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_DEFAULT:
    		case TBLOCK_STARTED:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT:
    		case TBLOCK_SUBABORT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    		case TBLOCK_PREPARE:
    			elog(FATAL, "DefineSavepoint: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    }
    
    /*
     * ReleaseSavepoint
     *		This executes a RELEASE command.
     *
     * As above, we don't actually do anything here except change blockState.
     */
    void
    ReleaseSavepoint(List *options)
    {
    	TransactionState s = CurrentTransactionState;
    	TransactionState target,
    				xact;
    	ListCell   *cell;
    	char	   *name = NULL;
    
    	switch (s->blockState)
    	{
    			/*
    			 * We can't rollback to a savepoint if there is no savepoint
    			 * defined.
    			 */
    		case TBLOCK_INPROGRESS:
    			ereport(ERROR,
    					(errcode(ERRCODE_S_E_INVALID_SPECIFICATION),
    					 errmsg("no such savepoint")));
    			break;
    
    			/*
    			 * We are in a non-aborted subtransaction.	This is the only valid
    			 * case.
    			 */
    		case TBLOCK_SUBINPROGRESS:
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_DEFAULT:
    		case TBLOCK_STARTED:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT:
    		case TBLOCK_SUBABORT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    		case TBLOCK_PREPARE:
    			elog(FATAL, "ReleaseSavepoint: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    
    	foreach(cell, options)
    	{
    		DefElem    *elem = lfirst(cell);
    
    		if (strcmp(elem->defname, "savepoint_name") == 0)
    			name = strVal(elem->arg);
    	}
    
    	Assert(PointerIsValid(name));
    
    	for (target = s; PointerIsValid(target); target = target->parent)
    	{
    		if (PointerIsValid(target->name) && strcmp(target->name, name) == 0)
    			break;
    	}
    
    	if (!PointerIsValid(target))
    		ereport(ERROR,
    				(errcode(ERRCODE_S_E_INVALID_SPECIFICATION),
    				 errmsg("no such savepoint")));
    
    	/* disallow crossing savepoint level boundaries */
    	if (target->savepointLevel != s->savepointLevel)
    		ereport(ERROR,
    				(errcode(ERRCODE_S_E_INVALID_SPECIFICATION),
    				 errmsg("no such savepoint")));
    
    	/*
    	 * Mark "commit pending" all subtransactions up to the target
    	 * subtransaction.	The actual commits will happen when control gets to
    	 * CommitTransactionCommand.
    	 */
    	xact = CurrentTransactionState;
    	for (;;)
    	{
    		Assert(xact->blockState == TBLOCK_SUBINPROGRESS);
    		xact->blockState = TBLOCK_SUBRELEASE;
    		if (xact == target)
    			break;
    		xact = xact->parent;
    		Assert(PointerIsValid(xact));
    	}
    }
    
    /*
     * RollbackToSavepoint
     *		This executes a ROLLBACK TO <savepoint> command.
     *
     * As above, we don't actually do anything here except change blockState.
     */
    void
    RollbackToSavepoint(List *options)
    {
    	TransactionState s = CurrentTransactionState;
    	TransactionState target,
    				xact;
    	ListCell   *cell;
    	char	   *name = NULL;
    
    	switch (s->blockState)
    	{
    			/*
    			 * We can't rollback to a savepoint if there is no savepoint
    			 * defined.
    			 */
    		case TBLOCK_INPROGRESS:
    		case TBLOCK_ABORT:
    			ereport(ERROR,
    					(errcode(ERRCODE_S_E_INVALID_SPECIFICATION),
    					 errmsg("no such savepoint")));
    			break;
    
    			/*
    			 * There is at least one savepoint, so proceed.
    			 */
    		case TBLOCK_SUBINPROGRESS:
    		case TBLOCK_SUBABORT:
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_DEFAULT:
    		case TBLOCK_STARTED:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    		case TBLOCK_PREPARE:
    			elog(FATAL, "RollbackToSavepoint: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    
    	foreach(cell, options)
    	{
    		DefElem    *elem = lfirst(cell);
    
    		if (strcmp(elem->defname, "savepoint_name") == 0)
    			name = strVal(elem->arg);
    	}
    
    	Assert(PointerIsValid(name));
    
    	for (target = s; PointerIsValid(target); target = target->parent)
    	{
    		if (PointerIsValid(target->name) && strcmp(target->name, name) == 0)
    			break;
    	}
    
    	if (!PointerIsValid(target))
    		ereport(ERROR,
    				(errcode(ERRCODE_S_E_INVALID_SPECIFICATION),
    				 errmsg("no such savepoint")));
    
    	/* disallow crossing savepoint level boundaries */
    	if (target->savepointLevel != s->savepointLevel)
    		ereport(ERROR,
    				(errcode(ERRCODE_S_E_INVALID_SPECIFICATION),
    				 errmsg("no such savepoint")));
    
    	/*
    	 * Mark "abort pending" all subtransactions up to the target
    	 * subtransaction.	The actual aborts will happen when control gets to
    	 * CommitTransactionCommand.
    	 */
    	xact = CurrentTransactionState;
    	for (;;)
    	{
    		if (xact == target)
    			break;
    		if (xact->blockState == TBLOCK_SUBINPROGRESS)
    			xact->blockState = TBLOCK_SUBABORT_PENDING;
    		else if (xact->blockState == TBLOCK_SUBABORT)
    			xact->blockState = TBLOCK_SUBABORT_END;
    		else
    			elog(FATAL, "RollbackToSavepoint: unexpected state %s",
    				 BlockStateAsString(xact->blockState));
    		xact = xact->parent;
    		Assert(PointerIsValid(xact));
    	}
    
    	/* And mark the target as "restart pending" */
    	if (xact->blockState == TBLOCK_SUBINPROGRESS)
    		xact->blockState = TBLOCK_SUBRESTART;
    	else if (xact->blockState == TBLOCK_SUBABORT)
    		xact->blockState = TBLOCK_SUBABORT_RESTART;
    	else
    		elog(FATAL, "RollbackToSavepoint: unexpected state %s",
    			 BlockStateAsString(xact->blockState));
    }
    
    /*
     * BeginInternalSubTransaction
     *		This is the same as DefineSavepoint except it allows TBLOCK_STARTED,
     *		TBLOCK_END, and TBLOCK_PREPARE states, and therefore it can safely be
     *		used in functions that might be called when not inside a BEGIN block
     *		or when running deferred triggers at COMMIT/PREPARE time.  Also, it
     *		automatically does CommitTransactionCommand/StartTransactionCommand
     *		instead of expecting the caller to do it.
     */
    void
    BeginInternalSubTransaction(char *name)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    		case TBLOCK_STARTED:
    		case TBLOCK_INPROGRESS:
    		case TBLOCK_END:
    		case TBLOCK_PREPARE:
    		case TBLOCK_SUBINPROGRESS:
    			/* Normal subtransaction start */
    			PushTransaction();
    			s = CurrentTransactionState;		/* changed by push */
    
    			/*
    			 * Savepoint names, like the TransactionState block itself, live
    			 * in TopTransactionContext.
    			 */
    			if (name)
    				s->name = MemoryContextStrdup(TopTransactionContext, name);
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_DEFAULT:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT:
    		case TBLOCK_SUBABORT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    			elog(FATAL, "BeginInternalSubTransaction: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    
    	CommitTransactionCommand();
    	StartTransactionCommand();
    }
    
    /*
     * ReleaseCurrentSubTransaction
     *
     * RELEASE (ie, commit) the innermost subtransaction, regardless of its
     * savepoint name (if any).
     * NB: do NOT use CommitTransactionCommand/StartTransactionCommand with this.
     */
    void
    ReleaseCurrentSubTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (s->blockState != TBLOCK_SUBINPROGRESS)
    		elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s",
    			 BlockStateAsString(s->blockState));
    	Assert(s->state == TRANS_INPROGRESS);
    	MemoryContextSwitchTo(CurTransactionContext);
    	CommitSubTransaction();
    	s = CurrentTransactionState;	/* changed by pop */
    	Assert(s->state == TRANS_INPROGRESS);
    }
    
    /*
     * RollbackAndReleaseCurrentSubTransaction
     *
     * ROLLBACK and RELEASE (ie, abort) the innermost subtransaction, regardless
     * of its savepoint name (if any).
     * NB: do NOT use CommitTransactionCommand/StartTransactionCommand with this.
     */
    void
    RollbackAndReleaseCurrentSubTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    			/* Must be in a subtransaction */
    		case TBLOCK_SUBINPROGRESS:
    		case TBLOCK_SUBABORT:
    			break;
    
    			/* These cases are invalid. */
    		case TBLOCK_DEFAULT:
    		case TBLOCK_STARTED:
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_INPROGRESS:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_ABORT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    		case TBLOCK_PREPARE:
    			elog(FATAL, "RollbackAndReleaseCurrentSubTransaction: unexpected state %s",
    				 BlockStateAsString(s->blockState));
    			break;
    	}
    
    	/*
    	 * Abort the current subtransaction, if needed.
    	 */
    	if (s->blockState == TBLOCK_SUBINPROGRESS)
    		AbortSubTransaction();
    
    	/* And clean it up, too */
    	CleanupSubTransaction();
    
    	s = CurrentTransactionState;	/* changed by pop */
    	AssertState(s->blockState == TBLOCK_SUBINPROGRESS ||
    				s->blockState == TBLOCK_INPROGRESS ||
    				s->blockState == TBLOCK_STARTED);
    }
    
    /*
     *	AbortOutOfAnyTransaction
     *
     *	This routine is provided for error recovery purposes.  It aborts any
     *	active transaction or transaction block, leaving the system in a known
     *	idle state.
     */
    void
    AbortOutOfAnyTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	/*
    	 * Get out of any transaction or nested transaction
    	 */
    	do
    	{
    		switch (s->blockState)
    		{
    			case TBLOCK_DEFAULT:
    				/* Not in a transaction, do nothing */
    				break;
    			case TBLOCK_STARTED:
    			case TBLOCK_BEGIN:
    			case TBLOCK_INPROGRESS:
    			case TBLOCK_END:
    			case TBLOCK_ABORT_PENDING:
    			case TBLOCK_PREPARE:
    				/* In a transaction, so clean up */
    				AbortTransaction();
    				CleanupTransaction();
    				s->blockState = TBLOCK_DEFAULT;
    				break;
    			case TBLOCK_ABORT:
    			case TBLOCK_ABORT_END:
    				/* AbortTransaction already done, still need Cleanup */
    				CleanupTransaction();
    				s->blockState = TBLOCK_DEFAULT;
    				break;
    
    				/*
    				 * In a subtransaction, so clean it up and abort parent too
    				 */
    			case TBLOCK_SUBBEGIN:
    			case TBLOCK_SUBINPROGRESS:
    			case TBLOCK_SUBRELEASE:
    			case TBLOCK_SUBCOMMIT:
    			case TBLOCK_SUBABORT_PENDING:
    			case TBLOCK_SUBRESTART:
    				AbortSubTransaction();
    				CleanupSubTransaction();
    				s = CurrentTransactionState;	/* changed by pop */
    				break;
    
    			case TBLOCK_SUBABORT:
    			case TBLOCK_SUBABORT_END:
    			case TBLOCK_SUBABORT_RESTART:
    				/* As above, but AbortSubTransaction already done */
    				CleanupSubTransaction();
    				s = CurrentTransactionState;	/* changed by pop */
    				break;
    		}
    	} while (s->blockState != TBLOCK_DEFAULT);
    
    	/* Should be out of all subxacts now */
    	Assert(s->parent == NULL);
    }
    
    /*
     * IsTransactionBlock --- are we within a transaction block?
     */
    bool
    IsTransactionBlock(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (s->blockState == TBLOCK_DEFAULT || s->blockState == TBLOCK_STARTED)
    		return false;
    
    	return true;
    }
    
    /*
     * IsTransactionOrTransactionBlock --- are we within either a transaction
     * or a transaction block?	(The backend is only really "idle" when this
     * returns false.)
     *
     * This should match up with IsTransactionBlock and IsTransactionState.
     */
    bool
    IsTransactionOrTransactionBlock(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (s->blockState == TBLOCK_DEFAULT)
    		return false;
    
    	return true;
    }
    
    /*
     * TransactionBlockStatusCode - return status code to send in ReadyForQuery
     */
    char
    TransactionBlockStatusCode(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	switch (s->blockState)
    	{
    		case TBLOCK_DEFAULT:
    		case TBLOCK_STARTED:
    			return 'I';			/* idle --- not in transaction */
    		case TBLOCK_BEGIN:
    		case TBLOCK_SUBBEGIN:
    		case TBLOCK_INPROGRESS:
    		case TBLOCK_SUBINPROGRESS:
    		case TBLOCK_END:
    		case TBLOCK_SUBRELEASE:
    		case TBLOCK_SUBCOMMIT:
    		case TBLOCK_PREPARE:
    			return 'T';			/* in transaction */
    		case TBLOCK_ABORT:
    		case TBLOCK_SUBABORT:
    		case TBLOCK_ABORT_END:
    		case TBLOCK_SUBABORT_END:
    		case TBLOCK_ABORT_PENDING:
    		case TBLOCK_SUBABORT_PENDING:
    		case TBLOCK_SUBRESTART:
    		case TBLOCK_SUBABORT_RESTART:
    			return 'E';			/* in failed transaction */
    	}
    
    	/* should never get here */
    	elog(FATAL, "invalid transaction block state: %s",
    		 BlockStateAsString(s->blockState));
    	return 0;					/* keep compiler quiet */
    }
    
    /*
     * IsSubTransaction
     */
    bool
    IsSubTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (s->nestingLevel >= 2)
    		return true;
    
    	return false;
    }
    
    /*
     * StartSubTransaction
     *
     * If you're wondering why this is separate from PushTransaction: it's because
     * we can't conveniently do this stuff right inside DefineSavepoint.  The
     * SAVEPOINT utility command will be executed inside a Portal, and if we
     * muck with CurrentMemoryContext or CurrentResourceOwner then exit from
     * the Portal will undo those settings.  So we make DefineSavepoint just
     * push a dummy transaction block, and when control returns to the main
     * idle loop, CommitTransactionCommand will be called, and we'll come here
     * to finish starting the subtransaction.
     */
    static void
    StartSubTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (s->state != TRANS_DEFAULT)
    		elog(WARNING, "StartSubTransaction while in %s state",
    			 TransStateAsString(s->state));
    
    	s->state = TRANS_START;
    
    	/*
    	 * Initialize subsystems for new subtransaction
    	 *
    	 * must initialize resource-management stuff first
    	 */
    	AtSubStart_Memory();
    	AtSubStart_ResourceOwner();
    	AtSubStart_Inval();
    	AtSubStart_Notify();
    	AfterTriggerBeginSubXact();
    
    	s->state = TRANS_INPROGRESS;
    
    	/*
    	 * Call start-of-subxact callbacks
    	 */
    	CallSubXactCallbacks(SUBXACT_EVENT_START_SUB, s->subTransactionId,
    						 s->parent->subTransactionId);
    
    	ShowTransactionState("StartSubTransaction");
    }
    
    /*
     * CommitSubTransaction
     *
     *	The caller has to make sure to always reassign CurrentTransactionState
     *	if it has a local pointer to it after calling this function.
     */
    static void
    CommitSubTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	ShowTransactionState("CommitSubTransaction");
    
    	if (s->state != TRANS_INPROGRESS)
    		elog(WARNING, "CommitSubTransaction while in %s state",
    			 TransStateAsString(s->state));
    
    	/* Pre-commit processing goes here -- nothing to do at the moment */
    
    	s->state = TRANS_COMMIT;
    
    	/* Must CCI to ensure commands of subtransaction are seen as done */
    	CommandCounterIncrement();
    
    	/*
    	 * Prior to 8.4 we marked subcommit in clog at this point.	We now only
    	 * perform that step, if required, as part of the atomic update of the
    	 * whole transaction tree at top level commit or abort.
    	 */
    
    	/* Post-commit cleanup */
    	if (TransactionIdIsValid(s->transactionId))
    		AtSubCommit_childXids();
    	AfterTriggerEndSubXact(true);
    	AtSubCommit_Portals(s->subTransactionId,
    						s->parent->subTransactionId,
    						s->parent->curTransactionOwner);
    	AtEOSubXact_LargeObject(true, s->subTransactionId,
    							s->parent->subTransactionId);
    	AtSubCommit_Notify();
    
    	CallSubXactCallbacks(SUBXACT_EVENT_COMMIT_SUB, s->subTransactionId,
    						 s->parent->subTransactionId);
    
    	ResourceOwnerRelease(s->curTransactionOwner,
    						 RESOURCE_RELEASE_BEFORE_LOCKS,
    						 true, false);
    	AtEOSubXact_RelationCache(true, s->subTransactionId,
    							  s->parent->subTransactionId);
    	AtEOSubXact_Inval(true);
    	AtSubCommit_smgr();
    
    	/*
    	 * The only lock we actually release here is the subtransaction XID lock.
    	 */
    	CurrentResourceOwner = s->curTransactionOwner;
    	if (TransactionIdIsValid(s->transactionId))
    		XactLockTableDelete(s->transactionId);
    
    	/*
    	 * Other locks should get transferred to their parent resource owner.
    	 */
    	ResourceOwnerRelease(s->curTransactionOwner,
    						 RESOURCE_RELEASE_LOCKS,
    						 true, false);
    	ResourceOwnerRelease(s->curTransactionOwner,
    						 RESOURCE_RELEASE_AFTER_LOCKS,
    						 true, false);
    
    	AtEOXact_GUC(true, s->gucNestLevel);
    	AtEOSubXact_SPI(true, s->subTransactionId);
    	AtEOSubXact_on_commit_actions(true, s->subTransactionId,
    								  s->parent->subTransactionId);
    	AtEOSubXact_Namespace(true, s->subTransactionId,
    						  s->parent->subTransactionId);
    	AtEOSubXact_Files(true, s->subTransactionId,
    					  s->parent->subTransactionId);
    	AtEOSubXact_HashTables(true, s->nestingLevel);
    	AtEOSubXact_PgStat(true, s->nestingLevel);
    	AtSubCommit_Snapshot(s->nestingLevel);
    
    	/*
    	 * We need to restore the upper transaction's read-only state, in case the
    	 * upper is read-write while the child is read-only; GUC will incorrectly
    	 * think it should leave the child state in place.
    	 */
    	XactReadOnly = s->prevXactReadOnly;
    
    	CurrentResourceOwner = s->parent->curTransactionOwner;
    	CurTransactionResourceOwner = s->parent->curTransactionOwner;
    	ResourceOwnerDelete(s->curTransactionOwner);
    	s->curTransactionOwner = NULL;
    
    	AtSubCommit_Memory();
    
    	s->state = TRANS_DEFAULT;
    
    	PopTransaction();
    }
    
    /*
     * AbortSubTransaction
     */
    static void
    AbortSubTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	/* Prevent cancel/die interrupt while cleaning up */
    	HOLD_INTERRUPTS();
    
    	/* Make sure we have a valid memory context and resource owner */
    	AtSubAbort_Memory();
    	AtSubAbort_ResourceOwner();
    
    	/*
    	 * Release any LW locks we might be holding as quickly as possible.
    	 * (Regular locks, however, must be held till we finish aborting.)
    	 * Releasing LW locks is critical since we might try to grab them again
    	 * while cleaning up!
    	 *
    	 * FIXME This may be incorrect --- Are there some locks we should keep?
    	 * Buffer locks, for example?  I don't think so but I'm not sure.
    	 */
    	LWLockReleaseAll();
    
    	AbortBufferIO();
    	UnlockBuffers();
    
    	LockErrorCleanup();
    
    	/*
    	 * check the current transaction state
    	 */
    	ShowTransactionState("AbortSubTransaction");
    
    	if (s->state != TRANS_INPROGRESS)
    		elog(WARNING, "AbortSubTransaction while in %s state",
    			 TransStateAsString(s->state));
    
    	s->state = TRANS_ABORT;
    
    	/*
    	 * Reset user ID which might have been changed transiently.  (See notes in
    	 * AbortTransaction.)
    	 */
    	SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
    
    	/*
    	 * We can skip all this stuff if the subxact failed before creating a
    	 * ResourceOwner...
    	 */
    	if (s->curTransactionOwner)
    	{
    		AfterTriggerEndSubXact(false);
    		AtSubAbort_Portals(s->subTransactionId,
    						   s->parent->subTransactionId,
    						   s->parent->curTransactionOwner);
    		AtEOSubXact_LargeObject(false, s->subTransactionId,
    								s->parent->subTransactionId);
    		AtSubAbort_Notify();
    
    		/* Advertise the fact that we aborted in pg_clog. */
    		(void) RecordTransactionAbort(true);
    
    		/* Post-abort cleanup */
    		if (TransactionIdIsValid(s->transactionId))
    			AtSubAbort_childXids();
    
    		CallSubXactCallbacks(SUBXACT_EVENT_ABORT_SUB, s->subTransactionId,
    							 s->parent->subTransactionId);
    
    		ResourceOwnerRelease(s->curTransactionOwner,
    							 RESOURCE_RELEASE_BEFORE_LOCKS,
    							 false, false);
    		AtEOSubXact_RelationCache(false, s->subTransactionId,
    								  s->parent->subTransactionId);
    		AtEOSubXact_Inval(false);
    		AtSubAbort_smgr();
    		ResourceOwnerRelease(s->curTransactionOwner,
    							 RESOURCE_RELEASE_LOCKS,
    							 false, false);
    		ResourceOwnerRelease(s->curTransactionOwner,
    							 RESOURCE_RELEASE_AFTER_LOCKS,
    							 false, false);
    
    		AtEOXact_GUC(false, s->gucNestLevel);
    		AtEOSubXact_SPI(false, s->subTransactionId);
    		AtEOSubXact_on_commit_actions(false, s->subTransactionId,
    									  s->parent->subTransactionId);
    		AtEOSubXact_Namespace(false, s->subTransactionId,
    							  s->parent->subTransactionId);
    		AtEOSubXact_Files(false, s->subTransactionId,
    						  s->parent->subTransactionId);
    		AtEOSubXact_HashTables(false, s->nestingLevel);
    		AtEOSubXact_PgStat(false, s->nestingLevel);
    		AtSubAbort_Snapshot(s->nestingLevel);
    	}
    
    	/*
    	 * Restore the upper transaction's read-only state, too.  This should be
    	 * redundant with GUC's cleanup but we may as well do it for consistency
    	 * with the commit case.
    	 */
    	XactReadOnly = s->prevXactReadOnly;
    
    	RESUME_INTERRUPTS();
    }
    
    /*
     * CleanupSubTransaction
     *
     *	The caller has to make sure to always reassign CurrentTransactionState
     *	if it has a local pointer to it after calling this function.
     */
    static void
    CleanupSubTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	ShowTransactionState("CleanupSubTransaction");
    
    	if (s->state != TRANS_ABORT)
    		elog(WARNING, "CleanupSubTransaction while in %s state",
    			 TransStateAsString(s->state));
    
    	AtSubCleanup_Portals(s->subTransactionId);
    
    	CurrentResourceOwner = s->parent->curTransactionOwner;
    	CurTransactionResourceOwner = s->parent->curTransactionOwner;
    	if (s->curTransactionOwner)
    		ResourceOwnerDelete(s->curTransactionOwner);
    	s->curTransactionOwner = NULL;
    
    	AtSubCleanup_Memory();
    
    	s->state = TRANS_DEFAULT;
    
    	PopTransaction();
    }
    
    /*
     * PushTransaction
     *		Create transaction state stack entry for a subtransaction
     *
     *	The caller has to make sure to always reassign CurrentTransactionState
     *	if it has a local pointer to it after calling this function.
     */
    static void
    PushTransaction(void)
    {
    	TransactionState p = CurrentTransactionState;
    	TransactionState s;
    
    	/*
    	 * We keep subtransaction state nodes in TopTransactionContext.
    	 */
    	s = (TransactionState)
    		MemoryContextAllocZero(TopTransactionContext,
    							   sizeof(TransactionStateData));
    
    	/*
    	 * Assign a subtransaction ID, watching out for counter wraparound.
    	 */
    	currentSubTransactionId += 1;
    	if (currentSubTransactionId == InvalidSubTransactionId)
    	{
    		currentSubTransactionId -= 1;
    		pfree(s);
    		ereport(ERROR,
    				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    				 errmsg("cannot have more than 2^32-1 subtransactions in a transaction")));
    	}
    
    	/*
    	 * We can now stack a minimally valid subtransaction without fear of
    	 * failure.
    	 */
    	s->transactionId = InvalidTransactionId;	/* until assigned */
    	s->subTransactionId = currentSubTransactionId;
    	s->parent = p;
    	s->nestingLevel = p->nestingLevel + 1;
    	s->gucNestLevel = NewGUCNestLevel();
    	s->savepointLevel = p->savepointLevel;
    	s->state = TRANS_DEFAULT;
    	s->blockState = TBLOCK_SUBBEGIN;
    	GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
    	s->prevXactReadOnly = XactReadOnly;
    
    	CurrentTransactionState = s;
    
    	/*
    	 * AbortSubTransaction and CleanupSubTransaction have to be able to cope
    	 * with the subtransaction from here on out; in particular they should not
    	 * assume that it necessarily has a transaction context, resource owner,
    	 * or XID.
    	 */
    }
    
    /*
     * PopTransaction
     *		Pop back to parent transaction state
     *
     *	The caller has to make sure to always reassign CurrentTransactionState
     *	if it has a local pointer to it after calling this function.
     */
    static void
    PopTransaction(void)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (s->state != TRANS_DEFAULT)
    		elog(WARNING, "PopTransaction while in %s state",
    			 TransStateAsString(s->state));
    
    	if (s->parent == NULL)
    		elog(FATAL, "PopTransaction with no parent");
    
    	CurrentTransactionState = s->parent;
    
    	/* Let's just make sure CurTransactionContext is good */
    	CurTransactionContext = s->parent->curTransactionContext;
    	MemoryContextSwitchTo(CurTransactionContext);
    
    	/* Ditto for ResourceOwner links */
    	CurTransactionResourceOwner = s->parent->curTransactionOwner;
    	CurrentResourceOwner = s->parent->curTransactionOwner;
    
    	/* Free the old child structure */
    	if (s->name)
    		pfree(s->name);
    	pfree(s);
    }
    
    /*
     * ShowTransactionState
     *		Debug support
     */
    static void
    ShowTransactionState(const char *str)
    {
    	/* skip work if message will definitely not be printed */
    	if (log_min_messages <= DEBUG3 || client_min_messages <= DEBUG3)
    	{
    		elog(DEBUG3, "%s", str);
    		ShowTransactionStateRec(CurrentTransactionState);
    	}
    }
    
    /*
     * ShowTransactionStateRec
     *		Recursive subroutine for ShowTransactionState
     */
    static void
    ShowTransactionStateRec(TransactionState s)
    {
    	StringInfoData buf;
    
    	initStringInfo(&buf);
    
    	if (s->nChildXids > 0)
    	{
    		int			i;
    
    		appendStringInfo(&buf, "%u", s->childXids[0]);
    		for (i = 1; i < s->nChildXids; i++)
    			appendStringInfo(&buf, " %u", s->childXids[i]);
    	}
    
    	if (s->parent)
    		ShowTransactionStateRec(s->parent);
    
    	/* use ereport to suppress computation if msg will not be printed */
    	ereport(DEBUG3,
    			(errmsg_internal("name: %s; blockState: %13s; state: %7s, xid/subid/cid: %u/%u/%u%s, nestlvl: %d, children: %s",
    							 PointerIsValid(s->name) ? s->name : "unnamed",
    							 BlockStateAsString(s->blockState),
    							 TransStateAsString(s->state),
    							 (unsigned int) s->transactionId,
    							 (unsigned int) s->subTransactionId,
    							 (unsigned int) currentCommandId,
    							 currentCommandIdUsed ? " (used)" : "",
    							 s->nestingLevel, buf.data)));
    
    	pfree(buf.data);
    }
    
    /*
     * BlockStateAsString
     *		Debug support
     */
    static const char *
    BlockStateAsString(TBlockState blockState)
    {
    	switch (blockState)
    	{
    		case TBLOCK_DEFAULT:
    			return "DEFAULT";
    		case TBLOCK_STARTED:
    			return "STARTED";
    		case TBLOCK_BEGIN:
    			return "BEGIN";
    		case TBLOCK_INPROGRESS:
    			return "INPROGRESS";
    		case TBLOCK_END:
    			return "END";
    		case TBLOCK_ABORT:
    			return "ABORT";
    		case TBLOCK_ABORT_END:
    			return "ABORT END";
    		case TBLOCK_ABORT_PENDING:
    			return "ABORT PEND";
    		case TBLOCK_PREPARE:
    			return "PREPARE";
    		case TBLOCK_SUBBEGIN:
    			return "SUB BEGIN";
    		case TBLOCK_SUBINPROGRESS:
    			return "SUB INPROGRS";
    		case TBLOCK_SUBRELEASE:
    			return "SUB RELEASE";
    		case TBLOCK_SUBCOMMIT:
    			return "SUB COMMIT";
    		case TBLOCK_SUBABORT:
    			return "SUB ABORT";
    		case TBLOCK_SUBABORT_END:
    			return "SUB ABORT END";
    		case TBLOCK_SUBABORT_PENDING:
    			return "SUB ABRT PEND";
    		case TBLOCK_SUBRESTART:
    			return "SUB RESTART";
    		case TBLOCK_SUBABORT_RESTART:
    			return "SUB AB RESTRT";
    	}
    	return "UNRECOGNIZED";
    }
    
    /*
     * TransStateAsString
     *		Debug support
     */
    static const char *
    TransStateAsString(TransState state)
    {
    	switch (state)
    	{
    		case TRANS_DEFAULT:
    			return "DEFAULT";
    		case TRANS_START:
    			return "START";
    		case TRANS_INPROGRESS:
    			return "INPROGR";
    		case TRANS_COMMIT:
    			return "COMMIT";
    		case TRANS_ABORT:
    			return "ABORT";
    		case TRANS_PREPARE:
    			return "PREPARE";
    	}
    	return "UNRECOGNIZED";
    }
    
    /*
     * xactGetCommittedChildren
     *
     * Gets the list of committed children of the current transaction.	The return
     * value is the number of child transactions.  *ptr is set to point to an
     * array of TransactionIds.  The array is allocated in TopTransactionContext;
     * the caller should *not* pfree() it (this is a change from pre-8.4 code!).
     * If there are no subxacts, *ptr is set to NULL.
     */
    int
    xactGetCommittedChildren(TransactionId **ptr)
    {
    	TransactionState s = CurrentTransactionState;
    
    	if (s->nChildXids == 0)
    		*ptr = NULL;
    	else
    		*ptr = s->childXids;
    
    	return s->nChildXids;
    }
    
    /*
     *	XLOG support routines
     */
    
    /*
     * Before 9.0 this was a fairly short function, but now it performs many
     * actions for which the order of execution is critical.
     */
    static void
    xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
    					TransactionId *sub_xids, int nsubxacts,
    					SharedInvalidationMessage *inval_msgs, int nmsgs,
    					RelFileNode *xnodes, int nrels,
    					Oid dbId, Oid tsId,
    					uint32 xinfo)
    {
    	TransactionId max_xid;
    	int			i;
    
    	max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
    
    	/*
    	 * Make sure nextXid is beyond any XID mentioned in the record.
    	 *
    	 * We don't expect anyone else to modify nextXid, hence we don't need to
    	 * hold a lock while checking this. We still acquire the lock to modify
    	 * it, though.
    	 */
    	if (TransactionIdFollowsOrEquals(max_xid,
    									 ShmemVariableCache->nextXid))
    	{
    		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
    		ShmemVariableCache->nextXid = max_xid;
    		TransactionIdAdvance(ShmemVariableCache->nextXid);
    		LWLockRelease(XidGenLock);
    	}
    
    	if (standbyState == STANDBY_DISABLED)
    	{
    		/*
    		 * Mark the transaction committed in pg_clog.
    		 */
    		TransactionIdCommitTree(xid, nsubxacts, sub_xids);
    	}
    	else
    	{
    		/*
    		 * If a transaction completion record arrives that has as-yet
    		 * unobserved subtransactions then this will not have been fully
    		 * handled by the call to RecordKnownAssignedTransactionIds() in the
    		 * main recovery loop in xlog.c. So we need to do bookkeeping again to
    		 * cover that case. This is confusing and it is easy to think this
    		 * call is irrelevant, which has happened three times in development
    		 * already. Leave it in.
    		 */
    		RecordKnownAssignedTransactionIds(max_xid);
    
    		/*
    		 * Mark the transaction committed in pg_clog. We use async commit
    		 * protocol during recovery to provide information on database
    		 * consistency for when users try to set hint bits. It is important
    		 * that we do not set hint bits until the minRecoveryPoint is past
    		 * this commit record. This ensures that if we crash we don't see hint
    		 * bits set on changes made by transactions that haven't yet
    		 * recovered. It's unlikely but it's good to be safe.
    		 */
    		TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
    
    		/*
    		 * We must mark clog before we update the ProcArray.
    		 */
    		ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
    
    		/*
    		 * Send any cache invalidations attached to the commit. We must
    		 * maintain the same order of invalidation then release locks as
    		 * occurs in CommitTransaction().
    		 */
    		ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
    								  XactCompletionRelcacheInitFileInval(xinfo),
    											 dbId, tsId);
    
    		/*
    		 * Release locks, if any. We do this for both two phase and normal one
    		 * phase transactions. In effect we are ignoring the prepare phase and
    		 * just going straight to lock release.
    		 */
    		StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
    	}
    
    	/* Make sure files supposed to be dropped are dropped */
    	for (i = 0; i < nrels; i++)
    	{
    		SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
    		ForkNumber	fork;
    
    		for (fork = 0; fork <= MAX_FORKNUM; fork++)
    		{
    			XLogDropRelation(xnodes[i], fork);
    			smgrdounlink(srel, fork, true);
    		}
    		smgrclose(srel);
    	}
    
    	/*
    	 * We issue an XLogFlush() for the same reason we emit ForceSyncCommit()
    	 * in normal operation. For example, in DROP DATABASE, we delete all the
    	 * files belonging to the database, and then commit the transaction. If we
    	 * crash after all the files have been deleted but before the commit, you
    	 * have an entry in pg_database without any files. To minimize the window
    	 * for that, we use ForceSyncCommit() to rush the commit record to disk as
    	 * quick as possible. We have the same window during recovery, and forcing
    	 * an XLogFlush() (which updates minRecoveryPoint during recovery) helps
    	 * to reduce that problem window, for any user that requested
    	 * ForceSyncCommit().
    	 */
    	if (XactCompletionForceSyncCommit(xinfo))
    		XLogFlush(lsn);
    
    }
    /*
     * Utility function to call xact_redo_commit_internal after breaking down xlrec
     */
    static void
    xact_redo_commit(xl_xact_commit *xlrec,
    							TransactionId xid, XLogRecPtr lsn)
    {
    	TransactionId *subxacts;
    	SharedInvalidationMessage *inval_msgs;
    
    	/* subxid array follows relfilenodes */
    	subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
    	/* invalidation messages array follows subxids */
    	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
    
    	xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
    								inval_msgs, xlrec->nmsgs,
    								xlrec->xnodes, xlrec->nrels,
    								xlrec->dbId,
    								xlrec->tsId,
    								xlrec->xinfo);
    }
    
    /*
     * Utility function to call xact_redo_commit_internal  for compact form of message.
     */
    static void
    xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
    							TransactionId xid, XLogRecPtr lsn)
    {
    	xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
    								NULL, 0,		/* inval msgs */
    								NULL, 0,		/* relfilenodes */
    								InvalidOid,		/* dbId */
    								InvalidOid,		/* tsId */
    								0);				/* xinfo */
    }
    
    /*
     * Be careful with the order of execution, as with xact_redo_commit().
     * The two functions are similar but differ in key places.
     *
     * Note also that an abort can be for a subtransaction and its children,
     * not just for a top level abort. That means we have to consider
     * topxid != xid, whereas in commit we would find topxid == xid always
     * because subtransaction commit is never WAL logged.
     */
    static void
    xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
    {
    	TransactionId *sub_xids;
    	TransactionId max_xid;
    	int			i;
    
    	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
    	max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
    
    	/*
    	 * Make sure nextXid is beyond any XID mentioned in the record.
    	 *
    	 * We don't expect anyone else to modify nextXid, hence we don't need to
    	 * hold a lock while checking this. We still acquire the lock to modify
    	 * it, though.
    	 */
    	if (TransactionIdFollowsOrEquals(max_xid,
    									 ShmemVariableCache->nextXid))
    	{
    		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
    		ShmemVariableCache->nextXid = max_xid;
    		TransactionIdAdvance(ShmemVariableCache->nextXid);
    		LWLockRelease(XidGenLock);
    	}
    
    	if (standbyState == STANDBY_DISABLED)
    	{
    		/* Mark the transaction aborted in pg_clog, no need for async stuff */
    		TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
    	}
    	else
    	{
    		/*
    		 * If a transaction completion record arrives that has as-yet
    		 * unobserved subtransactions then this will not have been fully
    		 * handled by the call to RecordKnownAssignedTransactionIds() in the
    		 * main recovery loop in xlog.c. So we need to do bookkeeping again to
    		 * cover that case. This is confusing and it is easy to think this
    		 * call is irrelevant, which has happened three times in development
    		 * already. Leave it in.
    		 */
    		RecordKnownAssignedTransactionIds(max_xid);
    
    		/* Mark the transaction aborted in pg_clog, no need for async stuff */
    		TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
    
    		/*
    		 * We must update the ProcArray after we have marked clog.
    		 */
    		ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
    
    		/*
    		 * There are no flat files that need updating, nor invalidation
    		 * messages to send or undo.
    		 */
    
    		/*
    		 * Release locks, if any. There are no invalidations to send.
    		 */
    		StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
    	}
    
    	/* Make sure files supposed to be dropped are dropped */
    	for (i = 0; i < xlrec->nrels; i++)
    	{
    		SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
    		ForkNumber	fork;
    
    		for (fork = 0; fork <= MAX_FORKNUM; fork++)
    		{
    			XLogDropRelation(xlrec->xnodes[i], fork);
    			smgrdounlink(srel, fork, true);
    		}
    		smgrclose(srel);
    	}
    }
    
    void
    xact_redo(XLogRecPtr lsn, XLogRecord *record)
    {
    	uint8		info = record->xl_info & ~XLR_INFO_MASK;
    
    	/* Backup blocks are not used in xact records */
    	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
    
    	if (info == XLOG_XACT_COMMIT_COMPACT)
    	{
    		xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
    
    		xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
    	}
    	else if (info == XLOG_XACT_COMMIT)
    	{
    		xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
    
    		xact_redo_commit(xlrec, record->xl_xid, lsn);
    	}
    	else if (info == XLOG_XACT_ABORT)
    	{
    		xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
    
    		xact_redo_abort(xlrec, record->xl_xid);
    	}
    	else if (info == XLOG_XACT_PREPARE)
    	{
    		/* the record contents are exactly the 2PC file */
    		RecreateTwoPhaseFile(record->xl_xid,
    							 XLogRecGetData(record), record->xl_len);
    	}
    	else if (info == XLOG_XACT_COMMIT_PREPARED)
    	{
    		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
    
    		xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
    		RemoveTwoPhaseFile(xlrec->xid, false);
    	}
    	else if (info == XLOG_XACT_ABORT_PREPARED)
    	{
    		xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
    
    		xact_redo_abort(&xlrec->arec, xlrec->xid);
    		RemoveTwoPhaseFile(xlrec->xid, false);
    	}
    	else if (info == XLOG_XACT_ASSIGNMENT)
    	{
    		xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
    
    		if (standbyState >= STANDBY_INITIALIZED)
    			ProcArrayApplyXidAssignment(xlrec->xtop,
    										xlrec->nsubxacts, xlrec->xsub);
    	}
    	else
    		elog(PANIC, "xact_redo: unknown op code %u", info);
    }
    
    static void
    xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
    {
    	int			i;
    	TransactionId *subxacts;
    
    	subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
    
    	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
    
    	if (xlrec->nrels > 0)
    	{
    		appendStringInfo(buf, "; rels:");
    		for (i = 0; i < xlrec->nrels; i++)
    		{
    			char	   *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
    
    			appendStringInfo(buf, " %s", path);
    			pfree(path);
    		}
    	}
    	if (xlrec->nsubxacts > 0)
    	{
    		appendStringInfo(buf, "; subxacts:");
    		for (i = 0; i < xlrec->nsubxacts; i++)
    			appendStringInfo(buf, " %u", subxacts[i]);
    	}
    	if (xlrec->nmsgs > 0)
    	{
    		SharedInvalidationMessage *msgs;
    
    		msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
    
    		if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
    			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
    							 xlrec->dbId, xlrec->tsId);
    
    		appendStringInfo(buf, "; inval msgs:");
    		for (i = 0; i < xlrec->nmsgs; i++)
    		{
    			SharedInvalidationMessage *msg = &msgs[i];
    
    			if (msg->id >= 0)
    				appendStringInfo(buf, " catcache %d", msg->id);
    			else if (msg->id == SHAREDINVALCATALOG_ID)
    				appendStringInfo(buf, " catalog %u", msg->cat.catId);
    			else if (msg->id == SHAREDINVALRELCACHE_ID)
    				appendStringInfo(buf, " relcache %u", msg->rc.relId);
    			/* remaining cases not expected, but print something anyway */
    			else if (msg->id == SHAREDINVALSMGR_ID)
    				appendStringInfo(buf, " smgr");
    			else if (msg->id == SHAREDINVALRELMAP_ID)
    				appendStringInfo(buf, " relmap");
    			else
    				appendStringInfo(buf, " unknown id %d", msg->id);
    		}
    	}
    }
    
    static void
    xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
    {
    	int			i;
    
    	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
    
    	if (xlrec->nsubxacts > 0)
    	{
    		appendStringInfo(buf, "; subxacts:");
    		for (i = 0; i < xlrec->nsubxacts; i++)
    			appendStringInfo(buf, " %u", xlrec->subxacts[i]);
    	}
    }
    
    static void
    xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
    {
    	int			i;
    
    	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
    	if (xlrec->nrels > 0)
    	{
    		appendStringInfo(buf, "; rels:");
    		for (i = 0; i < xlrec->nrels; i++)
    		{
    			char	   *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
    
    			appendStringInfo(buf, " %s", path);
    			pfree(path);
    		}
    	}
    	if (xlrec->nsubxacts > 0)
    	{
    		TransactionId *xacts = (TransactionId *)
    		&xlrec->xnodes[xlrec->nrels];
    
    		appendStringInfo(buf, "; subxacts:");
    		for (i = 0; i < xlrec->nsubxacts; i++)
    			appendStringInfo(buf, " %u", xacts[i]);
    	}
    }
    
    static void
    xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
    {
    	int			i;
    
    	appendStringInfo(buf, "subxacts:");
    
    	for (i = 0; i < xlrec->nsubxacts; i++)
    		appendStringInfo(buf, " %u", xlrec->xsub[i]);
    }
    
    void
    xact_desc(StringInfo buf, uint8 xl_info, char *rec)
    {
    	uint8		info = xl_info & ~XLR_INFO_MASK;
    
    	if (info == XLOG_XACT_COMMIT_COMPACT)
    	{
    		xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
    
    		appendStringInfo(buf, "commit: ");
    		xact_desc_commit_compact(buf, xlrec);
    	}
    	else if (info == XLOG_XACT_COMMIT)
    	{
    		xl_xact_commit *xlrec = (xl_xact_commit *) rec;
    
    		appendStringInfo(buf, "commit: ");
    		xact_desc_commit(buf, xlrec);
    	}
    	else if (info == XLOG_XACT_ABORT)
    	{
    		xl_xact_abort *xlrec = (xl_xact_abort *) rec;
    
    		appendStringInfo(buf, "abort: ");
    		xact_desc_abort(buf, xlrec);
    	}
    	else if (info == XLOG_XACT_PREPARE)
    	{
    		appendStringInfo(buf, "prepare");
    	}
    	else if (info == XLOG_XACT_COMMIT_PREPARED)
    	{
    		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
    
    		appendStringInfo(buf, "commit prepared %u: ", xlrec->xid);
    		xact_desc_commit(buf, &xlrec->crec);
    	}
    	else if (info == XLOG_XACT_ABORT_PREPARED)
    	{
    		xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
    
    		appendStringInfo(buf, "abort prepared %u: ", xlrec->xid);
    		xact_desc_abort(buf, &xlrec->arec);
    	}
    	else if (info == XLOG_XACT_ASSIGNMENT)
    	{
    		xl_xact_assignment *xlrec = (xl_xact_assignment *) rec;
    
    		/*
    		 * Note that we ignore the WAL record's xid, since we're more
    		 * interested in the top-level xid that issued the record and which
    		 * xids are being reported here.
    		 */
    		appendStringInfo(buf, "xid assignment xtop %u: ", xlrec->xtop);
    		xact_desc_assignment(buf, xlrec);
    	}
    	else
    		appendStringInfo(buf, "UNKNOWN");
    }