From 9a1858aea54e68aeda570058142578c66c334f98 Mon Sep 17 00:00:00 2001 From: Xiangyue Cai Date: Thu, 25 Jul 2019 11:59:28 +0800 Subject: [PATCH] support python3 (#35) * use searpc-named-pipe-transport * delete unused files * remove not threaded rpc * use NamedPipeClient * remove not threaded rpc * remove unused code * remove unused rpc function * remove unused code and source files * Add name of rpc named pipe to ccent.h. --- include/ccnet.h | 94 +----- include/ccnet/Makefile.am | 4 +- include/ccnet/ccnetrpc-transport.h | 30 -- lib/Makefile.am | 7 +- lib/ccnet-rpc-wrapper.c | 292 ------------------ lib/ccnetrpc-transport.c | 184 ------------ lib/client-pool.c | 63 ---- lib/mainloop.c | 160 ---------- net/common/rpc-service.c | 443 +--------------------------- net/common/rpc-service.h | 68 +---- net/server/ccnet-server.c | 5 +- python/ccnet/Makefile.am | 13 +- python/ccnet/__init__.py | 8 +- python/ccnet/async/__init__.py | 14 - python/ccnet/async/async_client.py | 250 ---------------- python/ccnet/async/mqclientproc.py | 50 ---- python/ccnet/async/processor.py | 57 ---- python/ccnet/async/rpcserverproc.py | 43 --- python/ccnet/async/sendcmdproc.py | 34 --- python/ccnet/async/timer.py | 21 -- python/ccnet/client.py | 148 ---------- python/ccnet/errors.py | 7 - python/ccnet/message.py | 47 --- python/ccnet/packet.py | 129 -------- python/ccnet/pool.py | 36 --- python/ccnet/rpc.py | 178 +---------- python/ccnet/status_code.py | 77 ----- python/ccnet/sync_client.py | 94 ------ python/ccnet/test-client.py | 62 ---- python/ccnet/test-server.py | 45 --- python/ccnet/utils.py | 45 --- 31 files changed, 27 insertions(+), 2681 deletions(-) delete mode 100644 include/ccnet/ccnetrpc-transport.h delete mode 100644 lib/ccnetrpc-transport.c delete mode 100644 lib/client-pool.c delete mode 100644 lib/mainloop.c delete mode 100644 python/ccnet/async/__init__.py delete mode 100644 python/ccnet/async/async_client.py delete mode 100644 python/ccnet/async/mqclientproc.py delete mode 100644 python/ccnet/async/processor.py delete mode 100644 python/ccnet/async/rpcserverproc.py delete mode 100644 python/ccnet/async/sendcmdproc.py delete mode 100644 python/ccnet/async/timer.py delete mode 100644 python/ccnet/client.py delete mode 100644 python/ccnet/errors.py delete mode 100644 python/ccnet/message.py delete mode 100644 python/ccnet/packet.py delete mode 100644 python/ccnet/pool.py delete mode 100644 python/ccnet/status_code.py delete mode 100644 python/ccnet/sync_client.py delete mode 100644 python/ccnet/test-client.py delete mode 100644 python/ccnet/test-server.py delete mode 100644 python/ccnet/utils.py diff --git a/include/ccnet.h b/include/ccnet.h index d327d52..4bd41e6 100644 --- a/include/ccnet.h +++ b/include/ccnet.h @@ -25,71 +25,8 @@ #include -/* mainloop */ +#define CCNET_RPC_PIPE_NAME "ccnet-rpc.sock" -void ccnet_main (CcnetClient *client); - -typedef void (*RegisterServiceCB) (gboolean success); -void ccnet_register_service (CcnetClient *client, - const char *service, const char *group, - GType proc_type, RegisterServiceCB cb); -gboolean ccnet_register_service_sync (CcnetClient *client, - const char *service, - const char *group); -CcnetClient *ccnet_init (const char *central_config_dir, const char *confdir); - -void ccnet_send_command (CcnetClient *client, const char *command, - SendcmdProcRcvrspCallback cmd_cb, void *cbdata); - -void ccnet_add_peer (CcnetClient *client, const char *id, const char *addr); - -void ccnet_connect_peer (CcnetClient *client, const char *id); -void ccnet_disconnect_peer (CcnetClient *client, const char *id); - -/* client pool */ - -struct CcnetClientPool; -typedef struct CcnetClientPool CcnetClientPool; - -struct CcnetClientPool * -ccnet_client_pool_new (const char *central_config_dir, const char *conf_dir); - -CcnetClient * -ccnet_client_pool_get_client (struct CcnetClientPool *cpool); - -void -ccnet_client_pool_return_client (struct CcnetClientPool *cpool, - CcnetClient *client); - -/* rpc wrapper */ - -/* Create rpc client using a single client for transport. */ -SearpcClient * -ccnet_create_rpc_client (CcnetClient *cclient, const char *peer_id, - const char *service_name); - -/* Create rpc client using client pool for transport. */ -SearpcClient * -ccnet_create_pooled_rpc_client (struct CcnetClientPool *cpool, - const char *peer_id, - const char *service); - -SearpcClient * -ccnet_create_async_rpc_client (CcnetClient *cclient, const char *peer_id, - const char *service_name); - -void ccnet_rpc_client_free (SearpcClient *client); -void ccnet_async_rpc_client_free (SearpcClient *client); - -CcnetPeer *ccnet_get_peer (SearpcClient *client, const char *peer_id); -CcnetPeer *ccnet_get_peer_by_idname (SearpcClient *client, const char *idname); -int ccnet_get_peer_net_state (SearpcClient *client, const char *peer_id); -int ccnet_get_peer_bind_status (SearpcClient *client, const char *peer_id); -int ccnet_peer_is_ready (SearpcClient *client, const char *peer_id); -CcnetPeer *ccnet_get_default_relay (SearpcClient *client); -GList *ccnet_get_peers_by_role (SearpcClient *client, const char *role); - -char *ccnet_get_binding_email (SearpcClient *client, const char *peer_id); GList *ccnet_get_groups_by_user (SearpcClient *client, const char *user, int return_ancestors); GList *ccnet_get_org_groups_by_user (SearpcClient *client, const char *user, int org_id); GList * @@ -97,33 +34,4 @@ ccnet_get_group_members (SearpcClient *client, int group_id); int ccnet_org_user_exists (SearpcClient *client, int org_id, const char *user); -int -ccnet_get_binding_email_async (SearpcClient *client, const char *peer_id, - AsyncCallback callback, void *user_data); - -char *ccnet_sign_message (SearpcClient *client, const char *message); -int ccnet_verify_message (SearpcClient *client, - const char *message, - const char *sig_base64, - const char *peer_id); - -char * -ccnet_pubkey_encrypt (SearpcClient *client, - const char *msg_base64, - const char *peer_id); - -char * -ccnet_privkey_decrypt (SearpcClient *client, const char *msg_base64); - -char *ccnet_get_config (SearpcClient *client, const char *key); -int ccnet_set_config (SearpcClient *client, const char *key, const char *value); - -void -ccnet_login_to_relay (SearpcClient *client, const char *relay_id, - const char *username, const char *passwd); - -int -ccnet_update_peer_address (SearpcClient *client, const char *peer_id, - const char *addr, int port); - #endif diff --git a/include/ccnet/Makefile.am b/include/ccnet/Makefile.am index 704cbdd..450c85d 100644 --- a/include/ccnet/Makefile.am +++ b/include/ccnet/Makefile.am @@ -7,5 +7,5 @@ ccnet_HEADERS = ccnet-client.h peer.h proc-factory.h \ mqclient-proc.h invoke-service-proc.h \ status-code.h cevent.h timer.h ccnet-session-base.h \ valid-check.h job-mgr.h packet.h \ - async-rpc-proc.h ccnetrpc-transport.h \ - rpcserver-proc.h threaded-rpcserver-proc.h + async-rpc-proc.h rpcserver-proc.h \ + threaded-rpcserver-proc.h diff --git a/include/ccnet/ccnetrpc-transport.h b/include/ccnet/ccnetrpc-transport.h deleted file mode 100644 index 5c6a696..0000000 --- a/include/ccnet/ccnetrpc-transport.h +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef CCNETRPC_TRANPORT_H -#define CCNETRPC_TRANPORT_H - -#include - -typedef struct { - /* either session or pool will be set. */ - CcnetClient *session; - CcnetClientPool *pool; - char *peer_id; /* NULL if local */ - char *service; -} CcnetrpcTransportParam; /* this structure will be parsed to - * ccnet_transport_send () - */ - -typedef struct { - CcnetClient *session; - char *peer_id; /* NULL if local */ - char *service; -} CcnetrpcAsyncTransportParam; /* this structure will be parsed to - * ccnet_async_transport_send () - */ - -char *ccnetrpc_transport_send (void *arg, - const gchar *fcall_str, size_t fcall_len, size_t *ret_len); - -int ccnetrpc_async_transport_send (void *arg, gchar *fcall_str, - size_t fcall_len, void *rpc_priv); - -#endif /* SEARPC_TRANPORT_H */ diff --git a/lib/Makefile.am b/lib/Makefile.am index f744675..0d28c47 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -38,11 +38,10 @@ libccnet_la_SOURCES = ccnet-client.c packet-io.c libccnet_utils.c \ peer.c sendcmd-proc.c \ mqclient-proc.c invoke-service-proc.c \ marshal.c \ - mainloop.c cevent.c timer.c ccnet-session-base.c job-mgr.c \ - rpcserver-proc.c ccnetrpc-transport.c threaded-rpcserver-proc.c \ + cevent.c timer.c ccnet-session-base.c job-mgr.c \ + rpcserver-proc.c threaded-rpcserver-proc.c \ ccnetobj.c \ - async-rpc-proc.c ccnet-rpc-wrapper.c \ - client-pool.c + async-rpc-proc.c ccnet-rpc-wrapper.c EXTRA_DIST = ccnetobj.vala rpc_table.py diff --git a/lib/ccnet-rpc-wrapper.c b/lib/ccnet-rpc-wrapper.c index 94b8ca6..ede13fb 100644 --- a/lib/ccnet-rpc-wrapper.c +++ b/lib/ccnet-rpc-wrapper.c @@ -5,180 +5,6 @@ #include #include #include -#include - - -SearpcClient * -ccnet_create_rpc_client (CcnetClient *cclient, - const char *peer_id, - const char *service_name) -{ - SearpcClient *rpc_client; - CcnetrpcTransportParam *priv; - - priv = g_new0(CcnetrpcTransportParam, 1); - priv->session = cclient; - priv->peer_id = g_strdup(peer_id); - priv->service = g_strdup(service_name); - - rpc_client = searpc_client_new (); - rpc_client->send = ccnetrpc_transport_send; - rpc_client->arg = priv; - - return rpc_client; -} - -SearpcClient * -ccnet_create_pooled_rpc_client (struct CcnetClientPool *cpool, - const char *peer_id, - const char *service) -{ - SearpcClient *rpc_client; - CcnetrpcTransportParam *priv; - - priv = g_new0(CcnetrpcTransportParam, 1); - priv->pool = cpool; - priv->peer_id = g_strdup(peer_id); - priv->service = g_strdup(service); - - rpc_client = searpc_client_new (); - rpc_client->send = ccnetrpc_transport_send; - rpc_client->arg = priv; - - return rpc_client; -} - -SearpcClient * -ccnet_create_async_rpc_client (CcnetClient *cclient, const char *peer_id, - const char *service_name) -{ - SearpcClient *rpc_client; - CcnetrpcAsyncTransportParam *async_priv; - - async_priv = g_new0 (CcnetrpcAsyncTransportParam, 1); - async_priv->session = cclient; - async_priv->peer_id = g_strdup(peer_id); - async_priv->service = g_strdup (service_name); - - rpc_client = searpc_client_new (); - - rpc_client->async_send = ccnetrpc_async_transport_send; - rpc_client->async_arg = async_priv; - - return rpc_client; -} - -void -ccnet_rpc_client_free (SearpcClient *client) -{ - CcnetrpcTransportParam *priv; - - if (!client) return; - - priv = client->arg; - - g_free (priv->peer_id); - g_free (priv->service); - g_free (priv); - - searpc_client_free (client); -} - -void -ccnet_async_rpc_client_free (SearpcClient *client) -{ - CcnetrpcAsyncTransportParam *priv = client->arg; - - g_free (priv->peer_id); - g_free (priv->service); - g_free (priv); - - searpc_client_free (client); -} - -CcnetPeer * -ccnet_get_peer (SearpcClient *client, const char *peer_id) -{ - if (!peer_id) - return NULL; - - return (CcnetPeer *) searpc_client_call__object( - client, "get_peer",CCNET_TYPE_PEER, NULL, - 1, "string", peer_id); -} - -CcnetPeer * -ccnet_get_peer_by_idname (SearpcClient *client, const char *idname) -{ - if (!idname) - return NULL; - return (CcnetPeer *) searpc_client_call__object( - client, "get_peer_by_idname", CCNET_TYPE_PEER, NULL, - 1, "string", idname); -} - -int -ccnet_get_peer_net_state (SearpcClient *client, const char *peer_id) -{ - CcnetPeer *peer; - int ret; - peer = ccnet_get_peer (client, peer_id); - if (!peer) - return PEER_DOWN; - ret = peer->net_state; - g_object_unref (peer); - return ret; -} - -int -ccnet_get_peer_bind_status (SearpcClient *client, const char *peer_id) -{ - CcnetPeer *peer; - int ret; - peer = ccnet_get_peer (client, peer_id); - if (!peer) - return BIND_UNKNOWN; - ret = peer->bind_status; - g_object_unref (peer); - return ret; -} - -CcnetPeer * -ccnet_get_default_relay (SearpcClient *client) -{ - CcnetSessionBase *base = (CcnetSessionBase *) - searpc_client_call__object( - client, "get_session_info", CCNET_TYPE_SESSION_BASE, NULL, 0); - - if (!base) - return NULL; - - CcnetPeer *relay = ccnet_get_peer (client, base->relay_id); - g_object_unref (base); - return relay; -} - -GList * -ccnet_get_peers_by_role (SearpcClient *client, const char *role) -{ - if (!role) - return NULL; - - return searpc_client_call__objlist ( - client, "get_peers_by_role", CCNET_TYPE_PEER, NULL, - 1, "string", role); -} - -char * -ccnet_get_binding_email (SearpcClient *client, const char *peer_id) -{ - if (!peer_id) - return NULL; - - return searpc_client_call__string ( - client, "get_binding_email", NULL, - 1, "string", peer_id); -} GList * ccnet_get_groups_by_user (SearpcClient *client, const char *user, int return_ancestors) @@ -216,121 +42,3 @@ ccnet_org_user_exists (SearpcClient *client, int org_id, const char *user) return searpc_client_call__int (client, "org_user_exists", NULL, 2, "int", org_id, "string", user); } - -#if 0 -int -ccnet_get_peer_async (SearpcClient *client, const char *peer_id, - AsyncCallback callback, void *user_data) -{ - return get_peer_async (client, peer_id, callback, user_data); -} -#endif - -int -ccnet_get_binding_email_async (SearpcClient *client, const char *peer_id, - AsyncCallback callback, void *user_data) -{ - return searpc_client_async_call__string ( - client, "get_binding_email", callback, user_data, - 1, "string", peer_id); -} - -char * -ccnet_sign_message (SearpcClient *client, const char *message) -{ - if (!message) - return NULL; - - return searpc_client_call__string ( - client, "sign_message", NULL, - 1, "string", message); -} - -int -ccnet_verify_message (SearpcClient *client, - const char *message, - const char *sig_base64, - const char *peer_id) -{ - if (!message || !sig_base64 || !peer_id) - return -1; - - return searpc_client_call__int ( - client, "verify_message", NULL, - 3, "string", message, "string", sig_base64, "string", peer_id); -} - -char * -ccnet_pubkey_encrypt (SearpcClient *client, - const char *msg_base64, - const char *peer_id) -{ - if (!msg_base64 || !peer_id) - return NULL; - - return searpc_client_call__string (client, "pubkey_encrypt", NULL, 2, - "string", msg_base64, "string", peer_id); -} - -char * -ccnet_privkey_decrypt (SearpcClient *client, const char *msg_base64) -{ - if (!msg_base64) - return NULL; - - return searpc_client_call__string (client, "privkey_decrypt", NULL, 1, - "string", msg_base64); -} - -char * -ccnet_get_config (SearpcClient *client, const char *key) -{ - if (!key) - return NULL; - - return searpc_client_call__string ( - client, "get_config", NULL, - 1, "string", key); -} - -int -ccnet_set_config (SearpcClient *client, const char *key, const char *value) -{ - if (!key || !value) - return -1; - - return searpc_client_call__int ( - client, "set_config", NULL, - 2, "string", key, "string", value); -} - -void -ccnet_login_to_relay (SearpcClient *client, const char *relay_id, - const char *username, const char *passwd) -{ - searpc_client_call__int (client, "login_relay", NULL, - 3, "string", relay_id, - "string", username, "string", passwd); -} - -gboolean -ccnet_peer_is_ready (SearpcClient *client, const char *peer_id) -{ - CcnetPeer *peer; - gboolean ret; - peer = ccnet_get_peer (client, peer_id); - if (!peer) - return FALSE; - ret = peer->is_ready; - g_object_unref (peer); - return ret; -} - -int -ccnet_update_peer_address (SearpcClient *client, const char *peer_id, - const char *addr, int port) -{ - return searpc_client_call__int ( - client, "update_peer_address", NULL, - 3, "string", peer_id, "string", addr, "int", port); -} diff --git a/lib/ccnetrpc-transport.c b/lib/ccnetrpc-transport.c deleted file mode 100644 index 9da452d..0000000 --- a/lib/ccnetrpc-transport.c +++ /dev/null @@ -1,184 +0,0 @@ - -#include -#include -#include - -#include -#include "rpc-common.h" -#include - -static char * -invoke_service (CcnetClient *session, - const char *peer_id, - const char *service, - const char *fcall_str, - size_t fcall_len, - size_t *ret_len) -{ - struct CcnetResponse *rsp; - uint32_t req_id; - GString *buf; - - req_id = ccnet_client_get_rpc_request_id (session, peer_id, service); - if (req_id == 0) { - *ret_len = 0; - return NULL; - } - - ccnet_client_send_update (session, req_id, - SC_CLIENT_CALL, SS_CLIENT_CALL, - fcall_str, fcall_len); - - if (ccnet_client_read_response (session) < 0) { - *ret_len = 0; - ccnet_client_clean_rpc_request (session, req_id); - return NULL; - } - rsp = &session->response; - - if (memcmp (rsp->code, SC_SERVER_RET, 3) == 0) { - *ret_len = (size_t) rsp->clen; - return g_strndup (rsp->content, rsp->clen); - } else if (memcmp (rsp->code, SC_SERVER_MORE, 3) != 0) { - g_warning ("[Sea RPC] Bad response: %s %s.\n", rsp->code, rsp->code_msg); - *ret_len = 0; - return NULL; - } - - buf = g_string_new_len (rsp->content, rsp->clen); - while (1) { - ccnet_client_send_update (session, req_id, - SC_CLIENT_MORE, SS_CLIENT_MORE, - fcall_str, fcall_len); - - if (ccnet_client_read_response (session) < 0) { - *ret_len = 0; - ccnet_client_clean_rpc_request (session, req_id); - g_string_free (buf, TRUE); - return NULL; - } - rsp = &session->response; - - if (memcmp (rsp->code, SC_SERVER_RET, 3) == 0) { - g_string_append_len (buf, rsp->content, rsp->clen); - *ret_len = buf->len; - return g_string_free (buf, FALSE); - } else if (memcmp (rsp->code, SC_SERVER_MORE, 3) == 0) { - g_string_append_len (buf, rsp->content, rsp->clen); - } else { - g_warning ("[Sea RPC] Bad response: %s %s.\n", - rsp->code, rsp->code_msg); - *ret_len = 0; - g_string_free (buf, TRUE); - return NULL; - } - } - - /* Never reach here. */ - return NULL; -} - -static CcnetClient * -create_new_client (const char *central_config_dir, const char *conf_dir) -{ - CcnetClient *client; - - client = ccnet_client_new (); - if (ccnet_client_load_confdir (client, central_config_dir, conf_dir) < 0) { - g_warning ("[Sea RPC] Failed to load conf dir.\n"); - g_object_unref (client); - return NULL; - } - if (ccnet_client_connect_daemon (client, CCNET_CLIENT_SYNC) < 0) { - g_warning ("[Sea RPC] Failed to connect ccnet.\n"); - g_object_unref (client); - return NULL; - } - - return client; -} - -char * -ccnetrpc_transport_send (void *arg, const gchar *fcall_str, - size_t fcall_len, size_t *ret_len) -{ - CcnetrpcTransportParam *priv; - CcnetClient *session, *new_session; - - g_warn_if_fail (arg != NULL && fcall_str != NULL); - - priv = (CcnetrpcTransportParam *)arg; - - if (priv->session != NULL) { - /* Use single ccnet client as transport. */ - return invoke_service (priv->session, priv->peer_id, priv->service, - fcall_str, fcall_len, ret_len); - } else { - /* Use client pool as transport. */ - - session = ccnet_client_pool_get_client (priv->pool); - if (!session) { - g_warning ("[Sea RPC] Failed to get client from pool.\n"); - *ret_len = 0; - return NULL; - } - - char *ret = invoke_service (session, priv->peer_id, priv->service, - fcall_str, fcall_len, ret_len); - if (ret != NULL) { - ccnet_client_pool_return_client (priv->pool, session); - return ret; - } - - /* If we failed to send data through the ccnet client returned by - * client pool, ccnet may have been restarted. - * In this case, we create a new ccnet client and put it into - * the client pool after use. - */ - - g_message ("[Sea RPC] Ccnet disconnected. Connect again.\n"); - - new_session = create_new_client (session->central_config_dir, session->config_dir); - if (!new_session) { - *ret_len = 0; - return NULL; - } - g_object_unref (session); - - ret = invoke_service (new_session, priv->peer_id, priv->service, - fcall_str, fcall_len, ret_len); - if (ret != NULL) - ccnet_client_pool_return_client (priv->pool, new_session); - else - g_object_unref (new_session); - - return ret; - } -} - - -int -ccnetrpc_async_transport_send (void *arg, gchar *fcall_str, - size_t fcall_len, void *rpc_priv) -{ - CcnetrpcAsyncTransportParam *priv; - CcnetClient *session; - CcnetProcessor *proc; - - g_warn_if_fail (arg != NULL && fcall_str != NULL); - - priv = (CcnetrpcAsyncTransportParam *)arg; - session = priv->session; - - if (!priv->peer_id) - proc = ccnet_proc_factory_create_master_processor ( - session->proc_factory, "async-rpc"); - else - proc = ccnet_proc_factory_create_remote_master_processor ( - session->proc_factory, "async-rpc", priv->peer_id); - - ccnet_async_rpc_proc_set_rpc ((CcnetAsyncRpcProc *)proc, priv->service, - fcall_str, fcall_len, rpc_priv); - ccnet_processor_start (proc, 0, NULL); - return 0; -} diff --git a/lib/client-pool.c b/lib/client-pool.c deleted file mode 100644 index 4b0b423..0000000 --- a/lib/client-pool.c +++ /dev/null @@ -1,63 +0,0 @@ -#include "include.h" - -#include -#include - -#include -#include - -struct CcnetClientPool { - GQueue *clients; - pthread_mutex_t lock; - const char *central_config_dir; - const char *conf_dir; -}; - -struct CcnetClientPool * -ccnet_client_pool_new (const char *central_config_dir, const char *conf_dir) -{ - CcnetClientPool *pool = g_new0 (CcnetClientPool, 1); - - pool->clients = g_queue_new (); - pthread_mutex_init (&pool->lock, NULL); - pool->conf_dir = g_strdup(conf_dir); - pool->central_config_dir = g_strdup(central_config_dir); - - return pool; -} - -CcnetClient * -ccnet_client_pool_get_client (struct CcnetClientPool *cpool) -{ - CcnetClient *client; - - pthread_mutex_lock (&cpool->lock); - client = g_queue_pop_head (cpool->clients); - pthread_mutex_unlock (&cpool->lock); - - if (!client) { - client = ccnet_client_new (); - if (ccnet_client_load_confdir (client, cpool->central_config_dir, cpool->conf_dir) < 0) { - g_warning ("[client pool] Failed to load conf dir.\n"); - g_object_unref (client); - return NULL; - } - if (ccnet_client_connect_daemon (client, CCNET_CLIENT_SYNC) < 0) { - g_warning ("[client pool] Failed to connect.\n"); - g_object_unref (client); - return NULL; - } - } - - return client; -} - -void -ccnet_client_pool_return_client (struct CcnetClientPool *cpool, - CcnetClient *client) -{ - pthread_mutex_lock (&cpool->lock); - g_queue_push_tail (cpool->clients, client); - pthread_mutex_unlock (&cpool->lock); -} - diff --git a/lib/mainloop.c b/lib/mainloop.c deleted file mode 100644 index 08c00e8..0000000 --- a/lib/mainloop.c +++ /dev/null @@ -1,160 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -#include "include.h" -#include - -#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) -#include -#include -#include -#else -#include -#endif - -static int -cmdrsp_cb (const char *code, char *content, int clen, void *data) -{ - RegisterServiceCB cb = data; - - if (clen != 0) - ccnet_debug ("Receive cmd response {\n%s}\n", content); - else - ccnet_debug ("Receive cmd response null\n"); - - if (cb) { - if (memcmp (SC_SERV_EXISTED, code, 3) == 0) - cb (FALSE); - else - cb (TRUE); - } - return 0; -} - - -void -ccnet_register_service (CcnetClient *client, - const char *service, - const char *group, - GType proc_type, - RegisterServiceCB cb) -{ - char buf[512]; - g_return_if_fail (group); - - ccnet_proc_factory_register_processor (client->proc_factory, - service, - proc_type); - snprintf (buf, 512, "register-service %s %s", service, group); - ccnet_send_command (client, buf, cmdrsp_cb, cb); -} - -gboolean -ccnet_register_service_sync (CcnetClient *client, - const char *service, - const char *group) -{ - char buf[512]; - GError *error = NULL; - - snprintf (buf, 512, "register-service %s %s", service, group); - ccnet_client_send_cmd (client, buf, &error); - if (error) { - ccnet_warning ("Bad response for register service %s: %d %s", - service, error->code, error->message); - return FALSE; - } - return TRUE; -} - -static void read_cb (evutil_socket_t fd, short event, void *vclient) -{ - CcnetClient *client = vclient; - - if (ccnet_client_read_input (client) <= 0) { - ccnet_client_disconnect_daemon (client); - exit (1); - } -} - - -/** - * Inititialize ccnet client structure, connect daemon and initialize - * event loop. - */ -CcnetClient * -ccnet_init (const char *central_config_dir, const char *confdir) -{ - CcnetClient *client; - - client = ccnet_client_new (); - if ( (ccnet_client_load_confdir(client, central_config_dir, confdir)) < 0 ) { - ccnet_warning ("Read config dir error\n"); - return NULL; - } - - - if (ccnet_client_connect_daemon (client, CCNET_CLIENT_ASYNC) < 0) { - ccnet_warning ("Connect to ccnet daemon error\n"); - exit(1); - } - - ccnet_client_run_synchronizer (client); - - event_init (); - - return client; -} - -void -ccnet_main (CcnetClient *client) -{ - struct event ev; - - event_set (&ev, client->connfd, EV_READ | EV_PERSIST, read_cb, client); - event_add (&ev, NULL); - - event_dispatch (); -} - -void ccnet_send_command (CcnetClient *client, const char *command, - SendcmdProcRcvrspCallback cmd_cb, void *cbdata) -{ - CcnetSendcmdProc *sendcmd_proc = (CcnetSendcmdProc *) - ccnet_proc_factory_create_master_processor (client->proc_factory, - "send-cmd"); - ccnet_sendcmd_proc_set_rcvrsp_cb (sendcmd_proc, cmd_cb, cbdata); - ccnet_processor_start (CCNET_PROCESSOR(sendcmd_proc), 0, NULL); - ccnet_sendcmd_proc_send_command (sendcmd_proc, command); -} - -/* add-peer [--id ] [--addr ] - */ -void ccnet_add_peer (CcnetClient *client, const char *id, const char *addr) -{ - char buf[256]; - if (id == NULL || strlen(id) != 40 || addr == NULL) - return; - - snprintf (buf, 256, "add-peer --id %s --addr %s", id, addr); - ccnet_send_command (client, buf, NULL, NULL); -} - -void ccnet_connect_peer (CcnetClient *client, const char *id) -{ - char buf[256]; - if (id == NULL || strlen(id) != 40) - return; - - snprintf (buf, 256, "connect %s", id); - ccnet_send_command (client, buf, NULL, NULL); -} - -void ccnet_disconnect_peer (CcnetClient *client, const char *id) -{ - char buf[256]; - if (id == NULL || strlen(id) != 40) - return; - - snprintf (buf, 256, "disconnect %s", id); - ccnet_send_command (client, buf, NULL, NULL); -} diff --git a/net/common/rpc-service.c b/net/common/rpc-service.c index 9df8515..ad81c39 100644 --- a/net/common/rpc-service.c +++ b/net/common/rpc-service.c @@ -15,10 +15,6 @@ #include "ccnet-object.h" -#include "processors/rpcserver-proc.h" -#ifdef CCNET_SERVER -#include "processors/threaded-rpcserver-proc.h" -#endif #include "searpc-server.h" #include "ccnet-config.h" @@ -36,109 +32,24 @@ extern CcnetSession *session; #include +#include #include "searpc-signature.h" #include "searpc-marshal.h" -void +#define CCNET_SOCKET_NAME "ccnet-rpc.sock" + +int ccnet_start_rpc(CcnetSession *session) { searpc_server_init (register_marshals); - searpc_create_service ("ccnet-rpcserver"); - ccnet_proc_factory_register_processor (session->proc_factory, - "ccnet-rpcserver", - CCNET_TYPE_RPCSERVER_PROC); - #ifdef CCNET_SERVER searpc_create_service ("ccnet-threaded-rpcserver"); - ccnet_proc_factory_register_processor (session->proc_factory, - "ccnet-threaded-rpcserver", - CCNET_TYPE_THREADED_RPCSERVER_PROC); #endif - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_list_peers, - "list_peers", - searpc_signature_string__void()); - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_list_resolving_peers, - "list_resolving_peers", - searpc_signature_objlist__void()); - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_get_peers_by_role, - "get_peers_by_role", - searpc_signature_objlist__string()); - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_get_peer, - "get_peer", - searpc_signature_object__string()); - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_get_peer_by_idname, - "get_peer_by_idname", - searpc_signature_object__string()); - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_update_peer_address, - "update_peer_address", - searpc_signature_int__string_string_int()); - - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_get_session_info, - "get_session_info", - searpc_signature_object__void()); - - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_add_client, - "add_client", - searpc_signature_int__string()); - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_add_role, - "add_role", - searpc_signature_int__string_string()); - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_remove_role, - "remove_role", - searpc_signature_int__string_string()); - - - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_get_config, - "get_config", - searpc_signature_string__string()); - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_set_config, - "set_config", - searpc_signature_int__string_string()); - - /* RSA encrypt a message with peer's public key. */ - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_pubkey_encrypt, - "pubkey_encrypt", - searpc_signature_string__string_string()); - - /* RSA decrypt a message with my private key. */ - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_privkey_decrypt, - "privkey_decrypt", - searpc_signature_string__string()); - #ifdef CCNET_SERVER - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_list_peer_stat, - "list_peer_stat", - searpc_signature_objlist__void()); - - searpc_server_register_function ("ccnet-threaded-rpcserver", ccnet_rpc_add_emailuser, "add_emailuser", @@ -200,18 +111,6 @@ ccnet_start_rpc(CcnetSession *session) "get_emailusers_in_list", searpc_signature_objlist__string_string()); - /* RSA sign a message with my private key. */ - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_sign_message, - "sign_message", - searpc_signature_string__string()); - - /* Verify a message with a peer's public key */ - searpc_server_register_function ("ccnet-rpcserver", - ccnet_rpc_verify_message, - "verify_message", - searpc_signature_int__string_string_string()); - searpc_server_register_function ("ccnet-threaded-rpcserver", ccnet_rpc_create_group, "create_group", @@ -413,248 +312,17 @@ ccnet_start_rpc(CcnetSession *session) #endif /* CCNET_SERVER */ -} - -char * -ccnet_rpc_list_peers(GError **error) -{ - CcnetPeerManager *peer_mgr = session->peer_mgr; - GList *peer_list, *ptr; - GString *result; - CcnetPeer *peer; - - peer_list = ccnet_peer_manager_get_peer_list(peer_mgr); - if (peer_list == NULL) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Failed to get peer list"); - return NULL; - } - - result = g_string_new(""); - ptr = peer_list; - while (ptr) { - peer = ptr->data; - g_string_append_printf(result, "%s\n", peer->id); - ptr = ptr->next; - } - g_list_free(peer_list); - - return g_string_free(result, FALSE); -} - -GList * -ccnet_rpc_list_resolving_peers (GError **error) -{ - CcnetPeerManager *peer_mgr = session->peer_mgr; - return ccnet_peer_manager_get_resolve_peers(peer_mgr); -} - -GList * -ccnet_rpc_get_peers_by_role(const char *role, GError **error) -{ - CcnetPeerManager *peer_mgr = session->peer_mgr; - return ccnet_peer_manager_get_peers_with_role (peer_mgr, role); -} - - -GObject * -ccnet_rpc_get_peer(const char *peer_id, GError **error) -{ - if (!peer_id) - return NULL; - - CcnetPeerManager *peer_mgr = session->peer_mgr; - CcnetPeer *peer = ccnet_peer_manager_get_peer(peer_mgr, peer_id); - return (GObject*)peer; -} - -int -ccnet_rpc_update_peer_address (const char *peer_id, - const char *addr, - int port, - GError **error) -{ - if (!peer_id || !addr || port <= 0 || port > 65536) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Invalid arguments"); + char *path = g_build_filename (session->config_dir, CCNET_SOCKET_NAME, NULL); + SearpcNamedPipeServer *server = searpc_create_named_pipe_server (path); + if (!server) { + ccnet_warning ("Failed to create named pipe server.\n"); + g_free (path); return -1; } - CcnetPeer *peer = ccnet_peer_manager_get_peer (session->peer_mgr, peer_id); - if (!peer) { - return -1; - } - - ccnet_peer_manager_set_peer_public_addr (session->peer_mgr, peer, addr, port); - g_object_unref (peer); - - return 0; + return searpc_named_pipe_server_start (server); } -GObject * -ccnet_rpc_get_peer_by_idname(const char *idname, GError **error) -{ - if (!idname) - return NULL; - - CcnetPeerManager *peer_mgr = session->peer_mgr; - CcnetPeer *peer = ccnet_peer_manager_get_peer(peer_mgr, idname); - if (!peer) - peer = ccnet_peer_manager_get_peer_by_name (peer_mgr, idname); - if (peer) { - return (GObject*)peer; - } - return NULL; -} - -GObject * -ccnet_rpc_get_session_info(GError **error) -{ - g_object_ref (session); - return (GObject*)session; -} - -int -ccnet_rpc_add_client(const char *peer_id, GError **error) -{ - CcnetPeerManager *mgr = session->peer_mgr; - CcnetPeer *peer; - - if (strlen(peer_id) != 40) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Peer ID must be of length 40"); - return -1; - } - - peer = ccnet_peer_manager_get_peer (mgr, peer_id); - if (!peer) { - peer = ccnet_peer_new (peer_id); - ccnet_peer_manager_add_peer (mgr, peer); - } - - ccnet_peer_manager_add_role (mgr, peer, "MyClient"); - g_object_unref (peer); - return 0; -} - -int -ccnet_rpc_add_role(const char *peer_id, const char *role, GError **error) -{ - CcnetPeerManager *mgr = session->peer_mgr; - CcnetPeer *peer; - - if (!peer_id || strlen(peer_id) != 40) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Peer ID must be of length 40"); - return -1; - } - - if (!role || strlen(role) <= 2) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Invalid role"); - return -1; - } - - peer = ccnet_peer_manager_get_peer (mgr, peer_id); - if (!peer) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "No such peer"); - return -1; - } - ccnet_peer_manager_add_role (mgr, peer, role); - - g_object_unref (peer); - return 0; -} - -int -ccnet_rpc_remove_role(const char *peer_id, const char *role, GError **error) -{ - CcnetPeerManager *mgr = session->peer_mgr; - CcnetPeer *peer; - - if (!peer_id || strlen(peer_id) != 40) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Peer ID must be of length 40"); - return -1; - } - - if (!role || strlen(role) <= 2) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Invalid role"); - return -1; - } - - peer = ccnet_peer_manager_get_peer (mgr, peer_id); - if (!peer) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "No such peer"); - return -1; - } - - ccnet_peer_manager_remove_role (mgr, peer, role); - - g_object_unref (peer); - return 0; -} - -char * -ccnet_rpc_get_config (const char *key, GError **error) -{ - return ccnet_session_config_get_string (session, key); -} - -int -ccnet_rpc_set_config (const char *key, const char *value, GError **error) -{ - return ccnet_session_config_set_string (session, key, value); -} - -char * -ccnet_rpc_pubkey_encrypt (const char *msg_base64, const char *peer_id, GError **error) -{ - unsigned char *msg; - gsize msg_len; - CcnetPeer *peer; - unsigned char *enc_msg; - int enc_msg_len; - char *ret; - - peer = ccnet_peer_manager_get_peer (session->peer_mgr, peer_id); - if (!peer) { - g_warning ("Cannot find peer %s.\n", peer_id); - return NULL; - } - - msg = g_base64_decode (msg_base64, &msg_len); - - enc_msg = public_key_encrypt (peer->pubkey, msg, (int)msg_len, &enc_msg_len); - - ret = g_base64_encode (enc_msg, enc_msg_len); - - g_free (msg); - g_free (enc_msg); - g_object_unref (peer); - return ret; -} - -char * -ccnet_rpc_privkey_decrypt (const char *msg_base64, GError **error) -{ - unsigned char *msg; - gsize msg_len; - unsigned char *dec_msg; - int dec_msg_len; - char *ret; - - msg = g_base64_decode (msg_base64, &msg_len); - - dec_msg = private_key_decrypt (session->privkey, msg, (int)msg_len, &dec_msg_len); - - if (dec_msg_len < 0) { - g_warning ("Failed to decrypt message with RSA priv key.\n"); - g_set_error (error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Failed to decrypt"); - g_free (msg); - return NULL; - } - - ret = g_base64_encode (dec_msg, dec_msg_len); - - g_free (msg); - g_free (dec_msg); - return ret; -} #ifdef CCNET_SERVER @@ -662,48 +330,6 @@ ccnet_rpc_privkey_decrypt (const char *msg_base64, GError **error) #include "group-mgr.h" #include "org-mgr.h" - -GList * -ccnet_rpc_list_peer_stat (GError **error) -{ - GList *res = NULL; - GList *ptr, *peer_list; - CcnetPeer *peer; - CcnetPeerManager *peer_mgr = session->peer_mgr; - - peer_list = ccnet_peer_manager_get_peer_list(peer_mgr); - if (peer_list == NULL) { - g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Failed to get peer list"); - return NULL; - } - - ptr = peer_list; - while (ptr) { - peer = ptr->data; - if (peer->is_self) { - ptr = ptr->next; - continue; - } - - guint proc_num = g_hash_table_size (peer->processors); - - CcnetPeerStat* stat = ccnet_peer_stat_new (); - g_object_set (stat, "id", peer->id, - "name", peer->name, - "ip", peer->addr_str, - "encrypt", peer->encrypt_channel, - "last_up", (gint64) peer->last_up, - "proc_num", (int)proc_num, - NULL); - res = g_list_prepend (res, stat); - ptr = ptr->next; - } - g_list_free(peer_list); - - return g_list_reverse (res); -} - - int ccnet_rpc_add_emailuser (const char *email, const char *passwd, int is_staff, int is_active, GError **error) @@ -965,55 +591,6 @@ ccnet_rpc_get_superusers (GError **error) return ccnet_user_manager_get_superusers(user_mgr); } -char * -ccnet_rpc_sign_message (const char *message, GError **error) -{ - unsigned char *sig; - unsigned int sig_len; - char *sigret; - - sig = g_new0(unsigned char, RSA_size(session->privkey)); - - if (!RSA_sign (NID_sha1, (const unsigned char *)message, strlen(message), - sig, &sig_len, session->privkey)) { - g_warning ("Failed to sign message: %lu.\n", ERR_get_error()); - return NULL; - } - - sigret = g_base64_encode (sig, sig_len); - g_free (sig); - - return sigret; -} - -int -ccnet_rpc_verify_message (const char *message, - const char *sig_base64, - const char *peer_id, - GError **error) -{ - unsigned char *sig; - gsize sig_len; - CcnetPeer *peer; - - sig = g_base64_decode (sig_base64, &sig_len); - - peer = ccnet_peer_manager_get_peer (session->peer_mgr, peer_id); - if (!peer) { - g_warning ("Cannot find peer %s.\n", peer_id); - return -1; - } - - if (!RSA_verify (NID_sha1, (const unsigned char *)message, strlen(message), - sig, (guint)sig_len, peer->pubkey)) { - g_object_unref (peer); - return -1; - } - - g_object_unref (peer); - return 0; -} - int ccnet_rpc_create_group (const char *group_name, const char *user_name, const char *type, int parent_group_id, GError **error) diff --git a/net/common/rpc-service.h b/net/common/rpc-service.h index 632bad7..fe769b4 100644 --- a/net/common/rpc-service.h +++ b/net/common/rpc-service.h @@ -5,22 +5,7 @@ struct CcnetSession; -void ccnet_start_rpc(CcnetSession *session); - -char *ccnet_rpc_list_peers(GError **error); -GList *ccnet_rpc_list_resolving_peers (GError **error); - - -GList* ccnet_rpc_get_peers_by_role(const char *role, GError **error); - - -GObject *ccnet_rpc_get_peer(const char *peerid, GError **error); - -int -ccnet_rpc_update_peer_address(const char *peer_id, const char *addr, - int port, GError **error); - -GObject *ccnet_rpc_get_peer_by_idname(const char *idname, GError **error); +int ccnet_start_rpc(CcnetSession *session); char * ccnet_rpc_list_users(GError **error); @@ -32,40 +17,9 @@ ccnet_rpc_get_user(const char *userid, GError **error); GObject * ccnet_rpc_get_user_of_peer(const char *peerid, GError **error); - -GObject *ccnet_rpc_get_session_info(GError **error); - -int -ccnet_rpc_add_client(const char *user_id, GError **error); - -int -ccnet_rpc_add_role(const char *user_id, const char *role, GError **error); - -int -ccnet_rpc_remove_role(const char *user_id, const char *role, GError **error); - - GList *ccnet_rpc_get_events(int offset, int limit, GError **error); int ccnet_rpc_count_event (GError **error); - -/** - * ccnet_get_config: - * - * Return the config value with key @key - */ -char * -ccnet_rpc_get_config (const char *key, GError **error); - -/** - * ccnet_rpc_set_config: - * - * Set the value of config item with key @key to @value - */ -int -ccnet_rpc_set_config (const char *key, const char *value, GError **error); - - /** * ccnet_rpc_upload_profile: * @@ -74,19 +28,8 @@ ccnet_rpc_set_config (const char *key, const char *value, GError **error); int ccnet_rpc_upload_profile (const char *relay_id, GError **error); -char * -ccnet_rpc_pubkey_encrypt (const char *msg_base64, - const char *peer_id, - GError **error); - -char * -ccnet_rpc_privkey_decrypt (const char *msg_base64, GError **error); - #ifdef CCNET_SERVER -GList * -ccnet_rpc_list_peer_stat (GError **error); - int ccnet_rpc_add_emailuser (const char *email, const char *passwd, int is_staff, int is_active, GError **error); @@ -170,15 +113,6 @@ ccnet_rpc_remove_one_binding (const char *email, const char *peer_id, GList * ccnet_rpc_get_peers_by_email (const char *email, GError **error); -char * -ccnet_rpc_sign_message (const char *message, GError **error); - -int -ccnet_rpc_verify_message (const char *message, - const char *sig_base64, - const char *peer_id, - GError **error); - int ccnet_rpc_create_group (const char *group_name, const char *user_name, const char *type, int parent_group_id, GError **error); diff --git a/net/server/ccnet-server.c b/net/server/ccnet-server.c index 9346b5f..efb39a6 100644 --- a/net/server/ccnet-server.c +++ b/net/server/ccnet-server.c @@ -360,7 +360,10 @@ main (int argc, char **argv) #endif ccnet_session_start (session); - ccnet_start_rpc(session); + if(ccnet_start_rpc(session) < 0) { + ccnet_warning ("Failed to start ccnet rpc server.\n"); + exit (-1); + } /* actually enter the event loop */ event_dispatch (); diff --git a/python/ccnet/Makefile.am b/python/ccnet/Makefile.am index 1098f21..6b3e6d5 100644 --- a/python/ccnet/Makefile.am +++ b/python/ccnet/Makefile.am @@ -1,13 +1,2 @@ ccnetdir=${pyexecdir}/ccnet - -ccnet_PYTHON = __init__.py errors.py status_code.py utils.py \ - packet.py message.py \ - client.py sync_client.py \ - pool.py rpc.py - -ccnet_asyncdir = ${ccnetdir}/async - -ccnet_async_PYTHON = async/__init__.py \ - async/async_client.py async/processor.py \ - async/rpcserverproc.py async/sendcmdproc.py \ - async/mqclientproc.py async/timer.py +ccnet_PYTHON = __init__.py rpc.py diff --git a/python/ccnet/__init__.py b/python/ccnet/__init__.py index 16ed084..53daff9 100644 --- a/python/ccnet/__init__.py +++ b/python/ccnet/__init__.py @@ -1,7 +1 @@ -from ccnet.errors import NetworkError -from ccnet.sync_client import SyncClient - -from ccnet.pool import ClientPool -from ccnet.rpc import RpcClientBase, CcnetRpcClient, CcnetThreadedRpcClient - -from ccnet.message import Message \ No newline at end of file +from ccnet.rpc import CcnetThreadedRpcClient diff --git a/python/ccnet/async/__init__.py b/python/ccnet/async/__init__.py deleted file mode 100644 index 4617964..0000000 --- a/python/ccnet/async/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -''' -@module: ccnet.async -@description: The async client of ccnet depends on python-libevent, -so we move it to a standalone package. - -''' -from .async_client import AsyncClient - -from .processor import Processor -from .rpcserverproc import RpcServerProc -from .sendcmdproc import SendCmdProc -from .mqclientproc import MqClientProc - -from .timer import Timer \ No newline at end of file diff --git a/python/ccnet/async/async_client.py b/python/ccnet/async/async_client.py deleted file mode 100644 index 280fa13..0000000 --- a/python/ccnet/async/async_client.py +++ /dev/null @@ -1,250 +0,0 @@ -import logging -import libevent - -from ccnet.client import Client, parse_update, parse_response - -from ccnet.packet import response_to_packet, parse_header, Packet -from ccnet.packet import to_response_id, to_master_id, to_slave_id, to_packet_id -from ccnet.packet import CCNET_MSG_REQUEST, CCNET_MSG_UPDATE, CCNET_MSG_RESPONSE, \ - CCNET_HEADER_LENGTH, CCNET_MAX_PACKET_LENGTH - -from ccnet.status_code import SC_PROC_DONE, SC_PROC_DEAD, SS_PROC_DEAD, \ - SC_UNKNOWN_SERVICE, SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE, SC_PERM_ERR - -from ccnet.status_code import PROC_NO_SERVICE, PROC_PERM_ERR, \ - PROC_BAD_RESP, PROC_REMOTE_DEAD - -from ccnet.errors import NetworkError - -from .processor import Processor -from .sendcmdproc import SendCmdProc -from .mqclientproc import MqClientProc - - -__all__ = [ - 'AsyncClient', -] - -def debug_print(msg): - print msg - -class AsyncClient(Client): - '''Async mode client''' - def __init__(self, config_dir, event_base, central_config_dir=None): - Client.__init__(self, config_dir, central_config_dir) - self.proc_types = {} - self.procs = {} - self.register_processors() - self._bev = None - - self._evbase = event_base - - def get_event_base(self): - return self._evbase - - def add_processor(self, proc): - self.procs[proc.id] = proc - - def remove_processor(self, proc): - if proc.id in self.procs: - del self.procs[proc.id] - - def get_proc(self, id): - return self.procs.get(id, None) - - def write_packet(self, pkt): - outbuf = self._bev.output - - outbuf.add(pkt.header.to_string()) - outbuf.add(pkt.body) - - def send_response(self, id, code, code_msg, content=''): - id = to_response_id(id) - pkt = response_to_packet(id, code, code_msg, content) - self.write_packet(pkt) - - def handle_packet(self, pkt): - ptype = pkt.header.ptype - if ptype == CCNET_MSG_REQUEST: - self.handle_request(pkt.header.id, pkt.body) - - elif ptype == CCNET_MSG_UPDATE: - code, code_msg, content = parse_update(pkt.body) - self.handle_update(pkt.header.id, code, code_msg, content) - - elif ptype == CCNET_MSG_RESPONSE: - code, code_msg, content = parse_response(pkt.body) - self.handle_response(pkt.header.id, code, code_msg, content) - - else: - logging.warning("unknown packet type %d", ptype) - - def handle_request(self, id, req): - commands = req.split() - self.create_slave_processor(to_slave_id(id), commands) - - def create_slave_processor(self, id, commands): - peer_id = self.peer_id - if commands[0] == 'remote': - if len(commands) < 3: - logging.warning("invalid request %s", commands) - return - peer_id = commands[1] - commands = commands[2:] - - proc_name = commands[0] - - if not proc_name in self.proc_types: - logging.warning("unknown processor type %s", proc_name) - return - - cls = self.proc_types[proc_name] - - proc = cls(proc_name, id, peer_id, self) - self.add_processor(proc) - proc.start(*commands[1:]) - - def create_master_processor(self, proc_name): - id = self.get_request_id() - - cls = self.proc_types.get(proc_name, None) - if cls == None: - logging.error('unknown processor type %s', proc_name) - return None - - proc = cls(proc_name, id, self.peer_id, self) - self.add_processor(proc) - return proc - - def handle_update(self, id, code, code_msg, content): - proc = self.get_proc(to_slave_id(id)) - if proc == None: - if code != SC_PROC_DEAD: - self.send_response(id, SC_PROC_DEAD, SS_PROC_DEAD) - return - - if code[0] == '5': - logging.info('shutdown processor %s(%d): %s %s\n', - proc.name, to_packet_id(proc.id), code, code_msg) - if code == SC_UNKNOWN_SERVICE: - proc.shutdown(PROC_NO_SERVICE) - elif code == SC_PERM_ERR: - proc.shutdown(PROC_PERM_ERR) - else: - proc.shutdown(PROC_BAD_RESP) - - elif code == SC_PROC_KEEPALIVE: - proc.send_response(SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE) - - elif code == SC_PROC_DEAD: - logging.info('shutdown processor %s(%d): when peer(%.8s) processor is dead\n', - proc.name, to_packet_id(proc.id), proc.peer_id) - proc.shutdown(PROC_REMOTE_DEAD) - - elif code == SC_PROC_DONE: - proc.done(True) - - else: - proc.handle_update(code, code_msg, content) - - def handle_response(self, id, code, code_msg, content): - proc = self.get_proc(to_master_id(id)) - if proc == None: - if code != SC_PROC_DEAD: - self.send_update(id, SC_PROC_DEAD, SS_PROC_DEAD) - return - - if code[0] == '5': - logging.info('shutdown processor %s(%d): %s %s\n', - proc.name, to_packet_id(proc.id), code, code_msg) - if code == SC_UNKNOWN_SERVICE: - proc.shutdown(PROC_NO_SERVICE) - elif code == SC_PERM_ERR: - proc.shutdown(PROC_PERM_ERR) - else: - proc.shutdown(PROC_BAD_RESP) - - elif code == SC_PROC_KEEPALIVE: - proc.send_update(id, SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE) - - elif code == SC_PROC_DEAD: - logging.info('shutdown processor %s(%d): when peer(%.8s) processor is dead\n', - proc.name, to_packet_id(proc.id), proc.peer_id) - proc.shutdown(PROC_REMOTE_DEAD) - - else: - proc.handle_response(code, code_msg, content) - - def register_processor(self, proc_name, proc_type): - assert Processor in proc_type.mro() - - self.proc_types[proc_name] = proc_type - - def register_processors(self): - self.register_processor("send-cmd", SendCmdProc) - self.register_processor("mq-client", MqClientProc) - - def register_service(self, service, group, proc_type, callback=None): - self.register_processor(service, proc_type) - cmd = 'register-service %s %s' % (service, group) - self.send_cmd(cmd, callback) - - def send_cmd(self, cmd, callback=None): - proc = self.create_master_processor("send-cmd") - if callback: - proc.set_callback(callback) - proc.start() - proc.send_cmd(cmd) - - def _read_cb(self, bev, cb_data): - dummy = bev, cb_data - - inbuf = self._bev.input - while (True): - raw = inbuf.copyout(CCNET_HEADER_LENGTH) - header = parse_header(raw) - if len(inbuf) < CCNET_HEADER_LENGTH + header.length: - break - - inbuf.drain(CCNET_HEADER_LENGTH) - data = inbuf.copyout(header.length) - pkt = Packet(header, data) - - self.handle_packet(pkt) - - inbuf.drain(header.length) - - if len(inbuf) < CCNET_HEADER_LENGTH: - break - - def _event_cb(self, bev, what, cb_data): - dummy = bev, cb_data - logging.warning('libevent error: what = %s' % what) - if what & libevent.BEV_EVENT_EOF or \ - what & libevent.BEV_EVENT_ERROR or \ - what & libevent.BEV_EVENT_READING or \ - what & libevent.BEV_EVENT_WRITING: - if self._bev is not None: - self._bev = None - raise NetworkError('libevent error: what = %s' % what) - - def base_loop(self): - '''Create an event base -> register socket events -> loop''' - self._bev = libevent.BufferEvent(self._evbase, - self._connfd.fileno()) - - self._bev.set_watermark(libevent.EV_READ, - CCNET_HEADER_LENGTH, # low wartermark - CCNET_MAX_PACKET_LENGTH * 2) # highmark - - self._bev.set_callbacks(self._read_cb, # read callback - None, # write callback - self._event_cb) # event callback - - self._bev.enable(libevent.EV_READ | libevent.EV_WRITE) - - self._evbase.loop() - - def main_loop(self): - self.base_loop() - diff --git a/python/ccnet/async/mqclientproc.py b/python/ccnet/async/mqclientproc.py deleted file mode 100644 index 22a7e21..0000000 --- a/python/ccnet/async/mqclientproc.py +++ /dev/null @@ -1,50 +0,0 @@ -import logging -from .processor import Processor -from ccnet.message import message_from_string, message_to_string - -INIT = 0 -REQUEST_SENT = 1 -READY = 2 - -SC_MSG = '300' -SC_UNSUBSCRIBE = '301' - -class MqClientProc(Processor): - def __init__(self, *args, **kwargs): - Processor.__init__(self, *args, **kwargs) - self.state = INIT - self.callback = None - - def start(self, *argv): - req = 'mq-server ' + ' '.join(argv) - self.send_request(req) - self.state = REQUEST_SENT - - def set_callback(self, cb): - self.callback = cb - - def handle_response(self, code, code_msg, content): - if self.state == REQUEST_SENT: - if code[0] != '2': - logging.warning('bad response: %s %s\n', code, code_msg) - self.done(False) - - self.state = READY - - elif self.state == READY: - if code[0] != '2' and code[0] != '3': - logging.warning('bad response: %s %s\n', code, code_msg) - return - - if code[0] == '3' and code[2] == '0': - msg = message_from_string(content[:-1]) - if self.callback: - self.callback(msg) - - def put_message(self, msg): - buf = message_to_string(msg) - self.send_update(SC_MSG, '', buf + '\000') - - def unsubscribe(self): - self.send_update(SC_UNSUBSCRIBE, '') - self.done(True) \ No newline at end of file diff --git a/python/ccnet/async/processor.py b/python/ccnet/async/processor.py deleted file mode 100644 index 1071162..0000000 --- a/python/ccnet/async/processor.py +++ /dev/null @@ -1,57 +0,0 @@ -import logging - -from ccnet.packet import SLAVE_BIT_MASK, to_print_id -from ccnet.status_code import SC_PROC_DONE, SS_PROC_DONE, PROC_DONE - -class Processor(object): - """Base processor class""" - - name = "Processor" - def __init__(self, name, id, peer_id, client): - self.name = name - self.id = id - self.peer_id = peer_id - self.client = client - - def start(self, *args, **kwargs): - raise NotImplementedError - - def handle_request(self, *args, **kwargs): - raise NotImplementedError - - def handle_update(self, *args, **kwargs): - raise NotImplementedError - - def handle_response(self, *args, **kwargs): - raise NotImplementedError - - def __str__(self): - return "" % (self.name, to_print_id(self.id)) - - def is_master(self): - return not (self.id & SLAVE_BIT_MASK) - - def send_request(self, buf): - assert self.is_master() - - return self.client.send_request(self.id, buf) - - def send_response(self, code, code_msg, content=''): - assert not self.is_master() - - return self.client.send_response(self.id, code, code_msg, content) - - def send_update(self, code, code_msg, content=''): - assert self.is_master() - - return self.client.send_update(self.id, code, code_msg, content) - - def done(self, success): - if self.is_master() and success: - self.send_update(SC_PROC_DONE, SS_PROC_DONE, '') - self.client.remove_processor(self) - - def shutdown(self, reason): - if reason > PROC_DONE: - logging.debug('shut down %s: %s', self, reason) - self.client.remove_processor(self) \ No newline at end of file diff --git a/python/ccnet/async/rpcserverproc.py b/python/ccnet/async/rpcserverproc.py deleted file mode 100644 index 6e13678..0000000 --- a/python/ccnet/async/rpcserverproc.py +++ /dev/null @@ -1,43 +0,0 @@ -from pysearpc import searpc_server - -from ccnet.status_code import SC_OK, SS_OK -from ccnet.status_code import SC_SERVER_RET, SS_SERVER_RET, SC_SERVER_MORE, SS_SERVER_MORE, \ - SC_CLIENT_CALL, SC_CLIENT_MORE, SC_CLIENT_CALL_MORE - -from .processor import Processor - -class RpcServerProc(Processor): - name = 'rpcserver-proc' - max_transfer_length = 65535 - 128 - - def __init__(self, *args, **kwargs): - Processor.__init__(self, *args, **kwargs) - self.fretstr = '' - self.fcallstr = '' - - def start(self, *argv): - self.send_response(SC_OK, SS_OK, '') - - def send_fret(self): - maxlen = self.max_transfer_length - l = len(self.fretstr) - if l < maxlen: - self.send_response(SC_SERVER_RET, SS_SERVER_RET, self.fretstr) - self.fretstr = '' - - else: - buf = self.fretstr[:maxlen] - self.send_response(SC_SERVER_MORE, SS_SERVER_MORE, buf) - self.fretstr = self.fretstr[maxlen:] - - def handle_update(self, code, code_msg, content): - if code == SC_CLIENT_CALL_MORE: - self.fcallstr += content - return - elif code == SC_CLIENT_CALL: - self.fcallstr += content - self.fretstr = searpc_server.call_function(self.name, self.fcallstr) - self.fcallstr = '' - self.send_fret() - elif code == SC_CLIENT_MORE: - self.send_fret() \ No newline at end of file diff --git a/python/ccnet/async/sendcmdproc.py b/python/ccnet/async/sendcmdproc.py deleted file mode 100644 index 50b7b50..0000000 --- a/python/ccnet/async/sendcmdproc.py +++ /dev/null @@ -1,34 +0,0 @@ -import logging -from .processor import Processor - -INIT = 0 -REQUET_SENT = 1 -CONNECTED = 2 - -class SendCmdProc(Processor): - name = "send-cmd" - def __init__(self, *args, **kwargs): - Processor.__init__(self, *args, **kwargs) - self.callback = None - self.state = INIT - - def start(self, *argv): - self.send_request('receive-cmd') - self.state = REQUET_SENT - - def set_callback(self, cb): - self.callback = cb - - def send_cmd(self, cmd): - self.send_update('200', '', cmd + '\000') - - def handle_response(self, code, code_msg, content): - if code[0] != '2': - logging.warning("Received bad response %s %s", code, code_msg) - - if self.state == REQUET_SENT: - self.state = CONNECTED - - elif self.state == CONNECTED: - if self.callback: - self.callback(code, code_msg, content) \ No newline at end of file diff --git a/python/ccnet/async/timer.py b/python/ccnet/async/timer.py deleted file mode 100644 index fba0a9b..0000000 --- a/python/ccnet/async/timer.py +++ /dev/null @@ -1,21 +0,0 @@ -import libevent -import logging - -class Timer(object): - '''Wraps aroud a libevent timeout event''' - def __init__(self, ev_base, timeout): - self._timeout = timeout - self._evtimer = libevent.Timer(ev_base, self._callback, None) - self._evtimer.add(timeout) # pylint: disable=E1101 - - def _callback(self, evtimer, user_data): - dummy = user_data - try: - self.callback() - except: - logging.exception('error in timer callback:') - - evtimer.add(self._timeout) - - def callback(self): - raise NotImplementedError \ No newline at end of file diff --git a/python/ccnet/client.py b/python/ccnet/client.py deleted file mode 100644 index 1c57c68..0000000 --- a/python/ccnet/client.py +++ /dev/null @@ -1,148 +0,0 @@ -#coding: UTF-8 - -import os -import socket -import ConfigParser -import logging - -from ccnet.packet import to_request_id, to_update_id -from ccnet.packet import request_to_packet, update_to_packet -from ccnet.packet import write_packet - -from ccnet.errors import NetworkError - -from .utils import is_win32, make_socket_closeonexec - -CCNET_PIPE_NAME = 'ccnet.sock' - -def parse_response(body): - '''Parse the content of the response - The struct of response data: - - first 3 bytes is the - - from the 4th byte to the first occurrence of '\n' is the . If the 4th byte is '\n', then there is no - - from the first occurrence of '\n' to the end is the - ''' - code = body[:3] - if body[3] == '\n': - code_msg = '' - content = body[4:] - else: - pos = body.index('\n') - code_msg = body[4:pos] - content = body[pos + 1:] - - return code, code_msg, content - -def parse_update(body): - '''The structure of an update is the same with a response''' - code = body[:3] - if body[3] == '\n': - code_msg = '' - content = body[4:] - else: - pos = body.index('\n') - code_msg = body[4:pos] - content = body[pos + 1:] - - return code, code_msg, content - -class Client(object): - '''Base ccnet client class''' - def __init__(self, config_dir, central_config_dir=None): - if not isinstance(config_dir, unicode): - config_dir = config_dir.decode('UTF-8') - - if central_config_dir: - central_config_dir = os.path.expanduser(central_config_dir) - if not os.path.exists(central_config_dir): - raise RuntimeError(u'%s does not exits' % central_config_dir) - config_dir = os.path.expanduser(config_dir) - config_file = os.path.join(central_config_dir if central_config_dir else config_dir, - u'ccnet.conf') - logging.debug('using config file %s', config_file) - if not os.path.exists(config_file): - raise RuntimeError(u'%s does not exits' % config_file) - - self.central_config_dir = central_config_dir - self.config_dir = config_dir - self.config_file = config_file - self.config = None - - self.port = None - self.peer_id = None - self.peer_name = None - - self.parse_config() - - self._connfd = None - self._req_id = 1000 - - def __del__(self): - '''Destructor of the client class. We close the socket here, if - connetced to daemon - - ''' - if self.is_connected(): - try: - self._connfd.close() - except: - pass - - def parse_config(self): - self.config = ConfigParser.ConfigParser() - self.config.read(self.config_file) - if self.config.has_option('Client', 'PORT'): - self.port = self.config.getint('Client', 'PORT') - else: - self.port = 10001 - self.un_path = '' - if self.config.has_option('Client', 'UNIX_SOCKET'): - self.un_path = self.config.get('Client', 'UNIX_SOCKET') - self.peer_id = self.config.get('General', 'ID') - self.peer_name = self.config.get('General', 'NAME') - - def connect_daemon_with_pipe(self): - self._connfd = socket.socket(socket.AF_UNIX) - if not self.un_path: - pipe_name = os.path.join(self.config_dir, CCNET_PIPE_NAME) - else: - pipe_name = self.un_path - try: - self._connfd.connect(pipe_name) - except: - raise NetworkError("Can't connect to daemon") - - make_socket_closeonexec(self._connfd.fileno()) - - def connect_daemon_with_socket(self): - self._connfd = socket.socket() - self._connfd.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - try: - self._connfd.connect(('127.0.0.1', self.port)) - except: - raise NetworkError("Can't connect to daemon") - - make_socket_closeonexec(self._connfd.fileno()) - - def connect_daemon(self): - if is_win32(): - return self.connect_daemon_with_socket() - else: - return self.connect_daemon_with_pipe() - - def is_connected(self): - return self._connfd is not None - - def send_request(self, id, req): - id = to_request_id(id) - pkt = request_to_packet(id, req) - write_packet(self._connfd, pkt) - - def send_update(self, id, code, code_msg, content=''): - id = to_update_id(id) - pkt = update_to_packet(id, code, code_msg, content) - write_packet(self._connfd, pkt) - - def get_request_id(self): - self._req_id += 1 - return self._req_id diff --git a/python/ccnet/errors.py b/python/ccnet/errors.py deleted file mode 100644 index f7caa70..0000000 --- a/python/ccnet/errors.py +++ /dev/null @@ -1,7 +0,0 @@ -class NetworkError(Exception): - def __init__(self, msg): - Exception.__init__(self) - self.msg = msg - - def __str__(self): - return self.msg diff --git a/python/ccnet/message.py b/python/ccnet/message.py deleted file mode 100644 index d8d867b..0000000 --- a/python/ccnet/message.py +++ /dev/null @@ -1,47 +0,0 @@ -#coding: UTF-8 - -'''Message is the carrier of a simple Pub/Sub system on top of ccnet''' - -import datetime -import re -import uuid -import time - -MESSAGE_PATTERN = re.compile(r'(?P[\d]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P.*)') - -class Message(object): - def __init__(self, d): - self.flags = int(d['flags']) - self.from_ = d['from'] - self.to = d['to'] - self.id = d['id'] - self.ctime = float(d['ctime']) - self.rtime = float(d['rtime']) - self.app = d['app'] - self.body = d['body'] - -def message_from_string(s): - results = MESSAGE_PATTERN.match(s) - if results is None: - raise RuntimeError('Bad message: %s' % s) - - d = results.groupdict() - return Message(d) - - -def gen_inner_message_string(self_id, app, content): - result = "%d %s %s %s %d %d %s %s\000" % (0, self_id, self_id, str(uuid.uuid1()), - int(time.time()), 0, - app, content) - return result - -def message_to_string(msg): - f = '%(flags)s %(from_)s %(to)s %(id)s %(ctime)s %(rtime)s %(app)s %(body)s' - return f % dict(flags=msg.flags, - from_=msg.from_, - to=msg.to, - id=msg.id, - ctime=msg.ctime, - rtime=msg.rtime, - app=msg.app, - body=msg.body) diff --git a/python/ccnet/packet.py b/python/ccnet/packet.py deleted file mode 100644 index 5780c49..0000000 --- a/python/ccnet/packet.py +++ /dev/null @@ -1,129 +0,0 @@ -#coding: UTF-8 - -""" -Packet level protocol of ccnet. - -About various types of id: - - - A slave processor's id has its highest bit set; a master processor has its highest bit clear - - The field of a ccnet packet always has its highest bit clear. The - field of the packet determines what type of the packet is (a - request, a response, or an update) -""" - -import logging -import struct - -from ccnet.utils import recvall, sendall, NetworkError - -REQUEST_ID_MASK = 0x7fffffff -SLAVE_BIT_MASK = 0x80000000 - -CCNET_MSG_OK = 0 -CCNET_MSG_HANDSHAKE = 1 -CCNET_MSG_REQUEST = 2 -CCNET_MSG_RESPONSE = 3 -CCNET_MSG_UPDATE = 4 -CCNET_MSG_RELAY = 5 - -def to_request_id(id): - return id & REQUEST_ID_MASK - -to_response_id = to_request_id -to_update_id = to_request_id -to_master_id = to_request_id -to_packet_id = to_request_id - -def to_slave_id(id): - return id | SLAVE_BIT_MASK - -def to_print_id(id): - if id & SLAVE_BIT_MASK: - return -to_request_id(id) - else: - return id - -# the byte sequence of ccnet packet header -CCNET_HEADER_FORMAT = '>BBHI' -# Number of bytes for the header -CCNET_HEADER_LENGTH = struct.calcsize(CCNET_HEADER_FORMAT) - -CCNET_MAX_PACKET_LENGTH = 65535 - -class PacketHeader(object): - def __init__(self, ver, ptype, length, id): - self.ver = ver - self.ptype = ptype - self.length = length - self.id = id - - def to_string(self): - return struct.pack(CCNET_HEADER_FORMAT, self.ver, self.ptype, self.length, self.id) - - def __str__(self): - return "" % (self.ptype, self.length, self.id) - -class Packet(object): - version = 1 - def __init__(self, header, body): - self.header = header - self.body = body - -def parse_header(buf): - try: - ver, ptype, length, id = struct.unpack(CCNET_HEADER_FORMAT, buf) - except struct.error, e: - raise NetworkError('error when unpack packet header: %s' % e) - - return PacketHeader(ver, ptype, length, id) - -def format_response(code, code_msg, content): - body = code - if code_msg: - body += " " + code_msg - body += "\n" - - if content: - body += content - - return body - -format_update = format_response - -def request_to_packet(id, buf): - hdr = PacketHeader(1, CCNET_MSG_REQUEST, len(buf), to_request_id(id)) - return Packet(hdr, buf) - -def response_to_packet(id, code, code_msg, content): - body = format_response(code, code_msg, content) - hdr = PacketHeader(1, CCNET_MSG_RESPONSE, len(body), to_response_id(id)) - return Packet(hdr, body) - -def update_to_packet(id, code, code_msg, content): - body = format_update(code, code_msg, content) - hdr = PacketHeader(1, CCNET_MSG_UPDATE, len(body), to_update_id(id)) - return Packet(hdr, body) - -def read_packet(fd): - hdr = recvall(fd, CCNET_HEADER_LENGTH) - if len(hdr) == 0: - logging.warning('connection to daemon is lost') - raise NetworkError('Connection to daemon is lost') - elif len(hdr) < CCNET_HEADER_LENGTH: - raise NetworkError('Only read %d bytes header, expected 8' % len(hdr)) - - header = parse_header(hdr) - - if header.length == 0: - body = '' - else: - body = recvall(fd, header.length) - if len(body) < header.length: - raise NetworkError('Only read %d bytes body, expected %d' % (len(body), header.length)) - - return Packet(header, body) - -def write_packet(fd, packet): - hdr = packet.header.to_string() - sendall(fd, hdr) - sendall(fd, packet.body) \ No newline at end of file diff --git a/python/ccnet/pool.py b/python/ccnet/pool.py deleted file mode 100644 index e738dcb..0000000 --- a/python/ccnet/pool.py +++ /dev/null @@ -1,36 +0,0 @@ -from ccnet.sync_client import SyncClient -import Queue - -class ClientPool(object): - """ccnet client pool.""" - - def __init__(self, conf_dir, pool_size=5, central_config_dir=None): - """ - :param central_config_dir: path to the central config dir for ccnet/seafile/seahub/seafdav etc. - :param conf_dir: the ccnet configuration directory - :param pool_size: - """ - self.central_config_dir = central_config_dir - self.conf_dir = conf_dir - self.pool_size = pool_size - self._pool = Queue.Queue(pool_size) - - def _create_client(self): - client = SyncClient(self.conf_dir, self.central_config_dir) - client.req_ids = {} - client.connect_daemon() - - return client - - def get_client(self): - try: - client = self._pool.get(False) - except: - client = self._create_client() - return client - - def return_client(self, client): - try: - self._pool.put(client, False) - except Queue.Full: - pass diff --git a/python/ccnet/rpc.py b/python/ccnet/rpc.py index 6e4513e..69a8ce7 100644 --- a/python/ccnet/rpc.py +++ b/python/ccnet/rpc.py @@ -1,179 +1,9 @@ -from pysearpc import SearpcClient, searpc_func, SearpcError +from pysearpc import searpc_func, NamedPipeClient -from ccnet.status_code import SC_CLIENT_CALL, SS_CLIENT_CALL, \ - SC_CLIENT_MORE, SS_CLIENT_MORE, SC_SERVER_RET, \ - SC_SERVER_MORE, SC_PROC_DEAD +class CcnetThreadedRpcClient(NamedPipeClient): -from ccnet.errors import NetworkError - -class DeadProcError(Exception): - def __str__(self): - return "Processor is dead" - - -class RpcClientBase(SearpcClient): - - def __init__(self, ccnet_client_pool, service_name, retry_num=1, - is_remote=False, remote_peer_id='', req_pool=False): - SearpcClient.__init__(self) - self.pool = ccnet_client_pool - self.service_name = service_name - self.retry_num = retry_num - self.is_remote = is_remote - self.remote_peer_id = remote_peer_id - self.req_pool = req_pool - if self.is_remote and len(self.remote_peer_id) != 40: - raise ValueError("Invalid remote peer id") - - def _start_service(self, client): - req_id = client.get_request_id() - req_str = self.service_name - if self.is_remote: - req_str = "remote " + self.remote_peer_id + " " + self.service_name - client.send_request(req_id, req_str) - rsp = client.read_response() - if rsp.code != "200": - raise SearpcError("Error received: %s %s (In _start_service)" % (rsp.code, rsp.code_msg)) - return req_id - - def _real_call(self, client, req_id, fcall_str): - client.send_update(req_id, SC_CLIENT_CALL, SS_CLIENT_CALL, fcall_str) - - rsp = client.read_response() - if rsp.code == SC_SERVER_RET: - return rsp.content - elif rsp.code == SC_SERVER_MORE: - buf = rsp.content - while True: - client.send_update(req_id, SC_CLIENT_MORE, - SS_CLIENT_MORE, '') - rsp = client.read_response() - if rsp.code == SC_SERVER_MORE: - buf += rsp.content - elif rsp.code == SC_SERVER_RET: - buf += rsp.content - break - else: - raise SearpcError("Error received: %s %s (In Read More)" % (rsp.code, rsp.code_msg)) - - return buf - elif rsp.code == SC_PROC_DEAD: - raise DeadProcError() - else: - raise SearpcError("Error received: %s %s" % (rsp.code, rsp.code_msg)) - - def call_remote_func_sync(self, fcall_str): - """Call remote function `fcall_str` and wait response.""" - - retried = 0 - while True: - try: - client = self.pool.get_client() - if self.req_pool: - req_id = client.req_ids.get(self.service_name, -1) - if req_id == -1: - req_id = self._start_service(client) - client.req_ids[self.service_name] = req_id - try: - ret = self._real_call(client, req_id, fcall_str) - except DeadProcError: - client.req_ids[self.service_name] = -1 - self.pool.return_client(client) - if retried < self.retry_num: - retried = retried + 1 - continue - else: - raise - - self.pool.return_client(client) - return ret - else: - # no req pool - req_id = self._start_service(client) - ret = self._real_call(client, req_id, fcall_str) - client.send_update(req_id, "103", "service is done", "") - self.pool.return_client(client) - return ret - except (NetworkError, SearpcError): - # the client is not returned to the pool and is freed automatically - if retried < self.retry_num: - retried = retried + 1 - continue - else: - raise - -class CcnetRpcClient(RpcClientBase): - - def __init__(self, ccnet_client_pool, retry_num=1, *args, **kwargs): - RpcClientBase.__init__(self, ccnet_client_pool, "ccnet-rpcserver", - *args, **kwargs) - - @searpc_func("string", []) - def list_peers(self): - pass - - @searpc_func("objlist", []) - def list_resolving_peers(self): - pass - - @searpc_func("objlist", ["string"]) - def get_peers_by_role(self): - pass - - @searpc_func("object", ["string"]) - def get_peer(self): - pass - - @searpc_func("object", []) - def get_session_info(self): - pass - - @searpc_func("int", ["string"]) - def add_client(self): - pass - - @searpc_func("int", ["string", "string"]) - def add_role(self, peer_id, role): - pass - - @searpc_func("int", ["string", "string"]) - def remove_role(self, peer_id, role): - pass - - @searpc_func("objlist", ["int", "int"]) - def get_procs_alive(self, offset, limit): - pass - - @searpc_func("int", []) - def count_procs_alive(self): - pass - - @searpc_func("objlist", ["int", "int"]) - def get_procs_dead(self, offset, limit): - pass - - @searpc_func("int", []) - def count_procs_dead(self): - pass - - @searpc_func("string", ["string"]) - def get_config(self, key): - pass - - @searpc_func("int", ["string", "string"]) - def set_config(self, key, value): - pass - - @searpc_func("objlist", []) - def list_peer_stat(self, key, value): - pass - - -class CcnetThreadedRpcClient(RpcClientBase): - - def __init__(self, ccnet_client_pool, retry_num=1, *args, **kwargs): - RpcClientBase.__init__(self, ccnet_client_pool, "ccnet-threaded-rpcserver", - *args, **kwargs) + def __init__(self, socket_path): + NamedPipeClient.__init__(self, socket_path, "ccnet-threaded-rpcserver") @searpc_func("int", ["string", "string", "int", "int"]) def add_emailuser(self, email, passwd, is_staff, is_active): diff --git a/python/ccnet/status_code.py b/python/ccnet/status_code.py deleted file mode 100644 index 4241074..0000000 --- a/python/ccnet/status_code.py +++ /dev/null @@ -1,77 +0,0 @@ -#coding: UTF-8 - -'''Status code and status messages used in ccnet. Should be treated as constants''' - -EC_NETWORK_ERR = 1 -ES_NETWORK_ERR = 'Network Error' - -SC_PROC_KEEPALIVE = '100' -SS_PROC_KEEPALIVE = 'processor keep alive' -SC_PROC_ALIVE = '101' -SS_PROC_ALIVE = 'processor is alive' -SC_PROC_DEAD = '102' -SS_PROC_DEAD = 'processor is dead' -SC_PROC_DONE = '103' -SS_PROC_DONE = 'service is done' - - -SC_OK = '200' -SS_OK = 'OK' -SC_SERV_EXISTED = '210' -SS_SERV_EXISTED = 'The service existed' -SC_PERM_CHECKING = '250' -SS_PERM_CHECKING = 'Permission Checking' - - -SC_SHUTDOWN = '500' -SS_SHUTDOWN = 'Shutdown' -SC_CREATE_PROC_ERR = '501' -SS_CREATE_PROC_ERR = 'Create Processor Error' -SC_BAD_PEER = '502' -SS_BAD_PEER = 'Bad peer id' -SC_BAD_USER = '502' -SS_BAD_USER = 'Bad user id' -SC_BAD_ARGS = '503' -SS_BAD_ARGS = 'Bad arguments' -SC_PERM_ERR = '504' -SS_PERM_ERR = 'Permission Error' -SC_BAD_UPDATE_CODE = '506' -SS_BAD_UPDATE_CODE = 'Bad update code' -SC_BAD_RESPONSE_CODE = '507' -SS_BAD_RESPONSE_CODE = 'Bad response code' -SC_VERSION_MISMATCH = '508' -SS_VERSION_MISMATCH = 'Version Mismatch' -SC_UNKNOWN_PEER = '510' -SS_UNKNOWN_PEER = 'Unknown peer' -SC_UNKNOWN_SERVICE = '511' -SS_UNKNOWN_SERVICE = 'Unknown service' -SC_PEER_UNREACHABLE = '512' -SS_PEER_UNREACHABLE = 'Peer Unreachable' -SC_CON_TIMEOUT = '513' -SS_CON_TIMEOUT = 'connection timeout' -SC_KEEPALIVE_TIMEOUT = '514' -SS_KEEPALIVE_TIMEOUT = 'keepalive timeout' -SC_NETDOWN = '515' -SS_NETDOWN = 'peer down' - - - -PROC_NOTSET = 0 -PROC_DONE = 1 -PROC_REMOTE_DEAD = 2 -PROC_NO_SERVICE = 3 -PROC_PERM_ERR = 4 -PROC_BAD_RESP = 5 - -SC_CLIENT_CALL = '301' -SS_CLIENT_CALL = 'CLIENT CALL' -SC_CLIENT_MORE = '302' -SS_CLIENT_MORE = 'MORE' -SC_CLIENT_CALL_MORE = '303' -SS_CLIENT_CALL_MORE = 'CLIENT HAS MORE' -SC_SERVER_RET = '311' -SS_SERVER_RET = 'SERVER RET' -SC_SERVER_MORE = '312' -SS_SERVER_MORE = 'HAS MORE' -SC_SERVER_ERR = '411' -SS_SERVER_ERR = 'Fail to invoke the function, check the function' \ No newline at end of file diff --git a/python/ccnet/sync_client.py b/python/ccnet/sync_client.py deleted file mode 100644 index 67adbd7..0000000 --- a/python/ccnet/sync_client.py +++ /dev/null @@ -1,94 +0,0 @@ -from ccnet.client import Client, parse_response -from ccnet.packet import read_packet, CCNET_MSG_RESPONSE -from ccnet.status_code import SC_PROC_DONE, SS_PROC_DONE -from ccnet.message import message_from_string, gen_inner_message_string - -_REQ_ID_START = 1000 - -class Response(object): - def __init__(self, code, code_msg, content): - self.code = code - self.code_msg = code_msg - self.content = content - -class SyncClient(Client): - '''sync mode client''' - def __init__(self, config_dir, central_config_dir=None): - Client.__init__(self, config_dir, central_config_dir) - self._req_id = _REQ_ID_START - self.mq_req_id = -1 - - def disconnect_daemon(self): - if self.is_connected(): - try: - self._connfd.close() - except: - pass - - def read_response(self): - packet = read_packet(self._connfd) - if packet.header.ptype != CCNET_MSG_RESPONSE: - raise RuntimeError('Invalid Response') - - code, code_msg, content = parse_response(packet.body) - - return Response(code, code_msg, content) - - def send_cmd(self, cmd): - req_id = self.get_request_id() - self.send_request(req_id, 'receive-cmd') - resp = self.read_response() - if resp.code != '200': - raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg)) - - cmd += '\000' - self.send_update(req_id, '200', '', cmd) - - resp = self.read_response() - if resp.code != '200': - raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg)) - - self.send_update(req_id, SC_PROC_DONE, SS_PROC_DONE, '') - - def prepare_recv_message(self, msg_type): - request = 'mq-server %s' % msg_type - req_id = self.get_request_id() - self.send_request(req_id, request) - - resp = self.read_response() - if resp.code != '200': - raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg)) - - def receive_message(self): - resp = self.read_response() - # the message from ccnet daemon has the trailing null byte included - msg = message_from_string(resp.content[:-1]) - return msg - - def prepare_send_message(self): - request = 'mq-server' - mq_req_id = self.get_request_id() - self.send_request(mq_req_id, request) - resp = self.read_response() - if resp.code != '200': - raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg)) - self.mq_req_id = mq_req_id - - def send_message(self, msg_type, content): - if self.mq_req_id == -1: - self.prepare_send_message() - - msg = gen_inner_message_string(self.peer_id, msg_type, content) - self.send_update(self.mq_req_id, "300", '', msg) - resp = self.read_response() - if resp.code != '200': - self.mq_req_id = -1 - raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg)) - - def register_service_sync(self, service, group): - '''Mainly used by a program to register a dummy service to ensure only - single instance of that program is running - - ''' - cmd = 'register-service %s %s' % (service, group) - self.send_cmd(cmd) diff --git a/python/ccnet/test-client.py b/python/ccnet/test-client.py deleted file mode 100644 index 2bb811b..0000000 --- a/python/ccnet/test-client.py +++ /dev/null @@ -1,62 +0,0 @@ -import sys -import os -from ccnet import ClientPool, RpcClientBase -from pysearpc import searpc_func -import threading -import logging - -RPC_SERVICE_NAME = 'test-rpcserver' -CCNET_CONF_DIR = os.path.expanduser('~/.ccnet') - -class TestRpcClient(RpcClientBase): - def __init__(self, client_pool, *args, **kwargs): - RpcClientBase.__init__(self, client_pool, RPC_SERVICE_NAME, *args, **kwargs) - - @searpc_func('string', ['string', 'int']) - def str_mul(self, s, n): - pass - - -class Worker(threading.Thread): - def __init__(self, rpc): - threading.Thread.__init__(self) - self.rpc = rpc - - def run(self): - s = 'abcdef' - n = 100 - assert self.rpc.str_mul(s, n) == s * n - -def test(n): - rpcclient = TestRpcClient(ClientPool(CCNET_CONF_DIR, CCNET_CONF_DIR)) - - workers = [] - for i in xrange(n): - t = Worker(rpcclient) - t.start() - workers.append(t) - - for t in workers: - t.join() - -def setup_logging(): - kw = { - 'format': '[%(asctime)s][%(module)s]: %(message)s', - 'datefmt': '%m/%d/%Y %H:%M:%S', - 'level': logging.DEBUG, - 'stream': sys.stdout, - } - - logging.basicConfig(**kw) - -def main(): - setup_logging() - if len(sys.argv) > 1: - test(int(sys.argv[1])) - else: - test(100) - - print 'test passed' - -if __name__ == '__main__': - main() diff --git a/python/ccnet/test-server.py b/python/ccnet/test-server.py deleted file mode 100644 index d69e28a..0000000 --- a/python/ccnet/test-server.py +++ /dev/null @@ -1,45 +0,0 @@ -import os -import logging -import libevent - -from pysearpc import searpc_server -from ccnet.async import AsyncClient, RpcServerProc - -RPC_SERVICE_NAME = 'test-rpcserver' -CCNET_CONF_DIR = os.path.expanduser('~/.ccnet') - -def init_logging(): - """Configure logging module""" - level = logging.DEBUG - - kw = { - 'format': '[%(asctime)s] %(message)s', - 'datefmt': '%m/%d/%Y %H:%M:%S', - 'level': level, - } - - logging.basicConfig(**kw) - -i = 0 -def register_rpc_functions(session): - def str_mul(a, b): - global i - i = i + 1 - print '[%s] a = %s, b = %s' % (i, a, b) - return a * b - - searpc_server.create_service(RPC_SERVICE_NAME) - searpc_server.register_function(RPC_SERVICE_NAME, str_mul) - - session.register_service(RPC_SERVICE_NAME, 'basic', RpcServerProc) - -def main(): - init_logging() - evbase = libevent.Base() - session = AsyncClient(CCNET_CONF_DIR, evbase, CCNET_CONF_DIR) - session.connect_daemon() - register_rpc_functions(session) - session.main_loop() - -if __name__ == '__main__': - main() diff --git a/python/ccnet/utils.py b/python/ccnet/utils.py deleted file mode 100644 index cc2f51d..0000000 --- a/python/ccnet/utils.py +++ /dev/null @@ -1,45 +0,0 @@ -import os -import socket - -from ccnet.errors import NetworkError - -def recvall(fd, total): - remain = total - data = '' - while remain > 0: - try: - new = fd.recv(remain) - except socket.error as e: - raise NetworkError('Failed to read from socket: %s' % e) - - n = len(new) - if n <= 0: - raise NetworkError("Failed to read from socket") - else: - data += new - remain -= n - - return data - -def sendall(fd, data): - total = len(data) - offset = 0 - while offset < total: - try: - n = fd.send(data[offset:]) - except socket.error as e: - raise NetworkError('Failed to write to socket: %s' % e) - - if n <= 0: - raise NetworkError('Failed to write to socket') - else: - offset += n - -def is_win32(): - return os.name == 'nt' - -def make_socket_closeonexec(fd): - if not is_win32(): - import fcntl - old_flags = fcntl.fcntl(fd, fcntl.F_GETFD) - fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)