mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-26 13:27:46 +00:00
155 lines
5.2 KiB
Python
155 lines
5.2 KiB
Python
import logging
|
|
from typing import BinaryIO, List, Optional, Tuple
|
|
|
|
from fastapi import HTTPException, UploadFile
|
|
|
|
from dbgpt.component import BaseComponent, SystemApp
|
|
from dbgpt.core.interface.file import FileMetadata, FileStorageClient, FileStorageURI
|
|
from dbgpt.serve.core import BaseService
|
|
from dbgpt.storage.metadata import BaseDao
|
|
from dbgpt.util.pagination_utils import PaginationResult
|
|
from dbgpt.util.tracer import root_tracer, trace
|
|
|
|
from ..api.schemas import (
|
|
FileMetadataResponse,
|
|
ServeRequest,
|
|
ServerResponse,
|
|
UploadFileResponse,
|
|
)
|
|
from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig
|
|
from ..models.models import ServeDao, ServeEntity
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Service(BaseService[ServeEntity, ServeRequest, ServerResponse]):
|
|
"""The service class for File"""
|
|
|
|
name = SERVE_SERVICE_COMPONENT_NAME
|
|
|
|
def __init__(self, system_app: SystemApp, dao: Optional[ServeDao] = None):
|
|
self._system_app = None
|
|
self._serve_config: ServeConfig = None
|
|
self._dao: ServeDao = dao
|
|
super().__init__(system_app)
|
|
|
|
def init_app(self, system_app: SystemApp) -> None:
|
|
"""Initialize the service
|
|
|
|
Args:
|
|
system_app (SystemApp): The system app
|
|
"""
|
|
super().init_app(system_app)
|
|
self._serve_config = ServeConfig.from_app_config(
|
|
system_app.config, SERVE_CONFIG_KEY_PREFIX
|
|
)
|
|
self._dao = self._dao or ServeDao(self._serve_config)
|
|
self._system_app = system_app
|
|
|
|
@property
|
|
def dao(self) -> BaseDao[ServeEntity, ServeRequest, ServerResponse]:
|
|
"""Returns the internal DAO."""
|
|
return self._dao
|
|
|
|
@property
|
|
def config(self) -> ServeConfig:
|
|
"""Returns the internal ServeConfig."""
|
|
return self._serve_config
|
|
|
|
@property
|
|
def file_storage_client(self) -> FileStorageClient:
|
|
"""Returns the internal FileStorageClient.
|
|
|
|
Returns:
|
|
FileStorageClient: The internal FileStorageClient
|
|
"""
|
|
file_storage_client = FileStorageClient.get_instance(
|
|
self._system_app, default_component=None
|
|
)
|
|
if file_storage_client:
|
|
return file_storage_client
|
|
else:
|
|
from ..serve import Serve
|
|
|
|
file_storage_client = Serve.get_instance(
|
|
self._system_app
|
|
).file_storage_client
|
|
self._system_app.register_instance(file_storage_client)
|
|
return file_storage_client
|
|
|
|
@trace("upload_files")
|
|
def upload_files(
|
|
self,
|
|
bucket: str,
|
|
storage_type: str,
|
|
files: List[UploadFile],
|
|
user_name: Optional[str] = None,
|
|
sys_code: Optional[str] = None,
|
|
) -> List[UploadFileResponse]:
|
|
"""Upload files by a list of UploadFile."""
|
|
results = []
|
|
for file in files:
|
|
file_name = file.filename
|
|
logger.info(f"Uploading file {file_name} to bucket {bucket}")
|
|
custom_metadata = {
|
|
"user_name": user_name,
|
|
"sys_code": sys_code,
|
|
}
|
|
uri = self.file_storage_client.save_file(
|
|
bucket,
|
|
file_name,
|
|
file_data=file.file,
|
|
storage_type=storage_type,
|
|
custom_metadata=custom_metadata,
|
|
)
|
|
parsed_uri = FileStorageURI.parse(uri)
|
|
logger.info(f"Uploaded file {file_name} to bucket {bucket}, uri={uri}")
|
|
results.append(
|
|
UploadFileResponse(
|
|
file_name=file_name,
|
|
file_id=parsed_uri.file_id,
|
|
bucket=bucket,
|
|
uri=uri,
|
|
)
|
|
)
|
|
return results
|
|
|
|
@trace("download_file")
|
|
def download_file(self, bucket: str, file_id: str) -> Tuple[BinaryIO, FileMetadata]:
|
|
"""Download a file by file_id."""
|
|
return self.file_storage_client.get_file_by_id(bucket, file_id)
|
|
|
|
def delete_file(self, bucket: str, file_id: str) -> None:
|
|
"""Delete a file by file_id."""
|
|
self.file_storage_client.delete_file_by_id(bucket, file_id)
|
|
|
|
def get_file_metadata(
|
|
self,
|
|
uri: Optional[str] = None,
|
|
bucket: Optional[str] = None,
|
|
file_id: Optional[str] = None,
|
|
) -> Optional[FileMetadataResponse]:
|
|
"""Get the metadata of a file by file_id."""
|
|
if uri:
|
|
parsed_uri = FileStorageURI.parse(uri)
|
|
bucket, file_id = parsed_uri.bucket, parsed_uri.file_id
|
|
if not (bucket and file_id):
|
|
raise ValueError("Either uri or bucket and file_id must be provided.")
|
|
metadata = self.file_storage_client.storage_system.get_file_metadata(
|
|
bucket, file_id
|
|
)
|
|
if not metadata:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"File metadata not found: bucket={bucket}, file_id={file_id}, uri={uri}",
|
|
)
|
|
return FileMetadataResponse(
|
|
file_name=metadata.file_name,
|
|
file_id=metadata.file_id,
|
|
bucket=metadata.bucket,
|
|
uri=metadata.uri,
|
|
file_size=metadata.file_size,
|
|
user_name=metadata.user_name,
|
|
sys_code=metadata.sys_code,
|
|
)
|