fix(VectorStore): fix task concurrency and batch processing issue (#2671)

- Refactor task execution to improve concurrency control
- Implement batch processing for vector deletion in Chroma store

# Description

1 ,When chunk a knowledge base file, if there are too many chunks ,
using asyncio.gather(*tasks) will cause the CPU to freeze
2 , When using the Chroma vector database, if the one-time deletion of
embedding documents ids exceeds 5461, it will fail

![0e64098babaa448ec139933fecf5776](https://github.com/user-attachments/assets/f9fa81c1-2914-44e6-8d31-a16d7717c6b3)


# How Has This Been Tested?

# Snapshots:

![ff0ca0d2b35be11dd19aa1983a196ba](https://github.com/user-attachments/assets/4db4202e-5c71-4c97-b075-465ea5419657)

![c8821b068b0c8b73757700d2ecaa23a](https://github.com/user-attachments/assets/b6bc28d0-9406-4b6b-92cf-6a9765288fd2)

Include snapshots for easier review.

# Checklist:

- [x] My code follows the style guidelines of this project
- [x] I have already rebased the commits and make the commit message
conform to the project standard.
- [x] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] Any dependent changes have been merged and published in downstream
modules
This commit is contained in:
alanchen 2025-05-12 16:25:06 +08:00 committed by GitHub
commit d83704d233
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 29 additions and 9 deletions

View File

@ -1,5 +1,6 @@
"""Index store base class."""
import asyncio
import logging
import time
from abc import ABC, abstractmethod
@ -182,9 +183,7 @@ class IndexStoreBase(ABC):
for chunk_group in chunk_groups:
tasks.append(self.aload_document(chunk_group))
import asyncio
results = await asyncio.gather(*tasks)
results = await self._run_tasks_with_concurrency(tasks, max_threads)
ids = []
loaded_cnt = 0
@ -195,6 +194,14 @@ class IndexStoreBase(ABC):
return ids
async def _run_tasks_with_concurrency(self, tasks, max_concurrent):
results = []
for i in range(0, len(tasks), max_concurrent):
batch = tasks[i : i + max_concurrent]
batch_results = await asyncio.gather(*batch, return_exceptions=True)
results.extend(batch_results)
return results
def similar_search(
self, text: str, topk: int, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:

View File

@ -278,12 +278,25 @@ class ChromaStore(VectorStoreBase):
logger.error(f"Error during vector store deletion: {e}")
raise
def delete_by_ids(self, ids):
"""Delete vector by ids."""
logger.info(f"begin delete chroma ids: {ids}")
ids = ids.split(",")
if len(ids) > 0:
self._collection.delete(ids=ids)
def delete_by_ids(self, ids, batch_size: int = 1000):
"""Delete vector by ids in batches.
Args:
ids (str): Comma-separated string of IDs to delete.
batch_size (int): Number of IDs per batch. Default is 1000.
"""
logger.info(f"begin delete chroma ids in batches (batch size: {batch_size})")
id_list = ids.split(",")
total = len(id_list)
logger.info(f"Total IDs to delete: {total}")
for i in range(0, total, batch_size):
batch_ids = id_list[i : i + batch_size]
logger.info(f"Deleting batch {i // batch_size + 1}: {len(batch_ids)} IDs")
try:
self._collection.delete(ids=batch_ids)
except Exception as e:
logger.error(f"Failed to delete batch starting at index {i}: {e}")
def truncate(self) -> List[str]:
"""Truncate data index_name."""