mirror of
https://github.com/haiwen/ccnet-server.git
synced 2025-05-01 12:03:19 +00:00
873 lines
23 KiB
C
873 lines
23 KiB
C
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
|
|
|
|
#include "option.h"
|
|
#include "include.h"
|
|
|
|
|
|
#include <signal.h>
|
|
#include <dirent.h>
|
|
#include <stdio.h>
|
|
|
|
#ifdef WIN32
|
|
#include <inttypes.h>
|
|
#include <winsock2.h>
|
|
#include <ws2tcpip.h>
|
|
#else
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#include <sys/un.h>
|
|
#endif
|
|
|
|
#include "message.h"
|
|
|
|
#include "processor.h"
|
|
#include "packet-io.h"
|
|
#include "peer.h"
|
|
#include "ccnet-client.h"
|
|
|
|
#include "proc-factory.h"
|
|
#include "job-mgr.h"
|
|
|
|
#include "ccnet-object.h"
|
|
|
|
/**
|
|
* SECTION:ccnet-client
|
|
* @short_description: The basic class to interact with the ccnet daemon
|
|
* @title: CcnetClient
|
|
* @include: ccnet.h
|
|
*
|
|
* CcnetClient is the basic class to interact with the ccnet daemon.
|
|
* CcnetClient can work in two different mode, i.e., CCNET_CLIENT_SYNC
|
|
* and CCNET_CLIENT_ASYNC (See #CcnetClientMode). The two modes have
|
|
* different sets of APIs.
|
|
*
|
|
*/
|
|
|
|
G_DEFINE_TYPE (CcnetClient, ccnet_client, CCNET_TYPE_SESSION_BASE);
|
|
|
|
#define DEFAULT_NAME "server"
|
|
#define DEFAULT_ID "8e4b13b49ca79f35732d9f44a0804940d985627c"
|
|
|
|
static void handle_packet (ccnet_packet *packet, void *vclient);
|
|
static void ccnet_client_free (GObject *object);
|
|
static void free_rpc_pool (CcnetClient *client);
|
|
|
|
|
|
static void
|
|
set_property (GObject *object, guint property_id,
|
|
const GValue *v, GParamSpec *pspec)
|
|
{
|
|
/* CcnetClient *client = CCNET_CLIENT (object); */
|
|
switch (property_id) {
|
|
default:
|
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
|
|
return;
|
|
}
|
|
}
|
|
|
|
static void
|
|
get_property (GObject *object, guint property_id,
|
|
GValue *v, GParamSpec *pspec)
|
|
{
|
|
/* CcnetClient *client = CCNET_CLIENT (object); */
|
|
switch (property_id) {
|
|
default:
|
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static void
|
|
ccnet_client_class_init (CcnetClientClass *klass)
|
|
{
|
|
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
|
|
|
|
gobject_class->set_property = set_property;
|
|
gobject_class->get_property = get_property;
|
|
gobject_class->finalize = ccnet_client_free;
|
|
|
|
/* ccnet_object_init (); */
|
|
}
|
|
|
|
static void
|
|
ccnet_client_init (CcnetClient *client)
|
|
{
|
|
client->connfd = -1;
|
|
client->req_id = CCNET_USER_ID_START;
|
|
client->processors = g_hash_table_new_full (g_int_hash, g_int_equal,
|
|
g_free, NULL);
|
|
client->proc_factory = ccnet_proc_factory_new (client);
|
|
/* job_mgr should be created by user, so that max_thread can
|
|
* be chosen properly. */
|
|
}
|
|
|
|
CcnetClient*
|
|
ccnet_client_new (void)
|
|
{
|
|
return g_object_new (CCNET_TYPE_CLIENT, NULL);
|
|
}
|
|
|
|
static void
|
|
ccnet_client_free (GObject *object)
|
|
{
|
|
CcnetClient *client = CCNET_CLIENT (object);
|
|
if (client->io)
|
|
ccnet_client_disconnect_daemon (client);
|
|
|
|
if (client->config_dir)
|
|
free (client->config_dir);
|
|
g_free (client->config_file);
|
|
if (client->proc_factory)
|
|
g_object_unref (client->proc_factory);
|
|
if (client->job_mgr)
|
|
ccnet_job_manager_free (client->job_mgr);
|
|
if (client->processors)
|
|
g_hash_table_destroy (client->processors);
|
|
|
|
free_rpc_pool (client);
|
|
|
|
G_OBJECT_CLASS(ccnet_client_parent_class)->finalize (object);
|
|
}
|
|
|
|
int
|
|
ccnet_client_load_confdir (CcnetClient *client, const char *central_config_dir_r, const char *config_dir_r)
|
|
{
|
|
char *config_file = NULL, *config_dir = NULL, *central_config_dir = NULL;
|
|
char *port_str = NULL, *service_url = NULL;
|
|
unsigned char sha1[20];
|
|
GKeyFile *key_file;
|
|
CcnetSessionBase *base = CCNET_SESSION_BASE(client);
|
|
|
|
config_dir = ccnet_util_expand_path (config_dir_r);
|
|
|
|
if (ccnet_util_checkdir(config_dir) < 0) {
|
|
g_warning ("Config dir %s does not exist or is not "
|
|
"a directory.\n", config_dir);
|
|
return -1;
|
|
}
|
|
|
|
if (central_config_dir_r) {
|
|
central_config_dir = ccnet_util_expand_path (central_config_dir_r);
|
|
if (ccnet_util_checkdir(config_dir) < 0) {
|
|
g_warning ("Server config dir %s does not exist or is not "
|
|
"a directory.\n", central_config_dir);
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
config_file =
|
|
g_strconcat(central_config_dir ? central_config_dir : config_dir, "/",
|
|
SESSION_CONFIG_FILENAME, NULL);
|
|
ccnet_debug ("using config file %s\n", config_file);
|
|
key_file = g_key_file_new();
|
|
if (!g_key_file_load_from_file (key_file, config_file,
|
|
G_KEY_FILE_KEEP_COMMENTS, NULL))
|
|
{
|
|
g_warning ("Can't load config file %s.\n", config_file);
|
|
goto onerror;
|
|
}
|
|
|
|
service_url = ccnet_util_key_file_get_string (key_file, "General", "SERVICE_URL");
|
|
port_str = ccnet_util_key_file_get_string (key_file, "Client", "PORT");
|
|
|
|
memcpy (base->id, DEFAULT_ID, 40);
|
|
base->id[40] = '\0';
|
|
if (ccnet_util_hex_to_sha1 (base->id, sha1) < 0) {
|
|
ccnet_error ("Failed to get sha1 of ID.\n");
|
|
g_key_file_free (key_file);
|
|
goto onerror;
|
|
}
|
|
memcpy (base->id_sha1, sha1, 20);
|
|
if (service_url)
|
|
base->service_url = g_strdup(service_url);
|
|
base->name = DEFAULT_NAME;
|
|
|
|
client->config_file = g_strdup(config_file);
|
|
client->config_dir = config_dir;
|
|
client->central_config_dir = central_config_dir;
|
|
|
|
if (port_str)
|
|
client->daemon_port = atoi (port_str);
|
|
|
|
g_free (port_str);
|
|
g_free (config_file);
|
|
g_free (service_url);
|
|
g_key_file_free (key_file);
|
|
return 0;
|
|
|
|
onerror:
|
|
g_free (port_str);
|
|
g_free (config_file);
|
|
g_free (service_url);
|
|
return -1;
|
|
}
|
|
|
|
void
|
|
ccnet_client_run_synchronizer (CcnetClient *client)
|
|
{
|
|
g_return_if_fail(client->mode == CCNET_CLIENT_ASYNC);
|
|
}
|
|
|
|
|
|
int
|
|
ccnet_client_disconnect_daemon (CcnetClient *client)
|
|
{
|
|
ccnet_packet_io_free (client->io);
|
|
client->io = NULL;
|
|
client->connfd = -1;
|
|
client->connected = 0;
|
|
free_rpc_pool (client);
|
|
|
|
return 0;
|
|
}
|
|
|
|
uint32_t
|
|
ccnet_client_get_request_id (CcnetClient *client)
|
|
{
|
|
return (++client->req_id);
|
|
}
|
|
|
|
typedef struct RpcPoolItem {
|
|
uint32_t req_id;
|
|
char *peer_id;
|
|
char *service;
|
|
} RpcPoolItem;
|
|
|
|
static void
|
|
free_rpc_pool_item (RpcPoolItem *item)
|
|
{
|
|
g_free (item->peer_id);
|
|
g_free (item->service);
|
|
g_free (item);
|
|
}
|
|
|
|
static void
|
|
free_rpc_pool (CcnetClient *client)
|
|
{
|
|
GList *ptr;
|
|
for (ptr = client->rpc_pool; ptr; ptr = ptr->next) {
|
|
RpcPoolItem *item = ptr->data;
|
|
free_rpc_pool_item (item);
|
|
}
|
|
g_list_free (client->rpc_pool);
|
|
client->rpc_pool = NULL;
|
|
}
|
|
|
|
static RpcPoolItem *
|
|
get_pool_item (CcnetClient *client, const char *peer_id,
|
|
const char *service)
|
|
{
|
|
GList *ptr;
|
|
for (ptr = client->rpc_pool; ptr; ptr = ptr->next) {
|
|
RpcPoolItem *item = ptr->data;
|
|
if (g_strcmp0(peer_id, item->peer_id) == 0 &&
|
|
g_strcmp0(service, item->service) == 0)
|
|
return item;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static uint32_t
|
|
start_request (CcnetClient *client, const char *peer_id,
|
|
const char *service)
|
|
{
|
|
uint32_t req_id = ccnet_client_get_request_id (client);
|
|
char buf[512];
|
|
|
|
if (!peer_id)
|
|
snprintf (buf, 512, "%s", service);
|
|
else
|
|
snprintf (buf, 512, "remote %s %s", peer_id, service);
|
|
ccnet_client_send_request (client, req_id, buf);
|
|
|
|
if (ccnet_client_read_response (client) < 0) {
|
|
g_warning ("[RPC] failed to read response.\n");
|
|
return 0;
|
|
}
|
|
|
|
if (memcmp (client->response.code, "200", 3) != 0) {
|
|
g_warning ("[RPC] failed to start rpc server: %s %s.\n",
|
|
client->response.code, client->response.code_msg);
|
|
return 0;
|
|
}
|
|
|
|
return req_id;
|
|
}
|
|
|
|
uint32_t
|
|
ccnet_client_get_rpc_request_id (CcnetClient *client, const char *peer_id,
|
|
const char *service)
|
|
{
|
|
RpcPoolItem *item = get_pool_item (client, peer_id, service);
|
|
if (item)
|
|
return item->req_id;
|
|
|
|
uint32_t req_id = start_request (client, peer_id, service);
|
|
if (req_id == 0)
|
|
return 0;
|
|
|
|
item = g_new0 (RpcPoolItem, 1);
|
|
item->req_id = req_id;
|
|
item->peer_id = g_strdup (peer_id);
|
|
item->service = g_strdup (service);
|
|
client->rpc_pool = g_list_prepend (client->rpc_pool, item);
|
|
return req_id;
|
|
}
|
|
|
|
void
|
|
ccnet_client_clean_rpc_request (CcnetClient *client, uint32_t req_id)
|
|
{
|
|
GList *ptr;
|
|
RpcPoolItem *target = NULL;
|
|
|
|
for (ptr = client->rpc_pool; ptr; ptr = ptr->next) {
|
|
RpcPoolItem *item = ptr->data;
|
|
if (req_id == item->req_id)
|
|
target = item;
|
|
}
|
|
if (!target) return;
|
|
|
|
client->rpc_pool = g_list_remove (client->rpc_pool, target);
|
|
free_rpc_pool_item (target);
|
|
}
|
|
|
|
|
|
/* functions used in ASYNC mode */
|
|
void
|
|
ccnet_client_add_processor (CcnetClient *client, CcnetProcessor *processor)
|
|
{
|
|
int *key = g_new0 (int, 1);
|
|
|
|
*key = processor->id;
|
|
g_hash_table_insert (client->processors, key, processor);
|
|
}
|
|
|
|
void
|
|
ccnet_client_remove_processor (CcnetClient *client, CcnetProcessor *processor)
|
|
{
|
|
g_hash_table_remove (client->processors, &processor->id);
|
|
}
|
|
|
|
CcnetProcessor *
|
|
ccnet_client_get_processor (CcnetClient *client, int id)
|
|
{
|
|
return g_hash_table_lookup (client->processors, &id);
|
|
}
|
|
|
|
|
|
int
|
|
ccnet_client_read_input (CcnetClient *client)
|
|
{
|
|
if (!client->io)
|
|
return -1;
|
|
return ccnet_packet_io_read(client->io);
|
|
}
|
|
|
|
static void create_processor (CcnetClient *client, int req_id,
|
|
int argc, char **argv)
|
|
{
|
|
CcnetProcessor *processor;
|
|
CcnetProcFactory *factory = client->proc_factory;
|
|
char *peer_id;
|
|
|
|
if (strcmp(argv[0], "remote") == 0) {
|
|
peer_id = argv[1];
|
|
argc -= 2;
|
|
argv += 2;
|
|
} else
|
|
peer_id = client->base.id;
|
|
|
|
processor = ccnet_proc_factory_create_slave_processor (
|
|
factory, argv[0], peer_id, req_id);
|
|
|
|
if (processor) {
|
|
ccnet_processor_start (processor, argc-1, argv+1);
|
|
} else
|
|
ccnet_client_send_response (client, req_id,
|
|
SC_CREATE_PROC_ERR,
|
|
SS_CREATE_PROC_ERR,
|
|
NULL, 0);
|
|
}
|
|
|
|
#if 0
|
|
void
|
|
ccnet_client_send_event (CcnetClient *client, GObject *event)
|
|
{
|
|
if (!event) return;
|
|
CcnetProcessor *processor = NULL;
|
|
processor = ccnet_proc_factory_create_master_processor
|
|
(client->proc_factory, "send-event");
|
|
ccnet_sendevent_proc_set_event (CCNET_SENDEVENT_PROC(processor),
|
|
(CcnetEvent *)event);
|
|
ccnet_processor_start (processor, 0, NULL);
|
|
}
|
|
#endif
|
|
|
|
static void
|
|
handle_request (CcnetClient *client, int req_id, char *data, int len)
|
|
{
|
|
char *msg;
|
|
gchar **commands;
|
|
gchar **pcmd;
|
|
int i;
|
|
|
|
/* TODO: remove string copy */
|
|
g_return_if_fail (len >= 1);
|
|
msg = g_malloc (len+1);
|
|
memcpy (msg, data, len);
|
|
msg[len] = '\0';
|
|
|
|
commands = g_strsplit_set (msg, " \t", 10);
|
|
for (i=0, pcmd = commands; *pcmd; pcmd++)
|
|
i++;
|
|
g_free (msg);
|
|
|
|
create_processor (client, req_id, i, commands);
|
|
g_strfreev (commands);
|
|
}
|
|
|
|
|
|
static void
|
|
handle_response (CcnetClient *client, int req_id, char *data, int len)
|
|
{
|
|
CcnetProcessor *processor;
|
|
char *code, *code_msg = 0, *content = 0;
|
|
int clen;
|
|
char *ptr, *end;
|
|
|
|
g_return_if_fail (len >= 4);
|
|
|
|
code = data;
|
|
|
|
ptr = data + 3;
|
|
if (*ptr == '\n') {
|
|
/* no code_msg */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
goto parsed;
|
|
}
|
|
|
|
if (*ptr != ' ')
|
|
goto error;
|
|
|
|
*ptr++ = '\0';
|
|
code_msg = ptr;
|
|
|
|
end = data + len;
|
|
for (ptr = data; *ptr != '\n' && ptr != end; ptr++) ;
|
|
|
|
if (ptr == end) /* must end with '\n' */
|
|
goto error;
|
|
|
|
/* if (*(ptr-1) == '\r') */
|
|
/* *(ptr-1) = '\0'; */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
|
|
parsed:
|
|
/* ccnet_message ("Receive response %s from %s\n", msg, peer->id); */
|
|
/* ccnet_message ("code %s, code_msg %s, content %s\n", code, code_msg, */
|
|
/* content); */
|
|
processor = ccnet_client_get_processor (client, MASTER_ID (req_id));
|
|
if (processor == NULL) {
|
|
if (strcmp (code, SC_PROC_DEAD) != 0) {
|
|
ccnet_debug ("Delayed response from daemon, id is %d, %s %s\n",
|
|
MASTER_ID(req_id), code, code_msg);
|
|
ccnet_client_send_update (client, req_id,
|
|
SC_PROC_DEAD, SS_PROC_DEAD,
|
|
NULL, 0);
|
|
}
|
|
return;
|
|
}
|
|
|
|
/* ccnet_debug ("[client] handle_response %s id is %d, %s %s\n", */
|
|
/* GET_PNAME(processor), req_id, code, code_msg); */
|
|
|
|
ccnet_processor_handle_response (processor, code, code_msg, content, clen);
|
|
return;
|
|
|
|
error:
|
|
g_warning ("Bad response format from daemon\n");
|
|
}
|
|
|
|
static void
|
|
handle_update (CcnetClient *client, int req_id, char *data, int len)
|
|
{
|
|
CcnetProcessor *processor;
|
|
char *code, *code_msg = 0, *content = 0;
|
|
int clen;
|
|
char *ptr, *end;
|
|
|
|
g_return_if_fail (len >= 4);
|
|
|
|
code = data;
|
|
|
|
ptr = data + 3;
|
|
if (*ptr == '\n') {
|
|
/* no code_msg */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
goto parsed;
|
|
}
|
|
|
|
if (*ptr != ' ')
|
|
goto error;
|
|
|
|
*ptr++ = '\0';
|
|
code_msg = ptr;
|
|
|
|
end = data + len;
|
|
for (ptr = data; *ptr != '\n' && ptr != end; ptr++) ;
|
|
|
|
if (ptr == end) /* must end with '\n' */
|
|
goto error;
|
|
|
|
/* if (*(ptr-1) == '\r') */
|
|
/* *(ptr-1) = '\0'; */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
|
|
parsed:
|
|
processor = ccnet_client_get_processor (client, SLAVE_ID(req_id));
|
|
if (processor == NULL) {
|
|
if (strcmp (code, SC_PROC_DEAD) != 0) {
|
|
ccnet_debug ("Delayed update from daemon, id is %d, %s %s\n",
|
|
req_id, code, code_msg);
|
|
ccnet_client_send_response (client, req_id,
|
|
SC_PROC_DEAD, SS_PROC_DEAD,
|
|
NULL, 0);
|
|
}
|
|
return;
|
|
}
|
|
/* ccnet_debug ("[client] handle_update %s id is %d, %s %s\n", */
|
|
/* GET_PNAME(processor), req_id, code, code_msg); */
|
|
|
|
ccnet_processor_handle_update (processor, code, code_msg, content, clen);
|
|
return;
|
|
|
|
error:
|
|
g_warning ("Bad update format\n");
|
|
}
|
|
|
|
|
|
static void handle_packet (ccnet_packet *packet, void *vclient)
|
|
{
|
|
CcnetClient *client = vclient;
|
|
|
|
if (packet == NULL) {
|
|
/* disconnected from daemon */
|
|
g_warning ("Disconnected from daemon\n");
|
|
return;
|
|
}
|
|
|
|
switch (packet->header.type) {
|
|
case CCNET_MSG_REQUEST:
|
|
handle_request (client, packet->header.id,
|
|
packet->data, packet->header.length);
|
|
break;
|
|
case CCNET_MSG_RESPONSE:
|
|
handle_response (client, packet->header.id,
|
|
packet->data, packet->header.length);
|
|
break;
|
|
case CCNET_MSG_UPDATE:
|
|
handle_update (client, packet->header.id,
|
|
packet->data, packet->header.length);
|
|
break;
|
|
default:
|
|
g_return_if_reached ();
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
ccnet_client_send_request (CcnetClient *client, int req_id, const char *req)
|
|
{
|
|
ccnet_packet_prepare (client->io, CCNET_MSG_REQUEST, req_id);
|
|
ccnet_packet_write_string (client->io, req);
|
|
ccnet_packet_finish_send (client->io);
|
|
|
|
g_debug ("Send a request: id %d, cmd %s\n", req_id, req);
|
|
}
|
|
|
|
/**
|
|
* ccnet_client_send_update:
|
|
* @client:
|
|
* @req_id: request id
|
|
* @code: A string of three numbers. Like "200"
|
|
* @reason: long description for @code, can't contain '\n', can be %NULL
|
|
* @content: A char array, can be %NULL.
|
|
* @clen: length of %content, in bytes.
|
|
*/
|
|
void
|
|
ccnet_client_send_update (CcnetClient *client, int req_id,
|
|
const char *code, const char *reason,
|
|
const char *content, int clen)
|
|
{
|
|
g_return_if_fail (req_id > 0);
|
|
g_return_if_fail (clen < CCNET_PACKET_MAX_PAYLOAD_LEN);
|
|
|
|
ccnet_packet_prepare (client->io, CCNET_MSG_UPDATE, req_id);
|
|
/* code line */
|
|
ccnet_packet_add (client->io, code, 3);
|
|
if (reason) {
|
|
ccnet_packet_add (client->io, " ", 1);
|
|
ccnet_packet_write_string (client->io, reason);
|
|
}
|
|
ccnet_packet_add (client->io, "\n", 1);
|
|
if (content)
|
|
ccnet_packet_add (client->io, content, clen);
|
|
|
|
ccnet_packet_finish_send (client->io);
|
|
|
|
/* g_debug ("[client] Send an update: id %d: %s %s len=%d\n", */
|
|
/* req_id, code, reason, clen); */
|
|
}
|
|
|
|
|
|
void
|
|
ccnet_client_send_response (CcnetClient *client, int req_id,
|
|
const char *code, const char *reason,
|
|
const char *content, int clen)
|
|
{
|
|
g_return_if_fail (clen < CCNET_PACKET_MAX_PAYLOAD_LEN);
|
|
|
|
ccnet_packet_prepare (client->io, CCNET_MSG_RESPONSE, req_id);
|
|
/* code line */
|
|
ccnet_packet_add (client->io, code, 3);
|
|
if (reason) {
|
|
ccnet_packet_add (client->io, " ", 1);
|
|
ccnet_packet_write_string (client->io, reason);
|
|
}
|
|
ccnet_packet_add (client->io, "\n", 1);
|
|
if (content)
|
|
ccnet_packet_add (client->io, content, clen);
|
|
|
|
ccnet_packet_finish_send (client->io);
|
|
|
|
/* g_debug ("[client] Send an response: id %d: %s %s len=%d\n", */
|
|
/* req_id, code, reason, clen); */
|
|
}
|
|
|
|
|
|
/* functions used in SYNC mode */
|
|
|
|
/**
|
|
* ccnet_client_read_response:
|
|
* @client:
|
|
*
|
|
* Read response from the daemon. The response can be accessed by
|
|
* client->response.
|
|
*
|
|
* Returns: -1 if io error, -2 if response packet format error
|
|
*/
|
|
int
|
|
ccnet_client_read_response (CcnetClient *client)
|
|
{
|
|
ccnet_packet *packet;
|
|
char *data;
|
|
int len, clen;
|
|
char *code, *code_msg = 0, *content = 0;
|
|
char *ptr, *end;
|
|
|
|
restart:
|
|
if ( (packet = ccnet_packet_io_read_packet (client->io)) == NULL)
|
|
return -1;
|
|
|
|
if (packet->header.type != CCNET_MSG_RESPONSE)
|
|
goto error;
|
|
|
|
data = packet->data;
|
|
len = packet->header.length;
|
|
|
|
g_return_val_if_fail (len >= 4, -1);
|
|
|
|
code = data;
|
|
|
|
ptr = data + 3;
|
|
if (*ptr == '\n') {
|
|
/* no code_msg */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
goto parsed;
|
|
}
|
|
|
|
if (*ptr != ' ')
|
|
goto error;
|
|
|
|
*ptr++ = '\0';
|
|
code_msg = ptr;
|
|
|
|
end = data + len;
|
|
for (ptr = data; *ptr != '\n' && ptr != end; ptr++) ;
|
|
|
|
if (ptr == end) /* must end with '\n' */
|
|
goto error;
|
|
|
|
/* if (*(ptr-1) == '\r') */
|
|
/* *(ptr-1) = '\0'; */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
|
|
/* In synchronized mode, we only have one processor at a
|
|
time. The processor id is client->req_id, other
|
|
processors are all treat as dead. */
|
|
|
|
/*
|
|
if (packet->header.id != client->req_id) {
|
|
if (strcmp (code, SC_PROC_DEAD) != 0) {
|
|
g_debug ("Read response error: want %d, get %d %s %s\n",
|
|
client->req_id, packet->header.id, code, code_msg);
|
|
ccnet_client_send_update(client, packet->header.id,
|
|
SC_PROC_DEAD, SS_PROC_DEAD, NULL, 0);
|
|
}
|
|
goto restart;
|
|
}
|
|
*/
|
|
|
|
/* handle processor keep alive response */
|
|
if (strncmp(code, SC_PROC_KEEPALIVE, 3) == 0) {
|
|
ccnet_client_send_update(client, packet->header.id,
|
|
SC_PROC_ALIVE, SS_PROC_ALIVE, NULL, 0);
|
|
goto restart;
|
|
}
|
|
|
|
|
|
parsed:
|
|
client->response.code = code;
|
|
client->response.code_msg = code_msg;
|
|
client->response.content = content;
|
|
client->response.clen = clen;
|
|
return 0;
|
|
|
|
error:
|
|
g_warning ("Bad response format from daemon\n");
|
|
return -2;
|
|
}
|
|
|
|
static int read_response_common(CcnetClient *client)
|
|
{
|
|
if (ccnet_client_read_response(client) < 0)
|
|
return -1;
|
|
if (client->response.code[0] == '4' ||
|
|
client->response.code[0] == '5') {
|
|
g_warning ("Error response from daemon: %s %s\n",
|
|
client->response.code, client->response.code_msg);
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static gboolean
|
|
check_response_error (CcnetClient *client, GError **error)
|
|
{
|
|
if (client->response.code[0] == '4' ||
|
|
client->response.code[0] == '5') {
|
|
g_set_error (error, CCNET_DOMAIN, atoi(client->response.code),
|
|
"%s", client->response.code_msg);
|
|
return TRUE;
|
|
}
|
|
return FALSE;
|
|
}
|
|
|
|
|
|
const char *
|
|
ccnet_client_send_cmd (CcnetClient *client, const char *cmd, GError **error)
|
|
{
|
|
int req_id = ccnet_client_get_request_id (client);
|
|
ccnet_client_send_request (client, req_id, "receive-cmd");
|
|
|
|
if (ccnet_client_read_response(client) < 0) {
|
|
g_set_error (error, CCNET_DOMAIN, EC_NETWORK_ERR, "%s", ES_NETWORK_ERR);
|
|
goto on_error;
|
|
}
|
|
|
|
ccnet_client_send_update (client, req_id,
|
|
"200", NULL, cmd, strlen(cmd) + 1);
|
|
if (ccnet_client_read_response(client) < 0) {
|
|
g_set_error (error, CCNET_DOMAIN, EC_NETWORK_ERR, "%s", ES_NETWORK_ERR);
|
|
goto on_error;
|
|
}
|
|
|
|
if (check_response_error(client, error)) {
|
|
goto on_error;
|
|
}
|
|
|
|
ccnet_client_send_update (client, req_id,
|
|
SC_PROC_DONE, SS_PROC_DONE,
|
|
NULL, 0);
|
|
return client->response.content;
|
|
|
|
on_error:
|
|
ccnet_client_send_update (client, req_id,
|
|
SC_PROC_DONE, SS_PROC_DONE,
|
|
NULL, 0);
|
|
return NULL;
|
|
}
|
|
|
|
|
|
#define SC_MSG "300"
|
|
|
|
int
|
|
ccnet_client_send_message (CcnetClient *client,
|
|
CcnetMessage *message)
|
|
{
|
|
GString *buf;
|
|
int req_id = ccnet_client_get_request_id (client);
|
|
|
|
ccnet_client_send_request (client, req_id, "mq-server");
|
|
|
|
if (ccnet_client_read_response (client) < 0)
|
|
return -1; /* TODO: handle response code */
|
|
|
|
buf = g_string_new (NULL);
|
|
ccnet_message_to_string_buf (message, buf);
|
|
ccnet_client_send_update (client, req_id,
|
|
SC_MSG, NULL, buf->str, buf->len+1);
|
|
if (ccnet_client_read_response (client) < 0)
|
|
return -1;
|
|
|
|
g_string_free (buf, TRUE);
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
ccnet_client_prepare_recv_message (CcnetClient *client,
|
|
const char *app)
|
|
{
|
|
int req_id = ccnet_client_get_request_id (client);
|
|
char buf[256];
|
|
|
|
snprintf (buf, sizeof(buf), "mq-server %s", app);
|
|
ccnet_client_send_request (client, req_id, buf);
|
|
|
|
if (read_response_common (client) < 0)
|
|
return -1;
|
|
|
|
if (memcmp(client->response.code, "200", 3) != 0)
|
|
return -1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
CcnetMessage *
|
|
ccnet_client_receive_message (CcnetClient *client)
|
|
{
|
|
CcnetMessage *message;
|
|
|
|
if (read_response_common (client) < 0)
|
|
return NULL;
|
|
|
|
message = ccnet_message_from_string (client->response.content,
|
|
client->response.clen);
|
|
return message;
|
|
}
|