From b2dd66dc6d11f2e991c64d6f747ed39713cc74df Mon Sep 17 00:00:00 2001 From: Fangyin Cheng Date: Mon, 17 Mar 2025 11:59:00 +0800 Subject: [PATCH] feat(storage): Support oss and s3 --- configs/dbgpt-cloud-storage.example.toml | 51 ++ .../dbgpt-app/src/dbgpt_app/knowledge/api.py | 1 - .../src/dbgpt_app/openapi/api_v1/api_v1.py | 1 - .../chat_excel/excel_analyze/chat.py | 1 - .../src/dbgpt/core/awel/flow/base.py | 5 + .../src/dbgpt/core/interface/file.py | 34 +- .../dbgpt-core/src/dbgpt/util/module_utils.py | 20 + packages/dbgpt-ext/pyproject.toml | 7 + .../src/dbgpt_ext/storage/file/__init__.py | 0 .../dbgpt_ext/storage/file/oss/__init__.py | 0 .../src/dbgpt_ext/storage/file/oss/config.py | 102 +++ .../dbgpt_ext/storage/file/oss/oss_storage.py | 484 ++++++++++++++ .../src/dbgpt_ext/storage/file/s3/__init__.py | 0 .../src/dbgpt_ext/storage/file/s3/config.py | 118 ++++ .../dbgpt_ext/storage/file/s3/s3_storage.py | 589 ++++++++++++++++++ .../src/dbgpt_serve/file/api/endpoints.py | 1 - .../src/dbgpt_serve/file/config.py | 20 +- .../dbgpt-serve/src/dbgpt_serve/file/serve.py | 11 + .../src/dbgpt_serve/file/service/service.py | 2 - .../src/dbgpt_serve/rag/service/service.py | 1 - uv.lock | 99 +++ 21 files changed, 1535 insertions(+), 12 deletions(-) create mode 100644 configs/dbgpt-cloud-storage.example.toml create mode 100644 packages/dbgpt-ext/src/dbgpt_ext/storage/file/__init__.py create mode 100644 packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/__init__.py create mode 100644 packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/config.py create mode 100644 packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/oss_storage.py create mode 100644 packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/__init__.py create mode 100644 packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/config.py create mode 100644 packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/s3_storage.py diff --git a/configs/dbgpt-cloud-storage.example.toml b/configs/dbgpt-cloud-storage.example.toml new file mode 100644 index 000000000..f397338cb --- /dev/null +++ b/configs/dbgpt-cloud-storage.example.toml @@ -0,0 +1,51 @@ +[system] +# Load language from environment variable(It is set by the hook) +language = "${env:DBGPT_LANG:-zh}" +log_level = "INFO" +api_keys = [] +encrypt_key = "your_secret_key" + +# Server Configurations +[service.web] +host = "0.0.0.0" +port = 5670 + +[service.web.database] +type = "sqlite" +path = "pilot/meta_data/dbgpt.db" + +[[serves]] +type = "file" +# Default backend for file server +default_backend = "s3" + +[[serves.backends]] +type = "oss" +endpoint = "https://oss-cn-beijing.aliyuncs.com" +region = "oss-cn-beijing" +access_key_id = "${env:OSS_ACCESS_KEY_ID}" +access_key_secret = "${env:OSS_ACCESS_KEY_SECRET}" +fixed_bucket = "{your_bucket_name}" + +[[serves.backends]] +# Use Tencent COS s3 compatible API as the file server +type = "s3" +endpoint = "https://cos.ap-beijing.myqcloud.com" +region = "ap-beijing" +access_key_id = "${env:COS_SECRETID}" +access_key_secret = "${env:COS_SECRETKEY}" +fixed_bucket = "{your_bucket_name}" + +# Model Configurations +[models] +[[models.llms]] +name = "${env:LLM_MODEL_NAME:-gpt-4o}" +provider = "${env:LLM_MODEL_PROVIDER:-proxy/openai}" +api_base = "${env:OPENAI_API_BASE:-https://api.openai.com/v1}" +api_key = "${env:OPENAI_API_KEY}" + +[[models.embeddings]] +name = "${env:EMBEDDING_MODEL_NAME:-text-embedding-3-small}" +provider = "${env:EMBEDDING_MODEL_PROVIDER:-proxy/openai}" +api_url = "${env:EMBEDDING_MODEL_API_URL:-https://api.openai.com/v1/embeddings}" +api_key = "${env:OPENAI_API_KEY}" diff --git a/packages/dbgpt-app/src/dbgpt_app/knowledge/api.py b/packages/dbgpt-app/src/dbgpt_app/knowledge/api.py index b6ce9833d..f51b8ed82 100644 --- a/packages/dbgpt-app/src/dbgpt_app/knowledge/api.py +++ b/packages/dbgpt-app/src/dbgpt_app/knowledge/api.py @@ -393,7 +393,6 @@ async def document_upload( bucket, safe_filename, doc_file.file, - storage_type="distributed", custom_metadata=custom_metadata, ) diff --git a/packages/dbgpt-app/src/dbgpt_app/openapi/api_v1/api_v1.py b/packages/dbgpt-app/src/dbgpt_app/openapi/api_v1/api_v1.py index b87875078..cc611d3d3 100644 --- a/packages/dbgpt-app/src/dbgpt_app/openapi/api_v1/api_v1.py +++ b/packages/dbgpt-app/src/dbgpt_app/openapi/api_v1/api_v1.py @@ -359,7 +359,6 @@ async def file_upload( bucket, file_name, doc_file.file, - storage_type="distributed", custom_metadata=custom_metadata, ) diff --git a/packages/dbgpt-app/src/dbgpt_app/scene/chat_data/chat_excel/excel_analyze/chat.py b/packages/dbgpt-app/src/dbgpt_app/scene/chat_data/chat_excel/excel_analyze/chat.py index 4e7bd6c44..e07fa6418 100644 --- a/packages/dbgpt-app/src/dbgpt_app/scene/chat_data/chat_excel/excel_analyze/chat.py +++ b/packages/dbgpt-app/src/dbgpt_app/scene/chat_data/chat_excel/excel_analyze/chat.py @@ -181,7 +181,6 @@ class ChatExcel(BaseChat): self.fs_client.upload_file, self._bucket, self._database_file_path, - storage_type="distributed", file_id=self._database_file_id, ) return result diff --git a/packages/dbgpt-core/src/dbgpt/core/awel/flow/base.py b/packages/dbgpt-core/src/dbgpt/core/awel/flow/base.py index 6ff975db6..453384e49 100644 --- a/packages/dbgpt-core/src/dbgpt/core/awel/flow/base.py +++ b/packages/dbgpt-core/src/dbgpt/core/awel/flow/base.py @@ -1115,6 +1115,7 @@ def auto_register_resource( alias: Optional[List[str]] = None, tags: Optional[Dict[str, str]] = None, show_in_ui: bool = True, + skip_fields: Optional[List[str]] = None, **decorator_kwargs, ): """Auto register the resource. @@ -1130,6 +1131,8 @@ def auto_register_resource( alias (Optional[List[str]], optional): The alias of the resource. Defaults to None. For compatibility, we can use the alias to register the resource. tags (Optional[Dict[str, str]]): The tags of the resource + show_in_ui (bool): Whether show the resource in UI. + skip_fields (Optional[List[str]]): The fields to skip. """ from dataclasses import fields, is_dataclass @@ -1147,6 +1150,8 @@ def auto_register_resource( parameters: List[Parameter] = [] raw_fields = fields(cls) for i, fd in enumerate(fields_desc_list): + if skip_fields and fd.param_name in skip_fields: + continue param_type = fd.param_type if param_type in TYPE_STRING_TO_TYPE: # Basic type diff --git a/packages/dbgpt-core/src/dbgpt/core/interface/file.py b/packages/dbgpt-core/src/dbgpt/core/interface/file.py index 855043a75..388f43704 100644 --- a/packages/dbgpt-core/src/dbgpt/core/interface/file.py +++ b/packages/dbgpt-core/src/dbgpt/core/interface/file.py @@ -16,6 +16,7 @@ import requests from dbgpt.component import BaseComponent, ComponentType, SystemApp from dbgpt.util.tracer import root_tracer, trace +from ...util import BaseParameters, RegisterParameters from .storage import ( InMemoryStorage, QuerySpec, @@ -116,6 +117,17 @@ class FileMetadata(StorageItem): self._identifier = obj._identifier +@dataclasses.dataclass +class StorageBackendConfig(BaseParameters, RegisterParameters): + """Storage backend configuration""" + + __type__ = "___storage_backend_config___" + + def create_storage(self) -> "StorageBackend": + """Create the storage""" + raise NotImplementedError() + + class FileStorageURI: """File storage URI.""" @@ -489,6 +501,7 @@ class FileStorageClient(BaseComponent): system_app: Optional[SystemApp] = None, storage_system: Optional[FileStorageSystem] = None, save_chunk_size: int = 1024 * 1024, + default_storage_type: Optional[str] = None, ): """Initialize the file storage client.""" super().__init__(system_app=system_app) @@ -503,10 +516,14 @@ class FileStorageClient(BaseComponent): ) } ) + if not default_storage_type: + if storage_system and storage_system.storage_backends: + default_storage_type = list(storage_system.storage_backends.keys())[0] self.system_app = system_app self._storage_system = storage_system self.save_chunk_size = save_chunk_size + self.default_storage_type = default_storage_type def init_app(self, system_app: SystemApp): """Initialize the application.""" @@ -523,7 +540,7 @@ class FileStorageClient(BaseComponent): self, bucket: str, file_path: str, - storage_type: str, + storage_type: Optional[str] = None, custom_metadata: Optional[Dict[str, Any]] = None, file_id: Optional[str] = None, ) -> str: @@ -556,7 +573,7 @@ class FileStorageClient(BaseComponent): bucket: str, file_name: str, file_data: BinaryIO, - storage_type: str, + storage_type: Optional[str] = None, custom_metadata: Optional[Dict[str, Any]] = None, file_id: Optional[str] = None, ) -> str: @@ -575,12 +592,20 @@ class FileStorageClient(BaseComponent): Returns: str: The file URI """ + if not storage_type: + storage_type = self.default_storage_type + if not storage_type: + raise ValueError("Storage type not provided") return self.storage_system.save_file( bucket, file_name, file_data, storage_type, custom_metadata, file_id ) def download_file( - self, uri: str, dest_path: Optional[str] = None, dest_dir: Optional[str] = None + self, + uri: str, + dest_path: Optional[str] = None, + dest_dir: Optional[str] = None, + cache: bool = True, ) -> Tuple[str, FileMetadata]: """Download a file from the storage system. @@ -595,6 +620,7 @@ class FileStorageClient(BaseComponent): uri (str): The file URI dest_path (str, optional): The destination path. Defaults to None. dest_dir (str, optional): The destination directory. Defaults to None. + cache (bool, optional): Whether to cache the file. Defaults to True. Raises: FileNotFoundError: If the file is not found @@ -617,7 +643,7 @@ class FileStorageClient(BaseComponent): os.makedirs(base_path, exist_ok=True) target_path = os.path.join(base_path, file_metadata.file_id + extension) file_hash = file_metadata.file_hash - if os.path.exists(target_path): + if os.path.exists(target_path) and cache: logger.debug(f"File {target_path} already exists, begin hash check") with open(target_path, "rb") as f: if file_hash == calculate_file_hash(f, self.save_chunk_size): diff --git a/packages/dbgpt-core/src/dbgpt/util/module_utils.py b/packages/dbgpt-core/src/dbgpt/util/module_utils.py index d4ac6a612..d8a487512 100644 --- a/packages/dbgpt-core/src/dbgpt/util/module_utils.py +++ b/packages/dbgpt-core/src/dbgpt/util/module_utils.py @@ -253,6 +253,26 @@ class ModelScanner(Generic[T]): for key, value in scanned_items.items(): self._registered_items[key] = value + child_items = {} + for key, value in self._registered_items.items(): + if hasattr(value, "__scan_config__"): + _child_scanner = ModelScanner() + _child_config = value.__scan_config__ + if not isinstance(_child_config, ScannerConfig): + continue + if ( + hasattr(value, "__is_already_scanned__") + and value.__is_already_scanned__ + ): + continue + try: + _child_scanner.scan_and_register(_child_config) + child_items.update(_child_scanner.get_registered_items()) + value.__is_already_scanned__ = True + except Exception as e: + logger.warning(f"Error scanning child module {key}: {str(e)}") + self._registered_items.update(child_items) + except ImportError as e: logger.warning(f"Error importing module {config.module_path}: {str(e)}") diff --git a/packages/dbgpt-ext/pyproject.toml b/packages/dbgpt-ext/pyproject.toml index a56dd4fc3..387007aad 100644 --- a/packages/dbgpt-ext/pyproject.toml +++ b/packages/dbgpt-ext/pyproject.toml @@ -75,6 +75,13 @@ storage_chromadb = [ storage_elasticsearch = ["elasticsearch"] storage_obvector = ["pyobvector"] +file_oss = [ + "oss2" # Aliyun OSS +] +file_s3 = [ + "boto3" +] + [tool.uv] managed = true dev-dependencies = [ diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/file/__init__.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/__init__.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/config.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/config.py new file mode 100644 index 000000000..710c62ca6 --- /dev/null +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/config.py @@ -0,0 +1,102 @@ +from dataclasses import dataclass, field +from typing import Optional + +from dbgpt.core.interface.file import StorageBackend, StorageBackendConfig +from dbgpt.util.i18n_utils import _ + + +@dataclass +class OSSStorageConfig(StorageBackendConfig): + __type__ = "oss" + endpoint: str = field( + metadata={ + "help": _( + "The endpoint of the OSS server. " + "e.g. https://oss-cn-hangzhou.aliyuncs.com" + ) + }, + ) + region: str = field( + metadata={"help": _("The region of the OSS server. e.g. cn-hangzhou")}, + ) + access_key_id: Optional[str] = field( + default=None, + metadata={ + "help": _( + "The access key ID of the OSS server. You can also set it in the " + "environment variable OSS_ACCESS_KEY_ID" + ), + "tags": "privacy", + }, + ) + access_key_secret: Optional[str] = field( + default=None, + metadata={ + "help": _( + "The access key secret of the OSS server. You can also set it in the " + "environment variable OSS_ACCESS_KEY_SECRET" + ), + "tags": "privacy", + }, + ) + use_environment_credentials: Optional[bool] = field( + default=False, + metadata={ + "help": _( + "Whether to use the environment variables OSS_ACCESS_KEY_ID and " + "OSS_ACCESS_KEY_SECRET as the credentials. Default is False." + ), + }, + ) + fixed_bucket: Optional[str] = field( + default=None, + metadata={ + "help": _( + "The fixed bucket name to use. If set, all logical buckets in DB-GPT " + "will be mapped to this bucket. We suggest you set this value to avoid " + "bucket name conflicts." + ) + }, + ) + bucket_prefix: Optional[str] = field( + default="dbgpt-fs-", + metadata={ + "help": _( + "The prefix of the bucket name. If set, all logical buckets in DB-GPT " + "will be prefixed with this value. Just work when fixed_bucket is None." + ) + }, + ) + auto_create_bucket: Optional[bool] = field( + default=True, + metadata={ + "help": _( + "Whether to create the bucket automatically if it does not exist. " + "If set to False, the bucket must exist before using it." + ) + }, + ) + save_chunk_size: Optional[int] = field( + default=1024 * 1024, + metadata={ + "help": _( + "The chunk size when saving the file. When the file is larger 10x than " + "this value, it will be uploaded in multiple parts. Default is 1M." + ) + }, + ) + + def create_storage(self) -> StorageBackend: + from .oss_storage import AliyunOSSStorage + + return AliyunOSSStorage( + endpoint=self.endpoint, + region=self.region, + access_key_id=self.access_key_id, + access_key_secret=self.access_key_secret, + use_environment_credentials=self.use_environment_credentials, + fixed_bucket=self.fixed_bucket, + bucket_prefix=self.bucket_prefix, + auto_create_bucket=self.auto_create_bucket, + save_chunk_size=self.save_chunk_size, + ) diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/oss_storage.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/oss_storage.py new file mode 100644 index 000000000..a530ba60b --- /dev/null +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/oss/oss_storage.py @@ -0,0 +1,484 @@ +"""Aliyun OSS storage backend.""" + +import hashlib +import io +import logging +import os +import random +import time +from typing import BinaryIO, Callable, Dict, Optional, Union + +import oss2 +from oss2.credentials import EnvironmentVariableCredentialsProvider + +from dbgpt.core.interface.file import FileMetadata, StorageBackend + +logger = logging.getLogger(__name__) + + +def does_bucket_exist(bucket): + try: + bucket.get_bucket_info() + except oss2.exceptions.NoSuchBucket: + return False + except: + raise + return True + + +class AliyunOSSStorage(StorageBackend): + """Aliyun OSS storage backend implementation.""" + + storage_type: str = "oss" + + def __init__( + self, + endpoint: str, + region: str, + access_key_id: Optional[str] = None, + access_key_secret: Optional[str] = None, + save_chunk_size: int = 1024 * 1024, + use_environment_credentials: bool = False, + fixed_bucket: Optional[str] = None, + bucket_prefix: str = "dbgpt-fs-", + bucket_mapper: Optional[Callable[[str], str]] = None, + auto_create_bucket: bool = True, + ): + """Initialize the Aliyun OSS storage backend. + + Args: + endpoint (str): OSS endpoint, e.g., "https://oss-cn-hangzhou.aliyuncs.com" + region (str): OSS region, e.g., "cn-hangzhou" + access_key_id (Optional[str], optional): Aliyun Access Key ID. Defaults to + None. + access_key_secret (Optional[str], optional): Aliyun Access Key Secret. + Defaults to None. + save_chunk_size (int, optional): Chunk size for saving files. Defaults to + 1024*1024 (1MB). + use_environment_credentials (bool, optional): Whether to use credentials + from environment variables. Defaults to False. + fixed_bucket (Optional[str], optional): A fixed OSS bucket to use for all + operations. If provided, all logical buckets will be mapped to this + single bucket. Defaults to None. + bucket_prefix (str, optional): Prefix for dynamically created buckets. + Defaults to "dbgpt-fs-". + bucket_mapper (Optional[Callable[[str], str]], optional): Custom function + to map logical bucket names to actual OSS bucket names. Defaults to + None. + auto_create_bucket (bool, optional): Whether to automatically create + buckets that don't exist. Defaults to True. + """ + self.endpoint = endpoint + self.region = region + self._save_chunk_size = save_chunk_size + self.fixed_bucket = fixed_bucket + self.bucket_prefix = bucket_prefix + self.custom_bucket_mapper = bucket_mapper + self.auto_create_bucket = auto_create_bucket + + # Initialize OSS authentication + if use_environment_credentials: + # Check required environment variables + required_env_vars = ["OSS_ACCESS_KEY_ID", "OSS_ACCESS_KEY_SECRET"] + for var in required_env_vars: + if var not in os.environ: + raise ValueError(f"Environment variable {var} is not set.") + self.auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider()) + else: + if not access_key_id or not access_key_secret: + raise ValueError( + "Access key ID and secret are required when not using environment " + "credentials" + ) + # Use provided credentials + self.auth = oss2.Auth(access_key_id, access_key_secret) + + # Store buckets dict to avoid recreating bucket objects + self._buckets: Dict[str, oss2.Bucket] = {} + + # Create fixed bucket if specified + if self.fixed_bucket and self.auto_create_bucket: + self._ensure_bucket_exists(self.fixed_bucket) + + @property + def save_chunk_size(self) -> int: + """Get the save chunk size.""" + return self._save_chunk_size + + def _map_bucket_name(self, logical_bucket: str) -> str: + """Map logical bucket name to actual OSS bucket name. + + Args: + logical_bucket (str): Logical bucket name used by the application + + Returns: + str: Actual OSS bucket name to use + """ + # 1. If using a fixed bucket, always return that + if self.fixed_bucket: + return self.fixed_bucket + + # 2. If a custom mapper is provided, use that + if self.custom_bucket_mapper: + return self.custom_bucket_mapper(logical_bucket) + + # 3. Otherwise, use a hash-based approach to generate a unique but + # deterministic name + # This avoids bucket name conflicts while maintaining consistency + bucket_hash = hashlib.md5(logical_bucket.encode()).hexdigest()[:8] + return f"{self.bucket_prefix}{bucket_hash}-{logical_bucket}" + + def _generate_dynamic_bucket_name(self) -> str: + """Generate a unique bucket name for dynamic creation. + + Returns: + str: A unique bucket name + """ + # Using timestamp + random number to ensure uniqueness + timestamp = int(time.time()) + random_number = random.randint(0, 9999) + return f"{self.bucket_prefix}{timestamp}-{random_number}" + + def _ensure_bucket_exists(self, bucket_name: str) -> bool: + """Ensure the bucket exists, create it if needed and if auto_create_bucket is + True. + + Args: + bucket_name (str): Bucket name + + Returns: + bool: True if the bucket exists or was created, False otherwise + """ + bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name, region=self.region) + + try: + if does_bucket_exist(bucket): + return True + + if not self.auto_create_bucket: + logger.warning( + f"Bucket {bucket_name} does not exist and auto_create_bucket is " + f"False" + ) + return False + + logger.info(f"Creating bucket {bucket_name}") + bucket.create_bucket(oss2.models.BUCKET_ACL_PRIVATE) + return True + except oss2.exceptions.ServerError as e: + # Handle the case where bucket name is already taken by someone else + if e.status == 409 and "BucketAlreadyExists" in str(e): + logger.warning( + f"Bucket name {bucket_name} already exists and is owned by " + "someone else" + ) + return False + raise + except oss2.exceptions.OssError as e: + logger.error(f"Failed to create or check bucket {bucket_name}: {e}") + raise + + def _get_bucket(self, logical_bucket: str) -> Union[oss2.Bucket, None]: + """Get or create an OSS bucket object for the given logical bucket. + + Args: + logical_bucket (str): Logical bucket name + + Returns: + Union[oss2.Bucket, None]: Bucket object or None if bucket creation failed + """ + # Get the actual OSS bucket name + actual_bucket_name = self._map_bucket_name(logical_bucket) + + # Check if we've already cached this bucket + if actual_bucket_name in self._buckets: + return self._buckets[actual_bucket_name] + + # Try to ensure the mapped bucket exists + if self._ensure_bucket_exists(actual_bucket_name): + # Cache and return the bucket + self._buckets[actual_bucket_name] = oss2.Bucket( + self.auth, self.endpoint, actual_bucket_name, region=self.region + ) + return self._buckets[actual_bucket_name] + + # If we get here, the bucket doesn't exist and couldn't be created + # Try to create a dynamic bucket if we're not using a fixed bucket + if not self.fixed_bucket and self.auto_create_bucket: + # Generate a new unique bucket name + dynamic_bucket = self._generate_dynamic_bucket_name() + logger.info( + f"Attempting to create dynamic bucket {dynamic_bucket} for logical " + f"bucket {logical_bucket}" + ) + + if self._ensure_bucket_exists(dynamic_bucket): + self._buckets[actual_bucket_name] = oss2.Bucket( + self.auth, self.endpoint, dynamic_bucket, region=self.region + ) + return self._buckets[actual_bucket_name] + + # If all attempts failed + raise ValueError( + f"Failed to get or create bucket for logical bucket {logical_bucket}" + ) + + def save(self, bucket: str, file_id: str, file_data: BinaryIO) -> str: + """Save the file data to Aliyun OSS. + + Args: + bucket (str): The logical bucket name + file_id (str): The file ID + file_data (BinaryIO): The file data + + Returns: + str: The storage path (OSS URI) + """ + # Get the actual OSS bucket + oss_bucket = self._get_bucket(bucket) + + # Generate OSS object name based on whether we're using fixed bucket + object_name = file_id + if self.fixed_bucket: + # When using a fixed bucket, we need to prefix with logical bucket name to + # avoid conflicts + object_name = f"{bucket}/{file_id}" + + # For large files, use multipart upload + file_size = self._get_file_size(file_data) + + if file_size > 10 * self.save_chunk_size: # If file is larger than 10MB + logger.info( + f"Using multipart upload for large file: {object_name} " + f"(size: {file_size})" + ) + self._multipart_upload(oss_bucket, object_name, file_data) + else: + logger.info(f"Uploading file using simple upload: {object_name}") + try: + oss_bucket.put_object(object_name, file_data) + except oss2.exceptions.OssError as e: + logger.error( + f"Failed to upload file {object_name} to bucket " + f"{oss_bucket.bucket_name}: {e}" + ) + raise + + # Store the OSS bucket name and object path for future reference + actual_bucket_name = oss_bucket.bucket_name + + # Format: oss://{actual_bucket_name}/{object_name} + # We store both the actual bucket name and the object path in the URI + # But we'll also keep the logical bucket in the external URI format + return f"oss://{bucket}/{file_id}?actual_bucket={actual_bucket_name}&object_name={object_name}" # noqa + + def _get_file_size(self, file_data: BinaryIO) -> int: + """Get file size without consuming the file object. + + Args: + file_data (BinaryIO): The file data + + Returns: + int: The file size in bytes + """ + current_pos = file_data.tell() + file_data.seek(0, io.SEEK_END) + size = file_data.tell() + file_data.seek(current_pos) # Reset the file pointer + return size + + def _multipart_upload( + self, oss_bucket: oss2.Bucket, file_id: str, file_data: BinaryIO + ) -> None: + """Handle multipart upload for large files. + + Args: + oss_bucket (oss2.Bucket): OSS bucket object + file_id (str): The file ID + file_data (BinaryIO): The file data + """ + # Initialize multipart upload + upload_id = oss_bucket.init_multipart_upload(file_id).upload_id + + # Upload parts + part_number = 1 + parts = [] + + while True: + chunk = file_data.read(self.save_chunk_size) + if not chunk: + break + + # Upload part + etag = oss_bucket.upload_part(file_id, upload_id, part_number, chunk).etag + parts.append(oss2.models.PartInfo(part_number, etag)) + part_number += 1 + + # Complete multipart upload + oss_bucket.complete_multipart_upload(file_id, upload_id, parts) + + def _parse_storage_path(self, storage_path: str) -> Dict[str, str]: + """Parse the OSS storage path to extract actual bucket and object name. + + Args: + storage_path (str): The storage path URI + + Returns: + Dict[str, str]: A dictionary with actual_bucket and object_name keys + """ + if not storage_path.startswith("oss://"): + raise ValueError(f"Invalid storage path for Aliyun OSS: {storage_path}") + + # Example URI: + # oss://logical_bucket/file_id?actual_bucket=oss_bucket&object_name=logical_bucket/file_id # noqa + + # Try to parse the URL parameters + from urllib.parse import parse_qs, urlparse + + parsed_url = urlparse(storage_path) + params = parse_qs(parsed_url.query) + + # Extract the parameters + actual_bucket = params.get("actual_bucket", [None])[0] + object_name = params.get("object_name", [None])[0] + + # Extract the logical bucket and file_id from the path + path_parts = parsed_url.path.strip("/").split("/", 1) + logical_bucket = path_parts[0] if path_parts else None + logical_file_id = path_parts[1] if len(path_parts) > 1 else None + + # If parameters aren't in the URL (backward compatibility or simplified URL), + # derive them from the logical values + if not actual_bucket: + # Try to use the bucket mapper to get the actual bucket + actual_bucket = ( + self._map_bucket_name(logical_bucket) if logical_bucket else None + ) + + if not object_name: + # If using fixed bucket, the object name includes the logical bucket + # as prefix + if self.fixed_bucket: + object_name = ( + f"{logical_bucket}/{logical_file_id}" + if logical_bucket and logical_file_id + else None + ) + else: + object_name = logical_file_id + + return { + "logical_bucket": logical_bucket, + "logical_file_id": logical_file_id, + "actual_bucket": actual_bucket, + "object_name": object_name, + } + + def load(self, fm: FileMetadata) -> BinaryIO: + """Load the file data from Aliyun OSS. + + Args: + fm (FileMetadata): The file metadata + + Returns: + BinaryIO: The file data as a binary IO object + """ + # Parse the storage path + path_info = self._parse_storage_path(fm.storage_path) + + # Get actual bucket and object name + actual_bucket_name = path_info["actual_bucket"] + object_name = path_info["object_name"] + logical_bucket = path_info["logical_bucket"] + + # If we couldn't determine the actual bucket from the URI, try with the + # logical bucket + if not actual_bucket_name and logical_bucket: + actual_bucket_name = self._map_bucket_name(logical_bucket) + + # Use the file_id as object name if object_name is still None + if not object_name: + object_name = fm.file_id + # If using fixed bucket, prefix with logical bucket + if self.fixed_bucket and logical_bucket: + object_name = f"{logical_bucket}/{fm.file_id}" + + # Get the bucket object + try: + oss_bucket = oss2.Bucket( + self.auth, self.endpoint, actual_bucket_name, region=self.region + ) + + # Get object as stream + object_stream = oss_bucket.get_object(object_name) + + # Convert to BytesIO for compatibility + content = io.BytesIO(object_stream.read()) + content.seek(0) + return content + except oss2.exceptions.NoSuchKey as e: + logger.error( + f"File {object_name} not found in bucket {actual_bucket_name}: {e}" + ) + raise FileNotFoundError( + f"File {object_name} not found in bucket {actual_bucket_name}" + ) + except oss2.exceptions.OssError as e: + logger.error( + f"Failed to download file {object_name} from bucket " + f"{actual_bucket_name}: {e}" + ) + raise + + def delete(self, fm: FileMetadata) -> bool: + """Delete the file data from Aliyun OSS. + + Args: + fm (FileMetadata): The file metadata + + Returns: + bool: True if the file was deleted, False otherwise + """ + # Parse the storage path + path_info = self._parse_storage_path(fm.storage_path) + + # Get actual bucket and object name + actual_bucket_name = path_info["actual_bucket"] + object_name = path_info["object_name"] + logical_bucket = path_info["logical_bucket"] + + # If we couldn't determine the actual bucket from the URI, try with the + # logical bucket + if not actual_bucket_name and logical_bucket: + actual_bucket_name = self._map_bucket_name(logical_bucket) + + # Use the file_id as object name if object_name is still None + if not object_name: + object_name = fm.file_id + # If using fixed bucket, prefix with logical bucket + if self.fixed_bucket and logical_bucket: + object_name = f"{logical_bucket}/{fm.file_id}" + + try: + # Get the bucket object + oss_bucket = oss2.Bucket( + self.auth, self.endpoint, actual_bucket_name, region=self.region + ) + + # Check if the object exists + if not oss_bucket.object_exists(object_name): + logger.warning( + f"File {object_name} does not exist in bucket {actual_bucket_name}" + ) + return False + + # Delete the object + oss_bucket.delete_object(object_name) + logger.info(f"File {object_name} deleted from bucket {actual_bucket_name}") + return True + except oss2.exceptions.OssError as e: + logger.error( + f"Failed to delete file {object_name} from bucket {actual_bucket_name}:" + f" {e}" + ) + return False diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/__init__.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/config.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/config.py new file mode 100644 index 000000000..cee8f9da3 --- /dev/null +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/config.py @@ -0,0 +1,118 @@ +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +from dbgpt.core.interface.file import StorageBackend, StorageBackendConfig +from dbgpt.util.i18n_utils import _ + + +@dataclass +class S3StorageConfig(StorageBackendConfig): + __type__ = "s3" + endpoint: str = field( + metadata={ + "help": _( + "The endpoint of the s3 server. e.g. https://s3.us-east-1.amazonaws.com" + ) + }, + ) + region: str = field( + metadata={"help": _("The region of the s3 server. e.g. us-east-1")}, + ) + access_key_id: Optional[str] = field( + default=None, + metadata={ + "help": _( + "The access key ID of the s3 server. You can also set it in the " + "environment variable AWS_ACCESS_KEY_ID" + ), + "tags": "privacy", + }, + ) + access_key_secret: Optional[str] = field( + default=None, + metadata={ + "help": _( + "The access key secret of the s3 server. You can also set it in the " + "environment variable AWS_SECRET_ACCESS_KEY" + ), + "tags": "privacy", + }, + ) + use_environment_credentials: Optional[bool] = field( + default=False, + metadata={ + "help": _( + "Whether to use the environment variables AWS_ACCESS_KEY_ID and " + "AWS_SECRET_ACCESS_KEY as the credentials. Default is False." + ), + }, + ) + fixed_bucket: Optional[str] = field( + default=None, + metadata={ + "help": _( + "The fixed bucket name to use. If set, all logical buckets in DB-GPT " + "will be mapped to this bucket. We suggest you set this value to avoid " + "bucket name conflicts." + ) + }, + ) + bucket_prefix: Optional[str] = field( + default="dbgpt-fs-", + metadata={ + "help": _( + "The prefix of the bucket name. If set, all logical buckets in DB-GPT " + "will be prefixed with this value. Just work when fixed_bucket is None." + ) + }, + ) + auto_create_bucket: Optional[bool] = field( + default=True, + metadata={ + "help": _( + "Whether to create the bucket automatically if it does not exist. " + "If set to False, the bucket must exist before using it." + ) + }, + ) + save_chunk_size: Optional[int] = field( + default=1024 * 1024, + metadata={ + "help": _( + "The chunk size when saving the file. When the file is larger 10x than " + "this value, it will be uploaded in multiple parts. Default is 1M." + ) + }, + ) + signature_version: Optional[str] = field( + default=None, + metadata={ + "help": _( + "The signature version of the s3 server. " + "e.g. s3v4, s3v2, None (default)" + ) + }, + ) + s3_config: Optional[Dict[str, Any]] = field( + default_factory=dict, + metadata={ + "help": _("The additional configuration for the S3 client."), + }, + ) + + def create_storage(self) -> StorageBackend: + from .s3_storage import S3Storage + + return S3Storage( + endpoint_url=self.endpoint, + region_name=self.region, + access_key_id=self.access_key_id, + secret_access_key=self.access_key_secret, + use_environment_credentials=self.use_environment_credentials, + fixed_bucket=self.fixed_bucket, + bucket_prefix=self.bucket_prefix, + auto_create_bucket=self.auto_create_bucket, + save_chunk_size=self.save_chunk_size, + signature_version=self.signature_version, + s3_config=self.s3_config, + ) diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/s3_storage.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/s3_storage.py new file mode 100644 index 000000000..22b460579 --- /dev/null +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/file/s3/s3_storage.py @@ -0,0 +1,589 @@ +"""S3 compatible storage backend.""" + +import hashlib +import io +import logging +import os +import random +import time +from typing import BinaryIO, Callable, Dict, Optional, Union +from urllib.parse import parse_qs, urlparse + +import boto3 +from botocore.config import Config +from botocore.exceptions import ClientError + +from dbgpt.core.interface.file import FileMetadata, StorageBackend + +logger = logging.getLogger(__name__) + + +class S3Storage(StorageBackend): + """S3 compatible storage backend implementation.""" + + storage_type: str = "s3" + + def __init__( + self, + endpoint_url: str, + region_name: str, + access_key_id: str, + secret_access_key: str, + save_chunk_size: int = 1024 * 1024, + use_environment_credentials: bool = False, + fixed_bucket: Optional[str] = None, + bucket_prefix: str = "dbgpt-fs-", + bucket_mapper: Optional[Callable[[str], str]] = None, + auto_create_bucket: bool = True, + signature_version: Optional[str] = None, + s3_config: Optional[Dict[str, Union[str, int]]] = None, + ): + """Initialize the S3 compatible storage backend. + + Args: + endpoint_url (str): S3 endpoint URL, e.g., + "https://s3.us-east-1.amazonaws.com" + region_name (str): S3 region, e.g., "us-east-1" + access_key_id (str): AWS/S3 Access Key ID + secret_access_key (str): AWS/S3 Secret Access Key + save_chunk_size (int, optional): Chunk size for saving files. Defaults to + 1024*1024 (1MB). + use_environment_credentials (bool, optional): Whether to use credentials + from environment variables. Defaults to False. + fixed_bucket (Optional[str], optional): A fixed S3 bucket to use for all + operations. If provided, all logical buckets will be mapped to this + single bucket. Defaults to None. + bucket_prefix (str, optional): Prefix for dynamically created buckets. + Defaults to "dbgpt-fs-". + bucket_mapper (Optional[Callable[[str], str]], optional): Custom function + to map logical bucket names to actual S3 bucket names. Defaults to None. + auto_create_bucket (bool, optional): Whether to automatically create + buckets that don't exist. Defaults to True. + signature_version (str, optional): S3 signature version to use. + s3_config (Optional[Dict[str, Union[str, int]]], optional): Additional + S3 configuration options. Defaults to None. + """ + self.endpoint_url = endpoint_url + self.region_name = region_name + self._save_chunk_size = save_chunk_size + self.fixed_bucket = fixed_bucket + self.bucket_prefix = bucket_prefix + self.custom_bucket_mapper = bucket_mapper + self.auto_create_bucket = auto_create_bucket + self.signature_version = signature_version + + # Build S3 client configuration + if not s3_config: + s3_config = { + "s3": { + # Use virtual addressing style + "addressing_style": "virtual", + }, + "signature_version": signature_version or "v4", + } + if "request_checksum_calculation" not in s3_config: + s3_config["request_checksum_calculation"] = "when_required" + if "response_checksum_validation" not in s3_config: + s3_config["response_checksum_validation"] = "when_required" + config = Config(**s3_config) + + # Initialize S3 authentication + if use_environment_credentials: + # Check required environment variables + required_env_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] + for var in required_env_vars: + if var not in os.environ: + raise ValueError(f"Environment variable {var} is not set.") + + # Use environment credentials + self.s3_client = boto3.client( + "s3", + endpoint_url=self.endpoint_url, + region_name=self.region_name, + config=config, + ) + else: + if not access_key_id or not secret_access_key: + raise ValueError( + "Access key ID and secret are required when not using environment " + "credentials" + ) + # Use provided credentials + self.s3_client = boto3.client( + "s3", + endpoint_url=self.endpoint_url, + region_name=self.region_name, + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + config=config, + ) + + # Create fixed bucket if specified + if self.fixed_bucket and self.auto_create_bucket: + self._ensure_bucket_exists(self.fixed_bucket) + + @property + def save_chunk_size(self) -> int: + """Get the save chunk size.""" + return self._save_chunk_size + + def _map_bucket_name(self, logical_bucket: str) -> str: + """Map logical bucket name to actual S3 bucket name. + + Args: + logical_bucket (str): Logical bucket name used by the application + + Returns: + str: Actual S3 bucket name to use + """ + # 1. If using a fixed bucket, always return that + if self.fixed_bucket: + return self.fixed_bucket + + # 2. If a custom mapper is provided, use that + if self.custom_bucket_mapper: + return self.custom_bucket_mapper(logical_bucket) + + # 3. Otherwise, use a hash-based approach to generate a unique but + # deterministic name + # This avoids bucket name conflicts while maintaining consistency + bucket_hash = hashlib.md5(logical_bucket.encode()).hexdigest()[:8] + return f"{self.bucket_prefix}{bucket_hash}-{logical_bucket}" + + def _generate_dynamic_bucket_name(self) -> str: + """Generate a unique bucket name for dynamic creation. + + Returns: + str: A unique bucket name + """ + # Using timestamp + random number to ensure uniqueness + timestamp = int(time.time()) + random_number = random.randint(0, 9999) + return f"{self.bucket_prefix}{timestamp}-{random_number}" + + def _ensure_bucket_exists(self, bucket_name: str) -> bool: + """Ensure the bucket exists, create it if needed and if auto_create_bucket is + True. + + Args: + bucket_name (str): Bucket name + + Returns: + bool: True if the bucket exists or was created, False otherwise + """ + try: + # Check if bucket exists + self.s3_client.head_bucket(Bucket=bucket_name) + logger.info(f"Bucket {bucket_name} exists") + return True + except ClientError as e: + error_code = e.response.get("Error", {}).get("Code") + error_msg = str(e) + + logger.info( + f"Bucket check failed with error_code={error_code}, msg={error_msg}" + ) + + # Bucket doesn't exist or we don't have permission to access it + if error_code in ["404", "403", "NoSuchBucket", "Forbidden"]: + if not self.auto_create_bucket: + logger.warning( + f"Bucket {bucket_name} does not exist and auto_create_bucket " + "is False" + ) + return False + + # Create bucket + try: + logger.info(f"Creating bucket {bucket_name}") + + # Try different creation methods to adapt to different + # S3-compatible APIs + creation_methods = [ + # Method 1: Use LocationConstraint + lambda: self.s3_client.create_bucket( + Bucket=bucket_name, + CreateBucketConfiguration={ + "LocationConstraint": self.region_name + }, + ), + # Method 2: Without LocationConstraint + lambda: self.s3_client.create_bucket(Bucket=bucket_name), + # Method 3: Use empty CreateBucketConfiguration + lambda: self.s3_client.create_bucket( + Bucket=bucket_name, CreateBucketConfiguration={} + ), + ] + + # Try different creation methods + last_error = None + for create_method in creation_methods: + try: + create_method() + logger.info(f"Successfully created bucket {bucket_name}") + return True + except ClientError as method_error: + logger.info( + f"Bucket creation method failed: {method_error}" + ) + last_error = method_error + continue + + # If all methods failed, raise the last error + if last_error: + raise last_error + + return False + + except ClientError as create_error: + # Handle the case where bucket name is already taken by someone else + logger.error( + f"Failed to create bucket {bucket_name}: {create_error}" + ) + if "BucketAlreadyExists" in str(create_error): + logger.warning( + f"Bucket name {bucket_name} already exists and is owned by " + "someone else" + ) + return False + else: + # Some other error + logger.error(f"Failed to check bucket {bucket_name}: {e}") + return False + + def save(self, bucket: str, file_id: str, file_data: BinaryIO) -> str: + """Save the file data to S3. + + Args: + bucket (str): The logical bucket name + file_id (str): The file ID + file_data (BinaryIO): The file data + + Returns: + str: The storage path (S3 URI) + """ + # Get the actual S3 bucket + actual_bucket_name = self._map_bucket_name(bucket) + logger.info( + f"Mapped logical bucket '{bucket}' to actual bucket '{actual_bucket_name}'" + ) + + # Ensure bucket exists + bucket_exists = self._ensure_bucket_exists(actual_bucket_name) + + if not bucket_exists: + logger.warning( + f"Could not ensure bucket {actual_bucket_name} exists, trying " + "alternatives" + ) + + # Try to create a dynamic bucket if we're not using a fixed bucket + if not self.fixed_bucket and self.auto_create_bucket: + dynamic_bucket = self._generate_dynamic_bucket_name() + logger.info( + f"Attempting to create dynamic bucket {dynamic_bucket} for logical " + f"bucket {bucket}" + ) + + if self._ensure_bucket_exists(dynamic_bucket): + logger.info(f"Successfully created dynamic bucket {dynamic_bucket}") + actual_bucket_name = dynamic_bucket + else: + error_msg = ( + f"Failed to get or create bucket for logical bucket {bucket}" + ) + logger.error(error_msg) + raise ValueError(error_msg) + else: + error_msg = ( + f"Failed to get or create bucket for logical bucket {bucket}" + ) + logger.error(error_msg) + raise ValueError(error_msg) + + # Generate S3 object key based on whether we're using fixed bucket + object_key = file_id + if self.fixed_bucket: + # When using a fixed bucket, we need to prefix with logical bucket name to + # avoid conflicts + object_key = f"{bucket}/{file_id}" + + # For large files, use multipart upload + file_size = self._get_file_size(file_data) + + if file_size > 10 * self.save_chunk_size: # If file is larger than 10MB + logger.info( + f"Using multipart upload for large file: {object_key} " + f"(size: {file_size})" + ) + self._multipart_upload(actual_bucket_name, object_key, file_data) + else: + logger.info(f"Uploading file using simple upload: {object_key}") + try: + # Reset the file pointer to the beginning + file_data.seek(0) + + # Read the file content into memory + file_content = file_data.read() + + # Use put_object for small files + self.s3_client.put_object( + Bucket=actual_bucket_name, Key=object_key, Body=file_content + ) + except ClientError as e: + logger.error( + f"Failed to upload file {object_key} to bucket " + f"{actual_bucket_name}: {e}" + ) + raise + + # Format: s3://{logical_bucket}/{file_id}?actual_bucket={actual_bucket_name}&object_key={object_key} # noqa + return f"s3://{bucket}/{file_id}?actual_bucket={actual_bucket_name}&object_key={object_key}" # noqa + + def _get_file_size(self, file_data: BinaryIO) -> int: + """Get file size without consuming the file object. + + Args: + file_data (BinaryIO): The file data + + Returns: + int: The file size in bytes + """ + current_pos = file_data.tell() + file_data.seek(0, io.SEEK_END) + size = file_data.tell() + file_data.seek(current_pos) # Reset the file pointer + return size + + def _multipart_upload( + self, bucket_name: str, object_key: str, file_data: BinaryIO + ) -> None: + """Handle multipart upload for large files. + + Args: + bucket_name (str): S3 bucket name + object_key (str): The object key (file path in S3) + file_data (BinaryIO): The file data + """ + # Initialize multipart upload + try: + mpu = self.s3_client.create_multipart_upload( + Bucket=bucket_name, Key=object_key + ) + upload_id = mpu["UploadId"] + + # Upload parts + part_number = 1 + parts = [] + file_data.seek(0) # Make sure we're at the beginning of the file + + while True: + # Read the chunk + chunk = file_data.read(self.save_chunk_size) + if not chunk: + break + + # Upload the part + response = self.s3_client.upload_part( + Bucket=bucket_name, + Key=object_key, + UploadId=upload_id, + PartNumber=part_number, + Body=chunk, + ) + + parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) + + part_number += 1 + + # Complete multipart upload + self.s3_client.complete_multipart_upload( + Bucket=bucket_name, + Key=object_key, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + except ClientError as e: + logger.error(f"Error in multipart upload: {e}") + # Attempt to abort the multipart upload if it was initialized + if "upload_id" in locals(): + try: + self.s3_client.abort_multipart_upload( + Bucket=bucket_name, Key=object_key, UploadId=upload_id + ) + except ClientError as abort_error: + logger.error(f"Error aborting multipart upload: {abort_error}") + raise + + def _parse_storage_path(self, storage_path: str) -> Dict[str, str]: + """Parse the S3 storage path to extract actual bucket and object key. + + Args: + storage_path (str): The storage path URI + + Returns: + Dict[str, str]: A dictionary with actual_bucket and object_key keys + """ + if not storage_path.startswith("s3://"): + raise ValueError(f"Invalid storage path for S3: {storage_path}") + + # Example URI: + # s3://logical_bucket/file_id?actual_bucket=s3_bucket&object_key=logical_bucket/file_id # noqa + + # Parse the URL + parsed_url = urlparse(storage_path) + params = parse_qs(parsed_url.query) + + # Extract the parameters + actual_bucket = params.get("actual_bucket", [None])[0] + object_key = params.get("object_key", [None])[0] + + # Extract the logical bucket and file_id from the path + path_parts = parsed_url.path.strip("/").split("/", 1) + logical_bucket = path_parts[0] if path_parts else None + logical_file_id = path_parts[1] if len(path_parts) > 1 else None + + # If parameters aren't in the URL (backward compatibility or simplified URL), + # derive them from the logical values + if not actual_bucket: + # Try to use the bucket mapper to get the actual bucket + actual_bucket = ( + self._map_bucket_name(logical_bucket) if logical_bucket else None + ) + + if not object_key: + # If using fixed bucket, the object key includes the logical bucket + # as prefix + if self.fixed_bucket: + object_key = ( + f"{logical_bucket}/{logical_file_id}" + if logical_bucket and logical_file_id + else None + ) + else: + object_key = logical_file_id + + return { + "logical_bucket": logical_bucket, + "logical_file_id": logical_file_id, + "actual_bucket": actual_bucket, + "object_key": object_key, + } + + def load(self, fm: FileMetadata) -> BinaryIO: + """Load the file data from S3. + + Args: + fm (FileMetadata): The file metadata + + Returns: + BinaryIO: The file data as a binary IO object + """ + # Parse the storage path + path_info = self._parse_storage_path(fm.storage_path) + + # Get actual bucket and object key + actual_bucket_name = path_info["actual_bucket"] + object_key = path_info["object_key"] + logical_bucket = path_info["logical_bucket"] + + # If we couldn't determine the actual bucket from the URI, try with the + # logical bucket + if not actual_bucket_name and logical_bucket: + actual_bucket_name = self._map_bucket_name(logical_bucket) + + # Use the file_id as object key if object_key is still None + if not object_key: + object_key = fm.file_id + # If using fixed bucket, prefix with logical bucket + if self.fixed_bucket and logical_bucket: + object_key = f"{logical_bucket}/{fm.file_id}" + + try: + # Get object from S3 + response = self.s3_client.get_object( + Bucket=actual_bucket_name, Key=object_key + ) + + # Read the streaming body into a BytesIO object + content = io.BytesIO() + body = response["Body"] + + # Stream the data in chunks + while True: + chunk = body.read(self.save_chunk_size) + if not chunk: + break + content.write(chunk) + + content.seek(0) + return content + except ClientError as e: + error_code = e.response.get("Error", {}).get("Code") + if error_code == "NoSuchKey": + logger.error( + f"File {object_key} not found in bucket {actual_bucket_name}: {e}" + ) + raise FileNotFoundError( + f"File {object_key} not found in bucket {actual_bucket_name}" + ) + logger.error( + f"Failed to download file {object_key} from bucket " + f"{actual_bucket_name}: {e}" + ) + raise + + def delete(self, fm: FileMetadata) -> bool: + """Delete the file data from S3. + + Args: + fm (FileMetadata): The file metadata + + Returns: + bool: True if the file was deleted, False otherwise + """ + # Parse the storage path + path_info = self._parse_storage_path(fm.storage_path) + + # Get actual bucket and object key + actual_bucket_name = path_info["actual_bucket"] + object_key = path_info["object_key"] + logical_bucket = path_info["logical_bucket"] + + # If we couldn't determine the actual bucket from the URI, try with the + # logical bucket + if not actual_bucket_name and logical_bucket: + actual_bucket_name = self._map_bucket_name(logical_bucket) + + # Use the file_id as object key if object_key is still None + if not object_key: + object_key = fm.file_id + # If using fixed bucket, prefix with logical bucket + if self.fixed_bucket and logical_bucket: + object_key = f"{logical_bucket}/{fm.file_id}" + + try: + # Check if the object exists + try: + self.s3_client.head_object(Bucket=actual_bucket_name, Key=object_key) + except ClientError as e: + error_code = e.response.get("Error", {}).get("Code") + if error_code == "404" or error_code == "NoSuchKey": + logger.warning( + f"File {object_key} does not exist in bucket " + f"{actual_bucket_name}" + ) + return False + raise + + # Delete the object + self.s3_client.delete_object(Bucket=actual_bucket_name, Key=object_key) + + logger.info(f"File {object_key} deleted from bucket {actual_bucket_name}") + return True + except ClientError as e: + logger.error( + f"Failed to delete file {object_key} from bucket {actual_bucket_name}:" + f" {e}" + ) + return False diff --git a/packages/dbgpt-serve/src/dbgpt_serve/file/api/endpoints.py b/packages/dbgpt-serve/src/dbgpt_serve/file/api/endpoints.py index e751e39ed..d0c5149e4 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/file/api/endpoints.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/file/api/endpoints.py @@ -120,7 +120,6 @@ async def upload_files( global_system_app, service.upload_files, bucket, - "distributed", files, user_name, sys_code, diff --git a/packages/dbgpt-serve/src/dbgpt_serve/file/config.py b/packages/dbgpt-serve/src/dbgpt_serve/file/config.py index 40df2cb7a..ac25396dc 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/file/config.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/file/config.py @@ -1,12 +1,14 @@ from dataclasses import dataclass, field -from typing import Optional +from typing import List, Optional from dbgpt.core.awel.flow import ( TAGS_ORDER_HIGH, ResourceCategory, auto_register_resource, ) +from dbgpt.core.interface.file import StorageBackendConfig from dbgpt.util.i18n_utils import _ +from dbgpt.util.module_utils import ScannerConfig from dbgpt_serve.core import BaseServeConfig APP_NAME = "file" @@ -27,6 +29,7 @@ SERVER_APP_TABLE_NAME = "dbgpt_serve_file" "files in the file server." ), show_in_ui=False, + skip_fields=["backends"], ) @dataclass class ServeConfig(BaseServeConfig): @@ -34,6 +37,13 @@ class ServeConfig(BaseServeConfig): __type__ = APP_NAME + __scan_config__ = ScannerConfig( + module_path="dbgpt_ext.storage.file", + base_class=StorageBackendConfig, + recursive=True, + specific_files=["config"], + ) + check_hash: Optional[bool] = field( default=True, metadata={"help": _("Check the hash of the file when downloading")}, @@ -62,6 +72,14 @@ class ServeConfig(BaseServeConfig): local_storage_path: Optional[str] = field( default=None, metadata={"help": _("The local storage path")} ) + default_backend: Optional[str] = field( + default=None, + metadata={"help": _("The default storage backend")}, + ) + backends: List[StorageBackendConfig] = field( + default_factory=list, + metadata={"help": _("The storage backend configurations")}, + ) def get_node_address(self) -> str: """Get the node address""" diff --git a/packages/dbgpt-serve/src/dbgpt_serve/file/serve.py b/packages/dbgpt-serve/src/dbgpt_serve/file/serve.py index e19f78205..7b08daa1f 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/file/serve.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/file/serve.py @@ -88,6 +88,7 @@ class Serve(BaseServe): FileMetadataAdapter(), serializer, ) + default_backend = self._serve_config.default_backend simple_distributed_storage = SimpleDistributedStorage( node_address=self._serve_config.get_node_address(), local_storage_path=self._serve_config.get_local_storage_path(), @@ -98,6 +99,15 @@ class Serve(BaseServe): storage_backends = { simple_distributed_storage.storage_type: simple_distributed_storage, } + for backend_config in self._serve_config.backends: + storage_backend = backend_config.create_storage() + storage_backends[storage_backend.storage_type] = storage_backend + if not default_backend: + # First backend is the default backend + default_backend = storage_backend.storage_type + if not default_backend: + default_backend = simple_distributed_storage.storage_type + fs = FileStorageSystem( storage_backends, metadata_storage=storage, @@ -107,6 +117,7 @@ class Serve(BaseServe): system_app=self._system_app, storage_system=fs, save_chunk_size=self._serve_config.save_chunk_size, + default_storage_type=default_backend, ) self._system_app.register_instance(self._file_storage_client) diff --git a/packages/dbgpt-serve/src/dbgpt_serve/file/service/service.py b/packages/dbgpt-serve/src/dbgpt_serve/file/service/service.py index 17d539f89..1d26e88b6 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/file/service/service.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/file/service/service.py @@ -79,7 +79,6 @@ class Service(BaseService[ServeEntity, ServeRequest, ServerResponse]): def upload_files( self, bucket: str, - storage_type: str, files: List[UploadFile], user_name: Optional[str] = None, sys_code: Optional[str] = None, @@ -97,7 +96,6 @@ class Service(BaseService[ServeEntity, ServeRequest, ServerResponse]): bucket, file_name, file_data=file.file, - storage_type=storage_type, custom_metadata=custom_metadata, ) parsed_uri = FileStorageURI.parse(uri) diff --git a/packages/dbgpt-serve/src/dbgpt_serve/rag/service/service.py b/packages/dbgpt-serve/src/dbgpt_serve/rag/service/service.py index 75dfc4dc3..b8a325bbc 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/rag/service/service.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/rag/service/service.py @@ -187,7 +187,6 @@ class Service(BaseService[KnowledgeSpaceEntity, SpaceServeRequest, SpaceServeRes bucket, safe_filename, doc_file.file, - storage_type="distributed", custom_metadata=custom_metadata, ) request.content = file_uri diff --git a/uv.lock b/uv.lock index 072c9e886..96abe226d 100644 --- a/uv.lock +++ b/uv.lock @@ -211,6 +211,28 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/a2/8b/46919127496036c8e990b2b236454a0d8655fd46e1df2fd35610a9cbc842/alembic-1.12.0-py3-none-any.whl", hash = "sha256:03226222f1cf943deee6c85d9464261a6c710cd19b4fe867a3ad1f25afda610f", size = 226041 }, ] +[[package]] +name = "aliyun-python-sdk-core" +version = "2.16.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "cryptography" }, + { name = "jmespath" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/3e/09/da9f58eb38b4fdb97ba6523274fbf445ef6a06be64b433693da8307b4bec/aliyun-python-sdk-core-2.16.0.tar.gz", hash = "sha256:651caad597eb39d4fad6cf85133dffe92837d53bdf62db9d8f37dab6508bb8f9", size = 449555 } + +[[package]] +name = "aliyun-python-sdk-kms" +version = "2.16.5" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "aliyun-python-sdk-core" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/a8/2c/9877d0e6b18ecf246df671ac65a5d1d9fecbf85bdcb5d43efbde0d4662eb/aliyun-python-sdk-kms-2.16.5.tar.gz", hash = "sha256:f328a8a19d83ecbb965ffce0ec1e9930755216d104638cd95ecd362753b813b3", size = 12018 } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/11/5c/0132193d7da2c735669a1ed103b142fd63c9455984d48c5a88a1a516efaa/aliyun_python_sdk_kms-2.16.5-py2.py3-none-any.whl", hash = "sha256:24b6cdc4fd161d2942619479c8d050c63ea9cd22b044fe33b60bbb60153786f0", size = 99495 }, +] + [[package]] name = "annotated-types" version = "0.7.0" @@ -668,6 +690,34 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/9a/91/4aea63dccee6491a54c630d9817656a886e086ab97222e2d8101d8cdf894/blis-0.7.11-cp312-cp312-win_amd64.whl", hash = "sha256:5a305dbfc96d202a20d0edd6edf74a406b7e1404f4fa4397d24c68454e60b1b4", size = 6624079 }, ] +[[package]] +name = "boto3" +version = "1.37.13" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "botocore" }, + { name = "jmespath" }, + { name = "s3transfer" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/d6/50/1183ffa4782408907891af344a8e91d7bc5d7a9bae12e43fca8874da567e/boto3-1.37.13.tar.gz", hash = "sha256:295648f887464ab74c5c301a44982df76f9ba39ebfc16be5b8f071ad1a81fe95", size = 111349 } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/a2/64/9f9578142ba1ed3ecc6b82a53c5c4c4352108e1424f1d5d02b6239b4314f/boto3-1.37.13-py3-none-any.whl", hash = "sha256:90fa5a91d7d7456219f0b7c4a93b38335dc5cf4613d885da4d4c1d099e04c6b7", size = 139552 }, +] + +[[package]] +name = "botocore" +version = "1.37.13" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "jmespath" }, + { name = "python-dateutil" }, + { name = "urllib3" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/18/53/3593b438ab1f9b6837cc90a8582dfa71c71c639e9359a01fd4d110f0566e/botocore-1.37.13.tar.gz", hash = "sha256:60dfb831c54eb466db9b91891a6c8a0c223626caa049969d5d42858ad1e7f8c7", size = 13647494 } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/09/43/2aa89ca8ab69196890b0682820469e62d93c4cf402ceb46a3007fd44b0c3/botocore-1.37.13-py3-none-any.whl", hash = "sha256:aa417bac0f4d79533080e6e17c0509e149353aec83cfe7879597a7942f7f08d0", size = 13411385 }, +] + [[package]] name = "bs4" version = "0.0.2" @@ -1266,6 +1316,12 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/b0/5c/3ba7d12e7a79566f97b8f954400926d7b6eb33bcdccc1315a857f200f1f1/crashtest-0.4.1-py3-none-any.whl", hash = "sha256:8d23eac5fa660409f57472e3851dab7ac18aba459a8d19cbbba86d3d5aecd2a5", size = 7558 }, ] +[[package]] +name = "crcmod" +version = "1.7" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/6b/b0/e595ce2a2527e169c3bcd6c33d2473c1918e0b7f6826a043ca1245dd4e5b/crcmod-1.7.tar.gz", hash = "sha256:dc7051a0db5f2bd48665a990d3ec1cc305a466a77358ca4492826f41f283601e", size = 89670 } + [[package]] name = "cryptography" version = "44.0.1" @@ -1855,6 +1911,12 @@ datasource-spark = [ datasource-vertica = [ { name = "vertica-python" }, ] +file-oss = [ + { name = "oss2" }, +] +file-s3 = [ + { name = "boto3" }, +] graph-rag = [ { name = "dbgpt-tugraph-plugins" }, { name = "neo4j" }, @@ -1896,6 +1958,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "boto3", marker = "extra == 'file-s3'" }, { name = "bs4", marker = "extra == 'rag'" }, { name = "chromadb", marker = "extra == 'storage-chromadb'", specifier = ">=0.4.22" }, { name = "clickhouse-connect", marker = "extra == 'datasource-clickhouse'" }, @@ -1910,6 +1973,7 @@ requires-dist = [ { name = "neo4j", marker = "extra == 'graph-rag'" }, { name = "networkx", marker = "extra == 'graph-rag'" }, { name = "onnxruntime", marker = "extra == 'storage-chromadb'", specifier = ">=1.14.1,<=1.18.1" }, + { name = "oss2", marker = "extra == 'file-oss'" }, { name = "pdfplumber", marker = "extra == 'rag'" }, { name = "psycopg2-binary", marker = "extra == 'datasource-postgres'" }, { name = "pyhive", marker = "extra == 'datasource-hive'" }, @@ -3177,6 +3241,15 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/91/61/c80ef80ed8a0a21158e289ef70dac01e351d929a1c30cb0f49be60772547/jiter-0.8.2-cp313-cp313t-win_amd64.whl", hash = "sha256:3ac9f578c46f22405ff7f8b1f5848fb753cc4b8377fbec8470a7dc3997ca7566", size = 202374 }, ] +[[package]] +name = "jmespath" +version = "0.10.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/3c/56/3f325b1eef9791759784aa5046a8f6a1aff8f7c898a2e34506771d3b99d8/jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9", size = 21607 } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/07/cb/5f001272b6faeb23c1c9e0acc04d48eaaf5c862c17709d20e3469c6e0139/jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f", size = 24489 }, +] + [[package]] name = "joblib" version = "1.4.2" @@ -5478,6 +5551,20 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/27/f1/1d7ec15b20f8ce9300bc850de1e059132b88990e46cd0ccac29cbf11e4f9/orjson-3.10.15-cp313-cp313-win_amd64.whl", hash = "sha256:fd56a26a04f6ba5fb2045b0acc487a63162a958ed837648c5781e1fe3316cfbf", size = 133444 }, ] +[[package]] +name = "oss2" +version = "2.19.1" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "aliyun-python-sdk-core" }, + { name = "aliyun-python-sdk-kms" }, + { name = "crcmod" }, + { name = "pycryptodome" }, + { name = "requests" }, + { name = "six" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/df/b5/f2cb1950dda46ac2284d6c950489fdacd0e743c2d79a347924d3cc44b86f/oss2-2.19.1.tar.gz", hash = "sha256:a8ab9ee7eb99e88a7e1382edc6ea641d219d585a7e074e3776e9dec9473e59c1", size = 298845 } + [[package]] name = "outlines" version = "0.1.11" @@ -7470,6 +7557,18 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/31/5e/d3a6fdf61f6373e53bfb45d6819a72dfef741bc8a9ff31c64496688e7c39/ruff_lsp-0.0.62-py3-none-any.whl", hash = "sha256:fb6c04a0cb09bb3ae316121b084ff09497edd01df58b36fa431f14515c63029e", size = 20980 }, ] +[[package]] +name = "s3transfer" +version = "0.11.4" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "botocore" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/0f/ec/aa1a215e5c126fe5decbee2e107468f51d9ce190b9763cb649f76bb45938/s3transfer-0.11.4.tar.gz", hash = "sha256:559f161658e1cf0a911f45940552c696735f5c74e64362e515f333ebed87d679", size = 148419 } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/86/62/8d3fc3ec6640161a5649b2cddbbf2b9fa39c92541225b33f117c37c5a2eb/s3transfer-0.11.4-py3-none-any.whl", hash = "sha256:ac265fa68318763a03bf2dc4f39d5cbd6a9e178d81cc9483ad27da33637e320d", size = 84412 }, +] + [[package]] name = "safetensors" version = "0.5.2"