From e4d932f0c02c9941ba376ce04f48bbdecf22c7a8 Mon Sep 17 00:00:00 2001 From: plt Date: Tue, 13 Sep 2011 13:07:31 +0800 Subject: [PATCH] Implement asynchronized rpc --- .gitignore | 1 + demo/Makefile.am | 7 +- demo/demo-async-client.c | 140 +++++++++++++++++++++++++++++++++++++++ lib/gencode.py | 48 ++++++++++++++ lib/searpc-client.c | 67 ++++++++++++++++++- lib/searpc-client.h | 32 +++++++++ 6 files changed, 293 insertions(+), 2 deletions(-) create mode 100644 demo/demo-async-client.c diff --git a/.gitignore b/.gitignore index 000483c..d2e2d6c 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ Makefile.in stamp-h1 searpc-demo-server searpc-demo-client +searpc-async-client pysearpc/fcallfret.h m4/l* depcomp diff --git a/demo/Makefile.am b/demo/Makefile.am index 06a6da3..6558a76 100644 --- a/demo/Makefile.am +++ b/demo/Makefile.am @@ -1,6 +1,6 @@ AM_CFLAGS = @GLIB2_CFLAGS@ @GOBJECT_CFLAGS@ @JSON_GLIB_CFLAGS@ -I${top_builddir}/lib -I${top_srcdir}/lib -noinst_PROGRAMS = searpc-demo-server searpc-demo-client +noinst_PROGRAMS = searpc-demo-server searpc-demo-client searpc-async-client searpc_demo_server_SOURCES = searpc-demo-server.c searpc-demo-packet.h @@ -10,3 +10,8 @@ searpc_demo_client_SOURCES = searpc-demo-client.c searpc-demo-packet.h searpc_demo_client_LDADD = ${top_builddir}/lib/libsearpc.la +searpc_async_client_SOURCES = demo-async-client.c searpc-demo-packet.h + +searpc_async_client_LDADD = ${top_builddir}/lib/libsearpc.la + + diff --git a/demo/demo-async-client.c b/demo/demo-async-client.c new file mode 100644 index 0000000..12250eb --- /dev/null +++ b/demo/demo-async-client.c @@ -0,0 +1,140 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include "searpc-client.h" +#include "searpc-demo-packet.h" + +#define BUFLEN 256 +#define MAGIC_STRING "ABCD" + +/* define the client-side function */ +SEARPC_CLIENT_ASYNC_DEFUN_INT__STRING(searpc_strlen, 0); + + +typedef struct { + int fd; + void *rpc_priv; +} TcpTransport; + +/* rpc_priv is used by the rpc_client to store information related to + * this rpc call. */ +static int transport_send(void *arg, const char *fcall_str, + size_t fcall_len, void *rpc_priv) +{ + TcpTransport *trans = arg; + int fd, ret; + char buf[BUFLEN]; + packet *pac, *pac_ret; + + pac = (packet *)buf; + + /* construct the packet */ + pac->length = htons((uint16_t)fcall_len); + strncpy(pac->data, fcall_str, fcall_len); + + /* send the packet */ + if ( writen (trans->fd, buf, PACKET_HEADER_LENGTH + fcall_len) == -1) { + fprintf (stderr, "write failed: %s\n", strerror(errno)); + exit(-1); + } + + trans->rpc_priv = rpc_priv; + + return 0; +} + +static void +transport_read(TcpTransport *trans) +{ + packet *pac; + int ret_len; + char buf[BUFLEN]; + + /* read the returned packet */ + pac = read_packet(trans->fd, buf); + if (pac == NULL) { + fprintf(stderr, "read packet failed: %s\n", strerror(errno)); + exit(-1); + } + + ret_len = ntohs(pac->length); + searpc_client_generic_callback (pac->data, ret_len, trans->rpc_priv, NULL); + trans->rpc_priv = NULL; +} + +static void +strlen_callback(void *vresult, void *user_data, GError *error) +{ + const char *str = user_data; + int len = (int)(long)vresult; + + g_assert (strcmp(str, "user data") == 0); + printf("the length of string 'hello searpc' is %d.\n", len); +} + +int +main(int argc, char *argv[]) +{ + int sockfd, ret; + char *ret_str; + struct sockaddr_in servaddr; + SearpcClient *rpc_client; + GError *error = NULL; + TcpTransport *transport; + + g_type_init(); + + ret = sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (ret < 0) { + fprintf(stderr, "socket failed: %s\n", strerror(errno)); + exit(-1); + } + + int on = 1; + if (setsockopt (sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { + fprintf (stderr, "setsockopt of SO_REUSEADDR error: %s\n", strerror(errno)); + exit(-1); + } + memset(&servaddr, 0, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(12345); + inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr); + + ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); + if (ret < 0) { + fprintf(stderr, "connect failed: %s\n", strerror(errno)); + exit(-1); + } + + transport = g_new0 (TcpTransport, 1); + transport->fd = sockfd; + + /* create an rpc_client and supply the transport function. */ + rpc_client = searpc_client_new(); + rpc_client->async_send = transport_send; + rpc_client->async_arg = (void *)(long)transport; + + /* call the client-side funcion */ + searpc_strlen(rpc_client, "hello searpc", strlen_callback, "user data"); + + /* call the transport to receive response */ + transport_read (transport); + if (error != NULL) { + fprintf(stderr, "error: %s\n", error->message); + exit(-1); + } + + close(sockfd); + + return 0; +} diff --git a/lib/gencode.py b/lib/gencode.py index 9a75659..5dc5811 100644 --- a/lib/gencode.py +++ b/lib/gencode.py @@ -329,6 +329,53 @@ def gen_dfun_macro_list(): for item in func_table: print gen_dfun_macro(item[0], item[1]) + +async_dfun_template = r""" +#define SEARPC_CLIENT_ASYNC_DEFUN_${RET_TYPE}__${ARG_TYPES}(funcname, gtype) \ +int \ +funcname (SearpcClient *client, ${args}, \ + AsyncCallback callback, void *user_data) \ +{ \ + char *fcall; \ + size_t fcall_len; \ + \ + fcall = searpc_client_fcall__${arg_types} (#funcname, \ + ${fcall_args}); \ + return searpc_client_async_call (client, fcall, \ + fcall_len, callback, "${ret_type}", \ + gtype, user_data); \ +} +""" + + +def gen_async_dfun_macro(ret_type, arg_types): + template = string.Template(async_dfun_template) + + if len(arg_types) == 0: + arg_types_str = "void" + else: + arg_types_str = "_".join(arg_types) + + args = "" + for i, arg_type in enumerate(arg_types): + args += type_table[arg_type][0] + " param" + str(i+1) + + fcall_args = "" + for i, arg_type in enumerate(arg_types): + fcall_args += " param" + str(i+1) + ", " + fcall_args += "&fcall_len" + + return template.substitute(ret_type=ret_type, RET_TYPE=ret_type.upper(), + arg_types=arg_types_str, + ARG_TYPES=arg_types_str.upper(), + args=args, fcall_args=fcall_args) + + +def gen_async_dfun_macro_list(): + from rpc_table import func_table + for item in func_table: + print gen_async_dfun_macro(item[0], item[1]) + if __name__ == "__main__": command = sys.argv[1] if command == "gen-marshal": @@ -342,5 +389,6 @@ if __name__ == "__main__": gen_fcall_declare_list() elif command == "gen-dfun-macro": gen_dfun_macro_list() + gen_async_dfun_macro_list() else: print "Unknown command %s" % (command) diff --git a/lib/searpc-client.c b/lib/searpc-client.c index 475d3c1..9123c59 100644 --- a/lib/searpc-client.c +++ b/lib/searpc-client.c @@ -32,6 +32,71 @@ searpc_client_transport_send (SearpcClient *client, fcall_len, ret_len); } +typedef struct { + SearpcClient *client; + AsyncCallback callback; + const gchar *ret_type; + int gtype; + void *cbdata; +} AsyncCallData; + +int +searpc_client_generic_callback (char *retstr, size_t len, + void *vdata, const char *errstr) +{ + AsyncCallData *data = vdata; + GError *error = NULL; + void *result = NULL; + + if (errstr) { + g_set_error (&error, 0, 500, "Transport error: %s", errstr); + data->callback (NULL, data->cbdata, error); + g_error_free (error); + } else { + /* parse result and call the callback */ + if (strcmp(data->ret_type, "int") == 0) { + int ret = searpc_client_fret__int (retstr, len, &error); + printf ("ret is %d\n", ret); + result = (void *)(long)ret; + } else if (strcmp(data->ret_type, "string") == 0) { + result = (void *)searpc_client_fret__string (retstr, len, &error); + } else if (strcmp(data->ret_type, "object") == 0) { + result = (void *)searpc_client_fret__object ( + data->gtype, retstr, len, &error); + } else if (strcmp(data->ret_type, "objlist") == 0) { + result = (void *)searpc_client_fret__objlist ( + data->gtype, retstr, len, &error); + } + data->callback (result, data->cbdata, error); + } + g_free (data); +} + +int +searpc_client_async_call (SearpcClient *client, + const gchar *fcall_str, + size_t fcall_len, + AsyncCallback callback, + const gchar *ret_type, + int gtype, + void *cbdata) +{ + int ret; + AsyncCallData *data = g_new0(AsyncCallData, 1); + data->client = client; + data->callback = callback; + data->ret_type = ret_type; + data->gtype = gtype; + data->cbdata = cbdata; + + ret = client->async_send (client->async_arg, fcall_str, fcall_len, data); + if (ret < 0) { + g_free (data); + return -1; + } + return 0; +} + /* * serialize function call from array to string */ @@ -77,7 +142,7 @@ handle_ret_common (char *data, size_t len, JsonParser **parser, g_return_val_if_fail (root != 0 || object != 0, -1); *parser = json_parser_new (); - if (!json_parser_load_from_data (*parser, data, strlen(data), error)) { + if (!json_parser_load_from_data (*parser, data, len, error)) { g_object_unref (*parser); *parser = NULL; return -1; diff --git a/lib/searpc-client.h b/lib/searpc-client.h index 4abab89..8be753d 100644 --- a/lib/searpc-client.h +++ b/lib/searpc-client.h @@ -7,9 +7,19 @@ typedef char *(*TransportCB)(void *arg, const gchar *fcall_str, size_t fcall_len, size_t *ret_len); +/* rpc_priv is used by the rpc_client to store information related to + * this rpc call. */ +typedef int (*AsyncTransportSend)(void *arg, const gchar *fcall_str, + size_t fcall_len, void *rpc_priv); + +typedef void (*AsyncCallback) (void *result, void *user_data, GError *error); + typedef struct { TransportCB transport; void *arg; + + AsyncTransportSend async_send; + void *async_arg; } SearpcClient; SearpcClient *searpc_client_new (); @@ -21,6 +31,28 @@ char* searpc_client_transport_send (SearpcClient *client, size_t fcall_len, size_t *ret_len); + +/** + * Send the serialized function call to server. + * + * @fcall_str: the serialized function. + * @ret_type: the return type. + * @gtype: specify the type id if @ret_type is `object` or `objlist`, + * or 0 otherwise. + * @cbdata: the data that will be given to the callback. + */ +int searpc_client_async_call (SearpcClient *client, + const gchar *fcall_str, + size_t fcall_len, + AsyncCallback callback, + const gchar *ret_type, + int gtype, + void *cbdata); + +int +searpc_client_generic_callback (char *retstr, size_t len, + void *vdata, const char *errstr); + #include