diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py
index c310deb2415..75efe1f5659 100644
--- a/libs/core/langchain_core/indexing/api.py
+++ b/libs/core/langchain_core/indexing/api.py
@@ -408,18 +408,17 @@ def index(
 
             # mypy isn't good enough to determine that source ids cannot be None
             # here due to a check that's happening above, so we check again.
-            if any(source_id is None for source_id in source_ids):
-                msg = "Source ids cannot be if cleanup=='incremental'."
-                raise AssertionError(msg)
+            for source_id in source_ids:
+                if source_id is None:
+                    msg = "Source ids cannot be None here."
+                    raise AssertionError(msg)
 
-            indexed_source_ids = cast(
-                Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
-            )
+            _source_ids = cast(Sequence[str], source_ids)
 
             uids_to_delete = record_manager.list_keys(
-                group_ids=indexed_source_ids, before=index_start_dt
+                group_ids=_source_ids, before=index_start_dt
             )
-            if indexed_source_ids and uids_to_delete:
+            if uids_to_delete:
                 # Then delete from vector store.
                 destination.delete(uids_to_delete)
                 # First delete from record store.
@@ -669,18 +668,17 @@ async def aindex(
 
             # mypy isn't good enough to determine that source ids cannot be None
             # here due to a check that's happening above, so we check again.
-            if any(source_id is None for source_id in source_ids):
-                msg = "Source ids cannot be if cleanup=='incremental'."
-                raise AssertionError(msg)
+            for source_id in source_ids:
+                if source_id is None:
+                    msg = "Source ids cannot be None here."
+                    raise AssertionError(msg)
 
-            indexed_source_ids = cast(
-                Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
-            )
+            _source_ids = cast(Sequence[str], source_ids)
 
             uids_to_delete = await record_manager.alist_keys(
-                group_ids=indexed_source_ids, before=index_start_dt
+                group_ids=_source_ids, before=index_start_dt
             )
-            if indexed_source_ids and uids_to_delete:
+            if uids_to_delete:
                 # Then delete from vector store.
                 await destination.adelete(uids_to_delete)
                 # First delete from record store.
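Note on the api.py change above (illustrative commentary, not part of the patch): the old cleanup path re-derived `indexed_source_ids` from `docs_to_index`, i.e. only from documents that were actually upserted, and skipped deletion whenever that list was empty. As a result, re-indexing a source whose re-emitted documents were all unchanged never removed the documents that had disappeared from the loader. The patch reuses the already-validated per-batch `source_ids`, so stale keys for a source are deleted even when every document in the batch is skipped as unchanged. A side effect, reflected in the updated `test_incremental_indexing_with_batch_size` expectations below, is that with a small `batch_size` unchanged documents in a later batch of the same run can be deleted by an earlier batch's cleanup and then re-added. A rough standalone sketch of the new behavior, using the in-memory helpers that ship with `langchain_core` (the namespace, embedding size, and document texts are placeholders, not taken from the patch):

from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

record_manager = InMemoryRecordManager(namespace="demo")  # placeholder namespace
record_manager.create_schema()
vector_store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=8))

docs = [
    Document(page_content="This is a test document.", metadata={"source": "1"}),
    Document(page_content="This is another document.", metadata={"source": "1"}),
]
# First run: both documents from source "1" are added.
index(docs, record_manager, vector_store, cleanup="incremental", source_id_key="source")

# Second run re-emits only one, unchanged, document for the same source.
# With this patch the missing sibling is cleaned up (num_deleted == 1) instead
# of being left behind because nothing in the batch needed upserting.
result = index(
    [docs[1]], record_manager, vector_store, cleanup="incremental", source_id_key="source"
)
print(result)  # num_added=0, num_updated=0, num_skipped=1, num_deleted=1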
diff --git a/libs/core/tests/unit_tests/indexing/test_indexing.py b/libs/core/tests/unit_tests/indexing/test_indexing.py
index 287b6b49f66..4b5874b85b7 100644
--- a/libs/core/tests/unit_tests/indexing/test_indexing.py
+++ b/libs/core/tests/unit_tests/indexing/test_indexing.py
@@ -544,7 +544,7 @@ def test_incremental_delete(
     )
 
     with patch.object(
-        record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
+        record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
     ):
         assert index(
             loader,
@@ -630,6 +630,82 @@ def test_incremental_delete(
     }
 
 
+def test_incremental_delete_with_same_source(
+    record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
+) -> None:
+    """Test indexing with incremental deletion strategy."""
+    loader = ToyLoader(
+        documents=[
+            Document(
+                page_content="This is a test document.",
+                metadata={"source": "1"},
+            ),
+            Document(
+                page_content="This is another document.",
+                metadata={"source": "1"},
+            ),
+        ]
+    )
+
+    with patch.object(
+        record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
+    ):
+        assert index(
+            loader,
+            record_manager,
+            vector_store,
+            cleanup="incremental",
+            source_id_key="source",
+        ) == {
+            "num_added": 2,
+            "num_deleted": 0,
+            "num_skipped": 0,
+            "num_updated": 0,
+        }
+
+    doc_texts = {
+        # Ignoring type since doc should be in the store and not a None
+        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+        for uid in vector_store.store
+    }
+    assert doc_texts == {"This is another document.", "This is a test document."}
+
+    # Delete 1 document and leave 1 document unchanged
+    loader = ToyLoader(
+        documents=[
+            Document(
+                page_content="This is another document.",  # <-- Same as original
+                metadata={"source": "1"},
+            ),
+        ]
+    )
+
+    with patch.object(
+        record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
+    ):
+        assert index(
+            loader,
+            record_manager,
+            vector_store,
+            cleanup="incremental",
+            source_id_key="source",
+        ) == {
+            "num_added": 0,
+            "num_deleted": 1,
+            "num_skipped": 1,
+            "num_updated": 0,
+        }
+
+    doc_texts = {
+        # Ignoring type since doc should be in the store and not a None
+        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+        for uid in vector_store.store
+    }
+    assert doc_texts == {
+        "This is another document.",
+    }
+
+
 def test_incremental_indexing_with_batch_size(
     record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
 ) -> None:
@@ -690,9 +766,9 @@ def test_incremental_indexing_with_batch_size(
             source_id_key="source",
             batch_size=2,
         ) == {
-            "num_added": 0,
-            "num_deleted": 0,
-            "num_skipped": 4,
+            "num_added": 2,
+            "num_deleted": 2,
+            "num_skipped": 2,
             "num_updated": 0,
         }
 
@@ -704,149 +780,6 @@ def test_incremental_indexing_with_batch_size(
     assert doc_texts == {"1", "2", "3", "4"}
 
 
-def test_incremental_indexing_with_batch_size_with_optimization(
-    record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
-) -> None:
-    """Test case when batch_size < num of docs.
-
-    Here, we'll verify that an indexing optimization works as expected.
- """ - documents = [ - Document( - page_content="1", - metadata={"source": "1"}, - ), - Document( - page_content="2", - metadata={"source": "1"}, - ), - Document( - page_content="3", - metadata={"source": "1"}, - ), - Document( - page_content="4", - metadata={"source": "1"}, - ), - ] - - with patch.object( - record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() - ): - assert index( - documents, - record_manager, - vector_store, - cleanup="incremental", - source_id_key="source", - batch_size=2, - ) == { - "num_added": 4, - "num_deleted": 0, - "num_skipped": 0, - "num_updated": 0, - } - - doc_texts = { - # Ignoring type since doc should be in the store and not a None - vector_store.get_by_ids([uid])[0].page_content # type: ignore - for uid in vector_store.store - } - assert doc_texts == {"1", "2", "3", "4"} - - # Mutate content in first batch - doc_first_batch_mutation = [ - Document( - page_content="updated 1", - metadata={"source": "1"}, - ), - Document( - page_content="2", - metadata={"source": "1"}, - ), - Document( - page_content="3", - metadata={"source": "1"}, - ), - Document( - page_content="4", - metadata={"source": "1"}, - ), - ] - - with patch.object( - record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() - ): - assert index( - doc_first_batch_mutation, - record_manager, - vector_store, - cleanup="incremental", - source_id_key="source", - batch_size=2, - ) == { - # Cannot optimize here since the first batch was changed - # So only skpping indexing the document with content "2" - "num_added": 3, - "num_deleted": 3, - "num_skipped": 1, - "num_updated": 0, - } - - doc_texts = { - # Ignoring type since doc should be in the store and not a None - vector_store.get_by_ids([uid])[0].page_content # type: ignore - for uid in vector_store.store - } - assert doc_texts == {"updated 1", "2", "3", "4"} - - # Mutate content in second batch - doc_second_batch_mutation = [ - Document( - page_content="updated 1", # <-- This was already previously updated - metadata={"source": "1"}, - ), - Document( - page_content="2", - metadata={"source": "1"}, - ), - Document( - page_content="3", - metadata={"source": "1"}, - ), - Document( - page_content="updated 4", - metadata={"source": "1"}, - ), - ] - - with patch.object( - record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp() - ): - assert index( - doc_second_batch_mutation, - record_manager, - vector_store, - cleanup="incremental", - source_id_key="source", - batch_size=2, - ) == { - # Skips updating content from the first batch, only updates - # the `updated 4` content - "num_added": 1, - "num_deleted": 1, - "num_skipped": 3, - "num_updated": 0, - } - - doc_texts = { - # Ignoring type since doc should be in the store and not a None - vector_store.get_by_ids([uid])[0].page_content # type: ignore - for uid in vector_store.store - } - assert doc_texts == {"updated 1", "2", "3", "updated 4"} - - def test_incremental_delete_with_batch_size( record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore ) -> None: @@ -1588,149 +1521,6 @@ async def test_aindex_into_document_index( } -async def test_incremental_aindexing_with_batch_size_with_optimization( - arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore -) -> None: - """Test case when batch_size < num of docs. - - Here, we'll verify that an indexing optimization works as expected. 
- """ - documents = [ - Document( - page_content="1", - metadata={"source": "1"}, - ), - Document( - page_content="2", - metadata={"source": "1"}, - ), - Document( - page_content="3", - metadata={"source": "1"}, - ), - Document( - page_content="4", - metadata={"source": "1"}, - ), - ] - - with patch.object( - arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() - ): - assert await aindex( - documents, - arecord_manager, - vector_store, - cleanup="incremental", - source_id_key="source", - batch_size=2, - ) == { - "num_added": 4, - "num_deleted": 0, - "num_skipped": 0, - "num_updated": 0, - } - - doc_texts = { - # Ignoring type since doc should be in the store and not a None - vector_store.get_by_ids([uid])[0].page_content # type: ignore - for uid in vector_store.store - } - assert doc_texts == {"1", "2", "3", "4"} - - # Mutate content in first batch - doc_first_batch_mutation = [ - Document( - page_content="updated 1", - metadata={"source": "1"}, - ), - Document( - page_content="2", - metadata={"source": "1"}, - ), - Document( - page_content="3", - metadata={"source": "1"}, - ), - Document( - page_content="4", - metadata={"source": "1"}, - ), - ] - - with patch.object( - arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() - ): - assert await aindex( - doc_first_batch_mutation, - arecord_manager, - vector_store, - cleanup="incremental", - source_id_key="source", - batch_size=2, - ) == { - # Cannot optimize here since the first batch was changed - # So only skpping indexing the document with content "2" - "num_added": 3, - "num_deleted": 3, - "num_skipped": 1, - "num_updated": 0, - } - - doc_texts = { - # Ignoring type since doc should be in the store and not a None - vector_store.get_by_ids([uid])[0].page_content # type: ignore - for uid in vector_store.store - } - assert doc_texts == {"updated 1", "2", "3", "4"} - - # Mutate content in second batch - doc_second_batch_mutation = [ - Document( - page_content="updated 1", # <-- This was already previously updated - metadata={"source": "1"}, - ), - Document( - page_content="2", - metadata={"source": "1"}, - ), - Document( - page_content="3", - metadata={"source": "1"}, - ), - Document( - page_content="updated 4", - metadata={"source": "1"}, - ), - ] - - with patch.object( - arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp() - ): - assert await aindex( - doc_second_batch_mutation, - arecord_manager, - vector_store, - cleanup="incremental", - source_id_key="source", - batch_size=2, - ) == { - # Skips updating content from the first batch, only updates - # the `updated 4` content - "num_added": 1, - "num_deleted": 1, - "num_skipped": 3, - "num_updated": 0, - } - - doc_texts = { - # Ignoring type since doc should be in the store and not a None - vector_store.get_by_ids([uid])[0].page_content # type: ignore - for uid in vector_store.store - } - assert doc_texts == {"updated 1", "2", "3", "updated 4"} - - def test_index_with_upsert_kwargs( record_manager: InMemoryRecordManager, upserting_vector_store: InMemoryVectorStore ) -> None: