From aa0185203babc3e8dccb25b71cd26ce81429d07f Mon Sep 17 00:00:00 2001 From: Xiangyue Cai Date: Mon, 22 Jul 2019 11:18:32 +0800 Subject: [PATCH] add rpc publish_event() , pop_event() and remove cevent (#253) --- common/Makefile.am | 1 + common/branch-mgr.c | 56 +++++++++++---------------- common/mq-mgr.c | 69 +++++++++++++++++++++++++++++++++ common/mq-mgr.h | 22 +++++++++++ common/rpc-service.c | 26 +++++++++++++ include/seafile-rpc.h | 5 +++ python/seafile/rpcclient.py | 10 +++++ python/seaserv/api.py | 6 +++ server/Makefile.am | 1 + server/http-server.c | 76 ++++++++++++++----------------------- server/seaf-server.c | 12 ++++++ server/seafile-session.c | 11 ++---- server/seafile-session.h | 3 +- 13 files changed, 208 insertions(+), 90 deletions(-) create mode 100644 common/mq-mgr.c create mode 100644 common/mq-mgr.h diff --git a/common/Makefile.am b/common/Makefile.am index 17a5982..6dfebde 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -20,6 +20,7 @@ noinst_HEADERS = \ obj-backend.h \ block-backend.h \ block.h \ + mq-mgr.h \ seaf-db.h \ config-mgr.h \ merge-new.h \ diff --git a/common/branch-mgr.c b/common/branch-mgr.c index fcbaa85..fc46f57 100644 --- a/common/branch-mgr.c +++ b/common/branch-mgr.c @@ -298,6 +298,8 @@ seaf_branch_manager_update_branch (SeafBranchManager *mgr, SeafBranch *branch) #if defined( SEAFILE_SERVER ) && defined( FULL_FEATURE ) +#include "mq-mgr.h" + static gboolean get_commit_id (SeafDBRow *row, void *data) { @@ -311,43 +313,29 @@ get_commit_id (SeafDBRow *row, void *data) return FALSE; } -/* typedef struct { */ -/* char *repo_id; */ -/* char *commit_id; */ -/* } RepoUpdateEventData; */ +static void +publish_repo_update_event (char *repo_id, char *commit_id) +{ + char buf[128]; + snprintf (buf, sizeof(buf), "repo-update\t%s\t%s", + repo_id, commit_id); -/* static void */ -/* publish_repo_update_event (CEvent *event, void *data) */ -/* { */ -/* RepoUpdateEventData *rdata = event->data; */ + publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_EVENT, buf); -/* char buf[128]; */ -/* snprintf (buf, sizeof(buf), "repo-update\t%s\t%s", */ -/* rdata->repo_id, rdata->commit_id); */ + g_free (repo_id); + g_free (commit_id); +} -/* seaf_mq_manager_publish_event (seaf->mq_mgr, buf); */ + static void + on_branch_updated (SeafBranchManager *mgr, SeafBranch *branch) + { + seaf_repo_manager_update_repo_info (seaf->repo_mgr, branch->repo_id, branch->commit_id); + + if (seaf_repo_manager_is_virtual_repo (seaf->repo_mgr, branch->repo_id)) + return; -/* g_free (rdata->repo_id); */ -/* g_free (rdata->commit_id); */ -/* g_free (rdata); */ -/* } */ - -/* static void */ -/* on_branch_updated (SeafBranchManager *mgr, SeafBranch *branch) */ -/* { */ -/* seaf_repo_manager_update_repo_info (seaf->repo_mgr, branch->repo_id, branch->commit_id); */ - -/* if (seaf_repo_manager_is_virtual_repo (seaf->repo_mgr, branch->repo_id)) */ -/* return; */ - -/* RepoUpdateEventData *rdata = g_new0 (RepoUpdateEventData, 1); */ - -/* rdata->repo_id = g_strdup (branch->repo_id); */ -/* rdata->commit_id = g_strdup (branch->commit_id); */ - -/* cevent_manager_add_event (seaf->ev_mgr, mgr->priv->cevent_id, rdata); */ - -/* } */ + publish_repo_update_event (branch->repo_id, branch->commit_id); + } int seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr, @@ -407,7 +395,7 @@ seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr, seaf_db_trans_close (trans); - /* on_branch_updated (mgr, branch); */ + on_branch_updated (mgr, branch); return 0; } diff --git a/common/mq-mgr.c b/common/mq-mgr.c new file mode 100644 index 0000000..f8cd389 --- /dev/null +++ b/common/mq-mgr.c @@ -0,0 +1,69 @@ +#include "common.h" +#include "log.h" +#include "utils.h" +#include "mq-mgr.h" + +typedef struct SeafMqManagerPriv { + // chan <-> async_queue + GHashTable *chans; +} SeafMqManagerPriv; + +SeafMqManager * +seaf_mq_manager_new () +{ + SeafMqManager *mgr = g_new0 (SeafMqManager, 1); + mgr->priv = g_new0 (SeafMqManagerPriv, 1); + mgr->priv->chans = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, + (GDestroyNotify)g_async_queue_unref); + + return mgr; +} + +static GAsyncQueue * +seaf_mq_manager_channel_new (SeafMqManager *mgr, const char *channel) +{ + GAsyncQueue *async_queue = NULL; + async_queue = g_async_queue_new_full ((GDestroyNotify)g_free); + + g_hash_table_replace (mgr->priv->chans, g_strdup (channel), async_queue); + + return async_queue; +} + +int +publish_event (SeafMqManager *mgr, const char *channel, const char *content) +{ + int ret = 0; + + if (!channel || !content) { + seaf_warning ("type and content should not be NULL.\n"); + return -1; + } + + GAsyncQueue *async_queue = g_hash_table_lookup (mgr->priv->chans, channel); + if (!async_queue) { + async_queue = seaf_mq_manager_channel_new(mgr, channel); + } + + if (!async_queue) { + seaf_warning("%s channel creation failed.\n", channel); + return -1; + } + + g_async_queue_push (async_queue, g_strdup (content)); + + return ret; +} + +char * +pop_event (SeafMqManager *mgr, const char *channel) +{ + GAsyncQueue *async_queue = g_hash_table_lookup (mgr->priv->chans, channel); + if (!async_queue) { + seaf_warning ("Unkonwn message channel %s.\n", channel); + return NULL; + } + + return g_async_queue_try_pop (async_queue); +} diff --git a/common/mq-mgr.h b/common/mq-mgr.h new file mode 100644 index 0000000..c845cc9 --- /dev/null +++ b/common/mq-mgr.h @@ -0,0 +1,22 @@ +#ifndef SEAF_MQ_MANAGER_H +#define SEAF_MQ_MANAGER_H + +#define SEAFILE_SERVER_CHANNEL_EVENT "seaf_server.event" +#define SEAFILE_SERVER_CHANNEL_STATS "seaf_server.stats" + +struct SeafMqManagerPriv; + +typedef struct SeafMqManager { + struct SeafMqManagerPriv *priv; +} SeafMqManager; + +SeafMqManager * +seaf_mq_manager_new (); + +int +publish_event (SeafMqManager *mgr, const char *channel, const char *content); + +char * +pop_event (SeafMqManager *mgr, const char *channel); + +#endif diff --git a/common/rpc-service.c b/common/rpc-service.c index df77693..d0324b9 100644 --- a/common/rpc-service.c +++ b/common/rpc-service.c @@ -14,6 +14,7 @@ #include "repo-mgr.h" #include "seafile-error.h" #include "seafile-rpc.h" +#include "mq-mgr.h" #ifdef SEAFILE_SERVER #include "web-accesstoken-mgr.h" @@ -233,6 +234,31 @@ seafile_restore_repo_from_trash (const char *repo_id, GError **error) return ret; } + +int +seafile_publish_event(const char *channel, const char *content, GError **error) +{ + int ret = 0; + + if (!channel || !content) { + g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Argument should not be null"); + return -1; + } + + ret = publish_event (seaf->mq_mgr, channel, content); + + return ret; +} + +char* +seafile_pop_event(const char *channel, GError **error) +{ + if (!channel) { + g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Argument should not be null"); + return NULL; + } + return pop_event (seaf->mq_mgr, channel); +} #endif GList* diff --git a/include/seafile-rpc.h b/include/seafile-rpc.h index 9214599..8b280bc 100644 --- a/include/seafile-rpc.h +++ b/include/seafile-rpc.h @@ -1122,4 +1122,9 @@ seafile_set_repo_status(const char *repo_id, int status, GError **error); int seafile_get_repo_status(const char *repo_id, GError **error); +int +seafile_publish_event(const char *channel, const char *content, GError **error); + +char* +seafile_pop_event(const char *channel, GError **error); #endif diff --git a/python/seafile/rpcclient.py b/python/seafile/rpcclient.py index 8a56ac9..489c0fe 100644 --- a/python/seafile/rpcclient.py +++ b/python/seafile/rpcclient.py @@ -789,3 +789,13 @@ class SeafServerThreadedRpcClient(NamedPipeClient): @searpc_func("int", ["string"]) def cancel_copy_task(task_id): pass + + # event + @searpc_func("int", ["string", "string"]) + def publish_event(channel, content): + pass + + @searpc_func("string", ["string"]) + def pop_event(channel): + pass + diff --git a/python/seaserv/api.py b/python/seaserv/api.py index eff7817..20ee546 100644 --- a/python/seaserv/api.py +++ b/python/seaserv/api.py @@ -827,6 +827,12 @@ class SeafileAPI(object): def convert_repo_path(self, repo_id, path, user, is_org=False): return seafserv_threaded_rpc.convert_repo_path(repo_id, path, user, 1 if is_org else 0) + def publish_event(self, channel, content): + return seafserv_threaded_rpc.publish_event(channel, content) + + def pop_event(self, channel): + return seafserv_threaded_rpc.pop_event(channel) + seafile_api = SeafileAPI() class CcnetAPI(object): diff --git a/server/Makefile.am b/server/Makefile.am index 3c16306..8ed7584 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -63,6 +63,7 @@ seaf_server_SOURCES = \ ../common/obj-backend-fs.c \ ../common/seafile-crypt.c \ ../common/diff-simple.c \ + ../common/mq-mgr.c \ ../common/block-mgr.c \ ../common/block-backend.c \ ../common/block-backend-fs.c \ diff --git a/server/http-server.c b/server/http-server.c index 0e38496..481a6ad 100644 --- a/server/http-server.c +++ b/server/http-server.c @@ -14,6 +14,7 @@ #include +#include "mq-mgr.h" #include "utils.h" #include "log.h" #include "http-server.h" @@ -62,9 +63,6 @@ struct _HttpServer { GHashTable *vir_repo_info_cache; pthread_mutex_t vir_repo_info_cache_lock; - /* uint32_t cevent_id; /\* Used for sending activity events. *\/ */ - /* uint32_t stats_event_id; /\* Used for sending events for statistics. *\/ */ - event_t *reap_timer; }; typedef struct _HttpServer HttpServer; @@ -497,7 +495,7 @@ typedef struct { char *client_name; } RepoEventData; -/* + static void free_repo_event_data (RepoEventData *data) { @@ -525,77 +523,71 @@ free_stats_event_data (StatsEventData *data) } static void -publish_repo_event (CEvent *event, void *data) +publish_repo_event (RepoEventData *rdata) { - RepoEventData *rdata = event->data; - GString *buf = g_string_new (NULL); g_string_printf (buf, "%s\t%s\t%s\t%s\t%s\t%s", rdata->etype, rdata->user, rdata->ip, rdata->client_name ? rdata->client_name : "", rdata->repo_id, rdata->path ? rdata->path : "/"); - seaf_mq_manager_publish_event (seaf->mq_mgr, buf->str); + publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_EVENT, buf->str); g_string_free (buf, TRUE); free_repo_event_data (rdata); } static void -publish_stats_event (CEvent *event, void *data) +publish_stats_event (StatsEventData *rdata) { - StatsEventData *rdata = event->data; - GString *buf = g_string_new (NULL); g_string_printf (buf, "%s\t%s\t%s\t%"G_GUINT64_FORMAT, rdata->etype, rdata->user, rdata->repo_id, rdata->bytes); - seaf_mq_manager_publish_stats_event (seaf->mq_mgr, buf->str); + publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_STATS, buf->str); g_string_free (buf, TRUE); free_stats_event_data (rdata); } -*/ static void on_repo_oper (HttpServer *htp_server, const char *etype, const char *repo_id, char *user, char *ip, char *client_name) { - /* RepoEventData *rdata = g_new0 (RepoEventData, 1); */ - /* SeafVirtRepo *vinfo = seaf_repo_manager_get_virtual_repo_info (seaf->repo_mgr, */ - /* repo_id); */ + RepoEventData *rdata = g_new0 (RepoEventData, 1); + SeafVirtRepo *vinfo = seaf_repo_manager_get_virtual_repo_info (seaf->repo_mgr, + repo_id); - /* if (vinfo) { */ - /* memcpy (rdata->repo_id, vinfo->origin_repo_id, 36); */ - /* rdata->path = g_strdup(vinfo->path); */ - /* } else */ - /* memcpy (rdata->repo_id, repo_id, 36); */ - /* rdata->etype = g_strdup (etype); */ - /* rdata->user = g_strdup (user); */ - /* rdata->ip = g_strdup (ip); */ - /* rdata->client_name = g_strdup(client_name); */ + if (vinfo) { + memcpy (rdata->repo_id, vinfo->origin_repo_id, 36); + rdata->path = g_strdup(vinfo->path); + } else + memcpy (rdata->repo_id, repo_id, 36); + rdata->etype = g_strdup (etype); + rdata->user = g_strdup (user); + rdata->ip = g_strdup (ip); + rdata->client_name = g_strdup(client_name); - /* cevent_manager_add_event (seaf->ev_mgr, htp_server->cevent_id, rdata); */ - - /* if (vinfo) { */ - /* g_free (vinfo->path); */ - /* g_free (vinfo); */ - /* } */ + publish_repo_event(rdata); + if (vinfo) { + g_free (vinfo->path); + g_free (vinfo); + } return; } void send_statistic_msg (const char *repo_id, char *user, char *operation, guint64 bytes) { - /* StatsEventData *rdata = g_new0 (StatsEventData, 1); */ + StatsEventData *rdata = g_new0 (StatsEventData, 1); - /* memcpy (rdata->repo_id, repo_id, 36); */ - /* rdata->etype = g_strdup (operation); */ - /* rdata->user = g_strdup (user); */ - /* rdata->bytes = bytes; */ + memcpy (rdata->repo_id, repo_id, 36); + rdata->etype = g_strdup (operation); + rdata->user = g_strdup (user); + rdata->bytes = bytes; - /* cevent_manager_add_event (seaf->ev_mgr, seaf->http_server->priv->stats_event_id, rdata); */ + publish_stats_event(rdata); return; } @@ -2284,16 +2276,6 @@ seaf_http_server_new (struct _SeafileSession *session) int seaf_http_server_start (HttpServerStruct *server) { -/* - server->priv->cevent_id = cevent_manager_register (seaf->ev_mgr, - (cevent_handler)publish_repo_event, - NULL); - - server->priv->stats_event_id = cevent_manager_register (seaf->ev_mgr, - (cevent_handler)publish_stats_event, - NULL); -*/ - int ret = pthread_create (&server->priv->thread_id, NULL, http_server_run, server); if (ret != 0) return -1; diff --git a/server/seaf-server.c b/server/seaf-server.c index 05ea72f..cb26509 100644 --- a/server/seaf-server.c +++ b/server/seaf-server.c @@ -637,6 +637,18 @@ static void start_rpc_service (int cloud_mode, char *seafile_dir) "seafile_get_file_id_by_commit_and_path", searpc_signature_string__string_string_string()); + /* event */ + searpc_server_register_function ("seafserv-threaded-rpcserver", + seafile_publish_event, + "publish_event", + searpc_signature_int__string_string()); + + searpc_server_register_function ("seafserv-threaded-rpcserver", + seafile_pop_event, + "pop_event", + searpc_signature_string__string()); + + if (!cloud_mode) { searpc_server_register_function ("seafserv-threaded-rpcserver", seafile_set_inner_pub_repo, diff --git a/server/seafile-session.c b/server/seafile-session.c index 44564ed..6d7351d 100644 --- a/server/seafile-session.c +++ b/server/seafile-session.c @@ -16,11 +16,11 @@ #include -#include #include "utils.h" #include "seafile-session.h" +#include "mq-mgr.h" #include "seaf-db.h" #include "seaf-utils.h" @@ -139,8 +139,8 @@ seafile_session_new(const char *central_config_dir, session->size_sched = size_scheduler_new (session); - session->ev_mgr = cevent_manager_new (); - if (!session->ev_mgr) + session->mq_mgr = seaf_mq_manager_new (); + if (!session->mq_mgr) goto onerror; session->http_server = seaf_http_server_new (session); @@ -201,11 +201,6 @@ seafile_session_init (SeafileSession *session) int seafile_session_start (SeafileSession *session) { - if (cevent_manager_start (session->ev_mgr) < 0) { - seaf_warning ("Failed to start event manager.\n"); - return -1; - } - if (seaf_share_manager_start (session->share_mgr) < 0) { seaf_warning ("Failed to start share manager.\n"); return -1; diff --git a/server/seafile-session.h b/server/seafile-session.h index 32b8a12..6204e78 100644 --- a/server/seafile-session.h +++ b/server/seafile-session.h @@ -14,6 +14,7 @@ #include "repo-mgr.h" #include "db.h" #include "seaf-db.h" +#include "mq-mgr.h" #include "share-mgr.h" #include "web-accesstoken-mgr.h" @@ -56,7 +57,7 @@ struct _SeafileSession { SeafWebAccessTokenManager *web_at_mgr; - CEventManager *ev_mgr; + SeafMqManager *mq_mgr; CcnetJobManager *job_mgr; SizeScheduler *size_sched;