Files
jumpserver/apps/common/decorators.py
fit2bot 3f4141ca0b merge: with pam (#14911)
* perf: change i18n

* perf: pam

* perf: change translate

* perf: add check account

* perf: add date field

* perf: add account filter

* perf: remove some js

* perf: add account status action

* perf: update pam

* perf: 修改 discover account

* perf: update filter

* perf: update gathered account

* perf: 修改账号同步

* perf: squash migrations

* perf: update pam

* perf: change i18n

* perf: update account risk

* perf: 更新风险发现

* perf: remove css

* perf: Admin connection token

* perf: Add a switch to check connectivity after changing the password, and add a custom ssh command for push tasks

* perf: Modify account migration files

* perf: update pam

* perf: remove to check account dir

* perf: Admin connection token

* perf: update check account

* perf: 优化发送结果

* perf: update pam

* perf: update bulk update create

* perf: prepare using thread timer for bulk_create_decorator

* perf: update bulk create decorator

* perf: 优化 playbook manager

* perf: 优化收集账号的报表

* perf: Update poetry

* perf: Update Dockerfile with new base image tag

* fix: Account migrate 0012 file

* perf: 修改备份

* perf: update pam

* fix: Expand resource_type filter to include raw type

* feat: PAM Service (#14552)

* feat: PAM Service

* perf: import package name

---------

Co-authored-by: jiangweidong <1053570670@qq.com>

* perf: Change secret dashboard (#14551)

Co-authored-by: feng <1304903146@qq.com>

* perf: update migrations

* perf: 修改支持 pam

* perf: Change secret record table dashboard

* perf: update status

* fix: Automation send report

* perf: Change secret report

* feat: windows accounts gather

* perf: update change status

* perf: Account backup

* perf: Account backup report

* perf: Account migrate

* perf: update service to application

* perf: update migrations

* perf: update logo

* feat: oracle accounts gather (#14571)

* feat: oracle accounts gather

* feat: sqlserver accounts gather

* feat: postgresql accounts gather

* feat: mysql accounts gather

---------

Co-authored-by: wangruidong <940853815@qq.com>

* feat: mongodb accounts gather

* perf: Change secret

* perf: Migrate

* perf: Merge conflicting migration files

* perf: Change secret

* perf: Automation filter org

* perf: Account push

* perf: Random secret string

* perf: Enhance SQL query and update risk handling in accounts

* perf: Ticket filter assignee_id

* perf: 修改 account remote

* perf: 修改一些 adhoc 任务

* perf: Change secret

* perf: Remove push account extra api

* perf: update status

* perf: The entire organization can view activity log

* fix: risk field check

* perf: add account details api

* perf: add demo mode

* perf: Delete gather_account

* perf: Perfect solution to account version problem

* perf: Update status action to handle multiple accounts

* perf: Add GatherAccountDetailField and update serializers

* perf: Display account history in combination with password change records

* perf: Lina translate

* fix: Update mysql_filter to handle nested user info

* perf: Admin connection token validate_permission account

* perf: copy move account

* perf: account filter risk

* perf: account risk filter

* perf: Copy move account failed message

* fix: gather account sync account to asset

* perf: Pam dashboard

* perf: Account dashboard total accounts

* perf: Pam dashboard

* perf: Change secret filter account secret_reset

* perf: 修改 risk filter

* perf: pam translate

* feat: Check for leaked duplicate passwords. (#14711)

* feat: Check for leaked duplicate passwords.

* perf: Use SQLite instead of txt as leak password database

---------

Co-authored-by: jiangweidong <1053570670@qq.com>
Co-authored-by: 老广 <ibuler@qq.com>

* perf: merge with remote

* perf: Add risk change_password_add handle

* perf: Pam dashboard

* perf: check account manager import

* perf: 重构扫描

* perf: 修改 db

* perf: Gather account manager

* perf: update change db lib

* perf: dashboard

* perf: Account gather

* perf: 修改 asset get queryset

* perf: automation report

* perf: Pam account

* perf: Pam dashboard api

* perf: risk add account

* perf: 修改 risk check

* perf: Risk account

* perf: update risk add reopen action

* perf: add pylintrc

* Revert "perf: automation report"

This reverts commit 22aee54207.

* perf: check account engine

* perf: Perf: Optimism Gather Report Style

* Perf: Remove unused actions

* Perf: Perf push account

* perf: perf gather account

* perf: Automation report

* perf: Push account recorder

* perf: Push account record

* perf: Pam dashboard

* perf: perf

* perf: update intergration

* perf: integrations application detail add account tab page

* feat: Custom change password supports configuration of interactive items

* perf: Go and Python demo code

* perf: Custom secret change

* perf: add user filter

* perf: translate

* perf: Add demo code docs

* perf: update some i18n

* perf: update some i18n

* perf: Add Java, Node, Go, and cURL demo code

* perf: Translate

* perf: Change secret translate

* perf: Translate

* perf: update some i18n

* perf: translate

* perf: Ansible playbook

* perf: update some choice

* perf: update some choice

* perf: update account serializer remote unused code

* perf: conflict

* perf: update import

---------

Co-authored-by: ibuler <ibuler@qq.com>
Co-authored-by: feng <1304903146@qq.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: wangruidong <940853815@qq.com>
Co-authored-by: jiangweidong <1053570670@qq.com>
Co-authored-by: feng626 <57284900+feng626@users.noreply.github.com>
Co-authored-by: zhaojisen <1301338853@qq.com>
2025-02-21 16:39:57 +08:00

385 lines
12 KiB
Python

# -*- coding: utf-8 -*-
#
import asyncio
import functools
import inspect
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from django.db import transaction
from .db.utils import open_db_connection, safe_db_connection
from .utils import logger
def on_transaction_commit(func):
    """
    Decorator: run *func* only after the surrounding DB transaction commits.

    Without on_commit, adding many-to-many values right after creating an
    object can fail because the row is not committed yet.
    """
    # functools.wraps preserves __name__/__doc__; other decorators in this
    # module rely on func.__name__ for cache keys and logging.
    @functools.wraps(func)
    def inner(*args, **kwargs):
        transaction.on_commit(lambda: func(*args, **kwargs))

    return inner
class Singleton(object):
    """Decorator class: calling the wrapped class always yields one shared instance."""

    def __init__(self, cls):
        self._cls = cls
        self._instance = {}

    def __call__(self):
        # Lazily build the instance on first call, then reuse it forever.
        instance = self._instance.get(self._cls)
        if instance is None:
            instance = self._cls()
            self._instance[self._cls] = instance
        return instance
def default_suffix_key(*args, **kwargs):
    """Default grouping key: every call maps to the single 'default' bucket."""
    return 'default'
def key_by_org(*args, **kwargs):
    """Group by org: use the org_id of the first element of the first kwarg value."""
    for value in kwargs.values():
        # First kwarg value is expected to be a sequence of org-bound objects.
        return value[0].org_id
    return 'default'
class EventLoopThread(threading.Thread):
    """Thread that owns a dedicated asyncio event loop and runs it forever."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Create the loop eagerly so get_loop() works before the thread starts.
        self._loop = asyncio.new_event_loop()

    def run(self) -> None:
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_forever()
        except Exception as exc:
            logger.error("Event loop stopped with err: {} ".format(exc))

    def get_loop(self):
        """Return the event loop owned by this thread."""
        return self._loop
# Background daemon thread hosting the shared event loop used for debouncing.
# Started at import time; daemon so it never blocks interpreter shutdown.
_loop_thread = EventLoopThread()
_loop_thread.daemon = True
_loop_thread.start()
# Pool that actually executes the debounced callbacks off the event loop.
executor = ThreadPoolExecutor(
    max_workers=10,
    thread_name_prefix='debouncer'
)
# Per cache_key state: pending asyncio task, merged kwargs, and the timestamp
# of the first call in the current debounce window.
_loop_debouncer_func_task_cache = {}
_loop_debouncer_func_args_cache = {}
_loop_debouncer_func_task_time_cache = {}
def get_loop():
    """Return the shared event loop running in the background debouncer thread."""
    return _loop_thread.get_loop()
def cancel_or_remove_debouncer_task(cache_key):
    """Drop a finished debouncer task from the cache, or cancel a pending one."""
    task = _loop_debouncer_func_task_cache.get(cache_key)
    if task is None:
        return
    if task.done():
        # Finished tasks are just removed; the caller will schedule a new one.
        _loop_debouncer_func_task_cache.pop(cache_key)
    else:
        task.cancel()
def run_debouncer_func(cache_key, org, ttl, func, *args, **kwargs):
    """
    Schedule *func* with debouncing: each new call cancels the pending task
    and re-arms it for *ttl* seconds; if calls keep arriving for longer than
    ttl since the first one, the function is forced to run via the executor.
    """
    cancel_or_remove_debouncer_task(cache_key)
    run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
    current = time.time()
    # Remember when this key was first scheduled in the current window.
    first_run_time = _loop_debouncer_func_task_time_cache.get(cache_key, None)
    if first_run_time is None:
        _loop_debouncer_func_task_time_cache[cache_key] = current
        first_run_time = current
    if current - first_run_time > ttl:
        # Window exceeded ttl: run immediately so a steady stream of calls
        # cannot postpone execution forever.
        _loop_debouncer_func_args_cache.pop(cache_key, None)
        _loop_debouncer_func_task_time_cache.pop(cache_key, None)
        executor.submit(run_func_partial, *args, **kwargs)
        logger.debug('pid {} executor submit run {}'.format(
            os.getpid(), func.__name__, ))
        return
    # Otherwise arm a fresh debounce coroutine on the shared event loop.
    loop = _loop_thread.get_loop()
    _debouncer = Debouncer(run_func_partial, lambda: True, ttl, loop=loop, executor=executor)
    task = asyncio.run_coroutine_threadsafe(_debouncer(*args, **kwargs), loop=loop)
    _loop_debouncer_func_task_cache[cache_key] = task
class Debouncer(object):
    """Awaitable that sleeps *delay* seconds, then runs *callback* if *check* passes."""

    def __init__(self, callback, check, delay, loop=None, executor=None):
        self.callback = callback
        self.check = check
        self.delay = delay
        # Fall back to the current event loop when none is supplied.
        self.loop = loop or asyncio.get_event_loop()
        self.executor = executor

    async def __call__(self, *args, **kwargs):
        await asyncio.sleep(self.delay)
        if not await self._run_sync_to_async(self.check):
            return
        bound = functools.partial(self.callback, *args, **kwargs)
        return await self._run_sync_to_async(bound)

    async def _run_sync_to_async(self, func):
        # Await coroutine functions directly; run plain callables in the executor.
        if asyncio.iscoroutinefunction(func):
            return await func()
        return await self.loop.run_in_executor(self.executor, func)
# Error messages treated as expected/benign; logged at info level instead of
# error by _run_func_with_org.
ignore_err_exceptions = (
    "(3101, 'Plugin instructed the server to rollback the current transaction.')",
)
def _run_func_with_org(key, org, func, *args, **kwargs):
    """Run *func* inside *org*'s context on a fresh DB connection, then clear
    the debouncer caches for *key* regardless of success or failure."""
    from orgs.utils import set_current_org
    try:
        with open_db_connection() as conn:
            # Make sure a brand-new database connection is used for this run,
            # to avoid 'MySQL server has gone away' errors
            set_current_org(org)
            func(*args, **kwargs)
    except Exception as e:
        msg = str(e)
        log_func = logger.error
        # Known-benign errors are downgraded to info level.
        if msg in ignore_err_exceptions:
            log_func = logger.info
        pid = os.getpid()
        thread_name = threading.current_thread()
        log_func('pid {} thread {} delay run {} error: {}'.format(
            pid, thread_name, func.__name__, msg))
    # Always reset this key's window so the next call starts fresh.
    _loop_debouncer_func_task_cache.pop(key, None)
    _loop_debouncer_func_args_cache.pop(key, None)
    _loop_debouncer_func_task_time_cache.pop(key, None)
def delay_run(ttl=5, key=None):
    """
    Delay execution: within ttl seconds, only the last call runs.
    :param ttl: debounce window in seconds
    :param key: optional callback computing the cache-key suffix used to
                group calls; defaults to a single shared bucket
    :return: decorator
    """
    def inner(func):
        suffix_key_func = key if key else default_suffix_key
        sigs = inspect.signature(func)
        # The decorated function may not declare any parameters.
        if len(sigs.parameters) != 0:
            raise ValueError('Merge delay run must not arguments: %s' % func.__name__)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            from orgs.utils import get_current_org
            org = get_current_org()
            func_name = f'{func.__module__}_{func.__name__}'
            # Call-time positional args choose the grouping bucket.
            key_suffix = suffix_key_func(*args)
            cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
            # NOTE(review): args/kwargs are also forwarded to func, which
            # declares no parameters — non-empty args will raise at execution
            # time (caught and logged by _run_func_with_org); verify callers.
            run_debouncer_func(cache_key, org, ttl, func, *args, **kwargs)

        return wrapper

    return inner
def merge_delay_run(ttl=5, key=None):
    """
    Delay execution: within ttl seconds only the last call runs, and the
    keyword arguments of all calls in the window are merged (set-unioned).
    :param ttl: debounce window in seconds
    :param key: optional callback computing the cache-key suffix used to
                group calls; defaults to a single shared bucket
    :return: decorator
    """
    def delay(func, *args, **kwargs):
        from orgs.utils import get_current_org
        # Each delay() call may override the decorator-level ttl.
        current_ttl = kwargs.pop('ttl', ttl)
        suffix_key_func = key if key else default_suffix_key
        org = get_current_org()
        func_name = f'{func.__module__}_{func.__name__}'
        key_suffix = suffix_key_func(*args, **kwargs)
        cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}'
        # Union the new kwargs values into whatever is already queued for this key.
        cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {})
        for k, v in kwargs.items():
            if not isinstance(v, (tuple, list, set)):
                raise ValueError('func kwargs value must be list or tuple: %s %s' % (func.__name__, v))
            v = set(v)
            if k not in cache_kwargs:
                cache_kwargs[k] = v
            else:
                cache_kwargs[k] = cache_kwargs[k].union(v)
        _loop_debouncer_func_args_cache[cache_key] = cache_kwargs
        run_debouncer_func(cache_key, org, current_ttl, func, *args, **cache_kwargs)

    def apply(func, sync=False, *args, **kwargs):
        # sync=True runs immediately; otherwise go through the debouncer.
        if sync:
            return func(*args, **kwargs)
        else:
            return delay(func, *args, **kwargs)

    def inner(func):
        # The decorated function must take exactly one parameter whose
        # default is a tuple, e.g. def f(users=()): ...
        sigs = inspect.signature(func)
        if len(sigs.parameters) != 1:
            raise ValueError('func must have one arguments: %s' % func.__name__)
        param = list(sigs.parameters.values())[0]
        if not isinstance(param.default, tuple):
            raise ValueError('func default must be tuple: %s' % param.default)
        func.delay = functools.partial(delay, func)
        func.apply = functools.partial(apply, func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # NOTE(review): a direct call runs func immediately without
            # debouncing/merging — use func.delay(...) for the merged path.
            return func(*args, **kwargs)

        return wrapper

    return inner
@delay_run(ttl=5)
def test_delay_run():
    # Demo: rapid repeated calls within the 5s window collapse into one print.
    print("Hello, now is %s" % time.time())
@merge_delay_run(ttl=5, key=lambda users=(): users[0][0])
def test_merge_delay_run(users=()):
    # Demo: 'users' values from all calls in the ttl window arrive merged.
    name = ','.join(users)
    time.sleep(2)
    print("Hello, %s, now is %s" % (name, time.time()))
def do_test():
    """Fire many rapid calls to demonstrate that debounced functions coalesce."""
    started = time.time()
    print("start : %s" % time.time())
    for i in range(100):
        # test_delay_run('test', year=i)
        test_merge_delay_run(users=['test %s' % i])
        test_merge_delay_run(users=['best %s' % i])
        test_delay_run('test run %s' % i)
    finished = time.time()
    print("end : %s, using: %s" % (finished, finished - started))
def cached_method(ttl=20):
    """Memoize results for *ttl* seconds, keyed by function and call arguments."""
    _cache = {}

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = (func, args, tuple(sorted(kwargs.items())))
            entry = _cache.get(key)
            # Serve from cache while the entry is younger than ttl seconds.
            if entry is not None and time.time() - entry['timestamp'] < ttl:
                return entry['result']
            # Expired or missing: recompute and store with a fresh timestamp.
            result = func(*args, **kwargs)
            _cache[key] = {'result': result, 'timestamp': time.time()}
            return result

        return wrapper

    return decorator
def bulk_handle(handler, batch_size=50, timeout=0.5):
    """
    Decorator factory: collect the return values of the decorated function
    and pass them to *handler* in batches.

    A batch is flushed when it reaches *batch_size*, when *timeout* seconds
    pass without a new item (via a threading.Timer), or when the current org
    changes between calls. Call ``wrapper.finish()`` to flush leftovers.
    """
    def decorator(func):
        from orgs.utils import get_current_org_id
        cache = []  # buffered instances waiting to be flushed
        lock = threading.Lock()  # serializes flushes across caller/timer threads
        timer = [None]  # current Timer, boxed in a list so closures can reset it
        org_id = None

        def reset_timer():
            """(Re)start the inactivity timer that flushes remaining items."""
            if timer[0] is not None:
                timer[0].cancel()
            timer[0] = threading.Timer(timeout, handle_remaining)
            timer[0].start()

        def handle_it():
            # Flush the buffer inside the org context it was collected under,
            # on a safe DB connection.
            from orgs.utils import tmp_to_org
            with lock:
                if not cache:
                    return
                with tmp_to_org(org_id):
                    with safe_db_connection():
                        handler(cache)
                        cache.clear()

        def handle_on_org_changed():
            # Flush early when the active org changes so batches never mix orgs.
            nonlocal org_id
            if org_id is None:
                org_id = get_current_org_id()
            else:
                c_org_id = get_current_org_id()
                if org_id != c_org_id:
                    handle_it()
                    org_id = c_org_id

        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal cache
            handle_on_org_changed()
            # Call the decorated function to produce one instance.
            instance = func(*args, **kwargs)
            if instance is None:
                return None
            cache.append(instance)
            # Was a stray print() in library code; log at debug level instead.
            logger.debug("Instance added to cache. Cache size: %s" % len(cache))
            # Flush once the batch threshold is reached.
            if len(cache) >= batch_size:
                handle_it()
            reset_timer()
            return instance

        def handle_remaining():
            # Timer callback: flush whatever is still buffered.
            if not cache:
                return
            logger.debug("Timer expired. Saving remaining instances.")
            from orgs.utils import tmp_to_org
            with tmp_to_org(org_id):
                handle_it()

        wrapper.finish = handle_remaining
        return wrapper

    return decorator
def bulk_create_decorator(instance_model, batch_size=50, ignore_conflicts=True, timeout=0.3):
    """Batch instances produced by the decorated function and persist them with bulk_create."""
    def _flush(instances):
        instance_model.objects.bulk_create(instances, ignore_conflicts=ignore_conflicts)

    return bulk_handle(_flush, batch_size, timeout)
def bulk_update_decorator(instance_model, batch_size=50, update_fields=None, timeout=0.3):
    """Batch instances produced by the decorated function and persist changes with bulk_update."""
    def _flush(instances):
        instance_model.objects.bulk_update(instances, update_fields)

    return bulk_handle(_flush, batch_size, timeout)