diff --git a/lib/Makefile.am b/lib/Makefile.am index 43bebfe..e875292 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -7,9 +7,9 @@ AM_CFLAGS = @GLIB_CFLAGS@ \ lib_LTLIBRARIES = libsearpc.la -include_HEADERS = searpc-client.h searpc-server.h searpc-utils.h searpc.h +include_HEADERS = searpc-client.h searpc-server.h searpc-utils.h searpc.h searpc-named-pipe-transport.h -libsearpc_la_SOURCES = searpc-client.c searpc-server.c searpc-utils.c +libsearpc_la_SOURCES = searpc-client.c searpc-server.c searpc-utils.c searpc-named-pipe-transport.c libsearpc_la_LDFLAGS = -version-info 1:2:0 -no-undefined diff --git a/lib/searpc-named-pipe-transport.c b/lib/searpc-named-pipe-transport.c new file mode 100644 index 0000000..4bf124b --- /dev/null +++ b/lib/searpc-named-pipe-transport.c @@ -0,0 +1,295 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include "searpc-utils.h" +#include "searpc-client.h" +#include "searpc-server.h" +#include "searpc-named-pipe-transport.h" + +static void* named_pipe_listen(void *arg); +static void* named_pipe_client_handler(void *arg); +static char* searpc_named_pipe_send(void *arg, const gchar *fcall_str, size_t fcall_len, size_t *ret_len); + +static char * request_to_json(const char *service, const char *fcall_str, size_t fcall_len); + +typedef struct { + SearpcNamedPipeClient* client; + char *service; +} ClientTransportData; + +SearpcClient* +searpc_client_with_named_pipe_transport(SearpcNamedPipeClient *pipe_client, + const char *service) +{ + SearpcClient *client= searpc_client_new(); + client->send = searpc_named_pipe_send; + + ClientTransportData *data = g_malloc(sizeof(ClientTransportData)); + data->client = pipe_client; + data->service = g_strdup(service); + + client->arg = data; + return client; +} + +SearpcNamedPipeClient* searpc_create_named_pipe_client(const char *path) +{ + SearpcNamedPipeClient *client = g_malloc0(sizeof(SearpcNamedPipeClient)); + memcpy(client->path, path, strlen(path) + 1); + return client; +} + +SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path) +{ + SearpcNamedPipeServer *server = g_malloc0(sizeof(SearpcNamedPipeServer)); + memcpy(server->path, path, strlen(path) + 1); + return server; +} + +int searpc_named_pipe_server_start(SearpcNamedPipeServer *server) +{ + int pipe_fd = socket (AF_UNIX, SOCK_STREAM, 0); + const char *un_path = server->path; + if (pipe_fd < 0) { + g_warning ("Failed to create unix socket fd : %s\n", + strerror(errno)); + return -1; + } + + struct sockaddr_un saddr; + saddr.sun_family = AF_UNIX; + + if (strlen(server->path) > sizeof(saddr.sun_path)-1) { + g_warning ("Unix socket path %s is too long." + "Please set or modify UNIX_SOCKET option in ccnet.conf.\n", + un_path); + goto failed; + } + + if (g_file_test (un_path, G_FILE_TEST_EXISTS)) { + g_warning ("socket file exists, delete it anyway\n"); + if (g_unlink (un_path) < 0) { + g_warning ("delete socket file failed : %s\n", strerror(errno)); + goto failed; + } + } + + g_strlcpy (saddr.sun_path, un_path, sizeof(saddr.sun_path)); + if (bind(pipe_fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + g_warning ("failed to bind unix socket fd to %s : %s\n", + un_path, strerror(errno)); + goto failed; + } + + if (listen(pipe_fd, 3) < 0) { + g_warning ("failed to listen to unix socket: %s\n", strerror(errno)); + goto failed; + } + + if (chmod(un_path, 0700) < 0) { + g_warning ("failed to set permisson for unix socket %s: %s\n", + un_path, strerror(errno)); + goto failed; + } + + server->pipe_fd = pipe_fd; + + /* TODO: use glib thread pool */ + pthread_create(&server->listener_thread, NULL, named_pipe_listen, server); + + return 0; + +failed: + close(pipe_fd); + + return -1; +} + +typedef struct { + SearpcNamedPipeServer *server; + int connfd; +} ServerHandlerData; + +static void* named_pipe_listen(void *arg) +{ + SearpcNamedPipeServer *server = arg; + while (1) { + int connfd = accept (server->pipe_fd, NULL, 0); + pthread_t *handler = g_malloc(sizeof(pthread_t)); + ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); + data->server = server; + data->connfd = connfd; + // TODO(low priority): Instead of using a thread to handle each client, + // use select(unix)/iocp(windows) to do it. + pthread_create(handler, NULL, named_pipe_client_handler, data); + server->handlers = g_list_append(server->handlers, handler); + } + return NULL; +} + +static void* named_pipe_client_handler(void *arg) +{ + ServerHandlerData *data = arg; + // SearpcNamedPipeServer *server = data->server; + int connfd = data->connfd; + + size_t len; + size_t bufsize = 4096; + char *buf = g_malloc(bufsize); + + g_warning ("start to serve on pipe client\n"); + + while (1) { + if (pipe_read_n(connfd, &len, sizeof(uint32_t)) < 0) { + g_warning("failed to read rpc request size: %s", strerror(errno)); + break; + } + + while (bufsize < len) { + bufsize *= 2; + buf = realloc(buf, bufsize); + } + + if (pipe_read_n(connfd, buf, len) < 0) { + g_warning("failed to read rpc request: %s", strerror(errno)); + g_free (buf); + break; + } + + size_t ret_len; + char *ret_str = searpc_server_call_function ("test", buf, len, &ret_len); + + if (pipe_write_n(connfd, &ret_len, sizeof(uint32_t)) < 0) { + g_warning("failed to send rpc resopnse: %s", strerror(errno)); + g_free (ret_str); + break; + } + + if (pipe_write_n(connfd, ret_str, ret_len) < 0) { + g_warning("failed to send rpc resopnse: %s", strerror(errno)); + g_free (ret_str); + break; + } + } + + return NULL; +} + + +int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client) +{ + client->pipe_fd = socket(AF_UNIX, SOCK_STREAM, 0); + struct sockaddr_un servaddr; + servaddr.sun_family = AF_UNIX; + + g_strlcpy (servaddr.sun_path, client->path, sizeof(servaddr.sun_path)); + if (connect(client->pipe_fd, (struct sockaddr *)&servaddr, (socklen_t)sizeof(servaddr)) < 0) { + g_warning ("pipe client failed to connect to server\n"); + return -1; + } + g_warning ("pipe client connectd to server\n"); + return 0; +} + +char *searpc_named_pipe_send(void *arg, const gchar *fcall_str, + size_t fcall_len, size_t *ret_len) +{ + g_warning ("searpc_named_pipe_send is called\n"); + ClientTransportData *data = arg; + SearpcNamedPipeClient *client = data->client; + + char *json_str = request_to_json(data->service, fcall_str, fcall_len); + size_t json_len = strlen(json_str); + + uint32_t len = fcall_len; + if (pipe_write_n(client->pipe_fd, &len, sizeof(uint32_t)) < 0) { + g_warning("failed to send rpc call: %s", strerror(errno)); + return NULL; + } + + if (pipe_write_n(client->pipe_fd, json_str, json_len) < 0) { + g_warning("failed to send rpc call: %s", strerror(errno)); + return NULL; + } + + if (pipe_read_n(client->pipe_fd, &len, sizeof(uint32_t)) < 0) { + g_warning("failed to read rpc response: %s", strerror(errno)); + return NULL; + } + + char *buf = g_malloc(len); + + if (pipe_read_n(client->pipe_fd, buf, len) < 0) { + g_warning("failed to read rpc response: %s", strerror(errno)); + g_free (buf); + return NULL; + } + + *ret_len = len; + return buf; +} + +static char * +request_to_json (const char *service, const char *fcall_str, size_t fcall_len) +{ + // TODO + char *ret = g_malloc0(fcall_len + 1); + memcpy(ret, fcall_str, fcall_len); + return ret; + + /* json_t *object; */ + + /* object = json_object (); */ + + /* json_object_set_string_member (object, "commit_id", commit->commit_id); */ + /* json_object_set_string_member (object, "root_id", commit->root_id); */ + /* json_object_set_string_member (object, "repo_id", commit->repo_id); */ + /* if (commit->creator_name) */ + /* json_object_set_string_member (object, "creator_name", commit->creator_name); */ + /* json_object_set_string_member (object, "creator", commit->creator_id); */ + /* json_object_set_string_member (object, "description", commit->desc); */ + /* json_object_set_int_member (object, "ctime", (gint64)commit->ctime); */ + /* json_object_set_string_or_null_member (object, "parent_id", commit->parent_id); */ + /* json_object_set_string_or_null_member (object, "second_parent_id", */ + /* commit->second_parent_id); */ + /* /\* */ + /* * also save repo's properties to commit file, for easy sharing of */ + /* * repo info */ + /* *\/ */ + /* json_object_set_string_member (object, "repo_name", commit->repo_name); */ + /* json_object_set_string_member (object, "repo_desc", */ + /* commit->repo_desc); */ + /* json_object_set_string_or_null_member (object, "repo_category", */ + /* commit->repo_category); */ + /* if (commit->device_name) */ + /* json_object_set_string_member (object, "device_name", commit->device_name); */ + + /* if (commit->encrypted) */ + /* json_object_set_string_member (object, "encrypted", "true"); */ + + /* if (commit->encrypted) { */ + /* json_object_set_int_member (object, "enc_version", commit->enc_version); */ + /* if (commit->enc_version >= 1) */ + /* json_object_set_string_member (object, "magic", commit->magic); */ + /* if (commit->enc_version == 2) */ + /* json_object_set_string_member (object, "key", commit->random_key); */ + /* } */ + /* if (commit->no_local_history) */ + /* json_object_set_int_member (object, "no_local_history", 1); */ + /* if (commit->version != 0) */ + /* json_object_set_int_member (object, "version", commit->version); */ + /* if (commit->conflict) */ + /* json_object_set_int_member (object, "conflict", 1); */ + /* if (commit->new_merge) */ + /* json_object_set_int_member (object, "new_merge", 1); */ + /* if (commit->repaired) */ + /* json_object_set_int_member (object, "repaired", 1); */ + + /* return object; */ +} diff --git a/lib/searpc-named-pipe-transport.h b/lib/searpc-named-pipe-transport.h new file mode 100644 index 0000000..676cdf5 --- /dev/null +++ b/lib/searpc-named-pipe-transport.h @@ -0,0 +1,30 @@ +#ifndef SEARPC_NAMED_PIPE_TRANSPORT_H +#define SEARPC_NAMED_PIPE_TRANSPORT_H + +#include +#include +#include + +typedef struct { + char path[4096]; + pthread_t listener_thread; + GList *handlers; + int pipe_fd; +} SearpcNamedPipeServer; + +typedef struct { + char path[4096]; + int pipe_fd; +} SearpcNamedPipeClient; + +SearpcNamedPipeClient* searpc_create_named_pipe_client(const char *path); + +SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path); + +SearpcClient * searpc_client_with_named_pipe_transport(SearpcNamedPipeClient *client, const char *service); + +int searpc_named_pipe_server_start(SearpcNamedPipeServer *server); + +int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client); + +#endif // SEARPC_NAMED_PIPE_TRANSPORT_H diff --git a/lib/searpc-utils.c b/lib/searpc-utils.c index 08c00d8..b8a25eb 100644 --- a/lib/searpc-utils.c +++ b/lib/searpc-utils.c @@ -1,3 +1,5 @@ +#include +#include #include #include #include @@ -16,7 +18,7 @@ static json_t *json_serialize_pspec (const GValue *value) case G_TYPE_STRING: if (!g_value_get_string (value)) break; - else + else return json_string (g_value_get_string (value)); case G_TYPE_BOOLEAN: if (g_value_get_boolean (value)) @@ -53,10 +55,10 @@ static json_t *json_serialize_pspec (const GValue *value) return json_gobject_serialize (object); } break; - defalut: + default: g_warning("Unsuppoted type `%s'",g_type_name (G_VALUE_TYPE (value))); } - return json_null(); + return json_null(); } json_t *json_gobject_serialize (GObject *gobject) @@ -89,7 +91,7 @@ json_t *json_gobject_serialize (GObject *gobject) } static gboolean json_deserialize_pspec (GValue *value, GParamSpec *pspec, json_t *node) -{ +{ switch (json_typeof(node)) { case JSON_OBJECT: if (g_type_is_a (G_VALUE_TYPE (value), G_TYPE_OBJECT)) { @@ -222,7 +224,7 @@ GObject *json_gobject_deserialize (GType gtype, json_t *object) g_array_append_val (construct_params, param); } else - g_warning ("Failed to deserialize \"%s\" property of type \"%s\" for an object of type \"%s\"", + g_warning ("Failed to deserialize \"%s\" property of type \"%s\" for an object of type \"%s\"", pspec->name, g_type_name (G_VALUE_TYPE (¶m.value)), g_type_name (gtype)); } @@ -240,3 +242,57 @@ GObject *json_gobject_deserialize (GType gtype, json_t *object) return ret; } + +// Write "n" bytes to a descriptor. +ssize_t +pipe_write_n(int fd, const void *vptr, size_t n) +{ + size_t nleft; + ssize_t nwritten; + const char *ptr; + + ptr = vptr; + nleft = n; + while (nleft > 0) { +#ifndef WIN32 + if ( (nwritten = write(fd, ptr, nleft)) <= 0) +#else + if ( (nwritten = send(fd, ptr, nleft, 0)) <= 0) +#endif + { + if (nwritten < 0 && errno == EINTR) + nwritten = 0; /* and call write() again */ + else + return(-1); /* error */ + } + + nleft -= nwritten; + ptr += nwritten; + } + return(n); +} + +// Read "n" bytes from a descriptor. +ssize_t +pipe_read_n(int fd, void *vptr, size_t n) +{ + size_t nleft; + ssize_t nread; + char *ptr; + + ptr = vptr; + nleft = n; + while (nleft > 0) { + if ( (nread = read(fd, ptr, nleft)) < 0) { + if (errno == EINTR) + nread = 0; /* and call read() again */ + else + return(-1); + } else if (nread == 0) + break; /* EOF */ + + nleft -= nread; + ptr += nread; + } + return(n - nleft); /* return >= 0 */ +} diff --git a/lib/searpc-utils.h b/lib/searpc-utils.h index 8e4f3c0..25e65cb 100644 --- a/lib/searpc-utils.h +++ b/lib/searpc-utils.h @@ -58,3 +58,8 @@ inline static json_int_t json_array_get_int_element (json_t *array, size_t index return json_integer_value (json_array_get (array, index)); } +// Write "n" bytes to a descriptor. +ssize_t pipe_write_n(int fd, const void *vptr, size_t n); + +// Read "n" bytes from a descriptor. +ssize_t pipe_read_n(int fd, void *vptr, size_t n); diff --git a/tests/searpc.c b/tests/searpc.c index b8b462d..d48711c 100644 --- a/tests/searpc.c +++ b/tests/searpc.c @@ -9,6 +9,7 @@ #include "searpc-server.h" #include "searpc-client.h" +#include "searpc-named-pipe-transport.h" #include "clar.h" /* sample class */ @@ -62,7 +63,7 @@ maman_bar_set_property (GObject *object, case PROP_PAPA_NUMBER: self->papa_number = g_value_get_uchar (value); break; - + default: /* We don't have any other property... */ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); @@ -77,7 +78,7 @@ maman_bar_get_property (GObject *object, GParamSpec *pspec) { MamanBar *self = MAMAN_BAR (object); - + switch (property_id) { case PROP_MAMAN_NAME: g_value_set_string (value, self->name); @@ -86,7 +87,7 @@ maman_bar_get_property (GObject *object, case PROP_PAPA_NUMBER: g_value_set_uchar (value, self->papa_number); break; - + default: /* We don't have any other property... */ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); @@ -166,7 +167,7 @@ sample_async_send (void *arg, gchar *fcall_str, size_t fcall_len, void *rpc_priv) { cl_assert (strcmp(arg, "test_async") == 0); - + char *ret; size_t ret_len; gchar *temp = g_strdup(fcall_str); @@ -299,7 +300,7 @@ test_searpc__objlist_call (void) void simple_callback (void *result, void *user_data, GError *error) { char *res = (char *)result; - + cl_assert (strcmp(res, "he") == 0); } @@ -354,11 +355,11 @@ test_searpc__initialize (void) #endif searpc_server_init (register_marshals); searpc_create_service ("test"); - searpc_server_register_function ("test", get_substring, "get_substring", + searpc_server_register_function ("test", get_substring, "get_substring", searpc_signature_string__string_int()); - searpc_server_register_function ("test", get_maman_bar, "get_maman_bar", + searpc_server_register_function ("test", get_maman_bar, "get_maman_bar", searpc_signature_object__string()); - searpc_server_register_function ("test", get_maman_bar_list, "get_maman_bar_list", + searpc_server_register_function ("test", get_maman_bar_list, "get_maman_bar_list", searpc_signature_objlist__string_int()); /* sample client */