diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 6a8a96f60339118f7edb11668c5db23fbf85c211..c9594f68ad463aaf7c86fcb96ed51961cbfa3af1 100644
--- a/contrib/tcn/tcn.c
+++ b/contrib/tcn/tcn.c
@@ -141,8 +141,8 @@ triggered_change_notification(PG_FUNCTION_ARGS)
 		if (!HeapTupleIsValid(indexTuple))		/* should not happen */
 			elog(ERROR, "cache lookup failed for index %u", indexoid);
 		index = (Form_pg_index) GETSTRUCT(indexTuple);
-		/* we're only interested if it is the primary key */
-		if (index->indisprimary)
+		/* we're only interested if it is the primary key and valid */
+		if (index->indisprimary && IndexIsValid(index))
 		{
 			int			numatts = index->indnatts;
 
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f99919093ca0da8c59ee4f4df0643837dfbdb38b..5f270404bf1279ad2ba745ca417ab00b0b5dbb08 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -3480,7 +3480,7 @@
        index is possibly incomplete: it must still be modified by
        <command>INSERT</>/<command>UPDATE</> operations, but it cannot safely
        be used for queries. If it is unique, the uniqueness property is not
-       true either.
+       guaranteed true either.
       </entry>
      </row>
 
@@ -3507,6 +3507,16 @@
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>indislive</structfield></entry>
+      <entry><type>bool</type></entry>
+      <entry></entry>
+      <entry>
+       If false, the index is in the process of being dropped, and should be
+       ignored for all purposes (including HOT-safety decisions).
+      </entry>
+     </row>
+
      <row>
       <entry><structfield>indkey</structfield></entry>
       <entry><type>int2vector</type></entry>
diff --git a/doc/src/sgml/ref/drop_index.sgml b/doc/src/sgml/ref/drop_index.sgml
index 343f7aca003dd1687e7370da663cc26554955dbd..98fd9c966c3b2cbd0fdc3bf5609e2ab6d64f6b16 100644
--- a/doc/src/sgml/ref/drop_index.sgml
+++ b/doc/src/sgml/ref/drop_index.sgml
@@ -40,34 +40,33 @@ DROP INDEX [ CONCURRENTLY ] [ IF EXISTS ] <replaceable class="PARAMETER">name</r
 
   <variablelist>
    <varlistentry>
-    <term><literal>IF EXISTS</literal></term>
+    <term><literal>CONCURRENTLY</literal></term>
     <listitem>
      <para>
-      Do not throw an error if the index does not exist. A notice is issued
-      in this case.
+      Drop the index without locking out concurrent selects, inserts, updates,
+      and deletes on the index's table.  A normal <command>DROP INDEX</>
+      acquires exclusive lock on the table, blocking other accesses until the
+      index drop can be completed.  With this option, the command instead
+      waits until conflicting transactions have completed.
+     </para>
+     <para>
+      There are several caveats to be aware of when using this option.
+      Only one index name can be specified, and the <literal>CASCADE</> option
+      is not supported.  (Thus, an index that supports a <literal>UNIQUE</> or
+      <literal>PRIMARY KEY</> constraint cannot be dropped this way.)
+      Also, regular <command>DROP INDEX</> commands can be
+      performed within a transaction block, but
+      <command>DROP INDEX CONCURRENTLY</> cannot.
      </para>
     </listitem>
    </varlistentry>
 
    <varlistentry>
-    <term><literal>CONCURRENTLY</literal></term>
+    <term><literal>IF EXISTS</literal></term>
     <listitem>
      <para>
-      When this option is used, <productname>PostgreSQL</> will drop the
-      index without taking any locks that prevent concurrent selects, inserts,
-      updates, or deletes on the table; whereas a standard index drop
-      waits for a lock that locks out everything on the table until it's done.
-      Concurrent drop index is a two stage process. First, we mark the index
-      both invalid and not ready then commit the change. Next we wait until
-      there are no users locking the table who can see the index.
-     </para>
-     <para>
-      There are several caveats to be aware of when using this option.
-      Only one index name can be specified if the <literal>CONCURRENTLY</literal>
-      parameter is specified.  Regular <command>DROP INDEX</> command can be
-      performed within a transaction block, but
-      <command>DROP INDEX CONCURRENTLY</> cannot.
-      The CASCADE option is not supported when dropping an index concurrently.
+      Do not throw an error if the index does not exist. A notice is issued
+      in this case.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/access/heap/README.HOT b/src/backend/access/heap/README.HOT
index f12cad44e56c363e62fa617497bbedfe1ba8c1fe..4cf3c3a0d4c2db96a57e73e46fdd7463db439f79 100644
--- a/src/backend/access/heap/README.HOT
+++ b/src/backend/access/heap/README.HOT
@@ -386,6 +386,34 @@ from the index, as well as ensuring that no one can see any inconsistent
 rows in a broken HOT chain (the first condition is stronger than the
 second).  Finally, we can mark the index valid for searches.
 
+Note that we do not need to set pg_index.indcheckxmin in this code path,
+because we have outwaited any transactions that would need to avoid using
+the index.  (indcheckxmin is only needed because non-concurrent CREATE
+INDEX doesn't want to wait; its stronger lock would create too much risk of
+deadlock if it did.)
+
+
+DROP INDEX CONCURRENTLY
+-----------------------
+
+DROP INDEX CONCURRENTLY is sort of the reverse sequence of CREATE INDEX
+CONCURRENTLY.  We first mark the index as not indisvalid, and then wait for
+any transactions that could be using it in queries to end.  (During this
+time, index updates must still be performed as normal, since such
+transactions might expect freshly inserted tuples to be findable.)
+Then, we clear indisready and indislive, and again wait for transactions
+that could be updating the index to end.  Finally we can drop the index
+normally (though taking only ShareUpdateExclusiveLock on its parent table).
+
+The reason we need the pg_index.indislive flag is that after the second
+wait step begins, we don't want transactions to be touching the index at
+all; otherwise they might suffer errors if the DROP finally commits while
+they are reading catalog entries for the index.  If we had only indisvalid
+and indisready, this state would be indistinguishable from the first stage
+of CREATE INDEX CONCURRENTLY --- but in that state, we *do* want
+transactions to examine the index, since they must consider it in
+HOT-safety checks.
+
 
 Limitations and Restrictions
 ----------------------------
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index b9cfee2e81246e638b43c844fba0b985b054db25..0b61c97eb4da391e66b7094115983094b3da2409 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -995,7 +995,6 @@ deleteOneObject(const ObjectAddress *object, Relation depRel, int flags)
 	int			nkeys;
 	SysScanDesc scan;
 	HeapTuple	tup;
-	Oid			depRelOid = depRel->rd_id;
 
 	/* DROP hook of the objects being removed */
 	if (object_access_hook)
@@ -1008,9 +1007,9 @@ deleteOneObject(const ObjectAddress *object, Relation depRel, int flags)
 	}
 
 	/*
-	 * Close depRel if we are doing a drop concurrently. The individual
-	 * deletion has to commit the transaction and we don't want dangling
-	 * references.
+	 * Close depRel if we are doing a drop concurrently.  The object deletion
+	 * subroutine will commit the current transaction, so we can't keep the
+	 * relation open across doDeletion().
 	 */
 	if (flags & PERFORM_DELETION_CONCURRENTLY)
 		heap_close(depRel, RowExclusiveLock);
@@ -1018,24 +1017,23 @@ deleteOneObject(const ObjectAddress *object, Relation depRel, int flags)
 	/*
 	 * Delete the object itself, in an object-type-dependent way.
 	 *
-	 * Do this before removing outgoing dependencies as deletions can be
-	 * happening in concurrent mode. That will only happen for a single object
-	 * at once and if so the object will be invalidated inside a separate
-	 * transaction and only dropped inside a transaction thats in-progress when
-	 * doDeletion returns. This way no observer can see dangling dependency
-	 * entries.
+	 * We used to do this after removing the outgoing dependency links, but it
+	 * seems just as reasonable to do it beforehand.  In the concurrent case
+	 * we *must* do it in this order, because we can't make any transactional
+	 * updates before calling doDeletion() --- they'd get committed right
+	 * away, which is not cool if the deletion then fails.
 	 */
 	doDeletion(object, flags);
 
 	/*
-	 * Reopen depRel if we closed it before
+	 * Reopen depRel if we closed it above
 	 */
 	if (flags & PERFORM_DELETION_CONCURRENTLY)
-		depRel = heap_open(depRelOid, RowExclusiveLock);
+		depRel = heap_open(DependRelationId, RowExclusiveLock);
 
 	/*
-	 * Then remove any pg_depend records that link from this object to
-	 * others.	(Any records linking to this object should be gone already.)
+	 * Now remove any pg_depend records that link from this object to others.
+	 * (Any records linking to this object should be gone already.)
 	 *
 	 * When dropping a whole object (subId = 0), remove all pg_depend records
 	 * for its sub-objects too.
@@ -1258,15 +1256,23 @@ AcquireDeletionLock(const ObjectAddress *object, int flags)
 {
 	if (object->classId == RelationRelationId)
 	{
+		/*
+		 * In DROP INDEX CONCURRENTLY, take only ShareUpdateExclusiveLock on
+		 * the index for the moment.  index_drop() will promote the lock once
+		 * it's safe to do so.  In all other cases we need full exclusive
+		 * lock.
+		 */
 		if (flags & PERFORM_DELETION_CONCURRENTLY)
 			LockRelationOid(object->objectId, ShareUpdateExclusiveLock);
 		else
 			LockRelationOid(object->objectId, AccessExclusiveLock);
 	}
 	else
+	{
 		/* assume we should lock the whole object not a sub-object */
 		LockDatabaseObject(object->classId, object->objectId, 0,
 						   AccessExclusiveLock);
+	}
 }
 
 /*
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index d2d91c1b786e3247ba3b289830e19896e4ea6a64..92e8de15313fee163cded106bd5de719d51054e3 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -125,6 +125,10 @@ static void ResetReindexPending(void);
  *		See whether an existing relation has a primary key.
  *
  * Caller must have suitable lock on the relation.
+ *
+ * Note: we intentionally do not check IndexIsValid here; that's because this
+ * is used to enforce the rule that there can be only one indisprimary index,
+ * and we want that to be true even if said index is invalid.
  */
 static bool
 relationHasPrimaryKey(Relation rel)
@@ -608,6 +612,7 @@ UpdateIndexRelation(Oid indexoid,
 	values[Anum_pg_index_indcheckxmin - 1] = BoolGetDatum(false);
 	/* we set isvalid and isready the same way */
 	values[Anum_pg_index_indisready - 1] = BoolGetDatum(isvalid);
+	values[Anum_pg_index_indislive - 1] = BoolGetDatum(true);
 	values[Anum_pg_index_indkey - 1] = PointerGetDatum(indkey);
 	values[Anum_pg_index_indcollation - 1] = PointerGetDatum(indcollation);
 	values[Anum_pg_index_indclass - 1] = PointerGetDatum(indclass);
@@ -1258,8 +1263,9 @@ index_constraint_create(Relation heapRelation,
 	 * Note: since this is a transactional update, it's unsafe against
 	 * concurrent SnapshotNow scans of pg_index.  When making an existing
 	 * index into a constraint, caller must have a table lock that prevents
-	 * concurrent table updates, and there is a risk that concurrent readers
-	 * of the table will miss seeing this index at all.
+	 * concurrent table updates; if it's less than a full exclusive lock,
+	 * there is a risk that concurrent readers of the table will miss seeing
+	 * this index at all.
 	 */
 	if (update_pgindex && (mark_as_primary || deferrable))
 	{
@@ -1317,8 +1323,8 @@ index_drop(Oid indexId, bool concurrent)
 	LockRelId	heaprelid,
 				indexrelid;
 	LOCKTAG		heaplocktag;
+	LOCKMODE	lockmode;
 	VirtualTransactionId *old_lockholders;
-	Form_pg_index indexForm;
 
 	/*
 	 * To drop an index safely, we must grab exclusive lock on its parent
@@ -1330,105 +1336,101 @@ index_drop(Oid indexId, bool concurrent)
 	 * proceeding until we commit and send out a shared-cache-inval notice
 	 * that will make them update their index lists.
 	 *
-	 * In the concurrent case we make sure that nobody can be looking at the
-	 * indexes by dropping the index in multiple steps, so we don't need a full
-	 * AccessExclusiveLock yet.
-	 *
-	 * All predicate locks on the index are about to be made invalid. Promote
-	 * them to relation locks on the heap. For correctness the index must not
-	 * be seen with indisvalid = true during query planning after the move
-	 * starts, so that the index will not be used for a scan after the
-	 * predicate lock move, as this could create new predicate locks on the
-	 * index which would not ensure a heap relation lock. Also, the index must
-	 * not be seen during execution of a heap tuple insert with indisready =
-	 * false before the move is complete, since the conflict with the
-	 * predicate lock on the index gap could be missed before the lock on the
-	 * heap relation is in place to detect a conflict based on the heap tuple
-	 * insert.
+	 * In the concurrent case we avoid this requirement by disabling index use
+	 * in multiple steps and waiting out any transactions that might be using
+	 * the index, so we don't need exclusive lock on the parent table. Instead
+	 * we take ShareUpdateExclusiveLock, to ensure that two sessions aren't
+	 * doing CREATE/DROP INDEX CONCURRENTLY on the same index.	(We will get
+	 * AccessExclusiveLock on the index below, once we're sure nobody else is
+	 * using it.)
 	 */
 	heapId = IndexGetRelation(indexId, false);
-	if (concurrent)
-	{
-		userHeapRelation = heap_open(heapId, ShareUpdateExclusiveLock);
-		userIndexRelation = index_open(indexId, ShareUpdateExclusiveLock);
-	}
-	else
-	{
-		userHeapRelation = heap_open(heapId, AccessExclusiveLock);
-		userIndexRelation = index_open(indexId, AccessExclusiveLock);
-	}
+	lockmode = concurrent ? ShareUpdateExclusiveLock : AccessExclusiveLock;
+	userHeapRelation = heap_open(heapId, lockmode);
+	userIndexRelation = index_open(indexId, lockmode);
 
 	/*
-	 * We might still have open queries using it in our own session.
+	 * We might still have open queries using it in our own session, which the
+	 * above locking won't prevent, so test explicitly.
 	 */
 	CheckTableNotInUse(userIndexRelation, "DROP INDEX");
 
 	/*
-	 * Drop Index concurrently is similar in many ways to creating an index
-	 * concurrently, so some actions are similar to DefineIndex() just in the
-	 * reverse order.
+	 * Drop Index Concurrently is more or less the reverse process of Create
+	 * Index Concurrently.
 	 *
 	 * First we unset indisvalid so queries starting afterwards don't use the
-	 * index to answer queries anymore. We have to keep indisready = true
-	 * so transactions that are still scanning the index can continue to
-	 * see valid index contents. E.g. when they are using READ COMMITTED mode,
-	 * and another transactions that started later commits makes changes and
-	 * commits, they need to see those new tuples in the index.
+	 * index to answer queries anymore.  We have to keep indisready = true so
+	 * transactions that are still scanning the index can continue to see
+	 * valid index contents.  For instance, if they are using READ COMMITTED
+	 * mode, and another transaction makes changes and commits, they need to
+	 * see those new tuples in the index.
 	 *
-	 * After all transactions that could possibly have used it for queries
-	 * ended we can unset indisready and wait till nobody could be updating it
-	 * anymore.
+	 * After all transactions that could possibly have used the index for
+	 * queries end, we can unset indisready and indislive, then wait till
+	 * nobody could be touching it anymore.  (Note: we need indislive because
+	 * this state must be distinct from the initial state during CREATE INDEX
+	 * CONCURRENTLY, which has indislive true while indisready and indisvalid
+	 * are false.  That's because in that state, transactions must examine the
+	 * index for HOT-safety decisions, while in this state we don't want them
+	 * to open it at all.)
+	 *
+	 * Since all predicate locks on the index are about to be made invalid, we
+	 * must promote them to predicate locks on the heap.  In the
+	 * non-concurrent case we can just do that now.  In the concurrent case
+	 * it's a bit trickier.  The predicate locks must be moved when there are
+	 * no index scans in progress on the index and no more can subsequently
+	 * start, so that no new predicate locks can be made on the index.	Also,
+	 * they must be moved before heap inserts stop maintaining the index, else
+	 * the conflict with the predicate lock on the index gap could be missed
+	 * before the lock on the heap relation is in place to detect a conflict
+	 * based on the heap tuple insert.
 	 */
 	if (concurrent)
 	{
 		/*
-		 * Mark index invalid by updating its pg_index entry
-		 *
-		 * Don't Assert(indexForm->indisvalid) because we may be trying to
-		 * clear up after an error when trying to create an index which left
-		 * the index invalid
+		 * We must commit our transaction in order to make the first pg_index
+		 * state update visible to other sessions.	If the DROP machinery has
+		 * already performed any other actions (removal of other objects,
+		 * pg_depend entries, etc), the commit would make those actions
+		 * permanent, which would leave us with inconsistent catalog state if
+		 * we fail partway through the following sequence.	Since DROP INDEX
+		 * CONCURRENTLY is restricted to dropping just one index that has no
+		 * dependencies, we should get here before anything's been done ---
+		 * but let's check that to be sure.  We can verify that the current
+		 * transaction has not executed any transactional updates by checking
+		 * that no XID has been assigned.
 		 */
-		indexRelation = heap_open(IndexRelationId, RowExclusiveLock);
-
-		tuple = SearchSysCacheCopy1(INDEXRELID,
-									ObjectIdGetDatum(indexId));
-		if (!HeapTupleIsValid(tuple))
-			elog(ERROR, "cache lookup failed for index %u", indexId);
-		indexForm = (Form_pg_index) GETSTRUCT(tuple);
+		if (GetTopTransactionIdIfAny() != InvalidTransactionId)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("DROP INDEX CONCURRENTLY must be first action in transaction")));
 
 		/*
-		 * If indisready == true we leave it set so the index still gets
-		 * maintained by pre-existing transactions. We only need to ensure
-		 * that indisvalid is false.
+		 * Mark index invalid by updating its pg_index entry
 		 */
-		if (indexForm->indisvalid)
-		{
-			indexForm->indisvalid = false;	/* make unusable for new queries */
-
-			simple_heap_update(indexRelation, &tuple->t_self, tuple);
-			CatalogUpdateIndexes(indexRelation, tuple);
-		}
+		index_set_state_flags(indexId, INDEX_DROP_CLEAR_VALID);
 
-		heap_close(indexRelation, RowExclusiveLock);
+		/*
+		 * Invalidate the relcache for the table, so that after this commit
+		 * all sessions will refresh any cached plans that might reference the
+		 * index.
+		 */
+		CacheInvalidateRelcache(userHeapRelation);
 
 		/* save lockrelid and locktag for below, then close but keep locks */
 		heaprelid = userHeapRelation->rd_lockInfo.lockRelId;
 		SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
-		heap_close(userHeapRelation, NoLock);
-
 		indexrelid = userIndexRelation->rd_lockInfo.lockRelId;
+
+		heap_close(userHeapRelation, NoLock);
 		index_close(userIndexRelation, NoLock);
 
 		/*
-		 * For a concurrent drop, it's important to make the catalog entries
-		 * visible to other transactions before we drop the index. The index
-		 * will be marked not indisvalid, so that no one else tries to use it
-		 * for queries.
-		 *
-		 * We must commit our current transaction so that the index update
-		 * becomes visible; then start another.  Note that all the data
-		 * structures we just built are lost in the commit.  The only data we
-		 * keep past here are the relation IDs.
+		 * We must commit our current transaction so that the indisvalid
+		 * update becomes visible to other transactions; then start another.
+		 * Note that any previously-built data structures are lost in the
+		 * commit.	The only data we keep past here are the relation IDs.
 		 *
 		 * Before committing, get a session-level lock on the table, to ensure
 		 * that neither it nor the index can be dropped before we finish. This
@@ -1443,13 +1445,13 @@ index_drop(Oid indexId, bool concurrent)
 		StartTransactionCommand();
 
 		/*
-		 * Now we must wait until no running transaction could have the table
-		 * open with the old list of indexes.  To do this, inquire which xacts
-		 * currently would conflict with AccessExclusiveLock on the table --
-		 * ie, which ones have a lock of any kind on the table. Then wait for
-		 * each of these xacts to commit or abort.	Note we do not need to
-		 * worry about xacts that open the table for writing after this point;
-		 * they will see the index as invalid when they open the relation.
+		 * Now we must wait until no running transaction could be using the
+		 * index for a query.  To do this, inquire which xacts currently would
+		 * conflict with AccessExclusiveLock on the table -- ie, which ones
+		 * have a lock of any kind on the table. Then wait for each of these
+		 * xacts to commit or abort. Note we do not need to worry about xacts
+		 * that open the table for reading after this point; they will see the
+		 * index as invalid when they open the relation.
 		 *
 		 * Note: the reason we use actual lock acquisition here, rather than
 		 * just checking the ProcArray and sleeping, is that deadlock is
@@ -1480,28 +1482,19 @@ index_drop(Oid indexId, bool concurrent)
 		TransferPredicateLocksToHeapRelation(userIndexRelation);
 
 		/*
-		 * Now we are sure that nobody uses the index for queries, they just
-		 * might have it opened for updating it. So now we can unset
-		 * indisready and wait till nobody could update the index anymore.
+		 * Now we are sure that nobody uses the index for queries; they just
+		 * might have it open for updating it.	So now we can unset indisready
+		 * and indislive, then wait till nobody could be using it at all
+		 * anymore.
 		 */
-		indexRelation = heap_open(IndexRelationId, RowExclusiveLock);
-
-		tuple = SearchSysCacheCopy1(INDEXRELID,
-									ObjectIdGetDatum(indexId));
-		if (!HeapTupleIsValid(tuple))
-			elog(ERROR, "cache lookup failed for index %u", indexId);
-		indexForm = (Form_pg_index) GETSTRUCT(tuple);
-
-		Assert(indexForm->indisvalid == false);
-		if (indexForm->indisready)
-		{
-			indexForm->indisready = false;	/* don't update index anymore */
-
-			simple_heap_update(indexRelation, &tuple->t_self, tuple);
-			CatalogUpdateIndexes(indexRelation, tuple);
-		}
+		index_set_state_flags(indexId, INDEX_DROP_SET_DEAD);
 
-		heap_close(indexRelation, RowExclusiveLock);
+		/*
+		 * Invalidate the relcache for the table, so that after this commit
+		 * all sessions will refresh the table's index list.  Forgetting just
+		 * the index's relcache entry is not enough.
+		 */
+		CacheInvalidateRelcache(userHeapRelation);
 
 		/*
 		 * Close the relations again, though still holding session lock.
@@ -1510,23 +1503,15 @@ index_drop(Oid indexId, bool concurrent)
 		index_close(userIndexRelation, NoLock);
 
 		/*
-		 * Invalidate the relcache for the table, so that after this
-		 * transaction we will refresh the index list. Forgetting just the
-		 * index is not enough.
-		 */
-		CacheInvalidateRelcache(userHeapRelation);
-
-		/*
-		 * Just as with indisvalid = false we need to make sure indisready
-		 * is false is visible for everyone.
+		 * Again, commit the transaction to make the pg_index update visible
+		 * to other sessions.
 		 */
 		CommitTransactionCommand();
 		StartTransactionCommand();
 
 		/*
-		 * Wait till everyone that saw indisready = true finished so we can
-		 * finally really remove the index. The logic here is the same as
-		 * above.
+		 * Wait till every transaction that saw the old index state has
+		 * finished.  The logic here is the same as above.
 		 */
 		old_lockholders = GetLockConflicts(&heaplocktag, AccessExclusiveLock);
 
@@ -1547,7 +1532,10 @@ index_drop(Oid indexId, bool concurrent)
 		userIndexRelation = index_open(indexId, AccessExclusiveLock);
 	}
 	else
+	{
+		/* Not concurrent, so just transfer predicate locks and we're good */
 		TransferPredicateLocksToHeapRelation(userIndexRelation);
+	}
 
 	/*
 	 * Schedule physical removal of the files
@@ -1601,7 +1589,8 @@ index_drop(Oid indexId, bool concurrent)
 	 * of relhasindex (the next VACUUM will fix it if necessary). So there is
 	 * no need to update the pg_class tuple for the owning relation. But we
 	 * must send out a shared-cache-inval notice on the owning relation to
-	 * ensure other backends update their relcache lists of indexes.
+	 * ensure other backends update their relcache lists of indexes.  (In the
+	 * concurrent case, this is redundant but harmless.)
 	 */
 	CacheInvalidateRelcache(userHeapRelation);
 
@@ -1677,7 +1666,7 @@ BuildIndexInfo(Relation index)
 
 	/* other info */
 	ii->ii_Unique = indexStruct->indisunique;
-	ii->ii_ReadyForInserts = indexStruct->indisready;
+	ii->ii_ReadyForInserts = IndexIsReady(indexStruct);
 
 	/* initialize index-build state to default */
 	ii->ii_Concurrent = false;
@@ -2035,8 +2024,20 @@ index_build(Relation heapRelation,
 	 * index's usability horizon.  Moreover, we *must not* try to change the
 	 * index's pg_index entry while reindexing pg_index itself, and this
 	 * optimization nicely prevents that.
-	 */
-	if (indexInfo->ii_BrokenHotChain && !isreindex)
+	 *
+	 * We also need not set indcheckxmin during a concurrent index build,
+	 * because we won't set indisvalid true until all transactions that care
+	 * about the broken HOT chains are gone.
+	 *
+	 * Therefore, this code path can only be taken during non-concurrent
+	 * CREATE INDEX.  Thus the fact that heap_update will set the pg_index
+	 * tuple's xmin doesn't matter, because that tuple was created in the
+	 * current transaction anyway.	That also means we don't need to worry
+	 * about any concurrent readers of the tuple; no other transaction can see
+	 * it yet.
+	 */
+	if (indexInfo->ii_BrokenHotChain && !isreindex &&
+		!indexInfo->ii_Concurrent)
 	{
 		Oid			indexId = RelationGetRelid(indexRelation);
 		Relation	pg_index;
@@ -2999,6 +3000,97 @@ validate_index_heapscan(Relation heapRelation,
 }
 
 
+/*
+ * index_set_state_flags - adjust pg_index state flags
+ *
+ * This is used during CREATE/DROP INDEX CONCURRENTLY to adjust the pg_index
+ * flags that denote the index's state.  We must use an in-place update of
+ * the pg_index tuple, because we do not have exclusive lock on the parent
+ * table and so other sessions might concurrently be doing SnapshotNow scans
+ * of pg_index to identify the table's indexes.  A transactional update would
+ * risk somebody not seeing the index at all.  Because the update is not
+ * transactional and will not roll back on error, this must only be used as
+ * the last step in a transaction that has not made any transactional catalog
+ * updates!
+ *
+ * Note that heap_inplace_update does send a cache inval message for the
+ * tuple, so other sessions will hear about the update as soon as we commit.
+ */
+void
+index_set_state_flags(Oid indexId, IndexStateFlagsAction action)
+{
+	Relation	pg_index;
+	HeapTuple	indexTuple;
+	Form_pg_index indexForm;
+
+	/* Assert that current xact hasn't done any transactional updates */
+	Assert(GetTopTransactionIdIfAny() == InvalidTransactionId);
+
+	/* Open pg_index and fetch a writable copy of the index's tuple */
+	pg_index = heap_open(IndexRelationId, RowExclusiveLock);
+
+	indexTuple = SearchSysCacheCopy1(INDEXRELID,
+									 ObjectIdGetDatum(indexId));
+	if (!HeapTupleIsValid(indexTuple))
+		elog(ERROR, "cache lookup failed for index %u", indexId);
+	indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
+
+	/* Perform the requested state change on the copy */
+	switch (action)
+	{
+		case INDEX_CREATE_SET_READY:
+			/* Set indisready during a CREATE INDEX CONCURRENTLY sequence */
+			Assert(indexForm->indislive);
+			Assert(!indexForm->indisready);
+			Assert(!indexForm->indisvalid);
+			indexForm->indisready = true;
+			break;
+		case INDEX_CREATE_SET_VALID:
+			/* Set indisvalid during a CREATE INDEX CONCURRENTLY sequence */
+			Assert(indexForm->indislive);
+			Assert(indexForm->indisready);
+			Assert(!indexForm->indisvalid);
+			indexForm->indisvalid = true;
+			break;
+		case INDEX_DROP_CLEAR_VALID:
+
+			/*
+			 * Clear indisvalid during a DROP INDEX CONCURRENTLY sequence
+			 *
+			 * If indisready == true we leave it set so the index still gets
+			 * maintained by active transactions.  We only need to ensure that
+			 * indisvalid is false.  (We don't assert that either is initially
+			 * true, though, since we want to be able to retry a DROP INDEX
+			 * CONCURRENTLY that failed partway through.)
+			 *
+			 * Note: the CLUSTER logic assumes that indisclustered cannot be
+			 * set on any invalid index, so clear that flag too.
+			 */
+			indexForm->indisvalid = false;
+			indexForm->indisclustered = false;
+			break;
+		case INDEX_DROP_SET_DEAD:
+
+			/*
+			 * Clear indisready/indislive during DROP INDEX CONCURRENTLY
+			 *
+			 * We clear both indisready and indislive, because we not only
+			 * want to stop updates, we want to prevent sessions from touching
+			 * the index at all.
+			 */
+			Assert(!indexForm->indisvalid);
+			indexForm->indisready = false;
+			indexForm->indislive = false;
+			break;
+	}
+
+	/* ... and write it back in-place */
+	heap_inplace_update(pg_index, indexTuple);
+
+	heap_close(pg_index, RowExclusiveLock);
+}
+
+
 /*
  * IndexGetRelation: given an index's relation OID, get the OID of the
  * relation it is an index on.	Uses the system cache.
@@ -3032,12 +3124,9 @@ void
 reindex_index(Oid indexId, bool skip_constraint_checks)
 {
 	Relation	iRel,
-				heapRelation,
-				pg_index;
+				heapRelation;
 	Oid			heapId;
 	IndexInfo  *indexInfo;
-	HeapTuple	indexTuple;
-	Form_pg_index indexForm;
 	volatile bool skipped_constraint = false;
 
 	/*
@@ -3110,32 +3199,48 @@ reindex_index(Oid indexId, bool skip_constraint_checks)
 	ResetReindexProcessing();
 
 	/*
-	 * If the index is marked invalid or not ready (ie, it's from a failed
-	 * CREATE INDEX CONCURRENTLY), and we didn't skip a uniqueness check, we
-	 * can now mark it valid.  This allows REINDEX to be used to clean up in
-	 * such cases.
+	 * If the index is marked invalid/not-ready/dead (ie, it's from a failed
+	 * CREATE INDEX CONCURRENTLY, or a DROP INDEX CONCURRENTLY failed midway),
+	 * and we didn't skip a uniqueness check, we can now mark it valid.  This
+	 * allows REINDEX to be used to clean up in such cases.
 	 *
 	 * We can also reset indcheckxmin, because we have now done a
 	 * non-concurrent index build, *except* in the case where index_build
-	 * found some still-broken HOT chains.	If it did, we normally leave
-	 * indcheckxmin alone (note that index_build won't have changed it,
-	 * because this is a reindex).	But if the index was invalid or not ready
-	 * and there were broken HOT chains, it seems best to force indcheckxmin
-	 * true, because the normal argument that the HOT chains couldn't conflict
-	 * with the index is suspect for an invalid index.
+	 * found some still-broken HOT chains. If it did, and we don't have to
+	 * change any of the other flags, we just leave indcheckxmin alone (note
+	 * that index_build won't have changed it, because this is a reindex).
+	 * This is okay and desirable because not updating the tuple leaves the
+	 * index's usability horizon (recorded as the tuple's xmin value) the same
+	 * as it was.
 	 *
-	 * Note that it is important to not update the pg_index entry if we don't
-	 * have to, because updating it will move the index's usability horizon
-	 * (recorded as the tuple's xmin value) if indcheckxmin is true.  We don't
-	 * really want REINDEX to move the usability horizon forward ever, but we
-	 * have no choice if we are to fix indisvalid or indisready.  Of course,
-	 * clearing indcheckxmin eliminates the issue, so we're happy to do that
-	 * if we can.  Another reason for caution here is that while reindexing
-	 * pg_index itself, we must not try to update it.  We assume that
-	 * pg_index's indexes will always have these flags in their clean state.
+	 * But, if the index was invalid/not-ready/dead and there were broken HOT
+	 * chains, we had better force indcheckxmin true, because the normal
+	 * argument that the HOT chains couldn't conflict with the index is
+	 * suspect for an invalid index.  (A conflict is definitely possible if
+	 * the index was dead.	It probably shouldn't happen otherwise, but let's
+	 * be conservative.)  In this case advancing the usability horizon is
+	 * appropriate.
+	 *
+	 * Note that if we have to update the tuple, there is a risk of concurrent
+	 * transactions not seeing it during their SnapshotNow scans of pg_index.
+	 * While not especially desirable, this is safe because no such
+	 * transaction could be trying to update the table (since we have
+	 * ShareLock on it).  The worst case is that someone might transiently
+	 * fail to use the index for a query --- but it was probably unusable
+	 * before anyway, if we are updating the tuple.
+	 *
+	 * Another reason for avoiding unnecessary updates here is that while
+	 * reindexing pg_index itself, we must not try to update tuples in it.
+	 * pg_index's indexes should always have these flags in their clean state,
+	 * so that won't happen.
 	 */
 	if (!skipped_constraint)
 	{
+		Relation	pg_index;
+		HeapTuple	indexTuple;
+		Form_pg_index indexForm;
+		bool		index_bad;
+
 		pg_index = heap_open(IndexRelationId, RowExclusiveLock);
 
 		indexTuple = SearchSysCacheCopy1(INDEXRELID,
@@ -3144,17 +3249,30 @@ reindex_index(Oid indexId, bool skip_constraint_checks)
 			elog(ERROR, "cache lookup failed for index %u", indexId);
 		indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
 
-		if (!indexForm->indisvalid || !indexForm->indisready ||
+		index_bad = (!indexForm->indisvalid ||
+					 !indexForm->indisready ||
+					 !indexForm->indislive);
+		if (index_bad ||
 			(indexForm->indcheckxmin && !indexInfo->ii_BrokenHotChain))
 		{
 			if (!indexInfo->ii_BrokenHotChain)
 				indexForm->indcheckxmin = false;
-			else if (!indexForm->indisvalid || !indexForm->indisready)
+			else if (index_bad)
 				indexForm->indcheckxmin = true;
 			indexForm->indisvalid = true;
 			indexForm->indisready = true;
+			indexForm->indislive = true;
 			simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
 			CatalogUpdateIndexes(pg_index, indexTuple);
+
+			/*
+			 * Invalidate the relcache for the table, so that after we commit
+			 * all sessions will refresh the table's index list.  This ensures
+			 * that if anyone misses seeing the pg_index row during this
+			 * update, they'll refresh their list before attempting any update
+			 * on the table.
+			 */
+			CacheInvalidateRelcache(heapRelation);
 		}
 
 		heap_close(pg_index, RowExclusiveLock);
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index de71a3594d48302b4b6be801ad383a2f509bdd6d..c3deb567626ef22cf4e2dbf072267d1aed562b95 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -444,7 +444,7 @@ check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck, LOCKMOD
 	 * might put recently-dead tuples out-of-order in the new table, and there
 	 * is little harm in that.)
 	 */
-	if (!OldIndex->rd_index->indisvalid)
+	if (!IndexIsValid(OldIndex->rd_index))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot cluster on invalid index \"%s\"",
@@ -458,6 +458,11 @@ check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck, LOCKMOD
  * mark_index_clustered: mark the specified index as the one clustered on
  *
  * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
+ *
+ * Note: we do transactional updates of the pg_index rows, which are unsafe
+ * against concurrent SnapshotNow scans of pg_index.  Therefore this is unsafe
+ * to execute with less than full exclusive lock on the parent table;
+ * otherwise concurrent executions of RelationGetIndexList could miss indexes.
  */
 void
 mark_index_clustered(Relation rel, Oid indexOid)
@@ -513,6 +518,9 @@ mark_index_clustered(Relation rel, Oid indexOid)
 		}
 		else if (thisIndexOid == indexOid)
 		{
+			/* this was checked earlier, but let's be real sure */
+			if (!IndexIsValid(indexForm))
+				elog(ERROR, "cannot cluster on invalid index %u", indexOid);
 			indexForm->indisclustered = true;
 			simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
 			CatalogUpdateIndexes(pg_index, indexTuple);
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index dd46cf93dad8b32bfea63cf699a026c2f7f8681a..75f9ff19cc70bcbd417eb2bedd8dc30a827f23ee 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -124,6 +124,7 @@ CheckIndexCompatible(Oid oldId,
 	Oid			accessMethodId;
 	Oid			relationId;
 	HeapTuple	tuple;
+	Form_pg_index indexForm;
 	Form_pg_am	accessMethodForm;
 	bool		amcanorder;
 	int16	   *coloptions;
@@ -193,17 +194,22 @@ CheckIndexCompatible(Oid oldId,
 	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
 	if (!HeapTupleIsValid(tuple))
 		elog(ERROR, "cache lookup failed for index %u", oldId);
+	indexForm = (Form_pg_index) GETSTRUCT(tuple);
 
-	/* We don't assess expressions or predicates; assume incompatibility. */
+	/*
+	 * We don't assess expressions or predicates; assume incompatibility.
+	 * Also, if the index is invalid for any reason, treat it as incompatible.
+	 */
 	if (!(heap_attisnull(tuple, Anum_pg_index_indpred) &&
-		  heap_attisnull(tuple, Anum_pg_index_indexprs)))
+		  heap_attisnull(tuple, Anum_pg_index_indexprs) &&
+		  IndexIsValid(indexForm)))
 	{
 		ReleaseSysCache(tuple);
 		return false;
 	}
 
 	/* Any change in operator class or collation breaks compatibility. */
-	old_natts = ((Form_pg_index) GETSTRUCT(tuple))->indnatts;
+	old_natts = indexForm->indnatts;
 	Assert(old_natts == numberOfAttributes);
 
 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
@@ -320,9 +326,6 @@ DefineIndex(IndexStmt *stmt,
 	LockRelId	heaprelid;
 	LOCKTAG		heaplocktag;
 	Snapshot	snapshot;
-	Relation	pg_index;
-	HeapTuple	indexTuple;
-	Form_pg_index indexForm;
 	int			i;
 
 	/*
@@ -717,23 +720,7 @@ DefineIndex(IndexStmt *stmt,
 	 * commit this transaction, any new transactions that open the table must
 	 * insert new entries into the index for insertions and non-HOT updates.
 	 */
-	pg_index = heap_open(IndexRelationId, RowExclusiveLock);
-
-	indexTuple = SearchSysCacheCopy1(INDEXRELID,
-									 ObjectIdGetDatum(indexRelationId));
-	if (!HeapTupleIsValid(indexTuple))
-		elog(ERROR, "cache lookup failed for index %u", indexRelationId);
-	indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
-
-	Assert(!indexForm->indisready);
-	Assert(!indexForm->indisvalid);
-
-	indexForm->indisready = true;
-
-	simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
-	CatalogUpdateIndexes(pg_index, indexTuple);
-
-	heap_close(pg_index, RowExclusiveLock);
+	index_set_state_flags(indexRelationId, INDEX_CREATE_SET_READY);
 
 	/* we can do away with our snapshot */
 	PopActiveSnapshot();
@@ -857,23 +844,7 @@ DefineIndex(IndexStmt *stmt,
 	/*
 	 * Index can now be marked valid -- update its pg_index entry
 	 */
-	pg_index = heap_open(IndexRelationId, RowExclusiveLock);
-
-	indexTuple = SearchSysCacheCopy1(INDEXRELID,
-									 ObjectIdGetDatum(indexRelationId));
-	if (!HeapTupleIsValid(indexTuple))
-		elog(ERROR, "cache lookup failed for index %u", indexRelationId);
-	indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
-
-	Assert(indexForm->indisready);
-	Assert(!indexForm->indisvalid);
-
-	indexForm->indisvalid = true;
-
-	simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
-	CatalogUpdateIndexes(pg_index, indexTuple);
-
-	heap_close(pg_index, RowExclusiveLock);
+	index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
 
 	/*
 	 * The pg_index update will cause backends (including this one) to update
@@ -881,7 +852,7 @@ DefineIndex(IndexStmt *stmt,
 	 * relcache inval on the parent table to force replanning of cached plans.
 	 * Otherwise existing sessions might fail to use the new index where it
 	 * would be useful.  (Note that our earlier commits did not create reasons
-	 * to replan; relcache flush on the index itself was sufficient.)
+	 * to replan; so relcache flush on the index itself was sufficient.)
 	 */
 	CacheInvalidateRelcacheByRelid(heaprelid.relId);
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index f88bf793e57e60f98a73fcf1de5a750d34cdb5b0..4065740b336079305bc663b57ef8ae96b9692100 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -744,10 +744,13 @@ RemoveRelations(DropStmt *drop)
 	int			flags = 0;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	/* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */
 	if (drop->concurrent)
 	{
+		flags |= PERFORM_DELETION_CONCURRENTLY;
 		lockmode = ShareUpdateExclusiveLock;
-		if (list_length(drop->objects) > 1)
+		Assert(drop->removeType == OBJECT_INDEX);
+		if (list_length(drop->objects) != 1)
 			ereport(ERROR,
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					 errmsg("DROP INDEX CONCURRENTLY does not support dropping multiple objects")));
@@ -839,19 +842,6 @@ RemoveRelations(DropStmt *drop)
 		add_exact_object_address(&obj, objects);
 	}
 
-	/*
-	 * Set options and check further requirements for concurrent drop
-	 */
-	if (drop->concurrent)
-	{
-		/*
-		 * Confirm that concurrent behaviour is restricted in grammar.
-		 */
-		Assert(drop->removeType == OBJECT_INDEX);
-
-		flags |= PERFORM_DELETION_CONCURRENTLY;
-	}
-
 	performMultipleDeletions(objects, drop->behavior, flags);
 
 	free_object_addresses(objects);
@@ -918,7 +908,7 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
 	 * locking the index.  index_drop() will need this anyway, and since
 	 * regular queries lock tables before their indexes, we risk deadlock if
 	 * we do it the other way around.  No error if we don't find a pg_index
-	 * entry, though --- the relation may have been droppd.
+	 * entry, though --- the relation may have been dropped.
 	 */
 	if (relkind == RELKIND_INDEX && relOid != oldRelOid)
 	{
@@ -4784,6 +4774,8 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
 
 	/*
 	 * Check that the attribute is not in a primary key
+	 *
+	 * Note: we'll throw error even if the pkey index is not valid.
 	 */
 
 	/* Loop over all indexes on the relation */
@@ -6318,7 +6310,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
 	/*
 	 * Get the list of index OIDs for the table from the relcache, and look up
 	 * each one in the pg_index syscache until we find one marked primary key
-	 * (hopefully there isn't more than one such).
+	 * (hopefully there isn't more than one such).  Insist it's valid, too.
 	 */
 	*indexOid = InvalidOid;
 
@@ -6332,7 +6324,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
 		if (!HeapTupleIsValid(indexTuple))
 			elog(ERROR, "cache lookup failed for index %u", indexoid);
 		indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
-		if (indexStruct->indisprimary)
+		if (indexStruct->indisprimary && IndexIsValid(indexStruct))
 		{
 			/*
 			 * Refuse to use a deferrable primary key.	This is per SQL spec,
@@ -6430,10 +6422,12 @@ transformFkeyCheckAttrs(Relation pkrel,
 
 		/*
 		 * Must have the right number of columns; must be unique and not a
-		 * partial index; forget it if there are any expressions, too
+		 * partial index; forget it if there are any expressions, too. Invalid
+		 * indexes are out as well.
 		 */
 		if (indexStruct->indnatts == numattrs &&
 			indexStruct->indisunique &&
+			IndexIsValid(indexStruct) &&
 			heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
 			heap_attisnull(indexTuple, Anum_pg_index_indexprs))
 		{
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 14d1c08c9738184b70b9bff27798bcfef9c5783a..c85289544808867257b1f3cf867fa67a87b6736a 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1097,9 +1097,16 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound)
 
 
 /*
- * Open all the indexes of the given relation, obtaining the specified kind
- * of lock on each.  Return an array of Relation pointers for the indexes
- * into *Irel, and the number of indexes into *nindexes.
+ * Open all the vacuumable indexes of the given relation, obtaining the
+ * specified kind of lock on each.	Return an array of Relation pointers for
+ * the indexes into *Irel, and the number of indexes into *nindexes.
+ *
+ * We consider an index vacuumable if it is marked insertable (IndexIsReady).
+ * If it isn't, probably a CREATE INDEX CONCURRENTLY command failed early in
+ * execution, and what we have is too corrupt to be processable.  We will
+ * vacuum even if the index isn't indisvalid; this is important because in a
+ * unique index, uniqueness checks will be performed anyway and had better not
+ * hit dangling index pointers.
  */
 void
 vac_open_indexes(Relation relation, LOCKMODE lockmode,
@@ -1113,21 +1120,30 @@ vac_open_indexes(Relation relation, LOCKMODE lockmode,
 
 	indexoidlist = RelationGetIndexList(relation);
 
-	*nindexes = list_length(indexoidlist);
+	/* allocate enough memory for all indexes */
+	i = list_length(indexoidlist);
 
-	if (*nindexes > 0)
-		*Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
+	if (i > 0)
+		*Irel = (Relation *) palloc(i * sizeof(Relation));
 	else
 		*Irel = NULL;
 
+	/* collect just the ready indexes */
 	i = 0;
 	foreach(indexoidscan, indexoidlist)
 	{
 		Oid			indexoid = lfirst_oid(indexoidscan);
+		Relation	indrel;
 
-		(*Irel)[i++] = index_open(indexoid, lockmode);
+		indrel = index_open(indexoid, lockmode);
+		if (IndexIsReady(indrel->rd_index))
+			(*Irel)[i++] = indrel;
+		else
+			index_close(indrel, lockmode);
 	}
 
+	*nindexes = i;
+
 	list_free(indexoidlist);
 }
 
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 0bbd0d464025c71dac05d72119c5a4d3052ecbf7..d6cf06c6fdfa90b51030aa26bbdac779576b82b9 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -906,6 +906,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo)
 	/*
 	 * For each index, open the index relation and save pg_index info. We
 	 * acquire RowExclusiveLock, signifying we will update the index.
+	 *
+	 * Note: we do this even if the index is not IndexIsReady; it's not worth
+	 * the trouble to optimize for the case where it isn't.
 	 */
 	i = 0;
 	foreach(l, indexoidlist)
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index abcd0ee574cf4f7560e71d8fdc11ebea5a9b303c..04d502866b516380a7695c773ca830869eb91fe4 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -170,9 +170,10 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 			 * Ignore invalid indexes, since they can't safely be used for
 			 * queries.  Note that this is OK because the data structure we
 			 * are constructing is only used by the planner --- the executor
-			 * still needs to insert into "invalid" indexes!
+			 * still needs to insert into "invalid" indexes, if they're marked
+			 * IndexIsReady.
 			 */
-			if (!index->indisvalid)
+			if (!IndexIsValid(index))
 			{
 				index_close(indexRelation, NoLock);
 				continue;
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index 95c57e81b584e9e5c82a2efb3a626632c6753cfd..086cc75e2269ec1cd621b1024970860112ca470a 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -1533,18 +1533,12 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
 							index_name, RelationGetRelationName(heap_rel)),
 					 parser_errposition(cxt->pstate, constraint->location)));
 
-		if (!index_form->indisvalid)
+		if (!IndexIsValid(index_form))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("index \"%s\" is not valid", index_name),
 					 parser_errposition(cxt->pstate, constraint->location)));
 
-		if (!index_form->indisready)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("index \"%s\" is not ready", index_name),
-					 parser_errposition(cxt->pstate, constraint->location)));
-
 		if (!index_form->indisunique)
 			ereport(ERROR,
 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 8c9ebe0f6f89cda440fae64299e6ba5d59aff150..9a504f80251b6c1c548672282612ec130ba70401 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1731,9 +1731,23 @@ RelationReloadIndexInfo(Relation relation)
 				 RelationGetRelid(relation));
 		index = (Form_pg_index) GETSTRUCT(tuple);
 
+		/*
+		 * Basically, let's just copy all the bool fields.  There are one or
+		 * two of these that can't actually change in the current code, but
+		 * it's not worth it to track exactly which ones they are.  None of
+		 * the array fields are allowed to change, though.
+		 */
+		relation->rd_index->indisunique = index->indisunique;
+		relation->rd_index->indisprimary = index->indisprimary;
+		relation->rd_index->indisexclusion = index->indisexclusion;
+		relation->rd_index->indimmediate = index->indimmediate;
+		relation->rd_index->indisclustered = index->indisclustered;
 		relation->rd_index->indisvalid = index->indisvalid;
 		relation->rd_index->indcheckxmin = index->indcheckxmin;
 		relation->rd_index->indisready = index->indisready;
+		relation->rd_index->indislive = index->indislive;
+
+		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
 		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
 							   HeapTupleHeaderGetXmin(tuple->t_data));
 
@@ -3299,6 +3313,10 @@ CheckConstraintFetch(Relation relation)
  * so that we must recompute the index list on next request.  This handles
  * creation or deletion of an index.
  *
+ * Indexes that are marked not IndexIsLive are omitted from the returned list.
+ * Such indexes are expected to be dropped momentarily, and should not be
+ * touched at all by any caller of this function.
+ *
  * The returned list is guaranteed to be sorted in order by OID.  This is
  * needed by the executor, since for index types that we obtain exclusive
  * locks on when updating the index, all backends must lock the indexes in
@@ -3358,9 +3376,12 @@ RelationGetIndexList(Relation relation)
 		bool		isnull;
 
 		/*
-		 * Ignore any indexes that are currently being dropped
+		 * Ignore any indexes that are currently being dropped.  This will
+		 * prevent them from being searched, inserted into, or considered in
+		 * HOT-safety decisions.  It's unsafe to touch such an index at all
+		 * since its catalog entries could disappear at any instant.
 		 */
-		if (!index->indisvalid && !index->indisready)
+		if (!IndexIsLive(index))
 			continue;
 
 		/* Add index's OID to result list in the proper order */
@@ -3379,7 +3400,8 @@ RelationGetIndexList(Relation relation)
 		indclass = (oidvector *) DatumGetPointer(indclassDatum);
 
 		/* Check to see if it is a unique, non-partial btree index on OID */
-		if (index->indnatts == 1 &&
+		if (IndexIsValid(index) &&
+			index->indnatts == 1 &&
 			index->indisunique && index->indimmediate &&
 			index->indkey.values[0] == ObjectIdAttributeNumber &&
 			indclass->values[0] == OID_BTREE_OPS_OID &&
@@ -3674,6 +3696,13 @@ RelationGetIndexAttrBitmap(Relation relation)
 
 	/*
 	 * For each index, add referenced attributes to indexattrs.
+	 *
+	 * Note: we consider all indexes returned by RelationGetIndexList, even if
+	 * they are not indisready or indisvalid.  This is important because an
+	 * index for which CREATE INDEX CONCURRENTLY has just started must be
+	 * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
+	 * CONCURRENTLY is far enough along that we should ignore the index, it
+	 * won't be returned at all by RelationGetIndexList.
 	 */
 	indexattrs = NULL;
 	foreach(l, indexoidlist)
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 5254a57d8a6816b8d1a91b6effab7fc7c2398230..9622356a6376e85fc2ccbf16afc0d1e9e68ab28f 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201210071
+#define CATALOG_VERSION_NO	201211281
 
 #endif
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 298641baf7c1f3a8034df0161596a16efdaa0268..b96099f94ce39c9dec94d069847e2ef85788b6eb 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -27,6 +27,15 @@ typedef void (*IndexBuildCallback) (Relation index,
 												bool tupleIsAlive,
 												void *state);
 
+/* Action code for index_set_state_flags */
+typedef enum
+{
+	INDEX_CREATE_SET_READY,
+	INDEX_CREATE_SET_VALID,
+	INDEX_DROP_CLEAR_VALID,
+	INDEX_DROP_SET_DEAD
+} IndexStateFlagsAction;
+
 
 extern void index_check_primary_key(Relation heapRel,
 						IndexInfo *indexInfo,
@@ -90,6 +99,8 @@ extern double IndexBuildHeapScan(Relation heapRelation,
 
 extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot);
 
+extern void index_set_state_flags(Oid indexId, IndexStateFlagsAction action);
+
 extern void reindex_index(Oid indexId, bool skip_constraint_checks);
 
 /* Flag bits for reindex_relation(): */
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index 934fe97e5f2e966b1ae038522ad74a97ad018af9..0ebe2cc9e2c6780fa7bd59fd299563c7f41a8ff6 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -41,6 +41,7 @@ CATALOG(pg_index,2610) BKI_WITHOUT_OIDS BKI_SCHEMA_MACRO
 	bool		indisvalid;		/* is this index valid for use by queries? */
 	bool		indcheckxmin;	/* must we wait for xmin to be old? */
 	bool		indisready;		/* is this index ready for inserts? */
+	bool		indislive;		/* is this index alive at all? */
 
 	/* variable-length fields start here, but we allow direct access to indkey */
 	int2vector	indkey;			/* column numbers of indexed cols, or 0 */
@@ -68,7 +69,7 @@ typedef FormData_pg_index *Form_pg_index;
  *		compiler constants for pg_index
  * ----------------
  */
-#define Natts_pg_index					17
+#define Natts_pg_index					18
 #define Anum_pg_index_indexrelid		1
 #define Anum_pg_index_indrelid			2
 #define Anum_pg_index_indnatts			3
@@ -80,12 +81,13 @@ typedef FormData_pg_index *Form_pg_index;
 #define Anum_pg_index_indisvalid		9
 #define Anum_pg_index_indcheckxmin		10
 #define Anum_pg_index_indisready		11
-#define Anum_pg_index_indkey			12
-#define Anum_pg_index_indcollation		13
-#define Anum_pg_index_indclass			14
-#define Anum_pg_index_indoption			15
-#define Anum_pg_index_indexprs			16
-#define Anum_pg_index_indpred			17
+#define Anum_pg_index_indislive			12
+#define Anum_pg_index_indkey			13
+#define Anum_pg_index_indcollation		14
+#define Anum_pg_index_indclass			15
+#define Anum_pg_index_indoption			16
+#define Anum_pg_index_indexprs			17
+#define Anum_pg_index_indpred			18
 
 /*
  * Index AMs that support ordered scans must support these two indoption
@@ -95,4 +97,13 @@ typedef FormData_pg_index *Form_pg_index;
 #define INDOPTION_DESC			0x0001	/* values are in reverse order */
 #define INDOPTION_NULLS_FIRST	0x0002	/* NULLs are first instead of last */
 
+/*
+ * Use of these macros is recommended over direct examination of the state
+ * flag columns where possible; this allows source code compatibility with
+ * the hacky representation used in 9.2.
+ */
+#define IndexIsValid(indexForm) ((indexForm)->indisvalid)
+#define IndexIsReady(indexForm) ((indexForm)->indisready)
+#define IndexIsLive(indexForm)	((indexForm)->indislive)
+
 #endif   /* PG_INDEX_H */