diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index e92cd73031f7645cfe0015e83fd27c19a5486fb2..0862b81dd7b4764f1f1af57a665fd1006241e2bf 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -18,7 +18,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.31 2007/06/04 22:21:42 adunstan Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.32 2007/06/14 01:48:51 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #include <sys/stat.h> #include <sys/time.h> +#include "lib/stringinfo.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgtime.h" @@ -54,6 +55,13 @@ #define LBF_MODE _IOLBF #endif +/* + * We read() into a temp buffer twice as big as a chunk, so that any fragment + * left after processing can be moved down to the front and we'll still have + * room to read a full chunk. + */ +#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE) + /* * GUC parameters. Redirect_stderr cannot be changed after postmaster @@ -75,15 +83,28 @@ bool am_syslogger = false; * Private state */ static pg_time_t next_rotation_time; - static bool redirection_done = false; - static bool pipe_eof_seen = false; - static FILE *syslogFile = NULL; - static char *last_file_name = NULL; +/* + * Buffers for saving partial messages from different backends. We don't expect + * that there will be very many outstanding at one time, so 20 seems plenty of + * leeway. If this array gets full we won't lose messages, but we will lose + * the protocol protection against them being partially written or interleaved. + * + * An inactive buffer has pid == 0 and undefined contents of data. + */ +typedef struct +{ + int32 pid; /* PID of source process */ + StringInfoData data; /* accumulated data, as a StringInfo */ +} save_buffer; + +#define CHUNK_SLOTS 20 +static save_buffer saved_chunks[CHUNK_SLOTS]; + /* These must be exported for EXEC_BACKEND case ... annoying */ #ifndef WIN32 int syslogPipe[2] = {-1, -1}; @@ -108,6 +129,8 @@ static volatile sig_atomic_t rotation_requested = false; static pid_t syslogger_forkexec(void); static void syslogger_parseArgs(int argc, char *argv[]); #endif +static void process_pipe_input(char *logbuffer, int *bytes_in_logbuffer); +static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer); #ifdef WIN32 static unsigned int __stdcall pipeThread(void *arg); @@ -126,6 +149,10 @@ static void sigUsr1Handler(SIGNAL_ARGS); NON_EXEC_STATIC void SysLoggerMain(int argc, char *argv[]) { +#ifndef WIN32 + char logbuffer[READ_BUF_SIZE]; + int bytes_in_logbuffer = 0; +#endif char *currentLogDir; char *currentLogFilename; int currentLogRotationAge; @@ -244,7 +271,6 @@ SysLoggerMain(int argc, char *argv[]) bool time_based_rotation = false; #ifndef WIN32 - char logbuffer[1024]; int bytesRead; int rc; fd_set rfds; @@ -326,8 +352,8 @@ SysLoggerMain(int argc, char *argv[]) else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds)) { bytesRead = piperead(syslogPipe[0], - logbuffer, sizeof(logbuffer)); - + logbuffer + bytes_in_logbuffer, + sizeof(logbuffer) - bytes_in_logbuffer); if (bytesRead < 0) { if (errno != EINTR) @@ -337,7 +363,8 @@ SysLoggerMain(int argc, char *argv[]) } else if (bytesRead > 0) { - write_syslogger_file(logbuffer, bytesRead); + bytes_in_logbuffer += bytesRead; + process_pipe_input(logbuffer, &bytes_in_logbuffer); continue; } else @@ -349,6 +376,9 @@ SysLoggerMain(int argc, char *argv[]) * and all backends are shut down, and we are done. */ pipe_eof_seen = true; + + /* if there's any data left then force it out now */ + flush_pipe_input(logbuffer, &bytes_in_logbuffer); } } #else /* WIN32 */ @@ -611,6 +641,207 @@ syslogger_parseArgs(int argc, char *argv[]) #endif /* EXEC_BACKEND */ +/* -------------------------------- + * pipe protocol handling + * -------------------------------- + */ + +/* + * Process data received through the syslogger pipe. + * + * This routine interprets the log pipe protocol which sends log messages as + * (hopefully atomic) chunks - such chunks are detected and reassembled here. + * + * The protocol has a header that starts with two nul bytes, then has a 16 bit + * length, the pid of the sending process, and a flag to indicate if it is + * the last chunk in a message. Incomplete chunks are saved until we read some + * more, and non-final chunks are accumulated until we get the final chunk. + * + * All of this is to avoid 2 problems: + * . partial messages being written to logfiles (messes rotation), and + * . messages from different backends being interleaved (messages garbled). + * + * Any non-protocol messages are written out directly. These should only come + * from non-PostgreSQL sources, however (e.g. third party libraries writing to + * stderr). + * + * logbuffer is the data input buffer, and *bytes_in_logbuffer is the number + * of bytes present. On exit, any not-yet-eaten data is left-justified in + * logbuffer, and *bytes_in_logbuffer is updated. + */ +static void +process_pipe_input(char *logbuffer, int *bytes_in_logbuffer) +{ + char *cursor = logbuffer; + int count = *bytes_in_logbuffer; + + /* While we have enough for a header, process data... */ + while (count >= (int) sizeof(PipeProtoHeader)) + { + PipeProtoHeader p; + int chunklen; + + /* Do we have a valid header? */ + memcpy(&p, cursor, sizeof(PipeProtoHeader)); + if (p.nuls[0] == '\0' && p.nuls[1] == '\0' && + p.len > 0 && p.len <= PIPE_MAX_PAYLOAD && + p.pid != 0 && + (p.is_last == 't' || p.is_last == 'f')) + { + chunklen = PIPE_HEADER_SIZE + p.len; + + /* Fall out of loop if we don't have the whole chunk yet */ + if (count < chunklen) + break; + + if (p.is_last == 'f') + { + /* + * Save a complete non-final chunk in the per-pid buffer + * if possible - if not just write it out. + */ + int free_slot = -1, existing_slot = -1; + int i; + StringInfo str; + + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + if (free_slot < 0 && saved_chunks[i].pid == 0) + free_slot = i; + } + if (existing_slot >= 0) + { + str = &(saved_chunks[existing_slot].data); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + } + else if (free_slot >= 0) + { + saved_chunks[free_slot].pid = p.pid; + str = &(saved_chunks[free_slot].data); + initStringInfo(str); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + } + else + { + /* + * If there is no free slot we'll just have to take our + * chances and write out a partial message and hope that + * it's not followed by something from another pid. + */ + write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len); + } + } + else + { + /* + * Final chunk --- add it to anything saved for that pid, and + * either way write the whole thing out. + */ + int existing_slot = -1; + int i; + StringInfo str; + + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + } + if (existing_slot >= 0) + { + str = &(saved_chunks[existing_slot].data); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + write_syslogger_file(str->data, str->len); + saved_chunks[existing_slot].pid = 0; + pfree(str->data); + } + else + { + /* The whole message was one chunk, evidently. */ + write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len); + } + } + + /* Finished processing this chunk */ + cursor += chunklen; + count -= chunklen; + } + else + { + /* Process non-protocol data */ + + /* + * Look for the start of a protocol header. If found, dump data + * up to there and repeat the loop. Otherwise, dump it all and + * fall out of the loop. (Note: we want to dump it all if + * at all possible, so as to avoid dividing non-protocol messages + * across logfiles. We expect that in many scenarios, a + * non-protocol message will arrive all in one read(), and we + * want to respect the read() boundary if possible.) + */ + for (chunklen = 1; chunklen < count; chunklen++) + { + if (cursor[chunklen] == '\0') + break; + } + write_syslogger_file(cursor, chunklen); + cursor += chunklen; + count -= chunklen; + } + } + + /* We don't have a full chunk, so left-align what remains in the buffer */ + if (count > 0 && cursor != logbuffer) + memmove(logbuffer, cursor, count); + *bytes_in_logbuffer = count; +} + +/* + * Force out any buffered data + * + * This is currently used only at syslogger shutdown, but could perhaps be + * useful at other times, so it is careful to leave things in a clean state. + */ +static void +flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer) +{ + int i; + StringInfo str; + + /* Dump any incomplete protocol messages */ + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid != 0) + { + str = &(saved_chunks[i].data); + write_syslogger_file(str->data, str->len); + saved_chunks[i].pid = 0; + pfree(str->data); + } + } + /* + * Force out any remaining pipe data as-is; we don't bother trying to + * remove any protocol headers that may exist in it. + */ + if (*bytes_in_logbuffer > 0) + write_syslogger_file(logbuffer, *bytes_in_logbuffer); + *bytes_in_logbuffer = 0; +} + + /* -------------------------------- * logfile routines * -------------------------------- @@ -653,12 +884,16 @@ write_syslogger_file(const char *buffer, int count) static unsigned int __stdcall pipeThread(void *arg) { - DWORD bytesRead; - char logbuffer[1024]; + char logbuffer[READ_BUF_SIZE]; + int bytes_in_logbuffer = 0; for (;;) { - if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer), + DWORD bytesRead; + + if (!ReadFile(syslogPipe[0], + logbuffer + bytes_in_logbuffer, + sizeof(logbuffer) - bytes_in_logbuffer, &bytesRead, 0)) { DWORD error = GetLastError(); @@ -672,11 +907,18 @@ pipeThread(void *arg) errmsg("could not read from logger pipe: %m"))); } else if (bytesRead > 0) - write_syslogger_file(logbuffer, bytesRead); + { + bytes_in_logbuffer += bytesRead; + process_pipe_input(logbuffer, &bytes_in_logbuffer); + } } /* We exit the above loop only upon detecting pipe EOF */ pipe_eof_seen = true; + + /* if there's any data left then force it out now */ + flush_pipe_input(logbuffer, &bytes_in_logbuffer); + _endthread(); return 0; } diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index c6952ef20e85e45e6fc266b436ffba025d4673e2..c762475d65a4a41c2008f6300b7ed3f02b890b2b 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -42,7 +42,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.186 2007/06/07 21:45:59 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.187 2007/06/14 01:48:51 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -124,6 +124,7 @@ static const char *useful_strerror(int errnum); static const char *error_severity(int elevel); static void append_with_tabs(StringInfo buf, const char *str); static bool is_log_level_output(int elevel, int log_min_level); +static void write_pipe_chunks(int fd, char *data, int len); /* @@ -1783,7 +1784,10 @@ send_message_to_server_log(ErrorData *edata) write_eventlog(edata->elevel, buf.data); else #endif - fprintf(stderr, "%s", buf.data); + if (Redirect_stderr) + write_pipe_chunks(fileno(stderr), buf.data, buf.len); + else + write(fileno(stderr), buf.data, buf.len); } /* If in the syslogger process, try to write messages direct to file */ @@ -1793,6 +1797,37 @@ send_message_to_server_log(ErrorData *edata) pfree(buf.data); } +/* + * Send data to the syslogger using the chunked protocol + */ +static void +write_pipe_chunks(int fd, char *data, int len) +{ + PipeProtoChunk p; + + Assert(len > 0); + + p.proto.nuls[0] = p.proto.nuls[1] = '\0'; + p.proto.pid = MyProcPid; + + /* write all but the last chunk */ + while (len > PIPE_MAX_PAYLOAD) + { + p.proto.is_last = 'f'; + p.proto.len = PIPE_MAX_PAYLOAD; + memcpy(p.proto.data, data, PIPE_MAX_PAYLOAD); + write(fd, &p, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD); + data += PIPE_MAX_PAYLOAD; + len -= PIPE_MAX_PAYLOAD; + } + + /* write the last chunk */ + p.proto.is_last = 't'; + p.proto.len = len; + memcpy(p.proto.data, data, len); + write(fd, &p, PIPE_HEADER_SIZE + len); +} + /* * Write error report to client @@ -2115,6 +2150,7 @@ write_stderr(const char *fmt,...) #ifndef WIN32 /* On Unix, we just fprintf to stderr */ vfprintf(stderr, fmt, ap); + fflush(stderr); #else /* @@ -2130,8 +2166,11 @@ write_stderr(const char *fmt,...) write_eventlog(ERROR, errbuf); } else + { /* Not running as service, write to stderr */ vfprintf(stderr, fmt, ap); + fflush(stderr); + } #endif va_end(ap); } diff --git a/src/include/postmaster/syslogger.h b/src/include/postmaster/syslogger.h index 3e8b59dbacf2e28dda1573c3fe2a1f34c626389a..72c14c99cdde314665d548d492ce7c2ee069f355 100644 --- a/src/include/postmaster/syslogger.h +++ b/src/include/postmaster/syslogger.h @@ -5,13 +5,61 @@ * * Copyright (c) 2004-2007, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/postmaster/syslogger.h,v 1.8 2007/01/05 22:19:57 momjian Exp $ + * $PostgreSQL: pgsql/src/include/postmaster/syslogger.h,v 1.9 2007/06/14 01:48:51 adunstan Exp $ * *------------------------------------------------------------------------- */ #ifndef _SYSLOGGER_H #define _SYSLOGGER_H +#include <limits.h> /* for PIPE_BUF */ + + +/* + * Primitive protocol structure for writing to syslogger pipe(s). The idea + * here is to divide long messages into chunks that are not more than + * PIPE_BUF bytes long, which according to POSIX spec must be written into + * the pipe atomically. The pipe reader then uses the protocol headers to + * reassemble the parts of a message into a single string. The reader can + * also cope with non-protocol data coming down the pipe, though we cannot + * guarantee long strings won't get split apart. + * + * We use 't' or 'f' instead of a bool for is_last to make the protocol a tiny + * bit more robust against finding a false double nul byte prologue. But we + * still might find it in the len and/or pid bytes unless we're careful. + */ + +#ifdef PIPE_BUF +/* Are there any systems with PIPE_BUF > 64K? Unlikely, but ... */ +#if PIPE_BUF > 65536 +#define PIPE_CHUNK_SIZE 65536 +#else +#define PIPE_CHUNK_SIZE ((int) PIPE_BUF) +#endif +#else /* not defined */ +/* POSIX says the value of PIPE_BUF must be at least 512, so use that */ +#define PIPE_CHUNK_SIZE 512 +#endif + +typedef struct +{ + char nuls[2]; /* always \0\0 */ + uint16 len; /* size of this chunk (counts data only) */ + int32 pid; /* writer's pid */ + char is_last; /* last chunk of message? 't' or 'f' */ + char data[1]; /* data payload starts here */ +} PipeProtoHeader; + +typedef union +{ + PipeProtoHeader proto; + char filler[PIPE_CHUNK_SIZE]; +} PipeProtoChunk; + +#define PIPE_HEADER_SIZE offsetof(PipeProtoHeader, data) +#define PIPE_MAX_PAYLOAD ((int) (PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE)) + + /* GUC options */ extern bool Redirect_stderr; extern int Log_RotationAge;