diff --git a/apps/accounts/signal_handlers.py b/apps/accounts/signal_handlers.py index 4c87df5cf..3ae1fba5c 100644 --- a/apps/accounts/signal_handlers.py +++ b/apps/accounts/signal_handlers.py @@ -63,7 +63,7 @@ def create_accounts_activities(account, action='create'): def on_account_create_by_template(sender, instance, created=False, **kwargs): if not created or instance.source != 'template': return - push_accounts_if_need(accounts=(instance,)) + push_accounts_if_need.delay(accounts=(instance,)) create_accounts_activities(instance, action='create') diff --git a/apps/assets/signal_handlers/asset.py b/apps/assets/signal_handlers/asset.py index 5ab4a4117..7ced509d6 100644 --- a/apps/assets/signal_handlers/asset.py +++ b/apps/assets/signal_handlers/asset.py @@ -63,13 +63,13 @@ def on_asset_create(sender, instance=None, created=False, **kwargs): return logger.info("Asset create signal recv: {}".format(instance)) - ensure_asset_has_node(assets=(instance,)) + ensure_asset_has_node.delay(assets=(instance,)) # 获取资产硬件信息 auto_config = instance.auto_config if auto_config.get('ping_enabled'): logger.debug('Asset {} ping enabled, test connectivity'.format(instance.name)) - test_assets_connectivity_handler(assets=(instance,)) + test_assets_connectivity_handler.delay(assets=(instance,)) if auto_config.get('gather_facts_enabled'): logger.debug('Asset {} gather facts enabled, gather facts'.format(instance.name)) gather_assets_facts_handler(assets=(instance,)) diff --git a/apps/assets/signal_handlers/node_assets_amount.py b/apps/assets/signal_handlers/node_assets_amount.py index ea2b3ba8a..5c4633dbd 100644 --- a/apps/assets/signal_handlers/node_assets_amount.py +++ b/apps/assets/signal_handlers/node_assets_amount.py @@ -2,14 +2,16 @@ # from operator import add, sub +from django.conf import settings from django.db.models.signals import m2m_changed from django.dispatch import receiver from assets.models import Asset, Node from common.const.signals import PRE_CLEAR, POST_ADD, PRE_REMOVE from common.decorators import on_transaction_commit, merge_delay_run +from common.signals import django_ready from common.utils import get_logger -from orgs.utils import tmp_to_org +from orgs.utils import tmp_to_org, tmp_to_root_org from ..tasks import check_node_assets_amount_task logger = get_logger(__file__) @@ -34,7 +36,7 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs): node_ids = [instance.id] else: node_ids = list(pk_set) - update_nodes_assets_amount(node_ids=node_ids) + update_nodes_assets_amount.delay(node_ids=node_ids) @merge_delay_run(ttl=30) @@ -52,3 +54,18 @@ def update_nodes_assets_amount(node_ids=()): node.assets_amount = node.get_assets_amount() Node.objects.bulk_update(nodes, ['assets_amount']) + + +@receiver(django_ready) +def set_assets_size_to_setting(sender, **kwargs): + from assets.models import Asset + try: + with tmp_to_root_org(): + amount = Asset.objects.order_by().count() + except: + amount = 0 + + if amount > 20000: + settings.ASSET_SIZE = 'large' + elif amount > 2000: + settings.ASSET_SIZE = 'medium' diff --git a/apps/assets/signal_handlers/node_assets_mapping.py b/apps/assets/signal_handlers/node_assets_mapping.py index a53e534b6..1177f02d6 100644 --- a/apps/assets/signal_handlers/node_assets_mapping.py +++ b/apps/assets/signal_handlers/node_assets_mapping.py @@ -44,18 +44,18 @@ def on_node_post_create(sender, instance, created, update_fields, **kwargs): need_expire = False if need_expire: - expire_node_assets_mapping(org_ids=(instance.org_id,)) + expire_node_assets_mapping.delay(org_ids=(instance.org_id,)) @receiver(post_delete, sender=Node) def on_node_post_delete(sender, instance, **kwargs): - expire_node_assets_mapping(org_ids=(instance.org_id,)) + expire_node_assets_mapping.delay(org_ids=(instance.org_id,)) @receiver(m2m_changed, sender=Asset.nodes.through) def on_node_asset_change(sender, instance, action='pre_remove', **kwargs): if action.startswith('post'): - expire_node_assets_mapping(org_ids=(instance.org_id,)) + expire_node_assets_mapping.delay(org_ids=(instance.org_id,)) @receiver(django_ready) diff --git a/apps/authentication/backends/drf.py b/apps/authentication/backends/drf.py index 29d586e27..4cf2577c1 100644 --- a/apps/authentication/backends/drf.py +++ b/apps/authentication/backends/drf.py @@ -34,9 +34,9 @@ def update_user_last_used(users=()): def after_authenticate_update_date(user, token=None): - update_user_last_used(users=(user.id,)) + update_user_last_used.delay(users=(user.id,)) if token: - update_token_last_used(tokens=(token,)) + update_token_last_used.delay(tokens=(token,)) class AccessTokenAuthentication(authentication.BaseAuthentication): diff --git a/apps/common/decorators.py b/apps/common/decorators.py index 12a72ce28..171a4ff33 100644 --- a/apps/common/decorators.py +++ b/apps/common/decorators.py @@ -199,6 +199,32 @@ def merge_delay_run(ttl=5, key=None): :return: """ + def delay(func, *args, **kwargs): + from orgs.utils import get_current_org + suffix_key_func = key if key else default_suffix_key + org = get_current_org() + func_name = f'{func.__module__}_{func.__name__}' + key_suffix = suffix_key_func(*args, **kwargs) + cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}' + cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {}) + + for k, v in kwargs.items(): + if not isinstance(v, (tuple, list, set)): + raise ValueError('func kwargs value must be list or tuple: %s %s' % (func.__name__, v)) + v = set(v) + if k not in cache_kwargs: + cache_kwargs[k] = v + else: + cache_kwargs[k] = cache_kwargs[k].union(v) + _loop_debouncer_func_args_cache[cache_key] = cache_kwargs + run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs) + + def apply(func, sync=False, *args, **kwargs): + if sync: + return func(*args, **kwargs) + else: + return delay(func, *args, **kwargs) + def inner(func): sigs = inspect.signature(func) if len(sigs.parameters) != 1: @@ -206,27 +232,12 @@ def merge_delay_run(ttl=5, key=None): param = list(sigs.parameters.values())[0] if not isinstance(param.default, tuple): raise ValueError('func default must be tuple: %s' % param.default) - suffix_key_func = key if key else default_suffix_key + func.delay = functools.partial(delay, func) + func.apply = functools.partial(apply, func) @functools.wraps(func) def wrapper(*args, **kwargs): - from orgs.utils import get_current_org - org = get_current_org() - func_name = f'{func.__module__}_{func.__name__}' - key_suffix = suffix_key_func(*args, **kwargs) - cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}' - cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {}) - - for k, v in kwargs.items(): - if not isinstance(v, (tuple, list, set)): - raise ValueError('func kwargs value must be list or tuple: %s %s' % (func.__name__, v)) - v = set(v) - if k not in cache_kwargs: - cache_kwargs[k] = v - else: - cache_kwargs[k] = cache_kwargs[k].union(v) - _loop_debouncer_func_args_cache[cache_key] = cache_kwargs - run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs) + return func(*args, **kwargs) return wrapper diff --git a/apps/jumpserver/settings/custom.py b/apps/jumpserver/settings/custom.py index 311e54aa8..2457b28b1 100644 --- a/apps/jumpserver/settings/custom.py +++ b/apps/jumpserver/settings/custom.py @@ -214,6 +214,9 @@ PERM_TREE_REGEN_INTERVAL = CONFIG.PERM_TREE_REGEN_INTERVAL MAGNUS_ORACLE_PORTS = CONFIG.MAGNUS_ORACLE_PORTS LIMIT_SUPER_PRIV = CONFIG.LIMIT_SUPER_PRIV +# Asset account may be too many +ASSET_SIZE = 'small' + # Chat AI CHAT_AI_ENABLED = CONFIG.CHAT_AI_ENABLED GPT_API_KEY = CONFIG.GPT_API_KEY diff --git a/apps/orgs/signal_handlers/cache.py b/apps/orgs/signal_handlers/cache.py index 4dc3c796e..975392ad9 100644 --- a/apps/orgs/signal_handlers/cache.py +++ b/apps/orgs/signal_handlers/cache.py @@ -87,7 +87,7 @@ class OrgResourceStatisticsRefreshUtil: if not cache_field_name: return org = getattr(instance, 'org', None) - cls.refresh_org_fields(((org, cache_field_name),)) + cls.refresh_org_fields.delay(org_fields=((org, cache_field_name),)) @receiver(post_save) diff --git a/apps/perms/utils/user_perm_tree.py b/apps/perms/utils/user_perm_tree.py index 13577a9b1..4514c78ca 100644 --- a/apps/perms/utils/user_perm_tree.py +++ b/apps/perms/utils/user_perm_tree.py @@ -72,7 +72,7 @@ class UserPermTreeRefreshUtil(_UserPermTreeCacheMixin): @timeit def refresh_if_need(self, force=False): - built_just_now = cache.get(self.cache_key_time) + built_just_now = False if settings.ASSET_SIZE == 'small' else cache.get(self.cache_key_time) if built_just_now: logger.info('Refresh user perm tree just now, pass: {}'.format(built_just_now)) return @@ -80,12 +80,18 @@ class UserPermTreeRefreshUtil(_UserPermTreeCacheMixin): if not to_refresh_orgs: logger.info('Not have to refresh orgs') return + logger.info("Delay refresh user orgs: {} {}".format(self.user, [o.name for o in to_refresh_orgs])) - refresh_user_orgs_perm_tree(user_orgs=((self.user, tuple(to_refresh_orgs)),)) - refresh_user_favorite_assets(users=(self.user,)) + sync = True if settings.ASSET_SIZE == 'small' else False + refresh_user_orgs_perm_tree.apply(sync=sync, user_orgs=((self.user, tuple(to_refresh_orgs)),)) + refresh_user_favorite_assets.apply(sync=sync, users=(self.user,)) @timeit def refresh_tree_manual(self): + """ + 用来手动 debug + :return: + """ built_just_now = cache.get(self.cache_key_time) if built_just_now: logger.info('Refresh just now, pass: {}'.format(built_just_now)) @@ -105,8 +111,9 @@ class UserPermTreeRefreshUtil(_UserPermTreeCacheMixin): return self._clean_user_perm_tree_for_legacy_org() - ttl = settings.PERM_TREE_REGEN_INTERVAL - cache.set(self.cache_key_time, int(time.time()), ttl) + if settings.ASSET_SIZE != 'small': + ttl = settings.PERM_TREE_REGEN_INTERVAL + cache.set(self.cache_key_time, int(time.time()), ttl) lock = UserGrantedTreeRebuildLock(self.user.id) got = lock.acquire(blocking=False)