Skip to content
Snippets Groups Projects
Select Git revision
  • benchmark-tools
  • postgres-lambda
  • master default
  • REL9_4_25
  • REL9_5_20
  • REL9_6_16
  • REL_10_11
  • REL_11_6
  • REL_12_1
  • REL_12_0
  • REL_12_RC1
  • REL_12_BETA4
  • REL9_4_24
  • REL9_5_19
  • REL9_6_15
  • REL_10_10
  • REL_11_5
  • REL_12_BETA3
  • REL9_4_23
  • REL9_5_18
  • REL9_6_14
  • REL_10_9
  • REL_11_4
23 results

procarray.c

Blame
  • procarray.c 30.61 KiB
    /*-------------------------------------------------------------------------
     *
     * procarray.c
     *	  POSTGRES process array code.
     *
     *
     * This module maintains an unsorted array of the PGPROC structures for all
     * active backends.  Although there are several uses for this, the principal
     * one is as a means of determining the set of currently running transactions.
     *
     * Because of various subtle race conditions it is critical that a backend
     * hold the correct locks while setting or clearing its MyProc->xid field.
     * See notes in GetSnapshotData.
     *
     * The process array now also includes PGPROC structures representing
     * prepared transactions.  The xid and subxids fields of these are valid,
     * as are the myProcLocks lists.  They can be distinguished from regular
     * backend PGPROCs at need by checking for pid == 0.
     *
     *
     * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
     * Portions Copyright (c) 1994, Regents of the University of California
     *
     *
     * IDENTIFICATION
     *	  $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.28 2007/07/01 02:22:23 tgl Exp $
     *
     *-------------------------------------------------------------------------
     */
    #include "postgres.h"
    
    #include <signal.h>
    
    #include "access/subtrans.h"
    #include "access/transam.h"
    #include "access/xact.h"
    #include "access/twophase.h"
    #include "miscadmin.h"
    #include "storage/procarray.h"
    #include "utils/tqual.h"
    
    
    /*
     * Our shared memory area: a small header followed by an array of pointers
     * to PGPROC structs.  Only entries [0, numProcs) are meaningful, and they
     * are kept in no particular order (ProcArrayRemove fills a hole with the
     * last entry).  ProcArrayShmemSize() computes the real allocation size.
     */
    typedef struct ProcArrayStruct
    {
    	int			numProcs;		/* number of valid procs entries */
    	int			maxProcs;		/* allocated size of procs array */
    
    	/*
    	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
    	 * actually it is maxProcs entries long.
    	 */
    	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
    } ProcArrayStruct;
    
    /* Pointer to the shared structure; assigned in CreateSharedProcArray() */
    static ProcArrayStruct *procArray;
    
    
    #ifdef XIDCACHE_DEBUG
    
    /*
     * Counters for XidCache measurement.  Each one records how often
     * TransactionIdIsInProgress() took the corresponding path: rejected by
     * RecentXmin, matched a main XID, matched a cached child XID, or had to go
     * on to its pg_subtrans step.
     */
    static long xc_by_recent_xmin = 0;
    static long xc_by_main_xid = 0;
    static long xc_by_child_xid = 0;
    static long xc_slow_answer = 0;
    
    /* Increment wrappers, so call sites need no #ifdef of their own */
    #define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
    #define xc_by_main_xid_inc()		(xc_by_main_xid++)
    #define xc_by_child_xid_inc()		(xc_by_child_xid++)
    #define xc_slow_answer_inc()		(xc_slow_answer++)
    
    /* called from ProcArrayRemove() when a real backend (pid != 0) exits */
    static void DisplayXidCache(void);
    #else							/* !XIDCACHE_DEBUG */
    
    /* Without XIDCACHE_DEBUG the increment macros expand to no-ops */
    #define xc_by_recent_xmin_inc()		((void) 0)
    #define xc_by_main_xid_inc()		((void) 0)
    #define xc_by_child_xid_inc()		((void) 0)
    #define xc_slow_answer_inc()		((void) 0)
    #endif   /* XIDCACHE_DEBUG */
    
    
    /*
     * Compute how much shared memory CreateSharedProcArray will request:
     * the fixed header of ProcArrayStruct plus one PGPROC pointer for each
     * of the MaxBackends + max_prepared_xacts possible array entries.
     */
    Size
    ProcArrayShmemSize(void)
    {
    	Size		nslots = add_size(MaxBackends, max_prepared_xacts);
    	Size		arraybytes = mul_size(sizeof(PGPROC *), nslots);
    
    	return add_size(offsetof(ProcArrayStruct, procs), arraybytes);
    }
    
    /*
     * Create the shared PGPROC array, or attach to it if it already exists.
     *
     * Only the creator of the structure initializes the header fields.
     */
    void
    CreateSharedProcArray(void)
    {
    	bool		alreadyExists;
    
    	procArray = (ProcArrayStruct *)
    		ShmemInitStruct("Proc Array", ProcArrayShmemSize(), &alreadyExists);
    
    	/* The structure was already there, so leave its contents alone. */
    	if (alreadyExists)
    		return;
    
    	/* We're the first - start with an empty array of full capacity. */
    	procArray->maxProcs = MaxBackends + max_prepared_xacts;
    	procArray->numProcs = 0;
    }
    
    /*
     * Append the given PGPROC to the shared array.
     *
     * Reports FATAL if the array is already at maxProcs entries; the lock is
     * dropped first so that it is not held while reporting.
     */
    void
    ProcArrayAdd(PGPROC *proc)
    {
    	ProcArrayStruct *pa = procArray;
    	bool		full;
    
    	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    
    	full = (pa->numProcs >= pa->maxProcs);
    	if (full)
    	{
    		/*
    		 * No free slot.  This is not expected to happen: the supply of
    		 * PGPROC structs is fixed too, so we should have failed earlier.
    		 */
    		LWLockRelease(ProcArrayLock);
    		ereport(FATAL,
    				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
    				 errmsg("sorry, too many clients already")));
    	}
    
    	/* Store in the first unused slot, then count it as valid. */
    	pa->procs[pa->numProcs++] = proc;
    
    	LWLockRelease(ProcArrayLock);
    }
    
    /*
     * Take the given PGPROC out of the shared array.
     *
     * The vacated slot is refilled with the last valid entry, so the array
     * stays dense but its order is not preserved.  If the PGPROC is not in
     * the array, a LOG message is emitted and nothing else happens.
     */
    void
    ProcArrayRemove(PGPROC *proc)
    {
    	ProcArrayStruct *pa = procArray;
    	int			slot;
    	bool		removed = false;
    
    #ifdef XIDCACHE_DEBUG
    	/* dump stats at backend shutdown, but not prepared-xact end */
    	if (proc->pid != 0)
    		DisplayXidCache();
    #endif
    
    	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    
    	/* Locate the first slot holding this PGPROC, if any. */
    	slot = 0;
    	while (slot < pa->numProcs && pa->procs[slot] != proc)
    		slot++;
    
    	if (slot < pa->numProcs)
    	{
    		/* Shrink the array and move the former last entry into the hole. */
    		pa->numProcs--;
    		pa->procs[slot] = pa->procs[pa->numProcs];
    		removed = true;
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	/* Ooops */
    	if (!removed)
    		elog(LOG, "failed to find proc %p in ProcArray", proc);
    }
    
    
    /*
     * TransactionIdIsInProgress -- is given transaction running in some backend
     *
     * Returns true if xid is found to be running (as a top-level XID, a cached
     * subtransaction XID, or a subtransaction whose topmost parent was seen
     * running), false otherwise.
     *
     * There are three possibilities for finding a running transaction:
     *
     * 1. the given Xid is a main transaction Id.  We will find this out cheaply
     * by looking at the PGPROC struct for each backend.
     *
     * 2. the given Xid is one of the cached subxact Xids in the PGPROC array.
     * We can find this out cheaply too.
     *
     * 3. Search the SubTrans tree to find the Xid's topmost parent, and then
     * see if that is running according to PGPROC.  This is the slowest, but
     * sadly it has to be done always if the other two failed, unless we see
     * that the cached subxact sets are complete (none have overflowed).
     *
     * ProcArrayLock has to be held while we do 1 and 2.  If we save the top Xids
     * while doing 1, we can release the ProcArrayLock while we do 3.  This buys
     * back some concurrency (we can't retrieve the main Xids from PGPROC again
     * anyway; see GetNewTransactionId).
     */
    bool
    TransactionIdIsInProgress(TransactionId xid)
    {
    	bool		result = false;
    	ProcArrayStruct *arrayP = procArray;
    	int			i,
    				j;
    	int			nxids = 0;		/* number of entries saved in xids[] */
    	TransactionId *xids;
    	TransactionId topxid;
    	bool		locked;			/* true while we hold ProcArrayLock */
    
    	/*
    	 * Don't bother checking a transaction older than RecentXmin; it could not
    	 * possibly still be running.  (Note: in particular, this guarantees
    	 * that we reject InvalidTransactionId, FrozenTransactionId, etc as
    	 * not running.)
    	 */
    	if (TransactionIdPrecedes(xid, RecentXmin))
    	{
    		xc_by_recent_xmin_inc();
    		return false;
    	}
    
    	/*
    	 * Get workspace to remember main XIDs in.  It is sized for maxProcs so
    	 * that it can be allocated before taking the lock.
    	 */
    	xids = (TransactionId *) palloc(sizeof(TransactionId) * arrayP->maxProcs);
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    	locked = true;
    
    	for (i = 0; i < arrayP->numProcs; i++)
    	{
    		PGPROC	   *proc = arrayP->procs[i];
    
    		/* Fetch xid just once - see GetNewTransactionId */
    		TransactionId pxid = proc->xid;
    
    		/* This entry is not running a transaction right now */
    		if (!TransactionIdIsValid(pxid))
    			continue;
    
    		/*
    		 * Step 1: check the main Xid
    		 */
    		if (TransactionIdEquals(pxid, xid))
    		{
    			xc_by_main_xid_inc();
    			result = true;
    			goto result_known;
    		}
    
    		/*
    		 * We can ignore main Xids that are younger than the target Xid, since
    		 * the target could not possibly be their child.
    		 */
    		if (TransactionIdPrecedes(xid, pxid))
    			continue;
    
    		/*
    		 * Step 2: check the cached child-Xids arrays
    		 */
    		for (j = proc->subxids.nxids - 1; j >= 0; j--)
    		{
    			/* Fetch xid just once - see GetNewTransactionId */
    			TransactionId cxid = proc->subxids.xids[j];
    
    			if (TransactionIdEquals(cxid, xid))
    			{
    				xc_by_child_xid_inc();
    				result = true;
    				goto result_known;
    			}
    		}
    
    		/*
    		 * Save the main Xid for step 3.  We only need to remember main Xids
    		 * that have uncached children.  (Note: there is no race condition
    		 * here because the overflowed flag cannot be cleared, only set, while
    		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
    		 * worry about.)
    		 */
    		if (proc->subxids.overflowed)
    			xids[nxids++] = pxid;
    	}
    
    	LWLockRelease(ProcArrayLock);
    	locked = false;
    
    	/*
    	 * If none of the relevant caches overflowed, we know the Xid is not
    	 * running without looking at pg_subtrans.
    	 */
    	if (nxids == 0)
    		goto result_known;
    
    	/*
    	 * Step 3: have to check pg_subtrans.
    	 *
    	 * At this point, we know it's either a subtransaction of one of the Xids
    	 * in xids[], or it's not running.  If it's an already-failed
    	 * subtransaction, we want to say "not running" even though its parent may
    	 * still be running.  So first, check pg_clog to see if it's been aborted.
    	 */
    	xc_slow_answer_inc();
    
    	if (TransactionIdDidAbort(xid))
    		goto result_known;
    
    	/*
    	 * It isn't aborted, so check whether the transaction tree it belongs to
    	 * is still running (or, more precisely, whether it was running when this
    	 * routine started -- note that we already released ProcArrayLock).
    	 */
    	topxid = SubTransGetTopmostTransaction(xid);
    	Assert(TransactionIdIsValid(topxid));
    	/* If xid is its own topmost parent, step 1 already failed to find it */
    	if (!TransactionIdEquals(topxid, xid))
    	{
    		for (i = 0; i < nxids; i++)
    		{
    			if (TransactionIdEquals(xids[i], topxid))
    			{
    				result = true;
    				break;
    			}
    		}
    	}
    
    	/*
    	 * Every exit after the palloc comes through here: drop the lock if a
    	 * goto out of the scan loop left it held, and free the workspace.
    	 */
    result_known:
    	if (locked)
    		LWLockRelease(ProcArrayLock);
    
    	pfree(xids);
    
    	return result;
    }
    
    /*
     * TransactionIdIsActive -- is xid the top-level XID of an active backend?
     *
     * Unlike TransactionIdIsInProgress, prepared transactions (entries with
     * pid == 0) are not considered, and neither are subtransaction XIDs;
     * current callers need neither.
     */
    bool
    TransactionIdIsActive(TransactionId xid)
    {
    	ProcArrayStruct *pa = procArray;
    	bool		active = false;
    	int			n;
    
    	/* Anything older than RecentXmin cannot still be running. */
    	if (TransactionIdPrecedes(xid, RecentXmin))
    		return false;
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	for (n = 0; n < pa->numProcs && !active; n++)
    	{
    		PGPROC	   *proc = pa->procs[n];
    
    		/* Fetch xid just once - see GetNewTransactionId */
    		TransactionId pxid = proc->xid;
    
    		/*
    		 * A match needs a valid XID, a real backend (pid != 0 excludes
    		 * prepared transactions), and equality with the target.
    		 */
    		if (TransactionIdIsValid(pxid) &&
    			proc->pid != 0 &&
    			TransactionIdEquals(pxid, xid))
    			active = true;
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	return active;
    }
    
    
    /*
     * GetOldestXmin -- returns oldest transaction that was running
     *					when any current transaction was started.
     *
     * allDbs = TRUE considers every backend; allDbs = FALSE considers only
     * backends whose databaseId equals MyDatabaseId.
     *
     * ignoreVacuum = TRUE skips backends that have inVacuum set.
     *
     * VACUUM uses this to decide which deleted tuples must be preserved in a
     * table.  Shared relations need allDbs = TRUE; for non-shared relations
     * allDbs = FALSE is enough, since only backends in my own database could
     * ever see their tuples.  Concurrently running lazy VACUUMs can be ignored
     * because (a) they must be working on other tables, and (b) they don't
     * need to do snapshot-based lookups.
     *
     * This is also used to determine where to truncate pg_subtrans.  allDbs
     * must be TRUE for that case, and ignoreVacuum FALSE.
     *
     * Note: the currently running xids themselves are part of the set we take
     * the minimum over.  That way a just-started xact that has not yet set its
     * snapshot cannot later set an xmin below what we compute here.
     */
    TransactionId
    GetOldestXmin(bool allDbs, bool ignoreVacuum)
    {
    	ProcArrayStruct *pa = procArray;
    	TransactionId oldest;
    	int			n;
    
    	/*
    	 * Seed the min() calculation with our own XID.  When called by the
    	 * checkpointer we are not inside a transaction, so fall back to the
    	 * next XID.  (Note that if there are no xacts running at all, that
    	 * will be the subtrans truncation point!)
    	 */
    	oldest = GetTopTransactionId();
    	if (!TransactionIdIsValid(oldest))
    		oldest = ReadNewTransactionId();
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	for (n = 0; n < pa->numProcs; n++)
    	{
    		PGPROC	   *proc = pa->procs[n];
    		TransactionId pxid;
    		TransactionId pxmin;
    
    		if (ignoreVacuum && proc->inVacuum)
    			continue;
    
    		if (!allDbs && proc->databaseId != MyDatabaseId)
    			continue;
    
    		/* Fetch xid just once - see GetNewTransactionId */
    		pxid = proc->xid;
    		if (!TransactionIdIsNormal(pxid))
    			continue;
    
    		/* First consider the transaction's own Xid */
    		if (TransactionIdPrecedes(pxid, oldest))
    			oldest = pxid;
    
    		/*
    		 * Then its Xmin, if set.  Both must be checked because there is a
    		 * window where an xact's Xid is set but Xmin isn't yet.
    		 */
    		pxmin = proc->xmin;
    		if (TransactionIdIsNormal(pxmin) &&
    			TransactionIdPrecedes(pxmin, oldest))
    			oldest = pxmin;
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	return oldest;
    }
    
    /*----------
     * GetSnapshotData -- returns information about running transactions.
     *
     * The returned snapshot includes xmin (lowest still-running xact ID),
     * xmax (next xact ID to be assigned), and a list of running xact IDs
     * in the range xmin <= xid < xmax.  It is used as follows:
     *		All xact IDs < xmin are considered finished.
     *		All xact IDs >= xmax are considered still running.
     *		For an xact ID xmin <= xid < xmax, consult list to see whether
     *		it is considered running or not.
     * This ensures that the set of transactions seen as "running" by the
     * current xact will not change after it takes the snapshot.
     *
     * All running top-level XIDs are included in the snapshot.  We also try
     * to include running subtransaction XIDs, but since PGPROC has only a
     * limited cache area for subxact XIDs, full information may not be
     * available.  If we find any overflowed subxid arrays, we have to mark
     * the snapshot's subxid data as overflowed, and extra work will need to
     * be done to determine what's running (see XidInMVCCSnapshot() in tqual.c).
     *
     * We also update the following backend-global variables:
     *		TransactionXmin: the oldest xmin of any snapshot in use in the
     *			current transaction (this is the same as MyProc->xmin).  This
     *			is just the xmin computed for the first, serializable snapshot.
     *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
     *			older than this are known not running any more.
     *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
     *			running transactions, except those running LAZY VACUUM).  This is
     *			the same computation done by GetOldestXmin(true, true).
     *
     * Parameters and result:
     *		snapshot: caller-supplied SnapshotData to fill in.  Its xip and
     *			subxip arrays are malloc'd on the first call and reused on
     *			later calls, so the struct must persist between calls.
     *		serializable: if true, the computed xmin is also stored into
     *			MyProc->xmin and TransactionXmin.
     *		Returns the same snapshot pointer that was passed in.
     *
     * If either array cannot be allocated, ERROR is raised with both xip and
     * subxip left NULL, so that a later call retries the allocation instead of
     * working with a half-initialized snapshot.
     *----------
     */
    Snapshot
    GetSnapshotData(Snapshot snapshot, bool serializable)
    {
    	ProcArrayStruct *arrayP = procArray;
    	TransactionId xmin;
    	TransactionId xmax;
    	TransactionId globalxmin;
    	int			index;
    	int			count = 0;
    	int			subcount = 0;
    
    	Assert(snapshot != NULL);
    
    	/* Serializable snapshot must be computed before any other... */
    	Assert(serializable ?
    		   !TransactionIdIsValid(MyProc->xmin) :
    		   TransactionIdIsValid(MyProc->xmin));
    
    	/*
    	 * Allocating space for maxProcs xids is usually overkill; numProcs would
    	 * be sufficient.  But it seems better to do the malloc while not holding
    	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
    	 * more subxip storage than is probably needed.
    	 *
    	 * This does open a possibility for avoiding repeated malloc/free: since
    	 * maxProcs does not change at runtime, we can simply reuse the previous
    	 * xip arrays if any.  (This relies on the fact that all callers pass
    	 * static SnapshotData structs.)
    	 */
    	if (snapshot->xip == NULL)
    	{
    		/*
    		 * First call for this snapshot
    		 */
    		snapshot->xip = (TransactionId *)
    			malloc(arrayP->maxProcs * sizeof(TransactionId));
    		if (snapshot->xip == NULL)
    			ereport(ERROR,
    					(errcode(ERRCODE_OUT_OF_MEMORY),
    					 errmsg("out of memory")));
    		Assert(snapshot->subxip == NULL);
    		snapshot->subxip = (TransactionId *)
    			malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
    		if (snapshot->subxip == NULL)
    		{
    			/*
    			 * Undo the xip allocation before bailing out.  Otherwise the
    			 * next call would see xip != NULL, skip this whole block, and
    			 * go on to memcpy subxids into a NULL subxip array.
    			 */
    			free(snapshot->xip);
    			snapshot->xip = NULL;
    			ereport(ERROR,
    					(errcode(ERRCODE_OUT_OF_MEMORY),
    					 errmsg("out of memory")));
    		}
    	}
    
    	globalxmin = xmin = GetTopTransactionId();
    
    	/*
    	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
    	 * computing a serializable snapshot and therefore will be setting
    	 * MyProc->xmin.  This is because any two backends that have overlapping
    	 * shared holds on ProcArrayLock will certainly compute the same xmin
    	 * (since no xact, in particular not the oldest, can exit the set of
    	 * running transactions while we hold ProcArrayLock --- see further
    	 * discussion just below).  So it doesn't matter whether another backend
    	 * concurrently doing GetSnapshotData or GetOldestXmin sees our xmin as
    	 * set or not; he'd compute the same xmin for himself either way.
    	 */
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	/*--------------------
    	 * Unfortunately, we have to call ReadNewTransactionId() after acquiring
    	 * ProcArrayLock above.  It's not good because ReadNewTransactionId() does
    	 * LWLockAcquire(XidGenLock), but *necessary*.  We need to be sure that
    	 * no transactions exit the set of currently-running transactions
    	 * between the time we fetch xmax and the time we finish building our
    	 * snapshot.  Otherwise we could have a situation like this:
    	 *
    	 *		1. Tx Old is running (in Read Committed mode).
    	 *		2. Tx S reads new transaction ID into xmax, then
    	 *		   is swapped out before acquiring ProcArrayLock.
    	 *		3. Tx New gets new transaction ID (>= S' xmax),
    	 *		   makes changes and commits.
    	 *		4. Tx Old changes some row R changed by Tx New and commits.
    	 *		5. Tx S finishes getting its snapshot data.  It sees Tx Old as
    	 *		   done, but sees Tx New as still running (since New >= xmax).
    	 *
    	 * Now S will see R changed by both Tx Old and Tx New, *but* does not
    	 * see other changes made by Tx New.  If S is supposed to be in
    	 * Serializable mode, this is wrong.
    	 *
    	 * By locking ProcArrayLock before we read xmax, we ensure that TX Old
    	 * cannot exit the set of running transactions seen by Tx S.  Therefore
    	 * both Old and New will be seen as still running => no inconsistency.
    	 *--------------------
    	 */
    
    	xmax = ReadNewTransactionId();
    
    	/*
    	 * Spin over procArray checking xid, xmin, and subxids.  The goal is
    	 * to gather all active xids, find the lowest xmin, and try to record
    	 * subxids.
    	 */
    	for (index = 0; index < arrayP->numProcs; index++)
    	{
    		PGPROC	   *proc = arrayP->procs[index];
    
    		/* Fetch xid just once - see GetNewTransactionId */
    		TransactionId xid = proc->xid;
    
    		/*
    		 * Ignore my own proc (dealt with my xid above), procs not running a
    		 * transaction, xacts started since we read the next transaction ID,
    		 * and xacts executing LAZY VACUUM. There's no need to store XIDs
    		 * above what we got from ReadNewTransactionId, since we'll treat them
    		 * as running anyway.  We also assume that such xacts can't compute an
    		 * xmin older than ours, so they needn't be considered in computing
    		 * globalxmin.
    		 */
    		if (proc == MyProc ||
    			!TransactionIdIsNormal(xid) ||
    			TransactionIdFollowsOrEquals(xid, xmax) ||
    			proc->inVacuum)
    			continue;
    
    		if (TransactionIdPrecedes(xid, xmin))
    			xmin = xid;
    		snapshot->xip[count++] = xid;
    
    		/* Update globalxmin to be the smallest valid xmin */
    		xid = proc->xmin;
    		if (TransactionIdIsNormal(xid))
    			if (TransactionIdPrecedes(xid, globalxmin))
    				globalxmin = xid;
    
    		/*
    		 * Save subtransaction XIDs if possible (if we've already overflowed,
    		 * there's no point).  Note that the subxact XIDs must be later than
    		 * their parent, so no need to check them against xmin.
    		 *
    		 * The other backend can add more subxids concurrently, but cannot
    		 * remove any.  Hence it's important to fetch nxids just once. Should
    		 * be safe to use memcpy, though.  (We needn't worry about missing any
    		 * xids added concurrently, because they must postdate xmax.)
    		 */
    		if (subcount >= 0)
    		{
    			if (proc->subxids.overflowed)
    				subcount = -1;	/* overflowed */
    			else
    			{
    				int			nxids = proc->subxids.nxids;
    
    				if (nxids > 0)
    				{
    					memcpy(snapshot->subxip + subcount,
    						   proc->subxids.xids,
    						   nxids * sizeof(TransactionId));
    					subcount += nxids;
    				}
    			}
    		}
    	}
    
    	if (serializable)
    		MyProc->xmin = TransactionXmin = xmin;
    
    	LWLockRelease(ProcArrayLock);
    
    	/*
    	 * Update globalxmin to include actual process xids.  This is a slightly
    	 * different way of computing it than GetOldestXmin uses, but should give
    	 * the same result.
    	 */
    	if (TransactionIdPrecedes(xmin, globalxmin))
    		globalxmin = xmin;
    
    	/* Update global variables too */
    	RecentGlobalXmin = globalxmin;
    	RecentXmin = xmin;
    
    	snapshot->xmin = xmin;
    	snapshot->xmax = xmax;
    	snapshot->xcnt = count;
    	snapshot->subxcnt = subcount;	/* -1 here means "overflowed" */
    
    	snapshot->curcid = GetCurrentCommandId();
    
    	return snapshot;
    }
    
    /*
     * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
     *
     * Collects the XIDs of all PGPROC entries that currently have inCommit set
     * (i.e. are inside their commit critical section) and a valid xid.
     *
     * *xids_p receives a palloc'd array, sized for maxProcs entries, which the
     * caller should free.  The function result is the number of entries
     * actually filled in.
     *
     * Backends set and clear inCommit without holding any lock, so the result
     * is somewhat indeterminate, but we don't really care.  Even in a
     * multiprocessor with delayed writes to shared memory, setting inCommit
     * should have propagated by the time the backend takes the WALInsertLock,
     * so an xact that has already inserted its commit record cannot be missed.
     * A delay in seeing inCommit cleared is unimportant for correctness.
     */
    int
    GetTransactionsInCommit(TransactionId **xids_p)
    {
    	ProcArrayStruct *pa = procArray;
    	TransactionId *committing;
    	int			ncommitting = 0;
    	int			n;
    
    	/* Allocate before taking the lock; maxProcs is always enough room. */
    	committing = (TransactionId *) palloc(pa->maxProcs * sizeof(TransactionId));
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	for (n = 0; n < pa->numProcs; n++)
    	{
    		PGPROC	   *proc = pa->procs[n];
    
    		/* Fetch xid just once - see GetNewTransactionId */
    		TransactionId pxid = proc->xid;
    
    		if (!proc->inCommit)
    			continue;
    		if (!TransactionIdIsValid(pxid))
    			continue;
    
    		committing[ncommitting] = pxid;
    		ncommitting++;
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	*xids_p = committing;
    	return ncommitting;
    }
    
    /*
     * HaveTransactionsInCommit -- Are any of the specified XIDs in commit?
     *
     * Given an array such as GetTransactionsInCommit produced, report whether
     * at least one of those XIDs still belongs to a PGPROC entry with inCommit
     * set.
     *
     * Note: this is O(N^2) in the number of xacts that are/were in commit, but
     * those numbers should be small enough for it not to be a problem.
     */
    bool
    HaveTransactionsInCommit(TransactionId *xids, int nxids)
    {
    	ProcArrayStruct *pa = procArray;
    	bool		found = false;
    	int			n;
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	/* Both loops stop as soon as one match has been found. */
    	for (n = 0; n < pa->numProcs && !found; n++)
    	{
    		PGPROC	   *proc = pa->procs[n];
    
    		/* Fetch xid just once - see GetNewTransactionId */
    		TransactionId pxid = proc->xid;
    		int			k;
    
    		if (!proc->inCommit || !TransactionIdIsValid(pxid))
    			continue;
    
    		for (k = 0; k < nxids && !found; k++)
    		{
    			if (xids[k] == pxid)
    				found = true;
    		}
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	return found;
    }
    
    /*
     * BackendPidGetProc -- get a backend's PGPROC given its PID
     *
     * Returns NULL if no array entry has that pid.  pid 0 is never matched,
     * since that value marks the dummy PGPROCs of prepared transactions.
     *
     * The lock is released before returning, so it is up to the caller to be
     * sure that the question remains meaningful for long enough for the
     * answer to be used ...
     */
    PGPROC *
    BackendPidGetProc(int pid)
    {
    	ProcArrayStruct *pa = procArray;
    	PGPROC	   *match = NULL;
    	int			n;
    
    	if (pid == 0)				/* never match dummy PGPROCs */
    		return NULL;
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	for (n = 0; n < pa->numProcs && match == NULL; n++)
    	{
    		if (pa->procs[n]->pid == pid)
    			match = pa->procs[n];
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	return match;
    }
    
    /*
     * BackendXidGetPid -- get a backend's pid given its XID
     *
     * Returns 0 if no entry has that XID, or if the entry that has it is a
     * prepared transaction (whose pid is 0).  The lock is released before
     * returning, so it is up to the caller to be sure that the question
     * remains meaningful for long enough for the answer to be used ...
     *
     * Only main transaction Ids are considered.  This function is mainly
     * useful for determining what backend owns a lock.
     */
    int
    BackendXidGetPid(TransactionId xid)
    {
    	ProcArrayStruct *pa = procArray;
    	int			ownerPid = 0;
    	int			n = 0;
    
    	if (xid == InvalidTransactionId)	/* never match invalid xid */
    		return 0;
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	/* Stop at the first entry whose main XID matches. */
    	while (n < pa->numProcs)
    	{
    		PGPROC	   *proc = pa->procs[n];
    
    		if (proc->xid == xid)
    		{
    			ownerPid = proc->pid;
    			break;
    		}
    		n++;
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	return ownerPid;
    }
    
    /*
     * IsBackendPid -- is a given pid a running backend
     */
    bool
    IsBackendPid(int pid)
    {
    	return (BackendPidGetProc(pid) != NULL);
    }
    
    /*
     * CountActiveBackends --- count backends (other than myself) that are in
     *		active transactions.  This is used as a heuristic to decide if
     *		a pre-XLOG-flush delay is worthwhile during commit.
     *
     * Backends blocked waiting for a lock are left out of the count, since
     * they are not going to get to run until someone else commits.
     */
    int
    CountActiveBackends(void)
    {
    	ProcArrayStruct *pa = procArray;
    	int			nactive = 0;
    	int			n;
    
    	/*
    	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
    	 * bogus, but since we are only testing fields for zero or nonzero, it
    	 * should be OK.  The result is only used for heuristic purposes anyway...
    	 */
    	for (n = 0; n < pa->numProcs; n++)
    	{
    		PGPROC	   *proc = pa->procs[n];
    
    		/*
    		 * Count an entry only if it is not me, is a real backend rather
    		 * than a prepared xact, is in a transaction, and is not blocked on
    		 * a lock.
    		 */
    		if (proc != MyProc &&
    			proc->pid != 0 &&
    			proc->xid != InvalidTransactionId &&
    			proc->waitLock == NULL)
    			nactive++;
    	}
    
    	return nactive;
    }
    
    /*
     * CountDBBackends --- count backends that are using specified database
     *
     * Prepared transactions (entries with pid == 0) are not counted.
     */
    int
    CountDBBackends(Oid databaseid)
    {
    	ProcArrayStruct *pa = procArray;
    	int			nbackends = 0;
    	int			n;
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	for (n = 0; n < pa->numProcs; n++)
    	{
    		PGPROC	   *proc = pa->procs[n];
    
    		if (proc->pid != 0 && proc->databaseId == databaseid)
    			nbackends++;
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	return nbackends;
    }
    
    /*
     * CountUserBackends --- count backends that are used by specified user
     *
     * Prepared transactions (entries with pid == 0) are not counted.
     */
    int
    CountUserBackends(Oid roleid)
    {
    	ProcArrayStruct *pa = procArray;
    	int			nbackends = 0;
    	int			n;
    
    	LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    	for (n = 0; n < pa->numProcs; n++)
    	{
    		PGPROC	   *proc = pa->procs[n];
    
    		if (proc->pid != 0 && proc->roleId == roleid)
    			nbackends++;
    	}
    
    	LWLockRelease(ProcArrayLock);
    
    	return nbackends;
    }
    
    /*
     * CheckOtherDBBackends -- check for other backends running in the given DB
     *
     * If there are other backends in the DB, we will wait a maximum of 5 seconds
     * for them to exit.  Each time round, the first conflicting entry found is
     * examined: if it is an autovacuum backend it is sent SIGTERM to encourage
     * it to exit early, otherwise we just wait.
     *
     * The current backend is always ignored; it is caller's responsibility to
     * check whether the current backend uses the given DB, if it's important.
     *
     * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
     *
     * This function is used to interlock DROP DATABASE and related commands
     * against there being any active backends in the target DB --- dropping the
     * DB while active backends remain would be a Bad Thing.  Note that we cannot
     * detect here the possibility of a newly-started backend that is trying to
     * connect to the doomed database, so additional interlocking is needed during
     * backend startup.  The caller should normally hold an exclusive lock on the
     * target DB before calling this, which is one reason we mustn't wait
     * indefinitely.
     */
    bool
    CheckOtherDBBackends(Oid databaseId)
    {
    	ProcArrayStruct *pa = procArray;
    	int			attempt = 0;
    
    	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
    	while (attempt < 50)
    	{
    		bool		conflict = false;
    		bool		isAutovac = false;
    		int			autopid = 0;
    		int			n;
    
    		CHECK_FOR_INTERRUPTS();
    
    		LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    		/* Find the first entry in the target DB other than ourselves. */
    		for (n = 0; n < pa->numProcs; n++)
    		{
    			PGPROC	   *proc = pa->procs[n];
    
    			if (proc->databaseId != databaseId || proc == MyProc)
    				continue;
    
    			conflict = true;
    
    			/* Capture what we need while the lock is still held. */
    			isAutovac = proc->isAutovacuum;
    			if (isAutovac)
    				autopid = proc->pid;
    			break;
    		}
    
    		LWLockRelease(ProcArrayLock);
    
    		if (!conflict)
    			return false;				/* no conflicting backends, so done */
    
    		/*
    		 * An autovacuum --- send it SIGTERM before sleeping.  This is done
    		 * only after releasing ProcArrayLock, since we have no idea what
    		 * might block kill() inside the kernel...
    		 */
    		if (isAutovac)
    			(void) kill(autopid, SIGTERM);		/* ignore any error */
    
    		/* sleep and try again */
    		pg_usleep(100 * 1000L);			/* 100ms */
    		attempt++;
    	}
    
    	return true;						/* timed out, still conflicts */
    }
    
    
    /*
     * XidCacheRemove
     *
     * Delete entry i from MyProc's subxids.xids[] array by copying the array's
     * last entry over it and then decrementing subxids.nxids.  The relative
     * order of the remaining entries is therefore not preserved.
     *
     * The argument appears once in the expansion, as an array subscript.
     *
     * NOTE(review): XidCacheRemoveRunningXids acquires ProcArrayLock exclusively
     * before this macro is used; presumably any other user must do the same ---
     * confirm.
     */
    #define XidCacheRemove(i) \
    	do { \
    		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \
    		MyProc->subxids.nxids--; \
    	} while (0)
    
    /*
     * XidCacheForgetXid
     *
     * Search my backend's subxids cache, last slot first, for the given XID and
     * remove the first match.  Returns true if an entry was removed, false if
     * the XID was not present.  The caller must already hold ProcArrayLock
     * exclusively.
     */
    static bool
    XidCacheForgetXid(TransactionId target)
    {
    	int			slot = MyProc->subxids.nxids;
    
    	while (--slot >= 0)
    	{
    		if (TransactionIdEquals(MyProc->subxids.xids[slot], target))
    		{
    			XidCacheRemove(slot);
    			return true;
    		}
    	}
    
    	return false;
    }
    
    /*
     * XidCacheRemoveRunningXids
     *
     * Drop a set of TransactionIds from my backend's list of known-running
     * subtransactions: every member of xids[] (nxids entries) as well as the
     * separately passed xid is deleted from the subxids cache.
     */
    void
    XidCacheRemoveRunningXids(TransactionId xid, int nxids, TransactionId *xids)
    {
    	int			k;
    
    	Assert(TransactionIdIsValid(xid));
    
    	/*
    	 * Removing transactions from the PGPROC array requires holding
    	 * ProcArrayLock exclusively.  (See notes in GetSnapshotData.)  Since this
    	 * routine is only used to abort subtransactions a weaker lock might
    	 * suffice, but pending closer analysis we stay conservative.
    	 */
    	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    
    	/*
    	 * xid and xids[] are normally in increasing order, and so are the
    	 * entries in subxids.  Working from the high end of both avoids O(N^2)
    	 * behavior when a lot of xids are removed.
    	 */
    	k = nxids;
    	while (--k >= 0)
    	{
    		TransactionId childXid = xids[k];
    
    		/*
    		 * A miss is ordinarily expected only if the cache has overflowed.
    		 * However, this routine can be invoked multiple times for the same
    		 * subtransaction in case of an error during AbortSubTransaction, so
    		 * a miss gets a debug warning rather than an Assert.
    		 */
    		if (!XidCacheForgetXid(childXid) && !MyProc->subxids.overflowed)
    			elog(WARNING, "did not find subXID %u in MyProc", childXid);
    	}
    
    	/* Likewise for xid itself: a miss is normal only after overflow */
    	if (!XidCacheForgetXid(xid) && !MyProc->subxids.overflowed)
    		elog(WARNING, "did not find subXID %u in MyProc", xid);
    
    	LWLockRelease(ProcArrayLock);
    }
    
    #ifdef XIDCACHE_DEBUG
    
    /*
     * DisplayXidCache --- print stats about effectiveness of XID cache
     *
     * Writes a single line to stderr showing the current values of the four
     * counters xc_by_recent_xmin, xc_by_main_xid, xc_by_child_xid and
     * xc_slow_answer.  Compiled only when XIDCACHE_DEBUG is defined.
     *
     * NOTE(review): the counters are declared and updated outside this part of
     * the file; the %ld conversions assume they are of type long --- confirm.
     */
    static void
    DisplayXidCache(void)
    {
    	fprintf(stderr,
    			"XidCache: xmin: %ld, mainxid: %ld, childxid: %ld, slow: %ld\n",
    			xc_by_recent_xmin,
    			xc_by_main_xid,
    			xc_by_child_xid,
    			xc_slow_answer);
    }
    
    #endif   /* XIDCACHE_DEBUG */