diff --git a/packages/dbgpt-core/src/dbgpt/rag/retriever/time_weighted.py b/packages/dbgpt-core/src/dbgpt/rag/retriever/time_weighted.py index cf150851c..33b8ebff6 100644 --- a/packages/dbgpt-core/src/dbgpt/rag/retriever/time_weighted.py +++ b/packages/dbgpt-core/src/dbgpt/rag/retriever/time_weighted.py @@ -149,7 +149,7 @@ class TimeWeightedEmbeddingRetriever(EmbeddingRetriever): self._save_memory_stream() # Add to vector store - return self._index_store.load_document(dup_docs) + return self._index_store.load_document_with_limit(dup_docs) def _retrieve( self, query: str, filters: Optional[MetadataFilters] = None diff --git a/packages/dbgpt-core/src/dbgpt/storage/base.py b/packages/dbgpt-core/src/dbgpt/storage/base.py index 7017fc3d0..b025d71a7 100644 --- a/packages/dbgpt-core/src/dbgpt/storage/base.py +++ b/packages/dbgpt-core/src/dbgpt/storage/base.py @@ -27,9 +27,16 @@ class IndexStoreConfig(BaseParameters): class IndexStoreBase(ABC): """Index store base class.""" - def __init__(self, executor: Optional[Executor] = None): + def __init__( + self, + executor: Optional[Executor] = None, + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, + ): """Init index store.""" self._executor = executor or ThreadPoolExecutor() + self._max_chunks_once_load = max_chunks_once_load or 10 + self._max_threads = max_threads or 1 @abstractmethod def get_config(self) -> IndexStoreConfig: @@ -102,7 +109,10 @@ class IndexStoreBase(ABC): return True def load_document_with_limit( - self, chunks: List[Chunk], max_chunks_once_load: int = 10, max_threads: int = 1 + self, + chunks: List[Chunk], + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, ) -> List[str]: """Load document in index database with specified limit. @@ -114,6 +124,8 @@ class IndexStoreBase(ABC): Return: List[str]: Chunk ids. """ + max_chunks_once_load = max_chunks_once_load or self._max_chunks_once_load + max_threads = max_threads or self._max_threads # Group the chunks into chunks of size max_chunks chunk_groups = [ chunks[i : i + max_chunks_once_load] @@ -141,7 +153,10 @@ class IndexStoreBase(ABC): return ids async def aload_document_with_limit( - self, chunks: List[Chunk], max_chunks_once_load: int = 10, max_threads: int = 1 + self, + chunks: List[Chunk], + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, ) -> List[str]: """Load document in index database with specified limit. @@ -153,6 +168,8 @@ class IndexStoreBase(ABC): Return: List[str]: Chunk ids. """ + max_chunks_once_load = max_chunks_once_load or self._max_chunks_once_load + max_threads = max_threads or self._max_threads chunk_groups = [ chunks[i : i + max_chunks_once_load] for i in range(0, len(chunks), max_chunks_once_load) diff --git a/packages/dbgpt-core/src/dbgpt/storage/vector_store/base.py b/packages/dbgpt-core/src/dbgpt/storage/vector_store/base.py index a3e64ed8d..dac1f2c6e 100644 --- a/packages/dbgpt-core/src/dbgpt/storage/vector_store/base.py +++ b/packages/dbgpt-core/src/dbgpt/storage/vector_store/base.py @@ -88,6 +88,24 @@ class VectorStoreConfig(IndexStoreConfig, RegisterParameters): ), }, ) + max_chunks_once_load: Optional[int] = field( + default=None, + metadata={ + "help": _( + "The max chunks once load in vector store, " + "if not set, will use the default value 10." + ), + }, + ) + max_threads: Optional[int] = field( + default=None, + metadata={ + "help": _( + "The max threads in vector store, " + "if not set, will use the default value 1." + ), + }, + ) def create_store(self, **kwargs) -> "VectorStoreBase": """Create a new index store from the config.""" @@ -97,9 +115,16 @@ class VectorStoreConfig(IndexStoreConfig, RegisterParameters): class VectorStoreBase(IndexStoreBase, ABC): """Vector store base class.""" - def __init__(self, executor: Optional[ThreadPoolExecutor] = None): + def __init__( + self, + executor: Optional[ThreadPoolExecutor] = None, + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, + ): """Initialize vector store.""" - super().__init__(executor) + super().__init__( + executor, max_chunks_once_load=max_chunks_once_load, max_threads=max_threads + ) @abstractmethod def get_config(self) -> VectorStoreConfig: diff --git a/packages/dbgpt-ext/src/dbgpt_ext/rag/assembler/embedding.py b/packages/dbgpt-ext/src/dbgpt_ext/rag/assembler/embedding.py index 6bfccb900..1c9bacfb0 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/rag/assembler/embedding.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/rag/assembler/embedding.py @@ -131,8 +131,8 @@ class EmbeddingAssembler(BaseAssembler): Returns: List[str]: List of chunk ids. """ - max_chunks_once_load = kwargs.get("max_chunks_once_load", 10) - max_threads = kwargs.get("max_threads", 1) + max_chunks_once_load = kwargs.get("max_chunks_once_load") + max_threads = kwargs.get("max_threads") return self._index_store.load_document_with_limit( self._chunks, max_chunks_once_load, max_threads ) @@ -144,8 +144,8 @@ class EmbeddingAssembler(BaseAssembler): List[str]: List of chunk ids. """ # persist chunks into vector store - max_chunks_once_load = kwargs.get("max_chunks_once_load", 10) - max_threads = kwargs.get("max_threads", 1) + max_chunks_once_load = kwargs.get("max_chunks_once_load") + max_threads = kwargs.get("max_threads") return await self._index_store.aload_document_with_limit( self._chunks, max_chunks_once_load, max_threads ) diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py index a518e87be..c28ebaa2b 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py @@ -91,6 +91,8 @@ class ChromaStore(VectorStoreBase): embedding_fn: Optional[Embeddings] = None, chroma_client: Optional["PersistentClient"] = None, # type: ignore # noqa collection_metadata: Optional[dict] = None, + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, ) -> None: """Create a ChromaStore instance. @@ -100,8 +102,12 @@ class ChromaStore(VectorStoreBase): embedding_fn(Embeddings): embedding function. chroma_client(PersistentClient): chroma client. collection_metadata(dict): collection metadata. + max_chunks_once_load(int): max chunks once load. + max_threads(int): max threads. """ - super().__init__() + super().__init__( + max_chunks_once_load=max_chunks_once_load, max_threads=max_threads + ) self._vector_store_config = vector_store_config try: from chromadb import PersistentClient, Settings diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/elastic_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/elastic_store.py index 8232476fd..7c3dc091d 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/elastic_store.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/elastic_store.py @@ -157,13 +157,17 @@ class ElasticStore(VectorStoreBase): vector_store_config: ElasticsearchStoreConfig, name: Optional[str], embedding_fn: Optional[Embeddings] = None, + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, ) -> None: """Create a ElasticsearchStore instance. Args: vector_store_config (ElasticsearchStoreConfig): ElasticsearchStore config. """ - super().__init__() + super().__init__( + max_chunks_once_load=max_chunks_once_load, max_threads=max_threads + ) self._vector_store_config = vector_store_config connect_kwargs = {} diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/milvus_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/milvus_store.py index 01203b4d9..8569b2c99 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/milvus_store.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/milvus_store.py @@ -197,6 +197,8 @@ class MilvusStore(VectorStoreBase): vector_store_config: MilvusVectorConfig, name: Optional[str], embedding_fn: Optional[Embeddings] = None, + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, ) -> None: """Create a MilvusStore instance. @@ -204,7 +206,9 @@ class MilvusStore(VectorStoreBase): vector_store_config (MilvusVectorConfig): MilvusStore config. refer to https://milvus.io/docs/v2.0.x/manage_connection.md """ - super().__init__() + super().__init__( + max_chunks_once_load=max_chunks_once_load, max_threads=max_threads + ) self._vector_store_config = vector_store_config try: diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/oceanbase_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/oceanbase_store.py index 4abedd4a0..3638ebb48 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/oceanbase_store.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/oceanbase_store.py @@ -192,6 +192,8 @@ class OceanBaseStore(VectorStoreBase): vector_store_config: OceanBaseConfig, name: Optional[str], embedding_fn: Optional[Embeddings] = None, + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, ) -> None: """Create a OceanBaseStore instance.""" try: @@ -205,7 +207,9 @@ class OceanBaseStore(VectorStoreBase): if vector_store_config.embedding_fn is None: raise ValueError("embedding_fn is required for OceanBaseStore") - super().__init__() + super().__init__( + max_chunks_once_load=max_chunks_once_load, max_threads=max_threads + ) self._vector_store_config = vector_store_config self.embedding_function = embedding_fn diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/pgvector_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/pgvector_store.py index 01d266c44..3d0a774c4 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/pgvector_store.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/pgvector_store.py @@ -85,6 +85,8 @@ class PGVectorStore(VectorStoreBase): vector_store_config: PGVectorConfig, name: Optional[str], embedding_fn: Optional[Embeddings] = None, + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, ) -> None: """Create a PGVectorStore instance.""" try: @@ -93,7 +95,9 @@ class PGVectorStore(VectorStoreBase): raise ImportError( "Please install the `langchain` package to use the PGVector." ) - super().__init__() + super().__init__( + max_chunks_once_load=max_chunks_once_load, max_threads=max_threads + ) self._vector_store_config = vector_store_config self.connection_string = vector_store_config.connection_string diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/weaviate_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/weaviate_store.py index e120a8421..b8be7b98f 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/weaviate_store.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/weaviate_store.py @@ -96,6 +96,8 @@ class WeaviateStore(VectorStoreBase): vector_store_config: WeaviateVectorConfig, name: Optional[str], embedding_fn: Optional[Embeddings] = None, + max_chunks_once_load: Optional[int] = None, + max_threads: Optional[int] = None, ) -> None: """Initialize with Weaviate client.""" try: @@ -105,7 +107,9 @@ class WeaviateStore(VectorStoreBase): "Could not import weaviate python package. " "Please install it with `pip install weaviate-client`." ) - super().__init__() + super().__init__( + max_chunks_once_load=max_chunks_once_load, max_threads=max_threads + ) self._vector_store_config = vector_store_config self.weaviate_url = vector_store_config.weaviate_url diff --git a/packages/dbgpt-serve/src/dbgpt_serve/rag/storage_manager.py b/packages/dbgpt-serve/src/dbgpt_serve/rag/storage_manager.py index 3f7550c91..5f9ed13df 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/rag/storage_manager.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/rag/storage_manager.py @@ -68,7 +68,10 @@ class StorageManager(BaseComponent): embedding_fn = embedding_factory.create() vector_store_config: VectorStoreConfig = storage_config.vector return vector_store_config.create_store( - name=index_name, embedding_fn=embedding_fn + name=index_name, + embedding_fn=embedding_fn, + max_chunks_once_load=vector_store_config.max_chunks_once_load, + max_threads=vector_store_config.max_threads, ) def create_kg_store(