core[minor]: add upsert, streaming_upsert, aupsert, astreaming_upsert methods to the VectorStore abstraction (#23774)

This PR rolls out part of the new proposed interface for vectorstores
(https://github.com/langchain-ai/langchain/pull/23544) to existing store
implementations.

The PR makes the following changes:

1. Adds standard upsert, streaming_upsert, aupsert, astreaming_upsert
methods to the vectorstore.
2. Updates `add_texts` and `aadd_texts` to be non-required with a
default implementation that delegates to `upsert` and `aupsert` if those
have been implemented. The original `add_texts` and `aadd_texts` methods
are problematic as they spread object-specific information across
the document and `**kwargs` (e.g., ids are not a part of the document).
3. Adds a default implementation to `add_documents` and `aadd_documents`
that delegates to `upsert` and `aupsert` respectively.
4. Adds standard unit tests to verify that a given vectorstore
implements a correct read/write API.

A downside of this implementation is that it creates `upsert` with a
very similar signature to `add_documents`.
The reason for introducing `upsert` is to:
* Remove any ambiguities about what information is allowed in `kwargs`.
Specifically, `kwargs` should only be used for information common to all
indexed data (e.g., indexing timeout).
* Allow inheriting from an anticipated generalized interface for indexing
that will allow indexing `BaseMedia` (i.e., allow making a vectorstore
for images, audio, etc.).
 
`add_documents` can be deprecated in the future in favor of `upsert` to
make sure that users have a single correct way of indexing content.

---------

Co-authored-by: ccurme <chester.curme@gmail.com>
This commit is contained in:
Eugene Yurtsev
2024-07-05 12:21:40 -04:00
committed by GitHub
parent 3c752238c5
commit 6f08e11d7c
14 changed files with 667 additions and 83 deletions

View File

@@ -10,4 +10,5 @@ def test_all() -> None:
"IndexingResult",
"InMemoryRecordManager",
"RecordManager",
"UpsertResponse",
]

View File

@@ -0,0 +1,31 @@
from typing import AsyncIterator, List
import pytest
from langchain_core.utils.aiter import abatch_iterate
@pytest.mark.parametrize(
    "input_size, input_iterable, expected_output",
    [
        (2, [1, 2, 3, 4, 5], [[1, 2], [3, 4], [5]]),
        (3, [10, 20, 30, 40, 50], [[10, 20, 30], [40, 50]]),
        (1, [100, 200, 300], [[100], [200], [300]]),
        (4, [], []),
    ],
)
async def test_abatch_iterate(
    input_size: int, input_iterable: List[int], expected_output: List[List[int]]
) -> None:
    """Check that ``abatch_iterate`` groups an async stream into batches.

    Args:
        input_size: Batch size passed to ``abatch_iterate``.
        input_iterable: Items fed through an async generator, in order.
        expected_output: The batches expected back; the final batch may be
            shorter than ``input_size`` and an empty input yields no batches.
    """

    async def _to_async_iterable(iterable: List[int]) -> AsyncIterator[int]:
        # Wrap a plain list so it can be consumed with ``async for``.
        for item in iterable:
            yield item

    iterator_ = abatch_iterate(input_size, _to_async_iterable(input_iterable))

    # The result must itself be an async iterator, not an eagerly built list.
    assert isinstance(iterator_, AsyncIterator)

    output = [el async for el in iterator_]
    assert output == expected_output

View File

@@ -6,6 +6,8 @@ EXPECTED_ALL = [
"convert_to_secret_str",
"formatter",
"get_bolded_text",
"abatch_iterate",
"batch_iterate",
"get_color_mapping",
"get_colored_text",
"get_pydantic_field_names",

View File

@@ -0,0 +1,194 @@
from __future__ import annotations
import uuid
from typing import Any, Dict, List, Optional, Sequence, Union
from typing_extensions import TypedDict
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.indexing.base import UpsertResponse
from langchain_core.vectorstores import VectorStore
def test_custom_upsert_type() -> None:
    """Verify that a subclass may override the ``upsert`` item type.

    Nothing is exercised at runtime beyond defining the classes below. The
    point of the test is that the overridden signature is accepted by the
    type checker, i.e. it does not violate the Liskov Substitution Principle.
    """

    class ByVector(TypedDict):
        document: Document
        vector: List[float]

    class CustomVectorStore(VectorStore):
        # The ``items`` parameter below accepts either documents or
        # ``ByVector`` payloads. Overriding the signature this way must not
        # violate the Liskov Substitution Principle (and must not produce
        # typing errors).
        def upsert(
            self,
            items: Union[Sequence[Document], Sequence[ByVector]],
            /,
            **kwargs: Any,
        ) -> UpsertResponse:
            raise NotImplementedError
class CustomSyncVectorStore(VectorStore):
    """A vectorstore that only implements the synchronous methods.

    Documents are held in an in-memory dict keyed by document id. No
    embedding or similarity search is performed.
    """

    def __init__(self) -> None:
        # Maps document id -> stored document.
        self.store: Dict[str, Document] = {}

    def upsert(
        self,
        items: Sequence[Document],
        /,
        **kwargs: Any,
    ) -> UpsertResponse:
        """Insert ``items`` into the store, overwriting entries with equal ids.

        Args:
            items: Documents to store. A document whose ``id`` is ``None`` is
                stored as a copy carrying a freshly generated UUID4 string.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            A response whose ``"succeeded"`` list holds the id used for each
            item, in input order, and whose ``"failed"`` list is always empty.
        """
        ids = []
        for item in items:
            if item.id is None:
                # Assign the generated id to a copy rather than to the
                # caller's document object.
                new_item = item.copy()
                id_: str = str(uuid.uuid4())
                new_item.id = id_
            else:
                id_ = item.id
                new_item = item
            self.store[id_] = new_item
            ids.append(id_)
        return {
            "succeeded": ids,
            "failed": [],
        }

    def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
        """Return the stored documents for ``ids``, skipping unknown ids."""
        return [self.store[id_] for id_ in ids if id_ in self.store]

    @classmethod
    def from_texts(  # type: ignore
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> CustomSyncVectorStore:
        """Build a store and populate it from raw texts.

        Args:
            texts: Texts to add.
            embedding: Accepted for interface compatibility; unused here.
            metadatas: Optional metadata forwarded to ``add_texts``.
            **kwargs: Forwarded to ``add_texts``.

        Returns:
            The newly created, populated store.
        """
        vectorstore = cls()
        vectorstore.add_texts(texts, metadatas=metadatas, **kwargs)
        return vectorstore

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Not supported by this test double.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
def test_implement_upsert() -> None:
    """Exercise the sync write/read API of ``CustomSyncVectorStore``.

    The store only defines ``upsert`` and ``get_by_ids``; ``add_texts`` and
    ``add_documents`` are expected to work through it.
    """
    store = CustomSyncVectorStore()

    # Upsert a document that already carries an id.
    explicit_id_response = store.upsert([Document(id="1", page_content="hello")])
    assert explicit_id_response == {"succeeded": ["1"], "failed": []}
    assert store.get_by_ids(["1"]) == [Document(id="1", page_content="hello")]

    # Upsert a document without an id: the store must report one back.
    response = store.upsert([Document(page_content="world")])
    succeeded = response["succeeded"]
    assert len(succeeded) == 1
    id_ = succeeded[0]
    assert id_ is not None
    assert store.get_by_ids([id_]) == [Document(id=id_, page_content="world")]

    # add_texts with explicit ids.
    returned_ids = store.add_texts(["hello", "world"], ids=["3", "4"])
    assert returned_ids == ["3", "4"]
    expected_docs = [
        Document(id="3", page_content="hello"),
        Document(id="4", page_content="world"),
    ]
    assert store.get_by_ids(["3", "4"]) == expected_docs

    # add_texts without ids.
    ids_ = store.add_texts(["foo", "bar"])
    assert len(ids_) == 2
    first_id, second_id = ids_[0], ids_[1]
    assert store.get_by_ids(ids_) == [
        Document(id=first_id, page_content="foo"),
        Document(id=second_id, page_content="bar"),
    ]

    # add_texts with metadatas.
    ids_2 = store.add_texts(["foo", "bar"], metadatas=[{"foo": "bar"}] * 2)
    assert len(ids_2) == 2
    assert store.get_by_ids(ids_2) == [
        Document(id=ids_2[0], page_content="foo", metadata={"foo": "bar"}),
        Document(id=ids_2[1], page_content="bar", metadata={"foo": "bar"}),
    ]

    # add_documents with the id taken from the document.
    added = store.add_documents([Document(id="5", page_content="baz")])
    assert added == ["5"]

    # add_documents with an id on the document AND in ``ids``: the explicit
    # ``ids`` value is the one stored, and the input document is left as is.
    original_document = Document(id="7", page_content="baz")
    overridden = store.add_documents([original_document], ids=["6"])
    assert overridden == ["6"]
    assert original_document.id == "7"  # original document should not be modified
    assert store.get_by_ids(["6"]) == [Document(id="6", page_content="baz")]
async def test_aupsert_delegation_to_upsert() -> None:
    """Exercise the async API of a store that only defines sync methods.

    ``CustomSyncVectorStore`` defines no async methods, so every awaited call
    here is expected to be served by delegating to its synchronous ``upsert``
    and ``get_by_ids``.
    """
    store = CustomSyncVectorStore()

    # Upsert a document that already carries an id.
    explicit_id_response = await store.aupsert(
        [Document(id="1", page_content="hello")]
    )
    assert explicit_id_response == {"succeeded": ["1"], "failed": []}
    fetched = await store.aget_by_ids(["1"])
    assert fetched == [Document(id="1", page_content="hello")]

    # Upsert a document without an id: the store must report one back.
    response = await store.aupsert([Document(page_content="world")])
    succeeded = response["succeeded"]
    assert len(succeeded) == 1
    id_ = succeeded[0]
    assert id_ is not None
    fetched = await store.aget_by_ids([id_])
    assert fetched == [Document(id=id_, page_content="world")]

    # aadd_texts with explicit ids.
    returned_ids = await store.aadd_texts(["hello", "world"], ids=["3", "4"])
    assert returned_ids == ["3", "4"]
    fetched = await store.aget_by_ids(["3", "4"])
    assert fetched == [
        Document(id="3", page_content="hello"),
        Document(id="4", page_content="world"),
    ]

    # aadd_texts without ids.
    ids_ = await store.aadd_texts(["foo", "bar"])
    assert len(ids_) == 2
    fetched = await store.aget_by_ids(ids_)
    assert fetched == [
        Document(id=ids_[0], page_content="foo"),
        Document(id=ids_[1], page_content="bar"),
    ]

    # aadd_texts with metadatas.
    ids_2 = await store.aadd_texts(["foo", "bar"], metadatas=[{"foo": "bar"}] * 2)
    assert len(ids_2) == 2
    fetched = await store.aget_by_ids(ids_2)
    assert fetched == [
        Document(id=ids_2[0], page_content="foo", metadata={"foo": "bar"}),
        Document(id=ids_2[1], page_content="bar", metadata={"foo": "bar"}),
    ]

    # aadd_documents with the id taken from the document.
    added = await store.aadd_documents([Document(id="5", page_content="baz")])
    assert added == ["5"]

    # aadd_documents with an id on the document AND in ``ids``: the explicit
    # ``ids`` value is the one stored, and the input document is left as is.
    original_document = Document(id="7", page_content="baz")
    overridden = await store.aadd_documents([original_document], ids=["6"])
    assert overridden == ["6"]
    assert original_document.id == "7"  # original document should not be modified
    fetched = await store.aget_by_ids(["6"])
    assert fetched == [Document(id="6", page_content="baz")]