From 0219f5733b4b648ac1f5aeec6244429e982a2682 Mon Sep 17 00:00:00 2001 From: Fangyin Cheng Date: Thu, 29 Aug 2024 23:07:51 +0800 Subject: [PATCH] feat: Support query file metadatas --- dbgpt/serve/file/api/endpoints.py | 77 ++++++++++++++++++++++++++++- dbgpt/serve/file/api/schemas.py | 48 +++++++++++++++++- dbgpt/serve/file/service/service.py | 39 ++++++++++++++- 3 files changed, 159 insertions(+), 5 deletions(-) diff --git a/dbgpt/serve/file/api/endpoints.py b/dbgpt/serve/file/api/endpoints.py index 26bbb9673..d5b65bc54 100644 --- a/dbgpt/serve/file/api/endpoints.py +++ b/dbgpt/serve/file/api/endpoints.py @@ -1,3 +1,4 @@ +import asyncio import logging from functools import cache from typing import List, Optional @@ -13,7 +14,13 @@ from dbgpt.util import PaginationResult from ..config import APP_NAME, SERVE_APP_NAME, SERVE_SERVICE_COMPONENT_NAME, ServeConfig from ..service.service import Service -from .schemas import ServeRequest, ServerResponse, UploadFileResponse +from .schemas import ( + FileMetadataBatchRequest, + FileMetadataResponse, + ServeRequest, + ServerResponse, + UploadFileResponse, +) router = APIRouter() logger = logging.getLogger(__name__) @@ -162,6 +169,74 @@ async def delete_file( return Result.succ(None) +@router.get( + "/files/metadata", + response_model=Result[FileMetadataResponse], + dependencies=[Depends(check_api_key)], +) +async def get_file_metadata( + uri: Optional[str] = Query(None, description="File URI"), + bucket: Optional[str] = Query(None, description="Bucket name"), + file_id: Optional[str] = Query(None, description="File ID"), + service: Service = Depends(get_service), +) -> Result[FileMetadataResponse]: + """Get file metadata by URI or by bucket and file_id.""" + if not uri and not (bucket and file_id): + raise HTTPException( + status_code=400, + detail="Either uri or (bucket and file_id) must be provided", + ) + + metadata = await blocking_func_to_async( + global_system_app, service.get_file_metadata, uri, bucket, file_id + ) + return Result.succ(metadata) + + +@router.post( + "/files/metadata/batch", + response_model=Result[List[FileMetadataResponse]], + dependencies=[Depends(check_api_key)], +) +async def get_files_metadata_batch( + request: FileMetadataBatchRequest, service: Service = Depends(get_service) +) -> Result[List[FileMetadataResponse]]: + """Get metadata for multiple files by URIs or bucket and file_id pairs.""" + if not request.uris and not request.bucket_file_pairs: + raise HTTPException( + status_code=400, + detail="Either uris or bucket_file_pairs must be provided", + ) + + batch_req = [] + if request.uris: + for uri in request.uris: + batch_req.append((uri, None, None)) + elif request.bucket_file_pairs: + for pair in request.bucket_file_pairs: + batch_req.append((None, pair.bucket, pair.file_id)) + else: + raise HTTPException( + status_code=400, + detail="Either uris or bucket_file_pairs must be provided", + ) + + batch_req_tasks = [ + blocking_func_to_async( + global_system_app, service.get_file_metadata, uri, bucket, file_id + ) + for uri, bucket, file_id in batch_req + ] + + metadata_list = await asyncio.gather(*batch_req_tasks) + if not metadata_list: + raise HTTPException( + status_code=404, + detail="File metadata not found", + ) + return Result.succ(metadata_list) + + def init_endpoints(system_app: SystemApp) -> None: """Initialize the endpoints""" global global_system_app diff --git a/dbgpt/serve/file/api/schemas.py b/dbgpt/serve/file/api/schemas.py index 911f71db3..bd8b3bbf2 100644 --- a/dbgpt/serve/file/api/schemas.py +++ b/dbgpt/serve/file/api/schemas.py @@ -1,7 +1,13 @@ # Define your Pydantic schemas here -from typing import Any, Dict +from typing import Any, Dict, List, Optional -from dbgpt._private.pydantic import BaseModel, ConfigDict, Field, model_to_dict +from dbgpt._private.pydantic import ( + BaseModel, + ConfigDict, + Field, + model_to_dict, + model_validator, +) from ..config import SERVE_APP_NAME_HUMP @@ -41,3 +47,41 @@ class UploadFileResponse(BaseModel): def to_dict(self, **kwargs) -> Dict[str, Any]: """Convert the model to a dictionary""" return model_to_dict(self, **kwargs) + + +class _BucketFilePair(BaseModel): + """Bucket file pair model""" + + bucket: str = Field(..., title="The bucket of the file") + file_id: str = Field(..., title="The ID of the file") + + +class FileMetadataBatchRequest(BaseModel): + """File metadata batch request model""" + + uris: Optional[List[str]] = Field(None, title="The URIs of the files") + bucket_file_pairs: Optional[List[_BucketFilePair]] = Field( + None, title="The bucket file pairs" + ) + + @model_validator(mode="after") + def check_uris_or_bucket_file_pairs(self): + # Check if either uris or bucket_file_pairs is provided + if not (self.uris or self.bucket_file_pairs): + raise ValueError("Either uris or bucket_file_pairs must be provided") + # Check only one of uris or bucket_file_pairs is provided + if self.uris and self.bucket_file_pairs: + raise ValueError("Only one of uris or bucket_file_pairs can be provided") + return self + + +class FileMetadataResponse(BaseModel): + """File metadata model""" + + file_name: str = Field(..., title="The name of the file") + file_id: str = Field(..., title="The ID of the file") + bucket: str = Field(..., title="The bucket of the file") + uri: str = Field(..., title="The URI of the file") + file_size: int = Field(..., title="The size of the file") + user_name: Optional[str] = Field(None, title="The user name") + sys_code: Optional[str] = Field(None, title="The system code") diff --git a/dbgpt/serve/file/service/service.py b/dbgpt/serve/file/service/service.py index 13e8b6225..85940ed35 100644 --- a/dbgpt/serve/file/service/service.py +++ b/dbgpt/serve/file/service/service.py @@ -1,7 +1,7 @@ import logging from typing import BinaryIO, List, Optional, Tuple -from fastapi import UploadFile +from fastapi import HTTPException, UploadFile from dbgpt.component import BaseComponent, SystemApp from dbgpt.core.interface.file import FileMetadata, FileStorageClient, FileStorageURI @@ -10,7 +10,12 @@ from dbgpt.storage.metadata import BaseDao from dbgpt.util.pagination_utils import PaginationResult from dbgpt.util.tracer import root_tracer, trace -from ..api.schemas import ServeRequest, ServerResponse, UploadFileResponse +from ..api.schemas import ( + FileMetadataResponse, + ServeRequest, + ServerResponse, + UploadFileResponse, +) from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig from ..models.models import ServeDao, ServeEntity @@ -117,3 +122,33 @@ class Service(BaseService[ServeEntity, ServeRequest, ServerResponse]): def delete_file(self, bucket: str, file_id: str) -> None: """Delete a file by file_id.""" self.file_storage_client.delete_file_by_id(bucket, file_id) + + def get_file_metadata( + self, + uri: Optional[str] = None, + bucket: Optional[str] = None, + file_id: Optional[str] = None, + ) -> Optional[FileMetadataResponse]: + """Get the metadata of a file by file_id.""" + if uri: + parsed_uri = FileStorageURI.parse(uri) + bucket, file_id = parsed_uri.bucket, parsed_uri.file_id + if not (bucket and file_id): + raise ValueError("Either uri or bucket and file_id must be provided.") + metadata = self.file_storage_client.storage_system.get_file_metadata( + bucket, file_id + ) + if not metadata: + raise HTTPException( + status_code=404, + detail=f"File metadata not found: bucket={bucket}, file_id={file_id}, uri={uri}", + ) + return FileMetadataResponse( + file_name=metadata.file_name, + file_id=metadata.file_id, + bucket=metadata.bucket, + uri=metadata.uri, + file_size=metadata.file_size, + user_name=metadata.user_name, + sys_code=metadata.sys_code, + )