From c847caabaa5201e52b52496f4fd2e1a65fc48fdd Mon Sep 17 00:00:00 2001 From: ibuler Date: Mon, 2 Feb 2026 13:45:41 +0800 Subject: [PATCH] perf: change some sec issue --- apps/common/utils/zip.py | 145 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 apps/common/utils/zip.py diff --git a/apps/common/utils/zip.py b/apps/common/utils/zip.py new file mode 100644 index 000000000..b11443d3d --- /dev/null +++ b/apps/common/utils/zip.py @@ -0,0 +1,145 @@ +import os +import stat +import hashlib +from pathlib import Path +from zipfile import ZipFile, BadZipFile + + +# ------------------------- +# 可按你们安全规范调整 +# ------------------------- +MAX_FILES = 1000 +MAX_SINGLE_FILE_SIZE = 200 * 1024 * 1024 # 200MB +MAX_TOTAL_SIZE = 1 * 1024 * 1024 * 1024 # 1GB +MAX_COMPRESSION_RATIO = 100 # 解压 / 压缩 + + +class ZipSecurityError(Exception): + pass + + +# ------------------------- +# 工具函数 +# ------------------------- +def _is_symlink(zip_info): + return stat.S_ISLNK(zip_info.external_attr >> 16) + + +def _is_safe_path(base_dir: Path, target: Path) -> bool: + try: + return target.resolve().is_relative_to(base_dir.resolve()) + except AttributeError: + # Python < 3.9 + return str(target.resolve()).startswith(str(base_dir.resolve())) + + +def _verify_signature(zip_path: Path, sig_path: Path, public_key_pem: bytes): + """ + 示例:RSA + SHA256 + 你可以替换成你们自己的验签逻辑 + """ + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import padding + + data = zip_path.read_bytes() + signature = sig_path.read_bytes() + + public_key = serialization.load_pem_public_key(public_key_pem) + + public_key.verify( + signature, + data, + padding.PKCS1v15(), + hashes.SHA256(), + ) + + +# ------------------------- +# 主函数 +# ------------------------- +def safe_extract_zip( + zip_path: str | Path, + extract_dir: str | Path, + zip_sign_path: str | Path | None = None, + *, + public_key_pem: bytes | None = None, +): + """ + 安全解压 zip + + :param zip_path: zip 文件路径 + :param extract_dir: 解压目标目录 + :param zip_sign_path: 可选,zip 签名文件路径 + :param public_key_pem: 可选,验签用公钥 + """ + + zip_path = Path(zip_path) + extract_dir = Path(extract_dir) + extract_dir.mkdir(parents=True, exist_ok=True) + + # 1️⃣ 签名校验 + if zip_sign_path: + if not public_key_pem: + raise ZipSecurityError("Signature provided but public key missing") + + _verify_signature( + zip_path, + Path(zip_sign_path), + public_key_pem, + ) + + try: + with ZipFile(zip_path) as zf: + infos = zf.infolist() + + # 2️⃣ 条目数量限制 + if len(infos) > MAX_FILES: + raise ZipSecurityError("Too many files in zip") + + total_size = 0 + + for info in infos: + name = info.filename + + # 3️⃣ 基础文件名校验 + if name.startswith(("/", "\\")): + raise ZipSecurityError(f"Absolute path not allowed: {name}") + + if ".." in Path(name).parts: + raise ZipSecurityError(f"Path traversal detected: {name}") + + # 4️⃣ 软链接检测 + if _is_symlink(info): + raise ZipSecurityError(f"Symlink not allowed: {name}") + + # 5️⃣ 文件大小限制 + if info.file_size > MAX_SINGLE_FILE_SIZE: + raise ZipSecurityError(f"File too large: {name}") + + total_size += info.file_size + if total_size > MAX_TOTAL_SIZE: + raise ZipSecurityError("Total extracted size exceeded") + + # 6️⃣ 压缩比校验(防 zip bomb) + if info.compress_size > 0: + ratio = info.file_size / info.compress_size + if ratio > MAX_COMPRESSION_RATIO: + raise ZipSecurityError( + f"Suspicious compression ratio ({ratio:.1f}): {name}" + ) + + # 7️⃣ 最终路径校验 + target_path = extract_dir / name + if not _is_safe_path(extract_dir, target_path): + raise ZipSecurityError(f"Unsafe extract path: {name}") + + # 8️⃣ 解压(手动) + if info.is_dir(): + target_path.mkdir(parents=True, exist_ok=True) + else: + target_path.parent.mkdir(parents=True, exist_ok=True) + with zf.open(info) as src, open(target_path, "wb") as dst: + dst.write(src.read()) + + except BadZipFile: + raise ZipSecurityError("Invalid zip file") \ No newline at end of file