This commit is contained in:
Eugene Yurtsev
2024-07-17 17:18:09 -04:00
parent 1a41d27b1d
commit 10708e856e
6 changed files with 311 additions and 74 deletions

View File

@@ -7,6 +7,7 @@ if it's unchanged.
from langchain_core.indexing.api import IndexingResult, aindex, index
from langchain_core.indexing.base import (
AsyncDocumentIndexer,
DocumentIndexer,
InMemoryRecordManager,
RecordManager,
@@ -16,6 +17,7 @@ from langchain_core.indexing.base import (
__all__ = [
"aindex",
"DocumentIndexer",
"AsyncDocumentIndexer",
"index",
"IndexingResult",
"InMemoryRecordManager",

View File

@@ -122,7 +122,73 @@ class DocumentIndexer(abc.ABC):
.. versionadded:: ___version___
"""
async def aupsert(
@abc.abstractmethod
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
    """Remove documents from the index by ID or other implementation criteria.

    Implementations must raise a ValueError when invoked with no arguments
    at all, so a caller cannot clear the index by accident.

    Args:
        ids: IDs of the documents to delete.
        kwargs: Implementation-specific options; for example, a flag to
            delete the entire index or to issue a non-blocking delete.

    Returns:
        DeleteResponse: Reports the IDs that were deleted successfully and
        the IDs that failed to be deleted.
    """
@abc.abstractmethod
def get(
    self,
    ids: Sequence[str],
    /,
    **kwargs: Any,
) -> List[Document]:
    """Fetch documents for the given IDs.

    The result may contain fewer documents than were requested: unknown IDs
    are silently skipped (no exception is raised) and duplicate IDs yield a
    single document.

    Callers must not rely on the order of the returned documents matching
    the order of the input IDs; match results by each returned document's
    ID field instead.

    Args:
        ids: IDs of the documents to fetch.
        kwargs: Additional implementation-specific options.

    Returns:
        List[Document]: The documents that were found.

    .. versionadded:: ___version___
    """
@beta(message="Added in ___version___. The API is subject to change.")
class AsyncDocumentIndexer(abc.ABC):
"""An abstraction for indexing documents. Async Variant.
This indexing interface is designed to be a generic abstraction for storing and
querying documents that have an ID and metadata associated with them.
The interface is designed to be agnostic to the underlying implementation of the
indexing system.
The interface is designed to support the following operations:
1. Storing content in the index.
2. Retrieving content by ID.
.. versionadded:: ___version___
"""
@abc.abstractmethod
async def upsert(
self, items: Sequence[Document], /, **kwargs: Any
) -> UpsertResponse:
"""Add or update documents in the vectorstore. Async version of upsert.
@@ -154,23 +220,7 @@ class DocumentIndexer(abc.ABC):
)
@abc.abstractmethod
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
"""Delete by IDs or other criteria.
Calling delete without any input parameters should raise a ValueError!
Args:
ids: List of ids to delete.
kwargs: Additional keyword arguments. This is up to the implementation.
For example, can include an option to delete the entire index,
or else issue a non blocking delete etc.
Returns:
DeleteResponse: A response object that contains the list of IDs that were
successfully deleted and the list of IDs that failed to be deleted.
"""
async def adelete(
async def delete(
self, ids: Optional[List[str]] = None, **kwargs: Any
) -> DeleteResponse:
"""Delete by IDs or other criteria. Async variant.
@@ -186,15 +236,9 @@ class DocumentIndexer(abc.ABC):
DeleteResponse: A response object that contains the list of IDs that were
successfully deleted and the list of IDs that failed to be deleted.
"""
return await run_in_executor(
None,
self.delete,
ids,
**kwargs,
)
@abc.abstractmethod
def get(
async def get(
self,
ids: Sequence[str],
/,
@@ -222,40 +266,6 @@ class DocumentIndexer(abc.ABC):
.. versionadded:: ___version___
"""
async def aget(
self,
ids: Sequence[str],
/,
**kwargs: Any,
) -> List[Document]:
"""Get documents by id.
Fewer documents may be returned than requested if some IDs are not found or
if there are duplicated IDs.
Users should not assume that the order of the returned documents matches
the order of the input IDs. Instead, users should rely on the ID field of the
returned documents.
This method should **NOT** raise exceptions if no documents are found for
some IDs.
Args:
ids: List of IDs to get.
kwargs: Additional keyword arguments. These are up to the implementation.
Returns:
List[Document]: List of documents that were found.
.. versionadded:: ___version___
"""
return await run_in_executor(
None,
self.get,
ids,
**kwargs,
)
class RecordManager(ABC):
"""Abstract base class representing the interface for a record manager.

View File

@@ -6,7 +6,7 @@ from langchain_core.indexing import UpsertResponse
from langchain_core.indexing.base import DeleteResponse, DocumentIndexer
class InMemoryIndexer(DocumentIndexer):
class InMemoryDocumentIndexer(DocumentIndexer):
"""In memory sync indexer."""
def __init__(self, *, store: Optional[Dict[str, Document]] = None) -> None:
@@ -55,3 +55,27 @@ class InMemoryIndexer(DocumentIndexer):
found_documents.append(self.store[id_])
return found_documents
class AsyncInMemoryDocumentIndexer(DocumentIndexer):
    """In-memory document indexer exposed through an async interface.

    Every operation simply delegates to a wrapped synchronous
    InMemoryDocumentIndexer; since all state lives in memory, no real
    awaiting takes place.

    NOTE(review): this class declares async upsert/delete/get but derives
    from the synchronous DocumentIndexer base — confirm whether it should
    subclass AsyncDocumentIndexer instead.
    """

    def __init__(self, *, store: Optional[Dict[str, Document]] = None) -> None:
        """Set up the delegate, optionally sharing an existing id -> Document map."""
        self.indexer = InMemoryDocumentIndexer(store=store)

    async def upsert(
        self, items: Sequence[Document], /, **kwargs: Any
    ) -> UpsertResponse:
        """Add or update documents via the wrapped sync indexer."""
        return self.indexer.upsert(items, **kwargs)

    async def delete(
        self, ids: Optional[List[str]] = None, **kwargs: Any
    ) -> DeleteResponse:
        """Delete documents by ID via the wrapped sync indexer."""
        return self.indexer.delete(ids, **kwargs)

    async def get(self, ids: Sequence[str], /, **kwargs: Any) -> List[Document]:
        """Fetch documents by ID via the wrapped sync indexer."""
        return self.indexer.get(ids, **kwargs)

View File

@@ -4,14 +4,24 @@ from typing import Generator
import pytest
from langchain_standard_tests.integration_tests.indexer import (
BaseDocumentIndexerTestSuite,
AsyncDocumentIndexerTestSuite,
DocumentIndexerTestSuite,
)
from langchain_core.indexing import DocumentIndexer
from langchain_core.indexing.in_memory import InMemoryIndexer
from langchain_core.indexing import AsyncDocumentIndexer, DocumentIndexer
from langchain_core.indexing.in_memory import (
AsyncInMemoryDocumentIndexer,
InMemoryDocumentIndexer,
)
class TestDocumentIndexerTestSuite(BaseDocumentIndexerTestSuite):
class TestDocumentIndexerTestSuite(DocumentIndexerTestSuite):
@pytest.fixture()
def indexer(self) -> Generator[DocumentIndexer, None, None]:
return InMemoryIndexer()
yield InMemoryDocumentIndexer()
class TestAsyncDocumentIndexerTestSuite(AsyncDocumentIndexerTestSuite):
@pytest.fixture()
async def indexer(self) -> Generator[AsyncDocumentIndexer, None, None]:
yield AsyncInMemoryDocumentIndexer()

View File

@@ -30,14 +30,14 @@ lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
poetry run ruff .
poetry run ruff check .
poetry run ruff format $(PYTHON_FILES) --diff
poetry run ruff --select I $(PYTHON_FILES)
poetry run ruff check --select I $(PYTHON_FILES)
mkdir $(MYPY_CACHE); poetry run mypy $(PYTHON_FILES) --cache-dir $(MYPY_CACHE)
format format_diff:
poetry run ruff format $(PYTHON_FILES)
poetry run ruff --select I --fix $(PYTHON_FILES)
poetry run ruff check --select I --fix $(PYTHON_FILES)
spell_check:
poetry run codespell --toml pyproject.toml

View File

@@ -1,19 +1,16 @@
"""Test suite to check indexer implementations."""
import inspect
import uuid
from abc import ABC, abstractmethod
from typing import Generator
from typing import AsyncGenerator, Generator
import pytest
from langchain_core.documents import Document
from langchain_core.indexing import DocumentIndexer
# Arbitrarily chosen. Using a small embedding size
# so tests are faster and easier to debug.
EMBEDDING_SIZE = 6
class BaseDocumentIndexerTestSuite(ABC):
class DocumentIndexerTestSuite(ABC):
"""Test suite for checking the read-write of a document indexer.
Implementers should subclass this test suite and provide a fixture
@@ -172,6 +169,8 @@ class BaseDocumentIndexerTestSuite(ABC):
def test_delete_no_args(self, indexer: DocumentIndexer) -> None:
"""Test delete with no args raises ValueError."""
with pytest.raises(ValueError):
indexer.delete()
def test_delete_missing_content(self, indexer: DocumentIndexer) -> None:
"""Deleting missing content should not raise an exception."""
@@ -201,3 +200,195 @@ class BaseDocumentIndexerTestSuite(ABC):
# This should not raise an exception
documents = indexer.get(["1", "2", "3"])
assert documents == []
class AsyncDocumentIndexerTestSuite(ABC):
    """Test suite for checking the read-write of a document indexer.

    Implementers should subclass this test suite and provide a fixture
    that returns an empty indexer for each test.
    """

    @abstractmethod
    @pytest.fixture
    async def indexer(self) -> AsyncGenerator[DocumentIndexer, None]:
        """Get the indexer."""

    async def test_upsert_documents_has_no_ids(self, indexer: DocumentIndexer) -> None:
        """Verify that there is no parameter called ids in upsert."""
        signature = inspect.signature(indexer.upsert)
        assert "ids" not in signature.parameters

    async def test_upsert_no_ids(self, indexer: DocumentIndexer) -> None:
        """Upsert works with documents that do not have IDs.

        At the moment, the ID field in documents is optional.
        """
        documents = [
            Document(page_content="foo", metadata={"id": 1}),
            Document(page_content="bar", metadata={"id": 2}),
        ]
        response = await indexer.upsert(documents)
        ids = sorted(response["succeeded"])

        # Ordering is not guaranteed, need to test carefully
        documents = await indexer.get(ids)
        sorted_documents = sorted(documents, key=lambda x: x.id)

        if sorted_documents[0].page_content == "bar":
            assert sorted_documents[0] == Document(
                page_content="bar", metadata={"id": 2}, id=ids[0]
            )
            assert sorted_documents[1] == Document(
                page_content="foo", metadata={"id": 1}, id=ids[1]
            )
        else:
            assert sorted_documents[0] == Document(
                page_content="foo", metadata={"id": 1}, id=ids[0]
            )
            assert sorted_documents[1] == Document(
                page_content="bar", metadata={"id": 2}, id=ids[1]
            )

    async def test_upsert_some_ids(self, indexer: DocumentIndexer) -> None:
        """Test an upsert where some docs have ids and some don't."""
        foo_uuid = str(uuid.UUID(int=7))
        documents = [
            Document(id=foo_uuid, page_content="foo", metadata={"id": 1}),
            Document(page_content="bar", metadata={"id": 2}),
        ]
        response = await indexer.upsert(documents)
        ids = response["succeeded"]
        # Exactly one ID was auto-generated; pull it out without building a list.
        other_id = next(iter(set(ids) - {foo_uuid}))
        assert response["failed"] == []
        assert foo_uuid in ids

        # Ordering is not guaranteed, so we branch on which doc came back first.
        documents = await indexer.get(ids)
        first_doc = documents[0]
        if first_doc.id == foo_uuid:
            assert documents == [
                Document(page_content="foo", metadata={"id": 1}, id=foo_uuid),
                Document(page_content="bar", metadata={"id": 2}, id=other_id),
            ]
        else:
            assert documents == [
                Document(page_content="bar", metadata={"id": 2}, id=other_id),
                Document(page_content="foo", metadata={"id": 1}, id=foo_uuid),
            ]

    async def test_upsert_overwrites(self, indexer: DocumentIndexer) -> None:
        """Test that upsert overwrites existing content."""
        foo_uuid = str(uuid.UUID(int=7))
        documents = [
            Document(id=foo_uuid, page_content="foo", metadata={"bar": 1}),
        ]
        response = await indexer.upsert(documents)
        ids = response["succeeded"]
        assert response["failed"] == []

        assert await indexer.get(ids) == [
            Document(page_content="foo", metadata={"bar": 1}, id=foo_uuid),
        ]

        # Now let's overwrite foo
        await indexer.upsert(
            [Document(id=foo_uuid, page_content="foo2", metadata={"meow": 2})]
        )

        documents = await indexer.get([foo_uuid])
        assert documents == [
            Document(page_content="foo2", metadata={"meow": 2}, id=foo_uuid)
        ]

    async def test_delete_missing_docs(self, indexer: DocumentIndexer) -> None:
        """Verify that we can delete docs that aren't there."""
        assert await indexer.get(["1"]) == []  # Should be empty.

        delete_response = await indexer.delete(["1"])

        if "num_deleted" in delete_response:
            assert delete_response["num_deleted"] == 0

        if "num_failed" in delete_response:
            # Deleting a missing ID is **not** failure!!
            assert delete_response["num_failed"] == 0

        if "succeeded" in delete_response:
            # There was nothing to delete!
            assert delete_response["succeeded"] == []

        if "failed" in delete_response:
            # Nothing should have failed
            assert delete_response["failed"] == []

    async def test_delete_semantics(self, indexer: DocumentIndexer) -> None:
        """Test deletion of content has appropriate semantics."""
        # Let's index a document first.
        foo_uuid = str(uuid.UUID(int=7))
        upsert_response = await indexer.upsert(
            [Document(id=foo_uuid, page_content="foo", metadata={})]
        )
        assert upsert_response == {"succeeded": [foo_uuid], "failed": []}

        delete_response = await indexer.delete(["missing_id", foo_uuid])

        if "num_deleted" in delete_response:
            assert delete_response["num_deleted"] == 1

        if "num_failed" in delete_response:
            # Deleting a missing ID is **not** failure!!
            assert delete_response["num_failed"] == 0

        if "succeeded" in delete_response:
            # Only the document that existed should be reported as deleted.
            assert delete_response["succeeded"] == [foo_uuid]

        if "failed" in delete_response:
            # Nothing should have failed
            assert delete_response["failed"] == []

    async def test_bulk_delete(self, indexer: DocumentIndexer) -> None:
        """Test that we can delete several documents at once."""
        documents = [
            Document(id="1", page_content="foo", metadata={"id": 1}),
            Document(id="2", page_content="bar", metadata={"id": 2}),
            Document(id="3", page_content="baz", metadata={"id": 3}),
        ]

        await indexer.upsert(documents)
        await indexer.delete(["1", "2"])

        assert await indexer.get(["1", "2", "3"]) == [
            Document(page_content="baz", metadata={"id": 3}, id="3")
        ]

    async def test_delete_no_args(self, indexer: DocumentIndexer) -> None:
        """Test delete with no args raises ValueError."""
        with pytest.raises(ValueError):
            await indexer.delete()

    async def test_delete_missing_content(self, indexer: DocumentIndexer) -> None:
        """Deleting missing content should not raise an exception."""
        await indexer.delete(["1"])
        await indexer.delete(["1", "2", "3"])

    async def test_get_with_missing_ids(self, indexer: DocumentIndexer) -> None:
        """Test get with missing IDs."""
        documents = [
            Document(id="1", page_content="foo", metadata={"id": 1}),
            Document(id="2", page_content="bar", metadata={"id": 2}),
        ]
        upsert_response = await indexer.upsert(documents)
        assert upsert_response == {
            "succeeded": ["1", "2"],
            "failed": [],
        }
        retrieved_documents = await indexer.get(["1", "2", "3", "4"])
        # The ordering is not guaranteed, so we sort before comparing.
        assert sorted(retrieved_documents, key=lambda x: x.id) == [
            Document(page_content="foo", metadata={"id": 1}, id="1"),
            Document(page_content="bar", metadata={"id": 2}, id="2"),
        ]

    async def test_get_missing(self, indexer: DocumentIndexer) -> None:
        """Test get by IDs with missing IDs."""
        # This should not raise an exception
        documents = await indexer.get(["1", "2", "3"])
        assert documents == []