diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index d36f2f3af1f291af8f9fd9052167485e16996b87..cb2dfb2ebc00ab6486b6570b4ad2a273d6d5b173 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1302,10 +1302,13 @@
 
 <para>
 To initiate streaming replication, the frontend sends the
-<literal>replication</> parameter in the startup message. This tells the
-backend to go into walsender mode, wherein a small set of replication commands
-can be issued instead of SQL statements. Only the simple query protocol can be
-used in walsender mode.
+<literal>replication</> parameter in the startup message. A boolean value
+of <literal>true</> tells the backend to go into walsender mode, wherein a
+small set of replication commands can be issued instead of SQL statements. Only
+the simple query protocol can be used in walsender mode.
+Passing <literal>database</> as the value instructs walsender to connect to
+the database specified in the <literal>dbname</> parameter, which will allow
+the connection to be used for logical replication from that database.
 
 The commands accepted in walsender mode are:
 
@@ -1315,7 +1318,7 @@ The commands accepted in walsender mode are:
     <listitem>
      <para>
       Requests the server to identify itself. Server replies with a result
-      set of a single row, containing three fields:
+      set of a single row, containing four fields:
      </para>
 
      <para>
@@ -1357,6 +1360,17 @@ The commands accepted in walsender mode are:
       </listitem>
       </varlistentry>
 
+      <varlistentry>
+      <term>
+       dbname
+      </term>
+      <listitem>
+      <para>
+       Database connected to or NULL.
+      </para>
+      </listitem>
+      </varlistentry>
+
       </variablelist>
      </para>
     </listitem>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index b7f99fc18d3128eccd7d671d0a5927d124615510..5db01d104a15d63fb348641f567061c49bee2ec7 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1884,10 +1884,23 @@ retry1:
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
 			{
-				if (!parse_bool(valptr, &am_walsender))
+				/*
+				 * Due to backward compatibility concerns the replication
+				 * parameter is a hybrid beast which allows the value to be
+				 * either boolean or the string 'database'. The latter
+				 * connects to a specific database which is e.g. required for
+				 * logical decoding while.
+				 */
+				if (strcmp(valptr, "database") == 0)
+				{
+					am_walsender = true;
+					am_db_walsender = true;
+				}
+				else if (!parse_bool(valptr, &am_walsender))
 					ereport(FATAL,
 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-							 errmsg("invalid value for boolean option \"replication\"")));
+							 errmsg("invalid value for parameter \"replication\""),
+							 errhint("Valid values are: false, 0, true, 1, database.")));
 			}
 			else
 			{
@@ -1968,8 +1981,15 @@ retry1:
 	if (strlen(port->user_name) >= NAMEDATALEN)
 		port->user_name[NAMEDATALEN - 1] = '\0';
 
-	/* Walsender is not related to a particular database */
-	if (am_walsender)
+	/*
+	 * Normal walsender backends, e.g. for streaming replication, are not
+	 * connected to a particular database. But walsenders used for logical
+	 * replication need to connect to a specific database. We allow streaming
+	 * replication commands to be issued even if connected to a database as it
+	 * can make sense to first make a basebackup and then stream changes
+	 * starting from that.
+	 */
+	if (am_walsender && !am_db_walsender)
 		port->database_name[0] = '\0';
 
 	/*
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c10374cdbca5f6b9db6589f5e55e556f68ac0d74..96f31c4c55b0cec5fbcba9d81f915fb4ef8ffa1a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -131,7 +131,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 						"the primary server: %s",
 						PQerrorMessage(streamConn))));
 	}
-	if (PQnfields(res) != 3 || PQntuples(res) != 1)
+	if (PQnfields(res) < 3 || PQntuples(res) != 1)
 	{
 		int			ntuples = PQntuples(res);
 		int			nfields = PQnfields(res);
@@ -139,8 +139,8 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 		PQclear(res);
 		ereport(ERROR,
 				(errmsg("invalid response from primary server"),
-				 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
-						   ntuples, nfields)));
+				 errdetail("Could not identify system: Got %d rows and %d fields, expected %d rows and %d or more fields.",
+						   ntuples, nfields, 3, 1)));
 	}
 	primary_sysid = PQgetvalue(res, 0, 0);
 	*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 308889b5c9a299a9bd8609793182241f397e5522..154aaace9f50721a0ae55b60afd7ac23e2ed5f8c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -73,13 +73,17 @@ Node *replication_parse_result;
 %token K_WAL
 %token K_TIMELINE
 %token K_PHYSICAL
+%token K_LOGICAL
 %token K_SLOT
 
 %type <node>	command
-%type <node>	base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history
+%type <node>	base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system timeline_history
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
+%type <list>	plugin_options plugin_opt_list
+%type <defelt>	plugin_opt_elem
+%type <node>	plugin_opt_arg
 %type <str>		opt_slot
 
 %%
@@ -98,6 +102,7 @@ command:
 			identify_system
 			| base_backup
 			| start_replication
+			| start_logical_replication
 			| create_replication_slot
 			| drop_replication_slot
 			| timeline_history
@@ -165,8 +170,8 @@ base_backup_opt:
 				}
 			;
 
-/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */
 create_replication_slot:
+			/* CREATE_REPLICATION_SLOT slot PHYSICAL */
 			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL
 				{
 					CreateReplicationSlotCmd *cmd;
@@ -175,9 +180,19 @@ create_replication_slot:
 					cmd->slotname = $2;
 					$$ = (Node *) cmd;
 				}
+			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
+			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
+				{
+					CreateReplicationSlotCmd *cmd;
+					cmd = makeNode(CreateReplicationSlotCmd);
+					cmd->kind = REPLICATION_KIND_LOGICAL;
+					cmd->slotname = $2;
+					cmd->plugin = $4;
+					$$ = (Node *) cmd;
+				}
 			;
 
-/* DROP_REPLICATION_SLOT SLOT slot */
+/* DROP_REPLICATION_SLOT slot */
 drop_replication_slot:
 			K_DROP_REPLICATION_SLOT IDENT
 				{
@@ -205,19 +220,19 @@ start_replication:
 				}
 			;
 
-opt_timeline:
-			K_TIMELINE UCONST
+/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
+start_logical_replication:
+			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
 				{
-					if ($2 <= 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 (errmsg("invalid timeline %u", $2))));
-					$$ = $2;
+					StartReplicationCmd *cmd;
+					cmd = makeNode(StartReplicationCmd);
+					cmd->kind = REPLICATION_KIND_LOGICAL;;
+					cmd->slotname = $3;
+					cmd->startpoint = $5;
+					cmd->options = $6;
+					$$ = (Node *) cmd;
 				}
-			| /* EMPTY */
-				{ $$ = 0; }
 			;
-
 /*
  * TIMELINE_HISTORY %d
  */
@@ -250,6 +265,46 @@ opt_slot:
 				{ $$ = NULL; }
 			;
 
+opt_timeline:
+			K_TIMELINE UCONST
+				{
+					if ($2 <= 0)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 (errmsg("invalid timeline %u", $2))));
+					$$ = $2;
+				}
+				| /* EMPTY */			{ $$ = 0; }
+			;
+
+
+plugin_options:
+			'(' plugin_opt_list ')'			{ $$ = $2; }
+			| /* EMPTY */					{ $$ = NIL; }
+		;
+
+plugin_opt_list:
+			plugin_opt_elem
+				{
+					$$ = list_make1($1);
+				}
+			| plugin_opt_list ',' plugin_opt_elem
+				{
+					$$ = lappend($1, $3);
+				}
+		;
+
+plugin_opt_elem:
+			IDENT plugin_opt_arg
+				{
+					$$ = makeDefElem($1, $2);
+				}
+		;
+
+plugin_opt_arg:
+			SCONST							{ $$ = (Node *) makeString($1); }
+			| /* EMPTY */					{ $$ = NULL; }
+		;
 %%
 
 #include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index ca32aa67ff16b7ea8b9e31e0aee4642f3efbbacb..a2571244229549dcf19f83eaf4562cca63468866 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -94,6 +94,7 @@ CREATE_REPLICATION_SLOT		{ return K_CREATE_REPLICATION_SLOT; }
 DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
+LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 
 ","				{ return ','; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 003c797e0ead4b70eabc50e95a9eac8dadb918f6..09854112062de7e286461ff71590ce2e3dc26e91 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -45,15 +45,22 @@
 
 #include "access/timeline.h"
 #include "access/transam.h"
+#include "access/xact.h"
 #include "access/xlog_internal.h"
+
 #include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
 #include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "nodes/replnodes.h"
 #include "replication/basebackup.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/logicalfuncs.h"
 #include "replication/slot.h"
+#include "replication/snapbuild.h"
 #include "replication/syncrep.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -92,9 +99,10 @@ WalSndCtlData *WalSndCtl = NULL;
 WalSnd	   *MyWalSnd = NULL;
 
 /* Global state */
-bool		am_walsender = false;		/* Am I a walsender process ? */
+bool		am_walsender = false;		/* Am I a walsender process? */
 bool		am_cascading_walsender = false;		/* Am I cascading WAL to
-												 * another standby ? */
+												 * another standby? */
+bool		am_db_walsender = false;	/* Connected to a database? */
 
 /* User-settable parameters for walsender */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
@@ -145,7 +153,7 @@ static StringInfoData tmpbuf;
 static TimestampTz last_reply_timestamp;
 
 /* Have we sent a heartbeat message asking for reply, since last reply? */
-static bool ping_sent = false;
+static bool waiting_for_ping_response = false;
 
 /*
  * While streaming WAL in Copy mode, streamingDoneSending is set to true
@@ -156,6 +164,9 @@ static bool ping_sent = false;
 static bool streamingDoneSending;
 static bool streamingDoneReceiving;
 
+/* Are we there yet? */
+static bool		WalSndCaughtUp = false;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t walsender_ready_to_stop = false;
@@ -168,24 +179,42 @@ static volatile sig_atomic_t walsender_ready_to_stop = false;
  */
 static volatile sig_atomic_t replication_active = false;
 
+static LogicalDecodingContext *logical_decoding_ctx = NULL;
+static XLogRecPtr  logical_startptr = InvalidXLogRecPtr;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static void WalSndLoop(void);
+typedef void (*WalSndSendDataCallback)(void);
+static void WalSndLoop(WalSndSendDataCallback send_data);
 static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
-static void XLogSend(bool *caughtup);
+static void WalSndShutdown(void) __attribute__((noreturn));
+static void XLogSendPhysical(void);
+static void XLogSendLogical(void);
+static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
+static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
+static void StartLogicalReplication(StartReplicationCmd *cmd);
 static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
 static void WalSndKeepalive(bool requestReply);
+static void WalSndKeepaliveIfNecessary(TimestampTz now);
+static void WalSndCheckTimeOut(TimestampTz now);
+static long WalSndComputeSleeptime(TimestampTz now);
+static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+
+static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -240,6 +269,23 @@ WalSndErrorCleanup()
 	WalSndSetState(WALSNDSTATE_STARTUP);
 }
 
+/*
+ * Handle a client's connection abort in an orderly manner.
+ */
+static void
+WalSndShutdown(void)
+{
+	/*
+	 * Reset whereToSendOutput to prevent ereport from attempting to send any
+	 * more messages to the standby.
+	 */
+	if (whereToSendOutput == DestRemote)
+		whereToSendOutput = DestNone;
+
+	proc_exit(0);
+	abort();					/* keep the compiler quiet */
+}
+
 /*
  * Handle the IDENTIFY_SYSTEM command.
  */
@@ -251,10 +297,12 @@ IdentifySystem(void)
 	char		tli[11];
 	char		xpos[MAXFNAMELEN];
 	XLogRecPtr	logptr;
+	char	   *dbname = NULL;
 
 	/*
-	 * Reply with a result set with one row, three columns. First col is
-	 * system ID, second is timeline ID, and third is current xlog location.
+	 * Reply with a result set with one row, four columns. First col is system
+	 * ID, second is timeline ID, third is current xlog location and the fourth
+	 * contains the database name if we are connected to one.
 	 */
 
 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
@@ -273,9 +321,23 @@ IdentifySystem(void)
 
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
+	if (MyDatabaseId != InvalidOid)
+	{
+		MemoryContext cur = CurrentMemoryContext;
+
+		/* syscache access needs a transaction env. */
+		StartTransactionCommand();
+		/* make dbname live outside TX context */
+		MemoryContextSwitchTo(cur);
+		dbname = get_database_name(MyDatabaseId);
+		CommitTransactionCommand();
+		/* CommitTransactionCommand switches to TopMemoryContext */
+		MemoryContextSwitchTo(cur);
+	}
+
 	/* Send a RowDescription message */
 	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 3, 2);		/* 3 fields */
+	pq_sendint(&buf, 4, 2);		/* 4 fields */
 
 	/* first field */
 	pq_sendstring(&buf, "systemid");	/* col name */
@@ -296,24 +358,43 @@ IdentifySystem(void)
 	pq_sendint(&buf, 0, 2);		/* format code */
 
 	/* third field */
-	pq_sendstring(&buf, "xlogpos");
-	pq_sendint(&buf, 0, 4);
-	pq_sendint(&buf, 0, 2);
-	pq_sendint(&buf, TEXTOID, 4);
-	pq_sendint(&buf, -1, 2);
-	pq_sendint(&buf, 0, 4);
-	pq_sendint(&buf, 0, 2);
+	pq_sendstring(&buf, "xlogpos"); /* col name */
+	pq_sendint(&buf, 0, 4);     /* table oid */
+	pq_sendint(&buf, 0, 2);     /* attnum */
+	pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+	pq_sendint(&buf, -1, 2);        /* typlen */
+	pq_sendint(&buf, 0, 4);     /* typmod */
+	pq_sendint(&buf, 0, 2);     /* format code */
+
+	/* fourth field */
+	pq_sendstring(&buf, "dbname");  /* col name */
+	pq_sendint(&buf, 0, 4);     /* table oid */
+	pq_sendint(&buf, 0, 2);     /* attnum */
+	pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+	pq_sendint(&buf, -1, 2);        /* typlen */
+	pq_sendint(&buf, 0, 4);     /* typmod */
+	pq_sendint(&buf, 0, 2);     /* format code */
 	pq_endmessage(&buf);
 
 	/* Send a DataRow message */
 	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 3, 2);		/* # of columns */
+	pq_sendint(&buf, 4, 2);		/* # of columns */
 	pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
 	pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
 	pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
 	pq_sendbytes(&buf, (char *) tli, strlen(tli));
 	pq_sendint(&buf, strlen(xpos), 4);	/* col3 len */
 	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+	/* send NULL if not connected to a database */
+	if (dbname)
+	{
+		pq_sendint(&buf, strlen(dbname), 4);    /* col4 len */
+		pq_sendbytes(&buf, (char *) dbname, strlen(dbname));
+	}
+	else
+	{
+		pq_sendint(&buf, -1, 4);    /* col4 len, NULL */
+	}
 
 	pq_endmessage(&buf);
 }
@@ -572,7 +653,7 @@ StartReplication(StartReplicationCmd *cmd)
 		/* Main loop of walsender */
 		replication_active = true;
 
-		WalSndLoop();
+		WalSndLoop(XLogSendPhysical);
 
 		replication_active = false;
 		if (walsender_ready_to_stop)
@@ -642,6 +723,39 @@ StartReplication(StartReplicationCmd *cmd)
 	pq_puttextmessage('C', "START_STREAMING");
 }
 
+/*
+ * read_page callback for logical decoding contexts, as a walsender process.
+ *
+ * Inside the walsender we can do better than logical_read_local_xlog_page,
+ * which has to do a plain sleep/busy loop, because the walsender's latch gets
+ * set everytime WAL is flushed.
+ */
+static int
+logical_read_xlog_page(XLogReaderState* state, XLogRecPtr targetPagePtr, int reqLen,
+				 XLogRecPtr targetRecPtr, char* cur_page, TimeLineID *pageTLI)
+{
+	XLogRecPtr flushptr;
+	int		count;
+
+	/* make sure we have enough WAL available */
+	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+	/* more than one block available */
+	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+		count = XLOG_BLCKSZ;
+	/* not enough WAL synced, that can happen during shutdown */
+	else if (targetPagePtr + reqLen > flushptr)
+		return -1;
+	/* part of the page available */
+	else
+		count = flushptr - targetPagePtr;
+
+	/* now actually read the data, we know it's there */
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+
+	return count;
+}
+
 /*
  * Create a new replication slot.
  */
@@ -649,6 +763,8 @@ static void
 CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 {
 	const char *slot_name;
+	const char *snapshot_name = NULL;
+	char        xpos[MAXFNAMELEN];
 	StringInfoData buf;
 
 	Assert(!MyReplicationSlot);
@@ -657,24 +773,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	sendTimeLineIsHistoric = false;
 	sendTimeLine = ThisTimeLineID;
 
-	ReplicationSlotCreate(cmd->slotname,
-						  cmd->kind == REPLICATION_KIND_LOGICAL,
-						  RS_PERSISTENT);
+	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
+	{
+		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
+	}
+	else
+	{
+		CheckLogicalDecodingRequirements();
+		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
+	}
 
 	initStringInfo(&output_message);
 
 	slot_name = NameStr(MyReplicationSlot->data.name);
 
-	/*
-	 * It may seem somewhat pointless to send back the same slot name the
-	 * client just requested and nothing else, but logical replication
-	 * will add more fields here.  (We could consider removing the slot
-	 * name from what's sent back, though, since the client has specified
-	 * that.)
-	 */
+	if (cmd->kind == REPLICATION_KIND_LOGICAL)
+	{
+		LogicalDecodingContext *ctx;
+
+		ctx = CreateInitDecodingContext(
+			cmd->plugin, NIL,
+			logical_read_xlog_page,
+			WalSndPrepareWrite, WalSndWriteData);
+
+		/* build initial snapshot, might take a while */
+		DecodingContextFindStartpoint(ctx);
+
+		/*
+		 * Export a plain (not of the snapbuild.c type) snapshot to the user
+		 * that can be imported into another session.
+		 */
+		snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+
+		/* don't need the decoding context anymore */
+		FreeDecodingContext(ctx);
+
+		ReplicationSlotPersist();
+	}
+
+	slot_name = NameStr(MyReplicationSlot->data.name);
+	snprintf(xpos, sizeof(xpos), "%X/%X",
+			 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
+			 (uint32) MyReplicationSlot->data.confirmed_flush);
 
 	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 1, 2);		/* 1 field */
+	pq_sendint(&buf, 4, 2);		/* 4 fields */
 
 	/* first field: slot name */
 	pq_sendstring(&buf, "slot_name");	/* col name */
@@ -685,16 +828,65 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	pq_sendint(&buf, 0, 4);		/* typmod */
 	pq_sendint(&buf, 0, 2);		/* format code */
 
+	/* second field: LSN at which we became consistent */
+	pq_sendstring(&buf, "consistent_point");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
+	/* third field: exported snapshot's name */
+	pq_sendstring(&buf, "snapshot_name");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
+	/* fourth field: output plugin */
+	pq_sendstring(&buf, "output_plugin");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
 	pq_endmessage(&buf);
 
 	/* Send a DataRow message */
 	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 1, 2);		/* # of columns */
+	pq_sendint(&buf, 4, 2);		/* # of columns */
 
 	/* slot_name */
 	pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
 	pq_sendbytes(&buf, slot_name, strlen(slot_name));
 
+	/* consistent wal location */
+	pq_sendint(&buf, strlen(xpos), 4); /* col2 len */
+	pq_sendbytes(&buf, xpos, strlen(xpos));
+
+	/* snapshot name */
+	if (snapshot_name != NULL)
+	{
+		pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */
+		pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name));
+	}
+	else
+		pq_sendint(&buf, -1, 4);    /* col3 len, NULL */
+
+	/* plugin */
+	if (cmd->plugin != NULL)
+	{
+		pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */
+		pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin));
+	}
+	else
+		pq_sendint(&buf, -1, 4);	/* col4 len, NULL */
+
 	pq_endmessage(&buf);
 
 	/*
@@ -713,6 +905,339 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 	EndCommand("DROP_REPLICATION_SLOT", DestRemote);
 }
 
+/*
+ * Load previously initiated logical slot and prepare for sending data (via
+ * WalSndLoop).
+ */
+static void
+StartLogicalReplication(StartReplicationCmd *cmd)
+{
+	StringInfoData buf;
+
+	/* make sure that our requirements are still fulfilled */
+	CheckLogicalDecodingRequirements();
+
+	Assert(!MyReplicationSlot);
+
+	ReplicationSlotAcquire(cmd->slotname);
+
+	/*
+	 * Force a disconnect, so that the decoding code doesn't need to care
+	 * about a eventual switch from running in recovery, to running in a
+	 * normal environment. Client code is expected to handle reconnects.
+	 */
+	if (am_cascading_walsender && !RecoveryInProgress())
+	{
+		ereport(LOG,
+				(errmsg("terminating walsender process after promotion")));
+		walsender_ready_to_stop = true;
+	}
+
+	WalSndSetState(WALSNDSTATE_CATCHUP);
+
+	/* Send a CopyBothResponse message, and start streaming */
+	pq_beginmessage(&buf, 'W');
+	pq_sendbyte(&buf, 0);
+	pq_sendint(&buf, 0, 2);
+	pq_endmessage(&buf);
+	pq_flush();
+
+	/* setup state for XLogReadPage */
+	sendTimeLineIsHistoric = false;
+	sendTimeLine = ThisTimeLineID;
+
+	/*
+	 * Initialize position to the last ack'ed one, then the xlog records begin
+	 * to be shipped from that position.
+	 */
+	logical_decoding_ctx = CreateDecodingContext(
+		cmd->startpoint, cmd->options,
+		logical_read_xlog_page,
+		WalSndPrepareWrite, WalSndWriteData);
+
+	/* Start reading WAL from the oldest required WAL. */
+	logical_startptr = MyReplicationSlot->data.restart_lsn;
+
+	/*
+	 * Report the location after which we'll send out further commits as the
+	 * current sentPtr.
+	 */
+	sentPtr = MyReplicationSlot->data.confirmed_flush;
+
+	/* Also update the sent position status in shared memory */
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = MyWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+		walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
+		SpinLockRelease(&walsnd->mutex);
+	}
+
+	replication_active = true;
+
+	SyncRepInitConfig();
+
+	/* Main loop of walsender */
+	WalSndLoop(XLogSendLogical);
+
+	FreeDecodingContext(logical_decoding_ctx);
+	ReplicationSlotRelease();
+
+	replication_active = false;
+	if (walsender_ready_to_stop)
+		proc_exit(0);
+	WalSndSetState(WALSNDSTATE_STARTUP);
+
+	/* Get out of COPY mode (CommandComplete). */
+	EndCommand("COPY 0", DestRemote);
+}
+
+/*
+ * LogicalDecodingContext 'prepare_write' callback.
+ *
+ * Prepare a write into a StringInfo.
+ *
+ * Don't do anything lasting in here, it's quite possible that nothing will done
+ * with the data.
+ */
+static void
+WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
+{
+	/* can't have sync rep confused by sending the same LSN several times */
+	if (!last_write)
+		lsn = InvalidXLogRecPtr;
+
+	resetStringInfo(ctx->out);
+
+	pq_sendbyte(ctx->out, 'w');
+	pq_sendint64(ctx->out, lsn);	/* dataStart */
+	pq_sendint64(ctx->out, lsn);	/* walEnd */
+	/*
+	 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
+	 * reserve space here.
+	 */
+	pq_sendint64(ctx->out, 0);		/* sendtime */
+}
+
+/*
+ * LogicalDecodingContext 'write' callback.
+ *
+ * Actually write out data previously prepared by WalSndPrepareWrite out to
+ * the network. Take as long as needed, but process replies from the other
+ * side and check timeouts during that.
+ */
+static void
+WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+				bool last_write)
+{
+	/* output previously gathered data in a CopyData packet */
+	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+
+	/*
+	 * Fill the send timestamp last, so that it is taken as late as
+	 * possible. This is somewhat ugly, but the protocol's set as it's already
+	 * used for several releases by streaming physical replication.
+	 */
+	resetStringInfo(&tmpbuf);
+	pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
+		   tmpbuf.data, sizeof(int64));
+
+	/* fast path */
+	/* Try to flush pending output to the client */
+	if (pq_flush_if_writable() != 0)
+		WalSndShutdown();
+
+	if (!pq_is_send_pending())
+		return;
+
+	for (;;)
+	{
+		int			wakeEvents;
+		long		sleeptime;
+		TimestampTz	now;
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (!PostmasterIsAlive())
+			exit(1);
+
+		/* Process any requests or signals received recently */
+		if (got_SIGHUP)
+		{
+			got_SIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+			SyncRepInitConfig();
+		}
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		/* Clear any already-pending wakeups */
+		ResetLatch(&MyWalSnd->latch);
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		/* If we finished clearing the buffered data, we're done here. */
+		if (!pq_is_send_pending())
+			break;
+
+		now = GetCurrentTimestamp();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut(now);
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary(now);
+
+		sleeptime = WalSndComputeSleeptime(now);
+
+		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+		/* Sleep until something happens or we time out */
+		ImmediateInterruptOK = true;
+		CHECK_FOR_INTERRUPTS();
+		WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+						  MyProcPort->sock, sleeptime);
+		ImmediateInterruptOK = false;
+	}
+
+	/* reactivate latch so WalSndLoop knows to continue */
+	SetLatch(&MyWalSnd->latch);
+}
+
+/*
+ * Wait till WAL < loc is flushed to disk so it can be safely read.
+ */
+static XLogRecPtr
+WalSndWaitForWal(XLogRecPtr loc)
+{
+	int			wakeEvents;
+	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+
+
+	/*
+	 * Fast path to avoid acquiring the spinlock in the we already know we
+	 * have enough WAL available. This is particularly interesting if we're
+	 * far behind.
+	 */
+	if (RecentFlushPtr != InvalidXLogRecPtr &&
+		loc <= RecentFlushPtr)
+		return RecentFlushPtr;
+
+	/* Get a more recent flush pointer. */
+	if (!RecoveryInProgress())
+		RecentFlushPtr = GetFlushRecPtr();
+	else
+		RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+
+	for (;;)
+	{
+		long		sleeptime;
+		TimestampTz	now;
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (!PostmasterIsAlive())
+			exit(1);
+
+		/* Process any requests or signals received recently */
+		if (got_SIGHUP)
+		{
+			got_SIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+			SyncRepInitConfig();
+		}
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		/* Clear any already-pending wakeups */
+		ResetLatch(&MyWalSnd->latch);
+
+		/* Update our idea of the currently flushed position. */
+		if (!RecoveryInProgress())
+			RecentFlushPtr = GetFlushRecPtr();
+		else
+			RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+
+		/*
+		 * If postmaster asked us to stop, don't wait here anymore. This will
+		 * cause the xlogreader to return without reading a full record, which
+		 * is the fastest way to reach the mainloop which then can quit.
+		 *
+		 * It's important to do this check after the recomputation of
+		 * RecentFlushPtr, so we can send all remaining data before shutting
+		 * down.
+		 */
+		if (walsender_ready_to_stop)
+			break;
+
+		/*
+		 * We only send regular messages to the client for full decoded
+		 * transactions, but a synchronous replication and walsender shutdown
+		 * possibly are waiting for a later location. So we send pings
+		 * containing the flush location every now and then.
+		 */
+		if (MyWalSnd->flush < sentPtr && !waiting_for_ping_response)
+		{
+			WalSndKeepalive(true);
+			waiting_for_ping_response = true;
+		}
+
+		/* check whether we're done */
+		if (loc <= RecentFlushPtr)
+			break;
+
+		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
+		WalSndCaughtUp = true;
+
+		/*
+		 * Try to flush pending output to the client. Also wait for the socket
+		 * becoming writable, if there's still pending output after an attempt
+		 * to flush. Otherwise we might just sit on output data while waiting
+		 * for new WAL being generated.
+		 */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		now = GetCurrentTimestamp();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut(now);
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary(now);
+
+		sleeptime = WalSndComputeSleeptime(now);
+
+		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+			WL_SOCKET_READABLE | WL_TIMEOUT;
+
+		if (pq_is_send_pending())
+			wakeEvents |= WL_SOCKET_WRITEABLE;
+
+		/* Sleep until something happens or we time out */
+		ImmediateInterruptOK = true;
+		CHECK_FOR_INTERRUPTS();
+		WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+						  MyProcPort->sock, sleeptime);
+		ImmediateInterruptOK = false;
+	}
+
+	/* reactivate latch so WalSndLoop knows to continue */
+	SetLatch(&MyWalSnd->latch);
+	return RecentFlushPtr;
+}
+
 /*
  * Execute an incoming replication command.
  */
@@ -724,6 +1249,12 @@ exec_replication_command(const char *cmd_string)
 	MemoryContext cmd_context;
 	MemoryContext old_context;
 
+	/*
+	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
+	 * command arrives. Clean up the old stuff if there's anything.
+	 */
+	SnapBuildClearExportedSnapshot();
+
 	elog(DEBUG1, "received replication command: %s", cmd_string);
 
 	CHECK_FOR_INTERRUPTS();
@@ -769,7 +1300,7 @@ exec_replication_command(const char *cmd_string)
 				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 					StartReplication(cmd);
 				else
-					elog(ERROR, "cannot handle logical decoding yet");
+					StartLogicalReplication(cmd);
 				break;
 			}
 
@@ -887,7 +1418,7 @@ ProcessRepliesIfAny(void)
 	if (received)
 	{
 		last_reply_timestamp = GetCurrentTimestamp();
-		ping_sent = false;
+		waiting_for_ping_response = false;
 	}
 }
 
@@ -1020,7 +1551,7 @@ ProcessStandbyReplyMessage(void)
 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
 	{
 		if (MyReplicationSlot->data.database != InvalidOid)
-			elog(ERROR, "cannot handle logical decoding yet");
+			LogicalConfirmReceivedLocation(flushPtr);
 		else
 			PhysicalConfirmReceivedLocation(flushPtr);
 	}
@@ -1146,12 +1677,81 @@ ProcessStandbyHSFeedbackMessage(void)
 		MyPgXact->xmin = feedbackXmin;
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Compute how long send/receive loops should sleep.
+ *
+ * If wal_sender_timeout is enabled we want to wake up in time to send
+ * keepalives and to abort the connection if wal_sender_timeout has been
+ * reached.
+ */
+static long
+WalSndComputeSleeptime(TimestampTz now)
+{
+	long		sleeptime = 10000;		/* 10 s */
+
+	if (wal_sender_timeout > 0)
+	{
+		TimestampTz wakeup_time;
+		long sec_to_timeout;
+		int microsec_to_timeout;
+
+		/*
+		 * At the latest stop sleeping once wal_sender_timeout has been
+		 * reached.
+		 */
+		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+												  wal_sender_timeout);
+
+		/*
+		 * If no ping has been sent yet, wakeup when it's time to do
+		 * so. WalSndKeepaliveIfNecessary() wants to send a keepalive once
+		 * half of the timeout passed without a response.
+		 */
+		if (!waiting_for_ping_response)
+			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+													wal_sender_timeout / 2);
+
+		/* Compute relative time until wakeup. */
+		TimestampDifference(now, wakeup_time,
+							&sec_to_timeout, &microsec_to_timeout);
+
+		sleeptime = sec_to_timeout * 1000 +
+			microsec_to_timeout / 1000;
+	}
+
+	return sleeptime;
+}
+
+/*
+ * Check whether there have been responses by the client within
+ * wal_sender_timeout and shutdown if not.
+ */
 static void
-WalSndLoop(void)
+WalSndCheckTimeOut(TimestampTz now)
 {
-	bool		caughtup = false;
+	TimestampTz timeout;
+
+	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+										  wal_sender_timeout);
 
+	if (wal_sender_timeout > 0 && now >= timeout)
+	{
+		/*
+		 * Since typically expiration of replication timeout means
+		 * communication problem, we don't send the error message to
+		 * the standby.
+		 */
+		ereport(COMMERROR,
+				(errmsg("terminating walsender process due to replication timeout")));
+
+		WalSndShutdown();
+	}
+}
+
+/* Main loop of walsender process that streams the WAL over Copy messages. */
+static void
+WalSndLoop(WalSndSendDataCallback send_data)
+{
 	/*
 	 * Allocate buffers that will be used for each outgoing and incoming
 	 * message.  We do this just once to reduce palloc overhead.
@@ -1162,7 +1762,7 @@ WalSndLoop(void)
 
 	/* Initialize the last reply timestamp */
 	last_reply_timestamp = GetCurrentTimestamp();
-	ping_sent = false;
+	waiting_for_ping_response = false;
 
 	/*
 	 * Loop until we reach the end of this timeline or the client requests to
@@ -1170,8 +1770,7 @@ WalSndLoop(void)
 	 */
 	for (;;)
 	{
-		/* Clear any already-pending wakeups */
-		ResetLatch(&MyWalSnd->latch);
+		TimestampTz	now;
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -1193,6 +1792,9 @@ WalSndLoop(void)
 		/* Check for input from the client */
 		ProcessRepliesIfAny();
 
+		/* Clear any already-pending wakeups */
+		ResetLatch(&MyWalSnd->latch);
+
 		/*
 		 * If we have received CopyDone from the client, sent CopyDone
 		 * ourselves, and the output buffer is empty, it's time to exit
@@ -1203,21 +1805,21 @@ WalSndLoop(void)
 
 		/*
 		 * If we don't have any pending data in the output buffer, try to send
-		 * some more.  If there is some, we don't bother to call XLogSend
+		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
 		 */
 		if (!pq_is_send_pending())
-			XLogSend(&caughtup);
+			send_data();
 		else
-			caughtup = false;
+			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
-			goto send_failure;
+			WalSndShutdown();
 
 		/* If nothing remains to be sent right now ... */
-		if (caughtup && !pq_is_send_pending())
+		if (WalSndCaughtUp && !pq_is_send_pending())
 		{
 			/*
 			 * If we're in catchup state, move to streaming.  This is an
@@ -1243,111 +1845,47 @@ WalSndLoop(void)
 			 * the walsender is not sure which.
 			 */
 			if (walsender_ready_to_stop)
-			{
-				/* ... let's just be real sure we're caught up ... */
-				XLogSend(&caughtup);
-				if (caughtup && sentPtr == MyWalSnd->flush &&
-					!pq_is_send_pending())
-				{
-					/* Inform the standby that XLOG streaming is done */
-					EndCommand("COPY 0", DestRemote);
-					pq_flush();
-
-					proc_exit(0);
-				}
-			}
+				WalSndDone(send_data);
 		}
 
-		/*
-		 * If half of wal_sender_timeout has elapsed without receiving any
-		 * reply from standby, send a keep-alive message requesting an
-		 * immediate reply.
-		 */
-		if (wal_sender_timeout > 0 && !ping_sent)
-		{
-			TimestampTz timeout;
+		now = GetCurrentTimestamp();
 
-			timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
-												  wal_sender_timeout / 2);
-			if (GetCurrentTimestamp() >= timeout)
-			{
-				WalSndKeepalive(true);
-				ping_sent = true;
-				/* Try to flush pending output to the client */
-				if (pq_flush_if_writable() != 0)
-					goto send_failure;
-			}
-		}
+		/* Check for replication timeout. */
+		WalSndCheckTimeOut(now);
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary(now);
 
 		/*
 		 * We don't block if not caught up, unless there is unsent data
 		 * pending in which case we'd better block until the socket is
-		 * write-ready.  This test is only needed for the case where XLogSend
-		 * loaded a subset of the available data but then pq_flush_if_writable
-		 * flushed it all --- we should immediately try to send more.
+		 * write-ready.  This test is only needed for the case where the
+		 * send_data callback handled a subset of the available data but then
+		 * pq_flush_if_writable flushed it all --- we should immediately try
+		 * to send more.
 		 */
-		if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
+		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
 		{
-			TimestampTz timeout;
-			long		sleeptime = 10000;		/* 10 s */
+			long		sleeptime;
 			int			wakeEvents;
 
 			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
 				WL_SOCKET_READABLE;
 
+			sleeptime = WalSndComputeSleeptime(now);
+
 			if (pq_is_send_pending())
 				wakeEvents |= WL_SOCKET_WRITEABLE;
 
-			/*
-			 * If wal_sender_timeout is active, sleep in smaller increments
-			 * to not go over the timeout too much. XXX: Why not just sleep
-			 * until the timeout has elapsed?
-			 */
-			if (wal_sender_timeout > 0)
-				sleeptime = 1 + (wal_sender_timeout / 10);
-
 			/* Sleep until something happens or we time out */
 			ImmediateInterruptOK = true;
 			CHECK_FOR_INTERRUPTS();
 			WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
 							  MyProcPort->sock, sleeptime);
 			ImmediateInterruptOK = false;
-
-			/*
-			 * Check for replication timeout.  Note we ignore the corner case
-			 * possibility that the client replied just as we reached the
-			 * timeout ... he's supposed to reply *before* that.
-			 */
-			timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
-												  wal_sender_timeout);
-			if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
-			{
-				/*
-				 * Since typically expiration of replication timeout means
-				 * communication problem, we don't send the error message to
-				 * the standby.
-				 */
-				ereport(COMMERROR,
-						(errmsg("terminating walsender process due to replication timeout")));
-				goto send_failure;
-			}
 		}
 	}
 	return;
-
-send_failure:
-
-	/*
-	 * Get here on send failure.  Clean up and exit.
-	 *
-	 * Reset whereToSendOutput to prevent ereport from attempting to send any
-	 * more messages to the standby.
-	 */
-	if (whereToSendOutput == DestRemote)
-		whereToSendOutput = DestNone;
-
-	proc_exit(0);
-	abort();					/* keep the compiler quiet */
 }
 
 /* Initialize a per-walsender data structure for this walsender process */
@@ -1605,15 +2143,17 @@ retry:
 }
 
 /*
+ * Send out the WAL in its normal physical/stored form.
+ *
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
  * but not yet sent to the client, and buffer it in the libpq output
  * buffer.
  *
- * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
- * *caughtup is set to false.
+ * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
+ * otherwise WalSndCaughtUp is set to false.
  */
 static void
-XLogSend(bool *caughtup)
+XLogSendPhysical(void)
 {
 	XLogRecPtr	SendRqstPtr;
 	XLogRecPtr	startptr;
@@ -1622,7 +2162,7 @@ XLogSend(bool *caughtup)
 
 	if (streamingDoneSending)
 	{
-		*caughtup = true;
+		WalSndCaughtUp = true;
 		return;
 	}
 
@@ -1739,7 +2279,7 @@ XLogSend(bool *caughtup)
 		pq_putmessage_noblock('c', NULL, 0);
 		streamingDoneSending = true;
 
-		*caughtup = true;
+		WalSndCaughtUp = true;
 
 		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
 			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
@@ -1751,7 +2291,7 @@ XLogSend(bool *caughtup)
 	Assert(sentPtr <= SendRqstPtr);
 	if (SendRqstPtr <= sentPtr)
 	{
-		*caughtup = true;
+		WalSndCaughtUp = true;
 		return;
 	}
 
@@ -1775,15 +2315,15 @@ XLogSend(bool *caughtup)
 	{
 		endptr = SendRqstPtr;
 		if (sendTimeLineIsHistoric)
-			*caughtup = false;
+			WalSndCaughtUp = false;
 		else
-			*caughtup = true;
+			WalSndCaughtUp = true;
 	}
 	else
 	{
 		/* round down to page boundary. */
 		endptr -= (endptr % XLOG_BLCKSZ);
-		*caughtup = false;
+		WalSndCaughtUp = false;
 	}
 
 	nbytes = endptr - startptr;
@@ -1843,6 +2383,85 @@ XLogSend(bool *caughtup)
 	return;
 }
 
+/*
+ * Stream out logically decoded data.
+ */
+static void
+XLogSendLogical(void)
+{
+	XLogRecord *record;
+	char	   *errm;
+
+	/*
+	 * Don't know whether we've caught up yet. We'll set it to true in
+	 * WalSndWaitForWal, if we're actually waiting. We also set to true if
+	 * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
+	 * i.e. when we're shutting down.
+	 */
+	WalSndCaughtUp = false;
+
+	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
+	logical_startptr = InvalidXLogRecPtr;
+
+	/* xlog record was invalid */
+	if (errm != NULL)
+		elog(ERROR, "%s", errm);
+
+	if (record != NULL)
+	{
+		LogicalDecodingProcessRecord(logical_decoding_ctx, record);
+
+		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+	}
+	else
+	{
+		/*
+		 * If the record we just wanted read is at or beyond the flushed point,
+		 * then we're caught up.
+		 */
+		if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
+			WalSndCaughtUp = true;
+	}
+
+	/* Update shared memory status */
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = MyWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+		walsnd->sentPtr = sentPtr;
+		SpinLockRelease(&walsnd->mutex);
+	}
+}
+
+/*
+ * Shutdown if the sender is caught up.
+ *
+ * NB: This should only be called when the shutdown signal has been received
+ * from postmaster.
+ *
+ * Note that if we determine that there's still more data to send, this
+ * function will return control to the caller.
+ */
+static void
+WalSndDone(WalSndSendDataCallback send_data)
+{
+	/* ... let's just be real sure we're caught up ... */
+	send_data();
+
+	if (WalSndCaughtUp && sentPtr == MyWalSnd->flush &&
+		!pq_is_send_pending())
+	{
+		/* Inform the standby that XLOG streaming is done */
+		EndCommand("COPY 0", DestRemote);
+		pq_flush();
+
+		proc_exit(0);
+	}
+	if (!waiting_for_ping_response)
+		WalSndKeepalive(true);
+}
+
 /*
  * Returns the latest point in WAL that has been safely flushed to disk, and
  * can be sent to the standby. This should only be called when in recovery,
@@ -2238,6 +2857,38 @@ WalSndKeepalive(bool requestReply)
 	pq_putmessage_noblock('d', output_message.data, output_message.len);
 }
 
+/*
+ * Send keepalive message if too much time has elapsed.
+ */
+static void
+WalSndKeepaliveIfNecessary(TimestampTz now)
+{
+	TimestampTz ping_time;
+
+	if (wal_sender_timeout <= 0)
+		return;
+
+	if (waiting_for_ping_response)
+		return;
+
+	/*
+	 * If half of wal_sender_timeout has lapsed without receiving any reply
+	 * from the standby, send a keep-alive message to the standby requesting
+	 * an immediate reply.
+	 */
+	ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+											wal_sender_timeout / 2);
+	if (now >= ping_time)
+	{
+		WalSndKeepalive(true);
+		waiting_for_ping_response = true;
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+	}
+}
+
 /*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 3ecc4d3ae0db750f8209dd0c386b728c4f63890c..89a7c9e15c21f201da7a508891feb7132343418a 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -729,11 +729,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
 				 errmsg("remaining connection slots are reserved for non-replication superuser connections")));
 
-	/*
-	 * If walsender, we don't want to connect to any particular database. Just
-	 * finish the backend startup by processing any options from the startup
-	 * packet, and we're done.
-	 */
+	/* Check replication permissions needed for walsender processes. */
 	if (am_walsender)
 	{
 		Assert(!bootstrap);
@@ -742,7 +738,16 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			ereport(FATAL,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 					 errmsg("must be superuser or replication role to start walsender")));
+	}
 
+	/*
+	 * If this is a plain walsender only supporting physical replication, we
+	 * don't want to connect to any particular database. Just finish the
+	 * backend startup by processing any options from the startup packet, and
+	 * we're done.
+	 */
+	if (am_walsender && !am_db_walsender)
+	{
 		/* process any options passed in the startup packet */
 		if (MyProcPort != NULL)
 			process_startup_options(MyProcPort, am_superuser);
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 919805f5cfa383b50ff4f8af7aa53a17ae44afed..1a468fa1b74137968fe4d84e159e0d2d9d3644b2 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1639,10 +1639,10 @@ BaseBackup(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
 	{
 		fprintf(stderr,
-				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
 				progname, PQntuples(res), PQnfields(res), 1, 3);
 		disconnect_and_exit(1);
 	}
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 0f191ce6bb389d4f930979c97392c64cabeeee20..2478789ca21f7941b0a5a68c83ddecc703247860 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -275,10 +275,10 @@ StreamLog(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
 	{
 		fprintf(stderr,
-				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
 				progname, PQntuples(res), PQnfields(res), 1, 3);
 		disconnect_and_exit(1);
 	}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index ef73b4b166c5ac37614f90951fd175a8246423fb..febe3d1a2b7063917332d9008b9d27f442e55e52 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -563,10 +563,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			PQclear(res);
 			return false;
 		}
-		if (PQnfields(res) != 3 || PQntuples(res) != 1)
+		if (PQntuples(res) != 1 || PQnfields(res) < 3)
 		{
 			fprintf(stderr,
-					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
+					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
 					progname, PQntuples(res), PQnfields(res), 1, 3);
 			PQclear(res);
 			return false;
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index b67cf63d7665e855a923af9a775423e48a49d6d2..cff2be6d8f6eec0b857d025c5c7c7332c4d72d33 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -19,6 +19,7 @@
 /* global state */
 extern bool am_walsender;
 extern bool am_cascading_walsender;
+extern bool am_db_walsender;
 extern bool wake_wal_senders;
 
 /* user-settable parameters */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f9604541c7aa19bbbc5b9131e23d49c0f519c0a2..62a892be3ba74f8ed112eea9f6b0f433d3ea032e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1909,6 +1909,7 @@ WalRcvData
 WalRcvState
 WalSnd
 WalSndCtlData
+WalSndSendDataCallback
 WalSndState
 WholeRowVarExprState
 WindowAgg