From 64e03a44129717866fdc1bd8d38cc9ac2a08dbd3 Mon Sep 17 00:00:00 2001 From: Aaron3S Date: Mon, 24 Oct 2022 20:14:18 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1api?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/drf/serializers/common.py | 4 +- apps/ops/api/adhoc.py | 2 +- apps/ops/api/celery.py | 28 ++++++-- .../ops/migrations/0027_auto_20221024_1709.py | 67 +++++++++++++++++++ .../ops/migrations/0028_auto_20221024_1712.py | 25 +++++++ apps/ops/models/celery.py | 19 ++++++ apps/ops/serializers/celery.py | 26 +++++-- apps/ops/signal_handlers.py | 23 +++++-- apps/ops/tasks.py | 6 +- apps/ops/urls/api_urls.py | 5 +- 10 files changed, 178 insertions(+), 27 deletions(-) create mode 100644 apps/ops/migrations/0027_auto_20221024_1709.py create mode 100644 apps/ops/migrations/0028_auto_20221024_1712.py diff --git a/apps/common/drf/serializers/common.py b/apps/common/drf/serializers/common.py index 688e1bfcc..a522e3a82 100644 --- a/apps/common/drf/serializers/common.py +++ b/apps/common/drf/serializers/common.py @@ -12,7 +12,7 @@ from .mixin import BulkListSerializerMixin, BulkSerializerMixin __all__ = [ 'MethodSerializer', 'EmptySerializer', 'BulkModelSerializer', - 'AdaptedBulkListSerializer', 'CeleryTaskSerializer', + 'AdaptedBulkListSerializer', 'CeleryTaskExecutionSerializer', 'WritableNestedModelSerializer', 'GroupedChoiceSerializer', ] @@ -73,7 +73,7 @@ class AdaptedBulkListSerializer(BulkListSerializerMixin, BulkListSerializer): pass -class CeleryTaskSerializer(serializers.Serializer): +class CeleryTaskExecutionSerializer(serializers.Serializer): task = serializers.CharField(read_only=True) diff --git a/apps/ops/api/adhoc.py b/apps/ops/api/adhoc.py index 8644ac5d2..71e818fbb 100644 --- a/apps/ops/api/adhoc.py +++ b/apps/ops/api/adhoc.py @@ -5,7 +5,7 @@ from django.shortcuts import get_object_or_404 from rest_framework import viewsets, generics from rest_framework.views import Response -from common.drf.serializers import CeleryTaskSerializer +from common.drf.serializers import CeleryTaskExecutionSerializer from ..models import AdHoc, AdHocExecution from ..serializers import ( AdHocSerializer, diff --git a/apps/ops/api/celery.py b/apps/ops/api/celery.py index cd452c471..5fa8c902b 100644 --- a/apps/ops/api/celery.py +++ b/apps/ops/api/celery.py @@ -4,6 +4,7 @@ import os import re +from celery import current_app from django.utils.translation import ugettext as _ from rest_framework import viewsets from celery.result import AsyncResult @@ -12,20 +13,21 @@ from django_celery_beat.models import PeriodicTask from common.permissions import IsValidUser from common.api import LogTailApi -from ..models import CeleryTask +from ..models import CeleryTaskExecution, CeleryTask from ..serializers import CeleryResultSerializer, CeleryPeriodTaskSerializer from ..celery.utils import get_celery_task_log_path from ..ansible.utils import get_ansible_task_log_path from common.mixins.api import CommonApiMixin - __all__ = [ - 'CeleryTaskLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet', - 'AnsibleTaskLogApi', + 'CeleryTaskExecutionLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet', + 'AnsibleTaskLogApi', 'CeleryTaskViewSet', 'CeleryTaskExecutionViewSet' ] +from ..serializers.celery import CeleryTaskSerializer, CeleryTaskExecutionSerializer -class CeleryTaskLogApi(LogTailApi): + +class CeleryTaskExecutionLogApi(LogTailApi): permission_classes = (IsValidUser,) task = None task_id = '' @@ -46,8 +48,8 @@ class CeleryTaskLogApi(LogTailApi): if new_path and os.path.isfile(new_path): return new_path try: - task = CeleryTask.objects.get(id=self.task_id) - except CeleryTask.DoesNotExist: + task = CeleryTaskExecution.objects.get(id=self.task_id) + except CeleryTaskExecution.DoesNotExist: return None return task.full_log_path @@ -94,3 +96,15 @@ class CeleryPeriodTaskViewSet(CommonApiMixin, viewsets.ModelViewSet): queryset = super().get_queryset() queryset = queryset.exclude(description='') return queryset + + +class CeleryTaskViewSet(CommonApiMixin, viewsets.ModelViewSet): + queryset = CeleryTask.objects.all() + serializer_class = CeleryTaskSerializer + http_method_names = ('get',) + + +class CeleryTaskExecutionViewSet(CommonApiMixin, viewsets.ModelViewSet): + queryset = CeleryTaskExecution.objects.all() + serializer_class = CeleryTaskExecutionSerializer + http_method_names = ('get',) diff --git a/apps/ops/migrations/0027_auto_20221024_1709.py b/apps/ops/migrations/0027_auto_20221024_1709.py new file mode 100644 index 000000000..4b29e4a3e --- /dev/null +++ b/apps/ops/migrations/0027_auto_20221024_1709.py @@ -0,0 +1,67 @@ +# Generated by Django 3.2.14 on 2022-10-24 09:09 + +from django.db import migrations, models +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ('ops', '0026_auto_20221009_2050'), + ] + + operations = [ + migrations.CreateModel( + name='CeleryTaskExecution', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)), + ('name', models.CharField(max_length=1024)), + ('args', models.JSONField(verbose_name='Args')), + ('kwargs', models.JSONField(verbose_name='Kwargs')), + ('state', models.CharField(max_length=16, verbose_name='State')), + ('is_finished', models.BooleanField(default=False, verbose_name='Finished')), + ('date_published', models.DateTimeField(auto_now_add=True)), + ('date_start', models.DateTimeField(null=True)), + ('date_finished', models.DateTimeField(null=True)), + ], + ), + migrations.RenameField( + model_name='celerytask', + old_name='date_finished', + new_name='date_last_published', + ), + migrations.RemoveField( + model_name='celerytask', + name='args', + ), + migrations.RemoveField( + model_name='celerytask', + name='date_published', + ), + migrations.RemoveField( + model_name='celerytask', + name='date_start', + ), + migrations.RemoveField( + model_name='celerytask', + name='is_finished', + ), + migrations.RemoveField( + model_name='celerytask', + name='kwargs', + ), + migrations.RemoveField( + model_name='celerytask', + name='state', + ), + migrations.AddField( + model_name='celerytask', + name='description', + field=models.CharField(max_length=2048, null=True), + ), + migrations.AddField( + model_name='celerytask', + name='verbose_name', + field=models.CharField(max_length=1024, null=True), + ), + ] diff --git a/apps/ops/migrations/0028_auto_20221024_1712.py b/apps/ops/migrations/0028_auto_20221024_1712.py new file mode 100644 index 000000000..d246adbd8 --- /dev/null +++ b/apps/ops/migrations/0028_auto_20221024_1712.py @@ -0,0 +1,25 @@ +# Generated by Django 3.2.14 on 2022-10-24 09:12 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('ops', '0027_auto_20221024_1709'), + ] + + operations = [ + migrations.RemoveField( + model_name='celerytask', + name='date_last_published', + ), + migrations.RemoveField( + model_name='celerytask', + name='description', + ), + migrations.RemoveField( + model_name='celerytask', + name='verbose_name', + ), + ] diff --git a/apps/ops/models/celery.py b/apps/ops/models/celery.py index 2291eb6f1..6ea4e2641 100644 --- a/apps/ops/models/celery.py +++ b/apps/ops/models/celery.py @@ -7,8 +7,27 @@ from django.utils.translation import gettext_lazy as _ from django.conf import settings from django.db import models +from ops.celery import app + class CeleryTask(models.Model): + id = models.UUIDField(primary_key=True, default=uuid.uuid4) + name = models.CharField(max_length=1024) + + @property + def verbose_name(self): + task = app.tasks.get(self.name, None) + if task: + return getattr(task, 'verbose_name', None) + + @property + def description(self): + task = app.tasks.get(self.name, None) + if task: + return getattr(task, 'description', None) + + +class CeleryTaskExecution(models.Model): LOG_DIR = os.path.join(settings.PROJECT_DIR, 'data', 'celery') id = models.UUIDField(primary_key=True, default=uuid.uuid4) name = models.CharField(max_length=1024) diff --git a/apps/ops/serializers/celery.py b/apps/ops/serializers/celery.py index 8015c2482..8122ed636 100644 --- a/apps/ops/serializers/celery.py +++ b/apps/ops/serializers/celery.py @@ -5,10 +5,12 @@ from rest_framework import serializers from django_celery_beat.models import PeriodicTask __all__ = [ - 'CeleryResultSerializer', 'CeleryTaskSerializer', - 'CeleryPeriodTaskSerializer' + 'CeleryResultSerializer', 'CeleryTaskExecutionSerializer', + 'CeleryPeriodTaskSerializer', 'CeleryTaskSerializer' ] +from ops.models import CeleryTask, CeleryTaskExecution + class CeleryResultSerializer(serializers.Serializer): id = serializers.UUIDField() @@ -16,10 +18,6 @@ class CeleryResultSerializer(serializers.Serializer): state = serializers.CharField(max_length=16) -class CeleryTaskSerializer(serializers.Serializer): - pass - - class CeleryPeriodTaskSerializer(serializers.ModelSerializer): class Meta: model = PeriodicTask @@ -27,3 +25,19 @@ class CeleryPeriodTaskSerializer(serializers.ModelSerializer): 'name', 'task', 'enabled', 'description', 'last_run_at', 'total_run_count' ] + + +class CeleryTaskSerializer(serializers.ModelSerializer): + class Meta: + model = CeleryTask + fields = [ + 'name', 'verbose_name', 'description', + ] + + +class CeleryTaskExecutionSerializer(serializers.ModelSerializer): + class Meta: + model = CeleryTaskExecution + fields = [ + "name", "args", "kwargs", "state", "is_finished", "date_published", "date_start", "date_finished" + ] diff --git a/apps/ops/signal_handlers.py b/apps/ops/signal_handlers.py index e48802d84..5882a10e8 100644 --- a/apps/ops/signal_handlers.py +++ b/apps/ops/signal_handlers.py @@ -1,12 +1,15 @@ import ast +from django.db import transaction +from django.dispatch import receiver from django.utils import translation, timezone from django.core.cache import cache -from celery import signals +from celery import signals, current_app from common.db.utils import close_old_connections, get_logger -from .models import CeleryTask - +from common.signals import django_ready +from .celery import app +from .models import CeleryTaskExecution, CeleryTask logger = get_logger(__name__) @@ -14,6 +17,14 @@ TASK_LANG_CACHE_KEY = 'TASK_LANG_{}' TASK_LANG_CACHE_TTL = 1800 +@receiver(django_ready) +def sync_registered_tasks(*args, **kwargs): + with transaction.atomic(): + CeleryTask.objects.all().delete() + for key in app.tasks: + CeleryTask(name=key).save() + + @signals.before_task_publish.connect def before_task_publish(headers=None, **kwargs): task_id = headers.get('id') @@ -25,7 +36,7 @@ def before_task_publish(headers=None, **kwargs): @signals.task_prerun.connect def on_celery_task_pre_run(task_id='', **kwargs): # 更新状态 - CeleryTask.objects.filter(id=task_id).update(state='RUNNING', date_start=timezone.now()) + CeleryTaskExecution.objects.filter(id=task_id).update(state='RUNNING', date_start=timezone.now()) # 关闭之前的数据库连接 close_old_connections() @@ -41,7 +52,7 @@ def on_celery_task_post_run(task_id='', state='', **kwargs): close_old_connections() print("Task post run: ", task_id, state) - CeleryTask.objects.filter(id=task_id).update( + CeleryTaskExecution.objects.filter(id=task_id).update( state=state, date_finished=timezone.now(), is_finished=True ) @@ -72,4 +83,4 @@ def task_sent_handler(headers=None, body=None, **kwargs): 'args': args, 'kwargs': kwargs } - CeleryTask.objects.create(**data) + CeleryTaskExecution.objects.create(**data) diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index e9ba28eb7..dc3ac6e68 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -20,7 +20,7 @@ from .celery.utils import ( create_or_update_celery_periodic_tasks, get_celery_periodic_task, disable_celery_periodic_task, delete_celery_periodic_task ) -from .models import CeleryTask, AdHoc, Playbook +from .models import CeleryTaskExecution, AdHoc, Playbook from .notifications import ServerPerformanceCheckUtil logger = get_logger(__file__) @@ -94,9 +94,9 @@ def clean_celery_tasks_period(): logger.debug("Start clean celery task history") expire_days = get_log_keep_day('TASK_LOG_KEEP_DAYS') days_ago = timezone.now() - timezone.timedelta(days=expire_days) - tasks = CeleryTask.objects.filter(date_start__lt=days_ago) + tasks = CeleryTaskExecution.objects.filter(date_start__lt=days_ago) tasks.delete() - tasks = CeleryTask.objects.filter(date_start__isnull=True) + tasks = CeleryTaskExecution.objects.filter(date_start__isnull=True) tasks.delete() command = "find %s -mtime +%s -name '*.log' -type f -exec rm -f {} \\;" % ( settings.CELERY_LOG_DIR, expire_days diff --git a/apps/ops/urls/api_urls.py b/apps/ops/urls/api_urls.py index 49038b9b1..bc263c184 100644 --- a/apps/ops/urls/api_urls.py +++ b/apps/ops/urls/api_urls.py @@ -6,7 +6,6 @@ from rest_framework.routers import DefaultRouter from rest_framework_bulk.routes import BulkRouter from .. import api - app_name = "ops" router = DefaultRouter() @@ -15,9 +14,11 @@ bulk_router = BulkRouter() router.register(r'adhoc', api.AdHocViewSet, 'adhoc') router.register(r'adhoc-executions', api.AdHocExecutionViewSet, 'execution') router.register(r'celery/period-tasks', api.CeleryPeriodTaskViewSet, 'celery-period-task') +router.register(r'celery/tasks', api.CeleryTaskViewSet, 'celery-task') +router.register(r'celery/task-executions', api.CeleryTaskExecutionViewSet, 'task-execution') urlpatterns = [ - path('celery/task//log/', api.CeleryTaskLogApi.as_view(), name='celery-task-log'), + path('celery/task//log/', api.CeleryTaskExecutionLogApi.as_view(), name='celery-task-log'), path('celery/task//result/', api.CeleryResultApi.as_view(), name='celery-result'), path('ansible/task//log/', api.AnsibleTaskLogApi.as_view(), name='ansible-task-log'),