mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-18 16:39:28 +00:00
perf: 优化异步认识显示名称
This commit is contained in:
@@ -104,10 +104,12 @@ class CelerySummaryAPIView(generics.RetrieveAPIView):
|
||||
|
||||
|
||||
class CeleryTaskViewSet(CommonApiMixin, viewsets.ReadOnlyModelViewSet):
|
||||
queryset = CeleryTask.objects.all()
|
||||
serializer_class = CeleryTaskSerializer
|
||||
http_method_names = ('get', 'head', 'options',)
|
||||
|
||||
def get_queryset(self):
|
||||
return CeleryTask.objects.exclude(name__startswith='celery')
|
||||
|
||||
|
||||
class CeleryTaskExecutionViewSet(CommonApiMixin, viewsets.ReadOnlyModelViewSet):
|
||||
serializer_class = CeleryTaskExecutionSerializer
|
||||
|
@@ -37,6 +37,9 @@ class CeleryTask(models.Model):
|
||||
return "yellow"
|
||||
return "green"
|
||||
|
||||
class Meta:
|
||||
ordering = ('name',)
|
||||
|
||||
|
||||
class CeleryTaskExecution(models.Model):
|
||||
LOG_DIR = os.path.join(settings.PROJECT_DIR, 'data', 'celery')
|
||||
|
@@ -16,4 +16,4 @@ class Playbook(BaseCreateUpdateModel):
|
||||
|
||||
@property
|
||||
def work_path(self):
|
||||
return os.path.join(settings.DATA_DIR, "ops", "playbook", self.id.__str__())
|
||||
return os.path.join(settings.DATA_DIR, "ops", "playbook", self.id.__str__(), "main.yaml")
|
||||
|
@@ -1,19 +1,15 @@
|
||||
# coding: utf-8
|
||||
import os
|
||||
import random
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from django.conf import settings
|
||||
from celery import shared_task, subtask
|
||||
from celery import signals
|
||||
from celery import shared_task
|
||||
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import ugettext_lazy as _, gettext
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from common.utils import get_logger, get_object_or_none, get_log_keep_day
|
||||
from orgs.utils import tmp_to_root_org, tmp_to_org
|
||||
from .celery.decorator import (
|
||||
register_as_period_task, after_app_shutdown_clean_periodic,
|
||||
after_app_ready_start
|
||||
@@ -22,16 +18,12 @@ from .celery.utils import (
|
||||
create_or_update_celery_periodic_tasks, get_celery_periodic_task,
|
||||
disable_celery_periodic_task, delete_celery_periodic_task
|
||||
)
|
||||
from .models import CeleryTaskExecution, Playbook, Job, JobExecution
|
||||
from .models import CeleryTaskExecution, Job, JobExecution
|
||||
from .notifications import ServerPerformanceCheckUtil
|
||||
|
||||
logger = get_logger(__file__)
|
||||
|
||||
|
||||
def rerun_task():
|
||||
pass
|
||||
|
||||
|
||||
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task"))
|
||||
def run_ops_job(job_id, **kwargs):
|
||||
job = get_object_or_none(Job, id=job_id)
|
||||
@@ -59,64 +51,7 @@ def run_ops_job_executions(execution_id, **kwargs):
|
||||
logger.error("Start adhoc execution error: {}".format(e))
|
||||
|
||||
|
||||
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task"))
|
||||
def run_adhoc(tid, **kwargs):
|
||||
"""
|
||||
:param tid: is the tasks serialized data
|
||||
:param callback: callback function name
|
||||
:return:
|
||||
"""
|
||||
with tmp_to_root_org():
|
||||
task = get_object_or_none(AdHoc, id=tid)
|
||||
if not task:
|
||||
logger.error("No task found")
|
||||
return
|
||||
with tmp_to_org(task.org):
|
||||
execution = task.create_execution()
|
||||
try:
|
||||
execution.start(**kwargs)
|
||||
except SoftTimeLimitExceeded:
|
||||
execution.set_error('Run timeout')
|
||||
logger.error("Run adhoc timeout")
|
||||
except Exception as e:
|
||||
execution.set_error(e)
|
||||
logger.error("Start adhoc execution error: {}".format(e))
|
||||
|
||||
|
||||
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible command"))
|
||||
def run_playbook(pid, **kwargs):
|
||||
with tmp_to_root_org():
|
||||
task = get_object_or_none(Playbook, id=pid)
|
||||
if not task:
|
||||
logger.error("No task found")
|
||||
return
|
||||
|
||||
with tmp_to_org(task.org):
|
||||
execution = task.create_execution()
|
||||
try:
|
||||
execution.start(**kwargs)
|
||||
except SoftTimeLimitExceeded:
|
||||
execution.set_error('Run timeout')
|
||||
logger.error("Run playbook timeout")
|
||||
except Exception as e:
|
||||
execution.set_error(e)
|
||||
logger.error("Run playbook execution error: {}".format(e))
|
||||
|
||||
|
||||
@shared_task
|
||||
@after_app_shutdown_clean_periodic
|
||||
@register_as_period_task(interval=3600 * 24, description=_("Clean task history period"))
|
||||
def clean_tasks_adhoc_period():
|
||||
logger.debug("Start clean task adhoc and run history")
|
||||
tasks = Task.objects.all()
|
||||
for task in tasks:
|
||||
adhoc = task.adhoc.all().order_by('-date_created')[5:]
|
||||
for ad in adhoc:
|
||||
ad.execution.all().delete()
|
||||
ad.delete()
|
||||
|
||||
|
||||
@shared_task
|
||||
@shared_task(verbose_name=_('Periodic clear celery tasks'))
|
||||
@after_app_shutdown_clean_periodic
|
||||
@register_as_period_task(interval=3600 * 24, description=_("Clean celery log period"))
|
||||
def clean_celery_tasks_period():
|
||||
@@ -135,7 +70,7 @@ def clean_celery_tasks_period():
|
||||
subprocess.call(command, shell=True)
|
||||
|
||||
|
||||
@shared_task
|
||||
@shared_task(verbose_name=_('Clear celery periodic tasks'))
|
||||
@after_app_ready_start
|
||||
def clean_celery_periodic_tasks():
|
||||
"""清除celery定时任务"""
|
||||
@@ -158,7 +93,7 @@ def clean_celery_periodic_tasks():
|
||||
logger.info('Clean task failure: {}'.format(task))
|
||||
|
||||
|
||||
@shared_task
|
||||
@shared_task(verbose_name=_('Create or update periodic tasks'))
|
||||
@after_app_ready_start
|
||||
def create_or_update_registered_periodic_tasks():
|
||||
from .celery.decorator import get_register_period_tasks
|
||||
@@ -166,42 +101,7 @@ def create_or_update_registered_periodic_tasks():
|
||||
create_or_update_celery_periodic_tasks(task)
|
||||
|
||||
|
||||
@shared_task
|
||||
@shared_task(verbose_name=_("Periodic check service performance"))
|
||||
@register_as_period_task(interval=3600)
|
||||
def check_server_performance_period():
|
||||
ServerPerformanceCheckUtil().check_and_publish()
|
||||
|
||||
|
||||
@shared_task(verbose_name=_("Hello"), comment="an test shared task")
|
||||
def hello(name, callback=None):
|
||||
from users.models import User
|
||||
import time
|
||||
|
||||
count = User.objects.count()
|
||||
print(gettext("Hello") + ': ' + name)
|
||||
print("Count: ", count)
|
||||
time.sleep(1)
|
||||
return gettext("Hello")
|
||||
|
||||
|
||||
@shared_task(verbose_name=_("Hello Error"), comment="an test shared task error")
|
||||
def hello_error():
|
||||
raise Exception("must be error")
|
||||
|
||||
|
||||
@shared_task(verbose_name=_("Hello Random"), comment="some time error and some time success")
|
||||
def hello_random():
|
||||
i = random.randint(0, 1)
|
||||
if i == 1:
|
||||
raise Exception("must be error")
|
||||
|
||||
|
||||
@shared_task(verbose_name="Hello Running", comment="an task running 1m")
|
||||
def hello_running(sec=60):
|
||||
time.sleep(sec)
|
||||
|
||||
|
||||
@shared_task
|
||||
def hello_callback(result):
|
||||
print(result)
|
||||
print("Hello callback")
|
||||
|
Reference in New Issue
Block a user