diff --git a/configure.ac b/configure.ac index bebe151..a16c96f 100644 --- a/configure.ac +++ b/configure.ac @@ -51,15 +51,6 @@ AC_ARG_ENABLE([compile-universal], AM_CONDITIONAL([COMPILE_UNIVERSAL], [test x${compile_universal} = xyes]) -# option: compile-epoll -# default: no -AC_ARG_ENABLE([compile-epoll], -[AS_HELP_STRING([--enable-compile-epoll], -[compile epoll@<:@default: no@:>@])], -[compile_epoll=${enableval}], [compile_epoll=no]) - -AM_CONDITIONAL([COMPILE_EPOLL], [test x${compile_epoll} = xyes]) - dnl - check if the macro WIN32 is defined on this compiler. AC_MSG_CHECKING(for WIN32) diff --git a/lib/Makefile.am b/lib/Makefile.am index 6f6cfe3..adc5168 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -10,10 +10,6 @@ AM_CFLAGS += -arch x86_64 -arch arm64 endif endif -if COMPILE_EPOLL -AM_CFLAGS += -DCOMPILE_EPOLL -endif - lib_LTLIBRARIES = libsearpc.la include_HEADERS = searpc-client.h searpc-server.h searpc-utils.h searpc.h searpc-named-pipe-transport.h diff --git a/lib/searpc-named-pipe-transport.c b/lib/searpc-named-pipe-transport.c index c3ac054..f9d0fb2 100644 --- a/lib/searpc-named-pipe-transport.c +++ b/lib/searpc-named-pipe-transport.c @@ -8,7 +8,7 @@ #include #include #include -#ifdef COMPILE_EPOLL +#ifdef __linux__ #include #include #endif @@ -158,24 +158,26 @@ int searpc_named_pipe_server_start(SearpcNamedPipeServer *server) goto failed; } -#ifdef COMPILE_EPOLL - int epoll_fd; - struct epoll_event event; +#ifdef __linux__ + if (server->use_epoll) { + int epoll_fd; + struct epoll_event event; - epoll_fd = epoll_create1(0); - if (epoll_fd < 0) { - g_warning ("failed to open an epoll file descriptor: %s\n", strerror(errno)); - goto failed; + epoll_fd = epoll_create1(0); + if (epoll_fd < 0) { + g_warning ("failed to open an epoll file descriptor: %s\n", strerror(errno)); + goto failed; + } + + event.events = EPOLLIN; + event.data.fd = pipe_fd; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd, &event) == -1) { + g_warning ("failed to add pipe fd to epoll list: %s\n", strerror(errno)); + goto failed; + } + + server->epoll_fd = epoll_fd; } - - event.events = EPOLLIN; - event.data.fd = pipe_fd; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd, &event) == -1) { - g_warning ("failed to add pipe fd to epoll list: %s\n", strerror(errno)); - goto failed; - } - - server->epoll_fd = epoll_fd; #endif server->pipe_fd = pipe_fd; @@ -193,22 +195,119 @@ failed: #endif } -#ifdef COMPILE_EPOLL typedef struct { - int connfd; - int len; -} EpollAux; + SearpcNamedPipe connfd; + char *buffer; + guint32 len; + guint32 offset; + SearpcNamedPipeServer *server; + gboolean use_epoll; +} ServerHandlerData; + +// EPOLL +#ifdef __linux__ + +// Since the socket fd is set to nonblocking, epoll_read_n supports returning read data. +// When errno is set to EAGAIN, it means that the client did not continue to send data, +// and it needs to continue to read when the socket is readable next time. +gssize +epoll_read_n(int fd, void *vptr, size_t n, int *err) +{ + size_t nleft; + gssize nread; + char *ptr; + + ptr = vptr; + nleft = n; + while (nleft > 0) { + if ( (nread = read(fd, ptr, nleft)) < 0) { + *err = errno; + if (*err == EINTR) + nread = 0; /* and call read() again */ + else if (*err == EAGAIN) + return (n-nleft); + else + return(-1); + } else if (nread == 0) + break; /* EOF */ + + nleft -= nread; + ptr += nread; + } + return(n - nleft); /* return >= 0 */ +} + +static void epoll_handler(void *data) +{ + ServerHandlerData *handler_data = data; + SearpcNamedPipe connfd = handler_data->connfd; + SearpcNamedPipeServer *server = handler_data->server; + const char *buf = handler_data->buffer; + guint32 len = handler_data->len; + char *ret_str = NULL; + int ret = 0; + + char *service, *body; + if (request_from_json (buf, len, &service, &body) < 0) { + ret = -1; + goto out; + } + + gsize ret_len; + ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len); + g_free (service); + g_free (body); + + len = (guint32)ret_len; + if (pipe_write_n(connfd, &len, sizeof(guint32)) < 0) { + ret = -1; + goto out; + } + + if (pipe_write_n(connfd, ret_str, ret_len) < 0) { + ret = -1; + goto out; + } + + g_free (handler_data->buffer); + handler_data->buffer = NULL; + handler_data->len = 0; + handler_data->offset = 0; + struct epoll_event event; + event.events = EPOLLIN | EPOLLRDHUP; + event.data.ptr = (void *)handler_data; + + if (epoll_ctl (server->epoll_fd, EPOLL_CTL_ADD, connfd, &event) == -1) { + ret = -1; + g_warning ("failed to add client fd to epoll list: %s\n", strerror(errno)); + goto out; + } + +out: + if (ret < 0) { + close (connfd); + g_free (handler_data->buffer); + g_free (handler_data); + } + g_free (ret_str); +} static int -read_client_request (int connfd, EpollAux *aux, char **req, guint32 *req_len) +read_client_request (int connfd, ServerHandlerData *data) { - guint32 len = aux->len; + guint32 len = data->len; char *buf; + int err = 0; + int n; // read length of request. if (len == 0) { - if (pipe_read_n(connfd, &len, sizeof(guint32)) < 0) { - g_warning("failed to read rpc request size: %s\n", strerror(errno)); + n = epoll_read_n(connfd, &len, sizeof(guint32), &err); + if (err == EAGAIN) { + return 0; + } + if (n < 0) { + g_warning("failed to read rpc request size: %s\n", strerror(err)); return -1; } if (len == 0) { @@ -216,41 +315,37 @@ read_client_request (int connfd, EpollAux *aux, char **req, guint32 *req_len) } } - buf = g_malloc (len); + data->len = len; + if (!data->buffer) { + data->buffer = g_new0 (char, len); + } // read content of request. - if (pipe_read_n(connfd, buf, len) < 0) { - g_free (buf); - if (errno == EAGAIN) { - aux->len = len; - return 0; - } + n = epoll_read_n(connfd, data->buffer + data->offset, len - data->offset, &err); + if (err == EAGAIN) { + data->offset += n; + return 0; + } + if (n < 0) { g_warning ("failed to read rpc request: %s\n", strerror(errno)); return -1; } - - *req = buf; - *req_len = len; - aux->len = 0; + data->offset += n; return 0; } -#endif -typedef struct { - SearpcNamedPipe connfd; -#ifdef COMPILE_EPOLL - char *req; - guint32 len; - SearpcNamedPipeServer *server; -#endif -} ServerHandlerData; +int set_nonblocking(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) { + return -1; + } + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} -static void* named_pipe_listen(void *arg) +static void +epoll_listen (SearpcNamedPipeServer *server) { - SearpcNamedPipeServer *server = arg; - -#ifdef COMPILE_EPOLL #define MAX_EVENTS 1000 struct epoll_event event; struct epoll_event events[MAX_EVENTS]; @@ -261,6 +356,9 @@ static void* named_pipe_listen(void *arg) while (1) { n_events = epoll_wait (server->epoll_fd, events, MAX_EVENTS, 1000); if (n_events <= 0) { + if (n_events < 0) { + g_warning ("Failed to call waits for events on the epoll: %s\n", strerror(errno)); + } continue; } for (i = 0; i < n_events; i++) { @@ -271,48 +369,58 @@ static void* named_pipe_listen(void *arg) // new client connection connfd = accept (server->pipe_fd, NULL, 0); if (connfd < 0) { - g_warning ("failed to accept new client connection: %s\n", strerror(errno)); + g_warning ("Failed to accept new client connection: %s\n", strerror(errno)); + continue; + } + + if (set_nonblocking(connfd) < 0) { + close(connfd); + g_warning ("Failed to set connection to noblocking.\n"); continue; } event.events = EPOLLIN | EPOLLRDHUP; - EpollAux *aux = g_new0 (EpollAux, 1); - aux->connfd = connfd; - event.data.ptr = (void *)aux; + ServerHandlerData *data = g_new0(ServerHandlerData, 1); + data->use_epoll = TRUE; + data->connfd = connfd; + data->server = server; + event.data.ptr = (void *)data; if (epoll_ctl (server->epoll_fd, EPOLL_CTL_ADD, connfd, &event) == -1) { - g_warning ("failed to add client fd to epoll list: %s\n", strerror(errno)); + g_warning ("Failed to add client fd to epoll list: %s\n", strerror(errno)); epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); close (connfd); - g_free (aux); + g_free (data); continue; } g_message ("start to serve on pipe client\n"); } else { - EpollAux *aux = (EpollAux *)events[i].data.ptr; - connfd = aux->connfd; + ServerHandlerData *data = (ServerHandlerData *)events[i].data.ptr; + connfd = data->connfd; if (events[i].events & (EPOLLHUP | EPOLLRDHUP)) { - g_free (aux); + if (data->len > 0) + g_free (data->buffer); + g_free (data); epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); close (connfd); continue; } - int req_len = 0; - char *req = NULL; - int rc = read_client_request (connfd, aux, &req, &req_len); + int rc = read_client_request (connfd, data); if (rc < 0) { - g_free (aux); + if (data->len) + g_free (data->buffer); + g_free (data); epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); close (connfd); continue; - } else if (req_len == 0) { + } else if (data->len == 0) { + // Continue reading request length. + continue; + } else if (data->len != data->offset) { + // Continue reading request content. continue; } - ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); - data->connfd = connfd; - data->req = req; - data->len = req_len; - data->server = server; + epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL); if (server->named_pipe_server_thread_pool) { if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) { g_warning("The rpc server thread pool is full, the maximum number of threads is %d\n", server->pool_size); @@ -328,12 +436,25 @@ static void* named_pipe_listen(void *arg) } } } -#else +} +#endif + +static void* named_pipe_listen(void *arg) +{ + SearpcNamedPipeServer *server = arg; + #if !defined(WIN32) +#ifdef __linux__ + if (server->use_epoll) { + epoll_listen (server); + return NULL; + } +#endif while (1) { int connfd = accept (server->pipe_fd, NULL, 0); ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); data->connfd = connfd; + data->use_epoll = FALSE; if (server->named_pipe_server_thread_pool) { if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) { g_warning("The rpc server thread pool is full, the maximum number of threads is %d\n", server->pool_size); @@ -383,6 +504,7 @@ static void* named_pipe_listen(void *arg) ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); data->connfd = connfd; + data->use_epoll = FALSE; if (server->named_pipe_server_thread_pool) g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL); else { @@ -394,12 +516,18 @@ static void* named_pipe_listen(void *arg) } } #endif // !defined(WIN32) -#endif // !defined(COMPILE_EPOLL) return NULL; } static void* handle_named_pipe_client_with_thread(void *arg) { +#ifdef __linux__ + ServerHandlerData *handler_data = arg; + if (handler_data->use_epoll) { + epoll_handler (arg); + return NULL; + } +#endif named_pipe_client_handler(arg); return NULL; @@ -407,44 +535,16 @@ static void* handle_named_pipe_client_with_thread(void *arg) static void handle_named_pipe_client_with_threadpool(void *data, void *user_data) { +#ifdef __linux__ + ServerHandlerData *handler_data = data; + if (handler_data->use_epoll) { + epoll_handler (data); + return; + } +#endif named_pipe_client_handler(data); } -#ifdef COMPILE_EPOLL -static void named_pipe_client_handler(void *data) -{ - ServerHandlerData *handler_data = data; - SearpcNamedPipe connfd = handler_data->connfd; - SearpcNamedPipeServer *server = handler_data->server; - const char *buf = handler_data->req; - guint32 len = handler_data->len; - char *ret_str = NULL; - - char *service, *body; - if (request_from_json (buf, len, &service, &body) < 0) { - goto out; - } - - gsize ret_len; - ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len); - g_free (service); - g_free (body); - - len = (guint32)ret_len; - if (pipe_write_n(connfd, &len, sizeof(guint32)) < 0) { - goto out; - } - - if (pipe_write_n(connfd, ret_str, ret_len) < 0) { - goto out; - } - -out: - g_free (ret_str); - g_free (handler_data->req); - g_free (data); -} -#else static void named_pipe_client_handler(void *data) { ServerHandlerData *handler_data = data; @@ -513,7 +613,6 @@ static void named_pipe_client_handler(void *data) g_free (data); g_free (buf); } -#endif int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client) { diff --git a/lib/searpc-named-pipe-transport.h b/lib/searpc-named-pipe-transport.h index 89c73fd..2bd085f 100644 --- a/lib/searpc-named-pipe-transport.h +++ b/lib/searpc-named-pipe-transport.h @@ -41,9 +41,8 @@ struct _SearpcNamedPipeServer { char path[4096]; pthread_t listener_thread; SearpcNamedPipe pipe_fd; -#ifdef COMPILE_EPOLL + gboolean use_epoll; int epoll_fd; -#endif GThreadPool *named_pipe_server_thread_pool; int pool_size; };