Skip to content
Snippets Groups Projects
Commit ea551608 authored by Heikki Linnakangas's avatar Heikki Linnakangas
Browse files

In walsender, don't sleep if there's outstanding WAL waiting to be sent,

otherwise we effectively rate-limit the streaming as pointed out by
Simon Riggs. Also, send the WAL in smaller chunks, to respond to signals
more promptly.
parent 4ed4b6c5
No related branches found
No related tags found
No related merge requests found
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.20 2010/05/09 18:11:55 tgl Exp $ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -100,13 +100,19 @@ static void InitWalSnd(void); ...@@ -100,13 +100,19 @@ static void InitWalSnd(void);
static void WalSndHandshake(void); static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg); static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(StringInfo outMsg); static bool XLogSend(StringInfo outMsg, bool *caughtup);
static void CheckClosedConnection(void); static void CheckClosedConnection(void);
/* /*
* How much WAL to send in one message? Must be >= XLOG_BLCKSZ. * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
*
* We don't have a good idea of what a good value would be; there's some
* overhead per message in both walsender and walreceiver, but on the other
* hand sending large batches makes walsender less responsive to signals
* because signals are checked only between messages. 128kB seems like
* a reasonable guess for now.
*/ */
#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2) #define MAX_SEND_SIZE (128 * 1024)
/* Main entry point for walsender process */ /* Main entry point for walsender process */
int int
...@@ -360,6 +366,7 @@ static int ...@@ -360,6 +366,7 @@ static int
WalSndLoop(void) WalSndLoop(void)
{ {
StringInfoData output_message; StringInfoData output_message;
bool caughtup = false;
initStringInfo(&output_message); initStringInfo(&output_message);
...@@ -387,7 +394,7 @@ WalSndLoop(void) ...@@ -387,7 +394,7 @@ WalSndLoop(void)
*/ */
if (ready_to_stop) if (ready_to_stop)
{ {
XLogSend(&output_message); XLogSend(&output_message, &caughtup);
shutdown_requested = true; shutdown_requested = true;
} }
...@@ -402,31 +409,32 @@ WalSndLoop(void) ...@@ -402,31 +409,32 @@ WalSndLoop(void)
} }
/* /*
* Nap for the configured time or until a message arrives. * If we had sent all accumulated WAL in last round, nap for the
* configured time before retrying.
* *
* On some platforms, signals won't interrupt the sleep. To ensure we * On some platforms, signals won't interrupt the sleep. To ensure we
* respond reasonably promptly when someone signals us, break down the * respond reasonably promptly when someone signals us, break down the
* sleep into NAPTIME_PER_CYCLE increments, and check for * sleep into NAPTIME_PER_CYCLE increments, and check for
* interrupts after each nap. * interrupts after each nap.
*/ */
if (caughtup)
{
remain = WalSndDelay * 1000L; remain = WalSndDelay * 1000L;
while (remain > 0) while (remain > 0)
{ {
/* Check for interrupts */
if (got_SIGHUP || shutdown_requested || ready_to_stop) if (got_SIGHUP || shutdown_requested || ready_to_stop)
break; break;
/* /* Sleep and check that the connection is still alive */
* Check to see whether a message from the standby or an interrupt
* from other processes has arrived.
*/
pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain); pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
CheckClosedConnection(); CheckClosedConnection();
remain -= NAPTIME_PER_CYCLE; remain -= NAPTIME_PER_CYCLE;
} }
}
/* Attempt to send the log once every loop */ /* Attempt to send the log once every loop */
if (!XLogSend(&output_message)) if (!XLogSend(&output_message, &caughtup))
goto eof; goto eof;
} }
...@@ -623,15 +631,20 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) ...@@ -623,15 +631,20 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
} }
/* /*
* Read all WAL that's been written (and flushed) since last cycle, and send * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
* it to client. * but not yet sent to the client, and send it. If there is no unsent WAL,
* *caughtup is set to true and nothing is sent, otherwise *caughtup is set
* to false.
* *
* Returns true if OK, false if trouble. * Returns true if OK, false if trouble.
*/ */
static bool static bool
XLogSend(StringInfo outMsg) XLogSend(StringInfo outMsg, bool *caughtup)
{ {
XLogRecPtr SendRqstPtr; XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
char activitymsg[50]; char activitymsg[50];
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
...@@ -642,18 +655,17 @@ XLogSend(StringInfo outMsg) ...@@ -642,18 +655,17 @@ XLogSend(StringInfo outMsg)
/* Quick exit if nothing to do */ /* Quick exit if nothing to do */
if (!XLByteLT(sentPtr, SendRqstPtr)) if (!XLByteLT(sentPtr, SendRqstPtr))
{
*caughtup = true;
return true; return true;
}
/* /*
* We gather multiple records together by issuing just one XLogRead() of a * Otherwise let the caller know that we're not fully caught up. Unless
* suitable size, and send them as one CopyData message. Repeat until * there's a huge backlog, we'll be caught up to the current WriteRecPtr
* we've sent everything we can. * after we've sent everything below, but more WAL could accumulate while
* we're busy sending.
*/ */
while (XLByteLT(sentPtr, SendRqstPtr)) *caughtup = false;
{
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
/* /*
* Figure out how much to send in one message. If there's less than * Figure out how much to send in one message. If there's less than
...@@ -719,7 +731,6 @@ XLogSend(StringInfo outMsg) ...@@ -719,7 +731,6 @@ XLogSend(StringInfo outMsg)
pq_putmessage('d', outMsg->data, outMsg->len); pq_putmessage('d', outMsg->data, outMsg->len);
resetStringInfo(outMsg); resetStringInfo(outMsg);
}
/* Update shared memory status */ /* Update shared memory status */
SpinLockAcquire(&walsnd->mutex); SpinLockAcquire(&walsnd->mutex);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment