diff --git a/dbgpt/serve/agent/hub/plugin_hub.py b/dbgpt/serve/agent/hub/plugin_hub.py index 49ee29008..8b159ca29 100644 --- a/dbgpt/serve/agent/hub/plugin_hub.py +++ b/dbgpt/serve/agent/hub/plugin_hub.py @@ -2,8 +2,10 @@ import glob import json import logging import os +import re import shutil import tempfile +from pathlib import Path from typing import Any from fastapi import UploadFile @@ -165,24 +167,73 @@ class PluginHub: except Exception as e: raise ValueError(f"Update Agent Hub Db Info Faild!{str(e)}") + def _sanitize_filename(self, filename: str) -> str: + """Sanitize the filename to prevent directory traversal attacks. + + Args: + filename: The original filename + + Returns: + str: Sanitized filename + """ + # Only keep the basic filename, remove any path information + filename = os.path.basename(filename) + + # Remove any unsafe characters + filename = re.sub(r"[^a-zA-Z0-9._-]", "", filename) + + # Ensure the filename is not empty and valid + if not filename or filename.startswith("."): + raise ValueError("Invalid filename") + + return filename + async def upload_my_plugin(self, doc_file: UploadFile, user: Any = Default_User): - # We can not move temp file in windows system when we open file in context of `with` - file_path = os.path.join(self.plugin_dir, doc_file.filename) + # Verify and clean file names + try: + safe_filename = self._sanitize_filename(doc_file.filename) + except ValueError as e: + raise ValueError(f"Invalid plugin file: {str(e)}") + + # Structure a safe file path + file_path = os.path.join(self.plugin_dir, safe_filename) + + # Verify the final path is within the allowed directory + if ( + not Path(file_path) + .resolve() + .is_relative_to(Path(self.plugin_dir).resolve()) + ): + raise ValueError("Invalid file path") + if os.path.exists(file_path): os.remove(file_path) - tmp_fd, tmp_path = tempfile.mkstemp(dir=os.path.join(self.plugin_dir)) - with os.fdopen(tmp_fd, "wb") as tmp: - tmp.write(await doc_file.read()) - shutil.move( - tmp_path, - os.path.join(self.plugin_dir, doc_file.filename), - ) - my_plugins = scan_plugins(self.plugin_dir, doc_file.filename) + # Use a temporary file for secure file writing + tmp_fd, tmp_path = tempfile.mkstemp(dir=self.plugin_dir) + try: + with os.fdopen(tmp_fd, "wb") as tmp: + tmp.write(await doc_file.read()) + shutil.move(tmp_path, file_path) + except Exception as e: + # Ensure the temporary file is cleaned up + if os.path.exists(tmp_path): + os.remove(tmp_path) + raise e + + # Scan and validate the plugin + try: + my_plugins = scan_plugins(self.plugin_dir, safe_filename) + except Exception as e: + # If the plugin validation fails, clean up the uploaded file + if os.path.exists(file_path): + os.remove(file_path) + raise ValueError(f"Invalid plugin file: {str(e)}") if user is None or len(user) <= 0: user = Default_User + # Update the database for my_plugin in my_plugins: my_plugin_entiy = self.my_plugin_dao.get_by_user_and_plugin( user, my_plugin._name @@ -195,7 +246,7 @@ class PluginHub: my_plugin_entiy.user_code = user my_plugin_entiy.user_name = user my_plugin_entiy.tenant = "" - my_plugin_entiy.file_name = doc_file.filename + my_plugin_entiy.file_name = safe_filename self.my_plugin_dao.raw_update(my_plugin_entiy) def reload_my_plugins(self):