From a14b52c61454e3d23f7d0ab0affa56f8abfdf505 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Fri, 3 Nov 2017 11:59:20 -0400
Subject: [PATCH] Don't reset additional columns on subscriber to NULL on
 UPDATE

When a publisher table has fewer columns than a subscriber, the update
of a row on the publisher should result in updating of only the columns
in common.  The previous coding mistakenly reset the values of
additional columns on the subscriber to NULL because it failed to skip
updates of columns not found in the attribute map.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
---
 src/backend/replication/logical/worker.c   |  7 +-
 src/test/subscription/t/008_diff_schema.pl | 80 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 2 deletions(-)
 create mode 100644 src/test/subscription/t/008_diff_schema.pl

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b58d2e10086..eedc3a8816b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -391,10 +391,13 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 		Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
 		int			remoteattnum = rel->attrmap[i];
 
-		if (remoteattnum >= 0 && !replaces[remoteattnum])
+		if (remoteattnum < 0)
 			continue;
 
-		if (remoteattnum >= 0 && values[remoteattnum] != NULL)
+		if (!replaces[remoteattnum])
+			continue;
+
+		if (values[remoteattnum] != NULL)
 		{
 			Oid			typinput;
 			Oid			typioparam;
diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl
new file mode 100644
index 00000000000..b71be6e4870
--- /dev/null
+++ b/src/test/subscription/t/008_diff_schema.pl
@@ -0,0 +1,80 @@
+# Test behavior with different schema on subscriber
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+sub wait_for_caught_up
+{
+	my ($node, $appname) = @_;
+
+	$node->poll_query_until('postgres',
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
+	) or die "Timed out while waiting for subscriber to catch up";
+}
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Update the rows on the publisher and check the additional columns on
+# subscriber didn't change
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(b)");
+
+wait_for_caught_up($node_publisher, $appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check extra columns contain local defaults');
+
+# Change the local values of the extra columns on the subscriber,
+# update publisher, and check that subscriber retains the expected
+# values
+$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
+
+wait_for_caught_up($node_publisher, $appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check extra columns contain locally changed data');
+
+$node_subscriber->stop;
+$node_publisher->stop;
-- 
GitLab