From 0563a3a8b59150bf3cc8b2b7077f684e0eaf8aff Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Fri, 13 Jan 2017 14:03:52 -0500
Subject: [PATCH] Fix a bug in how we generate partition constraints.

Move the code for doing parent attnos to child attnos mapping for Vars
in partition constraint expressions to a separate function
map_partition_varattnos() and call it from the appropriate places.
Doing it in get_qual_from_partbound(), as is now, would produce wrong
result in certain multi-level partitioning cases, because it only
considers the current pair of parent-child relations.  In certain
multi-level partitioning cases, attnums for the same key attribute(s)
might differ between various levels causing the same attribute to be
numbered differently in different instances of the Var corresponding
to a given attribute.

With this commit, in generate_partition_qual(), we first generate the
the whole partition constraint (considering all levels of partitioning)
and then do the mapping, so that Vars in the final expression are
numbered according the leaf relation (to which it is supposed to apply).

Amit Langote, reviewed by me.
---
 src/backend/catalog/partition.c           | 99 +++++++++++------------
 src/backend/commands/tablecmds.c          |  7 +-
 src/include/catalog/partition.h           |  1 +
 src/test/regress/expected/alter_table.out | 30 +++++++
 src/test/regress/sql/alter_table.sql      | 25 ++++++
 5 files changed, 110 insertions(+), 52 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index f54e1bdf3fb..874e69d8d62 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -852,10 +852,6 @@ get_qual_from_partbound(Relation rel, Relation parent, Node *bound)
 	PartitionBoundSpec *spec = (PartitionBoundSpec *) bound;
 	PartitionKey key = RelationGetPartitionKey(parent);
 	List	   *my_qual = NIL;
-	TupleDesc	parent_tupdesc = RelationGetDescr(parent);
-	AttrNumber	parent_attno;
-	AttrNumber *partition_attnos;
-	bool		found_whole_row;
 
 	Assert(key != NULL);
 
@@ -876,38 +872,51 @@ get_qual_from_partbound(Relation rel, Relation parent, Node *bound)
 				 (int) key->strategy);
 	}
 
-	/*
-	 * Translate vars in the generated expression to have correct attnos. Note
-	 * that the vars in my_qual bear attnos dictated by key which carries
-	 * physical attnos of the parent.  We must allow for a case where physical
-	 * attnos of a partition can be different from the parent.
-	 */
-	partition_attnos = (AttrNumber *)
-		palloc0(parent_tupdesc->natts * sizeof(AttrNumber));
-	for (parent_attno = 1; parent_attno <= parent_tupdesc->natts;
-		 parent_attno++)
+	return my_qual;
+}
+
+/*
+ * map_partition_varattnos - maps varattno of any Vars in expr from the
+ * parent attno to partition attno.
+ *
+ * We must allow for a case where physical attnos of a partition can be
+ * different from the parent's.
+ */
+List *
+map_partition_varattnos(List *expr, Relation partrel, Relation parent)
+{
+	TupleDesc	tupdesc = RelationGetDescr(parent);
+	AttrNumber	attno;
+	AttrNumber *part_attnos;
+	bool		found_whole_row;
+
+	if (expr == NIL)
+		return NIL;
+
+	part_attnos = (AttrNumber *) palloc0(tupdesc->natts * sizeof(AttrNumber));
+	for (attno = 1; attno <= tupdesc->natts; attno++)
 	{
-		Form_pg_attribute attribute = parent_tupdesc->attrs[parent_attno - 1];
+		Form_pg_attribute attribute = tupdesc->attrs[attno - 1];
 		char	   *attname = NameStr(attribute->attname);
-		AttrNumber	partition_attno;
+		AttrNumber	part_attno;
 
 		if (attribute->attisdropped)
 			continue;
 
-		partition_attno = get_attnum(RelationGetRelid(rel), attname);
-		partition_attnos[parent_attno - 1] = partition_attno;
+		part_attno = get_attnum(RelationGetRelid(partrel), attname);
+		part_attnos[attno - 1] = part_attno;
 	}
 
-	my_qual = (List *) map_variable_attnos((Node *) my_qual,
-										   1, 0,
-										   partition_attnos,
-										   parent_tupdesc->natts,
-										   &found_whole_row);
-	/* there can never be a whole-row reference here */
+	expr = (List *) map_variable_attnos((Node *) expr,
+										1, 0,
+										part_attnos,
+										tupdesc->natts,
+										&found_whole_row);
+	/* There can never be a whole-row reference here */
 	if (found_whole_row)
 		elog(ERROR, "unexpected whole-row reference found in partition key");
 
-	return my_qual;
+	return expr;
 }
 
 /*
@@ -1496,27 +1505,15 @@ generate_partition_qual(Relation rel)
 	/* Guard against stack overflow due to overly deep partition tree */
 	check_stack_depth();
 
+	/* Quick copy */
+	if (rel->rd_partcheck != NIL)
+		return copyObject(rel->rd_partcheck);
+
 	/* Grab at least an AccessShareLock on the parent table */
 	parent = heap_open(get_partition_parent(RelationGetRelid(rel)),
 					   AccessShareLock);
 
-	/* Quick copy */
-	if (rel->rd_partcheck)
-	{
-		if (parent->rd_rel->relispartition)
-			result = list_concat(generate_partition_qual(parent),
-								 copyObject(rel->rd_partcheck));
-		else
-			result = copyObject(rel->rd_partcheck);
-
-		heap_close(parent, AccessShareLock);
-		return result;
-	}
-
 	/* Get pg_class.relpartbound */
-	if (!rel->rd_rel->relispartition)	/* should not happen */
-		elog(ERROR, "relation \"%s\" has relispartition = false",
-			 RelationGetRelationName(rel));
 	tuple = SearchSysCache1(RELOID, RelationGetRelid(rel));
 	if (!HeapTupleIsValid(tuple))
 		elog(ERROR, "cache lookup failed for relation %u",
@@ -1533,20 +1530,22 @@ generate_partition_qual(Relation rel)
 
 	my_qual = get_qual_from_partbound(rel, parent, bound);
 
-	/* If requested, add parent's quals to the list (if any) */
+	/* Add the parent's quals to the list (if any) */
 	if (parent->rd_rel->relispartition)
-	{
-		List	   *parent_check;
-
-		parent_check = generate_partition_qual(parent);
-		result = list_concat(parent_check, my_qual);
-	}
+		result = list_concat(generate_partition_qual(parent), my_qual);
 	else
 		result = my_qual;
 
-	/* Save a copy of my_qual in the relcache */
+	/*
+	 * Change Vars to have partition's attnos instead of the parent's.
+	 * We do this after we concatenate the parent's quals, because
+	 * we want every Var in it to bear this relation's attnos.
+	 */
+	result = map_partition_varattnos(result, rel, parent);
+
+	/* Save a copy in the relcache */
 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-	rel->rd_partcheck = copyObject(my_qual);
+	rel->rd_partcheck = copyObject(result);
 	MemoryContextSwitchTo(oldcxt);
 
 	/* Keep the parent locked until commit */
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 42558ec42e0..f913e87bc8b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -13429,6 +13429,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
 			Oid			part_relid = lfirst_oid(lc);
 			Relation	part_rel;
 			Expr	   *constr;
+			List	   *my_constr;
 
 			/* Lock already taken */
 			if (part_relid != RelationGetRelid(attachRel))
@@ -13451,8 +13452,10 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
 			tab = ATGetQueueEntry(wqueue, part_rel);
 
 			constr = linitial(partConstraint);
-			tab->partition_constraint = make_ands_implicit((Expr *) constr);
-
+			my_constr = make_ands_implicit((Expr *) constr);
+			tab->partition_constraint = map_partition_varattnos(my_constr,
+																part_rel,
+																rel);
 			/* keep our lock until commit */
 			if (part_rel != attachRel)
 				heap_close(part_rel, NoLock);
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index 1c3c4d6ac36..537f0aad675 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -77,6 +77,7 @@ extern bool partition_bounds_equal(PartitionKey key,
 extern void check_new_partition_bound(char *relname, Relation parent, Node *bound);
 extern Oid get_partition_parent(Oid relid);
 extern List *get_qual_from_partbound(Relation rel, Relation parent, Node *bound);
+extern List *map_partition_varattnos(List *expr, Relation partrel, Relation parent);
 extern List *RelationGetPartitionQual(Relation rel);
 
 /* For tuple routing */
diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out
index d8028c67470..b290bc810eb 100644
--- a/src/test/regress/expected/alter_table.out
+++ b/src/test/regress/expected/alter_table.out
@@ -3339,3 +3339,33 @@ drop cascades to table part_2
 drop cascades to table part_5
 drop cascades to table part_5_a
 drop cascades to table part_1
+-- more tests for certain multi-level partitioning scenarios
+create table p (a int, b int) partition by range (a, b);
+create table p1 (b int, a int not null) partition by range (b);
+create table p11 (like p1);
+alter table p11 drop a;
+alter table p11 add a int;
+alter table p11 drop a;
+alter table p11 add a int not null;
+-- attnum for key attribute 'a' is different in p, p1, and p11
+select attrelid::regclass, attname, attnum
+from pg_attribute
+where attname = 'a'
+ and (attrelid = 'p'::regclass
+   or attrelid = 'p1'::regclass
+   or attrelid = 'p11'::regclass);
+ attrelid | attname | attnum 
+----------+---------+--------
+ p        | a       |      1
+ p1       | a       |      2
+ p11      | a       |      4
+(3 rows)
+
+alter table p1 attach partition p11 for values from (2) to (5);
+insert into p1 (a, b) values (2, 3);
+-- check that partition validation scan correctly detects violating rows
+alter table p attach partition p1 for values from (1, 2) to (1, 10);
+ERROR:  partition constraint is violated by some row
+-- cleanup
+drop table p, p1 cascade;
+NOTICE:  drop cascades to table p11
diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql
index 22652665172..5d07e2ede44 100644
--- a/src/test/regress/sql/alter_table.sql
+++ b/src/test/regress/sql/alter_table.sql
@@ -2191,3 +2191,28 @@ ALTER TABLE list_parted2 ALTER COLUMN b TYPE text;
 
 -- cleanup
 DROP TABLE list_parted, list_parted2, range_parted CASCADE;
+
+-- more tests for certain multi-level partitioning scenarios
+create table p (a int, b int) partition by range (a, b);
+create table p1 (b int, a int not null) partition by range (b);
+create table p11 (like p1);
+alter table p11 drop a;
+alter table p11 add a int;
+alter table p11 drop a;
+alter table p11 add a int not null;
+-- attnum for key attribute 'a' is different in p, p1, and p11
+select attrelid::regclass, attname, attnum
+from pg_attribute
+where attname = 'a'
+ and (attrelid = 'p'::regclass
+   or attrelid = 'p1'::regclass
+   or attrelid = 'p11'::regclass);
+
+alter table p1 attach partition p11 for values from (2) to (5);
+
+insert into p1 (a, b) values (2, 3);
+-- check that partition validation scan correctly detects violating rows
+alter table p attach partition p1 for values from (1, 2) to (1, 10);
+
+-- cleanup
+drop table p, p1 cascade;
-- 
GitLab