Compare commits

...

13 Commits

Author SHA1 Message Date
Eugene Yurtsev
b6086311fe Merge branch 'eugene/add_indexer_to_retriever' of github.com:langchain-ai/langchain into eugene/add_indexer_to_retriever 2024-07-26 17:46:26 -04:00
Eugene Yurtsev
77e7f67bb6 qxqx 2024-07-26 17:46:09 -04:00
Eugene Yurtsev
35166baddd Update libs/core/tests/unit_tests/federated_rerievers/test_version_2.py 2024-07-26 17:30:04 -04:00
Eugene Yurtsev
72b96b8096 test implementations 2024-07-26 17:25:56 -04:00
Eugene Yurtsev
d5c5968a39 qxqx 2024-07-23 14:43:17 -04:00
Eugene Yurtsev
e250f63977 x 2024-07-23 13:30:53 -04:00
Eugene Yurtsev
81db15ffa0 x 2024-07-18 11:01:36 -04:00
Eugene Yurtsev
fed2495a31 x 2024-07-17 17:24:43 -04:00
Eugene Yurtsev
f85287c5e7 x 2024-07-17 17:19:01 -04:00
Eugene Yurtsev
10708e856e update 2024-07-17 17:18:09 -04:00
Eugene Yurtsev
1a41d27b1d qxqx 2024-07-17 17:05:29 -04:00
Eugene Yurtsev
ab72ad9e36 update 2024-07-17 17:05:09 -04:00
Eugene Yurtsev
428b2409c7 x 2024-07-17 15:30:10 -04:00
12 changed files with 1110 additions and 34 deletions

View File

@@ -7,6 +7,8 @@ if it's unchanged.
from langchain_core.indexing.api import IndexingResult, aindex, index
from langchain_core.indexing.base import (
AsyncDocumentIndexer,
DocumentIndexer,
InMemoryRecordManager,
RecordManager,
UpsertResponse,
@@ -14,6 +16,8 @@ from langchain_core.indexing.base import (
__all__ = [
"aindex",
"AsyncDocumentIndexer",
"DocumentIndexer",
"index",
"IndexingResult",
"InMemoryRecordManager",

View File

@@ -7,6 +7,7 @@ import json
import uuid
from itertools import islice
from typing import (
TYPE_CHECKING,
Any,
AsyncIterable,
AsyncIterator,
@@ -29,7 +30,9 @@ from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.indexing.base import RecordManager
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
from langchain_core.vectorstores import VectorStore
# Magic UUID to use as a namespace for hashing.
# Used to try and generate a unique UUID for each document
@@ -265,6 +268,9 @@ def index(
"delete" and "add_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.
"""
# Local scope to avoid circular imports
from langchain_core.vectorstores import VectorStore
if cleanup not in {"incremental", "full", None}:
raise ValueError(
f"cleanup should be one of 'incremental', 'full' or None. "
@@ -478,6 +484,8 @@ async def aindex(
"adelete" and "aadd_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.
"""
# Local scope to avoid circular imports
from langchain_core.vectorstores import VectorStore
if cleanup not in {"incremental", "full", None}:
raise ValueError(
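
The hunks above are the standard pattern for breaking a circular import: the module-level import of VectorStore moves under TYPE_CHECKING (evaluated only by static type checkers), and a runtime import is deferred into the function bodies that actually need it. A minimal, self-contained sketch of the same pattern (the function name here is illustrative, not from this PR):

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen by mypy/pyright only; never executed at runtime, so it cannot
    # participate in an import cycle.
    from langchain_core.vectorstores import VectorStore


def index_into(vector_store: VectorStore) -> None:
    # Deferred runtime import: resolved on first call, after both modules
    # have finished loading.
    from langchain_core.vectorstores import VectorStore

    if not isinstance(vector_store, VectorStore):
        raise ValueError("vector_store must be a VectorStore")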

View File

@@ -1,8 +1,398 @@
from __future__ import annotations
import time
import abc
import inspect
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Sequence, TypedDict
from typing import (
Any,
Dict,
Generic,
List,
Optional,
Sequence,
TypeVar,
Union,
)
import time
from typing_extensions import TypedDict
from langchain_core._api import beta
from langchain_core.callbacks import (
CallbackManagerForRetrieverRun,
)
from langchain_core.documents.base import BaseMedia, Document
from langchain_core.retrievers import BaseRetriever
class UpsertResponse(TypedDict):
"""A generic response for upsert operations.
The upsert response will be used by abstractions that implement an upsert
operation for content that can be upserted by ID.
Upsert APIs that accept inputs with IDs and generate IDs internally
will return a response that includes the IDs that succeeded and the IDs
that failed.
If there are no failures, the failed list will be empty, and the order
of the IDs in the succeeded list will match the order of the input documents.
If there are failures, the response becomes ill-defined, and a user of the API
cannot determine which generated ID corresponds to which input document.
It is recommended that users explicitly attach IDs to the items being
indexed to avoid this issue.
"""
succeeded: List[str]
"""The IDs that were successfully indexed."""
failed: List[str]
"""The IDs that failed to index."""
class DeleteResponse(TypedDict, total=False):
"""A generic response for delete operation.
The fields in this response are optional and whether the vectorstore
returns them or not is up to the implementation.
"""
num_deleted: int
"""The number of items that were successfully deleted.
If returned, this should only include *actual* deletions.
If the ID did not exist to begin with,
it should not be included in this count.
"""
succeeded: Sequence[str]
"""The IDs that were successfully deleted.
If returned, this should only include *actual* deletions.
If the ID did not exist to begin with,
it should not be included in this list.
"""
failed: Sequence[str]
"""The IDs that failed to be deleted.
Please note that deleting an ID that
does not exist is **NOT** considered a failure.
"""
num_failed: int
"""The number of items that failed to be deleted."""
Hit = TypeVar("Hit")
Query = TypeVar("Query")
Content = TypeVar("Content", bound=BaseMedia)
class QueryResponse(Generic[Hit], TypedDict):
"""A retrieval result."""
# Free form metadata for vectorstore providers
metadata: Dict[str, Any]
hits: List[Hit]
@beta(message="Added in ___version___. The API is subject to change.")
class Index(abc.ABC, Generic[Content, Query, Hit]):
"""An abstraction for indexing documents.
This indexing interface is designed to be a generic abstraction for storing and
querying documents, each of which has an ID and metadata associated with it.
The interface is designed to be agnostic to the underlying implementation of the
indexing system.
The interface is designed to support the following operations:
1. Storing content in the index.
2. Retrieving content by ID.
.. versionadded:: ___version___
"""
@abc.abstractmethod
def upsert(self, items: Sequence[Content], /, **kwargs: Any) -> UpsertResponse:
"""Upsert documents into the index.
The upsert functionality should utilize the ID field of the content object
if it is provided. If the ID is not provided, the upsert method is free
to generate an ID for the content.
When an ID is specified and the content already exists in the index,
the upsert method should update the content with the new data. If the content
does not exist, the upsert method should add the item to the index.
Args:
items: Sequence of documents to add to the index.
**kwargs: Additional keyword arguments.
Returns:
UpsertResponse: A response object that contains the list of IDs that were
successfully added or updated in the index and the list of IDs that
failed to be added or updated.
.. versionadded:: ___version___
"""
@abc.abstractmethod
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
"""Delete by IDs or other criteria.
Calling delete without any input parameters should raise a ValueError!
Args:
ids: List of ids to delete.
kwargs: Additional keyword arguments. These are up to the implementation.
For example, they can include an option to delete the entire index
or to issue a non-blocking delete.
Returns:
DeleteResponse: A response object that contains the list of IDs that were
successfully deleted and the list of IDs that failed to be deleted.
"""
@abc.abstractmethod
def get(
self,
ids: Sequence[str],
/,
**kwargs: Any,
) -> List[Content]:
"""Get documents by id.
Fewer documents may be returned than requested if some IDs are not found or
if there are duplicated IDs.
Users should not assume that the order of the returned documents matches
the order of the input IDs. Instead, users should rely on the ID field of the
returned documents.
This method should **NOT** raise exceptions if no documents are found for
some IDs.
Args:
ids: List of IDs to get.
kwargs: Additional keyword arguments. These are up to the implementation.
Returns:
List[Content]: List of content items that were found.
.. versionadded:: ___version___
"""
@abc.abstractmethod
def search(self, query: Query, **kwargs: Any) -> QueryResponse[Hit]:
"""Search for documents."""
raise NotImplementedError()
@beta(message="Added in ___version___. The API is subject to change.")
class OneShotIndex(abc.ABC, Generic[Content, Query, Hit]):
@classmethod
@abc.abstractmethod
def from_items(
cls, items: Sequence[Content], /, **kwargs: Any
) -> OneShotIndex[Content, Query, Hit]:
"""Create an index from a sequence of items."""
raise NotImplementedError()
@abc.abstractmethod
def get(
self,
ids: Sequence[str],
/,
**kwargs: Any,
) -> List[Content]:
"""Get documents by id.
Fewer documents may be returned than requested if some IDs are not found or
if there are duplicated IDs.
Users should not assume that the order of the returned documents matches
the order of the input IDs. Instead, users should rely on the ID field of the
returned documents.
This method should **NOT** raise exceptions if no documents are found for
some IDs.
Args:
ids: List of IDs to get.
kwargs: Additional keyword arguments. These are up to the implementation.
Returns:
List[Content]: List of content items that were found.
.. versionadded:: ___version___
"""
@abc.abstractmethod
def search(self, query: Query, **kwargs: Any) -> QueryResponse[Hit]:
"""Search for documents."""
raise NotImplementedError()
# This targets existing retrievers that can be used as indexes
# These retrievers only support `str` as an input for a query.
class DocumentIndex(Index[Document, str, Document], BaseRetriever):
"""A searchable document index."""
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
"""Get relevant documents for a query."""
# Check whether run_manager is in the signature of search; if so, pass it.
accepts_run_manager = "run_manager" in inspect.signature(self.search).parameters
if accepts_run_manager:
return self.search(
query,
run_manager=run_manager,
)["hits"]
else:
return self.search(query)["hits"]
class OneShotDocumentIndex(OneShotIndex[Document, str, Document], BaseRetriever):
"""A searchable document index."""
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
"""Get relevant documents for a query."""
# Check whether run_manager is in the signature of search; if so, pass it.
accepts_run_manager = "run_manager" in inspect.signature(self.search).parameters
if accepts_run_manager:
return self.search(
query,
run_manager=run_manager,
)["hits"]
else:
return self.search(query)["hits"]
class VectorStoreQuery(TypedDict):
"""A generic query type for vectorstores."""
query: Union[str, List[float]]  # Search by text or by vector (could swap in BaseMedia)
filter: Dict[str, Any]
limit: int
include_vector: bool
include_score: bool
class VectorHit(Document):  # Or a TypedDict that includes the document as a source, etc.
"""A hit in a vectorstore."""
score: Optional[float]
vector: Optional[List[float]]
class GenericVectorStore(
Index[BaseMedia, VectorStoreQuery, VectorHit]
): # Would require generalizing the BaseRetriever
"""An interface for a generic vectorstore."""
@beta(message="Added in ___version___. The API is subject to change.")
class AsyncDocumentIndexer(abc.ABC):
"""An abstraction for indexing documents. Async Variant.
This indexing interface is designed to be a generic abstraction for storing and
querying documents, each of which has an ID and metadata associated with it.
The interface is designed to be agnostic to the underlying implementation of the
indexing system.
The interface is designed to support the following operations:
1. Storing content in the index.
2. Retrieving content by ID.
.. versionadded:: ___version___
"""
@abc.abstractmethod
async def upsert(
self, items: Sequence[Document], /, **kwargs: Any
) -> UpsertResponse:
"""Add or update documents in the vectorstore. Async version of upsert.
The upsert functionality should utilize the ID field of the item
if it is provided. If the ID is not provided, the upsert method is free
to generate an ID for the item.
When an ID is specified and the item already exists in the vectorstore,
the upsert method should update the item with the new data. If the item
does not exist, the upsert method should add the item to the vectorstore.
Args:
items: Sequence of documents to add to the vectorstore.
**kwargs: Additional keyword arguments.
Returns:
UpsertResponse: A response object that contains the list of IDs that were
successfully added or updated in the vectorstore and the list of IDs that
failed to be added or updated.
.. versionadded:: ___version___
"""
@abc.abstractmethod
async def delete(
self, ids: Optional[List[str]] = None, **kwargs: Any
) -> DeleteResponse:
"""Delete by IDs or other criteria. Async variant.
Calling delete without any input parameters should raise a ValueError!
Args:
ids: List of ids to delete.
kwargs: Additional keyword arguments. These are up to the implementation.
For example, they can include an option to delete the entire index.
Returns:
DeleteResponse: A response object that contains the list of IDs that were
successfully deleted and the list of IDs that failed to be deleted.
"""
@abc.abstractmethod
async def get(
self,
ids: Sequence[str],
/,
**kwargs: Any,
) -> List[Document]:
"""Get documents by id.
Fewer documents may be returned than requested if some IDs are not found or
if there are duplicated IDs.
Users should not assume that the order of the returned documents matches
the order of the input IDs. Instead, users should rely on the ID field of the
returned documents.
This method should **NOT** raise exceptions if no documents are found for
some IDs.
Args:
ids: List of IDs to get.
kwargs: Additional keyword arguments. These are up to the implementation.
Returns:
List[Document]: List of documents that were found.
.. versionadded:: ___version___
"""
class RecordManager(ABC):
@@ -421,29 +811,3 @@ class InMemoryRecordManager(RecordManager):
keys: A list of keys to delete.
"""
self.delete_keys(keys)
class UpsertResponse(TypedDict):
"""A generic response for upsert operations.
The upsert response will be used by abstractions that implement an upsert
operation for content that can be upserted by ID.
Upsert APIs that accept inputs with IDs and generate IDs internally
will return a response that includes the IDs that succeeded and the IDs
that failed.
If there are no failures, the failed list will be empty, and the order
of the IDs in the succeeded list will match the order of the input documents.
If there are failures, the response becomes ill defined, and a user of the API
cannot determine which generated ID corresponds to which input document.
It is recommended for users explicitly attach the IDs to the items being
indexed to avoid this issue.
"""
succeeded: List[str]
"""The IDs that were successfully indexed."""
failed: List[str]
"""The IDs that failed to index."""

View File

@@ -0,0 +1,81 @@
import uuid
from typing import Any, Dict, List, Optional, Sequence
from langchain_core.documents import Document
from langchain_core.indexing import UpsertResponse
from langchain_core.indexing.base import AsyncDocumentIndexer, DeleteResponse, DocumentIndexer
class InMemoryDocumentIndexer(DocumentIndexer):
"""In memory sync indexer."""
def __init__(self, *, store: Optional[Dict[str, Document]] = None) -> None:
"""An in memory implementation of a document indexer."""
self.store = store if store is not None else {}
def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse:
"""Upsert items into the indexer."""
ok_ids = []
for item in items:
if item.id is None:
id_ = uuid.uuid4()
item_ = item.copy()
item_.id = str(id_)
else:
item_ = item
self.store[item_.id] = item_
ok_ids.append(item_.id)
return UpsertResponse(succeeded=ok_ids, failed=[])
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
"""Delete by ID."""
if ids is None:
raise ValueError("IDs must be provided for deletion")
ok_ids = []
for id_ in ids:
if id_ in self.store:
del self.store[id_]
ok_ids.append(id_)
return DeleteResponse(
succeeded=ok_ids, num_deleted=len(ok_ids), num_failed=0, failed=[]
)
def get(self, ids: Sequence[str], /, **kwargs: Any) -> List[Document]:
"""Get by ids."""
found_documents = []
for id_ in ids:
if id_ in self.store:
found_documents.append(self.store[id_])
return found_documents
class AsyncInMemoryDocumentIndexer(AsyncDocumentIndexer):
"""An in-memory async indexer implementation."""
def __init__(self, *, store: Optional[Dict[str, Document]] = None) -> None:
"""An in-memory async wrapper around the sync document indexer."""
self.indexer = InMemoryDocumentIndexer(store=store)
async def upsert(
self, items: Sequence[Document], /, **kwargs: Any
) -> UpsertResponse:
"""Upsert items into the indexer."""
return self.indexer.upsert(items, **kwargs)
async def delete(
self, ids: Optional[List[str]] = None, **kwargs: Any
) -> DeleteResponse:
"""Delete by ID."""
return self.indexer.delete(ids, **kwargs)
async def get(self, ids: Sequence[str], /, **kwargs: Any) -> List[Document]:
"""Get by ids."""
return self.indexer.get(ids, **kwargs)
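
Assuming the module path introduced in this file (langchain_core.indexing.in_memory), a full round trip through the sync implementation might look like:

from langchain_core.documents import Document
from langchain_core.indexing.in_memory import InMemoryDocumentIndexer

indexer = InMemoryDocumentIndexer()
response = indexer.upsert([Document(id="1", page_content="hello")])
assert response == {"succeeded": ["1"], "failed": []}
assert indexer.get(["1"])[0].page_content == "hello"

# Deleting a missing ID is not a failure, per the DeleteResponse contract.
delete_response = indexer.delete(["1", "not-a-real-id"])
assert delete_response["num_deleted"] == 1
assert delete_response["failed"] == []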

View File

@@ -0,0 +1,101 @@
"""Design pattern with .indexer attached as an attribute to a retriever."""
from pydantic.v1 import Field
from typing import List, Sequence, Any, Optional
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.indexing import UpsertResponse
from langchain_core.indexing.base import DocumentIndex, Content, DeleteResponse
from langchain_core.retrievers import BaseRetriever
class InMemoryDocIndexer(DocumentIndex):
def __init__(self) -> None:
self.documents = {}
def upsert(self, items: Sequence[Content], /, **kwargs: Any) -> UpsertResponse:
for item in items:
self.documents[item.id] = item
return UpsertResponse(succeeded=[item.id for item in items], failed=[])
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
if ids is None:
raise ValueError("IDs must be provided for deletion")
deleted_ids = []
for id_ in ids:
# Deleting a missing ID is not considered a failure.
if id_ in self.documents:
del self.documents[id_]
deleted_ids.append(id_)
return DeleteResponse(succeeded=sorted(set(deleted_ids)), failed=[])
def get(self, ids: Sequence[str], /, **kwargs: Any) -> List[Content]:
return [self.documents[id_] for id_ in ids if id_ in self.documents]
class InMemoryCatRetriever(BaseRetriever):
indexer: InMemoryDocIndexer = Field(default_factory=InMemoryDocIndexer)
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
# ↓↓ Leaks knowledge
docs = self.indexer.documents.values()  # <-- retriever needs knowledge of the indexer
# ^^
good_docs = [doc for doc in docs if "cat" in doc.page_content.lower()]
return good_docs
class MultiIndexIndexer(DocumentIndex):
def __init__(self, indexers: List[DocumentIndex]) -> None:
self.indexers = indexers
def upsert(self, items: Sequence[Content], /, **kwargs: Any) -> UpsertResponse:
for index in self.indexers:
index.upsert(items, **kwargs)
# Aggregate the per-index responses properly in an actual implementation.
return UpsertResponse(succeeded=[], failed=[])
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
failed_ids = []
# Could fan out with a thread pool.
for index in self.indexers:
try:
index.delete(ids, **kwargs)
except Exception:
failed_ids.extend(ids or [])
return DeleteResponse(succeeded=[], failed=sorted(set(failed_ids)))
def get(
self, ids: Sequence[str], *, index_id: int = 0, **kwargs: Any
) -> List[Content]:
indexer = self.indexers[index_id]
return indexer.get(ids, **kwargs)
class FederatedRetriever(BaseRetriever):
"""A retriever that federates over several retrievers and their indexers."""
retrievers: List[BaseRetriever]
indexer: DocumentIndex
def __init__(self, retrievers: List[BaseRetriever]) -> None:
"""
Args:
retrievers: A list of retrievers to federate over. Each retriever
is assumed to expose an `.indexer` attribute.
"""
indexer = MultiIndexIndexer([retriever.indexer for retriever in retrievers])
super().__init__(retrievers=retrievers, indexer=indexer)
def get_indexer(self) -> DocumentIndex:
return self.indexer
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
docs = []
# Whatever combination logic is appropriate.
for retriever in self.retrievers:
docs.extend(
retriever.get_relevant_documents(query, callbacks=run_manager.get_child())
)
return docs
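
To make the wiring concrete, a hypothetical usage of the classes above (this glosses over pydantic field declaration on BaseRetriever subclasses, as the sketch itself does):

from langchain_core.documents import Document

r1, r2 = InMemoryCatRetriever(), InMemoryCatRetriever()
r1.indexer.upsert([Document(id="a", page_content="cats sleep a lot")])
r2.indexer.upsert([Document(id="b", page_content="dogs chase balls")])

fed = FederatedRetriever([r1, r2])
# Fan-out upsert through the combined indexer:
fed.get_indexer().upsert([Document(id="c", page_content="a cat and a dog")])
# Federated retrieval across both underlying retrievers:
docs = fed.invoke("cat")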

View File

@@ -0,0 +1,88 @@
"""Design pattern where the Indexer implements a .search() method.
We can either subclass from Retriever and document index is a retriever that
invokes the search() method correctly.
InMemoryDocIndexer.invoke('meow')
Or we can create a retriever from it using a factory method -- this amounts
to chopping off everything but the search() method and standardizing the inputs
and outputs into that method.
InMemoryDocIndexer.create_retriever().invoke('meow')
"""
from typing import List, Sequence, Any, Optional
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.indexing import UpsertResponse
from langchain_core.indexing.base import (
DocumentIndex,
Content,
DeleteResponse,
QueryResponse,
Hit,
)
class InMemoryDocIndexer(DocumentIndex):
def __init__(self) -> None:
self.documents = {}
def upsert(self, items: Sequence[Content], /, **kwargs: Any) -> UpsertResponse:
for item in items:
self.documents[item.id] = item
return UpsertResponse(succeeded=[item.id for item in items], failed=[])
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
if ids is None:
raise ValueError("IDs must be provided for deletion")
deleted_ids = []
for id_ in ids:
# Deleting a missing ID is not considered a failure.
if id_ in self.documents:
del self.documents[id_]
deleted_ids.append(id_)
return DeleteResponse(succeeded=sorted(set(deleted_ids)), failed=[])
def get(self, ids: Sequence[str], /, **kwargs: Any) -> List[Content]:
return [self.documents[id_] for id_ in ids if id_ in self.documents]
def search(self, query: str, **kwargs: Any) -> QueryResponse[Hit]:
# Toy relevance logic: return every stored doc that mentions "cat".
good_docs = [
doc for doc in self.documents.values() if "cat" in doc.page_content.lower()
]
return QueryResponse(metadata={}, hits=good_docs)
class FederatedIndex(DocumentIndex):
"""A federated index. Can upsert and delete from all indexes. Can search across all indexes."""
indexes: List[DocumentIndex]
def upsert(self, items: Sequence[Content], /, **kwargs: Any) -> UpsertResponse:
for index in self.indexes:
index.upsert(items, **kwargs)
# Aggregate the per-index responses properly in an actual implementation.
return UpsertResponse(succeeded=[], failed=[])
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
failed_ids = []
# Could fan out with a thread pool.
for index in self.indexes:
try:
index.delete(ids, **kwargs)
except Exception:
failed_ids.extend(ids or [])
return DeleteResponse(succeeded=[], failed=sorted(set(failed_ids)))
def get(
self, ids: Sequence[str], *, index_id: int = 0, **kwargs: Any
) -> List[Content]:
indexer = self.indexes[index_id]
return indexer.get(ids, **kwargs)
def search(self, query: str, **kwargs: Any) -> QueryResponse[Hit]:
hits = []
# Whatever combination logic is appropriate.
for index in self.indexes:
hits.extend(index.search(query, **kwargs)["hits"])
return QueryResponse(metadata={}, hits=hits)
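
Under the first option in the module docstring -- the index is itself a retriever -- querying goes through the standard runnable interface. A hypothetical run against the toy implementation above (whose search() simply filters stored docs for the substring "cat"):

from langchain_core.documents import Document

index = InMemoryDocIndexer()
index.upsert(
    [
        Document(id="1", page_content="The cat sat on the mat"),
        Document(id="2", page_content="Quantum computing 101"),
    ]
)

# DocumentIndex subclasses BaseRetriever, so search() is surfaced through
# the retriever/runnable interface:
hits = index.invoke("meow")
assert [doc.id for doc in hits] == ["1"]  # only the doc mentioning "cat"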

View File

@@ -0,0 +1,27 @@
"""Test in memory indexer"""
from typing import AsyncGenerator, Generator
import pytest
from langchain_standard_tests.integration_tests.indexer import (
AsyncDocumentIndexerTestSuite,
DocumentIndexerTestSuite,
)
from langchain_core.indexing import AsyncDocumentIndexer, DocumentIndexer
from langchain_core.indexing.in_memory import (
AsyncInMemoryDocumentIndexer,
InMemoryDocumentIndexer,
)
class TestDocumentIndexerTestSuite(DocumentIndexerTestSuite):
@pytest.fixture()
def indexer(self) -> Generator[DocumentIndexer, None, None]:
yield InMemoryDocumentIndexer()
class TestAsyncDocumentIndexerTestSuite(AsyncDocumentIndexerTestSuite):
@pytest.fixture()
async def indexer(self) -> AsyncGenerator[AsyncDocumentIndexer, None]:
yield AsyncInMemoryDocumentIndexer()

View File

@@ -4,11 +4,13 @@ from langchain_core.indexing import __all__
def test_all() -> None:
"""Use to catch obvious breaking changes."""
assert __all__ == sorted(__all__, key=str.lower)
assert __all__ == [
assert set(__all__) == {
"aindex",
"AsyncDocumentIndexer",
"DocumentIndexer",
"index",
"IndexingResult",
"InMemoryRecordManager",
"RecordManager",
"UpsertResponse",
]
}

View File

@@ -30,14 +30,14 @@ lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
poetry run ruff .
poetry run ruff check .
poetry run ruff format $(PYTHON_FILES) --diff
poetry run ruff --select I $(PYTHON_FILES)
poetry run ruff check --select I $(PYTHON_FILES)
mkdir $(MYPY_CACHE); poetry run mypy $(PYTHON_FILES) --cache-dir $(MYPY_CACHE)
format format_diff:
poetry run ruff format $(PYTHON_FILES)
poetry run ruff --select I --fix $(PYTHON_FILES)
poetry run ruff check --select I --fix $(PYTHON_FILES)
spell_check:
poetry run codespell --toml pyproject.toml

View File

@@ -1,7 +1,14 @@
import pytest
from langchain_standard_tests.integration_tests.chat_models import (
ChatModelIntegrationTests,
)
# Rewrite assert statements for test suite so that implementations can
# see the full error message from failed asserts.
# https://docs.pytest.org/en/7.1.x/how-to/writing_plugins.html#assertion-rewriting
pytest.register_assert_rewrite("langchain_standard_tests.integration_tests.indexer")
__all__ = [
"ChatModelIntegrationTests",
]

View File

@@ -0,0 +1,394 @@
"""Test suite to check indexer implementations."""
import inspect
import uuid
from abc import ABC, abstractmethod
from typing import AsyncGenerator, Generator
import pytest
from langchain_core.documents import Document
from langchain_core.indexing import AsyncDocumentIndexer, DocumentIndexer
class DocumentIndexerTestSuite(ABC):
"""Test suite for checking the read-write of a document indexer.
Implementers should subclass this test suite and provide a fixture
that returns an empty indexer for each test.
"""
@abstractmethod
@pytest.fixture
def indexer(self) -> Generator[DocumentIndexer, None, None]:
"""Get the indexer."""
def test_upsert_documents_has_no_ids(self, indexer: DocumentIndexer) -> None:
"""Verify that there is not parameter called ids in upsert"""
signature = inspect.signature(indexer.upsert)
assert "ids" not in signature.parameters
def test_upsert_no_ids(self, indexer: DocumentIndexer) -> None:
"""Upsert works with documents that do not have IDs.
At the moment, the ID field in documents is optional.
"""
documents = [
Document(page_content="foo", metadata={"id": 1}),
Document(page_content="bar", metadata={"id": 2}),
]
response = indexer.upsert(documents)
ids = sorted(response["succeeded"])
# Ordering is not guaranteed, need to test carefully
documents = indexer.get(ids)
sorted_documents = sorted(documents, key=lambda x: x.id)
if sorted_documents[0].page_content == "bar":
assert sorted_documents[0] == Document(
page_content="bar", metadata={"id": 2}, id=ids[0]
)
assert sorted_documents[1] == Document(
page_content="foo", metadata={"id": 1}, id=ids[1]
)
else:
assert sorted_documents[0] == Document(
page_content="foo", metadata={"id": 1}, id=ids[0]
)
assert sorted_documents[1] == Document(
page_content="bar", metadata={"id": 2}, id=ids[1]
)
def test_upsert_some_ids(self, indexer: DocumentIndexer) -> None:
"""Test an upsert where some docs have ids and some dont."""
foo_uuid = str(uuid.UUID(int=7))
documents = [
Document(id=foo_uuid, page_content="foo", metadata={"id": 1}),
Document(page_content="bar", metadata={"id": 2}),
]
response = indexer.upsert(documents)
ids = response["succeeded"]
other_id = list(set(ids) - {foo_uuid})[0]
assert response["failed"] == []
assert foo_uuid in ids
# Ordering is not guaranteed, so branch on which document comes back first.
documents = indexer.get(ids)
first_doc = documents[0]
if first_doc.id == foo_uuid:
assert documents == [
Document(page_content="foo", metadata={"id": 1}, id=foo_uuid),
Document(page_content="bar", metadata={"id": 2}, id=other_id),
]
else:
assert documents == [
Document(page_content="bar", metadata={"id": 2}, id=other_id),
Document(page_content="foo", metadata={"id": 1}, id=foo_uuid),
]
def test_upsert_overwrites(self, indexer: DocumentIndexer) -> None:
"""Test that upsert overwrites existing content."""
foo_uuid = str(uuid.UUID(int=7))
documents = [
Document(id=foo_uuid, page_content="foo", metadata={"bar": 1}),
]
response = indexer.upsert(documents)
ids = response["succeeded"]
assert response["failed"] == []
assert indexer.get(ids) == [
Document(page_content="foo", metadata={"bar": 1}, id=foo_uuid),
]
# Now let's overwrite foo
indexer.upsert(
[Document(id=foo_uuid, page_content="foo2", metadata={"meow": 2})]
)
documents = indexer.get([foo_uuid])
assert documents == [
Document(page_content="foo2", metadata={"meow": 2}, id=foo_uuid)
]
def test_delete_missing_docs(self, indexer: DocumentIndexer) -> None:
"""Verify that we can delete docs that aren't there."""
assert indexer.get(["1"]) == [] # Should be empty.
delete_response = indexer.delete(["1"])
if "num_deleted" in delete_response:
assert delete_response["num_deleted"] == 0
if "num_failed" in delete_response:
# Deleting a missing ID is **not** a failure!
assert delete_response["num_failed"] == 0
if "succeeded" in delete_response:
# There was nothing to delete!
assert delete_response["succeeded"] == []
if "failed" in delete_response:
# Nothing should have failed
assert delete_response["failed"] == []
def test_delete_semantics(self, indexer: DocumentIndexer) -> None:
"""Test deletion of content has appropriate semantics."""
# Let's index a document first.
foo_uuid = str(uuid.UUID(int=7))
upsert_response = indexer.upsert(
[Document(id=foo_uuid, page_content="foo", metadata={})]
)
assert upsert_response == {"succeeded": [foo_uuid], "failed": []}
delete_response = indexer.delete(["missing_id", foo_uuid])
if "num_deleted" in delete_response:
assert delete_response["num_deleted"] == 1
if "num_failed" in delete_response:
# Deleting a missing ID is **not** a failure!
assert delete_response["num_failed"] == 0
if "succeeded" in delete_response:
# Only the pre-existing ID should be reported as deleted.
assert delete_response["succeeded"] == [foo_uuid]
if "failed" in delete_response:
# Nothing should have failed
assert delete_response["failed"] == []
def test_bulk_delete(self, indexer: DocumentIndexer) -> None:
"""Test that we can delete several documents at once."""
documents = [
Document(id="1", page_content="foo", metadata={"id": 1}),
Document(id="2", page_content="bar", metadata={"id": 2}),
Document(id="3", page_content="baz", metadata={"id": 3}),
]
indexer.upsert(documents)
indexer.delete(["1", "2"])
assert indexer.get(["1", "2", "3"]) == [
Document(page_content="baz", metadata={"id": 3}, id="3")
]
def test_delete_no_args(self, indexer: DocumentIndexer) -> None:
"""Test delete with no args raises ValueError."""
with pytest.raises(ValueError):
indexer.delete()
def test_delete_missing_content(self, indexer: DocumentIndexer) -> None:
"""Deleting missing content should not raise an exception."""
indexer.delete(["1"])
indexer.delete(["1", "2", "3"])
def test_get_with_missing_ids(self, indexer: DocumentIndexer) -> None:
"""Test get with missing IDs."""
documents = [
Document(id="1", page_content="foo", metadata={"id": 1}),
Document(id="2", page_content="bar", metadata={"id": 2}),
]
upsert_response = indexer.upsert(documents)
assert upsert_response == {
"succeeded": ["1", "2"],
"failed": [],
}
retrieved_documents = indexer.get(["1", "2", "3", "4"])
# The ordering is not guaranteed, so we sort by ID.
assert sorted(retrieved_documents, key=lambda x: x.id) == [
Document(page_content="foo", metadata={"id": 1}, id="1"),
Document(page_content="bar", metadata={"id": 2}, id="2"),
]
def test_get_missing(self, indexer: DocumentIndexer) -> None:
"""Test get by IDs with missing IDs."""
# This should not raise an exception
documents = indexer.get(["1", "2", "3"])
assert documents == []
class AsyncDocumentIndexerTestSuite(ABC):
"""Test suite for checking the read-write of a document indexer.
Implementers should subclass this test suite and provide a fixture
that returns an empty indexer for each test.
"""
@abstractmethod
@pytest.fixture
async def indexer(self) -> AsyncGenerator[AsyncDocumentIndexer, None]:
"""Get the indexer."""
async def test_upsert_documents_has_no_ids(self, indexer: AsyncDocumentIndexer) -> None:
"""Verify that there is no parameter called ids in upsert."""
signature = inspect.signature(indexer.upsert)
assert "ids" not in signature.parameters
async def test_upsert_no_ids(self, indexer: AsyncDocumentIndexer) -> None:
"""Upsert works with documents that do not have IDs.
At the moment, the ID field in documents is optional.
"""
documents = [
Document(page_content="foo", metadata={"id": 1}),
Document(page_content="bar", metadata={"id": 2}),
]
response = await indexer.upsert(documents)
ids = sorted(response["succeeded"])
# Ordering is not guaranteed, need to test carefully
documents = await indexer.get(ids)
sorted_documents = sorted(documents, key=lambda x: x.id)
if sorted_documents[0].page_content == "bar":
assert sorted_documents[0] == Document(
page_content="bar", metadata={"id": 2}, id=ids[0]
)
assert sorted_documents[1] == Document(
page_content="foo", metadata={"id": 1}, id=ids[1]
)
else:
assert sorted_documents[0] == Document(
page_content="foo", metadata={"id": 1}, id=ids[0]
)
assert sorted_documents[1] == Document(
page_content="bar", metadata={"id": 2}, id=ids[1]
)
async def test_upsert_some_ids(self, indexer: AsyncDocumentIndexer) -> None:
"""Test an upsert where some docs have ids and some don't."""
foo_uuid = str(uuid.UUID(int=7))
documents = [
Document(id=foo_uuid, page_content="foo", metadata={"id": 1}),
Document(page_content="bar", metadata={"id": 2}),
]
response = await indexer.upsert(documents)
ids = response["succeeded"]
other_id = list(set(ids) - {foo_uuid})[0]
assert response["failed"] == []
assert foo_uuid in ids
# Ordering is not guaranteed, so branch on which document comes back first.
documents = await indexer.get(ids)
first_doc = documents[0]
if first_doc.id == foo_uuid:
assert documents == [
Document(page_content="foo", metadata={"id": 1}, id=foo_uuid),
Document(page_content="bar", metadata={"id": 2}, id=other_id),
]
else:
assert documents == [
Document(page_content="bar", metadata={"id": 2}, id=other_id),
Document(page_content="foo", metadata={"id": 1}, id=foo_uuid),
]
async def test_upsert_overwrites(self, indexer: AsyncDocumentIndexer) -> None:
"""Test that upsert overwrites existing content."""
foo_uuid = str(uuid.UUID(int=7))
documents = [
Document(id=foo_uuid, page_content="foo", metadata={"bar": 1}),
]
response = await indexer.upsert(documents)
ids = response["succeeded"]
assert response["failed"] == []
assert await indexer.get(ids) == [
Document(page_content="foo", metadata={"bar": 1}, id=foo_uuid),
]
# Now let's overwrite foo
await indexer.upsert(
[Document(id=foo_uuid, page_content="foo2", metadata={"meow": 2})]
)
documents = await indexer.get([foo_uuid])
assert documents == [
Document(page_content="foo2", metadata={"meow": 2}, id=foo_uuid)
]
async def test_delete_missing_docs(self, indexer: AsyncDocumentIndexer) -> None:
"""Verify that we can delete docs that aren't there."""
assert await indexer.get(["1"]) == [] # Should be empty.
delete_response = await indexer.delete(["1"])
if "num_deleted" in delete_response:
assert delete_response["num_deleted"] == 0
if "num_failed" in delete_response:
# Deleting a missing ID is **not** a failure!
assert delete_response["num_failed"] == 0
if "succeeded" in delete_response:
# There was nothing to delete!
assert delete_response["succeeded"] == []
if "failed" in delete_response:
# Nothing should have failed
assert delete_response["failed"] == []
async def test_delete_semantics(self, indexer: AsyncDocumentIndexer) -> None:
"""Test deletion of content has appropriate semantics."""
# Let's index a document first.
foo_uuid = str(uuid.UUID(int=7))
upsert_response = await indexer.upsert(
[Document(id=foo_uuid, page_content="foo", metadata={})]
)
assert upsert_response == {"succeeded": [foo_uuid], "failed": []}
delete_response = await indexer.delete(["missing_id", foo_uuid])
if "num_deleted" in delete_response:
assert delete_response["num_deleted"] == 1
if "num_failed" in delete_response:
# Deleting a missing ID is **not** a failure!
assert delete_response["num_failed"] == 0
if "succeeded" in delete_response:
# Only the pre-existing ID should be reported as deleted.
assert delete_response["succeeded"] == [foo_uuid]
if "failed" in delete_response:
# Nothing should have failed
assert delete_response["failed"] == []
async def test_bulk_delete(self, indexer: AsyncDocumentIndexer) -> None:
"""Test that we can delete several documents at once."""
documents = [
Document(id="1", page_content="foo", metadata={"id": 1}),
Document(id="2", page_content="bar", metadata={"id": 2}),
Document(id="3", page_content="baz", metadata={"id": 3}),
]
await indexer.upsert(documents)
await indexer.delete(["1", "2"])
assert await indexer.get(["1", "2", "3"]) == [
Document(page_content="baz", metadata={"id": 3}, id="3")
]
async def test_delete_no_args(self, indexer: AsyncDocumentIndexer) -> None:
"""Test delete with no args raises ValueError."""
with pytest.raises(ValueError):
await indexer.delete()
async def test_delete_missing_content(self, indexer: AsyncDocumentIndexer) -> None:
"""Deleting missing content should not raise an exception."""
await indexer.delete(["1"])
await indexer.delete(["1", "2", "3"])
async def test_get_with_missing_ids(self, indexer: AsyncDocumentIndexer) -> None:
"""Test get with missing IDs."""
documents = [
Document(id="1", page_content="foo", metadata={"id": 1}),
Document(id="2", page_content="bar", metadata={"id": 2}),
]
upsert_response = await indexer.upsert(documents)
assert upsert_response == {
"succeeded": ["1", "2"],
"failed": [],
}
retrieved_documents = await indexer.get(["1", "2", "3", "4"])
# The ordering is not guaranteed, so we sort by ID.
assert sorted(retrieved_documents, key=lambda x: x.id) == [
Document(page_content="foo", metadata={"id": 1}, id="1"),
Document(page_content="bar", metadata={"id": 2}, id="2"),
]
async def test_get_missing(self, indexer: AsyncDocumentIndexer) -> None:
"""Test get by IDs with missing IDs."""
# This should not raise an exception
documents = await indexer.get(["1", "2", "3"])
assert documents == []