From 11d337185d61028e07cc696306694ec59ac6bfe2 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sat, 23 Nov 2002 18:59:25 +0000
Subject: [PATCH] Fix inappropriate quoting in dblink.  From Joe Conway.

---
 contrib/dblink/README.dblink       |  4 ++
 contrib/dblink/dblink.c            | 67 ++++++++++++++++++++++++------
 contrib/dblink/expected/dblink.out | 37 +++++++++++++++++
 contrib/dblink/sql/dblink.sql      | 20 +++++++++
 4 files changed, 116 insertions(+), 12 deletions(-)

diff --git a/contrib/dblink/README.dblink b/contrib/dblink/README.dblink
index f304b7729d1..af627901dc8 100644
--- a/contrib/dblink/README.dblink
+++ b/contrib/dblink/README.dblink
@@ -151,6 +151,10 @@ Installation:
 
 Documentation:
 
+  Note: Parameters representing relation names must include double
+     quotes if the names are mixed-case or contain special characters. They
+     must also be appropriately qualified with schema name if applicable.
+
   See the following files:
      doc/connection
      doc/cursor
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 8cbce09d1ba..accfd97e9f8 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -71,6 +71,7 @@ static dblink_results *get_res_ptr(int32 res_id_index);
 static void append_res_ptr(dblink_results * results);
 static void remove_res_ptr(dblink_results * results);
 static TupleDesc pgresultGetTupleDesc(PGresult *res);
+static char *generate_relation_name(Oid relid);
 
 /* Global */
 List	   *res_id = NIL;
@@ -171,7 +172,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	}
 	PQclear(res);
 
-	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql);
+	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
 	res = PQexec(conn, str->data);
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
@@ -210,7 +211,7 @@ dblink_close(PG_FUNCTION_ARGS)
 	else
 		elog(ERROR, "dblink_close: no connection available");
 
-	appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));
+	appendStringInfo(str, "CLOSE %s", curname);
 
 	/* close the cursor */
 	res = PQexec(conn, str->data);
@@ -287,7 +288,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 		else
 			elog(ERROR, "dblink_fetch: no connection available");
 
-		appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname));
+		appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);
 
 		res = PQexec(conn, str->data);
 		if (!res ||
@@ -306,7 +307,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 		{
 			/* cursor does not exist - closed already or bad name */
 			PQclear(res);
-			elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname));
+			elog(ERROR, "dblink_fetch: cursor %s does not exist", curname);
 		}
 
 		funcctx->max_calls = PQntuples(res);
@@ -1527,11 +1528,13 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
 	int			i;
 	bool		needComma;
 
+	/* get relation name including any needed schema prefix and quoting */
+	relname = generate_relation_name(relid);
+
 	/*
 	 * Open relation using relid
 	 */
 	rel = relation_open(relid, AccessShareLock);
-	relname = RelationGetRelationName(rel);
 	tupdesc = rel->rd_att;
 	natts = tupdesc->natts;
 
@@ -1539,7 +1542,7 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
 	if (!tuple)
 		elog(ERROR, "dblink_build_sql_insert: row not found");
 
-	appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
+	appendStringInfo(str, "INSERT INTO %s(", relname);
 
 	needComma = false;
 	for (i = 0; i < natts; i++)
@@ -1610,15 +1613,17 @@ get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattval
 	char	   *val;
 	int			i;
 
+	/* get relation name including any needed schema prefix and quoting */
+	relname = generate_relation_name(relid);
+
 	/*
 	 * Open relation using relid
 	 */
 	rel = relation_open(relid, AccessShareLock);
-	relname = RelationGetRelationName(rel);
 	tupdesc = rel->rd_att;
 	natts = tupdesc->natts;
 
-	appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
+	appendStringInfo(str, "DELETE FROM %s WHERE ", relname);
 	for (i = 0; i < pknumatts; i++)
 	{
 		int16		pkattnum = pkattnums[i];
@@ -1669,11 +1674,13 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
 	int			i;
 	bool		needComma;
 
+	/* get relation name including any needed schema prefix and quoting */
+	relname = generate_relation_name(relid);
+
 	/*
 	 * Open relation using relid
 	 */
 	rel = relation_open(relid, AccessShareLock);
-	relname = RelationGetRelationName(rel);
 	tupdesc = rel->rd_att;
 	natts = tupdesc->natts;
 
@@ -1681,7 +1688,7 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
 	if (!tuple)
 		elog(ERROR, "dblink_build_sql_update: row not found");
 
-	appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
+	appendStringInfo(str, "UPDATE %s SET ", relname);
 
 	needComma = false;
 	for (i = 0; i < natts; i++)
@@ -1813,11 +1820,13 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
 	int			i;
 	char	   *val = NULL;
 
+	/* get relation name including any needed schema prefix and quoting */
+	relname = generate_relation_name(relid);
+
 	/*
 	 * Open relation using relid
 	 */
 	rel = relation_open(relid, AccessShareLock);
-	relname = RelationGetRelationName(rel);
 	tupdesc = CreateTupleDescCopy(rel->rd_att);
 	relation_close(rel, AccessShareLock);
 
@@ -1831,7 +1840,7 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
 	 * Build sql statement to look up tuple of interest Use src_pkattvals
 	 * as the criteria.
 	 */
-	appendStringInfo(str, "SELECT * FROM %s WHERE ", quote_ident_cstr(relname));
+	appendStringInfo(str, "SELECT * FROM %s WHERE ", relname);
 
 	for (i = 0; i < pknumatts; i++)
 	{
@@ -2003,3 +2012,37 @@ pgresultGetTupleDesc(PGresult *res)
 
 	return desc;
 }
+
+/*
+ * generate_relation_name - copied from ruleutils.c
+ *		Compute the name to display for a relation specified by OID
+ *
+ * The result includes all necessary quoting and schema-prefixing.
+ */
+static char *
+generate_relation_name(Oid relid)
+{
+	HeapTuple	tp;
+	Form_pg_class reltup;
+	char	   *nspname;
+	char	   *result;
+
+	tp = SearchSysCache(RELOID,
+						ObjectIdGetDatum(relid),
+						0, 0, 0);
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup of relation %u failed", relid);
+	reltup = (Form_pg_class) GETSTRUCT(tp);
+
+	/* Qualify the name if not visible in search path */
+	if (RelationIsVisible(relid))
+		nspname = NULL;
+	else
+		nspname = get_namespace_name(reltup->relnamespace);
+
+	result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
+
+	ReleaseSysCache(tp);
+
+	return result;
+}
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index 3d801ebdbbe..ed3a87cfa0e 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -59,6 +59,43 @@ SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}');
  DELETE FROM foo WHERE f1 = '0' AND f2 = 'a'
 (1 row)
 
+-- retest using a quoted and schema qualified table
+CREATE SCHEMA "MySchema";
+CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2));
+NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index 'Foo_pkey' for table 'Foo'
+INSERT INTO "MySchema"."Foo" VALUES (0,'a','{"a0","b0","c0"}');
+-- list the primary key fields
+SELECT *
+FROM dblink_get_pkey('"MySchema"."Foo"');
+ position | colname 
+----------+---------
+        1 | f1
+        2 | f2
+(2 rows)
+
+-- build an insert statement based on a local tuple,
+-- replacing the primary key values with new ones
+SELECT dblink_build_sql_insert('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+                        dblink_build_sql_insert                         
+------------------------------------------------------------------------
+ INSERT INTO "MySchema"."Foo"(f1,f2,f3) VALUES('99','xyz','{a0,b0,c0}')
+(1 row)
+
+-- build an update statement based on a local tuple,
+-- replacing the primary key values with new ones
+SELECT dblink_build_sql_update('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+                                       dblink_build_sql_update                                       
+-----------------------------------------------------------------------------------------------------
+ UPDATE "MySchema"."Foo" SET f1 = '99', f2 = 'xyz', f3 = '{a0,b0,c0}' WHERE f1 = '99' AND f2 = 'xyz'
+(1 row)
+
+-- build a delete statement based on a local tuple,
+SELECT dblink_build_sql_delete('"MySchema"."Foo"','1 2',2,'{"0", "a"}');
+                 dblink_build_sql_delete                  
+----------------------------------------------------------
+ DELETE FROM "MySchema"."Foo" WHERE f1 = '0' AND f2 = 'a'
+(1 row)
+
 -- regular old dblink
 SELECT *
 FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[])
diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql
index 542a42eda0f..4d534e50c1c 100644
--- a/contrib/dblink/sql/dblink.sql
+++ b/contrib/dblink/sql/dblink.sql
@@ -44,6 +44,26 @@ SELECT dblink_build_sql_update('foo','1 2',2,'{"0", "a"}','{"99", "xyz"}');
 -- build a delete statement based on a local tuple,
 SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}');
 
+-- retest using a quoted and schema qualified table
+CREATE SCHEMA "MySchema";
+CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2));
+INSERT INTO "MySchema"."Foo" VALUES (0,'a','{"a0","b0","c0"}');
+
+-- list the primary key fields
+SELECT *
+FROM dblink_get_pkey('"MySchema"."Foo"');
+
+-- build an insert statement based on a local tuple,
+-- replacing the primary key values with new ones
+SELECT dblink_build_sql_insert('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+
+-- build an update statement based on a local tuple,
+-- replacing the primary key values with new ones
+SELECT dblink_build_sql_update('"MySchema"."Foo"','1 2',2,'{"0", "a"}','{"99", "xyz"}');
+
+-- build a delete statement based on a local tuple,
+SELECT dblink_build_sql_delete('"MySchema"."Foo"','1 2',2,'{"0", "a"}');
+
 -- regular old dblink
 SELECT *
 FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[])
-- 
GitLab