diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py
index 0462e0838ba..add07930624 100644
--- a/libs/core/langchain_core/indexing/api.py
+++ b/libs/core/langchain_core/indexing/api.py
@@ -27,7 +27,7 @@ from typing import (
 
 from langchain_core.document_loaders.base import BaseLoader
 from langchain_core.documents import Document
-from langchain_core.indexing.base import RecordManager
+from langchain_core.indexing.base import DocumentIndex, RecordManager
 from langchain_core.pydantic_v1 import root_validator
 from langchain_core.vectorstores import VectorStore
 
@@ -106,6 +106,7 @@ class _HashedDocument(Document):
     def to_document(self) -> Document:
         """Return a Document object."""
         return Document(
+            id=self.uid,
             page_content=self.page_content,
             metadata=self.metadata,
         )
@@ -195,7 +196,7 @@ class IndexingResult(TypedDict):
 def index(
     docs_source: Union[BaseLoader, Iterable[Document]],
     record_manager: RecordManager,
-    vector_store: VectorStore,
+    vector_store: Union[VectorStore, DocumentIndex],
     *,
     batch_size: int = 100,
     cleanup: Literal["incremental", "full", None] = None,
@@ -232,7 +233,7 @@ def index(
         docs_source: Data loader or iterable of documents to index.
         record_manager: Timestamped set to keep track of which documents were
             updated.
-        vector_store: Vector store to index the documents into.
+        vector_store: VectorStore or DocumentIndex to index the documents into.
         batch_size: Batch size to use when indexing. Default is 100.
         cleanup: How to handle clean up of documents. Default is None.
             - Incremental: Cleans up all documents that haven't been updated AND
@@ -274,19 +275,30 @@ def index(
     if cleanup == "incremental" and source_id_key is None:
         raise ValueError("Source id key is required when cleanup mode is incremental.")
 
-    # Check that the Vectorstore has required methods implemented
-    methods = ["delete", "add_documents"]
+    destination = vector_store  # Renaming internally for clarity
 
-    for method in methods:
-        if not hasattr(vector_store, method):
-            raise ValueError(
-                f"Vectorstore {vector_store} does not have required method {method}"
-            )
+    # If it's a vectorstore, let's check if it has the required methods.
+    if isinstance(destination, VectorStore):
+        # Check that the Vectorstore has required methods implemented
+        methods = ["delete", "add_documents"]
 
-    if type(vector_store).delete == VectorStore.delete:
-        # Checking if the vectorstore has overridden the default delete method
-        # implementation which just raises a NotImplementedError
-        raise ValueError("Vectorstore has not implemented the delete method")
+        for method in methods:
+            if not hasattr(destination, method):
+                raise ValueError(
+                    f"Vectorstore {destination} does not have required method {method}"
+                )
+
+        if type(destination).delete == VectorStore.delete:
+            # Checking if the vectorstore has overridden the default delete method
+            # implementation which just raises a NotImplementedError
+            raise ValueError("Vectorstore has not implemented the delete method")
+    elif isinstance(destination, DocumentIndex):
+        pass
+    else:
+        raise TypeError(
+            f"Vectorstore should be either a VectorStore or a DocumentIndex. "
+            f"Got {type(destination)}."
+        )
 
     if isinstance(docs_source, BaseLoader):
         try:
@@ -354,7 +366,13 @@ def index(
         # Be pessimistic and assume that all vector store write will fail.
         # First write to vector store
         if docs_to_index:
-            vector_store.add_documents(docs_to_index, ids=uids, batch_size=batch_size)
+            if isinstance(destination, VectorStore):
+                destination.add_documents(
+                    docs_to_index, ids=uids, batch_size=batch_size
+                )
+            elif isinstance(destination, DocumentIndex):
+                destination.upsert(docs_to_index)
+
             num_added += len(docs_to_index) - len(seen_docs)
             num_updated += len(seen_docs)
 
@@ -384,7 +402,7 @@ def index(
                 )
             if uids_to_delete:
-                # Then delete from vector store.
-                vector_store.delete(uids_to_delete)
-                # First delete from record store.
+                # First delete from the vector store / document index.
+                destination.delete(uids_to_delete)
+                # Then delete from the record manager.
                 record_manager.delete_keys(uids_to_delete)
                 num_deleted += len(uids_to_delete)
@@ -394,7 +412,7 @@ def index(
             before=index_start_dt, limit=cleanup_batch_size
         ):
-            # First delete from record store.
-            vector_store.delete(uids_to_delete)
+            # First delete from the vector store / document index.
+            destination.delete(uids_to_delete)
             # Then delete from record manager.
             record_manager.delete_keys(uids_to_delete)
             num_deleted += len(uids_to_delete)
@@ -417,7 +435,7 @@ async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
 async def aindex(
     docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
     record_manager: RecordManager,
-    vector_store: VectorStore,
+    vector_store: Union[VectorStore, DocumentIndex],
     *,
     batch_size: int = 100,
     cleanup: Literal["incremental", "full", None] = None,
@@ -446,7 +464,7 @@ async def aindex(
         docs_source: Data loader or iterable of documents to index.
         record_manager: Timestamped set to keep track of which documents were
             updated.
-        vector_store: Vector store to index the documents into.
+        vector_store: VectorStore or DocumentIndex to index the documents into.
         batch_size: Batch size to use when indexing. Default is 100.
         cleanup: How to handle clean up of documents. Default is None.
             - Incremental: Cleans up all documents that haven't been updated AND
@@ -488,20 +506,30 @@ async def aindex(
     if cleanup == "incremental" and source_id_key is None:
         raise ValueError("Source id key is required when cleanup mode is incremental.")
 
-    # Check that the Vectorstore has required methods implemented
-    methods = ["adelete", "aadd_documents"]
+    destination = vector_store  # Renaming internally for clarity
 
-    for method in methods:
-        if not hasattr(vector_store, method):
-            raise ValueError(
-                f"Vectorstore {vector_store} does not have required method {method}"
-            )
+    # If it's a vectorstore, let's check if it has the required methods.
+    if isinstance(destination, VectorStore):
+        # Check that the Vectorstore has required methods implemented
+        methods = ["adelete", "aadd_documents"]
 
-    if type(vector_store).adelete == VectorStore.adelete:
-        # Checking if the vectorstore has overridden the default delete method
-        # implementation which just raises a NotImplementedError
-        raise ValueError("Vectorstore has not implemented the delete method")
+        for method in methods:
+            if not hasattr(destination, method):
+                raise ValueError(
+                    f"Vectorstore {destination} does not have required method {method}"
+                )
+        if type(destination).adelete == VectorStore.adelete:
+            # Checking if the vectorstore has overridden the default delete method
+            # implementation which just raises a NotImplementedError
+            raise ValueError("Vectorstore has not implemented the delete method")
+    elif isinstance(destination, DocumentIndex):
+        pass
+    else:
+        raise TypeError(
+            f"Vectorstore should be either a VectorStore or a DocumentIndex. "
+            f"Got {type(destination)}."
+        )
 
     async_doc_iterator: AsyncIterator[Document]
     if isinstance(docs_source, BaseLoader):
         try:
@@ -577,9 +605,12 @@ async def aindex(
         # Be pessimistic and assume that all vector store write will fail.
         # First write to vector store
         if docs_to_index:
-            await vector_store.aadd_documents(
-                docs_to_index, ids=uids, batch_size=batch_size
-            )
+            if isinstance(destination, VectorStore):
+                await destination.aadd_documents(
+                    docs_to_index, ids=uids, batch_size=batch_size
+                )
+            elif isinstance(destination, DocumentIndex):
+                await destination.aupsert(docs_to_index)
             num_added += len(docs_to_index) - len(seen_docs)
             num_updated += len(seen_docs)
 
@@ -610,7 +641,7 @@ async def aindex(
                 )
             if uids_to_delete:
-                # Then delete from vector store.
-                await vector_store.adelete(uids_to_delete)
-                # First delete from record store.
+                # First delete from the vector store / document index.
+                await destination.adelete(uids_to_delete)
+                # Then delete from the record manager.
                 await record_manager.adelete_keys(uids_to_delete)
                 num_deleted += len(uids_to_delete)
@@ -620,7 +651,7 @@ async def aindex(
             before=index_start_dt, limit=cleanup_batch_size
         ):
-            # First delete from record store.
-            await vector_store.adelete(uids_to_delete)
+            # First delete from the vector store / document index.
+            await destination.adelete(uids_to_delete)
             # Then delete from record manager.
             await record_manager.adelete_keys(uids_to_delete)
             num_deleted += len(uids_to_delete)
diff --git a/libs/core/tests/unit_tests/indexing/test_indexing.py b/libs/core/tests/unit_tests/indexing/test_indexing.py
index a056f32fa2f..d4bd4e1ec2d 100644
--- a/libs/core/tests/unit_tests/indexing/test_indexing.py
+++ b/libs/core/tests/unit_tests/indexing/test_indexing.py
@@ -16,6 +16,7 @@ from langchain_core.documents import Document
 from langchain_core.embeddings import DeterministicFakeEmbedding
 from langchain_core.indexing import InMemoryRecordManager, aindex, index
 from langchain_core.indexing.api import _abatch, _HashedDocument
+from langchain_core.indexing.in_memory import InMemoryDocumentIndex
 from langchain_core.vectorstores import InMemoryVectorStore, VectorStore
 
 
@@ -1284,7 +1285,10 @@ def test_indexing_custom_batch_size(
         index(docs, record_manager, vector_store, batch_size=batch_size)
 
         args, kwargs = mock_add_documents.call_args
-        assert args == (docs,)
+        doc_with_id = Document(
+            id=ids[0], page_content="This is a test document.", metadata={"source": "1"}
+        )
+        assert args == ([doc_with_id],)
         assert kwargs == {"ids": ids, "batch_size": batch_size}
     finally:
         vector_store.add_documents = original  # type: ignore
@@ -1304,8 +1308,102 @@ async def test_aindexing_custom_batch_size(
 
     batch_size = 1
     mock_add_documents = AsyncMock()
+    doc_with_id = Document(
+        id=ids[0], page_content="This is a test document.", metadata={"source": "1"}
+    )
     vector_store.aadd_documents = mock_add_documents  # type: ignore
     await aindex(docs, arecord_manager, vector_store, batch_size=batch_size)
     args, kwargs = mock_add_documents.call_args
-    assert args == (docs,)
+    assert args == ([doc_with_id],)
     assert kwargs == {"ids": ids, "batch_size": batch_size}
+
+
+def test_index_into_document_index(record_manager: InMemoryRecordManager) -> None:
+    """Test indexing into a DocumentIndex."""
+    document_index = InMemoryDocumentIndex()
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": "1"},
+        ),
+        Document(
+            page_content="This is another document.",
+            metadata={"source": "2"},
+        ),
+    ]
+
+    assert index(docs, record_manager, document_index, cleanup="full") == {
+        "num_added": 2,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+    assert index(docs, record_manager, document_index, cleanup="full") == {
+        "num_added": 0,
"num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + assert index( + docs, record_manager, document_index, cleanup="full", force_update=True + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 2, + } + + assert index([], record_manager, document_index, cleanup="full") == { + "num_added": 0, + "num_deleted": 2, + "num_skipped": 0, + "num_updated": 0, + } + + +async def test_aindex_into_document_index( + arecord_manager: InMemoryRecordManager, +) -> None: + """Get an in memory index.""" + document_index = InMemoryDocumentIndex() + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + ] + + assert await aindex(docs, arecord_manager, document_index, cleanup="full") == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + assert await aindex(docs, arecord_manager, document_index, cleanup="full") == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + assert await aindex( + docs, arecord_manager, document_index, cleanup="full", force_update=True + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 2, + } + + assert await aindex([], arecord_manager, document_index, cleanup="full") == { + "num_added": 0, + "num_deleted": 2, + "num_skipped": 0, + "num_updated": 0, + } diff --git a/libs/langchain/tests/unit_tests/indexes/test_indexing.py b/libs/langchain/tests/unit_tests/indexes/test_indexing.py index 454b1b126df..fdbf13c38b5 100644 --- a/libs/langchain/tests/unit_tests/indexes/test_indexing.py +++ b/libs/langchain/tests/unit_tests/indexes/test_indexing.py @@ -1386,7 +1386,14 @@ def test_indexing_custom_batch_size( with patch.object(vector_store, "add_documents") as mock_add_documents: index(docs, record_manager, vector_store, batch_size=batch_size) args, kwargs = mock_add_documents.call_args - assert args == (docs,) + docs_with_id = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + id=ids[0], + ) + ] + assert args == (docs_with_id,) assert kwargs == {"ids": ids, "batch_size": batch_size} @@ -1407,5 +1414,12 @@ async def test_aindexing_custom_batch_size( with patch.object(vector_store, "aadd_documents") as mock_add_documents: await aindex(docs, arecord_manager, vector_store, batch_size=batch_size) args, kwargs = mock_add_documents.call_args - assert args == (docs,) + docs_with_id = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + id=ids[0], + ) + ] + assert args == (docs_with_id,) assert kwargs == {"ids": ids, "batch_size": batch_size}