diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index fd713952cd6a932dba4c500627bd2dc4c5b609b2..4200d15bfa44b76aae547deb4fe48ef44178278d 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -54,7 +54,8 @@ regresscheck-install-force: | submake-regress submake-test_decoding
 	    --extra-install=contrib/test_decoding \
 	    $(REGRESSCHECKS)
 
-ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml
+ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \
+	oldest_xmin snapshot_transfer
 
 isolationcheck: all | submake-isolation submake-test_decoding
 	$(MKDIR_P) isolation_output
diff --git a/contrib/test_decoding/expected/oldest_xmin.out b/contrib/test_decoding/expected/oldest_xmin.out
new file mode 100644
index 0000000000000000000000000000000000000000..d09342c4bec5992a4f2405c7124202dc24f4817f
--- /dev/null
+++ b/contrib/test_decoding/expected/oldest_xmin.out
@@ -0,0 +1,27 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_get_changes s1_commit s0_vacuum s0_get_changes
+step s0_begin: BEGIN;
+step s0_getxid: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s1_begin: BEGIN;
+step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
+step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
+step s0_commit: COMMIT;
+step s0_checkpoint: CHECKPOINT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+step s1_commit: COMMIT;
+step s0_vacuum: VACUUM FULL;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
+COMMIT         
+?column?       
+
+stop           
diff --git a/contrib/test_decoding/expected/snapshot_transfer.out b/contrib/test_decoding/expected/snapshot_transfer.out
new file mode 100644
index 0000000000000000000000000000000000000000..87bed03f766c1dccc85aa848b23e93e3d0ef98e1
--- /dev/null
+++ b/contrib/test_decoding/expected/snapshot_transfer.out
@@ -0,0 +1,49 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub0 s0_commit s0_get_changes
+step s0_begin: BEGIN;
+step s0_begin_sub0: SAVEPOINT s0;
+step s0_log_assignment: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
+step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
+step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
+step s0_end_sub0: RELEASE SAVEPOINT s0;
+step s0_commit: COMMIT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.dummy: INSERT: i[integer]:0
+table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3
+COMMIT         
+?column?       
+
+stop           
+
+starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_begin_sub1 s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub1 s0_end_sub0 s0_commit s0_get_changes
+step s0_begin: BEGIN;
+step s0_begin_sub0: SAVEPOINT s0;
+step s0_log_assignment: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s0_begin_sub1: SAVEPOINT s1;
+step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
+step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
+step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
+step s0_end_sub1: RELEASE SAVEPOINT s1;
+step s0_end_sub0: RELEASE SAVEPOINT s0;
+step s0_commit: COMMIT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.dummy: INSERT: i[integer]:0
+table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3
+COMMIT         
+?column?       
+
+stop           
diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec
new file mode 100644
index 0000000000000000000000000000000000000000..4f8af70aa264b5a64fa089cc2d08ee98568909b6
--- /dev/null
+++ b/contrib/test_decoding/specs/oldest_xmin.spec
@@ -0,0 +1,37 @@
+# Test advancement of the slot's oldest xmin
+
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+    DROP TYPE IF EXISTS basket;
+    CREATE TYPE basket AS (apples integer, pears integer, mangos integer);
+    DROP TABLE IF EXISTS harvest;
+    CREATE TABLE harvest(fruits basket);
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS harvest;
+    DROP TYPE IF EXISTS basket;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+step "s0_begin" { BEGIN; }
+step "s0_getxid" { SELECT txid_current() IS NULL; }
+step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; }
+step "s0_commit" { COMMIT; }
+step "s0_checkpoint" { CHECKPOINT; }
+step "s0_vacuum" { VACUUM FULL; }
+step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+step "s1_begin" { BEGIN; }
+step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); }
+step "s1_commit" { COMMIT; }
+
+# Checkpoint with following get_changes forces to advance xmin. ALTER of a
+# composite type is a rare form of DDL which allows T1 to see the tuple which
+# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
+# forbid modifying catalog after someone read it (and didn't commit yet).
+permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes"
diff --git a/contrib/test_decoding/specs/snapshot_transfer.spec b/contrib/test_decoding/specs/snapshot_transfer.spec
new file mode 100644
index 0000000000000000000000000000000000000000..47db7fd90aec51a05037810293f39ff9da1fba87
--- /dev/null
+++ b/contrib/test_decoding/specs/snapshot_transfer.spec
@@ -0,0 +1,42 @@
+# Test snapshot transfer from subxact to top-level and receival of later snaps.
+
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+    DROP TABLE IF EXISTS dummy;
+    CREATE TABLE dummy(i int);
+    DROP TABLE IF EXISTS harvest;
+    CREATE TABLE harvest(apples int, pears int);
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS harvest;
+    DROP TABLE IF EXISTS dummy;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+step "s0_begin" { BEGIN; }
+step "s0_begin_sub0" { SAVEPOINT s0; }
+step "s0_log_assignment" { SELECT txid_current() IS NULL; }
+step "s0_begin_sub1" { SAVEPOINT s1; }
+step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); }
+step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); }
+step "s0_end_sub0" { RELEASE SAVEPOINT s0; }
+step "s0_end_sub1" { RELEASE SAVEPOINT s1; }
+step "s0_insert2" { INSERT INTO harvest VALUES (1, 2, 3, 4); }
+step "s0_commit" { COMMIT; }
+step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; }
+
+# start top-level without base snap, get base snap in subxact, then create new
+# snap and make sure it is queued.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0" "s0_commit" "s0_get_changes"
+
+# In previous test, we firstly associated subxact with xact and only then got
+# base snap; now nest one more subxact to get snap first and only then (at
+# commit) associate it with toplevel.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index ce570e44ccb8540cacb652251c10424f06284369..7a8b706f2217fab2192b8292715720cc95bb03e6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -169,6 +169,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 					  TransactionId xid, bool create, bool *is_new,
 					  XLogRecPtr lsn, bool create_as_top);
+static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
+								  ReorderBufferTXN *subtxn);
 
 static void AssertTXNLsnOrder(ReorderBuffer *rb);
 
@@ -269,6 +271,7 @@ ReorderBufferAllocate(void)
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
+	dlist_init(&buffer->txns_by_base_snapshot_lsn);
 	dlist_init(&buffer->cached_transactions);
 	dlist_init(&buffer->cached_changes);
 	slist_init(&buffer->cached_tuplebufs);
@@ -550,7 +553,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 	bool		found;
 
 	Assert(TransactionIdIsValid(xid));
-	Assert(!create || lsn != InvalidXLogRecPtr);
 
 	/*
 	 * Check the one-entry lookup cache first
@@ -594,6 +596,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 	{
 		/* initialize the new entry, if creation was requested */
 		Assert(ent != NULL);
+		Assert(lsn != InvalidXLogRecPtr);
 
 		ent->txn = ReorderBufferGetTXN(rb);
 		ent->txn->xid = xid;
@@ -641,42 +644,80 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	ReorderBufferCheckSerializeTXN(rb, txn);
 }
 
+/*
+ * AssertTXNLsnOrder
+ *		Verify LSN ordering of transaction lists in the reorderbuffer
+ *
+ * Other LSN-related invariants are checked too.
+ *
+ * No-op if assertions are not in use.
+ */
 static void
 AssertTXNLsnOrder(ReorderBuffer *rb)
 {
 #ifdef USE_ASSERT_CHECKING
 	dlist_iter	iter;
 	XLogRecPtr	prev_first_lsn = InvalidXLogRecPtr;
+	XLogRecPtr	prev_base_snap_lsn = InvalidXLogRecPtr;
 
 	dlist_foreach(iter, &rb->toplevel_by_lsn)
 	{
-		ReorderBufferTXN *cur_txn;
+		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
+													iter.cur);
 
-		cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
+		/* start LSN must be set */
 		Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
 
+		/* If there is an end LSN, it must be higher than start LSN */
 		if (cur_txn->end_lsn != InvalidXLogRecPtr)
 			Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
 
+		/* Current initial LSN must be strictly higher than previous */
 		if (prev_first_lsn != InvalidXLogRecPtr)
 			Assert(prev_first_lsn < cur_txn->first_lsn);
 
+		/* known-as-subtxn txns must not be listed */
 		Assert(!cur_txn->is_known_as_subxact);
+
 		prev_first_lsn = cur_txn->first_lsn;
 	}
+
+	dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
+	{
+		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
+													base_snapshot_node,
+													iter.cur);
+
+		/* base snapshot (and its LSN) must be set */
+		Assert(cur_txn->base_snapshot != NULL);
+		Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
+
+		/* current LSN must be strictly higher than previous */
+		if (prev_base_snap_lsn != InvalidXLogRecPtr)
+			Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
+
+		/* known-as-subtxn txns must not be listed */
+		Assert(!cur_txn->is_known_as_subxact);
+
+		prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
+	}
 #endif
 }
 
+/*
+ * ReorderBufferGetOldestTXN
+ *		Return oldest transaction in reorderbuffer
+ */
 ReorderBufferTXN *
 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
 
+	AssertTXNLsnOrder(rb);
+
 	if (dlist_is_empty(&rb->toplevel_by_lsn))
 		return NULL;
 
-	AssertTXNLsnOrder(rb);
-
 	txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
 
 	Assert(!txn->is_known_as_subxact);
@@ -684,12 +725,44 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb)
 	return txn;
 }
 
+/*
+ * ReorderBufferGetOldestXmin
+ *		Return oldest Xmin in reorderbuffer
+ *
+ * Returns oldest possibly running Xid from the point of view of snapshots
+ * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
+ * there are none.
+ *
+ * Since snapshots are assigned monotonically, this equals the Xmin of the
+ * base snapshot with minimal base_snapshot_lsn.
+ */
+TransactionId
+ReorderBufferGetOldestXmin(ReorderBuffer *rb)
+{
+	ReorderBufferTXN *txn;
+
+	AssertTXNLsnOrder(rb);
+
+	if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
+		return InvalidTransactionId;
+
+	txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
+							 &rb->txns_by_base_snapshot_lsn);
+	return txn->base_snapshot->xmin;
+}
+
 void
 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
 {
 	rb->current_restart_decoding_lsn = ptr;
 }
 
+/*
+ * ReorderBufferAssignChild
+ *
+ * Make note that we know that subxid is a subtransaction of xid, seen as of
+ * the given lsn.
+ */
 void
 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
 						 TransactionId subxid, XLogRecPtr lsn)
@@ -702,32 +775,107 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
 	subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
 
-	if (new_sub)
+	if (new_top && !new_sub)
+		elog(ERROR, "subtransaction logged without previous top-level txn record");
+
+	if (!new_sub)
 	{
-		/*
-		 * we assign subtransactions to top level transaction even if we don't
-		 * have data for it yet, assignment records frequently reference xids
-		 * that have not yet produced any records. Knowing those aren't top
-		 * level xids allows us to make processing cheaper in some places.
-		 */
-		dlist_push_tail(&txn->subtxns, &subtxn->node);
-		txn->nsubtxns++;
+		if (subtxn->is_known_as_subxact)
+		{
+			/* already associated, nothing to do */
+			return;
+		}
+		else
+		{
+			/*
+			 * We already saw this transaction, but initially added it to the list
+			 * of top-level txns.  Now that we know it's not top-level, remove
+			 * it from there.
+			 */
+			dlist_delete(&subtxn->node);
+		}
 	}
-	else if (!subtxn->is_known_as_subxact)
-	{
-		subtxn->is_known_as_subxact = true;
-		Assert(subtxn->nsubtxns == 0);
 
-		/* remove from lsn order list of top-level transactions */
-		dlist_delete(&subtxn->node);
+	subtxn->is_known_as_subxact = true;
+	subtxn->toplevel_xid = xid;
+	Assert(subtxn->nsubtxns == 0);
 
-		/* add to toplevel transaction */
-		dlist_push_tail(&txn->subtxns, &subtxn->node);
-		txn->nsubtxns++;
-	}
-	else if (new_top)
+	/* add to subtransaction list */
+	dlist_push_tail(&txn->subtxns, &subtxn->node);
+	txn->nsubtxns++;
+
+	/* Possibly transfer the subtxn's snapshot to its top-level txn. */
+	ReorderBufferTransferSnapToParent(txn, subtxn);
+
+	/* Verify LSN-ordering invariant */
+	AssertTXNLsnOrder(rb);
+}
+
+/*
+ * ReorderBufferTransferSnapToParent
+ *		Transfer base snapshot from subtxn to top-level txn, if needed
+ *
+ * This is done if the top-level txn doesn't have a base snapshot, or if the
+ * subtxn's base snapshot has an earlier LSN than the top-level txn's base
+ * snapshot's LSN.  This can happen if there are no changes in the toplevel
+ * txn but there are some in the subtxn, or the first change in subtxn has
+ * earlier LSN than first change in the top-level txn and we learned about
+ * their kinship only now.
+ *
+ * The subtransaction's snapshot is cleared regardless of the transfer
+ * happening, since it's not needed anymore in either case.
+ *
+ * We do this as soon as we become aware of their kinship, to avoid queueing
+ * extra snapshots to txns known-as-subtxns -- only top-level txns will
+ * receive further snapshots.
+ */
+static void
+ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
+								  ReorderBufferTXN *subtxn)
+{
+	Assert(subtxn->toplevel_xid == txn->xid);
+
+	if (subtxn->base_snapshot != NULL)
 	{
-		elog(ERROR, "existing subxact assigned to unknown toplevel xact");
+		if (txn->base_snapshot == NULL ||
+			subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
+		{
+			/*
+			 * If the toplevel transaction already has a base snapshot but
+			 * it's newer than the subxact's, purge it.
+			 */
+			if (txn->base_snapshot != NULL)
+			{
+				SnapBuildSnapDecRefcount(txn->base_snapshot);
+				dlist_delete(&txn->base_snapshot_node);
+			}
+
+			/*
+			 * The snapshot is now the top transaction's; transfer it, and
+			 * adjust the list position of the top transaction in the list by
+			 * moving it to where the subtransaction is.
+			 */
+			txn->base_snapshot = subtxn->base_snapshot;
+			txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
+			dlist_insert_before(&subtxn->base_snapshot_node,
+								&txn->base_snapshot_node);
+
+			/*
+			 * The subtransaction doesn't have a snapshot anymore (so it
+			 * mustn't be in the list.)
+			 */
+			subtxn->base_snapshot = NULL;
+			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+			dlist_delete(&subtxn->base_snapshot_node);
+		}
+		else
+		{
+			/* Base snap of toplevel is fine, so subxact's is not needed */
+			SnapBuildSnapDecRefcount(subtxn->base_snapshot);
+			dlist_delete(&subtxn->base_snapshot_node);
+			subtxn->base_snapshot = NULL;
+			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+		}
 	}
 }
 
@@ -740,7 +888,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
 						 TransactionId subxid, XLogRecPtr commit_lsn,
 						 XLogRecPtr end_lsn)
 {
-	ReorderBufferTXN *txn;
 	ReorderBufferTXN *subtxn;
 
 	subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
@@ -752,42 +899,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
 	if (!subtxn)
 		return;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
-
-	if (txn == NULL)
-		elog(ERROR, "subxact logged without previous toplevel record");
-
-	/*
-	 * Pass our base snapshot to the parent transaction if it doesn't have
-	 * one, or ours is older. That can happen if there are no changes in the
-	 * toplevel transaction but in one of the child transactions. This allows
-	 * the parent to simply use its base snapshot initially.
-	 */
-	if (subtxn->base_snapshot != NULL &&
-		(txn->base_snapshot == NULL ||
-		 txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
-	{
-		txn->base_snapshot = subtxn->base_snapshot;
-		txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
-		subtxn->base_snapshot = NULL;
-		subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
-	}
-
 	subtxn->final_lsn = commit_lsn;
 	subtxn->end_lsn = end_lsn;
 
-	if (!subtxn->is_known_as_subxact)
-	{
-		subtxn->is_known_as_subxact = true;
-		Assert(subtxn->nsubtxns == 0);
-
-		/* remove from lsn order list of top-level transactions */
-		dlist_delete(&subtxn->node);
-
-		/* add to subtransaction list */
-		dlist_push_tail(&txn->subtxns, &subtxn->node);
-		txn->nsubtxns++;
-	}
+	/*
+	 * Assign this subxact as a child of the toplevel xact (no-op if already
+	 * done.)
+	 */
+	ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
 }
 
 
@@ -1111,11 +1230,13 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		ReorderBufferReturnChange(rb, change);
 	}
 
+	/*
+	 * Cleanup the base snapshot, if set.
+	 */
 	if (txn->base_snapshot != NULL)
 	{
 		SnapBuildSnapDecRefcount(txn->base_snapshot);
-		txn->base_snapshot = NULL;
-		txn->base_snapshot_lsn = InvalidXLogRecPtr;
+		dlist_delete(&txn->base_snapshot_node);
 	}
 
 	/* delete from list of known subxacts */
@@ -1293,17 +1414,17 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
 }
 
 /*
- * Perform the replay of a transaction and it's non-aborted subtransactions.
+ * Perform the replay of a transaction and its non-aborted subtransactions.
  *
  * Subtransactions previously have to be processed by
  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
  * transaction with ReorderBufferAssignChild.
  *
- * We currently can only decode a transaction's contents in when their commit
- * record is read because that's currently the only place where we know about
- * cache invalidations. Thus, once a toplevel commit is read, we iterate over
- * the top and subtransactions (using a k-way merge) and replay the changes in
- * lsn order.
+ * We currently can only decode a transaction's contents when its commit
+ * record is read because that's the only place where we know about cache
+ * invalidations. Thus, once a toplevel commit is read, we iterate over the top
+ * and subtransactions (using a k-way merge) and replay the changes in lsn
+ * order.
  */
 void
 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
@@ -1328,10 +1449,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	txn->commit_time = commit_time;
 
 	/*
-	 * If this transaction didn't have any real changes in our database, it's
-	 * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
-	 * transferred its snapshot to this transaction if it had one and the
-	 * toplevel tx didn't.
+	 * If this transaction has no snapshot, it didn't make any changes to the
+	 * database, so there's nothing to decode.  Note that
+	 * ReorderBufferCommitChild will have transferred any snapshots from
+	 * subtransactions if there were any.
 	 */
 	if (txn->base_snapshot == NULL)
 	{
@@ -1767,12 +1888,10 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
- * Setup the base snapshot of a transaction. The base snapshot is the snapshot
- * that is used to decode all changes until either this transaction modifies
- * the catalog or another catalog modifying transaction commits.
+ * Set up the transaction's base snapshot.
  *
- * Needs to be called before any changes are added with
- * ReorderBufferQueueChange().
+ * If we know that xid is a subtransaction, set the base snapshot on the
+ * top-level transaction instead.
  */
 void
 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
@@ -1781,12 +1900,23 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
 	ReorderBufferTXN *txn;
 	bool		is_new;
 
+	AssertArg(snap != NULL);
+
+	/*
+	 * Fetch the transaction to operate on.  If we know it's a subtransaction,
+	 * operate on its top-level transaction instead.
+	 */
 	txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
+	if (txn->is_known_as_subxact)
+		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+									NULL, InvalidXLogRecPtr, false);
 	Assert(txn->base_snapshot == NULL);
-	Assert(snap != NULL);
 
 	txn->base_snapshot = snap;
 	txn->base_snapshot_lsn = lsn;
+	dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
+
+	AssertTXNLsnOrder(rb);
 }
 
 /*
@@ -1905,25 +2035,26 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
 }
 
 /*
- * Have we already added the first snapshot?
+ * ReorderBufferXidHasBaseSnapshot
+ *		Have we already set the base snapshot for the given txn/subtxn?
  */
 bool
 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
 {
 	ReorderBufferTXN *txn;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
+	txn = ReorderBufferTXNByXid(rb, xid, false,
+								NULL, InvalidXLogRecPtr, false);
 
 	/* transaction isn't known yet, ergo no snapshot */
 	if (txn == NULL)
 		return false;
 
-	/*
-	 * TODO: It would be a nice improvement if we would check the toplevel
-	 * transaction in subtransactions, but we'd need to keep track of a bit
-	 * more state.
-	 */
+	/* a known subtxn? operate on top-level txn instead */
+	if (txn->is_known_as_subxact)
+		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+									NULL, InvalidXLogRecPtr, false);
+
 	return txn->base_snapshot != NULL;
 }
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 84314da0f8ad0f8eb568567429a91fa8b813cdff..71cc06d46368db37168a45e769206363ae48e5d7 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -795,9 +795,9 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		 * all. We'll add a snapshot when the first change gets queued.
 		 *
 		 * NB: This works correctly even for subtransactions because
-		 * ReorderBufferCommitChild() takes care to pass the parent the base
-		 * snapshot, and while iterating the changequeue we'll get the change
-		 * from the subtxn.
+		 * ReorderBufferAssignChild() takes care to transfer the base snapshot
+		 * to the top-level transaction, and while iterating the changequeue
+		 * we'll get the change from the subtxn.
 		 */
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
 			continue;
@@ -1038,7 +1038,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		/* refcount of the snapshot builder for the new snapshot */
 		SnapBuildSnapIncRefcount(builder->snapshot);
 
-		/* add a new SnapshotNow to all currently running transactions */
+		/* add a new catalog snapshot to all currently running transactions */
 		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
 	}
 }
@@ -1058,6 +1058,7 @@ void
 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 {
 	ReorderBufferTXN *txn;
+	TransactionId xmin;
 
 	/*
 	 * If we're not consistent yet, inspect the record to see whether it
@@ -1090,15 +1091,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	/* Remove transactions we don't need to keep track off anymore */
 	SnapBuildPurgeCommittedTxn(builder);
 
-	elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
-		 builder->xmin, builder->xmax,
-		 running->oldestRunningXid);
-
 	/*
-	 * Inrease shared memory limits, so vacuum can work on tuples we prevented
-	 * from being pruned till now.
+	 * Advance the xmin limit for the current replication slot, to allow
+	 * vacuum to clean up the tuples this slot has been protecting.
+	 *
+	 * The reorderbuffer might have an xmin among the currently running
+	 * snapshots; use it if so.  If not, we need only consider the snapshots
+	 * we'll produce later, which can't be less than the oldest running xid in
+	 * the record we're reading now.
 	 */
-	LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
+	xmin = ReorderBufferGetOldestXmin(builder->reorder);
+	if (xmin == InvalidTransactionId)
+		xmin = running->oldestRunningXid;
+	elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
+		 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
+	LogicalIncreaseXminForSlot(lsn, xmin);
 
 	/*
 	 * Also tell the slot where we can restart decoding from. We don't want to
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 17fe811d6e2f622fd79ad37171edda35075349af..cb2f1ad9f18a227df5cc7036a18326e35d35ae41 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -132,10 +132,9 @@ typedef struct ReorderBufferTXN
 	/* did the TX have catalog changes */
 	bool		has_catalog_changes;
 
-	/*
-	 * Do we know this is a subxact?
-	 */
+	/* Do we know this is a subxact?  Xid of top-level txn if so */
 	bool		is_known_as_subxact;
+	TransactionId toplevel_xid;
 
 	/*
 	 * LSN of the first data carrying, WAL record with knowledge about this
@@ -177,10 +176,13 @@ typedef struct ReorderBufferTXN
 	TimestampTz commit_time;
 
 	/*
-	 * Base snapshot or NULL.
+	 * The base snapshot is used to decode all changes until either this
+	 * transaction modifies the catalog, or another catalog-modifying
+	 * transaction commits.
 	 */
 	Snapshot	base_snapshot;
 	XLogRecPtr	base_snapshot_lsn;
+	dlist_node	base_snapshot_node;	/* link in txns_by_base_snapshot_lsn */
 
 	/*
 	 * How many ReorderBufferChange's do we have in this txn.
@@ -247,7 +249,7 @@ typedef struct ReorderBufferTXN
 	 * Position in one of three lists:
 	 * * list of subtransactions if we are *known* to be subxact
 	 * * list of toplevel xacts (can be an as-yet unknown subxact)
-	 * * list of preallocated ReorderBufferTXNs
+	 * * list of preallocated ReorderBufferTXNs (if unused)
 	 * ---
 	 */
 	dlist_node	node;
@@ -288,6 +290,15 @@ struct ReorderBuffer
 	 */
 	dlist_head	toplevel_by_lsn;
 
+	/*
+	 * Transactions and subtransactions that have a base snapshot, ordered by
+	 * LSN of the record which caused us to first obtain the base snapshot.
+	 * This is not the same as toplevel_by_lsn, because we only set the base
+	 * snapshot on the first logical-decoding-relevant record (eg. heap
+	 * writes), whereas the initial LSN could be set by other operations.
+	 */
+	dlist_head	txns_by_base_snapshot_lsn;
+
 	/*
 	 * one-entry sized cache for by_txn. Very frequently the same txn gets
 	 * looked up over and over again.
@@ -376,6 +387,7 @@ bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
+TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);