1
0
mirror of https://github.com/haiwen/seafile-server.git synced 2025-09-04 00:44:21 +00:00

Send events use json format (#726)

Co-authored-by: 杨赫然 <heran.yang@seafile.com>
This commit is contained in:
feiniks
2024-12-13 11:01:24 +08:00
committed by GitHub
parent d04c324b24
commit c11e8ccbe9
3 changed files with 83 additions and 31 deletions

View File

@@ -320,11 +320,18 @@ get_commit_id (SeafDBRow *row, void *data)
static void static void
publish_repo_update_event (const char *repo_id, const char *commit_id) publish_repo_update_event (const char *repo_id, const char *commit_id)
{ {
char buf[128]; json_t *msg = json_object ();
snprintf (buf, sizeof(buf), "repo-update\t%s\t%s", char *msg_str = NULL;
repo_id, commit_id);
seaf_mq_manager_publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_EVENT, buf); json_object_set_new (msg, "msg_type", json_string("repo-update"));
json_object_set_new (msg, "repo_id", json_string(repo_id));
json_object_set_new (msg, "commit_id", json_string(commit_id));
msg_str = json_dumps (msg, JSON_PRESERVE_ORDER);
seaf_mq_manager_publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_EVENT, msg_str);
g_free (msg_str);
json_decref (msg);
} }
static void static void

View File

@@ -81,7 +81,7 @@ type repoEventData struct {
clientName string clientName string
} }
type statusEventData struct { type statsEventData struct {
eType string eType string
user string user string
repoID string repoID string
@@ -904,16 +904,23 @@ func getRepoStoreID(repoID string) (string, error) {
} }
func sendStatisticMsg(repoID, user, operation string, bytes uint64) { func sendStatisticMsg(repoID, user, operation string, bytes uint64) {
rData := &statusEventData{operation, user, repoID, bytes} rData := &statsEventData{operation, user, repoID, bytes}
publishStatusEvent(rData) publishStatsEvent(rData)
} }
func publishStatusEvent(rData *statusEventData) { func publishStatsEvent(rData *statsEventData) {
buf := fmt.Sprintf("%s\t%s\t%s\t%d", data := make(map[string]interface{})
rData.eType, rData.user, data["msg_type"] = rData.eType
rData.repoID, rData.bytes) data["user_name"] = rData.user
if _, err := rpcclient.Call("publish_event", seafileServerChannelStats, buf); err != nil { data["repo_id"] = rData.repoID
data["bytes"] = rData.bytes
jsonData, err := json.Marshal(data)
if err != nil {
log.Warnf("Failed to publish event: %v", err)
return
}
if _, err := rpcclient.Call("publish_event", seafileServerChannelStats, string(jsonData)); err != nil {
log.Warnf("Failed to publish event: %v", err) log.Warnf("Failed to publish event: %v", err)
} }
} }
@@ -1374,17 +1381,34 @@ func publishRepoEvent(rData *repoEventData) {
if rData.path == "" { if rData.path == "" {
rData.path = "/" rData.path = "/"
} }
buf := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%s", data := make(map[string]interface{})
rData.eType, rData.user, rData.ip, data["msg_type"] = rData.eType
rData.clientName, rData.repoID, rData.path) data["user_name"] = rData.user
if _, err := rpcclient.Call("publish_event", seafileServerChannelEvent, buf); err != nil { data["ip"] = rData.ip
data["user_agent"] = rData.clientName
data["repo_id"] = rData.repoID
data["file_path"] = rData.path
jsonData, err := json.Marshal(data)
if err != nil {
log.Warnf("Failed to publish event: %v", err)
return
}
if _, err := rpcclient.Call("publish_event", seafileServerChannelEvent, string(jsonData)); err != nil {
log.Warnf("Failed to publish event: %v", err) log.Warnf("Failed to publish event: %v", err)
} }
} }
func publishUpdateEvent(repoID string, commitID string) { func publishUpdateEvent(repoID string, commitID string) {
buf := fmt.Sprintf("repo-update\t%s\t%s", repoID, commitID) data := make(map[string]interface{})
if _, err := rpcclient.Call("publish_event", seafileServerChannelEvent, buf); err != nil { data["msg_type"] = "repo-update"
data["repo_id"] = repoID
data["commit_id"] = commitID
jsonData, err := json.Marshal(data)
if err != nil {
log.Warnf("Failed to publish event: %v", err)
return
}
if _, err := rpcclient.Call("publish_event", seafileServerChannelEvent, string(jsonData)); err != nil {
log.Warnf("Failed to publish event: %v", err) log.Warnf("Failed to publish event: %v", err)
} }
} }

View File

@@ -541,28 +541,49 @@ free_stats_event_data (StatsEventData *data)
static void static void
publish_repo_event (RepoEventData *rdata) publish_repo_event (RepoEventData *rdata)
{ {
GString *buf = g_string_new (NULL); json_t *msg = json_object ();
g_string_printf (buf, "%s\t%s\t%s\t%s\t%s\t%s", char *msg_str = NULL;
rdata->etype, rdata->user, rdata->ip,
rdata->client_name ? rdata->client_name : "",
rdata->repo_id, rdata->path ? rdata->path : "/");
seaf_mq_manager_publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_EVENT, buf->str); json_object_set_new (msg, "msg_type", json_string(rdata->etype));
json_object_set_new (msg, "user_name", json_string(rdata->user));
json_object_set_new (msg, "ip", json_string(rdata->ip));
if (rdata->client_name) {
json_object_set_new (msg, "user_agent", json_string(rdata->client_name));
} else {
json_object_set_new (msg, "user_agent", json_string(""));
}
json_object_set_new (msg, "repo_id", json_string(rdata->repo_id));
if (rdata->path) {
json_object_set_new (msg, "file_path", json_string(rdata->path));
} else {
json_object_set_new (msg, "file_path", json_string("/"));
}
g_string_free (buf, TRUE); msg_str = json_dumps (msg, JSON_PRESERVE_ORDER);
seaf_mq_manager_publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_EVENT, msg_str);
g_free (msg_str);
json_decref (msg);
} }
static void static void
publish_stats_event (StatsEventData *rdata) publish_stats_event (StatsEventData *rdata)
{ {
GString *buf = g_string_new (NULL); json_t *msg = json_object ();
g_string_printf (buf, "%s\t%s\t%s\t%"G_GUINT64_FORMAT, char *msg_str = NULL;
rdata->etype, rdata->user,
rdata->repo_id, rdata->bytes);
seaf_mq_manager_publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_STATS, buf->str); json_object_set_new (msg, "msg_type", json_string(rdata->etype));
json_object_set_new (msg, "user_name", json_string(rdata->user));
json_object_set_new (msg, "repo_id", json_string(rdata->repo_id));
json_object_set_new (msg, "bytes", json_integer(rdata->bytes));
g_string_free (buf, TRUE); msg_str = json_dumps (msg, JSON_PRESERVE_ORDER);
seaf_mq_manager_publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_STATS, msg_str);
g_free (msg_str);
json_decref (msg);
} }
static void static void