From 9cca11c915e458323d0e746c68203f2c11da0302 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 2 Sep 2016 11:51:49 +0300
Subject: [PATCH] Speed up SUM calculation in numeric aggregates.

This introduces a numeric sum accumulator, which performs better than
repeatedly calling add_var(). The performance comes from using wider digits
and delaying carry propagation, tallying positive and negative values
separately, and avoiding a round of palloc/pfree on every value. This
speeds up SUM(), as well as other standard aggregates like AVG() and
STDDEV() that also calculate a sum internally.

Reviewed-by: Andrey Borodin
Discussion: <c0545351-a467-5b76-6d46-4840d1ea8aa4@iki.fi>
---
 src/backend/utils/adt/numeric.c       | 601 +++++++++++++++++++++-----
 src/test/regress/expected/numeric.out |  16 +
 src/test/regress/sql/numeric.sql      |   8 +
 3 files changed, 522 insertions(+), 103 deletions(-)

diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 620226cea11..27efd310abb 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -302,6 +302,49 @@ typedef struct
 	hyperLogLogState abbr_card; /* cardinality estimator */
 } NumericSortSupport;
 
+
+/* ----------
+ * Fast sum accumulator.
+ *
+ * NumericSumAccum is used to implement SUM(), and other standard aggregates
+ * that track the sum of input values.  It uses 32-bit integers to store the
+ * digits, instead of the normal 16-bit integers (with NBASE=10000).  This
+ * way, we can safely accumulate up to NBASE - 1 values without propagating
+ * carry, before risking overflow of any of the digits.  'num_uncarried'
+ * tracks how many values have been accumulated without propagating carry.
+ *
+ * Positive and negative values are accumulated separately, in 'pos_digits'
+ * and 'neg_digits'.  This is simpler and faster than deciding whether to add
+ * or subtract from the current value, for each new value (see sub_var() for
+ * the logic we avoid by doing this).  Both buffers are of same size, and
+ * have the same weight and scale.  In accum_sum_final(), the positive and
+ * negative sums are added together to produce the final result.
+ *
+ * When a new value has a larger ndigits or weight than the accumulator
+ * currently does, the accumulator is enlarged to accommodate the new value.
+ * We normally have one zero digit reserved for carry propagation, and that
+ * is indicated by the 'have_carry_space' flag.  When accum_sum_carry() uses
+ * up the reserved digit, it clears the 'have_carry_space' flag.  The next
+ * call to accum_sum_add() will enlarge the buffer, to make room for the
+ * extra digit, and set the flag again.
+ *
+ * To initialize a new accumulator, simply reset all fields to zeros.
+ *
+ * The accumulator does not handle NaNs.
+ * ----------
+ */
+typedef struct NumericSumAccum
+{
+	int			ndigits;
+	int			weight;
+	int			dscale;
+	int			num_uncarried;
+	bool		have_carry_space;
+	int32	   *pos_digits;
+	int32	   *neg_digits;
+} NumericSumAccum;
+
+
 /*
  * We define our own macros for packing and unpacking abbreviated-key
  * representations for numeric values in order to avoid depending on
@@ -490,6 +533,14 @@ static void strip_var(NumericVar *var);
 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
 			   NumericVar *count_var, NumericVar *result_var);
 
+static void accum_sum_add(NumericSumAccum *accum, NumericVar *var1);
+static void accum_sum_rescale(NumericSumAccum *accum, NumericVar *val);
+static void accum_sum_carry(NumericSumAccum *accum);
+static void accum_sum_reset(NumericSumAccum *accum);
+static void accum_sum_final(NumericSumAccum *accum, NumericVar *result);
+static void accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src);
+static void accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2);
+
 
 /* ----------------------------------------------------------------------
  *
@@ -3140,8 +3191,8 @@ typedef struct NumericAggState
 	bool		calcSumX2;		/* if true, calculate sumX2 */
 	MemoryContext agg_context;	/* context we're calculating in */
 	int64		N;				/* count of processed numbers */
-	NumericVar	sumX;			/* sum of processed numbers */
-	NumericVar	sumX2;			/* sum of squares of processed numbers */
+	NumericSumAccum sumX;		/* sum of processed numbers */
+	NumericSumAccum sumX2;		/* sum of squares of processed numbers */
 	int			maxScale;		/* maximum scale seen so far */
 	int64		maxScaleCount;	/* number of values seen with maximum scale */
 	int64		NaNcount;		/* count of NaN values (not included in N!) */
@@ -3230,22 +3281,13 @@ do_numeric_accum(NumericAggState *state, Numeric newval)
 	/* The rest of this needs to work in the aggregate context */
 	old_context = MemoryContextSwitchTo(state->agg_context);
 
-	if (state->N++ > 0)
-	{
-		/* Accumulate sums */
-		add_var(&X, &(state->sumX), &(state->sumX));
+	state->N++;
 
-		if (state->calcSumX2)
-			add_var(&X2, &(state->sumX2), &(state->sumX2));
-	}
-	else
-	{
-		/* First input, so initialize sums */
-		set_var_from_var(&X, &(state->sumX));
+	/* Accumulate sums */
+	accum_sum_add(&(state->sumX), &X);
 
-		if (state->calcSumX2)
-			set_var_from_var(&X2, &(state->sumX2));
-	}
+	if (state->calcSumX2)
+		accum_sum_add(&(state->sumX2), &X2);
 
 	MemoryContextSwitchTo(old_context);
 }
@@ -3324,16 +3366,25 @@ do_numeric_discard(NumericAggState *state, Numeric newval)
 
 	if (state->N-- > 1)
 	{
-		/* De-accumulate sums */
-		sub_var(&(state->sumX), &X, &(state->sumX));
+		/* Negate X, to subtract it from the sum */
+		X.sign = (X.sign == NUMERIC_POS ? NUMERIC_NEG : NUMERIC_POS);
+		accum_sum_add(&(state->sumX), &X);
 
 		if (state->calcSumX2)
-			sub_var(&(state->sumX2), &X2, &(state->sumX2));
+		{
+			/* Negate X^2. X^2 is always positive */
+			X2.sign = NUMERIC_NEG;
+			accum_sum_add(&(state->sumX2), &X2);
+		}
 	}
 	else
 	{
-		/* Sums will be reset by next call to do_numeric_accum */
+		/* Zero the sums */
 		Assert(state->N == 0);
+
+		accum_sum_reset(&state->sumX);
+		if (state->calcSumX2)
+			accum_sum_reset(&state->sumX2);
 	}
 
 	MemoryContextSwitchTo(old_context);
@@ -3392,11 +3443,8 @@ numeric_combine(PG_FUNCTION_ARGS)
 		state1->maxScale = state2->maxScale;
 		state1->maxScaleCount = state2->maxScaleCount;
 
-		init_var(&state1->sumX);
-		set_var_from_var(&state2->sumX, &state1->sumX);
-
-		init_var(&state1->sumX2);
-		set_var_from_var(&state2->sumX2, &state1->sumX2);
+		accum_sum_copy(&state1->sumX, &state2->sumX);
+		accum_sum_copy(&state1->sumX2, &state2->sumX2);
 
 		MemoryContextSwitchTo(old_context);
 
@@ -3424,8 +3472,8 @@ numeric_combine(PG_FUNCTION_ARGS)
 		old_context = MemoryContextSwitchTo(agg_context);
 
 		/* Accumulate sums */
-		add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
-		add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+		accum_sum_combine(&state1->sumX, &state2->sumX);
+		accum_sum_combine(&state1->sumX2, &state2->sumX2);
 
 		MemoryContextSwitchTo(old_context);
 	}
@@ -3483,8 +3531,7 @@ numeric_avg_combine(PG_FUNCTION_ARGS)
 		state1->maxScale = state2->maxScale;
 		state1->maxScaleCount = state2->maxScaleCount;
 
-		init_var(&state1->sumX);
-		set_var_from_var(&state2->sumX, &state1->sumX);
+		accum_sum_copy(&state1->sumX, &state2->sumX);
 
 		MemoryContextSwitchTo(old_context);
 
@@ -3512,7 +3559,7 @@ numeric_avg_combine(PG_FUNCTION_ARGS)
 		old_context = MemoryContextSwitchTo(agg_context);
 
 		/* Accumulate sums */
-		add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+		accum_sum_combine(&state1->sumX, &state2->sumX);
 
 		MemoryContextSwitchTo(old_context);
 	}
@@ -3532,6 +3579,7 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
 	Datum		temp;
 	bytea	   *sumX;
 	bytea	   *result;
+	NumericVar	tmp_var;
 
 	/* Ensure we disallow calling when not in aggregate context */
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -3545,9 +3593,13 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
 	 * splitting the tasks in numeric_send into separate functions to stop
 	 * this? Doing so would also remove the fmgr call overhead.
 	 */
+	init_var(&tmp_var);
+	accum_sum_final(&state->sumX, &tmp_var);
+
 	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&state->sumX)));
+							   NumericGetDatum(make_result(&tmp_var)));
 	sumX = DatumGetByteaP(temp);
+	free_var(&tmp_var);
 
 	pq_begintypsend(&buf);
 
@@ -3582,6 +3634,7 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS)
 	bytea	   *sstate;
 	NumericAggState *result;
 	Datum		temp;
+	NumericVar	tmp_var;
 	StringInfoData buf;
 
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -3606,7 +3659,8 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS)
 							   PointerGetDatum(&buf),
 							   InvalidOid,
 							   -1);
-	set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+	init_var_from_num(DatumGetNumeric(temp), &tmp_var);
+	accum_sum_add(&(result->sumX), &tmp_var);
 
 	/* maxScale */
 	result->maxScale = pq_getmsgint(&buf, 4);
@@ -3635,6 +3689,7 @@ numeric_serialize(PG_FUNCTION_ARGS)
 	StringInfoData buf;
 	Datum		temp;
 	bytea	   *sumX;
+	NumericVar	tmp_var;
 	bytea	   *sumX2;
 	bytea	   *result;
 
@@ -3650,14 +3705,20 @@ numeric_serialize(PG_FUNCTION_ARGS)
 	 * splitting the tasks in numeric_send into separate functions to stop
 	 * this? Doing so would also remove the fmgr call overhead.
 	 */
+	init_var(&tmp_var);
+
+	accum_sum_final(&state->sumX, &tmp_var);
 	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&state->sumX)));
+							   NumericGetDatum(make_result(&tmp_var)));
 	sumX = DatumGetByteaP(temp);
 
+	accum_sum_final(&state->sumX2, &tmp_var);
 	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&state->sumX2)));
+							   NumericGetDatum(make_result(&tmp_var)));
 	sumX2 = DatumGetByteaP(temp);
 
+	free_var(&tmp_var);
+
 	pq_begintypsend(&buf);
 
 	/* N */
@@ -3694,6 +3755,8 @@ numeric_deserialize(PG_FUNCTION_ARGS)
 	bytea	   *sstate;
 	NumericAggState *result;
 	Datum		temp;
+	NumericVar	sumX_var;
+	NumericVar	sumX2_var;
 	StringInfoData buf;
 
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -3718,14 +3781,16 @@ numeric_deserialize(PG_FUNCTION_ARGS)
 							   PointerGetDatum(&buf),
 							   InvalidOid,
 							   -1);
-	set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+	init_var_from_num(DatumGetNumeric(temp), &sumX_var);
+	accum_sum_add(&(result->sumX), &sumX_var);
 
 	/* sumX2 */
 	temp = DirectFunctionCall3(numeric_recv,
 							   PointerGetDatum(&buf),
 							   InvalidOid,
 							   -1);
-	set_var_from_num(DatumGetNumeric(temp), &result->sumX2);
+	init_var_from_num(DatumGetNumeric(temp), &sumX2_var);
+	accum_sum_add(&(result->sumX2), &sumX2_var);
 
 	/* maxScale */
 	result->maxScale = pq_getmsgint(&buf, 4);
@@ -3974,11 +4039,8 @@ numeric_poly_combine(PG_FUNCTION_ARGS)
 		state1->sumX = state2->sumX;
 		state1->sumX2 = state2->sumX2;
 #else
-		init_var(&(state1->sumX));
-		set_var_from_var(&(state2->sumX), &(state1->sumX));
-
-		init_var(&state1->sumX2);
-		set_var_from_var(&(state2->sumX2), &(state1->sumX2));
+		accum_sum_copy(&state2->sumX, &state1->sumX);
+		accum_sum_copy(&state2->sumX2, &state1->sumX2);
 #endif
 
 		MemoryContextSwitchTo(old_context);
@@ -3998,8 +4060,8 @@ numeric_poly_combine(PG_FUNCTION_ARGS)
 		old_context = MemoryContextSwitchTo(agg_context);
 
 		/* Accumulate sums */
-		add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
-		add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+		accum_sum_combine(&state1->sumX, &state2->sumX);
+		accum_sum_combine(&state1->sumX2, &state2->sumX2);
 
 		MemoryContextSwitchTo(old_context);
 #endif
@@ -4038,30 +4100,29 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
 	 */
 	{
 		Datum		temp;
-
-#ifdef HAVE_INT128
 		NumericVar	num;
 
 		init_var(&num);
+
+#ifdef HAVE_INT128
 		int128_to_numericvar(state->sumX, &num);
+#else
+		accum_sum_final(&state->sumX, &num);
+#endif
 		temp = DirectFunctionCall1(numeric_send,
 								   NumericGetDatum(make_result(&num)));
 		sumX = DatumGetByteaP(temp);
 
+#ifdef HAVE_INT128
 		int128_to_numericvar(state->sumX2, &num);
+#else
+		accum_sum_final(&state->sumX2, &num);
+#endif
 		temp = DirectFunctionCall1(numeric_send,
 								   NumericGetDatum(make_result(&num)));
 		sumX2 = DatumGetByteaP(temp);
-		free_var(&num);
-#else
-		temp = DirectFunctionCall1(numeric_send,
-								 NumericGetDatum(make_result(&state->sumX)));
-		sumX = DatumGetByteaP(temp);
 
-		temp = DirectFunctionCall1(numeric_send,
-								NumericGetDatum(make_result(&state->sumX2)));
-		sumX2 = DatumGetByteaP(temp);
-#endif
+		free_var(&num);
 	}
 
 	pq_begintypsend(&buf);
@@ -4091,7 +4152,9 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS)
 	bytea	   *sstate;
 	PolyNumAggState *result;
 	Datum		sumX;
+	NumericVar	sumX_var;
 	Datum		sumX2;
+	NumericVar	sumX2_var;
 	StringInfoData buf;
 
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -4123,22 +4186,18 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS)
 								InvalidOid,
 								-1);
 
+	init_var_from_num(DatumGetNumeric(sumX), &sumX_var);
 #ifdef HAVE_INT128
-	{
-		NumericVar	num;
-
-		init_var(&num);
-		set_var_from_num(DatumGetNumeric(sumX), &num);
-		numericvar_to_int128(&num, &result->sumX);
-
-		set_var_from_num(DatumGetNumeric(sumX2), &num);
-		numericvar_to_int128(&num, &result->sumX2);
+	numericvar_to_int128(&sumX_var, &result->sumX);
+#else
+	accum_sum_add(&result->sumX, &sumX_var);
+#endif
 
-		free_var(&num);
-	}
+	set_var_from_num(DatumGetNumeric(sumX2), &sumX2_var);
+#ifdef HAVE_INT128
+	numericvar_to_int128(&sumX2_var, &result->sumX2);
 #else
-	set_var_from_num(DatumGetNumeric(sumX), &result->sumX);
-	set_var_from_num(DatumGetNumeric(sumX2), &result->sumX2);
+	accum_sum_add(&result->sumX2, &sumX_var);
 #endif
 
 	pq_getmsgend(&buf);
@@ -4209,8 +4268,7 @@ int8_avg_combine(PG_FUNCTION_ARGS)
 #ifdef HAVE_INT128
 		state1->sumX = state2->sumX;
 #else
-		init_var(&state1->sumX);
-		set_var_from_var(&state2->sumX, &state1->sumX);
+		accum_sum_copy(&state1->sumX, &state2->sumX);
 #endif
 		MemoryContextSwitchTo(old_context);
 
@@ -4228,7 +4286,7 @@ int8_avg_combine(PG_FUNCTION_ARGS)
 		old_context = MemoryContextSwitchTo(agg_context);
 
 		/* Accumulate sums */
-		add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+		accum_sum_combine(&state1->sumX, &state2->sumX);
 
 		MemoryContextSwitchTo(old_context);
 #endif
@@ -4266,20 +4324,20 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
 	 */
 	{
 		Datum		temp;
-#ifdef HAVE_INT128
 		NumericVar	num;
 
 		init_var(&num);
+
+#ifdef HAVE_INT128
 		int128_to_numericvar(state->sumX, &num);
-		temp = DirectFunctionCall1(numeric_send,
-								   NumericGetDatum(make_result(&num)));
-		free_var(&num);
-		sumX = DatumGetByteaP(temp);
 #else
+		accum_sum_final(&state->sumX, &num);
+#endif
 		temp = DirectFunctionCall1(numeric_send,
-								 NumericGetDatum(make_result(&state->sumX)));
+								   NumericGetDatum(make_result(&num)));
 		sumX = DatumGetByteaP(temp);
-#endif
+
+		free_var(&num);
 	}
 
 	pq_begintypsend(&buf);
@@ -4306,6 +4364,7 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
 	PolyNumAggState *result;
 	StringInfoData buf;
 	Datum		temp;
+	NumericVar	num;
 
 	if (!AggCheckCallContext(fcinfo, NULL))
 		elog(ERROR, "aggregate function called in non-aggregate context");
@@ -4329,18 +4388,11 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
 							   PointerGetDatum(&buf),
 							   InvalidOid,
 							   -1);
-
+	init_var_from_num(DatumGetNumeric(temp), &num);
 #ifdef HAVE_INT128
-	{
-		NumericVar	num;
-
-		init_var(&num);
-		set_var_from_num(DatumGetNumeric(temp), &num);
-		numericvar_to_int128(&num, &result->sumX);
-		free_var(&num);
-	}
+	numericvar_to_int128(&num, &result->sumX);
 #else
-	set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+	accum_sum_add(&result->sumX, &num);
 #endif
 
 	pq_getmsgend(&buf);
@@ -4534,6 +4586,7 @@ numeric_avg(PG_FUNCTION_ARGS)
 	NumericAggState *state;
 	Datum		N_datum;
 	Datum		sumX_datum;
+	NumericVar	sumX_var;
 
 	state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
 
@@ -4545,7 +4598,11 @@ numeric_avg(PG_FUNCTION_ARGS)
 		PG_RETURN_NUMERIC(make_result(&const_nan));
 
 	N_datum = DirectFunctionCall1(int8_numeric, Int64GetDatum(state->N));
-	sumX_datum = NumericGetDatum(make_result(&state->sumX));
+
+	init_var(&sumX_var);
+	accum_sum_final(&state->sumX, &sumX_var);
+	sumX_datum = NumericGetDatum(make_result(&sumX_var));
+	free_var(&sumX_var);
 
 	PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumX_datum, N_datum));
 }
@@ -4554,6 +4611,8 @@ Datum
 numeric_sum(PG_FUNCTION_ARGS)
 {
 	NumericAggState *state;
+	NumericVar	sumX_var;
+	Numeric		result;
 
 	state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
 
@@ -4564,7 +4623,12 @@ numeric_sum(PG_FUNCTION_ARGS)
 	if (state->NaNcount > 0)	/* there was at least one NaN input */
 		PG_RETURN_NUMERIC(make_result(&const_nan));
 
-	PG_RETURN_NUMERIC(make_result(&(state->sumX)));
+	init_var(&sumX_var);
+	accum_sum_final(&state->sumX, &sumX_var);
+	result = make_result(&sumX_var);
+	free_var(&sumX_var);
+
+	PG_RETURN_NUMERIC(result);
 }
 
 /*
@@ -4608,8 +4672,8 @@ numeric_stddev_internal(NumericAggState *state,
 	init_var(&vsumX2);
 
 	int64_to_numericvar(state->N, &vN);
-	set_var_from_var(&(state->sumX), &vsumX);
-	set_var_from_var(&(state->sumX2), &vsumX2);
+	accum_sum_final(&(state->sumX), &vsumX);
+	accum_sum_final(&(state->sumX2), &vsumX2);
 
 	/*
 	 * Sample stddev and variance are undefined when N <= 1; population stddev
@@ -4739,26 +4803,38 @@ numeric_poly_stddev_internal(Int128AggState *state,
 	NumericAggState numstate;
 	Numeric		res;
 
-	init_var(&numstate.sumX);
-	init_var(&numstate.sumX2);
-	numstate.NaNcount = 0;
-	numstate.agg_context = NULL;
+	/* Initialize an empty agg state */
+	memset(&numstate, 0, sizeof(NumericAggState));
 
 	if (state)
 	{
+		NumericVar	tmp_var;
+
 		numstate.N = state->N;
-		int128_to_numericvar(state->sumX, &numstate.sumX);
-		int128_to_numericvar(state->sumX2, &numstate.sumX2);
-	}
-	else
-	{
-		numstate.N = 0;
+
+		init_var(&tmp_var);
+
+		int128_to_numericvar(state->sumX, &tmp_var);
+		accum_sum_add(&numstate.sumX, &tmp_var);
+
+		int128_to_numericvar(state->sumX2, &tmp_var);
+		accum_sum_add(&numstate.sumX2, &tmp_var);
+
+		free_var(&tmp_var);
 	}
 
 	res = numeric_stddev_internal(&numstate, variance, sample, is_null);
 
-	free_var(&numstate.sumX);
-	free_var(&numstate.sumX2);
+	if (numstate.sumX.ndigits > 0)
+	{
+		pfree(numstate.sumX.pos_digits);
+		pfree(numstate.sumX.neg_digits);
+	}
+	if (numstate.sumX2.ndigits > 0)
+	{
+		pfree(numstate.sumX2.pos_digits);
+		pfree(numstate.sumX2.neg_digits);
+	}
 
 	return res;
 }
@@ -8702,3 +8778,322 @@ strip_var(NumericVar *var)
 	var->digits = digits;
 	var->ndigits = ndigits;
 }
+
+
+/* ----------------------------------------------------------------------
+ *
+ * Fast sum accumulator functions
+ *
+ * ----------------------------------------------------------------------
+ */
+
+/*
+ * Reset the accumulator's value to zero.  The buffers to hold the digits
+ * are not free'd.
+ */
+static void
+accum_sum_reset(NumericSumAccum *accum)
+{
+	int			i;
+
+	accum->dscale = 0;
+	for (i = 0; i < accum->ndigits; i++)
+	{
+		accum->pos_digits[i] = 0;
+		accum->neg_digits[i] = 0;
+	}
+}
+
+/*
+ * Accumulate a new value.
+ */
+static void
+accum_sum_add(NumericSumAccum *accum, NumericVar *val)
+{
+	int32	   *accum_digits;
+	int			i,
+				val_i;
+	int			val_ndigits;
+	NumericDigit *val_digits;
+
+	/*
+	 * If we have accumulated too many values since the last carry
+	 * propagation, do it now, to avoid overflowing.  (We could allow more
+	 * than NBASE - 1, if we reserved two extra digits, rather than one, for
+	 * carry propagation.  But even with NBASE - 1, this needs to be done so
+	 * seldom, that the performance difference is negligible.)
+	 */
+	if (accum->num_uncarried == NBASE - 1)
+		accum_sum_carry(accum);
+
+	/*
+	 * Adjust the weight or scale of the old value, so that it can accommodate
+	 * the new value.
+	 */
+	accum_sum_rescale(accum, val);
+
+	/* */
+	if (val->sign == NUMERIC_POS)
+		accum_digits = accum->pos_digits;
+	else
+		accum_digits = accum->neg_digits;
+
+	/* copy these values into local vars for speed in loop */
+	val_ndigits = val->ndigits;
+	val_digits = val->digits;
+
+	i = accum->weight - val->weight;
+	for (val_i = 0; val_i < val_ndigits; val_i++)
+	{
+		accum_digits[i] += (int32) val_digits[val_i];
+		i++;
+	}
+
+	accum->num_uncarried++;
+}
+
+/*
+ * Propagate carries.
+ */
+static void
+accum_sum_carry(NumericSumAccum *accum)
+{
+	int			i;
+	int			ndigits;
+	int32	   *dig;
+	int32		carry;
+	int32		newdig = 0;
+
+	/*
+	 * If no new values have been added since last carry propagation, nothing
+	 * to do.
+	 */
+	if (accum->num_uncarried == 0)
+		return;
+
+	/*
+	 * We maintain that the weight of the accumulator is always one larger
+	 * than needed to hold the current value, before carrying, to make sure
+	 * there is enough space for the possible extra digit when carry is
+	 * propagated.  We cannot expand the buffer here, unless we require
+	 * callers of accum_sum_final() to switch to the right memory context.
+	 */
+	Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
+
+	ndigits = accum->ndigits;
+
+	/* Propagate carry in the positive sum */
+	dig = accum->pos_digits;
+	carry = 0;
+	for (i = ndigits - 1; i >= 0; i--)
+	{
+		newdig = dig[i] + carry;
+		if (newdig >= NBASE)
+		{
+			carry = newdig / NBASE;
+			newdig -= carry * NBASE;
+		}
+		else
+			carry = 0;
+		dig[i] = newdig;
+	}
+	/* Did we use up the digit reserved for carry propagation? */
+	if (newdig > 0)
+		accum->have_carry_space = false;
+
+	/* And the same for the negative sum */
+	dig = accum->neg_digits;
+	carry = 0;
+	for (i = ndigits - 1; i >= 0; i--)
+	{
+		newdig = dig[i] + carry;
+		if (newdig >= NBASE)
+		{
+			carry = newdig / NBASE;
+			newdig -= carry * NBASE;
+		}
+		else
+			carry = 0;
+		dig[i] = newdig;
+	}
+	if (newdig > 0)
+		accum->have_carry_space = false;
+
+	accum->num_uncarried = 0;
+}
+
+/*
+ * Re-scale accumulator to accommodate new value.
+ *
+ * If the new value has more digits than the current digit buffers in the
+ * accumulator, enlarge the buffers.
+ */
+static void
+accum_sum_rescale(NumericSumAccum *accum, NumericVar *val)
+{
+	int			old_weight = accum->weight;
+	int			old_ndigits = accum->ndigits;
+	int			accum_ndigits;
+	int			accum_weight;
+	int			accum_rscale;
+	int			val_rscale;
+
+	accum_weight = old_weight;
+	accum_ndigits = old_ndigits;
+
+	/*
+	 * Does the new value have a larger weight? If so, enlarge the buffers,
+	 * and shift the existing value to the new weight, by adding leading
+	 * zeros.
+	 *
+	 * We enforce that the accumulator always has a weight one larger than
+	 * needed for the inputs, so that we have space for an extra digit at the
+	 * final carry-propagation phase, if necessary.
+	 */
+	if (val->weight >= accum_weight)
+	{
+		accum_weight = val->weight + 1;
+		accum_ndigits = accum_ndigits + (accum_weight - old_weight);
+	}
+
+	/*
+	 * Even though the new value is small, we might've used up the space
+	 * reserved for the carry digit in the last call to accum_sum_carry().  If
+	 * so, enlarge to make room for another one.
+	 */
+	else if (!accum->have_carry_space)
+	{
+		accum_weight++;
+		accum_ndigits++;
+	}
+
+	/* Is the new value wider on the right side? */
+	accum_rscale = accum_ndigits - accum_weight - 1;
+	val_rscale = val->ndigits - val->weight - 1;
+	if (val_rscale > accum_rscale)
+		accum_ndigits = accum_ndigits + (val_rscale - accum_rscale);
+
+	if (accum_ndigits != old_ndigits ||
+		accum_weight != old_weight)
+	{
+		int32	   *new_pos_digits;
+		int32	   *new_neg_digits;
+		int			weightdiff;
+
+		weightdiff = accum_weight - old_weight;
+
+		new_pos_digits = palloc0(accum_ndigits * sizeof(int32));
+		new_neg_digits = palloc0(accum_ndigits * sizeof(int32));
+
+		if (accum->pos_digits)
+		{
+			memcpy(&new_pos_digits[weightdiff], accum->pos_digits,
+				   old_ndigits * sizeof(int32));
+			pfree(accum->pos_digits);
+
+			memcpy(&new_neg_digits[weightdiff], accum->neg_digits,
+				   old_ndigits * sizeof(int32));
+			pfree(accum->neg_digits);
+		}
+
+		accum->pos_digits = new_pos_digits;
+		accum->neg_digits = new_neg_digits;
+
+		accum->weight = accum_weight;
+		accum->ndigits = accum_ndigits;
+
+		Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
+		accum->have_carry_space = true;
+	}
+
+	if (val->dscale > accum->dscale)
+		accum->dscale = val->dscale;
+}
+
+/*
+ * Return the current value of the accumulator.  This perform final carry
+ * propagation, and adds together the positive and negative sums.
+ *
+ * Unlike all the other routines, the caller is not required to switch to
+ * the memory context that holds the accumulator.
+ */
+static void
+accum_sum_final(NumericSumAccum *accum, NumericVar *result)
+{
+	int			i;
+	NumericVar	pos_var;
+	NumericVar	neg_var;
+
+	if (accum->ndigits == 0)
+	{
+		set_var_from_var(&const_zero, result);
+		return;
+	}
+
+	/* Perform final carry */
+	accum_sum_carry(accum);
+
+	/* Create NumericVars representing the positive and negative sums */
+	init_var(&pos_var);
+	init_var(&neg_var);
+
+	pos_var.ndigits = neg_var.ndigits = accum->ndigits;
+	pos_var.weight = neg_var.weight = accum->weight;
+	pos_var.dscale = neg_var.dscale = accum->dscale;
+	pos_var.sign = NUMERIC_POS;
+	neg_var.sign = NUMERIC_NEG;
+
+	pos_var.buf = pos_var.digits = digitbuf_alloc(accum->ndigits);
+	neg_var.buf = neg_var.digits = digitbuf_alloc(accum->ndigits);
+
+	for (i = 0; i < accum->ndigits; i++)
+	{
+		Assert(accum->pos_digits[i] < NBASE);
+		pos_var.digits[i] = (int16) accum->pos_digits[i];
+
+		Assert(accum->neg_digits[i] < NBASE);
+		neg_var.digits[i] = (int16) accum->neg_digits[i];
+	}
+
+	/* And add them together */
+	add_var(&pos_var, &neg_var, result);
+
+	/* Remove leading/trailing zeroes */
+	strip_var(result);
+}
+
+/*
+ * Copy an accumulator's state.
+ *
+ * 'dst' is assumed to be uninitialized beforehand.  No attempt is made at
+ * freeing old values.
+ */
+static void
+accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src)
+{
+	dst->pos_digits = palloc(src->ndigits * sizeof(int32));
+	dst->neg_digits = palloc(src->ndigits * sizeof(int32));
+
+	memcpy(dst->pos_digits, src->pos_digits, src->ndigits * sizeof(int32));
+	memcpy(dst->neg_digits, src->neg_digits, src->ndigits * sizeof(int32));
+	dst->num_uncarried = src->num_uncarried;
+	dst->ndigits = src->ndigits;
+	dst->weight = src->weight;
+	dst->dscale = src->dscale;
+}
+
+/*
+ * Add the current value of 'accum2' into 'accum'.
+ */
+static void
+accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2)
+{
+	NumericVar	tmp_var;
+
+	init_var(&tmp_var);
+
+	accum_sum_final(accum2, &tmp_var);
+	accum_sum_add(accum, &tmp_var);
+
+	free_var(&tmp_var);
+}
diff --git a/src/test/regress/expected/numeric.out b/src/test/regress/expected/numeric.out
index f1f50560ee1..ae0beb9b68f 100644
--- a/src/test/regress/expected/numeric.out
+++ b/src/test/regress/expected/numeric.out
@@ -1909,3 +1909,19 @@ select scale(-13.000000000000000);
     15
 (1 row)
 
+--
+-- Tests for SUM()
+--
+-- cases that need carry propagation
+SELECT SUM(9999::numeric) FROM generate_series(1, 100000);
+    sum    
+-----------
+ 999900000
+(1 row)
+
+SELECT SUM((-9999)::numeric) FROM generate_series(1, 100000);
+    sum     
+------------
+ -999900000
+(1 row)
+
diff --git a/src/test/regress/sql/numeric.sql b/src/test/regress/sql/numeric.sql
index fc472187d87..b51225c47f3 100644
--- a/src/test/regress/sql/numeric.sql
+++ b/src/test/regress/sql/numeric.sql
@@ -997,3 +997,11 @@ select scale(1.12345);
 select scale(110123.12475871856128);
 select scale(-1123.12471856128);
 select scale(-13.000000000000000);
+
+--
+-- Tests for SUM()
+--
+
+-- cases that need carry propagation
+SELECT SUM(9999::numeric) FROM generate_series(1, 100000);
+SELECT SUM((-9999)::numeric) FROM generate_series(1, 100000);
-- 
GitLab