Files
2026-02-02 18:49:12 +08:00

146 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import stat
import hashlib
from pathlib import Path
from rest_framework.exceptions import ValidationError
from zipfile import ZipFile, BadZipFile
# -------------------------
# 可按你们安全规范调整
# -------------------------
MAX_FILES = 1000
MAX_SINGLE_FILE_SIZE = 200 * 1024 * 1024 # 200MB
MAX_TOTAL_SIZE = 1 * 1024 * 1024 * 1024 # 1GB
MAX_COMPRESSION_RATIO = 100 # 解压 / 压缩
class ZipSecurityError(ValidationError):
pass
# -------------------------
# 工具函数
# -------------------------
def _is_symlink(zip_info):
return stat.S_ISLNK(zip_info.external_attr >> 16)
def _is_safe_path(base_dir: Path, target: Path) -> bool:
try:
return target.resolve().is_relative_to(base_dir.resolve())
except AttributeError:
# Python < 3.9
return str(target.resolve()).startswith(str(base_dir.resolve()))
def _verify_signature(zip_path: Path, sig_path: Path, public_key_pem: bytes):
"""
示例RSA + SHA256
你可以替换成你们自己的验签逻辑
"""
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
data = zip_path.read_bytes()
signature = sig_path.read_bytes()
public_key = serialization.load_pem_public_key(public_key_pem)
public_key.verify(
signature,
data,
padding.PKCS1v15(),
hashes.SHA256(),
)
# -------------------------
# 主函数
# -------------------------
def safe_extract_zip(
zip_path: str | Path,
extract_dir: str | Path,
zip_sign_path: str | Path | None = None,
*,
public_key_pem: bytes | None = None,
):
"""
安全解压 zip
:param zip_path: zip 文件路径
:param extract_dir: 解压目标目录
:param zip_sign_path: 可选zip 签名文件路径
:param public_key_pem: 可选,验签用公钥
"""
zip_path = Path(zip_path)
extract_dir = Path(extract_dir)
extract_dir.mkdir(parents=True, exist_ok=True)
# 1⃣ 签名校验
if zip_sign_path:
if not public_key_pem:
raise ZipSecurityError("Signature provided but public key missing")
_verify_signature(
zip_path,
Path(zip_sign_path),
public_key_pem,
)
try:
with ZipFile(zip_path) as zf:
infos = zf.infolist()
# 2⃣ 条目数量限制
if len(infos) > MAX_FILES:
raise ZipSecurityError("Too many files in zip")
total_size = 0
for info in infos:
name = info.filename
# 3⃣ 基础文件名校验
if name.startswith(("/", "\\")):
raise ZipSecurityError(f"Absolute path not allowed: {name}")
if ".." in Path(name).parts:
raise ZipSecurityError(f"Path traversal detected: {name}")
# 4⃣ 软链接检测
if _is_symlink(info):
raise ZipSecurityError(f"Symlink not allowed: {name}")
# 5⃣ 文件大小限制
if info.file_size > MAX_SINGLE_FILE_SIZE:
raise ZipSecurityError(f"File too large: {name}")
total_size += info.file_size
if total_size > MAX_TOTAL_SIZE:
raise ZipSecurityError("Total extracted size exceeded")
# 6⃣ 压缩比校验(防 zip bomb
if info.compress_size > 0:
ratio = info.file_size / info.compress_size
if ratio > MAX_COMPRESSION_RATIO:
raise ZipSecurityError(
f"Suspicious compression ratio ({ratio:.1f}): {name}"
)
# 7⃣ 最终路径校验
target_path = extract_dir / name
if not _is_safe_path(extract_dir, target_path):
raise ZipSecurityError(f"Unsafe extract path: {name}")
# 8⃣ 解压(手动)
if info.is_dir():
target_path.mkdir(parents=True, exist_ok=True)
else:
target_path.parent.mkdir(parents=True, exist_ok=True)
with zf.open(info) as src, open(target_path, "wb") as dst:
dst.write(src.read())
except BadZipFile:
raise ZipSecurityError("Invalid zip file")