feat: 全面修改 ansible 执行方式为 receptor (#12975)

* feat: 修复 receptor kill job  的问题

* feat: 全面修改 ansible 执行方式为 receptor

---------

Co-authored-by: Aaron3S <chenyang@fit2cloud.com>
This commit is contained in:
fit2bot 2024-04-10 11:35:38 +08:00 committed by GitHub
parent 8911c9c649
commit a8112c86e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 76 additions and 31 deletions

View File

@ -618,7 +618,7 @@ class Config(dict):
# Ansible Receptor # Ansible Receptor
'ANSIBLE_RECEPTOR_ENABLE': True, 'ANSIBLE_RECEPTOR_ENABLE': True,
'ANSIBLE_RECEPTOR_SOCK_PATH': '/opt/jumpserver/data/share/control.sock' 'ANSIBLE_RECEPTOR_SOCK_PATH': 'data/share/control.sock'
} }
old_config_map = { old_config_map = {

View File

@ -48,9 +48,6 @@ class DefaultCallback:
event = data.get('event', None) event = data.get('event', None)
if not event: if not event:
return return
# pid = data.get('pid', None)
# if pid:
# self.write_pid(pid)
event_data = data.get('event_data', {}) event_data = data.get('event_data', {})
host = event_data.get('remote_addr', '') host = event_data.get('remote_addr', '')
task = event_data.get('task', '') task = event_data.get('task', '')
@ -158,11 +155,4 @@ class DefaultCallback:
def status_handler(self, data, **kwargs): def status_handler(self, data, **kwargs):
status = data.get('status', '') status = data.get('status', '')
self.status = self.STATUS_MAPPER.get(status, 'unknown') self.status = self.STATUS_MAPPER.get(status, 'unknown')
self.private_data_dir = data.get("private_data_dir", None)
rc = kwargs.get('runner_config', None)
# self.private_data_dir = rc.private_data_dir if rc else '/tmp/'
# def write_pid(self, pid):
# pid_filepath = os.path.join(self.private_data_dir, 'local.pid')
# with open(pid_filepath, 'w') as f:
# f.write(str(pid))

View File

@ -1,16 +1,27 @@
import concurrent.futures import concurrent.futures
import os
import queue import queue
import socket import socket
from django.conf import settings
import ansible_runner import ansible_runner
from django.utils.functional import LazyObject
from receptorctl import ReceptorControl from receptorctl import ReceptorControl
receptor_ctl = ReceptorControl('control.sock')
class WarpedReceptorctl(LazyObject):
def _setup(self):
self._wrapped = self.get_receptorctl()
@staticmethod
def get_receptorctl():
return ReceptorControl(settings.ANSIBLE_RECEPTOR_SOCK_PATH)
def init_receptor_ctl(sock_path): receptor_ctl = WarpedReceptorctl()
global receptor_ctl
receptor_ctl = ReceptorControl(sock_path)
def cancel(unit_id):
return receptor_ctl.simple_command("work cancel {}".format(unit_id))
def nodes(): def nodes():
@ -27,6 +38,14 @@ class AnsibleReceptorRunner:
self.runner_params = kwargs self.runner_params = kwargs
self.unit_id = None self.unit_id = None
def write_unit_id(self):
if not self.unit_id:
return
private_dir = self.runner_params.get("private_data_dir", "")
with open(os.path.join(private_dir, "local.unitid"), "w") as f:
f.write(self.unit_id)
f.flush()
def run(self): def run(self):
input, output = socket.socketpair() input, output = socket.socketpair()
@ -38,6 +57,7 @@ class AnsibleReceptorRunner:
output.close() output.close()
self.unit_id = result['unitid'] self.unit_id = result['unitid']
self.write_unit_id()
transmitter_future.result() transmitter_future.result()
@ -69,20 +89,29 @@ class AnsibleReceptorRunner:
def processor(self, _result_file, stdout_queue): def processor(self, _result_file, stdout_queue):
try: try:
original_handler = self.runner_params.pop("event_handler", None) original_event_handler = self.runner_params.pop("event_handler", None)
original_status_handler = self.runner_params.pop("status_handler", None)
def event_handler(data, **kwargs): def event_handler(data, **kwargs):
stdout = data.get('stdout', '') stdout = data.get('stdout', '')
if stdout: if stdout:
stdout_queue.put(stdout) stdout_queue.put(stdout)
if original_handler: if original_event_handler:
original_handler(data, **kwargs) original_event_handler(data, **kwargs)
def status_handler(data, **kwargs):
private_data_dir = self.runner_params.get("private_data_dir", None)
if private_data_dir:
data["private_data_dir"] = private_data_dir
if original_status_handler:
original_status_handler(data, **kwargs)
return ansible_runner.interface.run( return ansible_runner.interface.run(
quite=True, quite=True,
streamer='process', streamer='process',
_input=_result_file, _input=_result_file,
event_handler=event_handler, event_handler=event_handler,
status_handler=status_handler,
**self.runner_params, **self.runner_params,
) )
finally: finally:

View File

@ -26,8 +26,6 @@ class AnsibleWrappedRunner(LazyObject):
@staticmethod @staticmethod
def get_runner(): def get_runner():
if settings.ANSIBLE_RECEPTOR_ENABLE and settings.ANSIBLE_RECEPTOR_SOCK_PATH: if settings.ANSIBLE_RECEPTOR_ENABLE and settings.ANSIBLE_RECEPTOR_SOCK_PATH:
logger.info("Ansible receptor enabled, run ansible task via receptor")
receptor_runner.init_receptor_ctl(settings.ANSIBLE_RECEPTOR_SOCK_PATH)
return receptor_runner return receptor_runner
return ansible_runner return ansible_runner

View File

@ -23,6 +23,7 @@ from assets.models import Asset
from assets.automations.base.manager import SSHTunnelManager from assets.automations.base.manager import SSHTunnelManager
from common.db.encoder import ModelJSONFieldEncoder from common.db.encoder import ModelJSONFieldEncoder
from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, CommandInBlackListException, UploadFileRunner from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, CommandInBlackListException, UploadFileRunner
from ops.ansible.receptor import receptor_runner
from ops.mixin import PeriodTaskModelMixin from ops.mixin import PeriodTaskModelMixin
from ops.variables import * from ops.variables import *
from ops.const import Types, RunasPolicies, JobStatus, JobModules from ops.const import Types, RunasPolicies, JobStatus, JobModules
@ -521,15 +522,16 @@ class JobExecution(JMSOrgBaseModel):
ssh_tunnel.local_gateway_clean(runner) ssh_tunnel.local_gateway_clean(runner)
def stop(self): def stop(self):
from ops.signal_handlers import job_execution_stop_pub_sub
with open(os.path.join(self.private_dir, 'local.pid')) as f: unit_id_path = os.path.join(self.private_dir, "local.unitid")
try: if os.path.exists(unit_id_path):
pid = f.read() with open(unit_id_path) as f:
job_execution_stop_pub_sub.publish(int(pid)) try:
except Exception as e: unit_id = f.read()
print(e) receptor_runner.cancel(unit_id)
self.set_error('Job stop by "kill -9 {}"'.format(pid)) except Exception as e:
print(e)
self.set_error('Job stop by "receptor worker cancel, unit id is {}"'.format(unit_id))
class Meta: class Meta:
verbose_name = _("Job Execution") verbose_name = _("Job Execution")

View File

@ -2,12 +2,15 @@
# coding: utf-8 # coding: utf-8
import argparse import argparse
import shutil
import subprocess import subprocess
import os import os
import signal import signal
import tempfile
ANSIBLE_RUNNER_COMMAND = "ansible-runner" ANSIBLE_RUNNER_COMMAND = "ansible-runner"
DEFAULT_CONTROL_SOCK_PATH = "/opt/jumpserver/data/share/control.sock" DEFAULT_SHARE_DIR = "data/share"
DEFAULT_CONTROL_SOCK_PATH = os.path.join(DEFAULT_SHARE_DIR, "control.sock")
class ReceptorService: class ReceptorService:
@ -27,7 +30,15 @@ class ReceptorService:
'allowruntimeparams=true' 'allowruntimeparams=true'
] ]
@staticmethod
def before_start():
os.makedirs(os.path.join(DEFAULT_SHARE_DIR), exist_ok=True)
status_dir = os.path.join(tempfile.gettempdir(), "receptor")
if os.path.exists(status_dir):
shutil.rmtree(status_dir)
def start(self): def start(self):
self.before_start()
if os.path.exists(self.pid_file): if os.path.exists(self.pid_file):
with open(self.pid_file, 'r') as f: with open(self.pid_file, 'r') as f:
pid_str = f.read() pid_str = f.read()
@ -47,6 +58,14 @@ class ReceptorService:
f.write(str(process.pid)) f.write(str(process.pid))
print("\n- Receptor service started successfully.") print("\n- Receptor service started successfully.")
def exit_handler(signum, frame):
process.terminate()
process.kill()
signal.signal(signal.SIGINT, exit_handler)
signal.signal(signal.SIGTERM, exit_handler)
process.wait()
def stop(self): def stop(self):
if not os.path.exists(self.pid_file): if not os.path.exists(self.pid_file):
print("\n- Receptor service is not running.") print("\n- Receptor service is not running.")

View File

@ -21,3 +21,10 @@ fi
echo "4. For Apple processor" echo "4. For Apple processor"
LDFLAGS="-L$(brew --prefix freetds)/lib -L$(brew --prefix openssl@1.1)/lib" CFLAGS="-I$(brew --prefix freetds)/include" pip install $(grep 'pymssql' requirements.txt) LDFLAGS="-L$(brew --prefix freetds)/lib -L$(brew --prefix openssl@1.1)/lib" CFLAGS="-I$(brew --prefix freetds)/include" pip install $(grep 'pymssql' requirements.txt)
echo "5. Install Ansible Receptor"
export RECEPTOR_VERSION=v1.4.5
export ARCH=`arch`
wget -O ${TMPDIR}receptor.tar.gz https://github.com/ansible/receptor/releases/download/${RECEPTOR_VERSION}/receptor_${RECEPTOR_VERSION/v/}_darwin_${ARCH}.tar.gz
tar -xf ${TMPDIR}receptor.tar.gz -C /opt/homebrew/bin/