perf: change some sec issue

This commit is contained in:
ibuler
2026-02-02 13:45:41 +08:00
parent e66406a548
commit c847caabaa

145
apps/common/utils/zip.py Normal file
View File

@@ -0,0 +1,145 @@
import os
import stat
import hashlib
from pathlib import Path
from zipfile import ZipFile, BadZipFile
# -------------------------
# 可按你们安全规范调整
# -------------------------
MAX_FILES = 1000
MAX_SINGLE_FILE_SIZE = 200 * 1024 * 1024 # 200MB
MAX_TOTAL_SIZE = 1 * 1024 * 1024 * 1024 # 1GB
MAX_COMPRESSION_RATIO = 100 # 解压 / 压缩
class ZipSecurityError(Exception):
pass
# -------------------------
# 工具函数
# -------------------------
def _is_symlink(zip_info):
return stat.S_ISLNK(zip_info.external_attr >> 16)
def _is_safe_path(base_dir: Path, target: Path) -> bool:
try:
return target.resolve().is_relative_to(base_dir.resolve())
except AttributeError:
# Python < 3.9
return str(target.resolve()).startswith(str(base_dir.resolve()))
def _verify_signature(zip_path: Path, sig_path: Path, public_key_pem: bytes):
"""
示例RSA + SHA256
你可以替换成你们自己的验签逻辑
"""
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
data = zip_path.read_bytes()
signature = sig_path.read_bytes()
public_key = serialization.load_pem_public_key(public_key_pem)
public_key.verify(
signature,
data,
padding.PKCS1v15(),
hashes.SHA256(),
)
# -------------------------
# 主函数
# -------------------------
def safe_extract_zip(
zip_path: str | Path,
extract_dir: str | Path,
zip_sign_path: str | Path | None = None,
*,
public_key_pem: bytes | None = None,
):
"""
安全解压 zip
:param zip_path: zip 文件路径
:param extract_dir: 解压目标目录
:param zip_sign_path: 可选zip 签名文件路径
:param public_key_pem: 可选,验签用公钥
"""
zip_path = Path(zip_path)
extract_dir = Path(extract_dir)
extract_dir.mkdir(parents=True, exist_ok=True)
# 1⃣ 签名校验
if zip_sign_path:
if not public_key_pem:
raise ZipSecurityError("Signature provided but public key missing")
_verify_signature(
zip_path,
Path(zip_sign_path),
public_key_pem,
)
try:
with ZipFile(zip_path) as zf:
infos = zf.infolist()
# 2⃣ 条目数量限制
if len(infos) > MAX_FILES:
raise ZipSecurityError("Too many files in zip")
total_size = 0
for info in infos:
name = info.filename
# 3⃣ 基础文件名校验
if name.startswith(("/", "\\")):
raise ZipSecurityError(f"Absolute path not allowed: {name}")
if ".." in Path(name).parts:
raise ZipSecurityError(f"Path traversal detected: {name}")
# 4⃣ 软链接检测
if _is_symlink(info):
raise ZipSecurityError(f"Symlink not allowed: {name}")
# 5⃣ 文件大小限制
if info.file_size > MAX_SINGLE_FILE_SIZE:
raise ZipSecurityError(f"File too large: {name}")
total_size += info.file_size
if total_size > MAX_TOTAL_SIZE:
raise ZipSecurityError("Total extracted size exceeded")
# 6⃣ 压缩比校验(防 zip bomb
if info.compress_size > 0:
ratio = info.file_size / info.compress_size
if ratio > MAX_COMPRESSION_RATIO:
raise ZipSecurityError(
f"Suspicious compression ratio ({ratio:.1f}): {name}"
)
# 7⃣ 最终路径校验
target_path = extract_dir / name
if not _is_safe_path(extract_dir, target_path):
raise ZipSecurityError(f"Unsafe extract path: {name}")
# 8⃣ 解压(手动)
if info.is_dir():
target_path.mkdir(parents=True, exist_ok=True)
else:
target_path.parent.mkdir(parents=True, exist_ok=True)
with zf.open(info) as src, open(target_path, "wb") as dst:
dst.write(src.read())
except BadZipFile:
raise ZipSecurityError("Invalid zip file")