diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index d388f521f183383e53b07c0a9d0a9e8f45003f58..0f6ff2d37e6e124f2d066ab3ad0c8632c78b44c7 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.71 2004/05/08 19:09:24 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.72 2004/05/23 21:24:02 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -39,6 +39,16 @@
 #include "utils/tuplesort.h"
 
 
+/* Data structure for Algorithm S from Knuth 3.4.2 */
+typedef struct
+{
+	BlockNumber	N;				/* number of blocks, known in advance */
+	int			n;				/* desired sample size */
+	BlockNumber	t;				/* current block number */
+	int			m;				/* blocks selected so far */
+} BlockSamplerData;
+typedef BlockSamplerData *BlockSampler;
+
 /* Per-index data for ANALYZE */
 typedef struct AnlIndexData
 {
@@ -57,6 +67,10 @@ static int	elevel = -1;
 static MemoryContext anl_context = NULL;
 
 
+static void BlockSampler_Init(BlockSampler bs, BlockNumber nblocks,
+							  int samplesize);
+static bool BlockSampler_HasMore(BlockSampler bs);
+static BlockNumber BlockSampler_Next(BlockSampler bs);
 static void compute_index_stats(Relation onerel, double totalrows,
 								AnlIndexData *indexdata, int nindexes,
 								HeapTuple *rows, int numrows,
@@ -66,7 +80,7 @@ static int acquire_sample_rows(Relation onerel, HeapTuple *rows,
 					int targrows, double *totalrows);
 static double random_fract(void);
 static double init_selection_state(int n);
-static double select_next_random_record(double t, int n, double *stateptr);
+static double get_next_S(double t, int n, double *stateptr);
 static int	compare_rows(const void *a, const void *b);
 static void update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats);
 static Datum std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);
@@ -637,16 +651,118 @@ examine_attribute(Relation onerel, int attnum)
 	return stats;
 }
 
+/*
+ * BlockSampler_Init -- prepare for random sampling of blocknumbers
+ *
+ * BlockSampler is used for stage one of our new two-stage tuple
+ * sampling mechanism as discussed on pgsql-hackers 2004-04-02 (subject
+ * "Large DB").  It selects a random sample of samplesize blocks out of
+ * the nblocks blocks in the table.  If the table has less than
+ * samplesize blocks, all blocks are selected.
+ *
+ * Since we know the total number of blocks in advance, we can use the
+ * straightforward Algorithm S from Knuth 3.4.2, rather than Vitter's
+ * algorithm.
+ */
+static void
+BlockSampler_Init(BlockSampler bs, BlockNumber nblocks, int samplesize)
+{
+	bs->N = nblocks;			/* measured table size */
+	/*
+	 * If we decide to reduce samplesize for tables that have less or
+	 * not much more than samplesize blocks, here is the place to do
+	 * it.
+	 */
+	bs->n = samplesize;
+	bs->t = 0;					/* blocks scanned so far */
+	bs->m = 0;					/* blocks selected so far */
+}
+
+static bool
+BlockSampler_HasMore(BlockSampler bs)
+{
+	return (bs->t < bs->N) && (bs->m < bs->n);
+}
+
+static BlockNumber
+BlockSampler_Next(BlockSampler bs)
+{
+	BlockNumber	K = bs->N - bs->t;		/* remaining blocks */
+	int			k = bs->n - bs->m;		/* blocks still to sample */
+	double		p;						/* probability to skip block */
+	double		V;						/* random */
+
+	Assert(BlockSampler_HasMore(bs));	/* hence K > 0 and k > 0 */
+
+	if ((BlockNumber) k >= K)
+	{
+		/* need all the rest */
+		bs->m++;
+		return bs->t++;
+	}
+
+	/*----------
+	 * It is not obvious that this code matches Knuth's Algorithm S.
+	 * Knuth says to skip the current block with probability 1 - k/K.
+	 * If we are to skip, we should advance t (hence decrease K), and
+	 * repeat the same probabilistic test for the next block.  The naive
+	 * implementation thus requires a random_fract() call for each block
+	 * number.  But we can reduce this to one random_fract() call per
+	 * selected block, by noting that each time the while-test succeeds,
+	 * we can reinterpret V as a uniform random number in the range 0 to p.
+	 * Therefore, instead of choosing a new V, we just adjust p to be
+	 * the appropriate fraction of its former value, and our next loop
+	 * makes the appropriate probabilistic test.
+	 *
+	 * We have initially K > k > 0.  If the loop reduces K to equal k,
+	 * the next while-test must fail since p will become exactly zero
+	 * (we assume there will not be roundoff error in the division).
+	 * (Note: Knuth suggests a "<=" loop condition, but we use "<" just
+	 * to be doubly sure about roundoff error.)  Therefore K cannot become
+	 * less than k, which means that we cannot fail to select enough blocks.
+	 *----------
+	 */
+	V = random_fract();
+	p = 1.0 - (double) k / (double) K;
+	while (V < p)
+	{
+		/* skip */
+		bs->t++;
+		K--;					/* keep K == N - t */
+
+		/* adjust p to be new cutoff point in reduced range */
+		p *= 1.0 - (double) k / (double) K;
+	}
+
+	/* select */
+	bs->m++;
+	return bs->t++;
+}
+
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the table
  *
- * Up to targrows rows are collected (if there are fewer than that many
- * rows in the table, all rows are collected).	When the table is larger
- * than targrows, a truly random sample is collected: every row has an
- * equal chance of ending up in the final sample.
+ * As of May 2004 we use a new two-stage method:  Stage one selects up
+ * to targrows random blocks (or all blocks, if there aren't so many).
+ * Stage two scans these blocks and uses the Vitter algorithm to create
+ * a random sample of targrows rows (or less, if there are less in the
+ * sample of blocks).  The two stages are executed simultaneously: each
+ * block is processed as soon as stage one returns its number and while
+ * the rows are read stage two controls which ones are to be inserted
+ * into the sample.
+ *
+ * Although every row has an equal chance of ending up in the final
+ * sample, this sampling method is not perfect: not every possible
+ * sample has an equal chance of being selected.  For large relations
+ * the number of different blocks represented by the sample tends to be
+ * too small.  We can live with that for now.  Improvements are welcome.
  *
  * We also estimate the total number of rows in the table, and return that
- * into *totalrows.
+ * into *totalrows.  An important property of this sampling method is that
+ * because we do look at a statistically unbiased set of blocks, we should
+ * get an unbiased estimate of the average number of live rows per block.
+ * The previous sampling method put too much credence in the row density near
+ * the start of the table.
  *
  * The returned list of tuples is in order by physical position in the table.
  * (We will rely on this later to derive correlation estimates.)
@@ -655,101 +771,27 @@ static int
 acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 					double *totalrows)
 {
-	int			numrows = 0;
-	HeapScanDesc scan;
+	int			numrows = 0;					/* # rows collected */
+	double		liverows = 0;					/* # rows seen */
+	double		deadrows = 0;
+	double		rowstoskip = -1;				/* -1 means not set yet */
 	BlockNumber	totalblocks;
-	HeapTuple	tuple;
-	ItemPointer lasttuple;
-	BlockNumber lastblock,
-				estblock;
-	OffsetNumber lastoffset;
-	int			numest;
-	double		tuplesperpage;
-	double		t;
+	BlockSamplerData bs;
 	double		rstate;
 
 	Assert(targrows > 1);
 
-	/*
-	 * Do a simple linear scan until we reach the target number of rows.
-	 */
-	scan = heap_beginscan(onerel, SnapshotNow, 0, NULL);
-	totalblocks = scan->rs_nblocks;	/* grab current relation size */
-	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-	{
-		rows[numrows++] = heap_copytuple(tuple);
-		if (numrows >= targrows)
-			break;
-		vacuum_delay_point();
-	}
-	heap_endscan(scan);
-
-	/*
-	 * If we ran out of tuples then we're done, no matter how few we
-	 * collected.  No sort is needed, since they're already in order.
-	 */
-	if (!HeapTupleIsValid(tuple))
-	{
-		*totalrows = (double) numrows;
-
-		ereport(elevel,
-				(errmsg("\"%s\": %u pages, %d rows sampled, %.0f estimated total rows",
-						RelationGetRelationName(onerel),
-						totalblocks, numrows, *totalrows)));
-
-		return numrows;
-	}
-
-	/*
-	 * Otherwise, start replacing tuples in the sample until we reach the
-	 * end of the relation.  This algorithm is from Jeff Vitter's paper
-	 * (see full citation below).  It works by repeatedly computing the
-	 * number of the next tuple we want to fetch, which will replace a
-	 * randomly chosen element of the reservoir (current set of tuples).
-	 * At all times the reservoir is a true random sample of the tuples
-	 * we've passed over so far, so when we fall off the end of the
-	 * relation we're done.
-	 *
-	 * A slight difficulty is that since we don't want to fetch tuples or
-	 * even pages that we skip over, it's not possible to fetch *exactly*
-	 * the N'th tuple at each step --- we don't know how many valid tuples
-	 * are on the skipped pages.  We handle this by assuming that the
-	 * average number of valid tuples/page on the pages already scanned
-	 * over holds good for the rest of the relation as well; this lets us
-	 * estimate which page the next tuple should be on and its position in
-	 * the page.  Then we fetch the first valid tuple at or after that
-	 * position, being careful not to use the same tuple twice.  This
-	 * approach should still give a good random sample, although it's not
-	 * perfect.
-	 */
-	lasttuple = &(rows[numrows - 1]->t_self);
-	lastblock = ItemPointerGetBlockNumber(lasttuple);
-	lastoffset = ItemPointerGetOffsetNumber(lasttuple);
-
-	/*
-	 * If possible, estimate tuples/page using only completely-scanned
-	 * pages.
-	 */
-	for (numest = numrows; numest > 0; numest--)
-	{
-		if (ItemPointerGetBlockNumber(&(rows[numest - 1]->t_self)) != lastblock)
-			break;
-	}
-	if (numest == 0)
-	{
-		numest = numrows;		/* don't have a full page? */
-		estblock = lastblock + 1;
-	}
-	else
-		estblock = lastblock;
-	tuplesperpage = (double) numest / (double) estblock;
+	totalblocks = RelationGetNumberOfBlocks(onerel);
 
-	t = (double) numrows;		/* t is the # of records processed so far */
+	/* Prepare for sampling block numbers */
+	BlockSampler_Init(&bs, totalblocks, targrows);
+	/* Prepare for sampling rows */
 	rstate = init_selection_state(targrows);
-	for (;;)
+
+	/* Outer loop over blocks to sample */
+	while (BlockSampler_HasMore(&bs))
 	{
-		double		targpos;
-		BlockNumber targblock;
+		BlockNumber targblock = BlockSampler_Next(&bs);
 		Buffer		targbuffer;
 		Page		targpage;
 		OffsetNumber targoffset,
@@ -757,28 +799,6 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 
 		vacuum_delay_point();
 
-		t = select_next_random_record(t, targrows, &rstate);
-		/* Try to read the t'th record in the table */
-		targpos = t / tuplesperpage;
-		targblock = (BlockNumber) targpos;
-		targoffset = ((int) ((targpos - targblock) * tuplesperpage)) +
-			FirstOffsetNumber;
-		/* Make sure we are past the last selected record */
-		if (targblock <= lastblock)
-		{
-			targblock = lastblock;
-			if (targoffset <= lastoffset)
-				targoffset = lastoffset + 1;
-		}
-		/* Loop to find first valid record at or after given position */
-pageloop:;
-
-		/*
-		 * Have we fallen off the end of the relation?
-		 */
-		if (targblock >= totalblocks)
-			break;
-
 		/*
 		 * We must maintain a pin on the target page's buffer to ensure
 		 * that the maxoffset value stays good (else concurrent VACUUM
@@ -795,62 +815,109 @@ pageloop:;
 		maxoffset = PageGetMaxOffsetNumber(targpage);
 		LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);
 
-		for (;;)
+		/* Inner loop over all tuples on the selected page */
+		for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
 		{
 			HeapTupleData targtuple;
 			Buffer		tupbuffer;
 
-			if (targoffset > maxoffset)
-			{
-				/* Fell off end of this page, try next */
-				ReleaseBuffer(targbuffer);
-				targblock++;
-				targoffset = FirstOffsetNumber;
-				goto pageloop;
-			}
 			ItemPointerSet(&targtuple.t_self, targblock, targoffset);
 			if (heap_fetch(onerel, SnapshotNow, &targtuple, &tupbuffer,
 						   false, NULL))
 			{
 				/*
-				 * Found a suitable tuple, so save it, replacing one old
-				 * tuple at random
+				 * The first targrows live rows are simply copied into the
+				 * reservoir.
+				 * Then we start replacing tuples in the sample until
+				 * we reach the end of the relation.  This algorithm is
+				 * from Jeff Vitter's paper (see full citation below).
+				 * It works by repeatedly computing the number of tuples
+				 * to skip before selecting a tuple, which replaces a
+				 * randomly chosen element of the reservoir (current
+				 * set of tuples).  At all times the reservoir is a true
+				 * random sample of the tuples we've passed over so far,
+				 * so when we fall off the end of the relation we're done.
 				 */
-				int			k = (int) (targrows * random_fract());
+				if (numrows < targrows)
+					rows[numrows++] = heap_copytuple(&targtuple);
+				else
+				{
+					/*
+					 * t in Vitter's paper is the number of records already
+					 * processed.  If we need to compute a new S value, we
+					 * must use the not-yet-incremented value of liverows
+					 * as t.
+					 */
+					if (rowstoskip < 0)
+						rowstoskip = get_next_S(liverows, targrows, &rstate);
+
+					if (rowstoskip <= 0)
+					{
+						/*
+						 * Found a suitable tuple, so save it,
+						 * replacing one old tuple at random
+						 */
+						int	k = (int) (targrows * random_fract());
 
-				Assert(k >= 0 && k < targrows);
-				heap_freetuple(rows[k]);
-				rows[k] = heap_copytuple(&targtuple);
-				/* this releases the second pin acquired by heap_fetch: */
+						Assert(k >= 0 && k < targrows);
+						heap_freetuple(rows[k]);
+						rows[k] = heap_copytuple(&targtuple);
+					}
+
+					rowstoskip -= 1;
+				}
+
+				/* must release the extra pin acquired by heap_fetch */
 				ReleaseBuffer(tupbuffer);
-				/* this releases the initial pin: */
-				ReleaseBuffer(targbuffer);
-				lastblock = targblock;
-				lastoffset = targoffset;
-				break;
+
+				liverows += 1;
+			}
+			else
+			{
+				/*
+				 * Count dead rows, but not empty slots.  This information is
+				 * currently not used, but it seems likely we'll want it
+				 * someday.
+				 */
+				if (targtuple.t_data != NULL)
+					deadrows += 1;
 			}
-			/* this tuple is dead, so advance to next one on same page */
-			targoffset++;
 		}
+
+		/* Now release the initial pin on the page */
+		ReleaseBuffer(targbuffer);
 	}
 
 	/*
-	 * Now we need to sort the collected tuples by position (itempointer).
+	 * If we didn't find as many tuples as we wanted then we're done.
+	 * No sort is needed, since they're already in order.
+	 *
+	 * Otherwise we need to sort the collected tuples by position
+	 * (itempointer).  It's not worth worrying about corner cases
+	 * where the tuples are already sorted.
 	 */
-	qsort((void *) rows, numrows, sizeof(HeapTuple), compare_rows);
+	if (numrows == targrows)
+		qsort((void *) rows, numrows, sizeof(HeapTuple), compare_rows);
 
 	/*
-	 * Estimate total number of valid rows in relation.
+	 * Estimate total number of live rows in relation.
 	 */
-	*totalrows = floor((double) totalblocks * tuplesperpage + 0.5);
+	if (bs.m > 0)
+		*totalrows = floor((liverows * totalblocks) / bs.m + 0.5);
+	else
+		*totalrows = 0.0;
 
 	/*
 	 * Emit some interesting relation info 
 	 */
 	ereport(elevel,
-			(errmsg("\"%s\": %u pages, %d rows sampled, %.0f estimated total rows",
+			(errmsg("\"%s\": scanned %d of %u pages, "
+					"containing %.0f live rows and %.0f dead rows; "
+					"%d rows in sample, %.0f estimated total rows",
 					RelationGetRelationName(onerel),
-					totalblocks, numrows, *totalrows)));
+					bs.m, totalblocks,
+					liverows, deadrows,
+					numrows, *totalrows)));
 
 	return numrows;
 }
@@ -872,23 +939,16 @@ random_fract(void)
 /*
  * These two routines embody Algorithm Z from "Random sampling with a
  * reservoir" by Jeffrey S. Vitter, in ACM Trans. Math. Softw. 11, 1
- * (Mar. 1985), Pages 37-57.  While Vitter describes his algorithm in terms
- * of the count S of records to skip before processing another record,
- * it is convenient to work primarily with t, the index (counting from 1)
- * of the last record processed and next record to process.  The only extra
- * state needed between calls is W, a random state variable.
- *
- * Note: the original algorithm defines t, S, numer, and denom as integers.
- * Here we express them as doubles to avoid overflow if the number of rows
- * in the table exceeds INT_MAX.  The algorithm should work as long as the
- * row count does not become so large that it is not represented accurately
- * in a double (on IEEE-math machines this would be around 2^52 rows).
+ * (Mar. 1985), Pages 37-57.  Vitter describes his algorithm in terms
+ * of the count S of records to skip before processing another record.
+ * It is computed primarily based on t, the number of records already read.
+ * The only extra state needed between calls is W, a random state variable.
  *
  * init_selection_state computes the initial W value.
  *
- * Given that we've already processed t records (t >= n),
- * select_next_random_record determines the number of the next record to
- * process.
+ * Given that we've already read t records (t >= n), get_next_S
+ * determines the number of records to skip before the next record is
+ * processed.
  */
 static double
 init_selection_state(int n)
@@ -898,8 +958,10 @@ init_selection_state(int n)
 }
 
 static double
-select_next_random_record(double t, int n, double *stateptr)
+get_next_S(double t, int n, double *stateptr)
 {
+	double		S;
+
 	/* The magic constant here is T from Vitter's paper */
 	if (t <= (22.0 * n))
 	{
@@ -908,11 +970,14 @@ select_next_random_record(double t, int n, double *stateptr)
 					quot;
 
 		V = random_fract();		/* Generate V */
+		S = 0;
 		t += 1;
+		/* Note: "num" in Vitter's code is always equal to t - n */
 		quot = (t - (double) n) / t;
 		/* Find min S satisfying (4.1) */
 		while (quot > V)
 		{
+			S += 1;
 			t += 1;
 			quot *= (t - (double) n) / t;
 		}
@@ -922,7 +987,6 @@ select_next_random_record(double t, int n, double *stateptr)
 		/* Now apply Algorithm Z */
 		double		W = *stateptr;
 		double		term = t - (double) n + 1;
-		double		S;
 
 		for (;;)
 		{
@@ -970,10 +1034,9 @@ select_next_random_record(double t, int n, double *stateptr)
 			if (exp(log(y) / n) <= (t + X) / t)
 				break;
 		}
-		t += S + 1;
 		*stateptr = W;
 	}
-	return t;
+	return S;
 }
 
 /*