diff --git a/common/obj-cache.c b/common/obj-cache.c index 679b975..503e77f 100644 --- a/common/obj-cache.c +++ b/common/obj-cache.c @@ -187,3 +187,11 @@ objcache_publish (ObjCache *cache, const char *channel, const char *msg) ret = cache->publish (cache, channel, msg); return ret; } + +int +objcache_push (ObjCache *cache, const char *list, const char *msg) +{ + int ret; + ret = cache->push (cache, list, msg); + return ret; +} diff --git a/common/obj-cache.h b/common/obj-cache.h index 4bf35b8..42c2af7 100644 --- a/common/obj-cache.h +++ b/common/obj-cache.h @@ -30,6 +30,10 @@ struct ObjCache { const char *channel, const char *msg); + int (*push) (ObjCache *cache, + const char *list, + const char *msg); + int mc_expiry; char *host; int port; @@ -69,4 +73,7 @@ objcache_delete_object_existence (struct ObjCache *cache, const char *obj_id, co int objcache_publish (ObjCache *cache, const char *channel, const char *msg); +int +objcache_push (ObjCache *cache, const char *list, const char *msg); + #endif diff --git a/common/redis-cache.c b/common/redis-cache.c index 80eb000..6984a6a 100644 --- a/common/redis-cache.c +++ b/common/redis-cache.c @@ -355,7 +355,7 @@ redis_cache_publish (ObjCache *cache, const char *channel, const char *msg) reply = redisCommand(conn->ac, "PUBLISH %s %s", channel, msg); if (!reply) { - seaf_warning ("Failed to publish metrics to redis channel.\n"); + seaf_warning ("Failed to publish message to redis channel %s.\n", channel); ret = -1; conn->release = TRUE; goto out; @@ -364,7 +364,45 @@ redis_cache_publish (ObjCache *cache, const char *channel, const char *msg) reply->integer < 0) { if (reply->type == REDIS_REPLY_ERROR) { conn->release = TRUE; - seaf_warning ("Failed to publish metrics to redis channel.\n"); + seaf_warning ("Failed to publish message to redis channel %s.\n", channel); + } + ret = -1; + } + +out: + freeReplyObject(reply); + redis_connection_pool_return_connection (pool, conn); + + return ret; +} + +int +redis_cache_push (ObjCache *cache, const char *list, const char *msg) +{ + RedisConnection *conn; + redisReply *reply; + int ret = 0; + RedisPriv *priv = cache->priv; + RedisConnectionPool *pool = priv->redis_pool; + + conn = redis_connection_pool_get_connection (pool, priv->passwd); + if (!conn) { + seaf_warning ("Failed to get redis connection to host %s.\n", cache->host); + return -1; + } + + reply = redisCommand(conn->ac, "LPUSH %s %s", list, msg); + if (!reply) { + seaf_warning ("Failed to push message to redis list %s.\n", list); + ret = -1; + conn->release = TRUE; + goto out; + } + if (reply->type != REDIS_REPLY_INTEGER || + reply->integer < 0) { + if (reply->type == REDIS_REPLY_ERROR) { + conn->release = TRUE; + seaf_warning ("Failed to push message to redis list %s.\n", list); } ret = -1; } @@ -399,6 +437,7 @@ redis_cache_new (const char *host, const char *passwd, cache->test_object = redis_cache_test_object; cache->delete_object = redis_cache_delete_object; cache->publish = redis_cache_publish; + cache->push = redis_cache_push; return cache; } diff --git a/fileserver/size_sched.go b/fileserver/size_sched.go index 1b47ef4..23ba6d7 100644 --- a/fileserver/size_sched.go +++ b/fileserver/size_sched.go @@ -2,23 +2,32 @@ package main import ( "context" + "encoding/json" "fmt" "path/filepath" + "time" "gopkg.in/ini.v1" "database/sql" + "github.com/go-redis/redis/v8" "github.com/haiwen/seafile-server/fileserver/commitmgr" "github.com/haiwen/seafile-server/fileserver/diff" "github.com/haiwen/seafile-server/fileserver/fsmgr" "github.com/haiwen/seafile-server/fileserver/option" "github.com/haiwen/seafile-server/fileserver/repomgr" "github.com/haiwen/seafile-server/fileserver/workerpool" + log "github.com/sirupsen/logrus" ) +const ( + RepoSizeList = "repo_size_task" +) + var updateSizePool *workerpool.WorkPool +var redisClient *redis.Client func sizeSchedulerInit() { var n int = 1 @@ -41,6 +50,16 @@ func sizeSchedulerInit() { } } updateSizePool = workerpool.CreateWorkerPool(computeRepoSize, n) + + server := fmt.Sprintf("%s:%d", option.RedisHost, option.RedisPort) + opt := &redis.Options{ + Addr: server, + Password: option.RedisPasswd, + } + opt.PoolSize = n + + redisClient = redis.NewClient(opt) + } func computeRepoSize(args ...interface{}) error { @@ -119,6 +138,11 @@ func computeRepoSize(args ...interface{}) error { return err } + err = notifyRepoSizeChange(repo.StoreID) + if err != nil { + log.Warnf("Failed to notify repo size change for repo %s: %v", repoID, err) + } + return nil } @@ -189,6 +213,33 @@ func setRepoSizeAndFileCount(repoID, newHeadID string, size, fileCount int64) er return nil } +type RepoSizeChangeTask struct { + RepoID string `json:"repo_id"` +} + +func notifyRepoSizeChange(repoID string) error { + if !option.HasRedisOptions { + return nil + } + + task := &RepoSizeChangeTask{RepoID: repoID} + + data, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to encode repo size change task: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = redisClient.LPush(ctx, RepoSizeList, data).Err() + if err != nil { + return fmt.Errorf("failed to push message to redis list %s: %w", RepoSizeList, err) + } + + return nil +} + // RepoInfo contains repo information. type RepoInfo struct { HeadID string diff --git a/server/size-sched.c b/server/size-sched.c index 6480743..547e3fe 100644 --- a/server/size-sched.c +++ b/server/size-sched.c @@ -7,10 +7,14 @@ #include "diff-simple.h" #define DEBUG_FLAG SEAFILE_DEBUG_OTHER #include "log.h" +#include "obj-cache.h" + +#define REPO_SIZE_LIST "repo_size_task" typedef struct SizeSchedulerPriv { pthread_t thread_id; GThreadPool *compute_repo_size_thread_pool; + struct ObjCache *cache; } SizeSchedulerPriv; typedef struct RepoSizeJob { @@ -49,6 +53,8 @@ size_scheduler_new (SeafileSession *session) return NULL; } + sched->priv->cache = session->obj_cache; + sched->seaf = session; sched_thread_num = g_key_file_get_integer (session->config, "scheduler", "size_sched_thread_num", NULL); @@ -282,6 +288,30 @@ get_old_repo_info_from_db (SeafDB *db, const char *repo_id, gboolean *is_db_err) } +static void +notify_repo_size_change (SizeScheduler *sched, const char *repo_id) +{ + ObjCache *cache = sched->priv->cache; + if (!cache) { + return; + } + + json_t *obj = NULL; + char *msg = NULL; + + obj = json_object (); + + json_object_set_new (obj, "repo_id", json_string(repo_id)); + + msg = json_dumps (obj, JSON_COMPACT); + + objcache_push (cache, REPO_SIZE_LIST, msg); + +out: + g_free (msg); + json_decref (obj); +} + static void* compute_repo_size (void *vjob) { @@ -378,6 +408,8 @@ compute_repo_size (void *vjob) goto out; } + notify_repo_size_change (sched, repo->store_id); + out: seaf_repo_unref (repo); seaf_commit_unref (head);