1
0
mirror of https://github.com/haiwen/libsearpc.git synced 2025-04-28 02:30:08 +00:00

Blocking read and write socket

This commit is contained in:
杨赫然 2024-09-25 18:19:31 +08:00
parent 986ddf5e4f
commit 5d00dc4d44

View File

@ -197,10 +197,6 @@ failed:
typedef struct {
SearpcNamedPipe connfd;
char *buffer;
guint32 len;
guint32 header_offset;
guint32 offset;
SearpcNamedPipeServer *server;
gboolean use_epoll;
} ServerHandlerData;
@ -208,78 +204,35 @@ typedef struct {
// EPOLL
#ifdef __linux__
// This function will keep reading until the following conditions are met:
// 1. the socket has beed closed.
// 2. there is no more readable data in the system cache;
// 3. the requested size has been read;
// 4. an unrecoverable error has been encountered;
// The first two cases are not errors.
gssize
epoll_read_n(int fd, void *vptr, size_t n)
{
size_t nleft;
gssize nread;
char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0) {
if ( (nread = read(fd, ptr, nleft)) < 0) {
if (errno == EINTR)
nread = 0; /* and call read() again */
else if (errno == EAGAIN)
break;
else
return(-1);
} else if (nread == 0)
break; /* EOF */
nleft -= nread;
ptr += nread;
}
return(n - nleft); /* return >= 0 */
}
// This function will keep writing until the following conditions are met:
// 1. the socket has beed closed.
// 2. the requested size has been writed;
// 3. an unrecoverable error has been encountered;
// The first two cases are not errors.
gssize
epoll_write_n(int fd, const void *vptr, size_t n)
{
size_t nleft;
gssize nwritten;
const char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0) {
if ( (nwritten = write(fd, ptr, nleft)) <= 0)
{
if (errno == EAGAIN)
nwritten = 0; /* and call write() again */
else if (nwritten < 0 && errno == EINTR)
nwritten = 0; /* and call write() again */
else
return(-1); /* error */
}
nleft -= nwritten;
ptr += nwritten;
}
return(n);
}
static void epoll_handler(void *data)
{
ServerHandlerData *handler_data = data;
SearpcNamedPipe connfd = handler_data->connfd;
SearpcNamedPipeServer *server = handler_data->server;
const char *buf = handler_data->buffer;
guint32 len = handler_data->len;
char *buf = NULL;
guint32 len = 0;
char *ret_str = NULL;
int ret = 0;
int n;
if (pipe_read_n(connfd, &len, sizeof(guint32)) < 0) {
g_warning("failed to read rpc request size: %s\n", strerror(errno));
ret = -1;
goto out;
}
if (len <= 0) {
ret = -1;
goto out;
}
buf = g_new0 (char, len);
n = pipe_read_n (connfd, buf, len);
if (n < 0) {
g_warning ("failed to read rpc request: %s\n", strerror(errno));
ret = -1;
goto out;
}
char *service, *body;
if (request_from_json (buf, len, &service, &body) < 0) {
@ -293,21 +246,16 @@ static void epoll_handler(void *data)
g_free (body);
len = (guint32)ret_len;
if (epoll_write_n(connfd, &len, sizeof(guint32)) < 0) {
if (pipe_write_n (connfd, &len, sizeof(guint32)) < 0) {
ret = -1;
goto out;
}
if (epoll_write_n(connfd, ret_str, ret_len) < 0) {
if (pipe_write_n (connfd, ret_str, ret_len) < 0) {
ret = -1;
goto out;
}
g_free (handler_data->buffer);
handler_data->buffer = NULL;
handler_data->len = 0;
handler_data->header_offset = 0;
handler_data->offset = 0;
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = (void *)handler_data;
@ -319,62 +267,14 @@ static void epoll_handler(void *data)
}
out:
g_free (buf);
if (ret < 0) {
close (connfd);
g_free (handler_data->buffer);
g_free (handler_data);
}
g_free (ret_str);
}
static int
read_client_request (int connfd, ServerHandlerData *data)
{
char *buf;
int n;
// read length of request.
if (data->header_offset != 4) {
n = epoll_read_n(connfd, (void *)&(data->len) + data->header_offset, sizeof(guint32) - data->header_offset);
if (n < 0) {
g_warning("Failed to read rpc request size: %s\n", strerror(errno));
return -1;
}
data->header_offset += n;
if (data->header_offset < 4) {
return 0;
} else if (data->header_offset > 4) {
g_warning("Failed to read rpc request size\n");
return -1;
}
if (data->len == 0) {
return -1;
}
}
if (!data->buffer) {
data->buffer = g_new0 (char, data->len);
}
// read content of request.
n = epoll_read_n(connfd, data->buffer + data->offset, data->len - data->offset);
if (n < 0) {
g_warning ("Failed to read rpc request: %s\n", strerror(errno));
return -1;
}
data->offset += n;
return 0;
}
int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) {
return -1;
}
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
static void
epoll_listen (SearpcNamedPipeServer *server)
{
@ -405,12 +305,6 @@ epoll_listen (SearpcNamedPipeServer *server)
continue;
}
if (set_nonblocking(connfd) < 0) {
close(connfd);
g_warning ("Failed to set connection to noblocking.\n");
continue;
}
event.events = EPOLLIN | EPOLLRDHUP;
ServerHandlerData *data = g_new0(ServerHandlerData, 1);
data->use_epoll = TRUE;
@ -429,27 +323,12 @@ epoll_listen (SearpcNamedPipeServer *server)
ServerHandlerData *data = (ServerHandlerData *)events[i].data.ptr;
connfd = data->connfd;
if (events[i].events & (EPOLLHUP | EPOLLRDHUP)) {
if (data->len > 0)
g_free (data->buffer);
g_free (data);
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
close (connfd);
continue;
}
int rc = read_client_request (connfd, data);
if (rc < 0) {
if (data->len)
g_free (data->buffer);
g_free (data);
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
close (connfd);
continue;
} else if (data->header_offset != 4 || data->len != data->offset) {
// Continue reading request.
continue;
}
// After reading the contents of a request, remove the socket from the epoll and do not read the next request for a while to prevent client errors.
// After socket is readable, remove the socket from the epoll.
// After the worker finishes processing the current request, we will add the socket back into the epoll.
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
if (server->named_pipe_server_thread_pool) {