From 4eb06299e8cb73071362b32cf69523ebb811149a Mon Sep 17 00:00:00 2001 From: tam Date: Tue, 6 May 2025 23:23:40 +0800 Subject: [PATCH 1/2] fix(VectorStore): fix task concurrency and batch processing - Refactor task execution to improve concurrency control - Implement batch processing for vector deletion in Chroma store --- packages/dbgpt-core/src/dbgpt/storage/base.py | 14 +++++++---- .../storage/vector_store/chroma_store.py | 25 ++++++++++++++----- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/packages/dbgpt-core/src/dbgpt/storage/base.py b/packages/dbgpt-core/src/dbgpt/storage/base.py index b025d71a7..c52f6ec24 100644 --- a/packages/dbgpt-core/src/dbgpt/storage/base.py +++ b/packages/dbgpt-core/src/dbgpt/storage/base.py @@ -1,5 +1,5 @@ """Index store base class.""" - +import asyncio import logging import time from abc import ABC, abstractmethod @@ -182,9 +182,7 @@ class IndexStoreBase(ABC): for chunk_group in chunk_groups: tasks.append(self.aload_document(chunk_group)) - import asyncio - - results = await asyncio.gather(*tasks) + results = await self._run_tasks_with_concurrency(tasks, max_threads) ids = [] loaded_cnt = 0 @@ -194,7 +192,13 @@ class IndexStoreBase(ABC): logger.info(f"Loaded {loaded_cnt} chunks, total {len(chunks)} chunks.") return ids - + async def _run_tasks_with_concurrency(self, tasks, max_concurrent): + results = [] + for i in range(0, len(tasks), max_concurrent): + batch = tasks[i:i + max_concurrent] + batch_results = await asyncio.gather(*batch, return_exceptions=True) + results.extend(batch_results) + return results def similar_search( self, text: str, topk: int, filters: Optional[MetadataFilters] = None ) -> List[Chunk]: diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py index 3520cc6d5..c2a1f01ed 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py @@ -278,12 +278,25 @@ class ChromaStore(VectorStoreBase): logger.error(f"Error during vector store deletion: {e}") raise - def delete_by_ids(self, ids): - """Delete vector by ids.""" - logger.info(f"begin delete chroma ids: {ids}") - ids = ids.split(",") - if len(ids) > 0: - self._collection.delete(ids=ids) + def delete_by_ids(self, ids, batch_size: int = 1000): + """Delete vector by ids in batches. + + Args: + ids (str): Comma-separated string of IDs to delete. + batch_size (int): Number of IDs per batch. Default is 1000. + """ + logger.info(f"begin delete chroma ids in batches (batch size: {batch_size})") + id_list = ids.split(",") + total = len(id_list) + logger.info(f"Total IDs to delete: {total}") + + for i in range(0, total, batch_size): + batch_ids = id_list[i:i + batch_size] + logger.info(f"Deleting batch {i // batch_size + 1}: {len(batch_ids)} IDs") + try: + self._collection.delete(ids=batch_ids) + except Exception as e: + logger.error(f"Failed to delete batch starting at index {i}: {e}") def truncate(self) -> List[str]: """Truncate data index_name.""" From fc6c9071ffca1ae082085dc2ea75b83147188a51 Mon Sep 17 00:00:00 2001 From: tam Date: Tue, 6 May 2025 23:47:06 +0800 Subject: [PATCH 2/2] format code --- packages/dbgpt-core/src/dbgpt/storage/base.py | 5 ++++- .../src/dbgpt_ext/storage/vector_store/chroma_store.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/dbgpt-core/src/dbgpt/storage/base.py b/packages/dbgpt-core/src/dbgpt/storage/base.py index c52f6ec24..b14e0661b 100644 --- a/packages/dbgpt-core/src/dbgpt/storage/base.py +++ b/packages/dbgpt-core/src/dbgpt/storage/base.py @@ -1,4 +1,5 @@ """Index store base class.""" + import asyncio import logging import time @@ -192,13 +193,15 @@ class IndexStoreBase(ABC): logger.info(f"Loaded {loaded_cnt} chunks, total {len(chunks)} chunks.") return ids + async def _run_tasks_with_concurrency(self, tasks, max_concurrent): results = [] for i in range(0, len(tasks), max_concurrent): - batch = tasks[i:i + max_concurrent] + batch = tasks[i : i + max_concurrent] batch_results = await asyncio.gather(*batch, return_exceptions=True) results.extend(batch_results) return results + def similar_search( self, text: str, topk: int, filters: Optional[MetadataFilters] = None ) -> List[Chunk]: diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py index c2a1f01ed..ef91ced42 100644 --- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py +++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/chroma_store.py @@ -291,7 +291,7 @@ class ChromaStore(VectorStoreBase): logger.info(f"Total IDs to delete: {total}") for i in range(0, total, batch_size): - batch_ids = id_list[i:i + batch_size] + batch_ids = id_list[i : i + batch_size] logger.info(f"Deleting batch {i // batch_size + 1}: {len(batch_ids)} IDs") try: self._collection.delete(ids=batch_ids)