From 94d626ff5a82422825976c096e250b07657cd6f7 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 13 Aug 2015 13:02:10 -0300
Subject: [PATCH] Use materialize SRF mode in brin_page_items

This function was using the single-value-per-call mechanism, but the
code relied on a relcache entry that wasn't kept open across calls.
This manifested as weird errors in buildfarm during the short time that
the "brin-1" isolation test lived.

Backpatch to 9.5, where it was introduced.
---
 contrib/pageinspect/brinfuncs.c | 223 +++++++++++++++-----------------
 1 file changed, 101 insertions(+), 122 deletions(-)

diff --git a/contrib/pageinspect/brinfuncs.c b/contrib/pageinspect/brinfuncs.c
index 7adcfa89370..a3d4cc5ef35 100644
--- a/contrib/pageinspect/brinfuncs.c
+++ b/contrib/pageinspect/brinfuncs.c
@@ -37,18 +37,6 @@ typedef struct brin_column_state
 	FmgrInfo	outputFn[FLEXIBLE_ARRAY_MEMBER];
 } brin_column_state;
 
-typedef struct brin_page_state
-{
-	BrinDesc   *bdesc;
-	Page		page;
-	OffsetNumber offset;
-	bool		unusedItem;
-	bool		done;
-	AttrNumber	attno;
-	BrinMemTuple *dtup;
-	brin_column_state *columns[FLEXIBLE_ARRAY_MEMBER];
-} brin_page_state;
-
 
 static Page verify_brin_page(bytea *raw_page, uint16 type,
 				 const char *strtype);
@@ -119,89 +107,89 @@ verify_brin_page(bytea *raw_page, uint16 type, const char *strtype)
 Datum
 brin_page_items(PG_FUNCTION_ARGS)
 {
-	brin_page_state *state;
-	FuncCallContext *fctx;
+	bytea	   *raw_page = PG_GETARG_BYTEA_P(0);
+	Oid			indexRelid = PG_GETARG_OID(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	MemoryContext oldcontext;
+	Tuplestorestate *tupstore;
+	Relation	indexRel;
+	brin_column_state **columns;
+	BrinDesc   *bdesc;
+	BrinMemTuple *dtup;
+	Page		page;
+	OffsetNumber offset;
+	AttrNumber	attno;
+	bool		unusedItem;
 
 	if (!superuser())
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 				 (errmsg("must be superuser to use raw page functions"))));
 
-	if (SRF_IS_FIRSTCALL())
-	{
-		bytea	   *raw_page = PG_GETARG_BYTEA_P(0);
-		Oid			indexRelid = PG_GETARG_OID(1);
-		Page		page;
-		TupleDesc	tupdesc;
-		MemoryContext mctx;
-		Relation	indexRel;
-		AttrNumber	attno;
-
-		/* minimally verify the page we got */
-		page = verify_brin_page(raw_page, BRIN_PAGETYPE_REGULAR, "regular");
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize) ||
+		rsinfo->expectedDesc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
 
-		/* create a function context for cross-call persistence */
-		fctx = SRF_FIRSTCALL_INIT();
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
 
-		/* switch to memory context appropriate for multiple function calls */
-		mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
+	/* Build tuplestore to hold the result rows */
+	oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
 
-		/* Build a tuple descriptor for our result type */
-		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-			elog(ERROR, "return type must be a row type");
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
 
-		indexRel = index_open(indexRelid, AccessShareLock);
+	MemoryContextSwitchTo(oldcontext);
 
-		state = palloc(offsetof(brin_page_state, columns) +
-			  sizeof(brin_column_state) * RelationGetDescr(indexRel)->natts);
+	indexRel = index_open(indexRelid, AccessShareLock);
+	bdesc = brin_build_desc(indexRel);
 
-		state->bdesc = brin_build_desc(indexRel);
-		state->page = page;
-		state->offset = FirstOffsetNumber;
-		state->unusedItem = false;
-		state->done = false;
-		state->dtup = NULL;
+	/* minimally verify the page we got */
+	page = verify_brin_page(raw_page, BRIN_PAGETYPE_REGULAR, "regular");
 
-		/*
-		 * Initialize output functions for all indexed datatypes; simplifies
-		 * calling them later.
-		 */
-		for (attno = 1; attno <= state->bdesc->bd_tupdesc->natts; attno++)
+	/*
+	 * Initialize output functions for all indexed datatypes; simplifies
+	 * calling them later.
+	 */
+	columns = palloc(sizeof(brin_column_state *) * RelationGetDescr(indexRel)->natts);
+	for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
+	{
+		Oid			output;
+		bool		isVarlena;
+		BrinOpcInfo *opcinfo;
+		int			i;
+		brin_column_state *column;
+
+		opcinfo = bdesc->bd_info[attno - 1];
+		column = palloc(offsetof(brin_column_state, outputFn) +
+						sizeof(FmgrInfo) * opcinfo->oi_nstored);
+
+		column->nstored = opcinfo->oi_nstored;
+		for (i = 0; i < opcinfo->oi_nstored; i++)
 		{
-			Oid			output;
-			bool		isVarlena;
-			BrinOpcInfo *opcinfo;
-			int			i;
-			brin_column_state *column;
-
-			opcinfo = state->bdesc->bd_info[attno - 1];
-			column = palloc(offsetof(brin_column_state, outputFn) +
-							sizeof(FmgrInfo) * opcinfo->oi_nstored);
-
-			column->nstored = opcinfo->oi_nstored;
-			for (i = 0; i < opcinfo->oi_nstored; i++)
-			{
-				getTypeOutputInfo(opcinfo->oi_typcache[i]->type_id, &output, &isVarlena);
-				fmgr_info(output, &column->outputFn[i]);
-			}
-
-			state->columns[attno - 1] = column;
+			getTypeOutputInfo(opcinfo->oi_typcache[i]->type_id, &output, &isVarlena);
+			fmgr_info(output, &column->outputFn[i]);
 		}
 
-		index_close(indexRel, AccessShareLock);
-
-		fctx->user_fctx = state;
-		fctx->tuple_desc = BlessTupleDesc(tupdesc);
-
-		MemoryContextSwitchTo(mctx);
+		columns[attno - 1] = column;
 	}
 
-	fctx = SRF_PERCALL_SETUP();
-	state = fctx->user_fctx;
-
-	if (!state->done)
+	offset = FirstOffsetNumber;
+	unusedItem = false;
+	dtup = NULL;
+	for (;;)
 	{
-		HeapTuple	result;
 		Datum		values[7];
 		bool		nulls[7];
 
@@ -211,39 +199,30 @@ brin_page_items(PG_FUNCTION_ARGS)
 		 * signal for obtaining and decoding the next one.  If that's not the
 		 * case, we output the next attribute.
 		 */
-		if (state->dtup == NULL)
+		if (dtup == NULL)
 		{
-			BrinTuple  *tup;
-			MemoryContext mctx;
 			ItemId		itemId;
 
-			/* deformed tuple must live across calls */
-			mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
-
 			/* verify item status: if there's no data, we can't decode */
-			itemId = PageGetItemId(state->page, state->offset);
+			itemId = PageGetItemId(page, offset);
 			if (ItemIdIsUsed(itemId))
 			{
-				tup = (BrinTuple *) PageGetItem(state->page,
-												PageGetItemId(state->page,
-															  state->offset));
-				state->dtup = brin_deform_tuple(state->bdesc, tup);
-				state->attno = 1;
-				state->unusedItem = false;
+				dtup = brin_deform_tuple(bdesc,
+									(BrinTuple *) PageGetItem(page, itemId));
+				attno = 1;
+				unusedItem = false;
 			}
 			else
-				state->unusedItem = true;
-
-			MemoryContextSwitchTo(mctx);
+				unusedItem = true;
 		}
 		else
-			state->attno++;
+			attno++;
 
 		MemSet(nulls, 0, sizeof(nulls));
 
-		if (state->unusedItem)
+		if (unusedItem)
 		{
-			values[0] = UInt16GetDatum(state->offset);
+			values[0] = UInt16GetDatum(offset);
 			nulls[1] = true;
 			nulls[2] = true;
 			nulls[3] = true;
@@ -253,17 +232,17 @@ brin_page_items(PG_FUNCTION_ARGS)
 		}
 		else
 		{
-			int			att = state->attno - 1;
-
-			values[0] = UInt16GetDatum(state->offset);
-			values[1] = UInt32GetDatum(state->dtup->bt_blkno);
-			values[2] = UInt16GetDatum(state->attno);
-			values[3] = BoolGetDatum(state->dtup->bt_columns[att].bv_allnulls);
-			values[4] = BoolGetDatum(state->dtup->bt_columns[att].bv_hasnulls);
-			values[5] = BoolGetDatum(state->dtup->bt_placeholder);
-			if (!state->dtup->bt_columns[att].bv_allnulls)
+			int			att = attno - 1;
+
+			values[0] = UInt16GetDatum(offset);
+			values[1] = UInt32GetDatum(dtup->bt_blkno);
+			values[2] = UInt16GetDatum(attno);
+			values[3] = BoolGetDatum(dtup->bt_columns[att].bv_allnulls);
+			values[4] = BoolGetDatum(dtup->bt_columns[att].bv_hasnulls);
+			values[5] = BoolGetDatum(dtup->bt_placeholder);
+			if (!dtup->bt_columns[att].bv_allnulls)
 			{
-				BrinValues *bvalues = &state->dtup->bt_columns[att];
+				BrinValues *bvalues = &dtup->bt_columns[att];
 				StringInfoData s;
 				bool		first;
 				int			i;
@@ -272,14 +251,14 @@ brin_page_items(PG_FUNCTION_ARGS)
 				appendStringInfoChar(&s, '{');
 
 				first = true;
-				for (i = 0; i < state->columns[att]->nstored; i++)
+				for (i = 0; i < columns[att]->nstored; i++)
 				{
 					char	   *val;
 
 					if (!first)
 						appendStringInfoString(&s, " .. ");
 					first = false;
-					val = OutputFunctionCall(&state->columns[att]->outputFn[i],
+					val = OutputFunctionCall(&columns[att]->outputFn[i],
 											 bvalues->bv_values[i]);
 					appendStringInfoString(&s, val);
 					pfree(val);
@@ -295,35 +274,35 @@ brin_page_items(PG_FUNCTION_ARGS)
 			}
 		}
 
-		result = heap_form_tuple(fctx->tuple_desc, values, nulls);
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 
 		/*
 		 * If the item was unused, jump straight to the next one; otherwise,
 		 * the only cleanup needed here is to set our signal to go to the next
 		 * tuple in the following iteration, by freeing the current one.
 		 */
-		if (state->unusedItem)
-			state->offset = OffsetNumberNext(state->offset);
-		else if (state->attno >= state->bdesc->bd_tupdesc->natts)
+		if (unusedItem)
+			offset = OffsetNumberNext(offset);
+		else if (attno >= bdesc->bd_tupdesc->natts)
 		{
-			pfree(state->dtup);
-			state->dtup = NULL;
-			state->offset = OffsetNumberNext(state->offset);
+			pfree(dtup);
+			dtup = NULL;
+			offset = OffsetNumberNext(offset);
 		}
 
 		/*
-		 * If we're beyond the end of the page, set flag to end the function
-		 * in the following iteration.
+		 * If we're beyond the end of the page, we're done.
 		 */
-		if (state->offset > PageGetMaxOffsetNumber(state->page))
-			state->done = true;
-
-		SRF_RETURN_NEXT(fctx, HeapTupleGetDatum(result));
+		if (offset > PageGetMaxOffsetNumber(page))
+			break;
 	}
 
-	brin_free_desc(state->bdesc);
+	/* clean up and return the tuplestore */
+	brin_free_desc(bdesc);
+	tuplestore_donestoring(tupstore);
+	index_close(indexRel, AccessShareLock);
 
-	SRF_RETURN_DONE(fctx);
+	return (Datum) 0;
 }
 
 Datum
-- 
GitLab