1
0
mirror of https://github.com/haiwen/ccnet-server.git synced 2025-04-27 02:10:49 +00:00

support python3 (#35)

* use searpc-named-pipe-transport

* delete unused files

* remove not threaded rpc

* use NamedPipeClient

* remove not threaded rpc

* remove unused code

* remove unused rpc function

* remove unused code and source files

* Add name of rpc named pipe to ccent.h.
This commit is contained in:
Xiangyue Cai 2019-07-25 11:59:28 +08:00 committed by Jiaqiang Xu
parent f014fc5263
commit 9a1858aea5
31 changed files with 27 additions and 2681 deletions

View File

@ -25,71 +25,8 @@
#include <searpc-client.h>
/* mainloop */
#define CCNET_RPC_PIPE_NAME "ccnet-rpc.sock"
void ccnet_main (CcnetClient *client);
typedef void (*RegisterServiceCB) (gboolean success);
void ccnet_register_service (CcnetClient *client,
const char *service, const char *group,
GType proc_type, RegisterServiceCB cb);
gboolean ccnet_register_service_sync (CcnetClient *client,
const char *service,
const char *group);
CcnetClient *ccnet_init (const char *central_config_dir, const char *confdir);
void ccnet_send_command (CcnetClient *client, const char *command,
SendcmdProcRcvrspCallback cmd_cb, void *cbdata);
void ccnet_add_peer (CcnetClient *client, const char *id, const char *addr);
void ccnet_connect_peer (CcnetClient *client, const char *id);
void ccnet_disconnect_peer (CcnetClient *client, const char *id);
/* client pool */
struct CcnetClientPool;
typedef struct CcnetClientPool CcnetClientPool;
struct CcnetClientPool *
ccnet_client_pool_new (const char *central_config_dir, const char *conf_dir);
CcnetClient *
ccnet_client_pool_get_client (struct CcnetClientPool *cpool);
void
ccnet_client_pool_return_client (struct CcnetClientPool *cpool,
CcnetClient *client);
/* rpc wrapper */
/* Create rpc client using a single client for transport. */
SearpcClient *
ccnet_create_rpc_client (CcnetClient *cclient, const char *peer_id,
const char *service_name);
/* Create rpc client using client pool for transport. */
SearpcClient *
ccnet_create_pooled_rpc_client (struct CcnetClientPool *cpool,
const char *peer_id,
const char *service);
SearpcClient *
ccnet_create_async_rpc_client (CcnetClient *cclient, const char *peer_id,
const char *service_name);
void ccnet_rpc_client_free (SearpcClient *client);
void ccnet_async_rpc_client_free (SearpcClient *client);
CcnetPeer *ccnet_get_peer (SearpcClient *client, const char *peer_id);
CcnetPeer *ccnet_get_peer_by_idname (SearpcClient *client, const char *idname);
int ccnet_get_peer_net_state (SearpcClient *client, const char *peer_id);
int ccnet_get_peer_bind_status (SearpcClient *client, const char *peer_id);
int ccnet_peer_is_ready (SearpcClient *client, const char *peer_id);
CcnetPeer *ccnet_get_default_relay (SearpcClient *client);
GList *ccnet_get_peers_by_role (SearpcClient *client, const char *role);
char *ccnet_get_binding_email (SearpcClient *client, const char *peer_id);
GList *ccnet_get_groups_by_user (SearpcClient *client, const char *user, int return_ancestors);
GList *ccnet_get_org_groups_by_user (SearpcClient *client, const char *user, int org_id);
GList *
@ -97,33 +34,4 @@ ccnet_get_group_members (SearpcClient *client, int group_id);
int
ccnet_org_user_exists (SearpcClient *client, int org_id, const char *user);
int
ccnet_get_binding_email_async (SearpcClient *client, const char *peer_id,
AsyncCallback callback, void *user_data);
char *ccnet_sign_message (SearpcClient *client, const char *message);
int ccnet_verify_message (SearpcClient *client,
const char *message,
const char *sig_base64,
const char *peer_id);
char *
ccnet_pubkey_encrypt (SearpcClient *client,
const char *msg_base64,
const char *peer_id);
char *
ccnet_privkey_decrypt (SearpcClient *client, const char *msg_base64);
char *ccnet_get_config (SearpcClient *client, const char *key);
int ccnet_set_config (SearpcClient *client, const char *key, const char *value);
void
ccnet_login_to_relay (SearpcClient *client, const char *relay_id,
const char *username, const char *passwd);
int
ccnet_update_peer_address (SearpcClient *client, const char *peer_id,
const char *addr, int port);
#endif

View File

@ -7,5 +7,5 @@ ccnet_HEADERS = ccnet-client.h peer.h proc-factory.h \
mqclient-proc.h invoke-service-proc.h \
status-code.h cevent.h timer.h ccnet-session-base.h \
valid-check.h job-mgr.h packet.h \
async-rpc-proc.h ccnetrpc-transport.h \
rpcserver-proc.h threaded-rpcserver-proc.h
async-rpc-proc.h rpcserver-proc.h \
threaded-rpcserver-proc.h

View File

@ -1,30 +0,0 @@
#ifndef CCNETRPC_TRANPORT_H
#define CCNETRPC_TRANPORT_H
#include <ccnet.h>
typedef struct {
/* either session or pool will be set. */
CcnetClient *session;
CcnetClientPool *pool;
char *peer_id; /* NULL if local */
char *service;
} CcnetrpcTransportParam; /* this structure will be parsed to
* ccnet_transport_send ()
*/
typedef struct {
CcnetClient *session;
char *peer_id; /* NULL if local */
char *service;
} CcnetrpcAsyncTransportParam; /* this structure will be parsed to
* ccnet_async_transport_send ()
*/
char *ccnetrpc_transport_send (void *arg,
const gchar *fcall_str, size_t fcall_len, size_t *ret_len);
int ccnetrpc_async_transport_send (void *arg, gchar *fcall_str,
size_t fcall_len, void *rpc_priv);
#endif /* SEARPC_TRANPORT_H */

View File

@ -38,11 +38,10 @@ libccnet_la_SOURCES = ccnet-client.c packet-io.c libccnet_utils.c \
peer.c sendcmd-proc.c \
mqclient-proc.c invoke-service-proc.c \
marshal.c \
mainloop.c cevent.c timer.c ccnet-session-base.c job-mgr.c \
rpcserver-proc.c ccnetrpc-transport.c threaded-rpcserver-proc.c \
cevent.c timer.c ccnet-session-base.c job-mgr.c \
rpcserver-proc.c threaded-rpcserver-proc.c \
ccnetobj.c \
async-rpc-proc.c ccnet-rpc-wrapper.c \
client-pool.c
async-rpc-proc.c ccnet-rpc-wrapper.c
EXTRA_DIST = ccnetobj.vala rpc_table.py

View File

@ -5,180 +5,6 @@
#include <ccnet.h>
#include <ccnet-object.h>
#include <searpc-client.h>
#include <ccnet/ccnetrpc-transport.h>
SearpcClient *
ccnet_create_rpc_client (CcnetClient *cclient,
const char *peer_id,
const char *service_name)
{
SearpcClient *rpc_client;
CcnetrpcTransportParam *priv;
priv = g_new0(CcnetrpcTransportParam, 1);
priv->session = cclient;
priv->peer_id = g_strdup(peer_id);
priv->service = g_strdup(service_name);
rpc_client = searpc_client_new ();
rpc_client->send = ccnetrpc_transport_send;
rpc_client->arg = priv;
return rpc_client;
}
SearpcClient *
ccnet_create_pooled_rpc_client (struct CcnetClientPool *cpool,
const char *peer_id,
const char *service)
{
SearpcClient *rpc_client;
CcnetrpcTransportParam *priv;
priv = g_new0(CcnetrpcTransportParam, 1);
priv->pool = cpool;
priv->peer_id = g_strdup(peer_id);
priv->service = g_strdup(service);
rpc_client = searpc_client_new ();
rpc_client->send = ccnetrpc_transport_send;
rpc_client->arg = priv;
return rpc_client;
}
SearpcClient *
ccnet_create_async_rpc_client (CcnetClient *cclient, const char *peer_id,
const char *service_name)
{
SearpcClient *rpc_client;
CcnetrpcAsyncTransportParam *async_priv;
async_priv = g_new0 (CcnetrpcAsyncTransportParam, 1);
async_priv->session = cclient;
async_priv->peer_id = g_strdup(peer_id);
async_priv->service = g_strdup (service_name);
rpc_client = searpc_client_new ();
rpc_client->async_send = ccnetrpc_async_transport_send;
rpc_client->async_arg = async_priv;
return rpc_client;
}
void
ccnet_rpc_client_free (SearpcClient *client)
{
CcnetrpcTransportParam *priv;
if (!client) return;
priv = client->arg;
g_free (priv->peer_id);
g_free (priv->service);
g_free (priv);
searpc_client_free (client);
}
void
ccnet_async_rpc_client_free (SearpcClient *client)
{
CcnetrpcAsyncTransportParam *priv = client->arg;
g_free (priv->peer_id);
g_free (priv->service);
g_free (priv);
searpc_client_free (client);
}
CcnetPeer *
ccnet_get_peer (SearpcClient *client, const char *peer_id)
{
if (!peer_id)
return NULL;
return (CcnetPeer *) searpc_client_call__object(
client, "get_peer",CCNET_TYPE_PEER, NULL,
1, "string", peer_id);
}
CcnetPeer *
ccnet_get_peer_by_idname (SearpcClient *client, const char *idname)
{
if (!idname)
return NULL;
return (CcnetPeer *) searpc_client_call__object(
client, "get_peer_by_idname", CCNET_TYPE_PEER, NULL,
1, "string", idname);
}
int
ccnet_get_peer_net_state (SearpcClient *client, const char *peer_id)
{
CcnetPeer *peer;
int ret;
peer = ccnet_get_peer (client, peer_id);
if (!peer)
return PEER_DOWN;
ret = peer->net_state;
g_object_unref (peer);
return ret;
}
int
ccnet_get_peer_bind_status (SearpcClient *client, const char *peer_id)
{
CcnetPeer *peer;
int ret;
peer = ccnet_get_peer (client, peer_id);
if (!peer)
return BIND_UNKNOWN;
ret = peer->bind_status;
g_object_unref (peer);
return ret;
}
CcnetPeer *
ccnet_get_default_relay (SearpcClient *client)
{
CcnetSessionBase *base = (CcnetSessionBase *)
searpc_client_call__object(
client, "get_session_info", CCNET_TYPE_SESSION_BASE, NULL, 0);
if (!base)
return NULL;
CcnetPeer *relay = ccnet_get_peer (client, base->relay_id);
g_object_unref (base);
return relay;
}
GList *
ccnet_get_peers_by_role (SearpcClient *client, const char *role)
{
if (!role)
return NULL;
return searpc_client_call__objlist (
client, "get_peers_by_role", CCNET_TYPE_PEER, NULL,
1, "string", role);
}
char *
ccnet_get_binding_email (SearpcClient *client, const char *peer_id)
{
if (!peer_id)
return NULL;
return searpc_client_call__string (
client, "get_binding_email", NULL,
1, "string", peer_id);
}
GList *
ccnet_get_groups_by_user (SearpcClient *client, const char *user, int return_ancestors)
@ -216,121 +42,3 @@ ccnet_org_user_exists (SearpcClient *client, int org_id, const char *user)
return searpc_client_call__int (client, "org_user_exists", NULL,
2, "int", org_id, "string", user);
}
#if 0
int
ccnet_get_peer_async (SearpcClient *client, const char *peer_id,
AsyncCallback callback, void *user_data)
{
return get_peer_async (client, peer_id, callback, user_data);
}
#endif
int
ccnet_get_binding_email_async (SearpcClient *client, const char *peer_id,
AsyncCallback callback, void *user_data)
{
return searpc_client_async_call__string (
client, "get_binding_email", callback, user_data,
1, "string", peer_id);
}
char *
ccnet_sign_message (SearpcClient *client, const char *message)
{
if (!message)
return NULL;
return searpc_client_call__string (
client, "sign_message", NULL,
1, "string", message);
}
int
ccnet_verify_message (SearpcClient *client,
const char *message,
const char *sig_base64,
const char *peer_id)
{
if (!message || !sig_base64 || !peer_id)
return -1;
return searpc_client_call__int (
client, "verify_message", NULL,
3, "string", message, "string", sig_base64, "string", peer_id);
}
char *
ccnet_pubkey_encrypt (SearpcClient *client,
const char *msg_base64,
const char *peer_id)
{
if (!msg_base64 || !peer_id)
return NULL;
return searpc_client_call__string (client, "pubkey_encrypt", NULL, 2,
"string", msg_base64, "string", peer_id);
}
char *
ccnet_privkey_decrypt (SearpcClient *client, const char *msg_base64)
{
if (!msg_base64)
return NULL;
return searpc_client_call__string (client, "privkey_decrypt", NULL, 1,
"string", msg_base64);
}
char *
ccnet_get_config (SearpcClient *client, const char *key)
{
if (!key)
return NULL;
return searpc_client_call__string (
client, "get_config", NULL,
1, "string", key);
}
int
ccnet_set_config (SearpcClient *client, const char *key, const char *value)
{
if (!key || !value)
return -1;
return searpc_client_call__int (
client, "set_config", NULL,
2, "string", key, "string", value);
}
void
ccnet_login_to_relay (SearpcClient *client, const char *relay_id,
const char *username, const char *passwd)
{
searpc_client_call__int (client, "login_relay", NULL,
3, "string", relay_id,
"string", username, "string", passwd);
}
gboolean
ccnet_peer_is_ready (SearpcClient *client, const char *peer_id)
{
CcnetPeer *peer;
gboolean ret;
peer = ccnet_get_peer (client, peer_id);
if (!peer)
return FALSE;
ret = peer->is_ready;
g_object_unref (peer);
return ret;
}
int
ccnet_update_peer_address (SearpcClient *client, const char *peer_id,
const char *addr, int port)
{
return searpc_client_call__int (
client, "update_peer_address", NULL,
3, "string", peer_id, "string", addr, "int", port);
}

View File

@ -1,184 +0,0 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ccnet/ccnetrpc-transport.h>
#include "rpc-common.h"
#include <ccnet/async-rpc-proc.h>
static char *
invoke_service (CcnetClient *session,
const char *peer_id,
const char *service,
const char *fcall_str,
size_t fcall_len,
size_t *ret_len)
{
struct CcnetResponse *rsp;
uint32_t req_id;
GString *buf;
req_id = ccnet_client_get_rpc_request_id (session, peer_id, service);
if (req_id == 0) {
*ret_len = 0;
return NULL;
}
ccnet_client_send_update (session, req_id,
SC_CLIENT_CALL, SS_CLIENT_CALL,
fcall_str, fcall_len);
if (ccnet_client_read_response (session) < 0) {
*ret_len = 0;
ccnet_client_clean_rpc_request (session, req_id);
return NULL;
}
rsp = &session->response;
if (memcmp (rsp->code, SC_SERVER_RET, 3) == 0) {
*ret_len = (size_t) rsp->clen;
return g_strndup (rsp->content, rsp->clen);
} else if (memcmp (rsp->code, SC_SERVER_MORE, 3) != 0) {
g_warning ("[Sea RPC] Bad response: %s %s.\n", rsp->code, rsp->code_msg);
*ret_len = 0;
return NULL;
}
buf = g_string_new_len (rsp->content, rsp->clen);
while (1) {
ccnet_client_send_update (session, req_id,
SC_CLIENT_MORE, SS_CLIENT_MORE,
fcall_str, fcall_len);
if (ccnet_client_read_response (session) < 0) {
*ret_len = 0;
ccnet_client_clean_rpc_request (session, req_id);
g_string_free (buf, TRUE);
return NULL;
}
rsp = &session->response;
if (memcmp (rsp->code, SC_SERVER_RET, 3) == 0) {
g_string_append_len (buf, rsp->content, rsp->clen);
*ret_len = buf->len;
return g_string_free (buf, FALSE);
} else if (memcmp (rsp->code, SC_SERVER_MORE, 3) == 0) {
g_string_append_len (buf, rsp->content, rsp->clen);
} else {
g_warning ("[Sea RPC] Bad response: %s %s.\n",
rsp->code, rsp->code_msg);
*ret_len = 0;
g_string_free (buf, TRUE);
return NULL;
}
}
/* Never reach here. */
return NULL;
}
static CcnetClient *
create_new_client (const char *central_config_dir, const char *conf_dir)
{
CcnetClient *client;
client = ccnet_client_new ();
if (ccnet_client_load_confdir (client, central_config_dir, conf_dir) < 0) {
g_warning ("[Sea RPC] Failed to load conf dir.\n");
g_object_unref (client);
return NULL;
}
if (ccnet_client_connect_daemon (client, CCNET_CLIENT_SYNC) < 0) {
g_warning ("[Sea RPC] Failed to connect ccnet.\n");
g_object_unref (client);
return NULL;
}
return client;
}
char *
ccnetrpc_transport_send (void *arg, const gchar *fcall_str,
size_t fcall_len, size_t *ret_len)
{
CcnetrpcTransportParam *priv;
CcnetClient *session, *new_session;
g_warn_if_fail (arg != NULL && fcall_str != NULL);
priv = (CcnetrpcTransportParam *)arg;
if (priv->session != NULL) {
/* Use single ccnet client as transport. */
return invoke_service (priv->session, priv->peer_id, priv->service,
fcall_str, fcall_len, ret_len);
} else {
/* Use client pool as transport. */
session = ccnet_client_pool_get_client (priv->pool);
if (!session) {
g_warning ("[Sea RPC] Failed to get client from pool.\n");
*ret_len = 0;
return NULL;
}
char *ret = invoke_service (session, priv->peer_id, priv->service,
fcall_str, fcall_len, ret_len);
if (ret != NULL) {
ccnet_client_pool_return_client (priv->pool, session);
return ret;
}
/* If we failed to send data through the ccnet client returned by
* client pool, ccnet may have been restarted.
* In this case, we create a new ccnet client and put it into
* the client pool after use.
*/
g_message ("[Sea RPC] Ccnet disconnected. Connect again.\n");
new_session = create_new_client (session->central_config_dir, session->config_dir);
if (!new_session) {
*ret_len = 0;
return NULL;
}
g_object_unref (session);
ret = invoke_service (new_session, priv->peer_id, priv->service,
fcall_str, fcall_len, ret_len);
if (ret != NULL)
ccnet_client_pool_return_client (priv->pool, new_session);
else
g_object_unref (new_session);
return ret;
}
}
int
ccnetrpc_async_transport_send (void *arg, gchar *fcall_str,
size_t fcall_len, void *rpc_priv)
{
CcnetrpcAsyncTransportParam *priv;
CcnetClient *session;
CcnetProcessor *proc;
g_warn_if_fail (arg != NULL && fcall_str != NULL);
priv = (CcnetrpcAsyncTransportParam *)arg;
session = priv->session;
if (!priv->peer_id)
proc = ccnet_proc_factory_create_master_processor (
session->proc_factory, "async-rpc");
else
proc = ccnet_proc_factory_create_remote_master_processor (
session->proc_factory, "async-rpc", priv->peer_id);
ccnet_async_rpc_proc_set_rpc ((CcnetAsyncRpcProc *)proc, priv->service,
fcall_str, fcall_len, rpc_priv);
ccnet_processor_start (proc, 0, NULL);
return 0;
}

View File

@ -1,63 +0,0 @@
#include "include.h"
#include <ccnet.h>
#include <ccnet/ccnetrpc-transport.h>
#include <glib.h>
#include <pthread.h>
struct CcnetClientPool {
GQueue *clients;
pthread_mutex_t lock;
const char *central_config_dir;
const char *conf_dir;
};
struct CcnetClientPool *
ccnet_client_pool_new (const char *central_config_dir, const char *conf_dir)
{
CcnetClientPool *pool = g_new0 (CcnetClientPool, 1);
pool->clients = g_queue_new ();
pthread_mutex_init (&pool->lock, NULL);
pool->conf_dir = g_strdup(conf_dir);
pool->central_config_dir = g_strdup(central_config_dir);
return pool;
}
CcnetClient *
ccnet_client_pool_get_client (struct CcnetClientPool *cpool)
{
CcnetClient *client;
pthread_mutex_lock (&cpool->lock);
client = g_queue_pop_head (cpool->clients);
pthread_mutex_unlock (&cpool->lock);
if (!client) {
client = ccnet_client_new ();
if (ccnet_client_load_confdir (client, cpool->central_config_dir, cpool->conf_dir) < 0) {
g_warning ("[client pool] Failed to load conf dir.\n");
g_object_unref (client);
return NULL;
}
if (ccnet_client_connect_daemon (client, CCNET_CLIENT_SYNC) < 0) {
g_warning ("[client pool] Failed to connect.\n");
g_object_unref (client);
return NULL;
}
}
return client;
}
void
ccnet_client_pool_return_client (struct CcnetClientPool *cpool,
CcnetClient *client)
{
pthread_mutex_lock (&cpool->lock);
g_queue_push_tail (cpool->clients, client);
pthread_mutex_unlock (&cpool->lock);
}

View File

@ -1,160 +0,0 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "include.h"
#include <ccnet.h>
#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
#include <event2/event.h>
#include <event2/event_compat.h>
#include <event2/event_struct.h>
#else
#include <event.h>
#endif
static int
cmdrsp_cb (const char *code, char *content, int clen, void *data)
{
RegisterServiceCB cb = data;
if (clen != 0)
ccnet_debug ("Receive cmd response {\n%s}\n", content);
else
ccnet_debug ("Receive cmd response null\n");
if (cb) {
if (memcmp (SC_SERV_EXISTED, code, 3) == 0)
cb (FALSE);
else
cb (TRUE);
}
return 0;
}
void
ccnet_register_service (CcnetClient *client,
const char *service,
const char *group,
GType proc_type,
RegisterServiceCB cb)
{
char buf[512];
g_return_if_fail (group);
ccnet_proc_factory_register_processor (client->proc_factory,
service,
proc_type);
snprintf (buf, 512, "register-service %s %s", service, group);
ccnet_send_command (client, buf, cmdrsp_cb, cb);
}
gboolean
ccnet_register_service_sync (CcnetClient *client,
const char *service,
const char *group)
{
char buf[512];
GError *error = NULL;
snprintf (buf, 512, "register-service %s %s", service, group);
ccnet_client_send_cmd (client, buf, &error);
if (error) {
ccnet_warning ("Bad response for register service %s: %d %s",
service, error->code, error->message);
return FALSE;
}
return TRUE;
}
static void read_cb (evutil_socket_t fd, short event, void *vclient)
{
CcnetClient *client = vclient;
if (ccnet_client_read_input (client) <= 0) {
ccnet_client_disconnect_daemon (client);
exit (1);
}
}
/**
* Inititialize ccnet client structure, connect daemon and initialize
* event loop.
*/
CcnetClient *
ccnet_init (const char *central_config_dir, const char *confdir)
{
CcnetClient *client;
client = ccnet_client_new ();
if ( (ccnet_client_load_confdir(client, central_config_dir, confdir)) < 0 ) {
ccnet_warning ("Read config dir error\n");
return NULL;
}
if (ccnet_client_connect_daemon (client, CCNET_CLIENT_ASYNC) < 0) {
ccnet_warning ("Connect to ccnet daemon error\n");
exit(1);
}
ccnet_client_run_synchronizer (client);
event_init ();
return client;
}
void
ccnet_main (CcnetClient *client)
{
struct event ev;
event_set (&ev, client->connfd, EV_READ | EV_PERSIST, read_cb, client);
event_add (&ev, NULL);
event_dispatch ();
}
void ccnet_send_command (CcnetClient *client, const char *command,
SendcmdProcRcvrspCallback cmd_cb, void *cbdata)
{
CcnetSendcmdProc *sendcmd_proc = (CcnetSendcmdProc *)
ccnet_proc_factory_create_master_processor (client->proc_factory,
"send-cmd");
ccnet_sendcmd_proc_set_rcvrsp_cb (sendcmd_proc, cmd_cb, cbdata);
ccnet_processor_start (CCNET_PROCESSOR(sendcmd_proc), 0, NULL);
ccnet_sendcmd_proc_send_command (sendcmd_proc, command);
}
/* add-peer [--id <peer-id>] [--addr <peer-addr:port>]
*/
void ccnet_add_peer (CcnetClient *client, const char *id, const char *addr)
{
char buf[256];
if (id == NULL || strlen(id) != 40 || addr == NULL)
return;
snprintf (buf, 256, "add-peer --id %s --addr %s", id, addr);
ccnet_send_command (client, buf, NULL, NULL);
}
void ccnet_connect_peer (CcnetClient *client, const char *id)
{
char buf[256];
if (id == NULL || strlen(id) != 40)
return;
snprintf (buf, 256, "connect %s", id);
ccnet_send_command (client, buf, NULL, NULL);
}
void ccnet_disconnect_peer (CcnetClient *client, const char *id)
{
char buf[256];
if (id == NULL || strlen(id) != 40)
return;
snprintf (buf, 256, "disconnect %s", id);
ccnet_send_command (client, buf, NULL, NULL);
}

View File

@ -15,10 +15,6 @@
#include "ccnet-object.h"
#include "processors/rpcserver-proc.h"
#ifdef CCNET_SERVER
#include "processors/threaded-rpcserver-proc.h"
#endif
#include "searpc-server.h"
#include "ccnet-config.h"
@ -36,109 +32,24 @@
extern CcnetSession *session;
#include <searpc.h>
#include <searpc-named-pipe-transport.h>
#include "searpc-signature.h"
#include "searpc-marshal.h"
void
#define CCNET_SOCKET_NAME "ccnet-rpc.sock"
int
ccnet_start_rpc(CcnetSession *session)
{
searpc_server_init (register_marshals);
searpc_create_service ("ccnet-rpcserver");
ccnet_proc_factory_register_processor (session->proc_factory,
"ccnet-rpcserver",
CCNET_TYPE_RPCSERVER_PROC);
#ifdef CCNET_SERVER
searpc_create_service ("ccnet-threaded-rpcserver");
ccnet_proc_factory_register_processor (session->proc_factory,
"ccnet-threaded-rpcserver",
CCNET_TYPE_THREADED_RPCSERVER_PROC);
#endif
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_list_peers,
"list_peers",
searpc_signature_string__void());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_list_resolving_peers,
"list_resolving_peers",
searpc_signature_objlist__void());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_get_peers_by_role,
"get_peers_by_role",
searpc_signature_objlist__string());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_get_peer,
"get_peer",
searpc_signature_object__string());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_get_peer_by_idname,
"get_peer_by_idname",
searpc_signature_object__string());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_update_peer_address,
"update_peer_address",
searpc_signature_int__string_string_int());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_get_session_info,
"get_session_info",
searpc_signature_object__void());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_add_client,
"add_client",
searpc_signature_int__string());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_add_role,
"add_role",
searpc_signature_int__string_string());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_remove_role,
"remove_role",
searpc_signature_int__string_string());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_get_config,
"get_config",
searpc_signature_string__string());
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_set_config,
"set_config",
searpc_signature_int__string_string());
/* RSA encrypt a message with peer's public key. */
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_pubkey_encrypt,
"pubkey_encrypt",
searpc_signature_string__string_string());
/* RSA decrypt a message with my private key. */
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_privkey_decrypt,
"privkey_decrypt",
searpc_signature_string__string());
#ifdef CCNET_SERVER
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_list_peer_stat,
"list_peer_stat",
searpc_signature_objlist__void());
searpc_server_register_function ("ccnet-threaded-rpcserver",
ccnet_rpc_add_emailuser,
"add_emailuser",
@ -200,18 +111,6 @@ ccnet_start_rpc(CcnetSession *session)
"get_emailusers_in_list",
searpc_signature_objlist__string_string());
/* RSA sign a message with my private key. */
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_sign_message,
"sign_message",
searpc_signature_string__string());
/* Verify a message with a peer's public key */
searpc_server_register_function ("ccnet-rpcserver",
ccnet_rpc_verify_message,
"verify_message",
searpc_signature_int__string_string_string());
searpc_server_register_function ("ccnet-threaded-rpcserver",
ccnet_rpc_create_group,
"create_group",
@ -413,248 +312,17 @@ ccnet_start_rpc(CcnetSession *session)
#endif /* CCNET_SERVER */
}
char *
ccnet_rpc_list_peers(GError **error)
{
CcnetPeerManager *peer_mgr = session->peer_mgr;
GList *peer_list, *ptr;
GString *result;
CcnetPeer *peer;
peer_list = ccnet_peer_manager_get_peer_list(peer_mgr);
if (peer_list == NULL) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Failed to get peer list");
return NULL;
}
result = g_string_new("");
ptr = peer_list;
while (ptr) {
peer = ptr->data;
g_string_append_printf(result, "%s\n", peer->id);
ptr = ptr->next;
}
g_list_free(peer_list);
return g_string_free(result, FALSE);
}
GList *
ccnet_rpc_list_resolving_peers (GError **error)
{
CcnetPeerManager *peer_mgr = session->peer_mgr;
return ccnet_peer_manager_get_resolve_peers(peer_mgr);
}
GList *
ccnet_rpc_get_peers_by_role(const char *role, GError **error)
{
CcnetPeerManager *peer_mgr = session->peer_mgr;
return ccnet_peer_manager_get_peers_with_role (peer_mgr, role);
}
GObject *
ccnet_rpc_get_peer(const char *peer_id, GError **error)
{
if (!peer_id)
return NULL;
CcnetPeerManager *peer_mgr = session->peer_mgr;
CcnetPeer *peer = ccnet_peer_manager_get_peer(peer_mgr, peer_id);
return (GObject*)peer;
}
int
ccnet_rpc_update_peer_address (const char *peer_id,
const char *addr,
int port,
GError **error)
{
if (!peer_id || !addr || port <= 0 || port > 65536) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Invalid arguments");
char *path = g_build_filename (session->config_dir, CCNET_SOCKET_NAME, NULL);
SearpcNamedPipeServer *server = searpc_create_named_pipe_server (path);
if (!server) {
ccnet_warning ("Failed to create named pipe server.\n");
g_free (path);
return -1;
}
CcnetPeer *peer = ccnet_peer_manager_get_peer (session->peer_mgr, peer_id);
if (!peer) {
return -1;
}
ccnet_peer_manager_set_peer_public_addr (session->peer_mgr, peer, addr, port);
g_object_unref (peer);
return 0;
return searpc_named_pipe_server_start (server);
}
GObject *
ccnet_rpc_get_peer_by_idname(const char *idname, GError **error)
{
if (!idname)
return NULL;
CcnetPeerManager *peer_mgr = session->peer_mgr;
CcnetPeer *peer = ccnet_peer_manager_get_peer(peer_mgr, idname);
if (!peer)
peer = ccnet_peer_manager_get_peer_by_name (peer_mgr, idname);
if (peer) {
return (GObject*)peer;
}
return NULL;
}
GObject *
ccnet_rpc_get_session_info(GError **error)
{
g_object_ref (session);
return (GObject*)session;
}
int
ccnet_rpc_add_client(const char *peer_id, GError **error)
{
CcnetPeerManager *mgr = session->peer_mgr;
CcnetPeer *peer;
if (strlen(peer_id) != 40) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Peer ID must be of length 40");
return -1;
}
peer = ccnet_peer_manager_get_peer (mgr, peer_id);
if (!peer) {
peer = ccnet_peer_new (peer_id);
ccnet_peer_manager_add_peer (mgr, peer);
}
ccnet_peer_manager_add_role (mgr, peer, "MyClient");
g_object_unref (peer);
return 0;
}
int
ccnet_rpc_add_role(const char *peer_id, const char *role, GError **error)
{
CcnetPeerManager *mgr = session->peer_mgr;
CcnetPeer *peer;
if (!peer_id || strlen(peer_id) != 40) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Peer ID must be of length 40");
return -1;
}
if (!role || strlen(role) <= 2) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Invalid role");
return -1;
}
peer = ccnet_peer_manager_get_peer (mgr, peer_id);
if (!peer) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "No such peer");
return -1;
}
ccnet_peer_manager_add_role (mgr, peer, role);
g_object_unref (peer);
return 0;
}
int
ccnet_rpc_remove_role(const char *peer_id, const char *role, GError **error)
{
CcnetPeerManager *mgr = session->peer_mgr;
CcnetPeer *peer;
if (!peer_id || strlen(peer_id) != 40) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Peer ID must be of length 40");
return -1;
}
if (!role || strlen(role) <= 2) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Invalid role");
return -1;
}
peer = ccnet_peer_manager_get_peer (mgr, peer_id);
if (!peer) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "No such peer");
return -1;
}
ccnet_peer_manager_remove_role (mgr, peer, role);
g_object_unref (peer);
return 0;
}
char *
ccnet_rpc_get_config (const char *key, GError **error)
{
return ccnet_session_config_get_string (session, key);
}
int
ccnet_rpc_set_config (const char *key, const char *value, GError **error)
{
return ccnet_session_config_set_string (session, key, value);
}
char *
ccnet_rpc_pubkey_encrypt (const char *msg_base64, const char *peer_id, GError **error)
{
unsigned char *msg;
gsize msg_len;
CcnetPeer *peer;
unsigned char *enc_msg;
int enc_msg_len;
char *ret;
peer = ccnet_peer_manager_get_peer (session->peer_mgr, peer_id);
if (!peer) {
g_warning ("Cannot find peer %s.\n", peer_id);
return NULL;
}
msg = g_base64_decode (msg_base64, &msg_len);
enc_msg = public_key_encrypt (peer->pubkey, msg, (int)msg_len, &enc_msg_len);
ret = g_base64_encode (enc_msg, enc_msg_len);
g_free (msg);
g_free (enc_msg);
g_object_unref (peer);
return ret;
}
char *
ccnet_rpc_privkey_decrypt (const char *msg_base64, GError **error)
{
unsigned char *msg;
gsize msg_len;
unsigned char *dec_msg;
int dec_msg_len;
char *ret;
msg = g_base64_decode (msg_base64, &msg_len);
dec_msg = private_key_decrypt (session->privkey, msg, (int)msg_len, &dec_msg_len);
if (dec_msg_len < 0) {
g_warning ("Failed to decrypt message with RSA priv key.\n");
g_set_error (error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Failed to decrypt");
g_free (msg);
return NULL;
}
ret = g_base64_encode (dec_msg, dec_msg_len);
g_free (msg);
g_free (dec_msg);
return ret;
}
#ifdef CCNET_SERVER
@ -662,48 +330,6 @@ ccnet_rpc_privkey_decrypt (const char *msg_base64, GError **error)
#include "group-mgr.h"
#include "org-mgr.h"
GList *
ccnet_rpc_list_peer_stat (GError **error)
{
GList *res = NULL;
GList *ptr, *peer_list;
CcnetPeer *peer;
CcnetPeerManager *peer_mgr = session->peer_mgr;
peer_list = ccnet_peer_manager_get_peer_list(peer_mgr);
if (peer_list == NULL) {
g_set_error(error, CCNET_DOMAIN, CCNET_ERR_INTERNAL, "Failed to get peer list");
return NULL;
}
ptr = peer_list;
while (ptr) {
peer = ptr->data;
if (peer->is_self) {
ptr = ptr->next;
continue;
}
guint proc_num = g_hash_table_size (peer->processors);
CcnetPeerStat* stat = ccnet_peer_stat_new ();
g_object_set (stat, "id", peer->id,
"name", peer->name,
"ip", peer->addr_str,
"encrypt", peer->encrypt_channel,
"last_up", (gint64) peer->last_up,
"proc_num", (int)proc_num,
NULL);
res = g_list_prepend (res, stat);
ptr = ptr->next;
}
g_list_free(peer_list);
return g_list_reverse (res);
}
int
ccnet_rpc_add_emailuser (const char *email, const char *passwd,
int is_staff, int is_active, GError **error)
@ -965,55 +591,6 @@ ccnet_rpc_get_superusers (GError **error)
return ccnet_user_manager_get_superusers(user_mgr);
}
char *
ccnet_rpc_sign_message (const char *message, GError **error)
{
unsigned char *sig;
unsigned int sig_len;
char *sigret;
sig = g_new0(unsigned char, RSA_size(session->privkey));
if (!RSA_sign (NID_sha1, (const unsigned char *)message, strlen(message),
sig, &sig_len, session->privkey)) {
g_warning ("Failed to sign message: %lu.\n", ERR_get_error());
return NULL;
}
sigret = g_base64_encode (sig, sig_len);
g_free (sig);
return sigret;
}
int
ccnet_rpc_verify_message (const char *message,
const char *sig_base64,
const char *peer_id,
GError **error)
{
unsigned char *sig;
gsize sig_len;
CcnetPeer *peer;
sig = g_base64_decode (sig_base64, &sig_len);
peer = ccnet_peer_manager_get_peer (session->peer_mgr, peer_id);
if (!peer) {
g_warning ("Cannot find peer %s.\n", peer_id);
return -1;
}
if (!RSA_verify (NID_sha1, (const unsigned char *)message, strlen(message),
sig, (guint)sig_len, peer->pubkey)) {
g_object_unref (peer);
return -1;
}
g_object_unref (peer);
return 0;
}
int
ccnet_rpc_create_group (const char *group_name, const char *user_name,
const char *type, int parent_group_id, GError **error)

View File

@ -5,22 +5,7 @@
struct CcnetSession;
void ccnet_start_rpc(CcnetSession *session);
char *ccnet_rpc_list_peers(GError **error);
GList *ccnet_rpc_list_resolving_peers (GError **error);
GList* ccnet_rpc_get_peers_by_role(const char *role, GError **error);
GObject *ccnet_rpc_get_peer(const char *peerid, GError **error);
int
ccnet_rpc_update_peer_address(const char *peer_id, const char *addr,
int port, GError **error);
GObject *ccnet_rpc_get_peer_by_idname(const char *idname, GError **error);
int ccnet_start_rpc(CcnetSession *session);
char *
ccnet_rpc_list_users(GError **error);
@ -32,40 +17,9 @@ ccnet_rpc_get_user(const char *userid, GError **error);
GObject *
ccnet_rpc_get_user_of_peer(const char *peerid, GError **error);
GObject *ccnet_rpc_get_session_info(GError **error);
int
ccnet_rpc_add_client(const char *user_id, GError **error);
int
ccnet_rpc_add_role(const char *user_id, const char *role, GError **error);
int
ccnet_rpc_remove_role(const char *user_id, const char *role, GError **error);
GList *ccnet_rpc_get_events(int offset, int limit, GError **error);
int ccnet_rpc_count_event (GError **error);
/**
* ccnet_get_config:
*
* Return the config value with key @key
*/
char *
ccnet_rpc_get_config (const char *key, GError **error);
/**
* ccnet_rpc_set_config:
*
* Set the value of config item with key @key to @value
*/
int
ccnet_rpc_set_config (const char *key, const char *value, GError **error);
/**
* ccnet_rpc_upload_profile:
*
@ -74,19 +28,8 @@ ccnet_rpc_set_config (const char *key, const char *value, GError **error);
int
ccnet_rpc_upload_profile (const char *relay_id, GError **error);
char *
ccnet_rpc_pubkey_encrypt (const char *msg_base64,
const char *peer_id,
GError **error);
char *
ccnet_rpc_privkey_decrypt (const char *msg_base64, GError **error);
#ifdef CCNET_SERVER
GList *
ccnet_rpc_list_peer_stat (GError **error);
int
ccnet_rpc_add_emailuser (const char *email, const char *passwd,
int is_staff, int is_active, GError **error);
@ -170,15 +113,6 @@ ccnet_rpc_remove_one_binding (const char *email, const char *peer_id,
GList *
ccnet_rpc_get_peers_by_email (const char *email, GError **error);
char *
ccnet_rpc_sign_message (const char *message, GError **error);
int
ccnet_rpc_verify_message (const char *message,
const char *sig_base64,
const char *peer_id,
GError **error);
int
ccnet_rpc_create_group (const char *group_name, const char *user_name,
const char *type, int parent_group_id, GError **error);

View File

@ -360,7 +360,10 @@ main (int argc, char **argv)
#endif
ccnet_session_start (session);
ccnet_start_rpc(session);
if(ccnet_start_rpc(session) < 0) {
ccnet_warning ("Failed to start ccnet rpc server.\n");
exit (-1);
}
/* actually enter the event loop */
event_dispatch ();

View File

@ -1,13 +1,2 @@
ccnetdir=${pyexecdir}/ccnet
ccnet_PYTHON = __init__.py errors.py status_code.py utils.py \
packet.py message.py \
client.py sync_client.py \
pool.py rpc.py
ccnet_asyncdir = ${ccnetdir}/async
ccnet_async_PYTHON = async/__init__.py \
async/async_client.py async/processor.py \
async/rpcserverproc.py async/sendcmdproc.py \
async/mqclientproc.py async/timer.py
ccnet_PYTHON = __init__.py rpc.py

View File

@ -1,7 +1 @@
from ccnet.errors import NetworkError
from ccnet.sync_client import SyncClient
from ccnet.pool import ClientPool
from ccnet.rpc import RpcClientBase, CcnetRpcClient, CcnetThreadedRpcClient
from ccnet.message import Message
from ccnet.rpc import CcnetThreadedRpcClient

View File

@ -1,14 +0,0 @@
'''
@module: ccnet.async
@description: The async client of ccnet depends on python-libevent,
so we move it to a standalone package.
'''
from .async_client import AsyncClient
from .processor import Processor
from .rpcserverproc import RpcServerProc
from .sendcmdproc import SendCmdProc
from .mqclientproc import MqClientProc
from .timer import Timer

View File

@ -1,250 +0,0 @@
import logging
import libevent
from ccnet.client import Client, parse_update, parse_response
from ccnet.packet import response_to_packet, parse_header, Packet
from ccnet.packet import to_response_id, to_master_id, to_slave_id, to_packet_id
from ccnet.packet import CCNET_MSG_REQUEST, CCNET_MSG_UPDATE, CCNET_MSG_RESPONSE, \
CCNET_HEADER_LENGTH, CCNET_MAX_PACKET_LENGTH
from ccnet.status_code import SC_PROC_DONE, SC_PROC_DEAD, SS_PROC_DEAD, \
SC_UNKNOWN_SERVICE, SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE, SC_PERM_ERR
from ccnet.status_code import PROC_NO_SERVICE, PROC_PERM_ERR, \
PROC_BAD_RESP, PROC_REMOTE_DEAD
from ccnet.errors import NetworkError
from .processor import Processor
from .sendcmdproc import SendCmdProc
from .mqclientproc import MqClientProc
__all__ = [
'AsyncClient',
]
def debug_print(msg):
print msg
class AsyncClient(Client):
'''Async mode client'''
def __init__(self, config_dir, event_base, central_config_dir=None):
Client.__init__(self, config_dir, central_config_dir)
self.proc_types = {}
self.procs = {}
self.register_processors()
self._bev = None
self._evbase = event_base
def get_event_base(self):
return self._evbase
def add_processor(self, proc):
self.procs[proc.id] = proc
def remove_processor(self, proc):
if proc.id in self.procs:
del self.procs[proc.id]
def get_proc(self, id):
return self.procs.get(id, None)
def write_packet(self, pkt):
outbuf = self._bev.output
outbuf.add(pkt.header.to_string())
outbuf.add(pkt.body)
def send_response(self, id, code, code_msg, content=''):
id = to_response_id(id)
pkt = response_to_packet(id, code, code_msg, content)
self.write_packet(pkt)
def handle_packet(self, pkt):
ptype = pkt.header.ptype
if ptype == CCNET_MSG_REQUEST:
self.handle_request(pkt.header.id, pkt.body)
elif ptype == CCNET_MSG_UPDATE:
code, code_msg, content = parse_update(pkt.body)
self.handle_update(pkt.header.id, code, code_msg, content)
elif ptype == CCNET_MSG_RESPONSE:
code, code_msg, content = parse_response(pkt.body)
self.handle_response(pkt.header.id, code, code_msg, content)
else:
logging.warning("unknown packet type %d", ptype)
def handle_request(self, id, req):
commands = req.split()
self.create_slave_processor(to_slave_id(id), commands)
def create_slave_processor(self, id, commands):
peer_id = self.peer_id
if commands[0] == 'remote':
if len(commands) < 3:
logging.warning("invalid request %s", commands)
return
peer_id = commands[1]
commands = commands[2:]
proc_name = commands[0]
if not proc_name in self.proc_types:
logging.warning("unknown processor type %s", proc_name)
return
cls = self.proc_types[proc_name]
proc = cls(proc_name, id, peer_id, self)
self.add_processor(proc)
proc.start(*commands[1:])
def create_master_processor(self, proc_name):
id = self.get_request_id()
cls = self.proc_types.get(proc_name, None)
if cls == None:
logging.error('unknown processor type %s', proc_name)
return None
proc = cls(proc_name, id, self.peer_id, self)
self.add_processor(proc)
return proc
def handle_update(self, id, code, code_msg, content):
proc = self.get_proc(to_slave_id(id))
if proc == None:
if code != SC_PROC_DEAD:
self.send_response(id, SC_PROC_DEAD, SS_PROC_DEAD)
return
if code[0] == '5':
logging.info('shutdown processor %s(%d): %s %s\n',
proc.name, to_packet_id(proc.id), code, code_msg)
if code == SC_UNKNOWN_SERVICE:
proc.shutdown(PROC_NO_SERVICE)
elif code == SC_PERM_ERR:
proc.shutdown(PROC_PERM_ERR)
else:
proc.shutdown(PROC_BAD_RESP)
elif code == SC_PROC_KEEPALIVE:
proc.send_response(SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE)
elif code == SC_PROC_DEAD:
logging.info('shutdown processor %s(%d): when peer(%.8s) processor is dead\n',
proc.name, to_packet_id(proc.id), proc.peer_id)
proc.shutdown(PROC_REMOTE_DEAD)
elif code == SC_PROC_DONE:
proc.done(True)
else:
proc.handle_update(code, code_msg, content)
def handle_response(self, id, code, code_msg, content):
proc = self.get_proc(to_master_id(id))
if proc == None:
if code != SC_PROC_DEAD:
self.send_update(id, SC_PROC_DEAD, SS_PROC_DEAD)
return
if code[0] == '5':
logging.info('shutdown processor %s(%d): %s %s\n',
proc.name, to_packet_id(proc.id), code, code_msg)
if code == SC_UNKNOWN_SERVICE:
proc.shutdown(PROC_NO_SERVICE)
elif code == SC_PERM_ERR:
proc.shutdown(PROC_PERM_ERR)
else:
proc.shutdown(PROC_BAD_RESP)
elif code == SC_PROC_KEEPALIVE:
proc.send_update(id, SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE)
elif code == SC_PROC_DEAD:
logging.info('shutdown processor %s(%d): when peer(%.8s) processor is dead\n',
proc.name, to_packet_id(proc.id), proc.peer_id)
proc.shutdown(PROC_REMOTE_DEAD)
else:
proc.handle_response(code, code_msg, content)
def register_processor(self, proc_name, proc_type):
assert Processor in proc_type.mro()
self.proc_types[proc_name] = proc_type
def register_processors(self):
self.register_processor("send-cmd", SendCmdProc)
self.register_processor("mq-client", MqClientProc)
def register_service(self, service, group, proc_type, callback=None):
self.register_processor(service, proc_type)
cmd = 'register-service %s %s' % (service, group)
self.send_cmd(cmd, callback)
def send_cmd(self, cmd, callback=None):
proc = self.create_master_processor("send-cmd")
if callback:
proc.set_callback(callback)
proc.start()
proc.send_cmd(cmd)
def _read_cb(self, bev, cb_data):
dummy = bev, cb_data
inbuf = self._bev.input
while (True):
raw = inbuf.copyout(CCNET_HEADER_LENGTH)
header = parse_header(raw)
if len(inbuf) < CCNET_HEADER_LENGTH + header.length:
break
inbuf.drain(CCNET_HEADER_LENGTH)
data = inbuf.copyout(header.length)
pkt = Packet(header, data)
self.handle_packet(pkt)
inbuf.drain(header.length)
if len(inbuf) < CCNET_HEADER_LENGTH:
break
def _event_cb(self, bev, what, cb_data):
dummy = bev, cb_data
logging.warning('libevent error: what = %s' % what)
if what & libevent.BEV_EVENT_EOF or \
what & libevent.BEV_EVENT_ERROR or \
what & libevent.BEV_EVENT_READING or \
what & libevent.BEV_EVENT_WRITING:
if self._bev is not None:
self._bev = None
raise NetworkError('libevent error: what = %s' % what)
def base_loop(self):
'''Create an event base -> register socket events -> loop'''
self._bev = libevent.BufferEvent(self._evbase,
self._connfd.fileno())
self._bev.set_watermark(libevent.EV_READ,
CCNET_HEADER_LENGTH, # low wartermark
CCNET_MAX_PACKET_LENGTH * 2) # highmark
self._bev.set_callbacks(self._read_cb, # read callback
None, # write callback
self._event_cb) # event callback
self._bev.enable(libevent.EV_READ | libevent.EV_WRITE)
self._evbase.loop()
def main_loop(self):
self.base_loop()

View File

@ -1,50 +0,0 @@
import logging
from .processor import Processor
from ccnet.message import message_from_string, message_to_string
INIT = 0
REQUEST_SENT = 1
READY = 2
SC_MSG = '300'
SC_UNSUBSCRIBE = '301'
class MqClientProc(Processor):
def __init__(self, *args, **kwargs):
Processor.__init__(self, *args, **kwargs)
self.state = INIT
self.callback = None
def start(self, *argv):
req = 'mq-server ' + ' '.join(argv)
self.send_request(req)
self.state = REQUEST_SENT
def set_callback(self, cb):
self.callback = cb
def handle_response(self, code, code_msg, content):
if self.state == REQUEST_SENT:
if code[0] != '2':
logging.warning('bad response: %s %s\n', code, code_msg)
self.done(False)
self.state = READY
elif self.state == READY:
if code[0] != '2' and code[0] != '3':
logging.warning('bad response: %s %s\n', code, code_msg)
return
if code[0] == '3' and code[2] == '0':
msg = message_from_string(content[:-1])
if self.callback:
self.callback(msg)
def put_message(self, msg):
buf = message_to_string(msg)
self.send_update(SC_MSG, '', buf + '\000')
def unsubscribe(self):
self.send_update(SC_UNSUBSCRIBE, '')
self.done(True)

View File

@ -1,57 +0,0 @@
import logging
from ccnet.packet import SLAVE_BIT_MASK, to_print_id
from ccnet.status_code import SC_PROC_DONE, SS_PROC_DONE, PROC_DONE
class Processor(object):
"""Base processor class"""
name = "Processor"
def __init__(self, name, id, peer_id, client):
self.name = name
self.id = id
self.peer_id = peer_id
self.client = client
def start(self, *args, **kwargs):
raise NotImplementedError
def handle_request(self, *args, **kwargs):
raise NotImplementedError
def handle_update(self, *args, **kwargs):
raise NotImplementedError
def handle_response(self, *args, **kwargs):
raise NotImplementedError
def __str__(self):
return "<proc %s(%d)>" % (self.name, to_print_id(self.id))
def is_master(self):
return not (self.id & SLAVE_BIT_MASK)
def send_request(self, buf):
assert self.is_master()
return self.client.send_request(self.id, buf)
def send_response(self, code, code_msg, content=''):
assert not self.is_master()
return self.client.send_response(self.id, code, code_msg, content)
def send_update(self, code, code_msg, content=''):
assert self.is_master()
return self.client.send_update(self.id, code, code_msg, content)
def done(self, success):
if self.is_master() and success:
self.send_update(SC_PROC_DONE, SS_PROC_DONE, '')
self.client.remove_processor(self)
def shutdown(self, reason):
if reason > PROC_DONE:
logging.debug('shut down %s: %s', self, reason)
self.client.remove_processor(self)

View File

@ -1,43 +0,0 @@
from pysearpc import searpc_server
from ccnet.status_code import SC_OK, SS_OK
from ccnet.status_code import SC_SERVER_RET, SS_SERVER_RET, SC_SERVER_MORE, SS_SERVER_MORE, \
SC_CLIENT_CALL, SC_CLIENT_MORE, SC_CLIENT_CALL_MORE
from .processor import Processor
class RpcServerProc(Processor):
name = 'rpcserver-proc'
max_transfer_length = 65535 - 128
def __init__(self, *args, **kwargs):
Processor.__init__(self, *args, **kwargs)
self.fretstr = ''
self.fcallstr = ''
def start(self, *argv):
self.send_response(SC_OK, SS_OK, '')
def send_fret(self):
maxlen = self.max_transfer_length
l = len(self.fretstr)
if l < maxlen:
self.send_response(SC_SERVER_RET, SS_SERVER_RET, self.fretstr)
self.fretstr = ''
else:
buf = self.fretstr[:maxlen]
self.send_response(SC_SERVER_MORE, SS_SERVER_MORE, buf)
self.fretstr = self.fretstr[maxlen:]
def handle_update(self, code, code_msg, content):
if code == SC_CLIENT_CALL_MORE:
self.fcallstr += content
return
elif code == SC_CLIENT_CALL:
self.fcallstr += content
self.fretstr = searpc_server.call_function(self.name, self.fcallstr)
self.fcallstr = ''
self.send_fret()
elif code == SC_CLIENT_MORE:
self.send_fret()

View File

@ -1,34 +0,0 @@
import logging
from .processor import Processor
INIT = 0
REQUET_SENT = 1
CONNECTED = 2
class SendCmdProc(Processor):
name = "send-cmd"
def __init__(self, *args, **kwargs):
Processor.__init__(self, *args, **kwargs)
self.callback = None
self.state = INIT
def start(self, *argv):
self.send_request('receive-cmd')
self.state = REQUET_SENT
def set_callback(self, cb):
self.callback = cb
def send_cmd(self, cmd):
self.send_update('200', '', cmd + '\000')
def handle_response(self, code, code_msg, content):
if code[0] != '2':
logging.warning("Received bad response %s %s", code, code_msg)
if self.state == REQUET_SENT:
self.state = CONNECTED
elif self.state == CONNECTED:
if self.callback:
self.callback(code, code_msg, content)

View File

@ -1,21 +0,0 @@
import libevent
import logging
class Timer(object):
'''Wraps aroud a libevent timeout event'''
def __init__(self, ev_base, timeout):
self._timeout = timeout
self._evtimer = libevent.Timer(ev_base, self._callback, None)
self._evtimer.add(timeout) # pylint: disable=E1101
def _callback(self, evtimer, user_data):
dummy = user_data
try:
self.callback()
except:
logging.exception('error in timer callback:')
evtimer.add(self._timeout)
def callback(self):
raise NotImplementedError

View File

@ -1,148 +0,0 @@
#coding: UTF-8
import os
import socket
import ConfigParser
import logging
from ccnet.packet import to_request_id, to_update_id
from ccnet.packet import request_to_packet, update_to_packet
from ccnet.packet import write_packet
from ccnet.errors import NetworkError
from .utils import is_win32, make_socket_closeonexec
CCNET_PIPE_NAME = 'ccnet.sock'
def parse_response(body):
'''Parse the content of the response
The struct of response data:
- first 3 bytes is the <status code>
- from the 4th byte to the first occurrence of '\n' is the <status message>. If the 4th byte is '\n', then there is no <status message>
- from the first occurrence of '\n' to the end is the <content>
'''
code = body[:3]
if body[3] == '\n':
code_msg = ''
content = body[4:]
else:
pos = body.index('\n')
code_msg = body[4:pos]
content = body[pos + 1:]
return code, code_msg, content
def parse_update(body):
'''The structure of an update is the same with a response'''
code = body[:3]
if body[3] == '\n':
code_msg = ''
content = body[4:]
else:
pos = body.index('\n')
code_msg = body[4:pos]
content = body[pos + 1:]
return code, code_msg, content
class Client(object):
'''Base ccnet client class'''
def __init__(self, config_dir, central_config_dir=None):
if not isinstance(config_dir, unicode):
config_dir = config_dir.decode('UTF-8')
if central_config_dir:
central_config_dir = os.path.expanduser(central_config_dir)
if not os.path.exists(central_config_dir):
raise RuntimeError(u'%s does not exits' % central_config_dir)
config_dir = os.path.expanduser(config_dir)
config_file = os.path.join(central_config_dir if central_config_dir else config_dir,
u'ccnet.conf')
logging.debug('using config file %s', config_file)
if not os.path.exists(config_file):
raise RuntimeError(u'%s does not exits' % config_file)
self.central_config_dir = central_config_dir
self.config_dir = config_dir
self.config_file = config_file
self.config = None
self.port = None
self.peer_id = None
self.peer_name = None
self.parse_config()
self._connfd = None
self._req_id = 1000
def __del__(self):
'''Destructor of the client class. We close the socket here, if
connetced to daemon
'''
if self.is_connected():
try:
self._connfd.close()
except:
pass
def parse_config(self):
self.config = ConfigParser.ConfigParser()
self.config.read(self.config_file)
if self.config.has_option('Client', 'PORT'):
self.port = self.config.getint('Client', 'PORT')
else:
self.port = 10001
self.un_path = ''
if self.config.has_option('Client', 'UNIX_SOCKET'):
self.un_path = self.config.get('Client', 'UNIX_SOCKET')
self.peer_id = self.config.get('General', 'ID')
self.peer_name = self.config.get('General', 'NAME')
def connect_daemon_with_pipe(self):
self._connfd = socket.socket(socket.AF_UNIX)
if not self.un_path:
pipe_name = os.path.join(self.config_dir, CCNET_PIPE_NAME)
else:
pipe_name = self.un_path
try:
self._connfd.connect(pipe_name)
except:
raise NetworkError("Can't connect to daemon")
make_socket_closeonexec(self._connfd.fileno())
def connect_daemon_with_socket(self):
self._connfd = socket.socket()
self._connfd.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
try:
self._connfd.connect(('127.0.0.1', self.port))
except:
raise NetworkError("Can't connect to daemon")
make_socket_closeonexec(self._connfd.fileno())
def connect_daemon(self):
if is_win32():
return self.connect_daemon_with_socket()
else:
return self.connect_daemon_with_pipe()
def is_connected(self):
return self._connfd is not None
def send_request(self, id, req):
id = to_request_id(id)
pkt = request_to_packet(id, req)
write_packet(self._connfd, pkt)
def send_update(self, id, code, code_msg, content=''):
id = to_update_id(id)
pkt = update_to_packet(id, code, code_msg, content)
write_packet(self._connfd, pkt)
def get_request_id(self):
self._req_id += 1
return self._req_id

View File

@ -1,7 +0,0 @@
class NetworkError(Exception):
def __init__(self, msg):
Exception.__init__(self)
self.msg = msg
def __str__(self):
return self.msg

View File

@ -1,47 +0,0 @@
#coding: UTF-8
'''Message is the carrier of a simple Pub/Sub system on top of ccnet'''
import datetime
import re
import uuid
import time
MESSAGE_PATTERN = re.compile(r'(?P<flags>[\d]+) (?P<from>[^ ]+) (?P<to>[^ ]+) (?P<id>[^ ]+) (?P<ctime>[^ ]+) (?P<rtime>[^ ]+) (?P<app>[^ ]+) (?P<body>.*)')
class Message(object):
def __init__(self, d):
self.flags = int(d['flags'])
self.from_ = d['from']
self.to = d['to']
self.id = d['id']
self.ctime = float(d['ctime'])
self.rtime = float(d['rtime'])
self.app = d['app']
self.body = d['body']
def message_from_string(s):
results = MESSAGE_PATTERN.match(s)
if results is None:
raise RuntimeError('Bad message: %s' % s)
d = results.groupdict()
return Message(d)
def gen_inner_message_string(self_id, app, content):
result = "%d %s %s %s %d %d %s %s\000" % (0, self_id, self_id, str(uuid.uuid1()),
int(time.time()), 0,
app, content)
return result
def message_to_string(msg):
f = '%(flags)s %(from_)s %(to)s %(id)s %(ctime)s %(rtime)s %(app)s %(body)s'
return f % dict(flags=msg.flags,
from_=msg.from_,
to=msg.to,
id=msg.id,
ctime=msg.ctime,
rtime=msg.rtime,
app=msg.app,
body=msg.body)

View File

@ -1,129 +0,0 @@
#coding: UTF-8
"""
Packet level protocol of ccnet.
About various types of id:
- A slave processor's id has its highest bit set; a master processor has its highest bit clear
- The <id> field of a ccnet packet always has its highest bit clear. The
<type> field of the packet determines what type of the packet is (a
request, a response, or an update)
"""
import logging
import struct
from ccnet.utils import recvall, sendall, NetworkError
REQUEST_ID_MASK = 0x7fffffff
SLAVE_BIT_MASK = 0x80000000
CCNET_MSG_OK = 0
CCNET_MSG_HANDSHAKE = 1
CCNET_MSG_REQUEST = 2
CCNET_MSG_RESPONSE = 3
CCNET_MSG_UPDATE = 4
CCNET_MSG_RELAY = 5
def to_request_id(id):
return id & REQUEST_ID_MASK
to_response_id = to_request_id
to_update_id = to_request_id
to_master_id = to_request_id
to_packet_id = to_request_id
def to_slave_id(id):
return id | SLAVE_BIT_MASK
def to_print_id(id):
if id & SLAVE_BIT_MASK:
return -to_request_id(id)
else:
return id
# the byte sequence of ccnet packet header
CCNET_HEADER_FORMAT = '>BBHI'
# Number of bytes for the header
CCNET_HEADER_LENGTH = struct.calcsize(CCNET_HEADER_FORMAT)
CCNET_MAX_PACKET_LENGTH = 65535
class PacketHeader(object):
def __init__(self, ver, ptype, length, id):
self.ver = ver
self.ptype = ptype
self.length = length
self.id = id
def to_string(self):
return struct.pack(CCNET_HEADER_FORMAT, self.ver, self.ptype, self.length, self.id)
def __str__(self):
return "<PacketHeader: type = %d, length = %d, id = %u>" % (self.ptype, self.length, self.id)
class Packet(object):
version = 1
def __init__(self, header, body):
self.header = header
self.body = body
def parse_header(buf):
try:
ver, ptype, length, id = struct.unpack(CCNET_HEADER_FORMAT, buf)
except struct.error, e:
raise NetworkError('error when unpack packet header: %s' % e)
return PacketHeader(ver, ptype, length, id)
def format_response(code, code_msg, content):
body = code
if code_msg:
body += " " + code_msg
body += "\n"
if content:
body += content
return body
format_update = format_response
def request_to_packet(id, buf):
hdr = PacketHeader(1, CCNET_MSG_REQUEST, len(buf), to_request_id(id))
return Packet(hdr, buf)
def response_to_packet(id, code, code_msg, content):
body = format_response(code, code_msg, content)
hdr = PacketHeader(1, CCNET_MSG_RESPONSE, len(body), to_response_id(id))
return Packet(hdr, body)
def update_to_packet(id, code, code_msg, content):
body = format_update(code, code_msg, content)
hdr = PacketHeader(1, CCNET_MSG_UPDATE, len(body), to_update_id(id))
return Packet(hdr, body)
def read_packet(fd):
hdr = recvall(fd, CCNET_HEADER_LENGTH)
if len(hdr) == 0:
logging.warning('connection to daemon is lost')
raise NetworkError('Connection to daemon is lost')
elif len(hdr) < CCNET_HEADER_LENGTH:
raise NetworkError('Only read %d bytes header, expected 8' % len(hdr))
header = parse_header(hdr)
if header.length == 0:
body = ''
else:
body = recvall(fd, header.length)
if len(body) < header.length:
raise NetworkError('Only read %d bytes body, expected %d' % (len(body), header.length))
return Packet(header, body)
def write_packet(fd, packet):
hdr = packet.header.to_string()
sendall(fd, hdr)
sendall(fd, packet.body)

View File

@ -1,36 +0,0 @@
from ccnet.sync_client import SyncClient
import Queue
class ClientPool(object):
"""ccnet client pool."""
def __init__(self, conf_dir, pool_size=5, central_config_dir=None):
"""
:param central_config_dir: path to the central config dir for ccnet/seafile/seahub/seafdav etc.
:param conf_dir: the ccnet configuration directory
:param pool_size:
"""
self.central_config_dir = central_config_dir
self.conf_dir = conf_dir
self.pool_size = pool_size
self._pool = Queue.Queue(pool_size)
def _create_client(self):
client = SyncClient(self.conf_dir, self.central_config_dir)
client.req_ids = {}
client.connect_daemon()
return client
def get_client(self):
try:
client = self._pool.get(False)
except:
client = self._create_client()
return client
def return_client(self, client):
try:
self._pool.put(client, False)
except Queue.Full:
pass

View File

@ -1,179 +1,9 @@
from pysearpc import SearpcClient, searpc_func, SearpcError
from pysearpc import searpc_func, NamedPipeClient
from ccnet.status_code import SC_CLIENT_CALL, SS_CLIENT_CALL, \
SC_CLIENT_MORE, SS_CLIENT_MORE, SC_SERVER_RET, \
SC_SERVER_MORE, SC_PROC_DEAD
class CcnetThreadedRpcClient(NamedPipeClient):
from ccnet.errors import NetworkError
class DeadProcError(Exception):
def __str__(self):
return "Processor is dead"
class RpcClientBase(SearpcClient):
def __init__(self, ccnet_client_pool, service_name, retry_num=1,
is_remote=False, remote_peer_id='', req_pool=False):
SearpcClient.__init__(self)
self.pool = ccnet_client_pool
self.service_name = service_name
self.retry_num = retry_num
self.is_remote = is_remote
self.remote_peer_id = remote_peer_id
self.req_pool = req_pool
if self.is_remote and len(self.remote_peer_id) != 40:
raise ValueError("Invalid remote peer id")
def _start_service(self, client):
req_id = client.get_request_id()
req_str = self.service_name
if self.is_remote:
req_str = "remote " + self.remote_peer_id + " " + self.service_name
client.send_request(req_id, req_str)
rsp = client.read_response()
if rsp.code != "200":
raise SearpcError("Error received: %s %s (In _start_service)" % (rsp.code, rsp.code_msg))
return req_id
def _real_call(self, client, req_id, fcall_str):
client.send_update(req_id, SC_CLIENT_CALL, SS_CLIENT_CALL, fcall_str)
rsp = client.read_response()
if rsp.code == SC_SERVER_RET:
return rsp.content
elif rsp.code == SC_SERVER_MORE:
buf = rsp.content
while True:
client.send_update(req_id, SC_CLIENT_MORE,
SS_CLIENT_MORE, '')
rsp = client.read_response()
if rsp.code == SC_SERVER_MORE:
buf += rsp.content
elif rsp.code == SC_SERVER_RET:
buf += rsp.content
break
else:
raise SearpcError("Error received: %s %s (In Read More)" % (rsp.code, rsp.code_msg))
return buf
elif rsp.code == SC_PROC_DEAD:
raise DeadProcError()
else:
raise SearpcError("Error received: %s %s" % (rsp.code, rsp.code_msg))
def call_remote_func_sync(self, fcall_str):
"""Call remote function `fcall_str` and wait response."""
retried = 0
while True:
try:
client = self.pool.get_client()
if self.req_pool:
req_id = client.req_ids.get(self.service_name, -1)
if req_id == -1:
req_id = self._start_service(client)
client.req_ids[self.service_name] = req_id
try:
ret = self._real_call(client, req_id, fcall_str)
except DeadProcError:
client.req_ids[self.service_name] = -1
self.pool.return_client(client)
if retried < self.retry_num:
retried = retried + 1
continue
else:
raise
self.pool.return_client(client)
return ret
else:
# no req pool
req_id = self._start_service(client)
ret = self._real_call(client, req_id, fcall_str)
client.send_update(req_id, "103", "service is done", "")
self.pool.return_client(client)
return ret
except (NetworkError, SearpcError):
# the client is not returned to the pool and is freed automatically
if retried < self.retry_num:
retried = retried + 1
continue
else:
raise
class CcnetRpcClient(RpcClientBase):
def __init__(self, ccnet_client_pool, retry_num=1, *args, **kwargs):
RpcClientBase.__init__(self, ccnet_client_pool, "ccnet-rpcserver",
*args, **kwargs)
@searpc_func("string", [])
def list_peers(self):
pass
@searpc_func("objlist", [])
def list_resolving_peers(self):
pass
@searpc_func("objlist", ["string"])
def get_peers_by_role(self):
pass
@searpc_func("object", ["string"])
def get_peer(self):
pass
@searpc_func("object", [])
def get_session_info(self):
pass
@searpc_func("int", ["string"])
def add_client(self):
pass
@searpc_func("int", ["string", "string"])
def add_role(self, peer_id, role):
pass
@searpc_func("int", ["string", "string"])
def remove_role(self, peer_id, role):
pass
@searpc_func("objlist", ["int", "int"])
def get_procs_alive(self, offset, limit):
pass
@searpc_func("int", [])
def count_procs_alive(self):
pass
@searpc_func("objlist", ["int", "int"])
def get_procs_dead(self, offset, limit):
pass
@searpc_func("int", [])
def count_procs_dead(self):
pass
@searpc_func("string", ["string"])
def get_config(self, key):
pass
@searpc_func("int", ["string", "string"])
def set_config(self, key, value):
pass
@searpc_func("objlist", [])
def list_peer_stat(self, key, value):
pass
class CcnetThreadedRpcClient(RpcClientBase):
def __init__(self, ccnet_client_pool, retry_num=1, *args, **kwargs):
RpcClientBase.__init__(self, ccnet_client_pool, "ccnet-threaded-rpcserver",
*args, **kwargs)
def __init__(self, socket_path):
NamedPipeClient.__init__(self, socket_path, "ccnet-threaded-rpcserver")
@searpc_func("int", ["string", "string", "int", "int"])
def add_emailuser(self, email, passwd, is_staff, is_active):

View File

@ -1,77 +0,0 @@
#coding: UTF-8
'''Status code and status messages used in ccnet. Should be treated as constants'''
EC_NETWORK_ERR = 1
ES_NETWORK_ERR = 'Network Error'
SC_PROC_KEEPALIVE = '100'
SS_PROC_KEEPALIVE = 'processor keep alive'
SC_PROC_ALIVE = '101'
SS_PROC_ALIVE = 'processor is alive'
SC_PROC_DEAD = '102'
SS_PROC_DEAD = 'processor is dead'
SC_PROC_DONE = '103'
SS_PROC_DONE = 'service is done'
SC_OK = '200'
SS_OK = 'OK'
SC_SERV_EXISTED = '210'
SS_SERV_EXISTED = 'The service existed'
SC_PERM_CHECKING = '250'
SS_PERM_CHECKING = 'Permission Checking'
SC_SHUTDOWN = '500'
SS_SHUTDOWN = 'Shutdown'
SC_CREATE_PROC_ERR = '501'
SS_CREATE_PROC_ERR = 'Create Processor Error'
SC_BAD_PEER = '502'
SS_BAD_PEER = 'Bad peer id'
SC_BAD_USER = '502'
SS_BAD_USER = 'Bad user id'
SC_BAD_ARGS = '503'
SS_BAD_ARGS = 'Bad arguments'
SC_PERM_ERR = '504'
SS_PERM_ERR = 'Permission Error'
SC_BAD_UPDATE_CODE = '506'
SS_BAD_UPDATE_CODE = 'Bad update code'
SC_BAD_RESPONSE_CODE = '507'
SS_BAD_RESPONSE_CODE = 'Bad response code'
SC_VERSION_MISMATCH = '508'
SS_VERSION_MISMATCH = 'Version Mismatch'
SC_UNKNOWN_PEER = '510'
SS_UNKNOWN_PEER = 'Unknown peer'
SC_UNKNOWN_SERVICE = '511'
SS_UNKNOWN_SERVICE = 'Unknown service'
SC_PEER_UNREACHABLE = '512'
SS_PEER_UNREACHABLE = 'Peer Unreachable'
SC_CON_TIMEOUT = '513'
SS_CON_TIMEOUT = 'connection timeout'
SC_KEEPALIVE_TIMEOUT = '514'
SS_KEEPALIVE_TIMEOUT = 'keepalive timeout'
SC_NETDOWN = '515'
SS_NETDOWN = 'peer down'
PROC_NOTSET = 0
PROC_DONE = 1
PROC_REMOTE_DEAD = 2
PROC_NO_SERVICE = 3
PROC_PERM_ERR = 4
PROC_BAD_RESP = 5
SC_CLIENT_CALL = '301'
SS_CLIENT_CALL = 'CLIENT CALL'
SC_CLIENT_MORE = '302'
SS_CLIENT_MORE = 'MORE'
SC_CLIENT_CALL_MORE = '303'
SS_CLIENT_CALL_MORE = 'CLIENT HAS MORE'
SC_SERVER_RET = '311'
SS_SERVER_RET = 'SERVER RET'
SC_SERVER_MORE = '312'
SS_SERVER_MORE = 'HAS MORE'
SC_SERVER_ERR = '411'
SS_SERVER_ERR = 'Fail to invoke the function, check the function'

View File

@ -1,94 +0,0 @@
from ccnet.client import Client, parse_response
from ccnet.packet import read_packet, CCNET_MSG_RESPONSE
from ccnet.status_code import SC_PROC_DONE, SS_PROC_DONE
from ccnet.message import message_from_string, gen_inner_message_string
_REQ_ID_START = 1000
class Response(object):
def __init__(self, code, code_msg, content):
self.code = code
self.code_msg = code_msg
self.content = content
class SyncClient(Client):
'''sync mode client'''
def __init__(self, config_dir, central_config_dir=None):
Client.__init__(self, config_dir, central_config_dir)
self._req_id = _REQ_ID_START
self.mq_req_id = -1
def disconnect_daemon(self):
if self.is_connected():
try:
self._connfd.close()
except:
pass
def read_response(self):
packet = read_packet(self._connfd)
if packet.header.ptype != CCNET_MSG_RESPONSE:
raise RuntimeError('Invalid Response')
code, code_msg, content = parse_response(packet.body)
return Response(code, code_msg, content)
def send_cmd(self, cmd):
req_id = self.get_request_id()
self.send_request(req_id, 'receive-cmd')
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
cmd += '\000'
self.send_update(req_id, '200', '', cmd)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
self.send_update(req_id, SC_PROC_DONE, SS_PROC_DONE, '')
def prepare_recv_message(self, msg_type):
request = 'mq-server %s' % msg_type
req_id = self.get_request_id()
self.send_request(req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def receive_message(self):
resp = self.read_response()
# the message from ccnet daemon has the trailing null byte included
msg = message_from_string(resp.content[:-1])
return msg
def prepare_send_message(self):
request = 'mq-server'
mq_req_id = self.get_request_id()
self.send_request(mq_req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
self.mq_req_id = mq_req_id
def send_message(self, msg_type, content):
if self.mq_req_id == -1:
self.prepare_send_message()
msg = gen_inner_message_string(self.peer_id, msg_type, content)
self.send_update(self.mq_req_id, "300", '', msg)
resp = self.read_response()
if resp.code != '200':
self.mq_req_id = -1
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def register_service_sync(self, service, group):
'''Mainly used by a program to register a dummy service to ensure only
single instance of that program is running
'''
cmd = 'register-service %s %s' % (service, group)
self.send_cmd(cmd)

View File

@ -1,62 +0,0 @@
import sys
import os
from ccnet import ClientPool, RpcClientBase
from pysearpc import searpc_func
import threading
import logging
RPC_SERVICE_NAME = 'test-rpcserver'
CCNET_CONF_DIR = os.path.expanduser('~/.ccnet')
class TestRpcClient(RpcClientBase):
def __init__(self, client_pool, *args, **kwargs):
RpcClientBase.__init__(self, client_pool, RPC_SERVICE_NAME, *args, **kwargs)
@searpc_func('string', ['string', 'int'])
def str_mul(self, s, n):
pass
class Worker(threading.Thread):
def __init__(self, rpc):
threading.Thread.__init__(self)
self.rpc = rpc
def run(self):
s = 'abcdef'
n = 100
assert self.rpc.str_mul(s, n) == s * n
def test(n):
rpcclient = TestRpcClient(ClientPool(CCNET_CONF_DIR, CCNET_CONF_DIR))
workers = []
for i in xrange(n):
t = Worker(rpcclient)
t.start()
workers.append(t)
for t in workers:
t.join()
def setup_logging():
kw = {
'format': '[%(asctime)s][%(module)s]: %(message)s',
'datefmt': '%m/%d/%Y %H:%M:%S',
'level': logging.DEBUG,
'stream': sys.stdout,
}
logging.basicConfig(**kw)
def main():
setup_logging()
if len(sys.argv) > 1:
test(int(sys.argv[1]))
else:
test(100)
print 'test passed'
if __name__ == '__main__':
main()

View File

@ -1,45 +0,0 @@
import os
import logging
import libevent
from pysearpc import searpc_server
from ccnet.async import AsyncClient, RpcServerProc
RPC_SERVICE_NAME = 'test-rpcserver'
CCNET_CONF_DIR = os.path.expanduser('~/.ccnet')
def init_logging():
"""Configure logging module"""
level = logging.DEBUG
kw = {
'format': '[%(asctime)s] %(message)s',
'datefmt': '%m/%d/%Y %H:%M:%S',
'level': level,
}
logging.basicConfig(**kw)
i = 0
def register_rpc_functions(session):
def str_mul(a, b):
global i
i = i + 1
print '[%s] a = %s, b = %s' % (i, a, b)
return a * b
searpc_server.create_service(RPC_SERVICE_NAME)
searpc_server.register_function(RPC_SERVICE_NAME, str_mul)
session.register_service(RPC_SERVICE_NAME, 'basic', RpcServerProc)
def main():
init_logging()
evbase = libevent.Base()
session = AsyncClient(CCNET_CONF_DIR, evbase, CCNET_CONF_DIR)
session.connect_daemon()
register_rpc_functions(session)
session.main_loop()
if __name__ == '__main__':
main()

View File

@ -1,45 +0,0 @@
import os
import socket
from ccnet.errors import NetworkError
def recvall(fd, total):
remain = total
data = ''
while remain > 0:
try:
new = fd.recv(remain)
except socket.error as e:
raise NetworkError('Failed to read from socket: %s' % e)
n = len(new)
if n <= 0:
raise NetworkError("Failed to read from socket")
else:
data += new
remain -= n
return data
def sendall(fd, data):
total = len(data)
offset = 0
while offset < total:
try:
n = fd.send(data[offset:])
except socket.error as e:
raise NetworkError('Failed to write to socket: %s' % e)
if n <= 0:
raise NetworkError('Failed to write to socket')
else:
offset += n
def is_win32():
return os.name == 'nt'
def make_socket_closeonexec(fd):
if not is_win32():
import fcntl
old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)