diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 6be892bfc7c748f0e17f02d48d4c8db126aa644a..96550744158da3f217de3e17237f60c94fcbf249 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.116 2008/03/26 21:10:37 alvherre Exp $
+ *	  $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.117 2008/04/03 16:27:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -19,6 +19,7 @@
 #include "access/heapam.h"
 #include "access/transam.h"
 #include "access/tuptoaster.h"
+#include "access/xact.h"
 #include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
@@ -33,6 +34,7 @@
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/datum.h"
 #include "utils/lsyscache.h"
@@ -362,10 +364,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt,
 		 * zero-column table.
 		 */
 		if (!vacstmt->vacuum)
-			pgstat_report_analyze(RelationGetRelid(onerel),
-								  onerel->rd_rel->relisshared,
-								  0, 0);
-
+			pgstat_report_analyze(onerel, 0, 0);
 		goto cleanup;
 	}
 
@@ -481,9 +480,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt,
 		}
 
 		/* report results to the stats collector, too */
-		pgstat_report_analyze(RelationGetRelid(onerel),
-							  onerel->rd_rel->relisshared,
-							  totalrows, totaldeadrows);
+		pgstat_report_analyze(onerel, totalrows, totaldeadrows);
 	}
 
 	/* We skip to here if there were no analyzable columns */
@@ -856,11 +853,13 @@ static int
 acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 					double *totalrows, double *totaldeadrows)
 {
-	int			numrows = 0;	/* # rows collected */
-	double		liverows = 0;	/* # rows seen */
+	int			numrows = 0;	/* # rows now in reservoir */
+	double		samplerows = 0;	/* total # rows collected */
+	double		liverows = 0;	/* # live rows seen */
 	double		deadrows = 0;	/* # dead rows seen */
 	double		rowstoskip = -1;	/* -1 means not set yet */
 	BlockNumber totalblocks;
+	TransactionId OldestXmin;
 	BlockSamplerData bs;
 	double		rstate;
 
@@ -868,6 +867,9 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
+	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
+	OldestXmin = GetOldestXmin(onerel->rd_rel->relisshared, true);
+
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows);
 	/* Prepare for sampling rows */
@@ -888,28 +890,112 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 		 * We must maintain a pin on the target page's buffer to ensure that
 		 * the maxoffset value stays good (else concurrent VACUUM might delete
 		 * tuples out from under us).  Hence, pin the page until we are done
-		 * looking at it.  We don't maintain a lock on the page, so tuples
-		 * could get added to it, but we ignore such tuples.
+		 * looking at it.  We also choose to hold sharelock on the buffer
+		 * throughout --- we could release and re-acquire sharelock for
+		 * each tuple, but since we aren't doing much work per tuple, the
+		 * extra lock traffic is probably better avoided.
 		 */
 		targbuffer = ReadBufferWithStrategy(onerel, targblock, vac_strategy);
 		LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
 		targpage = BufferGetPage(targbuffer);
 		maxoffset = PageGetMaxOffsetNumber(targpage);
-		LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);
 
 		/* Inner loop over all tuples on the selected page */
 		for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
 		{
+			ItemId		itemid;
 			HeapTupleData targtuple;
+			bool		sample_it = false;
+
+			itemid = PageGetItemId(targpage, targoffset);
+
+			/*
+			 * We ignore unused and redirect line pointers.  DEAD line
+			 * pointers should be counted as dead, because we need vacuum
+			 * to run to get rid of them.  Note that this rule agrees with
+			 * the way that heap_page_prune() counts things.
+			 */
+			if (!ItemIdIsNormal(itemid))
+			{
+				if (ItemIdIsDead(itemid))
+					deadrows += 1;
+				continue;
+			}
 
 			ItemPointerSet(&targtuple.t_self, targblock, targoffset);
-			/* We use heap_release_fetch to avoid useless bufmgr traffic */
-			if (heap_release_fetch(onerel, SnapshotNow,
-								   &targtuple, &targbuffer,
-								   true, NULL))
+
+			targtuple.t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
+			targtuple.t_len = ItemIdGetLength(itemid);
+
+			switch (HeapTupleSatisfiesVacuum(targtuple.t_data,
+											 OldestXmin,
+											 targbuffer))
+			{
+				case HEAPTUPLE_LIVE:
+					sample_it = true;
+					liverows += 1;
+					break;
+
+				case HEAPTUPLE_DEAD:
+				case HEAPTUPLE_RECENTLY_DEAD:
+					/* Count dead and recently-dead rows */
+					deadrows += 1;
+					break;
+
+				case HEAPTUPLE_INSERT_IN_PROGRESS:
+					/*
+					 * Insert-in-progress rows are not counted.  We assume
+					 * that when the inserting transaction commits or aborts,
+					 * it will send a stats message to increment the proper
+					 * count.  This works right only if that transaction ends
+					 * after we finish analyzing the table; if things happen
+					 * in the other order, its stats update will be
+					 * overwritten by ours.  However, the error will be
+					 * large only if the other transaction runs long enough
+					 * to insert many tuples, so assuming it will finish
+					 * after us is the safer option.
+					 *
+					 * A special case is that the inserting transaction might
+					 * be our own.  In this case we should count and sample
+					 * the row, to accommodate users who load a table and
+					 * analyze it in one transaction.  (pgstat_report_analyze
+					 * has to adjust the numbers we send to the stats collector
+					 * to make this come out right.)
+					 */
+					if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple.t_data)))
+					{
+						sample_it = true;
+						liverows += 1;
+					}
+					break;
+
+				case HEAPTUPLE_DELETE_IN_PROGRESS:
+					/*
+					 * We count delete-in-progress rows as still live, using
+					 * the same reasoning given above; but we don't bother to
+					 * include them in the sample.
+					 *
+					 * If the delete was done by our own transaction, however,
+					 * we must count the row as dead to make
+					 * pgstat_report_analyze's stats adjustments come out
+					 * right.  (Note: this works out properly when the row
+					 * was both inserted and deleted in our xact.)
+					 */
+					if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(targtuple.t_data)))
+						deadrows += 1;
+					else
+						liverows += 1;
+					break;
+
+				default:
+					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+					break;
+			}
+
+			if (sample_it)
 			{
 				/*
-				 * The first targrows live rows are simply copied into the
+				 * The first targrows sample rows are simply copied into the
 				 * reservoir. Then we start replacing tuples in the sample
 				 * until we reach the end of the relation.	This algorithm is
 				 * from Jeff Vitter's paper (see full citation below). It
@@ -927,11 +1013,11 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 					/*
 					 * t in Vitter's paper is the number of records already
 					 * processed.  If we need to compute a new S value, we
-					 * must use the not-yet-incremented value of liverows as
-					 * t.
+					 * must use the not-yet-incremented value of samplerows
+					 * as t.
 					 */
 					if (rowstoskip < 0)
-						rowstoskip = get_next_S(liverows, targrows, &rstate);
+						rowstoskip = get_next_S(samplerows, targrows, &rstate);
 
 					if (rowstoskip <= 0)
 					{
@@ -949,18 +1035,12 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 					rowstoskip -= 1;
 				}
 
-				liverows += 1;
-			}
-			else
-			{
-				/* Count dead rows, but not empty slots */
-				if (targtuple.t_data != NULL)
-					deadrows += 1;
+				samplerows += 1;
 			}
 		}
 
-		/* Now release the pin on the page */
-		ReleaseBuffer(targbuffer);
+		/* Now release the lock and pin on the page */
+		UnlockReleaseBuffer(targbuffer);
 	}
 
 	/*
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 8a6cf19ebcdc17d066ec2e4c3c224f04ccbeb625..c9720a63104e8a1c576c3f34419c811114966c4f 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -13,7 +13,7 @@
  *
  *	Copyright (c) 2001-2008, PostgreSQL Global Development Group
  *
- *	$PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.172 2008/03/26 21:10:38 alvherre Exp $
+ *	$PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.173 2008/04/03 16:27:25 tgl Exp $
  * ----------
  */
 #include "postgres.h"
@@ -1037,7 +1037,7 @@ pgstat_report_vacuum(Oid tableoid, bool shared,
  * --------
  */
 void
-pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
+pgstat_report_analyze(Relation rel, PgStat_Counter livetuples,
 					  PgStat_Counter deadtuples)
 {
 	PgStat_MsgAnalyze msg;
@@ -1045,10 +1045,36 @@ pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
 	if (pgStatSock < 0 || !pgstat_track_counts)
 		return;
 
+	/*
+	 * Unlike VACUUM, ANALYZE might be running inside a transaction that
+	 * has already inserted and/or deleted rows in the target table.
+	 * ANALYZE will have counted such rows as live or dead respectively.
+	 * Because we will report our counts of such rows at transaction end,
+	 * we should subtract off these counts from what we send to the collector
+	 * now, else they'll be double-counted after commit.  (This approach also
+	 * ensures that the collector ends up with the right numbers if we abort
+	 * instead of committing.)
+	 */
+	if (rel->pgstat_info != NULL)
+	{
+		PgStat_TableXactStatus *trans;
+
+		for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
+		{
+			livetuples -= trans->tuples_inserted - trans->tuples_deleted;
+			deadtuples -= trans->tuples_deleted;
+		}
+		/* count stuff inserted by already-aborted subxacts, too */
+		deadtuples -= rel->pgstat_info->t_counts.t_new_dead_tuples;
+		/* Since ANALYZE's counts are estimates, we could have underflowed */
+		livetuples = Max(livetuples, 0);
+		deadtuples = Max(deadtuples, 0);
+	}
+
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
-	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
-	msg.m_tableoid = tableoid;
-	msg.m_autovacuum = IsAutoVacuumWorkerProcess();		/* is this autovacuum? */
+	msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
+	msg.m_tableoid = RelationGetRelid(rel);
+	msg.m_autovacuum = IsAutoVacuumWorkerProcess();	/* is this autovacuum? */
 	msg.m_analyzetime = GetCurrentTimestamp();
 	msg.m_live_tuples = livetuples;
 	msg.m_dead_tuples = deadtuples;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fdc846acad4b52fc117471218e5aee522feaf48e..fc0173c6f3345a9916bf2c37ce0a5bc2c65b69da 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -5,7 +5,7 @@
  *
  *	Copyright (c) 2001-2008, PostgreSQL Global Development Group
  *
- *	$PostgreSQL: pgsql/src/include/pgstat.h,v 1.73 2008/03/24 18:22:36 tgl Exp $
+ *	$PostgreSQL: pgsql/src/include/pgstat.h,v 1.74 2008/04/03 16:27:25 tgl Exp $
  * ----------
  */
 #ifndef PGSTAT_H
@@ -497,7 +497,7 @@ extern void pgstat_reset_counters(void);
 extern void pgstat_report_autovac(Oid dboid);
 extern void pgstat_report_vacuum(Oid tableoid, bool shared,
 					 bool analyze, PgStat_Counter tuples);
-extern void pgstat_report_analyze(Oid tableoid, bool shared,
+extern void pgstat_report_analyze(Relation rel,
 					  PgStat_Counter livetuples,
 					  PgStat_Counter deadtuples);