feat: Support query file metadatas

This commit is contained in:
Fangyin Cheng 2024-08-29 23:07:51 +08:00
parent 93527e0b04
commit 0219f5733b
3 changed files with 159 additions and 5 deletions

View File

@ -1,3 +1,4 @@
import asyncio
import logging import logging
from functools import cache from functools import cache
from typing import List, Optional from typing import List, Optional
@ -13,7 +14,13 @@ from dbgpt.util import PaginationResult
from ..config import APP_NAME, SERVE_APP_NAME, SERVE_SERVICE_COMPONENT_NAME, ServeConfig from ..config import APP_NAME, SERVE_APP_NAME, SERVE_SERVICE_COMPONENT_NAME, ServeConfig
from ..service.service import Service from ..service.service import Service
from .schemas import ServeRequest, ServerResponse, UploadFileResponse from .schemas import (
FileMetadataBatchRequest,
FileMetadataResponse,
ServeRequest,
ServerResponse,
UploadFileResponse,
)
router = APIRouter() router = APIRouter()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -162,6 +169,74 @@ async def delete_file(
return Result.succ(None) return Result.succ(None)
@router.get(
"/files/metadata",
response_model=Result[FileMetadataResponse],
dependencies=[Depends(check_api_key)],
)
async def get_file_metadata(
uri: Optional[str] = Query(None, description="File URI"),
bucket: Optional[str] = Query(None, description="Bucket name"),
file_id: Optional[str] = Query(None, description="File ID"),
service: Service = Depends(get_service),
) -> Result[FileMetadataResponse]:
"""Get file metadata by URI or by bucket and file_id."""
if not uri and not (bucket and file_id):
raise HTTPException(
status_code=400,
detail="Either uri or (bucket and file_id) must be provided",
)
metadata = await blocking_func_to_async(
global_system_app, service.get_file_metadata, uri, bucket, file_id
)
return Result.succ(metadata)
@router.post(
"/files/metadata/batch",
response_model=Result[List[FileMetadataResponse]],
dependencies=[Depends(check_api_key)],
)
async def get_files_metadata_batch(
request: FileMetadataBatchRequest, service: Service = Depends(get_service)
) -> Result[List[FileMetadataResponse]]:
"""Get metadata for multiple files by URIs or bucket and file_id pairs."""
if not request.uris and not request.bucket_file_pairs:
raise HTTPException(
status_code=400,
detail="Either uris or bucket_file_pairs must be provided",
)
batch_req = []
if request.uris:
for uri in request.uris:
batch_req.append((uri, None, None))
elif request.bucket_file_pairs:
for pair in request.bucket_file_pairs:
batch_req.append((None, pair.bucket, pair.file_id))
else:
raise HTTPException(
status_code=400,
detail="Either uris or bucket_file_pairs must be provided",
)
batch_req_tasks = [
blocking_func_to_async(
global_system_app, service.get_file_metadata, uri, bucket, file_id
)
for uri, bucket, file_id in batch_req
]
metadata_list = await asyncio.gather(*batch_req_tasks)
if not metadata_list:
raise HTTPException(
status_code=404,
detail="File metadata not found",
)
return Result.succ(metadata_list)
def init_endpoints(system_app: SystemApp) -> None: def init_endpoints(system_app: SystemApp) -> None:
"""Initialize the endpoints""" """Initialize the endpoints"""
global global_system_app global global_system_app

View File

@ -1,7 +1,13 @@
# Define your Pydantic schemas here # Define your Pydantic schemas here
from typing import Any, Dict from typing import Any, Dict, List, Optional
from dbgpt._private.pydantic import BaseModel, ConfigDict, Field, model_to_dict from dbgpt._private.pydantic import (
BaseModel,
ConfigDict,
Field,
model_to_dict,
model_validator,
)
from ..config import SERVE_APP_NAME_HUMP from ..config import SERVE_APP_NAME_HUMP
@ -41,3 +47,41 @@ class UploadFileResponse(BaseModel):
def to_dict(self, **kwargs) -> Dict[str, Any]: def to_dict(self, **kwargs) -> Dict[str, Any]:
"""Convert the model to a dictionary""" """Convert the model to a dictionary"""
return model_to_dict(self, **kwargs) return model_to_dict(self, **kwargs)
class _BucketFilePair(BaseModel):
"""Bucket file pair model"""
bucket: str = Field(..., title="The bucket of the file")
file_id: str = Field(..., title="The ID of the file")
class FileMetadataBatchRequest(BaseModel):
"""File metadata batch request model"""
uris: Optional[List[str]] = Field(None, title="The URIs of the files")
bucket_file_pairs: Optional[List[_BucketFilePair]] = Field(
None, title="The bucket file pairs"
)
@model_validator(mode="after")
def check_uris_or_bucket_file_pairs(self):
# Check if either uris or bucket_file_pairs is provided
if not (self.uris or self.bucket_file_pairs):
raise ValueError("Either uris or bucket_file_pairs must be provided")
# Check only one of uris or bucket_file_pairs is provided
if self.uris and self.bucket_file_pairs:
raise ValueError("Only one of uris or bucket_file_pairs can be provided")
return self
class FileMetadataResponse(BaseModel):
"""File metadata model"""
file_name: str = Field(..., title="The name of the file")
file_id: str = Field(..., title="The ID of the file")
bucket: str = Field(..., title="The bucket of the file")
uri: str = Field(..., title="The URI of the file")
file_size: int = Field(..., title="The size of the file")
user_name: Optional[str] = Field(None, title="The user name")
sys_code: Optional[str] = Field(None, title="The system code")

View File

@ -1,7 +1,7 @@
import logging import logging
from typing import BinaryIO, List, Optional, Tuple from typing import BinaryIO, List, Optional, Tuple
from fastapi import UploadFile from fastapi import HTTPException, UploadFile
from dbgpt.component import BaseComponent, SystemApp from dbgpt.component import BaseComponent, SystemApp
from dbgpt.core.interface.file import FileMetadata, FileStorageClient, FileStorageURI from dbgpt.core.interface.file import FileMetadata, FileStorageClient, FileStorageURI
@ -10,7 +10,12 @@ from dbgpt.storage.metadata import BaseDao
from dbgpt.util.pagination_utils import PaginationResult from dbgpt.util.pagination_utils import PaginationResult
from dbgpt.util.tracer import root_tracer, trace from dbgpt.util.tracer import root_tracer, trace
from ..api.schemas import ServeRequest, ServerResponse, UploadFileResponse from ..api.schemas import (
FileMetadataResponse,
ServeRequest,
ServerResponse,
UploadFileResponse,
)
from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig
from ..models.models import ServeDao, ServeEntity from ..models.models import ServeDao, ServeEntity
@ -117,3 +122,33 @@ class Service(BaseService[ServeEntity, ServeRequest, ServerResponse]):
def delete_file(self, bucket: str, file_id: str) -> None: def delete_file(self, bucket: str, file_id: str) -> None:
"""Delete a file by file_id.""" """Delete a file by file_id."""
self.file_storage_client.delete_file_by_id(bucket, file_id) self.file_storage_client.delete_file_by_id(bucket, file_id)
def get_file_metadata(
self,
uri: Optional[str] = None,
bucket: Optional[str] = None,
file_id: Optional[str] = None,
) -> Optional[FileMetadataResponse]:
"""Get the metadata of a file by file_id."""
if uri:
parsed_uri = FileStorageURI.parse(uri)
bucket, file_id = parsed_uri.bucket, parsed_uri.file_id
if not (bucket and file_id):
raise ValueError("Either uri or bucket and file_id must be provided.")
metadata = self.file_storage_client.storage_system.get_file_metadata(
bucket, file_id
)
if not metadata:
raise HTTPException(
status_code=404,
detail=f"File metadata not found: bucket={bucket}, file_id={file_id}, uri={uri}",
)
return FileMetadataResponse(
file_name=metadata.file_name,
file_id=metadata.file_id,
bucket=metadata.bucket,
uri=metadata.uri,
file_size=metadata.file_size,
user_name=metadata.user_name,
sys_code=metadata.sys_code,
)