Fix Critical Directory Traversal Vulnerability (#2098)

This commit is contained in:
Raphael 2024-11-17 17:46:33 -08:00 committed by GitHub
parent b392d51adf
commit 780ce803e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,8 +3,9 @@ import os
import shutil import shutil
import tempfile import tempfile
from typing import List from typing import List
from pathlib import Path
from fastapi import APIRouter, Depends, File, Form, UploadFile from fastapi import APIRouter, Depends, File, Form, UploadFile, HTTPException
from dbgpt._private.config import Config from dbgpt._private.config import Config
from dbgpt.app.knowledge.request.request import ( from dbgpt.app.knowledge.request.request import (
@ -340,43 +341,59 @@ async def document_upload(
print(f"/document/upload params: {space_name}") print(f"/document/upload params: {space_name}")
try: try:
if doc_file: if doc_file:
if not os.path.exists(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space_name)): # Sanitize inputs to prevent path traversal
os.makedirs(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space_name)) safe_space_name = os.path.basename(space_name)
# We can not move temp file in windows system when we open file in context of `with` safe_filename = os.path.basename(doc_file.filename)
tmp_fd, tmp_path = tempfile.mkstemp(
dir=os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space_name) # Create absolute paths and verify they are within allowed directory
) upload_dir = os.path.abspath(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, safe_space_name))
target_path = os.path.abspath(os.path.join(upload_dir, safe_filename))
if not os.path.abspath(KNOWLEDGE_UPLOAD_ROOT_PATH) in target_path:
raise HTTPException(status_code=400, detail="Invalid path detected")
if not os.path.exists(upload_dir):
os.makedirs(upload_dir)
# Create temp file
tmp_fd, tmp_path = tempfile.mkstemp(dir=upload_dir)
try:
with os.fdopen(tmp_fd, "wb") as tmp: with os.fdopen(tmp_fd, "wb") as tmp:
tmp.write(await doc_file.read()) tmp.write(await doc_file.read())
shutil.move(
tmp_path, shutil.move(tmp_path, target_path)
os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space_name, doc_file.filename),
)
request = KnowledgeDocumentRequest() request = KnowledgeDocumentRequest()
request.doc_name = doc_name request.doc_name = doc_name
request.doc_type = doc_type request.doc_type = doc_type
request.content = os.path.join( request.content = target_path
KNOWLEDGE_UPLOAD_ROOT_PATH, space_name, doc_file.filename
)
space_res = knowledge_space_service.get_knowledge_space( space_res = knowledge_space_service.get_knowledge_space(
KnowledgeSpaceRequest(name=space_name) KnowledgeSpaceRequest(name=safe_space_name)
) )
if len(space_res) == 0: if len(space_res) == 0:
# create default space # create default space
if "default" != space_name: if "default" != safe_space_name:
raise Exception(f"you have not create your knowledge space.") raise Exception(f"you have not create your knowledge space.")
knowledge_space_service.create_knowledge_space( knowledge_space_service.create_knowledge_space(
KnowledgeSpaceRequest( KnowledgeSpaceRequest(
name=space_name, name=safe_space_name,
desc="first db-gpt rag application", desc="first db-gpt rag application",
owner="dbgpt", owner="dbgpt",
) )
) )
return Result.succ( return Result.succ(
knowledge_space_service.create_knowledge_document( knowledge_space_service.create_knowledge_document(
space=space_name, request=request space=safe_space_name, request=request
) )
) )
except Exception as e:
# Clean up temp file if anything goes wrong
if os.path.exists(tmp_path):
os.unlink(tmp_path)
raise e
return Result.failed(code="E000X", msg=f"doc_file is None") return Result.failed(code="E000X", msg=f"doc_file is None")
except Exception as e: except Exception as e:
return Result.failed(code="E000X", msg=f"document add error {e}") return Result.failed(code="E000X", msg=f"document add error {e}")