Files
jumpserver/apps/common/utils/yml.py

188 lines
5.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import tempfile
import yaml
from ansible.utils.unsafe_proxy import wrap_var
from django.conf import settings
from jinja2 import StrictUndefined
from jinja2.sandbox import SandboxedEnvironment
class AnsibleUnsafeDumper(yaml.SafeDumper):
pass
UnsafeTextType = type(wrap_var(''))
AnsibleUnsafeDumper.add_representer(
UnsafeTextType,
lambda dumper, data: dumper.represent_scalar('!unsafe', str(data))
)
def translate(key, i18n, lang):
lang = settings.LANGUAGE_CODE if lang is None else lang
lang = lang[:2]
lang_data = i18n.get(key, {})
return lang_data.get(lang, key)
def yaml_load_with_i18n(stream, lang=None):
ori_text = stream.read()
data = yaml.safe_load(ori_text)
i18n = data.get("i18n", {})
env = SandboxedEnvironment(
undefined=StrictUndefined,
autoescape=False,
)
def safe_trans(key):
if not isinstance(key, str):
raise ValueError("invalid i18n key")
return translate(key, i18n, lang)
env.filters.clear()
env.globals.clear()
env.filters["trans"] = safe_trans
template = env.from_string(ori_text)
try:
rendered = template.render()
except Exception as e:
rendered = ori_text
result = yaml.safe_load(rendered)
result.pop("i18n", None)
return result
def wrap_ansible_unsafe(value):
if value is None:
return value
if isinstance(value, dict):
return {k: wrap_ansible_unsafe(v) for k, v in value.items()}
if isinstance(value, list):
return [wrap_ansible_unsafe(v) for v in value]
if isinstance(value, tuple):
return tuple(wrap_ansible_unsafe(v) for v in value)
if isinstance(value, str):
return wrap_var(value)
return value
def dump_ansible_yaml(data, stream):
yaml.dump(
data,
stream,
Dumper=AnsibleUnsafeDumper,
default_flow_style=False,
allow_unicode=True,
sort_keys=False,
)
INVENTORY_LITERAL_PATHS = (
("all", "hosts", "*", "ansible_host"),
("all", "hosts", "*", "jms_asset", "address"),
("all", "hosts", "*", "jms_asset", "origin_address"),
)
def escape_ansible_jinja(value):
if not isinstance(value, str):
return value
return (
value
.replace('{{', '{{ "{{" }}')
.replace('}}', '{{ "}}" }}')
.replace('{%', '{{ "{%" }}')
.replace('%}', '{{ "%}" }}')
.replace('{#', '{{ "{#" }}')
.replace('#}', '{{ "#}" }}')
)
def sanitize_ansible_inventory_value(value):
if isinstance(value, dict):
return {k: sanitize_ansible_inventory_value(v) for k, v in value.items()}
if isinstance(value, list):
return [sanitize_ansible_inventory_value(v) for v in value]
if isinstance(value, tuple):
return tuple(sanitize_ansible_inventory_value(v) for v in value)
return escape_ansible_jinja(value)
def sanitize_inventory_by_paths(value, path_patterns, current_path=()):
# 递归遍历 inventory只在命中高风险路径时处理对应值
if isinstance(value, dict):
return {
key: sanitize_inventory_by_paths(item, path_patterns, current_path + (key,))
for key, item in value.items()
}
if isinstance(value, list):
return [
sanitize_inventory_by_paths(item, path_patterns, current_path + (str(index),))
for index, item in enumerate(value)
]
if isinstance(value, tuple):
return tuple(
sanitize_inventory_by_paths(item, path_patterns, current_path + (str(index),))
for index, item in enumerate(value)
)
if any(path_matches_pattern(current_path, pattern) for pattern in path_patterns):
return sanitize_ansible_inventory_value(value)
return value
def path_matches_pattern(path, pattern):
# `*` 只匹配单层,例如 host 名
if len(path) != len(pattern):
return False
return all(expected == "*" or actual == expected for actual, expected in zip(path, pattern))
def atomic_dump_text(dest_path, writer):
# 先写临时文件,再原子替换,避免目标文件出现半写入状态
dest_dir = os.path.dirname(dest_path) or "."
fd, tmp_path = tempfile.mkstemp(dir=dest_dir)
try:
with os.fdopen(fd, "w") as f:
writer(f)
os.replace(tmp_path, dest_path)
except Exception:
try:
os.remove(tmp_path)
except OSError:
pass
raise
def sanitize_ansible_playbook(playbook_path, dest_path):
with open(playbook_path) as f:
plays = yaml.safe_load(f)
for play in plays or []:
vars_data = play.get("vars")
if isinstance(vars_data, dict):
play["vars"] = wrap_ansible_unsafe(vars_data)
atomic_dump_text(dest_path, lambda f: dump_ansible_yaml(plays, f))
def sanitize_ansible_inventory_json(inventory_path, dest_path):
with open(inventory_path) as f:
data = json.load(f)
data = sanitize_inventory_by_paths(data, INVENTORY_LITERAL_PATHS)
atomic_dump_text(dest_path, lambda f: json.dump(data, f, indent=4))
if __name__ == '__main__':
with open('manifest.yml') as f:
data = yaml_load_with_i18n(f)
print(data)