mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-03-19 19:42:07 +00:00
146 lines
4.4 KiB
Python
146 lines
4.4 KiB
Python
import os
|
||
import stat
|
||
import hashlib
|
||
from pathlib import Path
|
||
from rest_framework.exceptions import ValidationError
|
||
from zipfile import ZipFile, BadZipFile
|
||
|
||
|
||
# -------------------------
|
||
# 可按你们安全规范调整
|
||
# -------------------------
|
||
MAX_FILES = 1000
|
||
MAX_SINGLE_FILE_SIZE = 200 * 1024 * 1024 # 200MB
|
||
MAX_TOTAL_SIZE = 1 * 1024 * 1024 * 1024 # 1GB
|
||
MAX_COMPRESSION_RATIO = 100 # 解压 / 压缩
|
||
|
||
|
||
class ZipSecurityError(ValidationError):
|
||
pass
|
||
|
||
|
||
# -------------------------
|
||
# 工具函数
|
||
# -------------------------
|
||
def _is_symlink(zip_info):
|
||
return stat.S_ISLNK(zip_info.external_attr >> 16)
|
||
|
||
|
||
def _is_safe_path(base_dir: Path, target: Path) -> bool:
|
||
try:
|
||
return target.resolve().is_relative_to(base_dir.resolve())
|
||
except AttributeError:
|
||
# Python < 3.9
|
||
return str(target.resolve()).startswith(str(base_dir.resolve()))
|
||
|
||
|
||
def _verify_signature(zip_path: Path, sig_path: Path, public_key_pem: bytes):
|
||
"""
|
||
示例:RSA + SHA256
|
||
你可以替换成你们自己的验签逻辑
|
||
"""
|
||
from cryptography.hazmat.primitives import hashes, serialization
|
||
from cryptography.hazmat.primitives.asymmetric import padding
|
||
|
||
data = zip_path.read_bytes()
|
||
signature = sig_path.read_bytes()
|
||
|
||
public_key = serialization.load_pem_public_key(public_key_pem)
|
||
|
||
public_key.verify(
|
||
signature,
|
||
data,
|
||
padding.PKCS1v15(),
|
||
hashes.SHA256(),
|
||
)
|
||
|
||
|
||
# -------------------------
|
||
# 主函数
|
||
# -------------------------
|
||
def safe_extract_zip(
|
||
zip_path: str | Path,
|
||
extract_dir: str | Path,
|
||
zip_sign_path: str | Path | None = None,
|
||
*,
|
||
public_key_pem: bytes | None = None,
|
||
):
|
||
"""
|
||
安全解压 zip
|
||
|
||
:param zip_path: zip 文件路径
|
||
:param extract_dir: 解压目标目录
|
||
:param zip_sign_path: 可选,zip 签名文件路径
|
||
:param public_key_pem: 可选,验签用公钥
|
||
"""
|
||
|
||
zip_path = Path(zip_path)
|
||
extract_dir = Path(extract_dir)
|
||
extract_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 1️⃣ 签名校验
|
||
if zip_sign_path:
|
||
if not public_key_pem:
|
||
raise ZipSecurityError("Signature provided but public key missing")
|
||
|
||
_verify_signature(
|
||
zip_path,
|
||
Path(zip_sign_path),
|
||
public_key_pem,
|
||
)
|
||
|
||
try:
|
||
with ZipFile(zip_path) as zf:
|
||
infos = zf.infolist()
|
||
|
||
# 2️⃣ 条目数量限制
|
||
if len(infos) > MAX_FILES:
|
||
raise ZipSecurityError("Too many files in zip")
|
||
|
||
total_size = 0
|
||
|
||
for info in infos:
|
||
name = info.filename
|
||
|
||
# 3️⃣ 基础文件名校验
|
||
if name.startswith(("/", "\\")):
|
||
raise ZipSecurityError(f"Absolute path not allowed: {name}")
|
||
|
||
if ".." in Path(name).parts:
|
||
raise ZipSecurityError(f"Path traversal detected: {name}")
|
||
|
||
# 4️⃣ 软链接检测
|
||
if _is_symlink(info):
|
||
raise ZipSecurityError(f"Symlink not allowed: {name}")
|
||
|
||
# 5️⃣ 文件大小限制
|
||
if info.file_size > MAX_SINGLE_FILE_SIZE:
|
||
raise ZipSecurityError(f"File too large: {name}")
|
||
|
||
total_size += info.file_size
|
||
if total_size > MAX_TOTAL_SIZE:
|
||
raise ZipSecurityError("Total extracted size exceeded")
|
||
|
||
# 6️⃣ 压缩比校验(防 zip bomb)
|
||
if info.compress_size > 0:
|
||
ratio = info.file_size / info.compress_size
|
||
if ratio > MAX_COMPRESSION_RATIO:
|
||
raise ZipSecurityError(
|
||
f"Suspicious compression ratio ({ratio:.1f}): {name}"
|
||
)
|
||
|
||
# 7️⃣ 最终路径校验
|
||
target_path = extract_dir / name
|
||
if not _is_safe_path(extract_dir, target_path):
|
||
raise ZipSecurityError(f"Unsafe extract path: {name}")
|
||
|
||
# 8️⃣ 解压(手动)
|
||
if info.is_dir():
|
||
target_path.mkdir(parents=True, exist_ok=True)
|
||
else:
|
||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||
with zf.open(info) as src, open(target_path, "wb") as dst:
|
||
dst.write(src.read())
|
||
|
||
except BadZipFile:
|
||
raise ZipSecurityError("Invalid zip file") |