refactor: tighten ansible safety layer boundaries

This commit is contained in:
Crane.z
2026-04-09 19:07:43 +08:00
parent 135b96b03c
commit 7a3e1ae2fd
2 changed files with 77 additions and 42 deletions

View File

@@ -1,4 +1,6 @@
import json
import os
import tempfile
import yaml
from ansible.utils.unsafe_proxy import wrap_var
@@ -80,6 +82,13 @@ def dump_ansible_yaml(data, stream):
)
INVENTORY_LITERAL_PATHS = (
("all", "hosts", "*", "ansible_host"),
("all", "hosts", "*", "jms_asset", "address"),
("all", "hosts", "*", "jms_asset", "origin_address"),
)
def escape_ansible_jinja(value):
if not isinstance(value, str):
return value
@@ -105,6 +114,52 @@ def sanitize_ansible_inventory_value(value):
return escape_ansible_jinja(value)
def sanitize_inventory_by_paths(value, path_patterns, current_path=()):
# 递归遍历 inventory只在命中高风险路径时处理对应值
if isinstance(value, dict):
return {
key: sanitize_inventory_by_paths(item, path_patterns, current_path + (key,))
for key, item in value.items()
}
if isinstance(value, list):
return [
sanitize_inventory_by_paths(item, path_patterns, current_path + (str(index),))
for index, item in enumerate(value)
]
if isinstance(value, tuple):
return tuple(
sanitize_inventory_by_paths(item, path_patterns, current_path + (str(index),))
for index, item in enumerate(value)
)
if any(path_matches_pattern(current_path, pattern) for pattern in path_patterns):
return sanitize_ansible_inventory_value(value)
return value
def path_matches_pattern(path, pattern):
# `*` 只匹配单层,例如 host 名
if len(path) != len(pattern):
return False
return all(expected == "*" or actual == expected for actual, expected in zip(path, pattern))
def atomic_dump_text(dest_path, writer):
# 先写临时文件,再原子替换,避免目标文件出现半写入状态
dest_dir = os.path.dirname(dest_path) or "."
fd, tmp_path = tempfile.mkstemp(dir=dest_dir)
try:
with os.fdopen(fd, "w") as f:
writer(f)
os.replace(tmp_path, dest_path)
except Exception:
try:
os.remove(tmp_path)
except OSError:
pass
raise
def sanitize_ansible_playbook(playbook_path, dest_path):
with open(playbook_path) as f:
plays = yaml.safe_load(f)
@@ -114,19 +169,16 @@ def sanitize_ansible_playbook(playbook_path, dest_path):
if isinstance(vars_data, dict):
play["vars"] = wrap_ansible_unsafe(vars_data)
with open(dest_path, "w") as f:
dump_ansible_yaml(plays, f)
atomic_dump_text(dest_path, lambda f: dump_ansible_yaml(plays, f))
def sanitize_ansible_inventory_json(inventory_path, dest_path):
with open(inventory_path) as f:
data = json.load(f)
for host_name, host in data.get("all", {}).get("hosts", {}).items():
data["all"]["hosts"][host_name] = sanitize_ansible_inventory_value(host)
data = sanitize_inventory_by_paths(data, INVENTORY_LITERAL_PATHS)
with open(dest_path, "w") as f:
json.dump(data, f, indent=4)
atomic_dump_text(dest_path, lambda f: json.dump(data, f, indent=4))
if __name__ == '__main__':

View File

@@ -98,7 +98,6 @@ class PlaybookRunner:
self.extra_vars = extra_vars
self.safety_mode = safety_mode
self.inventory_safety = inventory_safety
self.safe_paths = []
def copy_playbook(self):
entry = os.path.basename(self.playbook)
@@ -111,26 +110,17 @@ class PlaybookRunner:
# Security anchor:
# For system-generated Ansible inputs that may contain user-controlled values,
# callers should explicitly enable safety_mode / inventory_safety so the runner
# executes sanitized copies instead of raw files.
# sanitizes the task-private playbook / inventory before Ansible runs.
# This is intended for system-generated inputs whose values should be treated as
# literal data, not for arbitrary user-authored playbooks or execution logic.
if self.safety_mode == "playbook_unsafe":
project_playbook = os.path.join(self.project_dir, "project", self.playbook)
suffix = uuid.uuid4().hex[:8]
safe_playbook = os.path.join(self.project_dir, "project", f"__safe__{suffix}__{self.playbook}")
sanitize_ansible_playbook(project_playbook, safe_playbook)
os.chmod(safe_playbook, 0o600)
self.safe_paths.append(safe_playbook)
self.playbook = os.path.basename(safe_playbook)
sanitize_ansible_playbook(project_playbook, project_playbook)
os.chmod(project_playbook, 0o600)
if self.inventory_safety == "json_escape":
inventory_dir = os.path.join(self.project_dir, "inventory")
os.makedirs(inventory_dir, exist_ok=True)
inventory_name = os.path.basename(self.inventory)
suffix = uuid.uuid4().hex[:8]
safe_inventory = os.path.join(inventory_dir, f"__safe__{suffix}__{inventory_name}")
sanitize_ansible_inventory_json(self.inventory, safe_inventory)
os.chmod(safe_inventory, 0o600)
self.safe_paths.append(safe_inventory)
self.inventory = safe_inventory
sanitize_ansible_inventory_json(self.inventory, self.inventory)
os.chmod(self.inventory, 0o600)
def run(self, verbosity=0, **kwargs):
self.copy_playbook()
@@ -146,25 +136,18 @@ class PlaybookRunner:
kwargs['process_isolation'] = True
kwargs['process_isolation_executable'] = 'bwrap'
try:
interface.run(
private_data_dir=self.project_dir,
inventory=self.inventory,
playbook=self.playbook,
verbosity=verbosity,
event_handler=self.cb.event_handler,
status_handler=self.cb.status_handler,
host_cwd=self.project_dir,
envvars=self.envs,
extravars=self.extra_vars,
**kwargs
)
finally:
for path in self.safe_paths:
try:
os.remove(path)
except OSError:
pass
interface.run(
private_data_dir=self.project_dir,
inventory=self.inventory,
playbook=self.playbook,
verbosity=verbosity,
event_handler=self.cb.event_handler,
status_handler=self.cb.status_handler,
host_cwd=self.project_dir,
envvars=self.envs,
extravars=self.extra_vars,
**kwargs
)
return self.cb