From 55fae1667d2a8f9f02ecddbe590ae63529937ae7 Mon Sep 17 00:00:00 2001 From: ibuler Date: Mon, 7 Feb 2022 19:01:13 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20redis=20=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E8=BF=87=E5=A4=9A=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/utils/connection.py | 103 +++++++++++++++++++++----------- apps/notifications/ws.py | 12 ++-- 2 files changed, 77 insertions(+), 38 deletions(-) diff --git a/apps/common/utils/connection.py b/apps/common/utils/connection.py index b7d1f165a..bf1462d78 100644 --- a/apps/common/utils/connection.py +++ b/apps/common/utils/connection.py @@ -1,4 +1,5 @@ import json +import threading import redis from django.conf import settings @@ -19,49 +20,83 @@ def get_redis_client(db): return rc +class Subscription: + def __init__(self, ch, sub, ): + self.ch = ch + self.sub = sub + + def _handle_msg(self, _next, error, complete): + """ + handle arg is the pub published + + :param _next: next msg handler + :param error: error msg handler + :param complete: complete msg handler + :return: + """ + msgs = self.sub.listen() + + if error is None: + error = lambda m, i: None + + if complete is None: + complete = lambda: None + + try: + for msg in msgs: + if msg["type"] != "message": + continue + item = None + try: + item_json = msg['data'].decode() + item = json.loads(item_json) + + with safe_db_connection(): + _next(item) + except Exception as e: + error(msg, item) + logger.error('Subscribe handler handle msg error: ', e) + except Exception as e: + logger.error('Consume msg error: ', e) + + try: + complete() + except Exception as e: + logger.error('Complete subscribe error: {}'.format(e)) + pass + + try: + self.unsubscribe() + except Exception as e: + logger.error("Redis observer close error: {}".format(e)) + + def keep_handle_msg(self, _next, error, complete): + t = threading.Thread(target=self._handle_msg, args=(_next, error, complete)) + t.daemon = True + t.start() + return t + + def unsubscribe(self): + try: + self.sub.close() + except Exception as e: + logger.error('Unsubscribe msg error: {}'.format(e)) + + class RedisPubSub: def __init__(self, ch, db=10): self.ch = ch self.redis = get_redis_client(db) - def subscribe(self): + def subscribe(self, _next, error=None, complete=None): ps = self.redis.pubsub() - ps.subscribe(self.ch) - return ps + sub = Subscription(self.ch, ps) + sub.keep_handle_msg(_next, error, complete) + return sub def publish(self, data): data_json = json.dumps(data) self.redis.publish(self.ch, data_json) return True - def keep_handle_msg(self, handle): - """ - handle arg is the pub published - - :param handle: lambda item: do_something - :return: - """ - sub = self.subscribe() - msgs = sub.listen() - - try: - for msg in msgs: - if msg["type"] != "message": - continue - try: - item_json = msg['data'].decode() - item = json.loads(item_json) - - with safe_db_connection(): - handle(item) - except Exception as e: - logger.error('Subscribe handler handle msg error: ', e) - - except Exception as e: - logger.error('Consume msg error: ', e) - - try: - sub.close() - except Exception as e: - logger.error("Redis observer close error: ", e) - + diff --git a/apps/notifications/ws.py b/apps/notifications/ws.py index d1032b237..e3d48f79d 100644 --- a/apps/notifications/ws.py +++ b/apps/notifications/ws.py @@ -12,14 +12,13 @@ logger = get_logger(__name__) class SiteMsgWebsocket(JsonWebsocketConsumer): refresh_every_seconds = 10 + sub = None def connect(self): user = self.scope["user"] if user.is_authenticated: self.accept() - - thread = threading.Thread(target=self.watch_recv_new_site_msg) - thread.start() + self.sub = self.watch_recv_new_site_msg() else: self.close() @@ -56,4 +55,9 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): if user_id in users: ws.send_unread_msg_count() - new_site_msg_chan.keep_handle_msg(handle_new_site_msg_recv) + return new_site_msg_chan.subscribe(handle_new_site_msg_recv) + + def disconnect(self, code): + if self.sub: + self.sub.unsubscribe() +