perf: remote_client add debug

This commit is contained in:
feng
2026-05-28 13:14:38 +08:00
committed by 老广
parent 605a134a8c
commit 27051574d7
2 changed files with 446 additions and 9 deletions

View File

@@ -1,6 +1,9 @@
import os
import re
import signal
import sys
import time
import traceback
from functools import wraps
import paramiko
@@ -79,6 +82,29 @@ def _strip_wrapping_quotes(value):
return value
def _build_switch_state_re():
return re.compile(
r'__JMS_SWITCH__:[^\r\n]*',
flags=re.IGNORECASE,
)
def _shorten_text(value, limit=300):
if value is None:
return value
text = str(value).replace('\r', '\\r').replace('\n', '\\n')
if len(text) <= limit:
return text
return text[:limit] + f'...<{len(text)} chars>'
def _extract_switch_state(output):
if not output:
return None
matches = re.findall(r'__JMS_SWITCH__:[^\r\n]*', output)
return matches[-1] if matches else None
class OldSSHTransport(paramiko.transport.Transport):
_preferred_pubkeys = (
"ssh-ed25519",
@@ -98,12 +124,48 @@ class SSHClient:
self.gateway_server = None
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.debug_enabled = (
str(os.environ.get('JMS_REMOTE_CLIENT_DEBUG', '')).lower()
in {'1', 'true', 'yes', 'on'}
)
self.connect_params = self.get_connect_params()
self._channel = None
self.buffer_size = 1024
self.prompt = self.module.params['prompt']
self.timeout = self.module.params['recv_timeout']
self._debug(
'init',
login_host=self.module.params.get('login_host'),
login_port=self.module.params.get('login_port'),
login_user=self.module.params.get('login_user'),
become=self.module.params.get('become'),
become_method=self.module.params.get('become_method'),
become_user=self.module.params.get('become_user'),
has_gateway=bool(self.module.params.get('gateway_args')),
old_ssh_version=self.module.params.get('old_ssh_version'),
)
def _debug(self, event, **kwargs):
if not self.debug_enabled:
return
details = ', '.join(
f'{key}={_shorten_text(value)}'
for key, value in kwargs.items()
)
message = f'[remote_client] {event}'
if details:
message += f' | {details}'
print(message, file=sys.stderr, flush=True)
def _sanitize_command(self, command):
secrets = {
self.module.params.get('login_password'),
self.module.params.get('become_password'),
}
if command and command in secrets:
return '<redacted>'
return command
@property
def channel(self):
@@ -133,15 +195,31 @@ class SSHClient:
if p['old_ssh_version']:
params['transport_factory'] = OldSSHTransport
self._debug(
'connect params prepared',
hostname=params.get('hostname'),
port=params.get('port'),
username=params.get('username'),
has_password=bool(params.get('password')),
key_filename=params.get('key_filename'),
transport_factory=getattr(params.get('transport_factory'), '__name__', None),
)
return params
def switch_user(self):
p = self.module.params
if not p['become']:
self._debug('switch user skipped', reason='become disabled')
return
method = p['become_method']
username = p['login_user']
self._debug(
'switch user start',
method=method,
connect_as=self.connect_params.get('username'),
target_user=username,
)
if method == 'sudo':
switch_cmd = 'sudo su -'
@@ -150,22 +228,69 @@ class SSHClient:
switch_cmd = 'su -'
pword = p['login_password']
else:
self._debug('switch user unsupported', method=method)
self.module.fail_json(msg=f'Become method {method} not supported.')
return
# Expected to see a prompt, type the password, and check the username
output, error = self.execute(
[f'{switch_cmd} {username}', pword, 'whoami'],
[become_prompt_re, DEFAULT_RE, username]
# Username-based verification is unreliable for UID 0 alias accounts:
# `su - useradmin` may legitimately land in a shell that reports
# `root/root` for USER and LOGNAME. Compare shell state before and
# after `su` instead; if password auth fails, the marker runs in the
# original shell and the state stays unchanged.
switch_state_cmd = 'printf "__JMS_SWITCH__:%s:%s:%s\\n" "$USER" "$LOGNAME" "$HOME"'
switch_state_re = _build_switch_state_re()
baseline_output, baseline_error = self.execute(
[switch_state_cmd],
[switch_state_re]
)
baseline_state = _extract_switch_state(baseline_output)
self._debug(
'switch user baseline',
output=baseline_output,
error=baseline_error,
state=baseline_state,
)
if baseline_error:
self.module.fail_json(msg=f'Failed to capture shell state before switching user. Output: {baseline_output}')
# Expected to see a prompt, type the password, and verify the target
# shell state is no longer the original login shell.
output, error = self.execute(
[f'{switch_cmd} {username}', pword, switch_state_cmd],
[become_prompt_re, DEFAULT_RE, switch_state_re]
)
switched_state = _extract_switch_state(output)
self._debug('switch user result', output=output, error=error)
if error:
self.module.fail_json(msg=f'Failed to become user {username}. Output: {output}')
if baseline_state == switched_state:
self.module.fail_json(
msg=(
f'Failed to become user {username}. '
f'Shell state did not change. Output: {output}'
)
)
def connect(self):
self.before_runner_start()
try:
self._debug(
'connect start',
hostname=self.connect_params.get('hostname'),
port=self.connect_params.get('port'),
username=self.connect_params.get('username'),
)
self.before_runner_start()
self._debug(
'connect after gateway prepare',
hostname=self.connect_params.get('hostname'),
port=self.connect_params.get('port'),
username=self.connect_params.get('username'),
)
self.client.connect(**self.connect_params)
self._debug('client.connect ok')
self._channel = self.client.invoke_shell()
self._debug('invoke_shell ok')
# Always perform a gentle handshake that works for servers and
# network devices: drain banner, brief settle, send newline, then
# read in quiet mode to avoid blocking on missing prompt.
@@ -181,7 +306,13 @@ class SSHClient:
pass
self._get_match_recv()
self.switch_user()
self._debug('connect complete')
except Exception as error:
self._debug(
'connect failed',
error=str(error),
traceback=traceback.format_exc(),
)
self.module.fail_json(msg=str(error))
@staticmethod
@@ -242,6 +373,12 @@ class SSHClient:
prev_str = buffer_str
time.sleep(0.01)
self._debug(
'recv complete',
use_regex_match=use_regex_match,
check_reg=check_reg,
output=buffer_str,
)
return buffer_str
@raise_timeout('Wait send message')
@@ -256,13 +393,27 @@ class SSHClient:
try:
answers = self._fit_answers(commands, answers)
for cmd, ans_regex in zip(commands, answers):
self._debug('execute start', total_commands=len(commands))
for index, (cmd, ans_regex) in enumerate(zip(commands, answers), start=1):
self._check_send()
self._debug(
'execute send',
index=index,
command=self._sanitize_command(cmd),
answer_reg=ans_regex,
)
self.channel.send(cmd + '\n')
combined_output += self._get_match_recv(ans_regex) + '\n'
output = self._get_match_recv(ans_regex)
combined_output += output + '\n'
self._debug('execute recv', index=index, output=output)
except Exception as e:
error_msg = str(e)
self._debug(
'execute failed',
error=error_msg,
traceback=traceback.format_exc(),
)
return combined_output, error_msg
@@ -274,11 +425,23 @@ class SSHClient:
)
match = re.search(pattern, gateway_args)
if not match:
if gateway_args:
self._debug('gateway parse skipped', gateway_args=gateway_args)
return
password, port, username, remote_addr, key_path = match.groups()
password = _strip_wrapping_quotes(password) or None
key_path = _strip_wrapping_quotes(key_path) or None
self._debug(
'gateway parsed',
gateway_host=remote_addr,
gateway_port=port,
gateway_user=username,
has_password=bool(password),
key_path=key_path,
remote_bind_host=self.module.params['login_host'],
remote_bind_port=self.module.params['login_port'],
)
server = SSHTunnelForwarder(
(remote_addr, int(port)),
@@ -291,14 +454,25 @@ class SSHClient:
)
)
server.start()
try:
server.start()
except Exception:
self._debug('gateway start failed', traceback=traceback.format_exc())
raise
self.connect_params['hostname'] = '127.0.0.1'
self.connect_params['port'] = server.local_bind_port
self.gateway_server = server
self._debug(
'gateway start ok',
local_bind_host=self.connect_params['hostname'],
local_bind_port=self.connect_params['port'],
)
def local_gateway_clean(self):
    """Stop the gateway tunnel if one was started; otherwise do nothing.

    Emits a debug event before and after calling ``stop()`` on
    ``self.gateway_server``.
    """
    tunnel = self.gateway_server
    if not tunnel:
        return
    self._debug('gateway stop start')
    tunnel.stop()
    self._debug('gateway stop ok')
def before_runner_start(self):
self.local_gateway_prepare()
@@ -317,4 +491,4 @@ class SSHClient:
if self.client:
self.client.close()
except Exception: # noqa
pass
self._debug('cleanup failed', traceback=traceback.format_exc())

View File

@@ -0,0 +1,263 @@
#!/usr/bin/env python3
import argparse
import json
import os
import signal
import sys
import traceback
from pathlib import Path
def find_root_dir():
    """Locate the project root by walking up from this script's directory.

    The root is the first directory (starting with the one containing this
    file, then each parent in turn) under which
    ``apps/libs/ansible/modules_utils/remote_client.py`` exists.

    Raises:
        RuntimeError: If no such directory is found.
    """
    marker = Path('apps', 'libs', 'ansible', 'modules_utils', 'remote_client.py')
    script_dir = Path(__file__).resolve().parent
    for base in (script_dir, *script_dir.parents):
        if (base / marker).exists():
            return base
    raise RuntimeError('Could not locate project root containing apps/libs/ansible/modules_utils/remote_client.py')
# Resolve the project root once, then put both the root and its ``apps``
# directory at the front of sys.path (``apps`` ends up first) so the
# project-local import below resolves.
ROOT_DIR = find_root_dir()
APPS_DIR = ROOT_DIR / 'apps'
for _import_root in (str(ROOT_DIR), str(APPS_DIR)):
    if _import_root not in sys.path:
        sys.path.insert(0, _import_root)
del _import_root

from libs.ansible.modules_utils.remote_client import SSHClient  # noqa: E402
class DummyModule:
    """Minimal stand-in for the Ansible module object that SSHClient expects.

    Only the two members used by this script are provided: ``params`` (the
    parameter mapping) and ``fail_json``.
    """

    def __init__(self, params):
        # Mapping of module parameters, read via ``module.params[...]``.
        self.params = params

    def fail_json(self, msg, **kwargs):
        """Abort by raising ``RuntimeError`` carrying *msg*.

        ``msg`` may be given positionally or as a keyword, matching the
        ``fail_json(msg, **kwargs)`` calling convention of Ansible modules;
        any additional keyword arguments are accepted and ignored.

        Raises:
            RuntimeError: Always, with *msg* as its message.
        """
        raise RuntimeError(msg)
def mask(value):
    """Hide a secret for display.

    Truthy values are replaced by ``'***'``; falsy values (``None``, ``''``,
    ``0`` ...) are returned unchanged so "not set" stays visible.
    """
    if not value:
        return value
    return '***'
def load_inventory(args):
    """Load the inventory JSON from ``--inventory-file`` or from piped stdin.

    The file named by ``args.inventory_file`` wins when given. Otherwise the
    JSON is read from stdin, provided stdin is not a terminal.

    Raises:
        SystemExit: If no file was given and stdin is a terminal.
    """
    source_path = args.inventory_file
    if source_path:
        with open(source_path, 'r', encoding='utf-8') as handle:
            return json.load(handle)
    if sys.stdin.isatty():
        raise SystemExit('Provide --inventory-file or pipe inventory JSON to stdin.')
    return json.load(sys.stdin)
def get_target_host(inventory, host_name=None):
    """Pick the host entry to debug from ``inventory['all']['hosts']``.

    Args:
        inventory: Parsed inventory mapping.
        host_name: Exact host key to select; when falsy, the first host whose
            key is not ``'localhost'`` is used.

    Returns:
        A ``(name, host)`` tuple.

    Raises:
        SystemExit: If *host_name* is not in the inventory, or if no
            non-localhost host exists.
    """
    known_hosts = inventory.get('all', {}).get('hosts', {})
    if not host_name:
        for candidate, entry in known_hosts.items():
            if candidate == 'localhost':
                continue
            return candidate, entry
        raise SystemExit('No remote hosts found in inventory JSON.')
    try:
        return host_name, known_hosts[host_name]
    except KeyError as exc:
        available = ', '.join(sorted(known_hosts))
        raise SystemExit(f'Host {host_name!r} not found. Available: {available}') from exc
def build_change_secret_privileged_params(host):
    """Translate one inventory host entry into SSHClient module parameters.

    Required keys: ``host['jms_asset']`` (``address``, ``port``) and
    ``host['jms_account']`` (``username``, ``secret``, ``secret_type``,
    ``private_key_path``). Everything else is optional and falls back to the
    defaults spelled out below.

    Raises:
        KeyError: If a required key is missing.
    """
    asset = host['jms_asset']
    account = host['jms_account']
    tuning = host.get('params', {})

    # Login identity and secret come straight from the asset/account records.
    params = {
        'login_host': asset['address'],
        'login_port': asset['port'],
        'login_user': account['username'],
        'login_password': account['secret'],
        'login_secret_type': account['secret_type'],
        'login_private_key_path': account['private_key_path'],
        'gateway_args': asset.get('ansible_ssh_common_args', ''),
    }
    # Interactive-shell tuning knobs, overridable per host via ``params``.
    params.update({
        'recv_timeout': tuning.get('recv_timeout', 30),
        'delay_time': tuning.get('delay_time', 2),
        'prompt': tuning.get('prompt', '.*'),
        'answers': tuning.get('answers', '.*'),
        'commands': None,
    })
    # Privilege-switch settings supplied by the ``jms_custom_become*`` keys.
    params.update({
        'become': host.get('jms_custom_become', False),
        'become_method': host.get('jms_custom_become_method', 'su'),
        'become_user': host.get('jms_custom_become_user', ''),
        'become_password': host.get('jms_custom_become_password', ''),
        'become_private_key_path': host.get('jms_custom_become_private_key_path'),
        'old_ssh_version': asset.get('old_ssh_version', False),
    })
    return params
def mask_gateway_args(gateway_args):
    """Return *gateway_args* with any ``sshpass -p <password>`` value hidden.

    The password (bare, single-quoted or double-quoted) is replaced by
    ``***``. Falsy or non-string input is returned unchanged.
    """
    import re  # local import: this script does not import ``re`` at module level

    if not gateway_args or not isinstance(gateway_args, str):
        return gateway_args
    return re.sub(
        r'(sshpass\s+-p\s*)("[^"]*"|\'[^\']*\'|\S+)',
        r'\g<1>***',
        gateway_args,
    )


def print_effective_params(host_name, params):
    """Print the host name and the parameters that will be given to SSHClient.

    Secrets are never printed in clear text: both passwords are masked, and a
    gateway password embedded in ``gateway_args`` (``sshpass -p ...``) is
    masked as well.

    Args:
        host_name: Inventory key of the selected host.
        params: Parameter mapping built for the SSH client.
    """
    print(f'host = {host_name}')
    print(
        json.dumps(
            {
                'login_host': params['login_host'],
                'login_port': params['login_port'],
                'login_user': params['login_user'],
                'login_password': mask(params['login_password']),
                'login_secret_type': params['login_secret_type'],
                'login_private_key_path': params['login_private_key_path'],
                'become': params['become'],
                'become_method': params['become_method'],
                'become_user': params['become_user'],
                'become_password': mask(params['become_password']),
                'become_private_key_path': params['become_private_key_path'],
                'old_ssh_version': params['old_ssh_version'],
                'gateway_args': mask_gateway_args(params['gateway_args']),
                'recv_timeout': params['recv_timeout'],
            },
            ensure_ascii=False,
            indent=2,
        )
    )
def run_with_timeout(timeout, step_name, func):
    """Call ``func()`` and return its result, aborting after *timeout* seconds.

    A non-positive *timeout* disables the limit. Otherwise a ``SIGALRM``
    handler is installed for the duration of the call; it raises
    ``TimeoutError`` naming *step_name*. The alarm is cancelled and the
    previous handler restored whether the call succeeds or fails.
    """
    if timeout <= 0:
        return func()

    def _on_alarm(signum, frame):
        raise TimeoutError(f'{step_name} timed out after {timeout}s')

    saved_handler = signal.signal(signal.SIGALRM, _on_alarm)
    try:
        signal.alarm(timeout)
        outcome = func()
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, saved_handler)
    return outcome
def run_client(params, args):
    """Connect with SSHClient using *params* and optionally run one command.

    The paramiko timeouts from *args* are merged into the client's connect
    parameters, a password-masked summary of those parameters is printed,
    and ``client.connect`` is run under the overall connect timeout. When
    ``args.command`` is set it is executed afterwards and its output and
    error are printed.
    """
    module = DummyModule(params)
    with SSHClient(module) as client:
        conn = client.connect_params
        conn.update(
            timeout=args.connect_timeout,
            banner_timeout=args.banner_timeout,
            auth_timeout=args.auth_timeout,
        )

        # Assemble the printable summary in a fixed key order.
        summary = {key: conn.get(key) for key in ('hostname', 'port', 'username')}
        summary['password'] = mask(conn.get('password'))
        summary['key_filename'] = conn.get('key_filename')
        summary['transport_factory'] = getattr(
            conn.get('transport_factory'), '__name__', None
        )
        for key in ('timeout', 'banner_timeout', 'auth_timeout'):
            summary[key] = conn.get(key)
        print('connect_params =', json.dumps(summary, ensure_ascii=False))

        run_with_timeout(args.overall_connect_timeout, 'SSH connect', client.connect)

        if not args.command:
            return
        output, error = client.execute([args.command], ['.*'])
        print('command =', args.command)
        print('output =', output)
        print('error =', error)
def parse_args():
    """Parse the command-line options of the debug script from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with ``inventory_file``, ``host``,
        ``command``, the four timeout values and ``no_debug``.
    """
    parser = argparse.ArgumentParser(
        description='Debug remote_client.py using JumpServer inventory JSON.',
    )

    # (flag, add_argument keyword options), registered in this order.
    option_table = (
        ('--inventory-file', {
            'help': 'Path to a hosts.json or equivalent inventory JSON file.',
        }),
        ('--host', {
            'help': 'Inventory host key, for example: dqyhd009010(useradmin)',
        }),
        ('--command', {
            'default': 'whoami',
            'help': 'Optional command to run after connect(); default: whoami',
        }),
        ('--connect-timeout', {
            'type': int,
            'default': 10,
            'help': 'Socket connect timeout passed to paramiko.connect(); default: 10',
        }),
        ('--banner-timeout', {
            'type': int,
            'default': 10,
            'help': 'Banner timeout passed to paramiko.connect(); default: 10',
        }),
        ('--auth-timeout', {
            'type': int,
            'default': 10,
            'help': 'Authentication timeout passed to paramiko.connect(); default: 10',
        }),
        ('--overall-connect-timeout', {
            'type': int,
            'default': 20,
            'help': 'Hard timeout around client.connect(); set 0 to disable; default: 20',
        }),
        ('--no-debug', {
            'action': 'store_true',
            'help': 'Do not enable JMS_REMOTE_CLIENT_DEBUG automatically.',
        }),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def main():
    """Script entry point: load the inventory, show the parameters, run the client.

    Unless ``--no-debug`` is given, ``JMS_REMOTE_CLIENT_DEBUG`` is set to
    ``1`` when it is not already present in the environment. Any exception
    from the client run is reported on stderr with its traceback and turned
    into exit status 1.
    """
    args = parse_args()
    auto_debug = not args.no_debug
    if auto_debug:
        os.environ.setdefault('JMS_REMOTE_CLIENT_DEBUG', '1')

    host_name, host = get_target_host(load_inventory(args), args.host)
    params = build_change_secret_privileged_params(host)
    print_effective_params(host_name, params)
    print('flow = ssh as become_user -> su/sudo to login_user')

    try:
        run_client(params, args)
    except Exception as exc:
        for report in (f'error = {exc}', traceback.format_exc()):
            print(report, file=sys.stderr)
        raise SystemExit(1) from exc
'''
docker cp ./hosts.json jms_core:/opt/jumpserver/
docker cp ./debug_remote_client.py jms_core:/opt/jumpserver/
root@jms_core:/opt/jumpserver# pwd
/opt/jumpserver
dqyhd009010(useradmin) is an example host key from the inventory JSON; replace it with your actual host key.
PYTHONPATH=/opt/jumpserver/apps JMS_REMOTE_CLIENT_DEBUG=1 python debug_remote_client.py --inventory-file hosts.json --host 'dqyhd009010(useradmin)' --command 'whoami' --connect-timeout 5 --banner-timeout 5 --auth-timeout 5 --overall-connect-timeout 15
'''
# Run the debug flow only when this file is executed as a script, not when
# it is imported.
if __name__ == '__main__':
    main()