mirror of
https://github.com/haiwen/libsearpc.git
synced 2025-08-07 23:43:31 +00:00
Support reading request length
This commit is contained in:
parent
eeb44249d6
commit
a816018bd3
@ -199,6 +199,7 @@ typedef struct {
|
|||||||
SearpcNamedPipe connfd;
|
SearpcNamedPipe connfd;
|
||||||
char *buffer;
|
char *buffer;
|
||||||
guint32 len;
|
guint32 len;
|
||||||
|
guint32 header_offset;
|
||||||
guint32 offset;
|
guint32 offset;
|
||||||
SearpcNamedPipeServer *server;
|
SearpcNamedPipeServer *server;
|
||||||
gboolean use_epoll;
|
gboolean use_epoll;
|
||||||
@ -207,11 +208,14 @@ typedef struct {
|
|||||||
// EPOLL
|
// EPOLL
|
||||||
#ifdef __linux__
|
#ifdef __linux__
|
||||||
|
|
||||||
// Since the socket fd is set to nonblocking, epoll_read_n supports returning read data.
|
// This function will keep reading until the following conditions are met:
|
||||||
// When errno is set to EAGAIN, it means that the client did not continue to send data,
|
// 1. the socket has beed closed.
|
||||||
// and it needs to continue to read when the socket is readable next time.
|
// 2. there is no more readable data in the system cache;
|
||||||
|
// 3. the requested size has been read;
|
||||||
|
// 4. an unrecoverable error has been encountered;
|
||||||
|
// The first two cases are not errors.
|
||||||
gssize
|
gssize
|
||||||
epoll_read_n(int fd, void *vptr, size_t n, int *err)
|
epoll_read_n(int fd, void *vptr, size_t n)
|
||||||
{
|
{
|
||||||
size_t nleft;
|
size_t nleft;
|
||||||
gssize nread;
|
gssize nread;
|
||||||
@ -221,11 +225,10 @@ epoll_read_n(int fd, void *vptr, size_t n, int *err)
|
|||||||
nleft = n;
|
nleft = n;
|
||||||
while (nleft > 0) {
|
while (nleft > 0) {
|
||||||
if ( (nread = read(fd, ptr, nleft)) < 0) {
|
if ( (nread = read(fd, ptr, nleft)) < 0) {
|
||||||
*err = errno;
|
if (errno == EINTR)
|
||||||
if (*err == EINTR)
|
|
||||||
nread = 0; /* and call read() again */
|
nread = 0; /* and call read() again */
|
||||||
else if (*err == EAGAIN)
|
else if (errno == EAGAIN)
|
||||||
return (n-nleft);
|
break;
|
||||||
else
|
else
|
||||||
return(-1);
|
return(-1);
|
||||||
} else if (nread == 0)
|
} else if (nread == 0)
|
||||||
@ -272,6 +275,7 @@ static void epoll_handler(void *data)
|
|||||||
g_free (handler_data->buffer);
|
g_free (handler_data->buffer);
|
||||||
handler_data->buffer = NULL;
|
handler_data->buffer = NULL;
|
||||||
handler_data->len = 0;
|
handler_data->len = 0;
|
||||||
|
handler_data->header_offset = 0;
|
||||||
handler_data->offset = 0;
|
handler_data->offset = 0;
|
||||||
struct epoll_event event;
|
struct epoll_event event;
|
||||||
event.events = EPOLLIN | EPOLLRDHUP;
|
event.events = EPOLLIN | EPOLLRDHUP;
|
||||||
@ -297,17 +301,20 @@ read_client_request (int connfd, ServerHandlerData *data)
|
|||||||
{
|
{
|
||||||
guint32 len = data->len;
|
guint32 len = data->len;
|
||||||
char *buf;
|
char *buf;
|
||||||
int err = 0;
|
|
||||||
int n;
|
int n;
|
||||||
|
|
||||||
// read length of request.
|
// read length of request.
|
||||||
if (len == 0) {
|
if (data->header_offset != 4) {
|
||||||
n = epoll_read_n(connfd, &len, sizeof(guint32), &err);
|
n = epoll_read_n(connfd, (void *)&len + data->header_offset, sizeof(guint32) - data->header_offset);
|
||||||
if (err == EAGAIN) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
g_warning("failed to read rpc request size: %s\n", strerror(err));
|
g_warning("Failed to read rpc request size: %s\n", strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
data->header_offset += n;
|
||||||
|
if (data->header_offset < 4) {
|
||||||
|
return 0;
|
||||||
|
} else if (data->header_offset > 4) {
|
||||||
|
g_warning("Failed to read rpc request size\n");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
@ -321,13 +328,9 @@ read_client_request (int connfd, ServerHandlerData *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// read content of request.
|
// read content of request.
|
||||||
n = epoll_read_n(connfd, data->buffer + data->offset, len - data->offset, &err);
|
n = epoll_read_n(connfd, data->buffer + data->offset, len - data->offset);
|
||||||
if (err == EAGAIN) {
|
|
||||||
data->offset += n;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
g_warning ("failed to read rpc request: %s\n", strerror(errno));
|
g_warning ("Failed to read rpc request: %s\n", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
data->offset += n;
|
data->offset += n;
|
||||||
@ -412,7 +415,7 @@ epoll_listen (SearpcNamedPipeServer *server)
|
|||||||
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
|
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
|
||||||
close (connfd);
|
close (connfd);
|
||||||
continue;
|
continue;
|
||||||
} else if (data->len == 0) {
|
} else if (data->header_offset != 4) {
|
||||||
// Continue reading request length.
|
// Continue reading request length.
|
||||||
continue;
|
continue;
|
||||||
} else if (data->len != data->offset) {
|
} else if (data->len != data->offset) {
|
||||||
@ -420,6 +423,8 @@ epoll_listen (SearpcNamedPipeServer *server)
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After reading the contents of a request, remove the socket from the epoll and do not read the next request for a while to prevent client errors.
|
||||||
|
// After the worker finishes processing the current request, we will add the socket back into the epoll.
|
||||||
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
|
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
|
||||||
if (server->named_pipe_server_thread_pool) {
|
if (server->named_pipe_server_thread_pool) {
|
||||||
if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) {
|
if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) {
|
||||||
|
Loading…
Reference in New Issue
Block a user