1
0
mirror of https://github.com/haiwen/libsearpc.git synced 2025-08-22 22:25:44 +00:00

Compare commits

..

No commits in common. "master" and "v3.1.0" have entirely different histories.

32 changed files with 218 additions and 950 deletions

View File

@ -1,9 +1,5 @@
sudo: false sudo: false
language: python language: python
python:
- "2.7"
- "3.5"
- "3.6"
compiler: compiler:
- gcc - gcc
- clang - clang
@ -11,8 +7,6 @@ addons:
apt: apt:
packages: packages:
- libjansson-dev - libjansson-dev
install:
- pip install future
before_install: before_install:
- git clean -x -f - git clean -x -f
- ./autogen.sh - ./autogen.sh

View File

@ -18,5 +18,12 @@ endif
SUBDIRS = lib pysearpc ${MAKE_DEMO} tests SUBDIRS = lib pysearpc ${MAKE_DEMO} tests
install-data-local:
if MACOS
sed -i '' -e "s|(DESTDIR)|${DESTDIR}|g" $(pcfiles)
else
${SED} -i "s|(DESTDIR)|${DESTDIR}|g" $(pcfiles)
endif
dist-hook: dist-hook:
git log -1 > $(distdir)/latest_commit git log -1 > $(distdir)/latest_commit

View File

@ -42,15 +42,6 @@ AC_ARG_ENABLE(server-pkg,
AC_HELP_STRING([--enable-server-pkg], [enable static compile]), AC_HELP_STRING([--enable-server-pkg], [enable static compile]),
[server_pkg=$enableval],[server_pkg="no"]) [server_pkg=$enableval],[server_pkg="no"])
# option: compile-universal
# default: no
AC_ARG_ENABLE([compile-universal],
[AS_HELP_STRING([--enable-compile-universal],
[compile seafile universal @<:@default: no@:>@])],
[compile_universal=${enableval}], [compile_universal=no])
AM_CONDITIONAL([COMPILE_UNIVERSAL], [test x${compile_universal} = xyes])
dnl - check if the macro WIN32 is defined on this compiler. dnl - check if the macro WIN32 is defined on this compiler.
AC_MSG_CHECKING(for WIN32) AC_MSG_CHECKING(for WIN32)
@ -78,23 +69,6 @@ fi
AM_CONDITIONAL([MACOS], [test "$bmac" = "yes"]) AM_CONDITIONAL([MACOS], [test "$bmac" = "yes"])
AC_SUBST(MACOS) AC_SUBST(MACOS)
AC_MSG_CHECKING(for FreeBSD)
if test "$(uname -s)" = "FreeBSD"; then
bfbsd=yes
fi
if test x$bfbsd = "xyes"; then
server_pkg=no
AC_MSG_RESULT(compile in FreeBSD)
fi
AM_CONDITIONAL([FBSD], [test "$bfbsd" = "yes"])
AC_SUBST(FBSD)
AC_ARG_WITH([python3], [AS_HELP_STRING([--with-python3], [use python3])],
[with_python3="yes"],[])
# Checks for libraries. # Checks for libraries.
GLIB_REQUIRED=2.26.0 GLIB_REQUIRED=2.26.0
@ -110,12 +84,7 @@ PKG_CHECK_MODULES(JANSSON, [jansson >= $JANSSON_REQUIRED])
AC_SUBST(JANSSON_CFLAGS) AC_SUBST(JANSSON_CFLAGS)
AC_SUBST(JANSSON_LIBS) AC_SUBST(JANSSON_LIBS)
if test "$with_python3" = "yes"; then AM_PATH_PYTHON([2.4])
AM_PATH_PYTHON([3.5])
else
AM_PATH_PYTHON([2.7])
fi
if test "$bwin32" = true; then if test "$bwin32" = true; then
if test x$PYTHON_DIR != x; then if test x$PYTHON_DIR != x; then
# set pyexecdir to somewhere like /c/Python26/Lib/site-packages # set pyexecdir to somewhere like /c/Python26/Lib/site-packages

6
debian/changelog vendored
View File

@ -1,9 +1,3 @@
libsearpc1 (3.2.0) unstable; urgency=low
* new upstream release
-- Jonathan Xu <jonathan.xu@seafile.com> Thu, 24 Oct 2019 11:05:10 +0800
libsearpc1 (3.1.0) unstable; urgency=low libsearpc1 (3.1.0) unstable; urgency=low
* new upstream release * new upstream release

7
debian/control vendored
View File

@ -4,11 +4,10 @@ Priority: extra
Maintainer: m.eik michalke <meik.michalke@hhu.de> Maintainer: m.eik michalke <meik.michalke@hhu.de>
Build-Depends: Build-Depends:
debhelper (>= 7), debhelper (>= 7),
dh-python,
autotools-dev, autotools-dev,
intltool, intltool,
libglib2.0-dev, libglib2.0-dev,
python3 (>= 3.5), python,
libtool, libtool,
libjansson-dev libjansson-dev
Standards-Version: 3.9.5 Standards-Version: 3.9.5
@ -31,7 +30,7 @@ Section: libdevel
Architecture: any Architecture: any
Depends: Depends:
${misc:Depends}, ${misc:Depends},
python3 (>= 3.5), python (>= 2.7),
libsearpc1 (= ${binary:Version}) libsearpc1 (= ${binary:Version})
Conflicts: seafile Conflicts: seafile
Description: Development files for the libsearpc1 package. Description: Development files for the libsearpc1 package.
@ -50,7 +49,7 @@ Package: python-searpc
Section: python Section: python
Multi-Arch: foreign Multi-Arch: foreign
Architecture: all Architecture: all
Depends: ${python3:Depends}, Depends: ${python:Depends},
${shlibs:Depends}, ${shlibs:Depends},
${misc:Depends} ${misc:Depends}
Description: simple and easy-to-use C language RPC framework Description: simple and easy-to-use C language RPC framework

View File

@ -1 +1 @@
usr/lib/python3*/site-packages/pysearpc/*.py usr/lib/python3/dist-packages/pysearpc usr/lib/python*/dist-packages/pysearpc/*.py

4
debian/rules vendored
View File

@ -2,11 +2,11 @@
# -*- makefile -*- # -*- makefile -*-
%: %:
dh $@ --with python3 --with autotools_dev --builddirectory=build dh $@ --with python2 --with autotools_dev --builddirectory=build
override_dh_auto_configure: override_dh_auto_configure:
./autogen.sh ./autogen.sh
dh_auto_configure -- --disable-compile-demo --with-python3 dh_auto_configure -- --disable-compile-demo
override_dh_strip: override_dh_strip:
# emptying the dependency_libs field in .la files # emptying the dependency_libs field in .la files

View File

@ -4,7 +4,6 @@
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#include <errno.h> #include <errno.h>
#ifdef WIN32 #ifdef WIN32

View File

@ -4,8 +4,6 @@
#include <glib.h> #include <glib.h>
#include <glib-object.h> #include <glib-object.h>
GType test_object_get_type (void);
#define TEST_OBJECT_TYPE (test_object_get_type()) #define TEST_OBJECT_TYPE (test_object_get_type())
#define TEST_OBJECT(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), TEST_OBJECT_TYPE, TestObject)) #define TEST_OBJECT(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), TEST_OBJECT_TYPE, TestObject))
#define IS_TEST_OBJCET(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), TEST_OBJCET_TYPE)) #define IS_TEST_OBJCET(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), TEST_OBJCET_TYPE))

View File

@ -2,13 +2,8 @@
AM_CFLAGS = @GLIB_CFLAGS@ \ AM_CFLAGS = @GLIB_CFLAGS@ \
@JANSSON_CFLAGS@ \ @JANSSON_CFLAGS@ \
-I${top_builddir}/lib \ -I${top_builddir}/lib \
-I${top_srcdir}/lib -I${top_srcdir}/lib \
-DG_LOG_DOMAIN=\"Searpc\"
if MACOS
if COMPILE_UNIVERSAL
AM_CFLAGS += -arch x86_64 -arch arm64
endif
endif
lib_LTLIBRARIES = libsearpc.la lib_LTLIBRARIES = libsearpc.la

View File

@ -36,7 +36,7 @@ static void clean_objlist(GList *list)
SearpcClient * SearpcClient *
searpc_client_new (void) searpc_client_new ()
{ {
return g_new0 (SearpcClient, 1); return g_new0 (SearpcClient, 1);
} }

View File

@ -5,16 +5,6 @@
#include <glib-object.h> #include <glib-object.h>
#include <jansson.h> #include <jansson.h>
#ifdef __cplusplus
extern "C" {
#endif
#ifdef LIBSEARPC_EXPORTS
#define LIBSEARPC_API __declspec(dllexport)
#else
#define LIBSEARPC_API
#endif
#ifndef DFT_DOMAIN #ifndef DFT_DOMAIN
#define DFT_DOMAIN g_quark_from_string(G_LOG_DOMAIN) #define DFT_DOMAIN g_quark_from_string(G_LOG_DOMAIN)
#endif #endif
@ -41,88 +31,85 @@ struct _SearpcClient {
void *async_arg; void *async_arg;
}; };
typedef struct _SearpcClient LIBSEARPC_API SearpcClient; typedef struct _SearpcClient SearpcClient;
LIBSEARPC_API SearpcClient *searpc_client_new ();
SearpcClient *searpc_client_new (void);
LIBSEARPC_API void void searpc_client_free (SearpcClient *client);
searpc_client_free (SearpcClient *client);
LIBSEARPC_API void void
searpc_client_call (SearpcClient *client, const char *fname, searpc_client_call (SearpcClient *client, const char *fname,
const char *ret_type, GType gobject_type, const char *ret_type, GType gobject_type,
void *ret_ptr, GError **error, void *ret_ptr, GError **error,
int n_params, ...); int n_params, ...);
LIBSEARPC_API int int
searpc_client_call__int (SearpcClient *client, const char *fname, searpc_client_call__int (SearpcClient *client, const char *fname,
GError **error, int n_params, ...); GError **error, int n_params, ...);
LIBSEARPC_API gint64 gint64
searpc_client_call__int64 (SearpcClient *client, const char *fname, searpc_client_call__int64 (SearpcClient *client, const char *fname,
GError **error, int n_params, ...); GError **error, int n_params, ...);
LIBSEARPC_API char * char *
searpc_client_call__string (SearpcClient *client, const char *fname, searpc_client_call__string (SearpcClient *client, const char *fname,
GError **error, int n_params, ...); GError **error, int n_params, ...);
LIBSEARPC_API GObject * GObject *
searpc_client_call__object (SearpcClient *client, const char *fname, searpc_client_call__object (SearpcClient *client, const char *fname,
GType object_type, GType object_type,
GError **error, int n_params, ...); GError **error, int n_params, ...);
LIBSEARPC_API GList* GList*
searpc_client_call__objlist (SearpcClient *client, const char *fname, searpc_client_call__objlist (SearpcClient *client, const char *fname,
GType object_type, GType object_type,
GError **error, int n_params, ...); GError **error, int n_params, ...);
LIBSEARPC_API json_t * json_t *
searpc_client_call__json (SearpcClient *client, const char *fname, searpc_client_call__json (SearpcClient *client, const char *fname,
GError **error, int n_params, ...); GError **error, int n_params, ...);
LIBSEARPC_API char* char* searpc_client_transport_send (SearpcClient *client,
searpc_client_transport_send (SearpcClient *client, const gchar *fcall_str,
const gchar *fcall_str, size_t fcall_len,
size_t fcall_len, size_t *ret_len);
size_t *ret_len);
LIBSEARPC_API int int
searpc_client_async_call__int (SearpcClient *client, searpc_client_async_call__int (SearpcClient *client,
const char *fname, const char *fname,
AsyncCallback callback, void *cbdata, AsyncCallback callback, void *cbdata,
int n_params, ...); int n_params, ...);
LIBSEARPC_API int int
searpc_client_async_call__int64 (SearpcClient *client, searpc_client_async_call__int64 (SearpcClient *client,
const char *fname, const char *fname,
AsyncCallback callback, void *cbdata, AsyncCallback callback, void *cbdata,
int n_params, ...); int n_params, ...);
LIBSEARPC_API int int
searpc_client_async_call__string (SearpcClient *client, searpc_client_async_call__string (SearpcClient *client,
const char *fname, const char *fname,
AsyncCallback callback, void *cbdata, AsyncCallback callback, void *cbdata,
int n_params, ...); int n_params, ...);
LIBSEARPC_API int int
searpc_client_async_call__object (SearpcClient *client, searpc_client_async_call__object (SearpcClient *client,
const char *fname, const char *fname,
AsyncCallback callback, AsyncCallback callback,
GType object_type, void *cbdata, GType object_type, void *cbdata,
int n_params, ...); int n_params, ...);
LIBSEARPC_API int int
searpc_client_async_call__objlist (SearpcClient *client, searpc_client_async_call__objlist (SearpcClient *client,
const char *fname, const char *fname,
AsyncCallback callback, AsyncCallback callback,
GType object_type, void *cbdata, GType object_type, void *cbdata,
int n_params, ...); int n_params, ...);
LIBSEARPC_API int int
searpc_client_async_call__json (SearpcClient *client, searpc_client_async_call__json (SearpcClient *client,
const char *fname, const char *fname,
AsyncCallback callback, void *cbdata, AsyncCallback callback, void *cbdata,
@ -131,7 +118,7 @@ searpc_client_async_call__json (SearpcClient *client,
/* called by the transport layer, the rpc layer should be able to /* called by the transport layer, the rpc layer should be able to
* modify the str, but not take ownership of it */ * modify the str, but not take ownership of it */
LIBSEARPC_API int int
searpc_client_generic_callback (char *retstr, size_t len, searpc_client_generic_callback (char *retstr, size_t len,
void *vdata, const char *errstr); void *vdata, const char *errstr);
@ -141,9 +128,5 @@ searpc_client_generic_callback (char *retstr, size_t len,
#define TRANSPORT_ERROR "Transport Error" #define TRANSPORT_ERROR "Transport Error"
#define TRANSPORT_ERROR_CODE 500 #define TRANSPORT_ERROR_CODE 500
#ifdef __cplusplus
}
#endif
#endif #endif

View File

@ -139,7 +139,7 @@ def generate_marshal_register_item(ret_type, arg_types):
signature_name=signature_name) signature_name=signature_name)
def gen_marshal_register_function(f): def gen_marshal_register_function(f):
write_file(f, "static void register_marshals(void)""") write_file(f, "static void register_marshals()""")
write_file(f, "{") write_file(f, "{")
for item in func_table: for item in func_table:
write_file(f, generate_marshal_register_item(item[0], item[1])) write_file(f, generate_marshal_register_item(item[0], item[1]))
@ -147,7 +147,7 @@ def gen_marshal_register_function(f):
signature_template = r""" signature_template = r"""
inline static gchar * inline static gchar *
${signature_name}(void) ${signature_name}()
{ {
return searpc_compute_signature (${args}); return searpc_compute_signature (${args});
} }

View File

@ -8,13 +8,8 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#include <unistd.h> #include <unistd.h>
#ifdef __linux__
#include <fcntl.h>
#include <sys/epoll.h>
#endif
#endif // !defined(WIN32) #endif // !defined(WIN32)
#include <glib.h>
#include <glib/gstdio.h> #include <glib/gstdio.h>
#include <jansson.h> #include <jansson.h>
@ -37,9 +32,7 @@ static char* formatErrorMessage();
#endif // defined(WIN32) #endif // defined(WIN32)
static void* named_pipe_listen(void *arg); static void* named_pipe_listen(void *arg);
static void* handle_named_pipe_client_with_thread (void *arg); static void* named_pipe_client_handler(void *arg);
static void handle_named_pipe_client_with_threadpool(void *data, void *user_data);
static void named_pipe_client_handler (void *data);
static char* searpc_named_pipe_send(void *arg, const gchar *fcall_str, size_t fcall_len, size_t *ret_len); static char* searpc_named_pipe_send(void *arg, const gchar *fcall_str, size_t fcall_len, size_t *ret_len);
static char * request_to_json(const char *service, const char *fcall_str, size_t fcall_len); static char * request_to_json(const char *service, const char *fcall_str, size_t fcall_len);
@ -47,8 +40,8 @@ static int request_from_json (const char *content, size_t len, char **service, c
static void json_object_set_string_member (json_t *object, const char *key, const char *value); static void json_object_set_string_member (json_t *object, const char *key, const char *value);
static const char * json_object_get_string_member (json_t *object, const char *key); static const char * json_object_get_string_member (json_t *object, const char *key);
static gssize pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n); static ssize_t pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n);
static gssize pipe_read_n(SearpcNamedPipe fd, void *vptr, size_t n); static ssize_t pipe_read_n(SearpcNamedPipe fd, void *vptr, size_t n);
typedef struct { typedef struct {
SearpcNamedPipeClient* client; SearpcNamedPipeClient* client;
@ -81,33 +74,6 @@ SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path)
{ {
SearpcNamedPipeServer *server = g_malloc0(sizeof(SearpcNamedPipeServer)); SearpcNamedPipeServer *server = g_malloc0(sizeof(SearpcNamedPipeServer));
memcpy(server->path, path, strlen(path) + 1); memcpy(server->path, path, strlen(path) + 1);
return server;
}
SearpcNamedPipeServer* searpc_create_named_pipe_server_with_threadpool (const char *path, int named_pipe_server_thread_pool_size)
{
GError *error = NULL;
SearpcNamedPipeServer *server = g_malloc0(sizeof(SearpcNamedPipeServer));
memcpy(server->path, path, strlen(path) + 1);
server->pool_size = named_pipe_server_thread_pool_size;
server->named_pipe_server_thread_pool = g_thread_pool_new (handle_named_pipe_client_with_threadpool,
NULL,
named_pipe_server_thread_pool_size,
FALSE,
&error);
if (!server->named_pipe_server_thread_pool) {
if (error) {
g_warning ("Falied to create named pipe server thread pool : %s\n", error->message);
g_clear_error (&error);
} else {
g_warning ("Falied to create named pipe server thread pool.\n");
}
g_free (server);
return NULL;
}
return server; return server;
} }
@ -133,7 +99,7 @@ int searpc_named_pipe_server_start(SearpcNamedPipeServer *server)
} }
if (g_file_test (un_path, G_FILE_TEST_EXISTS)) { if (g_file_test (un_path, G_FILE_TEST_EXISTS)) {
g_message ("socket file exists, delete it anyway\n"); g_debug ("socket file exists, delete it anyway\n");
if (g_unlink (un_path) < 0) { if (g_unlink (un_path) < 0) {
g_warning ("delete socket file failed : %s\n", strerror(errno)); g_warning ("delete socket file failed : %s\n", strerror(errno));
goto failed; goto failed;
@ -158,28 +124,6 @@ int searpc_named_pipe_server_start(SearpcNamedPipeServer *server)
goto failed; goto failed;
} }
#ifdef __linux__
if (server->use_epoll) {
int epoll_fd;
struct epoll_event event;
epoll_fd = epoll_create1(0);
if (epoll_fd < 0) {
g_warning ("failed to open an epoll file descriptor: %s\n", strerror(errno));
goto failed;
}
event.events = EPOLLIN;
event.data.fd = pipe_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd, &event) == -1) {
g_warning ("failed to add pipe fd to epoll list: %s\n", strerror(errno));
goto failed;
}
server->epoll_fd = epoll_fd;
}
#endif
server->pipe_fd = pipe_fd; server->pipe_fd = pipe_fd;
#endif // !defined(WIN32) #endif // !defined(WIN32)
@ -196,187 +140,26 @@ failed:
} }
typedef struct { typedef struct {
SearpcNamedPipe connfd;
SearpcNamedPipeServer *server; SearpcNamedPipeServer *server;
gboolean use_epoll; SearpcNamedPipe connfd;
} ServerHandlerData; } ServerHandlerData;
// EPOLL
#ifdef __linux__
static void epoll_handler(void *data)
{
ServerHandlerData *handler_data = data;
SearpcNamedPipe connfd = handler_data->connfd;
SearpcNamedPipeServer *server = handler_data->server;
char *buf = NULL;
guint32 len = 0;
char *ret_str = NULL;
int ret = 0;
int n;
if (pipe_read_n(connfd, &len, sizeof(guint32)) < 0) {
g_warning("failed to read rpc request size: %s\n", strerror(errno));
ret = -1;
goto out;
}
if (len <= 0) {
ret = -1;
goto out;
}
buf = g_new0 (char, len);
n = pipe_read_n (connfd, buf, len);
if (n < 0) {
g_warning ("failed to read rpc request: %s\n", strerror(errno));
ret = -1;
goto out;
}
char *service, *body;
if (request_from_json (buf, len, &service, &body) < 0) {
ret = -1;
goto out;
}
gsize ret_len;
ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
g_free (service);
g_free (body);
len = (guint32)ret_len;
if (pipe_write_n (connfd, &len, sizeof(guint32)) < 0) {
ret = -1;
goto out;
}
if (pipe_write_n (connfd, ret_str, ret_len) < 0) {
ret = -1;
goto out;
}
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = (void *)handler_data;
if (epoll_ctl (server->epoll_fd, EPOLL_CTL_ADD, connfd, &event) == -1) {
ret = -1;
g_warning ("failed to add client fd to epoll list: %s\n", strerror(errno));
goto out;
}
out:
g_free (buf);
if (ret < 0) {
close (connfd);
g_free (handler_data);
}
g_free (ret_str);
}
static void
epoll_listen (SearpcNamedPipeServer *server)
{
#define MAX_EVENTS 1000
struct epoll_event event;
struct epoll_event events[MAX_EVENTS];
int connfd;
int n_events;
int i;
while (1) {
n_events = epoll_wait (server->epoll_fd, events, MAX_EVENTS, 1000);
if (n_events <= 0) {
if (n_events < 0) {
g_warning ("Failed to call waits for events on the epoll: %s\n", strerror(errno));
}
continue;
}
for (i = 0; i < n_events; i++) {
if (events[i].data.fd == server->pipe_fd) {
if (!(events[i].events & EPOLLIN)) {
continue;
}
// new client connection
connfd = accept (server->pipe_fd, NULL, 0);
if (connfd < 0) {
g_warning ("Failed to accept new client connection: %s\n", strerror(errno));
continue;
}
event.events = EPOLLIN | EPOLLRDHUP;
ServerHandlerData *data = g_new0(ServerHandlerData, 1);
data->use_epoll = TRUE;
data->connfd = connfd;
data->server = server;
event.data.ptr = (void *)data;
if (epoll_ctl (server->epoll_fd, EPOLL_CTL_ADD, connfd, &event) == -1) {
g_warning ("Failed to add client fd to epoll list: %s\n", strerror(errno));
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
close (connfd);
g_free (data);
continue;
}
} else {
ServerHandlerData *data = (ServerHandlerData *)events[i].data.ptr;
connfd = data->connfd;
if (events[i].events & (EPOLLHUP | EPOLLRDHUP)) {
g_free (data);
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
close (connfd);
continue;
}
// After socket is readable, remove the socket from the epoll.
// After the worker finishes processing the current request, we will add the socket back into the epoll.
epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, connfd, NULL);
if (server->named_pipe_server_thread_pool) {
if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) {
g_warning("The rpc server thread pool is full, the maximum number of threads is %d\n", server->pool_size);
}
g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL);
} else {
pthread_t handler;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&handler, &attr, handle_named_pipe_client_with_thread, data);
}
}
}
}
}
#endif
static void* named_pipe_listen(void *arg) static void* named_pipe_listen(void *arg)
{ {
SearpcNamedPipeServer *server = arg; SearpcNamedPipeServer *server = arg;
#if !defined(WIN32) #if !defined(WIN32)
#ifdef __linux__
if (server->use_epoll) {
epoll_listen (server);
return NULL;
}
#endif
while (1) { while (1) {
int connfd = accept (server->pipe_fd, NULL, 0); int connfd = accept (server->pipe_fd, NULL, 0);
pthread_t *handler = g_malloc(sizeof(pthread_t));
ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
data->server = server;
data->connfd = connfd; data->connfd = connfd;
data->use_epoll = FALSE; // TODO(low priority): Instead of using a thread to handle each client,
if (server->named_pipe_server_thread_pool) { // use select(unix)/iocp(windows) to do it.
if (g_thread_pool_get_num_threads (server->named_pipe_server_thread_pool) >= server->pool_size) { pthread_create(handler, NULL, named_pipe_client_handler, data);
g_warning("The rpc server thread pool is full, the maximum number of threads is %d\n", server->pool_size); server->handlers = g_list_append(server->handlers, handler);
}
g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL);
} else {
pthread_t handler;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&handler, &attr, handle_named_pipe_client_with_thread, data);
}
} }
#else // !defined(WIN32) #else // !defined(WIN32)
while (1) { while (1) {
HANDLE connfd = INVALID_HANDLE_VALUE; HANDLE connfd = INVALID_HANDLE_VALUE;
@ -409,79 +192,53 @@ static void* named_pipe_listen(void *arg)
break; break;
} }
/* g_debug ("Accepted a named pipe client\n"); */ g_debug ("Accepted a named pipe client\n");
pthread_t *handler = g_malloc(sizeof(pthread_t));
ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
data->server = server;
data->connfd = connfd; data->connfd = connfd;
data->use_epoll = FALSE; // TODO(low priority): Instead of using a thread to handle each client,
if (server->named_pipe_server_thread_pool) // use select(unix)/iocp(windows) to do it.
g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL); pthread_create(handler, NULL, named_pipe_client_handler, data);
else { server->handlers = g_list_append(server->handlers, handler);
pthread_t handler;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&handler, &attr, handle_named_pipe_client_with_thread, data);
}
} }
#endif // !defined(WIN32) #endif // !defined(WIN32)
return NULL; return NULL;
} }
static void* handle_named_pipe_client_with_thread(void *arg) static void* named_pipe_client_handler(void *arg)
{ {
#ifdef __linux__ ServerHandlerData *data = arg;
ServerHandlerData *handler_data = arg; // SearpcNamedPipeServer *server = data->server;
if (handler_data->use_epoll) { SearpcNamedPipe connfd = data->connfd;
epoll_handler (arg);
return NULL;
}
#endif
named_pipe_client_handler(arg);
return NULL; size_t len;
} size_t bufsize = 4096;
static void handle_named_pipe_client_with_threadpool(void *data, void *user_data)
{
#ifdef __linux__
ServerHandlerData *handler_data = data;
if (handler_data->use_epoll) {
epoll_handler (data);
return;
}
#endif
named_pipe_client_handler(data);
}
static void named_pipe_client_handler(void *data)
{
ServerHandlerData *handler_data = data;
SearpcNamedPipe connfd = handler_data->connfd;
guint32 len;
guint32 bufsize = 4096;
char *buf = g_malloc(bufsize); char *buf = g_malloc(bufsize);
g_debug ("start to serve on pipe client\n");
while (1) { while (1) {
len = 0; len = 0;
if (pipe_read_n(connfd, &len, sizeof(guint32)) < 0) { if (pipe_read_n(connfd, &len, sizeof(uint32_t)) < 0) {
g_warning("failed to read rpc request size: %s\n", strerror(errno)); g_warning("failed to read rpc request size: %s", strerror(errno));
break; break;
} }
if (len == 0) { if (len == 0) {
/* g_debug("EOF reached, pipe connection lost"); */ g_debug("EOF reached, pipe connection lost");
break; break;
} }
while (bufsize < len) { while (bufsize < len) {
bufsize *= 2; bufsize *= 2;
buf = g_realloc(buf, bufsize); buf = realloc(buf, bufsize);
} }
if (pipe_read_n(connfd, buf, len) < 0 || len == 0) { if (pipe_read_n(connfd, buf, len) < 0 || len == 0) {
g_warning("failed to read rpc request: %s\n", strerror(errno)); g_warning("failed to read rpc request: %s", strerror(errno));
g_free (buf);
break; break;
} }
@ -490,20 +247,19 @@ static void named_pipe_client_handler(void *data)
break; break;
} }
gsize ret_len; size_t ret_len;
char *ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len); char *ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
g_free (service); g_free (service);
g_free (body); g_free (body);
len = (guint32)ret_len; if (pipe_write_n(connfd, &ret_len, sizeof(uint32_t)) < 0) {
if (pipe_write_n(connfd, &len, sizeof(guint32)) < 0) { g_warning("failed to send rpc response(%s): %s", ret_str, strerror(errno));
g_warning("failed to send rpc response(%s): %s\n", ret_str, strerror(errno));
g_free (ret_str); g_free (ret_str);
break; break;
} }
if (pipe_write_n(connfd, ret_str, ret_len) < 0) { if (pipe_write_n(connfd, ret_str, ret_len) < 0) {
g_warning("failed to send rpc response: %s\n", strerror(errno)); g_warning("failed to send rpc response: %s", strerror(errno));
g_free (ret_str); g_free (ret_str);
break; break;
} }
@ -517,10 +273,11 @@ static void named_pipe_client_handler(void *data)
DisconnectNamedPipe(connfd); DisconnectNamedPipe(connfd);
CloseHandle(connfd); CloseHandle(connfd);
#endif // !defined(WIN32) #endif // !defined(WIN32)
g_free (data);
g_free (buf); return NULL;
} }
int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client) int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client)
{ {
#if !defined(WIN32) #if !defined(WIN32)
@ -531,39 +288,29 @@ int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client)
g_strlcpy (servaddr.sun_path, client->path, sizeof(servaddr.sun_path)); g_strlcpy (servaddr.sun_path, client->path, sizeof(servaddr.sun_path));
if (connect(client->pipe_fd, (struct sockaddr *)&servaddr, (socklen_t)sizeof(servaddr)) < 0) { if (connect(client->pipe_fd, (struct sockaddr *)&servaddr, (socklen_t)sizeof(servaddr)) < 0) {
g_warning ("pipe client failed to connect to server: %s\n", strerror(errno)); g_warning ("pipe client failed to connect to server: %s\n", strerror(errno));
close(client->pipe_fd);
return -1; return -1;
} }
#else // !defined(WIN32) #else // !defined(WIN32)
SearpcNamedPipe pipe_fd; SearpcNamedPipe pipe_fd;
pipe_fd = CreateFile(
client->path, // pipe name
GENERIC_READ | // read and write access
GENERIC_WRITE,
0, // no sharing
NULL, // default security attributes
OPEN_EXISTING, // opens existing pipe
0, // default attributes
NULL); // no template file
for (;;) { if (pipe_fd == INVALID_HANDLE_VALUE) {
pipe_fd = CreateFile( G_WARNING_WITH_LAST_ERROR("Failed to connect to named pipe");
client->path, // pipe name return -1;
GENERIC_READ | // read and write access
GENERIC_WRITE,
0, // no sharing
NULL, // default security attributes
OPEN_EXISTING, // opens existing pipe
0, // default attributes
NULL); // no template file
if (pipe_fd != INVALID_HANDLE_VALUE) {
break;
}
/* wait with default timeout (approx. 50ms) */
if (GetLastError() != ERROR_PIPE_BUSY || !WaitNamedPipe(client->path, NMPWAIT_USE_DEFAULT_WAIT)) {
G_WARNING_WITH_LAST_ERROR("Failed to connect to named pipe");
return -1;
}
} }
DWORD mode = PIPE_READMODE_MESSAGE; DWORD mode = PIPE_READMODE_MESSAGE;
if (!SetNamedPipeHandleState(pipe_fd, &mode, NULL, NULL)) { if (!SetNamedPipeHandleState(pipe_fd, &mode, NULL, NULL)) {
G_WARNING_WITH_LAST_ERROR("Failed to set named pipe mode"); G_WARNING_WITH_LAST_ERROR("Failed to set named pipe mode");
CloseHandle (pipe_fd);
return -1; return -1;
} }
@ -571,7 +318,7 @@ int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client)
#endif // !defined(WIN32) #endif // !defined(WIN32)
/* g_debug ("pipe client connected to server\n"); */ g_debug ("pipe client connected to server\n");
return 0; return 0;
} }
@ -585,7 +332,6 @@ void searpc_free_client_with_pipe_transport (SearpcClient *client)
close(pipe_client->pipe_fd); close(pipe_client->pipe_fd);
#endif #endif
g_free (pipe_client); g_free (pipe_client);
g_free (data->service);
g_free (data); g_free (data);
searpc_client_free (client); searpc_client_free (client);
} }
@ -593,36 +339,37 @@ void searpc_free_client_with_pipe_transport (SearpcClient *client)
char *searpc_named_pipe_send(void *arg, const gchar *fcall_str, char *searpc_named_pipe_send(void *arg, const gchar *fcall_str,
size_t fcall_len, size_t *ret_len) size_t fcall_len, size_t *ret_len)
{ {
/* g_debug ("searpc_named_pipe_send is called\n"); */ g_debug ("searpc_named_pipe_send is called\n");
ClientTransportData *data = arg; ClientTransportData *data = arg;
SearpcNamedPipeClient *client = data->client; SearpcNamedPipeClient *client = data->client;
char *json_str = request_to_json(data->service, fcall_str, fcall_len); char *json_str = request_to_json(data->service, fcall_str, fcall_len);
guint32 len = (guint32)strlen(json_str); size_t json_len = strlen(json_str);
if (pipe_write_n(client->pipe_fd, &len, sizeof(guint32)) < 0) { uint32_t len = json_len;
g_warning("failed to send rpc call: %s\n", strerror(errno)); if (pipe_write_n(client->pipe_fd, &len, sizeof(uint32_t)) < 0) {
g_warning("failed to send rpc call: %s", strerror(errno));
free (json_str); free (json_str);
return NULL; return NULL;
} }
if (pipe_write_n(client->pipe_fd, json_str, len) < 0) { if (pipe_write_n(client->pipe_fd, json_str, json_len) < 0) {
g_warning("failed to send rpc call: %s\n", strerror(errno)); g_warning("failed to send rpc call: %s", strerror(errno));
free (json_str); free (json_str);
return NULL; return NULL;
} }
free (json_str); free (json_str);
if (pipe_read_n(client->pipe_fd, &len, sizeof(guint32)) < 0) { if (pipe_read_n(client->pipe_fd, &len, sizeof(uint32_t)) < 0) {
g_warning("failed to read rpc response: %s\n", strerror(errno)); g_warning("failed to read rpc response: %s", strerror(errno));
return NULL; return NULL;
} }
char *buf = g_malloc(len); char *buf = g_malloc(len);
if (pipe_read_n(client->pipe_fd, buf, len) < 0) { if (pipe_read_n(client->pipe_fd, buf, len) < 0) {
g_warning("failed to read rpc response: %s\n", strerror(errno)); g_warning("failed to read rpc response: %s", strerror(errno));
g_free (buf); g_free (buf);
return NULL; return NULL;
} }
@ -690,11 +437,11 @@ json_object_get_string_member (json_t *object, const char *key)
#if !defined(WIN32) #if !defined(WIN32)
// Write "n" bytes to a descriptor. // Write "n" bytes to a descriptor.
gssize ssize_t
pipe_write_n(int fd, const void *vptr, size_t n) pipe_write_n(int fd, const void *vptr, size_t n)
{ {
size_t nleft; size_t nleft;
gssize nwritten; ssize_t nwritten;
const char *ptr; const char *ptr;
ptr = vptr; ptr = vptr;
@ -715,11 +462,11 @@ pipe_write_n(int fd, const void *vptr, size_t n)
} }
// Read "n" bytes from a descriptor. // Read "n" bytes from a descriptor.
gssize ssize_t
pipe_read_n(int fd, void *vptr, size_t n) pipe_read_n(int fd, void *vptr, size_t n)
{ {
size_t nleft; size_t nleft;
gssize nread; ssize_t nread;
char *ptr; char *ptr;
ptr = vptr; ptr = vptr;
@ -741,7 +488,7 @@ pipe_read_n(int fd, void *vptr, size_t n)
#else // !defined(WIN32) #else // !defined(WIN32)
gssize pipe_read_n (SearpcNamedPipe fd, void *vptr, size_t n) ssize_t pipe_read_n (SearpcNamedPipe fd, void *vptr, size_t n)
{ {
DWORD bytes_read; DWORD bytes_read;
BOOL success = ReadFile( BOOL success = ReadFile(
@ -762,7 +509,7 @@ gssize pipe_read_n (SearpcNamedPipe fd, void *vptr, size_t n)
return n; return n;
} }
gssize pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n) ssize_t pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n)
{ {
DWORD bytes_written; DWORD bytes_written;
BOOL success = WriteFile( BOOL success = WriteFile(
@ -773,14 +520,38 @@ gssize pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n)
NULL); // not overlapped I/O NULL); // not overlapped I/O
if (!success || bytes_written != (DWORD)n) { if (!success || bytes_written != (DWORD)n) {
G_WARNING_WITH_LAST_ERROR("failed to write to named pipe"); G_WARNING_WITH_LAST_ERROR("failed to read command from the pipe");
return -1;
} }
FlushFileBuffers(fd); FlushFileBuffers(fd);
return 0; return 0;
} }
static char *locale_to_utf8 (const gchar *src)
{
if (!src)
return NULL;
gsize bytes_read = 0;
gsize bytes_written = 0;
GError *error = NULL;
gchar *dst = NULL;
dst = g_locale_to_utf8
(src, /* locale specific string */
strlen(src), /* len of src */
&bytes_read, /* length processed */
&bytes_written, /* output length */
&error);
if (error) {
return NULL;
}
return dst;
}
// http://stackoverflow.com/questions/3006229/get-a-text-from-the-error-code-returns-from-the-getlasterror-function // http://stackoverflow.com/questions/3006229/get-a-text-from-the-error-code-returns-from-the-getlasterror-function
// The caller is responsible to free the returned message. // The caller is responsible to free the returned message.
char* formatErrorMessage() char* formatErrorMessage()
@ -793,12 +564,11 @@ char* formatErrorMessage()
FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM, FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM,
NULL, NULL,
error_code, error_code,
/* EN_US */ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
MAKELANGID(LANG_ENGLISH, 0x01),
buf, buf,
sizeof(buf) - 1, sizeof(buf) - 1,
NULL); NULL);
return g_strdup(buf); return locale_to_utf8(buf);
} }
#endif // !defined(WIN32) #endif // !defined(WIN32)

View File

@ -10,16 +10,6 @@
#include <windows.h> #include <windows.h>
#endif #endif
#ifdef __cplusplus
extern "C" {
#endif
#ifdef LIBSEARPC_EXPORTS
#define LIBSEARPC_API __declspec(dllexport)
#else
#define LIBSEARPC_API
#endif
// Implementatin of a searpc transport based on named pipe. It uses unix domain // Implementatin of a searpc transport based on named pipe. It uses unix domain
// sockets on linux/osx, and named pipes on windows. // sockets on linux/osx, and named pipes on windows.
// //
@ -40,22 +30,14 @@ typedef int SearpcNamedPipe;
struct _SearpcNamedPipeServer { struct _SearpcNamedPipeServer {
char path[4096]; char path[4096];
pthread_t listener_thread; pthread_t listener_thread;
GList *handlers;
SearpcNamedPipe pipe_fd; SearpcNamedPipe pipe_fd;
gboolean use_epoll;
int epoll_fd;
GThreadPool *named_pipe_server_thread_pool;
int pool_size;
}; };
typedef struct _SearpcNamedPipeServer LIBSEARPC_API SearpcNamedPipeServer; typedef struct _SearpcNamedPipeServer SearpcNamedPipeServer;
LIBSEARPC_API
SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path); SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path);
LIBSEARPC_API
SearpcNamedPipeServer* searpc_create_named_pipe_server_with_threadpool(const char *path, int named_pipe_server_thread_pool_size);
LIBSEARPC_API
int searpc_named_pipe_server_start(SearpcNamedPipeServer *server); int searpc_named_pipe_server_start(SearpcNamedPipeServer *server);
// Client side interface. // Client side interface.
@ -65,22 +47,14 @@ struct _SearpcNamedPipeClient {
SearpcNamedPipe pipe_fd; SearpcNamedPipe pipe_fd;
}; };
typedef struct _SearpcNamedPipeClient LIBSEARPC_API SearpcNamedPipeClient; typedef struct _SearpcNamedPipeClient SearpcNamedPipeClient;
LIBSEARPC_API
SearpcNamedPipeClient* searpc_create_named_pipe_client(const char *path); SearpcNamedPipeClient* searpc_create_named_pipe_client(const char *path);
LIBSEARPC_API
SearpcClient * searpc_client_with_named_pipe_transport(SearpcNamedPipeClient *client, const char *service); SearpcClient * searpc_client_with_named_pipe_transport(SearpcNamedPipeClient *client, const char *service);
LIBSEARPC_API
int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client); int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client);
LIBSEARPC_API
void searpc_free_client_with_pipe_transport (SearpcClient *client); void searpc_free_client_with_pipe_transport (SearpcClient *client);
#ifdef __cplusplus
}
#endif
#endif // SEARPC_NAMED_PIPE_TRANSPORT_H #endif // SEARPC_NAMED_PIPE_TRANSPORT_H

View File

@ -9,10 +9,8 @@
#include "searpc-server.h" #include "searpc-server.h"
#include "searpc-utils.h" #include "searpc-utils.h"
#ifdef __linux__ #ifdef PROFILE
#include <sys/time.h> #include <sys/time.h>
#include <sys/errno.h>
#include <pthread.h>
#endif #endif
struct FuncItem; struct FuncItem;
@ -36,14 +34,6 @@ typedef struct {
static GHashTable *marshal_table; static GHashTable *marshal_table;
static GHashTable *service_table; static GHashTable *service_table;
#ifdef __linux__
static FILE *slow_log_fp = NULL;
static gint64 slow_threshold;
static GList *filtered_funcs;
static pthread_mutex_t slow_log_lock;
static gboolean log_to_stdout = FALSE;
#endif
static void static void
func_item_free (FuncItem *item) func_item_free (FuncItem *item)
{ {
@ -146,10 +136,7 @@ searpc_set_objlist_to_ret_object (json_t *object, GList *ret)
void void
searpc_set_json_to_ret_object (json_t *object, json_t *ret) searpc_set_json_to_ret_object (json_t *object, json_t *ret)
{ {
if (ret == NULL) json_object_set_new (object, "ret", ret);
json_object_set_new(object, "ret", json_null ());
else
json_object_set_new (object, "ret", ret);
} }
char * char *
@ -198,67 +185,8 @@ searpc_server_init (RegisterMarshalFunc register_func)
register_func (); register_func ();
} }
#ifdef __linux__
int
searpc_server_init_with_slow_log (RegisterMarshalFunc register_func,
const char *slow_log_path,
gint64 slow_threshold_in,
GList *filtered_funcs_in)
{
const char *log_to_stdout_env = g_getenv("SEAFILE_LOG_TO_STDOUT");
if (g_strcmp0(log_to_stdout_env, "true") == 0) {
slow_log_fp = stdout;
log_to_stdout = TRUE;
} else if (slow_log_path) {
slow_log_fp = fopen (slow_log_path, "a+");
if (!slow_log_fp) {
g_warning ("Failed to open RPC slow log file %s: %s\n", slow_log_path, strerror(errno));
return -1;
}
}
slow_threshold = slow_threshold_in;
filtered_funcs = filtered_funcs_in;
pthread_mutex_init (&slow_log_lock, NULL);
searpc_server_init (register_func);
return 0;
}
int
searpc_server_reopen_slow_log (const char *slow_log_path)
{
FILE *fp, *oldfp;
if (log_to_stdout) {
return 0;
}
if ((fp = fopen (slow_log_path, "a+")) == NULL) {
g_warning ("Failed to open RPC slow log file %s\n", slow_log_path);
return -1;
}
pthread_mutex_lock (&slow_log_lock);
oldfp = slow_log_fp;
slow_log_fp = fp;
pthread_mutex_unlock (&slow_log_lock);
if (fclose(oldfp) < 0) {
g_warning ("Failed to close old RPC slow log file\n");
return -1;
}
return 0;
}
#endif
void void
searpc_server_final(void) searpc_server_final()
{ {
g_hash_table_destroy (service_table); g_hash_table_destroy (service_table);
g_hash_table_destroy (marshal_table); g_hash_table_destroy (marshal_table);
@ -316,53 +244,6 @@ searpc_server_register_function (const char *svc_name,
return TRUE; return TRUE;
} }
#ifdef __linux__
static gboolean
rpc_include_passwd (const char *fname) {
GList *ptr;
char *rpc_name;
for (ptr = filtered_funcs; ptr; ptr = ptr->next) {
rpc_name = ptr->data;
if (g_strcmp0 (fname, rpc_name) == 0) {
return TRUE;
}
}
return FALSE;
}
static void
print_slow_log_if_necessary (const char *svc_name, const char *func, gsize len,
const struct timeval *start,
const struct timeval *intv)
{
char time_buf[64];
gint64 intv_in_usec = ((gint64)intv->tv_sec) * G_USEC_PER_SEC + (gint64)intv->tv_usec;
gint64 intv_in_msec = intv_in_usec/1000;
double intv_in_sec = ((double)intv_in_usec)/G_USEC_PER_SEC;
if (intv_in_msec < slow_threshold)
return;
strftime(time_buf, 64, "%Y/%m/%d %H:%M:%S", localtime(&start->tv_sec));
pthread_mutex_lock (&slow_log_lock);
if (log_to_stdout) {
fprintf (slow_log_fp, "[seafile-slow-rpc] ");
}
fprintf (slow_log_fp, "[%s] \"%s\" %.*s %.3f\n",
time_buf, svc_name, (int)len, func, intv_in_sec);
fflush (slow_log_fp);
pthread_mutex_unlock (&slow_log_lock);
}
#endif
/* Called by RPC transport. */ /* Called by RPC transport. */
char* char*
searpc_server_call_function (const char *svc_name, searpc_server_call_function (const char *svc_name,
@ -374,12 +255,10 @@ searpc_server_call_function (const char *svc_name,
json_error_t jerror; json_error_t jerror;
GError *error = NULL; GError *error = NULL;
#ifdef __linux__ #ifdef PROFILE
struct timeval start, end, intv; struct timeval start, end, intv;
if (slow_log_fp) { gettimeofday(&start, NULL);
gettimeofday(&start, NULL);
}
#endif #endif
service = g_hash_table_lookup (service_table, svc_name); service = g_hash_table_lookup (service_table, svc_name);
@ -411,14 +290,11 @@ searpc_server_call_function (const char *svc_name,
ret = fitem->marshal->mfunc (fitem->func, array, ret_len); ret = fitem->marshal->mfunc (fitem->func, array, ret_len);
#ifdef __linux__ #ifdef PROFILE
if (slow_log_fp) { gettimeofday(&end, NULL);
if (!filtered_funcs || !rpc_include_passwd (fitem->fname)) { timersub(&end, &start, &intv);
gettimeofday(&end, NULL); g_debug ("[searpc] Time spend in call %s: %ds %dus\n",
timersub(&end, &start, &intv); fname, intv.tv_sec, intv.tv_usec);
print_slow_log_if_necessary (svc_name, func, len, &start, &intv);
}
}
#endif #endif
json_decref(array); json_decref(array);

View File

@ -5,16 +5,6 @@
#include <glib-object.h> #include <glib-object.h>
#include <jansson.h> #include <jansson.h>
#ifdef __cplusplus
extern "C" {
#endif
#ifdef LIBSEARPC_EXPORTS
#define LIBSEARPC_API __declspec(dllexport)
#else
#define LIBSEARPC_API
#endif
#ifndef DFT_DOMAIN #ifndef DFT_DOMAIN
#define DFT_DOMAIN g_quark_from_string(G_LOG_DOMAIN) #define DFT_DOMAIN g_quark_from_string(G_LOG_DOMAIN)
#endif #endif
@ -23,17 +13,11 @@ typedef gchar* (*SearpcMarshalFunc) (void *func, json_t *param_array,
gsize *ret_len); gsize *ret_len);
typedef void (*RegisterMarshalFunc) (void); typedef void (*RegisterMarshalFunc) (void);
LIBSEARPC_API
void searpc_set_string_to_ret_object (json_t *object, char *ret); void searpc_set_string_to_ret_object (json_t *object, char *ret);
LIBSEARPC_API
void searpc_set_int_to_ret_object (json_t *object, json_int_t ret); void searpc_set_int_to_ret_object (json_t *object, json_int_t ret);
LIBSEARPC_API
void searpc_set_object_to_ret_object (json_t *object, GObject *ret); void searpc_set_object_to_ret_object (json_t *object, GObject *ret);
LIBSEARPC_API
void searpc_set_objlist_to_ret_object (json_t *object, GList *ret); void searpc_set_objlist_to_ret_object (json_t *object, GList *ret);
LIBSEARPC_API
void searpc_set_json_to_ret_object (json_t *object, json_t *ret); void searpc_set_json_to_ret_object (json_t *object, json_t *ret);
LIBSEARPC_API
char *searpc_marshal_set_ret_common (json_t *object, gsize *len, GError *error); char *searpc_marshal_set_ret_common (json_t *object, gsize *len, GError *error);
/** /**
@ -41,33 +25,14 @@ char *searpc_marshal_set_ret_common (json_t *object, gsize *len, GError *error);
* *
* Inititalize searpc server. * Inititalize searpc server.
*/ */
LIBSEARPC_API
void searpc_server_init (RegisterMarshalFunc register_func); void searpc_server_init (RegisterMarshalFunc register_func);
/**
* searpc_server_init_with_slow_log:
*
* Inititalize searpc server with slow log file.
*/
LIBSEARPC_API int
searpc_server_init_with_slow_log (RegisterMarshalFunc register_func,
const char *slow_log_path,
gint64 slow_threshold_in,
GList *filtered_funcs_in);
/**
* Used in log rotate.
*/
LIBSEARPC_API int
searpc_server_reopen_slow_log (const char *slow_log_path);
/** /**
* searpc_server_final: * searpc_server_final:
* *
* Free the server structure. * Free the server structure.
*/ */
LIBSEARPC_API void searpc_server_final ();
void searpc_server_final (void);
/** /**
* searpc_create_service: * searpc_create_service:
@ -77,7 +42,6 @@ void searpc_server_final (void);
* *
* @svc_name: Service name. * @svc_name: Service name.
*/ */
LIBSEARPC_API
int searpc_create_service (const char *svc_name); int searpc_create_service (const char *svc_name);
/** /**
@ -85,7 +49,6 @@ int searpc_create_service (const char *svc_name);
* *
* Remove the service from the server. * Remove the service from the server.
*/ */
LIBSEARPC_API
void searpc_remove_service (const char *svc_name); void searpc_remove_service (const char *svc_name);
/** /**
@ -96,7 +59,6 @@ void searpc_remove_service (const char *svc_name);
* @signature: the signature of the marshal, register_marshal() will take * @signature: the signature of the marshal, register_marshal() will take
* owner of this string. * owner of this string.
*/ */
LIBSEARPC_API
gboolean searpc_server_register_marshal (gchar *signature, gboolean searpc_server_register_marshal (gchar *signature,
SearpcMarshalFunc marshal); SearpcMarshalFunc marshal);
@ -108,7 +70,6 @@ gboolean searpc_server_register_marshal (gchar *signature,
* @signature: the signature of the function, register_function() will take * @signature: the signature of the function, register_function() will take
* owner of this string. * owner of this string.
*/ */
LIBSEARPC_API
gboolean searpc_server_register_function (const char *service, gboolean searpc_server_register_function (const char *service,
void* func, void* func,
const gchar *fname, const gchar *fname,
@ -125,7 +86,6 @@ gboolean searpc_server_register_function (const char *service,
* *
* Returns the serialized representatio of the returned value. * Returns the serialized representatio of the returned value.
*/ */
LIBSEARPC_API
gchar *searpc_server_call_function (const char *service, gchar *searpc_server_call_function (const char *service,
gchar *func, gsize len, gsize *ret_len); gchar *func, gsize len, gsize *ret_len);
@ -136,11 +96,6 @@ gchar *searpc_server_call_function (const char *service,
* *
* Compute function signature. * Compute function signature.
*/ */
LIBSEARPC_API
char* searpc_compute_signature (const gchar *ret_type, int pnum, ...); char* searpc_compute_signature (const gchar *ret_type, int pnum, ...);
#ifdef __cplusplus
}
#endif
#endif #endif

View File

@ -1,4 +1,5 @@
#include <errno.h> #include <errno.h>
#include <unistd.h>
#include <glib.h> #include <glib.h>
#include <glib-object.h> #include <glib-object.h>
#include <jansson.h> #include <jansson.h>

View File

@ -2,23 +2,15 @@
#include <glib-object.h> #include <glib-object.h>
#include <jansson.h> #include <jansson.h>
#ifdef LIBSEARPC_EXPORTS
#define LIBSEARPC_API __declspec(dllexport)
#else
#define LIBSEARPC_API
#endif
#define SEARPC_JSON_DOMAIN g_quark_from_string("SEARPC_JSON") #define SEARPC_JSON_DOMAIN g_quark_from_string("SEARPC_JSON")
typedef enum { typedef enum {
SEARPC_JSON_ERROR_LOAD, SEARPC_JSON_ERROR_LOAD,
SEARPC_JSON_ERROR_PACK, SEARPC_JSON_ERROR_PACK,
SEARPC_JSON_ERROR_UPACK SEARPC_JSON_ERROR_UPACK
} LIBSEARPC_API SEARPCJSONERROR; } SEARPCJSONERROR;
LIBSEARPC_API
json_t *json_gobject_serialize (GObject *); json_t *json_gobject_serialize (GObject *);
LIBSEARPC_API
GObject *json_gobject_deserialize (GType , json_t *); GObject *json_gobject_deserialize (GType , json_t *);
inline static void setjetoge(const json_error_t *jerror, GError **error) inline static void setjetoge(const json_error_t *jerror, GError **error)

View File

@ -1,4 +1,4 @@
prefix=@prefix@ prefix=(DESTDIR)@prefix@
exec_prefix=@exec_prefix@ exec_prefix=@exec_prefix@
libdir=@libdir@ libdir=@libdir@
includedir=@includedir@ includedir=@includedir@
@ -8,4 +8,4 @@ Description: Simple C rpc library
Version: @VERSION@ Version: @VERSION@
Libs: -L${libdir} -lsearpc Libs: -L${libdir} -lsearpc
Cflags: -I${includedir} -I${includedir}/searpc Cflags: -I${includedir} -I${includedir}/searpc
Requires: gobject-2.0 >= 2.26.0 gio-2.0 jansson >= 2.2.1 Requires: gobject-2.0 gio-2.0 jansson

View File

@ -1,31 +0,0 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29215.179
MinimumVisualStudioVersion = 10.0.40219.1
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "libsearpc", "libsearpc.vcxproj", "{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}.Debug|x64.ActiveCfg = Debug|x64
{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}.Debug|x64.Build.0 = Debug|x64
{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}.Debug|x86.ActiveCfg = Debug|Win32
{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}.Debug|x86.Build.0 = Debug|Win32
{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}.Release|x64.ActiveCfg = Release|x64
{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}.Release|x64.Build.0 = Release|x64
{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}.Release|x86.ActiveCfg = Release|Win32
{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}.Release|x86.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {393C05AB-0CCA-4180-93EF-57D9CB079042}
EndGlobalSection
EndGlobal

View File

@ -1,137 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<VCProjectVersion>16.0</VCProjectVersion>
<ProjectGuid>{7E7BDAA2-D61E-466D-B138-CC2454CAB1D3}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<WindowsTargetPlatformVersion>10.0</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="Shared">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>true</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<OutDir>$(ProjectDir)$(Platform)\$(Configuration)\</OutDir>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" />
<PropertyGroup Label="Vcpkg">
<VcpkgEnableManifest>true</VcpkgEnableManifest>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;LIBSEARPC_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<RuntimeLibrary>MultiThreadedDebugDLL</RuntimeLibrary>
<WarningLevel>Level3</WarningLevel>
<DebugInformationFormat>ProgramDatabase</DebugInformationFormat>
<Optimization>Disabled</Optimization>
</ClCompile>
<Link>
<TargetMachine>MachineX86</TargetMachine>
<GenerateDebugInformation>true</GenerateDebugInformation>
<SubSystem>Windows</SubSystem>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<RuntimeLibrary>MultiThreadedDLL</RuntimeLibrary>
<WarningLevel>Level3</WarningLevel>
<DebugInformationFormat>ProgramDatabase</DebugInformationFormat>
</ClCompile>
<Link>
<TargetMachine>MachineX86</TargetMachine>
<GenerateDebugInformation>true</GenerateDebugInformation>
<SubSystem>Windows</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
<PreprocessorDefinitions>LIBSEARPC_EXPORTS;WIN32</PreprocessorDefinitions>
<AdditionalIncludeDirectories>$(ProjectDir)vcpkg_installed\x64-windows\x64-windows\include\glib-2.0;$(ProjectDir)vcpkg_installed\x64-windows\x64-windows\lib\glib-2.0\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<PreprocessorDefinitions>LIBSEARPC_EXPORTS;WIN32</PreprocessorDefinitions>
<AdditionalIncludeDirectories>$(ProjectDir)vcpkg_installed\x64-windows\x64-windows\include\glib-2.0;$(ProjectDir)vcpkg_installed\x64-windows\x64-windows\lib\glib-2.0\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="lib\searpc-client.c" />
<ClCompile Include="lib\searpc-named-pipe-transport.c" />
<ClCompile Include="lib\searpc-server.c" />
<ClCompile Include="lib\searpc-utils.c" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="config.h" />
<ClInclude Include="lib\searpc-client.h" />
<ClInclude Include="lib\searpc-named-pipe-transport.h" />
<ClInclude Include="lib\searpc-server.h" />
<ClInclude Include="lib\searpc-utils.h" />
<ClInclude Include="lib\searpc.h" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>

View File

@ -1,5 +1,5 @@
import json import json
from .common import SearpcError from common import SearpcError
def _fret_int(ret_str): def _fret_int(ret_str):
try: try:
@ -7,10 +7,10 @@ def _fret_int(ret_str):
except: except:
raise SearpcError('Invalid response format') raise SearpcError('Invalid response format')
if 'err_code' in dicts: if dicts.has_key('err_code'):
raise SearpcError(dicts['err_msg']) raise SearpcError(dicts['err_msg'])
if 'ret' in dicts: if dicts.has_key('ret'):
return dicts['ret'] return dicts['ret']
else: else:
raise SearpcError('Invalid response format') raise SearpcError('Invalid response format')
@ -21,10 +21,10 @@ def _fret_string(ret_str):
except: except:
raise SearpcError('Invalid response format') raise SearpcError('Invalid response format')
if 'err_code' in dicts: if dicts.has_key('err_code'):
raise SearpcError(dicts['err_msg']) raise SearpcError(dicts['err_msg'])
if 'ret' in dicts: if dicts.has_key('ret'):
return dicts['ret'] return dicts['ret']
else: else:
raise SearpcError('Invalid response format') raise SearpcError('Invalid response format')
@ -61,7 +61,7 @@ def _fret_obj(ret_str):
except: except:
raise SearpcError('Invalid response format') raise SearpcError('Invalid response format')
if 'err_code' in dicts: if dicts.has_key('err_code'):
raise SearpcError(dicts['err_msg']) raise SearpcError(dicts['err_msg'])
if dicts['ret']: if dicts['ret']:
@ -75,7 +75,7 @@ def _fret_objlist(ret_str):
except: except:
raise SearpcError('Invalid response format') raise SearpcError('Invalid response format')
if 'err_code' in dicts: if dicts.has_key('err_code'):
raise SearpcError(dicts['err_msg']) raise SearpcError(dicts['err_msg'])
l = [] l = []
@ -85,19 +85,6 @@ def _fret_objlist(ret_str):
return l return l
def _fret_json(ret_str):
try:
dicts = json.loads(ret_str)
except:
raise SearpcError('Invalid response format')
if 'err_code' in dicts:
raise SearpcError(dicts['err_msg'])
if dicts['ret']:
return dicts['ret']
else:
return None
def searpc_func(ret_type, param_types): def searpc_func(ret_type, param_types):
@ -114,8 +101,6 @@ def searpc_func(ret_type, param_types):
fret = _fret_int fret = _fret_int
elif ret_type == "string": elif ret_type == "string":
fret = _fret_string fret = _fret_string
elif ret_type == "json":
fret = _fret_json
else: else:
raise SearpcError('Invial return type') raise SearpcError('Invial return type')

View File

@ -8,7 +8,6 @@ import os
import socket import socket
import struct import struct
from threading import Thread from threading import Thread
import queue
from .client import SearpcClient from .client import SearpcClient
from .server import searpc_server from .server import searpc_server
@ -37,67 +36,52 @@ class NamedPipeTransport(SearpcTransport):
def __init__(self, socket_path): def __init__(self, socket_path):
self.socket_path = socket_path self.socket_path = socket_path
self.pipe = None self.pipe_fd = None
def connect(self): def connect(self):
self.pipe = socket.socket(socket.AF_UNIX) self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
self.pipe.connect(self.socket_path) make_socket_closeonexec(self.pipe_fd)
self.pipe_fd.connect(self.socket_path)
def stop(self): def stop(self):
if self.pipe: if self.pipe_fd:
self.pipe.close() self.pipe_fd.close()
self.pipe = None self.pipe_fd = None
def send(self, service, fcall_str): def send(self, service, fcall_str):
body = json.dumps({ body = json.dumps({
'service': service, 'service': service,
'request': fcall_str, 'request': fcall_str,
}) })
body_utf8 = body.encode(encoding='utf-8')
# "I" for unsiged int # "I" for unsiged int
header = struct.pack('=I', len(body_utf8)) header = struct.pack('I', len(body))
sendall(self.pipe, header) sendall(self.pipe_fd, header)
sendall(self.pipe, body_utf8) sendall(self.pipe_fd, body)
resp_header = recvall(self.pipe, 4) resp_header = recvall(self.pipe_fd, 4)
# logger.info('resp_header is %s', resp_header) # logger.info('resp_header is %s', resp_header)
resp_size, = struct.unpack('=I', resp_header) resp_size, = struct.unpack('I', resp_header)
# logger.info('resp_size is %s', resp_size) # logger.info('resp_size is %s', resp_size)
resp = recvall(self.pipe, resp_size) resp = recvall(self.pipe_fd, resp_size)
# logger.info('resp is %s', resp) # logger.info('resp is %s', resp)
return resp.decode(encoding='utf-8') return resp
class NamedPipeClient(SearpcClient): class NamedPipeClient(SearpcClient):
def __init__(self, socket_path, service_name, pool_size=5): def __init__(self, socket_path, service_name):
self.socket_path = socket_path self.socket_path = socket_path
self.service_name = service_name self.service_name = service_name
self.pool_size = pool_size self.transport = NamedPipeTransport(socket_path)
self._pool = queue.Queue(pool_size) self.connected = False
def _create_transport(self): def stop(self):
transport = NamedPipeTransport(self.socket_path) self.transport.stop()
transport.connect()
return transport
def _get_transport(self):
try:
transport = self._pool.get(False)
except:
transport = self._create_transport()
return transport
def _return_transport(self, transport):
try:
self._pool.put(transport, False)
except queue.Full:
transport.stop()
def call_remote_func_sync(self, fcall_str): def call_remote_func_sync(self, fcall_str):
transport = self._get_transport() if not self.connected:
ret_str = transport.send(self.service_name, fcall_str) self.transport.connect()
self._return_transport(transport) self.connected = True
return ret_str return self.transport.send(self.service_name, fcall_str)
class NamedPipeServer(object): class NamedPipeServer(object):
@ -107,7 +91,7 @@ class NamedPipeServer(object):
""" """
def __init__(self, socket_path): def __init__(self, socket_path):
self.socket_path = socket_path self.socket_path = socket_path
self.pipe = None self.pipe_fd = None
self.thread = Thread(target=self.accept_loop) self.thread = Thread(target=self.accept_loop)
self.thread.setDaemon(True) self.thread.setDaemon(True)
@ -127,40 +111,40 @@ class NamedPipeServer(object):
'Failed to remove existing unix socket {}'. 'Failed to remove existing unix socket {}'.
format(self.socket_path) format(self.socket_path)
) )
self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
make_socket_closeonexec(self.pipe) make_socket_closeonexec(self.pipe_fd)
self.pipe.bind(self.socket_path) self.pipe_fd.bind(self.socket_path)
self.pipe.listen(10) self.pipe_fd.listen(10)
logger.info('Server now listening at %s', self.socket_path) logger.info('Server now listening at %s', self.socket_path)
def accept_loop(self): def accept_loop(self):
logger.info('Waiting for clients') logger.info('Waiting for clients')
while True: while True:
connfd, _ = self.pipe.accept() connfd, _ = self.pipe_fd.accept()
logger.info('New pip client') logger.info('New pip client')
t = PipeHandlerThread(connfd) t = PipeHandlerThread(connfd)
t.start() t.start()
class PipeHandlerThread(Thread): class PipeHandlerThread(Thread):
def __init__(self, pipe): def __init__(self, pipe_fd):
Thread.__init__(self) Thread.__init__(self)
self.setDaemon(True) self.setDaemon(True)
self.pipe = pipe self.pipe_fd = pipe_fd
def run(self): def run(self):
while True: while True:
req_header = recvall(self.pipe, 4) req_header = recvall(self.pipe_fd, 4)
# logger.info('Got req header %s', req_header) # logger.info('Got req header %s', req_header)
req_size, = struct.unpack('I', req_header) req_size, = struct.unpack('I', req_header)
# logger.info('req size is %s', req_size) # logger.info('req size is %s', req_size)
req = recvall(self.pipe, req_size) req = recvall(self.pipe_fd, req_size)
# logger.info('req is %s', req) # logger.info('req is %s', req)
data = json.loads(req.decode(encoding='utf-8')) data = json.loads(req)
resp = searpc_server.call_function(data['service'], data['request']) resp = searpc_server.call_function(data['service'], data['request'])
# logger.info('resp is %s', resp) # logger.info('resp is %s', resp)
resp_header = struct.pack('I', len(resp)) resp_header = struct.pack('I', len(resp))
sendall(self.pipe, resp_header) sendall(self.pipe_fd, resp_header)
sendall(self.pipe, resp.encode(encoding='utf-8')) sendall(self.pipe_fd, resp)

View File

@ -1,5 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
import string import string
import sys import sys
@ -28,7 +29,7 @@ def gen_fcall_funcs_array(arg_types):
pyfuncname = "fcall__" + "_".join(arg_types) pyfuncname = "fcall__" + "_".join(arg_types)
tmplist = [] tmplist = []
for arg in arg_types: for arg in arg_types:
tmplist.append(arg.capitalize()) tmplist.append(string.capitalize(arg))
cfuncname = "SearpcClient_Fcall__" + "_".join(tmplist) cfuncname = "SearpcClient_Fcall__" + "_".join(tmplist)
@ -46,7 +47,7 @@ def gen_fret_funcs_array(ret_type):
cfuncname = "SearpcClient_Fret__Void" cfuncname = "SearpcClient_Fret__Void"
else: else:
pyfuncname = "fret__" + ret_type pyfuncname = "fret__" + ret_type
cfuncname = "SearpcClient_Fret__" + ret_type.capitalize() cfuncname = "SearpcClient_Fret__" + string.capitalize(ret_type)
return string.Template(func_item_template)\ return string.Template(func_item_template)\
.substitute(pyfuncname=pyfuncname, .substitute(pyfuncname=pyfuncname,
@ -78,8 +79,8 @@ def gen_module_funcs_array():
array = fcall_array array = fcall_array
array += fret_array array += fret_array
print(string.Template(module_func_array_template)\ print string.Template(module_func_array_template)\
.substitute(array=array)) .substitute(array=array)
type_table = { type_table = {
@ -127,7 +128,7 @@ def gen_fcall_func(arg_types):
tmplist = [] tmplist = []
for arg in arg_types: for arg in arg_types:
tmplist.append(arg.capitalize()) tmplist.append(string.capitalize(arg))
Suffix = "_".join(tmplist) Suffix = "_".join(tmplist)
suffix = "_".join(arg_types) suffix = "_".join(arg_types)
def_args = "" def_args = ""
@ -160,7 +161,7 @@ def gen_fcall_list():
arg_types_list.append(item[1]) arg_types_list.append(item[1])
for item in arg_types_list: for item in arg_types_list:
print(gen_fcall_func(item)) print gen_fcall_func(item)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,6 +1,6 @@
import json import json
from .common import SearpcError from common import SearpcError
class SearpcService(object): class SearpcService(object):
def __init__(self, name): def __init__(self, name):
@ -25,7 +25,7 @@ class SearpcServer(object):
"""input str -> output str""" """input str -> output str"""
try: try:
argv = json.loads(fcallstr) argv = json.loads(fcallstr)
except Exception as e: except Exception, e:
raise SearpcError('bad call str: ' + str(e)) raise SearpcError('bad call str: ' + str(e))
service = self.services[svcname] service = self.services[svcname]
@ -41,7 +41,7 @@ class SearpcServer(object):
def call_function(self, svcname, fcallstr): def call_function(self, svcname, fcallstr):
try: try:
retVal = self._call_function(svcname, fcallstr) retVal = self._call_function(svcname, fcallstr)
except Exception as e: except Exception, e:
ret = {'err_code': 555, 'err_msg': str(e)} ret = {'err_code': 555, 'err_msg': str(e)}
else: else:
ret = {'ret': retVal} ret = {'ret': retVal}

View File

@ -22,14 +22,6 @@ def init_server():
searpc_server.create_service(SVCNAME) searpc_server.create_service(SVCNAME)
searpc_server.register_function(SVCNAME, add, 'add') searpc_server.register_function(SVCNAME, add, 'add')
searpc_server.register_function(SVCNAME, mul, 'multi') searpc_server.register_function(SVCNAME, mul, 'multi')
searpc_server.register_function(SVCNAME, json_func, 'json_func')
searpc_server.register_function(SVCNAME, get_str, 'get_str')
def json_func(a, b):
return {'a': a, 'b': b}
def get_str():
return u'这是一个测试'
class DummyTransport(SearpcTransport): class DummyTransport(SearpcTransport):
@ -48,14 +40,6 @@ class RpcMixin(object):
def multi(self, x, y): def multi(self, x, y):
pass pass
@searpc_func("json", ["string", "int"])
def json_func(self, x, y):
pass
@searpc_func("string", [])
def get_str(self):
pass
class DummyRpcClient(SearpcClient, RpcMixin): class DummyRpcClient(SearpcClient, RpcMixin):
def __init__(self): def __init__(self):
self.transport = DummyTransport() self.transport = DummyTransport()
@ -82,6 +66,7 @@ class SearpcTest(unittest.TestCase):
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.named_pipe_client.stop()
cls.named_pipe_server.stop() cls.named_pipe_server.stop()
def test_normal_transport(self): def test_normal_transport(self):
@ -101,12 +86,6 @@ class SearpcTest(unittest.TestCase):
v = client.multi('abc', 2) v = client.multi('abc', 2)
self.assertEqual(v, 'abcabc') self.assertEqual(v, 'abcabc')
v = client.json_func(1, 2)
self.assertEqual(v, json_func(1, 2))
v = client.get_str()
self.assertEqual(v, u'这是一个测试')
def setup_logging(level=logging.INFO): def setup_logging(level=logging.INFO):
kw = { kw = {
# 'format': '[%(asctime)s][%(pathname)s]: %(message)s', # 'format': '[%(asctime)s][%(pathname)s]: %(message)s',

View File

@ -1,3 +1,4 @@
class SearpcTransport(object): class SearpcTransport(object):
""" """
A transport is repsonsible to send the serialized request to the A transport is repsonsible to send the serialized request to the

View File

@ -5,7 +5,7 @@ from pysearpc.errors import NetworkError
def recvall(fd, total): def recvall(fd, total):
remain = total remain = total
data = bytearray() data = ''
while remain > 0: while remain > 0:
try: try:
new = fd.recv(remain) new = fd.recv(remain)
@ -16,10 +16,10 @@ def recvall(fd, total):
if n <= 0: if n <= 0:
raise NetworkError("Failed to read from socket") raise NetworkError("Failed to read from socket")
else: else:
data.extend(new) data += new
remain -= n remain -= n
return bytes(data) return data
def sendall(fd, data): def sendall(fd, data):
total = len(data) total = len(data)

View File

@ -371,7 +371,7 @@ clar_test_init(int argc, char **argv)
} }
int int
clar_test_run(void) clar_test_run()
{ {
if (_clar.argc > 1) if (_clar.argc > 1)
clar_parse_args(_clar.argc, _clar.argv); clar_parse_args(_clar.argc, _clar.argv);
@ -386,7 +386,7 @@ clar_test_run(void)
} }
void void
clar_test_shutdown(void) clar_test_shutdown()
{ {
clar_print_shutdown( clar_print_shutdown(
_clar.tests_ran, _clar.tests_ran,

View File

@ -27,8 +27,6 @@ static const char *pipe_path = "\\\\.\\pipe\\libsearpc-test";
#define MAMAN_IS_BAR_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), MAMAN_TYPE_BAR)) #define MAMAN_IS_BAR_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), MAMAN_TYPE_BAR))
#define MAMAN_BAR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), MAMAN_TYPE_BAR, MamanBarClass)) #define MAMAN_BAR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), MAMAN_TYPE_BAR, MamanBarClass))
#define NAMED_PIPE_SERVER_THREAD_POOL_SIZE 50
typedef struct _MamanBar MamanBar; typedef struct _MamanBar MamanBar;
typedef struct _MamanBarClass MamanBarClass; typedef struct _MamanBarClass MamanBarClass;
@ -204,7 +202,7 @@ get_substring (const gchar *orig_str, int sub_len, GError **error)
} }
static SearpcClient * static SearpcClient *
do_create_client_with_pipe_transport(void) do_create_client_with_pipe_transport()
{ {
SearpcNamedPipeClient *pipe_client = searpc_create_named_pipe_client(pipe_path); SearpcNamedPipeClient *pipe_client = searpc_create_named_pipe_client(pipe_path);
cl_must_pass_(searpc_named_pipe_client_connect(pipe_client), "named pipe client failed to connect"); cl_must_pass_(searpc_named_pipe_client_connect(pipe_client), "named pipe client failed to connect");
@ -547,7 +545,7 @@ test_searpc__initialize (void)
client->async_send = sample_async_send; client->async_send = sample_async_send;
client->async_arg = "test_async"; client->async_arg = "test_async";
SearpcNamedPipeServer *pipe_server = searpc_create_named_pipe_server_with_threadpool(pipe_path, NAMED_PIPE_SERVER_THREAD_POOL_SIZE); SearpcNamedPipeServer *pipe_server = searpc_create_named_pipe_server(pipe_path);
cl_must_pass_(searpc_named_pipe_server_start(pipe_server), "named pipe server failed to start"); cl_must_pass_(searpc_named_pipe_server_start(pipe_server), "named pipe server failed to start");
#if defined(WIN32) #if defined(WIN32)
// Wait for the server thread to start // Wait for the server thread to start

View File

@ -1,18 +0,0 @@
{
"builtin-baseline": "f63682b9182187131b564c1395e4ac8ecb0c5ea8",
"dependencies": [
"glib",
"jansson",
"pthreads"
],
"overrides": [
{
"name": "jansson",
"version": "2.12-1"
},
{
"name": "pthreads",
"version": "3.0.0-4"
}
]
}