diff --git a/src/backend/access/gist/Makefile b/src/backend/access/gist/Makefile index f2ec10a90af3a538eb7026d91b962f391f3d14f5..b22f846a23ddc841bf10cb2fbdb1fa2356631fd9 100644 --- a/src/backend/access/gist/Makefile +++ b/src/backend/access/gist/Makefile @@ -4,7 +4,7 @@ # Makefile for access/gist # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.12 2003/11/29 19:51:39 pgsql Exp $ +# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.13 2005/06/14 11:45:13 teodor Exp $ # #------------------------------------------------------------------------- @@ -12,7 +12,7 @@ subdir = src/backend/access/gist top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = gist.o gistget.o gistscan.o +OBJS = gist.o gistutil.o gistxlog.o gistget.o gistscan.o all: SUBSYS.o diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 2a55c91f0df347138cec17f62d19b2b399fee7ca..4e3faccdf92566c4cdd2df8319bac9dde55f23a3 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.118 2005/06/06 17:01:21 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -23,43 +23,6 @@ #include "miscadmin.h" #include "utils/memutils.h" - -#undef GIST_PAGEADDITEM - -#define ATTSIZE(datum, tupdesc, i, isnull) \ - ( \ - (isnull) ? 0 : \ - att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \ - ) - -/* result's status */ -#define INSERTED 0x01 -#define SPLITED 0x02 - -/* group flags ( in gistSplit ) */ -#define LEFT_ADDED 0x01 -#define RIGHT_ADDED 0x02 -#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) - -/* - * This defines only for shorter code, used in gistgetadjusted - * and gistadjsubkey only - */ -#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \ - if (isnullkey) { \ - gistentryinit((evp), rkey, r, NULL, \ - (OffsetNumber) 0, rkeyb, FALSE); \ - } else { \ - gistentryinit((evp), okey, r, NULL, \ - (OffsetNumber) 0, okeyb, FALSE); \ - } \ -} while(0) - -#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \ - FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \ - FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \ -} while(0); - /* Working state for gistbuild and its callback */ typedef struct { @@ -80,62 +43,34 @@ static void gistbuildCallback(Relation index, static void gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *GISTstate); -static int gistlayerinsert(Relation r, BlockNumber blkno, - IndexTuple **itup, - int *len, - GISTSTATE *giststate); -static OffsetNumber gistwritebuffer(Relation r, - Page page, - IndexTuple *itup, - int len, - OffsetNumber off); -static bool gistnospace(Page page, IndexTuple *itvec, int len); -static IndexTuple *gistreadbuffer(Buffer buffer, int *len); -static IndexTuple *gistjoinvector( - IndexTuple *itvec, int *len, - IndexTuple *additvec, int addlen); -static IndexTuple gistunion(Relation r, IndexTuple *itvec, - int len, GISTSTATE *giststate); - -static IndexTuple gistgetadjusted(Relation r, - IndexTuple oldtup, - IndexTuple addtup, +static void gistfindleaf(GISTInsertState *state, GISTSTATE *giststate); -static int gistfindgroup(GISTSTATE *giststate, - GISTENTRY *valvec, GIST_SPLITVEC *spl); -static void gistadjsubkey(Relation r, - IndexTuple *itup, int *len, - GIST_SPLITVEC *v, - GISTSTATE *giststate); -static IndexTuple gistFormTuple(GISTSTATE *giststate, - Relation r, Datum *attdata, int *datumsize, bool *isnull); + + +typedef struct PageLayout { + gistxlogPage block; + OffsetNumber *list; + Buffer buffer; /* to write after all proceed */ + + struct PageLayout *next; +} PageLayout; + + +#define ROTATEDIST(d) do { \ + PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \ + memset(tmp,0,sizeof(PageLayout)); \ + tmp->next = (d); \ + (d)=tmp; \ +} while(0) + + static IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup, int *len, + PageLayout **dist, GISTSTATE *giststate); -static void gistnewroot(Relation r, IndexTuple *itup, int len); -static void GISTInitBuffer(Buffer b, uint32 f); -static OffsetNumber gistchoose(Relation r, Page p, - IndexTuple it, - GISTSTATE *giststate); -static void gistdelete(Relation r, ItemPointer tid); - -#ifdef GIST_PAGEADDITEM -static IndexTuple gist_tuple_replacekey(Relation r, - GISTENTRY entry, IndexTuple t); -#endif -static void gistcentryinit(GISTSTATE *giststate, int nkey, - GISTENTRY *e, Datum k, - Relation r, Page pg, - OffsetNumber o, int b, bool l, bool isNull); -static void gistDeCompressAtt(GISTSTATE *giststate, Relation r, - IndexTuple tuple, Page p, OffsetNumber o, - GISTENTRY *attdata, bool *isnull); -static void gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2, - float *penalty); + #undef GISTDEBUG @@ -191,6 +126,26 @@ gistbuild(PG_FUNCTION_ARGS) /* initialize the root page */ buffer = ReadBuffer(index, P_NEW); GISTInitBuffer(buffer, F_LEAF); + if ( !index->rd_istemp ) { + XLogRecPtr recptr; + XLogRecData rdata; + Page page; + + rdata.buffer = InvalidBuffer; + rdata.data = (char*)&(index->rd_node); + rdata.len = sizeof(RelFileNode); + rdata.next = NULL; + + page = BufferGetPage(buffer); + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); + } WriteBuffer(buffer); /* build the index */ @@ -340,51 +295,6 @@ gistinsert(PG_FUNCTION_ARGS) PG_RETURN_BOOL(true); } -#ifdef GIST_PAGEADDITEM -/* - * Take a compressed entry, and install it on a page. Since we now know - * where the entry will live, we decompress it and recompress it using - * that knowledge (some compression routines may want to fish around - * on the page, for example, or do something special for leaf nodes.) - */ -static OffsetNumber -gistPageAddItem(GISTSTATE *giststate, - Relation r, - Page page, - Item item, - Size size, - OffsetNumber offsetNumber, - ItemIdFlags flags, - GISTENTRY *dentry, - IndexTuple *newtup) -{ - GISTENTRY tmpcentry; - IndexTuple itup = (IndexTuple) item; - OffsetNumber retval; - Datum datum; - bool IsNull; - - /* - * recompress the item given that we now know the exact page and - * offset for insertion - */ - datum = index_getattr(itup, 1, r->rd_att, &IsNull); - gistdentryinit(giststate, 0, dentry, datum, - (Relation) 0, (Page) 0, - (OffsetNumber) InvalidOffsetNumber, - ATTSIZE(datum, r, 1, IsNull), - FALSE, IsNull); - gistcentryinit(giststate, 0, &tmpcentry, dentry->key, r, page, - offsetNumber, dentry->bytes, FALSE); - *newtup = gist_tuple_replacekey(r, tmpcentry, itup); - retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), - offsetNumber, flags); - if (retval == InvalidOffsetNumber) - elog(ERROR, "failed to add index item to \"%s\"", - RelationGetRelationName(r)); - return retval; -} -#endif /* * Workhouse routine for doing insertion into a GiST index. Note that @@ -394,706 +304,365 @@ gistPageAddItem(GISTSTATE *giststate, static void gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) { - IndexTuple *instup; - int ret, - len = 1; + GISTInsertState state; - instup = (IndexTuple *) palloc(sizeof(IndexTuple)); - instup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); - memcpy(instup[0], itup, IndexTupleSize(itup)); + memset(&state, 0, sizeof(GISTInsertState)); - ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate); - if (ret & SPLITED) - gistnewroot(r, instup, len); -} + state.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); + state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); + memcpy(state.itup[0], itup, IndexTupleSize(itup)); + state.ituplen=1; + state.r = r; + state.key = itup->t_tid; + state.needInsertComplete = true; + state.xlog_mode = false; -static int -gistlayerinsert(Relation r, BlockNumber blkno, - IndexTuple **itup, /* in - out, has compressed entry */ - int *len, /* in - out */ - GISTSTATE *giststate) -{ - Buffer buffer; - Page page; - int ret; - GISTPageOpaque opaque; + state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack)); + memset( state.stack, 0, sizeof(GISTInsertStack)); + state.stack->blkno=GIST_ROOT_BLKNO; - buffer = ReadBuffer(r, blkno); - page = (Page) BufferGetPage(buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + gistfindleaf(&state, giststate); + gistmakedeal(&state, giststate); +} - if (!(opaque->flags & F_LEAF)) +static bool +gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { + bool is_splitted = false; + + if (gistnospace(state->stack->page, state->itup, state->ituplen)) { - /* - * This is an internal page, so continue to walk down the - * tree. We find the child node that has the minimum insertion - * penalty and recursively invoke ourselves to modify that - * node. Once the recursive call returns, we may need to - * adjust the parent node for two reasons: the child node - * split, or the key in this node needs to be adjusted for the - * newly inserted key below us. - */ - ItemId iid; - BlockNumber nblkno; - ItemPointerData oldtid; - IndexTuple oldtup; - OffsetNumber child; - - child = gistchoose(r, page, *(*itup), giststate); - iid = PageGetItemId(page, child); - oldtup = (IndexTuple) PageGetItem(page, iid); - nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + /* no space for insertion */ + IndexTuple *itvec, + *newitup; + int tlen,olen; + PageLayout *dist=NULL, *ptr; + + memset(&dist, 0, sizeof(PageLayout)); + is_splitted = true; + itvec = gistextractbuffer(state->stack->buffer, &tlen); + olen=tlen; + itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); + newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate); + + if ( !state->r->rd_istemp && !state->xlog_mode) { + gistxlogPageSplit xlrec; + XLogRecPtr recptr; + XLogRecData *rdata; + int i, npage = 0, cur=1; + + ptr=dist; + while( ptr ) { + npage++; + ptr=ptr->next; + } - /* - * After this call: 1. if child page was splited, then itup - * contains keys for each page 2. if child page wasn't splited, - * then itup contains additional for adjustment of current key - */ - ret = gistlayerinsert(r, nblkno, itup, len, giststate); + rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2)); + + xlrec.node = state->r->rd_node; + xlrec.origblkno = state->stack->blkno; + xlrec.npage = npage; + xlrec.nitup = state->ituplen; + xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; + xlrec.key = state->key; + xlrec.pathlen = (uint16)state->pathlen; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogPageSplit ); + rdata[0].next = NULL; + + if ( state->pathlen>=0 ) { + rdata[0].next = &(rdata[1]); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) (state->path); + rdata[1].len = sizeof( BlockNumber ) * state->pathlen; + rdata[1].next = NULL; + cur++; + } + + /* new tuples */ + for(i=0;i<state->ituplen;i++) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(state->itup[i]); + rdata[cur].len = IndexTupleSize(state->itup[i]); + rdata[cur-1].next = &(rdata[cur]); + cur++; + } - /* nothing inserted in child */ - if (!(ret & INSERTED)) - { - ReleaseBuffer(buffer); - return 0x00; - } + /* new page layout */ + ptr=dist; + while(ptr) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)&(ptr->block); + rdata[cur].len = sizeof(gistxlogPage); + rdata[cur-1].next = &(rdata[cur]); + cur++; + + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(ptr->list); + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num); + if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num ) + rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len ); + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].next=NULL; + cur++; + + ptr=ptr->next; + } - /* child did not split */ - if (!(ret & SPLITED)) - { - IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate); + START_CRIT_SECTION(); - if (!newtup) - { - /* not need to update key */ - ReleaseBuffer(buffer); - return 0x00; + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + ptr = dist; + while(ptr) { + PageSetLSN(BufferGetPage(ptr->buffer), recptr); + PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); + ptr=ptr->next; } - (*itup)[0] = newtup; + END_CRIT_SECTION(); } - /* - * This node's key has been modified, either because a child - * split occurred or because we needed to adjust our key for - * an insert in a child node. Therefore, remove the old - * version of this node's key. - */ - ItemPointerSet(&oldtid, blkno, child); - gistdelete(r, &oldtid); - - /* - * if child was splitted, new key for child will be inserted in - * the end list of child, so we must say to any scans that page is - * changed beginning from 'child' offset - */ - if (ret & SPLITED) - gistadjscans(r, GISTOP_SPLIT, blkno, child); - } + ptr = dist; + while(ptr) { + WriteBuffer(ptr->buffer); + ptr=ptr->next; + } - ret = INSERTED; + state->itup = newitup; + state->ituplen = tlen; /* now tlen >= 2 */ - if (gistnospace(page, (*itup), *len)) - { - /* no space for insertion */ - IndexTuple *itvec, - *newitup; - int tlen, - oldlen; - - ret |= SPLITED; - itvec = gistreadbuffer(buffer, &tlen); - itvec = gistjoinvector(itvec, &tlen, (*itup), *len); - oldlen = *len; - newitup = gistSplit(r, buffer, itvec, &tlen, giststate); - ReleaseBuffer(buffer); - *itup = newitup; - *len = tlen; /* now tlen >= 2 */ + if ( state->stack->blkno == GIST_ROOT_BLKNO ) { + gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode); + state->needInsertComplete=false; + } + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(state->stack->buffer); } else { /* enough space */ - OffsetNumber off, - l; + OffsetNumber off, l; - off = (PageIsEmpty(page)) ? + off = (PageIsEmpty(state->stack->page)) ? FirstOffsetNumber : - OffsetNumberNext(PageGetMaxOffsetNumber(page)); - l = gistwritebuffer(r, page, *itup, *len, off); - WriteBuffer(buffer); + OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page)); + l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off); + if ( !state->r->rd_istemp && !state->xlog_mode) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) ); + int i, cur=0; + + xlrec.node = state->r->rd_node; + xlrec.blkno = state->stack->blkno; + xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; + xlrec.key = state->key; + xlrec.pathlen = (uint16)state->pathlen; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogEntryUpdate ); + rdata[0].next = NULL; + + if ( state->pathlen>=0 ) { + rdata[0].next = &(rdata[1]); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) (state->path); + rdata[1].len = sizeof( BlockNumber ) * state->pathlen; + rdata[1].next = NULL; + cur++; + } + + for(i=1; i<=state->ituplen; i++) { /* adding tuples */ + rdata[i+cur].buffer = InvalidBuffer; + rdata[i+cur].data = (char*)(state->itup[i-1]); + rdata[i+cur].len = IndexTupleSize(state->itup[i-1]); + rdata[i+cur].next = NULL; + rdata[i-1+cur].next = &(rdata[i+cur]); + } + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + PageSetLSN(state->stack->page, recptr); + PageSetTLI(state->stack->page, ThisTimeLineID); + + END_CRIT_SECTION(); + } + + if ( state->stack->blkno == GIST_ROOT_BLKNO ) + state->needInsertComplete=false; - if (*len > 1) - { /* previous insert ret & SPLITED != 0 */ + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(state->stack->buffer); + + if (state->ituplen > 1) + { /* previous is_splitted==true */ /* * child was splited, so we must form union for insertion in * parent */ - IndexTuple newtup = gistunion(r, (*itup), *len, giststate); - ItemPointerSet(&(newtup->t_tid), blkno, 1); - (*itup)[0] = newtup; - *len = 1; + IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); + ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber); + state->itup[0] = newtup; + state->ituplen = 1; } } - - return ret; -} - -/* - * Write itup vector to page, has no control of free space - */ -static OffsetNumber -gistwritebuffer(Relation r, Page page, IndexTuple *itup, - int len, OffsetNumber off) -{ - OffsetNumber l = InvalidOffsetNumber; - int i; - - for (i = 0; i < len; i++) - { -#ifdef GIST_PAGEADDITEM - GISTENTRY tmpdentry; - IndexTuple newtup; - bool IsNull; - - l = gistPageAddItem(giststate, r, page, - (Item) itup[i], IndexTupleSize(itup[i]), - off, LP_USED, &tmpdentry, &newtup); - off = OffsetNumberNext(off); -#else - l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), - off, LP_USED); - if (l == InvalidOffsetNumber) - elog(ERROR, "failed to add index item to \"%s\"", - RelationGetRelationName(r)); -#endif - } - return l; + return is_splitted; } -/* - * Check space for itup vector on page - */ -static bool -gistnospace(Page page, IndexTuple *itvec, int len) -{ - unsigned int size = 0; - int i; - - for (i = 0; i < len; i++) - size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); - - return (PageGetFreeSpace(page) < size); -} - -/* - * Read buffer into itup vector - */ -static IndexTuple * -gistreadbuffer(Buffer buffer, int *len /* out */ ) -{ - OffsetNumber i, - maxoff; - IndexTuple *itvec; - Page p = (Page) BufferGetPage(buffer); - - maxoff = PageGetMaxOffsetNumber(p); - *len = maxoff; - itvec = palloc(sizeof(IndexTuple) * maxoff); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - - return itvec; -} - -/* - * join two vectors into one - */ -static IndexTuple * -gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) -{ - itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen)); - memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen); - *len += addlen; - return itvec; -} - -/* - * Return an IndexTuple containing the result of applying the "union" - * method to the specified IndexTuple vector. - */ -static IndexTuple -gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) +static void +gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) { - Datum attr[INDEX_MAX_KEYS]; - bool isnull[INDEX_MAX_KEYS]; - GistEntryVector *evec; - int i; - GISTENTRY centry[INDEX_MAX_KEYS]; - - evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum; - int j; - int real_len; + ItemId iid; + IndexTuple oldtup; + GISTInsertStack *ptr; - real_len = 0; - for (j = 0; j < len; j++) - { - bool IsNull; - datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); - if (IsNull) - continue; - - gistdentryinit(giststate, i, - &(evec->vector[real_len]), - datum, - NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), - FALSE, IsNull); - real_len++; - } + /* walk down */ + while( true ) { + GISTPageOpaque opaque; - /* If this tuple vector was all NULLs, the union is NULL */ - if (real_len == 0) - { - attr[i] = (Datum) 0; - isnull[i] = TRUE; - } - else + state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); + state->stack->page = (Page) BufferGetPage(state->stack->buffer); + opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page); + + if (!(opaque->flags & F_LEAF)) { - int datumsize; - - if (real_len == 1) - { - evec->n = 2; - gistentryinit(evec->vector[1], - evec->vector[0].key, r, NULL, - (OffsetNumber) 0, evec->vector[0].bytes, FALSE); - } - else - evec->n = real_len; - - /* Compress the result of the union and store in attr array */ - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - gistcentryinit(giststate, i, ¢ry[i], datum, - NULL, NULL, (OffsetNumber) 0, - datumsize, FALSE, FALSE); - isnull[i] = FALSE; - attr[i] = centry[i].key; - } + /* + * This is an internal page, so continue to walk down the + * tree. We find the child node that has the minimum insertion + * penalty and recursively invoke ourselves to modify that + * node. Once the recursive call returns, we may need to + * adjust the parent node for two reasons: the child node + * split, or the key in this node needs to be adjusted for the + * newly inserted key below us. + */ + GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack)); + + state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); + + iid = PageGetItemId(state->stack->page, state->stack->childoffnum); + oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); + item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + item->parent = state->stack; + item->todelete = false; + state->stack = item; + } else + break; } - return index_form_tuple(giststate->tupdesc, attr, isnull); -} - - -/* - * Forms union of oldtup and addtup, if union == oldtup then return NULL - */ -static IndexTuple -gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) -{ - GistEntryVector *evec; - bool neednew = false; - bool isnull[INDEX_MAX_KEYS]; - Datum attr[INDEX_MAX_KEYS]; - GISTENTRY centry[INDEX_MAX_KEYS], - oldatt[INDEX_MAX_KEYS], - addatt[INDEX_MAX_KEYS], - *ev0p, - *ev1p; - bool oldisnull[INDEX_MAX_KEYS], - addisnull[INDEX_MAX_KEYS]; - IndexTuple newtup = NULL; - int i; - - evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); - evec->n = 2; - ev0p = &(evec->vector[0]); - ev1p = &(evec->vector[1]); - - gistDeCompressAtt(giststate, r, oldtup, NULL, - (OffsetNumber) 0, oldatt, oldisnull); - - gistDeCompressAtt(giststate, r, addtup, NULL, - (OffsetNumber) 0, addatt, addisnull); - - for (i = 0; i < r->rd_att->natts; i++) - { - if (oldisnull[i] && addisnull[i]) - { - attr[i] = (Datum) 0; - isnull[i] = TRUE; - } - else - { - Datum datum; - int datumsize; - - FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes, - addisnull[i], addatt[i].key, addatt[i].bytes); - - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - if (oldisnull[i] || addisnull[i]) - { - if (oldisnull[i]) - neednew = true; - } - else - { - bool result; - - FunctionCall3(&giststate->equalFn[i], - ev0p->key, - datum, - PointerGetDatum(&result)); + /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */ - if (!result) - neednew = true; - } - - gistcentryinit(giststate, i, ¢ry[i], datum, - NULL, NULL, (OffsetNumber) 0, - datumsize, FALSE, FALSE); - - attr[i] = centry[i].key; - isnull[i] = FALSE; - } + /* form state->path to work xlog */ + ptr = state->stack; + state->pathlen=1; + while( ptr ) { + state->pathlen++; + ptr=ptr->parent; } - - if (neednew) - { - /* need to update key */ - newtup = index_form_tuple(giststate->tupdesc, attr, isnull); - newtup->t_tid = oldtup->t_tid; + state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen); + ptr = state->stack; + state->pathlen=0; + while( ptr ) { + state->path[ state->pathlen ] = ptr->blkno; + state->pathlen++; + ptr=ptr->parent; } - - return newtup; + state->pathlen--; + state->path++; } -static void -gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) -{ - int lr; - - for (lr = 0; lr < 2; lr++) - { - OffsetNumber *entries; - int i; - Datum *attr; - int len, - *attrsize; - bool *isnull; - GistEntryVector *evec; - - if (lr) - { - attrsize = spl->spl_lattrsize; - attr = spl->spl_lattr; - len = spl->spl_nleft; - entries = spl->spl_left; - isnull = spl->spl_lisnull; - } - else - { - attrsize = spl->spl_rattrsize; - attr = spl->spl_rattr; - len = spl->spl_nright; - entries = spl->spl_right; - isnull = spl->spl_risnull; - } - - evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - - for (i = 1; i < r->rd_att->natts; i++) - { - int j; - Datum datum; - int datumsize; - int real_len; - - real_len = 0; - for (j = 0; j < len; j++) - { - bool IsNull; - - if (spl->spl_idgrp[entries[j]]) - continue; - datum = index_getattr(itvec[entries[j] - 1], i + 1, - giststate->tupdesc, &IsNull); - if (IsNull) - continue; - gistdentryinit(giststate, i, - &(evec->vector[real_len]), - datum, - NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), - FALSE, IsNull); - real_len++; - - } - if (real_len == 0) - { - datum = (Datum) 0; - datumsize = 0; - isnull[i] = true; - } - else - { - /* - * evec->vector[0].bytes may be not defined, so form union - * with itself - */ - if (real_len == 1) - { - evec->n = 2; - memcpy(&(evec->vector[1]), &(evec->vector[0]), - sizeof(GISTENTRY)); - } - else - evec->n = real_len; - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - isnull[i] = false; - } +void +gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { + int is_splitted; + ItemId iid; + IndexTuple oldtup, newtup; - attr[i] = datum; - attrsize[i] = datumsize; - } - } -} + /* walk up */ + while( true ) { + /* + * After this call: 1. if child page was splited, then itup + * contains keys for each page 2. if child page wasn't splited, + * then itup contains additional for adjustment of current key + */ -/* - * find group in vector with equal value - */ -static int -gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) -{ - int i; - int curid = 1; - - /* - * first key is always not null (see gistinsert), so we may not check - * for nulls - */ - for (i = 0; i < spl->spl_nleft; i++) - { - int j; - int len; - bool result; - - if (spl->spl_idgrp[spl->spl_left[i]]) - continue; - len = 0; - /* find all equal value in right part */ - for (j = 0; j < spl->spl_nright; j++) - { - if (spl->spl_idgrp[spl->spl_right[j]]) - continue; - FunctionCall3(&giststate->equalFn[0], - valvec[spl->spl_left[i]].key, - valvec[spl->spl_right[j]].key, - PointerGetDatum(&result)); - if (result) - { - spl->spl_idgrp[spl->spl_right[j]] = curid; - len++; - } - } - /* find all other equal value in left part */ - if (len) - { - /* add current val to list of equal values */ - spl->spl_idgrp[spl->spl_left[i]] = curid; - /* searching .. */ - for (j = i + 1; j < spl->spl_nleft; j++) - { - if (spl->spl_idgrp[spl->spl_left[j]]) - continue; - FunctionCall3(&giststate->equalFn[0], - valvec[spl->spl_left[i]].key, - valvec[spl->spl_left[j]].key, - PointerGetDatum(&result)); - if (result) - { - spl->spl_idgrp[spl->spl_left[j]] = curid; - len++; - } - } - spl->spl_ngrp[curid] = len + 1; - curid++; - } - } + is_splitted = gistplacetopage(state, giststate ); - return curid; -} + /* pop page from stack */ + state->stack = state->stack->parent; + state->pathlen--; + state->path++; + + /* stack is void */ + if ( ! state->stack ) + break; -/* - * Insert equivalent tuples to left or right page with minimum - * penalty - */ -static void -gistadjsubkey(Relation r, - IndexTuple *itup, /* contains compressed entry */ - int *len, - GIST_SPLITVEC *v, - GISTSTATE *giststate) -{ - int curlen; - OffsetNumber *curwpos; - GISTENTRY entry, - identry[INDEX_MAX_KEYS], - *ev0p, - *ev1p; - float lpenalty, - rpenalty; - GistEntryVector *evec; - int datumsize; - bool isnull[INDEX_MAX_KEYS]; - int i, - j; - /* clear vectors */ - curlen = v->spl_nleft; - curwpos = v->spl_left; - for (i = 0; i < v->spl_nleft; i++) - { - if (v->spl_idgrp[v->spl_left[i]] == 0) + /* child did not split */ + if (!is_splitted) { - *curwpos = v->spl_left[i]; - curwpos++; - } - else - curlen--; - } - v->spl_nleft = curlen; + /* parent's tuple */ + iid = PageGetItemId(state->stack->page, state->stack->childoffnum); + oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); + newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); + + if (!newtup) /* not need to update key */ + break; - curlen = v->spl_nright; - curwpos = v->spl_right; - for (i = 0; i < v->spl_nright; i++) - { - if (v->spl_idgrp[v->spl_right[i]] == 0) - { - *curwpos = v->spl_right[i]; - curwpos++; + state->itup[0] = newtup; } - else - curlen--; + + /* + * This node's key has been modified, either because a child + * split occurred or because we needed to adjust our key for + * an insert in a child node. Therefore, remove the old + * version of this node's key. + */ + gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum); + PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); + if ( !state->r->rd_istemp ) + state->stack->todelete = true; + + /* + * if child was splitted, new key for child will be inserted in + * the end list of child, so we must say to any scans that page is + * changed beginning from 'child' offset + */ + if (is_splitted) + gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum); + } /* while */ + + /* release all buffers */ + while( state->stack ) { + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(state->stack->buffer); + state->stack = state->stack->parent; } - v->spl_nright = curlen; - evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); - evec->n = 2; - ev0p = &(evec->vector[0]); - ev1p = &(evec->vector[1]); + /* say to xlog that insert is completed */ + if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) { + gistxlogInsertComplete xlrec; + XLogRecData rdata; + + xlrec.node = state->r->rd_node; + xlrec.key = state->key; + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof( gistxlogInsertComplete ); + rdata.next = NULL; - /* add equivalent tuple */ - for (i = 0; i < *len; i++) - { - Datum datum; + START_CRIT_SECTION(); - if (v->spl_idgrp[i + 1] == 0) /* already inserted */ - continue; - gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, - identry, isnull); + XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata); - v->spl_ngrp[v->spl_idgrp[i + 1]]--; - if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 && - (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) - { - /* force last in group */ - rpenalty = 1.0; - lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0; - } - else - { - /* where? */ - for (j = 1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_lattr[j], r, NULL, - (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); - gistpenalty(giststate, j, &entry, v->spl_lisnull[j], - &identry[j], isnull[j], &lpenalty); - - gistentryinit(entry, v->spl_rattr[j], r, NULL, - (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); - gistpenalty(giststate, j, &entry, v->spl_risnull[j], - &identry[j], isnull[j], &rpenalty); - - if (lpenalty != rpenalty) - break; - } - } - - /* - * add - * XXX: refactor this to avoid duplicating code - */ - if (lpenalty < rpenalty) - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; - v->spl_left[v->spl_nleft] = i + 1; - v->spl_nleft++; - for (j = 1; j < r->rd_att->natts; j++) - { - if (isnull[j] && v->spl_lisnull[j]) - { - v->spl_lattr[j] = (Datum) 0; - v->spl_lattrsize[j] = 0; - } - else - { - FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], - isnull[j], identry[j].key, identry[j].bytes); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - v->spl_lattr[j] = datum; - v->spl_lattrsize[j] = datumsize; - v->spl_lisnull[j] = false; - } - } - } - else - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED; - v->spl_right[v->spl_nright] = i + 1; - v->spl_nright++; - for (j = 1; j < r->rd_att->natts; j++) - { - if (isnull[j] && v->spl_risnull[j]) - { - v->spl_rattr[j] = (Datum) 0; - v->spl_rattrsize[j] = 0; - } - else - { - FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], - isnull[j], identry[j].key, identry[j].bytes); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - v->spl_rattr[j] = datum; - v->spl_rattrsize[j] = datumsize; - v->spl_risnull[j] = false; - } - } - } + END_CRIT_SECTION(); } } @@ -1105,6 +674,7 @@ gistSplit(Relation r, Buffer buffer, IndexTuple *itup, /* contains compressed entry */ int *len, + PageLayout **dist, GISTSTATE *giststate) { Page p; @@ -1225,29 +795,57 @@ gistSplit(Relation r, /* write on disk (may need another split) */ if (gistnospace(right, rvectup, v.spl_nright)) { + int i; + PageLayout *d, *origd=*dist; + nlen = v.spl_nright; - newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate); + newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate); + /* XLOG stuff */ + d=*dist; + /* translate offsetnumbers to our */ + while( d && d!=origd ) { + for(i=0;i<d->block.num;i++) + d->list[i] = v.spl_right[ d->list[i]-1 ]; + d=d->next; + } ReleaseBuffer(rightbuf); } else { OffsetNumber l; - l = gistwritebuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber); - WriteBuffer(rightbuf); - + l = gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber); + /* XLOG stuff */ + ROTATEDIST(*dist); + (*dist)->block.blkno = BufferGetBlockNumber(rightbuf); + (*dist)->block.num = v.spl_nright; + (*dist)->list = v.spl_right; + (*dist)->buffer = rightbuf; + nlen = 1; newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull); - ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); + ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber); } if (gistnospace(left, lvectup, v.spl_nleft)) { int llen = v.spl_nleft; IndexTuple *lntup; - - lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate); + int i; + PageLayout *d, *origd=*dist; + + lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate); + + /* XLOG stuff */ + d=*dist; + /* translate offsetnumbers to our */ + while( d && d!=origd ) { + for(i=0;i<d->block.num;i++) + d->list[i] = v.spl_left[ d->list[i]-1 ]; + d=d->next; + } + ReleaseBuffer(leftbuf); newtup = gistjoinvector(newtup, &nlen, lntup, llen); @@ -1256,151 +854,76 @@ gistSplit(Relation r, { OffsetNumber l; - l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); + l = gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO) PageRestoreTempPage(left, p); - WriteBuffer(leftbuf); - + /* XLOG stuff */ + ROTATEDIST(*dist); + (*dist)->block.blkno = BufferGetBlockNumber(leftbuf); + (*dist)->block.num = v.spl_nleft; + (*dist)->list = v.spl_left; + (*dist)->buffer = leftbuf; + nlen += 1; newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen); newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull); - ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1); + ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber); } *len = nlen; return newtup; } -static void -gistnewroot(Relation r, IndexTuple *itup, int len) -{ - Buffer b; - Page p; - - b = ReadBuffer(r, GIST_ROOT_BLKNO); - GISTInitBuffer(b, 0); - p = BufferGetPage(b); - - gistwritebuffer(r, p, itup, len, FirstOffsetNumber); - WriteBuffer(b); -} - -static void -GISTInitBuffer(Buffer b, uint32 f) +void +gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode) { - GISTPageOpaque opaque; + Buffer buffer; Page page; - Size pageSize; - - pageSize = BufferGetPageSize(b); - page = BufferGetPage(b); - PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); - - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); - opaque->flags = f; -} - -/* - * find entry with lowest penalty - */ -static OffsetNumber -gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ - GISTSTATE *giststate) -{ - OffsetNumber maxoff; - OffsetNumber i; - OffsetNumber which; - float sum_grow, - which_grow[INDEX_MAX_KEYS]; - GISTENTRY entry, - identry[INDEX_MAX_KEYS]; - bool isnull[INDEX_MAX_KEYS]; - - maxoff = PageGetMaxOffsetNumber(p); - *which_grow = -1.0; - which = -1; - sum_grow = 1; - gistDeCompressAtt(giststate, r, - it, NULL, (OffsetNumber) 0, - identry, isnull); - - for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) - { - int j; - IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - sum_grow = 0; - for (j = 0; j < r->rd_att->natts; j++) - { - Datum datum; - float usize; - bool IsNull; - - datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, j, &entry, datum, r, p, i, - ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), - FALSE, IsNull); - gistpenalty(giststate, j, &entry, IsNull, - &identry[j], isnull[j], &usize); - - if (which_grow[j] < 0 || usize < which_grow[j]) - { - which = i; - which_grow[j] = usize; - if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber) - which_grow[j + 1] = -1; - sum_grow += which_grow[j]; - } - else if (which_grow[j] == usize) - sum_grow += usize; - else - { - sum_grow = 1; - break; - } - } + buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO); + GISTInitBuffer(buffer, 0); + page = BufferGetPage(buffer); + + gistfillbuffer(r, page, itup, len, FirstOffsetNumber); + if ( !xlog_mode && !r->rd_istemp ) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) ); + int i; + + xlrec.node = r->rd_node; + xlrec.blkno = GIST_ROOT_BLKNO; + xlrec.todeleteoffnum = InvalidOffsetNumber; + xlrec.key = *key; + xlrec.pathlen=0; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogEntryUpdate ); + rdata[0].next = NULL; + + for(i=1; i<=len; i++) { + rdata[i].buffer = InvalidBuffer; + rdata[i].data = (char*)(itup[i-1]); + rdata[i].len = IndexTupleSize(itup[i-1]); + rdata[i].next = NULL; + rdata[i-1].next = &(rdata[i]); + } + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); } - - return which; + if ( xlog_mode ) + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); } -/* - * Retail deletion of a single tuple. - * - * NB: this is no longer called externally, but is still needed by - * gistlayerinsert(). That dependency will have to be fixed if GIST - * is ever going to allow concurrent insertions. - */ -static void -gistdelete(Relation r, ItemPointer tid) -{ - BlockNumber blkno; - OffsetNumber offnum; - Buffer buf; - Page page; - - /* - * Since GIST is not marked "amconcurrent" in pg_am, caller should - * have acquired exclusive lock on index relation. We need no locking - * here. - */ - - blkno = ItemPointerGetBlockNumber(tid); - offnum = ItemPointerGetOffsetNumber(tid); - - /* adjust any scans that will be affected by this deletion */ - /* NB: this works only for scans in *this* backend! */ - gistadjscans(r, GISTOP_DEL, blkno, offnum); - - /* delete the index tuple */ - buf = ReadBuffer(r, blkno); - page = BufferGetPage(buf); - - PageIndexTupleDelete(page, offnum); - - WriteBuffer(buf); -} /* * Bulk deletion of all index entries pointing to a set of heap tuples. @@ -1462,6 +985,30 @@ gistbulkdelete(PG_FUNCTION_ARGS) page = BufferGetPage(buf); PageIndexTupleDelete(page, offnum); + if ( !rel->rd_istemp ) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData rdata; + + xlrec.node = rel->rd_node; + xlrec.blkno = blkno; + xlrec.todeleteoffnum = offnum; + xlrec.pathlen=0; + ItemPointerSetInvalid( &(xlrec.key) ); + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof( gistxlogEntryUpdate ); + rdata.next = NULL; + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); + } WriteBuffer(buf); @@ -1527,159 +1074,6 @@ freeGISTstate(GISTSTATE *giststate) /* no work */ } -#ifdef GIST_PAGEADDITEM -/* - * Given an IndexTuple to be inserted on a page, this routine replaces - * the key with another key, which may involve generating a new IndexTuple - * if the sizes don't match or if the null status changes. - * - * XXX this only works for a single-column index tuple! - */ -static IndexTuple -gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) -{ - bool IsNull; - Datum datum = index_getattr(t, 1, r->rd_att, &IsNull); - - /* - * If new entry fits in index tuple, copy it in. To avoid worrying - * about null-value bitmask, pass it off to the general - * index_form_tuple routine if either the previous or new value is - * NULL. - */ - if (!IsNull && DatumGetPointer(entry.key) != NULL && - (Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull)) - { - memcpy(DatumGetPointer(datum), - DatumGetPointer(entry.key), - entry.bytes); - /* clear out old size */ - t->t_info &= ~INDEX_SIZE_MASK; - /* or in new size */ - t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData)); - - return t; - } - else - { - /* generate a new index tuple for the compressed entry */ - TupleDesc tupDesc = r->rd_att; - IndexTuple newtup; - bool isnull; - - isnull = (DatumGetPointer(entry.key) == NULL); - newtup = index_form_tuple(tupDesc, &(entry.key), &isnull); - newtup->t_tid = t->t_tid; - return newtup; - } -} -#endif - -/* - * initialize a GiST entry with a decompressed version of key - */ -void -gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, - Datum k, Relation r, Page pg, OffsetNumber o, - int b, bool l, bool isNull) -{ - if (b && !isNull) - { - GISTENTRY *dep; - - gistentryinit(*e, k, r, pg, o, b, l); - dep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], - PointerGetDatum(e))); - /* decompressFn may just return the given pointer */ - if (dep != e) - gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, - dep->bytes, dep->leafkey); - } - else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); -} - - -/* - * initialize a GiST entry with a compressed version of key - */ -static void -gistcentryinit(GISTSTATE *giststate, int nkey, - GISTENTRY *e, Datum k, Relation r, - Page pg, OffsetNumber o, int b, bool l, bool isNull) -{ - if (!isNull) - { - GISTENTRY *cep; - - gistentryinit(*e, k, r, pg, o, b, l); - cep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], - PointerGetDatum(e))); - /* compressFn may just return the given pointer */ - if (cep != e) - gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, - cep->bytes, cep->leafkey); - } - else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); -} - -static IndexTuple -gistFormTuple(GISTSTATE *giststate, Relation r, - Datum attdata[], int datumsize[], bool isnull[]) -{ - GISTENTRY centry[INDEX_MAX_KEYS]; - Datum compatt[INDEX_MAX_KEYS]; - int i; - - for (i = 0; i < r->rd_att->natts; i++) - { - if (isnull[i]) - compatt[i] = (Datum) 0; - else - { - gistcentryinit(giststate, i, ¢ry[i], attdata[i], - NULL, NULL, (OffsetNumber) 0, - datumsize[i], FALSE, FALSE); - compatt[i] = centry[i].key; - } - } - - return index_form_tuple(giststate->tupdesc, compatt, isnull); -} - -static void -gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, - OffsetNumber o, GISTENTRY *attdata, bool *isnull) -{ - int i; - - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); - gistdentryinit(giststate, i, &attdata[i], - datum, r, p, o, - ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), - FALSE, isnull[i]); - } -} - -static void -gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2, float *penalty) -{ - if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2)) - *penalty = 0.0; - else - FunctionCall3(&giststate->penaltyFn[attno], - PointerGetDatum(key1), - PointerGetDatum(key2), - PointerGetDatum(penalty)); -} - #ifdef GISTDEBUG static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) @@ -1726,13 +1120,3 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) } #endif /* defined GISTDEBUG */ -void -gist_redo(XLogRecPtr lsn, XLogRecord *record) -{ - elog(PANIC, "gist_redo: unimplemented"); -} - -void -gist_desc(char *buf, uint8 xl_info, char *rec) -{ -} diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index e5f77b67926261e4f6c6cfc03123e1aa5995f159..5b9a94471b10816aeebc07c54ab601d924d2c3b3 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,14 +8,14 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.47 2005/05/17 03:34:18 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.48 2005/06/14 11:45:13 teodor Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" -#include "access/gist_private.h" #include "access/itup.h" +#include "access/gist_private.h" #include "executor/execdebug.h" #include "utils/memutils.h" diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c new file mode 100644 index 0000000000000000000000000000000000000000..44391f9f738b820a0b51c37d894e868fb4778149 --- /dev/null +++ b/src/backend/access/gist/gistutil.c @@ -0,0 +1,785 @@ +/*------------------------------------------------------------------------- + * + * gistutil.c + * utilities routines for the postgres GiST index access method. + * + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $ + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/gist_private.h" +#include "access/gistscan.h" +#include "access/heapam.h" +#include "catalog/index.h" +#include "miscadmin.h" + +/* group flags ( in gistadjsubkey ) */ +#define LEFT_ADDED 0x01 +#define RIGHT_ADDED 0x02 +#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) + + +/* + * This defines is only for shorter code, used in gistgetadjusted + * and gistadjsubkey only + */ +#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \ + if (isnullkey) { \ + gistentryinit((evp), rkey, r, NULL, \ + (OffsetNumber) 0, rkeyb, FALSE); \ + } else { \ + gistentryinit((evp), okey, r, NULL, \ + (OffsetNumber) 0, okeyb, FALSE); \ + } \ +} while(0) + +#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \ + FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \ + FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \ +} while(0); + + +static void +gistpenalty(GISTSTATE *giststate, int attno, + GISTENTRY *key1, bool isNull1, + GISTENTRY *key2, bool isNull2, float *penalty); + +/* + * Write itup vector to page, has no control of free space + */ +OffsetNumber +gistfillbuffer(Relation r, Page page, IndexTuple *itup, + int len, OffsetNumber off) +{ + OffsetNumber l = InvalidOffsetNumber; + int i; + + for (i = 0; i < len; i++) + { + l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), + off, LP_USED); + if (l == InvalidOffsetNumber) + elog(ERROR, "gistfillbuffer: failed to add index item to \"%s\"", + RelationGetRelationName(r)); + off++; + } + return l; +} + +/* + * Check space for itup vector on page + */ +bool +gistnospace(Page page, IndexTuple *itvec, int len) +{ + unsigned int size = 0; + int i; + + for (i = 0; i < len; i++) + size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); + + return (PageGetFreeSpace(page) < size); +} + +/* + * Read buffer into itup vector + */ +IndexTuple * +gistextractbuffer(Buffer buffer, int *len /* out */ ) +{ + OffsetNumber i, + maxoff; + IndexTuple *itvec; + Page p = (Page) BufferGetPage(buffer); + + maxoff = PageGetMaxOffsetNumber(p); + *len = maxoff; + itvec = palloc(sizeof(IndexTuple) * maxoff); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + + return itvec; +} + +/* + * join two vectors into one + */ +IndexTuple * +gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) +{ + itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen)); + memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen); + *len += addlen; + return itvec; +} + +/* + * Return an IndexTuple containing the result of applying the "union" + * method to the specified IndexTuple vector. + */ +IndexTuple +gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) +{ + Datum attr[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + GistEntryVector *evec; + int i; + GISTENTRY centry[INDEX_MAX_KEYS]; + + evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + + for (i = 0; i < r->rd_att->natts; i++) + { + Datum datum; + int j; + int real_len; + + real_len = 0; + for (j = 0; j < len; j++) + { + bool IsNull; + datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); + if (IsNull) + continue; + + gistdentryinit(giststate, i, + &(evec->vector[real_len]), + datum, + NULL, NULL, (OffsetNumber) 0, + ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), + FALSE, IsNull); + real_len++; + } + + /* If this tuple vector was all NULLs, the union is NULL */ + if (real_len == 0) + { + attr[i] = (Datum) 0; + isnull[i] = TRUE; + } + else + { + int datumsize; + + if (real_len == 1) + { + evec->n = 2; + gistentryinit(evec->vector[1], + evec->vector[0].key, r, NULL, + (OffsetNumber) 0, evec->vector[0].bytes, FALSE); + } + else + evec->n = real_len; + + /* Compress the result of the union and store in attr array */ + datum = FunctionCall2(&giststate->unionFn[i], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + gistcentryinit(giststate, i, ¢ry[i], datum, + NULL, NULL, (OffsetNumber) 0, + datumsize, FALSE, FALSE); + isnull[i] = FALSE; + attr[i] = centry[i].key; + } + } + + return index_form_tuple(giststate->tupdesc, attr, isnull); +} + + +/* + * Forms union of oldtup and addtup, if union == oldtup then return NULL + */ +IndexTuple +gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) +{ + GistEntryVector *evec; + bool neednew = false; + bool isnull[INDEX_MAX_KEYS]; + Datum attr[INDEX_MAX_KEYS]; + GISTENTRY centry[INDEX_MAX_KEYS], + oldatt[INDEX_MAX_KEYS], + addatt[INDEX_MAX_KEYS], + *ev0p, + *ev1p; + bool oldisnull[INDEX_MAX_KEYS], + addisnull[INDEX_MAX_KEYS]; + IndexTuple newtup = NULL; + int i; + + evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); + evec->n = 2; + ev0p = &(evec->vector[0]); + ev1p = &(evec->vector[1]); + + gistDeCompressAtt(giststate, r, oldtup, NULL, + (OffsetNumber) 0, oldatt, oldisnull); + + gistDeCompressAtt(giststate, r, addtup, NULL, + (OffsetNumber) 0, addatt, addisnull); + + for (i = 0; i < r->rd_att->natts; i++) + { + if (oldisnull[i] && addisnull[i]) + { + attr[i] = (Datum) 0; + isnull[i] = TRUE; + } + else + { + Datum datum; + int datumsize; + + FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes, + addisnull[i], addatt[i].key, addatt[i].bytes); + + datum = FunctionCall2(&giststate->unionFn[i], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + if (oldisnull[i] || addisnull[i]) + { + if (oldisnull[i]) + neednew = true; + } + else + { + bool result; + + FunctionCall3(&giststate->equalFn[i], + ev0p->key, + datum, + PointerGetDatum(&result)); + + if (!result) + neednew = true; + } + + gistcentryinit(giststate, i, ¢ry[i], datum, + NULL, NULL, (OffsetNumber) 0, + datumsize, FALSE, FALSE); + + attr[i] = centry[i].key; + isnull[i] = FALSE; + } + } + + if (neednew) + { + /* need to update key */ + newtup = index_form_tuple(giststate->tupdesc, attr, isnull); + newtup->t_tid = oldtup->t_tid; + } + + return newtup; +} + +void +gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) +{ + int lr; + + for (lr = 0; lr < 2; lr++) + { + OffsetNumber *entries; + int i; + Datum *attr; + int len, + *attrsize; + bool *isnull; + GistEntryVector *evec; + + if (lr) + { + attrsize = spl->spl_lattrsize; + attr = spl->spl_lattr; + len = spl->spl_nleft; + entries = spl->spl_left; + isnull = spl->spl_lisnull; + } + else + { + attrsize = spl->spl_rattrsize; + attr = spl->spl_rattr; + len = spl->spl_nright; + entries = spl->spl_right; + isnull = spl->spl_risnull; + } + + evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + + for (i = 1; i < r->rd_att->natts; i++) + { + int j; + Datum datum; + int datumsize; + int real_len; + + real_len = 0; + for (j = 0; j < len; j++) + { + bool IsNull; + + if (spl->spl_idgrp[entries[j]]) + continue; + datum = index_getattr(itvec[entries[j] - 1], i + 1, + giststate->tupdesc, &IsNull); + if (IsNull) + continue; + gistdentryinit(giststate, i, + &(evec->vector[real_len]), + datum, + NULL, NULL, (OffsetNumber) 0, + ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), + FALSE, IsNull); + real_len++; + + } + + if (real_len == 0) + { + datum = (Datum) 0; + datumsize = 0; + isnull[i] = true; + } + else + { + /* + * evec->vector[0].bytes may be not defined, so form union + * with itself + */ + if (real_len == 1) + { + evec->n = 2; + memcpy(&(evec->vector[1]), &(evec->vector[0]), + sizeof(GISTENTRY)); + } + else + evec->n = real_len; + datum = FunctionCall2(&giststate->unionFn[i], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + isnull[i] = false; + } + + attr[i] = datum; + attrsize[i] = datumsize; + } + } +} + +/* + * find group in vector with equal value + */ +int +gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) +{ + int i; + int curid = 1; + + /* + * first key is always not null (see gistinsert), so we may not check + * for nulls + */ + for (i = 0; i < spl->spl_nleft; i++) + { + int j; + int len; + bool result; + + if (spl->spl_idgrp[spl->spl_left[i]]) + continue; + len = 0; + /* find all equal value in right part */ + for (j = 0; j < spl->spl_nright; j++) + { + if (spl->spl_idgrp[spl->spl_right[j]]) + continue; + FunctionCall3(&giststate->equalFn[0], + valvec[spl->spl_left[i]].key, + valvec[spl->spl_right[j]].key, + PointerGetDatum(&result)); + if (result) + { + spl->spl_idgrp[spl->spl_right[j]] = curid; + len++; + } + } + /* find all other equal value in left part */ + if (len) + { + /* add current val to list of equal values */ + spl->spl_idgrp[spl->spl_left[i]] = curid; + /* searching .. */ + for (j = i + 1; j < spl->spl_nleft; j++) + { + if (spl->spl_idgrp[spl->spl_left[j]]) + continue; + FunctionCall3(&giststate->equalFn[0], + valvec[spl->spl_left[i]].key, + valvec[spl->spl_left[j]].key, + PointerGetDatum(&result)); + if (result) + { + spl->spl_idgrp[spl->spl_left[j]] = curid; + len++; + } + } + spl->spl_ngrp[curid] = len + 1; + curid++; + } + } + + return curid; +} + +/* + * Insert equivalent tuples to left or right page with minimum + * penalty + */ +void +gistadjsubkey(Relation r, + IndexTuple *itup, /* contains compressed entry */ + int *len, + GIST_SPLITVEC *v, + GISTSTATE *giststate) +{ + int curlen; + OffsetNumber *curwpos; + GISTENTRY entry, + identry[INDEX_MAX_KEYS], + *ev0p, + *ev1p; + float lpenalty, + rpenalty; + GistEntryVector *evec; + int datumsize; + bool isnull[INDEX_MAX_KEYS]; + int i, + j; + + /* clear vectors */ + curlen = v->spl_nleft; + curwpos = v->spl_left; + for (i = 0; i < v->spl_nleft; i++) + { + if (v->spl_idgrp[v->spl_left[i]] == 0) + { + *curwpos = v->spl_left[i]; + curwpos++; + } + else + curlen--; + } + v->spl_nleft = curlen; + + curlen = v->spl_nright; + curwpos = v->spl_right; + for (i = 0; i < v->spl_nright; i++) + { + if (v->spl_idgrp[v->spl_right[i]] == 0) + { + *curwpos = v->spl_right[i]; + curwpos++; + } + else + curlen--; + } + v->spl_nright = curlen; + + evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); + evec->n = 2; + ev0p = &(evec->vector[0]); + ev1p = &(evec->vector[1]); + + /* add equivalent tuple */ + for (i = 0; i < *len; i++) + { + Datum datum; + + if (v->spl_idgrp[i + 1] == 0) /* already inserted */ + continue; + gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, + identry, isnull); + + v->spl_ngrp[v->spl_idgrp[i + 1]]--; + if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 && + (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) + { + /* force last in group */ + rpenalty = 1.0; + lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0; + } + else + { + /* where? */ + for (j = 1; j < r->rd_att->natts; j++) + { + gistentryinit(entry, v->spl_lattr[j], r, NULL, + (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); + gistpenalty(giststate, j, &entry, v->spl_lisnull[j], + &identry[j], isnull[j], &lpenalty); + + gistentryinit(entry, v->spl_rattr[j], r, NULL, + (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); + gistpenalty(giststate, j, &entry, v->spl_risnull[j], + &identry[j], isnull[j], &rpenalty); + + if (lpenalty != rpenalty) + break; + } + } + + /* + * add + * XXX: refactor this to avoid duplicating code + */ + if (lpenalty < rpenalty) + { + v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; + v->spl_left[v->spl_nleft] = i + 1; + v->spl_nleft++; + for (j = 1; j < r->rd_att->natts; j++) + { + if (isnull[j] && v->spl_lisnull[j]) + { + v->spl_lattr[j] = (Datum) 0; + v->spl_lattrsize[j] = 0; + } + else + { + FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], + isnull[j], identry[j].key, identry[j].bytes); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + v->spl_lattr[j] = datum; + v->spl_lattrsize[j] = datumsize; + v->spl_lisnull[j] = false; + } + } + } + else + { + v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED; + v->spl_right[v->spl_nright] = i + 1; + v->spl_nright++; + for (j = 1; j < r->rd_att->natts; j++) + { + if (isnull[j] && v->spl_risnull[j]) + { + v->spl_rattr[j] = (Datum) 0; + v->spl_rattrsize[j] = 0; + } + else + { + FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], + isnull[j], identry[j].key, identry[j].bytes); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + v->spl_rattr[j] = datum; + v->spl_rattrsize[j] = datumsize; + v->spl_risnull[j] = false; + } + } + } + } +} + +/* + * find entry with lowest penalty + */ +OffsetNumber +gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ + GISTSTATE *giststate) +{ + OffsetNumber maxoff; + OffsetNumber i; + OffsetNumber which; + float sum_grow, + which_grow[INDEX_MAX_KEYS]; + GISTENTRY entry, + identry[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + + maxoff = PageGetMaxOffsetNumber(p); + *which_grow = -1.0; + which = -1; + sum_grow = 1; + gistDeCompressAtt(giststate, r, + it, NULL, (OffsetNumber) 0, + identry, isnull); + + for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) + { + int j; + IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + + sum_grow = 0; + for (j = 0; j < r->rd_att->natts; j++) + { + Datum datum; + float usize; + bool IsNull; + + datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); + gistdentryinit(giststate, j, &entry, datum, r, p, i, + ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), + FALSE, IsNull); + gistpenalty(giststate, j, &entry, IsNull, + &identry[j], isnull[j], &usize); + + if (which_grow[j] < 0 || usize < which_grow[j]) + { + which = i; + which_grow[j] = usize; + if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber) + which_grow[j + 1] = -1; + sum_grow += which_grow[j]; + } + else if (which_grow[j] == usize) + sum_grow += usize; + else + { + sum_grow = 1; + break; + } + } + } + + return which; +} + +/* + * initialize a GiST entry with a decompressed version of key + */ +void +gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, + Datum k, Relation r, Page pg, OffsetNumber o, + int b, bool l, bool isNull) +{ + if (b && !isNull) + { + GISTENTRY *dep; + + gistentryinit(*e, k, r, pg, o, b, l); + dep = (GISTENTRY *) + DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], + PointerGetDatum(e))); + /* decompressFn may just return the given pointer */ + if (dep != e) + gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, + dep->bytes, dep->leafkey); + } + else + gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); +} + + +/* + * initialize a GiST entry with a compressed version of key + */ +void +gistcentryinit(GISTSTATE *giststate, int nkey, + GISTENTRY *e, Datum k, Relation r, + Page pg, OffsetNumber o, int b, bool l, bool isNull) +{ + if (!isNull) + { + GISTENTRY *cep; + + gistentryinit(*e, k, r, pg, o, b, l); + cep = (GISTENTRY *) + DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], + PointerGetDatum(e))); + /* compressFn may just return the given pointer */ + if (cep != e) + gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, + cep->bytes, cep->leafkey); + } + else + gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); +} + +IndexTuple +gistFormTuple(GISTSTATE *giststate, Relation r, + Datum attdata[], int datumsize[], bool isnull[]) +{ + GISTENTRY centry[INDEX_MAX_KEYS]; + Datum compatt[INDEX_MAX_KEYS]; + int i; + + for (i = 0; i < r->rd_att->natts; i++) + { + if (isnull[i]) + compatt[i] = (Datum) 0; + else + { + gistcentryinit(giststate, i, ¢ry[i], attdata[i], + NULL, NULL, (OffsetNumber) 0, + datumsize[i], FALSE, FALSE); + compatt[i] = centry[i].key; + } + } + + return index_form_tuple(giststate->tupdesc, compatt, isnull); +} + +void +gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, + OffsetNumber o, GISTENTRY *attdata, bool *isnull) +{ + int i; + + for (i = 0; i < r->rd_att->natts; i++) + { + Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); + gistdentryinit(giststate, i, &attdata[i], + datum, r, p, o, + ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), + FALSE, isnull[i]); + } +} + +static void +gistpenalty(GISTSTATE *giststate, int attno, + GISTENTRY *key1, bool isNull1, + GISTENTRY *key2, bool isNull2, float *penalty) +{ + if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2)) + *penalty = 0.0; + else + FunctionCall3(&giststate->penaltyFn[attno], + PointerGetDatum(key1), + PointerGetDatum(key2), + PointerGetDatum(penalty)); +} + +void +GISTInitBuffer(Buffer b, uint32 f) +{ + GISTPageOpaque opaque; + Page page; + Size pageSize; + + pageSize = BufferGetPageSize(b); + page = BufferGetPage(b); + PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); + + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + opaque->flags = f; +} + diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c new file mode 100644 index 0000000000000000000000000000000000000000..b99ab24761d9b042a4da27a0f71e0c0965d98c92 --- /dev/null +++ b/src/backend/access/gist/gistxlog.c @@ -0,0 +1,628 @@ +/*------------------------------------------------------------------------- + * + * gistxlog.c + * WAL replay logic for GiST. + * + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $ + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/gist_private.h" +#include "access/gistscan.h" +#include "access/heapam.h" +#include "catalog/index.h" +#include "commands/vacuum.h" +#include "miscadmin.h" +#include "utils/memutils.h" + +typedef struct { + gistxlogEntryUpdate *data; + int len; + IndexTuple *itup; + BlockNumber *path; +} EntryUpdateRecord; + +typedef struct { + gistxlogPage *header; + OffsetNumber *offnum; + + /* to work with */ + Page page; + Buffer buffer; + bool is_ok; +} NewPage; + +typedef struct { + gistxlogPageSplit *data; + NewPage *page; + IndexTuple *itup; + BlockNumber *path; +} PageSplitRecord; + +/* track for incomplete inserts, idea was taken from nbtxlog.c */ + +typedef struct gistIncompleteInsert { + RelFileNode node; + ItemPointerData key; + int lenblk; + BlockNumber *blkno; + int pathlen; + BlockNumber *path; +} gistIncompleteInsert; + + +MemoryContext opCtx; +MemoryContext insertCtx; +static List *incomplete_inserts; + + +#define ItemPointerEQ( a, b ) \ + ( \ + ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \ + ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \ + ) + +static void +pushIncompleteInsert(RelFileNode node, ItemPointerData key, + BlockNumber *blkno, int lenblk, + BlockNumber *path, int pathlen, + PageSplitRecord *xlinfo /* to extract blkno info */ ) { + MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx); + gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) ); + + ninsert->node = node; + ninsert->key = key; + + if ( lenblk && blkno ) { + ninsert->lenblk = lenblk; + ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk ); + memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk); + } else { + int i; + + Assert( xlinfo ); + ninsert->lenblk = xlinfo->data->npage; + ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk ); + for(i=0;i<ninsert->lenblk;i++) + ninsert->blkno[i] = xlinfo->page[i].header->blkno; + } + Assert( ninsert->lenblk>0 ); + + if ( path && ninsert->pathlen ) { + ninsert->pathlen = pathlen; + ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen ); + memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen); + } else { + ninsert->pathlen = 0; + ninsert->path = NULL; + } + + incomplete_inserts = lappend(incomplete_inserts, ninsert); + MemoryContextSwitchTo(oldCxt); +} + +static void +forgetIncompleteInsert(RelFileNode node, ItemPointerData key) { + ListCell *l; + + foreach(l, incomplete_inserts) { + gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l); + + if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) { + + /* found */ + if ( insert->path ) pfree( insert->path ); + pfree( insert->blkno ); + incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); + pfree( insert ); + break; + } + } +} + +static void +decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) { + char *begin = XLogRecGetData(record), *ptr; + int i=0, addpath=0; + + decoded->data = (gistxlogEntryUpdate*)begin; + + if ( decoded->data->pathlen ) { + addpath = sizeof(BlockNumber) * decoded->data->pathlen; + decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); + } else + decoded->path = NULL; + + decoded->len=0; + ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath; + while( ptr - begin < record->xl_len ) { + decoded->len++; + ptr += IndexTupleSize( (IndexTuple)ptr ); + } + + decoded->itup=(IndexTuple*)palloc( sizeof( IndexTuple ) * decoded->len ); + + ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath; + while( ptr - begin < record->xl_len ) { + decoded->itup[i] = (IndexTuple)ptr; + ptr += IndexTupleSize( decoded->itup[i] ); + i++; + } +} + + +static void +gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { + EntryUpdateRecord xlrec; + Relation reln; + Buffer buffer; + Page page; + + decodeEntryUpdateRecord( &xlrec, record ); + + reln = XLogOpenRelation(xlrec.data->node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer(false, reln, xlrec.data->blkno); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistRedoEntryUpdateRecord: block unfound"); + page = (Page) BufferGetPage(buffer); + + if ( isnewroot ) { + if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + } else { + if ( PageIsNew((PageHeader) page) ) + elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page"); + if (XLByteLE(lsn, PageGetLSN(page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + } + + if ( isnewroot ) + GISTInitBuffer(buffer, 0); + else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) + PageIndexTupleDelete(page, xlrec.data->todeleteoffnum); + + /* add tuples */ + if ( xlrec.len > 0 ) { + OffsetNumber off = (PageIsEmpty(page)) ? + FirstOffsetNumber + : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); + + gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off); + } + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + + if ( ItemPointerIsValid( &(xlrec.data->key) ) ) { + if ( incomplete_inserts != NIL ) + forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); + + if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO ) + pushIncompleteInsert(xlrec.data->node, xlrec.data->key, + &(xlrec.data->blkno), 1, + xlrec.path, xlrec.data->pathlen, + NULL); + } +} + +static void +decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { + char *begin = XLogRecGetData(record), *ptr; + int i=0, addpath = 0; + + decoded->data = (gistxlogPageSplit*)begin; + decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage ); + decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup ); + + if ( decoded->data->pathlen ) { + addpath = sizeof(BlockNumber) * decoded->data->pathlen; + decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); + } else + decoded->path = NULL; + + ptr=begin+sizeof( gistxlogPageSplit ) + addpath; + for(i=0;i<decoded->data->nitup;i++) { + Assert( ptr - begin < record->xl_len ); + decoded->itup[i] = (IndexTuple)ptr; + ptr += IndexTupleSize( decoded->itup[i] ); + } + + for(i=0;i<decoded->data->npage;i++) { + Assert( ptr - begin < record->xl_len ); + decoded->page[i].header = (gistxlogPage*)ptr; + ptr += sizeof(gistxlogPage); + + Assert( ptr - begin < record->xl_len ); + decoded->page[i].offnum = (OffsetNumber*)ptr; + ptr += MAXALIGN( sizeof(OffsetNumber) * decoded->page[i].header->num ); + } +} + +static void +gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { + PageSplitRecord xlrec; + Relation reln; + Buffer buffer; + Page page; + int i, len=0; + IndexTuple *itup, *institup; + GISTPageOpaque opaque; + bool release=true; + + decodePageSplitRecord( &xlrec, record ); + + reln = XLogOpenRelation(xlrec.data->node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistRedoEntryUpdateRecord: block unfound"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page"); + + if (XLByteLE(lsn, PageGetLSN(page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + + if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) + PageIndexTupleDelete(page, xlrec.data->todeleteoffnum); + + itup = gistextractbuffer(buffer, &len); + itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup); + institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len ); + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + + for(i=0;i<xlrec.data->npage;i++) { + int j; + NewPage *newpage = xlrec.page + i; + + /* prepare itup vector */ + for(j=0;j<newpage->header->num;j++) + institup[j] = itup[ newpage->offnum[j] - 1 ]; + + if ( newpage->header->blkno == xlrec.data->origblkno ) { + /* IncrBufferRefCount(buffer); */ + newpage->page = (Page) PageGetTempPage(page, sizeof(GISTPageOpaqueData)); + newpage->buffer = buffer; + newpage->is_ok=false; + } else { + newpage->buffer = XLogReadBuffer(true, reln, newpage->header->blkno); + if (!BufferIsValid(newpage->buffer)) + elog(PANIC, "gistRedoPageSplitRecord: lost page"); + newpage->page = (Page) BufferGetPage(newpage->buffer); + if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + newpage->is_ok=true; + continue; /* good page */ + } else { + newpage->is_ok=false; + GISTInitBuffer(newpage->buffer, opaque->flags & F_LEAF); + } + } + gistfillbuffer(reln, newpage->page, institup, newpage->header->num, FirstOffsetNumber); + } + + for(i=0;i<xlrec.data->npage;i++) { + NewPage *newpage = xlrec.page + i; + + if ( newpage->is_ok ) + continue; + + if ( newpage->header->blkno == xlrec.data->origblkno ) { + PageRestoreTempPage(newpage->page, page); + release = false; + } + + PageSetLSN(newpage->page, lsn); + PageSetTLI(newpage->page, ThisTimeLineID); + LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(newpage->buffer); + } + + if ( release ) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + } + + if ( ItemPointerIsValid( &(xlrec.data->key) ) ) { + if ( incomplete_inserts != NIL ) + forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); + + pushIncompleteInsert(xlrec.data->node, xlrec.data->key, + NULL, 0, + xlrec.path, xlrec.data->pathlen, + &xlrec); + } +} + +static void +gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { + RelFileNode *node = (RelFileNode*)XLogRecGetData(record); + Relation reln; + Buffer buffer; + Page page; + + reln = XLogOpenRelation(*node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer( true, reln, GIST_ROOT_BLKNO); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistRedoCreateIndex: block unfound"); + page = (Page) BufferGetPage(buffer); + + if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + + GISTInitBuffer(buffer, F_LEAF); + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); +} + +void +gist_redo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + MemoryContext oldCxt; + oldCxt = MemoryContextSwitchTo(opCtx); + switch (info) { + case XLOG_GIST_ENTRY_UPDATE: + case XLOG_GIST_ENTRY_DELETE: + gistRedoEntryUpdateRecord(lsn, record,false); + break; + case XLOG_GIST_NEW_ROOT: + gistRedoEntryUpdateRecord(lsn, record,true); + break; + case XLOG_GIST_PAGE_SPLIT: + gistRedoPageSplitRecord(lsn, record); + break; + case XLOG_GIST_CREATE_INDEX: + gistRedoCreateIndex(lsn, record); + break; + case XLOG_GIST_INSERT_COMPLETE: + forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node, + ((gistxlogInsertComplete*)XLogRecGetData(record))->key ); + break; + default: + elog(PANIC, "gist_redo: unknown op code %u", info); + } + + MemoryContextSwitchTo(oldCxt); + MemoryContextReset(opCtx); +} + +static void +out_target(char *buf, RelFileNode node, ItemPointerData key) +{ + sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u", + node.spcNode, node.dbNode, node.relNode, + ItemPointerGetBlockNumber(&key), + ItemPointerGetOffsetNumber(&key)); +} + +static void +out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) { + out_target(buf, xlrec->node, xlrec->key); + sprintf(buf + strlen(buf), "; block number %u; update offset %u;", + xlrec->blkno, xlrec->todeleteoffnum); +} + +static void +out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) { + strcat(buf, "page_split: "); + out_target(buf, xlrec->node, xlrec->key); + sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages", + xlrec->origblkno, xlrec->todeleteoffnum, + xlrec->nitup, xlrec->npage); +} + +void +gist_desc(char *buf, uint8 xl_info, char *rec) +{ + uint8 info = xl_info & ~XLR_INFO_MASK; + + switch (info) { + case XLOG_GIST_ENTRY_UPDATE: + strcat(buf, "entry_update: "); + out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec); + break; + case XLOG_GIST_ENTRY_DELETE: + strcat(buf, "entry_delete: "); + out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec); + break; + case XLOG_GIST_NEW_ROOT: + strcat(buf, "new_root: "); + out_target(buf, ((gistxlogEntryUpdate*)rec)->node, ((gistxlogEntryUpdate*)rec)->key); + break; + case XLOG_GIST_PAGE_SPLIT: + out_gistxlogPageSplit(buf, (gistxlogPageSplit*)rec); + break; + case XLOG_GIST_CREATE_INDEX: + sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u", + ((RelFileNode*)rec)->spcNode, + ((RelFileNode*)rec)->dbNode, + ((RelFileNode*)rec)->relNode); + break; + case XLOG_GIST_INSERT_COMPLETE: + strcat(buf, "insert_complete: "); + out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key); + default: + elog(PANIC, "gist_desc: unknown op code %u", info); + } +} + + +#ifdef GIST_INCOMPLETE_INSERT +static void +gistContinueInsert(gistIncompleteInsert *insert) { + GISTSTATE giststate; + GISTInsertState state; + int i; + MemoryContext oldCxt; + oldCxt = MemoryContextSwitchTo(opCtx); + + state.r = XLogOpenRelation(insert->node); + if (!RelationIsValid(state.r)) + return; + + initGISTstate(&giststate, state.r); + + state.needInsertComplete=false; + ItemPointerSetInvalid( &(state.key) ); + state.path=NULL; + state.pathlen=0; + state.xlog_mode = true; + + /* form union tuples */ + state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk); + state.ituplen = insert->lenblk; + for(i=0;i<insert->lenblk;i++) { + int len=0; + IndexTuple *itup; + Buffer buffer; + Page page; + + buffer = XLogReadBuffer(false, state.r, insert->blkno[i]); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistContinueInsert: block unfound"); + page = (Page) BufferGetPage(buffer); + if ( PageIsNew((PageHeader)page) ) + elog(PANIC, "gistContinueInsert: uninitialized page"); + + itup = gistextractbuffer(buffer, &len); + state.itup[i] = gistunion(state.r, itup, len, &giststate); + + ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber ); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + } + + if ( insert->pathlen==0 ) { + /*it was split root, so we should only make new root*/ + gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true); + MemoryContextSwitchTo(oldCxt); + MemoryContextReset(opCtx); + return; + } + + /* form stack */ + state.stack=NULL; + for(i=0;i<insert->pathlen;i++) { + int j,len=0; + IndexTuple *itup; + GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) ); + + top->blkno = insert->path[i]; + top->buffer = XLogReadBuffer(false, state.r, top->blkno); + if (!BufferIsValid(top->buffer)) + elog(PANIC, "gistContinueInsert: block unfound"); + top->page = (Page) BufferGetPage(top->buffer); + if ( PageIsNew((PageHeader)(top->page)) ) + elog(PANIC, "gistContinueInsert: uninitialized page"); + + top->todelete = false; + + /* find childoffnum */ + itup = gistextractbuffer(top->buffer, &len); + top->childoffnum=InvalidOffsetNumber; + for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) { + BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) ); + + if ( i==0 ) { + int k; + for(k=0;k<insert->lenblk;k++) + if ( insert->blkno[k] == blkno ) { + top->childoffnum = j+1; + break; + } + } else if ( insert->path[i-1]==blkno ) + top->childoffnum = j+1; + } + + if ( top->childoffnum==InvalidOffsetNumber ) { + elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes"); + return; + } + + if ( i==0 ) + PageIndexTupleDelete(top->page, top->childoffnum); + + /* install item on right place in stack */ + top->parent=NULL; + if ( state.stack ) { + GISTInsertStack *ptr = state.stack; + while( ptr->parent ) + ptr = ptr->parent; + ptr->parent=top; + } else + state.stack = top; + } + + /* Good. Now we can continue insert */ + + gistmakedeal(&state, &giststate); + + MemoryContextSwitchTo(oldCxt); + MemoryContextReset(opCtx); +} +#endif + +void +gist_xlog_startup(void) { + incomplete_inserts=NIL; + insertCtx = AllocSetContextCreate(CurrentMemoryContext, + "GiST insert in xlog temporary context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + opCtx = createTempGistContext(); +} + +void +gist_xlog_cleanup(void) { + ListCell *l; + + foreach(l, incomplete_inserts) { + gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l); + char buf[1024]; + + *buf='\0'; + out_target(buf, insert->node, insert->key); + elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf); +#ifdef GIST_INCOMPLETE_INSERT + gistContinueInsert(insert); +#endif + } + MemoryContextDelete(opCtx); + MemoryContextDelete(insertCtx); +} + diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 5fe442fd80d05cb9dce964d7b32c13b0446cb939..3b8bcb131d0be70ea21c4cf290710ff7ff300c02 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -3,7 +3,7 @@ * * Resource managers definition * - * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.19 2005/06/08 15:50:26 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.20 2005/06/14 11:45:14 teodor Exp $ */ #include "postgres.h" @@ -37,6 +37,6 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = { {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup}, {"Hash", hash_redo, hash_desc, NULL, NULL}, {"Rtree", rtree_redo, rtree_desc, NULL, NULL}, - {"Gist", gist_redo, gist_desc, NULL, NULL}, + {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup}, {"Sequence", seq_redo, seq_desc, NULL, NULL} }; diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index db255a7b6f548f60670996352e3869cf07d8c6eb..479f221176b00f9fa7fd8e6367bc5dbee017d013 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -7,15 +7,17 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.2 2005/06/06 17:01:24 tgl Exp $ + * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.3 2005/06/14 11:45:14 teodor Exp $ * *------------------------------------------------------------------------- */ #ifndef GIST_PRIVATE_H #define GIST_PRIVATE_H +#include "access/itup.h" #include "access/gist.h" #include "access/xlog.h" +#include "access/xlogdefs.h" #include "fmgr.h" /* @@ -66,6 +68,42 @@ typedef struct GISTScanOpaqueData typedef GISTScanOpaqueData *GISTScanOpaque; +/* + * GISTInsertStack used for locking buffers and transfer arguments during + * insertion + */ + +typedef struct GISTInsertStack { + /* current page */ + BlockNumber blkno; + Buffer buffer; + Page page; + + /* child's offset */ + OffsetNumber childoffnum; + + /* pointer to parent */ + struct GISTInsertStack *parent; + + bool todelete; +} GISTInsertStack; + +typedef struct { + Relation r; + IndexTuple *itup; /* in/out, points to compressed entry */ + int ituplen; /* length of itup */ + GISTInsertStack *stack; + bool needInsertComplete; + bool xlog_mode; + + /* pointer to heap tuple */ + ItemPointerData key; + + /* path to stroe in XLog */ + BlockNumber *path; + int pathlen; +} GISTInsertState; + /* * When we're doing a scan and updating a tree at the same time, the * updates may affect the scan. We use the flags entry of the scan's @@ -89,6 +127,72 @@ typedef GISTScanOpaqueData *GISTScanOpaque; #define GISTOP_DEL 0 #define GISTOP_SPLIT 1 +#define ATTSIZE(datum, tupdesc, i, isnull) \ + ( \ + (isnull) ? 0 : \ + att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \ + ) + +/* XLog stuff */ +#define XLOG_GIST_ENTRY_UPDATE 0x00 +#define XLOG_GIST_ENTRY_DELETE 0x10 +#define XLOG_GIST_NEW_ROOT 0x20 + +typedef struct gistxlogEntryUpdate { + RelFileNode node; + BlockNumber blkno; + + /* if todeleteoffnum!=InvalidOffsetNumber then delete it. */ + OffsetNumber todeleteoffnum; + uint16 pathlen; + + /* + * It used to identify compliteness of insert. + * Sets to leaf itup + */ + ItemPointerData key; + + /* follow: + * 1. path to root (BlockNumber) + * 2. tuples to insert + */ +} gistxlogEntryUpdate; + +#define XLOG_GIST_PAGE_SPLIT 0x30 + +typedef struct gistxlogPageSplit { + RelFileNode node; + BlockNumber origblkno; /*splitted page*/ + OffsetNumber todeleteoffnum; + uint16 pathlen; + int npage; + int nitup; + + /* see comments on gistxlogEntryUpdate */ + ItemPointerData key; + + /* follow: + * 1. path to root (BlockNumber) + * 2. tuples to insert + * 3. gistxlogPage and array of OffsetNumber per page + */ +} gistxlogPageSplit; + +typedef struct gistxlogPage { + BlockNumber blkno; + int num; +} gistxlogPage; + + +#define XLOG_GIST_INSERT_COMPLETE 0x40 + +typedef struct gistxlogInsertComplete { + RelFileNode node; + ItemPointerData key; +} gistxlogInsertComplete; + +#define XLOG_GIST_CREATE_INDEX 0x50 + /* gist.c */ extern Datum gistbuild(PG_FUNCTION_ARGS); extern Datum gistinsert(PG_FUNCTION_ARGS); @@ -96,15 +200,57 @@ extern Datum gistbulkdelete(PG_FUNCTION_ARGS); extern MemoryContext createTempGistContext(void); extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void freeGISTstate(GISTSTATE *giststate); -extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, - Datum k, Relation r, Page pg, OffsetNumber o, - int b, bool l, bool isNull); +extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode); +extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate); +/* gistxlog.c */ extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_desc(char *buf, uint8 xl_info, char *rec); +extern void gist_xlog_startup(void); +extern void gist_xlog_cleanup(void); /* gistget.c */ extern Datum gistgettuple(PG_FUNCTION_ARGS); extern Datum gistgetmulti(PG_FUNCTION_ARGS); +/* gistutil.c */ +extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup, + int len, OffsetNumber off); +extern bool gistnospace(Page page, IndexTuple *itvec, int len); +extern IndexTuple * gistextractbuffer(Buffer buffer, int *len /* out */ ); +extern IndexTuple * gistjoinvector( + IndexTuple *itvec, int *len, + IndexTuple *additvec, int addlen); +extern IndexTuple gistunion(Relation r, IndexTuple *itvec, + int len, GISTSTATE *giststate); +extern IndexTuple gistgetadjusted(Relation r, + IndexTuple oldtup, + IndexTuple addtup, + GISTSTATE *giststate); +extern int gistfindgroup(GISTSTATE *giststate, + GISTENTRY *valvec, GIST_SPLITVEC *spl); +extern void gistadjsubkey(Relation r, + IndexTuple *itup, int *len, + GIST_SPLITVEC *v, + GISTSTATE *giststate); +extern IndexTuple gistFormTuple(GISTSTATE *giststate, + Relation r, Datum *attdata, int *datumsize, bool *isnull); + +extern OffsetNumber gistchoose(Relation r, Page p, + IndexTuple it, + GISTSTATE *giststate); +extern void gistcentryinit(GISTSTATE *giststate, int nkey, + GISTENTRY *e, Datum k, + Relation r, Page pg, + OffsetNumber o, int b, bool l, bool isNull); +extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, + IndexTuple tuple, Page p, OffsetNumber o, + GISTENTRY *attdata, bool *isnull); +extern void gistunionsubkey(Relation r, GISTSTATE *giststate, + IndexTuple *itvec, GIST_SPLITVEC *spl); +extern void GISTInitBuffer(Buffer b, uint32 f); +extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, + Datum k, Relation r, Page pg, OffsetNumber o, + int b, bool l, bool isNull); + #endif /* GIST_PRIVATE_H */