/*
 * Mirror of https://github.com/haiwen/ccnet-server.git
 * (synced 2025-04-27 18:25:06 +00:00; 176 lines, 4.4 KiB, C).
 */
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
|
|
|
|
#include "common.h"
|
|
|
|
#include "ccnet-db.h"
|
|
#include "timer.h"
|
|
|
|
#include "peer.h"
|
|
#include "session.h"
|
|
#include "processors/mqserver-proc.h"
|
|
|
|
#include "message.h"
|
|
#include "message-manager.h"
|
|
#include "peer-mgr.h"
|
|
|
|
#define DEBUG_FLAG CCNET_DEBUG_MESSAGE
|
|
#include "log.h"
|
|
|
|
|
|
struct MessageManagerPriv {
|
|
GHashTable *subscribers;
|
|
|
|
#ifdef CCNET_SERVER
|
|
|
|
#endif
|
|
};
|
|
|
|
/* Yield the MessageManagerPriv area that GObject attaches to instance `o`. */
#define GET_PRIV(o)                                               \
    (G_TYPE_INSTANCE_GET_PRIVATE ((o),                            \
                                  CCNET_TYPE_MESSAGE_MANAGER,     \
                                  MessageManagerPriv))

/* Register CcnetMessageManager with the GObject type system (parent: GObject). */
G_DEFINE_TYPE (CcnetMessageManager, ccnet_message_manager, G_TYPE_OBJECT);
|
|
|
|
|
|
static void
|
|
ccnet_message_manager_class_init (CcnetMessageManagerClass *class)
|
|
{
|
|
/* GObjectClass *object_class; */
|
|
|
|
g_type_class_add_private (class, sizeof (MessageManagerPriv));
|
|
}
|
|
|
|
static void
|
|
ccnet_message_manager_init (CcnetMessageManager *manager)
|
|
{
|
|
manager->priv = GET_PRIV(manager);
|
|
}
|
|
|
|
CcnetMessageManager *
|
|
ccnet_message_manager_new (CcnetSession *session)
|
|
{
|
|
CcnetMessageManager *manager;
|
|
|
|
manager = g_object_new (CCNET_TYPE_MESSAGE_MANAGER, NULL);
|
|
manager->session = session;
|
|
|
|
manager->priv->subscribers = g_hash_table_new_full (
|
|
g_str_hash, g_str_equal, g_free, NULL);
|
|
|
|
return manager;
|
|
}
|
|
|
|
int
|
|
ccnet_message_manager_start (CcnetMessageManager *manager)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
static gboolean
|
|
handle_inner_message (CcnetMessageManager *manager,
|
|
CcnetMessage *msg)
|
|
{
|
|
if (strcmp(msg->app, IPEERMGR_APP) == 0) {
|
|
ccnet_peer_manager_receive_message (manager->session->peer_mgr, msg);
|
|
return TRUE;
|
|
}
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
int
|
|
ccnet_message_manager_add_msg(CcnetMessageManager *manager,
|
|
CcnetMessage *msg,
|
|
int msg_type)
|
|
{
|
|
MessageManagerPriv *priv = manager->priv;
|
|
GList *app_subscribers, *ptr;
|
|
CcnetProcessor *processor;
|
|
|
|
switch (msg_type) {
|
|
case MSG_TYPE_RECV:
|
|
if (handle_inner_message(manager, msg))
|
|
break;
|
|
|
|
app_subscribers = g_hash_table_lookup (priv->subscribers,
|
|
msg->app);
|
|
if (!app_subscribers)
|
|
break;
|
|
|
|
ptr = app_subscribers;
|
|
while (ptr) {
|
|
processor = ptr->data;
|
|
ccnet_mqserver_proc_put_message (processor, msg);
|
|
ptr = ptr->next;
|
|
}
|
|
break;
|
|
case MSG_TYPE_SYS:
|
|
app_subscribers = g_hash_table_lookup (priv->subscribers, msg->app);
|
|
if (!app_subscribers)
|
|
break;
|
|
|
|
ptr = app_subscribers;
|
|
while (ptr) {
|
|
processor = ptr->data;
|
|
ccnet_mqserver_proc_put_message (processor, msg);
|
|
ptr = ptr->next;
|
|
}
|
|
break;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
ccnet_message_manager_subscribe_app (CcnetMessageManager *manager,
|
|
CcnetProcessor *mq_proc,
|
|
int n_app, char **apps)
|
|
{
|
|
MessageManagerPriv *priv = manager->priv;
|
|
GList *app_subscribers;
|
|
int i;
|
|
|
|
for (i = 0; i < n_app; ++i) {
|
|
ccnet_debug ("[Msg] subscribe app %s\n", apps[i]);
|
|
|
|
app_subscribers = g_hash_table_lookup (priv->subscribers, apps[i]);
|
|
app_subscribers = g_list_prepend (app_subscribers, mq_proc);
|
|
g_hash_table_replace (priv->subscribers, g_strdup (apps[i]),
|
|
app_subscribers);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
ccnet_message_manager_unsubscribe_app (CcnetMessageManager *manager,
|
|
CcnetProcessor *mq_proc,
|
|
int n_app, char **apps)
|
|
{
|
|
MessageManagerPriv *priv = manager->priv;
|
|
GList *app_subscribers;
|
|
int i;
|
|
int ret = 0;
|
|
|
|
for (i = 0; i < n_app; ++i) {
|
|
app_subscribers = g_hash_table_lookup (priv->subscribers, apps[i]);
|
|
if (!app_subscribers) {
|
|
ccnet_warning ("cannot unsubscribe from app %s, "
|
|
"no such app subscribed.\n", apps[i]);
|
|
ret = -1;
|
|
continue;
|
|
}
|
|
|
|
ccnet_debug ("[Msg] unsubscribe app %s\n", apps[i]);
|
|
|
|
app_subscribers = g_list_remove (app_subscribers, mq_proc);
|
|
|
|
if (app_subscribers)
|
|
g_hash_table_replace (priv->subscribers, g_strdup (apps[i]),
|
|
app_subscribers);
|
|
else
|
|
g_hash_table_remove (priv->subscribers, apps[i]);
|
|
}
|
|
|
|
return ret;
|
|
}
|