1
0
mirror of https://github.com/haiwen/ccnet-server.git synced 2025-05-01 12:03:19 +00:00
ccnet-server/lib/ccnet-client.c
2019-10-21 23:10:40 -07:00

873 lines
23 KiB
C

/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "option.h"
#include "include.h"
#include <signal.h>
#include <dirent.h>
#include <stdio.h>
#ifdef WIN32
#include <inttypes.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/un.h>
#endif
#include "message.h"
#include "processor.h"
#include "packet-io.h"
#include "peer.h"
#include "ccnet-client.h"
#include "proc-factory.h"
#include "job-mgr.h"
#include "ccnet-object.h"
/**
* SECTION:ccnet-client
* @short_description: The basic class to interact with the ccnet daemon
* @title: CcnetClient
* @include: ccnet.h
*
* CcnetClient is the basic class to interact with the ccnet daemon.
* CcnetClient can work in two different modes, i.e., CCNET_CLIENT_SYNC
* and CCNET_CLIENT_ASYNC (see #CcnetClientMode). The two modes have
* different sets of APIs.
*
*/
G_DEFINE_TYPE (CcnetClient, ccnet_client, CCNET_TYPE_SESSION_BASE);
#define DEFAULT_NAME "server"
#define DEFAULT_ID "8e4b13b49ca79f35732d9f44a0804940d985627c"
static void handle_packet (ccnet_packet *packet, void *vclient);
static void ccnet_client_free (GObject *object);
static void free_rpc_pool (CcnetClient *client);
/*
 * GObject "set_property" handler for CcnetClient.
 *
 * No property IDs are handled here, so every call is reported as an
 * invalid property ID for @object.
 */
static void
set_property (GObject *object, guint property_id,
              const GValue *v, GParamSpec *pspec)
{
    /* There are no property cases; every ID takes the invalid-ID path. */
    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
}
/*
 * GObject "get_property" handler for CcnetClient.
 *
 * No property IDs are handled here, so every call is reported as an
 * invalid property ID for @object and @v is left untouched.
 */
static void
get_property (GObject *object, guint property_id,
              GValue *v, GParamSpec *pspec)
{
    /* There are no property cases; every ID takes the invalid-ID path. */
    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
}
/*
 * Class initializer: installs the property accessors and the finalizer
 * on the GObject class vtable.
 */
static void
ccnet_client_class_init (CcnetClientClass *klass)
{
    GObjectClass *object_class = G_OBJECT_CLASS (klass);

    object_class->finalize = ccnet_client_free;
    object_class->get_property = get_property;
    object_class->set_property = set_property;
}
/*
 * Instance initializer: marks the client as not connected, seeds the
 * request-id counter, and creates the processor table and the processor
 * factory.
 */
static void
ccnet_client_init (CcnetClient *client)
{
    GHashTable *table;

    client->req_id = CCNET_USER_ID_START;
    client->connfd = -1;

    /* Keys are heap-allocated ints released with g_free; values have no
     * destroy notifier. */
    table = g_hash_table_new_full (g_int_hash, g_int_equal, g_free, NULL);
    client->processors = table;

    client->proc_factory = ccnet_proc_factory_new (client);

    /* job_mgr should be created by user, so that max_thread can
     * be chosen properly. */
}
/**
 * ccnet_client_new:
 *
 * Creates a new #CcnetClient instance with no construction properties.
 *
 * Returns: the object returned by g_object_new() for CCNET_TYPE_CLIENT.
 */
CcnetClient*
ccnet_client_new (void)
{
    CcnetClient *client = g_object_new (CCNET_TYPE_CLIENT, NULL);

    return client;
}
/*
 * GObject finalizer for CcnetClient.
 *
 * Disconnects from the daemon if a packet io is still attached, releases
 * the configuration paths, the processor factory, the job manager, the
 * processor table and the RPC pool, then chains up to the parent class.
 */
static void
ccnet_client_free (GObject *object)
{
    CcnetClient *client = CCNET_CLIENT (object);

    if (client->io)
        ccnet_client_disconnect_daemon (client);

    /* Both directory paths are stored by ccnet_client_load_confdir() from
     * ccnet_util_expand_path(); central_config_dir used to be leaked.
     * free(NULL) is a no-op, so no guard is needed. */
    free (client->config_dir);
    client->config_dir = NULL;
    free (client->central_config_dir);
    client->central_config_dir = NULL;

    g_free (client->config_file);
    client->config_file = NULL;

    if (client->proc_factory)
        g_object_unref (client->proc_factory);
    if (client->job_mgr)
        ccnet_job_manager_free (client->job_mgr);
    if (client->processors)
        g_hash_table_destroy (client->processors);

    free_rpc_pool (client);

    G_OBJECT_CLASS(ccnet_client_parent_class)->finalize (object);
}
/**
 * ccnet_client_load_confdir:
 * @client: the client to configure
 * @central_config_dir_r: optional central config directory, may be %NULL.
 *   When given, the session config file is read from this directory
 *   instead of @config_dir_r.
 * @config_dir_r: the client config directory
 *
 * Expands and validates the directories, loads the session config file,
 * and fills in the client's id, name, service URL, config paths and
 * daemon port.
 *
 * Returns: 0 on success, -1 on failure.  On failure nothing allocated by
 * this call is left behind.
 */
int
ccnet_client_load_confdir (CcnetClient *client, const char *central_config_dir_r, const char *config_dir_r)
{
    char *config_file = NULL, *config_dir = NULL, *central_config_dir = NULL;
    char *port_str = NULL, *service_url = NULL;
    unsigned char sha1[20];
    GKeyFile *key_file = NULL;
    CcnetSessionBase *base = CCNET_SESSION_BASE(client);

    config_dir = ccnet_util_expand_path (config_dir_r);
    if (ccnet_util_checkdir(config_dir) < 0) {
        g_warning ("Config dir %s does not exist or is not "
                   "a directory.\n", config_dir);
        goto onerror;
    }

    if (central_config_dir_r) {
        central_config_dir = ccnet_util_expand_path (central_config_dir_r);
        /* Validate the central dir itself (the old code re-checked
         * config_dir here, so a bad central dir was never detected). */
        if (ccnet_util_checkdir(central_config_dir) < 0) {
            g_warning ("Server config dir %s does not exist or is not "
                       "a directory.\n", central_config_dir);
            goto onerror;
        }
    }

    config_file =
        g_strconcat(central_config_dir ? central_config_dir : config_dir, "/",
                    SESSION_CONFIG_FILENAME, NULL);
    ccnet_debug ("using config file %s\n", config_file);

    key_file = g_key_file_new();
    if (!g_key_file_load_from_file (key_file, config_file,
                                    G_KEY_FILE_KEEP_COMMENTS, NULL))
    {
        g_warning ("Can't load config file %s.\n", config_file);
        goto onerror;
    }

    service_url = ccnet_util_key_file_get_string (key_file, "General", "SERVICE_URL");
    port_str = ccnet_util_key_file_get_string (key_file, "Client", "PORT");

    /* The client always uses the fixed default identity. */
    memcpy (base->id, DEFAULT_ID, 40);
    base->id[40] = '\0';
    if (ccnet_util_hex_to_sha1 (base->id, sha1) < 0) {
        ccnet_error ("Failed to get sha1 of ID.\n");
        goto onerror;
    }
    memcpy (base->id_sha1, sha1, 20);

    if (service_url)
        base->service_url = g_strdup(service_url);
    base->name = DEFAULT_NAME;

    client->config_file = g_strdup(config_file);
    /* Ownership of the expanded paths moves to the client. */
    client->config_dir = config_dir;
    client->central_config_dir = central_config_dir;
    if (port_str)
        client->daemon_port = atoi (port_str);

    g_free (port_str);
    g_free (config_file);
    g_free (service_url);
    g_key_file_free (key_file);
    return 0;

onerror:
    /* Single cleanup path: release everything acquired so far. */
    if (key_file)
        g_key_file_free (key_file);
    g_free (port_str);
    g_free (config_file);
    g_free (service_url);
    /* Expanded paths are released with free(), matching how
     * ccnet_client_free() releases client->config_dir. */
    free (central_config_dir);
    free (config_dir);
    return -1;
}
/*
 * Currently has no effect beyond the precondition check: it only verifies
 * (via g_return_if_fail) that @client is in CCNET_CLIENT_ASYNC mode.
 */
void
ccnet_client_run_synchronizer (CcnetClient *client)
{
g_return_if_fail(client->mode == CCNET_CLIENT_ASYNC);
}
/**
 * ccnet_client_disconnect_daemon:
 * @client: the client to disconnect
 *
 * Frees the packet io, resets the connection fields, and drops every
 * cached RPC request.
 *
 * Returns: always 0.
 */
int
ccnet_client_disconnect_daemon (CcnetClient *client)
{
    ccnet_packet_io_free (client->io);
    client->io = NULL;

    /* Back to the "not connected" state. */
    client->connected = 0;
    client->connfd = -1;

    free_rpc_pool (client);

    return 0;
}
/**
 * ccnet_client_get_request_id:
 * @client: the client
 *
 * Advances the client's request-id counter by one.
 *
 * Returns: the counter value after the increment.
 */
uint32_t
ccnet_client_get_request_id (CcnetClient *client)
{
    client->req_id += 1;
    return client->req_id;
}
/*
 * One entry of client->rpc_pool: remembers the request id that was started
 * for a given (peer_id, service) pair so that it can be reused.
 */
typedef struct RpcPoolItem {
uint32_t req_id; /* request id obtained from start_request() */
char *peer_id; /* g_strdup'd copy of the peer id; NULL when none was given */
char *service; /* g_strdup'd copy of the service name */
} RpcPoolItem;
/* Releases one pool entry together with the two strings it owns. */
static void
free_rpc_pool_item (RpcPoolItem *item)
{
    g_free (item->service);
    g_free (item->peer_id);

    g_free (item);
}
/*
 * Frees every entry of client->rpc_pool, then the list itself, and leaves
 * the pool pointer set to NULL.
 */
static void
free_rpc_pool (CcnetClient *client)
{
    GList *node = client->rpc_pool;

    while (node != NULL) {
        free_rpc_pool_item ((RpcPoolItem *) node->data);
        node = node->next;
    }

    g_list_free (client->rpc_pool);
    client->rpc_pool = NULL;
}
/*
 * Looks up the pool entry whose peer id and service both match.
 * Comparison uses g_strcmp0, so a NULL @peer_id matches an entry whose
 * peer_id is NULL.  Returns the first match, or NULL when there is none.
 */
static RpcPoolItem *
get_pool_item (CcnetClient *client, const char *peer_id,
               const char *service)
{
    GList *node;

    for (node = client->rpc_pool; node != NULL; node = node->next) {
        RpcPoolItem *candidate = node->data;

        if (g_strcmp0 (peer_id, candidate->peer_id) != 0)
            continue;
        if (g_strcmp0 (service, candidate->service) != 0)
            continue;

        return candidate;
    }

    return NULL;
}
/*
 * Sends a request that starts @service (prefixed with "remote <peer_id>"
 * when @peer_id is given) and waits for the response.
 *
 * Returns the new request id when the response code is "200", 0 otherwise.
 */
static uint32_t
start_request (CcnetClient *client, const char *peer_id,
               const char *service)
{
    char request[512];
    uint32_t id = ccnet_client_get_request_id (client);

    if (peer_id != NULL)
        snprintf (request, 512, "remote %s %s", peer_id, service);
    else
        snprintf (request, 512, "%s", service);

    ccnet_client_send_request (client, id, request);

    if (ccnet_client_read_response (client) < 0) {
        g_warning ("[RPC] failed to read response.\n");
        return 0;
    }

    if (memcmp (client->response.code, "200", 3) == 0)
        return id;

    g_warning ("[RPC] failed to start rpc server: %s %s.\n",
               client->response.code, client->response.code_msg);
    return 0;
}
/**
 * ccnet_client_get_rpc_request_id:
 * @client: the client
 * @peer_id: peer id, may be %NULL
 * @service: service name
 *
 * Returns the request id cached for (@peer_id, @service).  When nothing is
 * cached, a new request is started and, if that succeeds, remembered in the
 * client's RPC pool.
 *
 * Returns: the request id, or 0 when the request could not be started.
 */
uint32_t
ccnet_client_get_rpc_request_id (CcnetClient *client, const char *peer_id,
                                 const char *service)
{
    RpcPoolItem *cached = get_pool_item (client, peer_id, service);
    RpcPoolItem *entry;
    uint32_t req_id;

    if (cached != NULL)
        return cached->req_id;

    req_id = start_request (client, peer_id, service);
    if (req_id == 0)
        return 0;

    /* Remember the freshly started request for later calls. */
    entry = g_new0 (RpcPoolItem, 1);
    entry->service = g_strdup (service);
    entry->peer_id = g_strdup (peer_id);
    entry->req_id = req_id;
    client->rpc_pool = g_list_prepend (client->rpc_pool, entry);

    return req_id;
}
/**
 * ccnet_client_clean_rpc_request:
 * @client: the client
 * @req_id: request id to forget
 *
 * Removes the RPC pool entry carrying @req_id, if any, and frees it.
 */
void
ccnet_client_clean_rpc_request (CcnetClient *client, uint32_t req_id)
{
    RpcPoolItem *found = NULL;
    GList *node = client->rpc_pool;

    /* The whole list is walked; if several entries matched, the last one
     * in list order is the one selected. */
    while (node != NULL) {
        RpcPoolItem *current = node->data;

        if (current->req_id == req_id)
            found = current;
        node = node->next;
    }

    if (found == NULL)
        return;

    client->rpc_pool = g_list_remove (client->rpc_pool, found);
    free_rpc_pool_item (found);
}
/* functions used in ASYNC mode */
/**
 * ccnet_client_add_processor:
 * @client: the client
 * @processor: processor to register
 *
 * Registers @processor in the client's processor table under its id.
 * The key is a heap-allocated copy of the id, handed to the table.
 */
void
ccnet_client_add_processor (CcnetClient *client, CcnetProcessor *processor)
{
    int *id_key = g_new0 (int, 1);

    id_key[0] = processor->id;
    g_hash_table_insert (client->processors, id_key, processor);
}
/**
 * ccnet_client_remove_processor:
 * @client: the client
 * @processor: processor to unregister
 *
 * Removes the table entry keyed by @processor's id.
 */
void
ccnet_client_remove_processor (CcnetClient *client, CcnetProcessor *processor)
{
    gconstpointer id_key = &processor->id;

    g_hash_table_remove (client->processors, id_key);
}
/**
 * ccnet_client_get_processor:
 * @client: the client
 * @id: processor id to look up
 *
 * Returns: the processor registered under @id, or %NULL if there is none.
 */
CcnetProcessor *
ccnet_client_get_processor (CcnetClient *client, int id)
{
    gconstpointer id_key = &id;
    CcnetProcessor *found = g_hash_table_lookup (client->processors, id_key);

    return found;
}
/**
 * ccnet_client_read_input:
 * @client: the client
 *
 * Reads from the client's packet io.
 *
 * Returns: -1 when no packet io is attached, otherwise the result of
 * ccnet_packet_io_read().
 */
int
ccnet_client_read_input (CcnetClient *client)
{
    if (client->io != NULL)
        return ccnet_packet_io_read (client->io);

    return -1;
}
/*
 * Creates and starts a slave processor for an incoming request.
 *
 * @argv holds the @argc request tokens and is NULL-terminated.  Two shapes
 * are accepted:
 *   <proc-name> [args...]                    - peer is this client itself
 *   remote <peer-id> <proc-name> [args...]   - peer is <peer-id>
 *
 * When the request is malformed or the factory cannot create the
 * processor, a SC_CREATE_PROC_ERR response is sent for @req_id.
 */
static void create_processor (CcnetClient *client, int req_id,
                              int argc, char **argv)
{
    CcnetProcessor *processor;
    CcnetProcFactory *factory = client->proc_factory;
    char *peer_id;

    /* An empty token vector has argv[0] == NULL; never hand it to strcmp. */
    if (argc < 1 || argv[0] == NULL)
        goto fail;

    if (strcmp(argv[0], "remote") == 0) {
        /* Need "remote", a peer id and a processor name.  With fewer
         * tokens, skipping two entries would step past the vector's
         * NULL terminator. */
        if (argc < 3)
            goto fail;
        peer_id = argv[1];
        argc -= 2;
        argv += 2;
    } else
        peer_id = client->base.id;

    processor = ccnet_proc_factory_create_slave_processor (
        factory, argv[0], peer_id, req_id);
    if (processor) {
        /* Remaining tokens after the processor name are its arguments. */
        ccnet_processor_start (processor, argc-1, argv+1);
        return;
    }

fail:
    ccnet_client_send_response (client, req_id,
                                SC_CREATE_PROC_ERR,
                                SS_CREATE_PROC_ERR,
                                NULL, 0);
}
/*
 * Disabled code: everything up to the matching #endif is compiled out.
 * As written, it would create a "send-event" master processor, attach
 * @event to it and start it with no arguments; a NULL @event is ignored.
 */
#if 0
void
ccnet_client_send_event (CcnetClient *client, GObject *event)
{
if (!event) return;
CcnetProcessor *processor = NULL;
processor = ccnet_proc_factory_create_master_processor
(client->proc_factory, "send-event");
ccnet_sendevent_proc_set_event (CCNET_SENDEVENT_PROC(processor),
(CcnetEvent *)event);
ccnet_processor_start (processor, 0, NULL);
}
#endif
/*
 * Handles a request packet: splits the @len-byte payload on spaces and
 * tabs (at most 10 tokens) and passes the tokens to create_processor().
 */
static void
handle_request (CcnetClient *client, int req_id, char *data, int len)
{
    gchar **tokens;
    char *copy;
    int ntokens = 0;

    g_return_if_fail (len >= 1);

    /* TODO: remove string copy */
    /* The payload is copied so that it can be NUL-terminated. */
    copy = g_malloc (len+1);
    memcpy (copy, data, len);
    copy[len] = '\0';

    tokens = g_strsplit_set (copy, " \t", 10);
    g_free (copy);

    while (tokens[ntokens] != NULL)
        ntokens++;

    create_processor (client, req_id, ntokens, tokens);
    g_strfreev (tokens);
}
/*
 * Handles a response packet from the daemon.
 *
 * The payload has the form "<3-byte code>[ <code_msg>]\n<content>".  It is
 * split in place (NUL bytes are written into @data) and passed to the
 * master processor registered for @req_id.  If no such processor exists,
 * an SC_PROC_DEAD update is sent back, unless the response itself already
 * carries SC_PROC_DEAD.
 *
 * @data: packet payload, modified in place
 * @len: payload length in bytes; must be at least 4
 */
static void
handle_response (CcnetClient *client, int req_id, char *data, int len)
{
    CcnetProcessor *processor;
    char *code, *code_msg = NULL, *content = NULL;
    int clen;
    char *ptr, *end;

    g_return_if_fail (len >= 4);

    code = data;
    ptr = data + 3;
    end = data + len;

    if (*ptr == '\n') {
        /* no code_msg */
        *ptr++ = '\0';
    } else if (*ptr == ' ') {
        *ptr++ = '\0';
        code_msg = ptr;
        /* Find the newline ending the status line.  The bound is tested
         * before the dereference so that data[len] is never read. */
        while (ptr != end && *ptr != '\n')
            ptr++;
        if (ptr == end)     /* must end with '\n' */
            goto error;
        *ptr++ = '\0';
    } else
        goto error;

    content = ptr;
    clen = (int) (end - ptr);

    processor = ccnet_client_get_processor (client, MASTER_ID (req_id));
    if (processor == NULL) {
        if (strcmp (code, SC_PROC_DEAD) != 0) {
            /* code_msg is NULL when the status line has no reason text. */
            ccnet_debug ("Delayed response from daemon, id is %d, %s %s\n",
                         MASTER_ID(req_id), code, code_msg ? code_msg : "");
            ccnet_client_send_update (client, req_id,
                                      SC_PROC_DEAD, SS_PROC_DEAD,
                                      NULL, 0);
        }
        return;
    }

    ccnet_processor_handle_response (processor, code, code_msg, content, clen);
    return;

error:
    g_warning ("Bad response format from daemon\n");
}
/*
 * Handles an update packet from the daemon.
 *
 * The payload has the form "<3-byte code>[ <code_msg>]\n<content>".  It is
 * split in place (NUL bytes are written into @data) and passed to the
 * slave processor registered for @req_id.  If no such processor exists,
 * an SC_PROC_DEAD response is sent back, unless the update itself already
 * carries SC_PROC_DEAD.
 *
 * @data: packet payload, modified in place
 * @len: payload length in bytes; must be at least 4
 */
static void
handle_update (CcnetClient *client, int req_id, char *data, int len)
{
    CcnetProcessor *processor;
    char *code, *code_msg = NULL, *content = NULL;
    int clen;
    char *ptr, *end;

    g_return_if_fail (len >= 4);

    code = data;
    ptr = data + 3;
    end = data + len;

    if (*ptr == '\n') {
        /* no code_msg */
        *ptr++ = '\0';
    } else if (*ptr == ' ') {
        *ptr++ = '\0';
        code_msg = ptr;
        /* Find the newline ending the status line.  The bound is tested
         * before the dereference so that data[len] is never read. */
        while (ptr != end && *ptr != '\n')
            ptr++;
        if (ptr == end)     /* must end with '\n' */
            goto error;
        *ptr++ = '\0';
    } else
        goto error;

    content = ptr;
    clen = (int) (end - ptr);

    processor = ccnet_client_get_processor (client, SLAVE_ID(req_id));
    if (processor == NULL) {
        if (strcmp (code, SC_PROC_DEAD) != 0) {
            /* code_msg is NULL when the status line has no reason text. */
            ccnet_debug ("Delayed update from daemon, id is %d, %s %s\n",
                         req_id, code, code_msg ? code_msg : "");
            ccnet_client_send_response (client, req_id,
                                        SC_PROC_DEAD, SS_PROC_DEAD,
                                        NULL, 0);
        }
        return;
    }

    ccnet_processor_handle_update (processor, code, code_msg, content, clen);
    return;

error:
    g_warning ("Bad update format\n");
}
/*
 * Packet callback: dispatches @packet to the request, response or update
 * handler according to its header type.  A NULL @packet means the daemon
 * connection is gone and is only logged.
 *
 * @vclient: the CcnetClient the packet belongs to
 */
static void handle_packet (ccnet_packet *packet, void *vclient)
{
    CcnetClient *client = vclient;

    if (!packet) {
        /* disconnected from daemon */
        g_warning ("Disconnected from daemon\n");
        return;
    }

    switch (packet->header.type) {
    case CCNET_MSG_UPDATE:
        handle_update (client, packet->header.id,
                       packet->data, packet->header.length);
        break;

    case CCNET_MSG_RESPONSE:
        handle_response (client, packet->header.id,
                         packet->data, packet->header.length);
        break;

    case CCNET_MSG_REQUEST:
        handle_request (client, packet->header.id,
                        packet->data, packet->header.length);
        break;

    default:
        /* Any other packet type is unexpected. */
        g_return_if_reached ();
    }
}
/**
 * ccnet_client_send_request:
 * @client: the client
 * @req_id: request id to put in the packet
 * @req: request string
 *
 * Sends @req as a CCNET_MSG_REQUEST packet and logs it at debug level.
 */
void
ccnet_client_send_request (CcnetClient *client, int req_id, const char *req)
{
    /* Build the packet: header first, then the request text. */
    ccnet_packet_prepare (client->io, CCNET_MSG_REQUEST, req_id);
    ccnet_packet_write_string (client->io, req);

    ccnet_packet_finish_send (client->io);

    g_debug ("Send a request: id %d, cmd %s\n", req_id, req);
}
/**
 * ccnet_client_send_update:
 * @client: the client
 * @req_id: request id, must be greater than 0
 * @code: A string of three numbers. Like "200"
 * @reason: long description for @code, can't contain '\n', can be %NULL
 * @content: A char array, can be %NULL.
 * @clen: length of @content, in bytes.
 *
 * Sends a CCNET_MSG_UPDATE packet laid out as
 * "<code>[ <reason>]\n<content>".
 */
void
ccnet_client_send_update (CcnetClient *client, int req_id,
                          const char *code, const char *reason,
                          const char *content, int clen)
{
    g_return_if_fail (req_id > 0);
    g_return_if_fail (clen < CCNET_PACKET_MAX_PAYLOAD_LEN);

    ccnet_packet_prepare (client->io, CCNET_MSG_UPDATE, req_id);

    /* Status line: the three code bytes, an optional reason, a newline. */
    ccnet_packet_add (client->io, code, 3);
    if (reason != NULL) {
        ccnet_packet_add (client->io, " ", 1);
        ccnet_packet_write_string (client->io, reason);
    }
    ccnet_packet_add (client->io, "\n", 1);

    /* Optional body. */
    if (content != NULL)
        ccnet_packet_add (client->io, content, clen);

    ccnet_packet_finish_send (client->io);
}
/**
 * ccnet_client_send_response:
 * @client: the client
 * @req_id: request id
 * @code: a three-character status code, like "200"
 * @reason: description for @code, can be %NULL
 * @content: a char array, can be %NULL
 * @clen: length of @content, in bytes
 *
 * Sends a CCNET_MSG_RESPONSE packet laid out as
 * "<code>[ <reason>]\n<content>".
 */
void
ccnet_client_send_response (CcnetClient *client, int req_id,
                            const char *code, const char *reason,
                            const char *content, int clen)
{
    g_return_if_fail (clen < CCNET_PACKET_MAX_PAYLOAD_LEN);

    ccnet_packet_prepare (client->io, CCNET_MSG_RESPONSE, req_id);

    /* Status line: the three code bytes, an optional reason, a newline. */
    ccnet_packet_add (client->io, code, 3);
    if (reason != NULL) {
        ccnet_packet_add (client->io, " ", 1);
        ccnet_packet_write_string (client->io, reason);
    }
    ccnet_packet_add (client->io, "\n", 1);

    /* Optional body. */
    if (content != NULL)
        ccnet_packet_add (client->io, content, clen);

    ccnet_packet_finish_send (client->io);
}
/* functions used in SYNC mode */
/**
 * ccnet_client_read_response:
 * @client: the client
 *
 * Read response from the daemon. The response can be accessed by
 * client->response; its code, code_msg and content point into the packet
 * payload, which is split in place.  code_msg is %NULL when the status
 * line carries no reason text.
 *
 * Keep-alive responses (SC_PROC_KEEPALIVE) are answered with an
 * SC_PROC_ALIVE update and skipped; reading continues with the next
 * packet.
 *
 * Returns: 0 on success, -1 if io error (or the payload is shorter than
 * 4 bytes), -2 if response packet format error
 */
int
ccnet_client_read_response (CcnetClient *client)
{
    ccnet_packet *packet;
    char *data;
    int len, clen;
    char *code, *code_msg, *content;
    char *ptr, *end;

    for (;;) {
        /* Reset for every packet, so that a reason parsed from a skipped
         * keep-alive packet can never leak into the next response. */
        code_msg = NULL;

        if ( (packet = ccnet_packet_io_read_packet (client->io)) == NULL)
            return -1;

        if (packet->header.type != CCNET_MSG_RESPONSE)
            goto error;

        data = packet->data;
        len = packet->header.length;
        g_return_val_if_fail (len >= 4, -1);

        code = data;
        ptr = data + 3;
        end = data + len;

        if (*ptr == '\n') {
            /* no code_msg */
            *ptr++ = '\0';
        } else if (*ptr == ' ') {
            *ptr++ = '\0';
            code_msg = ptr;
            /* Find the newline ending the status line.  The bound is
             * tested before the dereference so that data[len] is never
             * read. */
            while (ptr != end && *ptr != '\n')
                ptr++;
            if (ptr == end)     /* must end with '\n' */
                goto error;
            *ptr++ = '\0';
        } else
            goto error;

        content = ptr;
        clen = (int) (end - ptr);

        /* A check that discarded responses whose packet id differs from
         * client->req_id used to sit here; it is disabled. */

        /* handle processor keep alive response (with or without a
         * reason text) */
        if (strncmp(code, SC_PROC_KEEPALIVE, 3) != 0)
            break;

        ccnet_client_send_update(client, packet->header.id,
                                 SC_PROC_ALIVE, SS_PROC_ALIVE, NULL, 0);
    }

    client->response.code = code;
    client->response.code_msg = code_msg;
    client->response.content = content;
    client->response.clen = clen;
    return 0;

error:
    g_warning ("Bad response format from daemon\n");
    return -2;
}
/*
 * Reads one response and treats status codes starting with '4' or '5' as
 * failures.
 *
 * Returns 0 when a non-error response is available in client->response,
 * -1 when reading failed or the daemon reported an error.
 */
static int read_response_common(CcnetClient *client)
{
    const char *code_msg;

    if (ccnet_client_read_response(client) < 0)
        return -1;

    if (client->response.code[0] == '4' ||
        client->response.code[0] == '5') {
        /* code_msg is NULL when the response has no reason text; never
         * pass NULL to a %s conversion. */
        code_msg = client->response.code_msg;
        g_warning ("Error response from daemon: %s %s\n",
                   client->response.code, code_msg ? code_msg : "");
        return -1;
    }

    return 0;
}
/*
 * Converts an error response (status code starting with '4' or '5') held
 * in client->response into a GError in CCNET_DOMAIN, using the numeric
 * status code as the error code and the reason text as the message.
 *
 * Returns TRUE when @error was set, FALSE when the response is not an
 * error.
 */
static gboolean
check_response_error (CcnetClient *client, GError **error)
{
    const char *code_msg;

    if (client->response.code[0] == '4' ||
        client->response.code[0] == '5') {
        /* code_msg is NULL when the response has no reason text; never
         * pass NULL to a %s conversion. */
        code_msg = client->response.code_msg;
        g_set_error (error, CCNET_DOMAIN, atoi(client->response.code),
                     "%s", code_msg ? code_msg : "");
        return TRUE;
    }

    return FALSE;
}
/**
 * ccnet_client_send_cmd:
 * @client: the client
 * @cmd: command string, sent including its terminating NUL
 * @error: return location for an error
 *
 * Starts a "receive-cmd" request, sends @cmd as a "200" update and waits
 * for the reply.  An SC_PROC_DONE update is always sent before returning,
 * whether or not the exchange succeeded.
 *
 * Returns: client->response.content on success, %NULL on failure with
 * @error set.
 */
const char *
ccnet_client_send_cmd (CcnetClient *client, const char *cmd, GError **error)
{
    int req_id = ccnet_client_get_request_id (client);
    gboolean succeeded = FALSE;

    ccnet_client_send_request (client, req_id, "receive-cmd");

    if (ccnet_client_read_response(client) < 0) {
        g_set_error (error, CCNET_DOMAIN, EC_NETWORK_ERR, "%s", ES_NETWORK_ERR);
    } else {
        ccnet_client_send_update (client, req_id,
                                  "200", NULL, cmd, strlen(cmd) + 1);

        if (ccnet_client_read_response(client) < 0)
            g_set_error (error, CCNET_DOMAIN, EC_NETWORK_ERR, "%s", ES_NETWORK_ERR);
        else if (!check_response_error(client, error))
            succeeded = TRUE;
    }

    /* Both the success and the failure path end the processor. */
    ccnet_client_send_update (client, req_id,
                              SC_PROC_DONE, SS_PROC_DONE,
                              NULL, 0);

    if (!succeeded)
        return NULL;
    return client->response.content;
}
#define SC_MSG "300"
/**
 * ccnet_client_send_message:
 * @client: the client
 * @message: message to send
 *
 * Starts an "mq-server" request, serializes @message and sends it as an
 * SC_MSG update (including the terminating NUL), then waits for the
 * reply.
 *
 * Returns: 0 on success, -1 if either response could not be read.
 */
int
ccnet_client_send_message (CcnetClient *client,
                           CcnetMessage *message)
{
    GString *buf;
    int ret;
    int req_id = ccnet_client_get_request_id (client);

    ccnet_client_send_request (client, req_id, "mq-server");

    if (ccnet_client_read_response (client) < 0)
        return -1; /* TODO: handle response code */

    buf = g_string_new (NULL);
    ccnet_message_to_string_buf (message, buf);
    ccnet_client_send_update (client, req_id,
                              SC_MSG, NULL, buf->str, buf->len+1);

    ret = (ccnet_client_read_response (client) < 0) ? -1 : 0;

    /* Released on both outcomes; the failure path used to leak it. */
    g_string_free (buf, TRUE);

    return ret;
}
/**
 * ccnet_client_prepare_recv_message:
 * @client: the client
 * @app: application name appended to the "mq-server" request
 *
 * Sends an "mq-server <app>" request and checks the reply.
 *
 * Returns: 0 when the response code is "200", -1 otherwise.
 */
int
ccnet_client_prepare_recv_message (CcnetClient *client,
                                   const char *app)
{
    char request[256];
    int req_id = ccnet_client_get_request_id (client);

    snprintf (request, sizeof(request), "mq-server %s", app);
    ccnet_client_send_request (client, req_id, request);

    if (read_response_common (client) < 0)
        return -1;

    if (memcmp(client->response.code, "200", 3) == 0)
        return 0;

    return -1;
}
/**
 * ccnet_client_receive_message:
 * @client: the client
 *
 * Reads the next response and parses its content as a message.
 *
 * Returns: the result of ccnet_message_from_string() on the response
 * content, or %NULL when no non-error response could be read.
 */
CcnetMessage *
ccnet_client_receive_message (CcnetClient *client)
{
    if (read_response_common (client) < 0)
        return NULL;

    return ccnet_message_from_string (client->response.content,
                                      client->response.clen);
}