/*
 * 1
 * 0
 * mirror of https://github.com/haiwen/seafile-server.git synced 2025-04-27 19:15:07 +00:00
 * seafile-server/common/mq-mgr.c
 * 2019-09-09 13:36:45 +08:00
 *
 * 71 lines
 * 1.8 KiB
 * C
 */

#include "common.h"
#include "log.h"
#include "utils.h"
#include "mq-mgr.h"
// Private state of a SeafMqManager; the struct body is defined in this file.
typedef struct SeafMqManagerPriv {
// Maps a channel name (string key) to that channel's GAsyncQueue of
// pending events. Created in seaf_mq_manager_new() so that the table
// frees its keys with g_free and unrefs its queues on removal.
GHashTable *chans;
} SeafMqManagerPriv;
SeafMqManager *
seaf_mq_manager_new ()
{
SeafMqManager *mgr = g_new0 (SeafMqManager, 1);
mgr->priv = g_new0 (SeafMqManagerPriv, 1);
mgr->priv->chans = g_hash_table_new_full (g_str_hash, g_str_equal,
(GDestroyNotify)g_free,
(GDestroyNotify)g_async_queue_unref);
return mgr;
}
/*
 * Create the queue for `channel` and register it in the manager's table.
 *
 * The table stores its own copy of the channel name. Items left in the queue
 * are released with json_decref when the queue is destroyed. Any existing
 * entry under the same name is replaced.
 *
 * Returns the new queue; the reference is held by the table.
 */
static GAsyncQueue *
seaf_mq_manager_channel_new (SeafMqManager *mgr, const char *channel)
{
    GAsyncQueue *queue = g_async_queue_new_full ((GDestroyNotify)json_decref);
    char *key = g_strdup (channel);

    g_hash_table_replace (mgr->priv->chans, key, queue);

    return queue;
}
/*
 * Publish an event on a channel.
 *
 * Builds the JSON object {"content": <content>, "ctime": <time(NULL)>} and
 * pushes it onto the channel's queue. The channel is created on first use.
 *
 * @mgr:     manager that owns the channel table.
 * @channel: channel name; must not be NULL.
 * @content: event payload; must not be NULL and must be a string that
 *           json_string() accepts (valid UTF-8).
 *
 * Returns 0 on success. Returns -1 if an argument is NULL, the channel could
 * not be created, or the JSON message could not be built; in those cases
 * nothing is queued.
 */
int
seaf_mq_manager_publish_event (SeafMqManager *mgr, const char *channel, const char *content)
{
    if (!channel || !content) {
        seaf_warning ("channel and content should not be NULL.\n");
        return -1;
    }

    GAsyncQueue *async_queue = g_hash_table_lookup (mgr->priv->chans, channel);
    if (!async_queue) {
        async_queue = seaf_mq_manager_channel_new (mgr, channel);
    }
    if (!async_queue) {
        seaf_warning("%s channel creation failed.\n", channel);
        return -1;
    }

    json_t *msg = json_object();
    if (!msg) {
        seaf_warning ("Failed to allocate event for channel %s.\n", channel);
        return -1;
    }

    /* json_string() returns NULL when content is rejected (e.g. invalid
     * UTF-8), and json_object_set_new() then reports -1. Without these
     * checks an event lacking its "content" field would be queued. */
    if (json_object_set_new (msg, "content", json_string (content)) < 0 ||
        json_object_set_new (msg, "ctime", json_integer (time (NULL))) < 0) {
        seaf_warning ("Failed to build event for channel %s.\n", channel);
        json_decref (msg);
        return -1;
    }

    /* The queue takes over the reference to msg. */
    g_async_queue_push (async_queue, msg);
    return 0;
}
/*
 * Take the next pending event from a channel without blocking.
 *
 * @mgr:     manager that owns the channel table.
 * @channel: channel name to read from.
 *
 * Returns the event object, or NULL when `channel` is NULL, the channel does
 * not exist, or its queue is currently empty. A returned event is no longer
 * held by the queue.
 * NOTE(review): the caller presumably must json_decref() it; confirm against
 * callers.
 */
json_t *
seaf_mq_manager_pop_event (SeafMqManager *mgr, const char *channel)
{
    /* The table hashes keys with g_str_hash, which dereferences the key, so
     * a NULL channel must be rejected before the lookup. */
    if (!channel)
        return NULL;

    GAsyncQueue *async_queue = g_hash_table_lookup (mgr->priv->chans, channel);
    if (!async_queue)
        return NULL;

    return g_async_queue_try_pop (async_queue);
}