From 5d08438dade4ff765c41ac8b44f4e6dc079283e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E5=B9=BF?= Date: Tue, 21 Jul 2020 15:33:33 +0800 Subject: [PATCH] =?UTF-8?q?feat(ops):=20=E9=A1=B9=E7=9B=AE=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E6=97=B6=EF=BC=8C=E6=B8=85=E9=99=A4=E6=8C=87=E5=AE=9A?= =?UTF-8?q?=E7=9A=84celery=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1;=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E8=8E=B7=E5=8F=96celery=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E5=87=BD=E6=95=B0=20(#4378)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(ops): 添加获取celery定时任务的函数 * feat(ops): 项目启动时,清除指定的celery定时任务 * feat(ops): 项目启动时,清除指定的celery定时任务 2 Co-authored-by: Bai --- apps/ops/celery/utils.py | 6 ++++++ apps/ops/tasks.py | 28 +++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/apps/ops/celery/utils.py b/apps/ops/celery/utils.py index 55ce4a9d1..0c758b70e 100644 --- a/apps/ops/celery/utils.py +++ b/apps/ops/celery/utils.py @@ -93,6 +93,12 @@ def delete_celery_periodic_task(task_name): PeriodicTasks.update_changed() +def get_celery_periodic_task(task_name): + from django_celery_beat.models import PeriodicTask + task = PeriodicTask.objects.filter(name=task_name).first() + return task + + def get_celery_task_log_path(task_id): task_id = str(task_id) rel_path = os.path.join(task_id[0], task_id[1], task_id + '.log') diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index 7ea11a3e4..3419c8976 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -15,7 +15,10 @@ from .celery.decorator import ( register_as_period_task, after_app_shutdown_clean_periodic, after_app_ready_start ) -from .celery.utils import create_or_update_celery_periodic_tasks +from .celery.utils import ( + create_or_update_celery_periodic_tasks, get_celery_periodic_task, + disable_celery_periodic_task, delete_celery_periodic_task +) from .models import Task, CommandExecution, CeleryTask from .utils import send_server_performance_mail @@ -95,6 +98,29 @@ def clean_celery_tasks_period(): subprocess.call(command, shell=True) +@shared_task +@after_app_ready_start +def clean_celery_periodic_tasks(): + """清除celery定时任务""" + need_cleaned_tasks = [ + 'handle_be_interrupted_change_auth_task_periodic', + ] + logger.info('Start clean celery periodic tasks: {}'.format(need_cleaned_tasks)) + for task_name in need_cleaned_tasks: + logger.info('Start clean task: {}'.format(task_name)) + task = get_celery_periodic_task(task_name) + if task is None: + logger.info('Task does not exist: {}'.format(task_name)) + continue + disable_celery_periodic_task(task_name) + delete_celery_periodic_task(task_name) + task = get_celery_periodic_task(task_name) + if task is None: + logger.info('Clean task success: {}'.format(task_name)) + else: + logger.info('Clean task failure: {}'.format(task)) + + @shared_task @after_app_ready_start def create_or_update_registered_periodic_tasks():