From 2869338e2cacadbfcc4bfda5a82e22d2fd696e5b Mon Sep 17 00:00:00 2001 From: xinwen Date: Mon, 7 Feb 2022 17:46:53 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=B6=88=E6=81=AF=E8=AE=A2=E9=98=85redi?= =?UTF-8?q?s=E8=BF=9E=E6=8E=A5=E6=9C=AA=E5=85=B3=E9=97=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/utils/connection.py | 8 ++++++++ apps/notifications/ws.py | 14 ++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/apps/common/utils/connection.py b/apps/common/utils/connection.py index b7d1f165a..ea371f28f 100644 --- a/apps/common/utils/connection.py +++ b/apps/common/utils/connection.py @@ -23,6 +23,7 @@ class RedisPubSub: def __init__(self, ch, db=10): self.ch = ch self.redis = get_redis_client(db) + self.subscriber = None def subscribe(self): ps = self.redis.pubsub() @@ -41,7 +42,9 @@ class RedisPubSub: :param handle: lambda item: do_something :return: """ + self.close_handle_msg() sub = self.subscribe() + self.subscriber = sub msgs = sub.listen() try: @@ -65,3 +68,8 @@ class RedisPubSub: except Exception as e: logger.error("Redis observer close error: ", e) + def close_handle_msg(self): + if self.subscriber: + self.subscriber.close() + self.subscriber = None + diff --git a/apps/notifications/ws.py b/apps/notifications/ws.py index d1032b237..9936427b5 100644 --- a/apps/notifications/ws.py +++ b/apps/notifications/ws.py @@ -5,7 +5,7 @@ from channels.generic.websocket import JsonWebsocketConsumer from common.utils import get_logger from common.db.utils import safe_db_connection from .site_msg import SiteMessageUtil -from .signals_handler import new_site_msg_chan +from .signals_handler import NewSiteMsgSubPub logger = get_logger(__name__) @@ -13,6 +13,10 @@ logger = get_logger(__name__) class SiteMsgWebsocket(JsonWebsocketConsumer): refresh_every_seconds = 10 + def __init__(self, *args, **kwargs): + super(SiteMsgWebsocket, self).__init__(*args, **kwargs) + self.subscriber = None + def connect(self): user = self.scope["user"] if user.is_authenticated: @@ -23,6 +27,10 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): else: self.close() + def disconnect(self, code): + if self.subscriber: + self.subscriber.close_handle_msg() + def receive(self, text_data=None, bytes_data=None, **kwargs): data = json.loads(text_data) refresh_every_seconds = data.get('refresh_every_seconds') @@ -56,4 +64,6 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): if user_id in users: ws.send_unread_msg_count() - new_site_msg_chan.keep_handle_msg(handle_new_site_msg_recv) + subscriber = NewSiteMsgSubPub() + self.subscriber = subscriber + subscriber.keep_handle_msg(handle_new_site_msg_recv)