diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 5a8b3fc96504c5de086f30a8ae8bd435cdf86aca..3126e82dfc37f1cbb0897855014e772a08674118 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -6,7 +6,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.67 1999/01/17 06:18:15 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.68 1999/01/23 22:27:26 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -303,9 +303,7 @@ DoCopy(char *relname, bool binary, bool oids, bool from, bool pipe, } else if (!from && !binary) { - CopySendData("\\.\n",3,fp); - if (IsUnderPostmaster) - pq_flush(); + CopySendData("\\.\n",3,fp); } } } diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 8f9c14fee9fcca8d56f13912182289dd2e3ac842..386643fe95c3fdfb52b87cd64a2c893e81d19df7 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -5,39 +5,40 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: pqcomm.c,v 1.63 1999/01/17 06:18:26 momjian Exp $ + * $Id: pqcomm.c,v 1.64 1999/01/23 22:27:28 tgl Exp $ * *------------------------------------------------------------------------- */ /* * INTERFACE ROUTINES - * pq_init - initialize libpq + * pq_init - initialize libpq * pq_getport - return the PGPORT setting * pq_close - close input / output connections * pq_flush - flush pending output + * pq_recvbuf - load some bytes into the input buffer * pq_getstr - get a null terminated string from connection - * pq_getchar - get 1 character from connection - * pq_peekchar - peek at first character in connection + * pq_getchar - get 1 character from connection + * pq_peekchar - peek at next character from connection * pq_getnchar - get n characters from connection, and null-terminate * pq_getint - get an integer from connection - * pq_putchar - send 1 character to connection + * pq_putchar - send 1 character to connection * pq_putstr - send a null terminated string to connection * pq_putnchar - send n characters to connection * pq_putint - send an integer to connection - * pq_putncharlen - send n characters to connection + * pq_putncharlen - send n characters to connection * (also send an int header indicating * the length) * pq_getinaddr - initialize address from host and port number * pq_getinserv - initialize address from host and service name * - * StreamDoUnlink - Shutdown UNIX socket connectioin - * StreamServerPort - Open sock stream - * StreamConnection - Create new connection with client - * StreamClose - Close a client/backend connection + * StreamDoUnlink - Shutdown UNIX socket connection + * StreamServerPort - Open socket stream + * StreamConnection - Create new connection with client + * StreamClose - Close a client/backend connection * * NOTES - * Frontend is now completey in interfaces/libpq, and no - * functions from this file is used. + * Frontend is now completely in interfaces/libpq, and no + * functions from this file are used there. * */ #include "postgres.h" @@ -79,6 +80,14 @@ extern FILE * debug_port; /* in util.c */ +/* + * Buffers + */ +char PqSendBuffer[PQ_BUFFER_SIZE]; +char PqRecvBuffer[PQ_BUFFER_SIZE]; +int PqSendPointer,PqRecvPointer,PqRecvLength; + + /* -------------------------------- * pq_init - open portal file descriptors * -------------------------------- @@ -86,6 +95,7 @@ extern FILE * debug_port; /* in util.c */ void pq_init(int fd) { + PqSendPointer = PqRecvPointer = PqRecvLength = 0; PQnotifies_init(); if (getenv("LIBPQ_DEBUG")) debug_port = stderr; @@ -94,40 +104,40 @@ pq_init(int fd) /* ------------------------- * pq_getchar() * - * get a character from the input file, - * + * get a character from the input file, or EOF if trouble + * -------------------------------- */ int pq_getchar(void) { - char c; - - while (recv(MyProcPort->sock, &c, 1, 0) != 1) { - if (errno != EINTR) - return EOF; /* Not interrupted, so something went wrong */ + while (PqRecvPointer >= PqRecvLength) + { + if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + return EOF; /* Failed to recv data */ } - - return c; + return PqRecvBuffer[PqRecvPointer++]; } -/* +/* ------------------------- + * pq_peekchar() + * + * get a character from the connection, but leave it in the buffer + * to be read again * -------------------------------- - * pq_peekchar - get 1 character from connection, but leave it in the stream */ -int -pq_peekchar(void) { - char c; - while (recv(MyProcPort->sock, &c, 1, MSG_PEEK) != 1) { - if (errno != EINTR) - return EOF; /* Not interrupted, so something went wrong */ +int +pq_peekchar(void) +{ + while (PqRecvPointer >= PqRecvLength) + { + if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + return EOF; /* Failed to recv data */ } - - return c; + /* Note we don't bump the pointer... */ + return PqRecvBuffer[PqRecvPointer]; } - - /* -------------------------------- * pq_getport - return the PGPORT setting @@ -150,18 +160,91 @@ pq_getport() void pq_close() { - close(MyProcPort->sock); + close(MyProcPort->sock); PQnotifies_init(); } /* -------------------------------- * pq_flush - flush pending output + * + * returns 0 if OK, EOF if trouble * -------------------------------- */ -void +int pq_flush() { - /* Not supported/required? */ + char *bufptr = PqSendBuffer; + char *bufend = PqSendBuffer + PqSendPointer; + + while (bufptr < bufend) + { + int r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0); + if (r <= 0) + { + if (errno == EINTR) + continue; /* Ok if we were interrupted */ + /* We would like to use elog() here, but cannot because elog + * tries to write to the client, which would cause a recursive + * flush attempt! So just write it out to the postmaster log. + */ + fprintf(stderr, "pq_flush: send() failed, errno %d\n", errno); + /* We drop the buffered data anyway so that processing + * can continue, even though we'll probably quit soon. + */ + PqSendPointer = 0; + return EOF; + } + bufptr += r; + } + PqSendPointer = 0; + return 0; +} + +/* -------------------------------- + * pq_recvbuf - load some bytes into the input buffer + * + * returns 0 if OK, EOF if trouble + * -------------------------------- + */ + +int +pq_recvbuf() +{ + if (PqRecvPointer > 0) + { + if (PqRecvLength > PqRecvPointer) + { + /* still some unread data, left-justify it in the buffer */ + memmove(PqRecvBuffer, PqRecvBuffer+PqRecvPointer, + PqRecvLength-PqRecvPointer); + PqRecvLength -= PqRecvPointer; + PqRecvPointer = 0; + } + else + PqRecvLength = PqRecvPointer = 0; + } + + /* Can fill buffer from PqRecvLength and upwards */ + for (;;) + { + int r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength, + PQ_BUFFER_SIZE - PqRecvLength, 0); + if (r <= 0) + { + if (errno == EINTR) + continue; /* Ok if interrupted */ + /* We would like to use elog() here, but dare not because elog + * tries to write to the client, which will cause problems + * if we have a hard communications failure ... + * So just write the message to the postmaster log. + */ + fprintf(stderr, "pq_recvbuf: recv() failed, errno %d\n", errno); + return EOF; + } + /* r contains number of bytes read, so just incr length */ + PqRecvLength += r; + return 0; + } } /* -------------------------------- @@ -194,7 +277,7 @@ pq_getstr(char *s, int maxlen) int pq_getnchar(char *s, int off, int maxlen) { - int r = pqGetNBytes(s + off, maxlen); + int r = pqGetNBytes(s + off, maxlen); s[off+maxlen] = '\0'; return r; } @@ -602,7 +685,7 @@ StreamConnection(int server_fd, Port *port) if (setsockopt(port->sock, pe->p_proto, TCP_NODELAY, &on, sizeof(on)) < 0) { - elog(ERROR, "postmaster: setsockopt failed"); + elog(ERROR, "postmaster: setsockopt failed: %m"); return STATUS_ERROR; } } @@ -644,18 +727,9 @@ pq_putncharlen(char *s, int n) */ int pq_putchar(char c) { - char isDone = 0; - - do { - if (send(MyProcPort->sock, &c, 1, 0) != 1) { - if (errno != EINTR) - return EOF; /* Anything other than interrupt is error! */ - } - else - isDone = 1; /* Done if we sent one char */ - } while (!isDone); - return c; + if (PqSendPointer >= PQ_BUFFER_SIZE) + if (pq_flush()) /* If buffer is full, then flush it out */ + return EOF; + PqSendBuffer[PqSendPointer++] = c; /* Put in buffer */ + return c; } - - - diff --git a/src/backend/libpq/pqcomprim.c b/src/backend/libpq/pqcomprim.c index 23ecfd4e19fe7a13ef0fbb0552ae6091348c096d..e48a1c16888600ee82c27fbb3143abf3cb5bb56b 100644 --- a/src/backend/libpq/pqcomprim.c +++ b/src/backend/libpq/pqcomprim.c @@ -98,7 +98,7 @@ pqPutLong(int integer) n = ((PG_PROTOCOL_MAJOR(FrontendProtocol) == 0) ? hton_l(integer) : htonl((uint32) integer)); #endif - return pqPutNBytes((char *)&n,4); + return pqPutNBytes((char *)&n, 4); } /* --------------------------------------------------------------------- */ @@ -107,7 +107,7 @@ pqGetShort(int *result) { uint16 n; - if (pqGetNBytes((char *)&n,2) != 0) + if (pqGetNBytes((char *)&n, 2) != 0) return EOF; #ifdef FRONTEND @@ -138,28 +138,29 @@ pqGetLong(int *result) } /* --------------------------------------------------------------------- */ -/* pqGetNBytes: Read a chunk of exactly len bytes in buffer s (which must be 1 - byte longer) and terminate it with a '\0'. - Return 0 if ok. -*/ +/* pqGetNBytes: Read a chunk of exactly len bytes into buffer s. + * Return 0 if ok, EOF if trouble. + */ int pqGetNBytes(char *s, size_t len) { - int bytesDone = 0; - - do { - int r = recv(MyProcPort->sock, s+bytesDone, len-bytesDone, 0); - if (r == 0 || r == -1) { - if (errno != EINTR) - return EOF; /* All other than signal-interrupted is error */ - continue; /* Otherwise, try again */ - } - - /* r contains number of bytes received */ - bytesDone += r; - - } while (bytesDone < len); - /* Zero-termination now in pq_getnchar() instead */ + size_t amount; + + while (len > 0) + { + while (PqRecvPointer >= PqRecvLength) + { + if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + return EOF; /* Failed to recv data */ + } + amount = PqRecvLength - PqRecvPointer; + if (amount > len) + amount = len; + memcpy(s, PqRecvBuffer + PqRecvPointer, amount); + PqRecvPointer += amount; + s += amount; + len -= amount; + } return 0; } @@ -167,20 +168,21 @@ pqGetNBytes(char *s, size_t len) int pqPutNBytes(const char *s, size_t len) { - int bytesDone = 0; - - do { - int r = send(MyProcPort->sock, s+bytesDone, len-bytesDone, 0); - if (r == 0 || r == -1) { - if (errno != EINTR) - return EOF; /* Only signal interruption allowed */ - continue; /* If interruped and read nothing, just try again */ - } - - /* r contains number of bytes sent so far */ - bytesDone += r; - } while (bytesDone < len); - + size_t amount; + + while (len > 0) + { + if (PqSendPointer >= PQ_BUFFER_SIZE) + if (pq_flush()) /* If buffer is full, then flush it out */ + return EOF; + amount = PQ_BUFFER_SIZE - PqSendPointer; + if (amount > len) + amount = len; + memcpy(PqSendBuffer + PqSendPointer, s, amount); + PqSendPointer += amount; + s += amount; + len -= amount; + } return 0; } @@ -191,8 +193,8 @@ pqGetString(char *s, size_t len) int c; /* - * Keep on reading until we get the terminating '\0' and discard those - * bytes we don't have room for. + * Keep on reading until we get the terminating '\0', + * discarding any bytes we don't have room for. */ while ((c = pq_getchar()) != EOF && c != '\0') @@ -216,4 +218,3 @@ pqPutString(const char *s) { return pqPutNBytes(s,strlen(s)+1); } - diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index 4e68c1e24a8177e2438f9e3c97bccdad7f45a345..473fc06c3e11cfb667024c9646b9a7bfb8312532 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.37 1999/01/11 03:56:07 scrappy Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.38 1999/01/23 22:27:29 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -192,8 +192,15 @@ elog(int lev, const char *fmt,...) pq_putnchar("N", 1); else pq_putnchar("E", 1); - /* pq_putint(-101, 4); *//* should be query id */ pq_putstr(line + TIMESTAMP_SIZE); /* don't show timestamps */ + /* + * This flush is normally not necessary, since postgres.c will + * flush out waiting data when control returns to the main loop. + * But it seems best to leave it here, so that the client has some + * clue what happened if the backend dies before getting back to the + * main loop ... error/notice messages should not be a performance- + * critical path anyway, so an extra flush won't hurt much ... + */ pq_flush(); } if (!IsUnderPostmaster) diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index a315521eb3594ce5d901208a77b4c0e04bb50db7..c1cdd8ac5d97cc79e13a1b4ca0dfa16f4d16e4d4 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -6,7 +6,7 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: libpq.h,v 1.23 1999/01/12 12:49:52 scrappy Exp $ + * $Id: libpq.h,v 1.24 1999/01/23 22:27:25 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -254,12 +254,13 @@ extern void pq_init(int fd); extern void pq_gettty(char *tp); extern int pq_getport(void); extern void pq_close(void); -extern void pq_flush(void); +extern int pq_flush(void); +extern int pq_recvbuf(void); extern int pq_getstr(char *s, int maxlen); extern int PQgetline(char *s, int maxlen); extern int PQputline(char *s); -extern int pq_getchar(void); -extern int pq_peekchar(void); +extern int pq_getchar(void); +extern int pq_peekchar(void); extern int pq_getnchar(char *s, int off, int maxlen); extern int pq_getint(int b); extern int pq_putchar(char c); @@ -282,4 +283,18 @@ extern int StreamServerPort(char *hostName, short portName, int *fdP); extern int StreamConnection(int server_fd, Port *port); extern void StreamClose(int sock); +/* + * Internal send/receive buffers in libpq. + * These probably shouldn't be global at all, but unless we merge + * pqcomm.c and pqcomprim.c they have to be... + */ + +#define PQ_BUFFER_SIZE 8192 + +extern char PqSendBuffer[PQ_BUFFER_SIZE]; +extern int PqSendPointer; /* Next index to store a byte in PqSendBuffer */ +extern char PqRecvBuffer[PQ_BUFFER_SIZE]; +extern int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ +extern int PqRecvLength; /* End of data available in PqRecvBuffer */ + #endif /* LIBPQ_H */