From e7a9ccdb6c5a95bf13a307d44e89a1956914dffc Mon Sep 17 00:00:00 2001
From: Bruce Momjian <bruce@momjian.us>
Date: Tue, 7 Feb 2006 16:03:50 +0000
Subject: [PATCH] I think that NUMERIC datatype has a problem in the
 performance that the format on Tuple(Numeric) and the format to
 calculate(NumericVar) are different. I understood that to reduce I/O.
 However, when many comparisons or calculations of NUMERIC are executed, the
 conversion of Numeric and NumericVar becomes a bottleneck.

It is profile result when "create index on NUMERIC column" is executed:

  %   cumulative   self              self     total
 time   seconds   seconds    calls   s/call   s/call  name
 17.61     10.27    10.27 34542006     0.00     0.00  cmp_numerics
 11.90     17.21     6.94 34542006     0.00     0.00  comparetup_index
  7.42     21.54     4.33 71102587     0.00     0.00  AllocSetAlloc
  7.02     25.64     4.09 69084012     0.00     0.00  set_var_from_num
  4.87     28.48     2.84 69084012     0.00     0.00  alloc_var
  4.79     31.27     2.79 142205745     0.00     0.00  AllocSetFreeIndex
  4.55     33.92     2.65 34542004     0.00     0.00  cmp_abs
  4.07     36.30     2.38 71101189     0.00     0.00  AllocSetFree
  3.83     38.53     2.23 69084012     0.00     0.00  free_var

The create index command executes many comparisons of Numeric values.
Functions other than comparetup_index spent a lot of cycles for
conversion from Numeric to NumericVar.

An attached patch enables the comparison of Numeric values without
executing conversion to NumericVar. The execution time of that SQL
becomes half.

o Test SQL (index_test table has 1,000,000 tuples)
 create index index_test_idx on index_test(num_col);

o Test results (executed the test five times)
(1)PentiumIII
 original: 39.789s  36.823s  36.737s  37.752s  37.019s
 patched : 18.560s  19.103s  18.830s  18.408s  18.853s
  4.07     36.30     2.38 71101189     0.00     0.00  AllocSetFree
  3.83     38.53     2.23 69084012     0.00     0.00  free_var

The create index command executes many comparisons of Numeric values.
Functions other than comparetup_index spent a lot of cycles for
conversion from Numeric to NumericVar.

An attached patch enables the comparison of Numeric values without
executing conversion to NumericVar. The execution time of that SQL
becomes half.

o Test SQL (index_test table has 1,000,000 tuples)
 create index index_test_idx on index_test(num_col);

o Test results (executed the test five times)
(1)PentiumIII
 original: 39.789s  36.823s  36.737s  37.752s  37.019s
 patched : 18.560s  19.103s  18.830s  18.408s  18.853s

(2)Pentium4
 original: 16.349s  14.997s  12.979s  13.169s  12.955s
 patched :  7.005s   6.594s   6.770s   6.740s   6.828s

(3)Itanium2
 original: 15.392s  15.447s  15.350s  15.370s  15.417s
 patched :  7.413s   7.330s   7.334s   7.339s   7.339s

(4)Ultra Sparc
 original: 64.435s  59.336s  59.332s  58.455s  59.781s
 patched : 28.630s  28.666s  28.983s  28.744s  28.595s

Atsushi Ogawa
---
 src/backend/utils/adt/numeric.c | 112 +++++++++++++++++++++-----------
 1 file changed, 73 insertions(+), 39 deletions(-)

diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index bd1e28d64b9..f3fa9086be8 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -14,7 +14,7 @@
  * Copyright (c) 1998-2005, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.90 2006/01/25 18:15:03 momjian Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.91 2006/02/07 16:03:50 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -229,6 +229,10 @@ static void dump_var(const char *str, NumericVar *var);
 
 #define init_var(v)		MemSetAligned(v, 0, sizeof(NumericVar))
 
+#define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
+#define NUMERIC_NDIGITS(num) \
+	(((num)->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit))
+
 static void alloc_var(NumericVar *var, int ndigits);
 static void free_var(NumericVar *var);
 static void zero_var(NumericVar *var);
@@ -250,6 +254,10 @@ static double numericvar_to_double_no_overflow(NumericVar *var);
 
 static int	cmp_numerics(Numeric num1, Numeric num2);
 static int	cmp_var(NumericVar *var1, NumericVar *var2);
+static int	cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
+						   int var1weight, int var1sign,
+						   const NumericDigit *var2digits, int var2ndigits,
+						   int var2weight, int var2sign);
 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
@@ -271,6 +279,10 @@ static void power_var_int(NumericVar *base, int exp, NumericVar *result,
 			  int rscale);
 
 static int	cmp_abs(NumericVar *var1, NumericVar *var2);
+static int	cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
+						   int var1weight,
+						   const NumericDigit *var2digits, int var2ndigits,
+						   int var2weight);
 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
 static void round_var(NumericVar *var, int rscale);
@@ -1061,19 +1073,10 @@ cmp_numerics(Numeric num1, Numeric num2)
 	}
 	else
 	{
-		NumericVar	arg1;
-		NumericVar	arg2;
-
-		init_var(&arg1);
-		init_var(&arg2);
-
-		set_var_from_num(num1, &arg1);
-		set_var_from_num(num2, &arg2);
-
-		result = cmp_var(&arg1, &arg2);
-
-		free_var(&arg1);
-		free_var(&arg2);
+		result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
+								num1->n_weight, NUMERIC_SIGN(num1),
+								NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
+								num2->n_weight, NUMERIC_SIGN(num2));
 	}
 
 	return result;
@@ -2591,11 +2594,11 @@ int8_avg(PG_FUNCTION_ARGS)
 static void
 dump_numeric(const char *str, Numeric num)
 {
-	NumericDigit *digits = (NumericDigit *) num->n_data;
+	NumericDigit *digits = NUMERIC_DIGITS(num);
 	int			ndigits;
 	int			i;
 
-	ndigits = (num->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit);
+	ndigits = NUMERIC_NDIGITS(num);
 
 	printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
 	switch (NUMERIC_SIGN(num))
@@ -2895,7 +2898,7 @@ set_var_from_num(Numeric num, NumericVar *dest)
 {
 	int			ndigits;
 
-	ndigits = (num->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit);
+	ndigits = NUMERIC_NDIGITS(num);
 
 	alloc_var(dest, ndigits);
 
@@ -3394,32 +3397,52 @@ numericvar_to_double_no_overflow(NumericVar *var)
 static int
 cmp_var(NumericVar *var1, NumericVar *var2)
 {
-	if (var1->ndigits == 0)
+	return cmp_var_common(var1->digits, var1->ndigits,
+						  var1->weight, var1->sign,
+						  var2->digits, var2->ndigits,
+						  var2->weight, var2->sign);
+}
+
+/*
+ * cmp_var_common() -
+ *
+ *  Main routine of cmp_var(). This function can be used by both
+ *  NumericVar and Numeric. 
+ */
+static int
+cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
+			   int var1weight, int var1sign,
+			   const NumericDigit *var2digits, int var2ndigits,
+			   int var2weight, int var2sign)
+{
+	if (var1ndigits == 0)
 	{
-		if (var2->ndigits == 0)
+		if (var2ndigits == 0)
 			return 0;
-		if (var2->sign == NUMERIC_NEG)
+		if (var2sign == NUMERIC_NEG)
 			return 1;
 		return -1;
 	}
-	if (var2->ndigits == 0)
+	if (var2ndigits == 0)
 	{
-		if (var1->sign == NUMERIC_POS)
+		if (var1sign == NUMERIC_POS)
 			return 1;
 		return -1;
 	}
 
-	if (var1->sign == NUMERIC_POS)
+	if (var1sign == NUMERIC_POS)
 	{
-		if (var2->sign == NUMERIC_NEG)
+		if (var2sign == NUMERIC_NEG)
 			return 1;
-		return cmp_abs(var1, var2);
+		return cmp_abs_common(var1digits, var1ndigits, var1weight,
+							  var2digits, var2ndigits, var2weight);
 	}
 
-	if (var2->sign == NUMERIC_POS)
+	if (var2sign == NUMERIC_POS)
 		return -1;
 
-	return cmp_abs(var2, var1);
+	return cmp_abs_common(var2digits, var2ndigits, var2weight,
+						  var1digits, var1ndigits, var1weight);
 }
 
 
@@ -4814,33 +4837,44 @@ power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
 static int
 cmp_abs(NumericVar *var1, NumericVar *var2)
 {
-	NumericDigit *var1digits = var1->digits;
-	NumericDigit *var2digits = var2->digits;
+	return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
+						  var2->digits, var2->ndigits, var2->weight);
+}
+
+/* ----------
+ * cmp_abs_common() -
+ *
+ *  Main routine of cmp_abs(). This function can be used by both
+ *  NumericVar and Numeric. 
+ * ----------
+ */
+static int
+cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
+			   const NumericDigit *var2digits, int var2ndigits, int var2weight)
+{
 	int			i1 = 0;
 	int			i2 = 0;
-	int			w1 = var1->weight;
-	int			w2 = var2->weight;
 
 	/* Check any digits before the first common digit */
 
-	while (w1 > w2 && i1 < var1->ndigits)
+	while (var1weight > var2weight && i1 < var1ndigits)
 	{
 		if (var1digits[i1++] != 0)
 			return 1;
-		w1--;
+		var1weight--;
 	}
-	while (w2 > w1 && i2 < var2->ndigits)
+	while (var2weight > var1weight && i2 < var2ndigits)
 	{
 		if (var2digits[i2++] != 0)
 			return -1;
-		w2--;
+		var2weight--;
 	}
 
 	/* At this point, either w1 == w2 or we've run out of digits */
 
-	if (w1 == w2)
+	if (var1weight == var2weight)
 	{
-		while (i1 < var1->ndigits && i2 < var2->ndigits)
+		while (i1 < var1ndigits && i2 < var2ndigits)
 		{
 			int			stat = var1digits[i1++] - var2digits[i2++];
 
@@ -4857,12 +4891,12 @@ cmp_abs(NumericVar *var1, NumericVar *var2)
 	 * At this point, we've run out of digits on one side or the other; so any
 	 * remaining nonzero digits imply that side is larger
 	 */
-	while (i1 < var1->ndigits)
+	while (i1 < var1ndigits)
 	{
 		if (var1digits[i1++] != 0)
 			return 1;
 	}
-	while (i2 < var2->ndigits)
+	while (i2 < var2ndigits)
 	{
 		if (var2digits[i2++] != 0)
 			return -1;
-- 
GitLab