From a8112c86e3abf8258b0750d1fa478a654c29f73f Mon Sep 17 00:00:00 2001 From: fit2bot <68588906+fit2bot@users.noreply.github.com> Date: Wed, 10 Apr 2024 11:35:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=85=A8=E9=9D=A2=E4=BF=AE=E6=94=B9=20?= =?UTF-8?q?ansible=20=E6=89=A7=E8=A1=8C=E6=96=B9=E5=BC=8F=E4=B8=BA=20recep?= =?UTF-8?q?tor=20(#12975)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 修复 receptor kill job 的问题 * feat: 全面修改 ansible 执行方式为 receptor --------- Co-authored-by: Aaron3S --- apps/jumpserver/conf.py | 2 +- apps/ops/ansible/callback.py | 12 +----- apps/ops/ansible/receptor/receptor_runner.py | 45 ++++++++++++++++---- apps/ops/ansible/runner.py | 2 - apps/ops/models/job.py | 18 ++++---- receptor | 21 ++++++++- requirements/mac_pkg.sh | 7 +++ 7 files changed, 76 insertions(+), 31 deletions(-) diff --git a/apps/jumpserver/conf.py b/apps/jumpserver/conf.py index bc08e0bf2..8af82e875 100644 --- a/apps/jumpserver/conf.py +++ b/apps/jumpserver/conf.py @@ -618,7 +618,7 @@ class Config(dict): # Ansible Receptor 'ANSIBLE_RECEPTOR_ENABLE': True, - 'ANSIBLE_RECEPTOR_SOCK_PATH': '/opt/jumpserver/data/share/control.sock' + 'ANSIBLE_RECEPTOR_SOCK_PATH': 'data/share/control.sock' } old_config_map = { diff --git a/apps/ops/ansible/callback.py b/apps/ops/ansible/callback.py index bf4a28c4c..7ccd0888f 100644 --- a/apps/ops/ansible/callback.py +++ b/apps/ops/ansible/callback.py @@ -48,9 +48,6 @@ class DefaultCallback: event = data.get('event', None) if not event: return - # pid = data.get('pid', None) - # if pid: - # self.write_pid(pid) event_data = data.get('event_data', {}) host = event_data.get('remote_addr', '') task = event_data.get('task', '') @@ -158,11 +155,4 @@ class DefaultCallback: def status_handler(self, data, **kwargs): status = data.get('status', '') self.status = self.STATUS_MAPPER.get(status, 'unknown') - - rc = kwargs.get('runner_config', None) - # self.private_data_dir = rc.private_data_dir if rc else '/tmp/' - - # def write_pid(self, pid): - # pid_filepath = os.path.join(self.private_data_dir, 'local.pid') - # with open(pid_filepath, 'w') as f: - # f.write(str(pid)) + self.private_data_dir = data.get("private_data_dir", None) diff --git a/apps/ops/ansible/receptor/receptor_runner.py b/apps/ops/ansible/receptor/receptor_runner.py index 3f4893a97..f15b75bd9 100644 --- a/apps/ops/ansible/receptor/receptor_runner.py +++ b/apps/ops/ansible/receptor/receptor_runner.py @@ -1,16 +1,27 @@ import concurrent.futures +import os import queue import socket - +from django.conf import settings import ansible_runner +from django.utils.functional import LazyObject from receptorctl import ReceptorControl -receptor_ctl = ReceptorControl('control.sock') + +class WarpedReceptorctl(LazyObject): + def _setup(self): + self._wrapped = self.get_receptorctl() + + @staticmethod + def get_receptorctl(): + return ReceptorControl(settings.ANSIBLE_RECEPTOR_SOCK_PATH) -def init_receptor_ctl(sock_path): - global receptor_ctl - receptor_ctl = ReceptorControl(sock_path) +receptor_ctl = WarpedReceptorctl() + + +def cancel(unit_id): + return receptor_ctl.simple_command("work cancel {}".format(unit_id)) def nodes(): @@ -27,6 +38,14 @@ class AnsibleReceptorRunner: self.runner_params = kwargs self.unit_id = None + def write_unit_id(self): + if not self.unit_id: + return + private_dir = self.runner_params.get("private_data_dir", "") + with open(os.path.join(private_dir, "local.unitid"), "w") as f: + f.write(self.unit_id) + f.flush() + def run(self): input, output = socket.socketpair() @@ -38,6 +57,7 @@ class AnsibleReceptorRunner: output.close() self.unit_id = result['unitid'] + self.write_unit_id() transmitter_future.result() @@ -69,20 +89,29 @@ class AnsibleReceptorRunner: def processor(self, _result_file, stdout_queue): try: - original_handler = self.runner_params.pop("event_handler", None) + original_event_handler = self.runner_params.pop("event_handler", None) + original_status_handler = self.runner_params.pop("status_handler", None) def event_handler(data, **kwargs): stdout = data.get('stdout', '') if stdout: stdout_queue.put(stdout) - if original_handler: - original_handler(data, **kwargs) + if original_event_handler: + original_event_handler(data, **kwargs) + + def status_handler(data, **kwargs): + private_data_dir = self.runner_params.get("private_data_dir", None) + if private_data_dir: + data["private_data_dir"] = private_data_dir + if original_status_handler: + original_status_handler(data, **kwargs) return ansible_runner.interface.run( quite=True, streamer='process', _input=_result_file, event_handler=event_handler, + status_handler=status_handler, **self.runner_params, ) finally: diff --git a/apps/ops/ansible/runner.py b/apps/ops/ansible/runner.py index d7800aecf..ad5899683 100644 --- a/apps/ops/ansible/runner.py +++ b/apps/ops/ansible/runner.py @@ -26,8 +26,6 @@ class AnsibleWrappedRunner(LazyObject): @staticmethod def get_runner(): if settings.ANSIBLE_RECEPTOR_ENABLE and settings.ANSIBLE_RECEPTOR_SOCK_PATH: - logger.info("Ansible receptor enabled, run ansible task via receptor") - receptor_runner.init_receptor_ctl(settings.ANSIBLE_RECEPTOR_SOCK_PATH) return receptor_runner return ansible_runner diff --git a/apps/ops/models/job.py b/apps/ops/models/job.py index 723369e25..2b38592f8 100644 --- a/apps/ops/models/job.py +++ b/apps/ops/models/job.py @@ -23,6 +23,7 @@ from assets.models import Asset from assets.automations.base.manager import SSHTunnelManager from common.db.encoder import ModelJSONFieldEncoder from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, CommandInBlackListException, UploadFileRunner +from ops.ansible.receptor import receptor_runner from ops.mixin import PeriodTaskModelMixin from ops.variables import * from ops.const import Types, RunasPolicies, JobStatus, JobModules @@ -521,15 +522,16 @@ class JobExecution(JMSOrgBaseModel): ssh_tunnel.local_gateway_clean(runner) def stop(self): - from ops.signal_handlers import job_execution_stop_pub_sub - with open(os.path.join(self.private_dir, 'local.pid')) as f: - try: - pid = f.read() - job_execution_stop_pub_sub.publish(int(pid)) - except Exception as e: - print(e) - self.set_error('Job stop by "kill -9 {}"'.format(pid)) + unit_id_path = os.path.join(self.private_dir, "local.unitid") + if os.path.exists(unit_id_path): + with open(unit_id_path) as f: + try: + unit_id = f.read() + receptor_runner.cancel(unit_id) + except Exception as e: + print(e) + self.set_error('Job stop by "receptor worker cancel, unit id is {}"'.format(unit_id)) class Meta: verbose_name = _("Job Execution") diff --git a/receptor b/receptor index 15170f0ba..e6a1493cb 100755 --- a/receptor +++ b/receptor @@ -2,12 +2,15 @@ # coding: utf-8 import argparse +import shutil import subprocess import os import signal +import tempfile ANSIBLE_RUNNER_COMMAND = "ansible-runner" -DEFAULT_CONTROL_SOCK_PATH = "/opt/jumpserver/data/share/control.sock" +DEFAULT_SHARE_DIR = "data/share" +DEFAULT_CONTROL_SOCK_PATH = os.path.join(DEFAULT_SHARE_DIR, "control.sock") class ReceptorService: @@ -27,7 +30,15 @@ class ReceptorService: 'allowruntimeparams=true' ] + @staticmethod + def before_start(): + os.makedirs(os.path.join(DEFAULT_SHARE_DIR), exist_ok=True) + status_dir = os.path.join(tempfile.gettempdir(), "receptor") + if os.path.exists(status_dir): + shutil.rmtree(status_dir) + def start(self): + self.before_start() if os.path.exists(self.pid_file): with open(self.pid_file, 'r') as f: pid_str = f.read() @@ -47,6 +58,14 @@ class ReceptorService: f.write(str(process.pid)) print("\n- Receptor service started successfully.") + def exit_handler(signum, frame): + process.terminate() + process.kill() + + signal.signal(signal.SIGINT, exit_handler) + signal.signal(signal.SIGTERM, exit_handler) + process.wait() + def stop(self): if not os.path.exists(self.pid_file): print("\n- Receptor service is not running.") diff --git a/requirements/mac_pkg.sh b/requirements/mac_pkg.sh index 4caf8dd1a..c70080059 100644 --- a/requirements/mac_pkg.sh +++ b/requirements/mac_pkg.sh @@ -21,3 +21,10 @@ fi echo "4. For Apple processor" LDFLAGS="-L$(brew --prefix freetds)/lib -L$(brew --prefix openssl@1.1)/lib" CFLAGS="-I$(brew --prefix freetds)/include" pip install $(grep 'pymssql' requirements.txt) + + +echo "5. Install Ansible Receptor" +export RECEPTOR_VERSION=v1.4.5 +export ARCH=`arch` +wget -O ${TMPDIR}receptor.tar.gz https://github.com/ansible/receptor/releases/download/${RECEPTOR_VERSION}/receptor_${RECEPTOR_VERSION/v/}_darwin_${ARCH}.tar.gz +tar -xf ${TMPDIR}receptor.tar.gz -C /opt/homebrew/bin/ \ No newline at end of file