diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py
index 084fe9b8907..66c4343bb65 100644
--- a/libs/core/langchain_core/indexing/api.py
+++ b/libs/core/langchain_core/indexing/api.py
@@ -466,10 +466,9 @@ def index(
 
             _source_ids = cast("Sequence[str]", source_ids)
 
-            uids_to_delete = record_manager.list_keys(
-                group_ids=_source_ids, before=index_start_dt
-            )
-            if uids_to_delete:
+            while uids_to_delete := record_manager.list_keys(
+                group_ids=_source_ids, before=index_start_dt, limit=cleanup_batch_size
+            ):
                 # Then delete from vector store.
                 _delete(destination, uids_to_delete)
                 # First delete from record store.
@@ -780,10 +779,9 @@ async def aindex(
 
             _source_ids = cast("Sequence[str]", source_ids)
 
-            uids_to_delete = await record_manager.alist_keys(
-                group_ids=_source_ids, before=index_start_dt
-            )
-            if uids_to_delete:
+            while uids_to_delete := await record_manager.alist_keys(
+                group_ids=_source_ids, before=index_start_dt, limit=cleanup_batch_size
+            ):
                 # Then delete from vector store.
                 await _adelete(destination, uids_to_delete)
                 # First delete from record store.
diff --git a/libs/core/tests/unit_tests/indexing/test_indexing.py b/libs/core/tests/unit_tests/indexing/test_indexing.py
index f3a9632eef0..78500719228 100644
--- a/libs/core/tests/unit_tests/indexing/test_indexing.py
+++ b/libs/core/tests/unit_tests/indexing/test_indexing.py
@@ -1882,7 +1882,7 @@ async def test_adeduplication(
     }
 
 
-def test_cleanup_with_different_batchsize(
+def test_full_cleanup_with_different_batchsize(
     record_manager: InMemoryRecordManager, vector_store: VectorStore
 ) -> None:
     """Check that we can clean up with different batch size."""
@@ -1919,7 +1919,56 @@
     }
 
 
-async def test_async_cleanup_with_different_batchsize(
+def test_incremental_cleanup_with_different_batchsize(
+    record_manager: InMemoryRecordManager, vector_store: VectorStore
+) -> None:
+    """Check that we can clean up with different batch size."""
+
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": str(d)},
+        )
+        for d in range(1000)
+    ]
+
+    assert index(
+        docs,
+        record_manager,
+        vector_store,
+        source_id_key="source",
+        cleanup="incremental",
+    ) == {
+        "num_added": 1000,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+    docs = [
+        Document(
+            page_content="Different doc",
+            metadata={"source": str(d)},
+        )
+        for d in range(1001)
+    ]
+
+    assert index(
+        docs,
+        record_manager,
+        vector_store,
+        source_id_key="source",
+        cleanup="incremental",
+        cleanup_batch_size=17,
+    ) == {
+        "num_added": 1001,
+        "num_deleted": 1000,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+
+async def test_afull_cleanup_with_different_batchsize(
     arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
 ) -> None:
     """Check that we can clean up with different batch size."""
@@ -1956,6 +2005,54 @@
     }
 
 
+async def test_aincremental_cleanup_with_different_batchsize(
+    arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
+) -> None:
+    """Check that we can clean up with different batch size."""
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": str(d)},
+        )
+        for d in range(1000)
+    ]
+
+    assert await aindex(
+        docs,
+        arecord_manager,
+        vector_store,
+        source_id_key="source",
+        cleanup="incremental",
+    ) == {
+        "num_added": 1000,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+    docs = [
+        Document(
+            page_content="Different doc",
+            metadata={"source": str(d)},
+        )
+        for d in range(1001)
+    ]
+
+    assert await aindex(
+        docs,
+        arecord_manager,
+        vector_store,
+        cleanup="incremental",
+        source_id_key="source",
+        cleanup_batch_size=17,
+    ) == {
+        "num_added": 1001,
+        "num_deleted": 1000,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+
 def test_deduplication_v2(
     record_manager: InMemoryRecordManager, vector_store: VectorStore
 ) -> None:
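Note on the api.py hunks: the `while uids_to_delete := ...` loop drains stale record keys in fixed-size batches instead of fetching them all in one unbounded `list_keys` call, so incremental cleanup now honors `cleanup_batch_size` the same way full cleanup already did. Below is a minimal standalone sketch of that loop pattern; `FakeRecordManager` is a hypothetical stand-in for illustration only, while the real code calls `RecordManager.list_keys`/`delete_keys` with the `group_ids` and `before` filters shown in the diff.

class FakeRecordManager:
    """Hypothetical stand-in for RecordManager, illustration only."""

    def __init__(self, keys: list[str]) -> None:
        self._keys = keys

    def list_keys(self, *, limit: int) -> list[str]:
        # Return at most `limit` keys that still need cleanup.
        return self._keys[:limit]

    def delete_keys(self, keys: list[str]) -> None:
        to_drop = set(keys)
        self._keys = [k for k in self._keys if k not in to_drop]


manager = FakeRecordManager([f"uid-{i}" for i in range(1000)])
num_deleted = 0

# The walrus loop fetches fixed-size batches until list_keys returns an
# empty (falsy) list, keeping memory bounded no matter how many stale
# records exist -- unlike the previous single unbatched call.
while uids_to_delete := manager.list_keys(limit=17):
    manager.delete_keys(uids_to_delete)
    num_deleted += len(uids_to_delete)

assert num_deleted == 1000

The loop terminates because each `delete_keys` call removes exactly the keys just listed, so `list_keys` eventually returns an empty list and the condition goes falsy; the new tests exercise this with 1000 stale source ids and a deliberately non-divisible batch size of 17.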