mirror of
https://github.com/haiwen/ccnet-server.git
synced 2025-04-28 02:30:08 +00:00
1200 lines
32 KiB
C
1200 lines
32 KiB
C
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
|
|
|
|
#include "common.h"
|
|
|
|
#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
|
|
#include <event2/event.h>
|
|
#include <event2/bufferevent.h>
|
|
#include <event2/bufferevent_compat.h>
|
|
#include <event2/bufferevent_struct.h>
|
|
#else
|
|
#include <event.h>
|
|
#endif
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
|
|
#ifdef WIN32
|
|
#include <winsock2.h>
|
|
#else
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#endif
|
|
#include "net.h"
|
|
|
|
#include <ctype.h>
|
|
|
|
#include "timer.h"
|
|
|
|
#include "peer.h"
|
|
#include "packet-io.h"
|
|
|
|
#include "rsa.h"
|
|
|
|
#include "session.h"
|
|
#include "peer-mgr.h"
|
|
#include "perm-mgr.h"
|
|
#include "processor.h"
|
|
#include "proc-factory.h"
|
|
#include "processors/service-proxy-proc.h"
|
|
#include "connect-mgr.h"
|
|
|
|
#include "utils.h"
|
|
|
|
#define DEBUG_FLAG CCNET_DEBUG_PEER
|
|
#include "log.h"
|
|
|
|
|
|
enum {
|
|
DOWN_SIG, /* connection down */
|
|
AUTH_DONE_SIG, /* peer become reachable and the auth state
|
|
is AUTH_FULL, see keepalive-proc */
|
|
|
|
AUTH_UPDATED_SIG,
|
|
|
|
LAST_SIGNAL
|
|
};
|
|
|
|
static guint signals[LAST_SIGNAL] = { 0 };
|
|
|
|
#define OBJECT_TYPE_STRING "peer"
|
|
|
|
#include "../lib/peer-common.h"
|
|
|
|
void ccnet_peer_set_net_state (CcnetPeer *peer, int net_state);
|
|
static void ccnet_peer_finalize (GObject *object);
|
|
|
|
static void shutdown_processors (CcnetPeer *peer);
|
|
|
|
static void
|
|
set_property (GObject *object, guint property_id,
|
|
const GValue *v, GParamSpec *pspec)
|
|
{
|
|
set_property_common (object, property_id, v, pspec);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_finalize (GObject *object)
|
|
{
|
|
CcnetPeer *peer = CCNET_PEER (object);
|
|
|
|
g_free (peer->name);
|
|
g_free (peer->addr_str);
|
|
g_free (peer->service_url);
|
|
g_hash_table_unref (peer->processors);
|
|
g_free (peer->session_key);
|
|
evbuffer_free (peer->packet);
|
|
|
|
if (peer->pubkey)
|
|
RSA_free (peer->pubkey);
|
|
|
|
G_OBJECT_CLASS(ccnet_peer_parent_class)->finalize (object);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_free (CcnetPeer *peer)
|
|
{
|
|
ccnet_peer_shutdown (peer);
|
|
g_object_unref (peer);
|
|
}
|
|
|
|
static void
|
|
ccnet_peer_class_init (CcnetPeerClass *klass)
|
|
{
|
|
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
|
|
|
|
gobject_class->finalize = ccnet_peer_finalize;
|
|
gobject_class->get_property = get_property;
|
|
gobject_class->set_property = set_property;
|
|
|
|
define_properties (gobject_class);
|
|
|
|
signals[AUTH_DONE_SIG] =
|
|
g_signal_new ("auth-done", CCNET_TYPE_PEER,
|
|
G_SIGNAL_RUN_LAST,
|
|
0, /* no class singal handler */
|
|
NULL, NULL, /* no accumulator */
|
|
g_cclosure_marshal_VOID__VOID,
|
|
G_TYPE_NONE, 0);
|
|
|
|
signals[DOWN_SIG] =
|
|
g_signal_new ("down", CCNET_TYPE_PEER,
|
|
G_SIGNAL_RUN_LAST,
|
|
0, /* no class singal handler */
|
|
NULL, NULL, /* no accumulator */
|
|
g_cclosure_marshal_VOID__VOID,
|
|
G_TYPE_NONE, 0);
|
|
|
|
signals[AUTH_UPDATED_SIG] =
|
|
g_signal_new ("auth-updated", CCNET_TYPE_PEER,
|
|
G_SIGNAL_RUN_LAST,
|
|
0, /* no class singal handler */
|
|
NULL, NULL, /* no accumulator */
|
|
g_cclosure_marshal_VOID__VOID,
|
|
G_TYPE_NONE, 0);
|
|
|
|
}
|
|
|
|
static void
|
|
ccnet_peer_init (CcnetPeer *peer)
|
|
{
|
|
|
|
}
|
|
|
|
|
|
CcnetPeer*
|
|
ccnet_peer_new (const char *id)
|
|
{
|
|
CcnetPeer *peer;
|
|
|
|
g_return_val_if_fail (strlen(id) == 40, NULL);
|
|
|
|
peer = g_object_new (CCNET_TYPE_PEER, NULL);
|
|
memcpy (peer->id, id, 40);
|
|
peer->id[40] = '\0';
|
|
|
|
peer->net_state = PEER_DOWN;
|
|
peer->public_port = 0;
|
|
|
|
peer->processors = g_hash_table_new_full (g_direct_hash, g_direct_equal,
|
|
NULL, NULL);
|
|
peer->reqID = CCNET_USER_ID_START;
|
|
|
|
peer->packet = evbuffer_new ();
|
|
|
|
return peer;
|
|
}
|
|
|
|
|
|
inline static void append_string_property (GString *buf, const char *name,
|
|
const char *value)
|
|
{
|
|
if (value && value[0])
|
|
g_string_append_printf (buf, "%s %s\n", name, value);
|
|
}
|
|
|
|
inline static void append_int_property (GString *buf, const char *name,
|
|
int value)
|
|
{
|
|
if (value != -1)
|
|
g_string_append_printf (buf, "%s %d\n", name, value);
|
|
}
|
|
|
|
inline static void append_int64_property (GString *buf, const char *name,
|
|
gint64 value)
|
|
{
|
|
g_string_append_printf (buf, "%s %"G_GINT64_FORMAT"\n", name, value);
|
|
}
|
|
|
|
|
|
GString*
|
|
ccnet_peer_to_string (CcnetPeer *peer)
|
|
{
|
|
GString *buf = g_string_new (NULL);
|
|
g_string_append (buf, "peer/");
|
|
g_string_append (buf, peer->id);
|
|
g_string_append (buf, "\n");
|
|
|
|
append_string_property (buf, "name", peer->name);
|
|
append_string_property (buf, "service-url", peer->service_url);
|
|
|
|
if (peer->pubkey) {
|
|
GString *str = public_key_to_gstring(peer->pubkey);
|
|
g_string_append_printf (buf, "%s %s\n", "pubkey", str->str);
|
|
g_string_free(str, TRUE);
|
|
}
|
|
|
|
return buf;
|
|
}
|
|
|
|
|
|
static void parse_field (CcnetPeer *peer, const char *key, char *value)
|
|
{
|
|
if (strcmp(key, "name") == 0) {
|
|
g_free (peer->name);
|
|
peer->name = g_strdup(value);
|
|
return;
|
|
}
|
|
|
|
if (strcmp(key, "service-url") == 0) {
|
|
g_free (peer->service_url);
|
|
peer->service_url = g_strdup(value);
|
|
return;
|
|
}
|
|
|
|
if (strcmp(key, "pubkey") == 0) {
|
|
if (peer->pubkey)
|
|
RSA_free(peer->pubkey);
|
|
peer->pubkey = public_key_from_string (value);
|
|
return;
|
|
}
|
|
}
|
|
|
|
CcnetPeer*
|
|
ccnet_peer_from_string (char *content)
|
|
{
|
|
CcnetPeer *peer = NULL;
|
|
|
|
char *ptr, *start = content;
|
|
|
|
if ( !(ptr = strchr(start, '\n')) ) return NULL;
|
|
*ptr = '\0';
|
|
char *object_id = start;
|
|
start = ptr + 1;
|
|
|
|
char *object_type = ccnet_object_type_from_id (object_id);
|
|
if (g_strcmp0(object_type, OBJECT_TYPE_STRING) != 0)
|
|
goto out;
|
|
|
|
char *pure_id = object_id + strlen(object_type) + 1;
|
|
if (!peer_id_valid(pure_id)) {
|
|
ccnet_warning ("Wrong peer id %s\n", pure_id);
|
|
goto out;
|
|
}
|
|
|
|
peer = ccnet_peer_new (pure_id);
|
|
parse_key_value_pairs (
|
|
start, (KeyValueFunc)parse_field, peer);
|
|
|
|
out:
|
|
g_free (object_type);
|
|
return peer;
|
|
}
|
|
|
|
void
|
|
ccnet_peer_update_from_string (CcnetPeer *peer, char *content)
|
|
{
|
|
char *ptr, *start = content;
|
|
|
|
if ( !(ptr = strchr(start, '\n')) ) return;
|
|
*ptr = '\0';
|
|
char *object_id = start;
|
|
start = ptr + 1;
|
|
|
|
char *object_type = ccnet_object_type_from_id (object_id);
|
|
if (strcmp(object_type, OBJECT_TYPE_STRING) != 0)
|
|
goto out;
|
|
|
|
char *pure_id = object_id + strlen(object_type) + 1;
|
|
g_return_if_fail (strcmp(pure_id, peer->id) == 0);
|
|
|
|
parse_key_value_pairs (
|
|
start, (KeyValueFunc)parse_field, peer);
|
|
|
|
peer->need_saving = 1;
|
|
out:
|
|
g_free (object_type);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_set_net_state (CcnetPeer *peer, int net_state)
|
|
{
|
|
/* do not need saving */
|
|
|
|
if (peer->net_state == net_state)
|
|
return;
|
|
|
|
ccnet_debug ("[Peer] Peer %s(%.8s) net state changed: %s->%s\n",
|
|
peer->name, peer->id,
|
|
ccnet_peer_get_net_state_string(peer->net_state),
|
|
ccnet_peer_get_net_state_string(net_state));
|
|
peer->last_net_state = peer->net_state;
|
|
|
|
if (net_state == PEER_DOWN) {
|
|
if (!peer->is_local)
|
|
--peer->manager->connected_peer;
|
|
} else
|
|
if (!peer->is_local)
|
|
++peer->manager->connected_peer;
|
|
|
|
if (net_state == PEER_CONNECTED && !peer->io->is_incoming)
|
|
g_object_set (peer, "can-connect", 1, NULL);
|
|
|
|
g_object_set (peer, "net-state", net_state, NULL);
|
|
}
|
|
|
|
static void
|
|
ccnet_peer_set_addr_str (CcnetPeer *peer, const char *addr_str)
|
|
{
|
|
/* do not need saving */
|
|
if (!addr_str)
|
|
return;
|
|
if (peer->addr_str && strcmp(addr_str, peer->addr_str) == 0)
|
|
return;
|
|
|
|
g_object_set (peer, "ip", addr_str, NULL);
|
|
ccnet_debug ("[Peer] Updated peer %s(%.10s) address %s\n",
|
|
peer->name, peer->id, addr_str);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_update_address (CcnetPeer *peer, const char *addr_str,
|
|
unsigned short port)
|
|
{
|
|
if (!is_valid_ipaddr (addr_str))
|
|
return;
|
|
|
|
ccnet_peer_set_addr_str (peer, addr_str);
|
|
|
|
if (port == 0)
|
|
return;
|
|
peer->port = port;
|
|
}
|
|
|
|
void ccnet_peer_set_pubkey (CcnetPeer *peer, char *str)
|
|
{
|
|
g_object_set (peer, "pubkey", str, NULL);
|
|
if (!peer->pubkey)
|
|
ccnet_warning("Wrong public key format\n");
|
|
peer->need_saving = 1;
|
|
}
|
|
|
|
int
|
|
ccnet_peer_prepare_channel_encryption (CcnetPeer *peer)
|
|
{
|
|
if (!peer->session_key)
|
|
return -1;
|
|
|
|
if ( ccnet_generate_cipher(peer->session_key, strlen(peer->session_key),
|
|
peer->key, peer->iv) < 0)
|
|
return -1;
|
|
|
|
peer->encrypt_channel = 1;
|
|
return 0;
|
|
}
|
|
|
|
/* -------- role management -------- */
|
|
|
|
void
|
|
ccnet_peer_add_role (CcnetPeer *peer, const char *role)
|
|
{
|
|
if (!ccnet_peer_has_role(peer, role)) {
|
|
peer->role_list = string_list_append_sorted (
|
|
peer->role_list, role);
|
|
}
|
|
}
|
|
|
|
void
|
|
ccnet_peer_remove_role (CcnetPeer *peer, const char *role)
|
|
{
|
|
g_return_if_fail (role != NULL);
|
|
|
|
if (!string_list_is_exists(peer->role_list, role))
|
|
return;
|
|
|
|
peer->role_list = string_list_remove (peer->role_list, role);
|
|
}
|
|
|
|
gboolean
|
|
ccnet_peer_has_role (CcnetPeer *peer, const char *role)
|
|
{
|
|
return string_list_is_exists(peer->role_list, role);
|
|
}
|
|
|
|
gboolean
|
|
ccnet_peer_has_my_role (CcnetPeer *peer, const char *role)
|
|
{
|
|
return string_list_is_exists(peer->myrole_list, role);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_set_roles (CcnetPeer *peer, const char *roles)
|
|
{
|
|
GList *role_list = string_list_parse_sorted (roles, ",");
|
|
string_list_free (peer->role_list);
|
|
peer->role_list = role_list;
|
|
}
|
|
|
|
void
|
|
ccnet_peer_set_myroles (CcnetPeer *peer, const char *roles)
|
|
{
|
|
GList *role_list = string_list_parse_sorted (roles, ",");
|
|
string_list_free (peer->myrole_list);
|
|
peer->myrole_list = role_list;
|
|
|
|
/* ccnet_debug ("[Peer] Myrole on %s(%.8s) is set to %s\n", */
|
|
/* peer->id, peer->name, roles); */
|
|
}
|
|
|
|
void
|
|
ccnet_peer_get_roles_str (CcnetPeer *peer, GString* buf)
|
|
{
|
|
string_list_join (peer->role_list, buf, ",");
|
|
}
|
|
|
|
void
|
|
ccnet_peer_get_myroles_str (CcnetPeer *peer, GString* buf)
|
|
{
|
|
string_list_join (peer->myrole_list, buf, ",");
|
|
}
|
|
|
|
|
|
|
|
/* ----------- Packet Handling & Networking --------------------- */
|
|
|
|
static void remove_write_callbacks (CcnetPeer *peer)
|
|
{
|
|
g_list_foreach (peer->write_cbs, (GFunc)g_free, NULL);
|
|
g_list_free (peer->write_cbs);
|
|
}
|
|
|
|
static void
|
|
_peer_shutdown (CcnetPeer *peer)
|
|
{
|
|
peer->in_shutdown = 1;
|
|
|
|
if (peer->net_state == PEER_CONNECTED) {
|
|
peer->last_down = time(NULL);
|
|
ccnet_packet_io_free (peer->io);
|
|
peer->io = NULL;
|
|
g_object_set (peer, "can-connect", 0, NULL);
|
|
}
|
|
peer->is_ready = 0;
|
|
g_free (peer->dns_addr);
|
|
peer->dns_addr = NULL;
|
|
peer->dns_done = 0;
|
|
|
|
/* clear session key when peer down */
|
|
peer->encrypt_channel = 0;
|
|
g_free (peer->session_key);
|
|
peer->session_key = NULL;
|
|
|
|
ccnet_debug ("Shutdown all processors for peer %s\n", peer->name);
|
|
shutdown_processors (peer);
|
|
remove_write_callbacks (peer);
|
|
|
|
ccnet_peer_set_net_state (peer, PEER_DOWN);
|
|
|
|
g_signal_emit (peer, signals[DOWN_SIG], 0);
|
|
|
|
peer->in_shutdown = 0;
|
|
}
|
|
|
|
int
|
|
shutdown_peer (CcnetPeer *peer)
|
|
{
|
|
_peer_shutdown (peer);
|
|
peer->shutdown_scheduled = 0;
|
|
g_object_unref (peer);
|
|
return FALSE;
|
|
}
|
|
|
|
|
|
static void
|
|
schedule_shutdown (CcnetPeer *peer)
|
|
{
|
|
if (peer->shutdown_scheduled)
|
|
return;
|
|
|
|
g_object_ref (peer);
|
|
ccnet_timer_new ((TimerCB)shutdown_peer, peer, 1);
|
|
peer->shutdown_scheduled = 1;
|
|
}
|
|
|
|
void
|
|
ccnet_peer_shutdown (CcnetPeer *peer)
|
|
{
|
|
schedule_shutdown (peer);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_shutdown_no_delay (CcnetPeer *peer)
|
|
{
|
|
if (!peer->shutdown_scheduled) {
|
|
peer->shutdown_scheduled = 1;
|
|
_peer_shutdown (peer);
|
|
peer->shutdown_scheduled = 0;
|
|
}
|
|
}
|
|
|
|
static void
|
|
create_remote_processor (CcnetPeer *peer, CcnetPeer *remote_peer,
|
|
int req_id, int argc, char **argv)
|
|
{
|
|
CcnetProcessor *processor;
|
|
CcnetProcFactory *factory = peer->manager->session->proc_factory;
|
|
|
|
processor = ccnet_proc_factory_create_slave_processor (
|
|
factory,"service-proxy", peer, req_id);
|
|
ccnet_processor_start (processor, 0, NULL);
|
|
|
|
ccnet_service_proxy_invoke_remote (processor, remote_peer, argc, argv);
|
|
}
|
|
|
|
static void
|
|
create_local_processor (CcnetPeer *peer, int req_id, int argc, char **argv)
|
|
{
|
|
CcnetProcessor *processor;
|
|
CcnetProcFactory *factory = peer->manager->session->proc_factory;
|
|
|
|
processor = ccnet_proc_factory_create_slave_processor (
|
|
factory, argv[0], peer, req_id);
|
|
|
|
if (processor) {
|
|
ccnet_processor_start (processor, argc-1, argv+1);
|
|
} else {
|
|
CcnetService *service;
|
|
|
|
service = ccnet_session_get_service (peer->manager->session, argv[0]);
|
|
if (service != NULL) {
|
|
processor = ccnet_proc_factory_create_slave_processor (
|
|
factory, "service-proxy", peer, req_id);
|
|
ccnet_processor_start (processor, 0, NULL);
|
|
ccnet_service_proxy_invoke_local (processor, service->provider,
|
|
argc, argv);
|
|
} else {
|
|
ccnet_peer_send_response (peer, req_id, SC_UNKNOWN_SERVICE,
|
|
SS_UNKNOWN_SERVICE,
|
|
NULL, 0);
|
|
ccnet_debug ("Unknown service %s invoke by %s(%.8s)\n",
|
|
argv[0], peer->name, peer->id);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void create_processor (CcnetPeer *peer, int req_id,
|
|
int argc, char **argv)
|
|
{
|
|
CcnetSession *session = peer->manager->session;
|
|
|
|
if (strcmp(argv[0], "remote") == 0) {
|
|
/* we have check this before (in permission checking) */
|
|
CcnetPeer *remote_peer;
|
|
|
|
remote_peer = ccnet_peer_manager_get_peer (peer->manager, argv[1]);
|
|
if (!remote_peer) {
|
|
ccnet_peer_send_response (peer, req_id, SC_UNKNOWN_PEER,
|
|
SS_UNKNOWN_PEER, NULL, 0);
|
|
ccnet_warning ("Unknown remote peer in invoking remote service\n");
|
|
return;
|
|
}
|
|
/* if (remote_peer->net_state == PEER_DOWN) { */
|
|
/* ccnet_peerSendResponse (peer, req_id, SC_PEER_UNREACHABLE, */
|
|
/* SS_PEER_UNREACHABLE, NULL, 0); */
|
|
/* ccnet_warning ("Unreachable remote peer in invoking remote service\n"); */
|
|
/* return; */
|
|
/* } */
|
|
|
|
/* To simplify caller's logic, we allow starting a remote processor to
|
|
* local host. Translate this call into a local one.
|
|
*/
|
|
if (session->myself == remote_peer) {
|
|
create_local_processor (peer, req_id, argc-2, argv+2);
|
|
g_object_unref (remote_peer);
|
|
return;
|
|
}
|
|
|
|
create_remote_processor (peer, remote_peer, req_id, argc-2, argv+2);
|
|
g_object_unref (remote_peer);
|
|
return;
|
|
}
|
|
|
|
create_local_processor (peer, req_id, argc, argv);
|
|
}
|
|
|
|
static void
|
|
handle_request (CcnetPeer *peer, int req_id, char *data, int len)
|
|
{
|
|
char *msg;
|
|
gchar **commands;
|
|
gchar **pcmd;
|
|
int i, perm;
|
|
|
|
/* TODO: remove string copy */
|
|
if (len < 1)
|
|
return;
|
|
msg = g_malloc (len+1);
|
|
memcpy (msg, data, len);
|
|
msg[len] = '\0';
|
|
|
|
commands = g_strsplit_set (msg, " \t", 10);
|
|
for (i=0, pcmd = commands; *pcmd; pcmd++)
|
|
i++;
|
|
if (i <= 0) return;
|
|
g_free (msg);
|
|
|
|
/* permission checking */
|
|
if (!peer->is_local) {
|
|
perm = ccnet_perm_manager_check_permission(peer->manager->session->perm_mgr,
|
|
peer, commands[0],
|
|
req_id,
|
|
i, commands);
|
|
if (perm == PERM_CHECK_ERROR) {
|
|
ccnet_peer_send_response (peer, req_id, SC_PERM_ERR, SS_PERM_ERR,
|
|
NULL, 0);
|
|
goto ret;
|
|
} else if (perm == PERM_CHECK_DELAY) {
|
|
ccnet_peer_send_response (peer, req_id, SC_PERM_ERR, SS_PERM_ERR,
|
|
NULL, 0);
|
|
goto ret;
|
|
} else if (perm == PERM_CHECK_NOSERVICE) {
|
|
ccnet_peer_send_response (peer, req_id, SC_UNKNOWN_SERVICE_IN_PERM,
|
|
SS_UNKNOWN_SERVICE_IN_PERM, NULL, 0);
|
|
goto ret;
|
|
}
|
|
}
|
|
|
|
/* check duplication request */
|
|
CcnetProcessor *processor;
|
|
processor = ccnet_peer_get_processor (peer, SLAVE_ID(req_id));
|
|
if (processor != NULL) {
|
|
ccnet_warning ("Received duplication request, id is %d\n", req_id);
|
|
goto ret;
|
|
}
|
|
|
|
create_processor (peer, req_id, i, commands);
|
|
|
|
ret:
|
|
g_strfreev (commands);
|
|
}
|
|
|
|
static void
|
|
handle_response (CcnetPeer *peer, int req_id, char *data, int len)
|
|
{
|
|
CcnetProcessor *processor;
|
|
char *code, *code_msg = 0, *content = 0;
|
|
int clen;
|
|
char *ptr, *end;
|
|
|
|
if (len < 4)
|
|
goto error;
|
|
|
|
code = data;
|
|
|
|
ptr = data + 3;
|
|
if (*ptr == '\n') {
|
|
/* no code_msg */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
goto parsed;
|
|
}
|
|
|
|
if (*ptr != ' ')
|
|
goto error;
|
|
|
|
*ptr++ = '\0';
|
|
code_msg = ptr;
|
|
|
|
end = data + len;
|
|
for (ptr = data; *ptr != '\n' && ptr != end; ptr++) ;
|
|
|
|
if (ptr == end) /* must end with '\n' */
|
|
goto error;
|
|
|
|
/* if (*(ptr-1) == '\r') */
|
|
/* *(ptr-1) = '\0'; */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
|
|
parsed:
|
|
processor = ccnet_peer_get_processor (peer, MASTER_ID (req_id));
|
|
if (processor == NULL) {
|
|
/* do nothing if receiving SC_PROC_DEAD and the processor on
|
|
* this side is also not present. Otherwise send SC_PROC_DEAD
|
|
*/
|
|
if (memcmp(code, SC_PROC_DEAD, 3) != 0) {
|
|
ccnet_debug ("Delayed response from %s(%.10s), id is %d, %s %s\n",
|
|
peer->name, peer->id, req_id, code, code_msg);
|
|
ccnet_peer_send_update (peer, req_id,
|
|
SC_PROC_DEAD, SS_PROC_DEAD,
|
|
NULL, 0);
|
|
}
|
|
return;
|
|
}
|
|
/* if (!peer->is_local) */
|
|
/* ccnet_debug ("[RECV] handle_response %s id is %d, %s %s\n", */
|
|
/* GET_PNAME(processor), PRINT_ID(processor->id), */
|
|
/* code, code_msg); */
|
|
|
|
peer->in_processor_call = 1;
|
|
ccnet_processor_handle_response (processor, code, code_msg, content, clen);
|
|
peer->in_processor_call = 0;
|
|
return;
|
|
|
|
error:
|
|
ccnet_warning ("Bad response format from %s\n", peer->id);
|
|
}
|
|
|
|
static void
|
|
handle_update (CcnetPeer *peer, int req_id, char *data, int len)
|
|
{
|
|
CcnetProcessor *processor;
|
|
char *code, *code_msg = 0, *content = 0;
|
|
int clen;
|
|
char *ptr, *end;
|
|
|
|
if (len < 4)
|
|
goto error;
|
|
|
|
code = data;
|
|
|
|
ptr = data + 3;
|
|
if (*ptr == '\n') {
|
|
/* no code_msg */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
goto parsed;
|
|
}
|
|
|
|
if (*ptr != ' ')
|
|
goto error;
|
|
|
|
*ptr++ = '\0';
|
|
code_msg = ptr;
|
|
|
|
end = data + len;
|
|
for (ptr = data; *ptr != '\n' && ptr != end; ptr++) ;
|
|
|
|
if (ptr == end) /* must end with '\n' */
|
|
goto error;
|
|
|
|
/* if (*(ptr-1) == '\r') */
|
|
/* *(ptr-1) = '\0'; */
|
|
*ptr++ = '\0';
|
|
content = ptr;
|
|
clen = len - (ptr - data);
|
|
|
|
parsed:
|
|
processor = ccnet_peer_get_processor (peer, SLAVE_ID(req_id));
|
|
if (processor == NULL) {
|
|
if (memcmp(code, SC_PROC_DEAD, 3) != 0
|
|
&& memcmp(code, SC_PROC_DONE, 3) != 0) {
|
|
ccnet_debug ("Delayed update from %s(%.8s), id is %d, %s %s\n",
|
|
peer->name, peer->id, req_id, code, code_msg);
|
|
ccnet_peer_send_response (peer, req_id,
|
|
SC_PROC_DEAD, SS_PROC_DEAD,
|
|
NULL, 0);
|
|
}
|
|
return;
|
|
}
|
|
|
|
/* if (!peer->is_local) */
|
|
/* ccnet_debug ("[RECV] handle_update %s id is %d, %s %s\n", */
|
|
/* GET_PNAME(processor), PRINT_ID(processor->id), */
|
|
/* code, code_msg); */
|
|
peer->in_processor_call = 1;
|
|
ccnet_processor_handle_update (processor, code, code_msg, content, clen);
|
|
peer->in_processor_call = 0;
|
|
return;
|
|
|
|
error:
|
|
ccnet_warning ("Bad update format from %s\n", peer->id);
|
|
}
|
|
|
|
|
|
static void
|
|
handle_packet (ccnet_packet *packet, CcnetPeer *peer)
|
|
{
|
|
switch (packet->header.type) {
|
|
case CCNET_MSG_REQUEST:
|
|
handle_request (peer, packet->header.id,
|
|
packet->data, packet->header.length);
|
|
break;
|
|
case CCNET_MSG_RESPONSE:
|
|
handle_response (peer, packet->header.id,
|
|
packet->data, packet->header.length);
|
|
break;
|
|
case CCNET_MSG_UPDATE:
|
|
handle_update (peer, packet->header.id,
|
|
packet->data, packet->header.length);
|
|
break;
|
|
default:
|
|
ccnet_warning ("Unknown header type %d\n", packet->header.type);
|
|
};
|
|
}
|
|
|
|
static void
|
|
canRead (ccnet_packet *packet, void *vpeer)
|
|
{
|
|
CcnetPeer *peer = vpeer;
|
|
g_object_ref (peer);
|
|
|
|
/* if (!peer->is_local) */
|
|
/* ccnet_debug ("[RECV] Recieve packat from %s type is %d, id is %d\n", */
|
|
/* peer->id, packet->header.type, packet->header.id); */
|
|
|
|
if (packet->header.id == 0)
|
|
return;
|
|
|
|
if (packet->header.type != CCNET_MSG_ENCPACKET) {
|
|
handle_packet (packet, peer);
|
|
} else {
|
|
/* ccnet_debug ("receive an encrypt packet\n"); */
|
|
|
|
if (!peer->session_key) {
|
|
ccnet_debug("Receive a encrypted packet from %s(%.8s) while "
|
|
"not having session key \n", peer->name, peer->id);
|
|
goto out;
|
|
}
|
|
|
|
char *data;
|
|
int len;
|
|
int ret;
|
|
ret = ccnet_decrypt_with_key (&data, &len, packet->data, packet->header.id,
|
|
peer->key, peer->iv);
|
|
if (ret < 0)
|
|
ccnet_warning ("[SEND] decryption error for peer %s(%.8s) \n",
|
|
peer->name, peer->id);
|
|
else {
|
|
ccnet_packet *new_pac = (ccnet_packet *)data;
|
|
/* byte order, from network to host */
|
|
new_pac->header.length = ntohs(new_pac->header.length);
|
|
new_pac->header.id = ntohl (new_pac->header.id);
|
|
|
|
handle_packet (new_pac, peer);
|
|
g_free (data);
|
|
}
|
|
}
|
|
|
|
out:
|
|
g_object_unref (peer);
|
|
}
|
|
|
|
struct WriteCallback {
|
|
int removing : 1;
|
|
|
|
PeerWriteCallback func;
|
|
void *user_data;
|
|
};
|
|
|
|
void
|
|
ccnet_peer_add_write_callback (CcnetPeer *peer,
|
|
PeerWriteCallback func,
|
|
void *user_data)
|
|
{
|
|
if (peer->net_state == PEER_CONNECTED) {
|
|
struct WriteCallback *wcb = g_new0 (struct WriteCallback, 1);
|
|
wcb->func = func;
|
|
wcb->user_data = user_data;
|
|
|
|
peer->write_cbs = g_list_prepend (peer->write_cbs, wcb);
|
|
} else {
|
|
ccnet_warning ("add_write_callback error: Peer not reachable\n");
|
|
}
|
|
}
|
|
|
|
void
|
|
ccnet_peer_remove_write_callback (CcnetPeer *peer,
|
|
PeerWriteCallback func,
|
|
void *user_data)
|
|
{
|
|
GList *ptr;
|
|
|
|
for (ptr = peer->write_cbs; ptr; ptr = ptr->next) {
|
|
struct WriteCallback *wcb = ptr->data;
|
|
if (wcb->func == func && wcb->user_data == user_data) {
|
|
peer->write_cbs = g_list_delete_link (peer->write_cbs, ptr);
|
|
g_free (wcb);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
didWrite(struct bufferevent * evin, void * vpeer)
|
|
{
|
|
CcnetPeer *peer = vpeer;
|
|
GList *ptr;
|
|
|
|
g_object_ref (peer);
|
|
peer->in_writecb = 1;
|
|
|
|
for (ptr = peer->write_cbs; ptr; ) {
|
|
struct WriteCallback *wcb = ptr->data;
|
|
GList *cur = ptr;
|
|
ptr = ptr->next;
|
|
if (wcb->func(peer, wcb->user_data) == FALSE) {
|
|
peer->write_cbs = g_list_delete_link (peer->write_cbs, cur);
|
|
g_free (wcb);
|
|
}
|
|
}
|
|
|
|
peer->in_writecb = 0;
|
|
g_object_unref (peer);
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
gotError (struct bufferevent *evbuf, short what, void *vpeer)
|
|
{
|
|
CcnetPeer *peer = vpeer;
|
|
g_object_ref (peer);
|
|
/* ccnet_warning ("libevent got an error on peer %s what==%d, errno=%d (%s)\n", */
|
|
/* peer->name, (int)what, errno, strerror(errno)); */
|
|
|
|
if (what & EVBUFFER_TIMEOUT) {
|
|
ccnet_warning ("libevent got a timeout for peer %s(%.8s), what=%hd, timeout secs=%ld\n",
|
|
peer->name, peer->id, what, *((time_t *)&(evbuf->timeout_read)));
|
|
ccnet_message ("Peer %s (%.10s) down for timeout\n", peer->name, peer->id);
|
|
peer->num_fails++;
|
|
ccnet_peer_shutdown (peer);
|
|
}
|
|
|
|
if (what & (EVBUFFER_EOF | EVBUFFER_ERROR)) {
|
|
if (what & EVBUFFER_ERROR)
|
|
ccnet_warning ("libevent got an error! what=%hd, errno=%d (%s)\n",
|
|
what, errno, strerror(errno));
|
|
if (peer->is_local) {
|
|
ccnet_message ("Local peer down\n");
|
|
ccnet_peer_shutdown (peer);
|
|
ccnet_session_unregister_service (peer->manager->session, peer);
|
|
ccnet_peer_manager_remove_local_peer (peer->manager,
|
|
peer);
|
|
} else {
|
|
ccnet_message ("[Net Error] Peer %s (%.10s) down\n", peer->name, peer->id);
|
|
peer->num_fails++;
|
|
ccnet_peer_shutdown (peer);
|
|
}
|
|
}
|
|
|
|
g_object_unref (peer);
|
|
}
|
|
|
|
/* static void */
|
|
/* ccnet_peer_reset_io (CcnetPeer *peer) */
|
|
/* { */
|
|
/* ccnet_packet_io_set_timeout_secs (peer->io, 0); /\* disable timeout *\/ */
|
|
/* ccnet_packet_io_set_iofuncs (peer->io, canRead, didWrite, gotError, peer); */
|
|
/* } */
|
|
|
|
void
|
|
ccnet_peer_set_io (CcnetPeer *peer, CcnetPacketIO *io)
|
|
{
|
|
peer->io = io;
|
|
/* libevent remove a previous timeout seems not work in libevent 2.0,
|
|
so we have to disable timeout by set it to a large value */
|
|
if (!peer->is_local)
|
|
ccnet_packet_io_set_timeout_secs (peer->io, 10000);
|
|
ccnet_packet_io_set_iofuncs (peer->io, canRead, didWrite, gotError, peer);
|
|
}
|
|
|
|
|
|
int
|
|
ccnet_peer_get_request_id (CcnetPeer *peer)
|
|
{
|
|
return (++peer->reqID);
|
|
}
|
|
|
|
|
|
#undef DEBUG_FLAG
|
|
/* #define DEBUG_FLAG CCNET_DEBUG_NETIO */
|
|
#include "log.h"
|
|
|
|
void
|
|
ccnet_peer_packet_prepare (const CcnetPeer *peer, int type, int id)
|
|
{
|
|
ccnet_header header;
|
|
|
|
header.version = 1;
|
|
header.type = type;
|
|
header.length = 0;
|
|
header.id = htonl (id);
|
|
evbuffer_add (peer->packet, &header, sizeof (header));
|
|
}
|
|
|
|
void
|
|
ccnet_peer_packet_write_string (const CcnetPeer *peer, const char *str)
|
|
{
|
|
int len;
|
|
|
|
len = strlen(str);
|
|
evbuffer_add (peer->packet, str, len);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_packet_finish (const CcnetPeer *peer)
|
|
{
|
|
ccnet_header *header;
|
|
header = (ccnet_header *) EVBUFFER_DATA(peer->packet);
|
|
header->length = htons (EVBUFFER_LENGTH(peer->packet)
|
|
- CCNET_PACKET_LENGTH_HEADER);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_packet_send (const CcnetPeer *peer)
|
|
{
|
|
int ret = 0;
|
|
if (peer->is_local) {
|
|
bufferevent_write_buffer (peer->io->bufev, peer->packet);
|
|
return;
|
|
}
|
|
|
|
if (peer->net_state == PEER_CONNECTED) {
|
|
if (!peer->encrypt_channel) {
|
|
ret = bufferevent_write_buffer (peer->io->bufev, peer->packet);
|
|
} else {
|
|
ccnet_header enc_header;
|
|
char *data = (char *)EVBUFFER_DATA(peer->packet);
|
|
uint32_t len = EVBUFFER_LENGTH(peer->packet);
|
|
char *enc_data;
|
|
int enc_len;
|
|
ret = ccnet_encrypt_with_key (&enc_data, &enc_len, data, len,
|
|
peer->key, peer->iv);
|
|
if (ret < 0) {
|
|
ccnet_warning ("[SEND] encryption error for sending packet "
|
|
"to peer %s(%.8s) \n", peer->name, peer->id);
|
|
evbuffer_drain (peer->packet, EVBUFFER_LENGTH(peer->packet));
|
|
return;
|
|
}
|
|
|
|
enc_header.version = 1;
|
|
enc_header.type = CCNET_MSG_ENCPACKET;
|
|
enc_header.length = 0;
|
|
enc_header.id = htonl(enc_len);
|
|
bufferevent_write (peer->io->bufev, &enc_header, sizeof (enc_header));
|
|
bufferevent_write (peer->io->bufev, enc_data, enc_len);
|
|
g_free (enc_data);
|
|
evbuffer_drain (peer->packet, EVBUFFER_LENGTH(peer->packet));
|
|
}
|
|
if (ret < 0)
|
|
ccnet_warning ("[SEND] bufferevent failed to send packet to peer(%.8s) \n",
|
|
peer->id);
|
|
} else {
|
|
ccnet_warning ("Unable to send packet when peer is not connected.\n");
|
|
evbuffer_drain (peer->packet, EVBUFFER_LENGTH(peer->packet));
|
|
}
|
|
}
|
|
|
|
void
|
|
ccnet_peer_packet_finish_send (const CcnetPeer *peer)
|
|
{
|
|
ccnet_peer_packet_finish (peer);
|
|
ccnet_peer_packet_send (peer);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_send_request (const CcnetPeer *peer, int req_id, const char *req)
|
|
{
|
|
if (!peer->is_local)
|
|
ccnet_debug ("[network] Send a request: id %d, cmd %s\n", req_id, req);
|
|
ccnet_peer_packet_prepare (peer, CCNET_MSG_REQUEST, req_id);
|
|
ccnet_peer_packet_write_string (peer, req);
|
|
ccnet_peer_packet_finish_send (peer);
|
|
}
|
|
|
|
|
|
void
|
|
ccnet_peer_send_response (const CcnetPeer *peer, int req_id,
|
|
const char *code, const char *reason,
|
|
const char *content, int clen)
|
|
{
|
|
if ( (strlen(code) != 3) || !isdigit(code[0]) || !isdigit(code[1])
|
|
|| !isdigit(code[1]) ) {
|
|
ccnet_warning ("Bad code number\n");
|
|
return;
|
|
}
|
|
|
|
g_return_if_fail (clen < 65536);
|
|
|
|
ccnet_peer_packet_prepare (peer, CCNET_MSG_RESPONSE, req_id);
|
|
|
|
/* code line */
|
|
evbuffer_add (peer->packet, code, 3);
|
|
if (reason) {
|
|
evbuffer_add (peer->packet, " ", 1);
|
|
ccnet_peer_packet_write_string (peer, reason);
|
|
}
|
|
evbuffer_add (peer->packet, "\n", 1);
|
|
|
|
if (content)
|
|
evbuffer_add (peer->packet, content, clen);
|
|
|
|
ccnet_peer_packet_finish_send (peer);
|
|
|
|
if (!peer->is_local)
|
|
ccnet_debug ("[SEND] Send a response: id %d code %s %s\n",
|
|
req_id, code, reason);
|
|
}
|
|
|
|
void
|
|
ccnet_peer_send_update (const CcnetPeer *peer, int req_id,
|
|
const char *code, const char *reason,
|
|
const char *content, int clen)
|
|
{
|
|
ccnet_peer_packet_prepare (peer, CCNET_MSG_UPDATE, req_id);
|
|
|
|
/* code line */
|
|
evbuffer_add (peer->packet, code, 3);
|
|
if (reason) {
|
|
evbuffer_add (peer->packet, " ", 1);
|
|
ccnet_peer_packet_write_string (peer, reason);
|
|
}
|
|
evbuffer_add (peer->packet, "\n", 1);
|
|
|
|
if (content)
|
|
evbuffer_add (peer->packet, content, clen);
|
|
|
|
ccnet_peer_packet_finish_send (peer);
|
|
|
|
if (!peer->is_local)
|
|
ccnet_debug ("[SEND] Send an update: id %d code %s %s\n",
|
|
req_id, code, reason?reason:"NULL");
|
|
}
|
|
|
|
|
|
/* ---------------- Processors ---------------- */
|
|
|
|
#undef DEBUG_FLAG
|
|
#define DEBUG_FLAG CCNET_DEBUG_PROCESSOR
|
|
#include "log.h"
|
|
|
|
void
|
|
ccnet_peer_add_processor (CcnetPeer *peer, CcnetProcessor *processor)
|
|
{
|
|
if (!peer->is_local)
|
|
ccnet_debug ("[Proc] Add %s(%d) to peer %s\n", GET_PNAME(processor),
|
|
PRINT_ID(processor->id), peer->name);
|
|
g_hash_table_insert (peer->processors, (gpointer)(long)processor->id, processor);
|
|
processor->detached = 0;
|
|
}
|
|
|
|
|
|
void
|
|
ccnet_peer_remove_processor (CcnetPeer *peer, CcnetProcessor *processor)
|
|
{
|
|
/* ccnet_debug ("[Proc] Remove %s(%d) from peer %s\n", GET_PNAME(processor), */
|
|
/* PRINT_ID(processor->id), peer->name); */
|
|
g_hash_table_remove (peer->processors, (gpointer)(long)processor->id);
|
|
processor->detached = 1;
|
|
}
|
|
|
|
|
|
CcnetProcessor *
|
|
ccnet_peer_get_processor (CcnetPeer *peer, unsigned int id)
|
|
{
|
|
return g_hash_table_lookup (peer->processors, (gpointer)(long)id);
|
|
}
|
|
|
|
|
|
static void shutdown_processors (CcnetPeer *peer)
|
|
{
|
|
ccnet_proc_factory_shutdown_processors (
|
|
peer->manager->session->proc_factory, peer);
|
|
}
|
|
|
|
|
|
/* -------- redirect related code -------- */
|
|
|
|
void
|
|
ccnet_peer_set_redirect (CcnetPeer *peer, const char *addr, uint16_t port)
|
|
{
|
|
g_return_if_fail (peer->redirected == 0);
|
|
|
|
peer->redirected = 1;
|
|
peer->redirect_addr = g_strdup (addr);
|
|
peer->redirect_port = port;
|
|
}
|
|
|
|
void
|
|
ccnet_peer_unset_redirect (CcnetPeer *peer)
|
|
{
|
|
peer->redirected = 0;
|
|
g_free (peer->redirect_addr);
|
|
peer->redirect_port = 0;
|
|
}
|