From 1aefa7a85f2ece8b4f6e019f37da3b286a1527d4 Mon Sep 17 00:00:00 2001 From: wangruidong <940853815@qq.com> Date: Tue, 22 Apr 2025 15:51:59 +0800 Subject: [PATCH 1/2] perf: update max_connections to use configurable setting in Redis configuration --- apps/jumpserver/conf.py | 1 + apps/jumpserver/settings/base.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/jumpserver/conf.py b/apps/jumpserver/conf.py index 597ca3872..654b6fcef 100644 --- a/apps/jumpserver/conf.py +++ b/apps/jumpserver/conf.py @@ -204,6 +204,7 @@ class Config(dict): 'REDIS_SSL_CERT': None, 'REDIS_SSL_CA': None, 'REDIS_SSL_REQUIRED': 'none', + 'REDIS_MAX_CONNECTIONS': 100, # Redis Sentinel 'REDIS_SENTINEL_HOSTS': '', 'REDIS_SENTINEL_PASSWORD': '', diff --git a/apps/jumpserver/settings/base.py b/apps/jumpserver/settings/base.py index 00bb729c4..a336536b2 100644 --- a/apps/jumpserver/settings/base.py +++ b/apps/jumpserver/settings/base.py @@ -364,7 +364,7 @@ REDIS_OPTIONS = { "health_check_interval": 30 }, "CONNECTION_POOL_KWARGS": { - 'max_connections': 100, + 'max_connections': CONFIG.REDIS_MAX_CONNECTIONS, } } if REDIS_USE_SSL: From 51badfebe771c434e59bd9ca0abc938e3a1ecd6c Mon Sep 17 00:00:00 2001 From: wangruidong <940853815@qq.com> Date: Wed, 23 Apr 2025 18:00:09 +0800 Subject: [PATCH 2/2] perf: Optimize redis connection number --- apps/common/utils/connection.py | 56 ++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/apps/common/utils/connection.py b/apps/common/utils/connection.py index 73d845ad8..ca58f8292 100644 --- a/apps/common/utils/connection.py +++ b/apps/common/utils/connection.py @@ -4,7 +4,6 @@ import time import redis from django.core.cache import cache -from redis.client import PubSub from common.db.utils import safe_db_connection from common.utils import get_logger @@ -19,21 +18,29 @@ def get_redis_client(db=0): class RedisPubSub: + handlers = {} + lock = threading.Lock() + redis = get_redis_client() + pubsub = redis.pubsub() + def __init__(self, ch, db=10): self.ch = ch self.db = db - self.redis = get_redis_client(db) def subscribe(self, _next, error=None, complete=None): - ps = self.redis.pubsub() - ps.subscribe(self.ch) - sub = Subscription(self, ps) - sub.keep_handle_msg(_next, error, complete) + with self.lock: + if self.ch not in self.handlers: + self.pubsub.subscribe(self.ch) + self.handlers[self.ch] = _next + + sub = Subscription(self, self.handlers) + sub.keep_handle_msg(self.handlers, error, complete) return sub - def resubscribe(self, _next, error=None, complete=None): - self.redis = get_redis_client(self.db) - self.subscribe(_next, error, complete) + @classmethod + def resubscribe(cls, handles, error=None, complete=None): + for ch, handler in handles.items(): + cls(ch).subscribe(handler, error, complete) def publish(self, data): data_json = json.dumps(data) @@ -42,12 +49,14 @@ class RedisPubSub: class Subscription: - def __init__(self, pb: RedisPubSub, sub: PubSub): + running = False + + def __init__(self, pb: RedisPubSub, handlers: dict): self.pb = pb self.ch = pb.ch - self.sub = sub self.unsubscribed = False - logger.info(f"Subscribed to channel: {sub}") + self.sub = self.pb.pubsub + self.handlers = handlers def _handle_msg(self, _next, error, complete): """ @@ -69,13 +78,15 @@ class Subscription: for msg in msgs: if msg["type"] != "message": continue + channel = msg['channel'].decode() + handler = self.handlers.get(channel) item = None try: item_json = msg['data'].decode() item = json.loads(item_json) with safe_db_connection(): - _next(item) + handler(item) except Exception as e: error(msg, item) logger.error('Subscribe handler handle msg error: {}'.format(e)) @@ -84,7 +95,7 @@ class Subscription: logger.debug('Subscription unsubscribed') else: logger.error('Consume msg error: {}'.format(e)) - self.retry(_next, error, complete) + self.retry(self.handlers, error, complete) return try: @@ -98,28 +109,29 @@ class Subscription: except Exception as e: logger.error("Redis observer close error: {}".format(e)) - def keep_handle_msg(self, _next, error, complete): - t = threading.Thread(target=self._handle_msg, args=(_next, error, complete)) - t.daemon = True - t.start() - return t + def keep_handle_msg(self, handlers, error, complete): + if not self.running: + self.running = True + t = threading.Thread(target=self._handle_msg, args=(handlers, error, complete)) + t.daemon = True + t.start() def unsubscribe(self): self.unsubscribed = True logger.info(f"Unsubscribed from channel: {self.sub}") try: - self.sub.close() + self.pb.pubsub.close() except Exception as e: logger.warning(f'Unsubscribe msg error: {e}') - def retry(self, _next, error, complete): + def retry(self, handlers, error, complete): logger.info('Retry subscribe channel: {}'.format(self.ch)) times = 0 while True: try: self.unsubscribe() - self.pb.resubscribe(_next, error, complete) + self.pb.resubscribe(handlers, error, complete) break except Exception as e: logger.error('Retry #{} {} subscribe channel error: {}'.format(times, self.ch, e))