diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 4bb638d6c9374a05dd0fd0162fb372013bb17721..dbcefbf273376e12b61f22b3a070d426c2ad3dde 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.85 2000/09/07 09:58:34 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.86 2000/10/04 00:04:41 vadim Exp $ * * * INTERFACE ROUTINES @@ -86,8 +86,8 @@ #include "utils/inval.h" #include "utils/relcache.h" -#ifdef XLOG /* comments are in _heap_update */ -static ItemPointerData _locked_tuple; +#ifdef XLOG /* comments are in heap_update */ +static xl_heaptid _locked_tuple_; #endif @@ -1650,8 +1650,9 @@ l2: * In the event of crash prio logging, TQUAL routines will see * HEAP_XMAX_UNLOGGED flag... */ - _locked_tuple = *otid; - XactPushRollback(_heap_unlock_tuple, (void*) &_locked_tuple); + _locked_tuple_.node = relation->rd_node; + _locked_tuple_.tid = *otid; + XactPushRollback(_heap_unlock_tuple, (void*) &_locked_tuple_); #endif TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax)); oldtup.t_data->t_cmax = GetCurrentCommandId(); diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 1dc945d9d962fed04fea5d237d86927244bbb46f..ddaaa5f304c4fc57ae43aa41d2a6ce15ee77d951 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.62 2000/08/25 23:13:33 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.63 2000/10/04 00:04:42 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -33,6 +33,7 @@ typedef struct int best_delta; /* best size delta so far */ } FindSplitData; +void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static 
TransactionId _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, Buffer buf, @@ -54,7 +55,6 @@ static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, int leftfree, int rightfree, bool newitemonleft, Size firstrightitemsz); static Buffer _bt_getstackbuf(Relation rel, BTStack stack); -static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static void _bt_pgaddtup(Relation rel, Page page, Size itemsize, BTItem btitem, OffsetNumber itup_off, const char *where); @@ -514,6 +514,29 @@ _bt_insertonpg(Relation rel, } else { +#ifdef XLOG + /* XLOG stuff: log the insert before the item is added to the page */ + { + char xlbuf[sizeof(xl_btree_insert) + 2 * sizeof(CommandId)]; + xl_btree_insert *xlrec = (xl_btree_insert *) xlbuf; + int hsize = SizeOfBtreeInsert; + + xlrec->target.node = rel->rd_node; + ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff); + if (P_ISLEAF(lpageop)) + { + CommandId cid = GetCurrentCommandId(); + memcpy(xlbuf + SizeOfBtreeInsert, (char*) &cid, sizeof(CommandId)); + hsize += sizeof(CommandId); + } + + XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT, + xlbuf, hsize, (char*) btitem, itemsz); + + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); + } +#endif _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page"); itup_off = newitemoff; itup_blkno = BufferGetBlockNumber(buf); @@ -578,8 +601,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); /* if we're splitting this page, it won't be the root when we're done */ - oopaque->btpo_flags &= ~BTP_ROOT; - lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags; + lopaque->btpo_flags = oopaque->btpo_flags; + lopaque->btpo_flags &= ~BTP_ROOT; + ropaque->btpo_flags = lopaque->btpo_flags; lopaque->btpo_prev = oopaque->btpo_prev; lopaque->btpo_next = BufferGetBlockNumber(rbuf); ropaque->btpo_prev = BufferGetBlockNumber(buf); @@ -608,7 +632,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, item = 
(BTItem) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, LP_USED) == InvalidOffsetNumber) - elog(FATAL, "btree: failed to add hikey to the right sibling"); + elog(STOP, "btree: failed to add hikey to the right sibling"); rightoff = OffsetNumberNext(rightoff); } @@ -633,7 +657,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, } if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, LP_USED) == InvalidOffsetNumber) - elog(FATAL, "btree: failed to add hikey to the left sibling"); + elog(STOP, "btree: failed to add hikey to the left sibling"); leftoff = OffsetNumberNext(leftoff); /* @@ -704,6 +728,75 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, } } + /* + * We have to grab the right sibling (if any) and fix the prev + * pointer there. We are guaranteed that this is deadlock-free + * since no other writer will be holding a lock on that page + * and trying to move left, and all readers release locks on a page + * before trying to fetch its neighbors. + */ + + if (!P_RIGHTMOST(ropaque)) + { + sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); + spage = BufferGetPage(sbuf); + } + +#ifdef XLOG + /* + * Right sibling is locked, new siblings are prepared, but original + * page is not updated yet. Log changes before continuing. + * + * NO ELOG(ERROR) till right sibling is updated. + * + */ + { + char xlbuf[sizeof(xl_btree_split) + + 2 * sizeof(CommandId) + BLCKSZ]; + xl_btree_split *xlrec = (xl_btree_split *) xlbuf; + int hsize = SizeOfBtreeSplit; + int flag = (newitemonleft) ? 
+ XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT; + + xlrec->target.node = rel->rd_node; + ItemPointerSet(&(xlrec->target.tid), itup_blkno, itup_off); + if (P_ISLEAF(lopaque)) + { + CommandId cid = GetCurrentCommandId(); + memcpy(xlbuf + hsize, (char*) &cid, sizeof(CommandId)); + hsize += sizeof(CommandId); + } + if (newitemonleft) + { + memcpy(xlbuf + hsize, (char*) newitem, newitemsz); + hsize += newitemsz; + xlrec->otherblk = BufferGetBlockNumber(rbuf); + } + else + xlrec->otherblk = BufferGetBlockNumber(buf); + + xlrec->rightblk = ropaque->btpo_next; + + /* + * Direct access to page is not good but faster - we should + * implement some new func in page API. + */ + XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf, + hsize, (char*)rightpage + ((PageHeader) rightpage)->pd_upper, + ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper); + + PageSetLSN(leftpage, recptr); + PageSetSUI(leftpage, ThisStartUpID); + PageSetLSN(rightpage, recptr); + PageSetSUI(rightpage, ThisStartUpID); + if (!P_RIGHTMOST(ropaque)) + { + PageSetLSN(spage, recptr); + PageSetSUI(spage, ThisStartUpID); + } + } +#endif + /* * By here, the original data page has been split into two new halves, * and these are correct. The algorithm requires that the left page @@ -716,18 +809,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, PageRestoreTempPage(leftpage, origpage); - /* - * Finally, we need to grab the right sibling (if any) and fix the - * prev pointer there. We are guaranteed that this is deadlock-free - * since no other writer will be holding a lock on that page - * and trying to move left, and all readers release locks on a page - * before trying to fetch its neighbors. 
- */ - if (!P_RIGHTMOST(ropaque)) { - sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); - spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); sopaque->btpo_prev = BufferGetBlockNumber(rbuf); @@ -1002,7 +1085,7 @@ _bt_getstackbuf(Relation rel, BTStack stack) * two new children. The new root page is neither pinned nor locked, and * we have also written out lbuf and rbuf and dropped their pins/locks. */ -static void +void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) { Buffer rootbuf; @@ -1011,7 +1094,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) rootpage; BlockNumber lbkno, rbkno; - BlockNumber rootbknum; + BlockNumber rootblknum; BTPageOpaque rootopaque; ItemId itemid; BTItem item; @@ -1021,12 +1104,16 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) /* get a new root page */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rootpage = BufferGetPage(rootbuf); - rootbknum = BufferGetBlockNumber(rootbuf); + rootblknum = BufferGetBlockNumber(rootbuf); + + + /* NO ELOG(ERROR) from here till newroot op is logged */ /* set btree special data */ rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE; rootopaque->btpo_flags |= BTP_ROOT; + rootopaque->btpo_parent = BTREE_METAPAGE; lbkno = BufferGetBlockNumber(lbuf); rbkno = BufferGetBlockNumber(rbuf); @@ -1040,7 +1127,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) */ ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent = ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent = - rootbknum; + rootblknum; /* * Create downlink item for left page (old root). Since this will be @@ -1058,7 +1145,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * the two items will go into positions P_HIKEY and P_FIRSTKEY. 
*/ if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) - elog(FATAL, "btree: failed to add leftkey to new root page"); + elog(STOP, "btree: failed to add leftkey to new root page"); pfree(new_item); /* @@ -1075,14 +1162,35 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * insert the right page pointer into the new root page. */ if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber) - elog(FATAL, "btree: failed to add rightkey to new root page"); + elog(STOP, "btree: failed to add rightkey to new root page"); pfree(new_item); +#ifdef XLOG + /* XLOG stuff: log the new root before its buffer is written */ + { + xl_btree_newroot xlrec; + xlrec.node = rel->rd_node; + xlrec.rootblk = rootblknum; + + /* + * Direct access to page is not good but faster - we should + * implement some new func in page API. + */ + XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, + (char*) &xlrec, SizeOfBtreeNewroot, + (char*)rootpage + ((PageHeader) rootpage)->pd_upper, + ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->pd_upper); + + PageSetLSN(rootpage, recptr); + PageSetSUI(rootpage, ThisStartUpID); + } +#endif + /* write and let go of the new root buffer */ _bt_wrtbuf(rel, rootbuf); /* update metadata page with new root block number */ - _bt_metaproot(rel, rootbknum, 0); + _bt_metaproot(rel, rootblknum, 0); /* update and release new sibling, and finally the old root */ _bt_wrtbuf(rel, rbuf); @@ -1125,7 +1233,7 @@ _bt_pgaddtup(Relation rel, if (PageAddItem(page, (Item) btitem, itemsize, itup_off, LP_USED) == InvalidOffsetNumber) - elog(FATAL, "btree: failed to add item to the %s for %s", + elog(STOP, "btree: failed to add item to the %s for %s", where, RelationGetRelationName(rel)); } diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 40604dbc25830dfbaea2f03b4f39a3290ad0e477..2da74219010543275f7bed0ca84c6f3b7d7889b1 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ 
b/src/backend/access/nbtree/nbtpage.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.37 2000/07/21 06:42:32 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.38 2000/10/04 00:04:42 vadim Exp $ * * NOTES * Postgres btree pages look like ordinary relation pages. The opaque @@ -128,7 +128,7 @@ _bt_getroot(Relation rel, int access) Page metapg; BTPageOpaque metaopaque; Buffer rootbuf; - Page rootpg; + Page rootpage; BTPageOpaque rootopaque; BlockNumber rootblkno; BTMetaPageData *metad; @@ -177,14 +177,31 @@ _bt_getroot(Relation rel, int access) */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rootblkno = BufferGetBlockNumber(rootbuf); - rootpg = BufferGetPage(rootbuf); + rootpage = BufferGetPage(rootbuf); + + /* NO ELOG(ERROR) till meta is updated */ + + _bt_pageinit(rootpage, BufferGetPageSize(rootbuf)); + rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); + rootopaque->btpo_flags |= (BTP_LEAF | BTP_ROOT); + +#ifdef XLOG + /* XLOG stuff: log creation of the new (empty) root page */ + { + xl_btree_newroot xlrec; + xlrec.node = rel->rd_node; xlrec.rootblk = rootblkno; + + XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, + (char*) &xlrec, SizeOfBtreeNewroot, NULL, 0); + + PageSetLSN(rootpage, recptr); + PageSetSUI(rootpage, ThisStartUpID); + } +#endif metad->btm_root = rootblkno; metad->btm_level = 1; - _bt_pageinit(rootpg, BufferGetPageSize(rootbuf)); - rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpg); - rootopaque->btpo_flags |= (BTP_LEAF | BTP_ROOT); _bt_wrtnorelbuf(rel, rootbuf); /* swap write lock for read lock */ @@ -218,8 +235,8 @@ _bt_getroot(Relation rel, int access) * at the metadata page and got the root buffer, then we got the wrong * buffer. Release it and try again. */ - rootpg = BufferGetPage(rootbuf); - rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpg); + rootpage = BufferGetPage(rootbuf); + rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); if (! 
P_ISROOT(rootopaque)) { @@ -396,6 +413,20 @@ _bt_pagedel(Relation rel, ItemPointer tid) buf = _bt_getbuf(rel, blkno, BT_WRITE); page = BufferGetPage(buf); +#ifdef XLOG + /* XLOG stuff: log the delete before the tuple is removed from the page */ + { + xl_btree_delete xlrec; + xlrec.target.node = rel->rd_node; + xlrec.target.tid = *tid; + XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, + (char*) &xlrec, SizeOfBtreeDelete, NULL, 0); + + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); + } +#endif + PageIndexTupleDelete(page, offno); /* write the buffer and release the lock */ diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index fb358a3157e840f60d0fa3214966c3e15d5dcdbe..437b6637b2438bbe47f798b5c2c3d4e027a11fb1 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: nbtree.h,v 1.42 2000/09/12 06:07:52 vadim Exp $ + * $Id: nbtree.h,v 1.43 2000/10/04 00:04:43 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -46,6 +46,8 @@ typedef struct BTPageOpaqueData typedef BTPageOpaqueData *BTPageOpaque; +#define BTREE_METAPAGE 0 /* first page is meta */ + /* * BTScanOpaqueData is used to remember which buffers we're currently * examining in the scan. We keep these buffers pinned (but not locked, @@ -106,6 +108,12 @@ typedef struct BTItemData typedef BTItemData *BTItem; +/* + * For XLOG: size without alignment. Sizeof works as long as + * IndexTupleData has exactly 8 bytes. 
+ */ +#define SizeOfBTItem sizeof(BTItemData) + /* Test whether items are the "same" per the above notes */ #define BTItemSame(i1, i2) ( (i1)->bti_itup.t_tid.ip_blkid.bi_hi == \ (i2)->bti_itup.t_tid.ip_blkid.bi_hi && \ @@ -195,55 +203,75 @@ typedef BTStackData *BTStack; #define XLOG_BTREE_DELETE 0x00 /* delete btitem */ #define XLOG_BTREE_INSERT 0x10 /* add btitem without split */ #define XLOG_BTREE_SPLIT 0x20 /* add btitem with split */ -#define XLOG_BTREE_ONLEFT 0x40 /* flag for split case: new btitem */ +#define XLOG_BTREE_SPLEFT 0x30 /* as above + flag that new btitem */ /* goes to the left sibling */ +#define XLOG_BTREE_NEWROOT 0x40 /* new root page */ /* - * All what we need to find changed index tuple (18 bytes) + * All what we need to find changed index tuple (14 bytes) */ typedef struct xl_btreetid { RelFileNode node; - CommandId cid; /* this is for "better" tuple' */ - /* identification - it allows to avoid */ - /* "compensation" records for undo */ ItemPointerData tid; /* changed tuple id */ } xl_btreetid; -/* This is what we need to know about delete - ALIGN(18) = 24 bytes */ +/* + * This is what we need to know about delete - ALIGN(14) = 18 bytes. + */ typedef struct xl_btree_delete { xl_btreetid target; /* deleted tuple id */ } xl_btree_delete; -#define SizeOfBtreeDelete (offsetof(xl_btreetid, tid) + SizeOfIptrData)) +#define SizeOfBtreeDelete (offsetof(xl_btreetid, tid) + SizeOfIptrData) -/* This is what we need to know about pure (without split) insert - 26 + key data */ +/* + * This is what we need to know about pure (without split) insert - + * 14 + [4] + btitem with key data. Note that we need in CommandID + * (4 bytes) only for leaf page insert. 
+ */ typedef struct xl_btree_insert { xl_btreetid target; /* inserted tuple id */ - BTItemData btitem; - /* KEY DATA FOLLOWS AT END OF STRUCT */ + /* [CommandID and ] BTITEM FOLLOWS AT END OF STRUCT */ } xl_btree_insert; -#define SizeOfBtreeInsert (offsetof(xl_btree_insert, btitem) + sizeof(BTItemData)) +#define SizeOfBtreeInsert (offsetof(xl_btreetid, tid) + SizeOfIptrData) -/* This is what we need to know about insert with split - 26 + right sibling btitems */ +/* + * This is what we need to know about insert with split - + * 22 + [4] + [btitem] + right sibling btitems. Note that we need the + * CommandID (4 bytes) only for leaf page insert. + */ typedef struct xl_btree_split { xl_btreetid target; /* inserted tuple id */ - BlockNumber othblk; /* second block participated in split: */ + BlockId otherblk; /* second block participated in split: */ /* first one is stored in target' tid */ - BlockNumber parblk; /* parent block to be updated */ + BlockId rightblk; /* next right block */ /* * We log all btitems from the right sibling. If new btitem goes on - * the left sibling then we log it too and it will be first BTItemData - * at the end of this struct. + * the left sibling then we log it too and it will be the first + * BTItemData at the end of this struct, but after (for the leaf + * pages) CommandId. */ } xl_btree_split; -#define SizeOfBtreeSplit (offsetof(xl_btree_insert, parblk) + sizeof(BlockNumber)) +#define SizeOfBtreeSplit (offsetof(xl_btree_split, rightblk) + sizeof(BlockId)) + +/* + * New root log record. + */ +typedef struct xl_btree_newroot +{ + RelFileNode node; + BlockId rootblk; + /* 0 or 2 BTITEMS FOLLOW AT END OF STRUCT */ +} xl_btree_newroot; + +#define SizeOfBtreeNewroot (offsetof(xl_btree_newroot, rootblk) + sizeof(BlockId)) /* end of XLOG stuff */