1
0
mirror of https://github.com/haiwen/ccnet-server.git synced 2025-04-27 18:25:06 +00:00
ccnet-server/net/common/message-manager.c
2016-08-19 13:54:34 +08:00

176 lines
4.4 KiB
C

/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "common.h"
#include "ccnet-db.h"
#include "timer.h"
#include "peer.h"
#include "session.h"
#include "processors/mqserver-proc.h"
#include "message.h"
#include "message-manager.h"
#include "peer-mgr.h"
#define DEBUG_FLAG CCNET_DEBUG_MESSAGE
#include "log.h"
/*
 * Private instance data of CcnetMessageManager.
 */
struct MessageManagerPriv {
/* Maps an app name (string key; the table frees keys with g_free) to the
 * GList of processors subscribed to that app.  The table is created
 * without a value-destroy function, so it never frees the lists itself. */
GHashTable *subscribers;
/* No server-only fields are declared at present. */
#ifdef CCNET_SERVER
#endif
};
/* Fetch the MessageManagerPriv area attached to message-manager instance `o`. */
#define GET_PRIV(o) \
(G_TYPE_INSTANCE_GET_PRIVATE ((o), CCNET_TYPE_MESSAGE_MANAGER, MessageManagerPriv))
/* Define the CcnetMessageManager GObject type with G_TYPE_OBJECT as parent.
 * The macro relies on ccnet_message_manager_class_init() and
 * ccnet_message_manager_init(), both defined in this file. */
G_DEFINE_TYPE (CcnetMessageManager, ccnet_message_manager, G_TYPE_OBJECT);
/*
 * Class initializer for CcnetMessageManager.
 *
 * The only work done here is reserving room for a MessageManagerPriv in
 * every instance of the class; no virtual methods are overridden.
 */
static void
ccnet_message_manager_class_init (CcnetMessageManagerClass *klass)
{
    g_type_class_add_private (klass, sizeof (MessageManagerPriv));
}
/*
 * Instance initializer for CcnetMessageManager.
 *
 * Caches the pointer to the instance's private data in `priv` so the rest
 * of the file can reach it through a plain field access.
 */
static void
ccnet_message_manager_init (CcnetMessageManager *self)
{
    MessageManagerPriv *private_data = GET_PRIV (self);

    self->priv = private_data;
}
/*
 * Create a message manager bound to `session`.
 *
 * The session pointer is stored as-is (plain assignment).  The subscriber
 * table is created empty; it hashes and compares keys as C strings, frees
 * keys with g_free and has no value-destroy function.
 *
 * Returns the newly created manager.
 */
CcnetMessageManager *
ccnet_message_manager_new (CcnetSession *session)
{
    CcnetMessageManager *mgr = g_object_new (CCNET_TYPE_MESSAGE_MANAGER, NULL);

    mgr->session = session;
    mgr->priv->subscribers = g_hash_table_new_full (g_str_hash,   /* key hash */
                                                    g_str_equal,  /* key compare */
                                                    g_free,       /* key destroy */
                                                    NULL);        /* value destroy */

    return mgr;
}
/*
 * Start the message manager.
 *
 * There is currently nothing to start: the function does no work and
 * always returns 0.
 */
int
ccnet_message_manager_start (CcnetMessageManager *manager)
{
    (void) manager;     /* intentionally unused */

    return 0;
}
/*
 * Intercept messages that are handled inside this process rather than being
 * delivered to subscribers.
 *
 * A message whose `app` equals IPEERMGR_APP is passed to the session's peer
 * manager.
 *
 * Returns TRUE when the message was consumed here, FALSE otherwise.
 */
static gboolean
handle_inner_message (CcnetMessageManager *manager,
                      CcnetMessage *msg)
{
    if (strcmp (msg->app, IPEERMGR_APP) != 0)
        return FALSE;

    ccnet_peer_manager_receive_message (manager->session->peer_mgr, msg);
    return TRUE;
}
int
ccnet_message_manager_add_msg(CcnetMessageManager *manager,
CcnetMessage *msg,
int msg_type)
{
MessageManagerPriv *priv = manager->priv;
GList *app_subscribers, *ptr;
CcnetProcessor *processor;
switch (msg_type) {
case MSG_TYPE_RECV:
if (handle_inner_message(manager, msg))
break;
app_subscribers = g_hash_table_lookup (priv->subscribers,
msg->app);
if (!app_subscribers)
break;
ptr = app_subscribers;
while (ptr) {
processor = ptr->data;
ccnet_mqserver_proc_put_message (processor, msg);
ptr = ptr->next;
}
break;
case MSG_TYPE_SYS:
app_subscribers = g_hash_table_lookup (priv->subscribers, msg->app);
if (!app_subscribers)
break;
ptr = app_subscribers;
while (ptr) {
processor = ptr->data;
ccnet_mqserver_proc_put_message (processor, msg);
ptr = ptr->next;
}
break;
}
return 0;
}
/*
 * Subscribe `mq_proc` to each of the `n_app` app names in `apps`.
 *
 * The processor is prepended to the app's subscriber list; no check is made
 * for an existing subscription of the same processor.  The table stores its
 * own copy of each app name.
 *
 * Always returns 0.
 */
int
ccnet_message_manager_subscribe_app (CcnetMessageManager *manager,
                                     CcnetProcessor *mq_proc,
                                     int n_app, char **apps)
{
    MessageManagerPriv *priv = manager->priv;
    int idx;

    for (idx = 0; idx < n_app; ++idx) {
        char *app = apps[idx];
        GList *procs;

        ccnet_debug ("[Msg] subscribe app %s\n", app);

        procs = g_hash_table_lookup (priv->subscribers, app);
        procs = g_list_prepend (procs, mq_proc);

        /* Prepending changes the list head, so the stored value has to be
         * updated; the table takes ownership of the duplicated key. */
        g_hash_table_replace (priv->subscribers, g_strdup (app), procs);
    }

    return 0;
}
/*
 * Remove `mq_proc` from the subscriber list of each of the `n_app` app
 * names in `apps`.
 *
 * An app with no subscribers at all is reported with a warning and makes
 * the call return -1, but the remaining apps are still processed.  When the
 * last subscriber of an app is removed, the app's entry is dropped from the
 * table.
 *
 * Returns 0 when every app had a subscriber list, -1 otherwise.
 */
int
ccnet_message_manager_unsubscribe_app (CcnetMessageManager *manager,
                                       CcnetProcessor *mq_proc,
                                       int n_app, char **apps)
{
    MessageManagerPriv *priv = manager->priv;
    int status = 0;
    int idx;

    for (idx = 0; idx < n_app; ++idx) {
        GList *procs = g_hash_table_lookup (priv->subscribers, apps[idx]);

        if (procs == NULL) {
            ccnet_warning ("cannot unsubscribe from app %s, "
                           "no such app subscribed.\n", apps[idx]);
            status = -1;
            continue;
        }

        ccnet_debug ("[Msg] unsubscribe app %s\n", apps[idx]);

        procs = g_list_remove (procs, mq_proc);
        if (procs == NULL) {
            /* Last subscriber gone: forget the app entirely. */
            g_hash_table_remove (priv->subscribers, apps[idx]);
        } else {
            /* The list head may have changed; store the current head. */
            g_hash_table_replace (priv->subscribers, g_strdup (apps[idx]),
                                  procs);
        }
    }

    return status;
}