fix(VectorStore): fix task concurrency and batch processing

- Refactor task execution to improve concurrency control
- Implement batch processing for vector deletion in Chroma store
This commit is contained in:
tam 2025-05-06 23:23:40 +08:00
parent 3a00aca113
commit 4eb06299e8
2 changed files with 28 additions and 11 deletions

View File

@ -1,5 +1,5 @@
"""Index store base class.""" """Index store base class."""
import asyncio
import logging import logging
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
@ -182,9 +182,7 @@ class IndexStoreBase(ABC):
for chunk_group in chunk_groups: for chunk_group in chunk_groups:
tasks.append(self.aload_document(chunk_group)) tasks.append(self.aload_document(chunk_group))
import asyncio results = await self._run_tasks_with_concurrency(tasks, max_threads)
results = await asyncio.gather(*tasks)
ids = [] ids = []
loaded_cnt = 0 loaded_cnt = 0
@ -194,7 +192,13 @@ class IndexStoreBase(ABC):
logger.info(f"Loaded {loaded_cnt} chunks, total {len(chunks)} chunks.") logger.info(f"Loaded {loaded_cnt} chunks, total {len(chunks)} chunks.")
return ids return ids
async def _run_tasks_with_concurrency(self, tasks, max_concurrent):
results = []
for i in range(0, len(tasks), max_concurrent):
batch = tasks[i:i + max_concurrent]
batch_results = await asyncio.gather(*batch, return_exceptions=True)
results.extend(batch_results)
return results
def similar_search( def similar_search(
self, text: str, topk: int, filters: Optional[MetadataFilters] = None self, text: str, topk: int, filters: Optional[MetadataFilters] = None
) -> List[Chunk]: ) -> List[Chunk]:

View File

@ -278,12 +278,25 @@ class ChromaStore(VectorStoreBase):
logger.error(f"Error during vector store deletion: {e}") logger.error(f"Error during vector store deletion: {e}")
raise raise
def delete_by_ids(self, ids): def delete_by_ids(self, ids, batch_size: int = 1000):
"""Delete vector by ids.""" """Delete vector by ids in batches.
logger.info(f"begin delete chroma ids: {ids}")
ids = ids.split(",") Args:
if len(ids) > 0: ids (str): Comma-separated string of IDs to delete.
self._collection.delete(ids=ids) batch_size (int): Number of IDs per batch. Default is 1000.
"""
logger.info(f"begin delete chroma ids in batches (batch size: {batch_size})")
id_list = ids.split(",")
total = len(id_list)
logger.info(f"Total IDs to delete: {total}")
for i in range(0, total, batch_size):
batch_ids = id_list[i:i + batch_size]
logger.info(f"Deleting batch {i // batch_size + 1}: {len(batch_ids)} IDs")
try:
self._collection.delete(ids=batch_ids)
except Exception as e:
logger.error(f"Failed to delete batch starting at index {i}: {e}")
def truncate(self) -> List[str]: def truncate(self) -> List[str]:
"""Truncate data index_name.""" """Truncate data index_name."""