This commit is contained in:
fit2bot 2025-06-17 22:16:16 +08:00 committed by GitHub
commit b97ce9c998
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 36 additions and 23 deletions

View File

@ -4,7 +4,6 @@ import time
import redis import redis
from django.core.cache import cache from django.core.cache import cache
from redis.client import PubSub
from common.db.utils import safe_db_connection from common.db.utils import safe_db_connection
from common.utils import get_logger from common.utils import get_logger
@ -19,21 +18,29 @@ def get_redis_client(db=0):
class RedisPubSub: class RedisPubSub:
handlers = {}
lock = threading.Lock()
redis = get_redis_client()
pubsub = redis.pubsub()
def __init__(self, ch, db=10): def __init__(self, ch, db=10):
self.ch = ch self.ch = ch
self.db = db self.db = db
self.redis = get_redis_client(db)
def subscribe(self, _next, error=None, complete=None): def subscribe(self, _next, error=None, complete=None):
ps = self.redis.pubsub() with self.lock:
ps.subscribe(self.ch) if self.ch not in self.handlers:
sub = Subscription(self, ps) self.pubsub.subscribe(self.ch)
sub.keep_handle_msg(_next, error, complete) self.handlers[self.ch] = _next
sub = Subscription(self, self.handlers)
sub.keep_handle_msg(self.handlers, error, complete)
return sub return sub
def resubscribe(self, _next, error=None, complete=None): @classmethod
self.redis = get_redis_client(self.db) def resubscribe(cls, handles, error=None, complete=None):
self.subscribe(_next, error, complete) for ch, handler in handles.items():
cls(ch).subscribe(handler, error, complete)
def publish(self, data): def publish(self, data):
data_json = json.dumps(data) data_json = json.dumps(data)
@ -42,12 +49,14 @@ class RedisPubSub:
class Subscription: class Subscription:
def __init__(self, pb: RedisPubSub, sub: PubSub): running = False
def __init__(self, pb: RedisPubSub, handlers: dict):
self.pb = pb self.pb = pb
self.ch = pb.ch self.ch = pb.ch
self.sub = sub
self.unsubscribed = False self.unsubscribed = False
logger.info(f"Subscribed to channel: {sub}") self.sub = self.pb.pubsub
self.handlers = handlers
def _handle_msg(self, _next, error, complete): def _handle_msg(self, _next, error, complete):
""" """
@ -69,13 +78,15 @@ class Subscription:
for msg in msgs: for msg in msgs:
if msg["type"] != "message": if msg["type"] != "message":
continue continue
channel = msg['channel'].decode()
handler = self.handlers.get(channel)
item = None item = None
try: try:
item_json = msg['data'].decode() item_json = msg['data'].decode()
item = json.loads(item_json) item = json.loads(item_json)
with safe_db_connection(): with safe_db_connection():
_next(item) handler(item)
except Exception as e: except Exception as e:
error(msg, item) error(msg, item)
logger.error('Subscribe handler handle msg error: {}'.format(e)) logger.error('Subscribe handler handle msg error: {}'.format(e))
@ -84,7 +95,7 @@ class Subscription:
logger.debug('Subscription unsubscribed') logger.debug('Subscription unsubscribed')
else: else:
logger.error('Consume msg error: {}'.format(e)) logger.error('Consume msg error: {}'.format(e))
self.retry(_next, error, complete) self.retry(self.handlers, error, complete)
return return
try: try:
@ -98,28 +109,29 @@ class Subscription:
except Exception as e: except Exception as e:
logger.error("Redis observer close error: {}".format(e)) logger.error("Redis observer close error: {}".format(e))
def keep_handle_msg(self, _next, error, complete): def keep_handle_msg(self, handlers, error, complete):
t = threading.Thread(target=self._handle_msg, args=(_next, error, complete)) if not self.running:
t.daemon = True self.running = True
t.start() t = threading.Thread(target=self._handle_msg, args=(handlers, error, complete))
return t t.daemon = True
t.start()
def unsubscribe(self): def unsubscribe(self):
self.unsubscribed = True self.unsubscribed = True
logger.info(f"Unsubscribed from channel: {self.sub}") logger.info(f"Unsubscribed from channel: {self.sub}")
try: try:
self.sub.close() self.pb.pubsub.close()
except Exception as e: except Exception as e:
logger.warning(f'Unsubscribe msg error: {e}') logger.warning(f'Unsubscribe msg error: {e}')
def retry(self, _next, error, complete): def retry(self, handlers, error, complete):
logger.info('Retry subscribe channel: {}'.format(self.ch)) logger.info('Retry subscribe channel: {}'.format(self.ch))
times = 0 times = 0
while True: while True:
try: try:
self.unsubscribe() self.unsubscribe()
self.pb.resubscribe(_next, error, complete) self.pb.resubscribe(handlers, error, complete)
break break
except Exception as e: except Exception as e:
logger.error('Retry #{} {} subscribe channel error: {}'.format(times, self.ch, e)) logger.error('Retry #{} {} subscribe channel error: {}'.format(times, self.ch, e))

View File

@ -204,6 +204,7 @@ class Config(dict):
'REDIS_SSL_CERT': None, 'REDIS_SSL_CERT': None,
'REDIS_SSL_CA': None, 'REDIS_SSL_CA': None,
'REDIS_SSL_REQUIRED': 'none', 'REDIS_SSL_REQUIRED': 'none',
'REDIS_MAX_CONNECTIONS': 100,
# Redis Sentinel # Redis Sentinel
'REDIS_SENTINEL_HOSTS': '', 'REDIS_SENTINEL_HOSTS': '',
'REDIS_SENTINEL_PASSWORD': '', 'REDIS_SENTINEL_PASSWORD': '',

View File

@ -365,7 +365,7 @@ REDIS_OPTIONS = {
"health_check_interval": 30 "health_check_interval": 30
}, },
"CONNECTION_POOL_KWARGS": { "CONNECTION_POOL_KWARGS": {
'max_connections': 100, 'max_connections': CONFIG.REDIS_MAX_CONNECTIONS,
} }
} }
if REDIS_USE_SSL: if REDIS_USE_SSL: