diff --git a/apps/assets/signals_handler/node_assets_mapping.py b/apps/assets/signals_handler/node_assets_mapping.py index 8d9580a27..30eb9ed0b 100644 --- a/apps/assets/signals_handler/node_assets_mapping.py +++ b/apps/assets/signals_handler/node_assets_mapping.py @@ -10,6 +10,7 @@ from django.dispatch import receiver from django.utils.functional import LazyObject from common.signals import django_ready +from common.db.utils import close_old_connections from common.utils.connection import RedisPubSub from common.utils import get_logger from assets.models import Asset, Node @@ -77,11 +78,14 @@ def on_node_asset_change(sender, instance, **kwargs): def subscribe_node_assets_mapping_expire(sender, **kwargs): logger.debug("Start subscribe for expire node assets id mapping from memory") - def keep_subscribe(): + def keep_subscribe_node_assets_relation(): while True: try: subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() - for message in subscribe.listen(): + msgs = subscribe.listen() + # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 + close_old_connections() + for message in msgs: if message["type"] != "message": continue org_id = message['data'].decode() @@ -95,7 +99,10 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs): except Exception as e: logger.exception(f'subscribe_node_assets_mapping_expire: {e}') Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory() + finally: + # 请求结束,关闭连接 + close_old_connections() - t = threading.Thread(target=keep_subscribe) + t = threading.Thread(target=keep_subscribe_node_assets_relation) t.daemon = True t.start() diff --git a/apps/common/db/utils.py b/apps/common/db/utils.py index d3d735fea..9d6123884 100644 --- a/apps/common/db/utils.py +++ b/apps/common/db/utils.py @@ -1,4 +1,5 @@ from common.utils import get_logger +from django.db import connections logger = get_logger(__file__) @@ -38,3 +39,8 @@ def get_objects(model, pks): not_found_pks = pks - exists_pks logger.error(f'DoesNotExist: <{model.__name__}: {not_found_pks}>') return objs + + +def close_old_connections(): + for conn in connections.all(): + conn.close_if_unusable_or_obsolete() diff --git a/apps/jumpserver/settings/custom.py b/apps/jumpserver/settings/custom.py index d5088ea48..0673cbc1d 100644 --- a/apps/jumpserver/settings/custom.py +++ b/apps/jumpserver/settings/custom.py @@ -143,12 +143,16 @@ SMS_TEST_PHONE = CONFIG.SMS_TEST_PHONE # Alibaba ALIBABA_ACCESS_KEY_ID = CONFIG.ALIBABA_ACCESS_KEY_ID ALIBABA_ACCESS_KEY_SECRET = CONFIG.ALIBABA_ACCESS_KEY_SECRET +ALIBABA_VERIFY_SIGN_NAME = CONFIG.ALIBABA_VERIFY_SIGN_NAME +ALIBABA_VERIFY_TEMPLATE_CODE = CONFIG.ALIBABA_VERIFY_TEMPLATE_CODE ALIBABA_SMS_SIGN_AND_TEMPLATES = CONFIG.ALIBABA_SMS_SIGN_AND_TEMPLATES # TENCENT TENCENT_SECRET_ID = CONFIG.TENCENT_SECRET_ID TENCENT_SECRET_KEY = CONFIG.TENCENT_SECRET_KEY TENCENT_SDKAPPID = CONFIG.TENCENT_SDKAPPID +TENCENT_VERIFY_SIGN_NAME = CONFIG.TENCENT_VERIFY_SIGN_NAME +TENCENT_VERIFY_TEMPLATE_CODE = CONFIG.TENCENT_VERIFY_TEMPLATE_CODE TENCENT_SMS_SIGN_AND_TEMPLATES = CONFIG.TENCENT_SMS_SIGN_AND_TEMPLATES # 公告 diff --git a/apps/notifications/ws.py b/apps/notifications/ws.py index a9426a7c6..423639356 100644 --- a/apps/notifications/ws.py +++ b/apps/notifications/ws.py @@ -3,6 +3,7 @@ import json from redis.exceptions import ConnectionError from channels.generic.websocket import JsonWebsocketConsumer +from common.db.utils import close_old_connections from common.utils import get_logger from .site_msg import SiteMessageUtil from .signals_handler import new_site_msg_chan @@ -49,27 +50,40 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): self.send_unread_msg_count() try: - for message in self.chan.listen(): + msgs = self.chan.listen() + # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 + close_old_connections() + for message in msgs: if message['type'] != 'message': continue + try: msg = json.loads(message['data'].decode()) - logger.debug('New site msg recv, may be mine: {}'.format(msg)) - if not msg: - continue - users = msg.get('users', []) - logger.debug('Message users: {}'.format(users)) - if user_id in users: - self.send_unread_msg_count() except json.JSONDecoder as e: logger.debug('Decode json error: ', e) + continue + if not msg: + continue + + logger.debug('New site msg recv, may be mine: {}'.format(msg)) + users = msg.get('users', []) + logger.debug('Message users: {}'.format(users)) + if user_id in users: + self.send_unread_msg_count() except ConnectionError: - logger.debug('Redis chan closed') + logger.error('Redis chan closed') + finally: + logger.info('Notification ws thread end') + close_old_connections() def disconnect(self, close_code): - if self.chan is not None: - try: + try: + if self.chan is not None: self.chan.close() - except: - pass - self.close() + self.close() + finally: + close_old_connections() + logger.info('Notification websocket disconnect') + + + diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index 00a0027cd..fb0c2e71c 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -137,9 +137,14 @@ def check_server_performance_period(): @shared_task(queue="ansible") def hello(name, callback=None): + from users.models import User import time - time.sleep(10) + + count = User.objects.count() print("Hello {}".format(name)) + print("Count: ", count) + time.sleep(1) + return count @shared_task diff --git a/apps/ops/ws.py b/apps/ops/ws.py index e9cb38d28..94d71d90d 100644 --- a/apps/ops/ws.py +++ b/apps/ops/ws.py @@ -2,12 +2,12 @@ import time import os import threading import json +from channels.generic.websocket import JsonWebsocketConsumer from common.utils import get_logger - +from common.db.utils import close_old_connections from .celery.utils import get_celery_task_log_path from .ansible.utils import get_ansible_task_log_path -from channels.generic.websocket import JsonWebsocketConsumer logger = get_logger(__name__) @@ -86,3 +86,4 @@ class TaskLogWebsocket(JsonWebsocketConsumer): def disconnect(self, close_code): self.disconnected = True self.close() + close_old_connections() diff --git a/apps/orgs/signals_handler/common.py b/apps/orgs/signals_handler/common.py index aed695b6e..72715885b 100644 --- a/apps/orgs/signals_handler/common.py +++ b/apps/orgs/signals_handler/common.py @@ -6,6 +6,7 @@ from functools import partial from django.dispatch import receiver from django.utils.functional import LazyObject +from common.db.utils import close_old_connections from django.db.models.signals import m2m_changed from django.db.models.signals import post_save, post_delete, pre_delete @@ -45,11 +46,14 @@ def expire_orgs_mapping_for_memory(org_id): def subscribe_orgs_mapping_expire(sender, **kwargs): logger.debug("Start subscribe for expire orgs mapping from memory") - def keep_subscribe(): + def keep_subscribe_org_mapping(): while True: try: subscribe = orgs_mapping_for_memory_pub_sub.subscribe() - for message in subscribe.listen(): + msgs = subscribe.listen() + # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 + close_old_connections() + for message in msgs: if message['type'] != 'message': continue if message['data'] == b'error': @@ -59,8 +63,11 @@ def subscribe_orgs_mapping_expire(sender, **kwargs): except Exception as e: logger.exception(f'subscribe_orgs_mapping_expire: {e}') Organization.expire_orgs_mapping() + finally: + # 结束收关闭连接 + close_old_connections() - t = threading.Thread(target=keep_subscribe) + t = threading.Thread(target=keep_subscribe_org_mapping) t.daemon = True t.start() diff --git a/apps/settings/signals_handler.py b/apps/settings/signals_handler.py index fefcc6ec1..483b26ee2 100644 --- a/apps/settings/signals_handler.py +++ b/apps/settings/signals_handler.py @@ -6,12 +6,12 @@ import threading from django.dispatch import receiver from django.db.models.signals import post_save, pre_save from django.utils.functional import LazyObject -from django.db import close_old_connections from jumpserver.utils import current_request from common.decorator import on_transaction_commit from common.utils import get_logger, ssh_key_gen from common.utils.connection import RedisPubSub +from common.db.utils import close_old_connections from common.signals import django_ready from .models import Setting @@ -80,12 +80,14 @@ def on_create_set_created_by(sender, instance=None, **kwargs): def subscribe_settings_change(sender, **kwargs): logger.debug("Start subscribe setting change") - def keep_subscribe(): + def keep_subscribe_settings_change(): while True: try: sub = setting_pub_sub.subscribe() - for msg in sub.listen(): - close_old_connections() + msgs = sub.listen() + # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 + close_old_connections() + for msg in msgs: if msg["type"] != "message": continue item = msg['data'].decode() @@ -93,9 +95,10 @@ def subscribe_settings_change(sender, **kwargs): Setting.refresh_item(item) except Exception as e: logger.exception(f'subscribe_settings_change: {e}') - close_old_connections() Setting.refresh_all_settings() + finally: + close_old_connections() - t = threading.Thread(target=keep_subscribe) + t = threading.Thread(target=keep_subscribe_settings_change) t.daemon = True t.start() diff --git a/apps/settings/utils/ldap.py b/apps/settings/utils/ldap.py index 89ab7b153..68f856da9 100644 --- a/apps/settings/utils/ldap.py +++ b/apps/settings/utils/ldap.py @@ -24,9 +24,9 @@ from copy import deepcopy from common.const import LDAP_AD_ACCOUNT_DISABLE from common.utils import timeit, get_logger +from common.db.utils import close_old_connections from users.utils import construct_user_email from users.models import User -from orgs.models import Organization from authentication.backends.ldap import LDAPAuthorizationBackend, LDAPUser logger = get_logger(__file__) @@ -114,7 +114,7 @@ class LDAPServerUtil(object): cookie = self.connection.result['controls']['1.2.840.113556.1.4.319']['value']['cookie'] return cookie except Exception as e: - logger.error(e, exc_info=True) + logger.debug(e, exc_info=True) return None def get_search_filter_extra(self): @@ -331,15 +331,17 @@ class LDAPSyncUtil(object): def perform_sync(self): logger.info('Start perform sync ldap users from server to cache') - self.pre_sync() try: + self.pre_sync() self.sync() + self.post_sync() except Exception as e: error_msg = str(e) logger.error(error_msg) self.set_task_error_msg(error_msg) - self.post_sync() - logger.info('End perform sync ldap users from server to cache') + finally: + logger.info('End perform sync ldap users from server to cache') + close_old_connections() class LDAPImportUtil(object): diff --git a/apps/templates/_mfa_login_field.html b/apps/templates/_mfa_login_field.html index 52918acfd..8aa3cbdd6 100644 --- a/apps/templates/_mfa_login_field.html +++ b/apps/templates/_mfa_login_field.html @@ -75,18 +75,19 @@ } $('.input-style').each(function (i, ele){ - $(ele).attr('name', '') + $(ele).attr('name', 'code-test') }) const currentMFAInputRef = $('#mfa-' + name + ' .input-style') - currentMFAInputRef.attr('name', 'code').attr('required', true) + currentMFAInputRef.attr('name', 'code') // 登录页时,不应该默认focus const usernameRef = $('input[name="username"]') if (!usernameRef || usernameRef.length === 0) { - currentMFAInputRef.focus() + setTimeout(() => { + currentMFAInputRef.focus() + }) } - currentMFAInputRef.attr('name', 'code') } function sendChallengeCode(currentBtn) { @@ -127,4 +128,4 @@ flash_message: false }) } - \ No newline at end of file + diff --git a/apps/terminal/startup.py b/apps/terminal/startup.py index 7c3fffe69..7e454da83 100644 --- a/apps/terminal/startup.py +++ b/apps/terminal/startup.py @@ -3,8 +3,12 @@ import time import socket import threading from django.conf import settings +from django.db.utils import OperationalError + +from common.db.utils import close_old_connections from common.decorator import Singleton from common.utils import get_disk_usage, get_cpu_load, get_memory_usage, get_logger + from .serializers.terminal import TerminalRegistrationSerializer, StatusSerializer from .const import TerminalTypeChoices from .models.terminal import Terminal @@ -52,9 +56,12 @@ class BaseTerminal(object): status_serializer.validated_data.pop('sessions', None) terminal = self.get_or_register_terminal() status_serializer.validated_data['terminal'] = terminal - status_serializer.save() - time.sleep(self.interval) + try: + status_serializer.save() + time.sleep(self.interval) + except OperationalError: + close_old_connections() def get_or_register_terminal(self): terminal = Terminal.objects.filter(