1
0
mirror of https://github.com/haiwen/libsearpc.git synced 2025-08-11 01:11:31 +00:00

Set socket to nonblocking and re-add socket to epoll

This commit is contained in:
杨赫然 2024-07-25 16:09:53 +08:00
parent f4093e136d
commit eeb44249d6
4 changed files with 206 additions and 121 deletions

View File

@ -51,15 +51,6 @@ AC_ARG_ENABLE([compile-universal],
AM_CONDITIONAL([COMPILE_UNIVERSAL], [test x${compile_universal} = xyes]) AM_CONDITIONAL([COMPILE_UNIVERSAL], [test x${compile_universal} = xyes])
# option: compile-epoll
# default: no
AC_ARG_ENABLE([compile-epoll],
[AS_HELP_STRING([--enable-compile-epoll],
[compile epoll@<:@default: no@:>@])],
[compile_epoll=${enableval}], [compile_epoll=no])
AM_CONDITIONAL([COMPILE_EPOLL], [test x${compile_epoll} = xyes])
dnl - check if the macro WIN32 is defined on this compiler. dnl - check if the macro WIN32 is defined on this compiler.
AC_MSG_CHECKING(for WIN32) AC_MSG_CHECKING(for WIN32)

View File

@ -10,10 +10,6 @@ AM_CFLAGS += -arch x86_64 -arch arm64
endif endif
endif endif
if COMPILE_EPOLL
AM_CFLAGS += -DCOMPILE_EPOLL
endif
lib_LTLIBRARIES = libsearpc.la lib_LTLIBRARIES = libsearpc.la
include_HEADERS = searpc-client.h searpc-server.h searpc-utils.h searpc.h searpc-named-pipe-transport.h include_HEADERS = searpc-client.h searpc-server.h searpc-utils.h searpc.h searpc-named-pipe-transport.h

View File

@ -8,7 +8,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#include <unistd.h> #include <unistd.h>
#ifdef COMPILE_EPOLL #ifdef __linux__
#include <fcntl.h> #include <fcntl.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#endif #endif
@ -158,24 +158,26 @@ int searpc_named_pipe_server_start(SearpcNamedPipeServer *server)
goto failed; goto failed;
} }
#ifdef COMPILE_EPOLL #ifdef __linux__
int epoll_fd; if (server->use_epoll) {
struct epoll_event event; int epoll_fd;
struct epoll_event event;
epoll_fd = epoll_create1(0); epoll_fd = epoll_create1(0);
if (epoll_fd < 0) { if (epoll_fd < 0) {
g_warning ("failed to open an epoll file descriptor: %s\n", strerror(errno)); g_warning ("failed to open an epoll file descriptor: %s\n", strerror(errno));
goto failed; goto failed;
}
event.events = EPOLLIN;
event.data.fd = pipe_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd, &event) == -1) {
g_warning ("failed to add pipe fd to epoll list: %s\n", strerror(errno));
goto failed;
}
server->epoll_fd = epoll_fd;
} }
event.events = EPOLLIN;
event.data.fd = pipe_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd, &event) == -1) {
g_warning ("failed to add pipe fd to epoll list: %s\n", strerror(errno));
goto failed;
}
server->epoll_fd = epoll_fd;
#endif #endif
server->pipe_fd = pipe_fd; server->pipe_fd = pipe_fd;
@ -193,22 +195,119 @@ failed:
#endif #endif
} }
#ifdef COMPILE_EPOLL
typedef struct { typedef struct {
int connfd; SearpcNamedPipe connfd;
int len; char *buffer;
} EpollAux; guint32 len;
guint32 offset;
SearpcNamedPipeServer *server;
gboolean use_epoll;
} ServerHandlerData;
// EPOLL
#ifdef __linux__
// Since the socket fd is set to nonblocking, epoll_read_n supports returning read data.
// When errno is set to EAGAIN, it means that the client did not continue to send data,
// and it needs to continue to read when the socket is readable next time.
gssize
epoll_read_n(int fd, void *vptr, size_t n, int *err)
{
size_t nleft;
gssize nread;
char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0) {
if ( (nread = read(fd, ptr, nleft)) < 0) {
*err = errno;
if (*err == EINTR)
nread = 0; /* and call read() again */
else if (*err == EAGAIN)
return (n-nleft);
else
return(-1);
} else if (nread == 0)
break; /* EOF */
nleft -= nread;
ptr += nread;
}
return(n - nleft); /* return >= 0 */
}
static void epoll_handler(void *data)
{
ServerHandlerData *handler_data = data;
SearpcNamedPipe connfd = handler_data->connfd;
SearpcNamedPipeServer *server = handler_data->server;
const char *buf = handler_data->buffer;
guint32 len = handler_data->len;
char *ret_str = NULL;
int ret = 0;
char *service, *body;
if (request_from_json (buf, len, &service, &body) < 0) {
ret = -1;
goto out;
}
gsize ret_len;
ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
g_free (service);
g_free (body);
len = (guint32)ret_len;
if (pipe_write_n(connfd, &len, sizeof(guint32)) < 0) {
ret = -1;
goto out;
}
if (pipe_write_n(connfd, ret_str, ret_len) < 0) {
ret = -1;
goto out;
}
g_free (handler_data->buffer);
handler_data->buffer = NULL;
handler_data->len = 0;
handler_data->offset = 0;
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = (void *)handler_data;
if (epoll_ctl (server->epoll_fd, EPOLL_CTL_ADD, connfd, &event) == -1) {
ret = -1;
g_warning ("failed to add client fd to epoll list: %s\n", strerror(errno));
goto out;
}
out:
if (ret < 0) {
close (connfd);
g_free (handler_data->buffer);
g_free (handler_data);
}
g_free (ret_str);
}
static int static int
read_client_request (int connfd, EpollAux *aux, char **req, guint32 *req_len) read_client_request (int connfd, ServerHandlerData *data)
{ {
guint32 len = aux->len; guint32 len = data->len;
char *buf; char *buf;
int err = 0;
int n;
// read length of request. // read length of request.
if (len == 0) { if (len == 0) {
if (pipe_read_n(connfd, &len, sizeof(guint32)) < 0) { n = epoll_read_n(connfd, &len, sizeof(guint32), &err);
g_warning("failed to read rpc request size: %s\n", strerror(errno)); if (err == EAGAIN) {
return 0;
}
if (n < 0) {
g_warning("failed to read rpc request size: %s\n", strerror(err));
return -1; return -1;
} }
if (len == 0) { if (len == 0) {
@ -216,41 +315,37 @@ read_client_request (int connfd, EpollAux *aux, char **req, guint32 *req_len)
} }
} }
buf = g_malloc (len); data->len = len;
if (!data->buffer) {
data->buffer = g_new0 (char, len);
}
// read content of request. // read content of request.
if (pipe_read_n(connfd, buf, len) < 0) { n = epoll_read_n(connfd, data->buffer + data->offset, len - data->offset, &err);
g_free (buf); if (err == EAGAIN) {
if (errno == EAGAIN) { data->offset += n;
aux->len = len; return 0;
return 0; }
} if (n < 0) {
g_warning ("failed to read rpc request: %s\n", strerror(errno)); g_warning ("failed to read rpc request: %s\n", strerror(errno));
return -1; return -1;
} }
data->offset += n;
*req = buf;
*req_len = len;
aux->len = 0;
return 0; return 0;
} }
#endif
typedef struct { int set_nonblocking(int fd) {
SearpcNamedPipe connfd; int flags = fcntl(fd, F_GETFL, 0);
#ifdef COMPILE_EPOLL if (flags < 0) {
char *req; return -1;
guint32 len; }
SearpcNamedPipeServer *server; return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#endif }
} ServerHandlerData;
static void* named_pipe_listen(void *arg) static void
epoll_listen (SearpcNamedPipeServer *server)
{ {
SearpcNamedPipeServer *server = arg;
#ifdef COMPILE_EPOLL
#define MAX_EVENTS 1000 #define MAX_EVENTS 1000
struct epoll_event event; struct epoll_event event;
struct epoll_event events[MAX_EVENTS]; struct epoll_event events[MAX_EVENTS];
@ -261,6 +356,9 @@ static void* named_pipe_listen(void *arg)
while (1) { while (1) {
n_events = epoll_wait (server->epoll_fd, events, MAX_EVENTS, 1000); n_events = epoll_wait (server->epoll_fd, events, MAX_EVENTS, 1000);
if (n_events <= 0) { if (n_events <= 0) {
if (n_events < 0) {
g_warning ("Failed to call waits for events on the epoll: %s\n", strerror(errno));
}
continue; continue;
} }
for (i = 0; i < n_events; i++) { for (i = 0; i < n_events; i++) {
@ -271,48 +369,58 @@ static void* named_pipe_listen(void *arg)
// new client connection // new client connection
connfd = accept (server->pipe_fd, NULL, 0); connfd = accept (server->pipe_fd, NULL, 0);
if (connfd < 0) { if (connfd < 0) {
g_warning ("failed to accept new client connection: %s\n", strerror(errno)); g_warning ("Failed to accept new client connection: %s\n", strerror(errno));
continue;
}
if (set_nonblocking(connfd) < 0) {
close(connfd);
g_warning ("Failed to set connection to noblocking.\n");
continue; continue;
} }
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP;
EpollAux *aux = g_new0 (EpollAux, 1); ServerHandlerData *data = g_new0(ServerHandlerData, 1);
aux->connfd = connfd; data->use_epoll = TRUE;
event.data.ptr = (void *)aux; data->connfd = connfd;
data->server = server;
event.data.ptr = (void *)data;
if (epoll_ctl (server->epoll_fd, EPOLL_CTL_ADD, connfd, &event) == -1) { if (epoll_ctl (server->epoll_fd, EPOLL_CTL_ADD, connfd, &event) == -1) {
g_warning ("failed to add client fd to epoll list: %s\n", strerror(errno)); g_warning ("Failed to add client fd to epoll list: %s\n", strerror(errno));
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
close (connfd); close (connfd);
g_free (aux); g_free (data);
continue; continue;
} }
g_message ("start to serve on pipe client\n"); g_message ("start to serve on pipe client\n");
} else { } else {
EpollAux *aux = (EpollAux *)events[i].data.ptr; ServerHandlerData *data = (ServerHandlerData *)events[i].data.ptr;
connfd = aux->connfd; connfd = data->connfd;
if (events[i].events & (EPOLLHUP | EPOLLRDHUP)) { if (events[i].events & (EPOLLHUP | EPOLLRDHUP)) {
g_free (aux); if (data->len > 0)
g_free (data->buffer);
g_free (data);
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
close (connfd); close (connfd);
continue; continue;
} }
int req_len = 0; int rc = read_client_request (connfd, data);
char *req = NULL;
int rc = read_client_request (connfd, aux, &req, &req_len);
if (rc < 0) { if (rc < 0) {
g_free (aux); if (data->len)
g_free (data->buffer);
g_free (data);
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
close (connfd); close (connfd);
continue; continue;
} else if (req_len == 0) { } else if (data->len == 0) {
// Continue reading request length.
continue;
} else if (data->len != data->offset) {
// Continue reading request content.
continue; continue;
} }
ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
data->connfd = connfd;
data->req = req;
data->len = req_len;
data->server = server;
if (server->named_pipe_server_thread_pool) { if (server->named_pipe_server_thread_pool) {
if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) { if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) {
g_warning("The rpc server thread pool is full, the maximum number of threads is %d\n", server->pool_size); g_warning("The rpc server thread pool is full, the maximum number of threads is %d\n", server->pool_size);
@ -328,12 +436,25 @@ static void* named_pipe_listen(void *arg)
} }
} }
} }
#else }
#endif
static void* named_pipe_listen(void *arg)
{
SearpcNamedPipeServer *server = arg;
#if !defined(WIN32) #if !defined(WIN32)
#ifdef __linux__
if (server->use_epoll) {
epoll_listen (server);
return NULL;
}
#endif
while (1) { while (1) {
int connfd = accept (server->pipe_fd, NULL, 0); int connfd = accept (server->pipe_fd, NULL, 0);
ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
data->connfd = connfd; data->connfd = connfd;
data->use_epoll = FALSE;
if (server->named_pipe_server_thread_pool) { if (server->named_pipe_server_thread_pool) {
if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) { if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) {
g_warning("The rpc server thread pool is full, the maximum number of threads is %d\n", server->pool_size); g_warning("The rpc server thread pool is full, the maximum number of threads is %d\n", server->pool_size);
@ -383,6 +504,7 @@ static void* named_pipe_listen(void *arg)
ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
data->connfd = connfd; data->connfd = connfd;
data->use_epoll = FALSE;
if (server->named_pipe_server_thread_pool) if (server->named_pipe_server_thread_pool)
g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL); g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL);
else { else {
@ -394,12 +516,18 @@ static void* named_pipe_listen(void *arg)
} }
} }
#endif // !defined(WIN32) #endif // !defined(WIN32)
#endif // !defined(COMPILE_EPOLL)
return NULL; return NULL;
} }
static void* handle_named_pipe_client_with_thread(void *arg) static void* handle_named_pipe_client_with_thread(void *arg)
{ {
#ifdef __linux__
ServerHandlerData *handler_data = arg;
if (handler_data->use_epoll) {
epoll_handler (arg);
return NULL;
}
#endif
named_pipe_client_handler(arg); named_pipe_client_handler(arg);
return NULL; return NULL;
@ -407,44 +535,16 @@ static void* handle_named_pipe_client_with_thread(void *arg)
static void handle_named_pipe_client_with_threadpool(void *data, void *user_data) static void handle_named_pipe_client_with_threadpool(void *data, void *user_data)
{ {
#ifdef __linux__
ServerHandlerData *handler_data = data;
if (handler_data->use_epoll) {
epoll_handler (data);
return;
}
#endif
named_pipe_client_handler(data); named_pipe_client_handler(data);
} }
#ifdef COMPILE_EPOLL
static void named_pipe_client_handler(void *data)
{
ServerHandlerData *handler_data = data;
SearpcNamedPipe connfd = handler_data->connfd;
SearpcNamedPipeServer *server = handler_data->server;
const char *buf = handler_data->req;
guint32 len = handler_data->len;
char *ret_str = NULL;
char *service, *body;
if (request_from_json (buf, len, &service, &body) < 0) {
goto out;
}
gsize ret_len;
ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
g_free (service);
g_free (body);
len = (guint32)ret_len;
if (pipe_write_n(connfd, &len, sizeof(guint32)) < 0) {
goto out;
}
if (pipe_write_n(connfd, ret_str, ret_len) < 0) {
goto out;
}
out:
g_free (ret_str);
g_free (handler_data->req);
g_free (data);
}
#else
static void named_pipe_client_handler(void *data) static void named_pipe_client_handler(void *data)
{ {
ServerHandlerData *handler_data = data; ServerHandlerData *handler_data = data;
@ -513,7 +613,6 @@ static void named_pipe_client_handler(void *data)
g_free (data); g_free (data);
g_free (buf); g_free (buf);
} }
#endif
int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client) int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client)
{ {

View File

@ -41,9 +41,8 @@ struct _SearpcNamedPipeServer {
char path[4096]; char path[4096];
pthread_t listener_thread; pthread_t listener_thread;
SearpcNamedPipe pipe_fd; SearpcNamedPipe pipe_fd;
#ifdef COMPILE_EPOLL gboolean use_epoll;
int epoll_fd; int epoll_fd;
#endif
GThreadPool *named_pipe_server_thread_pool; GThreadPool *named_pipe_server_thread_pool;
int pool_size; int pool_size;
}; };