Compare commits

...

10 Commits

Author SHA1 Message Date
Eugene Yurtsev
d7b7a5f0ef xx 2024-07-12 10:36:57 -04:00
Eugene Yurtsev
f4b4717980 x 2024-07-11 16:45:30 -04:00
Eugene Yurtsev
acfaf78057 x 2024-07-11 16:45:14 -04:00
Eugene Yurtsev
4795a022c2 x 2024-07-11 16:30:54 -04:00
Eugene Yurtsev
decb12f635 qx 2024-07-11 16:28:27 -04:00
Eugene Yurtsev
00f451ca53 qxqx 2024-07-11 14:52:39 -04:00
Eugene Yurtsev
465ed2e893 qxqx 2024-07-11 14:05:23 -04:00
Eugene Yurtsev
5bd2ece15b qxqx 2024-07-11 14:00:01 -04:00
Eugene Yurtsev
64f496610f Stashing current work 2024-07-11 10:18:27 -04:00
Eugene Yurtsev
b1d0bf4a99 qxqx 2024-07-10 15:37:19 -04:00
9 changed files with 573 additions and 186 deletions

View File

@@ -7,6 +7,7 @@ import json
import uuid
from itertools import islice
from typing import (
TYPE_CHECKING,
Any,
AsyncIterable,
AsyncIterator,
@@ -29,7 +30,9 @@ from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.indexing.base import RecordManager
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
from langchain_core.vectorstores import VectorStore
# Magic UUID to use as a namespace for hashing.
# Used to try and generate a unique UUID for each document
@@ -265,6 +268,9 @@ def index(
"delete" and "add_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.
"""
# Local scope to avoid circular imports
from langchain_core.vectorstores import VectorStore
if cleanup not in {"incremental", "full", None}:
raise ValueError(
f"cleanup should be one of 'incremental', 'full' or None. "
@@ -478,6 +484,8 @@ async def aindex(
"adelete" and "aadd_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.
"""
# Local scope to avoid circular imports
from langchain_core.vectorstores import VectorStore
if cleanup not in {"incremental", "full", None}:
raise ValueError(

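The change above defers the VectorStore import: it is only imported under TYPE_CHECKING at module level and re-imported locally inside index()/aindex() at call time. A minimal sketch of that pattern (the function signature is simplified and illustrative, not the real index() signature):

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by static type checkers; nothing is imported at runtime,
    # so the module-level circular dependency disappears.
    from langchain_core.vectorstores import VectorStore


def index(vector_store: "VectorStore") -> None:
    # Local import: the dependency is resolved at call time instead of import time.
    from langchain_core.vectorstores import VectorStore

    if not isinstance(vector_store, VectorStore):
        raise TypeError("vector_store must be a VectorStore instance")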
View File

@@ -447,3 +447,12 @@ class UpsertResponse(TypedDict):
"""The IDs that were successfully indexed."""
failed: List[str]
"""The IDs that failed to index."""
class DeleteResponse(TypedDict, total=False):
"""A response to a delete request."""
num_deleted: int
num_failed: int
succeeded: Sequence[str]
failed: Sequence[str]
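As a purely illustrative example (all IDs below are made up), a DeleteResponse for a delete of three items where one fails could look like:

response: DeleteResponse = {
    "num_deleted": 2,
    "num_failed": 1,
    "succeeded": ["doc-1", "doc-2"],
    "failed": ["doc-3"],
}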

View File

@@ -0,0 +1,346 @@
"""A generic indexing interface for storing and querying content."""
import abc
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Dict,
Generic,
Iterable,
Iterator,
List,
Literal,
Optional,
Sequence,
TypeVar,
Union,
)
from typing_extensions import NotRequired, TypedDict
from langchain_core._api import beta
from langchain_core.documents.base import BaseMedia
from langchain_core.indexing.base import DeleteResponse, UpsertResponse
from langchain_core.runnables import run_in_executor
from langchain_core.utils import abatch_iterate, batch_iterate
class Sort(TypedDict):
"""Sort order for the results."""
field: str
"""The field to sort by."""
ascending: NotRequired[bool]
"""Sort order. True for ascending, False for descending.
If missing, the default sort order is ascending.
"""
# Need to compare against supported filtering operators
Comparator = Literal[
"$eq", "$ne", "$lt", "$lte", "$gt", "$gte", "$in", "$nin", "$exists"
]
Operator = Literal["$and", "$or", "$not"]
class Description(TypedDict, total=False):
"""Description of the index."""
supported_comparators: List[str] # Set to [] if filtering is not supported
supported_operators: List[str] # Set to [] if filtering is not supported
supports_sort: bool
supports_pagination: bool
T = TypeVar("T", bound=BaseMedia)
class BaseIndex(Generic[T]):
"""An index represents a collection of items that can be queried.
This indexing interface is designed to be a generic abstraction for storing and
querying content that has an ID and metadata associated with it.
The interface is designed to be agnostic to the underlying implementation of the
indexing system.
The interface is designed to support the following operations:
1. Storing content in the index.
2. Retrieving content by ID.
3. Querying the content based on the metadata associated with the content.
The implementation is **NOT** responsible for supporting search queries
against the content itself! Such a responsibility is left for more specialized
interfaces like the VectorStore.
While strongly encouraged, implementations are not required to support
querying based on metadata. Such implementations should override the
`get_by_filter` and `delete_by_filter` methods to raise a NotImplementedError.
.. versionadded:: 0.2.15
"""
# Developer guidelines:
# Do not override streaming_upsert!
# This interface will likely be extended in the future with additional support
# to deal with failures and retries.
@beta(message="Added in 0.2.11. The API is subject to change.")
def streaming_upsert(
self, items: Iterable[T], /, batch_size: int, **kwargs: Any
) -> Iterator[UpsertResponse]:
"""Upsert items in a streaming fashion.
Args:
items: Iterable of items to add to the index.
batch_size: The size of each batch to upsert.
**kwargs: Additional keyword arguments.
kwargs should only include parameters that are common to all
items. (e.g., timeout for indexing, retry policy, etc.)
kwargs should not include ids to avoid ambiguous semantics.
Instead the ID should be provided as part of the item.
.. versionadded:: 0.2.11
"""
# The default implementation of this method breaks the input into
# batches of size `batch_size` and calls the `upsert` method on each batch.
# Subclasses can override this method to provide a more efficient
# implementation.
for item_batch in batch_iterate(batch_size, items):
yield self.upsert(item_batch, **kwargs)
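As a usage sketch (my_index and documents are placeholders for any concrete BaseIndex[Document] implementation and an iterable of items):

failed_ids = []
for response in my_index.streaming_upsert(documents, batch_size=100):
    # Each UpsertResponse covers one batch of at most 100 items.
    failed_ids.extend(response["failed"])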
@beta(message="Added in 0.2.15. The API is subject to change.")
def upsert(self, items: Sequence[T], /, **kwargs: Any) -> UpsertResponse:
"""Upsert items into the index.
The upsert functionality should utilize the ID field of the content object
if it is provided. If the ID is not provided, the upsert method is free
to generate an ID for the content.
When an ID is specified and the content already exists in the index,
the upsert method should update the content with the new data. If the content
does not exist, the upsert method should add the item to the index.
Args:
items: Sequence of items to add to the index.
**kwargs: Additional keyword arguments.
Returns:
UpsertResponse: A response object that contains the list of IDs that were
successfully added or updated in the index and the list of IDs that
failed to be added or updated.
.. versionadded:: 0.2.15
"""
@beta(message="Added in 0.2.11. The API is subject to change.")
async def astreaming_upsert(
self,
items: AsyncIterable[T],
/,
batch_size: int,
**kwargs: Any,
) -> AsyncIterator[UpsertResponse]:
"""Upsert items in a streaming fashion. Async version of streaming_upsert.
Args:
items: Iterable of items to add to the index.
batch_size: The size of each batch to upsert.
**kwargs: Additional keyword arguments.
kwargs should only include parameters that are common to all
items. (e.g., timeout for indexing, retry policy, etc.)
kwargs should not include ids to avoid ambiguous semantics.
Instead the ID should be provided as part of the item object.
.. versionadded:: 0.2.11
"""
async for batch in abatch_iterate(batch_size, items):
yield await self.aupsert(batch, **kwargs)
@beta(message="Added in 0.2.15. The API is subject to change.")
async def aupsert(self, items: Sequence[T], /, **kwargs: Any) -> UpsertResponse:
"""Add or update items in the vectorstore. Async version of upsert.
The upsert functionality should utilize the ID field of the item
if it is provided. If the ID is not provided, the upsert method is free
to generate an ID for the item.
When an ID is specified and the item already exists in the vectorstore,
the upsert method should update the item with the new data. If the item
does not exist, the upsert method should add the item to the vectorstore.
Args:
items: Sequence of items to add to the vectorstore.
**kwargs: Additional keyword arguments.
Returns:
UpsertResponse: A response object that contains the list of IDs that were
successfully added or updated in the vectorstore and the list of IDs that
failed to be added or updated.
.. versionadded:: 0.2.15
"""
return await run_in_executor(
None,
self.upsert,
items,
**kwargs,
)
@abc.abstractmethod
def get_by_ids(
self,
ids: Sequence[str],
/,
) -> List[T]:
"""Get items by id.
Fewer items may be returned than requested if some IDs are not found or
if there are duplicated IDs.
Users should not assume that the order of the returned items matches
the order of the input IDs. Instead, users should rely on the ID field of the
returned items.
This method should **NOT** raise exceptions if no items are found for
some IDs.
"""
@abc.abstractmethod
def delete_by_ids(
self,
ids: Sequence[str],
/,
**kwargs: Any,
) -> DeleteResponse:
"""Delete by IDs or other criteria.
Args:
ids: List of ids to delete.
kwargs: Additional keyword arguments. This is up to the implementation.
Returns:
DeleteResponse: A response object that contains the list of IDs that were
successfully deleted and the list of IDs that failed to be deleted.
"""
# Delete and get are part of the READ/WRITE interface.
# They do not take advantage of indexes on the content.
# However, all indexes ARE assumed to be able to filter on metadata
# if they implement the get_by_filter and delete_by_filter methods.
@abc.abstractmethod
def get_by_filter(
self,
*,
filter: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
limit: Optional[int] = None,
sort: Optional[Sort] = None,
**kwargs: Any,
) -> Iterable[T]:
"""Get items by a filter query.
Args:
filter: A filter to apply to the query. Must be a valid filter.
Expected to follow the standard LangChain filtering syntax.
limit: Number of items to return.
sort: Sort order for the results if supported by the index.
**kwargs: Additional keyword arguments.
"""
# Developer guidelines
# 1. The filter should be a dictionary or a list of dictionaries.
# 2. An invalid filter should raise an exception.
# 3. A None filter is considered valid and should return items.
# 4. The **default** filter syntax should follow standard LangChain
# filtering syntax.
# The syntax is as follows:
# - All operators and comparators should be prefixed with a "$".
# - Field names are expected to be valid identifiers allowing [a-zA-Z0-9_]
# only.
# - Top level dict with multiple keys should be treated as an "$and" query.
# - Top level list should be treated as an "$and" query.
# - A key that starts with "$" should be treated as an operator or comparator
# (e.g., "$and", "$or", "$not", "$eq", "$ne", "$lt", "$lte", "$gt", "$gte",
# - A key that is not prefixed with "$" should be treated as a field name.
# 5. Supported filtering operators should be documented in the description
# of the index.
# 6. Providers are free to support **additional** types of filtering operators.
# To do that, they should define the filter as
# Union[existing_format, provider_format].
# The provider format should contain an extra `type`
# field so that it can be distinguished from the standard format.
# We suggest using "provider" as the type value. The rest of the syntax is
# up to the provider to define.
#
# For example:
# {
# "type": "provider",
# "filter": "and(or(eq('field', 'value'), eq('field2', 'value2')))"A
# }
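For illustration, a filter following the standard syntax described above (the field names are made up), plus the suggested provider-specific escape hatch:

# Implicit "$and" over the two top-level field keys.
standard_filter = {
    "source": {"$eq": "news"},
    "year": {"$gte": 2023},
}

# Equivalent explicit form using an operator key.
explicit_filter = {
    "$and": [
        {"source": {"$eq": "news"}},
        {"year": {"$gte": 2023}},
    ]
}

# Provider-specific format, distinguished by the extra "type" field.
provider_filter = {
    "type": "provider",
    "filter": "and(eq('source', 'news'), gte('year', 2023))",
}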
@abc.abstractmethod
def delete_by_filter(
self,
filter: Union[Dict[str, Any], List[Dict[str, Any]]],
/,
**kwargs: Any,
) -> Iterable[DeleteResponse]:
"""Delete items by a filter.
Args:
filter: A filter to apply to the query. Must be a valid filter.
Expected to follow the standard LangChain filtering syntax.
**kwargs: Additional keyword arguments.
Returns:
Iterable[DeleteResponse]: An iterable of delete responses.
"""
# Developer guidelines:
# 1. The filter should be a dictionary or a list of dictionaries.
# 2. An invalid filter should raise an exception.
# 3. An empty filter is considered invalid and should raise an exception.
# 4. The **default** filter syntax should follow standard LangChain
# filtering syntax.
# The syntax is as follows:
# - All operators and comparators should be prefixed with a "$".
# - Field names are expected to be valid identifiers allowing [a-zA-Z0-9_]
# only.
# - Top level dict with multiple keys should be treated as an "$and" query.
# - Top level list should be treated as an "$and" query.
# - A key that starts with "$" should be treated as an operator or comparator
# (e.g., "$and", "$or", "$not", "$eq", "$ne", "$lt", "$lte", "$gt", "$gte",
# - A key that is not prefixed with "$" should be treated as a field name.
# 5. Supported filtering operators should be documented in the description
# of the index.
# 6. Providers are free to support **additional** types of filtering operators.
# To do that, they should define the filter as
# Union[existing_format, provider_format].
# The provider format should contain an extra `type`
# field so that it can be distinguished from the standard format.
# We suggest using "provider" as the type value. The rest of the syntax is
# up to the provider to define.
#
# For example:
# {
# "type": "provider",
# "filter": "and(or(eq('field', 'value'), eq('field2', 'value2')))"A
# }
@classmethod
def describe(cls) -> Description:
"""Get a description of the functionality supported by the index."""
# Developer guidelines:
# Developers are encouraged to override this method to provide a
# detailed description of the functionality supported by the index.
# The description will be used in the following manners:
# 1. Surfaced in the documentation to provide users with an overview of
# the functionality supported by the index.
# 2. Used by standard test suites to verify that the index actually supports
# the functionality it claims to support correctly.
# 3. By you, the developer, to leverage utility code that will be used to
# provide run-time validation of user provided queries.
# 4. Will be accessible to users in an interactive environment to help them
# understand the capabilities of the index.
raise NotImplementedError(f"{cls.__name__} does not implement describe method.")
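A sketch of an override for a hypothetical implementation that supports equality-style comparators and boolean operators, sorting, but no pagination:

@classmethod
def describe(cls) -> Description:
    return Description(
        supported_comparators=["$eq", "$ne", "$in", "$nin"],
        supported_operators=["$and", "$or"],
        supports_sort=True,
        supports_pagination=False,
    )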

View File

@@ -24,19 +24,16 @@ from __future__ import annotations
import logging
import math
import warnings
from abc import ABC, abstractmethod
from abc import abstractmethod
from itertools import cycle
from typing import (
TYPE_CHECKING,
Any,
AsyncIterable,
AsyncIterator,
Callable,
ClassVar,
Collection,
Dict,
Iterable,
Iterator,
List,
Optional,
Sequence,
@@ -51,23 +48,23 @@ from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables.config import run_in_executor
from langchain_core.utils.aiter import abatch_iterate
from langchain_core.utils.iter import batch_iterate
if TYPE_CHECKING:
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForRetrieverRun,
CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.indexing.base import UpsertResponse
from langchain_core.documents.base import Document
from langchain_core.indexing.base_index import BaseIndex
logger = logging.getLogger(__name__)
VST = TypeVar("VST", bound="VectorStore")
class VectorStore(ABC):
class VectorStore(BaseIndex[Document]):
"""Interface for vector store."""
def add_texts(
@@ -130,32 +127,6 @@ class VectorStore(ABC):
f"`add_texts` has not been implemented for {self.__class__.__name__} "
)
# Developer guidelines:
# Do not override streaming_upsert!
@beta(message="Added in 0.2.11. The API is subject to change.")
def streaming_upsert(
self, items: Iterable[Document], /, batch_size: int, **kwargs: Any
) -> Iterator[UpsertResponse]:
"""Upsert documents in a streaming fashion.
Args:
items: Iterable of Documents to add to the vectorstore.
batch_size: The size of each batch to upsert.
**kwargs: Additional keyword arguments.
kwargs should only include parameters that are common to all
documents. (e.g., timeout for indexing, retry policy, etc.)
kwargs should not include ids to avoid ambiguous semantics.
Instead the ID should be provided as part of the Document object.
.. versionadded:: 0.2.11
"""
# The default implementation of this method breaks the input into
# batches of size `batch_size` and calls the `upsert` method on each batch.
# Subclasses can override this method to provide a more efficient
# implementation.
for item_batch in batch_iterate(batch_size, items):
yield self.upsert(item_batch, **kwargs)
# Please note that we've added a new method `upsert` instead of re-using the
# existing `add_documents` method.
# This was done to resolve potential ambiguities around the behavior of **kwargs
@@ -225,30 +196,6 @@ class VectorStore(ABC):
f"upsert has not been implemented for {self.__class__.__name__}"
)
@beta(message="Added in 0.2.11. The API is subject to change.")
async def astreaming_upsert(
self,
items: AsyncIterable[Document],
/,
batch_size: int,
**kwargs: Any,
) -> AsyncIterator[UpsertResponse]:
"""Upsert documents in a streaming fashion. Async version of streaming_upsert.
Args:
items: Iterable of Documents to add to the vectorstore.
batch_size: The size of each batch to upsert.
**kwargs: Additional keyword arguments.
kwargs should only include parameters that are common to all
documents. (e.g., timeout for indexing, retry policy, etc.)
kwargs should not include ids to avoid ambiguous semantics.
Instead the ID should be provided as part of the Document object.
.. versionadded:: 0.2.11
"""
async for batch in abatch_iterate(batch_size, items):
yield await self.aupsert(batch, **kwargs)
@beta(message="Added in 0.2.11. The API is subject to change.")
async def aupsert(
self, items: Sequence[Document], /, **kwargs: Any

View File

@@ -1,4 +1,4 @@
from typing import Any, Iterable, List, Optional, cast
from typing import Any, Iterable, List, Optional, Sequence, cast
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings, FakeEmbeddings
@@ -6,6 +6,7 @@ from langchain_core.example_selectors import (
MaxMarginalRelevanceExampleSelector,
SemanticSimilarityExampleSelector,
)
from langchain_core.indexing.base import DeleteResponse
from langchain_core.vectorstores import VectorStore
@@ -31,6 +32,16 @@ class DummyVectorStore(VectorStore):
self.metadatas.extend(metadatas)
return ["dummy_id"]
def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
raise NotImplementedError()
def delete_by_ids(
self,
ids: Sequence[str],
/,
) -> DeleteResponse:
raise NotImplementedError()
def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:

View File

@@ -2,25 +2,21 @@ from datetime import datetime
from typing import (
Any,
AsyncIterator,
Dict,
Iterable,
Iterator,
List,
Optional,
Sequence,
Type,
)
from unittest.mock import patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, aindex, index
from langchain_core.indexing.api import _abatch, _HashedDocument
from langchain_core.vectorstores import VST, VectorStore
from langchain_core.vectorstores import InMemoryVectorStore, VectorStore
class ToyLoader(BaseLoader):
@@ -42,101 +38,6 @@ class ToyLoader(BaseLoader):
yield document
class InMemoryVectorStore(VectorStore):
"""In-memory implementation of VectorStore using a dictionary."""
def __init__(self, permit_upserts: bool = False) -> None:
"""Vector store interface for testing things in memory."""
self.store: Dict[str, Document] = {}
self.permit_upserts = permit_upserts
def delete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
"""Delete the given documents from the store using their IDs."""
if ids:
for _id in ids:
self.store.pop(_id, None)
async def adelete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
"""Delete the given documents from the store using their IDs."""
if ids:
for _id in ids:
self.store.pop(_id, None)
def add_documents( # type: ignore
self,
documents: Sequence[Document],
*,
ids: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Add the given documents to the store (insert behavior)."""
if ids and len(ids) != len(documents):
raise ValueError(
f"Expected {len(ids)} ids, got {len(documents)} documents."
)
if not ids:
raise NotImplementedError("This is not implemented yet.")
for _id, document in zip(ids, documents):
if _id in self.store and not self.permit_upserts:
raise ValueError(
f"Document with uid {_id} already exists in the store."
)
self.store[_id] = document
return list(ids)
async def aadd_documents(
self,
documents: Sequence[Document],
*,
ids: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> List[str]:
if ids and len(ids) != len(documents):
raise ValueError(
f"Expected {len(ids)} ids, got {len(documents)} documents."
)
if not ids:
raise NotImplementedError("This is not implemented yet.")
for _id, document in zip(ids, documents):
if _id in self.store and not self.permit_upserts:
raise ValueError(
f"Document with uid {_id} already exists in the store."
)
self.store[_id] = document
return list(ids)
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[Any, Any]]] = None,
**kwargs: Any,
) -> List[str]:
"""Add the given texts to the store (insert behavior)."""
raise NotImplementedError()
@classmethod
def from_texts(
cls: Type[VST],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[Dict[Any, Any]]] = None,
**kwargs: Any,
) -> VST:
"""Create a vector store from a list of texts."""
raise NotImplementedError()
def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
"""Find the most similar documents to the given query."""
raise NotImplementedError()
@pytest.fixture
def record_manager() -> InMemoryRecordManager:
"""Timestamped set fixture."""
@@ -156,13 +57,15 @@ async def arecord_manager() -> InMemoryRecordManager:
@pytest.fixture
def vector_store() -> InMemoryVectorStore:
"""Vector store fixture."""
return InMemoryVectorStore()
embeddings = DeterministicFakeEmbedding(size=5)
return InMemoryVectorStore(embeddings)
@pytest.fixture
def upserting_vector_store() -> InMemoryVectorStore:
"""Vector store fixture."""
return InMemoryVectorStore(permit_upserts=True)
embeddings = DeterministicFakeEmbedding(size=5)
return InMemoryVectorStore(embeddings)
def test_indexing_same_content(
@@ -286,7 +189,7 @@ def test_index_simple_delete_full(
doc_texts = set(
# Ignoring type since doc should be in the store and not a None
vector_store.store.get(uid).page_content # type: ignore
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
)
assert doc_texts == {"mutated document 1", "This is another document."}
@@ -368,7 +271,7 @@ async def test_aindex_simple_delete_full(
doc_texts = set(
# Ignoring type since doc should be in the store and not a None
vector_store.store.get(uid).page_content # type: ignore
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
)
assert doc_texts == {"mutated document 1", "This is another document."}
@@ -659,7 +562,7 @@ def test_incremental_delete(
doc_texts = set(
# Ignoring type since doc should be in the store and not a None
vector_store.store.get(uid).page_content # type: ignore
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
)
assert doc_texts == {"This is another document.", "This is a test document."}
@@ -718,7 +621,7 @@ def test_incremental_delete(
doc_texts = set(
# Ignoring type since doc should be in the store and not a None
vector_store.store.get(uid).page_content # type: ignore
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
)
assert doc_texts == {
@@ -786,7 +689,7 @@ def test_incremental_indexing_with_batch_size(
doc_texts = set(
# Ignoring type since doc should be in the store and not a None
vector_store.store.get(uid).page_content # type: ignore
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
)
assert doc_texts == {"1", "2", "3", "4"}
@@ -836,7 +739,7 @@ def test_incremental_delete_with_batch_size(
doc_texts = set(
# Ignoring type since doc should be in the store and not a None
vector_store.store.get(uid).page_content # type: ignore
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
)
assert doc_texts == {"1", "2", "3", "4"}
@@ -981,7 +884,7 @@ async def test_aincremental_delete(
doc_texts = set(
# Ignoring type since doc should be in the store and not a None
vector_store.store.get(uid).page_content # type: ignore
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
)
assert doc_texts == {"This is another document.", "This is a test document."}
@@ -1040,7 +943,7 @@ async def test_aincremental_delete(
doc_texts = set(
# Ignoring type since doc should be in the store and not a None
vector_store.store.get(uid).page_content # type: ignore
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
)
assert doc_texts == {
@@ -1232,8 +1135,10 @@ def test_deduplication_v2(
# using in memory implementation here
assert isinstance(vector_store, InMemoryVectorStore)
ids = list(vector_store.store.keys())
contents = sorted(
[document.page_content for document in vector_store.store.values()]
[document.page_content for document in vector_store.get_by_ids(ids)]
)
assert contents == ["1", "2", "3"]
@@ -1370,11 +1275,19 @@ def test_indexing_custom_batch_size(
ids = [_HashedDocument.from_document(doc).uid for doc in docs]
batch_size = 1
with patch.object(vector_store, "add_documents") as mock_add_documents:
original = vector_store.add_documents
try:
mock_add_documents = MagicMock()
vector_store.add_documents = mock_add_documents
index(docs, record_manager, vector_store, batch_size=batch_size)
args, kwargs = mock_add_documents.call_args
assert args == (docs,)
assert kwargs == {"ids": ids, "batch_size": batch_size}
finally:
vector_store.add_documents = original
async def test_aindexing_custom_batch_size(
@@ -1390,8 +1303,9 @@ async def test_aindexing_custom_batch_size(
ids = [_HashedDocument.from_document(doc).uid for doc in docs]
batch_size = 1
with patch.object(vector_store, "aadd_documents") as mock_add_documents:
await aindex(docs, arecord_manager, vector_store, batch_size=batch_size)
args, kwargs = mock_add_documents.call_args
assert args == (docs,)
assert kwargs == {"ids": ids, "batch_size": batch_size}
mock_add_documents = AsyncMock()
vector_store.aadd_documents = mock_add_documents
await aindex(docs, arecord_manager, vector_store, batch_size=batch_size)
args, kwargs = mock_add_documents.call_args
assert args == (docs,)
assert kwargs == {"ids": ids, "batch_size": batch_size}

View File

@@ -7,7 +7,7 @@ from typing_extensions import TypedDict
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.indexing.base import UpsertResponse
from langchain_core.indexing.base import DeleteResponse, UpsertResponse
from langchain_core.vectorstores import VectorStore
@@ -68,6 +68,18 @@ class CustomSyncVectorStore(VectorStore):
def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
return [self.store[id] for id in ids if id in self.store]
def delete_by_ids(
self,
ids: Sequence[str],
/,
) -> DeleteResponse:
for id_ in ids:
self.store.pop(id_, None)
return {
"succeeded": ids,
"failed": [],
}
def from_texts( # type: ignore
cls,
texts: List[str],

View File

@@ -54,9 +54,11 @@ class MultiVectorRetriever(BaseRetriever):
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
"""Get documents relevant to a query.
Args:
query: String to find relevant documents for
run_manager: The callbacks handler to use
Returns:
List of relevant documents
"""
@@ -79,9 +81,11 @@ class MultiVectorRetriever(BaseRetriever):
self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
) -> List[Document]:
"""Asynchronously get documents relevant to a query.
Args:
query: String to find relevant documents for
run_manager: The callbacks handler to use
Returns:
List of relevant documents
"""

View File

@@ -1,13 +1,19 @@
import uuid
from typing import Any, List, Optional, Sequence
from langchain_core.documents import Document
from langchain.pydantic_v1 import BaseModel
from langchain.retrievers import MultiVectorRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import BaseDocumentTransformer, Document
from langchain_core.indexing import UpsertResponse
from langchain_core.indexing.base import DeleteResponse
from langchain_core.indexing.base_index import BaseIndex
from langchain_core.retrievers import BaseRetriever
from langchain_core.vectorstores import VectorStore
from langchain_text_splitters import TextSplitter
from langchain.retrievers import MultiVectorRetriever
class ParentDocumentRetriever(MultiVectorRetriever):
class ParentDocumentRetriever(MultiVectorRetriever, BaseIndex[Document]):
"""Retrieve small chunks then retrieve their parent documents.
When splitting documents for retrieval, there are often conflicting desires:
@@ -58,8 +64,6 @@ class ParentDocumentRetriever(MultiVectorRetriever):
child_splitter: TextSplitter
"""The text splitter to use to create child documents."""
"""The key to use to track the parent id. This will be stored in the
metadata of child documents."""
parent_splitter: Optional[TextSplitter] = None
"""The text splitter to use to create parent documents.
If none, then the parent documents will be the raw documents passed in."""
@@ -123,3 +127,135 @@ class ParentDocumentRetriever(MultiVectorRetriever):
self.vectorstore.add_documents(docs, **kwargs)
if add_to_docstore:
self.docstore.mset(full_docs)
## V2 Implementation
class ParentRetrieverV2(BaseRetriever):
underlying_retriever: BaseRetriever
"""The underlying retriever to use to retrieve the parent documents."""
id_key: str = "doc_id"
"""The key to use to look up the parent documents."""
store: BaseIndex[Document]
transformer: BaseDocumentTransformer
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
"""Get documents relevant to a query.
Args:
query: String to find relevant documents for
run_manager: The callback handler to use
Returns:
List of relevant documents
"""
# Config is a problem for composition?
sub_docs = self.underlying_retriever.invoke(
query, config={"callbacks": run_manager}
)
ids = []
for d in sub_docs:
if self.id_key in d.metadata and d.metadata[self.id_key] not in ids:
ids.append(d.metadata[self.id_key])
docs = self.store.get_by_ids(ids)
return docs
# Needs a better name
class FullDocumentIndex(BaseIndex[Document], BaseModel):
"""A specialized index that stores small chunks of data and their embeddings."""
vectorstore: VectorStore # <-- Unnecessarily strict. We should just create a QueryableIndex here
"""A specialized index that stores small chunks of data and their embeddings."""
store: BaseIndex[Document]
"""The storage interface for the parent documents"""
id_key: str = "doc_id"
"""The key to use to look up the parent documents."""
chunker: BaseDocumentTransformer
"""Used to chunk the source documents into small chunks that can will be searched."""
def upsert(
self,
items: Sequence[Document],
/,
vector_store_kwargs: Optional[dict] = None,
**kwargs: Any,
) -> UpsertResponse:
"""Upsert documents into the index and vectorstore."""
for item in items:
if item.id is None:
raise ValueError("Document must have an ID.")
child_docs = []
# This logic is inefficient since we don't have a good way to keep
# track of the original document
for doc in items:
sub_docs = self.chunker.transform_documents([doc])
for sub_doc in sub_docs:
# Select the metadata to keep on the child documents
if self.child_metadata_fields is not None:
sub_doc.metadata = {
k: sub_doc.metadata[k] for k in self.child_metadata_fields
}
# Add the parent id to the child documents
sub_doc.metadata[self.id_key] = doc.id
child_docs.extend(sub_docs)
# Needs to clean up first to keep things synchronized.
self.vectorstore.delete_by_filter(
{
self.id_key: {
"$in": [doc.id for doc in items],
}
}
)
self.vectorstore.upsert(child_docs, **(vector_store_kwargs or {}))
return self.store.upsert(items)
def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
"""Get documents by their ids."""
return self.store.get_by_ids(ids)
def delete_by_ids(
self,
ids: Sequence[str],
/,
) -> DeleteResponse:
"""Delete documents by their ids."""
# First delete from vectorstore
self.vectorstore.delete_by_filter(
{
self.id_key: {
"$in": list(ids),
}
}
)
delete_response = self.store.delete_by_ids(ids)
return delete_response
# This could be an argument for accepting kwargs in get_by_ids!
def get_by_ids_from_vectorstore(self, ids: Sequence[str], /) -> List[Document]:
"""Get documents by their ids."""
return self.vectorstore.get_by_ids(ids)
# We should inherit from a more generalized version of a retriever
# so we don't have to do get_retriever()
def get_retriever(self, **kwargs: Any) -> ParentRetrieverV2:
"""Get a retriever that looks up parent documents from matching child chunks."""
return ParentRetrieverV2(
underlying_retriever=self.vectorstore.as_retriever(**kwargs),
store=self.store,
id_key=self.id_key,
transformer=self.chunker,
)
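For illustration, wiring the pieces above together could look roughly like the sketch below. SomeDocumentIndex is a placeholder for any concrete BaseIndex[Document] used as the parent-document store, and the in-memory vector store shown here does not yet implement the new filter methods, so this is a sketch of intended usage rather than a runnable example against the current classes:

from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

full_index = FullDocumentIndex(
    vectorstore=InMemoryVectorStore(DeterministicFakeEmbedding(size=8)),
    store=SomeDocumentIndex(),  # placeholder BaseIndex[Document] for parent docs
    chunker=RecursiveCharacterTextSplitter(chunk_size=200),
)

# Upsert parent documents; their chunks are indexed in the vector store under doc_id.
full_index.upsert([Document(id="doc-1", page_content="A long parent document...")])

# Retrieve parent documents whose chunks match the query.
retriever = full_index.get_retriever(search_kwargs={"k": 4})
parents = retriever.invoke("what does the parent document say?")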