mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-25 06:42:49 +00:00
feat: 优化代码结构,receptor开关,修改为 tcp 通信 (#13078)
* feat: 优化代码结构,receptor开关,修改为 tcp 通信 * fix: 修改导包路径 * fix: 修复错别字 * fix: 修改导包路径 * perf: 优化代码 * fix: 修复任务不执行的问题 * perf: 优化配置项名称 * perf: 优化代码结构 * perf: 优化代码 --------- Co-authored-by: Aaron3S <chenyang@fit2cloud.com>
This commit is contained in:
3
apps/ops/ansible/runners/__init__.py
Normal file
3
apps/ops/ansible/runners/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .base import *
|
||||
from .native import *
|
||||
from .receptor import *
|
42
apps/ops/ansible/runners/base.py
Normal file
42
apps/ops/ansible/runners/base.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from ops.ansible.cleaner import WorkPostRunCleaner, cleanup_post_run
|
||||
|
||||
|
||||
class BaseRunner(WorkPostRunCleaner):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.runner_params = kwargs
|
||||
self.clean_workspace = kwargs.pop("clean_workspace", True)
|
||||
|
||||
@classmethod
|
||||
def kill_precess(cls, pid):
|
||||
return NotImplementedError
|
||||
|
||||
@property
|
||||
def clean_dir(self):
|
||||
if not self.clean_workspace:
|
||||
return None
|
||||
return self.private_data_dir
|
||||
|
||||
@property
|
||||
def private_data_dir(self):
|
||||
return self.runner_params.get('private_data_dir', None)
|
||||
|
||||
def get_event_handler(self):
|
||||
_event_handler = self.runner_params.pop("event_handler", None)
|
||||
return _event_handler
|
||||
|
||||
def get_status_handler(self):
|
||||
_status_handler = self.runner_params.pop("status_handler", None)
|
||||
|
||||
if not _status_handler:
|
||||
return
|
||||
|
||||
def _handler(data, **kwargs):
|
||||
if self.private_data_dir:
|
||||
data["private_data_dir"] = self.private_data_dir
|
||||
_status_handler(data, **kwargs)
|
||||
|
||||
return _handler
|
||||
|
||||
def run(self):
|
||||
raise NotImplementedError()
|
24
apps/ops/ansible/runners/native.py
Normal file
24
apps/ops/ansible/runners/native.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import ansible_runner
|
||||
|
||||
from libs.process.ssh import kill_ansible_ssh_process
|
||||
from ops.ansible.cleaner import cleanup_post_run
|
||||
from ops.ansible.runners.base import BaseRunner
|
||||
|
||||
__all__ = ['AnsibleNativeRunner']
|
||||
|
||||
|
||||
class AnsibleNativeRunner(BaseRunner):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def kill_precess(cls, pid):
|
||||
return kill_ansible_ssh_process(pid)
|
||||
|
||||
@cleanup_post_run
|
||||
def run(self):
|
||||
ansible_runner.run(
|
||||
event_handler=self.get_event_handler(),
|
||||
status_handler=self.get_status_handler(),
|
||||
**self.runner_params,
|
||||
)
|
100
apps/ops/ansible/runners/receptor.py
Normal file
100
apps/ops/ansible/runners/receptor.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import concurrent.futures
|
||||
import os
|
||||
import queue
|
||||
import socket
|
||||
|
||||
import ansible_runner
|
||||
|
||||
from ops.ansible.cleaner import cleanup_post_run
|
||||
from ops.ansible.runners.receptorctl.receptorctl import ReceptorCtl
|
||||
from ops.ansible.runners.base import BaseRunner
|
||||
|
||||
__all__ = ['AnsibleReceptorRunner']
|
||||
|
||||
receptor_ctl = ReceptorCtl()
|
||||
|
||||
|
||||
class AnsibleReceptorRunner(BaseRunner):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.unit_id = None
|
||||
self.stdout_queue = None
|
||||
|
||||
@classmethod
|
||||
def kill_precess(cls, pid):
|
||||
return receptor_ctl.kill_process(pid)
|
||||
|
||||
def write_unit_id(self):
|
||||
if not self.unit_id:
|
||||
return
|
||||
private_dir = self.runner_params.get("private_data_dir", "")
|
||||
with open(os.path.join(private_dir, "local.unitid"), "w") as f:
|
||||
f.write(self.unit_id)
|
||||
f.flush()
|
||||
|
||||
@cleanup_post_run
|
||||
def run(self):
|
||||
input, output = socket.socketpair()
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
transmitter_future = executor.submit(self.transmit, input)
|
||||
result = receptor_ctl.submit_work(payload=output.makefile('rb'),
|
||||
node='primary', worktype='ansible-runner')
|
||||
|
||||
input.close()
|
||||
output.close()
|
||||
|
||||
self.unit_id = result['unitid']
|
||||
self.write_unit_id()
|
||||
|
||||
transmitter_future.result()
|
||||
|
||||
result_file = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
|
||||
|
||||
self.stdout_queue = queue.Queue()
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
processor_future = executor.submit(self.processor, result_file)
|
||||
|
||||
while not processor_future.done() or \
|
||||
not self.stdout_queue.empty():
|
||||
msg = self.stdout_queue.get()
|
||||
if msg is None:
|
||||
break
|
||||
print(msg)
|
||||
|
||||
return processor_future.result()
|
||||
|
||||
def transmit(self, _socket):
|
||||
try:
|
||||
ansible_runner.run(
|
||||
streamer='transmit',
|
||||
_output=_socket.makefile('wb'),
|
||||
**self.runner_params
|
||||
)
|
||||
finally:
|
||||
_socket.shutdown(socket.SHUT_WR)
|
||||
|
||||
def get_event_handler(self):
|
||||
_event_handler = super().get_event_handler()
|
||||
|
||||
def _handler(data, **kwargs):
|
||||
stdout = data.get('stdout', '')
|
||||
if stdout:
|
||||
self.stdout_queue.put(stdout)
|
||||
_event_handler(data, **kwargs)
|
||||
|
||||
return _handler
|
||||
|
||||
def processor(self, _result_file):
|
||||
try:
|
||||
return ansible_runner.interface.run(
|
||||
quite=True,
|
||||
streamer='process',
|
||||
_input=_result_file,
|
||||
event_handler=self.get_event_handler(),
|
||||
status_handler=self.get_status_handler(),
|
||||
**self.runner_params,
|
||||
)
|
||||
finally:
|
||||
self.stdout_queue.put(None)
|
0
apps/ops/ansible/runners/receptorctl/__init__.py
Normal file
0
apps/ops/ansible/runners/receptorctl/__init__.py
Normal file
38
apps/ops/ansible/runners/receptorctl/receptorctl.py
Normal file
38
apps/ops/ansible/runners/receptorctl/receptorctl.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from django.conf import settings
|
||||
from receptorctl import ReceptorControl
|
||||
|
||||
|
||||
class ReceptorCtl:
|
||||
@property
|
||||
def ctl(self):
|
||||
return ReceptorControl("tcp://{}".format(settings.ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS))
|
||||
|
||||
def cancel(self, unit_id):
|
||||
return self.ctl.simple_command("work cancel {}".format(unit_id))
|
||||
|
||||
def nodes(self):
|
||||
return self.ctl.simple_command("status").get("Advertisements", None)
|
||||
|
||||
def submit_work(self,
|
||||
worktype,
|
||||
payload,
|
||||
node=None,
|
||||
tlsclient=None,
|
||||
ttl=None,
|
||||
signwork=False,
|
||||
params=None, ):
|
||||
return self.ctl.submit_work(worktype, payload, node, tlsclient, ttl, signwork, params)
|
||||
|
||||
def get_work_results(self, unit_id, startpos=0, return_socket=False, return_sockfile=True):
|
||||
return self.ctl.get_work_results(unit_id, startpos, return_socket, return_sockfile)
|
||||
|
||||
def kill_process(self, pid):
|
||||
submit_result = self.submit_work(worktype="kill", node="primary", payload=str(pid))
|
||||
unit_id = submit_result["unitid"]
|
||||
result_socket, result_file = self.get_work_results(unit_id=unit_id, return_sockfile=True,
|
||||
return_socket=True)
|
||||
while not result_socket.close():
|
||||
buf = result_file.read()
|
||||
if not buf:
|
||||
break
|
||||
print(buf.decode('utf8'))
|
Reference in New Issue
Block a user