diff --git a/libs/langchain/langchain/indexes/_api.py b/libs/langchain/langchain/indexes/_api.py
index 29bf6dfb5ea..03a84499cff 100644
--- a/libs/langchain/langchain/indexes/_api.py
+++ b/libs/langchain/langchain/indexes/_api.py
@@ -195,6 +195,7 @@ def index(
     cleanup: Literal["incremental", "full", None] = None,
     source_id_key: Union[str, Callable[[Document], str], None] = None,
     cleanup_batch_size: int = 1_000,
+    force_update: bool = False,
 ) -> IndexingResult:
     """Index data from the loader into the vector store.
 
@@ -233,6 +234,8 @@ def index(
         source_id_key: Optional key that helps identify the original source
             of the document.
         cleanup_batch_size: Batch size to use when cleaning up documents.
+        force_update: Force update documents even if they are present in the
+            record manager. Useful if you are re-indexing with updated embeddings.
 
     Returns:
         Indexing result which contains information about how many documents
@@ -308,10 +311,14 @@ def index(
         uids = []
         docs_to_index = []
         uids_to_refresh = []
+        seen_docs: Set[str] = set()
         for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
             if doc_exists:
-                uids_to_refresh.append(hashed_doc.uid)
-                continue
+                if force_update:
+                    seen_docs.add(hashed_doc.uid)
+                else:
+                    uids_to_refresh.append(hashed_doc.uid)
+                    continue
             uids.append(hashed_doc.uid)
             docs_to_index.append(hashed_doc.to_document())
 
@@ -324,7 +331,8 @@ def index(
         # First write to vector store
         if docs_to_index:
             vector_store.add_documents(docs_to_index, ids=uids)
-            num_added += len(docs_to_index)
+            num_added += len(docs_to_index) - len(seen_docs)
+            num_updated += len(seen_docs)
 
         # And only then update the record store.
         # Update ALL records, even if they already exist since we want to refresh
@@ -391,6 +399,7 @@ async def aindex(
     cleanup: Literal["incremental", "full", None] = None,
     source_id_key: Union[str, Callable[[Document], str], None] = None,
     cleanup_batch_size: int = 1_000,
+    force_update: bool = False,
 ) -> IndexingResult:
     """Index data from the loader into the vector store.
 
@@ -429,6 +438,8 @@ async def aindex(
         source_id_key: Optional key that helps identify the original source
             of the document.
         cleanup_batch_size: Batch size to use when cleaning up documents.
+        force_update: Force update documents even if they are present in the
+            record manager. Useful if you are re-indexing with updated embeddings.
 
     Returns:
         Indexing result which contains information about how many documents
@@ -508,11 +519,14 @@ async def aindex(
         uids: list[str] = []
         docs_to_index: list[Document] = []
         uids_to_refresh = []
-
+        seen_docs: Set[str] = set()
         for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
             if doc_exists:
-                uids_to_refresh.append(hashed_doc.uid)
-                continue
+                if force_update:
+                    seen_docs.add(hashed_doc.uid)
+                else:
+                    uids_to_refresh.append(hashed_doc.uid)
+                    continue
             uids.append(hashed_doc.uid)
             docs_to_index.append(hashed_doc.to_document())
 
@@ -525,7 +539,8 @@ async def aindex(
         # First write to vector store
         if docs_to_index:
             await vector_store.aadd_documents(docs_to_index, ids=uids)
-            num_added += len(docs_to_index)
+            num_added += len(docs_to_index) - len(seen_docs)
+            num_updated += len(seen_docs)
 
         # And only then update the record store.
         # Update ALL records, even if they already exist since we want to refresh
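For reference, a minimal usage sketch of the new flag (not part of the diff). It assumes `vector_store` is any VectorStore whose `add_documents` upserts when called with existing ids, for example the `InMemoryVectorStore(permit_upserts=True)` helper added in the test diff below; the namespace and SQLite URL are illustrative.

    from langchain.indexes import SQLRecordManager, index
    from langchain.schema import Document

    # Illustrative namespace and throwaway SQLite URL.
    record_manager = SQLRecordManager("test_index", db_url="sqlite:///:memory:")
    record_manager.create_schema()

    docs = [Document(page_content="hello", metadata={"source": "1"})]

    # First run: the record manager has no entries, so the document is added.
    index(docs, record_manager, vector_store, cleanup="full")
    # -> {'num_added': 1, 'num_deleted': 0, 'num_skipped': 0, 'num_updated': 0}

    # Re-run with force_update=True: the document is already recorded, but it
    # is re-written to the vector store (e.g. to pick up new embeddings) and
    # counted as updated rather than skipped.
    index(docs, record_manager, vector_store, cleanup="full", force_update=True)
    # -> {'num_added': 0, 'num_deleted': 0, 'num_skipped': 0, 'num_updated': 1}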
diff --git a/libs/langchain/tests/unit_tests/indexes/test_indexing.py b/libs/langchain/tests/unit_tests/indexes/test_indexing.py
index 198e0178255..884654f38e6 100644
--- a/libs/langchain/tests/unit_tests/indexes/test_indexing.py
+++ b/libs/langchain/tests/unit_tests/indexes/test_indexing.py
@@ -58,9 +58,10 @@ class ToyLoader(BaseLoader):
 class InMemoryVectorStore(VectorStore):
     """In-memory implementation of VectorStore using a dictionary."""
 
-    def __init__(self) -> None:
+    def __init__(self, permit_upserts: bool = False) -> None:
         """Vector store interface for testing things in memory."""
         self.store: Dict[str, Document] = {}
+        self.permit_upserts = permit_upserts
 
     def delete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
         """Delete the given documents from the store using their IDs."""
@@ -91,7 +92,7 @@ class InMemoryVectorStore(VectorStore):
             raise NotImplementedError("This is not implemented yet.")
 
         for _id, document in zip(ids, documents):
-            if _id in self.store:
+            if _id in self.store and not self.permit_upserts:
                 raise ValueError(
                     f"Document with uid {_id} already exists in the store."
                 )
@@ -115,7 +116,7 @@ class InMemoryVectorStore(VectorStore):
             raise NotImplementedError("This is not implemented yet.")
 
         for _id, document in zip(ids, documents):
-            if _id in self.store:
+            if _id in self.store and not self.permit_upserts:
                 raise ValueError(
                     f"Document with uid {_id} already exists in the store."
                 )
@@ -176,6 +177,12 @@ def vector_store() -> InMemoryVectorStore:
     return InMemoryVectorStore()
 
 
+@pytest.fixture
+def upserting_vector_store() -> InMemoryVectorStore:
+    """Vector store fixture."""
+    return InMemoryVectorStore(permit_upserts=True)
+
+
 def test_indexing_same_content(
     record_manager: SQLRecordManager, vector_store: InMemoryVectorStore
 ) -> None:
@@ -1074,6 +1081,101 @@ async def test_abatch() -> None:
     assert [batch async for batch in batches] == [[0, 1], [2, 3], [4]]
 
 
+def test_indexing_force_update(
+    record_manager: SQLRecordManager, upserting_vector_store: VectorStore
+) -> None:
+    """Test indexing with force update."""
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": "1"},
+        ),
+        Document(
+            page_content="This is another document.",
+            metadata={"source": "2"},
+        ),
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": "1"},
+        ),
+    ]
+
+    assert index(docs, record_manager, upserting_vector_store, cleanup="full") == {
+        "num_added": 2,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+    assert index(docs, record_manager, upserting_vector_store, cleanup="full") == {
+        "num_added": 0,
+        "num_deleted": 0,
+        "num_skipped": 2,
+        "num_updated": 0,
+    }
+
+    assert index(
+        docs, record_manager, upserting_vector_store, cleanup="full", force_update=True
+    ) == {
+        "num_added": 0,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 2,
+    }
+
+
+@pytest.mark.requires("aiosqlite")
+async def test_aindexing_force_update(
+    arecord_manager: SQLRecordManager, upserting_vector_store: VectorStore
+) -> None:
+    """Test indexing with force update."""
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": "1"},
+        ),
+        Document(
+            page_content="This is another document.",
+            metadata={"source": "2"},
+        ),
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": "1"},
+        ),
+    ]
+
+    assert await aindex(
+        docs, arecord_manager, upserting_vector_store, cleanup="full"
+    ) == {
+        "num_added": 2,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+    assert await aindex(
+        docs, arecord_manager, upserting_vector_store, cleanup="full"
+    ) == {
+        "num_added": 0,
+        "num_deleted": 0,
+        "num_skipped": 2,
+        "num_updated": 0,
+    }
+
+    assert await aindex(
+        docs,
+        arecord_manager,
+        upserting_vector_store,
+        cleanup="full",
+        force_update=True,
+    ) == {
+        "num_added": 0,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 2,
+    }
+
+
 def test_compatible_vectorstore_documentation() -> None:
     """Test which vectorstores are compatible with the indexing API.