diff --git a/apps/common/utils/connection.py b/apps/common/utils/connection.py index b7d1f165a..ea371f28f 100644 --- a/apps/common/utils/connection.py +++ b/apps/common/utils/connection.py @@ -23,6 +23,7 @@ class RedisPubSub: def __init__(self, ch, db=10): self.ch = ch self.redis = get_redis_client(db) + self.subscriber = None def subscribe(self): ps = self.redis.pubsub() @@ -41,7 +42,9 @@ class RedisPubSub: :param handle: lambda item: do_something :return: """ + self.close_handle_msg() sub = self.subscribe() + self.subscriber = sub msgs = sub.listen() try: @@ -65,3 +68,8 @@ class RedisPubSub: except Exception as e: logger.error("Redis observer close error: ", e) + def close_handle_msg(self): + if self.subscriber: + self.subscriber.close() + self.subscriber = None + diff --git a/apps/notifications/ws.py b/apps/notifications/ws.py index d1032b237..9936427b5 100644 --- a/apps/notifications/ws.py +++ b/apps/notifications/ws.py @@ -5,7 +5,7 @@ from channels.generic.websocket import JsonWebsocketConsumer from common.utils import get_logger from common.db.utils import safe_db_connection from .site_msg import SiteMessageUtil -from .signals_handler import new_site_msg_chan +from .signals_handler import NewSiteMsgSubPub logger = get_logger(__name__) @@ -13,6 +13,10 @@ logger = get_logger(__name__) class SiteMsgWebsocket(JsonWebsocketConsumer): refresh_every_seconds = 10 + def __init__(self, *args, **kwargs): + super(SiteMsgWebsocket, self).__init__(*args, **kwargs) + self.subscriber = None + def connect(self): user = self.scope["user"] if user.is_authenticated: @@ -23,6 +27,10 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): else: self.close() + def disconnect(self, code): + if self.subscriber: + self.subscriber.close_handle_msg() + def receive(self, text_data=None, bytes_data=None, **kwargs): data = json.loads(text_data) refresh_every_seconds = data.get('refresh_every_seconds') @@ -56,4 +64,6 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): if user_id in users: ws.send_unread_msg_count() - new_site_msg_chan.keep_handle_msg(handle_new_site_msg_recv) + subscriber = NewSiteMsgSubPub() + self.subscriber = subscriber + subscriber.keep_handle_msg(handle_new_site_msg_recv)