mirror of
				https://github.com/jumpserver/jumpserver.git
				synced 2025-10-25 01:40:16 +00:00 
			
		
		
		
	* 添加站内信 * s * s * 添加接口 * fix * fix * 重构了一些 * 完成 * 完善 * s * s * s * s * s * s * 测试ok * 替换业务中发送消息的方式 * 修改 * s * 去掉 update 兼容 create * 添加 unread total 接口 * 调整json字段 Co-authored-by: xinwen <coderWen@126.com>
		
			
				
	
	
		
			186 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			186 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # coding: utf-8
 | |
| import os
 | |
| import subprocess
 | |
| import time
 | |
| 
 | |
| from django.conf import settings
 | |
| from celery import shared_task, subtask
 | |
| from celery.exceptions import SoftTimeLimitExceeded
 | |
| from django.utils import timezone
 | |
| from django.utils.translation import ugettext_lazy as _
 | |
| 
 | |
| from common.utils import get_logger, get_object_or_none, get_disk_usage, get_log_keep_day
 | |
| from orgs.utils import tmp_to_root_org, tmp_to_org
 | |
| from .celery.decorator import (
 | |
|     register_as_period_task, after_app_shutdown_clean_periodic,
 | |
|     after_app_ready_start
 | |
| )
 | |
| from .celery.utils import (
 | |
|     create_or_update_celery_periodic_tasks, get_celery_periodic_task,
 | |
|     disable_celery_periodic_task, delete_celery_periodic_task
 | |
| )
 | |
| from .models import Task, CommandExecution, CeleryTask
 | |
| from .notifications import ServerPerformanceMessage
 | |
| 
 | |
| logger = get_logger(__file__)
 | |
| 
 | |
| 
 | |
| def rerun_task():
 | |
|     pass
 | |
| 
 | |
| 
 | |
| @shared_task(queue="ansible")
 | |
| def run_ansible_task(tid, callback=None, **kwargs):
 | |
|     """
 | |
|     :param tid: is the tasks serialized data
 | |
|     :param callback: callback function name
 | |
|     :return:
 | |
|     """
 | |
|     with tmp_to_root_org():
 | |
|         task = get_object_or_none(Task, id=tid)
 | |
|     if not task:
 | |
|         logger.error("No task found")
 | |
|         return
 | |
|     with tmp_to_org(task.org):
 | |
|         result = task.run()
 | |
|         if callback is not None:
 | |
|             subtask(callback).delay(result, task_name=task.name)
 | |
|         return result
 | |
| 
 | |
| 
 | |
| @shared_task(soft_time_limit=60, queue="ansible")
 | |
| def run_command_execution(cid, **kwargs):
 | |
|     with tmp_to_root_org():
 | |
|         execution = get_object_or_none(CommandExecution, id=cid)
 | |
|     if not execution:
 | |
|         logger.error("Not found the execution id: {}".format(cid))
 | |
|         return
 | |
|     with tmp_to_org(execution.run_as.org):
 | |
|         try:
 | |
|             os.environ.update({
 | |
|                 "TERM_ROWS": kwargs.get("rows", ""),
 | |
|                 "TERM_COLS": kwargs.get("cols", ""),
 | |
|             })
 | |
|             execution.run()
 | |
|         except SoftTimeLimitExceeded:
 | |
|             logger.error("Run time out")
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| @after_app_shutdown_clean_periodic
 | |
| @register_as_period_task(interval=3600*24, description=_("Clean task history period"))
 | |
| def clean_tasks_adhoc_period():
 | |
|     logger.debug("Start clean task adhoc and run history")
 | |
|     tasks = Task.objects.all()
 | |
|     for task in tasks:
 | |
|         adhoc = task.adhoc.all().order_by('-date_created')[5:]
 | |
|         for ad in adhoc:
 | |
|             ad.execution.all().delete()
 | |
|             ad.delete()
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| @after_app_shutdown_clean_periodic
 | |
| @register_as_period_task(interval=3600*24, description=_("Clean celery log period"))
 | |
| def clean_celery_tasks_period():
 | |
|     logger.debug("Start clean celery task history")
 | |
|     expire_days = get_log_keep_day('TASK_LOG_KEEP_DAYS')
 | |
|     days_ago = timezone.now() - timezone.timedelta(days=expire_days)
 | |
|     tasks = CeleryTask.objects.filter(date_start__lt=days_ago)
 | |
|     tasks.delete()
 | |
|     tasks = CeleryTask.objects.filter(date_start__isnull=True)
 | |
|     tasks.delete()
 | |
|     command = "find %s -mtime +%s -name '*.log' -type f -exec rm -f {} \\;" % (
 | |
|         settings.CELERY_LOG_DIR, expire_days
 | |
|     )
 | |
|     subprocess.call(command, shell=True)
 | |
|     command = "echo > {}".format(os.path.join(settings.LOG_DIR, 'celery.log'))
 | |
|     subprocess.call(command, shell=True)
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| @after_app_ready_start
 | |
| def clean_celery_periodic_tasks():
 | |
|     """清除celery定时任务"""
 | |
|     need_cleaned_tasks = [
 | |
|         'handle_be_interrupted_change_auth_task_periodic',
 | |
|     ]
 | |
|     logger.info('Start clean celery periodic tasks: {}'.format(need_cleaned_tasks))
 | |
|     for task_name in need_cleaned_tasks:
 | |
|         logger.info('Start clean task: {}'.format(task_name))
 | |
|         task = get_celery_periodic_task(task_name)
 | |
|         if task is None:
 | |
|             logger.info('Task does not exist: {}'.format(task_name))
 | |
|             continue
 | |
|         disable_celery_periodic_task(task_name)
 | |
|         delete_celery_periodic_task(task_name)
 | |
|         task = get_celery_periodic_task(task_name)
 | |
|         if task is None:
 | |
|             logger.info('Clean task success: {}'.format(task_name))
 | |
|         else:
 | |
|             logger.info('Clean task failure: {}'.format(task))
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| @after_app_ready_start
 | |
| def create_or_update_registered_periodic_tasks():
 | |
|     from .celery.decorator import get_register_period_tasks
 | |
|     for task in get_register_period_tasks():
 | |
|         create_or_update_celery_periodic_tasks(task)
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| @register_as_period_task(interval=3600)
 | |
| def check_server_performance_period():
 | |
|     if not settings.DISK_CHECK_ENABLED:
 | |
|         return
 | |
|     usages = get_disk_usage()
 | |
|     uncheck_paths = ['/etc', '/boot']
 | |
| 
 | |
|     for path, usage in usages.items():
 | |
|         need_check = True
 | |
|         for uncheck_path in uncheck_paths:
 | |
|             if path.startswith(uncheck_path):
 | |
|                 need_check = False
 | |
|         if need_check and usage.percent > 80:
 | |
|             ServerPerformanceMessage(path=path, usage=usage).publish()
 | |
| 
 | |
| 
 | |
| @shared_task(queue="ansible")
 | |
| def hello(name, callback=None):
 | |
|     import time
 | |
|     time.sleep(10)
 | |
|     print("Hello {}".format(name))
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| # @after_app_shutdown_clean_periodic
 | |
| # @register_as_period_task(interval=30)
 | |
| def hello123():
 | |
|     return None
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| def hello_callback(result):
 | |
|     print(result)
 | |
|     print("Hello callback")
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| def add(a, b):
 | |
|     time.sleep(5)
 | |
|     return a + b
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| def add_m(x):
 | |
|     from celery import chain
 | |
|     a = range(x)
 | |
|     b = [a[i:i + 10] for i in range(0, len(a), 10)]
 | |
|     s = list()
 | |
|     s.append(add.s(b[0], b[1]))
 | |
|     for i in b[1:]:
 | |
|         s.append(add.s(i))
 | |
|     res = chain(*tuple(s))()
 | |
|     return res
 |