diff --git a/packages/dbgpt-serve/src/dbgpt_serve/agent/hub/plugin_hub.py b/packages/dbgpt-serve/src/dbgpt_serve/agent/hub/plugin_hub.py index 54e3d0aeb..8832a45e5 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/agent/hub/plugin_hub.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/agent/hub/plugin_hub.py @@ -1,3 +1,4 @@ +import ast import glob import json import logging @@ -29,6 +30,93 @@ class PluginHub: os.makedirs(plugin_dir, exist_ok=True) self.plugin_dir = plugin_dir self.temp_hub_file_path = os.path.join(plugin_dir, "temp") + # Disallowed modules that might lead to RCE + self.disallowed_imports = { + "subprocess", + "os.system", + "os.popen", + "os.spawn", + "os.exec", + "eval", + "exec", + "compile", + "pty", + "commands", + "asyncio.create_subprocess", + "multiprocessing", + "pickle", + "marshal", + "shelve", + } + # Disallowed AST nodes that could be used for code execution + self.disallowed_ast_nodes = {ast.Expr, ast.Call} + + def _validate_plugin_code(self, file_path: str) -> bool: + """Validate plugin code for potentially malicious operations. + + Args: + file_path: Path to the Python file to validate + + Returns: + bool: True if the code is safe, raises an exception otherwise + """ + with open(file_path, "r", encoding="utf-8") as f: + code = f.read() + + # Parse the code into an AST + try: + tree = ast.parse(code) + except SyntaxError: + raise ValueError("Plugin contains invalid Python syntax") + + # Check for potentially dangerous imports + for node in ast.walk(tree): + # Check for import statements + if isinstance(node, ast.Import): + for name in node.names: + if name.name in self.disallowed_imports: + raise ValueError( + f"Plugin contains disallowed import: {name.name}" + ) + + # Check for from ... import statements + elif isinstance(node, ast.ImportFrom): + module = node.module or "" + if module in self.disallowed_imports: + raise ValueError(f"Plugin contains disallowed import: {module}") + + for name in node.names: + combined = f"{module}.{name.name}" if module else name.name + if ( + combined in self.disallowed_imports + or name.name in self.disallowed_imports + ): + raise ValueError( + f"Plugin contains disallowed import: {combined}" + ) + + # Check for calls to dangerous functions + elif isinstance(node, ast.Call): + if isinstance(node.func, ast.Name): + if node.func.id in {"eval", "exec", "compile"}: + raise ValueError( + f"Plugin contains potentially dangerous function call: " + f"{node.func.id}" + ) + elif isinstance(node.func, ast.Attribute): + if isinstance(node.func.value, ast.Name): + if node.func.value.id == "os" and node.func.attr in { + "system", + "popen", + "spawn", + "exec", + }: + raise ValueError( + f"Plugin contains potentially dangerous function call: " + f"os.{node.func.attr}" + ) + + return True def install_plugin(self, plugin_name: str, user_name: str = None): logger.info(f"install_plugin {plugin_name}") @@ -188,6 +276,23 @@ class PluginHub: return filename + def _validate_extracted_files(self, directory: str) -> bool: + """Validate all Python files in the extracted plugin directory. + + Args: + directory: The directory containing extracted plugin files + + Returns: + bool: True if all files are safe, raises an exception otherwise + """ + for root, _, files in os.walk(directory): + for file in files: + if file.endswith(".py"): + file_path = os.path.join(root, file) + logger.info(f"Validating plugin file: {file_path}") + self._validate_plugin_code(file_path) + return True + async def upload_my_plugin(self, doc_file: UploadFile, user: Any = Default_User): # Verify and clean file names try: @@ -221,6 +326,48 @@ class PluginHub: os.remove(tmp_path) raise e + # If it's a zip file, extract and validate each Python file before loading + if safe_filename.lower().endswith(".zip"): + import zipfile + + extract_dir = os.path.join( + self.plugin_dir, f"temp_extract_{os.path.splitext(safe_filename)[0]}" + ) + try: + with zipfile.ZipFile(file_path, "r") as zip_ref: + # Create a secure temp directory for extraction + os.makedirs(extract_dir, exist_ok=True) + + # Check for zip file issues (bombs, traversal) + for zip_info in zip_ref.infolist(): + if ( + zip_info.file_size > 10 * 1024 * 1024 + ): # 10MB limit for any single file + raise ValueError("Zip contains files that are too large") + + # Check for path traversal + if ".." in zip_info.filename or zip_info.filename.startswith( + "/" + ): + raise ValueError("Zip contains potentially malicious paths") + + # Extract the zip + zip_ref.extractall(extract_dir) + + # Validate all Python files before allowing them to be loaded + self._validate_extracted_files(extract_dir) + + except zipfile.BadZipFile: + raise ValueError("Invalid or corrupted zip file") + finally: + # Clean up the extraction directory + if os.path.exists(extract_dir): + shutil.rmtree(extract_dir) + + # Validate single file plugins + elif safe_filename.lower().endswith(".py"): + self._validate_plugin_code(file_path) + # Scan and validate the plugin try: my_plugins = scan_plugins(self.plugin_dir, safe_filename)