diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index bcf987124fded27ba94c4b174ad002223fdba6c6..66deb1faee01f765624febc0954a3c26b3b47404 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -63,6 +63,7 @@
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
@@ -80,12 +81,14 @@ bool		synchronize_seqscans = true;
 static HeapScanDesc heap_beginscan_internal(Relation relation,
 						Snapshot snapshot,
 						int nkeys, ScanKey key,
+						ParallelHeapScanDesc parallel_scan,
 						bool allow_strat,
 						bool allow_sync,
 						bool allow_pagemode,
 						bool is_bitmapscan,
 						bool is_samplescan,
 						bool temp_snap);
+static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 					TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	 * results for a non-MVCC snapshot, the caller must hold some higher-level
 	 * lock that ensures the interesting tuple(s) won't change.)
 	 */
-	scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+	if (scan->rs_parallel != NULL)
+		scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+	else
+		scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
 
 	/*
 	 * If the table is large relative to NBuffers, use a bulk-read access
@@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	 * behaviors, independently of the size of the table; also there is a GUC
 	 * variable that can disable synchronized scanning.)
 	 *
-	 * During a rescan, don't make a new strategy object if we don't have to.
+	 * Note that heap_parallelscan_initialize has a very similar test; if you
+	 * change this, consider changing that one, too.
 	 */
 	if (!RelationUsesLocalBuffers(scan->rs_rd) &&
 		scan->rs_nblocks > NBuffers / 4)
@@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 
 	if (allow_strat)
 	{
+		/* During a rescan, keep the previous strategy object. */
 		if (scan->rs_strategy == NULL)
 			scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
 	}
@@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 		scan->rs_strategy = NULL;
 	}
 
-	if (keep_startblock)
+	if (scan->rs_parallel != NULL)
+	{
+		/* For parallel scan, believe whatever ParallelHeapScanDesc says. */
+		scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+	}
+	else if (keep_startblock)
 	{
 		/*
 		 * When rescanning, we want to keep the previous startblock setting,
@@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan,
 				tuple->t_data = NULL;
 				return;
 			}
-			page = scan->rs_startblock; /* first page */
+			if (scan->rs_parallel != NULL)
+			{
+				page = heap_parallelscan_nextpage(scan);
+
+				/* Other processes might have already finished the scan. */
+				if (page == InvalidBlockNumber)
+				{
+					Assert(!BufferIsValid(scan->rs_cbuf));
+					tuple->t_data = NULL;
+					return;
+				}
+			}
+			else
+				page = scan->rs_startblock; /* first page */
 			heapgetpage(scan, page);
 			lineoff = FirstOffsetNumber;	/* first offnum */
 			scan->rs_inited = true;
@@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan,
 	}
 	else if (backward)
 	{
+		/* backward parallel scan not supported */
+		Assert(scan->rs_parallel == NULL);
+
 		if (!scan->rs_inited)
 		{
 			/*
@@ -669,6 +698,11 @@ heapgettup(HeapScanDesc scan,
 				page = scan->rs_nblocks;
 			page--;
 		}
+		else if (scan->rs_parallel != NULL)
+		{
+			page = heap_parallelscan_nextpage(scan);
+			finished = (page == InvalidBlockNumber);
+		}
 		else
 		{
 			page++;
@@ -773,7 +807,20 @@ heapgettup_pagemode(HeapScanDesc scan,
 				tuple->t_data = NULL;
 				return;
 			}
-			page = scan->rs_startblock; /* first page */
+			if (scan->rs_parallel != NULL)
+			{
+				page = heap_parallelscan_nextpage(scan);
+
+				/* Other processes might have already finished the scan. */
+				if (page == InvalidBlockNumber)
+				{
+					Assert(!BufferIsValid(scan->rs_cbuf));
+					tuple->t_data = NULL;
+					return;
+				}
+			}
+			else
+				page = scan->rs_startblock; /* first page */
 			heapgetpage(scan, page);
 			lineindex = 0;
 			scan->rs_inited = true;
@@ -793,6 +840,9 @@ heapgettup_pagemode(HeapScanDesc scan,
 	}
 	else if (backward)
 	{
+		/* backward parallel scan not supported */
+		Assert(scan->rs_parallel == NULL);
+
 		if (!scan->rs_inited)
 		{
 			/*
@@ -932,6 +982,11 @@ heapgettup_pagemode(HeapScanDesc scan,
 				page = scan->rs_nblocks;
 			page--;
 		}
+		else if (scan->rs_parallel != NULL)
+		{
+			page = heap_parallelscan_nextpage(scan);
+			finished = (page == InvalidBlockNumber);
+		}
 		else
 		{
 			page++;
@@ -1341,7 +1396,7 @@ HeapScanDesc
 heap_beginscan(Relation relation, Snapshot snapshot,
 			   int nkeys, ScanKey key)
 {
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   true, true, true, false, false, false);
 }
 
@@ -1351,7 +1406,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
 	Oid			relid = RelationGetRelid(relation);
 	Snapshot	snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
 
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   true, true, true, false, false, true);
 }
 
@@ -1360,7 +1415,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
 					 int nkeys, ScanKey key,
 					 bool allow_strat, bool allow_sync)
 {
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   allow_strat, allow_sync, true,
 								   false, false, false);
 }
@@ -1369,7 +1424,7 @@ HeapScanDesc
 heap_beginscan_bm(Relation relation, Snapshot snapshot,
 				  int nkeys, ScanKey key)
 {
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   false, false, true, true, false, false);
 }
 
@@ -1378,7 +1433,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
 						int nkeys, ScanKey key,
 						bool allow_strat, bool allow_sync, bool allow_pagemode)
 {
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   allow_strat, allow_sync, allow_pagemode,
 								   false, true, false);
 }
@@ -1386,6 +1441,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
 static HeapScanDesc
 heap_beginscan_internal(Relation relation, Snapshot snapshot,
 						int nkeys, ScanKey key,
+						ParallelHeapScanDesc parallel_scan,
 						bool allow_strat,
 						bool allow_sync,
 						bool allow_pagemode,
@@ -1418,6 +1474,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
 	scan->rs_allow_strat = allow_strat;
 	scan->rs_allow_sync = allow_sync;
 	scan->rs_temp_snap = temp_snap;
+	scan->rs_parallel = parallel_scan;
 
 	/*
 	 * we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1473,6 +1530,25 @@ heap_rescan(HeapScanDesc scan,
 	 * reinitialize scan descriptor
 	 */
 	initscan(scan, key, true);
+
+	/*
+	 * reset parallel scan, if present
+	 */
+	if (scan->rs_parallel != NULL)
+	{
+		ParallelHeapScanDesc parallel_scan;
+
+		/*
+		 * Caller is responsible for making sure that all workers have
+		 * finished the scan before calling this, so it really shouldn't be
+		 * necessary to acquire the mutex at all.  We acquire it anyway, just
+		 * to be tidy.
+		 */
+		parallel_scan = scan->rs_parallel;
+		SpinLockAcquire(&parallel_scan->phs_mutex);
+		parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+		SpinLockRelease(&parallel_scan->phs_mutex);
+	}
 }
 
 /* ----------------
@@ -1531,6 +1607,154 @@ heap_endscan(HeapScanDesc scan)
 	pfree(scan);
 }
 
+/* ----------------
+ *		heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ *		Sadly, this doesn't reduce to a constant, because the size required
+ *		to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+	return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+					EstimateSnapshotSpace(snapshot));
+}
+
+/* ----------------
+ *		heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ *		Must allow as many bytes of shared memory as returned by
+ *		heap_parallelscan_estimate.  Call this just once in the leader
+ *		process; then, individual workers attach via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+							 Snapshot snapshot)
+{
+	target->phs_relid = RelationGetRelid(relation);
+	target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+	/* compare phs_syncscan initialization to similar logic in initscan */
+	target->phs_syncscan = synchronize_seqscans &&
+		!RelationUsesLocalBuffers(relation) &&
+		target->phs_nblocks > NBuffers / 4;
+	SpinLockInit(&target->phs_mutex);
+	target->phs_cblock = InvalidBlockNumber;
+	target->phs_startblock = InvalidBlockNumber;
+	SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
+
+/* ----------------
+ *		heap_beginscan_parallel - join a parallel scan
+ *
+ *		Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+	Snapshot	snapshot;
+
+	Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+	snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+	RegisterSnapshot(snapshot);
+
+	return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+								   true, true, true, false, false, true);
+}
+
+/* ----------------
+ *		heap_parallelscan_nextpage - get the next page to scan
+ *
+ *		Get the next page to scan.  Even if there are no pages left to scan,
+ *		another backend could have grabbed a page to scan and not yet finished
+ *		looking at it, so it doesn't follow that the scan is done when the
+ *		first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+	BlockNumber page = InvalidBlockNumber;
+	BlockNumber sync_startpage = InvalidBlockNumber;
+	BlockNumber report_page = InvalidBlockNumber;
+	ParallelHeapScanDesc parallel_scan;
+
+	Assert(scan->rs_parallel);
+	parallel_scan = scan->rs_parallel;
+
+retry:
+	/* Grab the spinlock. */
+	SpinLockAcquire(&parallel_scan->phs_mutex);
+
+	/*
+	 * If the scan's startblock has not yet been initialized, we must do so
+	 * now.  If this is not a synchronized scan, we just start at block 0,
+	 * but if it is a synchronized scan, we must get the starting position
+	 * from the synchronized scan machinery.  We can't hold the spinlock
+	 * while doing that, though, so release the spinlock, get the information
+	 * we need, and retry.  If nobody else has initialized the scan in the
+	 * meantime, we'll fill in the value we fetched on the second time
+	 * through.
+	 */
+	if (parallel_scan->phs_startblock == InvalidBlockNumber)
+	{
+		if (!parallel_scan->phs_syncscan)
+			parallel_scan->phs_startblock = 0;
+		else if (sync_startpage != InvalidBlockNumber)
+			parallel_scan->phs_startblock = sync_startpage;
+		else
+		{
+			SpinLockRelease(&parallel_scan->phs_mutex);
+			sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+			goto retry;
+		}
+		parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+	}
+
+	/*
+	 * The current block number is the next one that needs to be scanned,
+	 * unless it's InvalidBlockNumber already, in which case there are no
+	 * more blocks to scan.  After remembering the current value, we must
+	 * advance it so that the next call to this function returns the next
+	 * block to be scanned.
+	 */
+	page = parallel_scan->phs_cblock;
+	if (page != InvalidBlockNumber)
+	{
+		parallel_scan->phs_cblock++;
+		if (parallel_scan->phs_cblock >= scan->rs_nblocks)
+			parallel_scan->phs_cblock = 0;
+		if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
+		{
+			parallel_scan->phs_cblock = InvalidBlockNumber;
+			report_page = parallel_scan->phs_startblock;
+		}
+	}
+
+	/* Release the lock. */
+	SpinLockRelease(&parallel_scan->phs_mutex);
+
+	/*
+	 * Report scan location.  Normally, we report the current page number.
+	 * When we reach the end of the scan, though, we report the starting
+	 * page, not the ending page, just so the starting positions for later
+	 * scans don't slew backwards.  We only report the position at the end
+	 * of the scan once, though: subsequent callers will report nothing,
+	 * since they will have page == InvalidBlockNumber.
+	 */
+	if (scan->rs_syncscan)
+	{
+		if (report_page == InvalidBlockNumber)
+			report_page = page;
+		if (report_page != InvalidBlockNumber)
+			ss_report_location(scan->rs_rd, report_page);
+	}
+
+	return page;
+}
+
 /* ----------------
  *		heap_getnext - retrieve next tuple in scan
  *
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 75e6b72f9e0204913254548a42322a6fa7708d63..98eeadd23f82ca227ab7b2610130e1148e4f6586 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -96,8 +96,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation,
 
 #define heap_close(r,l)  relation_close(r,l)
 
-/* struct definition appears in relscan.h */
+/* struct definitions appear in relscan.h */
 typedef struct HeapScanDescData *HeapScanDesc;
+typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc;
 
 /*
  * HeapScanIsValid
@@ -126,6 +127,11 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
 extern void heap_endscan(HeapScanDesc scan);
 extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
 
+extern Size heap_parallelscan_estimate(Snapshot snapshot);
+extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
+							 Relation relation, Snapshot snapshot);
+extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
+
 extern bool heap_fetch(Relation relation, Snapshot snapshot,
 		   HeapTuple tuple, Buffer *userbuf, bool keep_buf,
 		   Relation stats_relation);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 6e6231971fdca3ef0a780f179b7cda515b4835a3..356c7e6b048f98d7a03a5d1475cea1f7c4fb9ed8 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -20,6 +20,25 @@
 #include "access/itup.h"
 #include "access/tupdesc.h"
 
+/*
+ * Shared state for parallel heap scan.
+ *
+ * Each backend participating in a parallel heap scan has its own
+ * HeapScanDesc in backend-private memory, and those objects all contain
+ * a pointer to this structure.  The information here must be sufficient
+ * to properly initialize each new HeapScanDesc as workers join the scan,
+ * and it must act as a font of block numbers for those workers.
+ */
+typedef struct ParallelHeapScanDescData
+{
+	Oid			phs_relid;		/* OID of relation to scan */
+	bool		phs_syncscan;	/* report location to syncscan logic? */
+	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
+	slock_t		phs_mutex;		/* mutual exclusion for block number fields */
+	BlockNumber phs_startblock; /* starting block number */
+	BlockNumber phs_cblock;		/* current block number */
+	char		phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelHeapScanDescData;
 
 typedef struct HeapScanDescData
 {
@@ -49,6 +68,7 @@ typedef struct HeapScanDescData
 	BlockNumber rs_cblock;		/* current block # in scan, if any */
 	Buffer		rs_cbuf;		/* current buffer in scan, if any */
 	/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
+	ParallelHeapScanDesc rs_parallel;	/* parallel scan information */
 
 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
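
Usage sketch (illustrative only, not part of the patch): the new entry points are meant to be driven by a leader that lays out a ParallelHeapScanDesc in shared memory and by any number of participants that attach to it.  The dynamic-shared-memory setup and worker launch are outside this patch, so the snippet below simply assumes that shared_area points to at least heap_parallelscan_estimate(snapshot) bytes of memory visible to all participants; the example_* function names are invented for the example.

#include "postgres.h"

#include "access/heapam.h"

/* Leader: lay out and initialize the shared scan state. */
static ParallelHeapScanDesc
example_leader_setup(Relation rel, Snapshot snapshot, void *shared_area)
{
	ParallelHeapScanDesc pscan = (ParallelHeapScanDesc) shared_area;

	/* shared_area must hold at least heap_parallelscan_estimate(snapshot) bytes */
	heap_parallelscan_initialize(pscan, rel, snapshot);
	return pscan;
}

/* Any participant (leader or worker): attach and consume tuples. */
static void
example_participant_scan(Relation rel, ParallelHeapScanDesc pscan)
{
	HeapScanDesc scan = heap_beginscan_parallel(rel, pscan);
	HeapTuple	tuple;

	/* Blocks are handed out one at a time via heap_parallelscan_nextpage(). */
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		/* process the tuple */
	}

	heap_endscan(scan);
}

Because phs_cblock wraps from rs_nblocks - 1 back to 0 and the hand-out stops once it returns to phs_startblock, each block is given to exactly one participant even when the (possibly synchronized) starting point lies in the middle of the relation.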