mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-03-19 11:32:09 +00:00
297 lines
8.9 KiB
Python
297 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
import polib
|
|
|
|
from _translator.base import BaseTranslateManager
|
|
from _translator.utils import build_translator
|
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parents[2]
|
|
I18N_ROOT = Path(__file__).resolve().parent
|
|
|
|
|
|
def _run_git(args: list[str]) -> str:
|
|
out = subprocess.check_output(["git", *args], cwd=REPO_ROOT, text=True)
|
|
return out
|
|
|
|
|
|
def _git_show_text(rev: str, relpath: str) -> str | None:
|
|
try:
|
|
return _run_git(["show", f"{rev}:{relpath}"])
|
|
except subprocess.CalledProcessError:
|
|
return None
|
|
|
|
|
|
def _read_json_text(path: Path) -> dict:
|
|
if not path.exists():
|
|
return {}
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def _read_json_text_from_git(rev: str, relpath: str) -> dict:
|
|
txt = _git_show_text(rev, relpath)
|
|
if not txt:
|
|
return {}
|
|
return json.loads(txt)
|
|
|
|
|
|
def _read_po_from_text(txt: str) -> polib.POFile:
|
|
with tempfile.NamedTemporaryFile("w+", encoding="utf-8", suffix=".po", delete=False) as f:
|
|
f.write(txt)
|
|
f.flush()
|
|
name = f.name
|
|
try:
|
|
return polib.pofile(name)
|
|
finally:
|
|
try:
|
|
os.unlink(name)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def _read_po_from_git(rev: str, relpath: str) -> polib.POFile | None:
|
|
txt = _git_show_text(rev, relpath)
|
|
if not txt:
|
|
return None
|
|
return _read_po_from_text(txt)
|
|
|
|
|
|
def _read_po_from_fs(path: Path) -> polib.POFile:
|
|
return polib.pofile(str(path))
|
|
|
|
|
|
def _changed_keys_json(base: dict, head: dict) -> set[str]:
|
|
changed: set[str] = set()
|
|
for k, v in head.items():
|
|
if not k:
|
|
continue
|
|
if k not in base or base.get(k) != v:
|
|
changed.add(k)
|
|
return changed
|
|
|
|
|
|
def _po_translated_dict(po: polib.POFile) -> dict[str, str]:
|
|
return {e.msgid: e.msgstr for e in po.translated_entries()}
|
|
|
|
|
|
def _changed_msgids_po(base_po: polib.POFile | None, head_po: polib.POFile) -> set[str]:
|
|
base = _po_translated_dict(base_po) if base_po else {}
|
|
head = _po_translated_dict(head_po)
|
|
changed: set[str] = set()
|
|
for msgid, msgstr in head.items():
|
|
if msgid not in base or base.get(msgid) != msgstr:
|
|
changed.add(msgid)
|
|
return changed
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class WorkItem:
|
|
# For json: kind="json", src points to zh.json
|
|
# For po: kind="po", src points to zh/LC_MESSAGES/{django.po|djangojs.po}
|
|
kind: str
|
|
src_relpath: str
|
|
|
|
|
|
class _BulkTranslator(BaseTranslateManager):
|
|
# Reuse BaseTranslateManager.bulk_translate
|
|
pass
|
|
|
|
|
|
async def _translate_json_item(
|
|
translator,
|
|
module_dir: Path,
|
|
zh_relpath: str,
|
|
changed_keys: set[str],
|
|
overwrite: bool,
|
|
):
|
|
zh_path = REPO_ROOT / zh_relpath
|
|
zh_dict = _read_json_text(zh_path)
|
|
if not zh_dict:
|
|
return
|
|
|
|
mgr = _BulkTranslator(str(module_dir), translator)
|
|
for file_prefix, target_lang in mgr.LANG_MAPPER.items():
|
|
file_prefix = file_prefix.lower()
|
|
if file_prefix == "zh":
|
|
continue
|
|
|
|
target_path = module_dir / f"{file_prefix}.json"
|
|
target_dict = _read_json_text(target_path)
|
|
|
|
to_update: dict[str, str] = {}
|
|
for k in changed_keys:
|
|
if k not in zh_dict:
|
|
continue
|
|
if overwrite or k not in target_dict:
|
|
to_update[k] = zh_dict[k]
|
|
|
|
if not to_update:
|
|
continue
|
|
translated = await mgr.bulk_translate(to_update, target_lang)
|
|
target_dict.update(translated)
|
|
target_path.write_text(
|
|
json.dumps(target_dict, ensure_ascii=False, sort_keys=True, indent=4) + "\n",
|
|
encoding="utf-8",
|
|
)
|
|
|
|
|
|
async def _translate_po_item(
|
|
translator,
|
|
module_dir: Path,
|
|
po_name: str,
|
|
zh_relpath: str,
|
|
changed_msgids: set[str],
|
|
overwrite: bool,
|
|
):
|
|
zh_path = REPO_ROOT / zh_relpath
|
|
if not zh_path.exists():
|
|
return
|
|
zh_po = _read_po_from_fs(zh_path)
|
|
zh_dict = _po_translated_dict(zh_po)
|
|
if not zh_dict:
|
|
return
|
|
|
|
mgr = _BulkTranslator(str(module_dir), translator)
|
|
for file_prefix, target_lang in mgr.LANG_MAPPER.items():
|
|
if file_prefix == "zh":
|
|
continue
|
|
|
|
target_path = module_dir / file_prefix / "LC_MESSAGES" / po_name
|
|
if not target_path.exists():
|
|
continue
|
|
|
|
trans_po = _read_po_from_fs(target_path)
|
|
to_update: dict[str, str] = {}
|
|
|
|
for msgid in changed_msgids:
|
|
if msgid not in zh_dict:
|
|
continue
|
|
entry = trans_po.find(msgid)
|
|
if not entry:
|
|
continue
|
|
if overwrite or (not entry.msgstr) or ("fuzzy" in entry.flags):
|
|
to_update[msgid] = zh_dict[msgid]
|
|
|
|
if not to_update:
|
|
continue
|
|
translated = await mgr.bulk_translate(to_update, target_lang)
|
|
for msgid, msgstr in translated.items():
|
|
entry = trans_po.find(msgid)
|
|
if not entry:
|
|
continue
|
|
entry.flags = []
|
|
entry.previous_msgid = None
|
|
entry.msgstr = msgstr
|
|
trans_po.save(str(target_path))
|
|
|
|
|
|
def _discover_work_items_from_diff(base: str, head: str) -> list[WorkItem]:
|
|
changed_files = _run_git(["diff", "--name-only", f"{base}..{head}"]).splitlines()
|
|
|
|
items: list[WorkItem] = []
|
|
for p in changed_files:
|
|
if not p.startswith("apps/i18n/"):
|
|
continue
|
|
|
|
# json modules
|
|
if p.endswith("/zh.json") and "/LC_MESSAGES/" not in p:
|
|
items.append(WorkItem(kind="json", src_relpath=p))
|
|
continue
|
|
|
|
# gettext sources
|
|
if p.endswith("/zh/LC_MESSAGES/django.po") or p.endswith("/zh/LC_MESSAGES/djangojs.po"):
|
|
items.append(WorkItem(kind="po", src_relpath=p))
|
|
continue
|
|
|
|
# de-dup
|
|
uniq: dict[tuple[str, str], WorkItem] = {(i.kind, i.src_relpath): i for i in items}
|
|
return list(uniq.values())
|
|
|
|
|
|
async def run_pr(base: str, head: str, overwrite: bool):
|
|
translator = build_translator()
|
|
items = _discover_work_items_from_diff(base, head)
|
|
if not items:
|
|
print("No i18n source changes detected; skip.")
|
|
return
|
|
|
|
for item in items:
|
|
if item.kind == "json":
|
|
module_dir = (REPO_ROOT / item.src_relpath).parent
|
|
base_dict = _read_json_text_from_git(base, item.src_relpath)
|
|
head_dict = _read_json_text(REPO_ROOT / item.src_relpath)
|
|
changed = _changed_keys_json(base_dict, head_dict)
|
|
if not changed:
|
|
continue
|
|
await _translate_json_item(translator, module_dir, item.src_relpath, changed, overwrite=overwrite)
|
|
elif item.kind == "po":
|
|
src_path = REPO_ROOT / item.src_relpath
|
|
# .../core/zh/LC_MESSAGES/django.po -> module_dir=.../core
|
|
module_dir = src_path.parents[2]
|
|
po_name = src_path.name
|
|
base_po = _read_po_from_git(base, item.src_relpath)
|
|
head_po = _read_po_from_fs(src_path)
|
|
changed = _changed_msgids_po(base_po, head_po)
|
|
if not changed:
|
|
continue
|
|
await _translate_po_item(
|
|
translator,
|
|
module_dir,
|
|
po_name=po_name,
|
|
zh_relpath=item.src_relpath,
|
|
changed_msgids=changed,
|
|
overwrite=overwrite,
|
|
)
|
|
|
|
|
|
async def run_full():
|
|
# Full run: reuse existing translate logic, but load by file path
|
|
# to avoid name conflicts with any third-party "translate" package.
|
|
import importlib.util
|
|
|
|
translate_path = I18N_ROOT / "translate.py"
|
|
spec = importlib.util.spec_from_file_location("jumpserver_i18n_translate", translate_path)
|
|
if not spec or not spec.loader:
|
|
raise RuntimeError(f"Failed to load translate module from {translate_path}")
|
|
mod = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(mod)
|
|
|
|
translator = build_translator()
|
|
manager = mod.Translate(translator)
|
|
await manager.run()
|
|
|
|
|
|
def main(argv: Iterable[str] | None = None):
|
|
parser = argparse.ArgumentParser(description="JumpServer i18n CI translator")
|
|
parser.add_argument("--mode", choices=["full", "pr"], required=True)
|
|
parser.add_argument("--base", help="Base git sha (PR only)")
|
|
parser.add_argument("--head", help="Head git sha (PR only)", default="HEAD")
|
|
parser.add_argument(
|
|
"--overwrite",
|
|
action=argparse.BooleanOptionalAction,
|
|
default=True,
|
|
help="Overwrite existing translations for changed source keys/msgids",
|
|
)
|
|
args = parser.parse_args(list(argv) if argv is not None else None)
|
|
|
|
if args.mode == "full":
|
|
asyncio.run(run_full())
|
|
return
|
|
|
|
if not args.base:
|
|
raise SystemExit("--base is required for --mode pr")
|
|
asyncio.run(run_pr(args.base, args.head, overwrite=args.overwrite))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|