From 14f6b387b11841b18db12dd89983c4b98dd38ee4 Mon Sep 17 00:00:00 2001
From: "Vadim B. Mikheev" <vadim4o@yahoo.com>
Date: Mon, 24 Mar 1997 08:48:16 +0000
Subject: [PATCH] + NULLs handling 	Actually required by multi-column
 indices support. 	We still don't use btree for 'A is (not) null', but 
 now btree keep items with NULL attrs using single rule 	for
 placing/finding items on pages: 	NULLs greater NOT_NULLs and NULL =
 NULL. + Bulkload code (nbtsort.c) support for multi-column indices 
 building and NULLs. + Fix for btendscan()->pfree(scanopaque) from Chris
 Dunlop.

---
 src/backend/access/nbtree/nbtinsert.c | 120 +++++++++++++++++++--
 src/backend/access/nbtree/nbtree.c    |  28 +++--
 src/backend/access/nbtree/nbtsearch.c | 131 +++++++++++++++++------
 src/backend/access/nbtree/nbtsort.c   | 117 +++++++++++++++------
 src/backend/access/nbtree/nbtutils.c  | 143 +++++++++++++++++++++-----
 5 files changed, 432 insertions(+), 107 deletions(-)

diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 89a80c23970..06c54a456dc 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.10 1997/01/25 21:08:09 momjian Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.11 1997/03/24 08:48:09 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -19,6 +19,7 @@
 #include <access/nbtree.h>
 #include <access/heapam.h>
 #include <storage/bufmgr.h>
+#include <fmgr.h>
 
 #ifndef HAVE_MEMMOVE
 # include <regex/utils.h>
@@ -33,6 +34,7 @@ static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
 static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
 static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, Oid bti_oid, BTItem newItem);
+static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
 
 /*
  *  _bt_doinsert() -- Handle insertion of a single btitem in the tree.
@@ -104,8 +106,16 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel
 	    itupdesc = RelationGetTupleDescriptor(rel);
 	    nbuf = InvalidBuffer;
 	    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+	    /*
+	     * _bt_compare returns 0 for (1,NULL) and (1,NULL) -
+	     * this's how we handling NULLs - and so we must not use
+	     * _bt_compare in real comparison, but only for
+	     * ordering/finding items on pages.	- vadim 03/24/97
+
 	    while ( !_bt_compare (rel, itupdesc, page, 
 			    	natts, itup_scankey, offset) )
+	     */
+	    while ( _bt_isequal (itupdesc, page, offset, natts, itup_scankey) )
 	    {	/* they're equal */
 		btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offset));
 		itup = &(btitem->bti_itup);
@@ -123,8 +133,8 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel
 	    	{	/* move right ? */
 		    if ( P_RIGHTMOST (opaque) )
 		    	break;
-		    if ( _bt_compare (rel, itupdesc, page, 
-			    	natts, itup_scankey, P_HIKEY) )
+	    	    if ( !_bt_isequal (itupdesc, page, P_HIKEY, 
+	    	    				natts, itup_scankey) )
 			break;
 		    /* 
 		     * min key of the right page is the same,
@@ -939,18 +949,70 @@ _bt_itemcmp(Relation rel,
     IndexTuple indexTuple1, indexTuple2;
     Datum attrDatum1, attrDatum2;
     int i;
-    bool isNull;
+    bool isFirstNull, isSecondNull;
     bool compare;
+    bool useEqual = false;
+    
+    if ( strat == BTLessEqualStrategyNumber )
+    {
+	useEqual = true;
+	strat = BTLessStrategyNumber;
+    }
+    else if ( strat == BTGreaterEqualStrategyNumber )
+    {
+	useEqual = true;
+	strat = BTGreaterStrategyNumber;
+    }
     
     tupDes = RelationGetTupleDescriptor(rel);
     indexTuple1 = &(item1->bti_itup);
     indexTuple2 = &(item2->bti_itup);
     
     for (i = 1; i <= keysz; i++) {
-	attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isNull);
-	attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isNull);
-	compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2);
-	if (!compare) {
+	attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isFirstNull);
+	attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isSecondNull);
+	
+	/* see comments about NULLs handling in btbuild */
+	if ( isFirstNull )	/* attr in item1 is NULL */
+	{
+    	    if ( isSecondNull )	/* attr in item2 is NULL too */
+    	    	compare = ( strat == BTEqualStrategyNumber ) ? true : false;
+    	    else
+    	    	compare = ( strat == BTGreaterStrategyNumber ) ? true : false;
+    	}
+    	else if ( isSecondNull )	/* attr in item1 is NOT_NULL and */
+    	{				/* and attr in item2 is NULL */
+    	    compare = ( strat == BTLessStrategyNumber ) ? true : false;
+    	}
+    	else
+    	{
+	    compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2);
+	}
+	
+	if ( compare )	/* true for one of ">, <, =" */
+	{
+	    if ( strat != BTEqualStrategyNumber )
+	    	return (true);
+	}
+	else		/* false for one of ">, <, =" */
+	{
+	    if ( strat == BTEqualStrategyNumber )
+		return (false);
+	    /*
+	     * if original strat was "<=, >=" OR
+	     * "<, >" but some attribute(s) left
+	     * - need to test for Equality
+	     */
+	    if ( useEqual || i < keysz )
+	    {
+	    	if ( isFirstNull || isSecondNull )
+	    	    compare = ( isFirstNull && isSecondNull ) ? true : false;
+	    	else
+		    compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, 
+						attrDatum1, attrDatum2);
+		if ( compare )	/* item1' and item2' attributes are equal */
+		    continue;	/* - try to compare next attributes */
+	    }
 	    return (false);
 	}
     }
@@ -1015,3 +1077,45 @@ _bt_updateitem(Relation rel,
     ItemPointerCopy(&itemPtrData, &(oldIndexTuple->t_tid));
     
 }
+
+/*
+ * _bt_isequal - used in _bt_doinsert in check for duplicates.
+ *
+ * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too.
+ */
+static bool
+_bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum,
+					int keysz, ScanKey scankey)
+{
+    Datum datum;
+    BTItem btitem;
+    IndexTuple itup;
+    ScanKey entry;
+    AttrNumber attno;
+    long result;
+    int i;
+    bool null;
+    
+    btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
+    itup = &(btitem->bti_itup);
+    
+    for (i = 1; i <= keysz; i++)
+    {
+	entry = &scankey[i - 1];
+	attno = entry->sk_attno;
+	Assert (attno == i);
+	datum = index_getattr(itup, attno, itupdesc, &null);
+
+	/* NULLs are not equal */
+	if ( entry->sk_flags & SK_ISNULL || null )
+	    return (false);
+	
+	result = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure,
+					entry->sk_argument, datum);
+	if (result != 0)
+	    return (false);
+    }
+    
+    /* by here, the keys are equal */
+    return (true);
+}
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 0fe6787c010..e75814dd8e8 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.16 1997/03/18 18:38:35 scrappy Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.17 1997/03/24 08:48:11 vadim Exp $
  *
  * NOTES
  *    This file contains only the public interface routines.
@@ -219,11 +219,21 @@ btbuild(Relation heap,
 	 *  artifact of the strategy map architecture chosen in 1986, not
 	 *  of the way nulls are handled here.
 	 */
-	
+	/*
+	 * New comments: NULLs handling.
+	 * While we can't do NULL comparison, we can follow simple
+	 * rule for ordering items on btree pages - NULLs greater
+	 * NOT_NULLs and NULL = NULL is TRUE. Sure, it's just rule
+	 * for placing/finding items and no more - keytest'll return
+	 * FALSE for a = 5 for items having 'a' isNULL.	
+	 * Look at _bt_skeycmp, _bt_compare and _bt_itemcmp for
+	 * how it works.		- vadim 03/23/97
+	 
 	if (itup->t_info & INDEX_NULL_MASK) {
 	    pfree(itup);
 	    continue;
 	}
+	 */
 	
 	itup->t_tid = htup->t_ctid;
 	btitem = _bt_formitem(itup);
@@ -328,8 +338,12 @@ btinsert(Relation rel, Datum *datum, char *nulls, ItemPointer ht_ctid, Relation
     itup = index_formtuple(RelationGetTupleDescriptor(rel), datum, nulls);
     itup->t_tid = *ht_ctid;
 
+    /*
+     * See comments in btbuild.
+     
     if (itup->t_info & INDEX_NULL_MASK)
 	return ((InsertIndexResult) NULL);
+    */
     
     btitem = _bt_formitem(itup);
     
@@ -423,7 +437,7 @@ btrescan(IndexScanDesc scan, bool fromEnd, ScanKey scankey)
     
     /* reset the scan key */
     so->numberOfKeys = scan->numberOfKeys;
-    so->numberOfFirstKeys = 0;
+    so->numberOfFirstKeys = 0;		/* may be changed by _bt_orderkeys */
     so->qual_ok = 1;			/* may be changed by _bt_orderkeys */
     if (scan->numberOfKeys > 0) {
 	memmove(scan->keyData,
@@ -433,10 +447,7 @@ btrescan(IndexScanDesc scan, bool fromEnd, ScanKey scankey)
 		scankey,
 		so->numberOfKeys * sizeof(ScanKeyData));
 	/* order the keys in the qualification */
-	if (so->numberOfKeys > 1)
-	    _bt_orderkeys(scan->relation, so);
-	else
-	    so->numberOfFirstKeys = 1;
+	_bt_orderkeys(scan->relation, so);
     }
     
     /* finally, be sure that the scan exploits the tree order */
@@ -499,9 +510,10 @@ btendscan(IndexScanDesc scan)
 	ItemPointerSetInvalid(iptr);
     }
 
-    pfree (scan->opaque);
     if ( so->keyData != (ScanKey) NULL )
     	pfree (so->keyData);
+    pfree (so);
+
     _bt_dropscan(scan);
 }
 
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 2e802ee8527..99fb38f18ce 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.15 1997/03/18 18:38:41 scrappy Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.16 1997/03/24 08:48:12 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -19,6 +19,7 @@
 #include <storage/bufpage.h>
 #include <storage/bufmgr.h>
 #include <access/nbtree.h>
+#include <catalog/pg_proc.h>
 
 #ifndef HAVE_MEMMOVE
 # include <regex/utils.h>
@@ -238,7 +239,20 @@ _bt_skeycmp(Relation rel,
     Datum keyDatum;
     bool compare;
     bool isNull;
+    bool useEqual = false;
+    bool keyNull;
     
+    if ( strat == BTLessEqualStrategyNumber )
+    {
+	useEqual = true;
+	strat = BTLessStrategyNumber;
+    }
+    else if ( strat == BTGreaterEqualStrategyNumber )
+    {
+	useEqual = true;
+	strat = BTGreaterStrategyNumber;
+    }
+	
     item = (BTItem) PageGetItem(page, itemid);
     indexTuple = &(item->bti_itup);
     
@@ -248,27 +262,60 @@ _bt_skeycmp(Relation rel,
     for (i=1; i <= keysz; i++) {
 	
 	entry = &scankey[i-1];
+	Assert ( entry->sk_attno == i );
 	attrDatum = index_getattr(indexTuple,
 				  entry->sk_attno,
 				  tupDes,
 				  &isNull);
 	keyDatum  = entry->sk_argument;
-	
-	/*
-	 * This may happen in a nested loop if an attribute used
-	 * as scan key is null.			DZ 29-10-1996
-	 */
-	if ((entry->sk_flags & SK_ISNULL) || (isNull)) {
-	    if ((entry->sk_flags & SK_ISNULL) && (isNull)) {
-		return (true);
-	    } else {
-		return (false);
-	    }
+
+	/* see comments about NULLs handling in btbuild */
+	if ( entry->sk_flags & SK_ISNULL )	/* key is NULL */
+	{
+	    Assert ( entry->sk_procedure == NullValueRegProcedure );
+	    keyNull = true;
+    	    if ( isNull )
+    	    	compare = ( strat == BTEqualStrategyNumber ) ? true : false;
+    	    else
+    	    	compare = ( strat == BTGreaterStrategyNumber ) ? true : false;
+    	}
+    	else if ( isNull )	/* key is NOT_NULL and item is NULL */
+    	{
+	    keyNull = false;
+    	    compare = ( strat == BTLessStrategyNumber ) ? true : false;
+    	}
+    	else
+    	{
+	    keyNull = false;
+	    compare = _bt_invokestrat(rel, i, strat, keyDatum, attrDatum);
 	}
 
-	compare = _bt_invokestrat(rel, i, strat, keyDatum, attrDatum);
-	if (!compare)
+	if ( compare )	/* true for one of ">, <, =" */
+	{
+	    if ( strat != BTEqualStrategyNumber )
+	    	return (true);
+	}
+	else		/* false for one of ">, <, =" */
+	{
+	    if ( strat == BTEqualStrategyNumber )
+		return (false);
+	    /*
+	     * if original strat was "<=, >=" OR
+	     * "<, >" but some attribute(s) left
+	     * - need to test for Equality
+	     */
+	    if ( useEqual || i < keysz )
+	    {
+	    	if ( keyNull || isNull )
+	    	    compare = ( keyNull && isNull ) ? true : false;
+	    	else
+		    compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, 
+						keyDatum, attrDatum);
+		if ( compare )	/* key' and item' attributes are equal */
+		    continue;	/* - try to compare next attributes */
+	    }
 	    return (false);
+	}
     }
     
     return (true);
@@ -520,20 +567,24 @@ _bt_compare(Relation rel,
 	attno = entry->sk_attno;
 	datum = index_getattr(itup, attno, itupdesc, &null);
 
-	/*
-	 * This may happen in a nested loop if an attribute used
-	 * as scan key is null.			DZ 29-10-1996
-	 */
-	if ((entry->sk_flags & SK_ISNULL) || (null)) {
-	    if ((entry->sk_flags & SK_ISNULL) && (null)) {
-		return (0);
-	    } else {
-		return (null ? +1 : -1);
-	    }
+	/* see comments about NULLs handling in btbuild */
+	if ( entry->sk_flags & SK_ISNULL )	/* key is NULL */
+	{
+	    Assert ( entry->sk_procedure == NullValueRegProcedure );
+    	    if ( null )
+    	    	tmpres = (long) 0;		/* NULL "=" NULL */
+    	    else
+    	    	tmpres = (long) 1;		/* NULL ">" NOT_NULL */
+    	}
+    	else if ( null )	/* key is NOT_NULL and item is NULL */
+    	{
+    	    	tmpres = (long) -1;		/* NOT_NULL "<" NULL */
+    	}
+    	else
+    	{
+	    tmpres = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure,
+					entry->sk_argument, datum);
 	}
-
-	tmpres = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure,
-				  entry->sk_argument, datum);
 	result = tmpres;
 	
 	/* if the keys are unequal, return the difference */
@@ -566,6 +617,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
     BTItem btitem;
     IndexTuple itup;
     BTScanOpaque so;
+    Size keysok;
     
     rel = scan->relation;
     so = (BTScanOpaque) scan->opaque;
@@ -596,8 +648,9 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
     	btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
     	itup = &btitem->bti_itup;
     
-    	if (_bt_checkqual(scan, itup)) 
+    	if ( _bt_checkkeys (scan, itup, &keysok) ) 
     	{
+	    Assert (keysok == so->numberOfKeys);
 	    res = FormRetrieveIndexResult(current, &(itup->t_tid));
 	
 	    /* remember which buffer we have pinned and locked */
@@ -605,7 +658,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
 	    return (res);
 	}
 
-    } while ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) );
+    } while ( keysok >= so->numberOfFirstKeys );
 
     ItemPointerSetInvalid(current);
     so->btso_curbuf = InvalidBuffer;
@@ -644,6 +697,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
     int result;
     BTScanOpaque so;
     ScanKeyData skdata;
+    Size keysok;
     
     so = (BTScanOpaque) scan->opaque;
     if ( so->qual_ok == 0 )		/* may be set by _bt_orderkeys */
@@ -663,6 +717,12 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
      *  ordered to take advantage of index ordering) to position ourselves
      *  at the right place in the scan.
      */
+    /* _bt_orderkeys disallows it, but it's place to add some code latter */
+    if ( so->keyData[0].sk_flags & SK_ISNULL )
+    {
+    	elog (WARN, "_bt_first: btree doesn't support is(not)null, yet");
+    	return ((RetrieveIndexResult) NULL);
+    }
     proc = index_getprocid(rel, 1, BTORDER_PROC);
     ScanKeyEntryInitialize(&skdata, so->keyData[0].sk_flags, 1, proc,
 			   so->keyData[0].sk_argument);
@@ -706,6 +766,9 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
      */
     
     result = _bt_compare(rel, itupdesc, page, 1, &skdata, offnum);
+
+    /* it's yet other place to add some code latter for is(not)null */
+
     strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure);
     
     switch (strat) {
@@ -798,14 +861,14 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
     btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
     itup = &btitem->bti_itup;
     
-    if ( _bt_checkqual(scan, itup) )
+    if ( _bt_checkkeys (scan, itup, &keysok) ) 
     {
 	res = FormRetrieveIndexResult(current, &(itup->t_tid));
 	
 	/* remember which buffer we have pinned */
 	so->btso_curbuf = buf;
     }
-    else if ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) )
+    else if ( keysok >= so->numberOfFirstKeys )
     {
 	so->btso_curbuf = buf;
 	return (_bt_next (scan, dir));
@@ -1081,6 +1144,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
     IndexTuple itup;
     BTScanOpaque so;
     RetrieveIndexResult res;
+    Size keysok;
     
     rel = scan->relation;
     current = &(scan->currentItemData);
@@ -1223,13 +1287,14 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
     itup = &(btitem->bti_itup);
     
     /* see if we picked a winner */
-    if (_bt_checkqual(scan, itup)) {
+    if ( _bt_checkkeys (scan, itup, &keysok) ) 
+    {
 	res = FormRetrieveIndexResult(current, &(itup->t_tid));
 	
 	/* remember which buffer we have pinned */
 	so->btso_curbuf = buf;
     }
-    else if ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) )
+    else if ( keysok >= so->numberOfFirstKeys )
     {
 	so->btso_curbuf = buf;
 	return (_bt_next (scan, dir));
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 7c3d5ba5639..7ec926f9e24 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -5,7 +5,7 @@
  *
  *
  * IDENTIFICATION
- *    $Id: nbtsort.c,v 1.12 1997/02/25 03:38:23 scrappy Exp $
+ *    $Id: nbtsort.c,v 1.13 1997/03/24 08:48:15 vadim Exp $
  *
  * NOTES
  *
@@ -137,11 +137,13 @@ typedef struct {
  * *-------------------------------------------------------------------------
  */
 typedef struct {
-    Datum	btsk_datum;
+    Datum	*btsk_datum;
+    char	*btsk_nulls;
     BTItem	btsk_item;
 } BTSortKey;
 
 static Relation _bt_sortrel;
+static int _bt_nattr;
 static BTSpool * _bt_inspool;
 
 static void
@@ -149,26 +151,51 @@ _bt_isortcmpinit(Relation index, BTSpool *spool)
 {
     _bt_sortrel = index;
     _bt_inspool = spool;
+    _bt_nattr = index->rd_att->natts;
 }
 
 static int
 _bt_isortcmp(BTSortKey *k1, BTSortKey *k2)
 {
-    if (k1->btsk_item == (BTItem) NULL) {
-	if (k2->btsk_item == (BTItem) NULL) {
+    Datum *k1_datum = k1->btsk_datum;
+    Datum *k2_datum = k2->btsk_datum;
+    char *k1_nulls = k1->btsk_nulls;
+    char *k2_nulls = k2->btsk_nulls;
+    bool equal_isnull = false;
+    int i;
+    
+    if (k1->btsk_item == (BTItem) NULL)
+    {
+	if (k2->btsk_item == (BTItem) NULL)
 	    return(0);	/* 1 = 2 */
-	}
 	return(1);	/* 1 > 2 */
-    } else if (k2->btsk_item == (BTItem) NULL) {
-	return(-1);	/* 1 < 2 */
-    } else if (_bt_invokestrat(_bt_sortrel, 1, BTGreaterStrategyNumber,
-			       k1->btsk_datum, k2->btsk_datum)) {
-	return(1);	/* 1 > 2 */
-    } else if (_bt_invokestrat(_bt_sortrel, 1, BTGreaterStrategyNumber,
-			       k2->btsk_datum, k1->btsk_datum)) {
+    }
+    else if (k2->btsk_item == (BTItem) NULL)
 	return(-1);	/* 1 < 2 */
+    
+    for (i = 0; i < _bt_nattr; i++)
+    {
+	if ( k1_nulls[i] != ' ' )	/* k1 attr is NULL */
+	{
+	    if ( k2_nulls[i] != ' ' )	/* the same for k2 */
+	    {
+	    	equal_isnull = true;
+	    	continue;
+	    }
+	    return (1);			/* NULL ">" NOT_NULL */
+	}
+	else if ( k2_nulls[i] != ' ' )	/* k2 attr is NULL */
+	    return (-1);		/* NOT_NULL "<" NULL */
+	
+    	if (_bt_invokestrat(_bt_sortrel, i+1, BTGreaterStrategyNumber,
+			       k1_datum[i], k2_datum[i]))
+		return(1);	/* 1 > 2 */
+    	else if (_bt_invokestrat(_bt_sortrel, i+1, BTGreaterStrategyNumber,
+			       k2_datum[i], k1_datum[i]))
+		return(-1);	/* 1 < 2 */
     }
-    if ( _bt_inspool->isunique )
+
+    if ( _bt_inspool->isunique && !equal_isnull )
     {
     	_bt_spooldestroy ((void*)_bt_inspool);
     	elog (WARN, "Cannot create unique index. Table contains non-unique values");
@@ -180,15 +207,29 @@ static void
 _bt_setsortkey(Relation index, BTItem bti, BTSortKey *sk)
 {
     sk->btsk_item = (BTItem) NULL;
-    sk->btsk_datum = (Datum) NULL;
-    if (bti != (BTItem) NULL) {
+    sk->btsk_datum = (Datum*) NULL;
+    sk->btsk_nulls = (char*) NULL;
+
+    if (bti != (BTItem) NULL)
+    {
+	IndexTuple it = &(bti->bti_itup);
+	TupleDesc itdesc = index->rd_att;
+	Datum *dp = (Datum*) palloc (_bt_nattr * sizeof (Datum));
+	char *np = (char*) palloc (_bt_nattr * sizeof (char));
 	bool isnull;
-	Datum d = index_getattr(&(bti->bti_itup), 1, index->rd_att, &isnull);
-	
-	if (!isnull) {
-	    sk->btsk_item = bti;
-	    sk->btsk_datum = d;
+	int i;
+
+    	for (i = 0; i < _bt_nattr; i++)
+    	{
+	    dp[i] = index_getattr(it, i+1, itdesc, &isnull);
+	    if ( isnull )
+		np[i] = 'n';
+	    else
+	    	np[i] = ' ';
 	}
+	sk->btsk_item = bti;
+	sk->btsk_datum = dp;
+	sk->btsk_nulls = np;
     }
 }
 
@@ -622,27 +663,25 @@ _bt_spool(Relation index, BTItem btitem, void *spool)
 	BTItem bti;
 	char *pos;
 	int btisz;
+	int it_ntup = itape->bttb_ntup;
 	int i;
 
 	/*
 	 * build an array of pointers to the BTItemDatas on the input
 	 * block.
 	 */
-	if (itape->bttb_ntup > 0) {
+	if (it_ntup > 0) {
 	    parray =
-		(BTSortKey *) palloc(itape->bttb_ntup * sizeof(BTSortKey));
-	    if (parray == (BTSortKey *) NULL) {
-		elog(WARN, "_bt_spool: out of memory");
-	    }
+		(BTSortKey *) palloc(it_ntup * sizeof(BTSortKey));
 	    pos = itape->bttb_data;
-	    for (i = 0; i < itape->bttb_ntup; ++i) {
+	    for (i = 0; i < it_ntup; ++i) {
 		_bt_setsortkey(index, _bt_tapenext(itape, &pos), &(parray[i]));
 	    }
 	    
 	    /*
 	     * qsort the pointer array.
 	     */
-	    qsort((void *) parray, itape->bttb_ntup, sizeof(BTSortKey),
+	    qsort((void *) parray, it_ntup, sizeof(BTSortKey),
 		  (int (*)(const void *,const void *))_bt_isortcmp);
 	}
 
@@ -656,7 +695,7 @@ _bt_spool(Relation index, BTItem btitem, void *spool)
 	 * block..)
 	 */
 	otape = btspool->bts_otape[btspool->bts_tape];
-	for (i = 0; i < itape->bttb_ntup; ++i) {
+	for (i = 0; i < it_ntup; ++i) {
 	    bti = parray[i].btsk_item;
 	    btisz = BTITEMSZ(bti);
 	    btisz = DOUBLEALIGN(btisz);
@@ -694,7 +733,15 @@ _bt_spool(Relation index, BTItem btitem, void *spool)
 	/*
 	 * destroy the pointer array.
 	 */
-	if (parray != (BTSortKey *) NULL) {
+	if (parray != (BTSortKey *) NULL)
+	{
+	    for (i = 0; i < it_ntup; i++)
+	    {
+	    	if ( parray[i].btsk_datum != (Datum*) NULL )
+	    	    pfree ((void*)(parray[i].btsk_datum));
+	    	if ( parray[i].btsk_nulls != (char*) NULL )
+	    	    pfree ((void*)(parray[i].btsk_nulls));
+	    }
 	    pfree((void *) parray);
 	}
     }
@@ -976,7 +1023,7 @@ _bt_buildadd(Relation index, void *pstate, BTItem bti, int flags)
 #endif
     if (last_bti == (BTItem) NULL) {
 	first_off = P_FIRSTKEY;
-    } else if (!_bt_itemcmp(index, 1, bti, last_bti, BTEqualStrategyNumber)) {
+    } else if (!_bt_itemcmp(index, _bt_nattr, bti, last_bti, BTEqualStrategyNumber)) {
 	first_off = off;
     }
     last_off = off;
@@ -1044,6 +1091,7 @@ _bt_merge(Relation index, BTSpool *btspool)
     BTPageState *state;
     BTPriQueue q;
     BTPriQueueElem e;
+    BTSortKey btsk;
     BTItem bti;
     BTTapeBlock *itape;
     BTTapeBlock *otape;
@@ -1136,7 +1184,8 @@ _bt_merge(Relation index, BTSpool *btspool)
 		 * if it hits either End-Of-Run or EOF.
 		 */
 		t = e.btpqe_tape;
-		bti = e.btpqe_item.btsk_item;
+		btsk = e.btpqe_item;
+		bti = btsk.btsk_item;
 		if (bti != (BTItem) NULL) {
 		    btisz = BTITEMSZ(bti);
 		    btisz = DOUBLEALIGN(btisz);
@@ -1177,6 +1226,12 @@ _bt_merge(Relation index, BTSpool *btspool)
 			}
 #endif /* FASTBUILD_DEBUG && FASTBUILD_MERGE */
 		    }
+		    
+    		    if ( btsk.btsk_datum != (Datum*) NULL )
+	    	    	pfree ((void*)(btsk.btsk_datum));
+	    	    if ( btsk.btsk_nulls != (char*) NULL )
+	    	    	pfree ((void*)(btsk.btsk_nulls));
+		    
 		}
 		itape = btspool->bts_itape[t];
 		if (!tapedone[t]) {
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 6d0a40ef132..fa2ff890fe9 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.8 1997/03/18 18:38:46 scrappy Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.9 1997/03/24 08:48:16 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -20,6 +20,11 @@
 #include <access/nbtree.h>
 #include <access/istrat.h>
 #include <access/iqual.h>
+#include <catalog/pg_proc.h>
+#include <executor/execdebug.h>
+
+extern int	NIndexTupleProcessed;
+
 
 #ifndef HAVE_MEMMOVE
 # include <regex/utils.h>
@@ -37,6 +42,7 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
     Datum arg;
     RegProcedure proc;
     bool null;
+    bits16 flag;
     
     natts = rel->rd_rel->relnatts;
     itupdesc = RelationGetTupleDescriptor(rel);
@@ -45,9 +51,18 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
     
     for (i = 0; i < natts; i++) {
 	arg = index_getattr(itup, i + 1, itupdesc, &null);
-	proc = index_getprocid(rel, i + 1, BTORDER_PROC);
+	if ( null )
+	{
+	    proc = NullValueRegProcedure;
+	    flag = SK_ISNULL;
+	}
+	else
+	{
+	    proc = index_getprocid(rel, i + 1, BTORDER_PROC);
+	    flag = 0x0;
+	}
 	ScanKeyEntryInitialize(&skey[i],
-			       0x0, (AttrNumber) (i + 1), proc, arg);
+			       flag, (AttrNumber) (i + 1), proc, arg);
     }
     
     return (skey);
@@ -90,22 +105,35 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
     int i, j;
     int init[BTMaxStrategyNumber+1];
     ScanKey key;
-    uint16 numberOfKeys, new_numberOfKeys = 0;
+    uint16 numberOfKeys = so->numberOfKeys;
+    uint16 new_numberOfKeys = 0;
     AttrNumber attno = 1;
     
-    numberOfKeys = so->numberOfKeys;
+    if ( numberOfKeys < 1 )
+    	return;
+    
     key = so->keyData;
     
-    if ( numberOfKeys <= 1 )
+    cur = &key[0];
+    if ( cur->sk_attno != 1 )
+	elog (WARN, "_bt_orderkeys: key(s) for attribute 1 missed");
+    
+    if ( numberOfKeys == 1 )
+    {
+	/*
+	 * We don't use indices for 'A is null' and 'A is not null'
+	 * currently and 'A < = > <> NULL' is non-sense' - so
+	 * qual is not Ok. 	- vadim 03/21/97
+	 */
+	if ( cur->sk_flags & SK_ISNULL )
+	    so->qual_ok = 0;
+    	so->numberOfFirstKeys = 1;
     	return;
+    }
     
     /* get space for the modified array of keys */
     nbytes = BTMaxStrategyNumber * sizeof(ScanKeyData);
     xform = (ScanKey) palloc(nbytes);
-    
-    cur = &key[0];
-    if ( cur->sk_attno != 1 )
-	elog (WARN, "_bt_orderkeys: key(s) for attribute 1 missed");
 
     memset(xform, 0, nbytes); 
     map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
@@ -119,6 +147,10 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
     {
 	if ( i < numberOfKeys )
 	    cur = &key[i];
+
+	if ( cur->sk_flags & SK_ISNULL )	/* see comments above */
+	    so->qual_ok = 0;
+
 	if ( i == numberOfKeys || cur->sk_attno != attno )
 	{
 	    if ( cur->sk_attno != attno + 1 && i < numberOfKeys )
@@ -243,6 +275,32 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
     pfree(xform);
 }
 
+BTItem
+_bt_formitem(IndexTuple itup)
+{
+    int nbytes_btitem;
+    BTItem btitem;
+    Size tuplen;
+    extern Oid newoid();
+    
+    /* see comments in btbuild
+    
+    if (itup->t_info & INDEX_NULL_MASK)
+	elog(WARN, "btree indices cannot include null keys");
+    */
+    
+    /* make a copy of the index tuple with room for the sequence number */
+    tuplen = IndexTupleSize(itup);
+    nbytes_btitem = tuplen +
+	(sizeof(BTItemData) - sizeof(IndexTupleData));
+    
+    btitem = (BTItem) palloc(nbytes_btitem);
+    memmove((char *) &(btitem->bti_itup), (char *) itup, tuplen);
+    
+    btitem->bti_oid = newoid();
+    return (btitem);
+}
+
 bool
 _bt_checkqual(IndexScanDesc scan, IndexTuple itup)
 {
@@ -269,26 +327,57 @@ _bt_checkforkeys(IndexScanDesc scan, IndexTuple itup, Size keysz)
 	return (true);
 }
 
-BTItem
-_bt_formitem(IndexTuple itup)
+bool
+_bt_checkkeys (IndexScanDesc scan, IndexTuple tuple, Size *keysok)
 {
-    int nbytes_btitem;
-    BTItem btitem;
-    Size tuplen;
-    extern Oid newoid();
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    Size keysz = so->numberOfKeys;
+    TupleDesc tupdesc;
+    ScanKey key;
+    Datum datum;
+    bool isNull;
+    int	test;
     
-    /* disallow nulls in btree keys */
-    if (itup->t_info & INDEX_NULL_MASK)
-	elog(WARN, "btree indices cannot include null keys");
+    *keysok = 0;
+    if ( keysz == 0 )
+        return (true);
     
-    /* make a copy of the index tuple with room for the sequence number */
-    tuplen = IndexTupleSize(itup);
-    nbytes_btitem = tuplen +
-	(sizeof(BTItemData) - sizeof(IndexTupleData));
+    key = so->keyData;
+    tupdesc = RelationGetTupleDescriptor(scan->relation);
     
-    btitem = (BTItem) palloc(nbytes_btitem);
-    memmove((char *) &(btitem->bti_itup), (char *) itup, tuplen);
+    IncrIndexProcessed();
     
-    btitem->bti_oid = newoid();
-    return (btitem);
+    while (keysz > 0)
+    {
+	datum = index_getattr(tuple,
+			      key[0].sk_attno,
+			      tupdesc,
+			      &isNull);
+	
+	/* btree doesn't support 'A is null' clauses, yet */
+	if ( isNull || key[0].sk_flags & SK_ISNULL )
+	{
+	    return (false);
+	}
+
+	if (key[0].sk_flags & SK_COMMUTE) {
+	    test = (int) (*(key[0].sk_func))
+		(DatumGetPointer(key[0].sk_argument),
+		 datum);
+	} else {
+	    test = (int) (*(key[0].sk_func))
+		(datum,
+		 DatumGetPointer(key[0].sk_argument));
+	}
+	
+	if (!test == !(key[0].sk_flags & SK_NEGATE)) {
+	    return (false);
+	}
+	
+	keysz -= 1;
+	key++;
+	(*keysok)++;
+    }
+    
+    return (true);
 }
-- 
GitLab