diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index cd70a4c73e0766726d840a4a0ed306626d9f33d6..37c7bd1f8867b7058e44dc48490cbf9f57a8f6e5 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.178 2010/03/28 09:27:01 sriggs Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.179 2010/08/29 19:33:14 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -74,9 +74,8 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page, static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstoldonright, bool newitemonleft, int dataitemstoleft, Size firstoldonrightsz); -static void _bt_pgaddtup(Relation rel, Page page, - Size itemsize, IndexTuple itup, - OffsetNumber itup_off, const char *where); +static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, + OffsetNumber itup_off); static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel); @@ -753,7 +752,9 @@ _bt_insertonpg(Relation rel, /* Do the update. No ereport(ERROR) until changes are logged */ START_CRIT_SECTION(); - _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page"); + if (!_bt_pgaddtup(page, itemsz, itup, newitemoff)) + elog(PANIC, "failed to add new item to block %u in index \"%s\"", + itup_blkno, RelationGetRelationName(rel)); MarkBufferDirty(buf); @@ -879,6 +880,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, Page origpage; Page leftpage, rightpage; + BlockNumber origpagenumber, + rightpagenumber; BTPageOpaque ropaque, lopaque, oopaque; @@ -894,11 +897,27 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, OffsetNumber i; bool isroot; + /* Acquire a new page to split into */ rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + + /* + * origpage is the original page to be split. leftpage is a temporary + * buffer that receives the left-sibling data, which will be copied back + * into origpage on success. rightpage is the new page that receives + * the right-sibling data. If we fail before reaching the critical + * section, origpage hasn't been modified and leftpage is only workspace. + * In principle we shouldn't need to worry about rightpage either, + * because it hasn't been linked into the btree page structure; but to + * avoid leaving possibly-confusing junk behind, we are careful to rewrite + * rightpage as zeroes before throwing any error. + */ origpage = BufferGetPage(buf); leftpage = PageGetTempPage(origpage); rightpage = BufferGetPage(rbuf); + origpagenumber = BufferGetBlockNumber(buf); + rightpagenumber = BufferGetBlockNumber(rbuf); + _bt_pageinit(leftpage, BufferGetPageSize(buf)); /* rightpage was already initialized by _bt_getbuf */ @@ -923,8 +942,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE); ropaque->btpo_flags = lopaque->btpo_flags; lopaque->btpo_prev = oopaque->btpo_prev; - lopaque->btpo_next = BufferGetBlockNumber(rbuf); - ropaque->btpo_prev = BufferGetBlockNumber(buf); + lopaque->btpo_next = rightpagenumber; + ropaque->btpo_prev = origpagenumber; ropaque->btpo_next = oopaque->btpo_next; lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level; /* Since we already have write-lock on both pages, ok to read cycleid */ @@ -947,9 +966,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, item = (IndexTuple) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, false, false) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the right sibling" + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the right sibling" " while splitting block %u of index \"%s\"", - BufferGetBlockNumber(buf), RelationGetRelationName(rel)); + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } @@ -974,9 +996,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, } if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, false, false) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the left sibling" + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the left sibling" " while splitting block %u of index \"%s\"", - BufferGetBlockNumber(buf), RelationGetRelationName(rel)); + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); /* @@ -998,14 +1023,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { if (newitemonleft) { - _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } } @@ -1013,14 +1048,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, /* decide which page to put it on */ if (i < firstright) { - _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } } @@ -1034,8 +1079,13 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * not be splitting the page). */ Assert(!newitemonleft); - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } @@ -1047,16 +1097,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * neighbors. */ - if (!P_RIGHTMOST(ropaque)) + if (!P_RIGHTMOST(oopaque)) { - sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); + sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); - if (sopaque->btpo_prev != ropaque->btpo_prev) - elog(PANIC, "right sibling's left-link doesn't match: " - "block %u links to %u instead of expected %u in index \"%s\"", - ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev, + if (sopaque->btpo_prev != origpagenumber) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "right sibling's left-link doesn't match: " + "block %u links to %u instead of expected %u in index \"%s\"", + oopaque->btpo_next, sopaque->btpo_prev, origpagenumber, RelationGetRelationName(rel)); + } /* * Check to see if we can set the SPLIT_END flag in the right-hand @@ -1081,8 +1134,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * * NO EREPORT(ERROR) till right sibling is updated. We can get away with * not starting the critical section till here because we haven't been - * scribbling on the original page yet, and we don't care about the new - * sibling until it's linked into the btree. + * scribbling on the original page yet; see comments above. */ START_CRIT_SECTION(); @@ -1094,19 +1146,21 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * (in the page management code) that the center of a page always be * clean, and the most efficient way to guarantee this is just to compact * the data by reinserting it into a new left page. (XXX the latter - * comment is probably obsolete.) + * comment is probably obsolete; but in any case it's good to not scribble + * on the original page until we enter the critical section.) * * We need to do this before writing the WAL record, so that XLogInsert * can WAL log an image of the page if necessary. */ PageRestoreTempPage(leftpage, origpage); + /* leftpage, lopaque must not be used below here */ MarkBufferDirty(buf); MarkBufferDirty(rbuf); if (!P_RIGHTMOST(ropaque)) { - sopaque->btpo_prev = BufferGetBlockNumber(rbuf); + sopaque->btpo_prev = rightpagenumber; MarkBufferDirty(sbuf); } @@ -1120,8 +1174,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, XLogRecData *lastrdata; xlrec.node = rel->rd_node; - xlrec.leftsib = BufferGetBlockNumber(buf); - xlrec.rightsib = BufferGetBlockNumber(rbuf); + xlrec.leftsib = origpagenumber; + xlrec.rightsib = rightpagenumber; xlrec.rnext = ropaque->btpo_next; xlrec.level = ropaque->btpo.level; xlrec.firstright = firstright; @@ -1920,13 +1974,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * we insert the tuples in order, so that the given itup_off does * represent the final position of the tuple! */ -static void -_bt_pgaddtup(Relation rel, - Page page, +static bool +_bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, - OffsetNumber itup_off, - const char *where) + OffsetNumber itup_off) { BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); IndexTupleData trunctuple; @@ -1941,8 +1993,9 @@ _bt_pgaddtup(Relation rel, if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false) == InvalidOffsetNumber) - elog(PANIC, "failed to add item to the %s in index \"%s\"", - where, RelationGetRelationName(rel)); + return false; + + return true; } /* diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 5e9e7b9270ca88cea6c0f395c86970d2bd81a6fe..25fdcd289c81ad5a7fdeeb5302d05f4554dd0e19 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.123 2010/07/06 19:18:55 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.124 2010/08/29 19:33:14 tgl Exp $ * * NOTES * Postgres btree pages look like ordinary relation pages. The opaque @@ -1175,6 +1175,13 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack) */ rightsib = opaque->btpo_next; rbuf = _bt_getbuf(rel, rightsib, BT_WRITE); + page = BufferGetPage(rbuf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + if (opaque->btpo_prev != target) + elog(ERROR, "right sibling's left-link doesn't match: " + "block %u links to %u instead of expected %u in index \"%s\"", + rightsib, opaque->btpo_prev, target, + RelationGetRelationName(rel)); /* * Next find and write-lock the current parent of the target page. This is @@ -1252,6 +1259,38 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack) } } + /* + * Check that the parent-page index items we're about to delete/overwrite + * contain what we expect. This can fail if the index has become + * corrupt for some reason. We want to throw any error before entering + * the critical section --- otherwise it'd be a PANIC. + * + * The test on the target item is just an Assert because _bt_getstackbuf + * should have guaranteed it has the expected contents. The test on the + * next-child downlink is known to sometimes fail in the field, though. + */ + page = BufferGetPage(pbuf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + +#ifdef USE_ASSERT_CHECKING + itemid = PageGetItemId(page, poffset); + itup = (IndexTuple) PageGetItem(page, itemid); + Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target); +#endif + + if (!parent_half_dead) + { + OffsetNumber nextoffset; + + nextoffset = OffsetNumberNext(poffset); + itemid = PageGetItemId(page, nextoffset); + itup = (IndexTuple) PageGetItem(page, itemid); + if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib) + elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"", + rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)), + parent, RelationGetRelationName(rel)); + } + /* * Here we begin doing the deletion. */ @@ -1265,8 +1304,6 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack) * to copy the right sibling's downlink over the target downlink, and then * delete the following item. */ - page = BufferGetPage(pbuf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (parent_half_dead) { PageIndexTupleDelete(page, poffset); @@ -1278,23 +1315,16 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack) itemid = PageGetItemId(page, poffset); itup = (IndexTuple) PageGetItem(page, itemid); - Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target); ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY); nextoffset = OffsetNumberNext(poffset); - /* This part is just for double-checking */ - itemid = PageGetItemId(page, nextoffset); - itup = (IndexTuple) PageGetItem(page, itemid); - if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib) - elog(PANIC, "right sibling %u of block %u is not next child of %u in index \"%s\"", - rightsib, target, BufferGetBlockNumber(pbuf), - RelationGetRelationName(rel)); PageIndexTupleDelete(page, nextoffset); } /* * Update siblings' side-links. Note the target page's side-links will - * continue to point to the siblings. + * continue to point to the siblings. Asserts here are just rechecking + * things we already verified above. */ if (BufferIsValid(lbuf)) {