1
0
mirror of https://github.com/haiwen/seafile-server.git synced 2025-04-27 19:15:07 +00:00
seafile-server/common/redis-cache.c
feiniks 05dd35b4c9
Notify repo size change to redis (#750)
Co-authored-by: Heran Yang <heran.yang@seafile.com>
2025-04-02 17:11:24 +08:00

444 lines
12 KiB
C

/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "common.h"
#include <hiredis.h>
#include "redis-cache.h"
#define DEBUG_FLAG SEAFILE_DEBUG_OTHER
#include "log.h"
struct _RedisConnectionPool {
char *host;
int port;
GPtrArray *connections;
pthread_mutex_t lock;
int max_connections;
};
typedef struct _RedisConnectionPool RedisConnectionPool;
struct _RedisConnection {
gboolean is_available;
redisContext *ac;
gint64 ctime; /* Used to clean up unused connection. */
gboolean release; /* If TRUE, the connection will be released. */
};
typedef struct _RedisConnection RedisConnection;
typedef struct RedisPriv {
RedisConnectionPool *redis_pool;
char *passwd;
} RedisPriv;
static int
redis_auth (RedisConnection *conn, const char *passwd)
{
redisReply *reply;
int ret = 0;
if (!passwd) {
return 0;
}
reply = redisCommand(conn->ac, "AUTH %s", passwd);
if (!reply) {
seaf_warning ("Failed to auth redis server.\n");
ret = -1;
goto out;
}
if (reply->type != REDIS_REPLY_STATUS ||
g_strcmp0 (reply->str, "OK") != 0) {
if (reply->type == REDIS_REPLY_ERROR) {
seaf_warning ("Failed to auth redis: %s.\n", reply->str);
}
ret = -1;
goto out;
}
out:
freeReplyObject (reply);
return ret;
}
static RedisConnection *
redis_connection_new (const char *host, const char *passwd, int port)
{
RedisConnection *conn = g_new0 (RedisConnection, 1);
conn->ac = redisConnect(host, port);
if (!conn->ac || conn->ac->err) {
if (conn->ac) {
seaf_warning ("Failed to connect to redis : %s\n", conn->ac->errstr);
redisFree (conn->ac);
} else {
seaf_warning ("Can't allocate redis context\n");
}
g_free (conn);
return NULL;
}
if (redis_auth (conn, passwd) < 0) {
redisFree (conn->ac);
g_free (conn);
return NULL;
}
conn->ctime = (gint64)time(NULL);
return conn;
}
static void
redis_connection_free (RedisConnection *conn)
{
if (!conn)
return;
if (conn->ac)
redisFree(conn->ac);
g_free (conn);
}
static RedisConnectionPool *
redis_connection_pool_new (const char *host, int port, int max_connections)
{
RedisConnectionPool *pool = g_new0 (RedisConnectionPool, 1);
pool->host = g_strdup(host);
pool->port = port;
pool->connections = g_ptr_array_sized_new (max_connections);
pool->max_connections = max_connections;
pthread_mutex_init (&pool->lock, NULL);
return pool;
}
static RedisConnection *
redis_connection_pool_get_connection (RedisConnectionPool *pool, const char *passwd)
{
RedisConnection *conn = NULL;
if (pool->max_connections == 0) {
conn = redis_connection_new (pool->host, passwd, pool->port);
return conn;
}
pthread_mutex_lock (&pool->lock);
guint i, size = pool->connections->len;
for (i = 0; i < size; ++i) {
conn = g_ptr_array_index (pool->connections, i);
if (!conn->is_available) {
continue;
}
conn->is_available = FALSE;
goto out;
}
conn = NULL;
if (size < pool->max_connections) {
conn = redis_connection_new (pool->host, passwd, pool->port);
if (conn) {
conn->is_available = FALSE;
g_ptr_array_add (pool->connections, conn);
}
} else {
seaf_warning ("The number of redis connections exceeds the limit. The maximum connections is %d.\n", pool->max_connections);
}
out:
pthread_mutex_unlock (&pool->lock);
return conn;
}
static void
redis_connection_pool_return_connection (RedisConnectionPool *pool, RedisConnection *conn)
{
if (!conn)
return;
if (pool->max_connections == 0) {
redis_connection_free (conn);
return;
}
if (conn->release) {
pthread_mutex_lock (&pool->lock);
g_ptr_array_remove (pool->connections, conn);
pthread_mutex_unlock (&pool->lock);
redis_connection_free (conn);
return;
}
pthread_mutex_lock (&pool->lock);
conn->is_available = TRUE;
pthread_mutex_unlock (&pool->lock);
}
void *
redis_cache_get_object (ObjCache *cache, const char *obj_id, size_t *len)
{
RedisConnection *conn;
char *object = NULL;
redisReply *reply;
RedisPriv *priv = cache->priv;
RedisConnectionPool *pool = priv->redis_pool;
conn = redis_connection_pool_get_connection (pool, priv->passwd);
if (!conn) {
seaf_warning ("Failed to get redis connection to host %s.\n", cache->host);
return NULL;
}
reply = redisCommand(conn->ac, "GET %s", obj_id);
if (!reply) {
seaf_warning ("Failed to get object %s from redis cache.\n", obj_id);
conn->release = TRUE;
goto out;
}
if (reply->type != REDIS_REPLY_STRING) {
if (reply->type == REDIS_REPLY_ERROR) {
conn->release = TRUE;
seaf_warning ("Failed to get %s from redis cache: %s.\n",
obj_id, reply->str);
}
goto out;
}
*len = reply->len;
object = g_memdup (reply->str, reply->len);
out:
freeReplyObject(reply);
redis_connection_pool_return_connection (pool, conn);
return object;
}
int
redis_cache_set_object (ObjCache *cache,
const char *obj_id,
const void *object,
int len,
int expiry)
{
RedisConnection *conn;
redisReply *reply;
int ret = 0;
RedisPriv *priv = cache->priv;
RedisConnectionPool *pool = priv->redis_pool;
conn = redis_connection_pool_get_connection (pool, priv->passwd);
if (!conn) {
seaf_warning ("Failed to get redis connection to host %s.\n", cache->host);
return -1;
}
if (expiry <= 0)
expiry = cache->mc_expiry;
reply = redisCommand(conn->ac, "SET %s %b EX %d", obj_id, object, len, expiry);
if (!reply) {
seaf_warning ("Failed to set object %s to redis cache.\n", obj_id);
ret = -1;
conn->release = TRUE;
goto out;
}
if (reply->type != REDIS_REPLY_STATUS ||
g_strcmp0 (reply->str, "OK") != 0) {
if (reply->type == REDIS_REPLY_ERROR) {
conn->release = TRUE;
seaf_warning ("Failed to set %s to redis: %s.\n",
obj_id, reply->str);
}
ret = -1;
}
out:
freeReplyObject(reply);
redis_connection_pool_return_connection (pool, conn);
return ret;
}
gboolean
redis_cache_test_object (ObjCache *cache, const char *obj_id)
{
RedisConnection *conn;
redisReply *reply;
gboolean ret = FALSE;
RedisPriv *priv = cache->priv;
RedisConnectionPool *pool = priv->redis_pool;
conn = redis_connection_pool_get_connection (pool, priv->passwd);
if (!conn) {
seaf_warning ("Failed to get redis connection to host %s.\n", cache->host);
return ret;
}
reply = redisCommand(conn->ac, "EXISTS %s", obj_id);
if (!reply) {
seaf_warning ("Failed to test object %s from redis cache.\n", obj_id);
conn->release = TRUE;
goto out;
}
if (reply->type != REDIS_REPLY_INTEGER ||
reply->integer != 1) {
if (reply->type == REDIS_REPLY_ERROR) {
conn->release = TRUE;
seaf_warning ("Failed to test %s from redis: %s.\n",
obj_id, reply->str);
}
goto out;
}
ret = TRUE;
out:
freeReplyObject(reply);
redis_connection_pool_return_connection (pool, conn);
return ret;
}
int
redis_cache_delete_object (ObjCache *cache, const char *obj_id)
{
RedisConnection *conn;
redisReply *reply;
int ret = 0;
RedisPriv *priv = cache->priv;
RedisConnectionPool *pool = priv->redis_pool;
conn = redis_connection_pool_get_connection (pool, priv->passwd);
if (!conn) {
seaf_warning ("Failed to get redis connection to host %s.\n", cache->host);
return -1;
}
reply = redisCommand(conn->ac, "DEL %s", obj_id);
if (!reply) {
seaf_warning ("Failed to delete object %s from redis cache.\n", obj_id);
ret = -1;
conn->release = TRUE;
goto out;
}
if (reply->type != REDIS_REPLY_INTEGER ||
reply->integer != 1) {
if (reply->type == REDIS_REPLY_ERROR) {
conn->release = TRUE;
seaf_warning ("Failed to del %s from redis: %s.\n",
obj_id, reply->str);
}
ret = -1;
}
out:
freeReplyObject(reply);
redis_connection_pool_return_connection (pool, conn);
return ret;
}
int
redis_cache_publish (ObjCache *cache, const char *channel, const char *msg)
{
RedisConnection *conn;
redisReply *reply;
int ret = 0;
RedisPriv *priv = cache->priv;
RedisConnectionPool *pool = priv->redis_pool;
conn = redis_connection_pool_get_connection (pool, priv->passwd);
if (!conn) {
seaf_warning ("Failed to get redis connection to host %s.\n", cache->host);
return -1;
}
reply = redisCommand(conn->ac, "PUBLISH %s %s", channel, msg);
if (!reply) {
seaf_warning ("Failed to publish message to redis channel %s.\n", channel);
ret = -1;
conn->release = TRUE;
goto out;
}
if (reply->type != REDIS_REPLY_INTEGER ||
reply->integer < 0) {
if (reply->type == REDIS_REPLY_ERROR) {
conn->release = TRUE;
seaf_warning ("Failed to publish message to redis channel %s.\n", channel);
}
ret = -1;
}
out:
freeReplyObject(reply);
redis_connection_pool_return_connection (pool, conn);
return ret;
}
int
redis_cache_push (ObjCache *cache, const char *list, const char *msg)
{
RedisConnection *conn;
redisReply *reply;
int ret = 0;
RedisPriv *priv = cache->priv;
RedisConnectionPool *pool = priv->redis_pool;
conn = redis_connection_pool_get_connection (pool, priv->passwd);
if (!conn) {
seaf_warning ("Failed to get redis connection to host %s.\n", cache->host);
return -1;
}
reply = redisCommand(conn->ac, "LPUSH %s %s", list, msg);
if (!reply) {
seaf_warning ("Failed to push message to redis list %s.\n", list);
ret = -1;
conn->release = TRUE;
goto out;
}
if (reply->type != REDIS_REPLY_INTEGER ||
reply->integer < 0) {
if (reply->type == REDIS_REPLY_ERROR) {
conn->release = TRUE;
seaf_warning ("Failed to push message to redis list %s.\n", list);
}
ret = -1;
}
out:
freeReplyObject(reply);
redis_connection_pool_return_connection (pool, conn);
return ret;
}
ObjCache *
redis_cache_new (const char *host, const char *passwd,
int port, int redis_expiry,
int max_connections)
{
ObjCache *cache = g_new0 (ObjCache, 1);
RedisPriv *priv = g_new0 (RedisPriv, 1);
priv->redis_pool = redis_connection_pool_new (host, port, max_connections);
cache->priv = priv;
cache->host = g_strdup (host);
priv->passwd = g_strdup (passwd);
cache->port = port;
cache->mc_expiry = redis_expiry;
cache->cache_type = TYPE_REDIS;
cache->get_object = redis_cache_get_object;
cache->set_object = redis_cache_set_object;
cache->test_object = redis_cache_test_object;
cache->delete_object = redis_cache_delete_object;
cache->publish = redis_cache_publish;
cache->push = redis_cache_push;
return cache;
}