fix(security): fix path traversal vulnerability (CVE-2024-10834) (#2267)

Co-authored-by: aries_ckt <916701291@qq.com>
This commit is contained in:
haawha 2025-01-03 14:18:51 +08:00 committed by GitHub
parent db76fb2dec
commit ad1e8e27a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,8 +2,10 @@ import glob
import json
import logging
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import Any
from fastapi import UploadFile
@ -165,24 +167,73 @@ class PluginHub:
except Exception as e:
raise ValueError(f"Update Agent Hub Db Info Faild!{str(e)}")
def _sanitize_filename(self, filename: str) -> str:
"""Sanitize the filename to prevent directory traversal attacks.
Args:
filename: The original filename
Returns:
str: Sanitized filename
"""
# Only keep the basic filename, remove any path information
filename = os.path.basename(filename)
# Remove any unsafe characters
filename = re.sub(r"[^a-zA-Z0-9._-]", "", filename)
# Ensure the filename is not empty and valid
if not filename or filename.startswith("."):
raise ValueError("Invalid filename")
return filename
async def upload_my_plugin(self, doc_file: UploadFile, user: Any = Default_User):
# We can not move temp file in windows system when we open file in context of `with`
file_path = os.path.join(self.plugin_dir, doc_file.filename)
# Verify and clean file names
try:
safe_filename = self._sanitize_filename(doc_file.filename)
except ValueError as e:
raise ValueError(f"Invalid plugin file: {str(e)}")
# Structure a safe file path
file_path = os.path.join(self.plugin_dir, safe_filename)
# Verify the final path is within the allowed directory
if (
not Path(file_path)
.resolve()
.is_relative_to(Path(self.plugin_dir).resolve())
):
raise ValueError("Invalid file path")
if os.path.exists(file_path):
os.remove(file_path)
tmp_fd, tmp_path = tempfile.mkstemp(dir=os.path.join(self.plugin_dir))
with os.fdopen(tmp_fd, "wb") as tmp:
tmp.write(await doc_file.read())
shutil.move(
tmp_path,
os.path.join(self.plugin_dir, doc_file.filename),
)
my_plugins = scan_plugins(self.plugin_dir, doc_file.filename)
# Use a temporary file for secure file writing
tmp_fd, tmp_path = tempfile.mkstemp(dir=self.plugin_dir)
try:
with os.fdopen(tmp_fd, "wb") as tmp:
tmp.write(await doc_file.read())
shutil.move(tmp_path, file_path)
except Exception as e:
# Ensure the temporary file is cleaned up
if os.path.exists(tmp_path):
os.remove(tmp_path)
raise e
# Scan and validate the plugin
try:
my_plugins = scan_plugins(self.plugin_dir, safe_filename)
except Exception as e:
# If the plugin validation fails, clean up the uploaded file
if os.path.exists(file_path):
os.remove(file_path)
raise ValueError(f"Invalid plugin file: {str(e)}")
if user is None or len(user) <= 0:
user = Default_User
# Update the database
for my_plugin in my_plugins:
my_plugin_entiy = self.my_plugin_dao.get_by_user_and_plugin(
user, my_plugin._name
@ -195,7 +246,7 @@ class PluginHub:
my_plugin_entiy.user_code = user
my_plugin_entiy.user_name = user
my_plugin_entiy.tenant = ""
my_plugin_entiy.file_name = doc_file.filename
my_plugin_entiy.file_name = safe_filename
self.my_plugin_dao.raw_update(my_plugin_entiy)
def reload_my_plugins(self):