diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py
index 9a1a0b67a04..3a8d859379f 100644
--- a/libs/core/langchain_core/indexing/api.py
+++ b/libs/core/langchain_core/indexing/api.py
@@ -22,6 +22,7 @@ from pydantic import model_validator
 
 from langchain_core.document_loaders.base import BaseLoader
 from langchain_core.documents import Document
+from langchain_core.exceptions import LangChainException
 from langchain_core.indexing.base import DocumentIndex, RecordManager
 from langchain_core.vectorstores import VectorStore
 
@@ -175,6 +176,32 @@ def _deduplicate_in_order(
             yield hashed_doc
 
 
+class IndexingException(LangChainException):
+    """Raised when an indexing operation fails."""
+
+
+def _delete(
+    vector_store: Union[VectorStore, DocumentIndex],
+    ids: list[str],
+) -> None:
+    if isinstance(vector_store, VectorStore):
+        delete_ok = vector_store.delete(ids)
+        if delete_ok is not None and delete_ok is False:
+            msg = "The delete operation to VectorStore failed."
+            raise IndexingException(msg)
+    elif isinstance(vector_store, DocumentIndex):
+        delete_response = vector_store.delete(ids)
+        if "num_failed" in delete_response and delete_response["num_failed"] > 0:
+            msg = "The delete operation to DocumentIndex failed."
+            raise IndexingException(msg)
+    else:
+        msg = (
+            f"Vectorstore should be either a VectorStore or a DocumentIndex. "
+            f"Got {type(vector_store)}."
+        )
+        raise TypeError(msg)
+
+
 # PUBLIC API
 
 
@@ -441,7 +468,7 @@ def index(
             )
             if uids_to_delete:
                 # Then delete from vector store.
-                destination.delete(uids_to_delete)
+                _delete(destination, uids_to_delete)
                 # First delete from record store.
                 record_manager.delete_keys(uids_to_delete)
                 num_deleted += len(uids_to_delete)
@@ -454,7 +481,7 @@ def index(
             group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
         ):
             # First delete from record store.
-            destination.delete(uids_to_delete)
+            _delete(destination, uids_to_delete)
             # Then delete from record manager.
             record_manager.delete_keys(uids_to_delete)
             num_deleted += len(uids_to_delete)
@@ -474,6 +501,28 @@ async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
         yield item
 
 
+async def _adelete(
+    vector_store: Union[VectorStore, DocumentIndex],
+    ids: list[str],
+) -> None:
+    if isinstance(vector_store, VectorStore):
+        delete_ok = await vector_store.adelete(ids)
+        if delete_ok is not None and delete_ok is False:
+            msg = "The delete operation to VectorStore failed."
+            raise IndexingException(msg)
+    elif isinstance(vector_store, DocumentIndex):
+        delete_response = await vector_store.adelete(ids)
+        if "num_failed" in delete_response and delete_response["num_failed"] > 0:
+            msg = "The delete operation to DocumentIndex failed."
+            raise IndexingException(msg)
+    else:
+        msg = (
+            f"Vectorstore should be either a VectorStore or a DocumentIndex. "
+            f"Got {type(vector_store)}."
+        )
+        raise TypeError(msg)
+
+
 async def aindex(
     docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
     record_manager: RecordManager,
@@ -733,7 +782,7 @@ async def aindex(
             )
             if uids_to_delete:
                 # Then delete from vector store.
-                await destination.adelete(uids_to_delete)
+                await _adelete(destination, uids_to_delete)
                 # First delete from record store.
                 await record_manager.adelete_keys(uids_to_delete)
                 num_deleted += len(uids_to_delete)
@@ -746,7 +795,7 @@ async def aindex(
             group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
         ):
             # First delete from record store.
-            await destination.adelete(uids_to_delete)
+            await _adelete(destination, uids_to_delete)
             # Then delete from record manager.
             await record_manager.adelete_keys(uids_to_delete)
             num_deleted += len(uids_to_delete)
diff --git a/libs/core/tests/unit_tests/indexing/test_indexing.py b/libs/core/tests/unit_tests/indexing/test_indexing.py
index dd339b28985..fbe59013e83 100644
--- a/libs/core/tests/unit_tests/indexing/test_indexing.py
+++ b/libs/core/tests/unit_tests/indexing/test_indexing.py
@@ -13,7 +13,7 @@ from langchain_core.document_loaders.base import BaseLoader
 from langchain_core.documents import Document
 from langchain_core.embeddings import DeterministicFakeEmbedding
 from langchain_core.indexing import InMemoryRecordManager, aindex, index
-from langchain_core.indexing.api import _abatch, _HashedDocument
+from langchain_core.indexing.api import IndexingException, _abatch, _HashedDocument
 from langchain_core.indexing.in_memory import InMemoryDocumentIndex
 from langchain_core.vectorstores import InMemoryVectorStore, VectorStore
 
@@ -287,6 +287,164 @@ async def test_aindex_simple_delete_full(
     }
 
 
+def test_index_delete_full_recovery_after_deletion_failure(
+    record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
+) -> None:
+    """Test that full indexing recovers after a failed vector store deletion."""
+    loader = ToyLoader(
+        documents=[
+            Document(
+                page_content="This is a test document.",
+            ),
+            Document(
+                page_content="This is another document.",
+            ),
+        ]
+    )
+
+    with patch.object(
+        record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
+    ):
+        assert index(loader, record_manager, vector_store, cleanup="full") == {
+            "num_added": 2,
+            "num_deleted": 0,
+            "num_skipped": 0,
+            "num_updated": 0,
+        }
+
+    loader = ToyLoader(
+        documents=[
+            Document(
+                page_content="mutated document 1",
+            ),
+            Document(
+                page_content="This is another document.",  # <-- Same as original
+            ),
+        ]
+    )
+
+    with (
+        patch.object(
+            record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
+        ),
+        patch.object(vector_store, "delete", return_value=False),
+        pytest.raises(IndexingException),
+    ):
+        indexing_result = index(loader, record_manager, vector_store, cleanup="full")
+
+    # At this point, there should be 3 records in both the record manager
+    # and the vector store
+    doc_texts = {
+        # Ignoring type since doc should be in the store and not a None
+        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+        for uid in vector_store.store
+    }
+    assert doc_texts == {
+        "This is a test document.",
+        "mutated document 1",
+        "This is another document.",
+    }
+
+    with patch.object(
+        record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
+    ):
+        indexing_result = index(loader, record_manager, vector_store, cleanup="full")
+        doc_texts = {
+            # Ignoring type since doc should be in the store and not a None
+            vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+            for uid in vector_store.store
+        }
+        assert doc_texts == {"mutated document 1", "This is another document."}
+
+    assert indexing_result == {
+        "num_added": 0,
+        "num_deleted": 1,
+        "num_skipped": 2,
+        "num_updated": 0,
+    }
+
+
+async def test_aindex_delete_full_recovery_after_deletion_failure(
+    arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
+) -> None:
+    """Test that full async indexing recovers after a failed vector store deletion."""
+    loader = ToyLoader(
+        documents=[
+            Document(
+                page_content="This is a test document.",
+            ),
+            Document(
+                page_content="This is another document.",
+            ),
+        ]
+    )
+
+    with patch.object(
+        arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
+    ):
+        assert await aindex(loader, arecord_manager, vector_store, cleanup="full") == {
+            "num_added": 2,
+            "num_deleted": 0,
+            "num_skipped": 0,
+            "num_updated": 0,
+        }
+
+    loader = ToyLoader(
+        documents=[
+            Document(
+                page_content="mutated document 1",
+            ),
+            Document(
+                page_content="This is another document.",  # <-- Same as original
+            ),
+        ]
+    )
+
+    with (
+        patch.object(
+            arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
+        ),
+        patch.object(vector_store, "adelete", return_value=False),
+        pytest.raises(IndexingException),
+    ):
+        indexing_result = await aindex(
+            loader, arecord_manager, vector_store, cleanup="full"
+        )
+
+    # At this point, there should be 3 records in both the record manager
+    # and the vector store
+    doc_texts = {
+        # Ignoring type since doc should be in the store and not a None
+        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+        for uid in vector_store.store
+    }
+    assert doc_texts == {
+        "This is a test document.",
+        "mutated document 1",
+        "This is another document.",
+    }
+
+    with patch.object(
+        arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
+    ):
+        indexing_result = await aindex(
+            loader, arecord_manager, vector_store, cleanup="full"
+        )
+        doc_texts = {
+            # Ignoring type since doc should be in the store and not a None
+            vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+            for uid in vector_store.store
+        }
+        assert doc_texts == {"mutated document 1", "This is another document."}
+
+    assert indexing_result == {
+        "num_added": 0,
+        "num_deleted": 1,
+        "num_skipped": 2,
+        "num_updated": 0,
+    }
+
+
 def test_incremental_fails_with_bad_source_ids(
     record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
 ) -> None:
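The net effect of the diff: a delete that a `VectorStore` or `DocumentIndex` reports as failed now raises `IndexingException` before `record_manager.delete_keys` runs, so the record manager never forgets keys whose documents were not actually removed, and a rerun of `index()`/`aindex()` retries the cleanup. A minimal sketch of the calling pattern this enables, using only APIs visible in the diff; the namespace string and embedding size are illustrative, not part of the change:

```python
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.indexing.api import IndexingException
from langchain_core.vectorstores import InMemoryVectorStore

# Illustrative setup; any RecordManager / VectorStore pair behaves the same way.
record_manager = InMemoryRecordManager(namespace="demo")  # hypothetical namespace
record_manager.create_schema()
vector_store = InMemoryVectorStore(DeterministicFakeEmbedding(size=8))

docs = [Document(page_content="This is a test document.")]
try:
    # With cleanup="full", stale documents are deleted after the new batch is
    # written. If the store reports that delete as failed, IndexingException
    # is raised *before* the record manager drops the stale keys.
    index(docs, record_manager, vector_store, cleanup="full")
except IndexingException:
    # Both stores are still in sync; rerunning simply retries the cleanup,
    # as exercised by the recovery tests above.
    index(docs, record_manager, vector_store, cleanup="full")
```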