Skip to content
Snippets Groups Projects
Commit e6725d15 authored by Tom Lane's avatar Tom Lane
Browse files

Add explicit buffering in backend libpq, to compensate for

buffering lost by not going through stdio anymore for client I/O.
parent 13c7c183
No related branches found
No related tags found
No related merge requests found
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.67 1999/01/17 06:18:15 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.68 1999/01/23 22:27:26 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -303,9 +303,7 @@ DoCopy(char *relname, bool binary, bool oids, bool from, bool pipe, ...@@ -303,9 +303,7 @@ DoCopy(char *relname, bool binary, bool oids, bool from, bool pipe,
} }
else if (!from && !binary) else if (!from && !binary)
{ {
CopySendData("\\.\n",3,fp); CopySendData("\\.\n",3,fp);
if (IsUnderPostmaster)
pq_flush();
} }
} }
} }
......
...@@ -5,39 +5,40 @@ ...@@ -5,39 +5,40 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: pqcomm.c,v 1.63 1999/01/17 06:18:26 momjian Exp $ * $Id: pqcomm.c,v 1.64 1999/01/23 22:27:28 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
/* /*
* INTERFACE ROUTINES * INTERFACE ROUTINES
* pq_init - initialize libpq * pq_init - initialize libpq
* pq_getport - return the PGPORT setting * pq_getport - return the PGPORT setting
* pq_close - close input / output connections * pq_close - close input / output connections
* pq_flush - flush pending output * pq_flush - flush pending output
* pq_recvbuf - load some bytes into the input buffer
* pq_getstr - get a null terminated string from connection * pq_getstr - get a null terminated string from connection
* pq_getchar - get 1 character from connection * pq_getchar - get 1 character from connection
* pq_peekchar - peek at first character in connection * pq_peekchar - peek at next character from connection
* pq_getnchar - get n characters from connection, and null-terminate * pq_getnchar - get n characters from connection, and null-terminate
* pq_getint - get an integer from connection * pq_getint - get an integer from connection
* pq_putchar - send 1 character to connection * pq_putchar - send 1 character to connection
* pq_putstr - send a null terminated string to connection * pq_putstr - send a null terminated string to connection
* pq_putnchar - send n characters to connection * pq_putnchar - send n characters to connection
* pq_putint - send an integer to connection * pq_putint - send an integer to connection
* pq_putncharlen - send n characters to connection * pq_putncharlen - send n characters to connection
* (also send an int header indicating * (also send an int header indicating
* the length) * the length)
* pq_getinaddr - initialize address from host and port number * pq_getinaddr - initialize address from host and port number
* pq_getinserv - initialize address from host and service name * pq_getinserv - initialize address from host and service name
* *
* StreamDoUnlink - Shutdown UNIX socket connectioin * StreamDoUnlink - Shutdown UNIX socket connection
* StreamServerPort - Open sock stream * StreamServerPort - Open socket stream
* StreamConnection - Create new connection with client * StreamConnection - Create new connection with client
* StreamClose - Close a client/backend connection * StreamClose - Close a client/backend connection
* *
* NOTES * NOTES
* Frontend is now completey in interfaces/libpq, and no * Frontend is now completely in interfaces/libpq, and no
* functions from this file is used. * functions from this file are used there.
* *
*/ */
#include "postgres.h" #include "postgres.h"
...@@ -79,6 +80,14 @@ ...@@ -79,6 +80,14 @@
extern FILE * debug_port; /* in util.c */ extern FILE * debug_port; /* in util.c */
/*
* Buffers
*/
char PqSendBuffer[PQ_BUFFER_SIZE];
char PqRecvBuffer[PQ_BUFFER_SIZE];
int PqSendPointer,PqRecvPointer,PqRecvLength;
/* -------------------------------- /* --------------------------------
* pq_init - open portal file descriptors * pq_init - open portal file descriptors
* -------------------------------- * --------------------------------
...@@ -86,6 +95,7 @@ extern FILE * debug_port; /* in util.c */ ...@@ -86,6 +95,7 @@ extern FILE * debug_port; /* in util.c */
void void
pq_init(int fd) pq_init(int fd)
{ {
PqSendPointer = PqRecvPointer = PqRecvLength = 0;
PQnotifies_init(); PQnotifies_init();
if (getenv("LIBPQ_DEBUG")) if (getenv("LIBPQ_DEBUG"))
debug_port = stderr; debug_port = stderr;
...@@ -94,40 +104,40 @@ pq_init(int fd) ...@@ -94,40 +104,40 @@ pq_init(int fd)
/* ------------------------- /* -------------------------
* pq_getchar() * pq_getchar()
* *
* get a character from the input file, * get a character from the input file, or EOF if trouble
* * --------------------------------
*/ */
int int
pq_getchar(void) pq_getchar(void)
{ {
char c; while (PqRecvPointer >= PqRecvLength)
{
while (recv(MyProcPort->sock, &c, 1, 0) != 1) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */
if (errno != EINTR) return EOF; /* Failed to recv data */
return EOF; /* Not interrupted, so something went wrong */
} }
return PqRecvBuffer[PqRecvPointer++];
return c;
} }
/* /* -------------------------
* pq_peekchar()
*
* get a character from the connection, but leave it in the buffer
* to be read again
* -------------------------------- * --------------------------------
* pq_peekchar - get 1 character from connection, but leave it in the stream
*/ */
int
pq_peekchar(void) {
char c;
while (recv(MyProcPort->sock, &c, 1, MSG_PEEK) != 1) { int
if (errno != EINTR) pq_peekchar(void)
return EOF; /* Not interrupted, so something went wrong */ {
while (PqRecvPointer >= PqRecvLength)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
} }
/* Note we don't bump the pointer... */
return c; return PqRecvBuffer[PqRecvPointer];
} }
/* -------------------------------- /* --------------------------------
* pq_getport - return the PGPORT setting * pq_getport - return the PGPORT setting
...@@ -150,18 +160,91 @@ pq_getport() ...@@ -150,18 +160,91 @@ pq_getport()
void void
pq_close() pq_close()
{ {
close(MyProcPort->sock); close(MyProcPort->sock);
PQnotifies_init(); PQnotifies_init();
} }
/* -------------------------------- /* --------------------------------
* pq_flush - flush pending output * pq_flush - flush pending output
*
* returns 0 if OK, EOF if trouble
* -------------------------------- * --------------------------------
*/ */
void int
pq_flush() pq_flush()
{ {
/* Not supported/required? */ char *bufptr = PqSendBuffer;
char *bufend = PqSendBuffer + PqSendPointer;
while (bufptr < bufend)
{
int r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0);
if (r <= 0)
{
if (errno == EINTR)
continue; /* Ok if we were interrupted */
/* We would like to use elog() here, but cannot because elog
* tries to write to the client, which would cause a recursive
* flush attempt! So just write it out to the postmaster log.
*/
fprintf(stderr, "pq_flush: send() failed, errno %d\n", errno);
/* We drop the buffered data anyway so that processing
* can continue, even though we'll probably quit soon.
*/
PqSendPointer = 0;
return EOF;
}
bufptr += r;
}
PqSendPointer = 0;
return 0;
}
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
*
* returns 0 if OK, EOF if trouble
* --------------------------------
*/
int
pq_recvbuf()
{
if (PqRecvPointer > 0)
{
if (PqRecvLength > PqRecvPointer)
{
/* still some unread data, left-justify it in the buffer */
memmove(PqRecvBuffer, PqRecvBuffer+PqRecvPointer,
PqRecvLength-PqRecvPointer);
PqRecvLength -= PqRecvPointer;
PqRecvPointer = 0;
}
else
PqRecvLength = PqRecvPointer = 0;
}
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
int r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength,
PQ_BUFFER_SIZE - PqRecvLength, 0);
if (r <= 0)
{
if (errno == EINTR)
continue; /* Ok if interrupted */
/* We would like to use elog() here, but dare not because elog
* tries to write to the client, which will cause problems
* if we have a hard communications failure ...
* So just write the message to the postmaster log.
*/
fprintf(stderr, "pq_recvbuf: recv() failed, errno %d\n", errno);
return EOF;
}
/* r contains number of bytes read, so just incr length */
PqRecvLength += r;
return 0;
}
} }
/* -------------------------------- /* --------------------------------
...@@ -194,7 +277,7 @@ pq_getstr(char *s, int maxlen) ...@@ -194,7 +277,7 @@ pq_getstr(char *s, int maxlen)
int int
pq_getnchar(char *s, int off, int maxlen) pq_getnchar(char *s, int off, int maxlen)
{ {
int r = pqGetNBytes(s + off, maxlen); int r = pqGetNBytes(s + off, maxlen);
s[off+maxlen] = '\0'; s[off+maxlen] = '\0';
return r; return r;
} }
...@@ -602,7 +685,7 @@ StreamConnection(int server_fd, Port *port) ...@@ -602,7 +685,7 @@ StreamConnection(int server_fd, Port *port)
if (setsockopt(port->sock, pe->p_proto, TCP_NODELAY, if (setsockopt(port->sock, pe->p_proto, TCP_NODELAY,
&on, sizeof(on)) < 0) &on, sizeof(on)) < 0)
{ {
elog(ERROR, "postmaster: setsockopt failed"); elog(ERROR, "postmaster: setsockopt failed: %m");
return STATUS_ERROR; return STATUS_ERROR;
} }
} }
...@@ -644,18 +727,9 @@ pq_putncharlen(char *s, int n) ...@@ -644,18 +727,9 @@ pq_putncharlen(char *s, int n)
*/ */
int pq_putchar(char c) int pq_putchar(char c)
{ {
char isDone = 0; if (PqSendPointer >= PQ_BUFFER_SIZE)
if (pq_flush()) /* If buffer is full, then flush it out */
do { return EOF;
if (send(MyProcPort->sock, &c, 1, 0) != 1) { PqSendBuffer[PqSendPointer++] = c; /* Put in buffer */
if (errno != EINTR) return c;
return EOF; /* Anything other than interrupt is error! */
}
else
isDone = 1; /* Done if we sent one char */
} while (!isDone);
return c;
} }
...@@ -98,7 +98,7 @@ pqPutLong(int integer) ...@@ -98,7 +98,7 @@ pqPutLong(int integer)
n = ((PG_PROTOCOL_MAJOR(FrontendProtocol) == 0) ? hton_l(integer) : htonl((uint32) integer)); n = ((PG_PROTOCOL_MAJOR(FrontendProtocol) == 0) ? hton_l(integer) : htonl((uint32) integer));
#endif #endif
return pqPutNBytes((char *)&n,4); return pqPutNBytes((char *)&n, 4);
} }
/* --------------------------------------------------------------------- */ /* --------------------------------------------------------------------- */
...@@ -107,7 +107,7 @@ pqGetShort(int *result) ...@@ -107,7 +107,7 @@ pqGetShort(int *result)
{ {
uint16 n; uint16 n;
if (pqGetNBytes((char *)&n,2) != 0) if (pqGetNBytes((char *)&n, 2) != 0)
return EOF; return EOF;
#ifdef FRONTEND #ifdef FRONTEND
...@@ -138,28 +138,29 @@ pqGetLong(int *result) ...@@ -138,28 +138,29 @@ pqGetLong(int *result)
} }
/* --------------------------------------------------------------------- */ /* --------------------------------------------------------------------- */
/* pqGetNBytes: Read a chunk of exactly len bytes in buffer s (which must be 1 /* pqGetNBytes: Read a chunk of exactly len bytes into buffer s.
byte longer) and terminate it with a '\0'. * Return 0 if ok, EOF if trouble.
Return 0 if ok. */
*/
int int
pqGetNBytes(char *s, size_t len) pqGetNBytes(char *s, size_t len)
{ {
int bytesDone = 0; size_t amount;
do { while (len > 0)
int r = recv(MyProcPort->sock, s+bytesDone, len-bytesDone, 0); {
if (r == 0 || r == -1) { while (PqRecvPointer >= PqRecvLength)
if (errno != EINTR) {
return EOF; /* All other than signal-interrupted is error */ if (pq_recvbuf()) /* If nothing in buffer, then recv some */
continue; /* Otherwise, try again */ return EOF; /* Failed to recv data */
} }
amount = PqRecvLength - PqRecvPointer;
/* r contains number of bytes received */ if (amount > len)
bytesDone += r; amount = len;
memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
} while (bytesDone < len); PqRecvPointer += amount;
/* Zero-termination now in pq_getnchar() instead */ s += amount;
len -= amount;
}
return 0; return 0;
} }
...@@ -167,20 +168,21 @@ pqGetNBytes(char *s, size_t len) ...@@ -167,20 +168,21 @@ pqGetNBytes(char *s, size_t len)
int int
pqPutNBytes(const char *s, size_t len) pqPutNBytes(const char *s, size_t len)
{ {
int bytesDone = 0; size_t amount;
do { while (len > 0)
int r = send(MyProcPort->sock, s+bytesDone, len-bytesDone, 0); {
if (r == 0 || r == -1) { if (PqSendPointer >= PQ_BUFFER_SIZE)
if (errno != EINTR) if (pq_flush()) /* If buffer is full, then flush it out */
return EOF; /* Only signal interruption allowed */ return EOF;
continue; /* If interruped and read nothing, just try again */ amount = PQ_BUFFER_SIZE - PqSendPointer;
} if (amount > len)
amount = len;
/* r contains number of bytes sent so far */ memcpy(PqSendBuffer + PqSendPointer, s, amount);
bytesDone += r; PqSendPointer += amount;
} while (bytesDone < len); s += amount;
len -= amount;
}
return 0; return 0;
} }
...@@ -191,8 +193,8 @@ pqGetString(char *s, size_t len) ...@@ -191,8 +193,8 @@ pqGetString(char *s, size_t len)
int c; int c;
/* /*
* Keep on reading until we get the terminating '\0' and discard those * Keep on reading until we get the terminating '\0',
* bytes we don't have room for. * discarding any bytes we don't have room for.
*/ */
while ((c = pq_getchar()) != EOF && c != '\0') while ((c = pq_getchar()) != EOF && c != '\0')
...@@ -216,4 +218,3 @@ pqPutString(const char *s) ...@@ -216,4 +218,3 @@ pqPutString(const char *s)
{ {
return pqPutNBytes(s,strlen(s)+1); return pqPutNBytes(s,strlen(s)+1);
} }
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.37 1999/01/11 03:56:07 scrappy Exp $ * $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.38 1999/01/23 22:27:29 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -192,8 +192,15 @@ elog(int lev, const char *fmt,...) ...@@ -192,8 +192,15 @@ elog(int lev, const char *fmt,...)
pq_putnchar("N", 1); pq_putnchar("N", 1);
else else
pq_putnchar("E", 1); pq_putnchar("E", 1);
/* pq_putint(-101, 4); *//* should be query id */
pq_putstr(line + TIMESTAMP_SIZE); /* don't show timestamps */ pq_putstr(line + TIMESTAMP_SIZE); /* don't show timestamps */
/*
* This flush is normally not necessary, since postgres.c will
* flush out waiting data when control returns to the main loop.
* But it seems best to leave it here, so that the client has some
* clue what happened if the backend dies before getting back to the
* main loop ... error/notice messages should not be a performance-
* critical path anyway, so an extra flush won't hurt much ...
*/
pq_flush(); pq_flush();
} }
if (!IsUnderPostmaster) if (!IsUnderPostmaster)
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: libpq.h,v 1.23 1999/01/12 12:49:52 scrappy Exp $ * $Id: libpq.h,v 1.24 1999/01/23 22:27:25 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -254,12 +254,13 @@ extern void pq_init(int fd); ...@@ -254,12 +254,13 @@ extern void pq_init(int fd);
extern void pq_gettty(char *tp); extern void pq_gettty(char *tp);
extern int pq_getport(void); extern int pq_getport(void);
extern void pq_close(void); extern void pq_close(void);
extern void pq_flush(void); extern int pq_flush(void);
extern int pq_recvbuf(void);
extern int pq_getstr(char *s, int maxlen); extern int pq_getstr(char *s, int maxlen);
extern int PQgetline(char *s, int maxlen); extern int PQgetline(char *s, int maxlen);
extern int PQputline(char *s); extern int PQputline(char *s);
extern int pq_getchar(void); extern int pq_getchar(void);
extern int pq_peekchar(void); extern int pq_peekchar(void);
extern int pq_getnchar(char *s, int off, int maxlen); extern int pq_getnchar(char *s, int off, int maxlen);
extern int pq_getint(int b); extern int pq_getint(int b);
extern int pq_putchar(char c); extern int pq_putchar(char c);
...@@ -282,4 +283,18 @@ extern int StreamServerPort(char *hostName, short portName, int *fdP); ...@@ -282,4 +283,18 @@ extern int StreamServerPort(char *hostName, short portName, int *fdP);
extern int StreamConnection(int server_fd, Port *port); extern int StreamConnection(int server_fd, Port *port);
extern void StreamClose(int sock); extern void StreamClose(int sock);
/*
* Internal send/receive buffers in libpq.
* These probably shouldn't be global at all, but unless we merge
* pqcomm.c and pqcomprim.c they have to be...
*/
#define PQ_BUFFER_SIZE 8192
extern char PqSendBuffer[PQ_BUFFER_SIZE];
extern int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
extern char PqRecvBuffer[PQ_BUFFER_SIZE];
extern int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
extern int PqRecvLength; /* End of data available in PqRecvBuffer */
#endif /* LIBPQ_H */ #endif /* LIBPQ_H */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment