Files
jumpserver/apps/i18n/ci_translate.py
2026-03-03 11:40:42 +08:00

297 lines
8.9 KiB
Python

#!/usr/bin/env python3
import argparse
import asyncio
import json
import os
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
import polib
from _translator.base import BaseTranslateManager
from _translator.utils import build_translator
# Filesystem anchors derived from this script's own location:
# I18N_ROOT is the directory containing this file, REPO_ROOT is two
# directories above it.
I18N_ROOT = Path(__file__).resolve().parent
REPO_ROOT = I18N_ROOT.parents[1]
def _run_git(args: list[str]) -> str:
    """Run ``git`` with *args* from ``REPO_ROOT`` and return its stdout as text.

    Raises:
        subprocess.CalledProcessError: if git exits with a non-zero status.
    """
    command = ["git"] + list(args)
    return subprocess.check_output(command, cwd=REPO_ROOT, text=True)
def _git_show_text(rev: str, relpath: str) -> str | None:
    """Return the output of ``git show <rev>:<relpath>``, or ``None`` if git fails."""
    object_spec = f"{rev}:{relpath}"
    try:
        content = _run_git(["show", object_spec])
    except subprocess.CalledProcessError:
        # A non-zero exit from ``git show`` is reported to the caller as "no content".
        return None
    return content
def _read_json_text(path: Path) -> dict:
    """Load the UTF-8 JSON document at *path*; a path that does not exist yields ``{}``."""
    if path.exists():
        raw = path.read_text(encoding="utf-8")
        return json.loads(raw)
    return {}
def _read_json_text_from_git(rev: str, relpath: str) -> dict:
    """Load *relpath* as JSON from git revision *rev*.

    Returns an empty dict when ``_git_show_text`` yields nothing (``None`` or
    an empty string).
    """
    content = _git_show_text(rev, relpath)
    return json.loads(content) if content else {}
def _read_po_from_text(txt: str) -> polib.POFile:
    """Parse PO-format text into a ``polib.POFile``.

    The text is written to a temporary UTF-8 ``.po`` file, parsed from that
    path, and the temporary file is removed afterwards. Removal is best
    effort: an ``OSError`` from ``os.unlink`` is ignored.

    NOTE(review): presumably a real file is used so that polib is always
    handed a path rather than raw content -- confirm before simplifying.

    Args:
        txt: full content of a PO file.

    Returns:
        The parsed PO file.
    """
    name: str | None = None
    try:
        with tempfile.NamedTemporaryFile(
            "w+", encoding="utf-8", suffix=".po", delete=False
        ) as f:
            # Record the path before writing so the ``finally`` clause can
            # clean up even if the write itself fails.
            name = f.name
            f.write(txt)
        # Leaving the ``with`` block closes (and flushes) the file before
        # polib opens it by name.
        return polib.pofile(name)
    finally:
        if name is not None:
            try:
                os.unlink(name)
            except OSError:
                # Best-effort cleanup; never mask the real result or error.
                pass
def _read_po_from_git(rev: str, relpath: str) -> polib.POFile | None:
    """Parse the PO file *relpath* as stored at git revision *rev*.

    Returns ``None`` when ``_git_show_text`` yields nothing (``None`` or an
    empty string).
    """
    content = _git_show_text(rev, relpath)
    return _read_po_from_text(content) if content else None
def _read_po_from_fs(path: Path) -> polib.POFile:
    """Parse the PO file located at *path*.

    ``polib.pofile()`` accepts either a path or raw PO content, and treats a
    string that is not an existing file as content. A missing file would
    therefore surface as a confusing PO syntax error about the path string,
    so the path is validated first.

    Raises:
        FileNotFoundError: if *path* is not an existing regular file.
    """
    if not path.is_file():
        raise FileNotFoundError(f"PO file not found: {path}")
    return polib.pofile(str(path))
def _changed_keys_json(base: dict, head: dict) -> set[str]:
    """Return the keys of *head* that are new or whose value differs from *base*.

    Falsy keys (such as the empty string) are ignored. Keys that exist only
    in *base* are not reported.
    """
    return {
        key
        for key, value in head.items()
        if key and (key not in base or base[key] != value)
    }
def _po_translated_dict(po: polib.POFile) -> dict[str, str]:
    """Map ``msgid`` to ``msgstr`` for the entries from ``po.translated_entries()``."""
    mapping: dict[str, str] = {}
    for entry in po.translated_entries():
        mapping[entry.msgid] = entry.msgstr
    return mapping
def _changed_msgids_po(base_po: polib.POFile | None, head_po: polib.POFile) -> set[str]:
    """Return msgids whose translated text in *head_po* is new or differs from *base_po*.

    Only translated entries (see ``_po_translated_dict``) are compared. A
    falsy *base_po* is treated as having no entries, so every translated
    msgid in *head_po* counts as changed.
    """
    before = _po_translated_dict(base_po) if base_po else {}
    after = _po_translated_dict(head_po)
    return {
        msgid
        for msgid, msgstr in after.items()
        if msgid not in before or before[msgid] != msgstr
    }
@dataclass(frozen=True)
class WorkItem:
    """One changed translation source file to process.

    ``kind`` selects the file format and ``src_relpath`` is the path of the
    source file as listed by ``git diff --name-only``:

    * ``kind="json"``: ``src_relpath`` points to a ``zh.json`` file.
    * ``kind="po"``: ``src_relpath`` points to
      ``zh/LC_MESSAGES/{django.po|djangojs.po}``.
    """

    kind: str
    src_relpath: str
class _BulkTranslator(BaseTranslateManager):
    """Thin subclass that adds nothing; it exists to reuse ``BaseTranslateManager.bulk_translate``."""
async def _translate_json_item(
    translator,
    module_dir: Path,
    zh_relpath: str,
    changed_keys: set[str],
    overwrite: bool,
):
    """Translate changed ``zh.json`` keys into each other language file of a module.

    For every language in ``LANG_MAPPER`` except ``zh``, the selected keys are
    sent to ``bulk_translate`` and merged into ``<module_dir>/<lang>.json``
    (file name lower-cased), which is rewritten with sorted keys.

    Args:
        translator: translator object handed to ``_BulkTranslator``.
        module_dir: directory holding the per-language JSON files.
        zh_relpath: path of the Chinese source file, relative to ``REPO_ROOT``.
        changed_keys: source keys to consider; keys absent from the source
            file are ignored.
        overwrite: when true, re-translate keys the target already has;
            otherwise only fill in keys missing from the target.
    """
    source = _read_json_text(REPO_ROOT / zh_relpath)
    if not source:
        return

    manager = _BulkTranslator(str(module_dir), translator)
    for prefix, language in manager.LANG_MAPPER.items():
        lang_code = prefix.lower()
        if lang_code == "zh":
            continue

        out_path = module_dir / f"{lang_code}.json"
        existing = _read_json_text(out_path)
        pending: dict[str, str] = {
            key: source[key]
            for key in changed_keys
            if key in source and (overwrite or key not in existing)
        }
        if not pending:
            continue

        fresh = await manager.bulk_translate(pending, language)
        existing.update(fresh)
        serialized = json.dumps(existing, ensure_ascii=False, sort_keys=True, indent=4)
        out_path.write_text(serialized + "\n", encoding="utf-8")
async def _translate_po_item(
    translator,
    module_dir: Path,
    po_name: str,
    zh_relpath: str,
    changed_msgids: set[str],
    overwrite: bool,
):
    """Translate changed msgids of a Chinese PO file into each other language's PO file.

    For every language in ``LANG_MAPPER`` except ``zh``, the existing file
    ``<module_dir>/<lang>/LC_MESSAGES/<po_name>`` is updated in place. Target
    files that do not exist are skipped, as are msgids that have no entry in
    the target file.

    Args:
        translator: translator object handed to ``_BulkTranslator``.
        module_dir: directory containing the per-language sub-directories.
        po_name: PO file name inside ``LC_MESSAGES``.
        zh_relpath: path of the Chinese source PO file, relative to ``REPO_ROOT``.
        changed_msgids: msgids to consider; those without a translated entry
            in the source file are ignored.
        overwrite: when true, re-translate every selected msgid; otherwise
            only entries whose msgstr is empty or that are flagged ``fuzzy``.
    """
    source_path = REPO_ROOT / zh_relpath
    if not source_path.exists():
        return
    source = _po_translated_dict(_read_po_from_fs(source_path))
    if not source:
        return

    manager = _BulkTranslator(str(module_dir), translator)
    for lang_code, language in manager.LANG_MAPPER.items():
        if lang_code == "zh":
            continue
        po_path = module_dir / lang_code / "LC_MESSAGES" / po_name
        if not po_path.exists():
            continue
        target_po = _read_po_from_fs(po_path)

        # Select the source texts that need a (new) translation for this language.
        pending: dict[str, str] = {}
        for msgid in changed_msgids:
            if msgid not in source:
                continue
            existing = target_po.find(msgid)
            if existing and (
                overwrite or not existing.msgstr or "fuzzy" in existing.flags
            ):
                pending[msgid] = source[msgid]
        if not pending:
            continue

        results = await manager.bulk_translate(pending, language)
        for msgid, text in results.items():
            target = target_po.find(msgid)
            if not target:
                continue
            # NOTE(review): this clears every flag, not only "fuzzy" (format
            # flags such as python-format go too) -- confirm this is intended.
            target.flags = []
            target.previous_msgid = None
            target.msgstr = text
        target_po.save(str(po_path))
def _discover_work_items_from_diff(base: str, head: str) -> list[WorkItem]:
    """List the Chinese i18n source files changed between *base* and *head*.

    Paths come from ``git diff --name-only base..head``. Only paths under
    ``apps/i18n/`` are considered:

    * ``.../zh.json`` outside any ``LC_MESSAGES`` directory -> ``kind="json"``
    * ``.../zh/LC_MESSAGES/django.po`` or ``djangojs.po`` -> ``kind="po"``

    Returns:
        De-duplicated work items in the order git reported them.
    """
    po_suffixes = ("/zh/LC_MESSAGES/django.po", "/zh/LC_MESSAGES/djangojs.po")
    diff_output = _run_git(["diff", "--name-only", f"{base}..{head}"])

    # Keyed by (kind, path) so a path listed twice yields a single work item.
    found: dict[tuple[str, str], WorkItem] = {}
    for path in diff_output.splitlines():
        if not path.startswith("apps/i18n/"):
            continue
        if path.endswith("/zh.json") and "/LC_MESSAGES/" not in path:
            kind = "json"  # json modules
        elif path.endswith(po_suffixes):
            kind = "po"  # gettext sources
        else:
            continue
        found[(kind, path)] = WorkItem(kind=kind, src_relpath=path)
    return list(found.values())
async def run_pr(base: str, head: str, overwrite: bool):
    """Translate only the i18n source entries that changed between *base* and *head*.

    Changed source files are discovered with ``git diff``. For each one, the
    version at *base* (read from git) is compared with the current version in
    the working tree (not with the *head* revision itself), and only new or
    modified keys/msgids are sent for translation.

    Args:
        base: git revision to compare against.
        head: git revision used as the other end of the diff.
        overwrite: passed through to the per-file translators; when true,
            existing translations of changed entries are replaced.
    """
    translator = build_translator()
    items = _discover_work_items_from_diff(base, head)
    if not items:
        print("No i18n source changes detected; skip.")
        return

    for item in items:
        src_path = REPO_ROOT / item.src_relpath
        if not src_path.exists():
            # The diff also lists files that were deleted. There is nothing
            # to translate for them, and handing a missing path to the PO
            # reader would fail, so skip them up front.
            print(f"Source file {item.src_relpath} not found in working tree; skip.")
            continue

        if item.kind == "json":
            module_dir = src_path.parent
            base_dict = _read_json_text_from_git(base, item.src_relpath)
            head_dict = _read_json_text(src_path)
            changed = _changed_keys_json(base_dict, head_dict)
            if not changed:
                continue
            await _translate_json_item(
                translator, module_dir, item.src_relpath, changed, overwrite=overwrite
            )
        elif item.kind == "po":
            # .../core/zh/LC_MESSAGES/django.po -> module_dir=.../core
            module_dir = src_path.parents[2]
            po_name = src_path.name
            base_po = _read_po_from_git(base, item.src_relpath)
            head_po = _read_po_from_fs(src_path)
            changed = _changed_msgids_po(base_po, head_po)
            if not changed:
                continue
            await _translate_po_item(
                translator,
                module_dir,
                po_name=po_name,
                zh_relpath=item.src_relpath,
                changed_msgids=changed,
                overwrite=overwrite,
            )
async def run_full():
    """Run the full translation pass implemented in the sibling ``translate.py``.

    ``translate.py`` is loaded by file path under the module name
    ``jumpserver_i18n_translate`` to avoid name conflicts with any third-party
    "translate" package. Its ``Translate`` class is then instantiated with a
    freshly built translator and its ``run()`` coroutine is awaited.

    Raises:
        RuntimeError: if no import spec/loader can be created for ``translate.py``.
    """
    from importlib import util as importlib_util

    module_path = I18N_ROOT / "translate.py"
    spec = importlib_util.spec_from_file_location("jumpserver_i18n_translate", module_path)
    if not (spec and spec.loader):
        raise RuntimeError(f"Failed to load translate module from {module_path}")

    translate_module = importlib_util.module_from_spec(spec)
    spec.loader.exec_module(translate_module)

    manager = translate_module.Translate(build_translator())
    await manager.run()
def main(argv: Iterable[str] | None = None):
parser = argparse.ArgumentParser(description="JumpServer i18n CI translator")
parser.add_argument("--mode", choices=["full", "pr"], required=True)
parser.add_argument("--base", help="Base git sha (PR only)")
parser.add_argument("--head", help="Head git sha (PR only)", default="HEAD")
parser.add_argument(
"--overwrite",
action=argparse.BooleanOptionalAction,
default=True,
help="Overwrite existing translations for changed source keys/msgids",
)
args = parser.parse_args(list(argv) if argv is not None else None)
if args.mode == "full":
asyncio.run(run_full())
return
if not args.base:
raise SystemExit("--base is required for --mode pr")
asyncio.run(run_pr(args.base, args.head, overwrite=args.overwrite))
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()