diff --git a/apps/ops/api/job.py b/apps/ops/api/job.py
index a2a0c00ee..8ccd16f61 100644
--- a/apps/ops/api/job.py
+++ b/apps/ops/api/job.py
@@ -1,5 +1,6 @@
 import json
 import os
+from psutil import NoSuchProcess
 
 from celery.result import AsyncResult
 from django.conf import settings
@@ -198,7 +199,7 @@ class JobExecutionViewSet(OrgBulkModelViewSet):
             return Response({'error': serializer.errors}, status=400)
         task_id = serializer.validated_data['task_id']
         try:
-            instance = get_object_or_404(JobExecution, task_id=task_id, creator=request.user)
+            instance = get_object_or_404(JobExecution, pk=task_id, creator=request.user)
         except Http404:
             return Response(
                 {'error': _('The task is being created and cannot be interrupted. Please try again later.')},
@@ -207,7 +208,10 @@ class JobExecutionViewSet(OrgBulkModelViewSet):
 
         task = AsyncResult(task_id, app=app)
         inspect = app.control.inspect()
+        for worker in inspect.registered().keys():
+            if not worker.startswith('ansible'):
+                continue
 
         if task_id not in [at['id'] for at in inspect.active().get(worker, [])]:
             # 在队列中未执行使用revoke执行
             task.revoke(terminate=True)
@@ -239,7 +243,7 @@ class JobExecutionTaskDetail(APIView):
         task_id = str(kwargs.get('task_id'))
 
         with tmp_to_org(org):
-            execution = get_object_or_404(JobExecution, task_id=task_id)
+            execution = get_object_or_404(JobExecution, pk=task_id)
 
         return Response(data={
             'status': execution.status,
diff --git a/apps/ops/models/job.py b/apps/ops/models/job.py
index f7831251b..3a067141a 100644
--- a/apps/ops/models/job.py
+++ b/apps/ops/models/job.py
@@ -555,10 +555,12 @@ class JobExecution(JMSOrgBaseModel):
             ssh_tunnel.local_gateway_clean(runner)
 
     def stop(self):
+        from ops.signal_handlers import job_execution_stop_pub_sub
+
         with open(os.path.join(self.private_dir, 'local.pid')) as f:
             try:
                 pid = f.read()
-                os.kill(int(pid), 9)
+                job_execution_stop_pub_sub.publish(int(pid))
             except Exception as e:
                 print(e)
         self.set_error('Job stop by "kill -9 {}"'.format(pid))
diff --git a/apps/ops/signal_handlers.py b/apps/ops/signal_handlers.py
index 417561111..d00d33ad0 100644
--- a/apps/ops/signal_handlers.py
+++ b/apps/ops/signal_handlers.py
@@ -1,4 +1,6 @@
 import ast
+import psutil
+from psutil import NoSuchProcess
 import time
 
 from celery import signals
@@ -8,9 +10,11 @@ from django.db.models.signals import pre_save
 from django.db.utils import ProgrammingError
 from django.dispatch import receiver
 from django.utils import translation, timezone
+from django.utils.functional import LazyObject
 
 from common.db.utils import close_old_connections, get_logger
 from common.signals import django_ready
+from common.utils.connection import RedisPubSub
 from orgs.utils import get_current_org_id, set_current_org
 from .celery import app
 from .models import CeleryTaskExecution, CeleryTask, Job
@@ -144,3 +148,39 @@ def task_sent_handler(headers=None, body=None, **kwargs):
         }
         CeleryTaskExecution.objects.create(**data)
         CeleryTask.objects.filter(name=task).update(date_last_publish=timezone.now())
+
+
+@receiver(django_ready)
+def subscribe_stop_job_execution(sender, **kwargs):
+    logger.info("Start subscribe for stop job execution")
+
+    def on_stop(pid):
+        logger.info(f"Stop job execution {pid} start")
+        try:
+            current_process = psutil.Process(pid)
+        except NoSuchProcess as e:
+            logger.error(e)
+            return
+
+        children = current_process.children(recursive=True)
+        logger.debug(f"Job execution process children: {children}")
+        for child in children:
+            if child.pid == 1:
+                continue
+            if child.name() != 'ssh':
+                continue
+            try:
+                child.kill()
+                logger.debug(f"Kill job execution process {pid} children process {child.pid} success")
+            except Exception as e:
+                logger.error(e)
+
+    job_execution_stop_pub_sub.subscribe(on_stop)
+
+
+class JobExecutionPubSub(LazyObject):
+    def _setup(self):
+        self._wrapped = RedisPubSub('fm.job_execution_stop')
+
+
+job_execution_stop_pub_sub = JobExecutionPubSub()