From c85249be361050f04c21d75c3a1f7cfbd2250b72 Mon Sep 17 00:00:00 2001 From: ibuler Date: Fri, 26 Nov 2021 11:12:53 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=8C=E5=BD=A2=E6=88=90=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../signals_handler/node_assets_mapping.py | 35 ++++-------- apps/common/db/utils.py | 12 +++- apps/common/utils/connection.py | 42 +++++++++++++- apps/notifications/models/site_msg.py | 1 - apps/notifications/signals_handler.py | 1 - apps/notifications/ws.py | 56 +++++-------------- apps/orgs/signals_handler/common.py | 25 ++------- apps/settings/signals_handler.py | 21 +------ 8 files changed, 82 insertions(+), 111 deletions(-) diff --git a/apps/assets/signals_handler/node_assets_mapping.py b/apps/assets/signals_handler/node_assets_mapping.py index e598c8698..5c2439005 100644 --- a/apps/assets/signals_handler/node_assets_mapping.py +++ b/apps/assets/signals_handler/node_assets_mapping.py @@ -10,7 +10,6 @@ from django.dispatch import receiver from django.utils.functional import LazyObject from common.signals import django_ready -from common.db.utils import close_old_connections from common.utils.connection import RedisPubSub from common.utils import get_logger from assets.models import Asset, Node @@ -78,31 +77,17 @@ def on_node_asset_change(sender, instance, **kwargs): def subscribe_node_assets_mapping_expire(sender, **kwargs): logger.debug("Start subscribe for expire node assets id mapping from memory") + def handle_node_relation_change(org_id): + root_org_id = Organization.ROOT_ID + Node.expire_node_all_asset_ids_mapping_from_memory(org_id) + Node.expire_node_all_asset_ids_mapping_from_memory(root_org_id) + logger.debug( + "Expire node assets id mapping from memory of org={}, pid={}" + "".format(str(org_id), os.getpid()) + ) + def keep_subscribe_node_assets_relation(): - while True: - try: - subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() - msgs = subscribe.listen() - # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 - for message in msgs: - if message["type"] != "message": - continue - close_old_connections() - org_id = message['data'].decode() - root_org_id = Organization.ROOT_ID - Node.expire_node_all_asset_ids_mapping_from_memory(org_id) - Node.expire_node_all_asset_ids_mapping_from_memory(root_org_id) - logger.debug( - "Expire node assets id mapping from memory of org={}, pid={}" - "".format(str(org_id), os.getpid()) - ) - close_old_connections() - except Exception as e: - logger.exception(f'subscribe_node_assets_mapping_expire: {e}') - Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory() - finally: - # 请求结束,关闭连接 - close_old_connections() + node_assets_mapping_for_memory_pub_sub.keep_handle_msg(handle_node_relation_change) t = threading.Thread(target=keep_subscribe_node_assets_relation) t.daemon = True diff --git a/apps/common/db/utils.py b/apps/common/db/utils.py index 9d6123884..eb6328a9f 100644 --- a/apps/common/db/utils.py +++ b/apps/common/db/utils.py @@ -1,6 +1,9 @@ -from common.utils import get_logger +from contextlib import contextmanager + from django.db import connections +from common.utils import get_logger + logger = get_logger(__file__) @@ -44,3 +47,10 @@ def get_objects(model, pks): def close_old_connections(): for conn in connections.all(): conn.close_if_unusable_or_obsolete() + + +@contextmanager +def safe_db_connection(): + close_old_connections() + yield + close_old_connections() diff --git a/apps/common/utils/connection.py b/apps/common/utils/connection.py index 9bdf39628..b7d1f165a 100644 --- a/apps/common/utils/connection.py +++ b/apps/common/utils/connection.py @@ -1,6 +1,13 @@ +import json + import redis from django.conf import settings +from common.db.utils import safe_db_connection +from common.utils import get_logger + +logger = get_logger(__name__) + def get_redis_client(db): rc = redis.StrictRedis( @@ -23,5 +30,38 @@ class RedisPubSub: return ps def publish(self, data): - self.redis.publish(self.ch, data) + data_json = json.dumps(data) + self.redis.publish(self.ch, data_json) return True + + def keep_handle_msg(self, handle): + """ + handle arg is the pub published + + :param handle: lambda item: do_something + :return: + """ + sub = self.subscribe() + msgs = sub.listen() + + try: + for msg in msgs: + if msg["type"] != "message": + continue + try: + item_json = msg['data'].decode() + item = json.loads(item_json) + + with safe_db_connection(): + handle(item) + except Exception as e: + logger.error('Subscribe handler handle msg error: ', e) + + except Exception as e: + logger.error('Consume msg error: ', e) + + try: + sub.close() + except Exception as e: + logger.error("Redis observer close error: ", e) + diff --git a/apps/notifications/models/site_msg.py b/apps/notifications/models/site_msg.py index 556c00607..3e3c09baa 100644 --- a/apps/notifications/models/site_msg.py +++ b/apps/notifications/models/site_msg.py @@ -1,5 +1,4 @@ from django.db import models -from django.utils.translation import gettext_lazy as _ from common.db.models import JMSModel diff --git a/apps/notifications/signals_handler.py b/apps/notifications/signals_handler.py index 019d2a3da..79964464a 100644 --- a/apps/notifications/signals_handler.py +++ b/apps/notifications/signals_handler.py @@ -46,7 +46,6 @@ def on_site_message_create(sender, instance, created, **kwargs): 'message': instance.message, 'users': user_ids } - data = json.dumps(data) new_site_msg_chan.publish(data) diff --git a/apps/notifications/ws.py b/apps/notifications/ws.py index b5cd55fb1..d1032b237 100644 --- a/apps/notifications/ws.py +++ b/apps/notifications/ws.py @@ -1,10 +1,9 @@ import threading import json -from redis.exceptions import ConnectionError from channels.generic.websocket import JsonWebsocketConsumer -from common.db.utils import close_old_connections from common.utils import get_logger +from common.db.utils import safe_db_connection from .site_msg import SiteMessageUtil from .signals_handler import new_site_msg_chan @@ -13,15 +12,13 @@ logger = get_logger(__name__) class SiteMsgWebsocket(JsonWebsocketConsumer): refresh_every_seconds = 10 - chan = None def connect(self): user = self.scope["user"] if user.is_authenticated: self.accept() - self.chan = new_site_msg_chan.subscribe() - thread = threading.Thread(target=self.unread_site_msg_count) + thread = threading.Thread(target=self.watch_recv_new_site_msg) thread.start() else: self.close() @@ -45,45 +42,18 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): logger.debug('Send unread count to user: {} {}'.format(user_id, unread_count)) self.send_json({'type': 'unread_count', 'unread_count': unread_count}) - def unread_site_msg_count(self): + def watch_recv_new_site_msg(self): + ws = self user_id = str(self.scope["user"].id) - self.send_unread_msg_count() - - try: - msgs = self.chan.listen() - # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 - for message in msgs: - if message['type'] != 'message': - continue - close_old_connections() - try: - msg = json.loads(message['data'].decode()) - except json.JSONDecoder as e: - logger.debug('Decode json error: ', e) - continue - if not msg: - continue - - logger.debug('New site msg recv, may be mine: {}'.format(msg)) - users = msg.get('users', []) - logger.debug('Message users: {}'.format(users)) - if user_id in users: - self.send_unread_msg_count() - close_old_connections() - except ConnectionError: - logger.error('Redis chan closed') - finally: - logger.info('Notification ws thread end') - close_old_connections() - - def disconnect(self, close_code): - try: - if self.chan is not None: - self.chan.close() - self.close() - finally: - close_old_connections() - logger.info('Notification websocket disconnect') + # 先发一个消息再说 + with safe_db_connection(): + self.send_unread_msg_count() + def handle_new_site_msg_recv(msg): + users = msg.get('users', []) + logger.debug('New site msg recv, message users: {}'.format(users)) + if user_id in users: + ws.send_unread_msg_count() + new_site_msg_chan.keep_handle_msg(handle_new_site_msg_recv) diff --git a/apps/orgs/signals_handler/common.py b/apps/orgs/signals_handler/common.py index 72715885b..ff22172b6 100644 --- a/apps/orgs/signals_handler/common.py +++ b/apps/orgs/signals_handler/common.py @@ -6,9 +6,8 @@ from functools import partial from django.dispatch import receiver from django.utils.functional import LazyObject -from common.db.utils import close_old_connections from django.db.models.signals import m2m_changed -from django.db.models.signals import post_save, post_delete, pre_delete +from django.db.models.signals import post_save, pre_delete from orgs.utils import tmp_to_org from orgs.models import Organization, OrganizationMember @@ -47,25 +46,9 @@ def subscribe_orgs_mapping_expire(sender, **kwargs): logger.debug("Start subscribe for expire orgs mapping from memory") def keep_subscribe_org_mapping(): - while True: - try: - subscribe = orgs_mapping_for_memory_pub_sub.subscribe() - msgs = subscribe.listen() - # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 - close_old_connections() - for message in msgs: - if message['type'] != 'message': - continue - if message['data'] == b'error': - raise ValueError - Organization.expire_orgs_mapping() - logger.debug('Expire orgs mapping: ' + str(message['data'])) - except Exception as e: - logger.exception(f'subscribe_orgs_mapping_expire: {e}') - Organization.expire_orgs_mapping() - finally: - # 结束收关闭连接 - close_old_connections() + orgs_mapping_for_memory_pub_sub.keep_handle_msg( + lambda org_id: Organization.expire_orgs_mapping() + ) t = threading.Thread(target=keep_subscribe_org_mapping) t.daemon = True diff --git a/apps/settings/signals_handler.py b/apps/settings/signals_handler.py index b247afd91..241574995 100644 --- a/apps/settings/signals_handler.py +++ b/apps/settings/signals_handler.py @@ -11,7 +11,6 @@ from jumpserver.utils import current_request from common.decorator import on_transaction_commit from common.utils import get_logger, ssh_key_gen from common.utils.connection import RedisPubSub -from common.db.utils import close_old_connections from common.signals import django_ready from .models import Setting @@ -81,23 +80,9 @@ def subscribe_settings_change(sender, **kwargs): logger.debug("Start subscribe setting change") def keep_subscribe_settings_change(): - while True: - try: - sub = setting_pub_sub.subscribe() - msgs = sub.listen() - # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 - for msg in msgs: - if msg["type"] != "message": - continue - close_old_connections() - item = msg['data'].decode() - logger.debug("Found setting change: {}".format(str(item))) - Setting.refresh_item(item) - close_old_connections() - except Exception as e: - logger.exception(f'subscribe_settings_change: {e}') - Setting.refresh_all_settings() - close_old_connections() + setting_pub_sub.keep_handle_msg( + lambda name: Setting.refresh_item(name) + ) t = threading.Thread(target=keep_subscribe_settings_change) t.daemon = True