From ba1e066e4637d64886b6ba12706b18ca35a6e258 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 9 May 2003 23:01:45 +0000
Subject: [PATCH] Implement array_send/array_recv (binary I/O for arrays). 
 This exposed the folly of not passing element type to typsend/typreceive, so
 fix that.

---
 doc/src/sgml/ref/create_type.sgml         |  19 +-
 src/backend/commands/typecmds.c           |  20 +-
 src/backend/utils/adt/arrayfuncs.c        | 294 ++++++++++++++++++++--
 src/include/catalog/catversion.h          |   4 +-
 src/include/catalog/pg_proc.h             |   6 +-
 src/test/regress/expected/type_sanity.out |   4 +-
 src/test/regress/sql/type_sanity.sql      |   4 +-
 7 files changed, 316 insertions(+), 35 deletions(-)

diff --git a/doc/src/sgml/ref/create_type.sgml b/doc/src/sgml/ref/create_type.sgml
index 5fe4d2be4fc..2d2b92a85aa 100644
--- a/doc/src/sgml/ref/create_type.sgml
+++ b/doc/src/sgml/ref/create_type.sgml
@@ -1,5 +1,5 @@
 <!--
-$Header: /cvsroot/pgsql/doc/src/sgml/ref/create_type.sgml,v 1.42 2003/05/08 22:19:56 tgl Exp $
+$Header: /cvsroot/pgsql/doc/src/sgml/ref/create_type.sgml,v 1.43 2003/05/09 23:01:44 tgl Exp $
 PostgreSQL documentation
 -->
 
@@ -117,15 +117,20 @@ CREATE TYPE <replaceable class="parameter">typename</replaceable> (
    representation is in the machine's native byte order.)  The receive
    function should perform adequate checking to ensure that the value is
    valid.
-   The receive function should be declared as taking one argument of type
-   <type>internal</type> and returning a value of the data type itself.
-   (The argument actually supplied is a pointer to a StringInfo buffer
-   holding the received byte string.)  Similarly, the optional
+   The receive function may be declared as taking one argument of type
+   <type>internal</type>, or two arguments of types <type>internal</type>
+   and <type>oid</type>.  It must return a value of the data type itself.
+   (The first argument is a pointer to a StringInfo buffer
+   holding the received byte string; the optional second argument is the
+   element type in case this is an array type.)  Similarly, the optional
    <replaceable class="parameter">send_function</replaceable> converts
    from the internal representation to the external binary representation.
    If this function is not supplied, the type cannot participate in binary
-   output.  The send function should be declared as taking one argument of the
-   new data type and returning type <type>bytea</type>.
+   output.  The send function may be
+   declared as taking one argument of the new data type,  or as taking
+   two arguments of which the second is type <type>oid</type>.
+   The second argument is again the array element type for array types.
+   The send function must return type <type>bytea</type>.
   </para>
 
   <para>
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index f7bf3d3ee87..2036b9e714a 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/commands/typecmds.c,v 1.35 2003/05/08 22:19:56 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/commands/typecmds.c,v 1.36 2003/05/09 23:01:45 tgl Exp $
  *
  * DESCRIPTION
  *	  The "DefineFoo" routines take the parse tree and pick out the
@@ -911,7 +911,8 @@ findTypeReceiveFunction(List *procname, Oid typeOid)
 	Oid			procOid;
 
 	/*
-	 * Receive functions take a single argument of type INTERNAL.
+	 * Receive functions can take a single argument of type INTERNAL, or
+	 * two arguments (internal, oid).
 	 */
 	MemSet(argList, 0, FUNC_MAX_ARGS * sizeof(Oid));
 
@@ -921,6 +922,12 @@ findTypeReceiveFunction(List *procname, Oid typeOid)
 	if (OidIsValid(procOid))
 		return procOid;
 
+	argList[1] = OIDOID;
+
+	procOid = LookupFuncName(procname, 2, argList);
+	if (OidIsValid(procOid))
+		return procOid;
+
 	func_error("TypeCreate", procname, 1, argList, NULL);
 
 	return InvalidOid;			/* keep compiler quiet */
@@ -933,7 +940,8 @@ findTypeSendFunction(List *procname, Oid typeOid)
 	Oid			procOid;
 
 	/*
-	 * Send functions take a single argument of the type.
+	 * Send functions can take a single argument of the type, or two
+	 * arguments (data value, element OID).
 	 */
 	MemSet(argList, 0, FUNC_MAX_ARGS * sizeof(Oid));
 
@@ -943,6 +951,12 @@ findTypeSendFunction(List *procname, Oid typeOid)
 	if (OidIsValid(procOid))
 		return procOid;
 
+	argList[1] = OIDOID;
+
+	procOid = LookupFuncName(procname, 2, argList);
+	if (OidIsValid(procOid))
+		return procOid;
+
 	func_error("TypeCreate", procname, 1, argList, NULL);
 
 	return InvalidOid;			/* keep compiler quiet */
diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c
index f713fda7d57..b53c896e431 100644
--- a/src/backend/utils/adt/arrayfuncs.c
+++ b/src/backend/utils/adt/arrayfuncs.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.88 2003/05/08 22:19:56 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.89 2003/05/09 23:01:45 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -19,6 +19,7 @@
 #include "access/tupmacs.h"
 #include "catalog/catalog.h"
 #include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
 #include "parser/parse_coerce.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -69,6 +70,15 @@
 
 #define RETURN_NULL(type)  do { *isNull = true; return (type) 0; } while (0)
 
+/* I/O function selector for system_cache_lookup */
+typedef enum IOFuncSelector
+{
+	IOFunc_input,
+	IOFunc_output,
+	IOFunc_receive,
+	IOFunc_send
+} IOFuncSelector;
+
 
 static int	ArrayCount(char *str, int *dim, char typdelim);
 static Datum *ReadArrayStr(char *arrayStr, int nitems, int ndim, int *dim,
@@ -76,12 +86,17 @@ static Datum *ReadArrayStr(char *arrayStr, int nitems, int ndim, int *dim,
 			 char typdelim,
 			 int typlen, bool typbyval, char typalign,
 			 int *nbytes);
+static Datum *ReadArrayBinary(StringInfo buf, int nitems,
+							  FmgrInfo *receiveproc, Oid typelem,
+							  int typlen, bool typbyval, char typalign,
+							  int *nbytes);
 static void CopyArrayEls(char *p, Datum *values, int nitems,
 			 int typlen, bool typbyval, char typalign,
 			 bool freedata);
-static void system_cache_lookup(Oid element_type, bool input, int *typlen,
-					bool *typbyval, char *typdelim, Oid *typelem,
-					Oid *proc, char *typalign);
+static void system_cache_lookup(Oid element_type, IOFuncSelector which_func,
+								int *typlen, bool *typbyval,
+								char *typdelim, Oid *typelem,
+								Oid *proc, char *typalign);
 static Datum ArrayCast(char *value, bool byval, int len);
 static int ArrayCastAndSet(Datum src,
 				int typlen, bool typbyval, char typalign,
@@ -141,7 +156,8 @@ array_in(PG_FUNCTION_ARGS)
 	char		typalign;
 
 	/* Get info about element type, including its input conversion proc */
-	system_cache_lookup(element_type, true, &typlen, &typbyval, &typdelim,
+	system_cache_lookup(element_type, IOFunc_input,
+						&typlen, &typbyval, &typdelim,
 						&typelem, &typinput, &typalign);
 	fmgr_info(typinput, &inputproc);
 
@@ -622,8 +638,9 @@ array_out(PG_FUNCTION_ARGS)
 			   *dim;
 
 	element_type = ARR_ELEMTYPE(v);
-	system_cache_lookup(element_type, false, &typlen, &typbyval,
-						&typdelim, &typelem, &typoutput, &typalign);
+	system_cache_lookup(element_type, IOFunc_output,
+						&typlen, &typbyval, &typdelim,
+						&typelem, &typoutput, &typalign);
 	fmgr_info(typoutput, &outputproc);
 
 	ndim = ARR_NDIM(v);
@@ -763,10 +780,178 @@ array_out(PG_FUNCTION_ARGS)
 Datum
 array_recv(PG_FUNCTION_ARGS)
 {
-	elog(ERROR, "array_recv: not implemented yet");
-	return 0;
+	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
+	Oid			spec_element_type = PG_GETARG_OID(1);	/* type of an array
+														 * element */
+	Oid			element_type;
+	int			typlen;
+	bool		typbyval;
+	char		typdelim;
+	Oid			typreceive;
+	Oid			typelem;
+	FmgrInfo	receiveproc;
+	int			i,
+				nitems;
+	int32		nbytes;
+	Datum	   *dataPtr;
+	ArrayType  *retval;
+	int			ndim,
+				flags,
+				dim[MAXDIM],
+				lBound[MAXDIM];
+	char		typalign;
+
+	/* Get the array header information */
+	ndim = pq_getmsgint(buf, 4);
+	if (ndim < 0 || ndim > MAXDIM)
+		elog(ERROR, "array_recv: invalid number of dimensions");
+	flags = pq_getmsgint(buf, 4);
+	if (flags != 0)
+		elog(ERROR, "array_recv: invalid array flags");
+	element_type = pq_getmsgint(buf, sizeof(Oid));
+	if (element_type != spec_element_type)
+	{
+		/* XXX Can we allow taking the input element type in any cases? */
+		elog(ERROR, "array_recv: wrong element type");
+	}
+
+	for (i = 0; i < ndim; i++)
+	{
+		dim[i] = pq_getmsgint(buf, 4);
+		lBound[i] = pq_getmsgint(buf, 4);
+	}
+	nitems = ArrayGetNItems(ndim, dim);
+
+	if (nitems == 0)
+	{
+		/* Return empty array */
+		retval = (ArrayType *) palloc0(sizeof(ArrayType));
+		retval->size = sizeof(ArrayType);
+		retval->elemtype = element_type;
+		PG_RETURN_ARRAYTYPE_P(retval);
+	}
+
+	/* Get info about element type, including its receive conversion proc */
+	system_cache_lookup(element_type, IOFunc_receive,
+						&typlen, &typbyval, &typdelim,
+						&typelem, &typreceive, &typalign);
+	if (!OidIsValid(typreceive))
+		elog(ERROR, "No binary input function available for type %s",
+			 format_type_be(element_type));
+	fmgr_info(typreceive, &receiveproc);
+
+	dataPtr = ReadArrayBinary(buf, nitems, &receiveproc, typelem,
+							  typlen, typbyval, typalign,
+							  &nbytes);
+	nbytes += ARR_OVERHEAD(ndim);
+
+	retval = (ArrayType *) palloc0(nbytes);
+	retval->size = nbytes;
+	retval->ndim = ndim;
+	retval->elemtype = element_type;
+	memcpy((char *) ARR_DIMS(retval), (char *) dim,
+		   ndim * sizeof(int));
+	memcpy((char *) ARR_LBOUND(retval), (char *) lBound,
+		   ndim * sizeof(int));
+
+	CopyArrayEls(ARR_DATA_PTR(retval), dataPtr, nitems,
+				 typlen, typbyval, typalign, true);
+	pfree(dataPtr);
+
+	PG_RETURN_ARRAYTYPE_P(retval);
+}
+
+/*---------------------------------------------------------------------------
+ * ReadArrayBinary:
+ *	 collect the data elements of an array being read in binary style.
+ * result :
+ *	 returns a palloc'd array of Datum representations of the array elements.
+ *	 If element type is pass-by-ref, the Datums point to palloc'd values.
+ *	 *nbytes is set to the amount of data space needed for the array,
+ *	 including alignment padding but not including array header overhead.
+ *---------------------------------------------------------------------------
+ */
+static Datum *
+ReadArrayBinary(StringInfo buf,
+				int nitems,
+				FmgrInfo *receiveproc,
+				Oid typelem,
+				int typlen,
+				bool typbyval,
+				char typalign,
+				int *nbytes)
+{
+	Datum	   *values;
+	int			i;
+
+	values = (Datum *) palloc(nitems * sizeof(Datum));
+
+	for (i = 0; i < nitems; i++)
+	{
+		int		itemlen;
+		StringInfoData elem_buf;
+		char	csave;
+
+		/* Get and check the item length */
+		itemlen = pq_getmsgint(buf, 4);
+		if (itemlen < 0 || itemlen > (buf->len - buf->cursor))
+			elog(ERROR, "insufficient data left in message");
+
+		/*
+		 * Rather than copying data around, we just set up a phony
+		 * StringInfo pointing to the correct portion of the input
+		 * buffer.  We assume we can scribble on the input buffer
+		 * so as to maintain the convention that StringInfos have
+		 * a trailing null.
+		 */
+		elem_buf.data = &buf->data[buf->cursor];
+		elem_buf.maxlen = itemlen + 1;
+		elem_buf.len = itemlen;
+		elem_buf.cursor = 0;
+
+		buf->cursor += itemlen;
+
+		csave = buf->data[buf->cursor];
+		buf->data[buf->cursor] = '\0';
+
+		/* Now call the element's receiveproc */
+		values[i] = FunctionCall2(receiveproc,
+								  PointerGetDatum(&elem_buf),
+								  ObjectIdGetDatum(typelem));
+
+		/* Trouble if it didn't eat the whole buffer */
+		if (elem_buf.cursor != itemlen)
+			elog(ERROR, "Improper binary format in array element %d",
+				 i + 1);
+
+		buf->data[buf->cursor] = csave;
+	}
+
+	/*
+	 * Compute total data space needed
+	 */
+	if (typlen > 0)
+	{
+		*nbytes = nitems * att_align(typlen, typalign);
+	}
+	else
+	{
+		Assert(!typbyval);
+		*nbytes = 0;
+		for (i = 0; i < nitems; i++)
+		{
+			/* let's just make sure data is not toasted */
+			if (typlen == -1)
+				values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
+			*nbytes = att_addlength(*nbytes, typlen, values[i]);
+			*nbytes = att_align(*nbytes, typalign);
+		}
+	}
+
+	return values;
 }
 
+
 /*-------------------------------------------------------------------------
  * array_send :
  *		   takes the internal representation of an array and returns a bytea
@@ -776,8 +961,70 @@ array_recv(PG_FUNCTION_ARGS)
 Datum
 array_send(PG_FUNCTION_ARGS)
 {
-	elog(ERROR, "array_send: not implemented yet");
-	return 0;
+	ArrayType  *v = PG_GETARG_ARRAYTYPE_P(0);
+	Oid			element_type;
+	int			typlen;
+	bool		typbyval;
+	char		typdelim;
+	Oid			typsend,
+				typelem;
+	FmgrInfo	sendproc;
+	char		typalign;
+	char	   *p;
+	int			nitems,
+				i;
+	int			ndim,
+			   *dim;
+	StringInfoData buf;
+
+	/* Get information about the element type and the array dimensions */
+	element_type = ARR_ELEMTYPE(v);
+	system_cache_lookup(element_type, IOFunc_send, &typlen, &typbyval,
+						&typdelim, &typelem, &typsend, &typalign);
+	if (!OidIsValid(typsend))
+		elog(ERROR, "No binary output function available for type %s",
+			 format_type_be(element_type));
+	fmgr_info(typsend, &sendproc);
+
+	ndim = ARR_NDIM(v);
+	dim = ARR_DIMS(v);
+	nitems = ArrayGetNItems(ndim, dim);
+
+	pq_begintypsend(&buf);
+
+	/* Send the array header information */
+	pq_sendint(&buf, ndim, 4);
+	pq_sendint(&buf, v->flags, 4);
+	pq_sendint(&buf, element_type, sizeof(Oid));
+	for (i = 0; i < ndim; i++)
+	{
+		pq_sendint(&buf, ARR_DIMS(v)[i], 4);
+		pq_sendint(&buf, ARR_LBOUND(v)[i], 4);
+	}
+
+	/* Send the array elements using the element's own sendproc */
+	p = ARR_DATA_PTR(v);
+	for (i = 0; i < nitems; i++)
+	{
+		Datum		itemvalue;
+		bytea	   *outputbytes;
+
+		itemvalue = fetch_att(p, typbyval, typlen);
+
+		outputbytes = DatumGetByteaP(FunctionCall2(&sendproc,
+												   itemvalue,
+												   ObjectIdGetDatum(typelem)));
+		/* We assume the result will not have been toasted */
+		pq_sendint(&buf, VARSIZE(outputbytes) - VARHDRSZ, 4);
+		pq_sendbytes(&buf, VARDATA(outputbytes),
+					 VARSIZE(outputbytes) - VARHDRSZ);
+		pfree(outputbytes);
+
+		p = att_addlength(p, typlen, PointerGetDatum(p));
+		p = (char *) att_align(p, typalign);
+	}
+
+	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
 }
 
 /*-------------------------------------------------------------------------
@@ -1583,9 +1830,9 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType)
 		PG_RETURN_ARRAYTYPE_P(v);
 
 	/* Lookup source and result types. Unneeded variables are reused. */
-	system_cache_lookup(inpType, false, &inp_typlen, &inp_typbyval,
+	system_cache_lookup(inpType, IOFunc_input, &inp_typlen, &inp_typbyval,
 						&typdelim, &typelem, &proc, &inp_typalign);
-	system_cache_lookup(retType, false, &typlen, &typbyval,
+	system_cache_lookup(retType, IOFunc_input, &typlen, &typbyval,
 						&typdelim, &typelem, &proc, &typalign);
 
 	/* Allocate temporary array for new values */
@@ -1832,7 +2079,7 @@ array_eq(PG_FUNCTION_ARGS)
 
 static void
 system_cache_lookup(Oid element_type,
-					bool input,
+					IOFuncSelector which_func,
 					int *typlen,
 					bool *typbyval,
 					char *typdelim,
@@ -1855,10 +2102,21 @@ system_cache_lookup(Oid element_type,
 	*typdelim = typeStruct->typdelim;
 	*typelem = typeStruct->typelem;
 	*typalign = typeStruct->typalign;
-	if (input)
-		*proc = typeStruct->typinput;
-	else
-		*proc = typeStruct->typoutput;
+	switch (which_func)
+	{
+		case IOFunc_input:
+			*proc = typeStruct->typinput;
+			break;
+		case IOFunc_output:
+			*proc = typeStruct->typoutput;
+			break;
+		case IOFunc_receive:
+			*proc = typeStruct->typreceive;
+			break;
+		case IOFunc_send:
+			*proc = typeStruct->typsend;
+			break;
+	}
 	ReleaseSysCache(typeTuple);
 }
 
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index e205448ec6d..e4b28000ded 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -37,7 +37,7 @@
  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catversion.h,v 1.189 2003/05/09 21:19:49 tgl Exp $
+ * $Id: catversion.h,v 1.190 2003/05/09 23:01:45 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	200305092
+#define CATALOG_VERSION_NO	200305093
 
 #endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index bc186f2eed9..a6acb4cf93d 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: pg_proc.h,v 1.296 2003/05/09 21:19:49 tgl Exp $
+ * $Id: pg_proc.h,v 1.297 2003/05/09 23:01:45 tgl Exp $
  *
  * NOTES
  *	  The script catalog/genbki.sh reads this file and generates .bki
@@ -3153,9 +3153,9 @@ DATA(insert OID =  2311 (  md5	   PGNSP PGUID 12 f f t f i 1 25 "25"  md5_text -
 DESCR("calculates md5 hash");
 
 
-DATA(insert OID = 2400 (  array_recv		   PGNSP PGUID 12 f f t f s 1 2277 "2281"  array_recv - _null_ ));
+DATA(insert OID = 2400 (  array_recv		   PGNSP PGUID 12 f f t f s 2 2277 "2281 26"  array_recv - _null_ ));
 DESCR("I/O");
-DATA(insert OID = 2401 (  array_send		   PGNSP PGUID 12 f f t f s 1 17 "2277"  array_send - _null_ ));
+DATA(insert OID = 2401 (  array_send		   PGNSP PGUID 12 f f t f s 2 17 "2277 26"  array_send - _null_ ));
 DESCR("I/O");
 DATA(insert OID = 2402 (  record_recv		   PGNSP PGUID 12 f f t f i 1 2249 "2281"  record_recv - _null_ ));
 DESCR("I/O");
diff --git a/src/test/regress/expected/type_sanity.out b/src/test/regress/expected/type_sanity.out
index d466052fed3..8bd18ca29b2 100644
--- a/src/test/regress/expected/type_sanity.out
+++ b/src/test/regress/expected/type_sanity.out
@@ -143,7 +143,9 @@ WHERE p1.typoutput = p2.oid AND p1.typtype in ('b', 'p') AND NOT
 SELECT p1.oid, p1.typname, p2.oid, p2.proname
 FROM pg_type AS p1, pg_proc AS p2
 WHERE p1.typreceive = p2.oid AND p1.typtype in ('b', 'p') AND NOT
-    (p2.pronargs = 1 AND p2.proargtypes[0] = 'internal'::regtype);
+    ((p2.pronargs = 1 AND p2.proargtypes[0] = 'internal'::regtype) OR
+     (p2.pronargs = 2 AND p2.proargtypes[0] = 'internal'::regtype AND
+      p2.proargtypes[1] = 'oid'::regtype));
  oid | typname | oid | proname 
 -----+---------+-----+---------
 (0 rows)
diff --git a/src/test/regress/sql/type_sanity.sql b/src/test/regress/sql/type_sanity.sql
index 04a0557d836..3c9004ce569 100644
--- a/src/test/regress/sql/type_sanity.sql
+++ b/src/test/regress/sql/type_sanity.sql
@@ -114,7 +114,9 @@ WHERE p1.typoutput = p2.oid AND p1.typtype in ('b', 'p') AND NOT
 SELECT p1.oid, p1.typname, p2.oid, p2.proname
 FROM pg_type AS p1, pg_proc AS p2
 WHERE p1.typreceive = p2.oid AND p1.typtype in ('b', 'p') AND NOT
-    (p2.pronargs = 1 AND p2.proargtypes[0] = 'internal'::regtype);
+    ((p2.pronargs = 1 AND p2.proargtypes[0] = 'internal'::regtype) OR
+     (p2.pronargs = 2 AND p2.proargtypes[0] = 'internal'::regtype AND
+      p2.proargtypes[1] = 'oid'::regtype));
 
 -- As of 7.4, this check finds refcursor, which is borrowing
 -- other types' I/O routines
-- 
GitLab