diff --git a/lib/searpc-named-pipe-transport.c b/lib/searpc-named-pipe-transport.c index f9d0fb2..fb2c5f6 100644 --- a/lib/searpc-named-pipe-transport.c +++ b/lib/searpc-named-pipe-transport.c @@ -199,6 +199,7 @@ typedef struct { SearpcNamedPipe connfd; char *buffer; guint32 len; + guint32 header_offset; guint32 offset; SearpcNamedPipeServer *server; gboolean use_epoll; @@ -207,11 +208,14 @@ typedef struct { // EPOLL #ifdef __linux__ -// Since the socket fd is set to nonblocking, epoll_read_n supports returning read data. -// When errno is set to EAGAIN, it means that the client did not continue to send data, -// and it needs to continue to read when the socket is readable next time. +// This function will keep reading until the following conditions are met: +// 1. the socket has beed closed. +// 2. there is no more readable data in the system cache; +// 3. the requested size has been read; +// 4. an unrecoverable error has been encountered; +// The first two cases are not errors. gssize -epoll_read_n(int fd, void *vptr, size_t n, int *err) +epoll_read_n(int fd, void *vptr, size_t n) { size_t nleft; gssize nread; @@ -221,11 +225,10 @@ epoll_read_n(int fd, void *vptr, size_t n, int *err) nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft)) < 0) { - *err = errno; - if (*err == EINTR) + if (errno == EINTR) nread = 0; /* and call read() again */ - else if (*err == EAGAIN) - return (n-nleft); + else if (errno == EAGAIN) + break; else return(-1); } else if (nread == 0) @@ -272,6 +275,7 @@ static void epoll_handler(void *data) g_free (handler_data->buffer); handler_data->buffer = NULL; handler_data->len = 0; + handler_data->header_offset = 0; handler_data->offset = 0; struct epoll_event event; event.events = EPOLLIN | EPOLLRDHUP; @@ -297,17 +301,20 @@ read_client_request (int connfd, ServerHandlerData *data) { guint32 len = data->len; char *buf; - int err = 0; int n; // read length of request. - if (len == 0) { - n = epoll_read_n(connfd, &len, sizeof(guint32), &err); - if (err == EAGAIN) { - return 0; - } + if (data->header_offset != 4) { + n = epoll_read_n(connfd, (void *)&len + data->header_offset, sizeof(guint32) - data->header_offset); if (n < 0) { - g_warning("failed to read rpc request size: %s\n", strerror(err)); + g_warning("Failed to read rpc request size: %s\n", strerror(errno)); + return -1; + } + data->header_offset += n; + if (data->header_offset < 4) { + return 0; + } else if (data->header_offset > 4) { + g_warning("Failed to read rpc request size\n"); return -1; } if (len == 0) { @@ -321,13 +328,9 @@ read_client_request (int connfd, ServerHandlerData *data) } // read content of request. - n = epoll_read_n(connfd, data->buffer + data->offset, len - data->offset, &err); - if (err == EAGAIN) { - data->offset += n; - return 0; - } + n = epoll_read_n(connfd, data->buffer + data->offset, len - data->offset); if (n < 0) { - g_warning ("failed to read rpc request: %s\n", strerror(errno)); + g_warning ("Failed to read rpc request: %s\n", strerror(errno)); return -1; } data->offset += n; @@ -412,7 +415,7 @@ epoll_listen (SearpcNamedPipeServer *server) epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); close (connfd); continue; - } else if (data->len == 0) { + } else if (data->header_offset != 4) { // Continue reading request length. continue; } else if (data->len != data->offset) { @@ -420,6 +423,8 @@ epoll_listen (SearpcNamedPipeServer *server) continue; } + // After reading the contents of a request, remove the socket from the epoll and do not read the next request for a while to prevent client errors. + // After the worker finishes processing the current request, we will add the socket back into the epoll. epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); if (server->named_pipe_server_thread_pool) { if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) {