From 49c0864d7ef5227faa24f903902db90e5c9d5d69 Mon Sep 17 00:00:00 2001 From: Robert Haas <rhaas@postgresql.org> Date: Tue, 18 Mar 2014 13:20:01 -0400 Subject: [PATCH] Documentation for logical decoding. Craig Ringer, Andres Freund, Christian Kruse, with edits by me. --- doc/src/sgml/catalogs.sgml | 27 +- doc/src/sgml/filelist.sgml | 1 + doc/src/sgml/func.sgml | 99 +++++ doc/src/sgml/logicaldecoding.sgml | 560 +++++++++++++++++++++++++++ doc/src/sgml/postgres.sgml | 1 + doc/src/sgml/protocol.sgml | 52 ++- doc/src/sgml/ref/allfiles.sgml | 1 + doc/src/sgml/ref/alter_table.sgml | 2 +- doc/src/sgml/ref/create_table.sgml | 15 +- doc/src/sgml/ref/pg_recvlogical.sgml | 331 ++++++++++++++++ doc/src/sgml/reference.sgml | 1 + 11 files changed, 1085 insertions(+), 5 deletions(-) create mode 100644 doc/src/sgml/logicaldecoding.sgml create mode 100644 doc/src/sgml/ref/pg_recvlogical.sgml diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 908f947f81a..a12ee56ad02 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5177,7 +5177,7 @@ <para> For more on replication slots, - see <xref linkend="streaming-replication-slots">. + see <xref linkend="streaming-replication-slots"> and <xref linkend="logicaldecoding">. </para> <table> @@ -5209,6 +5209,13 @@ <entry>The slot type - <literal>physical</> or <literal>logical</></entry> </row> + <row> + <entry><structfield>plugin</structfield></entry> + <entry><type>text</type></entry> + <entry></entry> + <entry>The basename of the shared object containing the output plugin this logical slot is using, or null for physical slots.</entry> + </row> + <row> <entry><structfield>datoid</structfield></entry> <entry><type>oid</type></entry> @@ -5242,6 +5249,24 @@ </entry> </row> + <row> + <entry><structfield>xmin</structfield></entry> + <entry><type>xid</type></entry> + <entry></entry> + <entry>The oldest transaction that this slot needs the database to + retain. 
<literal>VACUUM</literal> cannot remove tuples deleted + by any later transaction. + </entry> + </row> + + <row> + <entry><structfield>catalog_xmin</structfield></entry> + <entry><type>xid</type></entry> + <entry></entry> + <entry>The <literal>xmin</literal>, or oldest transaction ID, that this + slot forces to be retained in the system catalogs. </entry> + </row> + <row> <entry><structfield>restart_lsn</structfield></entry> <entry><type>pg_lsn</type></entry> diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index 0e863ee064e..6c8e254a584 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -91,6 +91,7 @@ <!ENTITY nls SYSTEM "nls.sgml"> <!ENTITY plhandler SYSTEM "plhandler.sgml"> <!ENTITY fdwhandler SYSTEM "fdwhandler.sgml"> +<!ENTITY logicaldecoding SYSTEM "logicaldecoding.sgml"> <!ENTITY protocol SYSTEM "protocol.sgml"> <!ENTITY sources SYSTEM "sources.sgml"> <!ENTITY storage SYSTEM "storage.sgml"> diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 080da434591..71b9829d852 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -16437,9 +16437,108 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); command <literal>DROP_REPLICATION_SLOT</>. </entry> </row> + + <row> + <entry> + <indexterm> + <primary>pg_create_logical_replication_slot</primary> + </indexterm> + <literal><function>pg_create_logical_replication_slot(<parameter>slotname</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type>)</function></literal> + </entry> + <entry> + (<parameter>slotname</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>) + </entry> + <entry> + Creates a new logical (decoding) replication slot named + <parameter>slotname</parameter> using the output plugin + <parameter>plugin</parameter>. A call to this function has the same + effect as the replication protocol command + <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>. 
+ </entry> + </row> + + <row> + <entry> + <indexterm> + <primary>pg_logical_slot_get_changes</primary> + </indexterm> + <literal><function>pg_logical_slot_get_changes(<parameter>slotname</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal> + </entry> + <entry> + (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>text</type>) + </entry> + <entry> + Returns changes in the slot <parameter>slotname</parameter>, starting + from the point from which changes have been consumed last. If + <parameter>upto_lsn</> and <parameter>upto_nchanges</> are NULL, + logical decoding will continue until end of WAL. If + <parameter>upto_lsn</> is non-NULL, decoding will include only + those transactions which commit prior to the specified LSN. If + <parameter>upto_nchanges</parameter> is non-NULL, decoding will + stop when the number of rows produced by decoding exceeds + the specified value. Note, however, that the actual number of + rows returned may be larger, since this limit is only checked after + adding the rows produced when decoding each new transaction commit. 
+ </entry> + </row> + + <row> + <entry> + <indexterm> + <primary>pg_logical_slot_peek_changes</primary> + </indexterm> + <literal><function>pg_logical_slot_peek_changes(<parameter>slotname</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal> + </entry> + <entry> + (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>text</type>) + </entry> + <entry> + Behaves just like + the <function>pg_logical_slot_get_changes()</function> function, + except that changes are not consumed; that is, they will be returned + again on future calls. + </entry> + </row> + + <row> + <entry> + <indexterm> + <primary>pg_logical_slot_get_binary_changes</primary> + </indexterm> + <literal><function>pg_logical_slot_get_binary_changes(<parameter>slotname</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal> + </entry> + <entry> + (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>bytea</type>) + </entry> + <entry> + Behaves just like + the <function>pg_logical_slot_get_changes()</function> function, + except that changes are returned as <type>bytea</type>. 
+ </entry> + </row> + + <row> + <entry> + <indexterm> + <primary>pg_logical_slot_peek_binary_changes</primary> + </indexterm> + <literal><function>pg_logical_slot_peek_binary_changes(<parameter>slotname</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal> + </entry> + <entry> + (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>bytea</type>) + </entry> + <entry> + Behaves just like + the <function>pg_logical_slot_get_changes()</function> function, + except that changes are returned as <type>bytea</type> and that + changes are not consumed; that is, they will be returned again + on future calls. + </entry> + </row> </tbody> </tgroup> </table> + </sect2> <sect2 id="functions-admin-dbobject"> diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml new file mode 100644 index 00000000000..eabdd5f592e --- /dev/null +++ b/doc/src/sgml/logicaldecoding.sgml @@ -0,0 +1,560 @@ +<!-- doc/src/sgml/logicaldecoding.sgml --> + <chapter id="logicaldecoding"> + <title>Logical Decoding</title> + <indexterm zone="logicaldecoding"> + <primary>Logical Decoding</primary> + </indexterm> + <para> + PostgreSQL provides infrastructure to stream the modifications performed + via SQL to external consumers. This functionality can be used for a + variety of purposes, including replication solutions and auditing. + </para> + + <para> + Changes are sent out in streams identified by logical replication slots. + Each stream outputs each change exactly once. + </para> + + <para> + The format in which those changes are streamed is determined by the output + plugin used. An example plugin is provided, and additional plugins can be + written to extend the choice of available formats without modifying any + core code. 
+ Every output plugin has access to each individual new row produced + by <command>INSERT</command> and the new row version created + by <command>UPDATE</command>. Availability of old row versions for + <command>UPDATE</command> and <command>DELETE</command> depends on + the configured + <link linkend="SQL-CREATETABLE-REPLICA-IDENTITY"><literal>REPLICA + IDENTITY</literal></link>. + </para> + + <para> + Changes can be consumed either using the streaming replication protocol + (see <xref linkend="protocol-replication"> and + <xref linkend="logicaldecoding-walsender">), or by calling functions + via SQL (see <xref linkend="logicaldecoding-sql">). It is also possible + to write additional methods of consuming the output of a replication slot + without modifying core code + (see <xref linkend="logicaldecoding-writer">). + </para> + + <sect1 id="logicaldecoding-example"> + <title>Logical Decoding Example</title> + <para> + The following example demonstrates the SQL interface. + </para> + <para> + Before you can use logical decoding, you must set + <xref linkend="guc-wal-level"> to <literal>logical</literal> and + <xref linkend="guc-max-replication-slots"> to at least 1. + Then, you should connect to the target database (in the example + below, <literal>postgres</literal>) as a superuser. 
+ </para> + <programlisting> +postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding' +postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + slotname | xlog_position +-----------------+--------------- + regression_slot | 0/16B1970 +(1 row) + +postgres=# SELECT * FROM pg_replication_slots; + slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn +-----------------+---------------+-----------+--------+----------+--------+--------+--------------+------------- + regression_slot | test_decoding | logical | 12052 | postgres | f | | 684 | 0/16A4408 +(1 row) + +postgres=# -- There are no changes to see yet +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + location | xid | data +----------+-----+------ +(0 rows) + +postgres=# CREATE TABLE data(id serial primary key, data text); +CREATE TABLE + +postgres=# -- DDL isn't replicated, so all you'll see is the transaction +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + location | xid | data +-----------+-----+------------ + 0/16D5D48 | 688 | BEGIN 688 + 0/16E0380 | 688 | COMMIT 688 +(2 rows) + +postgres=# -- Once changes are read, they're consumed and not emitted +postgres=# -- in a subsequent call: +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + location | xid | data +----------+-----+------ +(0 rows) + +postgres=# BEGIN; +postgres=# INSERT INTO data(data) VALUES('1'); +postgres=# INSERT INTO data(data) VALUES('2'); +postgres=# COMMIT; + +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + location | xid | data +-----------+-----+----------------------------------------------- + 0/16E0478 | 689 | BEGIN 689 + 0/16E0478 | 689 | table public.data: INSERT: id[int4]:1 data[text]:'1' + 0/16E0580 | 689 | table public.data: INSERT: id[int4]:2 data[text]:'2' + 
0/16E0650 | 689 | COMMIT 689 +(4 rows) + +postgres=# INSERT INTO data(data) VALUES('3'); + +postgres=# -- You can also peek ahead in the change stream without consuming changes +postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); + location | xid | data +-----------+-----+----------------------------------------------- + 0/16E09C0 | 690 | BEGIN 690 + 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3' + 0/16E0B90 | 690 | COMMIT 690 +(3 rows) + +postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again +postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); + location | xid | data +-----------+-----+----------------------------------------------- + 0/16E09C0 | 690 | BEGIN 690 + 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3' + 0/16E0B90 | 690 | COMMIT 690 +(3 rows) + +postgres=# -- options can be passed to output plugin, to influence the formatting +postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on'); + location | xid | data +-----------+-----+----------------------------------------------- + 0/16E09C0 | 690 | BEGIN 690 + 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3' + 0/16E0B90 | 690 | COMMIT 690 (at 2014-02-27 16:41:51.863092+01) +(3 rows) + +postgres=# -- Remember to destroy a slot you no longer need to stop it consuming +postgres=# -- server resources: +postgres=# SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +----------------------- + +(1 row) + </programlisting> + <para> + The following example shows usage of the walsender interface using + the <link linkend="app-pgrecvlogical"><command>pg_recvlogical</command></link> + shell command. 
It requires client authentication to be set up to allow replication connections + (see <xref linkend="streaming-replication-authentication">) + and <varname>max_wal_senders</varname> to be set sufficiently high for + another connection. + </para> + <programlisting> +# pg_recvlogical -d testdb --slot test --create +# pg_recvlogical -d testdb --slot test --start -f - +CTRL-Z +# psql -c "INSERT INTO data(data) VALUES('4');" +# fg +BEGIN 693 +table public.data: INSERT: id[int4]:4 data[text]:'4' +COMMIT 693 +CTRL-C +# pg_recvlogical -d testdb --slot test --drop + </programlisting> + </sect1> + <sect1 id="logicaldecoding-explanation"> + <title>Logical Decoding Concepts</title> + <sect2> + <indexterm> + <primary>Logical Decoding</primary> + </indexterm> + <title>Logical Decoding</title> + <para> + Logical decoding is the process of extracting all persistent changes + to a database's tables into a coherent, easy to understand format which + can be interpreted without detailed knowledge of the database's internal + state. + </para> + <para> + In <productname>PostgreSQL</productname>, logical decoding is implemented + by decoding the contents of the <link linkend="wal">write-ahead + log</link>, which describe changes on a storage level, into an + application-specific form such as a stream of tuples or SQL statements. + </para> + </sect2> + + <sect2> + <indexterm> + <primary>Logical Replication Slot</primary> + </indexterm> + <indexterm> + <primary>Replication Slot</primary> + </indexterm> + <title>Replication Slots</title> + <para> + In the context of logical replication, a slot represents a stream of + changes which can be replayed to a client in the order they were made on + the origin server. Each slot streams a sequence of changes from a single + database, sending each change exactly once (except when peeking forward + in the stream). 
+ </para> + <note> + <para>PostgreSQL also has streaming replication slots + (see <xref linkend="streaming-replication">), but they are used somewhat + differently there. + </para> + </note> + <para> + Replication slots have an identifier which is unique across all databases + in a <productname>PostgreSQL</productname> cluster. Slots persist + independently of the connection using them and are crash-safe. + </para> + <para> + Multiple independent slots may exist for a single database. Each slot has + its own state, allowing different consumers to receive changes from + different points in the database change stream. For most applications, a + separate slot will be required for each consumer. + </para> + <para> + A logical replication slot knows nothing about the state of the + receiver(s). It's even possible to have multiple different receivers using + the same slot at different times; they'll just get the changes following + on from when the last receiver stopped consuming them. Only one receiver + may consume changes from a slot at any given time. + </para> + <note> + <para> + Replication slots persist across crashes and know nothing about the state + of their consumer(s). They will prevent removal of required resources + even when there is no connection using them. This consumes storage + because neither required WAL nor required rows from the system catalogs + can be removed by VACUUM as long as they are required by a replication + slot, so if a slot is no longer required it should be dropped. + </para> + </note> + </sect2> + <sect2> + <title>Output Plugins</title> + <para> + Output plugins transform the data from the write-ahead log's internal + representation into the format the consumer of a replication slot desires. 
+ </para> + </sect2> + <sect2> + <title>Exported Snapshots</title> + <para> + When a new replication slot is created using the walsender interface, a + snapshot is exported + (see <xref linkend="functions-snapshot-synchronization">) which will show + exactly the state of the database after which all changes will be + included in the change stream. This can be used to create a new replica by + using <link linkend="sql-set-transaction"><literal>SET TRANSACTION + SNAPSHOT</literal></link> to read the state of the database at the moment + the slot was created. This transaction can then be used to dump the + database's state at that point in time which afterwards can be updated + using the slot's contents without losing any changes. + </para> + </sect2> + </sect1> + <sect1 id="logicaldecoding-walsender"> + <title>Streaming Replication Protocol Interface</title> + <para> + The <literal>CREATE_REPLICATION_SLOT slotname LOGICAL + options</literal>, <literal>DROP_REPLICATION_SLOT slotname</literal> + and <literal>START_REPLICATION SLOT slotname LOGICAL options</literal> + commands can be used to create, drop and stream changes from a replication + slot respectively. These commands are only available over a replication + connection; they cannot be used via SQL. + See <xref linkend="protocol-replication">. + </para> + <para> + The <command>pg_recvlogical</command> command + (see <xref linkend="app-pgrecvlogical">) can be used to control logical + decoding over a walsender connection. + </para> + </sect1> + <sect1 id="logicaldecoding-sql"> + <title>Logical Decoding <acronym>SQL</acronym> Interface</title> + <para> + See <xref linkend="functions-replication"> for detailed documentation on + the SQL-level API for interacting with logical decoding. + </para> + <para> + Synchronous replication (see <xref linkend="synchronous-replication">) is + only supported on replication slots used over the walsender interface. 
The + function interface and additional, non-core interfaces do not support + synchronous replication. + </para> + </sect1> + <sect1 id="logicaldecoding-catalogs"> + <title>System catalogs related to logical decoding</title> + <para> + The <link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link> + view and the + <link linkend="monitoring-stats-views-table"><structname>pg_stat_replication</structname></link> + view provide information about the current state of replication slots and + walsender connections respectively. These views apply to both physical and + logical replication. + </para> + </sect1> + <sect1 id="logicaldecoding-output-plugin"> + <title>Logical Decoding Output Plugins</title> + <para> + An example output plugin can be found in the + <link linkend="test-decoding"> + <filename>contrib/test_decoding</filename> + </link> + subdirectory of the PostgreSQL source tree. + </para> + <sect2 id="logicaldecoding-output-init"> + <title>Initialization Function</title> + <indexterm zone="logicaldecoding"> + <primary>_PG_output_plugin_init</primary> + </indexterm> + <para> + An output plugin is loaded by dynamically loading a shared library with + the output plugin's name as the library basename. The normal library + search path is used to locate the library. To provide the required output + plugin callbacks and to indicate that the library is actually an output + plugin it needs to provide a function named + <function>_PG_output_plugin_init</function>. This function is passed a + struct that needs to be filled with the callback function pointers for + individual actions. 
+ <programlisting> +typedef struct OutputPluginCallbacks +{ + LogicalDecodeStartupCB startup_cb; + LogicalDecodeBeginCB begin_cb; + LogicalDecodeChangeCB change_cb; + LogicalDecodeCommitCB commit_cb; + LogicalDecodeShutdownCB shutdown_cb; +} OutputPluginCallbacks; +typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb); + </programlisting> + The <function>begin_cb</function>, <function>change_cb</function> + and <function>commit_cb</function> callbacks are required, + while <function>startup_cb</function> + and <function>shutdown_cb</function> are optional. + </para> + </sect2> + + <sect2 id="logicaldecoding-capabilities"> + <title>Capabilities</title> + <para> + To decode, format and output changes, output plugins can use most of the + backend's normal infrastructure, including calling output functions. Read + only access to relations is permitted as long as only relations are + accessed that either have been created by <command>initdb</command> in + the <literal>pg_catalog</literal> schema, or have been marked as user + provided catalog tables using + <programlisting> +ALTER TABLE user_catalog_table SET (user_catalog_table = true); +CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); + </programlisting> + Any actions leading to xid assignment are prohibited. That, among others, + includes writing to tables, performing DDL changes and + calling <literal>txid_current()</literal>. + </para> + </sect2> + + <sect2 id="logicaldecoding-output-plugin-callbacks"> + <title>Output Plugin Callbacks</title> + <para> + An output plugin gets notified about changes that are happening via + various callbacks it needs to provide. + </para> + <para> + Concurrent transactions are decoded in commit order and only changes + belonging to a specific transaction are decoded in between + the <literal>begin</literal> and <literal>commit</literal> + callbacks. Transactions that were rolled back explicitly or implicitly + never get + decoded. 
Successful <link linkend="SQL-SAVEPOINT">SAVEPOINTs</link> are + folded into the transaction containing them in the order they were + executed within that transaction. + </para> + <note> + <para> + Only transactions that have already safely been flushed to disk will be + decoded. That can lead to a COMMIT not immediately being decoded in a + directly following <literal>pg_logical_slot_get_changes()</literal> + when <varname>synchronous_commit</varname> is set + to <literal>off</literal>. + </para> + </note> + <sect3 id="logicaldecoding-output-plugin-startup"> + <title>Startup Callback</title> + <para> + The optional <function>startup_cb</function> callback is called whenever + a replication slot is created or asked to stream changes, independent + of the number of changes that are ready to be put out. + <programlisting> +typedef void (*LogicalDecodeStartupCB) ( + struct LogicalDecodingContext *ctx, + OutputPluginOptions *options, + bool is_init +); + </programlisting> + The <literal>is_init</literal> parameter will be true when the + replication slot is being created and false + otherwise. <parameter>options</parameter> points to a struct of options + that output plugins can set: + <programlisting> +typedef struct OutputPluginOptions +{ + OutputPluginOutputType output_type; +} OutputPluginOptions; + </programlisting> + <literal>output_type</literal> has to either be set to + <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> + or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. + </para> + <para> + The startup callback should validate the options present in + <literal>ctx->output_plugin_options</literal>. If the output plugin + needs to have a state, it can + use <literal>ctx->output_plugin_private</literal> to store it. 
+ </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-shutdown"> + <title>Shutdown Callback</title> + <para> + The optional <function>shutdown_cb</function> callback is called + whenever a formerly active replication slot is not used anymore and can + be used to deallocate resources private to the output plugin. The slot + isn't necessarily being dropped; streaming is just being stopped. + <programlisting> +typedef void (*LogicalDecodeShutdownCB) ( + struct LogicalDecodingContext *ctx +); + </programlisting> + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-begin"> + <title>Transaction Begin Callback</title> + <para> + The required <function>begin_cb</function> callback is called whenever a + start of a committed transaction has been decoded. Aborted transactions + and their contents never get decoded. + <programlisting> +typedef void (*LogicalDecodeBeginCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn +); + </programlisting> + The <parameter>txn</parameter> parameter contains meta information about + the transaction, like the timestamp at which it has been committed and + its XID. + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-commit"> + <title>Transaction End Callback</title> + <para> + The required <function>commit_cb</function> callback is called whenever + a transaction commit has been + decoded. The <function>change_cb</function> callbacks for all modified + rows will have been called before this, if there have been any modified + rows. 
+ <programlisting> +typedef void (*LogicalDecodeCommitCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn +); + </programlisting> + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-change"> + <title>Callback called for each individual change in a + transaction</title> + <para> + The required <function>change_cb</function> callback is called for every + individual row modification inside a transaction, be it + an <command>INSERT</command>, <command>UPDATE</command> + or <command>DELETE</command>. Even if the original command modified + several rows at once the callback will be called individually for each + row. + <programlisting> +typedef void (*LogicalDecodeChangeCB) ( + struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change +); + </programlisting> + The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters + have the same contents as for the <function>begin_cb</function> + and <function>commit_cb</function> callbacks, but additionally the + relation descriptor <parameter>relation</parameter> points to the + relation the row belongs to and a struct + <parameter>change</parameter> describing the row modification are passed + in. + </para> + <note> + <para> + Only changes in user-defined tables that are not unlogged + (see <xref linkend="SQL-CREATETABLE-UNLOGGED">) and not temporary + (see <xref linkend="SQL-CREATETABLE-TEMPORARY">) can be extracted using + logical decoding. + </para> + </note> + </sect3> + </sect2> + <sect2 id="logicaldecoding-output-plugin-output"> + <title>Functions for producing output from an output plugin</title> + <para> + To actually produce output, output plugins can write data to + the <literal>StringInfo</literal> output buffer + in <literal>ctx->out</literal> when inside + the <function>begin_cb</function>, <function>commit_cb</function> + or <function>change_cb</function> callbacks. 
Before writing to the output + buffer <function>OutputPluginPrepareWrite(ctx, last_write)</function> has + to be called, and after finishing writing to the + buffer <function>OutputPluginWrite(ctx, last_write)</function> has to be + called to perform the write. The <parameter>last_write</parameter> + indicates whether a particular write was the callback's last write. + </para> + <para> + The following example shows how to output data to the consumer of an + output plugin: + <programlisting> +OutputPluginPrepareWrite(ctx, true); +appendStringInfo(ctx->out, "BEGIN %u", txn->xid); +OutputPluginWrite(ctx, true); + </programlisting> + </para> + </sect2> + </sect1> + <sect1 id="logicaldecoding-writer"> + <title>Logical Decoding Output Writers</title> + <para> + It is possible to add more output methods for logical decoding. + For details, see + <filename>src/backend/replication/logical/logicalfuncs.c</filename>. + Essentially, three functions need to be provided: one to read WAL, one to + prepare writing output, and one to write the output + (see <xref linkend="logicaldecoding-output-plugin-output">). + </para> + </sect1> + <sect1 id="logicaldecoding-synchronous"> + <title>Synchronous replication support for Logical Decoding</title> + <para> + Logical decoding may be used to build + <link linkend="synchronous-replication">synchronous + replication</link> solutions with the same user interface as synchronous + replication for <link linkend="streaming-replication">streaming + replication</link>. To do this, the walsender interface + (see <xref linkend="logicaldecoding-walsender">) must be used to stream out + data. Clients have to send <literal>Standby status update (F)</literal> + (see <xref linkend="protocol-replication">) messages, just like streaming + replication clients do. + </para> + <note> + <para> + A synchronous replica receiving changes via logical decoding will work in + the scope of a single database. 
Since, in contrast to + that, <parameter>synchronous_standby_names</parameter> currently is + server wide, this means this technique will not work properly if more + than one database is actively used. + </para> + </note> + </sect1> + </chapter> diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml index b47bf529a2a..9bde1085e9b 100644 --- a/doc/src/sgml/postgres.sgml +++ b/doc/src/sgml/postgres.sgml @@ -219,6 +219,7 @@ &spi; &bgworker; + &logicaldecoding; </part> diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index cb2dfb2ebc0..ea48c270852 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1309,9 +1309,22 @@ the simple query protocol can be used in walsender mode. Passing <literal>database</> as the value instructs walsender to connect to the database specified in the <literal>dbname</> parameter, which will allow the connection to be used for logical replication from that database. +</para> +<para> + For the purpose of testing replication commands, you can make a replication + connection via <application>psql</application> or any other <literal>libpq</literal>-using + tool with a connection string including the <literal>replication</literal> option, + e.g.: + <programlisting> + psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + </programlisting> + However it is often more useful to use + <application>pg_receivexlog</application> (for physical replication) or + <application>pg_recvlogical</application> (for logical replication). 
+</para> +<para> The commands accepted in walsender mode are: - <variablelist> <varlistentry> <term>IDENTIFY_SYSTEM</term> @@ -1764,6 +1777,43 @@ The commands accepted in walsender mode are: </para> </listitem> </varlistentry> + <varlistentry> + <term><literal>START_REPLICATION</literal> <literal>SLOT</literal> <replaceable class="parameter">slotname</> <literal>LOGICAL</literal> <replaceable class="parameter">XXX/XXX</></term> + <listitem> + <para> + Instructs server to start streaming WAL for logical replication, starting + at WAL position <replaceable class="parameter">XXX/XXX</>. The server can + reply with an error, e.g. if the requested section of WAL has already + been recycled. On success, server responds with a CopyBothResponse + message, and then starts to stream WAL to the frontend. + </para> + <para> + The output plugin associated with the selected slot is used + to process the output for streaming. + </para> + <variablelist> + <varlistentry> + <term><literal>SLOT</literal> <replaceable class="parameter">slotname</></term> + <listitem> + <para> + The name of the slot to stream changes from. This parameter is required, + and must correspond to an existing logical replication slot created + with <literal>CREATE_REPLICATION_SLOT</literal> in + <literal>LOGICAL</literal> mode. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><replaceable class="parameter">XXX/XXX</></term> + <listitem> + <para> + The WAL position to begin streaming at. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> <varlistentry> <term><literal>DROP_REPLICATION_SLOT</literal> <replaceable class="parameter">slotname</></term> diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index ce7a5e3cb6c..1b0962c253d 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -183,6 +183,7 @@ Complete list of usable sgml source files in this directory. 
<!ENTITY pgDumpall SYSTEM "pg_dumpall.sgml"> <!ENTITY pgIsready SYSTEM "pg_isready.sgml"> <!ENTITY pgReceivexlog SYSTEM "pg_receivexlog.sgml"> +<!ENTITY pgRecvlogical SYSTEM "pg_recvlogical.sgml"> <!ENTITY pgResetxlog SYSTEM "pg_resetxlog.sgml"> <!ENTITY pgRestore SYSTEM "pg_restore.sgml"> <!ENTITY postgres SYSTEM "postgres-ref.sgml"> diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml index 2b02e668e08..4847d663165 100644 --- a/doc/src/sgml/ref/alter_table.sgml +++ b/doc/src/sgml/ref/alter_table.sgml @@ -580,7 +580,7 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable> </listitem> </varlistentry> - <varlistentry> + <varlistentry id="SQL-CREATETABLE-REPLICA-IDENTITY"> <term><literal>REPLICA IDENTITY</literal></term> <listitem> <para> diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml index fc7ad09786f..2a985b82e5d 100644 --- a/doc/src/sgml/ref/create_table.sgml +++ b/doc/src/sgml/ref/create_table.sgml @@ -137,7 +137,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI <variablelist> - <varlistentry> + <varlistentry id="SQL-CREATETABLE-TEMPORARY"> <term><literal>TEMPORARY</> or <literal>TEMP</></term> <listitem> <para> @@ -171,7 +171,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI </listitem> </varlistentry> - <varlistentry> + <varlistentry id="SQL-CREATETABLE-UNLOGGED"> <term><literal>UNLOGGED</></term> <listitem> <para> @@ -1051,6 +1051,17 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI </listitem> </varlistentry> + <varlistentry> + <term><literal>user_catalog_table</literal> (<type>boolean</type>)</term> + <listitem> + <para> + Declare a table as an additional catalog table, e.g. for the purpose of + logical replication. See + <xref linkend="logicaldecoding-capabilities"> for details. 
+ </para> + </listitem> + </varlistentry> + </variablelist> </refsect2> diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml new file mode 100644 index 00000000000..41e5e838ad1 --- /dev/null +++ b/doc/src/sgml/ref/pg_recvlogical.sgml @@ -0,0 +1,331 @@ +<!-- +doc/src/sgml/ref/pg_recvlogical.sgml +PostgreSQL documentation +--> + +<refentry id="app-pgrecvlogical"> + <refmeta> + <refentrytitle><application>pg_recvlogical</application></refentrytitle> + <manvolnum>1</manvolnum> + <refmiscinfo>Application</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>pg_recvlogical</refname> + <refpurpose>Control logical decoding (see <xref linkend="logicaldecoding">) + streams over a walsender connection.</refpurpose> + </refnamediv> + + <indexterm zone="app-pgrecvlogical"> + <primary>pg_recvlogical</primary> + </indexterm> + + <refsynopsisdiv> + <cmdsynopsis> + <command>pg_recvlogical</command> + <arg rep="repeat" choice="opt"><option>option</option></arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1 id="R1-APP-PGRECVLOGICAL-1"> + <title>Description</title> + <para> + <command>pg_recvlogical</command> controls logical decoding replication + slots and streams data from such replication slots. + </para> + <para> + It creates a replication-mode connection, so it is subject to the same + constraints as <link + linkend="app-pgreceivexlog"><application>pg_receivexlog</application></link>, + plus those for logical replication (see <xref + linkend="logicaldecoding">). + </para> + + </refsect1> + + <refsect1> + <title>Options</title> + + <para> + <application>pg_recvlogical</application> runs in one of three modes, which + control its primary action: + + <variablelist> + + <varlistentry> + <term><option>--create</option></term> + <listitem> + <para> + Create a new logical replication slot with the name specified in + <option>--slot</option>, using the output plugin + <option>--plugin</option>, then exit. 
The slot is created for the + database given in <option>--dbname</option>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>--start</option></term> + <listitem> + <para> + Begin streaming changes from the logical replication slot with the name + specified in <option>--slot</option>, continuing until terminated with a + signal. If the server side change stream ends with a server + shutdown or disconnect, retry in a loop unless + <option>--no-loop</option> is specified. The stream format is + determined by the output plugin specified when the slot was created. + </para> + <para> + You must connect to the same database used to create the slot. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>--drop</option></term> + <listitem> + <para> + Drop the replication slot with the name specified + in <option>--slot</option>, then exit. + </para> + </listitem> + </varlistentry> + </variablelist> + + </para> + + <para> + <application>pg_recvlogical</application> supports all the usual + <literal>libpq</literal>-based options. These are explained in detail in + the documentation for + <link linkend="APP-PSQL"><application>psql</application></link> and for + <link linkend="libpq"><literal>libpq</literal></link>. + + <variablelist> + + <varlistentry> + <term><option>-U <replaceable>user</replaceable></option></term> + <term><option>--username <replaceable>user</replaceable></option></term> + <listitem> + <para> + Username to connect as. Must have a suitable <literal>pg_hba.conf</literal> + entry allowing <literal>replication</literal> connections. Defaults to + current operating system user name. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-d <replaceable>database</replaceable></option></term> + <term><option>--dbname <replaceable>database</replaceable></option></term> + <listitem> + <para> + The database to connect to in <literal>replication</literal> mode; see + mode descriptions for details. 
May be + a <link linkend="LIBPQ-CONNSTRING">libpq connection string</link> + instead. Defaults to user name. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-h <replaceable>hostname-or-ip</replaceable></option></term> + <term><option>--host <replaceable>hostname-or-ip</replaceable></option></term> + <listitem> + <para> + Host or socket to connect + to. See <link linkend="APP-PSQL"><application>psql</application></link> + and <link linkend="libpq"><literal>libpq</literal></link> + documentation. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-p <replaceable>port</replaceable></option></term> + <term><option>--port <replaceable>port</replaceable></option></term> + <listitem> + <para> + Port number to connect to. See + <link linkend="R1-APP-PSQL-3"><application>psql</application></link> + for an explanation of default port choices when this is not + specified. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-w</option></term> + <term><option>--no-password</option></term> + <listitem> + <para> + Prevent prompting for a password. Will exit with an error code if a + password is required but not available. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-W</option></term> + <term><option>--password</option></term> + <listitem> + <para> + Provide a password for this connection. Please use the pgservice file + (see <xref linkend="libpq-pgservice">) or an environment variable + instead of this option. + </para> + </listitem> + </varlistentry> + + </variablelist> + + </para> + + <para> + The following command-line options control the location and format of the + output and other replication behavior: + + <variablelist> + + <varlistentry> + <term><option>-f <replaceable>filename</replaceable></option></term> + <term><option>--file=<replaceable>filename</replaceable></option></term> + <listitem> + <para> + Write received and decoded transaction data into this + file.
Use <literal>-</> for stdout. + </para> + </listitem> + </varlistentry> + + + <varlistentry> + <term><option>-n</option></term> + <term><option>--no-loop</option></term> + <listitem> + <para> + When the connection to the server is lost, do not retry in a loop, just exit. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-o <replaceable>NAME</replaceable>[=<replaceable>VALUE</replaceable>]</option></term> + <term><option>--option=<replaceable>NAME</replaceable>[=<replaceable>VALUE</replaceable>]</option></term> + <listitem> + <para> + Pass the option <parameter>NAME</parameter> to the output plugin with, + if specified, the option value <parameter>VALUE</parameter>. Which + options exist and their effects depend on the output plugin used. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-F <replaceable>interval_seconds</replaceable></option></term> + <term><option>--fsync-interval=<replaceable>interval_seconds</replaceable></option></term> + <listitem> + <para> + Specifies how often + <application>pg_recvlogical</application> + should issue sync commands to ensure the output file (see <option>--file</option>) + is safely flushed to disk without being asked by the server to do + so. Specifying an interval of <literal>0</literal> disables issuing + fsyncs altogether, while still reporting progress to the server. + In this case, data may be lost in the event of a crash. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-P <replaceable>plugin</replaceable></option></term> + <term><option>--plugin=<replaceable>plugin</replaceable></option></term> + <listitem> + <para> + When creating a slot, use the specified logical decoding output + plugin. See <xref linkend="logicaldecoding">. This option has no + effect if the slot already exists.
+ </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-s <replaceable>interval_seconds</replaceable></option></term> + <term><option>--status-interval=<replaceable>interval_seconds</replaceable></option></term> + <listitem> + <para> + This option has the same effect as the option of the same name in <link + linkend="app-pgreceivexlog"><application>pg_receivexlog</application></link>. + See the description there. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-S <replaceable>slot_name</replaceable></option></term> + <term><option>--slot=<replaceable>slot_name</replaceable></option></term> + <listitem> + <para> + In <option>--start</option> mode, use the existing logical replication slot named + <replaceable>slot_name</replaceable>. In <option>--create</option> mode, create the + slot with this name. In <option>--drop</option> mode, delete the slot with this name. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-I <replaceable>lsn</replaceable></option></term> + <term><option>--startpos=<replaceable>lsn</replaceable></option></term> + <listitem> + <para> + In <option>--start</option> mode, start replication from the given + LSN. For details on the effect of this, see the documentation + in <xref linkend="logicaldecoding"> + and <xref linkend="protocol-replication">. Ignored in other modes. + </para> + </listitem> + </varlistentry> + </variablelist> + + </para> + + <para> + The following additional options are available: + + <variablelist> + + <varlistentry> + <term><option>-v</></term> + <term><option>--verbose</></term> + <listitem> + <para> + Enables verbose mode. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-V</></term> + <term><option>--version</></term> + <listitem> + <para> + Print the <application>pg_recvlogical</application> version and exit. 
+ </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>-?</></term> + <term><option>--help</></term> + <listitem> + <para> + Show help about <application>pg_recvlogical</application> command line + arguments, and exit. + </para> + </listitem> + </varlistentry> + + </variablelist> + </para> + </refsect1> +</refentry> diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index 87e8e9ee8ff..a6575f52ac0 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -231,6 +231,7 @@ &pgDumpall; &pgIsready; &pgReceivexlog; + &pgRecvlogical; &pgRestore; &psqlRef; &reindexdb; -- GitLab