1
0
mirror of https://github.com/haiwen/ccnet-server.git synced 2025-06-20 10:51:55 +00:00
ccnet-server/lib/ccnetrpc-transport.c
2016-08-19 13:54:34 +08:00

185 lines
5.6 KiB
C

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ccnet/ccnetrpc-transport.h>
#include "rpc-common.h"
#include <ccnet/async-rpc-proc.h>
static char *
invoke_service (CcnetClient *session,
const char *peer_id,
const char *service,
const char *fcall_str,
size_t fcall_len,
size_t *ret_len)
{
struct CcnetResponse *rsp;
uint32_t req_id;
GString *buf;
req_id = ccnet_client_get_rpc_request_id (session, peer_id, service);
if (req_id == 0) {
*ret_len = 0;
return NULL;
}
ccnet_client_send_update (session, req_id,
SC_CLIENT_CALL, SS_CLIENT_CALL,
fcall_str, fcall_len);
if (ccnet_client_read_response (session) < 0) {
*ret_len = 0;
ccnet_client_clean_rpc_request (session, req_id);
return NULL;
}
rsp = &session->response;
if (memcmp (rsp->code, SC_SERVER_RET, 3) == 0) {
*ret_len = (size_t) rsp->clen;
return g_strndup (rsp->content, rsp->clen);
} else if (memcmp (rsp->code, SC_SERVER_MORE, 3) != 0) {
g_warning ("[Sea RPC] Bad response: %s %s.\n", rsp->code, rsp->code_msg);
*ret_len = 0;
return NULL;
}
buf = g_string_new_len (rsp->content, rsp->clen);
while (1) {
ccnet_client_send_update (session, req_id,
SC_CLIENT_MORE, SS_CLIENT_MORE,
fcall_str, fcall_len);
if (ccnet_client_read_response (session) < 0) {
*ret_len = 0;
ccnet_client_clean_rpc_request (session, req_id);
g_string_free (buf, TRUE);
return NULL;
}
rsp = &session->response;
if (memcmp (rsp->code, SC_SERVER_RET, 3) == 0) {
g_string_append_len (buf, rsp->content, rsp->clen);
*ret_len = buf->len;
return g_string_free (buf, FALSE);
} else if (memcmp (rsp->code, SC_SERVER_MORE, 3) == 0) {
g_string_append_len (buf, rsp->content, rsp->clen);
} else {
g_warning ("[Sea RPC] Bad response: %s %s.\n",
rsp->code, rsp->code_msg);
*ret_len = 0;
g_string_free (buf, TRUE);
return NULL;
}
}
/* Never reach here. */
return NULL;
}
static CcnetClient *
create_new_client (const char *central_config_dir, const char *conf_dir)
{
CcnetClient *client;
client = ccnet_client_new ();
if (ccnet_client_load_confdir (client, central_config_dir, conf_dir) < 0) {
g_warning ("[Sea RPC] Failed to load conf dir.\n");
g_object_unref (client);
return NULL;
}
if (ccnet_client_connect_daemon (client, CCNET_CLIENT_SYNC) < 0) {
g_warning ("[Sea RPC] Failed to connect ccnet.\n");
g_object_unref (client);
return NULL;
}
return client;
}
char *
ccnetrpc_transport_send (void *arg, const gchar *fcall_str,
size_t fcall_len, size_t *ret_len)
{
CcnetrpcTransportParam *priv;
CcnetClient *session, *new_session;
g_warn_if_fail (arg != NULL && fcall_str != NULL);
priv = (CcnetrpcTransportParam *)arg;
if (priv->session != NULL) {
/* Use single ccnet client as transport. */
return invoke_service (priv->session, priv->peer_id, priv->service,
fcall_str, fcall_len, ret_len);
} else {
/* Use client pool as transport. */
session = ccnet_client_pool_get_client (priv->pool);
if (!session) {
g_warning ("[Sea RPC] Failed to get client from pool.\n");
*ret_len = 0;
return NULL;
}
char *ret = invoke_service (session, priv->peer_id, priv->service,
fcall_str, fcall_len, ret_len);
if (ret != NULL) {
ccnet_client_pool_return_client (priv->pool, session);
return ret;
}
/* If we failed to send data through the ccnet client returned by
* client pool, ccnet may have been restarted.
* In this case, we create a new ccnet client and put it into
* the client pool after use.
*/
g_message ("[Sea RPC] Ccnet disconnected. Connect again.\n");
new_session = create_new_client (session->central_config_dir, session->config_dir);
if (!new_session) {
*ret_len = 0;
return NULL;
}
g_object_unref (session);
ret = invoke_service (new_session, priv->peer_id, priv->service,
fcall_str, fcall_len, ret_len);
if (ret != NULL)
ccnet_client_pool_return_client (priv->pool, new_session);
else
g_object_unref (new_session);
return ret;
}
}
int
ccnetrpc_async_transport_send (void *arg, gchar *fcall_str,
size_t fcall_len, void *rpc_priv)
{
CcnetrpcAsyncTransportParam *priv;
CcnetClient *session;
CcnetProcessor *proc;
g_warn_if_fail (arg != NULL && fcall_str != NULL);
priv = (CcnetrpcAsyncTransportParam *)arg;
session = priv->session;
if (!priv->peer_id)
proc = ccnet_proc_factory_create_master_processor (
session->proc_factory, "async-rpc");
else
proc = ccnet_proc_factory_create_remote_master_processor (
session->proc_factory, "async-rpc", priv->peer_id);
ccnet_async_rpc_proc_set_rpc ((CcnetAsyncRpcProc *)proc, priv->service,
fcall_str, fcall_len, rpc_priv);
ccnet_processor_start (proc, 0, NULL);
return 0;
}