diff --git a/src/backend/access/brin/brin_pageops.c b/src/backend/access/brin/brin_pageops.c index 6ebfedd6a976a866ff1ff0bef8b8d9d209fee433..24ae38be6530bc1bee30f46d516841e0404850c6 100644 --- a/src/backend/access/brin/brin_pageops.c +++ b/src/backend/access/brin/brin_pageops.c @@ -178,10 +178,8 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange, } START_CRIT_SECTION(); - PageIndexDeleteNoCompact(oldpage, &oldoff, 1); - if (PageAddItemExtended(oldpage, (Item) newtup, newsz, oldoff, - PAI_OVERWRITE | PAI_ALLOW_FAR_OFFSET) == InvalidOffsetNumber) - elog(ERROR, "failed to add BRIN tuple"); + if (!PageIndexTupleOverwrite(oldpage, oldoff, (Item) newtup, newsz)) + elog(ERROR, "failed to replace BRIN tuple"); MarkBufferDirty(oldbuf); /* XLOG stuff */ diff --git a/src/backend/access/brin/brin_xlog.c b/src/backend/access/brin/brin_xlog.c index 27ba0a97f8d48ddf67568ed7c83704edea36f2bc..ed147299558a93eacc751977c8a3a235296f8b74 100644 --- a/src/backend/access/brin/brin_xlog.c +++ b/src/backend/access/brin/brin_xlog.c @@ -189,14 +189,9 @@ brin_xlog_samepage_update(XLogReaderState *record) page = (Page) BufferGetPage(buffer); offnum = xlrec->offnum; - if (PageGetMaxOffsetNumber(page) + 1 < offnum) - elog(PANIC, "brin_xlog_samepage_update: invalid max offset number"); - PageIndexDeleteNoCompact(page, &offnum, 1); - offnum = PageAddItemExtended(page, (Item) brintuple, tuplen, offnum, - PAI_OVERWRITE | PAI_ALLOW_FAR_OFFSET); - if (offnum == InvalidOffsetNumber) - elog(PANIC, "brin_xlog_samepage_update: failed to add tuple"); + if (!PageIndexTupleOverwrite(page, offnum, (Item) brintuple, tuplen)) + elog(PANIC, "brin_xlog_samepage_update: failed to replace tuple"); PageSetLSN(page, lsn); MarkBufferDirty(buffer); diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index f7f44b49aa30d0e2ae54b349d95ea6c2edd5f613..b8aa9bca6068632f9ee93936f5588588dd7da41c 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -493,18 +493,36 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, else { /* - * Enough space. We also get here if ntuples==0. + * Enough space. We always get here if ntup==0. */ START_CRIT_SECTION(); /* - * While we delete only one tuple at once we could mix calls - * PageIndexTupleDelete() here and PageIndexMultiDelete() in - * gistRedoPageUpdateRecord() + * Delete old tuple if any, then insert new tuple(s) if any. If + * possible, use the fast path of PageIndexTupleOverwrite. */ if (OffsetNumberIsValid(oldoffnum)) - PageIndexTupleDelete(page, oldoffnum); - gistfillbuffer(page, itup, ntup, InvalidOffsetNumber); + { + if (ntup == 1) + { + /* One-for-one replacement, so use PageIndexTupleOverwrite */ + if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup, + IndexTupleSize(*itup))) + elog(ERROR, "failed to add item to index page in \"%s\"", + RelationGetRelationName(rel)); + } + else + { + /* Delete old, then append new tuple(s) to page */ + PageIndexTupleDelete(page, oldoffnum); + gistfillbuffer(page, itup, ntup, InvalidOffsetNumber); + } + } + else + { + /* Just append new tuples at the end of the page */ + gistfillbuffer(page, itup, ntup, InvalidOffsetNumber); + } MarkBufferDirty(buffer); diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 01c7ef7ea6796e6ac413c2ce84fe59fb34d30b7f..5853d7638e4711a7272ca5acd4599d7e31f5b5a8 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -80,9 +80,31 @@ gistRedoPageUpdateRecord(XLogReaderState *record) page = (Page) BufferGetPage(buffer); - /* Delete old tuples */ - if (xldata->ntodelete > 0) + if (xldata->ntodelete == 1 && xldata->ntoinsert == 1) { + /* + * When replacing one tuple with one other tuple, we must use + * PageIndexTupleOverwrite for consistency with gistplacetopage. + */ + OffsetNumber offnum = *((OffsetNumber *) data); + IndexTuple itup; + Size itupsize; + + data += sizeof(OffsetNumber); + itup = (IndexTuple) data; + itupsize = IndexTupleSize(itup); + if (!PageIndexTupleOverwrite(page, offnum, (Item) itup, itupsize)) + elog(ERROR, "failed to add item to GiST index page, size %d bytes", + (int) itupsize); + data += itupsize; + /* should be nothing left after consuming 1 tuple */ + Assert(data - begin == datalen); + /* update insertion count for assert check below */ + ninserted++; + } + else if (xldata->ntodelete > 0) + { + /* Otherwise, delete old tuples if any */ OffsetNumber *todelete = (OffsetNumber *) data; data += sizeof(OffsetNumber) * xldata->ntodelete; @@ -92,7 +114,7 @@ gistRedoPageUpdateRecord(XLogReaderState *record) GistMarkTuplesDeleted(page); } - /* add tuples */ + /* Add new tuples if any */ if (data - begin < datalen) { OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : @@ -115,6 +137,7 @@ gistRedoPageUpdateRecord(XLogReaderState *record) } } + /* Check that XLOG record contained expected number of tuples */ Assert(ninserted == xldata->ntoinsert); PageSetLSN(page, lsn); diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index bce0d53a9c4a232ea3fa57787f39f4a2a4f1877d..08e222e583b814a939d80cd16cd2e991e354a8ee 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -166,21 +166,24 @@ PageIsVerified(Page page, BlockNumber blkno) * inserted, or InvalidOffsetNumber if the item is not inserted for any * reason. A WARNING is issued indicating the reason for the refusal. * - * If flag PAI_OVERWRITE is set, we just store the item at the specified - * offsetNumber (which must be either a currently-unused item pointer, - * or one past the last existing item). Otherwise, - * if offsetNumber is valid and <= current max offset in the page, - * insert item into the array at that position by shuffling ItemId's - * down to make room. - * If offsetNumber is not valid, then assign one by finding the first + * offsetNumber must be either InvalidOffsetNumber to specify finding a + * free item pointer, or a value between FirstOffsetNumber and one past + * the last existing item, to specify using that particular item pointer. + * + * If offsetNumber is valid and flag PAI_OVERWRITE is set, we just store + * the item at the specified offsetNumber, which must be either a + * currently-unused item pointer, or one past the last existing item. + * + * If offsetNumber is valid and flag PAI_OVERWRITE is not set, insert + * the item at the specified offsetNumber, moving existing items later + * in the array to make room. + * + * If offsetNumber is not valid, then assign a slot by finding the first * one that is both unused and deallocated. * * If flag PAI_IS_HEAP is set, we enforce that there can't be more than * MaxHeapTuplesPerPage line pointers on the page. * - * If flag PAI_ALLOW_FAR_OFFSET is not set, we disallow placing items - * beyond one past the last existing item. - * * !!! EREPORT(ERROR) IS DISALLOWED HERE !!! */ OffsetNumber @@ -267,11 +270,8 @@ PageAddItemExtended(Page page, } } - /* - * Reject placing items beyond the first unused line pointer, unless - * caller asked for that behavior specifically. - */ - if ((flags & PAI_ALLOW_FAR_OFFSET) == 0 && offsetNumber > limit) + /* Reject placing items beyond the first unused line pointer */ + if (offsetNumber > limit) { elog(WARNING, "specified item offset is too large"); return InvalidOffsetNumber; @@ -290,10 +290,7 @@ PageAddItemExtended(Page page, * Note: do arithmetic as signed ints, to avoid mistakes if, say, * alignedSize > pd_upper. */ - if ((flags & PAI_ALLOW_FAR_OFFSET) != 0) - lower = Max(phdr->pd_lower, - SizeOfPageHeaderData + sizeof(ItemIdData) * offsetNumber); - else if (offsetNumber == limit || needshuffle) + if (offsetNumber == limit || needshuffle) lower = phdr->pd_lower + sizeof(ItemIdData); else lower = phdr->pd_lower; @@ -1093,6 +1090,113 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems) } } + +/* + * PageIndexTupleOverwrite + * + * Replace a specified tuple on an index page. + * + * The new tuple is placed exactly where the old one had been, shifting + * other tuples' data up or down as needed to keep the page compacted. + * This is better than deleting and reinserting the tuple, because it + * avoids any data shifting when the tuple size doesn't change; and + * even when it does, we avoid moving the item pointers around. + * Conceivably this could also be of use to an index AM that cares about + * the physical order of tuples as well as their ItemId order. + * + * If there's insufficient space for the new tuple, return false. Other + * errors represent data-corruption problems, so we just elog. + */ +bool +PageIndexTupleOverwrite(Page page, OffsetNumber offnum, + Item newtup, Size newsize) +{ + PageHeader phdr = (PageHeader) page; + ItemId tupid; + int oldsize; + unsigned offset; + Size alignednewsize; + int size_diff; + int itemcount; + + /* + * As with PageRepairFragmentation, paranoia seems justified. + */ + if (phdr->pd_lower < SizeOfPageHeaderData || + phdr->pd_lower > phdr->pd_upper || + phdr->pd_upper > phdr->pd_special || + phdr->pd_special > BLCKSZ || + phdr->pd_special != MAXALIGN(phdr->pd_special)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u", + phdr->pd_lower, phdr->pd_upper, phdr->pd_special))); + + itemcount = PageGetMaxOffsetNumber(page); + if ((int) offnum <= 0 || (int) offnum > itemcount) + elog(ERROR, "invalid index offnum: %u", offnum); + + tupid = PageGetItemId(page, offnum); + Assert(ItemIdHasStorage(tupid)); + oldsize = ItemIdGetLength(tupid); + offset = ItemIdGetOffset(tupid); + + if (offset < phdr->pd_upper || (offset + oldsize) > phdr->pd_special || + offset != MAXALIGN(offset)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("corrupted item pointer: offset = %u, size = %u", + offset, (unsigned int) oldsize))); + + /* + * Determine actual change in space requirement, check for page overflow. + */ + oldsize = MAXALIGN(oldsize); + alignednewsize = MAXALIGN(newsize); + if (alignednewsize > oldsize + (phdr->pd_upper - phdr->pd_lower)) + return false; + + /* + * Relocate existing data and update line pointers, unless the new tuple + * is the same size as the old (after alignment), in which case there's + * nothing to do. Notice that what we have to relocate is data before the + * target tuple, not data after, so it's convenient to express size_diff + * as the amount by which the tuple's size is decreasing, making it the + * delta to add to pd_upper and affected line pointers. + */ + size_diff = oldsize - (int) alignednewsize; + if (size_diff != 0) + { + char *addr = (char *) page + phdr->pd_upper; + int i; + + /* relocate all tuple data before the target tuple */ + memmove(addr + size_diff, addr, offset - phdr->pd_upper); + + /* adjust free space boundary pointer */ + phdr->pd_upper += size_diff; + + /* adjust affected line pointers too */ + for (i = FirstOffsetNumber; i <= itemcount; i++) + { + ItemId ii = PageGetItemId(phdr, i); + + /* Allow items without storage; currently only BRIN needs that */ + if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset) + ii->lp_off += size_diff; + } + } + + /* Update the item's tuple length (other fields shouldn't change) */ + ItemIdSetNormal(tupid, offset + size_diff, newsize); + + /* Copy new tuple data onto page */ + memcpy(PageGetItem(page, tupid), newtup, newsize); + + return true; +} + + /* * Set checksum for a page in shared buffers. * diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index 15cebfc60d6def79a8fbd7702fca527f3f84d134..0ea47f5457ebea285ca88399f5edb4f9893ac664 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -409,7 +409,6 @@ do { \ */ #define PAI_OVERWRITE (1 << 0) #define PAI_IS_HEAP (1 << 1) -#define PAI_ALLOW_FAR_OFFSET (1 << 2) extern void PageInit(Page page, Size pageSize, Size specialSize); extern bool PageIsVerified(Page page, BlockNumber blkno); @@ -429,6 +428,8 @@ extern void PageIndexTupleDelete(Page page, OffsetNumber offset); extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems); extern void PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems); +extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, + Item newtup, Size newsize); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern void PageSetChecksumInplace(Page page, BlockNumber blkno);