From e55704d8b2fe522fbc9435acbb5bc59033478bd5 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Tue, 10 Dec 2013 18:33:45 -0500
Subject: [PATCH] Add new wal_level, logical, sufficient for logical decoding.

When wal_level=logical, we'll log columns from the old tuple as
configured by the REPLICA IDENTITY facility added in commit
07cacba983ef79be4a84fcd0e0ca3b5fcb85dd65.  This makes it possible
a properly-configured logical replication solution to correctly
follow table updates even if they change the chosen key columns,
or, with REPLICA IDENTITY FULL, even if the table has no key at
all.  Note that updates which do not modify the replica identity
column won't log anything extra, making the choice of a good key
(i.e. one that will rarely be changed) important to performance
when wal_level=logical is configured.

Each insert, update, or delete to a catalog table will also log
the CMIN and/or CMAX values of stamped by the current transaction.
This is necessary because logical decoding will require access to
historical snapshots of the catalog in order to decode some data
types, and the CMIN/CMAX values that we may need in order to judge
row visibility may have been overwritten by the time we need them.

Andres Freund, reviewed in various versions by myself, Heikki
Linnakangas, KONDO Mitsumasa, and many others.
---
 doc/src/sgml/backup.sgml                      |   4 +-
 doc/src/sgml/config.sgml                      |  44 +-
 doc/src/sgml/high-availability.sgml           |   5 +-
 src/backend/access/heap/heapam.c              | 603 +++++++++++++++---
 src/backend/access/rmgrdesc/heapdesc.c        |   9 +
 src/backend/access/rmgrdesc/xlogdesc.c        |   1 +
 src/backend/access/transam/twophase.c         |   4 +-
 src/backend/access/transam/xact.c             |  41 +-
 src/backend/access/transam/xlog.c             |   8 +-
 src/backend/catalog/index.c                   |   2 +-
 src/backend/commands/trigger.c                |   3 +-
 src/backend/postmaster/postmaster.c           |   4 +-
 src/backend/utils/cache/relcache.c            |  50 +-
 src/backend/utils/misc/postgresql.conf.sample |   2 +-
 src/bin/pg_controldata/pg_controldata.c       |   2 +
 src/include/access/heapam_xlog.h              |  70 +-
 src/include/access/xact.h                     |   1 +
 src/include/access/xlog.h                     |   8 +-
 src/include/access/xlog_internal.h            |   2 +-
 src/include/utils/rel.h                       |  24 +
 src/include/utils/relcache.h                  |  12 +-
 src/tools/pgindent/typedefs.list              |   3 +
 22 files changed, 745 insertions(+), 157 deletions(-)

diff --git a/doc/src/sgml/backup.sgml b/doc/src/sgml/backup.sgml
index 995933c62dd..a2361d780fb 100644
--- a/doc/src/sgml/backup.sgml
+++ b/doc/src/sgml/backup.sgml
@@ -587,7 +587,7 @@ tar -cf backup.tar /usr/local/pgsql/data
 
    <para>
     To enable WAL archiving, set the <xref linkend="guc-wal-level">
-    configuration parameter to <literal>archive</> (or <literal>hot_standby</>),
+    configuration parameter to <literal>archive</> or higher,
     <xref linkend="guc-archive-mode"> to <literal>on</>,
     and specify the shell command to use in the <xref
     linkend="guc-archive-command"> configuration parameter.  In practice
@@ -1259,7 +1259,7 @@ restore_command = 'cp /mnt/server/archivedir/%f %p'
       If more flexibility in copying the backup files is needed, a lower
       level process can be used for standalone hot backups as well.
       To prepare for low level standalone hot backups, set <varname>wal_level</> to
-      <literal>archive</> (or <literal>hot_standby</>), <varname>archive_mode</> to
+      <literal>archive</> or higher, <varname>archive_mode</> to
       <literal>on</>, and set up an <varname>archive_command</> that performs
       archiving only when a <emphasis>switch file</> exists.  For example:
 <programlisting>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index fee83c1496b..f33a16b7aad 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1648,10 +1648,12 @@ include 'filename'
         <varname>wal_level</> determines how much information is written
         to the WAL. The default value is <literal>minimal</>, which writes
         only the information needed to recover from a crash or immediate
-        shutdown. <literal>archive</> adds logging required for WAL archiving,
-        and <literal>hot_standby</> further adds information required to run
-        read-only queries on a standby server.
-        This parameter can only be set at server start.
+        shutdown. <literal>archive</> adds logging required for WAL archiving;
+        <literal>hot_standby</> further adds information required to run
+        read-only queries on a standby server; and, finally
+        <literal>logical</> adds information necessary to support logical
+        decoding.  Each level includes the information logged at all lower
+        levels.  This parameter can only be set at server start.
        </para>
        <para>
         In <literal>minimal</> level, WAL-logging of some bulk
@@ -1665,24 +1667,30 @@ include 'filename'
          <member><command>COPY</> into tables that were created or truncated in the same
          transaction</member>
         </simplelist>
-        But minimal WAL does not contain
-        enough information to reconstruct the data from a base backup and the
-        WAL logs, so either <literal>archive</> or <literal>hot_standby</>
-        level must be used to enable
-        WAL archiving (<xref linkend="guc-archive-mode">) and streaming
-        replication.
+        But minimal WAL does not contain enough information to reconstruct the
+        data from a base backup and the WAL logs, so <literal>archive</> or
+        higher must be used to enable WAL archiving
+        (<xref linkend="guc-archive-mode">) and streaming replication.
        </para>
        <para>
         In <literal>hot_standby</> level, the same information is logged as
         with <literal>archive</>, plus information needed to reconstruct
         the status of running transactions from the WAL. To enable read-only
         queries on a standby server, <varname>wal_level</> must be set to
-        <literal>hot_standby</> on the primary, and
+        <literal>hot_standby</> or higher on the primary, and
         <xref linkend="guc-hot-standby"> must be enabled in the standby. It is
-        thought that there is
-        little measurable difference in performance between using
-        <literal>hot_standby</> and <literal>archive</> levels, so feedback
-        is welcome if any production impacts are noticeable.
+        thought that there is little measurable difference in performance
+        between using <literal>hot_standby</> and <literal>archive</> levels,
+        so feedback is welcome if any production impacts are noticeable.
+       </para>
+       <para>
+        In <literal>logical</> level, the same information is logged as
+        with <literal>hot_standby</>, plus information needed to allow
+        extracting logical changesets from the WAL. Using a level of
+        <literal>logical</> will increase the WAL volume, particularly if many
+        tables are configured for <literal>REPLICA IDENTITY FULL</literal> and
+        many <command>UPDATE</> and <command>DELETE</> statements are
+        executed.
        </para>
       </listitem>
      </varlistentry>
@@ -2239,9 +2247,9 @@ include 'filename'
         disabled. WAL sender processes count towards the total number
         of connections, so the parameter cannot be set higher than
         <xref linkend="guc-max-connections">.  This parameter can only
-        be set at server start. <varname>wal_level</> must be set
-        to <literal>archive</> or <literal>hot_standby</> to allow
-        connections from standby servers.
+        be set at server start. <varname>wal_level</> must be set to
+        <literal>archive</> or higher to allow connections from standby
+        servers.
        </para>
        </listitem>
       </varlistentry>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index c8f6fa8a54d..e2e5ac93ab9 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -1861,8 +1861,9 @@ LOG:  database system is ready to accept read only connections
     Consistency information is recorded once per checkpoint on the primary.
     It is not possible to enable hot standby when reading WAL
     written during a period when <varname>wal_level</> was not set to
-    <literal>hot_standby</> on the primary.  Reaching a consistent state can
-    also be delayed in the presence of both of these conditions:
+    <literal>hot_standby</> or <literal>logical</> on the primary.  Reaching
+    a consistent state can also be delayed in the presence of both of these
+    conditions:
 
       <itemizedlist>
        <listitem>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 2035a2158f1..249fffeb061 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -85,12 +85,14 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 					TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup,
-				HeapTuple newtup, bool all_visible_cleared,
-				bool new_all_visible_cleared);
+				HeapTuple newtup, HeapTuple old_key_tup,
+				bool all_visible_cleared, bool new_all_visible_cleared);
 static void HeapSatisfiesHOTandKeyUpdate(Relation relation,
-							 Bitmapset *hot_attrs, Bitmapset *key_attrs,
-							 bool *satisfies_hot, bool *satisfies_key,
-							 HeapTuple oldtup, HeapTuple newtup);
+						  Bitmapset *hot_attrs,
+						  Bitmapset *key_attrs, Bitmapset *id_attrs,
+						  bool *satisfies_hot, bool *satisfies_key,
+						  bool *satisfies_id,
+						  HeapTuple oldtup, HeapTuple newtup);
 static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 						  uint16 old_infomask2, TransactionId add_to_xmax,
 						  LockTupleMode mode, bool is_update,
@@ -108,6 +110,9 @@ static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
 static bool ConditionalMultiXactIdWait(MultiXactId multi,
 						   MultiXactStatus status, int *remaining,
 						   uint16 infomask);
+static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
+static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified,
+										bool *copy);
 
 
 /*
@@ -2103,11 +2108,24 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[3];
+		XLogRecData rdata[4];
 		Page		page = BufferGetPage(buffer);
 		uint8		info = XLOG_HEAP_INSERT;
+		bool		need_tuple_data;
+
+		/*
+		 * For logical decoding, we need the tuple even if we're doing a
+		 * full page write, so make sure to log it separately. (XXX We could
+		 * alternatively store a pointer into the FPW).
+		 *
+		 * Also, if this is a catalog, we need to transmit combocids to
+		 * properly decode, so log that as well.
+		 */
+		need_tuple_data = RelationIsLogicallyLogged(relation);
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+			log_heap_new_cid(relation, heaptup);
 
-		xlrec.all_visible_cleared = all_visible_cleared;
+		xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 		xlrec.target.node = relation->rd_node;
 		xlrec.target.tid = heaptup->t_self;
 		rdata[0].data = (char *) &xlrec;
@@ -2126,17 +2144,35 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		 */
 		rdata[1].data = (char *) &xlhdr;
 		rdata[1].len = SizeOfHeapHeader;
-		rdata[1].buffer = buffer;
+		rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
 		rdata[1].buffer_std = true;
 		rdata[1].next = &(rdata[2]);
 
 		/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
 		rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
 		rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-		rdata[2].buffer = buffer;
+		rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer;
 		rdata[2].buffer_std = true;
 		rdata[2].next = NULL;
 
+		/*
+		 * Make a separate rdata entry for the tuple's buffer if we're
+		 * doing logical decoding, so that an eventual FPW doesn't
+		 * remove the tuple's data.
+		 */
+		if (need_tuple_data)
+		{
+			rdata[2].next = &(rdata[3]);
+
+			rdata[3].data = NULL;
+			rdata[3].len = 0;
+			rdata[3].buffer = buffer;
+			rdata[3].buffer_std = true;
+			rdata[3].next = NULL;
+
+			xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+		}
+
 		/*
 		 * If this is the single and first tuple on page, we can reinit the
 		 * page instead of restoring the whole thing.  Set flag, and hide
@@ -2146,7 +2182,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 		{
 			info |= XLOG_HEAP_INIT_PAGE;
-			rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+			rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
 		}
 
 		recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -2272,6 +2308,8 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	Page		page;
 	bool		needwal;
 	Size		saveFreeSpace;
+	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
+	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
 
 	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
@@ -2358,7 +2396,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		{
 			XLogRecPtr	recptr;
 			xl_heap_multi_insert *xlrec;
-			XLogRecData rdata[2];
+			XLogRecData rdata[3];
 			uint8		info = XLOG_HEAP2_MULTI_INSERT;
 			char	   *tupledata;
 			int			totaldatalen;
@@ -2388,7 +2426,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			/* the rest of the scratch space is used for tuple data */
 			tupledata = scratchptr;
 
-			xlrec->all_visible_cleared = all_visible_cleared;
+			xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 			xlrec->node = relation->rd_node;
 			xlrec->blkno = BufferGetBlockNumber(buffer);
 			xlrec->ntuples = nthispage;
@@ -2420,6 +2458,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 					   datalen);
 				tuphdr->datalen = datalen;
 				scratchptr += datalen;
+
+				/*
+				 * We don't use heap_multi_insert for catalog tuples yet, but
+				 * better be prepared...
+				 */
+				if (need_cids)
+					log_heap_new_cid(relation, heaptup);
 			}
 			totaldatalen = scratchptr - tupledata;
 			Assert((scratchptr - scratch) < BLCKSZ);
@@ -2431,17 +2476,34 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 
 			rdata[1].data = tupledata;
 			rdata[1].len = totaldatalen;
-			rdata[1].buffer = buffer;
+			rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
 			rdata[1].buffer_std = true;
 			rdata[1].next = NULL;
 
+			/*
+			 * Make a separate rdata entry for the tuple's buffer if
+			 * we're doing logical decoding, so that an eventual FPW
+			 * doesn't remove the tuple's data.
+			 */
+			if (need_tuple_data)
+			{
+				rdata[1].next = &(rdata[2]);
+
+				rdata[2].data = NULL;
+				rdata[2].len = 0;
+				rdata[2].buffer = buffer;
+				rdata[2].buffer_std = true;
+				rdata[2].next = NULL;
+				xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+			}
+
 			/*
 			 * If we're going to reinitialize the whole page using the WAL
 			 * record, hide buffer reference from XLogInsert.
 			 */
 			if (init)
 			{
-				rdata[1].buffer = InvalidBuffer;
+				rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
 				info |= XLOG_HEAP_INIT_PAGE;
 			}
 
@@ -2561,6 +2623,8 @@ heap_delete(Relation relation, ItemPointer tid,
 	bool		have_tuple_lock = false;
 	bool		iscombo;
 	bool		all_visible_cleared = false;
+	HeapTuple	old_key_tuple = NULL; /* replica identity of the tuple */
+	bool		old_key_copied = false;
 
 	Assert(ItemPointerIsValid(tid));
 
@@ -2734,6 +2798,12 @@ l1:
 	/* replace cid with a combo cid if necessary */
 	HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -2786,9 +2856,13 @@ l1:
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[2];
+		XLogRecData rdata[4];
+
+		/* For logical decode we need combocids to properly decode the catalog */
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+			log_heap_new_cid(relation, &tp);
 
-		xlrec.all_visible_cleared = all_visible_cleared;
+		xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 		xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
 											  tp.t_data->t_infomask2);
 		xlrec.target.node = relation->rd_node;
@@ -2805,6 +2879,37 @@ l1:
 		rdata[1].buffer_std = true;
 		rdata[1].next = NULL;
 
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_heap_header xlhdr;
+
+			xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			rdata[1].next = &(rdata[2]);
+			rdata[2].data = (char*)&xlhdr;
+			rdata[2].len = SizeOfHeapHeader;
+			rdata[2].buffer = InvalidBuffer;
+			rdata[2].next = NULL;
+
+			rdata[2].next = &(rdata[3]);
+			rdata[3].data = (char *) old_key_tuple->t_data
+				+ offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].len = old_key_tuple->t_len
+				- offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].buffer = InvalidBuffer;
+			rdata[3].next = NULL;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+		}
+
 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
 
 		PageSetLSN(page, recptr);
@@ -2850,6 +2955,9 @@ l1:
 
 	pgstat_count_heap_delete(relation);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		heap_freetuple(old_key_tuple);
+
 	return HeapTupleMayBeUpdated;
 }
 
@@ -2934,9 +3042,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	TransactionId xid = GetCurrentTransactionId();
 	Bitmapset  *hot_attrs;
 	Bitmapset  *key_attrs;
+	Bitmapset  *id_attrs;
 	ItemId		lp;
 	HeapTupleData oldtup;
 	HeapTuple	heaptup;
+	HeapTuple	old_key_tuple = NULL;
+	bool		old_key_copied = false;
 	Page		page;
 	BlockNumber block;
 	MultiXactStatus mxact_status;
@@ -2952,6 +3063,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	bool		iscombo;
 	bool		satisfies_hot;
 	bool		satisfies_key;
+	bool		satisfies_id;
 	bool		use_hot_update = false;
 	bool		key_intact;
 	bool		all_visible_cleared = false;
@@ -2979,8 +3091,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	 * Note that we get a copy here, so we need not worry about relcache flush
 	 * happening midway through.
 	 */
-	hot_attrs = RelationGetIndexAttrBitmap(relation, false);
-	key_attrs = RelationGetIndexAttrBitmap(relation, true);
+	hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL);
+	key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
+	id_attrs = RelationGetIndexAttrBitmap(relation,
+										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
 
 	block = ItemPointerGetBlockNumber(otid);
 	buffer = ReadBuffer(relation, block);
@@ -3038,9 +3152,9 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	 * is updates that don't manipulate key columns, not those that
 	 * serendipitiously arrive at the same key values.
 	 */
-	HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs,
+	HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, id_attrs,
 								 &satisfies_hot, &satisfies_key,
-								 &oldtup, newtup);
+								 &satisfies_id, &oldtup, newtup);
 	if (satisfies_key)
 	{
 		*lockmode = LockTupleNoKeyExclusive;
@@ -3514,6 +3628,14 @@ l2:
 		PageSetFull(page);
 	}
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 * ExtractReplicaIdentity() will return NULL if nothing needs to be
+	 * logged.
+	 */
+	old_key_tuple = ExtractReplicaIdentity(relation, &oldtup, !satisfies_id, &old_key_copied);
+
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
@@ -3589,11 +3711,23 @@ l2:
 	/* XLOG stuff */
 	if (RelationNeedsWAL(relation))
 	{
-		XLogRecPtr	recptr = log_heap_update(relation, buffer,
-											 newbuf, &oldtup, heaptup,
-											 all_visible_cleared,
-											 all_visible_cleared_new);
+		XLogRecPtr	recptr;
 
+		/*
+		 * For logical decoding we need combocids to properly decode the
+		 * catalog.
+		 */
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+		{
+			log_heap_new_cid(relation, &oldtup);
+			log_heap_new_cid(relation, heaptup);
+		}
+
+		recptr = log_heap_update(relation, buffer,
+								 newbuf, &oldtup, heaptup,
+								 old_key_tuple,
+								 all_visible_cleared,
+								 all_visible_cleared_new);
 		if (newbuf != buffer)
 		{
 			PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -3644,6 +3778,9 @@ l2:
 		heap_freetuple(heaptup);
 	}
 
+	if (old_key_tuple != NULL && old_key_copied)
+		heap_freetuple(old_key_tuple);
+
 	bms_free(hot_attrs);
 	bms_free(key_attrs);
 
@@ -3731,63 +3868,72 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
 /*
  * Check which columns are being updated.
  *
- * This simultaneously checks conditions for HOT updates and for FOR KEY
- * SHARE updates.  Since much of the time they will be checking very similar
- * sets of columns, and doing the same tests on them, it makes sense to
- * optimize and do them together.
+ * This simultaneously checks conditions for HOT updates, for FOR KEY
+ * SHARE updates, and REPLICA IDENTITY concerns.  Since much of the time they
+ * will be checking very similar sets of columns, and doing the same tests on
+ * them, it makes sense to optimize and do them together.
  *
- * We receive two bitmapsets comprising the two sets of columns we're
+ * We receive three bitmapsets comprising the three sets of columns we're
  * interested in.  Note these are destructively modified; that is OK since
  * this is invoked at most once in heap_update.
  *
  * hot_result is set to TRUE if it's okay to do a HOT update (i.e. it does not
  * modified indexed columns); key_result is set to TRUE if the update does not
- * modify columns used in the key.
+ * modify columns used in the key; id_result is set to TRUE if the update does
+ * not modify columns in any index marked as the REPLICA IDENTITY.
  */
 static void
-HeapSatisfiesHOTandKeyUpdate(Relation relation,
-							 Bitmapset *hot_attrs, Bitmapset *key_attrs,
+HeapSatisfiesHOTandKeyUpdate(Relation relation, Bitmapset *hot_attrs,
+							 Bitmapset *key_attrs, Bitmapset *id_attrs,
 							 bool *satisfies_hot, bool *satisfies_key,
+							 bool *satisfies_id,
 							 HeapTuple oldtup, HeapTuple newtup)
 {
 	int			next_hot_attnum;
 	int			next_key_attnum;
+	int			next_id_attnum;
 	bool		hot_result = true;
 	bool		key_result = true;
-	bool		key_done = false;
-	bool		hot_done = false;
+	bool		id_result = true;
 
-	next_hot_attnum = bms_first_member(hot_attrs);
-	if (next_hot_attnum == -1)
-		hot_done = true;
-	else
-		/* Adjust for system attributes */
-		next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+	/* If REPLICA IDENTITY is set to FULL, id_attrs will be empty. */
+	Assert(bms_is_subset(id_attrs, key_attrs));
+	Assert(bms_is_subset(key_attrs, hot_attrs));
 
+	/*
+	 * If one of these sets contains no remaining bits, bms_first_member will
+	 * return -1, and after adding FirstLowInvalidHeapAttributeNumber (which
+	 * is negative!)  we'll get an attribute number that can't possibly be
+	 * real, and thus won't match any actual attribute number.
+	 */
+	next_hot_attnum = bms_first_member(hot_attrs);
+	next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
 	next_key_attnum = bms_first_member(key_attrs);
-	if (next_key_attnum == -1)
-		key_done = true;
-	else
-		/* Adjust for system attributes */
-		next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+	next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+	next_id_attnum = bms_first_member(id_attrs);
+	next_id_attnum += FirstLowInvalidHeapAttributeNumber;
 
 	for (;;)
 	{
-		int			check_now;
 		bool		changed;
+		int			check_now;
 
-		/* both bitmapsets are now empty */
-		if (key_done && hot_done)
-			break;
-
-		/* XXX there's probably an easier way ... */
-		if (hot_done)
-			check_now = next_key_attnum;
-		if (key_done)
+		/*
+		 * Since the HOT attributes are a superset of the key attributes and
+		 * the key attributes are a superset of the id attributes, this logic
+		 * is guaranteed to identify the next column that needs to be
+		 * checked.
+		 */
+		if (hot_result && next_hot_attnum > FirstLowInvalidHeapAttributeNumber)
 			check_now = next_hot_attnum;
+		else if (key_result && next_key_attnum > FirstLowInvalidHeapAttributeNumber)
+			check_now = next_key_attnum;
+		else if (id_result && next_id_attnum > FirstLowInvalidHeapAttributeNumber)
+			check_now = next_id_attnum;
 		else
-			check_now = Min(next_hot_attnum, next_key_attnum);
+			break;
 
+		/* See whether it changed. */
 		changed = !heap_tuple_attr_equals(RelationGetDescr(relation),
 										  check_now, oldtup, newtup);
 		if (changed)
@@ -3796,34 +3942,42 @@ HeapSatisfiesHOTandKeyUpdate(Relation relation,
 				hot_result = false;
 			if (check_now == next_key_attnum)
 				key_result = false;
-		}
+			if (check_now == next_id_attnum)
+				id_result = false;
 
-		/* if both are false now, we can stop checking */
-		if (!hot_result && !key_result)
-			break;
+			/* if all are false now, we can stop checking */
+			if (!hot_result && !key_result && !id_result)
+				break;
+		}
 
-		if (check_now == next_hot_attnum)
+		/*
+		 * Advance the next attribute numbers for the sets that contain
+		 * the attribute we just checked.  As we work our way through the
+		 * columns, the next_attnum values will rise; but when each set
+		 * becomes empty, bms_first_member() will return -1 and the attribute
+		 * number will end up with a value less than
+		 * FirstLowInvalidHeapAttributeNumber.
+		 */
+		if (hot_result && check_now == next_hot_attnum)
 		{
 			next_hot_attnum = bms_first_member(hot_attrs);
-			if (next_hot_attnum == -1)
-				hot_done = true;
-			else
-				/* Adjust for system attributes */
-				next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+			next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
 		}
-		if (check_now == next_key_attnum)
+		if (key_result && check_now == next_key_attnum)
 		{
 			next_key_attnum = bms_first_member(key_attrs);
-			if (next_key_attnum == -1)
-				key_done = true;
-			else
-				/* Adjust for system attributes */
-				next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+			next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+		}
+		if (id_result && check_now == next_id_attnum)
+		{
+			next_id_attnum = bms_first_member(id_attrs);
+			next_id_attnum += FirstLowInvalidHeapAttributeNumber;
 		}
 	}
 
 	*satisfies_hot = hot_result;
 	*satisfies_key = key_result;
+	*satisfies_id = id_result;
 }
 
 /*
@@ -6140,14 +6294,17 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 static XLogRecPtr
 log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
+				HeapTuple old_key_tuple,
 				bool all_visible_cleared, bool new_all_visible_cleared)
 {
 	xl_heap_update xlrec;
-	xl_heap_header xlhdr;
+	xl_heap_header_len xlhdr;
+	xl_heap_header_len xlhdr_idx;
 	uint8		info;
 	XLogRecPtr	recptr;
-	XLogRecData rdata[4];
+	XLogRecData rdata[7];
 	Page		page = BufferGetPage(newbuf);
+	bool		need_tuple_data = RelationIsLogicallyLogged(reln);
 
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
@@ -6163,9 +6320,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
 											  oldtup->t_data->t_infomask2);
 	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
-	xlrec.all_visible_cleared = all_visible_cleared;
+	xlrec.flags = 0;
+	if (all_visible_cleared)
+		xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
 	xlrec.newtid = newtup->t_self;
-	xlrec.new_all_visible_cleared = new_all_visible_cleared;
+	if (new_all_visible_cleared)
+		xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
 
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapUpdate;
@@ -6178,33 +6338,86 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	rdata[1].buffer_std = true;
 	rdata[1].next = &(rdata[2]);
 
-	xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
-	xlhdr.t_infomask = newtup->t_data->t_infomask;
-	xlhdr.t_hoff = newtup->t_data->t_hoff;
+	xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
+	xlhdr.header.t_infomask = newtup->t_data->t_infomask;
+	xlhdr.header.t_hoff = newtup->t_data->t_hoff;
+	xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
 
 	/*
-	 * As with insert records, we need not store the rdata[2] segment if we
-	 * decide to store the whole buffer instead.
+	 * As with insert records, we need not store the rdata[2] segment
+	 * if we decide to store the whole buffer instead unless we're
+	 * doing logical decoding.
 	 */
 	rdata[2].data = (char *) &xlhdr;
-	rdata[2].len = SizeOfHeapHeader;
-	rdata[2].buffer = newbuf;
+	rdata[2].len = SizeOfHeapHeaderLen;
+	rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
 	rdata[2].buffer_std = true;
 	rdata[2].next = &(rdata[3]);
 
 	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
-	rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
+	rdata[3].data = (char *) newtup->t_data
+		+ offsetof(HeapTupleHeaderData, t_bits);
 	rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-	rdata[3].buffer = newbuf;
+	rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
 	rdata[3].buffer_std = true;
 	rdata[3].next = NULL;
 
+	/*
+	 * Separate storage for the FPW buffer reference of the new page in the
+	 * wal_level >= logical case.
+	*/
+	if (need_tuple_data)
+	{
+		rdata[3].next = &(rdata[4]);
+
+		rdata[4].data = NULL,
+		rdata[4].len = 0;
+		rdata[4].buffer = newbuf;
+		rdata[4].buffer_std = true;
+		rdata[4].next = NULL;
+		xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+
+		/* We need to log a tuple identity */
+		if (old_key_tuple)
+		{
+			/* don't really need this, but its more comfy to decode */
+			xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
+			xlhdr_idx.t_len = old_key_tuple->t_len;
+
+			rdata[4].next = &(rdata[5]);
+			rdata[5].data = (char *) &xlhdr_idx;
+			rdata[5].len = SizeOfHeapHeaderLen;
+			rdata[5].buffer = InvalidBuffer;
+			rdata[5].next = &(rdata[6]);
+
+			/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+			rdata[6].data = (char *) old_key_tuple->t_data
+				+ offsetof(HeapTupleHeaderData, t_bits);
+			rdata[6].len = old_key_tuple->t_len
+				- offsetof(HeapTupleHeaderData, t_bits);
+			rdata[6].buffer = InvalidBuffer;
+			rdata[6].next = NULL;
+
+			if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+		}
+	}
+
 	/* If new tuple is the single and first tuple on page... */
 	if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
 		PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 	{
+		XLogRecData *rcur = &rdata[2];
 		info |= XLOG_HEAP_INIT_PAGE;
-		rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+		while (rcur != NULL)
+		{
+			rcur->buffer = InvalidBuffer;
+			rcur = rcur->next;
+		}
 	}
 
 	recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -6339,6 +6552,184 @@ log_newpage_buffer(Buffer buffer, bool page_std)
 	return log_newpage(&rnode, forkNum, blkno, page, page_std);
 }
 
+/*
+ * Perform XLogInsert of a XLOG_HEAP2_NEW_CID record
+ *
+ * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
+ * tuples.
+ */
+static XLogRecPtr
+log_heap_new_cid(Relation relation, HeapTuple tup)
+{
+	xl_heap_new_cid xlrec;
+
+	XLogRecPtr	recptr;
+	XLogRecData rdata[1];
+	HeapTupleHeader hdr = tup->t_data;
+
+	Assert(ItemPointerIsValid(&tup->t_self));
+	Assert(tup->t_tableOid != InvalidOid);
+
+	xlrec.top_xid = GetTopTransactionId();
+	xlrec.target.node = relation->rd_node;
+	xlrec.target.tid = tup->t_self;
+
+	/*
+	 * If the tuple got inserted & deleted in the same TX we definitely have a
+	 * combocid, set cmin and cmax.
+	 */
+	if (hdr->t_infomask & HEAP_COMBOCID)
+	{
+		Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID));
+		Assert(!(hdr->t_infomask & HEAP_XMIN_INVALID));
+		xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
+		xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
+		xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr);
+	}
+	/* No combocid, so only cmin or cmax can be set by this TX */
+	else
+	{
+		/*
+		 * Tuple inserted.
+		 *
+		 * We need to check for LOCK ONLY because multixacts might be
+		 * transferred to the new tuple in case of FOR KEY SHARE updates in
+		 * which case there will be a xmax, although the tuple just got
+		 * inserted.
+		 */
+		if (hdr->t_infomask & HEAP_XMAX_INVALID ||
+			HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask))
+		{
+			xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
+			xlrec.cmax = InvalidCommandId;
+		}
+		/* Tuple from a different tx updated or deleted. */
+		else
+		{
+			xlrec.cmin = InvalidCommandId;
+			xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
+
+		}
+		xlrec.combocid = InvalidCommandId;
+	}
+
+	rdata[0].data = (char *) &xlrec;
+	rdata[0].len = SizeOfHeapNewCid;
+	rdata[0].buffer = InvalidBuffer;
+	rdata[0].next = NULL;
+
+	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata);
+
+	return recptr;
+}
+
+/*
+ * Build a heap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in a UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log a identity or if there's no suitable
+ * key in the Relation relation.
+ */
+static HeapTuple
+ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Relation	idx_rel;
+	TupleDesc	idx_desc;
+	char		replident = relation->rd_rel->relreplident;
+	HeapTuple	key_tuple = NULL;
+	bool		copy_oid = false;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns. If so, force them to be inlined.
+		 */
+		if (HeapTupleHasExternal(tp))
+		{
+			*copy = true;
+			tp = toast_flatten_tuple(tp, RelationGetDescr(relation));
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* needs to already have been fetched? */
+	if (relation->rd_indexvalid == 0)
+		RelationGetIndexList(relation);
+
+	if (!OidIsValid(relation->rd_replidindex))
+	{
+		elog(DEBUG4, "Could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(relation->rd_replidindex);
+	idx_desc = RelationGetDescr(idx_rel);
+
+	/* deform tuple, so we have fast access to columns */
+	heap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < idx_desc->natts; natt++)
+	{
+		int attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno == ObjectIdAttributeNumber)
+			copy_oid = true;
+		else if (attno < 0)
+			elog(ERROR, "system column in index");
+		else
+			nulls[attno - 1] = false;
+	}
+
+	key_tuple = heap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/* XXX: we could also do this unconditionally, the space is used anyway */
+	if (copy_oid)
+		HeapTupleSetOid(key_tuple, HeapTupleGetOid(tp));
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, force them to be inlined. This is somewhat unlikely
+	 * since there's limits on the size of indexed columns, so we don't
+	 * duplicate toast_flatten_tuple()s functionality in the above loop over
+	 * the indexed columns, even if it would be more efficient.
+	 */
+	if (HeapTupleHasExternal(key_tuple))
+	{
+		HeapTuple oldtup = key_tuple;
+		key_tuple = toast_flatten_tuple(oldtup, RelationGetDescr(relation));
+		heap_freetuple(oldtup);
+	}
+
+	return key_tuple;
+}
+
 /*
  * Handles CLEANUP_INFO
  */
@@ -6714,7 +7105,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -6763,7 +7154,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	/* Mark the page as a candidate for pruning */
 	PageSetPrunable(page, record->xl_xid);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	/* Make sure there is no forward chain link in t_ctid */
@@ -6797,7 +7188,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -6868,7 +7259,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 
 	PageSetLSN(page, lsn);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	MarkBufferDirty(buffer);
@@ -6931,7 +7322,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -7014,7 +7405,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 
 	PageSetLSN(page, lsn);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	MarkBufferDirty(buffer);
@@ -7053,7 +7444,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 		HeapTupleHeaderData hdr;
 		char		data[MaxHeapTupleSize];
 	}			tbuf;
-	xl_heap_header xlhdr;
+	xl_heap_header_len xlhdr;
 	int			hsize;
 	uint32		newlen;
 	Size		freespace;
@@ -7062,7 +7453,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
@@ -7140,7 +7531,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	/* Mark the page as a candidate for pruning */
 	PageSetPrunable(page, record->xl_xid);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	/*
@@ -7164,7 +7555,7 @@ newt:;
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->new_all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
@@ -7222,13 +7613,13 @@ newsame:;
 	if (PageGetMaxOffsetNumber(page) + 1 < offnum)
 		elog(PANIC, "heap_update_redo: invalid max offset number");
 
-	hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
+	hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen;
 
-	newlen = record->xl_len - hsize;
-	Assert(newlen <= MaxHeapTupleSize);
 	memcpy((char *) &xlhdr,
 		   (char *) xlrec + SizeOfHeapUpdate,
-		   SizeOfHeapHeader);
+		   SizeOfHeapHeaderLen);
+	newlen = xlhdr.t_len;
+	Assert(newlen <= MaxHeapTupleSize);
 	htup = &tbuf.hdr;
 	MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
 	/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
@@ -7236,9 +7627,9 @@ newsame:;
 		   (char *) xlrec + hsize,
 		   newlen);
 	newlen += offsetof(HeapTupleHeaderData, t_bits);
-	htup->t_infomask2 = xlhdr.t_infomask2;
-	htup->t_infomask = xlhdr.t_infomask;
-	htup->t_hoff = xlhdr.t_hoff;
+	htup->t_infomask2 = xlhdr.header.t_infomask2;
+	htup->t_infomask = xlhdr.header.t_infomask;
+	htup->t_hoff = xlhdr.header.t_hoff;
 
 	HeapTupleHeaderSetXmin(htup, record->xl_xid);
 	HeapTupleHeaderSetCmin(htup, FirstCommandId);
@@ -7250,7 +7641,7 @@ newsame:;
 	if (offnum == InvalidOffsetNumber)
 		elog(PANIC, "heap_update_redo: failed to add tuple");
 
-	if (xlrec->new_all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
@@ -7501,6 +7892,12 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
 		case XLOG_HEAP2_LOCK_UPDATED:
 			heap_xlog_lock_updated(lsn, record);
 			break;
+		case XLOG_HEAP2_NEW_CID:
+			/*
+			 * Nothing to do on a real replay, only used during logical
+			 * decoding.
+			 */
+			break;
 		default:
 			elog(PANIC, "heap2_redo: unknown op code %u", info);
 	}
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index e14c0539105..39c53d0022d 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -184,6 +184,15 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
 						 xlrec->infobits_set);
 		out_target(buf, &(xlrec->target));
 	}
+	else if (info == XLOG_HEAP2_NEW_CID)
+	{
+		xl_heap_new_cid *xlrec = (xl_heap_new_cid *) rec;
+
+		appendStringInfo(buf, "new_cid: ");
+		out_target(buf, &(xlrec->target));
+		appendStringInfo(buf, "; cmin: %u, cmax: %u, combo: %u",
+						 xlrec->cmin, xlrec->cmax, xlrec->combocid);
+	}
 	else
 		appendStringInfoString(buf, "UNKNOWN");
 }
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 1d70494233e..dd67b1f643f 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -28,6 +28,7 @@ const struct config_enum_entry wal_level_options[] = {
 	{"minimal", WAL_LEVEL_MINIMAL, false},
 	{"archive", WAL_LEVEL_ARCHIVE, false},
 	{"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
+	{"logical", WAL_LEVEL_LOGICAL, false},
 	{NULL, 0, false}
 };
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e975f8d26d0..48203d2c81e 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -47,6 +47,7 @@
 #include "access/twophase.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
@@ -1920,7 +1921,8 @@ RecoverPreparedTransactions(void)
 			 * the prepared transaction generated xid assignment records. Test
 			 * here must match one used in AssignTransactionId().
 			 */
-			if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
+			if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+								 XLogLogicalInfoActive()))
 				overwriteOK = true;
 
 			/*
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b467b5c89d1..ad0d19c4a90 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -148,6 +148,7 @@ typedef struct TransactionStateData
 	int			prevSecContext; /* previous SecurityRestrictionContext */
 	bool		prevXactReadOnly;		/* entry-time xact r/o state */
 	bool		startedInRecovery;		/* did we start in recovery? */
+	bool		didLogXid;		/* has xid been included in WAL record? */
 	struct TransactionStateData *parent;		/* back link to parent */
 } TransactionStateData;
 
@@ -177,6 +178,7 @@ static TransactionStateData TopTransactionStateData = {
 	0,							/* previous SecurityRestrictionContext */
 	false,						/* entry-time xact r/o state */
 	false,						/* startedInRecovery */
+	false,						/* didLogXid */
 	NULL						/* link to parent state block */
 };
 
@@ -394,6 +396,19 @@ GetCurrentTransactionIdIfAny(void)
 	return CurrentTransactionState->transactionId;
 }
 
+/*
+ *	MarkCurrentTransactionIdLoggedIfAny
+ *
+ * Remember that the current xid - if it is assigned - now has been wal logged.
+ */
+void
+MarkCurrentTransactionIdLoggedIfAny(void)
+{
+	if (TransactionIdIsValid(CurrentTransactionState->transactionId))
+		CurrentTransactionState->didLogXid = true;
+}
+
+
 /*
  *	GetStableLatestTransactionId
  *
@@ -435,6 +450,7 @@ AssignTransactionId(TransactionState s)
 {
 	bool		isSubXact = (s->parent != NULL);
 	ResourceOwner currentOwner;
+	bool log_unknown_top = false;
 
 	/* Assert that caller didn't screw up */
 	Assert(!TransactionIdIsValid(s->transactionId));
@@ -469,6 +485,20 @@ AssignTransactionId(TransactionState s)
 		pfree(parents);
 	}
 
+	/*
+	 * When wal_level=logical, guarantee that a subtransaction's xid can only
+	 * be seen in the WAL stream if its toplevel xid has been logged
+	 * before. If necessary we log a xact_assignment record with fewer than
+	 * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set
+	 * for a transaction even though it appears in a WAL record, we just might
+	 * superfluously log something. That can happen when an xid is included
+	 * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in
+	 * xl_standby_locks.
+	 */
+	if (isSubXact && XLogLogicalInfoActive() &&
+		!TopTransactionStateData.didLogXid)
+		log_unknown_top = true;
+
 	/*
 	 * Generate a new Xid and record it in PG_PROC and pg_subtrans.
 	 *
@@ -523,6 +553,9 @@ AssignTransactionId(TransactionState s)
 	 * top-level transaction that each subxact belongs to. This is correct in
 	 * recovery only because aborted subtransactions are separately WAL
 	 * logged.
+	 *
+	 * This is correct even for the case where several levels above us didn't
+	 * have an xid assigned as we recursed up to them beforehand.
 	 */
 	if (isSubXact && XLogStandbyInfoActive())
 	{
@@ -533,7 +566,8 @@ AssignTransactionId(TransactionState s)
 		 * ensure this test matches similar one in
 		 * RecoverPreparedTransactions()
 		 */
-		if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS)
+		if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS ||
+			log_unknown_top)
 		{
 			XLogRecData rdata[2];
 			xl_xact_assignment xlrec;
@@ -552,13 +586,15 @@ AssignTransactionId(TransactionState s)
 			rdata[0].next = &rdata[1];
 
 			rdata[1].data = (char *) unreportedXids;
-			rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId);
+			rdata[1].len = nUnreportedXids * sizeof(TransactionId);
 			rdata[1].buffer = InvalidBuffer;
 			rdata[1].next = NULL;
 
 			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);
 
 			nUnreportedXids = 0;
+			/* mark top, not current xact as having been logged */
+			TopTransactionStateData.didLogXid = true;
 		}
 	}
 }
@@ -1737,6 +1773,7 @@ StartTransaction(void)
 	 * initialize reported xid accounting
 	 */
 	nUnreportedXids = 0;
+	s->didLogXid = false;
 
 	/*
 	 * must initialize resource-management stuff first
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b68230d1969..6fa5479c92b 100755
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1191,6 +1191,8 @@ begin:;
 	 */
 	WALInsertSlotRelease();
 
+	MarkCurrentTransactionIdLoggedIfAny();
+
 	END_CRIT_SECTION();
 
 	/*
@@ -5961,7 +5963,7 @@ CheckRequiredParameterValues(void)
 	{
 		if (ControlFile->wal_level < WAL_LEVEL_HOT_STANDBY)
 			ereport(ERROR,
-					(errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" on the master server"),
+					(errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" or higher on the master server"),
 					 errhint("Either set wal_level to \"hot_standby\" on the master, or turn off hot_standby here.")));
 
 		/* We ignore autovacuum_max_workers when we make this test. */
@@ -9650,7 +9652,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 			  errmsg("WAL level not sufficient for making an online backup"),
-				 errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
+				 errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start.")));
 
 	if (strlen(backupidstr) > MAXPGPATH)
 		ereport(ERROR,
@@ -9988,7 +9990,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 			  errmsg("WAL level not sufficient for making an online backup"),
-				 errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
+				 errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start.")));
 
 	/*
 	 * OK to update backup counters and forcePageWrites
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 02752406f53..aa31429b8ac 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -3321,7 +3321,7 @@ reindex_relation(Oid relid, int flags)
 
 	/* Ensure rd_indexattr is valid; see comments for RelationSetIndexList */
 	if (is_pg_class)
-		(void) RelationGetIndexAttrBitmap(rel, false);
+		(void) RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_ALL);
 
 	PG_TRY();
 	{
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 0008fc633ba..4eff1845f5c 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -2355,7 +2355,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
 	 * concurrency.
 	 */
 	modifiedCols = GetModifiedColumns(relinfo, estate);
-	keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc, true);
+	keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc,
+										 INDEX_ATTR_BITMAP_KEY);
 	if (bms_overlap(keyCols, modifiedCols))
 		lockmode = LockTupleExclusive;
 	else
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index d294a5a47f8..048a1894e59 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -816,10 +816,10 @@ PostmasterMain(int argc, char *argv[])
 	}
 	if (XLogArchiveMode && wal_level == WAL_LEVEL_MINIMAL)
 		ereport(ERROR,
-				(errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\" or \"hot_standby\"")));
+				(errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\", \"hot_standby\" or \"logical\"")));
 	if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
 		ereport(ERROR,
-				(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\"")));
+				(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\" or \"logical\"")));
 
 	/*
 	 * Other one-time internal sanity checks can go here, if they are fast.
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index d0acca871e3..03bbb9f3381 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3818,8 +3818,9 @@ RelationGetIndexPredicate(Relation relation)
  * simple index keys, but attributes used in expressions and partial-index
  * predicates.)
  *
- * If "keyAttrs" is true, only attributes that can be referenced by foreign
- * keys are considered.
+ * Depending on attrKind, a bitmap covering the attnums for all index columns,
+ * for all key columns or for all the columns the configured replica identity
+ * are returned.
  *
  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
  * we can include system attributes (e.g., OID) in the bitmap representation.
@@ -3832,17 +3833,28 @@ RelationGetIndexPredicate(Relation relation)
  * be bms_free'd when not needed anymore.
  */
 Bitmapset *
-RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
+RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
 {
-	Bitmapset  *indexattrs;
-	Bitmapset  *uindexattrs;
+	Bitmapset  *indexattrs; /* indexed columns */
+	Bitmapset  *uindexattrs; /* columns in unique indexes */
+	Bitmapset  *idindexattrs; /* columns in the the replica identity */
 	List	   *indexoidlist;
 	ListCell   *l;
 	MemoryContext oldcxt;
 
 	/* Quick exit if we already computed the result. */
 	if (relation->rd_indexattr != NULL)
-		return bms_copy(keyAttrs ? relation->rd_keyattr : relation->rd_indexattr);
+		switch(attrKind)
+		{
+			case INDEX_ATTR_BITMAP_IDENTITY_KEY:
+				return bms_copy(relation->rd_idattr);
+			case INDEX_ATTR_BITMAP_KEY:
+				return bms_copy(relation->rd_keyattr);
+			case INDEX_ATTR_BITMAP_ALL:
+				return bms_copy(relation->rd_indexattr);
+			default:
+				elog(ERROR, "unknown attrKind %u", attrKind);
+		}
 
 	/* Fast path if definitely no indexes */
 	if (!RelationGetForm(relation)->relhasindex)
@@ -3869,13 +3881,16 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
 	 */
 	indexattrs = NULL;
 	uindexattrs = NULL;
+	idindexattrs = NULL;
 	foreach(l, indexoidlist)
 	{
 		Oid			indexOid = lfirst_oid(l);
 		Relation	indexDesc;
 		IndexInfo  *indexInfo;
 		int			i;
-		bool		isKey;
+		bool		isKey; /* candidate key */
+		bool		isIDKey; /* replica identity index */
+
 
 		indexDesc = index_open(indexOid, AccessShareLock);
 
@@ -3887,6 +3902,9 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
 			indexInfo->ii_Expressions == NIL &&
 			indexInfo->ii_Predicate == NIL;
 
+		/* Is this index the configured (or default) replica identity? */
+		isIDKey = indexOid == relation->rd_replidindex;
+
 		/* Collect simple attribute references */
 		for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
 		{
@@ -3896,6 +3914,11 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
 			{
 				indexattrs = bms_add_member(indexattrs,
 							   attrnum - FirstLowInvalidHeapAttributeNumber);
+
+				if (isIDKey)
+					idindexattrs = bms_add_member(idindexattrs,
+												 attrnum - FirstLowInvalidHeapAttributeNumber);
+
 				if (isKey)
 					uindexattrs = bms_add_member(uindexattrs,
 							   attrnum - FirstLowInvalidHeapAttributeNumber);
@@ -3917,10 +3940,21 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 	relation->rd_indexattr = bms_copy(indexattrs);
 	relation->rd_keyattr = bms_copy(uindexattrs);
+	relation->rd_idattr = bms_copy(idindexattrs);
 	MemoryContextSwitchTo(oldcxt);
 
 	/* We return our original working copy for caller to play with */
-	return keyAttrs ? uindexattrs : indexattrs;
+	switch(attrKind)
+	{
+		case INDEX_ATTR_BITMAP_IDENTITY_KEY:
+			return idindexattrs;
+		case INDEX_ATTR_BITMAP_KEY:
+			return uindexattrs;
+		case INDEX_ATTR_BITMAP_ALL:
+			return indexattrs;
+		default:
+			elog(ERROR, "unknown attrKind %u", attrKind);
+	}
 }
 
 /*
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 983cae7fda2..6096fe44457 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -170,7 +170,7 @@
 
 # - Settings -
 
-#wal_level = minimal			# minimal, archive, or hot_standby
+#wal_level = minimal			# minimal, archive, hot_standby or logical
 					# (change requires restart)
 #fsync = on				# turns forced synchronization on or off
 #synchronous_commit = on		# synchronization level;
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index fde483a616a..8c6cf24d237 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -77,6 +77,8 @@ wal_level_str(WalLevel wal_level)
 			return "archive";
 		case WAL_LEVEL_HOT_STANDBY:
 			return "hot_standby";
+		case WAL_LEVEL_LOGICAL:
+			return "logical";
 	}
 	return _("unrecognized wal_level");
 }
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 63b73d03291..438e79db48e 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -55,6 +55,22 @@
 #define XLOG_HEAP2_VISIBLE		0x40
 #define XLOG_HEAP2_MULTI_INSERT 0x50
 #define XLOG_HEAP2_LOCK_UPDATED 0x60
+#define XLOG_HEAP2_NEW_CID		0x70
+
+/*
+ * xl_heap_* ->flag values, 8 bits are available.
+ */
+/* PD_ALL_VISIBLE was cleared */
+#define XLOG_HEAP_ALL_VISIBLE_CLEARED		(1<<0)
+/* PD_ALL_VISIBLE was cleared in the 2nd page */
+#define XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED	(1<<1)
+#define XLOG_HEAP_CONTAINS_OLD_TUPLE		(1<<2)
+#define XLOG_HEAP_CONTAINS_OLD_KEY			(1<<3)
+#define XLOG_HEAP_CONTAINS_NEW_TUPLE		(1<<4)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLOG_HEAP_CONTAINS_OLD 						\
+	(XLOG_HEAP_CONTAINS_OLD_TUPLE | XLOG_HEAP_CONTAINS_OLD_KEY)
 
 /*
  * All what we need to find changed tuple
@@ -78,10 +94,10 @@ typedef struct xl_heap_delete
 	xl_heaptid	target;			/* deleted tuple id */
 	TransactionId xmax;			/* xmax of the deleted tuple */
 	uint8		infobits_set;	/* infomask bits */
-	bool		all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
+	uint8		flags;
 } xl_heap_delete;
 
-#define SizeOfHeapDelete	(offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool))
+#define SizeOfHeapDelete	(offsetof(xl_heap_delete, flags) + sizeof(uint8))
 
 /*
  * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
@@ -100,15 +116,29 @@ typedef struct xl_heap_header
 
 #define SizeOfHeapHeader	(offsetof(xl_heap_header, t_hoff) + sizeof(uint8))
 
+/*
+ * Variant of xl_heap_header that contains the length of the tuple, which is
+ * useful if the length of the tuple cannot be computed using the overall
+ * record length. E.g. because there are several tuples inside a single
+ * record.
+ */
+typedef struct xl_heap_header_len
+{
+	uint16      t_len;
+	xl_heap_header header;
+} xl_heap_header_len;
+
+#define SizeOfHeapHeaderLen	(offsetof(xl_heap_header_len, header) + SizeOfHeapHeader)
+
 /* This is what we need to know about insert */
 typedef struct xl_heap_insert
 {
 	xl_heaptid	target;			/* inserted tuple id */
-	bool		all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
+	uint8		flags;
 	/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
 } xl_heap_insert;
 
-#define SizeOfHeapInsert	(offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
+#define SizeOfHeapInsert	(offsetof(xl_heap_insert, flags) + sizeof(uint8))
 
 /*
  * This is what we need to know about a multi-insert. The record consists of
@@ -120,7 +150,7 @@ typedef struct xl_heap_multi_insert
 {
 	RelFileNode node;
 	BlockNumber blkno;
-	bool		all_visible_cleared;
+	uint8		flags;
 	uint16		ntuples;
 	OffsetNumber offsets[1];
 
@@ -147,13 +177,12 @@ typedef struct xl_heap_update
 	TransactionId old_xmax;		/* xmax of the old tuple */
 	TransactionId new_xmax;		/* xmax of the new tuple */
 	ItemPointerData newtid;		/* new inserted tuple id */
-	uint8		old_infobits_set;		/* infomask bits to set on old tuple */
-	bool		all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
-	bool		new_all_visible_cleared;		/* same for the page of newtid */
+	uint8		old_infobits_set;	/* infomask bits to set on old tuple */
+	uint8		flags;
 	/* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
 } xl_heap_update;
 
-#define SizeOfHeapUpdate	(offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))
+#define SizeOfHeapUpdate	(offsetof(xl_heap_update, flags) + sizeof(uint8))
 
 /*
  * This is what we need to know about vacuum page cleanup/redirect
@@ -263,6 +292,29 @@ typedef struct xl_heap_visible
 
 #define SizeOfHeapVisible (offsetof(xl_heap_visible, cutoff_xid) + sizeof(TransactionId))
 
+typedef struct xl_heap_new_cid
+{
+	/*
+	 * store toplevel xid so we don't have to merge cids from different
+	 * transactions
+	 */
+	TransactionId top_xid;
+	CommandId cmin;
+	CommandId cmax;
+	/*
+	 * don't really need the combocid since we have the actual values
+	 * right in this struct, but the padding makes it free and its
+	 * useful for debugging.
+	 */
+	CommandId combocid;
+	/*
+	 * Store the relfilenode/ctid pair to facilitate lookups.
+	 */
+	xl_heaptid target;
+} xl_heap_new_cid;
+
+#define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid)
+
 extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 									   TransactionId *latestRemovedXid);
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 1d3e7d8938a..9632378865e 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -215,6 +215,7 @@ extern TransactionId GetCurrentTransactionId(void);
 extern TransactionId GetCurrentTransactionIdIfAny(void);
 extern TransactionId GetStableLatestTransactionId(void);
 extern SubTransactionId GetCurrentSubTransactionId(void);
+extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
 extern CommandId GetCurrentCommandId(bool used);
 extern TimestampTz GetCurrentTransactionStartTimestamp(void);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 002862cca50..7415a261bbd 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -197,7 +197,8 @@ typedef enum WalLevel
 {
 	WAL_LEVEL_MINIMAL = 0,
 	WAL_LEVEL_ARCHIVE,
-	WAL_LEVEL_HOT_STANDBY
+	WAL_LEVEL_HOT_STANDBY,
+	WAL_LEVEL_LOGICAL
 } WalLevel;
 extern int	wal_level;
 
@@ -210,9 +211,12 @@ extern int	wal_level;
  */
 #define XLogIsNeeded() (wal_level >= WAL_LEVEL_ARCHIVE)
 
-/* Do we need to WAL-log information required only for Hot Standby? */
+/* Do we need to WAL-log information required only for Hot Standby and logical replication? */
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
 
+/* Do we need to WAL-log information required only for logical replication? */
+#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL)
+
 #ifdef WAL_DEBUG
 extern bool XLOG_DEBUG;
 #endif
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 9fba8c3db86..64ba55355b9 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -55,7 +55,7 @@ typedef struct BkpBlock
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD078	/* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD079	/* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 21d5871454b..ad878cf1a22 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -104,6 +104,7 @@ typedef struct RelationData
 	List	   *rd_indexlist;	/* list of OIDs of indexes on relation */
 	Bitmapset  *rd_indexattr;	/* identifies columns used in indexes */
 	Bitmapset  *rd_keyattr;		/* cols that can be ref'd by foreign keys */
+	Bitmapset  *rd_idattr;		/* included in replica identity index */
 	Oid			rd_oidindex;	/* OID of unique index on OID, if any */
 	LockInfoData rd_lockInfo;	/* lock mgr's info for locking relation */
 	RuleLock   *rd_rules;		/* rewrite rules */
@@ -453,6 +454,29 @@ typedef struct StdRdOptions
  */
 #define RelationIsPopulated(relation) ((relation)->rd_rel->relispopulated)
 
+/*
+ * RelationIsAccessibleInLogicalDecoding
+ *		True if we need to log enough information to have access via
+ *		decoding snapshot.
+ */
+#define RelationIsAccessibleInLogicalDecoding(relation) \
+	(XLogLogicalInfoActive() && \
+	 RelationNeedsWAL(relation) && \
+	 IsCatalogRelation(relation))
+
+/*
+ * RelationIsLogicallyLogged
+ *		True if we need to log enough information to extract the data from the
+ *		WAL stream.
+ *
+ * We don't log information for unlogged tables (since they don't WAL log
+ * anyway) and for system tables (their content is hard to make sense of, and
+ * it would complicate decoding slightly for little gain).
+ */
+#define RelationIsLogicallyLogged(relation) \
+	(XLogLogicalInfoActive() && \
+	 RelationNeedsWAL(relation) && \
+	 !IsCatalogRelation(relation))
 
 /* routines in utils/cache/relcache.c */
 extern void RelationIncrementReferenceCount(Relation rel);
diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h
index 8ac2549cb3a..d7604ec113f 100644
--- a/src/include/utils/relcache.h
+++ b/src/include/utils/relcache.h
@@ -41,7 +41,17 @@ extern List *RelationGetIndexList(Relation relation);
 extern Oid	RelationGetOidIndex(Relation relation);
 extern List *RelationGetIndexExpressions(Relation relation);
 extern List *RelationGetIndexPredicate(Relation relation);
-extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs);
+
+typedef enum IndexAttrBitmapKind
+{
+	INDEX_ATTR_BITMAP_ALL,
+	INDEX_ATTR_BITMAP_KEY,
+	INDEX_ATTR_BITMAP_IDENTITY_KEY
+} IndexAttrBitmapKind;
+
+extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation,
+											 IndexAttrBitmapKind keyAttrs);
+
 extern void RelationGetExclusionInfo(Relation indexRelation,
 						 Oid **operators,
 						 Oid **procs,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b20eb0d5ac9..c5200372aec 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -791,6 +791,7 @@ IdentifySystemCmd
 IncrementVarSublevelsUp_context
 Index
 IndexArrayKeyInfo
+IndexAttrBitmapKind
 IndexBuildCallback
 IndexBuildResult
 IndexBulkDeleteCallback
@@ -2419,11 +2420,13 @@ xl_heap_cleanup_info
 xl_heap_delete
 xl_heap_freeze
 xl_heap_header
+xl_heap_header_len
 xl_heap_inplace
 xl_heap_insert
 xl_heap_lock
 xl_heap_lock_updated
 xl_heap_multi_insert
+xl_heap_new_cid
 xl_heap_newpage
 xl_heap_update
 xl_heap_visible
-- 
GitLab