diff --git a/libs/core/langchain_core/indexing/__init__.py b/libs/core/langchain_core/indexing/__init__.py index 58e492e083c..11933effd0b 100644 --- a/libs/core/langchain_core/indexing/__init__.py +++ b/libs/core/langchain_core/indexing/__init__.py @@ -7,6 +7,7 @@ if it's unchanged. from langchain_core.indexing.api import IndexingResult, aindex, index from langchain_core.indexing.base import ( + AsyncDocumentIndexer, DocumentIndexer, InMemoryRecordManager, RecordManager, @@ -16,6 +17,7 @@ from langchain_core.indexing.base import ( __all__ = [ "aindex", "DocumentIndexer", + "AsyncDocumentIndexer", "index", "IndexingResult", "InMemoryRecordManager", diff --git a/libs/core/langchain_core/indexing/base.py b/libs/core/langchain_core/indexing/base.py index 70c10e10209..744685b40da 100644 --- a/libs/core/langchain_core/indexing/base.py +++ b/libs/core/langchain_core/indexing/base.py @@ -122,7 +122,73 @@ class DocumentIndexer(abc.ABC): .. versionadded:: ___version___ """ - async def aupsert( + @abc.abstractmethod + def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse: + """Delete by IDs or other criteria. + + Calling delete without any input parameters should raise a ValueError! + + Args: + ids: List of ids to delete. + kwargs: Additional keyword arguments. This is up to the implementation. + For example, can include an option to delete the entire index, + or else issue a non blocking delete etc. + + Returns: + DeleteResponse: A response object that contains the list of IDs that were + successfully deleted and the list of IDs that failed to be deleted. + """ + + @abc.abstractmethod + def get( + self, + ids: Sequence[str], + /, + **kwargs: Any, + ) -> List[Document]: + """Get documents by id. + + Fewer documents may be returned than requested if some IDs are not found or + if there are duplicated IDs. + + Users should not assume that the order of the returned documents matches + the order of the input IDs. Instead, users should rely on the ID field of the + returned documents. + + This method should **NOT** raise exceptions if no documents are found for + some IDs. + + Args: + ids: List of IDs to get. + kwargs: Additional keyword arguments. These are up to the implementation. + + Returns: + List[Document]: List of documents that were found. + + .. versionadded:: ___version___ + """ + + +@beta(message="Added in ___version___. The API is subject to change.") +class AsyncDocumentIndexer(abc.ABC): + """An abstraction for indexing documents. Async Variant. + + This indexing interface is designed to be a generic abstraction for storing and + querying documents that has an ID and metadata associated with it. + + The interface is designed to be agnostic to the underlying implementation of the + indexing system. + + The interface is designed to support the following operations: + + 1. Storing content in the index. + 2. Retrieving content by ID. + + .. versionadded:: ___version___ + """ + + @abc.abstractmethod + async def upsert( self, items: Sequence[Document], /, **kwargs: Any ) -> UpsertResponse: """Add or update documents in the vectorstore. Async version of upsert. @@ -154,23 +220,7 @@ class DocumentIndexer(abc.ABC): ) @abc.abstractmethod - def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse: - """Delete by IDs or other criteria. - - Calling delete without any input parameters should raise a ValueError! - - Args: - ids: List of ids to delete. - kwargs: Additional keyword arguments. This is up to the implementation. - For example, can include an option to delete the entire index, - or else issue a non blocking delete etc. - - Returns: - DeleteResponse: A response object that contains the list of IDs that were - successfully deleted and the list of IDs that failed to be deleted. - """ - - async def adelete( + async def delete( self, ids: Optional[List[str]] = None, **kwargs: Any ) -> DeleteResponse: """Delete by IDs or other criteria. Async variant. @@ -186,15 +236,9 @@ class DocumentIndexer(abc.ABC): DeleteResponse: A response object that contains the list of IDs that were successfully deleted and the list of IDs that failed to be deleted. """ - return await run_in_executor( - None, - self.delete, - ids, - **kwargs, - ) @abc.abstractmethod - def get( + async def get( self, ids: Sequence[str], /, @@ -222,40 +266,6 @@ class DocumentIndexer(abc.ABC): .. versionadded:: ___version___ """ - async def aget( - self, - ids: Sequence[str], - /, - **kwargs: Any, - ) -> List[Document]: - """Get documents by id. - - Fewer documents may be returned than requested if some IDs are not found or - if there are duplicated IDs. - - Users should not assume that the order of the returned documents matches - the order of the input IDs. Instead, users should rely on the ID field of the - returned documents. - - This method should **NOT** raise exceptions if no documents are found for - some IDs. - - Args: - ids: List of IDs to get. - kwargs: Additional keyword arguments. These are up to the implementation. - - Returns: - List[Document]: List of documents that were found. - - .. versionadded:: ___version___ - """ - return await run_in_executor( - None, - self.get, - ids, - **kwargs, - ) - class RecordManager(ABC): """Abstract base class representing the interface for a record manager. diff --git a/libs/core/langchain_core/indexing/in_memory.py b/libs/core/langchain_core/indexing/in_memory.py index 757535948e0..15a2008f70b 100644 --- a/libs/core/langchain_core/indexing/in_memory.py +++ b/libs/core/langchain_core/indexing/in_memory.py @@ -6,7 +6,7 @@ from langchain_core.indexing import UpsertResponse from langchain_core.indexing.base import DeleteResponse, DocumentIndexer -class InMemoryIndexer(DocumentIndexer): +class InMemoryDocumentIndexer(DocumentIndexer): """In memory sync indexer.""" def __init__(self, *, store: Optional[Dict[str, Document]] = None) -> None: @@ -55,3 +55,27 @@ class InMemoryIndexer(DocumentIndexer): found_documents.append(self.store[id_]) return found_documents + + +class AsyncInMemoryDocumentIndexer(DocumentIndexer): + """An in memory async indexer implementation.""" + + def __init__(self, *, store: Optional[Dict[str, Document]] = None) -> None: + """An in memory implementation of a document indexer.""" + self.indexer = InMemoryDocumentIndexer(store=store) + + async def upsert( + self, items: Sequence[Document], /, **kwargs: Any + ) -> UpsertResponse: + """Upsert items into the indexer.""" + return self.indexer.upsert(items, **kwargs) + + async def delete( + self, ids: Optional[List[str]] = None, **kwargs: Any + ) -> DeleteResponse: + """Delete by ID.""" + return self.indexer.delete(ids, **kwargs) + + async def get(self, ids: Sequence[str], /, **kwargs: Any) -> List[Document]: + """Get by ids.""" + return self.indexer.get(ids, **kwargs) diff --git a/libs/core/tests/unit_tests/indexing/test_in_memory_indexer.py b/libs/core/tests/unit_tests/indexing/test_in_memory_indexer.py index 8068f108207..4f6ceadb162 100644 --- a/libs/core/tests/unit_tests/indexing/test_in_memory_indexer.py +++ b/libs/core/tests/unit_tests/indexing/test_in_memory_indexer.py @@ -4,14 +4,24 @@ from typing import Generator import pytest from langchain_standard_tests.integration_tests.indexer import ( - BaseDocumentIndexerTestSuite, + AsyncDocumentIndexerTestSuite, + DocumentIndexerTestSuite, ) -from langchain_core.indexing import DocumentIndexer -from langchain_core.indexing.in_memory import InMemoryIndexer +from langchain_core.indexing import AsyncDocumentIndexer, DocumentIndexer +from langchain_core.indexing.in_memory import ( + AsyncInMemoryDocumentIndexer, + InMemoryDocumentIndexer, +) -class TestDocumentIndexerTestSuite(BaseDocumentIndexerTestSuite): +class TestDocumentIndexerTestSuite(DocumentIndexerTestSuite): @pytest.fixture() def indexer(self) -> Generator[DocumentIndexer, None, None]: - return InMemoryIndexer() + yield InMemoryDocumentIndexer() + + +class TestAsyncDocumentIndexerTestSuite(AsyncDocumentIndexerTestSuite): + @pytest.fixture() + async def indexer(self) -> Generator[AsyncDocumentIndexer, None, None]: + yield AsyncInMemoryDocumentIndexer() diff --git a/libs/standard-tests/Makefile b/libs/standard-tests/Makefile index f55907f9544..070e83e1ead 100644 --- a/libs/standard-tests/Makefile +++ b/libs/standard-tests/Makefile @@ -30,14 +30,14 @@ lint_tests: PYTHON_FILES=tests lint_tests: MYPY_CACHE=.mypy_cache_test lint lint_diff lint_package lint_tests: - poetry run ruff . + poetry run ruff check . poetry run ruff format $(PYTHON_FILES) --diff - poetry run ruff --select I $(PYTHON_FILES) + poetry run ruff check --select I $(PYTHON_FILES) mkdir $(MYPY_CACHE); poetry run mypy $(PYTHON_FILES) --cache-dir $(MYPY_CACHE) format format_diff: poetry run ruff format $(PYTHON_FILES) - poetry run ruff --select I --fix $(PYTHON_FILES) + poetry run ruff check --select I --fix $(PYTHON_FILES) spell_check: poetry run codespell --toml pyproject.toml diff --git a/libs/standard-tests/langchain_standard_tests/integration_tests/indexer.py b/libs/standard-tests/langchain_standard_tests/integration_tests/indexer.py index 275b3315a82..7fed3f69c6b 100644 --- a/libs/standard-tests/langchain_standard_tests/integration_tests/indexer.py +++ b/libs/standard-tests/langchain_standard_tests/integration_tests/indexer.py @@ -1,19 +1,16 @@ """Test suite to check indexer implementations.""" + import inspect import uuid from abc import ABC, abstractmethod -from typing import Generator +from typing import AsyncGenerator, Generator import pytest from langchain_core.documents import Document from langchain_core.indexing import DocumentIndexer -# Arbitrarily chosen. Using a small embedding size -# so tests are faster and easier to debug. -EMBEDDING_SIZE = 6 - -class BaseDocumentIndexerTestSuite(ABC): +class DocumentIndexerTestSuite(ABC): """Test suite for checking the read-write of a document indexer. Implementers should subclass this test suite and provide a fixture @@ -172,6 +169,8 @@ class BaseDocumentIndexerTestSuite(ABC): def test_delete_no_args(self, indexer: DocumentIndexer) -> None: """Test delete with no args raises ValueError.""" + with pytest.raises(ValueError): + indexer.delete() def test_delete_missing_content(self, indexer: DocumentIndexer) -> None: """Deleting missing content should not raise an exception.""" @@ -201,3 +200,195 @@ class BaseDocumentIndexerTestSuite(ABC): # This should not raise an exception documents = indexer.get(["1", "2", "3"]) assert documents == [] + + +class AsyncDocumentIndexerTestSuite(ABC): + """Test suite for checking the read-write of a document indexer. + + Implementers should subclass this test suite and provide a fixture + that returns an empty indexer for each test. + """ + + @abstractmethod + @pytest.fixture + async def indexer(self) -> AsyncGenerator[DocumentIndexer, None]: + """Get the indexer.""" + + async def test_upsert_documents_has_no_ids(self, indexer: DocumentIndexer) -> None: + """Verify that there is not parameter called ids in upsert""" + signature = inspect.signature(indexer.upsert) + assert "ids" not in signature.parameters + + async def test_upsert_no_ids(self, indexer: DocumentIndexer) -> None: + """Upsert works with documents that do not have IDs. + + At the moment, the ID field in documents is optional. + """ + documents = [ + Document(page_content="foo", metadata={"id": 1}), + Document(page_content="bar", metadata={"id": 2}), + ] + response = await indexer.upsert(documents) + ids = sorted(response["succeeded"]) + + # Ordering is not guaranteed, need to test carefully + documents = await indexer.get(ids) + sorted_documents = sorted(documents, key=lambda x: x.id) + + if sorted_documents[0].page_content == "bar": + assert sorted_documents[0] == Document( + page_content="bar", metadata={"id": 2}, id=ids[0] + ) + assert sorted_documents[1] == Document( + page_content="foo", metadata={"id": 1}, id=ids[1] + ) + else: + assert sorted_documents[0] == Document( + page_content="foo", metadata={"id": 1}, id=ids[0] + ) + assert sorted_documents[1] == Document( + page_content="bar", metadata={"id": 2}, id=ids[1] + ) + + async def test_upsert_some_ids(self, indexer: DocumentIndexer) -> None: + """Test an upsert where some docs have ids and some dont.""" + foo_uuid = str(uuid.UUID(int=7)) + documents = [ + Document(id=foo_uuid, page_content="foo", metadata={"id": 1}), + Document(page_content="bar", metadata={"id": 2}), + ] + response = await indexer.upsert(documents) + ids = response["succeeded"] + other_id = list(set(ids) - {foo_uuid})[0] + assert response["failed"] == [] + assert foo_uuid in ids + # Ordering is not guaranteed, so we use a set. + documents = await indexer.get(ids) + first_doc = documents[0] + if first_doc.id == foo_uuid: + assert documents == [ + Document(page_content="foo", metadata={"id": 1}, id=foo_uuid), + Document(page_content="bar", metadata={"id": 2}, id=other_id), + ] + else: + assert documents == [ + Document(page_content="bar", metadata={"id": 2}, id=other_id), + Document(page_content="foo", metadata={"id": 1}, id=foo_uuid), + ] + + async def test_upsert_overwrites(self, indexer: DocumentIndexer) -> None: + """Test that upsert overwrites existing content.""" + foo_uuid = str(uuid.UUID(int=7)) + documents = [ + Document(id=foo_uuid, page_content="foo", metadata={"bar": 1}), + ] + response = await indexer.upsert(documents) + ids = response["succeeded"] + assert response["failed"] == [] + + assert await indexer.get(ids) == [ + Document(page_content="foo", metadata={"bar": 1}, id=foo_uuid), + ] + + # Now let's overwrite foo + await indexer.upsert( + [Document(id=foo_uuid, page_content="foo2", metadata={"meow": 2})] + ) + documents = await indexer.get([foo_uuid]) + assert documents == [ + Document(page_content="foo2", metadata={"meow": 2}, id=foo_uuid) + ] + + async def test_delete_missing_docs(self, indexer: DocumentIndexer) -> None: + """Verify that we can delete docs that aren't there.""" + assert await indexer.get(["1"]) == [] # Should be empty. + + delete_response = await indexer.delete(["1"]) + if "num_deleted" in delete_response: + assert delete_response["num_deleted"] == 0 + + if "num_failed" in delete_response: + # Deleting a missing an ID is **not** failure!! + assert delete_response["num_failed"] == 0 + + if "succeeded" in delete_response: + # There was nothing to delete! + assert delete_response["succeeded"] == [] + + if "failed" in delete_response: + # Nothing should have failed + assert delete_response["failed"] == [] + + async def test_delete_semantics(self, indexer: DocumentIndexer) -> None: + """Test deletion of content has appropriate semantics.""" + # Let's index a document first. + foo_uuid = str(uuid.UUID(int=7)) + upsert_response = await indexer.upsert( + [Document(id=foo_uuid, page_content="foo", metadata={})] + ) + assert upsert_response == {"succeeded": [foo_uuid], "failed": []} + + delete_response = await indexer.delete(["missing_id", foo_uuid]) + + if "num_deleted" in delete_response: + assert delete_response["num_deleted"] == 1 + + if "num_failed" in delete_response: + # Deleting a missing an ID is **not** failure!! + assert delete_response["num_failed"] == 0 + + if "succeeded" in delete_response: + # There was nothing to delete! + assert delete_response["succeeded"] == [foo_uuid] + + if "failed" in delete_response: + # Nothing should have failed + assert delete_response["failed"] == [] + + async def test_bulk_delete(self, indexer: DocumentIndexer) -> None: + """Test that we can delete several documents at once.""" + documents = [ + Document(id="1", page_content="foo", metadata={"id": 1}), + Document(id="2", page_content="bar", metadata={"id": 2}), + Document(id="3", page_content="baz", metadata={"id": 3}), + ] + + await indexer.upsert(documents) + await indexer.delete(["1", "2"]) + assert await indexer.get(["1", "2", "3"]) == [ + Document(page_content="baz", metadata={"id": 3}, id="3") + ] + + async def test_delete_no_args(self, indexer: DocumentIndexer) -> None: + """Test delete with no args raises ValueError.""" + with pytest.raises(ValueError): + await indexer.delete() + + async def test_delete_missing_content(self, indexer: DocumentIndexer) -> None: + """Deleting missing content should not raise an exception.""" + await indexer.delete(["1"]) + await indexer.delete(["1", "2", "3"]) + + async def test_get_with_missing_ids(self, indexer: DocumentIndexer) -> None: + """Test get with missing IDs.""" + documents = [ + Document(id="1", page_content="foo", metadata={"id": 1}), + Document(id="2", page_content="bar", metadata={"id": 2}), + ] + upsert_response = await indexer.upsert(documents) + assert upsert_response == { + "succeeded": ["1", "2"], + "failed": [], + } + retrieved_documents = await indexer.get(["1", "2", "3", "4"]) + # The ordering is not guaranteed, so we use a set. + assert sorted(retrieved_documents, key=lambda x: x.id) == [ + Document(page_content="foo", metadata={"id": 1}, id="1"), + Document(page_content="bar", metadata={"id": 2}, id="2"), + ] + + async def test_get_missing(self, indexer: DocumentIndexer) -> None: + """Test get by IDs with missing IDs.""" + # This should not raise an exception + documents = await indexer.get(["1", "2", "3"]) + assert documents == []