diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index c05f82b209d3272a53d2f9753f44a3f848daad97..6d8cf324ecf18b4f92ea79088b959cd1e68100c1 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.134 2006/03/31 23:32:05 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.135 2006/04/13 03:53:05 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -323,9 +323,9 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, * child page on the parent. * + updates the metapage if a true root or fast root is split. * - * On entry, we must have the right buffer on which to do the - * insertion, and the buffer must be pinned and locked. On return, - * we will have dropped both the pin and the write lock on the buffer. + * On entry, we must have the right buffer in which to do the + * insertion, and the buffer must be pinned and write-locked. On return, + * we will have dropped both the pin and the lock on the buffer. * * If 'afteritem' is >0 then the new tuple must be inserted after the * existing item of that number, noplace else. If 'afteritem' is 0 @@ -527,6 +527,8 @@ _bt_insertonpg(Relation rel, */ if (split_only_page) { + Assert(!P_ISLEAF(lpageop)); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); @@ -557,10 +559,11 @@ _bt_insertonpg(Relation rel, if (!rel->rd_istemp) { xl_btree_insert xlrec; + BlockNumber xldownlink; xl_btree_metadata xlmeta; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[3]; + XLogRecData rdata[4]; XLogRecData *nextrdata; IndexTupleData trunctuple; @@ -572,6 +575,22 @@ _bt_insertonpg(Relation rel, rdata[0].buffer = InvalidBuffer; rdata[0].next = nextrdata = &(rdata[1]); + if (P_ISLEAF(lpageop)) + xlinfo = XLOG_BTREE_INSERT_LEAF; + else + { + xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid)); + Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + + nextrdata->data = (char *) &xldownlink; + nextrdata->len = sizeof(BlockNumber); + nextrdata->buffer = InvalidBuffer; + nextrdata->next = nextrdata + 1; + nextrdata++; + + xlinfo = XLOG_BTREE_INSERT_UPPER; + } + if (BufferIsValid(metabuf)) { xlmeta.root = metad->btm_root; @@ -584,12 +603,9 @@ _bt_insertonpg(Relation rel, nextrdata->buffer = InvalidBuffer; nextrdata->next = nextrdata + 1; nextrdata++; + xlinfo = XLOG_BTREE_INSERT_META; } - else if (P_ISLEAF(lpageop)) - xlinfo = XLOG_BTREE_INSERT_LEAF; - else - xlinfo = XLOG_BTREE_INSERT_UPPER; /* Read comments in _bt_pgaddtup */ if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop)) @@ -633,7 +649,7 @@ _bt_insertonpg(Relation rel, /* * _bt_split() -- split a page in the btree. * - * On entry, buf is the page to split, and is write-locked and pinned. + * On entry, buf is the page to split, and is pinned and write-locked. * firstright is the item index of the first item to be moved to the * new right page. newitemoff etc. tell us about the new item that * must be inserted along with the data from the old page. @@ -860,7 +876,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * Direct access to page is not good but faster - we should implement * some new func in page API. Note we only store the tuples * themselves, knowing that the item pointers are in the same order - * and can be reconstructed by scanning the tuples. + * and can be reconstructed by scanning the tuples. See comments + * for _bt_restore_page(). */ xlrec.leftlen = ((PageHeader) leftpage)->pd_special - ((PageHeader) leftpage)->pd_upper; @@ -1445,6 +1462,9 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * Insert the left page pointer into the new root page. The root page is * the rightmost page on its level so there is no "high key" in it; the * two items will go into positions P_HIKEY and P_FIRSTKEY. + * + * Note: we *must* insert the two items in item-number order, for the + * benefit of _bt_restore_page(). */ if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) elog(PANIC, "failed to add leftkey to new root page"); diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index c10936a8e2788d1cbee0d891fb65bc80e7bf70bf..7aae27dc853d97a6c36ee2feaa25ec3a89a5c499 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.31 2006/04/01 03:03:37 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.32 2006/04/13 03:53:05 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -51,32 +51,16 @@ log_incomplete_split(RelFileNode node, BlockNumber leftblk, } static void -forget_matching_split(Relation reln, RelFileNode node, - BlockNumber insertblk, OffsetNumber offnum, - bool is_root) +forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root) { - Buffer buffer; - Page page; - IndexTuple itup; - BlockNumber rightblk; ListCell *l; - /* Get downlink TID from page */ - buffer = XLogReadBuffer(reln, insertblk, false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); - rightblk = ItemPointerGetBlockNumber(&(itup->t_tid)); - Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); - UnlockReleaseBuffer(buffer); - foreach(l, incomplete_splits) { bt_incomplete_split *split = (bt_incomplete_split *) lfirst(l); if (RelFileNodeEquals(node, split->node) && - rightblk == split->rightblk) + downlink == split->rightblk) { if (is_root != split->is_root) elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)", @@ -87,6 +71,20 @@ forget_matching_split(Relation reln, RelFileNode node, } } +/* + * _bt_restore_page -- re-enter all the index tuples on a page + * + * The page is freshly init'd, and *from (length len) is a copy of what + * had been its upper part (pd_upper to pd_special). We assume that the + * tuples had been added to the page in item-number order, and therefore + * the one with highest item number appears first (lowest on the page). + * + * NOTE: the way this routine is coded, the rebuilt page will have the items + * in correct itemno sequence, but physically the opposite order from the + * original, because we insert them in the opposite of itemno order. This + * does not matter in any current btree code, but it's something to keep an + * eye on. Is it worth changing just on general principles? + */ static void _bt_restore_page(Page page, char *from, int len) { @@ -158,9 +156,16 @@ btree_xlog_insert(bool isleaf, bool ismeta, char *datapos; int datalen; xl_btree_metadata md; + BlockNumber downlink = 0; datapos = (char *) xlrec + SizeOfBtreeInsert; datalen = record->xl_len - SizeOfBtreeInsert; + if (!isleaf) + { + memcpy(&downlink, datapos, sizeof(BlockNumber)); + datapos += sizeof(BlockNumber); + datalen -= sizeof(BlockNumber); + } if (ismeta) { memcpy(&md, datapos, sizeof(xl_btree_metadata)); @@ -168,8 +173,7 @@ btree_xlog_insert(bool isleaf, bool ismeta, datalen -= sizeof(xl_btree_metadata); } - if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && - incomplete_splits == NIL) + if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf) return; /* nothing to do */ reln = XLogOpenRelation(xlrec->target.node); @@ -208,13 +212,8 @@ btree_xlog_insert(bool isleaf, bool ismeta, md.fastroot, md.fastlevel); /* Forget any split this insertion completes */ - if (!isleaf && incomplete_splits != NIL) - { - forget_matching_split(reln, xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - false); - } + if (!isleaf) + forget_matching_split(xlrec->target.node, downlink, false); } static void @@ -224,14 +223,17 @@ btree_xlog_split(bool onleft, bool isroot, xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record); Relation reln; BlockNumber targetblk; + OffsetNumber targetoff; BlockNumber leftsib; BlockNumber rightsib; + BlockNumber downlink = 0; Buffer buffer; Page page; BTPageOpaque pageop; reln = XLogOpenRelation(xlrec->target.node); targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + targetoff = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); leftsib = (onleft) ? targetblk : xlrec->otherblk; rightsib = (onleft) ? xlrec->otherblk : targetblk; @@ -252,6 +254,16 @@ btree_xlog_split(bool onleft, bool isroot, (char *) xlrec + SizeOfBtreeSplit, xlrec->leftlen); + if (onleft && xlrec->level > 0) + { + IndexTuple itup; + + /* extract downlink in the target tuple */ + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff)); + downlink = ItemPointerGetBlockNumber(&(itup->t_tid)); + Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); @@ -274,6 +286,16 @@ btree_xlog_split(bool onleft, bool isroot, (char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen, record->xl_len - SizeOfBtreeSplit - xlrec->leftlen); + if (!onleft && xlrec->level > 0) + { + IndexTuple itup; + + /* extract downlink in the target tuple */ + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff)); + downlink = ItemPointerGetBlockNumber(&(itup->t_tid)); + Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); @@ -308,13 +330,8 @@ btree_xlog_split(bool onleft, bool isroot, } /* Forget any split this insertion completes */ - if (xlrec->level > 0 && incomplete_splits != NIL) - { - forget_matching_split(reln, xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - false); - } + if (xlrec->level > 0) + forget_matching_split(xlrec->target.node, downlink, false); /* The job ain't done till the parent link is inserted... */ log_incomplete_split(xlrec->target.node, @@ -516,6 +533,7 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; BTPageOpaque pageop; + BlockNumber downlink = 0; reln = XLogOpenRelation(xlrec->node); buffer = XLogReadBuffer(reln, xlrec->rootblk, true); @@ -532,9 +550,17 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) pageop->btpo_flags |= BTP_LEAF; if (record->xl_len > SizeOfBtreeNewroot) + { + IndexTuple itup; + _bt_restore_page(page, (char *) xlrec + SizeOfBtreeNewroot, record->xl_len - SizeOfBtreeNewroot); + /* extract downlink to the right-hand split page */ + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)); + downlink = ItemPointerGetBlockNumber(&(itup->t_tid)); + Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + } PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); @@ -546,14 +572,8 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) xlrec->rootblk, xlrec->level); /* Check to see if this satisfies any incomplete insertions */ - if (record->xl_len > SizeOfBtreeNewroot && - incomplete_splits != NIL) - { - forget_matching_split(reln, xlrec->node, - xlrec->rootblk, - P_FIRSTKEY, - true); - } + if (record->xl_len > SizeOfBtreeNewroot) + forget_matching_split(xlrec->node, downlink, true); } diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index ba4a8441f1cdc4eab7b67b4d8a1683e481cfa0f9..4b254f1fd12af9754f86972da7cb6d7e803aa40d 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/nbtree.h,v 1.95 2006/04/01 03:03:37 tgl Exp $ + * $PostgreSQL: pgsql/src/include/access/nbtree.h,v 1.96 2006/04/13 03:53:05 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -206,6 +206,7 @@ typedef struct xl_btree_metadata typedef struct xl_btree_insert { xl_btreetid target; /* inserted tuple id */ + /* BlockNumber downlink field FOLLOWS IF NOT XLOG_BTREE_INSERT_LEAF */ /* xl_btree_metadata FOLLOWS IF XLOG_BTREE_INSERT_META */ /* INDEX TUPLE FOLLOWS AT END OF STRUCT */ } xl_btree_insert;