mirror of
https://github.com/haiwen/ccnet-server.git
synced 2025-05-01 12:03:19 +00:00
334 lines
10 KiB
C
334 lines
10 KiB
C
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
|
|
|
|
#include "common.h"
|
|
|
|
#include <string.h>
|
|
|
|
#include "peer.h"
|
|
#include "session.h"
|
|
|
|
#include "processor.h"
|
|
#include "proc-factory.h"
|
|
#include "peer-mgr.h"
|
|
#include "timer.h"
|
|
#include "processors/keepalive2-proc.h"
|
|
#include "processors/rcvcmd-proc.h"
|
|
#include "processors/service-proxy-proc.h"
|
|
#include "processors/service-stub-proc.h"
|
|
#include "processors/sendsessionkey-proc.h"
|
|
#include "processors/recvsessionkey-proc.h"
|
|
#include "processors/sendsessionkey-v2-proc.h"
|
|
#include "processors/recvsessionkey-v2-proc.h"
|
|
|
|
|
|
#define DEBUG_FLAG CCNET_DEBUG_PROCESSOR
|
|
#include "log.h"
|
|
#include "utils.h"
|
|
|
|
|
|
/* Note, timeout here must be larger than the timeout of keepalive-proc
|
|
*
|
|
* Here, we only handle problems which happen when tcp connection is ok but
|
|
* processors are dead or not created by peer.
|
|
*
|
|
*
|
|
*/
|
|
|
|
/* The timeout of keepalive-proc is 180s now. */
|
|
#define DEFAULT_NO_PACKET_TIMEOUT 10 /* 10 seconds */
|
|
#define KEEPALIVE_PULSE 5 * 1000 /* 5 seconds */
|
|
#define CONNECTION_TIMEOUT 182
|
|
#define MAX_PROCS_KEEPALIVE 5 /* we check 5 proc for each peer at most */
|
|
|
|
typedef struct {
|
|
GHashTable *proc_type_table;
|
|
} CcnetProcFactoryPriv;
|
|
|
|
#define GET_PRIV(o) \
|
|
(G_TYPE_INSTANCE_GET_PRIVATE ((o), CCNET_TYPE_PROC_FACTORY, CcnetProcFactoryPriv))
|
|
|
|
G_DEFINE_TYPE (CcnetProcFactory, ccnet_proc_factory, G_TYPE_OBJECT);
|
|
|
|
static void
|
|
ccnet_proc_factory_class_init (CcnetProcFactoryClass *klass)
|
|
{
|
|
g_type_class_add_private (klass, sizeof (CcnetProcFactoryPriv));
|
|
}
|
|
|
|
static void
|
|
ccnet_proc_factory_init (CcnetProcFactory *factory)
|
|
{
|
|
CcnetProcFactoryPriv *priv = GET_PRIV (factory);
|
|
|
|
priv->proc_type_table = g_hash_table_new_full (
|
|
g_str_hash, g_str_equal, g_free, NULL);
|
|
}
|
|
|
|
void
|
|
ccnet_proc_factory_register_processor (CcnetProcFactory *factory,
|
|
const char *serv_name,
|
|
GType type)
|
|
{
|
|
CcnetProcFactoryPriv *priv = GET_PRIV (factory);
|
|
|
|
CcnetProcessorClass *proc_class =
|
|
(CcnetProcessorClass *)g_type_class_ref(type);
|
|
g_type_class_unref (proc_class);
|
|
|
|
g_hash_table_insert (priv->proc_type_table, g_strdup (serv_name),
|
|
(gpointer) type);
|
|
}
|
|
|
|
|
|
GType ccnet_getpubinfo_proc_get_type ();
|
|
GType ccnet_putpubinfo_proc_get_type ();
|
|
|
|
GType ccnet_sendmsg_proc_get_type ();
|
|
GType ccnet_rcvmsg_proc_get_type ();
|
|
|
|
GType ccnet_rcvcmd_proc_get_type ();
|
|
GType ccnet_getperm_proc_get_type ();
|
|
|
|
GType ccnet_keepalive2_proc_get_type ();
|
|
|
|
GType ccnet_mqserver_proc_get_type ();
|
|
|
|
GType ccnet_service_proxy_proc_get_type ();
|
|
GType ccnet_service_stub_proc_get_type ();
|
|
|
|
GType ccnet_sync_relay_proc_get_type ();
|
|
GType ccnet_sync_relay_slave_proc_get_type ();
|
|
|
|
GType ccnet_rpcserver_proc_get_type ();
|
|
GType ccnet_echo_proc_get_type ();
|
|
|
|
CcnetProcFactory *
|
|
ccnet_proc_factory_new (CcnetSession *session)
|
|
{
|
|
CcnetProcFactory *factory;
|
|
|
|
factory = g_object_new (CCNET_TYPE_PROC_FACTORY, NULL);
|
|
factory->session = session;
|
|
factory->no_packet_timeout = DEFAULT_NO_PACKET_TIMEOUT;
|
|
factory->procs = NULL;
|
|
|
|
/* register fundamental processors */
|
|
/* FIXME: These processor types shall be regitered by managers */
|
|
ccnet_proc_factory_register_processor (factory, "get-pubinfo",
|
|
ccnet_getpubinfo_proc_get_type ());
|
|
ccnet_proc_factory_register_processor (factory, "put-pubinfo",
|
|
ccnet_putpubinfo_proc_get_type ());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "send-msg",
|
|
ccnet_sendmsg_proc_get_type ());
|
|
ccnet_proc_factory_register_processor (factory, "receive-msg",
|
|
ccnet_rcvmsg_proc_get_type ());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "receive-cmd",
|
|
ccnet_rcvcmd_proc_get_type ());
|
|
/* ccnet_proc_factory_register_processor (factory, "receive-event", */
|
|
/* ccnet_rcvevent_proc_get_type ()); */
|
|
|
|
ccnet_proc_factory_register_processor (factory, "keepalive2",
|
|
ccnet_keepalive2_proc_get_type ());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "send-session-key",
|
|
ccnet_sendsessionkey_proc_get_type());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "receive-session-key",
|
|
ccnet_recvsessionkey_proc_get_type ());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "send-skey2",
|
|
ccnet_sendskey2_proc_get_type());
|
|
ccnet_proc_factory_register_processor (factory, "receive-skey2",
|
|
ccnet_recvskey2_proc_get_type ());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "mq-server",
|
|
ccnet_mqserver_proc_get_type ());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "service-proxy",
|
|
ccnet_service_proxy_proc_get_type());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "service-stub",
|
|
ccnet_service_stub_proc_get_type());
|
|
|
|
ccnet_proc_factory_register_processor (factory, "ccnet-rpcserver",
|
|
ccnet_rpcserver_proc_get_type());
|
|
|
|
|
|
|
|
/*
|
|
if (session->is_relay) {
|
|
ccnet_proc_factory_register_processor (factory, "sync-relay-slave",
|
|
ccnet_sync_relay_slave_proc_get_type());
|
|
} else {
|
|
ccnet_proc_factory_register_processor (factory, "sync-relay",
|
|
ccnet_sync_relay_proc_get_type());
|
|
}
|
|
*/
|
|
|
|
ccnet_proc_factory_register_processor (factory, "echo",
|
|
ccnet_echo_proc_get_type ());
|
|
|
|
return factory;
|
|
}
|
|
|
|
void
|
|
ccnet_proc_factory_start (CcnetProcFactory *factory)
|
|
{
|
|
/* factory->keepalive_timer = ccnet_timer_new ( */
|
|
/* (TimerCB) keepalive_pulse, factory, KEEPALIVE_PULSE); */
|
|
}
|
|
|
|
static GType
|
|
ccnet_proc_factory_get_proc_type (CcnetProcFactory *factory,
|
|
const char *serv_name)
|
|
{
|
|
CcnetProcFactoryPriv *priv = GET_PRIV (factory);
|
|
|
|
return (GType) g_hash_table_lookup (priv->proc_type_table, serv_name);
|
|
}
|
|
|
|
static inline CcnetProcessor *
|
|
create_processor_common (CcnetProcFactory *factory,
|
|
const char *serv_name,
|
|
CcnetPeer *peer,
|
|
int req_id)
|
|
{
|
|
GType type;
|
|
CcnetProcessor *processor;
|
|
|
|
type = ccnet_proc_factory_get_proc_type (factory, serv_name);
|
|
if (type == 0) {
|
|
return NULL;
|
|
}
|
|
|
|
processor = g_object_new (type, NULL);
|
|
processor->peer = peer;
|
|
g_object_ref (peer);
|
|
processor->session = factory->session;
|
|
processor->id = req_id;
|
|
/* Set the real processor name.
|
|
* This may be different from the processor class name.
|
|
*/
|
|
processor->name = g_strdup(serv_name);
|
|
|
|
if (!peer->is_local)
|
|
ccnet_debug ("Create processor %s(%d) %s\n", GET_PNAME(processor),
|
|
PRINT_ID(processor->id), processor->name);
|
|
ccnet_peer_add_processor (processor->peer, processor);
|
|
|
|
factory->procs_alive_cnt++;
|
|
|
|
return processor;
|
|
}
|
|
|
|
CcnetProcessor *
|
|
ccnet_proc_factory_create_slave_processor (CcnetProcFactory *factory,
|
|
const char *serv_name,
|
|
CcnetPeer *peer,
|
|
int req_id)
|
|
{
|
|
|
|
return create_processor_common(factory, serv_name,
|
|
peer, SLAVE_ID (req_id));
|
|
}
|
|
|
|
CcnetProcessor *
|
|
ccnet_proc_factory_create_master_processor (CcnetProcFactory *factory,
|
|
const char *serv_name,
|
|
CcnetPeer *peer)
|
|
{
|
|
return create_processor_common (
|
|
factory, serv_name,
|
|
peer, MASTER_ID(ccnet_peer_get_request_id (peer)) );
|
|
}
|
|
|
|
static void inline
|
|
recycle (CcnetProcFactory *factory, CcnetProcessor *processor)
|
|
{
|
|
factory->procs_alive_cnt--;
|
|
|
|
#ifdef DEBUG_PROC
|
|
if (strcmp(GET_PNAME(processor), "rpcserver-proc") != 0) {
|
|
/* ignore rpcserver-proc */
|
|
|
|
CcnetProc *proc = ccnet_proc_new();
|
|
g_object_set (proc, "name", GET_PNAME(processor),
|
|
"peer-name", processor->peer->name,
|
|
"ctime", (int) processor->start_time,
|
|
"dtime", (int) time(NULL), NULL);
|
|
factory->procs = g_list_prepend (factory->procs, proc);
|
|
}
|
|
#endif
|
|
|
|
/* TODO: implement processor pool */
|
|
g_object_unref (processor);
|
|
}
|
|
|
|
void
|
|
ccnet_proc_factory_recycle (CcnetProcFactory *factory,
|
|
CcnetProcessor *processor)
|
|
{
|
|
recycle (factory, processor);
|
|
}
|
|
|
|
static void
|
|
shutdown_processor (CcnetProcessor *processor,
|
|
char *code, char *code_msg)
|
|
{
|
|
/* Send an error message to shutdown the processor.
|
|
* If it's a proxy or stub proc, it'll first relay the message.
|
|
*/
|
|
if (!IS_SLAVE (processor)) {
|
|
ccnet_processor_handle_response (processor, code, code_msg,
|
|
NULL, 0);
|
|
} else {
|
|
ccnet_processor_handle_update (processor, code, code_msg,
|
|
NULL, 0);
|
|
}
|
|
}
|
|
|
|
void
|
|
ccnet_proc_factory_shutdown_processors (CcnetProcFactory *factory,
|
|
CcnetPeer *peer)
|
|
{
|
|
GList *list, *ptr;
|
|
CcnetProcessor *processor;
|
|
char *code = g_strdup (SC_NETDOWN);
|
|
char *code_msg = g_strdup (SS_NETDOWN);
|
|
|
|
list = g_hash_table_get_values (peer->processors);
|
|
for (ptr = list; ptr; ptr = ptr->next) {
|
|
processor = CCNET_PROCESSOR (ptr->data);
|
|
processor->detached = TRUE;
|
|
shutdown_processor (processor, code, code_msg);
|
|
}
|
|
g_hash_table_remove_all (peer->processors);
|
|
g_list_free (list);
|
|
|
|
g_free (code);
|
|
g_free (code_msg);
|
|
}
|
|
|
|
void
|
|
ccnet_proc_factory_set_keepalive_timeout (CcnetProcFactory *factory,
|
|
int timeout)
|
|
{
|
|
factory->no_packet_timeout = timeout;
|
|
}
|
|
|
|
/* Don't send keepalive or reclaim inactive processors. */
|
|
|
|
#if 0
|
|
|
|
static gint
|
|
compare_procs (gconstpointer a, gconstpointer b)
|
|
{
|
|
const CcnetProcessor *proc_a = a, *proc_b = b;
|
|
|
|
return (proc_a->t_keepalive_sent - proc_b->t_keepalive_sent);
|
|
}
|
|
|
|
#endif /* 0 */
|