From 7b4ac19982a77a1a2a6f096c4a11ee7325a14d2c Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Tue, 24 Jan 2017 16:42:58 -0500
Subject: [PATCH] Extend index AM API for parallel index scans.

This patch doesn't actually make any index AM parallel-aware, but it
provides the necessary functions at the AM layer to do so.

Rahila Syed, Amit Kapila, Robert Haas
---
 contrib/bloom/blutils.c              |   3 +
 doc/src/sgml/indexam.sgml            |  67 +++++++++++++
 src/backend/access/brin/brin.c       |   3 +
 src/backend/access/gin/ginutil.c     |   3 +
 src/backend/access/gist/gist.c       |   3 +
 src/backend/access/hash/hash.c       |   3 +
 src/backend/access/index/indexam.c   | 135 ++++++++++++++++++++++++++-
 src/backend/access/nbtree/nbtree.c   |   3 +
 src/backend/access/spgist/spgutils.c |   3 +
 src/include/access/amapi.h           |  17 ++++
 src/include/access/genam.h           |   9 ++
 src/include/access/relscan.h         |  13 +++
 src/include/c.h                      |   3 +
 src/tools/pgindent/typedefs.list     |   2 +
 14 files changed, 262 insertions(+), 5 deletions(-)

diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c
index 06077afed69..858798db85c 100644
--- a/contrib/bloom/blutils.c
+++ b/contrib/bloom/blutils.c
@@ -138,6 +138,9 @@ blhandler(PG_FUNCTION_ARGS)
 	amroutine->amendscan = blendscan;
 	amroutine->ammarkpos = NULL;
 	amroutine->amrestrpos = NULL;
+	amroutine->amestimateparallelscan = NULL;
+	amroutine->aminitparallelscan = NULL;
+	amroutine->amparallelrescan = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index 40f201b11be..5d8e5574608 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -131,6 +131,11 @@ typedef struct IndexAmRoutine
     amendscan_function amendscan;
     ammarkpos_function ammarkpos;       /* can be NULL */
     amrestrpos_function amrestrpos;     /* can be NULL */
+
+    /* interface functions to support parallel index scans */
+    amestimateparallelscan_function amestimateparallelscan;    /* can be NULL */
+    aminitparallelscan_function aminitparallelscan;    /* can be NULL */
+    amparallelrescan_function amparallelrescan;    /* can be NULL */
 } IndexAmRoutine;
 </programlisting>
   </para>
@@ -624,6 +629,68 @@ amrestrpos (IndexScanDesc scan);
    the <structfield>amrestrpos</> field in its <structname>IndexAmRoutine</>
    struct may be set to NULL.
   </para>
+
+  <para>
+   In addition to supporting ordinary index scans, some types of index
+   may wish to support <firstterm>parallel index scans</>, which allow
+   multiple backends to cooperate in performing an index scan.  The
+   index access method should arrange things so that each cooperating
+   process returns a subset of the tuples that would be returned by
+   an ordinary, non-parallel index scan, but in such a way that the
+   union of those subsets is equal to the set of tuples that would be
+   returned by an ordinary, non-parallel index scan.  Furthermore, while
+   there need not be any global ordering of tuples returned by a parallel
+   scan, the ordering of that subset of tuples returned within each
+   cooperating backend must match the requested ordering.  The following
+   functions may be implemented to support parallel index scans:
+  </para>
+
+  <para>
+<programlisting>
+Size
+amestimateparallelscan (void);
+</programlisting>
+   Estimate and return the number of bytes of dynamic shared memory which
+   the access method will need to perform a parallel scan.  (This number
+   is in addition to, not in lieu of, the amount of space needed for
+   AM-independent data in <structname>ParallelIndexScanDescData</>.)
+  </para>
+
+  <para>
+   It is not necessary to implement this function for access methods which
+   do not support parallel scans or for which the number of additional bytes
+   of storage required is zero.
+  </para>
+
+  <para>
+<programlisting>
+void
+aminitparallelscan (void *target);
+</programlisting>
+   This function will be called to initialize dynamic shared memory at the
+   beginning of a parallel scan.  <parameter>target</> will point to at least
+   the number of bytes previously returned by
+   <function>amestimateparallelscan</>, and this function may use that
+   amount of space to store whatever data it wishes.
+  </para>
+
+  <para>
+   It is not necessary to implement this function for access methods which
+   do not support parallel scans or in cases where the shared memory space
+   required needs no initialization.
+  </para>
+
+  <para>
+<programlisting>
+void
+amparallelrescan (IndexScanDesc scan);
+</programlisting>
+   This function, if implemented, will be called when a parallel index scan
+   must be restarted.  It should reset any shared state set up by
+   <function>aminitparallelscan</> such that the scan will be restarted from
+   the beginning.
+  </para>
+
  </sect1>
 
  <sect1 id="index-scanning">
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index d60ddd242cb..b2afdb7bedb 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -112,6 +112,9 @@ brinhandler(PG_FUNCTION_ARGS)
 	amroutine->amendscan = brinendscan;
 	amroutine->ammarkpos = NULL;
 	amroutine->amrestrpos = NULL;
+	amroutine->amestimateparallelscan = NULL;
+	amroutine->aminitparallelscan = NULL;
+	amroutine->amparallelrescan = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index 3909638906b..02d920bb9db 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -68,6 +68,9 @@ ginhandler(PG_FUNCTION_ARGS)
 	amroutine->amendscan = ginendscan;
 	amroutine->ammarkpos = NULL;
 	amroutine->amrestrpos = NULL;
+	amroutine->amestimateparallelscan = NULL;
+	amroutine->aminitparallelscan = NULL;
+	amroutine->amparallelrescan = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 597056ae440..c2247ad2f78 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -89,6 +89,9 @@ gisthandler(PG_FUNCTION_ARGS)
 	amroutine->amendscan = gistendscan;
 	amroutine->ammarkpos = NULL;
 	amroutine->amrestrpos = NULL;
+	amroutine->amestimateparallelscan = NULL;
+	amroutine->aminitparallelscan = NULL;
+	amroutine->amparallelrescan = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index a64a9b9696a..ec8ed33c708 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -86,6 +86,9 @@ hashhandler(PG_FUNCTION_ARGS)
 	amroutine->amendscan = hashendscan;
 	amroutine->ammarkpos = NULL;
 	amroutine->amrestrpos = NULL;
+	amroutine->amestimateparallelscan = NULL;
+	amroutine->aminitparallelscan = NULL;
+	amroutine->amparallelrescan = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 4822af95a32..ba27c1e86d9 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -20,6 +20,10 @@
  *		index_insert	- insert an index tuple into a relation
  *		index_markpos	- mark a scan position
  *		index_restrpos	- restore a scan position
+ *		index_parallelscan_estimate - estimate shared memory for parallel scan
+ *		index_parallelscan_initialize - initialize parallel scan
+ *		index_parallelrescan  - (re)start a parallel scan of an index
+ *		index_beginscan_parallel - join parallel index scan
  *		index_getnext_tid	- get the next TID from a scan
  *		index_fetch_heap		- get the scan's next heap tuple
  *		index_getnext	- get the next heap tuple from a scan
@@ -120,7 +124,8 @@ do { \
 } while(0)
 
 static IndexScanDesc index_beginscan_internal(Relation indexRelation,
-						 int nkeys, int norderbys, Snapshot snapshot);
+						 int nkeys, int norderbys, Snapshot snapshot,
+						 ParallelIndexScanDesc pscan, bool temp_snap);
 
 
 /* ----------------------------------------------------------------
@@ -219,7 +224,7 @@ index_beginscan(Relation heapRelation,
 {
 	IndexScanDesc scan;
 
-	scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot);
+	scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot, NULL, false);
 
 	/*
 	 * Save additional parameters into the scandesc.  Everything else was set
@@ -244,7 +249,7 @@ index_beginscan_bitmap(Relation indexRelation,
 {
 	IndexScanDesc scan;
 
-	scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot);
+	scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot, NULL, false);
 
 	/*
 	 * Save additional parameters into the scandesc.  Everything else was set
@@ -260,8 +265,11 @@ index_beginscan_bitmap(Relation indexRelation,
  */
 static IndexScanDesc
 index_beginscan_internal(Relation indexRelation,
-						 int nkeys, int norderbys, Snapshot snapshot)
+						 int nkeys, int norderbys, Snapshot snapshot,
+						 ParallelIndexScanDesc pscan, bool temp_snap)
 {
+	IndexScanDesc scan;
+
 	RELATION_CHECKS;
 	CHECK_REL_PROCEDURE(ambeginscan);
 
@@ -276,8 +284,13 @@ index_beginscan_internal(Relation indexRelation,
 	/*
 	 * Tell the AM to open a scan.
 	 */
-	return indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
+	scan = indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
 													norderbys);
+	/* Initialize information for parallel scan. */
+	scan->parallel_scan = pscan;
+	scan->xs_temp_snap = temp_snap;
+
+	return scan;
 }
 
 /* ----------------
@@ -341,6 +354,9 @@ index_endscan(IndexScanDesc scan)
 	/* Release index refcount acquired by index_beginscan */
 	RelationDecrementReferenceCount(scan->indexRelation);
 
+	if (scan->xs_temp_snap)
+		UnregisterSnapshot(scan->xs_snapshot);
+
 	/* Release the scan data structure itself */
 	IndexScanEnd(scan);
 }
@@ -389,6 +405,115 @@ index_restrpos(IndexScanDesc scan)
 	scan->indexRelation->rd_amroutine->amrestrpos(scan);
 }
 
+/*
+ * index_parallelscan_estimate - estimate shared memory for parallel scan
+ *
+ * Result is the MAXALIGN'd size of the AM-independent part (fixed fields plus
+ * serialized snapshot) plus the AM's own estimate.  The AM estimator takes no
+ * arguments, so it can probably only return a constant; that may change.
+ */
+Size
+index_parallelscan_estimate(Relation indexRelation, Snapshot snapshot)
+{
+	Size		nbytes;
+
+	RELATION_CHECKS;
+
+	nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data);
+	nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot));
+	nbytes = MAXALIGN(nbytes);
+
+	/*
+	 * If amestimateparallelscan is not provided, assume there is no
+	 * AM-specific data needed.  (It's hard to believe that could work, but
+	 * it's easy enough to cater to it here.)
+	 */
+	if (indexRelation->rd_amroutine->amestimateparallelscan != NULL)
+		nbytes = add_size(nbytes,
+					  indexRelation->rd_amroutine->amestimateparallelscan());
+
+	return nbytes;
+}
+
+/*
+ * index_parallelscan_initialize - initialize parallel scan
+ *
+ * We initialize both the ParallelIndexScanDesc proper and the AM-specific
+ * information which follows it.
+ *
+ * The AM-specific area begins ps_offset bytes past "target"; the AM's
+ * aminitparallelscan routine, if provided, fills it in.  Call this just once
+ * in the leader process; then, workers attach via index_beginscan_parallel.
+ */
+void
+index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
+							  Snapshot snapshot, ParallelIndexScanDesc target)
+{
+	Size		offset;
+
+	RELATION_CHECKS;
+
+	offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
+					  EstimateSnapshotSpace(snapshot));
+	offset = MAXALIGN(offset);	/* mirrors index_parallelscan_estimate */
+
+	target->ps_relid = RelationGetRelid(heapRelation);
+	target->ps_indexid = RelationGetRelid(indexRelation);
+	target->ps_offset = offset;
+	SerializeSnapshot(snapshot, target->ps_snapshot_data);
+
+	/* aminitparallelscan is optional; assume no-op if not provided by AM */
+	if (indexRelation->rd_amroutine->aminitparallelscan != NULL)
+	{
+		void	   *amtarget;
+
+		amtarget = OffsetToPointer(target, offset);
+		indexRelation->rd_amroutine->aminitparallelscan(amtarget);
+	}
+}
+
+/* ----------------
+ *		index_parallelrescan  - (re)start a parallel scan of an index
+ * ----------------
+ */
+void
+index_parallelrescan(IndexScanDesc scan)
+{
+	SCAN_CHECKS;
+
+	/* optional AM hook to reset shared scan state; no-op if not provided */
+	if (scan->indexRelation->rd_amroutine->amparallelrescan != NULL)
+		scan->indexRelation->rd_amroutine->amparallelrescan(scan);
+}
+
+/*
+ * index_beginscan_parallel - join parallel index scan
+ *
+ * Caller must be holding suitable locks on the heap and the index.
+ */
+IndexScanDesc
+index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
+						 int norderbys, ParallelIndexScanDesc pscan)
+{
+	Snapshot	snapshot;
+	IndexScanDesc scan;
+
+	Assert(RelationGetRelid(heaprel) == pscan->ps_relid);
+	snapshot = RestoreSnapshot(pscan->ps_snapshot_data);
+	RegisterSnapshot(snapshot);	/* index_endscan unregisters (xs_temp_snap) */
+	scan = index_beginscan_internal(indexrel, nkeys, norderbys, snapshot,
+									pscan, true);	/* true => xs_temp_snap */
+
+	/*
+	 * Save additional parameters into the scandesc.  Everything else was set
+	 * up by index_beginscan_internal.
+	 */
+	scan->heapRelation = heaprel;
+	scan->xs_snapshot = snapshot;
+
+	return scan;
+}
+
 /* ----------------
  * index_getnext_tid - get the next TID from a scan
  *
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 1bb1acfea6a..469e7abe4df 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -118,6 +118,9 @@ bthandler(PG_FUNCTION_ARGS)
 	amroutine->amendscan = btendscan;
 	amroutine->ammarkpos = btmarkpos;
 	amroutine->amrestrpos = btrestrpos;
+	amroutine->amestimateparallelscan = NULL;
+	amroutine->aminitparallelscan = NULL;
+	amroutine->amparallelrescan = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index ca4b0bdbe4f..78846bec666 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -68,6 +68,9 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amendscan = spgendscan;
 	amroutine->ammarkpos = NULL;
 	amroutine->amrestrpos = NULL;
+	amroutine->amestimateparallelscan = NULL;
+	amroutine->aminitparallelscan = NULL;
+	amroutine->amparallelrescan = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 6a5f279e7f9..e91e41dc0f4 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -137,6 +137,18 @@ typedef void (*ammarkpos_function) (IndexScanDesc scan);
 /* restore marked scan position */
 typedef void (*amrestrpos_function) (IndexScanDesc scan);
 
+/*
+ * Callback function signatures - for parallel index scans (all optional).
+ */
+
+/* estimate bytes of shared memory the AM needs for a parallel scan */
+typedef Size (*amestimateparallelscan_function) (void);
+
+/* initialize AM-specific shared memory at start of parallel index scan */
+typedef void (*aminitparallelscan_function) (void *target);
+
+/* (re)start parallel index scan, resetting the AM's shared state */
+typedef void (*amparallelrescan_function) (IndexScanDesc scan);
 
 /*
  * API struct for an index AM.  Note this must be stored in a single palloc'd
@@ -196,6 +208,11 @@ typedef struct IndexAmRoutine
 	amendscan_function amendscan;
 	ammarkpos_function ammarkpos;		/* can be NULL */
 	amrestrpos_function amrestrpos;		/* can be NULL */
+
+	/* interface functions to support parallel index scans */
+	amestimateparallelscan_function amestimateparallelscan;		/* can be NULL */
+	aminitparallelscan_function aminitparallelscan;		/* can be NULL */
+	amparallelrescan_function amparallelrescan; /* can be NULL */
 } IndexAmRoutine;
 
 
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index b2e078aed2e..51466b96e8b 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -83,6 +83,8 @@ typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr, void *state);
 typedef struct IndexScanDescData *IndexScanDesc;
 typedef struct SysScanDescData *SysScanDesc;
 
+typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc;
+
 /*
  * Enumeration specifying the type of uniqueness check to perform in
  * index_insert().
@@ -144,6 +146,13 @@ extern void index_rescan(IndexScanDesc scan,
 extern void index_endscan(IndexScanDesc scan);
 extern void index_markpos(IndexScanDesc scan);
 extern void index_restrpos(IndexScanDesc scan);
+extern Size index_parallelscan_estimate(Relation indexrel, Snapshot snapshot);
+extern void index_parallelscan_initialize(Relation heaprel, Relation indexrel,
+							Snapshot snapshot, ParallelIndexScanDesc target);
+extern void index_parallelrescan(IndexScanDesc scan);
+extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
+						 Relation indexrel, int nkeys, int norderbys,
+						 ParallelIndexScanDesc pscan);
 extern ItemPointer index_getnext_tid(IndexScanDesc scan,
 				  ScanDirection direction);
 extern HeapTuple index_fetch_heap(IndexScanDesc scan);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 8746045d8d8..ce3ca8d4ac2 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -93,6 +93,7 @@ typedef struct IndexScanDescData
 	ScanKey		keyData;		/* array of index qualifier descriptors */
 	ScanKey		orderByData;	/* array of ordering op descriptors */
 	bool		xs_want_itup;	/* caller requests index tuples */
+	bool		xs_temp_snap;	/* unregister snapshot at scan end? */
 
 	/* signaling to index AM about killing index tuples */
 	bool		kill_prior_tuple;		/* last-returned tuple is dead */
@@ -126,8 +127,20 @@ typedef struct IndexScanDescData
 
 	/* state data for traversing HOT chains in index_getnext */
 	bool		xs_continue_hot;	/* T if must keep walking HOT chain */
+
+	/* parallel index scan information, in shared memory */
+	ParallelIndexScanDesc parallel_scan;
 }	IndexScanDescData;
 
+/* Generic structure for parallel index scans; snapshot is stored serialized */
+typedef struct ParallelIndexScanDescData
+{
+	Oid			ps_relid;		/* OID of the heap relation */
+	Oid			ps_indexid;		/* OID of the index relation */
+	Size		ps_offset;		/* Offset in bytes of AM-specific structure */
+	char		ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelIndexScanDescData;
+
 /* Struct for heap-or-index scans of system tables */
 typedef struct SysScanDescData
 {
diff --git a/src/include/c.h b/src/include/c.h
index efbb77f540a..a2c043adfbf 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -527,6 +527,9 @@ typedef NameData *Name;
 #define PointerIsAligned(pointer, type) \
 		(((uintptr_t)(pointer) % (sizeof (type))) == 0)
 
+#define OffsetToPointer(base, offset) \
+		((void *)((char *) (base) + (offset)))	/* base + offset bytes */
+
 #define OidIsValid(objectId)  ((bool) ((objectId) != InvalidOid))
 
 #define RegProcedureIsValid(p)	OidIsValid(p)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 993880da43e..c4235ae63a4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1264,6 +1264,8 @@ OverrideSearchPath
 OverrideStackEntry
 PACE_HEADER
 PACL
+ParallelIndexScanDesc
+ParallelIndexScanDescData
 PATH
 PBOOL
 PCtxtHandle
-- 
GitLab