From 61633f79048d040aefeed68dcac0e0b996a06189 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 3 Oct 2016 22:11:36 -0700
Subject: [PATCH] Correct logical decoding restore behaviour for
 subtransactions.

Before initializing iteration over a subtransaction's changes, the last
few changes were not spilled to disk. That's correct if the transaction
didn't spill to disk, but otherwise... This bug can lead to missed or
misorderd subtransaction contents when they were spilled to disk.

Move spilling of the remaining in-memory changes to
ReorderBufferIterTXNInit(), where it can easily be applied to the top
transaction and, if present, subtransactions.

Since this code had too many bugs already, noticeably increase test
coverage.

Fixes: #14319
Reported-By: Huan Ruan
Discussion: <20160909012610.20024.58169@wrigleys.postgresql.org>
Backport: 9,4-, where logical decoding was added
---
 contrib/test_decoding/Makefile                |   3 +-
 contrib/test_decoding/expected/spill.out      | 256 ++++++++++++++++++
 contrib/test_decoding/sql/spill.sql           | 179 ++++++++++++
 .../replication/logical/reorderbuffer.c       |  13 +-
 4 files changed, 445 insertions(+), 6 deletions(-)
 create mode 100644 contrib/test_decoding/expected/spill.out
 create mode 100644 contrib/test_decoding/sql/spill.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 309cb0b39a3..a6641f5040d 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,8 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
-	decoding_into_rel binary prepared replorigin time messages
+	decoding_into_rel binary prepared replorigin time messages \
+	spill
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/spill.out b/contrib/test_decoding/expected/spill.out
new file mode 100644
index 00000000000..363e9a3d17c
--- /dev/null
+++ b/contrib/test_decoding/expected/spill.out
@@ -0,0 +1,256 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE spill_test(data text);
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+-- spilling main xact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+ regexp_split_to_array | count |                              array_agg                              |                               array_agg                                
+-----------------------+-------+---------------------------------------------------------------------+------------------------------------------------------------------------
+ 'serialize-topbig--1  |  5000 | table public.spill_test: INSERT: data[text]:'serialize-topbig--1:1' | table public.spill_test: INSERT: data[text]:'serialize-topbig--1:5000'
+(1 row)
+
+-- spilling subxact, nothing in main
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+ regexp_split_to_array | count |                              array_agg                              |                               array_agg                                
+-----------------------+-------+---------------------------------------------------------------------+------------------------------------------------------------------------
+ 'serialize-subbig--1  |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig--1:1' | table public.spill_test: INSERT: data[text]:'serialize-subbig--1:5000'
+(1 row)
+
+-- spilling subxact, spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+    regexp_split_to_array    | count |                                   array_agg                                   |                                   array_agg                                    
+-----------------------------+-------+-------------------------------------------------------------------------------+--------------------------------------------------------------------------------
+ 'serialize-subbig-topbig--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig-topbig--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-subbig-topbig--1:5000'
+ 'serialize-subbig-topbig--2 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig-topbig--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-subbig-topbig--2:10000'
+(2 rows)
+
+-- spilling subxact, non-spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+     regexp_split_to_array     | count |                                    array_agg                                    |                                    array_agg                                    
+-------------------------------+-------+---------------------------------------------------------------------------------+---------------------------------------------------------------------------------
+ 'serialize-subbig-topsmall--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig-topsmall--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-subbig-topsmall--1:5000'
+ 'serialize-subbig-topsmall--2 |     1 | table public.spill_test: INSERT: data[text]:'serialize-subbig-topsmall--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-subbig-topsmall--2:5001'
+(2 rows)
+
+-- not-spilling subxact, spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+    regexp_split_to_array    | count |                                   array_agg                                   |                                   array_agg                                    
+-----------------------------+-------+-------------------------------------------------------------------------------+--------------------------------------------------------------------------------
+ 'serialize-subbig-topbig--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig-topbig--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-subbig-topbig--1:5000'
+ 'serialize-subbig-topbig--2 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig-topbig--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-subbig-topbig--2:10000'
+(2 rows)
+
+-- spilling main xact, spilling subxact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-topbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+    regexp_split_to_array    | count |                                   array_agg                                   |                                   array_agg                                    
+-----------------------------+-------+-------------------------------------------------------------------------------+--------------------------------------------------------------------------------
+ 'serialize-topbig-subbig--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-topbig-subbig--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-topbig-subbig--1:5000'
+ 'serialize-topbig-subbig--2 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-topbig-subbig--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-topbig-subbig--2:10000'
+(2 rows)
+
+-- spilling main xact, not spilling subxact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-topbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+     regexp_split_to_array     | count |                                    array_agg                                    |                                    array_agg                                    
+-------------------------------+-------+---------------------------------------------------------------------------------+---------------------------------------------------------------------------------
+ 'serialize-topbig-subsmall--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-topbig-subsmall--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-topbig-subsmall--1:5000'
+ 'serialize-topbig-subsmall--2 |     1 | table public.spill_test: INSERT: data[text]:'serialize-topbig-subsmall--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-topbig-subsmall--2:5001'
+(2 rows)
+
+-- spilling subxact, followed by another spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+    regexp_split_to_array    | count |                                   array_agg                                   |                                   array_agg                                    
+-----------------------------+-------+-------------------------------------------------------------------------------+--------------------------------------------------------------------------------
+ 'serialize-subbig-subbig--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig-subbig--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-subbig-subbig--1:5000'
+ 'serialize-subbig-subbig--2 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig-subbig--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-subbig-subbig--2:10000'
+(2 rows)
+
+-- spilling subxact, followed by not spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+     regexp_split_to_array     | count |                                    array_agg                                    |                                    array_agg                                    
+-------------------------------+-------+---------------------------------------------------------------------------------+---------------------------------------------------------------------------------
+ 'serialize-subbig-subsmall--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subbig-subsmall--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-subbig-subsmall--1:5000'
+ 'serialize-subbig-subsmall--2 |     1 | table public.spill_test: INSERT: data[text]:'serialize-subbig-subsmall--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-subbig-subsmall--2:5001'
+(2 rows)
+
+-- not spilling subxact, followed by spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subsmall-subbig--1:'||g.i FROM generate_series(1, 1) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subsmall-subbig--2:'||g.i FROM generate_series(2, 5001) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+     regexp_split_to_array     | count |                                  array_agg                                   |                                    array_agg                                    
+-------------------------------+-------+------------------------------------------------------------------------------+---------------------------------------------------------------------------------
+ 'serialize-subsmall-subbig--1 |     1 | table public.spill_test: INSERT: data[text]:'serialize-subsmall-subbig--1:1' | table public.spill_test: INSERT: data[text]:'serialize-subsmall-subbig--1:1'
+ 'serialize-subsmall-subbig--2 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-subsmall-subbig--2:2' | table public.spill_test: INSERT: data[text]:'serialize-subsmall-subbig--2:5001'
+(2 rows)
+
+-- spilling subxact, containing another spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+       regexp_split_to_array        | count |                                      array_agg                                       |                                       array_agg                                       
+------------------------------------+-------+--------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------
+ 'serialize-nested-subbig-subbig--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbig--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbig--1:5000'
+ 'serialize-nested-subbig-subbig--2 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbig--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbig--2:10000'
+(2 rows)
+
+-- spilling subxact, containing a not spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+        regexp_split_to_array         | count |                                       array_agg                                        |                                       array_agg                                        
+--------------------------------------+-------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------
+ 'serialize-nested-subbig-subsmall--1 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subsmall--1:1'    | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subsmall--1:5000'
+ 'serialize-nested-subbig-subsmall--2 |     1 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subsmall--2:5001' | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subsmall--2:5001'
+(2 rows)
+
+-- not spilling subxact, containing a spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subsmall-subbig--1:'||g.i FROM generate_series(1, 1) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subsmall-subbig--2:'||g.i FROM generate_series(2, 5001) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+        regexp_split_to_array         | count |                                      array_agg                                      |                                       array_agg                                        
+--------------------------------------+-------+-------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------
+ 'serialize-nested-subsmall-subbig--1 |     1 | table public.spill_test: INSERT: data[text]:'serialize-nested-subsmall-subbig--1:1' | table public.spill_test: INSERT: data[text]:'serialize-nested-subsmall-subbig--1:1'
+ 'serialize-nested-subsmall-subbig--2 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subsmall-subbig--2:2' | table public.spill_test: INSERT: data[text]:'serialize-nested-subsmall-subbig--2:5001'
+(2 rows)
+
+-- not spilling subxact, containing a spilling subxact that aborts and one that commits
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort--2:'||g.i FROM generate_series(5001, 10000) g(i);
+ROLLBACK TO SAVEPOINT s2;
+SAVEPOINT s3;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort-subbig-3:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+             regexp_split_to_array             | count |                                            array_agg                                            |                                            array_agg                                             
+-----------------------------------------------+-------+-------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------
+ 'serialize-nested-subbig-subbigabort--1       |  5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort--1:1'          | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort--1:5000'
+ 'serialize-nested-subbig-subbigabort-subbig-3 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:5001' | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:10000'
+(2 rows)
+
+DROP TABLE spill_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/spill.sql b/contrib/test_decoding/sql/spill.sql
new file mode 100644
index 00000000000..358af0bfcd4
--- /dev/null
+++ b/contrib/test_decoding/sql/spill.sql
@@ -0,0 +1,179 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE spill_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- spilling main xact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, nothing in main
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, non-spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- not-spilling subxact, spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling main xact, spilling subxact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-topbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling main xact, not spilling subxact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-topbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, followed by another spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, followed by not spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- not spilling subxact, followed by spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subsmall-subbig--1:'||g.i FROM generate_series(1, 1) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subsmall-subbig--2:'||g.i FROM generate_series(2, 5001) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, containing another spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, containing a not spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- not spilling subxact, containing a spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subsmall-subbig--1:'||g.i FROM generate_series(1, 1) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subsmall-subbig--2:'||g.i FROM generate_series(2, 5001) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- not spilling subxact, containing a spilling subxact that aborts and one that commits
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort--2:'||g.i FROM generate_series(5001, 10000) g(i);
+ROLLBACK TO SAVEPOINT s2;
+SAVEPOINT s3;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort-subbig-3:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+DROP TABLE spill_test;
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e2a502c4431..6ad7e7de76c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -935,8 +935,12 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		ReorderBufferChange *cur_change;
 
 		if (txn->nentries != txn->nentries_mem)
+		{
+			/* serialize remaining changes */
+			ReorderBufferSerializeTXN(rb, txn);
 			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
 										&state->entries[off].segno);
+		}
 
 		cur_change = dlist_head_element(ReorderBufferChange, node,
 										&txn->changes);
@@ -960,10 +964,13 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 			ReorderBufferChange *cur_change;
 
 			if (cur_txn->nentries != cur_txn->nentries_mem)
+			{
+				/* serialize remaining changes */
+				ReorderBufferSerializeTXN(rb, cur_txn);
 				ReorderBufferRestoreChanges(rb, cur_txn,
 											&state->entries[off].fd,
 											&state->entries[off].segno);
-
+			}
 			cur_change = dlist_head_element(ReorderBufferChange, node,
 											&cur_txn->changes);
 
@@ -1367,10 +1374,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	txn->origin_id = origin_id;
 	txn->origin_lsn = origin_lsn;
 
-	/* serialize the last bunch of changes if we need start earlier anyway */
-	if (txn->nentries_mem != txn->nentries)
-		ReorderBufferSerializeTXN(rb, txn);
-
 	/*
 	 * If this transaction didn't have any real changes in our database, it's
 	 * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
-- 
GitLab