From 6f08e11d7c4a8ed4fc1f73eff40a76e3a10c3226 Mon Sep 17 00:00:00 2001
From: Eugene Yurtsev
Date: Fri, 5 Jul 2024 12:21:40 -0400
Subject: [PATCH] core[minor]: add upsert, streaming_upsert, aupsert, astreaming_upsert methods to the VectorStore abstraction (#23774)

This PR rolls out part of the new proposed interface for vectorstores (https://github.com/langchain-ai/langchain/pull/23544) to existing store implementations.

The PR makes the following changes:

1. Adds standard upsert, streaming_upsert, aupsert, astreaming_upsert methods to the vectorstore.
2. Updates `add_texts` and `aadd_texts` to be non-required, with a default implementation that delegates to `upsert` and `aupsert` if those have been implemented. The original `add_texts` and `aadd_texts` methods are problematic as they spread object-specific information across the document and **kwargs (e.g., ids are not part of the document).
3. Adds a default implementation to `add_documents` and `aadd_documents` that delegates to `upsert` and `aupsert`, respectively.
4. Adds standard unit tests to verify that a given vectorstore implements a correct read/write API.

A downside of this implementation is that it creates `upsert` with a very similar signature to `add_documents`. The reasons for introducing `upsert` are to:

* Remove any ambiguity about what information is allowed in `kwargs`. Specifically, kwargs should only be used for information common to all indexed data (e.g., an indexing timeout).
* Allow inheriting from an anticipated generalized interface for indexing that will allow indexing `BaseMedia` (i.e., allow making a vectorstore for images, audio, etc.).

`add_documents` can be deprecated in the future in favor of `upsert` to make sure that users have a single correct way of indexing content.

--------- Co-authored-by: ccurme ---
.../vectorstores/inmemory.py | 55 ++--
.../vectorstores/milvus.py | 2 +-
.../unit_tests/vectorstores/test_inmemory.py | 19 +-
libs/core/langchain_core/indexing/__init__.py | 7 +-
libs/core/langchain_core/indexing/base.py | 13 +
libs/core/langchain_core/utils/__init__.py | 4 +
libs/core/langchain_core/utils/aiter.py | 26 ++
libs/core/langchain_core/vectorstores.py | 303 +++++++++++++++++-
.../unit_tests/indexing/test_public_api.py | 1 +
.../core/tests/unit_tests/utils/test_aiter.py | 31 ++
.../tests/unit_tests/utils/test_imports.py | 2 +
.../tests/unit_tests/vectorstores/__init__.py | 0
.../vectorstores/test_vectorstore.py | 194 +++++++++++
.../integration_tests/vectorstores.py | 93 +++---
14 files changed, 667 insertions(+), 83 deletions(-) create mode 100644 libs/core/tests/unit_tests/utils/test_aiter.py create mode 100644 libs/core/tests/unit_tests/vectorstores/__init__.py create mode 100644 libs/core/tests/unit_tests/vectorstores/test_vectorstore.py

diff --git a/libs/community/langchain_community/vectorstores/inmemory.py b/libs/community/langchain_community/vectorstores/inmemory.py index ce3f2ddeb51..61a8aa13d24 100644 --- a/libs/community/langchain_community/vectorstores/inmemory.py +++ b/libs/community/langchain_community/vectorstores/inmemory.py @@ -6,6 +6,7 @@ from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tupl import numpy as np from langchain_core.documents import Document from langchain_core.embeddings import Embeddings +from langchain_core.indexing import UpsertResponse from langchain_core.load import dumpd, load from langchain_core.vectorstores import VectorStore @@ -37,27 +38,41 @@ class InMemoryVectorStore(VectorStore): async def adelete(self, ids: Optional[Sequence[str]]
= None, **kwargs: Any) -> None: self.delete(ids) - def add_texts( - self, - texts: Iterable[str], - metadatas: Optional[List[dict]] = None, - ids: Optional[Sequence[str]] = None, - **kwargs: Any, - ) -> List[str]: - """Add texts to the store.""" - vectors = self.embedding.embed_documents(list(texts)) - ids_ = [] - - for i, text in enumerate(texts): - doc_id = ids[i] if ids else str(uuid.uuid4()) - ids_.append(doc_id) + def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse: + vectors = self.embedding.embed_documents([item.page_content for item in items]) + ids = [] + for item, vector in zip(items, vectors): + doc_id = item.id if item.id else str(uuid.uuid4()) + ids.append(doc_id) self.store[doc_id] = { "id": doc_id, - "vector": vectors[i], - "text": text, - "metadata": metadatas[i] if metadatas else {}, + "vector": vector, + "text": item.page_content, + "metadata": item.metadata, } - return ids_ + return { + "succeeded": ids, + "failed": [], + } + + def get_by_ids(self, ids: Sequence[str], /) -> List[Document]: + """Get documents by their ids.""" + documents = [] + + for doc_id in ids: + doc = self.store.get(doc_id) + if doc: + documents.append( + Document( + id=doc["id"], + page_content=doc["text"], + metadata=doc["metadata"], + ) + ) + return documents + + async def aget_by_ids(self, ids: Sequence[str], /) -> List[Document]: + return self.get_by_ids(ids) async def aadd_texts( self, @@ -80,7 +95,9 @@ class InMemoryVectorStore(VectorStore): similarity = float(cosine_similarity([embedding], [vector]).item(0)) result.append( ( - Document(page_content=doc["text"], metadata=doc["metadata"]), + Document( + id=doc["id"], page_content=doc["text"], metadata=doc["metadata"] + ), similarity, vector, ) diff --git a/libs/community/langchain_community/vectorstores/milvus.py b/libs/community/langchain_community/vectorstores/milvus.py index 576a48b3213..01d5df92c82 100644 --- a/libs/community/langchain_community/vectorstores/milvus.py +++ b/libs/community/langchain_community/vectorstores/milvus.py @@ -1053,7 +1053,7 @@ class Milvus(VectorStore): pks = [item.get(self._primary_field) for item in query_result] return pks - def upsert( + def upsert( # type: ignore[override] self, ids: Optional[List[str]] = None, documents: List[Document] | None = None, diff --git a/libs/community/tests/unit_tests/vectorstores/test_inmemory.py b/libs/community/tests/unit_tests/vectorstores/test_inmemory.py index 10abe633f00..7381335103a 100644 --- a/libs/community/tests/unit_tests/vectorstores/test_inmemory.py +++ b/libs/community/tests/unit_tests/vectorstores/test_inmemory.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Any import pytest from langchain_core.documents import Document @@ -13,6 +14,11 @@ from tests.integration_tests.vectorstores.fake_embeddings import ( ) +class AnyStr(str): + def __eq__(self, other: Any) -> bool: + return isinstance(other, str) + + class TestInMemoryReadWriteTestSuite(ReadWriteTestSuite): @pytest.fixture def vectorstore(self) -> InMemoryVectorStore: @@ -31,10 +37,13 @@ async def test_inmemory() -> None: ["foo", "bar", "baz"], ConsistentFakeEmbeddings() ) output = await store.asimilarity_search("foo", k=1) - assert output == [Document(page_content="foo")] + assert output == [Document(page_content="foo", id=AnyStr())] output = await store.asimilarity_search("bar", k=2) - assert output == [Document(page_content="bar"), Document(page_content="baz")] + assert output == [ + Document(page_content="bar", id=AnyStr()), + Document(page_content="baz", 
id=AnyStr()), + ] output2 = await store.asimilarity_search_with_score("bar", k=2) assert output2[0][1] > output2[1][1] @@ -61,8 +70,8 @@ async def test_inmemory_mmr() -> None: "foo", k=10, lambda_mult=0.1 ) assert len(output) == len(texts) - assert output[0] == Document(page_content="foo") - assert output[1] == Document(page_content="foy") + assert output[0] == Document(page_content="foo", id=AnyStr()) + assert output[1] == Document(page_content="foy", id=AnyStr()) async def test_inmemory_dump_load(tmp_path: Path) -> None: @@ -90,4 +99,4 @@ async def test_inmemory_filter() -> None: output = await store.asimilarity_search( "baz", filter=lambda doc: doc.metadata["id"] == 1 ) - assert output == [Document(page_content="foo", metadata={"id": 1})] + assert output == [Document(page_content="foo", metadata={"id": 1}, id=AnyStr())] diff --git a/libs/core/langchain_core/indexing/__init__.py b/libs/core/langchain_core/indexing/__init__.py index 3643b130410..305ae7b459d 100644 --- a/libs/core/langchain_core/indexing/__init__.py +++ b/libs/core/langchain_core/indexing/__init__.py @@ -6,7 +6,11 @@ if it's unchanged. """ from langchain_core.indexing.api import IndexingResult, aindex, index -from langchain_core.indexing.base import InMemoryRecordManager, RecordManager +from langchain_core.indexing.base import ( + InMemoryRecordManager, + RecordManager, + UpsertResponse, +) __all__ = [ "aindex", @@ -14,4 +18,5 @@ __all__ = [ "IndexingResult", "InMemoryRecordManager", "RecordManager", + "UpsertResponse", ] diff --git a/libs/core/langchain_core/indexing/base.py b/libs/core/langchain_core/indexing/base.py index 15912358ba0..e5037b664a3 100644 --- a/libs/core/langchain_core/indexing/base.py +++ b/libs/core/langchain_core/indexing/base.py @@ -421,3 +421,16 @@ class InMemoryRecordManager(RecordManager): keys: A list of keys to delete. """ self.delete_keys(keys) + + +class UpsertResponse(TypedDict): + """A generic response for upsert operations. + + The upsert response will be used by abstractions that implement an upsert + operation for content that can be upserted by ID. + """ + + succeeded: List[str] + """The IDs that were successfully indexed.""" + failed: List[str] + """The IDs that failed to index.""" diff --git a/libs/core/langchain_core/utils/__init__.py b/libs/core/langchain_core/utils/__init__.py index 92f919bac39..80fbf680f3e 100644 --- a/libs/core/langchain_core/utils/__init__.py +++ b/libs/core/langchain_core/utils/__init__.py @@ -5,6 +5,7 @@ These functions do not depend on any other LangChain module. 
""" from langchain_core.utils import image +from langchain_core.utils.aiter import abatch_iterate from langchain_core.utils.env import get_from_dict_or_env, get_from_env from langchain_core.utils.formatting import StrictFormatter, formatter from langchain_core.utils.input import ( @@ -13,6 +14,7 @@ from langchain_core.utils.input import ( get_colored_text, print_text, ) +from langchain_core.utils.iter import batch_iterate from langchain_core.utils.loading import try_load_from_hub from langchain_core.utils.strings import comma_list, stringify_dict, stringify_value from langchain_core.utils.utils import ( @@ -48,4 +50,6 @@ __all__ = [ "stringify_dict", "comma_list", "stringify_value", + "batch_iterate", + "abatch_iterate", ] diff --git a/libs/core/langchain_core/utils/aiter.py b/libs/core/langchain_core/utils/aiter.py index a75de1bae4c..eb55079e601 100644 --- a/libs/core/langchain_core/utils/aiter.py +++ b/libs/core/langchain_core/utils/aiter.py @@ -11,6 +11,7 @@ from typing import ( Any, AsyncContextManager, AsyncGenerator, + AsyncIterable, AsyncIterator, Awaitable, Callable, @@ -245,3 +246,28 @@ class aclosing(AbstractAsyncContextManager): ) -> None: if hasattr(self.thing, "aclose"): await self.thing.aclose() + + +async def abatch_iterate( + size: int, iterable: AsyncIterable[T] +) -> AsyncIterator[List[T]]: + """Utility batching function for async iterables. + + Args: + size: The size of the batch. + iterable: The async iterable to batch. + + Returns: + An async iterator over the batches + """ + batch: List[T] = [] + async for element in iterable: + if len(batch) < size: + batch.append(element) + + if len(batch) >= size: + yield batch + batch = [] + + if batch: + yield batch diff --git a/libs/core/langchain_core/vectorstores.py b/libs/core/langchain_core/vectorstores.py index 428e2981ecd..273b98de333 100644 --- a/libs/core/langchain_core/vectorstores.py +++ b/libs/core/langchain_core/vectorstores.py @@ -25,26 +25,34 @@ import logging import math import warnings from abc import ABC, abstractmethod +from itertools import cycle from typing import ( TYPE_CHECKING, Any, + AsyncIterable, + AsyncIterator, Callable, ClassVar, Collection, Dict, Iterable, + Iterator, List, Optional, Sequence, Tuple, Type, TypeVar, + Union, ) +from langchain_core._api import beta from langchain_core.embeddings import Embeddings from langchain_core.pydantic_v1 import Field, root_validator from langchain_core.retrievers import BaseRetriever from langchain_core.runnables.config import run_in_executor +from langchain_core.utils.aiter import abatch_iterate +from langchain_core.utils.iter import batch_iterate if TYPE_CHECKING: from langchain_core.callbacks.manager import ( @@ -52,6 +60,7 @@ if TYPE_CHECKING: CallbackManagerForRetrieverRun, ) from langchain_core.documents import Document + from langchain_core.indexing.base import UpsertResponse logger = logging.getLogger(__name__) @@ -61,11 +70,14 @@ VST = TypeVar("VST", bound="VectorStore") class VectorStore(ABC): """Interface for vector store.""" - @abstractmethod def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, + # One of the kwargs should be `ids` which is a list of ids + # associated with the texts. + # This is not yet enforced in the type signature for backwards compatibility + # with existing implementations. **kwargs: Any, ) -> List[str]: """Run more texts through the embeddings and add to the vectorstore. @@ -74,16 +86,205 @@ class VectorStore(ABC): texts: Iterable of strings to add to the vectorstore. 
metadatas: Optional list of metadatas associated with the texts. **kwargs: vectorstore-specific parameters. + One of the kwargs should be `ids` which is a list of ids + associated with the texts. Returns: List of ids from adding the texts into the vectorstore. """ + if type(self).upsert != VectorStore.upsert: + # Import document in local scope to avoid circular imports + from langchain_core.documents import Document + + # This condition is triggered if the subclass has provided + # an implementation of the upsert method. + # In that case, the legacy add_texts path is bypassed and the + # call is delegated to upsert. + texts_: Sequence[str] = ( + texts if isinstance(texts, (list, tuple)) else list(texts) + ) + if metadatas and len(metadatas) != len(texts_): + raise ValueError( + "The number of metadatas must match the number of texts. " + f"Got {len(metadatas)} metadatas and {len(texts_)} texts." + ) + + if "ids" in kwargs: + ids = kwargs.pop("ids") + if ids and len(ids) != len(texts_): + raise ValueError( + "The number of ids must match the number of texts. " + f"Got {len(ids)} ids and {len(texts_)} texts." + ) + else: + ids = None + + metadatas_ = iter(metadatas) if metadatas else cycle([{}]) + ids_: Iterable[Union[str, None]] = ids if ids is not None else cycle([None]) + docs = [ + Document(page_content=text, metadata=metadata_, id=id_) + for text, metadata_, id_ in zip(texts_, metadatas_, ids_) + ] + upsert_response = self.upsert(docs, **kwargs) + return upsert_response["succeeded"] + raise NotImplementedError( + f"`add_texts` has not been implemented for {self.__class__.__name__}" + ) + + # Developer guidelines: + # Do not override streaming_upsert! + @beta(message="Added in 0.2.11. The API is subject to change.") + def streaming_upsert( + self, items: Iterable[Document], /, batch_size: int, **kwargs: Any + ) -> Iterator[UpsertResponse]: + """Upsert documents in a streaming fashion. + + Args: + items: Iterable of Documents to add to the vectorstore. + batch_size: The size of each batch to upsert. + **kwargs: Additional keyword arguments. + kwargs should only include parameters that are common to all + documents (e.g., timeout for indexing, retry policy, etc.). + kwargs should not include ids to avoid ambiguous semantics. + Instead, the ID should be provided as part of the Document object. + + .. versionadded:: 0.2.11 + """ + # The default implementation of this method breaks the input into + # batches of size `batch_size` and calls the `upsert` method on each batch. + # Subclasses can override this method to provide a more efficient + # implementation. + for item_batch in batch_iterate(batch_size, items): + yield self.upsert(item_batch, **kwargs) + + # Please note that we've added a new method `upsert` instead of re-using the + # existing `add_documents` method. + # This was done to resolve potential ambiguities around the behavior of **kwargs + # in existing add_documents / add_texts methods which could include per-document + # information (e.g., the `ids` parameter). + # Over time, `add_documents` could be marked as legacy and deprecated + # in favor of the `upsert` method. + @beta(message="Added in 0.2.11. The API is subject to change.") + def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse: + """Add or update documents in the vectorstore. + + The upsert functionality should utilize the ID field of the Document object + if it is provided. If the ID is not provided, the upsert method is free + to generate an ID for the document.
+ + When an ID is specified and the document already exists in the vectorstore, + the upsert method should update the document with the new data. If the document + does not exist, the upsert method should add the document to the vectorstore. + + Args: + items: Sequence of Documents to add to the vectorstore. + **kwargs: Additional keyword arguments. + + Returns: + UpsertResponse: A response object that contains the list of IDs that were + successfully added or updated in the vectorstore and the list of IDs that + failed to be added or updated. + + .. versionadded:: 0.2.11 + """ + # Developer guidelines: + # + # Vectorstore implementations are free to extend the `upsert` implementation + # to take in additional data per document. + # + # This data **SHOULD NOT** be part of the **kwargs** parameter; instead, + # sub-classes can use a Union type on `documents` to include additional + # supported formats for the input data stream. + # + # For example, + # + # .. code-block:: python + # from typing import TypedDict + # + # class DocumentWithVector(TypedDict): + # document: Document + # vector: List[float] + # + # def upsert( + # self, + # documents: Union[Iterable[Document], Iterable[DocumentWithVector]], + # /, + # **kwargs + # ) -> UpsertResponse: + # \"\"\"Add or update documents in the vectorstore.\"\"\" + # # Implementation should check if documents is an + # # iterable of DocumentWithVector or Document + # pass + + # Implementations that override upsert should include a new doc-string + # that explains the semantics of upsert and includes code + # examples of how to insert using the alternate data formats. + + # The default implementation does not delegate to the `add_texts` method or + # the `add_documents` method, since those implementations now delegate to + # `upsert` and doing so would create a circular call path. + raise NotImplementedError( + f"upsert has not been implemented for {self.__class__.__name__}" + ) + + @beta(message="Added in 0.2.11. The API is subject to change.") + async def astreaming_upsert( + self, + items: AsyncIterable[Document], + /, + batch_size: int, + **kwargs: Any, + ) -> AsyncIterator[UpsertResponse]: + """Upsert documents in a streaming fashion. Async version of streaming_upsert. + + Args: + items: Async iterable of Documents to add to the vectorstore. + batch_size: The size of each batch to upsert. + **kwargs: Additional keyword arguments. + kwargs should only include parameters that are common to all + documents (e.g., timeout for indexing, retry policy, etc.). + kwargs should not include ids to avoid ambiguous semantics. + Instead, the ID should be provided as part of the Document object. + + .. versionadded:: 0.2.11 + """ + async for batch in abatch_iterate(batch_size, items): + yield await self.aupsert(batch, **kwargs) + + @beta(message="Added in 0.2.11. The API is subject to change.") + async def aupsert( + self, items: Sequence[Document], /, **kwargs: Any + ) -> UpsertResponse: + """Add or update documents in the vectorstore. Async version of upsert. + + The upsert functionality should utilize the ID field of the Document object + if it is provided. If the ID is not provided, the upsert method is free + to generate an ID for the document. + + When an ID is specified and the document already exists in the vectorstore, + the upsert method should update the document with the new data. If the document + does not exist, the upsert method should add the document to the vectorstore. + + Args: + items: Sequence of Documents to add to the vectorstore. + **kwargs: Additional keyword arguments.
+ + Returns: + UpsertResponse: A response object that contains the list of IDs that were + successfully added or updated in the vectorstore and the list of IDs that + failed to be added or updated. + + .. versionadded:: 0.2.11 + """ + # Developer guidelines: See guidelines for the `upsert` method. + # The default implementation does not delegate to the `add_texts` method or + # the `add_documents` method, since those implementations now delegate to + # `aupsert` and doing so would create a circular call path. + return await run_in_executor(None, self.upsert, items, **kwargs) @property def embeddings(self) -> Optional[Embeddings]: """Access the query embedding object if available.""" logger.debug( - f"{Embeddings.__name__} is not implemented for {self.__class__.__name__}" + f"The embeddings property has not been " + f"implemented for {self.__class__.__name__}" ) return None @@ -187,17 +388,81 @@ class VectorStore(ABC): Returns: List of ids from adding the texts into the vectorstore. """ + if type(self).aupsert != VectorStore.aupsert: + # Import document in local scope to avoid circular imports + from langchain_core.documents import Document + + # This condition is triggered if the subclass has provided + # an implementation of the aupsert method. + # In that case, the legacy aadd_texts path is bypassed and the + # call is delegated to aupsert. + texts_: Sequence[str] = ( + texts if isinstance(texts, (list, tuple)) else list(texts) + ) + if metadatas and len(metadatas) != len(texts_): + raise ValueError( + "The number of metadatas must match the number of texts. " + f"Got {len(metadatas)} metadatas and {len(texts_)} texts." + ) + + if "ids" in kwargs: + ids = kwargs.pop("ids") + if ids and len(ids) != len(texts_): + raise ValueError( + "The number of ids must match the number of texts. " + f"Got {len(ids)} ids and {len(texts_)} texts." + ) + else: + ids = None + + metadatas_ = iter(metadatas) if metadatas else cycle([{}]) + ids_: Iterable[Union[str, None]] = ids if ids is not None else cycle([None]) + docs = [ + Document(page_content=text, metadata=metadata_, id=id_) + for text, metadata_, id_ in zip(texts_, metadatas_, ids_) + ] + upsert_response = await self.aupsert(docs, **kwargs) + return upsert_response["succeeded"] return await run_in_executor(None, self.add_texts, texts, metadatas, **kwargs) def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]: - """Run more documents through the embeddings and add to the vectorstore. + """Add or update documents in the vectorstore. Args: documents: Documents to add to the vectorstore. + kwargs: Additional keyword arguments. + If kwargs contains ids and the documents contain ids, + the ids in kwargs take precedence. Returns: List of IDs of the added texts. """ + if type(self).upsert != VectorStore.upsert: + from langchain_core.documents import Document + + if "ids" in kwargs: + ids = kwargs.pop("ids") + if ids and len(ids) != len(documents): + raise ValueError( + "The number of ids must match the number of documents. " + f"Got {len(ids)} ids and {len(documents)} documents."
) + + documents_ = [] + + for id_, document in zip(ids, documents): + doc_with_id = Document( + page_content=document.page_content, + metadata=document.metadata, + id=id_, + ) + documents_.append(doc_with_id) + else: + documents_ = documents + + # If upsert has been implemented, we can use it to add documents + return self.upsert(documents_, **kwargs)["succeeded"] + + # Code path that delegates to add_texts for backwards compatibility # TODO: Handle the case where the user doesn't provide ids on the Collection texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] @@ -214,6 +479,38 @@ Returns: List of IDs of the added texts. """ + # If either upsert or aupsert has been implemented, we delegate to them! + if ( + type(self).aupsert != VectorStore.aupsert + or type(self).upsert != VectorStore.upsert + ): + # If aupsert has been implemented, we can use it to add documents + from langchain_core.documents import Document + + if "ids" in kwargs: + ids = kwargs.pop("ids") + if ids and len(ids) != len(documents): + raise ValueError( + "The number of ids must match the number of documents. " + f"Got {len(ids)} ids and {len(documents)} documents." + ) + + documents_ = [] + + for id_, document in zip(ids, documents): + doc_with_id = Document( + page_content=document.page_content, + metadata=document.metadata, + id=id_, + ) + documents_.append(doc_with_id) + else: + documents_ = documents + + # Delegate to aupsert (which falls back to upsert) to add the documents + upsert_response = await self.aupsert(documents_, **kwargs) + return upsert_response["succeeded"] + texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return await self.aadd_texts(texts, metadatas, **kwargs) diff --git a/libs/core/tests/unit_tests/indexing/test_public_api.py b/libs/core/tests/unit_tests/indexing/test_public_api.py index 89c52cf6810..0259017a954 100644 --- a/libs/core/tests/unit_tests/indexing/test_public_api.py +++ b/libs/core/tests/unit_tests/indexing/test_public_api.py @@ -10,4 +10,5 @@ def test_all() -> None: "IndexingResult", "InMemoryRecordManager", "RecordManager", + "UpsertResponse", ] diff --git a/libs/core/tests/unit_tests/utils/test_aiter.py b/libs/core/tests/unit_tests/utils/test_aiter.py new file mode 100644 index 00000000000..3b035a89277 --- /dev/null +++ b/libs/core/tests/unit_tests/utils/test_aiter.py @@ -0,0 +1,31 @@ +from typing import AsyncIterator, List + +import pytest + +from langchain_core.utils.aiter import abatch_iterate + + +@pytest.mark.parametrize( + "input_size, input_iterable, expected_output", + [ + (2, [1, 2, 3, 4, 5], [[1, 2], [3, 4], [5]]), + (3, [10, 20, 30, 40, 50], [[10, 20, 30], [40, 50]]), + (1, [100, 200, 300], [[100], [200], [300]]), + (4, [], []), + ], +) +async def test_abatch_iterate( + input_size: int, input_iterable: List[int], expected_output: List[List[int]] +) -> None: + """Test batching function.""" + + async def _to_async_iterable(iterable: List[int]) -> AsyncIterator[int]: + for item in iterable: + yield item + + iterator_ = abatch_iterate(input_size, _to_async_iterable(input_iterable)) + + assert isinstance(iterator_, AsyncIterator) + + output = [el async for el in iterator_] + assert output == expected_output diff --git a/libs/core/tests/unit_tests/utils/test_imports.py b/libs/core/tests/unit_tests/utils/test_imports.py index 64528cfd521..8a1d4236688 100644 --- a/libs/core/tests/unit_tests/utils/test_imports.py +++ b/libs/core/tests/unit_tests/utils/test_imports.py @@ -6,6 +6,8
@@ EXPECTED_ALL = [ "convert_to_secret_str", "formatter", "get_bolded_text", + "abatch_iterate", + "batch_iterate", "get_color_mapping", "get_colored_text", "get_pydantic_field_names", diff --git a/libs/core/tests/unit_tests/vectorstores/__init__.py b/libs/core/tests/unit_tests/vectorstores/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/libs/core/tests/unit_tests/vectorstores/test_vectorstore.py b/libs/core/tests/unit_tests/vectorstores/test_vectorstore.py new file mode 100644 index 00000000000..dc4955e70a2 --- /dev/null +++ b/libs/core/tests/unit_tests/vectorstores/test_vectorstore.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import uuid +from typing import Any, Dict, List, Optional, Sequence, Union + +from typing_extensions import TypedDict + +from langchain_core.documents import Document +from langchain_core.embeddings import Embeddings +from langchain_core.indexing.base import UpsertResponse +from langchain_core.vectorstores import VectorStore + + +def test_custom_upsert_type() -> None: + """Test that we can override the signature of the upsert method + of the VectorStore class without creating typing issues by violating + the Liskov Substitution Principle. + """ + + class ByVector(TypedDict): + document: Document + vector: List[float] + + class CustomVectorStore(VectorStore): + def upsert( + # This unit test verifies that the signature of the upsert method, + # specifically the items parameter, can be overridden without + # violating the Liskov Substitution Principle (and getting + # typing errors). + self, + items: Union[Sequence[Document], Sequence[ByVector]], + /, + **kwargs: Any, + ) -> UpsertResponse: + raise NotImplementedError() + + +class CustomSyncVectorStore(VectorStore): + """A vectorstore that only implements the synchronous methods.""" + + def __init__(self) -> None: + self.store: Dict[str, Document] = {} + + def upsert( + self, + items: Sequence[Document], + /, + **kwargs: Any, + ) -> UpsertResponse: + ids = [] + for item in items: + if item.id is None: + new_item = item.copy() + id_: str = str(uuid.uuid4()) + new_item.id = id_ + else: + id_ = item.id + new_item = item + + self.store[id_] = new_item + ids.append(id_) + + return { + "succeeded": ids, + "failed": [], + } + + def get_by_ids(self, ids: Sequence[str], /) -> List[Document]: + return [self.store[id_] for id_ in ids if id_ in self.store] + + @classmethod + def from_texts( # type: ignore + cls, + texts: List[str], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> CustomSyncVectorStore: + vectorstore = CustomSyncVectorStore() + vectorstore.add_texts(texts, metadatas=metadatas, **kwargs) + return vectorstore + + def similarity_search( + self, query: str, k: int = 4, **kwargs: Any + ) -> List[Document]: + raise NotImplementedError() + + +def test_implement_upsert() -> None: + """Test that we can implement the upsert method of the CustomVectorStore + class without violating the Liskov Substitution Principle.
+ """ + + store = CustomSyncVectorStore() + + # Check upsert with id + assert store.upsert([Document(id="1", page_content="hello")]) == { + "succeeded": ["1"], + "failed": [], + } + + assert store.get_by_ids(["1"]) == [Document(id="1", page_content="hello")] + + # Check upsert without id + response = store.upsert([Document(page_content="world")]) + assert len(response["succeeded"]) == 1 + id_ = response["succeeded"][0] + assert id_ is not None + assert store.get_by_ids([id_]) == [Document(id=id_, page_content="world")] + + # Check that default implementation of add_texts works + assert store.add_texts(["hello", "world"], ids=["3", "4"]) == ["3", "4"] + assert store.get_by_ids(["3", "4"]) == [ + Document(id="3", page_content="hello"), + Document(id="4", page_content="world"), + ] + + # Add texts without ids + ids_ = store.add_texts(["foo", "bar"]) + assert len(ids_) == 2 + assert store.get_by_ids(ids_) == [ + Document(id=ids_[0], page_content="foo"), + Document(id=ids_[1], page_content="bar"), + ] + + # Add texts with metadatas + ids_2 = store.add_texts(["foo", "bar"], metadatas=[{"foo": "bar"}] * 2) + assert len(ids_2) == 2 + assert store.get_by_ids(ids_2) == [ + Document(id=ids_2[0], page_content="foo", metadata={"foo": "bar"}), + Document(id=ids_2[1], page_content="bar", metadata={"foo": "bar"}), + ] + + # Check that add_documents works + assert store.add_documents([Document(id="5", page_content="baz")]) == ["5"] + + # Test add documents with id specified in both document and ids + original_document = Document(id="7", page_content="baz") + assert store.add_documents([original_document], ids=["6"]) == ["6"] + assert original_document.id == "7" # original document should not be modified + assert store.get_by_ids(["6"]) == [Document(id="6", page_content="baz")] + + +async def test_aupsert_delegation_to_upsert() -> None: + """Test delegation to the synchronous upsert method in async execution + if async methods are not implemented. 
+ """ + store = CustomSyncVectorStore() + + # Check upsert with id + assert await store.aupsert([Document(id="1", page_content="hello")]) == { + "succeeded": ["1"], + "failed": [], + } + + assert await store.aget_by_ids(["1"]) == [Document(id="1", page_content="hello")] + + # Check upsert without id + response = await store.aupsert([Document(page_content="world")]) + assert len(response["succeeded"]) == 1 + id_ = response["succeeded"][0] + assert id_ is not None + assert await store.aget_by_ids([id_]) == [Document(id=id_, page_content="world")] + + # Check that default implementation of add_texts works + assert await store.aadd_texts(["hello", "world"], ids=["3", "4"]) == ["3", "4"] + assert await store.aget_by_ids(["3", "4"]) == [ + Document(id="3", page_content="hello"), + Document(id="4", page_content="world"), + ] + + # Add texts without ids + ids_ = await store.aadd_texts(["foo", "bar"]) + assert len(ids_) == 2 + assert await store.aget_by_ids(ids_) == [ + Document(id=ids_[0], page_content="foo"), + Document(id=ids_[1], page_content="bar"), + ] + + # Add texts with metadatas + ids_2 = await store.aadd_texts(["foo", "bar"], metadatas=[{"foo": "bar"}] * 2) + assert len(ids_2) == 2 + assert await store.aget_by_ids(ids_2) == [ + Document(id=ids_2[0], page_content="foo", metadata={"foo": "bar"}), + Document(id=ids_2[1], page_content="bar", metadata={"foo": "bar"}), + ] + + # Check that add_documents works + assert await store.aadd_documents([Document(id="5", page_content="baz")]) == ["5"] + + # Test add documents with id specified in both document and ids + original_document = Document(id="7", page_content="baz") + assert await store.aadd_documents([original_document], ids=["6"]) == ["6"] + assert original_document.id == "7" # original document should not be modified + assert await store.aget_by_ids(["6"]) == [Document(id="6", page_content="baz")] diff --git a/libs/standard-tests/langchain_standard_tests/integration_tests/vectorstores.py b/libs/standard-tests/langchain_standard_tests/integration_tests/vectorstores.py index d65eb129349..81c3e828857 100644 --- a/libs/standard-tests/langchain_standard_tests/integration_tests/vectorstores.py +++ b/libs/standard-tests/langchain_standard_tests/integration_tests/vectorstores.py @@ -46,15 +46,21 @@ class ReadWriteTestSuite(ABC): def test_add_documents(self, vectorstore: VectorStore) -> None: """Test adding documents into the vectorstore.""" - documents = [ + original_documents = [ Document(page_content="foo", metadata={"id": 1}), Document(page_content="bar", metadata={"id": 2}), ] - vectorstore.add_documents(documents) + ids = vectorstore.add_documents(original_documents) documents = vectorstore.similarity_search("bar", k=2) assert documents == [ - Document(page_content="bar", metadata={"id": 2}), + Document(page_content="bar", metadata={"id": 2}, id=ids[1]), + Document(page_content="foo", metadata={"id": 1}, id=ids[0]), + ] + # Verify that the original document object does not get mutated! 
+ # (e.g., an ID is added to the original document object) + assert original_documents == [ Document(page_content="foo", metadata={"id": 1}), + Document(page_content="bar", metadata={"id": 2}), ] def test_vectorstore_still_empty(self, vectorstore: VectorStore) -> None: @@ -71,10 +77,11 @@ class ReadWriteTestSuite(ABC): Document(page_content="foo", metadata={"id": 1}), Document(page_content="bar", metadata={"id": 2}), ] - vectorstore.add_documents(documents, ids=["1", "2"]) + ids = vectorstore.add_documents(documents, ids=["1", "2"]) + assert ids == ["1", "2"] vectorstore.delete(["1"]) documents = vectorstore.similarity_search("foo", k=1) - assert documents == [Document(page_content="bar", metadata={"id": 2})] + assert documents == [Document(page_content="bar", metadata={"id": 2}, id="2")] def test_deleting_bulk_documents(self, vectorstore: VectorStore) -> None: """Test that we can delete several documents at once.""" @@ -87,7 +94,7 @@ class ReadWriteTestSuite(ABC): vectorstore.add_documents(documents, ids=["1", "2", "3"]) vectorstore.delete(["1", "2"]) documents = vectorstore.similarity_search("foo", k=1) - assert documents == [Document(page_content="baz", metadata={"id": 3})] + assert documents == [Document(page_content="baz", metadata={"id": 3}, id="3")] def test_delete_missing_content(self, vectorstore: VectorStore) -> None: """Deleting missing content should not raise an exception.""" @@ -106,25 +113,8 @@ class ReadWriteTestSuite(ABC): vectorstore.add_documents(documents, ids=["1", "2"]) documents = vectorstore.similarity_search("bar", k=2) assert documents == [ - Document(page_content="bar", metadata={"id": 2}), - Document(page_content="foo", metadata={"id": 1}), - ] - - def test_add_documents_without_ids_gets_duplicated( - self, vectorstore: VectorStore - ) -> None: - """Adding documents without specifying IDs should duplicate content.""" - documents = [ - Document(page_content="foo", metadata={"id": 1}), - Document(page_content="bar", metadata={"id": 2}), - ] - - vectorstore.add_documents(documents) - vectorstore.add_documents(documents) - documents = vectorstore.similarity_search("bar", k=2) - assert documents == [ - Document(page_content="bar", metadata={"id": 2}), - Document(page_content="bar", metadata={"id": 2}), + Document(page_content="bar", metadata={"id": 2}, id="2"), + Document(page_content="foo", metadata={"id": 1}, id="1"), ] def test_add_documents_by_id_with_mutation(self, vectorstore: VectorStore) -> None: @@ -149,9 +139,11 @@ class ReadWriteTestSuite(ABC): documents = vectorstore.similarity_search("new foo", k=2) assert documents == [ Document( - page_content="new foo", metadata={"id": 1, "some_other_field": "foo"} + id="1", + page_content="new foo", + metadata={"id": 1, "some_other_field": "foo"}, ), - Document(page_content="bar", metadata={"id": 2}), + Document(id="2", page_content="bar", metadata={"id": 2}), ] @@ -190,15 +182,22 @@ class AsyncReadWriteTestSuite(ABC): async def test_add_documents(self, vectorstore: VectorStore) -> None: """Test adding documents into the vectorstore.""" - documents = [ + original_documents = [ Document(page_content="foo", metadata={"id": 1}), Document(page_content="bar", metadata={"id": 2}), ] - await vectorstore.aadd_documents(documents) + ids = await vectorstore.aadd_documents(original_documents) documents = await vectorstore.asimilarity_search("bar", k=2) assert documents == [ - Document(page_content="bar", metadata={"id": 2}), + Document(page_content="bar", metadata={"id": 2}, id=ids[1]), + Document(page_content="foo", 
metadata={"id": 1}, id=ids[0]), + ] + + # Verify that the original document object does not get mutated! + # (e.g., an ID is added to the original document object) + assert original_documents == [ Document(page_content="foo", metadata={"id": 1}), + Document(page_content="bar", metadata={"id": 2}), ] async def test_vectorstore_still_empty(self, vectorstore: VectorStore) -> None: @@ -215,10 +214,11 @@ class AsyncReadWriteTestSuite(ABC): Document(page_content="foo", metadata={"id": 1}), Document(page_content="bar", metadata={"id": 2}), ] - await vectorstore.aadd_documents(documents, ids=["1", "2"]) + ids = await vectorstore.aadd_documents(documents, ids=["1", "2"]) + assert ids == ["1", "2"] await vectorstore.adelete(["1"]) documents = await vectorstore.asimilarity_search("foo", k=1) - assert documents == [Document(page_content="bar", metadata={"id": 2})] + assert documents == [Document(page_content="bar", metadata={"id": 2}, id="2")] async def test_deleting_bulk_documents(self, vectorstore: VectorStore) -> None: """Test that we can delete several documents at once.""" @@ -231,7 +231,7 @@ class AsyncReadWriteTestSuite(ABC): await vectorstore.aadd_documents(documents, ids=["1", "2", "3"]) await vectorstore.adelete(["1", "2"]) documents = await vectorstore.asimilarity_search("foo", k=1) - assert documents == [Document(page_content="baz", metadata={"id": 3})] + assert documents == [Document(page_content="baz", metadata={"id": 3}, id="3")] async def test_delete_missing_content(self, vectorstore: VectorStore) -> None: """Deleting missing content should not raise an exception.""" @@ -250,25 +250,8 @@ class AsyncReadWriteTestSuite(ABC): await vectorstore.aadd_documents(documents, ids=["1", "2"]) documents = await vectorstore.asimilarity_search("bar", k=2) assert documents == [ - Document(page_content="bar", metadata={"id": 2}), - Document(page_content="foo", metadata={"id": 1}), - ] - - async def test_add_documents_without_ids_gets_duplicated( - self, vectorstore: VectorStore - ) -> None: - """Adding documents without specifying IDs should duplicate content.""" - documents = [ - Document(page_content="foo", metadata={"id": 1}), - Document(page_content="bar", metadata={"id": 2}), - ] - - await vectorstore.aadd_documents(documents) - await vectorstore.aadd_documents(documents) - documents = await vectorstore.asimilarity_search("bar", k=2) - assert documents == [ - Document(page_content="bar", metadata={"id": 2}), - Document(page_content="bar", metadata={"id": 2}), + Document(page_content="bar", metadata={"id": 2}, id="2"), + Document(page_content="foo", metadata={"id": 1}, id="1"), ] async def test_add_documents_by_id_with_mutation( @@ -295,7 +278,9 @@ class AsyncReadWriteTestSuite(ABC): documents = await vectorstore.asimilarity_search("new foo", k=2) assert documents == [ Document( - page_content="new foo", metadata={"id": 1, "some_other_field": "foo"} + id="1", + page_content="new foo", + metadata={"id": 1, "some_other_field": "foo"}, ), - Document(page_content="bar", metadata={"id": 2}), + Document(id="2", page_content="bar", metadata={"id": 2}), ]