From eb7fb022de19753b14c4b21d28a2460f08913edb Mon Sep 17 00:00:00 2001 From: Shuai Lin Date: Wed, 25 May 2016 11:42:01 +0800 Subject: [PATCH] named pipe transport: compile on windows. --- autogen.sh | 53 +++++-- lib/searpc-named-pipe-transport.c | 255 +++++++++++++++++++++++++++++- lib/searpc-named-pipe-transport.h | 15 +- lib/searpc-utils.c | 54 ------- lib/searpc-utils.h | 6 - tests/searpc.c | 55 ++++++- 6 files changed, 346 insertions(+), 92 deletions(-) diff --git a/autogen.sh b/autogen.sh index b6a7d24..257b58a 100755 --- a/autogen.sh +++ b/autogen.sh @@ -1,26 +1,22 @@ -#!/bin/sh +#!/bin/bash # Run this to generate all the initial makefiles, etc. : ${AUTOCONF=autoconf} : ${AUTOHEADER=autoheader} : ${AUTOMAKE=automake} : ${ACLOCAL=aclocal} -if test "$(uname -s)" != "Darwin"; then - : ${LIBTOOLIZE=libtoolize} - : ${LIBTOOL=libtool} +if test "$(uname)" != "Darwin"; then + : ${LIBTOOLIZE=libtoolize} else - : ${LIBTOOLIZE=glibtoolize} - : ${LIBTOOL=glibtool} + : ${LIBTOOLIZE=glibtoolize} fi +: ${LIBTOOL=libtool} srcdir=`dirname $0` test -z "$srcdir" && srcdir=. -ORIGDIR=`pwd` cd $srcdir PROJECT=libsearpc -TEST_TYPE=-f -FILE=searpc-server.h CONFIGURE=configure.ac DIE=0 @@ -41,7 +37,7 @@ DIE=0 DIE=1 } -if test "$(uname -s)" != "Darwin"; then +if test "$(uname)" != "Darwin"; then (grep "^AC_PROG_LIBTOOL" $CONFIGURE >/dev/null) && { ($LIBTOOL --version) < /dev/null > /dev/null 2>&1 || { echo @@ -53,14 +49,41 @@ if test "$(uname -s)" != "Darwin"; then } fi + if test "$DIE" -eq 1; then exit 1 fi +dr=`dirname .` +echo processing $dr +aclocalinclude="$aclocalinclude -I m4" + if test x"$MSYSTEM" = x"MINGW32"; then - autoreconf --install -I/local/share/aclocal -elif test "$(uname -s)" = "Darwin"; then - autoreconf --install -I/opt/local/share/aclocal -else - autoreconf --install + aclocalinclude="$aclocalinclude -I /mingw32/share/aclocal" +elif test "$(uname)" = "Darwin"; then + aclocalinclude="$aclocalinclude -I /opt/local/share/aclocal" fi + + +echo "Creating $dr/aclocal.m4 ..." +test -r $dr/aclocal.m4 || touch $dr/aclocal.m4 +echo "Running glib-gettextize... Ignore non-fatal messages." +echo "no" | glib-gettextize --force --copy + +echo "Making $dr/aclocal.m4 writable ..." +test -r $dr/aclocal.m4 && chmod u+w $dr/aclocal.m4 + +echo "Running $LIBTOOLIZE..." +$LIBTOOLIZE --force --copy + +echo "Running $ACLOCAL $aclocalinclude ..." +$ACLOCAL $aclocalinclude + +echo "Running $AUTOHEADER..." +$AUTOHEADER + +echo "Running $AUTOMAKE --gnu $am_opt ..." +$AUTOMAKE --add-missing --gnu $am_opt + +echo "Running $AUTOCONF ..." +$AUTOCONF diff --git a/lib/searpc-named-pipe-transport.c b/lib/searpc-named-pipe-transport.c index 231ae09..ed813d8 100644 --- a/lib/searpc-named-pipe-transport.c +++ b/lib/searpc-named-pipe-transport.c @@ -1,10 +1,14 @@ #include #include #include -#include -#include -#include -#include +#include + +#if !defined(WIN32) + #include + #include + #include + #include +#endif // !defined(WIN32) #include #include @@ -14,6 +18,19 @@ #include "searpc-server.h" #include "searpc-named-pipe-transport.h" +#if defined(WIN32) +static const int kPipeBufSize = 1024; +static char* formatErrorMessage(); + +#define G_WARNING_WITH_LAST_ERROR(fmt) \ + do { \ + char *error_msg__ = formatErrorMessage(); \ + g_warning(fmt ": %s\n", error_msg__); \ + g_free (error_msg__); \ + } while(0); + +#endif // defined(WIN32) + static void* named_pipe_listen(void *arg); static void* named_pipe_client_handler(void *arg); static char* searpc_named_pipe_send(void *arg, const gchar *fcall_str, size_t fcall_len, size_t *ret_len); @@ -23,6 +40,9 @@ static int request_from_json (const char *content, size_t len, char **service, c static void json_object_set_string_member (json_t *object, const char *key, const char *value); static const char * json_object_get_string_member (json_t *object, const char *key); +static ssize_t pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n); +static ssize_t pipe_read_n(SearpcNamedPipe fd, void *vptr, size_t n); + typedef struct { SearpcNamedPipeClient* client; char *service; @@ -59,6 +79,7 @@ SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path) int searpc_named_pipe_server_start(SearpcNamedPipeServer *server) { +#if !defined(WIN32) int pipe_fd = socket (AF_UNIX, SOCK_STREAM, 0); const char *un_path = server->path; if (pipe_fd < 0) { @@ -105,25 +126,28 @@ int searpc_named_pipe_server_start(SearpcNamedPipeServer *server) server->pipe_fd = pipe_fd; +#endif // !defined(WIN32) + /* TODO: use glib thread pool */ pthread_create(&server->listener_thread, NULL, named_pipe_listen, server); - return 0; +#if !defined(WIN32) failed: close(pipe_fd); - return -1; +#endif } typedef struct { SearpcNamedPipeServer *server; - int connfd; + SearpcNamedPipe connfd; } ServerHandlerData; static void* named_pipe_listen(void *arg) { SearpcNamedPipeServer *server = arg; +#if !defined(WIN32) while (1) { int connfd = accept (server->pipe_fd, NULL, 0); pthread_t *handler = g_malloc(sizeof(pthread_t)); @@ -135,6 +159,51 @@ static void* named_pipe_listen(void *arg) pthread_create(handler, NULL, named_pipe_client_handler, data); server->handlers = g_list_append(server->handlers, handler); } + +#else // !defined(WIN32) + while (1) { + HANDLE connfd = INVALID_HANDLE_VALUE; + BOOL connected = FALSE; + + connfd = CreateNamedPipe( + server->path, // pipe name + PIPE_ACCESS_DUPLEX, // read/write access + PIPE_TYPE_MESSAGE | // message type pipe + PIPE_READMODE_MESSAGE | // message-read mode + PIPE_WAIT, // blocking mode + PIPE_UNLIMITED_INSTANCES, // max. instances + kPipeBufSize, // output buffer size + kPipeBufSize, // input buffer size + 0, // client time-out + NULL); // default security attribute + + if (connfd == INVALID_HANDLE_VALUE) { + G_WARNING_WITH_LAST_ERROR ("Failed to create named pipe"); + break; + } + + /* listening on this pipe */ + connected = ConnectNamedPipe(connfd, NULL) ? + TRUE : (GetLastError() == ERROR_PIPE_CONNECTED); + + if (!connected) { + G_WARNING_WITH_LAST_ERROR ("failed to ConnectNamedPipe()"); + CloseHandle(connfd); + break; + } + + g_debug ("Accepted a named pipe client\n"); + + pthread_t *handler = g_malloc(sizeof(pthread_t)); + ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData)); + data->server = server; + data->connfd = connfd; + // TODO(low priority): Instead of using a thread to handle each client, + // use select(unix)/iocp(windows) to do it. + pthread_create(handler, NULL, named_pipe_client_handler, data); + server->handlers = g_list_append(server->handlers, handler); + } +#endif // !defined(WIN32) return NULL; } @@ -142,7 +211,7 @@ static void* named_pipe_client_handler(void *arg) { ServerHandlerData *data = arg; // SearpcNamedPipeServer *server = data->server; - int connfd = data->connfd; + SearpcNamedPipe connfd = data->connfd; size_t len; size_t bufsize = 4096; @@ -190,7 +259,12 @@ static void* named_pipe_client_handler(void *arg) } } +#if !defined(WIN32) close(connfd); +#else // !defined(WIN32) + DisconnectNamedPipe(connfd); + CloseHandle(connfd); +#endif // !defined(WIN32) return NULL; } @@ -198,6 +272,7 @@ static void* named_pipe_client_handler(void *arg) int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client) { +#if !defined(WIN32) client->pipe_fd = socket(AF_UNIX, SOCK_STREAM, 0); struct sockaddr_un servaddr; servaddr.sun_family = AF_UNIX; @@ -207,6 +282,34 @@ int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client) g_warning ("pipe client failed to connect to server\n"); return -1; } + +#else // !defined(WIN32) + SearpcNamedPipe pipe_fd; + pipe_fd = CreateFile( + client->path, // pipe name + GENERIC_READ | // read and write access + GENERIC_WRITE, + 0, // no sharing + NULL, // default security attributes + OPEN_EXISTING, // opens existing pipe + 0, // default attributes + NULL); // no template file + + if (pipe_fd == INVALID_HANDLE_VALUE) { + G_WARNING_WITH_LAST_ERROR("Failed to connect to named pipe"); + return -1; + } + + DWORD mode = PIPE_READMODE_MESSAGE; + if (!SetNamedPipeHandleState(pipe_fd, &mode, NULL, NULL)) { + G_WARNING_WITH_LAST_ERROR("Failed to set named pipe mode"); + return -1; + } + + client->pipe_fd = pipe_fd; + +#endif // !defined(WIN32) + g_warning ("pipe client connectd to server\n"); return 0; } @@ -302,3 +405,139 @@ json_object_get_string_member (json_t *object, const char *key) return NULL; return json_string_value (string); } + +#if !defined(WIN32) + +// Write "n" bytes to a descriptor. +ssize_t +pipe_write_n(int fd, const void *vptr, size_t n) +{ + size_t nleft; + ssize_t nwritten; + const char *ptr; + + ptr = vptr; + nleft = n; + while (nleft > 0) { + if ( (nwritten = write(fd, ptr, nleft)) <= 0) + { + if (nwritten < 0 && errno == EINTR) + nwritten = 0; /* and call write() again */ + else + return(-1); /* error */ + } + + nleft -= nwritten; + ptr += nwritten; + } + return(n); +} + +// Read "n" bytes from a descriptor. +ssize_t +pipe_read_n(int fd, void *vptr, size_t n) +{ + size_t nleft; + ssize_t nread; + char *ptr; + + ptr = vptr; + nleft = n; + while (nleft > 0) { + if ( (nread = read(fd, ptr, nleft)) < 0) { + if (errno == EINTR) + nread = 0; /* and call read() again */ + else + return(-1); + } else if (nread == 0) + break; /* EOF */ + + nleft -= nread; + ptr += nread; + } + return(n - nleft); /* return >= 0 */ +} + +#else // !defined(WIN32) + +ssize_t pipe_read_n (SearpcNamedPipe fd, void *vptr, size_t n) +{ + DWORD bytes_read; + BOOL success = ReadFile( + fd, // handle to pipe + vptr, // buffer to receive data + (DWORD)n, // size of buffer + &bytes_read, // number of bytes read + NULL); // not overlapped I/O + + if (!success || bytes_read != (DWORD)n) { + G_WARNING_WITH_LAST_ERROR("failed to read from pipe"); + return -1; + } + + return n; +} + +ssize_t pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n) +{ + DWORD bytes_written; + BOOL success = WriteFile( + fd, // handle to pipe + vptr, // buffer to receive data + (DWORD)n, // size of buffer + &bytes_written, // number of bytes written + NULL); // not overlapped I/O + + if (!success || bytes_written != (DWORD)n) { + G_WARNING_WITH_LAST_ERROR("failed to read command from the pipe"); + } + + FlushFileBuffers(fd); + return 0; +} + +static char *locale_to_utf8 (const gchar *src) +{ + if (!src) + return NULL; + + gsize bytes_read = 0; + gsize bytes_written = 0; + GError *error = NULL; + gchar *dst = NULL; + + dst = g_locale_to_utf8 + (src, /* locale specific string */ + strlen(src), /* len of src */ + &bytes_read, /* length processed */ + &bytes_written, /* output length */ + &error); + + if (error) { + return NULL; + } + + return dst; +} + + +// http://stackoverflow.com/questions/3006229/get-a-text-from-the-error-code-returns-from-the-getlasterror-function +// The caller is responsible to free the returned message. +char* formatErrorMessage() +{ + DWORD error_code = GetLastError(); + if (error_code == 0) { + return g_strdup("no error"); + } + char buf[256] = {0}; + FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM, + NULL, + error_code, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + buf, + sizeof(buf) - 1, + NULL); + return locale_to_utf8(buf); +} + +#endif // !defined(WIN32) diff --git a/lib/searpc-named-pipe-transport.h b/lib/searpc-named-pipe-transport.h index 229d222..cb2cf9e 100644 --- a/lib/searpc-named-pipe-transport.h +++ b/lib/searpc-named-pipe-transport.h @@ -1,10 +1,15 @@ #ifndef SEARPC_NAMED_PIPE_TRANSPORT_H #define SEARPC_NAMED_PIPE_TRANSPORT_H +#include #include #include #include +#if defined(WIN32) +#include +#endif + // Implementatin of a searpc transport based on named pipe. It uses unix domain // sockets on linux/osx, and named pipes on windows. // @@ -14,13 +19,19 @@ // the RPC functions implementation's responsibility to guarantee thread safety // of the RPC calls. (e.g. using mutexes). +#if defined(WIN32) +typedef HANDLE SearpcNamedPipe; +#else +typedef int SearpcNamedPipe; +#endif + // Server side interface. typedef struct { char path[4096]; pthread_t listener_thread; GList *handlers; - int pipe_fd; + SearpcNamedPipe pipe_fd; } SearpcNamedPipeServer; SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path); @@ -31,7 +42,7 @@ int searpc_named_pipe_server_start(SearpcNamedPipeServer *server); typedef struct { char path[4096]; - int pipe_fd; + SearpcNamedPipe pipe_fd; } SearpcNamedPipeClient; SearpcNamedPipeClient* searpc_create_named_pipe_client(const char *path); diff --git a/lib/searpc-utils.c b/lib/searpc-utils.c index c3913e5..c7345e7 100644 --- a/lib/searpc-utils.c +++ b/lib/searpc-utils.c @@ -242,57 +242,3 @@ GObject *json_gobject_deserialize (GType gtype, json_t *object) return ret; } - -// Write "n" bytes to a descriptor. -ssize_t -pipe_write_n(int fd, const void *vptr, size_t n) -{ - size_t nleft; - ssize_t nwritten; - const char *ptr; - - ptr = vptr; - nleft = n; - while (nleft > 0) { -#ifndef WIN32 - if ( (nwritten = write(fd, ptr, nleft)) <= 0) -#else - if ( (nwritten = send(fd, ptr, nleft, 0)) <= 0) -#endif - { - if (nwritten < 0 && errno == EINTR) - nwritten = 0; /* and call write() again */ - else - return(-1); /* error */ - } - - nleft -= nwritten; - ptr += nwritten; - } - return(n); -} - -// Read "n" bytes from a descriptor. -ssize_t -pipe_read_n(int fd, void *vptr, size_t n) -{ - size_t nleft; - ssize_t nread; - char *ptr; - - ptr = vptr; - nleft = n; - while (nleft > 0) { - if ( (nread = read(fd, ptr, nleft)) < 0) { - if (errno == EINTR) - nread = 0; /* and call read() again */ - else - return(-1); - } else if (nread == 0) - break; /* EOF */ - - nleft -= nread; - ptr += nread; - } - return(n - nleft); /* return >= 0 */ -} diff --git a/lib/searpc-utils.h b/lib/searpc-utils.h index a34cb87..ff41e1d 100644 --- a/lib/searpc-utils.h +++ b/lib/searpc-utils.h @@ -57,9 +57,3 @@ inline static json_int_t json_array_get_int_element (json_t *array, size_t index { return json_integer_value (json_array_get (array, index)); } - -// Write "n" bytes to a descriptor. -ssize_t pipe_write_n(int fd, const void *vptr, size_t n); - -// Read "n" bytes from a descriptor. -ssize_t pipe_read_n(int fd, void *vptr, size_t n); diff --git a/tests/searpc.c b/tests/searpc.c index d48711c..88019b9 100644 --- a/tests/searpc.c +++ b/tests/searpc.c @@ -217,8 +217,6 @@ test_searpc__simple_call (void) void test_searpc__invalid_call (void) { - char *fcall, *fret; - gsize fcall_len, ret_len; gchar* result; GError *error = NULL; @@ -306,8 +304,6 @@ void simple_callback (void *result, void *user_data, GError *error) void simple_callback_error (void *result, void *user_data, GError *error) { - char *res = (char *)result; - cl_assert (result == NULL); cl_assert (error != NULL); } @@ -344,6 +340,44 @@ test_searpc__pipe_simple_call (void) g_free (result); } +void +test_searpc__pipe_large_request (void) +{ + gchar* result; + GError *error = NULL; + + // 10MB + int size = 10 * 1024 * 1024; + GString *large_string = g_string_sized_new(size); + while (large_string->len < size) { + g_string_append(large_string, "aaaa"); + } + + // Large request + result = searpc_client_call__string (client_with_pipe_transport, "get_substring", &error, + 2, "string", large_string->str, "int", 2); + cl_assert_ (error == NULL, error ? error->message : ""); + cl_assert_ (strcmp(result, "aa") == 0, result); + g_free (result); + + // Large request & Large response + result = searpc_client_call__string (client_with_pipe_transport, "get_substring", &error, + 2, "string", large_string->str, "int", size - 2); + cl_assert_ (error == NULL, error ? error->message : ""); + // cl_assert (strcmp(result, "aa") == 0); + g_free (result); + + g_string_free (large_string, TRUE); +} + +void +test_searpc__pipe_multiple_clients (void) +{ + //TODO: Implement a test where multiple clients access the server + //simultaneously. +} + + #include "searpc-signature.h" #include "searpc-marshal.h" @@ -370,14 +404,21 @@ test_searpc__initialize (void) client->async_send = sample_async_send; client->async_arg = "test_async"; +#if !defined(WIN32) const char *pipe_path = "/tmp/.searpc-test"; +#else + const char *pipe_path = "\\\\.\\pipe\\libsearpc-test"; +#endif SearpcNamedPipeServer *pipe_server = searpc_create_named_pipe_server(pipe_path); - SearpcNamedPipeClient *pipe_client = searpc_create_named_pipe_client(pipe_path); - cl_must_pass_(searpc_named_pipe_server_start(pipe_server), "named pipe server failed to start"); - cl_must_pass_(searpc_named_pipe_client_connect(pipe_client), "named pipe client failed to connect"); +#if defined(WIN32) + // Wait for the server thread to start + Sleep(1000); +#endif + SearpcNamedPipeClient *pipe_client = searpc_create_named_pipe_client(pipe_path); + cl_must_pass_(searpc_named_pipe_client_connect(pipe_client), "named pipe client failed to connect"); client_with_pipe_transport = searpc_client_with_named_pipe_transport(pipe_client, "test"); }