diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 81422afa2f8c42ab770ec5d88b8127c1105ad3aa..72ed6325384d3c53bc545b99a12ba7beb9009882 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -24,6 +24,7 @@
  *      heap_getnext    - retrieve next tuple in scan
  *      heap_fetch      - retrieve tuple with given tid
  *      heap_insert     - insert tuple into a relation
+ *      heap_multi_insert - insert multiple tuples into a relation
  *      heap_delete     - delete a tuple from a relation
  *      heap_update     - replace a tuple in a relation with another tuple
  *      heap_markpos    - mark scan position
@@ -79,6 +80,8 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
                     int nkeys, ScanKey key,
                     bool allow_strat, bool allow_sync,
                     bool is_bitmapscan);
+static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
+                    TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                 ItemPointerData from, Buffer newbuf, HeapTuple newtup,
                 bool all_visible_cleared, bool new_all_visible_cleared);
@@ -1866,55 +1869,14 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     Buffer      vmbuffer = InvalidBuffer;
     bool        all_visible_cleared = false;
 
-    if (relation->rd_rel->relhasoids)
-    {
-#ifdef NOT_USED
-        /* this is redundant with an Assert in HeapTupleSetOid */
-        Assert(tup->t_data->t_infomask & HEAP_HASOID);
-#endif
-
-        /*
-         * If the object id of this tuple has already been assigned, trust the
-         * caller. There are a couple of ways this can happen. At initial db
-         * creation, the backend program sets oids for tuples. When we define
-         * an index, we set the oid. Finally, in the future, we may allow
-         * users to set their own object ids in order to support a persistent
-         * object store (objects need to contain pointers to one another).
-         */
-        if (!OidIsValid(HeapTupleGetOid(tup)))
-            HeapTupleSetOid(tup, GetNewOid(relation));
-    }
-    else
-    {
-        /* check there is not space for an OID */
-        Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
-    }
-
-    tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
-    tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
-    tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
-    HeapTupleHeaderSetXmin(tup->t_data, xid);
-    HeapTupleHeaderSetCmin(tup->t_data, cid);
-    HeapTupleHeaderSetXmax(tup->t_data, 0);     /* for cleanliness */
-    tup->t_tableOid = RelationGetRelid(relation);
-
     /*
-     * If the new tuple is too big for storage or contains already toasted
-     * out-of-line attributes from some other relation, invoke the toaster.
+     * Fill in tuple header fields, assign an OID, and toast the tuple if
+     * necessary.
      *
      * Note: below this point, heaptup is the data we actually intend to store
      * into the relation; tup is the caller's original untoasted data.
      */
-    if (relation->rd_rel->relkind != RELKIND_RELATION)
-    {
-        /* toast table entries should never be recursively toasted */
-        Assert(!HeapTupleHasExternal(tup));
-        heaptup = tup;
-    }
-    else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
-        heaptup = toast_insert_or_update(relation, tup, NULL, options);
-    else
-        heaptup = tup;
+    heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
 
     /*
      * We're about to do the actual insert -- but check for conflict first,
@@ -2035,7 +1997,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
      */
     CacheInvalidateHeapTuple(relation, heaptup, NULL);
 
-    pgstat_count_heap_insert(relation);
+    pgstat_count_heap_insert(relation, 1);
 
     /*
      * If heaptup is a private copy, release it.  Don't forget to copy t_self
@@ -2050,6 +2012,285 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     return HeapTupleGetOid(tup);
 }
 
+/*
+ * Subroutine for heap_insert(). Prepares a tuple for insertion. This sets the
+ * tuple header fields, assigns an OID, and toasts the tuple if necessary.
+ * Returns a toasted version of the tuple if it was toasted, or the original
+ * tuple if not. Note that in any case, the header fields are also set in
+ * the original tuple.
+ */
+static HeapTuple
+heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
+                    CommandId cid, int options)
+{
+    if (relation->rd_rel->relhasoids)
+    {
+#ifdef NOT_USED
+        /* this is redundant with an Assert in HeapTupleSetOid */
+        Assert(tup->t_data->t_infomask & HEAP_HASOID);
+#endif
+
+        /*
+         * If the object id of this tuple has already been assigned, trust the
+         * caller. There are a couple of ways this can happen. At initial db
+         * creation, the backend program sets oids for tuples. When we define
+         * an index, we set the oid. Finally, in the future, we may allow
+         * users to set their own object ids in order to support a persistent
+         * object store (objects need to contain pointers to one another).
+         */
+        if (!OidIsValid(HeapTupleGetOid(tup)))
+            HeapTupleSetOid(tup, GetNewOid(relation));
+    }
+    else
+    {
+        /* check there is not space for an OID */
+        Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
+    }
+
+    tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+    tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
+    tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
+    HeapTupleHeaderSetXmin(tup->t_data, xid);
+    HeapTupleHeaderSetCmin(tup->t_data, cid);
+    HeapTupleHeaderSetXmax(tup->t_data, 0);     /* for cleanliness */
+    tup->t_tableOid = RelationGetRelid(relation);
+
+    /*
+     * If the new tuple is too big for storage or contains already toasted
+     * out-of-line attributes from some other relation, invoke the toaster.
+     */
+    if (relation->rd_rel->relkind != RELKIND_RELATION)
+    {
+        /* toast table entries should never be recursively toasted */
+        Assert(!HeapTupleHasExternal(tup));
+        return tup;
+    }
+    else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
+        return toast_insert_or_update(relation, tup, NULL, options);
+    else
+        return tup;
+}
+
+/*
+ * heap_multi_insert   - insert multiple tuples into a heap
+ *
+ * This is like heap_insert(), but inserts multiple tuples in one operation.
+ * That's faster than calling heap_insert() in a loop, because when multiple
+ * tuples can be inserted on a single page, we can write just a single WAL
+ * record covering all of them, and only need to lock/unlock the page once.
+ *
+ * Note: this leaks memory into the current memory context. You can create a
+ * temporary context before calling this, if that's a problem.
+ */
+void
+heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+                  CommandId cid, int options, BulkInsertState bistate)
+{
+    TransactionId xid = GetCurrentTransactionId();
+    HeapTuple  *heaptuples;
+    Buffer      buffer;
+    Buffer      vmbuffer = InvalidBuffer;
+    bool        all_visible_cleared = false;
+    int         i;
+    int         ndone;
+    char       *scratch = NULL;
+    Page        page;
+    bool        needwal;
+
+    needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
+
+    /* Toast and set header data in all the tuples */
+    heaptuples = palloc(ntuples * sizeof(HeapTuple));
+    for (i = 0; i < ntuples; i++)
+        heaptuples[i] = heap_prepare_insert(relation, tuples[i],
+                                            xid, cid, options);
+
+    /*
+     * Allocate some memory to use for constructing the WAL record. Using
+     * palloc() within a critical section is not safe, so we allocate this
+     * beforehand.
+     */
+    if (needwal)
+        scratch = palloc(BLCKSZ);
+
+    /*
+     * We're about to do the actual inserts -- but check for conflict first,
+     * to avoid possibly having to roll back work we've just done.
+     *
+     * For a heap insert, we only need to check for table-level SSI locks.
+     * Our new tuple can't possibly conflict with existing tuple locks, and
+     * heap page locks are only consolidated versions of tuple locks; they do
+     * not lock "gaps" as index page locks do.  So we don't need to identify
+     * a buffer before making the call.
+     */
+    CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
+
+    ndone = 0;
+    while (ndone < ntuples)
+    {
+        int         nthispage;
+
+        /*
+         * Find buffer where at least the next tuple will fit.  If the page
+         * is all-visible, this will also pin the requisite visibility map
+         * page.
+         */
+        buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
+                                           InvalidBuffer, options, bistate,
+                                           &vmbuffer, NULL);
+        page = BufferGetPage(buffer);
+
+        if (PageIsAllVisible(page))
+        {
+            all_visible_cleared = true;
+            PageClearAllVisible(page);
+            visibilitymap_clear(relation,
+                                BufferGetBlockNumber(buffer),
+                                vmbuffer);
+        }
+
+        /* NO EREPORT(ERROR) from here till changes are logged */
+        START_CRIT_SECTION();
+
+        /* Put as many tuples as fit on this page */
+        for (nthispage = 0; ndone + nthispage < ntuples; nthispage++)
+        {
+            HeapTuple   heaptup = heaptuples[ndone + nthispage];
+
+            if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len))
+                break;
+
+            RelationPutHeapTuple(relation, buffer, heaptup);
+        }
+
+        /*
+         * XXX Should we set PageSetPrunable on this page ? See heap_insert()
+         */
+
+        MarkBufferDirty(buffer);
+
+        /* XLOG stuff */
+        if (needwal)
+        {
+            XLogRecPtr  recptr;
+            xl_heap_multi_insert *xlrec;
+            XLogRecData rdata[2];
+            uint8       info = XLOG_HEAP2_MULTI_INSERT;
+            char       *tupledata;
+            int         totaldatalen;
+            char       *scratchptr = scratch;
+            bool        init;
+
+            /*
+             * If the page was previously empty, we can reinit the page
+             * instead of restoring the whole thing.
+             */
+            init = (ItemPointerGetOffsetNumber(&(heaptuples[ndone]->t_self)) == FirstOffsetNumber &&
+                    PageGetMaxOffsetNumber(page) == FirstOffsetNumber + nthispage - 1);
+
+            /* allocate xl_heap_multi_insert struct from the scratch area */
+            xlrec = (xl_heap_multi_insert *) scratchptr;
+            scratchptr += SizeOfHeapMultiInsert;
+
+            /*
+             * Allocate the offsets array, unless we're reinitializing the
+             * page; in that case the tuples are stored in order starting at
+             * FirstOffsetNumber and we don't need to store the offsets
+             * explicitly.
+             */
+            if (!init)
+                scratchptr += nthispage * sizeof(OffsetNumber);
+
+            /* the rest of the scratch space is used for tuple data */
+            tupledata = scratchptr;
+
+            xlrec->all_visible_cleared = all_visible_cleared;
+            xlrec->node = relation->rd_node;
+            xlrec->blkno = BufferGetBlockNumber(buffer);
+            xlrec->ntuples = nthispage;
+
+            /*
+             * Write out an xl_multi_insert_tuple and the tuple data itself
+             * for each tuple.
+             */
+            for (i = 0; i < nthispage; i++)
+            {
+                HeapTuple   heaptup = heaptuples[ndone + i];
+                xl_multi_insert_tuple *tuphdr;
+                int         datalen;
+
+                if (!init)
+                    xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
+                /* xl_multi_insert_tuple needs two-byte alignment. */
+                tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr);
+                scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;
+
+                tuphdr->t_infomask2 = heaptup->t_data->t_infomask2;
+                tuphdr->t_infomask = heaptup->t_data->t_infomask;
+                tuphdr->t_hoff = heaptup->t_data->t_hoff;
+
+                /* write bitmap [+ padding] [+ oid] + data */
+                datalen = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+                memcpy(scratchptr,
+                       (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+                       datalen);
+                tuphdr->datalen = datalen;
+                scratchptr += datalen;
+            }
+            totaldatalen = scratchptr - tupledata;
+            Assert((scratchptr - scratch) < BLCKSZ);
+
+            rdata[0].data = (char *) xlrec;
+            rdata[0].len = tupledata - scratch;
+            rdata[0].buffer = InvalidBuffer;
+            rdata[0].next = &rdata[1];
+
+            rdata[1].data = tupledata;
+            rdata[1].len = totaldatalen;
+            rdata[1].buffer = buffer;
+            rdata[1].buffer_std = true;
+            rdata[1].next = NULL;
+
+            /*
+             * If we're going to reinitialize the whole page using the WAL
+             * record, hide buffer reference from XLogInsert.
+             */
+            if (init)
+            {
+                rdata[1].buffer = InvalidBuffer;
+                info |= XLOG_HEAP_INIT_PAGE;
+            }
+
+            recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+
+            PageSetLSN(page, recptr);
+            PageSetTLI(page, ThisTimeLineID);
+        }
+
+        END_CRIT_SECTION();
+
+        UnlockReleaseBuffer(buffer);
+        if (vmbuffer != InvalidBuffer)
+        {
+            ReleaseBuffer(vmbuffer);
+            /* reset, so the next page's iteration doesn't release it again */
+            vmbuffer = InvalidBuffer;
+        }
+
+        ndone += nthispage;
+    }
+
+    /*
+     * If tuples are cachable, mark them for invalidation from the caches in
+     * case we abort.  Note it is OK to do this after releasing the buffer,
+     * because the heaptuples data structure is all in local memory, not in
+     * the shared buffer.
+     */
+    if (IsSystemRelation(relation))
+    {
+        for (i = 0; i < ntuples; i++)
+            CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
+    }
+
+    pgstat_count_heap_insert(relation, ntuples);
+}
+
 /*
  * simple_heap_insert - insert a tuple
  *
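The new function's contract is worth spelling out for callers: the tuples array must already be heap-formed, index maintenance and trigger firing remain the caller's responsibility (just as with heap_insert()), and, per the comment above, the function allocates in the caller's current memory context. A minimal, hypothetical caller could therefore look like the sketch below; the function name and the dedicated context are illustrative only, and COPY's real call site appears in the copy.c changes further down.

    /* Hypothetical sketch of a heap_multi_insert() caller; not part of the patch. */
    void
    insert_tuple_batch(Relation rel, HeapTuple *tuples, int ntuples)
    {
        CommandId       cid = GetCurrentCommandId(true);
        BulkInsertState bistate = GetBulkInsertState();
        MemoryContext   batchcxt;
        MemoryContext   oldcxt;

        /* heap_multi_insert() leaks into the current context, so give it one */
        batchcxt = AllocSetContextCreate(CurrentMemoryContext,
                                         "batch insert scratch",
                                         ALLOCSET_DEFAULT_MINSIZE,
                                         ALLOCSET_DEFAULT_INITSIZE,
                                         ALLOCSET_DEFAULT_MAXSIZE);
        oldcxt = MemoryContextSwitchTo(batchcxt);
        heap_multi_insert(rel, tuples, ntuples, cid, 0, bistate);
        MemoryContextSwitchTo(oldcxt);

        MemoryContextDelete(batchcxt);
        FreeBulkInsertState(bistate);
    }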
@@ -4775,6 +5016,144 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
         XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace);
 }
 
+/*
+ * Handles MULTI_INSERT record type.
+ */
+static void
+heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+{
+    char       *recdata = XLogRecGetData(record);
+    xl_heap_multi_insert *xlrec;
+    Buffer      buffer;
+    Page        page;
+    struct
+    {
+        HeapTupleHeaderData hdr;
+        char        data[MaxHeapTupleSize];
+    }           tbuf;
+    HeapTupleHeader htup;
+    uint32      newlen;
+    Size        freespace;
+    BlockNumber blkno;
+    int         i;
+    bool        isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+
+    xlrec = (xl_heap_multi_insert *) recdata;
+    recdata += SizeOfHeapMultiInsert;
+
+    /*
+     * If we're reinitializing the page, the tuples are stored in order from
+     * FirstOffsetNumber.  Otherwise there's an array of offsets in the WAL
+     * record.
+     */
+    if (!isinit)
+        recdata += sizeof(OffsetNumber) * xlrec->ntuples;
+
+    blkno = xlrec->blkno;
+
+    /*
+     * The visibility map may need to be fixed even if the heap page is
+     * already up-to-date.
+     */
+    if (xlrec->all_visible_cleared)
+    {
+        Relation    reln = CreateFakeRelcacheEntry(xlrec->node);
+        Buffer      vmbuffer = InvalidBuffer;
+
+        visibilitymap_pin(reln, blkno, &vmbuffer);
+        visibilitymap_clear(reln, blkno, vmbuffer);
+        ReleaseBuffer(vmbuffer);
+        FreeFakeRelcacheEntry(reln);
+    }
+
+    if (record->xl_info & XLR_BKP_BLOCK_1)
+        return;
+
+    if (isinit)
+    {
+        buffer = XLogReadBuffer(xlrec->node, blkno, true);
+        Assert(BufferIsValid(buffer));
+        page = (Page) BufferGetPage(buffer);
+
+        PageInit(page, BufferGetPageSize(buffer), 0);
+    }
+    else
+    {
+        buffer = XLogReadBuffer(xlrec->node, blkno, false);
+        if (!BufferIsValid(buffer))
+            return;
+        page = (Page) BufferGetPage(buffer);
+
+        if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
+        {
+            UnlockReleaseBuffer(buffer);
+            return;
+        }
+    }
+
+    for (i = 0; i < xlrec->ntuples; i++)
+    {
+        OffsetNumber offnum;
+        xl_multi_insert_tuple *xlhdr;
+
+        if (isinit)
+            offnum = FirstOffsetNumber + i;
+        else
+            offnum = xlrec->offsets[i];
+        if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+            elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
+
+        xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
+        recdata += SizeOfMultiInsertTuple;
+
+        newlen = xlhdr->datalen;
+        Assert(newlen <= MaxHeapTupleSize);
+        htup = &tbuf.hdr;
+        MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+        /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+        memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+               (char *) recdata,
+               newlen);
+        recdata += newlen;
+
+        newlen += offsetof(HeapTupleHeaderData, t_bits);
+        htup->t_infomask2 = xlhdr->t_infomask2;
+        htup->t_infomask = xlhdr->t_infomask;
+        htup->t_hoff = xlhdr->t_hoff;
+        HeapTupleHeaderSetXmin(htup, record->xl_xid);
+        HeapTupleHeaderSetCmin(htup, FirstCommandId);
+        ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+        ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+        offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+        if (offnum == InvalidOffsetNumber)
+            elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
+    }
+
+    freespace = PageGetHeapFreeSpace(page);     /* needed to update FSM below */
+
+    PageSetLSN(page, lsn);
+    PageSetTLI(page, ThisTimeLineID);
+
+    if (xlrec->all_visible_cleared)
+        PageClearAllVisible(page);
+
+    MarkBufferDirty(buffer);
+    UnlockReleaseBuffer(buffer);
+
+    /*
+     * If the page is running low on free space, update the FSM as well.
+     * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+     * better than that without knowing the fill-factor for the table.
+     *
+     * XXX: We don't get here if the page was restored from full page image.
+     * We don't bother to update the FSM in that case; it doesn't need to be
+     * totally accurate anyway.
+     */
+    if (freespace < BLCKSZ / 5)
+        XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace);
+}
+
 /*
  * Handles UPDATE and HOT_UPDATE
  */
@@ -5164,6 +5543,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
         case XLOG_HEAP2_VISIBLE:
             heap_xlog_visible(lsn, record);
             break;
+        case XLOG_HEAP2_MULTI_INSERT:
+            heap_xlog_multi_insert(lsn, record);
+            break;
         default:
             elog(PANIC, "heap2_redo: unknown op code %u", info);
     }
@@ -5301,6 +5683,18 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
                          xlrec->node.spcNode, xlrec->node.dbNode,
                          xlrec->node.relNode, xlrec->block);
     }
+    else if (info == XLOG_HEAP2_MULTI_INSERT)
+    {
+        xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *) rec;
+
+        if (xl_info & XLOG_HEAP_INIT_PAGE)
+            appendStringInfo(buf, "multi-insert (init): ");
+        else
+            appendStringInfo(buf, "multi-insert: ");
+        appendStringInfo(buf, "rel %u/%u/%u; blk %u; %d tuples",
+                         xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+                         xlrec->blkno, xlrec->ntuples);
+    }
     else
         appendStringInfo(buf, "UNKNOWN");
 }
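That completes the heapam.c side: heap_multi_insert() builds the record into the scratch buffer, and heap_xlog_multi_insert() replays it. The record body is an xl_heap_multi_insert header, an optional array of line-pointer offsets, and then, per tuple, a two-byte-aligned xl_multi_insert_tuple followed by its data (the struct definitions are in the htup.h hunk below). The following hypothetical helper, mirroring the builder's scratchptr arithmetic, shows why a single BLCKSZ scratch buffer always suffices: everything logged for one page also fit on that page.

    /*
     * Hypothetical helper, not part of the patch: payload size of one
     * multi-insert record, mirroring heap_multi_insert()'s scratchptr math.
     */
    static Size
    multi_insert_record_size(HeapTuple *tups, int ntuples, bool init)
    {
        Size    size = SizeOfHeapMultiInsert;
        int     i;

        if (!init)
            size += ntuples * sizeof(OffsetNumber);     /* explicit offsets */

        for (i = 0; i < ntuples; i++)
        {
            size = SHORTALIGN(size);        /* xl_multi_insert_tuple alignment */
            size += SizeOfMultiInsertTuple; /* per-tuple header */
            /* tuple data; the header before t_bits is rebuilt at redo */
            size += tups[i]->t_len - offsetof(HeapTupleHeaderData, t_bits);
        }
        return size;
    }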
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index e0c72d9cdc1d503448083cd5e45187f4cf4528e0..7637b689dba300d7f4e313610a74c97e961f3cf0 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,7 @@
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
 #include "optimizer/planner.h"
 #include "parser/parse_relation.h"
 #include "rewrite/rewriteHandler.h"
@@ -149,6 +150,7 @@ typedef struct CopyStateData
     Oid        *typioparams;    /* array of element types for in_functions */
     int        *defmap;         /* array of default att numbers */
     ExprState **defexprs;       /* array of default att expressions */
+    bool        volatile_defexprs;  /* is any of defexprs volatile? */
 
     /*
      * These variables are used to reduce overhead in textual COPY FROM.
@@ -277,6 +279,11 @@ static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
              Datum *values, bool *nulls);
 static uint64 CopyFrom(CopyState cstate);
+static void CopyFromInsertBatch(CopyState cstate, EState *estate,
+                    CommandId mycid, int hi_options,
+                    ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+                    BulkInsertState bistate,
+                    int nBufferedTuples, HeapTuple *bufferedTuples);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
 static int  CopyReadAttributesText(CopyState cstate);
@@ -1842,11 +1849,17 @@ CopyFrom(CopyState cstate)
     ExprContext *econtext;
     TupleTableSlot *myslot;
     MemoryContext oldcontext = CurrentMemoryContext;
+    ErrorContextCallback errcontext;
     CommandId   mycid = GetCurrentCommandId(true);
     int         hi_options = 0; /* start with default heap_insert options */
     BulkInsertState bistate;
     uint64      processed = 0;
+    bool        useHeapMultiInsert;
+    int         nBufferedTuples = 0;
+#define MAX_BUFFERED_TUPLES 1000
+    HeapTuple  *bufferedTuples;
+    Size        bufferedTuplesSize = 0;
 
     Assert(cstate->rel);
 
@@ -1941,6 +1954,28 @@ CopyFrom(CopyState cstate)
     /* Triggers might need a slot as well */
     estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
 
+    /*
+     * It's more efficient to prepare a bunch of tuples for insertion, and
+     * insert them in one heap_multi_insert() call, than call heap_insert()
+     * separately for every tuple. However, we can't do that if there are
+     * BEFORE/INSTEAD OF triggers, or if we need to evaluate volatile default
+     * expressions. Such triggers or expressions might query the table we're
+     * inserting to, and act differently if the tuples that have already been
+     * processed and prepared for insertion are not there.
+     */
+    if ((resultRelInfo->ri_TrigDesc != NULL &&
+         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+          resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+        cstate->volatile_defexprs)
+    {
+        useHeapMultiInsert = false;
+    }
+    else
+    {
+        useHeapMultiInsert = true;
+        bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+    }
+
     /* Prepare to catch AFTER triggers. */
     AfterTriggerBeginQuery();
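The test above carries the patch's whole safety argument, so it bears restating: batching is only legal when nothing can observe the table between the moment a row is formed and the moment it is actually inserted. As a hypothetical predicate (not part of the patch, which open-codes the condition):

    /* Hypothetical restatement of the batching test above; not in the patch. */
    static bool
    copy_can_use_multi_insert(ResultRelInfo *rri, CopyState cstate)
    {
        /* BEFORE/INSTEAD OF row triggers may query the target table */
        if (rri->ri_TrigDesc != NULL &&
            (rri->ri_TrigDesc->trig_insert_before_row ||
             rri->ri_TrigDesc->trig_insert_instead_row))
            return false;

        /* a volatile DEFAULT may act differently if buffered rows are absent */
        if (cstate->volatile_defexprs)
            return false;

        return true;
    }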
@@ -1972,8 +2007,15 @@ CopyFrom(CopyState cstate)
 
         CHECK_FOR_INTERRUPTS();
 
-        /* Reset the per-tuple exprcontext */
-        ResetPerTupleExprContext(estate);
+        if (nBufferedTuples == 0)
+        {
+            /*
+             * Reset the per-tuple exprcontext. We can only do this if the
+             * tuple buffer is empty. (Calling the context the per-tuple
+             * memory context is a bit of a misnomer now.)
+             */
+            ResetPerTupleExprContext(estate);
+        }
 
         /* Switch into its memory context */
         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -2010,24 +2052,49 @@ CopyFrom(CopyState cstate)
 
         if (!skip_tuple)
         {
-            List       *recheckIndexes = NIL;
-
             /* Check the constraints of the tuple */
             if (cstate->rel->rd_att->constr)
                 ExecConstraints(resultRelInfo, slot, estate);
 
-            /* OK, store the tuple and create index entries for it */
-            heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+            if (useHeapMultiInsert)
+            {
+                /* Add this tuple to the tuple buffer */
+                bufferedTuples[nBufferedTuples++] = tuple;
+                bufferedTuplesSize += tuple->t_len;
 
-            if (resultRelInfo->ri_NumIndices > 0)
-                recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                       estate);
+                /*
+                 * If the buffer filled up, flush it. Also flush if the total
+                 * size of all the tuples in the buffer becomes large, to
+                 * avoid using large amounts of memory for the buffers when
+                 * the tuples are exceptionally wide.
+                 */
+                if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+                    bufferedTuplesSize > 65535)
+                {
+                    CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                        resultRelInfo, myslot, bistate,
+                                        nBufferedTuples, bufferedTuples);
+                    nBufferedTuples = 0;
+                    bufferedTuplesSize = 0;
+                }
+            }
+            else
+            {
+                List       *recheckIndexes = NIL;
 
-            /* AFTER ROW INSERT Triggers */
-            ExecARInsertTriggers(estate, resultRelInfo, tuple,
-                                 recheckIndexes);
+                /* OK, store the tuple and create index entries for it */
+                heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
 
-            list_free(recheckIndexes);
+                if (resultRelInfo->ri_NumIndices > 0)
+                    recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+                                                           estate);
+
+                /* AFTER ROW INSERT Triggers */
+                ExecARInsertTriggers(estate, resultRelInfo, tuple,
+                                     recheckIndexes);
+
+                list_free(recheckIndexes);
+            }
 
             /*
              * We count only tuples not suppressed by a BEFORE INSERT trigger;
@@ -2038,6 +2105,12 @@ CopyFrom(CopyState cstate)
         }
     }
 
+    /* Flush any remaining buffered tuples */
+    if (nBufferedTuples > 0)
+        CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                            resultRelInfo, myslot, bistate,
+                            nBufferedTuples, bufferedTuples);
+
     /* Done, clean up */
     error_context_stack = errcontext.previous;
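Two caps govern the flush above: MAX_BUFFERED_TUPLES (1000) bounds the deferred index and AFTER-trigger work for narrow rows, and the 65535-byte cap bounds buffered memory for wide rows. Since the test runs after a row has been added, peak buffering is roughly 64 kB plus one tuple. A hypothetical standalone form of the condition:

    /* Hypothetical restatement of CopyFrom's flush test; not in the patch. */
    static bool
    copy_batch_is_full(int nBufferedTuples, Size bufferedTuplesSize)
    {
        /* count cap: bounds per-batch index insertions and trigger firings */
        if (nBufferedTuples == MAX_BUFFERED_TUPLES)
            return true;
        /* byte cap: bounds memory when tuples are exceptionally wide */
        return bufferedTuplesSize > 65535;
    }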
@@ -2070,6 +2143,67 @@ CopyFrom(CopyState cstate)
 
     return processed;
 }
 
+/*
+ * A subroutine of CopyFrom, to write the current batch of buffered heap
+ * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
+ * triggers.
+ */
+static void
+CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
+                    int hi_options, ResultRelInfo *resultRelInfo,
+                    TupleTableSlot *myslot, BulkInsertState bistate,
+                    int nBufferedTuples, HeapTuple *bufferedTuples)
+{
+    MemoryContext oldcontext;
+    int         i;
+
+    /*
+     * heap_multi_insert leaks memory, so switch to short-lived memory
+     * context before calling it.
+     */
+    oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+    heap_multi_insert(cstate->rel,
+                      bufferedTuples,
+                      nBufferedTuples,
+                      mycid,
+                      hi_options,
+                      bistate);
+    MemoryContextSwitchTo(oldcontext);
+
+    /*
+     * If there are any indexes, update them for all the inserted tuples,
+     * and run AFTER ROW INSERT triggers.
+     */
+    if (resultRelInfo->ri_NumIndices > 0)
+    {
+        for (i = 0; i < nBufferedTuples; i++)
+        {
+            List       *recheckIndexes;
+
+            ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+            recheckIndexes =
+                ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+                                      estate);
+            ExecARInsertTriggers(estate, resultRelInfo,
+                                 bufferedTuples[i],
+                                 recheckIndexes);
+            list_free(recheckIndexes);
+        }
+    }
+    /*
+     * There are no indexes, but see if we need to run AFTER ROW INSERT
+     * triggers anyway.
+     */
+    else if (resultRelInfo->ri_TrigDesc != NULL &&
+             resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+    {
+        for (i = 0; i < nBufferedTuples; i++)
+            ExecARInsertTriggers(estate, resultRelInfo,
+                                 bufferedTuples[i],
+                                 NIL);
+    }
+}
+
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -2099,6 +2233,7 @@ BeginCopyFrom(Relation rel,
     int        *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
+    bool        volatile_defexprs;
 
     cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
@@ -2122,6 +2257,7 @@ BeginCopyFrom(Relation rel,
     attr = tupDesc->attrs;
     num_phys_attrs = tupDesc->natts;
     num_defaults = 0;
+    volatile_defexprs = false;
 
     /*
      * Pick up the required catalog information for each attribute in the
@@ -2163,6 +2299,9 @@ BeginCopyFrom(Relation rel,
                  expression_planner((Expr *) defexpr), NULL);
                 defmap[num_defaults] = attnum - 1;
                 num_defaults++;
+
+                if (!volatile_defexprs)
+                    volatile_defexprs = contain_volatile_functions(defexpr);
             }
         }
     }
@@ -2172,6 +2311,7 @@ BeginCopyFrom(Relation rel,
     cstate->typioparams = typioparams;
     cstate->defmap = defmap;
     cstate->defexprs = defexprs;
+    cstate->volatile_defexprs = volatile_defexprs;
     cstate->num_defaults = num_defaults;
 
     if (pipe)
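The detection above is deliberately conservative: a single volatile default anywhere disables batching for the whole COPY. For illustration, the same probe for one column, using build_column_default() just as the surrounding loop does, might look like this (hypothetical helper, not part of the patch):

    /* Hypothetical per-column probe mirroring the BeginCopyFrom loop above. */
    static bool
    column_default_is_volatile(Relation rel, AttrNumber attnum)
    {
        Node       *defexpr = build_column_default(rel, attnum);

        return defexpr != NULL && contain_volatile_functions(defexpr);
    }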
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 24582e304c0f1d0a1724900597c38a244e4a8ae5..24f4cde068e7799053152e42af40d3390efc009e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1677,10 +1677,10 @@ add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
 }
 
 /*
- * pgstat_count_heap_insert - count a tuple insertion
+ * pgstat_count_heap_insert - count the insertion of n tuples
  */
 void
-pgstat_count_heap_insert(Relation rel)
+pgstat_count_heap_insert(Relation rel, int n)
 {
     PgStat_TableStatus *pgstat_info = rel->pgstat_info;
 
@@ -1693,7 +1693,7 @@
             pgstat_info->trans->nest_level != nest_level)
             add_tabstat_xact_level(pgstat_info, nest_level);
 
-        pgstat_info->trans->tuples_inserted++;
+        pgstat_info->trans->tuples_inserted += n;
     }
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 85cbeb3273b80fba42d47f2aa0f3b6d7490c2f71..ed30b5dada63e54e326c96fe176f07961c55cf3a 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -97,6 +97,8 @@ extern void FreeBulkInsertState(BulkInsertState);
 
 extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
             int options, BulkInsertState bistate);
+extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+                  CommandId cid, int options, BulkInsertState bistate);
 extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
             ItemPointer ctid, TransactionId *update_xmax,
             CommandId cid, Snapshot crosscheck, bool wait);
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index 966e2d0299e95c280a3acaaea257416bfbbbb2ce..3ca25acb897d06be87568b31b6842deb66bfb4d0 100644
--- a/src/include/access/htup.h
+++ b/src/include/access/htup.h
@@ -608,6 +608,7 @@ typedef HeapTupleData *HeapTuple;
 /* 0x20 is free, was XLOG_HEAP2_CLEAN_MOVE */
 #define XLOG_HEAP2_CLEANUP_INFO 0x30
 #define XLOG_HEAP2_VISIBLE  0x40
+#define XLOG_HEAP2_MULTI_INSERT 0x50
 
 /*
  * All what we need to find changed tuple
@@ -661,6 +662,36 @@ typedef struct xl_heap_insert
 
 #define SizeOfHeapInsert    (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
 
+/*
+ * This is what we need to know about a multi-insert. The record consists of
+ * an xl_heap_multi_insert header, followed by an xl_multi_insert_tuple and
+ * the tuple data for each tuple. The 'offsets' array is omitted if the whole
+ * page is reinitialized (XLOG_HEAP_INIT_PAGE).
+ */
+typedef struct xl_heap_multi_insert
+{
+    RelFileNode node;
+    BlockNumber blkno;
+    bool        all_visible_cleared;
+    uint16      ntuples;
+    OffsetNumber offsets[1];
+
+    /* TUPLE DATA (xl_multi_insert_tuples) FOLLOW AT END OF STRUCT */
+} xl_heap_multi_insert;
+
+#define SizeOfHeapMultiInsert   offsetof(xl_heap_multi_insert, offsets)
+
+typedef struct xl_multi_insert_tuple
+{
+    uint16      datalen;        /* size of tuple data that follows */
+    uint16      t_infomask2;
+    uint16      t_infomask;
+    uint8       t_hoff;
+    /* TUPLE DATA FOLLOWS AT END OF STRUCT */
+} xl_multi_insert_tuple;
+
+#define SizeOfMultiInsertTuple  (offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))
+
 /* This is what we need to know about update|hot_update */
 typedef struct xl_heap_update
 {
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 7b2bd4edc0e1d01c27442ca09b9d9031584af84f..651b7d9ddf85c6e1dded59d68253caf5ab2b7fd8 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -766,7 +766,7 @@ extern void pgstat_initstats(Relation rel);
         (rel)->pgstat_info->t_counts.t_blocks_hit++;    \
     } while (0)
 
-extern void pgstat_count_heap_insert(Relation rel);
+extern void pgstat_count_heap_insert(Relation rel, int n);
 extern void pgstat_count_heap_update(Relation rel, bool hot);
 extern void pgstat_count_heap_delete(Relation rel);
 extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);
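Taken together, the htup.h definitions give the record body this shape: SizeOfHeapMultiInsert bytes of header, then ntuples OffsetNumbers (omitted under XLOG_HEAP_INIT_PAGE), then for each tuple a SHORTALIGNed xl_multi_insert_tuple followed by datalen bytes of data. A hypothetical walker, condensing the parsing that heap_xlog_multi_insert() performs:

    /* Hypothetical sketch, not in the patch: step through a record body. */
    static void
    walk_multi_insert_record(char *recdata, bool init)
    {
        xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *) recdata;
        char       *p = recdata + SizeOfHeapMultiInsert;
        int         i;

        if (!init)
            p += xlrec->ntuples * sizeof(OffsetNumber); /* skip offsets array */

        for (i = 0; i < xlrec->ntuples; i++)
        {
            /* per-tuple headers are stored with only two-byte alignment */
            xl_multi_insert_tuple *tuphdr =
                (xl_multi_insert_tuple *) SHORTALIGN(p);

            p = (char *) tuphdr + SizeOfMultiInsertTuple + tuphdr->datalen;
        }
    }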