From 138a3a2f46b80377f9f33faa40b36e4fe6cf86b7 Mon Sep 17 00:00:00 2001 From: Aaron3S Date: Wed, 17 Apr 2024 18:19:39 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20receptor=5Fctl=20?= =?UTF-8?q?=E7=9A=84=E5=B9=B6=E5=8F=91=E5=AE=89=E5=85=A8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/ops/ansible/receptor/receptor_runner.py | 62 +++++++++++--------- apps/ops/signal_handlers.py | 4 +- 2 files changed, 35 insertions(+), 31 deletions(-) diff --git a/apps/ops/ansible/receptor/receptor_runner.py b/apps/ops/ansible/receptor/receptor_runner.py index 091c86c36..3c493bd3c 100644 --- a/apps/ops/ansible/receptor/receptor_runner.py +++ b/apps/ops/ansible/receptor/receptor_runner.py @@ -5,44 +5,48 @@ import socket from django.conf import settings import ansible_runner -from django.utils.functional import LazyObject from receptorctl import ReceptorControl from ops.ansible.cleaner import WorkPostRunCleaner, cleanup_post_run -class WarpedReceptorctl(LazyObject): - def _setup(self): - self._wrapped = self.get_receptorctl() - - @staticmethod - def get_receptorctl(): +class ReceptorCtl: + @property + def ctl(self): return ReceptorControl(settings.ANSIBLE_RECEPTOR_SOCK_PATH) + def cancel(self, unit_id): + return self.ctl.simple_command("work cancel {}".format(unit_id)) -receptor_ctl = WarpedReceptorctl() + def nodes(self): + return self.ctl.simple_command("status").get("Advertisements", None) + + def submit_work(self, + worktype, + payload, + node=None, + tlsclient=None, + ttl=None, + signwork=False, + params=None, ): + return self.ctl.submit_work(worktype, payload, node, tlsclient, ttl, signwork, params) + + def get_work_results(self, unit_id, startpos=0, return_socket=False, return_sockfile=True): + return self.ctl.get_work_results(unit_id, startpos, return_socket, return_sockfile) + + def kill_process(self, pid): + submit_result = self.submit_work(worktype="kill", node="primary", payload=str(pid)) + unit_id = submit_result["unitid"] + result_socket, result_file = self.get_work_results(unit_id=unit_id, return_sockfile=True, + return_socket=True) + while not result_socket.close(): + buf = result_file.read() + if not buf: + break + print(buf.decode('utf8')) -def cancel(unit_id): - return receptor_ctl.simple_command("work cancel {}".format(unit_id)) - - -def nodes(): - return receptor_ctl.simple_command("status").get("Advertisements", None) - - -def kill_process(pid): - submit_result = receptor_ctl.submit_work(worktype="kill", node="primary", payload=str(pid)) - - unit_id = submit_result["unitid"] - - result_socket, result_file = receptor_ctl.get_work_results(unit_id=unit_id, return_sockfile=True, - return_socket=True) - while not result_socket.close(): - buf = result_file.read() - if not buf: - break - print(buf.decode('utf8')) +receptor_ctl = ReceptorCtl() def run(**kwargs): @@ -74,7 +78,7 @@ class AnsibleReceptorRunner(WorkPostRunCleaner): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: transmitter_future = executor.submit(self.transmit, input) result = receptor_ctl.submit_work(payload=output.makefile('rb'), - node='primary', worktype='ansible-runner') + node='primary', worktype='ansible-runner') input.close() output.close() diff --git a/apps/ops/signal_handlers.py b/apps/ops/signal_handlers.py index f2c147a12..2d56de828 100644 --- a/apps/ops/signal_handlers.py +++ b/apps/ops/signal_handlers.py @@ -15,7 +15,7 @@ from common.signals import django_ready from common.utils.connection import RedisPubSub from jumpserver.utils import get_current_request from orgs.utils import get_current_org_id, set_current_org -from .ansible.receptor.receptor_runner import kill_process +from .ansible.receptor.receptor_runner import receptor_ctl from .celery import app from .models import CeleryTaskExecution, CeleryTask, Job @@ -159,7 +159,7 @@ def subscribe_stop_job_execution(sender, **kwargs): def on_stop(pid): logger.info(f"Stop job execution {pid} start") - kill_process(pid) + receptor_ctl.kill_process(pid) job_execution_stop_pub_sub.subscribe(on_stop)