[BUG] RCE Vulnerability in DB-GPT Plugin Upload System (#2649)

Co-authored-by: nkoorty <amalyshau2002@gmail.com>
This commit is contained in:
Gecko Security 2025-04-28 07:53:42 +02:00 committed by GitHub
parent 445076b433
commit b16c6793ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,3 +1,4 @@
import ast
import glob
import json
import logging
@ -29,6 +30,93 @@ class PluginHub:
os.makedirs(plugin_dir, exist_ok=True)
self.plugin_dir = plugin_dir
self.temp_hub_file_path = os.path.join(plugin_dir, "temp")
# Disallowed modules that might lead to RCE
self.disallowed_imports = {
"subprocess",
"os.system",
"os.popen",
"os.spawn",
"os.exec",
"eval",
"exec",
"compile",
"pty",
"commands",
"asyncio.create_subprocess",
"multiprocessing",
"pickle",
"marshal",
"shelve",
}
# Disallowed AST nodes that could be used for code execution
self.disallowed_ast_nodes = {ast.Expr, ast.Call}
def _validate_plugin_code(self, file_path: str) -> bool:
"""Validate plugin code for potentially malicious operations.
Args:
file_path: Path to the Python file to validate
Returns:
bool: True if the code is safe, raises an exception otherwise
"""
with open(file_path, "r", encoding="utf-8") as f:
code = f.read()
# Parse the code into an AST
try:
tree = ast.parse(code)
except SyntaxError:
raise ValueError("Plugin contains invalid Python syntax")
# Check for potentially dangerous imports
for node in ast.walk(tree):
# Check for import statements
if isinstance(node, ast.Import):
for name in node.names:
if name.name in self.disallowed_imports:
raise ValueError(
f"Plugin contains disallowed import: {name.name}"
)
# Check for from ... import statements
elif isinstance(node, ast.ImportFrom):
module = node.module or ""
if module in self.disallowed_imports:
raise ValueError(f"Plugin contains disallowed import: {module}")
for name in node.names:
combined = f"{module}.{name.name}" if module else name.name
if (
combined in self.disallowed_imports
or name.name in self.disallowed_imports
):
raise ValueError(
f"Plugin contains disallowed import: {combined}"
)
# Check for calls to dangerous functions
elif isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in {"eval", "exec", "compile"}:
raise ValueError(
f"Plugin contains potentially dangerous function call: "
f"{node.func.id}"
)
elif isinstance(node.func, ast.Attribute):
if isinstance(node.func.value, ast.Name):
if node.func.value.id == "os" and node.func.attr in {
"system",
"popen",
"spawn",
"exec",
}:
raise ValueError(
f"Plugin contains potentially dangerous function call: "
f"os.{node.func.attr}"
)
return True
def install_plugin(self, plugin_name: str, user_name: str = None):
logger.info(f"install_plugin {plugin_name}")
@ -188,6 +276,23 @@ class PluginHub:
return filename
def _validate_extracted_files(self, directory: str) -> bool:
"""Validate all Python files in the extracted plugin directory.
Args:
directory: The directory containing extracted plugin files
Returns:
bool: True if all files are safe, raises an exception otherwise
"""
for root, _, files in os.walk(directory):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
logger.info(f"Validating plugin file: {file_path}")
self._validate_plugin_code(file_path)
return True
async def upload_my_plugin(self, doc_file: UploadFile, user: Any = Default_User):
# Verify and clean file names
try:
@ -221,6 +326,48 @@ class PluginHub:
os.remove(tmp_path)
raise e
# If it's a zip file, extract and validate each Python file before loading
if safe_filename.lower().endswith(".zip"):
import zipfile
extract_dir = os.path.join(
self.plugin_dir, f"temp_extract_{os.path.splitext(safe_filename)[0]}"
)
try:
with zipfile.ZipFile(file_path, "r") as zip_ref:
# Create a secure temp directory for extraction
os.makedirs(extract_dir, exist_ok=True)
# Check for zip file issues (bombs, traversal)
for zip_info in zip_ref.infolist():
if (
zip_info.file_size > 10 * 1024 * 1024
): # 10MB limit for any single file
raise ValueError("Zip contains files that are too large")
# Check for path traversal
if ".." in zip_info.filename or zip_info.filename.startswith(
"/"
):
raise ValueError("Zip contains potentially malicious paths")
# Extract the zip
zip_ref.extractall(extract_dir)
# Validate all Python files before allowing them to be loaded
self._validate_extracted_files(extract_dir)
except zipfile.BadZipFile:
raise ValueError("Invalid or corrupted zip file")
finally:
# Clean up the extraction directory
if os.path.exists(extract_dir):
shutil.rmtree(extract_dir)
# Validate single file plugins
elif safe_filename.lower().endswith(".py"):
self._validate_plugin_code(file_path)
# Scan and validate the plugin
try:
my_plugins = scan_plugins(self.plugin_dir, safe_filename)