mirror of https://github.com/hwchase17/langchain.git
synced 2026-02-03 15:55:44 +00:00

Compare commits: langchain-...eugene/ind

10 Commits

| SHA1 |
|---|
| d7b7a5f0ef |
| f4b4717980 |
| acfaf78057 |
| 4795a022c2 |
| decb12f635 |
| 00f451ca53 |
| 465ed2e893 |
| 5bd2ece15b |
| 64f496610f |
| b1d0bf4a99 |
@@ -7,6 +7,7 @@ import json
import uuid
from itertools import islice
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterable,
    AsyncIterator,
@@ -29,7 +30,9 @@ from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.indexing.base import RecordManager
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore

if TYPE_CHECKING:
    from langchain_core.vectorstores import VectorStore

# Magic UUID to use as a namespace for hashing.
# Used to try and generate a unique UUID for each document
@@ -265,6 +268,9 @@ def index(
            "delete" and "add_documents" required methods.
        ValueError: If source_id_key is not None, but is not a string or callable.
    """
    # Local scope to avoid circular imports
    from langchain_core.vectorstores import VectorStore

    if cleanup not in {"incremental", "full", None}:
        raise ValueError(
            f"cleanup should be one of 'incremental', 'full' or None. "
@@ -478,6 +484,8 @@ async def aindex(
            "adelete" and "aadd_documents" required methods.
        ValueError: If source_id_key is not None, but is not a string or callable.
    """
    # Local scope to avoid circular imports
    from langchain_core.vectorstores import VectorStore

    if cleanup not in {"incremental", "full", None}:
        raise ValueError(
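
To ground the validation above, a hedged sketch of calling `index` with these arguments (the document contents, namespace, and embedding size are invented for the example; `InMemoryVectorStore` and `DeterministicFakeEmbedding` are the same helpers the updated tests below rely on):

```python
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

docs = [Document(page_content="hello", metadata={"source": "a.txt"})]
record_manager = InMemoryRecordManager(namespace="demo")
record_manager.create_schema()
vector_store = InMemoryVectorStore(DeterministicFakeEmbedding(size=5))

# cleanup must be "incremental", "full", or None, as enforced above;
# incremental cleanup needs source_id_key to identify each source document.
result = index(
    docs, record_manager, vector_store,
    cleanup="incremental", source_id_key="source",
)
```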
@@ -447,3 +447,12 @@ class UpsertResponse(TypedDict):
    """The IDs that were successfully indexed."""
    failed: List[str]
    """The IDs that failed to index."""


class DeleteResponse(TypedDict, total=False):
    """A response to a delete request."""

    num_deleted: int
    num_failed: int
    succeeded: Sequence[str]
    failed: Sequence[str]
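
Since both response types are TypedDicts, they are plain dictionaries at runtime; a small illustrative example (the IDs are invented), importing from this branch's `langchain_core.indexing.base`:

```python
from langchain_core.indexing.base import DeleteResponse, UpsertResponse

upsert_response: UpsertResponse = {
    "succeeded": ["doc-1", "doc-2"],
    "failed": ["doc-3"],
}

# DeleteResponse is declared with total=False, so any subset of keys is valid.
delete_response: DeleteResponse = {
    "num_deleted": 2,
    "succeeded": ["doc-1", "doc-2"],
    "failed": [],
}
```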
346 libs/core/langchain_core/indexing/base_index.py Normal file
@@ -0,0 +1,346 @@
"""A generic indexing interface for storing and querying content."""

import abc
from typing import (
    Any,
    AsyncIterable,
    AsyncIterator,
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Literal,
    NotRequired,
    Optional,
    Sequence,
    TypedDict,
    TypeVar,
    Union,
)

from langchain_core._api import beta
from langchain_core.documents.base import BaseMedia
from langchain_core.indexing.base import DeleteResponse, UpsertResponse
from langchain_core.runnables import run_in_executor
from langchain_core.utils import abatch_iterate, batch_iterate


class Sort(TypedDict):
    """Sort order for the results."""

    field: str
    """The field to sort by."""
    ascending: NotRequired[bool]
    """Sort order. True for ascending, False for descending.

    If missing, the default sort order is ascending.
    """


# To be compared against the filtering operators supported by an index.
Comparator = Literal[
    "$eq", "$ne", "$lt", "$lte", "$gt", "$gte", "$in", "$nin", "$exists"
]
Operator = Literal["$and", "$or", "$not"]


class Description(TypedDict, total=False):
    """Description of the index."""

    supported_comparators: List[str]  # Set to [] if filtering is not supported
    supported_operators: List[str]  # Set to [] if filtering is not supported
    supports_sort: bool
    supports_pagination: bool


T = TypeVar("T", bound=BaseMedia)


class BaseIndex(Generic[T]):
    """An index represents a collection of items that can be queried.

    This indexing interface is designed to be a generic abstraction for storing and
    querying content that has an ID and metadata associated with it.

    The interface is designed to be agnostic to the underlying implementation of the
    indexing system.

    The interface is designed to support the following operations:

    1. Storing content in the index.
    2. Retrieving content by ID.
    3. Querying the content based on the metadata associated with the content.

    The implementation is **NOT** responsible for supporting search queries
    against the content itself! Such a responsibility is left for more specialized
    interfaces like the VectorStore.

    While strongly encouraged, implementations are not required to support
    querying based on metadata. Such implementations can instead override the
    `get_by_filter` and `delete_by_filter` methods to raise a NotImplementedError.

    .. versionadded:: 0.2.15
    """

    # Developer guidelines:
    # Do not override streaming_upsert!
    # This interface will likely be extended in the future with additional support
    # to deal with failures and retries.
    @beta(message="Added in 0.2.11. The API is subject to change.")
    def streaming_upsert(
        self, items: Iterable[T], /, batch_size: int, **kwargs: Any
    ) -> Iterator[UpsertResponse]:
        """Upsert items in a streaming fashion.

        Args:
            items: Iterable of content to add to the index.
            batch_size: The size of each batch to upsert.
            **kwargs: Additional keyword arguments.
                kwargs should only include parameters that are common to all
                items. (e.g., timeout for indexing, retry policy, etc.)
                kwargs should not include ids to avoid ambiguous semantics.
                Instead the ID should be provided as part of the item.

        .. versionadded:: 0.2.11
        """
        # The default implementation of this method breaks the input into
        # batches of size `batch_size` and calls the `upsert` method on each batch.
        # Subclasses can override this method to provide a more efficient
        # implementation.
        for item_batch in batch_iterate(batch_size, items):
            yield self.upsert(item_batch, **kwargs)

    @beta(message="Added in 0.2.15. The API is subject to change.")
    def upsert(self, items: Sequence[T], /, **kwargs: Any) -> UpsertResponse:
        """Upsert items into the index.

        The upsert functionality should utilize the ID field of the content object
        if it is provided. If the ID is not provided, the upsert method is free
        to generate an ID for the content.

        When an ID is specified and the content already exists in the index,
        the upsert method should update the content with the new data. If the content
        does not exist, the upsert method should add the item to the index.

        Args:
            items: Sequence of items to add to the index.
            **kwargs: Additional keyword arguments.

        Returns:
            UpsertResponse: A response object that contains the list of IDs that were
            successfully added or updated in the index and the list of IDs that
            failed to be added or updated.

        .. versionadded:: 0.2.15
        """

    @beta(message="Added in 0.2.11. The API is subject to change.")
    async def astreaming_upsert(
        self,
        items: AsyncIterable[T],
        /,
        batch_size: int,
        **kwargs: Any,
    ) -> AsyncIterator[UpsertResponse]:
        """Upsert items in a streaming fashion. Async version of streaming_upsert.

        Args:
            items: Iterable of items to add to the index.
            batch_size: The size of each batch to upsert.
            **kwargs: Additional keyword arguments.
                kwargs should only include parameters that are common to all
                items. (e.g., timeout for indexing, retry policy, etc.)
                kwargs should not include ids to avoid ambiguous semantics.
                Instead the ID should be provided as part of the item object.

        .. versionadded:: 0.2.11
        """
        async for batch in abatch_iterate(batch_size, items):
            yield await self.aupsert(batch, **kwargs)

    @beta(message="Added in 0.2.15. The API is subject to change.")
    async def aupsert(self, items: Sequence[T], /, **kwargs: Any) -> UpsertResponse:
        """Add or update items in the index. Async version of upsert.

        The upsert functionality should utilize the ID field of the item
        if it is provided. If the ID is not provided, the upsert method is free
        to generate an ID for the item.

        When an ID is specified and the item already exists in the index,
        the upsert method should update the item with the new data. If the item
        does not exist, the upsert method should add the item to the index.

        Args:
            items: Sequence of items to add to the index.
            **kwargs: Additional keyword arguments.

        Returns:
            UpsertResponse: A response object that contains the list of IDs that were
            successfully added or updated in the index and the list of IDs that
            failed to be added or updated.

        .. versionadded:: 0.2.15
        """
        return await run_in_executor(
            None,
            self.upsert,
            items,
            **kwargs,
        )

    @abc.abstractmethod
    def get_by_ids(
        self,
        ids: Sequence[str],
        /,
    ) -> List[T]:
        """Get items by id.

        Fewer items may be returned than requested if some IDs are not found or
        if there are duplicated IDs.

        Users should not assume that the order of the returned items matches
        the order of the input IDs. Instead, users should rely on the ID field of the
        returned items.

        This method should **NOT** raise exceptions if no items are found for
        some IDs.
        """

    @abc.abstractmethod
    def delete_by_ids(
        self,
        ids: Sequence[str],
        /,
        **kwargs: Any,
    ) -> DeleteResponse:
        """Delete by IDs or other criteria.

        Args:
            ids: List of ids to delete.
            kwargs: Additional keyword arguments. This is up to the implementation.

        Returns:
            DeleteResponse: A response object that contains the list of IDs that were
            successfully deleted and the list of IDs that failed to be deleted.
        """

    # Delete and get are part of the READ/WRITE interface.
    # They do not take advantage of indexes on the content.
    # However, all the indexers ARE assumed to have the capability to index
    # on metadata if they implement the get_by_filter and delete_by_filter methods.
    @abc.abstractmethod
    def get_by_filter(
        self,
        *,
        filter: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
        limit: Optional[int] = None,
        sort: Optional[Sort] = None,
        **kwargs: Any,
    ) -> Iterable[T]:
        """Get items by a filter query.

        Args:
            filter: A filter to apply to the query. Must be a valid filter.
                Expected to follow the standard LangChain filtering syntax.
            limit: Number of items to return.
            sort: Sort order for the results if supported by the index.
            **kwargs: Additional keyword arguments.
        """
        # Developer guidelines:
        # 1. The filter should be a dictionary or a list of dictionaries.
        # 2. An invalid filter should raise an exception.
        # 3. A None filter is considered valid and should return items.
        # 4. The **default** filter syntax should follow standard LangChain
        #    filtering syntax.
        #    The syntax is as follows:
        #    - All operators and comparators should be prefixed with a "$".
        #    - Field names are expected to be valid identifiers allowing [a-zA-Z0-9_]
        #      only.
        #    - A top-level dict with multiple keys should be treated as an "$and" query.
        #    - A top-level list should be treated as an "$and" query.
        #    - A key that starts with "$" should be treated as an operator or comparator
        #      (e.g., "$and", "$or", "$not", "$eq", "$ne", "$lt", "$lte", "$gt", "$gte",
        #      "$in", "$nin", "$exists").
        #    - A key that is not prefixed with "$" should be treated as a field name.
        # 5. Supported filtering operators should be documented in the description
        #    of the index.
        # 6. Providers are free to support **additional** types of filtering operators.
        #    To do that, they should define the filter as
        #    Union[existing_format, provider_format].
        #    The provider format should contain an extra `type`
        #    field, so that it can be distinguished from the standard format.
        #    We suggest "provider" as the type value. The rest of the syntax is
        #    up to the provider to define.
        #
        #    For example:
        #    {
        #        "type": "provider",
        #        "filter": "and(or(eq('field', 'value'), eq('field2', 'value2')))"
        #    }
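        #
        # An illustrative query using the standard syntax described above
        # (field names and values here are invented for the example, not
        # part of the API):
        #
        # index.get_by_filter(
        #     filter={"$and": [{"status": {"$eq": "published"}},
        #                      {"views": {"$gt": 100}}]},
        #     limit=10,
        #     sort={"field": "views", "ascending": False},
        # )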

    @abc.abstractmethod
    def delete_by_filter(
        self,
        filter: Union[Dict[str, Any], List[Dict[str, Any]]],
        /,
        **kwargs: Any,
    ) -> Iterable[DeleteResponse]:
        """Delete items by a filter.

        Args:
            filter: A filter to apply to the query. Must be a valid filter.
                Expected to follow the standard LangChain filtering syntax.
            **kwargs: Additional keyword arguments.

        Returns:
            Iterable[DeleteResponse]: An iterable of delete responses.
        """
        # Developer guidelines:
        # 1. The filter should be a dictionary or a list of dictionaries.
        # 2. An invalid filter should raise an exception.
        # 3. An empty filter is considered invalid and should raise an exception.
        # 4. The **default** filter syntax should follow standard LangChain
        #    filtering syntax.
        #    The syntax is as follows:
        #    - All operators and comparators should be prefixed with a "$".
        #    - Field names are expected to be valid identifiers allowing [a-zA-Z0-9_]
        #      only.
        #    - A top-level dict with multiple keys should be treated as an "$and" query.
        #    - A top-level list should be treated as an "$and" query.
        #    - A key that starts with "$" should be treated as an operator or comparator
        #      (e.g., "$and", "$or", "$not", "$eq", "$ne", "$lt", "$lte", "$gt", "$gte",
        #      "$in", "$nin", "$exists").
        #    - A key that is not prefixed with "$" should be treated as a field name.
        # 5. Supported filtering operators should be documented in the description
        #    of the index.
        # 6. Providers are free to support **additional** types of filtering operators.
        #    To do that, they should define the filter as
        #    Union[existing_format, provider_format].
        #    The provider format should contain an extra `type`
        #    field, so that it can be distinguished from the standard format.
        #    We suggest "provider" as the type value. The rest of the syntax is
        #    up to the provider to define.
        #
        #    For example:
        #    {
        #        "type": "provider",
        #        "filter": "and(or(eq('field', 'value'), eq('field2', 'value2')))"
        #    }

    @classmethod
    def describe(cls) -> Description:
        """Get a description of the functionality supported by the index."""
        # Developer guidelines:
        # Developers are encouraged to override this method to provide a
        # detailed description of the functionality supported by the index.
        # The description will be used in the following manners:
        # 1. Surfaced in the documentation to provide users with an overview of
        #    the functionality supported by the index.
        # 2. Used by standard test suites to verify that the index actually supports
        #    the functionality it claims to support correctly.
        # 3. By you, the developer, to leverage utility code that will be used to
        #    provide run-time validation of user-provided queries.
        # 4. Will be accessible to users in an interactive environment to help them
        #    understand the capabilities of the index.
        raise NotImplementedError(f"{cls.__name__} does not implement describe method.")
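
To make the contract concrete, here is a deliberately small sketch of an in-memory implementation against this branch's `base_index` module. The class name `DictIndex` and all data are invented for illustration, and only the `{"field": {"$eq": value}}` form of the standard filter syntax is handled:

```python
import uuid
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

from langchain_core.documents import Document
from langchain_core.indexing.base import DeleteResponse, UpsertResponse
from langchain_core.indexing.base_index import BaseIndex, Description, Sort


class DictIndex(BaseIndex[Document]):
    """Toy dict-backed index supporting only the "$eq" comparator."""

    def __init__(self) -> None:
        self.docs: Dict[str, Document] = {}

    @staticmethod
    def _matches(
        doc: Document,
        filter: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]],
    ) -> bool:
        # A top-level list, or a dict with multiple keys, is treated as
        # "$and", per the developer guidelines above.
        if isinstance(filter, list):
            return all(DictIndex._matches(doc, f) for f in filter)
        return all(
            doc.metadata.get(field) == cond["$eq"]
            for field, cond in (filter or {}).items()
        )

    def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse:
        succeeded = []
        for item in items:
            # Use the provided ID or generate one, per the upsert contract.
            item_id = item.id or str(uuid.uuid4())
            self.docs[item_id] = item
            succeeded.append(item_id)
        return {"succeeded": succeeded, "failed": []}

    def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
        # Missing IDs are skipped silently, as the interface requires.
        return [self.docs[i] for i in ids if i in self.docs]

    def delete_by_ids(self, ids: Sequence[str], /, **kwargs: Any) -> DeleteResponse:
        deleted = [i for i in ids if self.docs.pop(i, None) is not None]
        return {"num_deleted": len(deleted), "succeeded": deleted, "failed": []}

    def get_by_filter(
        self,
        *,
        filter: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
        limit: Optional[int] = None,
        sort: Optional[Sort] = None,
        **kwargs: Any,
    ) -> Iterable[Document]:
        results = [d for d in self.docs.values() if self._matches(d, filter)]
        return results[:limit] if limit is not None else results

    def delete_by_filter(
        self,
        filter: Union[Dict[str, Any], List[Dict[str, Any]]],
        /,
        **kwargs: Any,
    ) -> Iterable[DeleteResponse]:
        if not filter:
            raise ValueError("An empty filter is invalid for delete_by_filter.")
        ids = [i for i, d in self.docs.items() if self._matches(d, filter)]
        yield self.delete_by_ids(ids)

    @classmethod
    def describe(cls) -> Description:
        return {
            "supported_comparators": ["$eq"],
            "supported_operators": [],
            "supports_sort": False,
            "supports_pagination": False,
        }


idx = DictIndex()
idx.upsert([Document(id="a", page_content="hi", metadata={"lang": "en"})])
assert [d.page_content for d in idx.get_by_filter(filter={"lang": {"$eq": "en"}})] == ["hi"]
```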
@@ -24,19 +24,16 @@ from __future__ import annotations

import logging
import math
import warnings
from abc import ABC, abstractmethod
from abc import abstractmethod
from itertools import cycle
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterable,
    AsyncIterator,
    Callable,
    ClassVar,
    Collection,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
@@ -51,23 +48,23 @@ from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables.config import run_in_executor
from langchain_core.utils.aiter import abatch_iterate
from langchain_core.utils.iter import batch_iterate

if TYPE_CHECKING:
    from langchain_core.callbacks.manager import (
        AsyncCallbackManagerForRetrieverRun,
        CallbackManagerForRetrieverRun,
    )
    from langchain_core.documents import Document
    from langchain_core.indexing.base import UpsertResponse

from langchain_core.documents.base import Document
from langchain_core.indexing.base_index import BaseIndex

logger = logging.getLogger(__name__)

VST = TypeVar("VST", bound="VectorStore")


class VectorStore(ABC):
class VectorStore(BaseIndex[Document]):
    """Interface for vector store."""

    def add_texts(
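
With `VectorStore` now subclassing `BaseIndex[Document]`, every vector store exposes the shared ID-based read/write surface. A hedged sketch using the in-memory store that the updated tests below exercise (the document content and embedding size are invented):

```python
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.vectorstores import InMemoryVectorStore

store = InMemoryVectorStore(DeterministicFakeEmbedding(size=5))
store.add_documents([Document(id="doc-1", page_content="hello world")])

# get_by_ids is part of the BaseIndex surface inherited by all vector stores.
[doc] = store.get_by_ids(["doc-1"])
assert doc.page_content == "hello world"
```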
@@ -130,32 +127,6 @@ class VectorStore(ABC):
            f"`add_texts` has not been implemented for {self.__class__.__name__} "
        )

    # Developer guidelines:
    # Do not override streaming_upsert!
    @beta(message="Added in 0.2.11. The API is subject to change.")
    def streaming_upsert(
        self, items: Iterable[Document], /, batch_size: int, **kwargs: Any
    ) -> Iterator[UpsertResponse]:
        """Upsert documents in a streaming fashion.

        Args:
            items: Iterable of Documents to add to the vectorstore.
            batch_size: The size of each batch to upsert.
            **kwargs: Additional keyword arguments.
                kwargs should only include parameters that are common to all
                documents. (e.g., timeout for indexing, retry policy, etc.)
                kwargs should not include ids to avoid ambiguous semantics.
                Instead the ID should be provided as part of the Document object.

        .. versionadded:: 0.2.11
        """
        # The default implementation of this method breaks the input into
        # batches of size `batch_size` and calls the `upsert` method on each batch.
        # Subclasses can override this method to provide a more efficient
        # implementation.
        for item_batch in batch_iterate(batch_size, items):
            yield self.upsert(item_batch, **kwargs)

    # Please note that we've added a new method `upsert` instead of re-using the
    # existing `add_documents` method.
    # This was done to resolve potential ambiguities around the behavior of **kwargs
@@ -225,30 +196,6 @@ class VectorStore(ABC):
            f"upsert has not been implemented for {self.__class__.__name__}"
        )

    @beta(message="Added in 0.2.11. The API is subject to change.")
    async def astreaming_upsert(
        self,
        items: AsyncIterable[Document],
        /,
        batch_size: int,
        **kwargs: Any,
    ) -> AsyncIterator[UpsertResponse]:
        """Upsert documents in a streaming fashion. Async version of streaming_upsert.

        Args:
            items: Iterable of Documents to add to the vectorstore.
            batch_size: The size of each batch to upsert.
            **kwargs: Additional keyword arguments.
                kwargs should only include parameters that are common to all
                documents. (e.g., timeout for indexing, retry policy, etc.)
                kwargs should not include ids to avoid ambiguous semantics.
                Instead the ID should be provided as part of the Document object.

        .. versionadded:: 0.2.11
        """
        async for batch in abatch_iterate(batch_size, items):
            yield await self.aupsert(batch, **kwargs)

    @beta(message="Added in 0.2.11. The API is subject to change.")
    async def aupsert(
        self, items: Sequence[Document], /, **kwargs: Any
@@ -1,4 +1,4 @@
from typing import Any, Iterable, List, Optional, cast
from typing import Any, Iterable, List, Optional, Sequence, cast

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings, FakeEmbeddings
@@ -6,6 +6,7 @@ from langchain_core.example_selectors import (
    MaxMarginalRelevanceExampleSelector,
    SemanticSimilarityExampleSelector,
)
from langchain_core.indexing.base import DeleteResponse
from langchain_core.vectorstores import VectorStore


@@ -31,6 +32,16 @@ class DummyVectorStore(VectorStore):
        self.metadatas.extend(metadatas)
        return ["dummy_id"]

    def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
        raise NotImplementedError()

    def delete_by_ids(
        self,
        ids: Sequence[str],
        /,
    ) -> DeleteResponse:
        raise NotImplementedError()

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
@@ -2,25 +2,21 @@ from datetime import datetime
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Type,
)
from unittest.mock import patch
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import pytest_asyncio

from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, aindex, index
from langchain_core.indexing.api import _abatch, _HashedDocument
from langchain_core.vectorstores import VST, VectorStore
from langchain_core.vectorstores import InMemoryVectorStore, VectorStore


class ToyLoader(BaseLoader):
@@ -42,101 +38,6 @@ class ToyLoader(BaseLoader):
        yield document


class InMemoryVectorStore(VectorStore):
    """In-memory implementation of VectorStore using a dictionary."""

    def __init__(self, permit_upserts: bool = False) -> None:
        """Vector store interface for testing things in memory."""
        self.store: Dict[str, Document] = {}
        self.permit_upserts = permit_upserts

    def delete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
        """Delete the given documents from the store using their IDs."""
        if ids:
            for _id in ids:
                self.store.pop(_id, None)

    async def adelete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
        """Delete the given documents from the store using their IDs."""
        if ids:
            for _id in ids:
                self.store.pop(_id, None)

    def add_documents(  # type: ignore
        self,
        documents: Sequence[Document],
        *,
        ids: Optional[Sequence[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add the given documents to the store (insert behavior)."""
        if ids and len(ids) != len(documents):
            raise ValueError(
                f"Expected {len(ids)} ids, got {len(documents)} documents."
            )

        if not ids:
            raise NotImplementedError("This is not implemented yet.")

        for _id, document in zip(ids, documents):
            if _id in self.store and not self.permit_upserts:
                raise ValueError(
                    f"Document with uid {_id} already exists in the store."
                )
            self.store[_id] = document

        return list(ids)

    async def aadd_documents(
        self,
        documents: Sequence[Document],
        *,
        ids: Optional[Sequence[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        if ids and len(ids) != len(documents):
            raise ValueError(
                f"Expected {len(ids)} ids, got {len(documents)} documents."
            )

        if not ids:
            raise NotImplementedError("This is not implemented yet.")

        for _id, document in zip(ids, documents):
            if _id in self.store and not self.permit_upserts:
                raise ValueError(
                    f"Document with uid {_id} already exists in the store."
                )
            self.store[_id] = document
        return list(ids)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add the given texts to the store (insert behavior)."""
        raise NotImplementedError()

    @classmethod
    def from_texts(
        cls: Type[VST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        **kwargs: Any,
    ) -> VST:
        """Create a vector store from a list of texts."""
        raise NotImplementedError()

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Find the most similar documents to the given query."""
        raise NotImplementedError()


@pytest.fixture
def record_manager() -> InMemoryRecordManager:
    """Timestamped set fixture."""
@@ -156,13 +57,15 @@ async def arecord_manager() -> InMemoryRecordManager:
@pytest.fixture
def vector_store() -> InMemoryVectorStore:
    """Vector store fixture."""
    return InMemoryVectorStore()
    embeddings = DeterministicFakeEmbedding(size=5)
    return InMemoryVectorStore(embeddings)


@pytest.fixture
def upserting_vector_store() -> InMemoryVectorStore:
    """Vector store fixture."""
    return InMemoryVectorStore(permit_upserts=True)
    embeddings = DeterministicFakeEmbedding(size=5)
    return InMemoryVectorStore(embeddings)


def test_indexing_same_content(
@@ -286,7 +189,7 @@ def test_index_simple_delete_full(

    doc_texts = set(
        # Ignoring type since doc should be in the store and not a None
        vector_store.store.get(uid).page_content  # type: ignore
        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
        for uid in vector_store.store
    )
    assert doc_texts == {"mutated document 1", "This is another document."}
@@ -368,7 +271,7 @@ async def test_aindex_simple_delete_full(

    doc_texts = set(
        # Ignoring type since doc should be in the store and not a None
        vector_store.store.get(uid).page_content  # type: ignore
        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
        for uid in vector_store.store
    )
    assert doc_texts == {"mutated document 1", "This is another document."}
@@ -659,7 +562,7 @@ def test_incremental_delete(

    doc_texts = set(
        # Ignoring type since doc should be in the store and not a None
        vector_store.store.get(uid).page_content  # type: ignore
        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
        for uid in vector_store.store
    )
    assert doc_texts == {"This is another document.", "This is a test document."}
@@ -718,7 +621,7 @@ def test_incremental_delete(

    doc_texts = set(
        # Ignoring type since doc should be in the store and not a None
        vector_store.store.get(uid).page_content  # type: ignore
        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
        for uid in vector_store.store
    )
    assert doc_texts == {
@@ -786,7 +689,7 @@ def test_incremental_indexing_with_batch_size(

    doc_texts = set(
        # Ignoring type since doc should be in the store and not a None
        vector_store.store.get(uid).page_content  # type: ignore
        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
        for uid in vector_store.store
    )
    assert doc_texts == {"1", "2", "3", "4"}
@@ -836,7 +739,7 @@ def test_incremental_delete_with_batch_size(

    doc_texts = set(
        # Ignoring type since doc should be in the store and not a None
        vector_store.store.get(uid).page_content  # type: ignore
        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
        for uid in vector_store.store
    )
    assert doc_texts == {"1", "2", "3", "4"}
@@ -981,7 +884,7 @@ async def test_aincremental_delete(

    doc_texts = set(
        # Ignoring type since doc should be in the store and not a None
        vector_store.store.get(uid).page_content  # type: ignore
        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
        for uid in vector_store.store
    )
    assert doc_texts == {"This is another document.", "This is a test document."}
@@ -1040,7 +943,7 @@ async def test_aincremental_delete(

    doc_texts = set(
        # Ignoring type since doc should be in the store and not a None
        vector_store.store.get(uid).page_content  # type: ignore
        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
        for uid in vector_store.store
    )
    assert doc_texts == {
@@ -1232,8 +1135,10 @@ def test_deduplication_v2(

    # using in memory implementation here
    assert isinstance(vector_store, InMemoryVectorStore)

    ids = list(vector_store.store.keys())
    contents = sorted(
        [document.page_content for document in vector_store.store.values()]
        [document.page_content for document in vector_store.get_by_ids(ids)]
    )
    assert contents == ["1", "2", "3"]
@@ -1370,11 +1275,19 @@ def test_indexing_custom_batch_size(
    ids = [_HashedDocument.from_document(doc).uid for doc in docs]

    batch_size = 1
    with patch.object(vector_store, "add_documents") as mock_add_documents:

    original = vector_store.add_documents

    try:
        mock_add_documents = MagicMock()
        vector_store.add_documents = mock_add_documents

        index(docs, record_manager, vector_store, batch_size=batch_size)
        args, kwargs = mock_add_documents.call_args
        assert args == (docs,)
        assert kwargs == {"ids": ids, "batch_size": batch_size}
    finally:
        vector_store.add_documents = original


async def test_aindexing_custom_batch_size(
@@ -1390,8 +1303,9 @@ async def test_aindexing_custom_batch_size(
    ids = [_HashedDocument.from_document(doc).uid for doc in docs]

    batch_size = 1
    with patch.object(vector_store, "aadd_documents") as mock_add_documents:
        await aindex(docs, arecord_manager, vector_store, batch_size=batch_size)
        args, kwargs = mock_add_documents.call_args
        assert args == (docs,)
        assert kwargs == {"ids": ids, "batch_size": batch_size}
    mock_add_documents = AsyncMock()
    vector_store.aadd_documents = mock_add_documents
    await aindex(docs, arecord_manager, vector_store, batch_size=batch_size)
    args, kwargs = mock_add_documents.call_args
    assert args == (docs,)
    assert kwargs == {"ids": ids, "batch_size": batch_size}
@@ -7,7 +7,7 @@ from typing_extensions import TypedDict

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.indexing.base import UpsertResponse
from langchain_core.indexing.base import DeleteResponse, UpsertResponse
from langchain_core.vectorstores import VectorStore


@@ -68,6 +68,18 @@ class CustomSyncVectorStore(VectorStore):
    def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
        return [self.store[id] for id in ids if id in self.store]

    def delete_by_ids(
        self,
        ids: Sequence[str],
        /,
    ) -> DeleteResponse:
        for id_ in ids:
            self.store.pop(id_, None)
        return {
            "succeeded": ids,
            "failed": [],
        }

    def from_texts(  # type: ignore
        cls,
        texts: List[str],
@@ -54,9 +54,11 @@ class MultiVectorRetriever(BaseRetriever):
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Get documents relevant to a query.

        Args:
            query: String to find relevant documents for
            run_manager: The callbacks handler to use

        Returns:
            List of relevant documents
        """
@@ -79,9 +81,11 @@ class MultiVectorRetriever(BaseRetriever):
        self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Asynchronously get documents relevant to a query.

        Args:
            query: String to find relevant documents for
            run_manager: The callbacks handler to use

        Returns:
            List of relevant documents
        """
@@ -1,13 +1,19 @@
import uuid
from typing import Any, List, Optional, Sequence

from langchain_core.documents import Document
from langchain.pydantic_v1 import BaseModel
from langchain.retrievers import MultiVectorRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import BaseDocumentTransformer, Document
from langchain_core.indexing import UpsertResponse
from langchain_core.indexing.base import DeleteResponse
from langchain_core.indexing.base_index import BaseIndex
from langchain_core.retrievers import BaseRetriever
from langchain_core.vectorstores import VectorStore
from langchain_text_splitters import TextSplitter

from langchain.retrievers import MultiVectorRetriever


class ParentDocumentRetriever(MultiVectorRetriever):
class ParentDocumentRetriever(MultiVectorRetriever, BaseIndex[Document]):
    """Retrieve small chunks then retrieve their parent documents.

    When splitting documents for retrieval, there are often conflicting desires:
@@ -58,8 +64,6 @@ class ParentDocumentRetriever(MultiVectorRetriever):
    child_splitter: TextSplitter
    """The text splitter to use to create child documents."""

    """The key to use to track the parent id. This will be stored in the
    metadata of child documents."""
    parent_splitter: Optional[TextSplitter] = None
    """The text splitter to use to create parent documents.
    If none, then the parent documents will be the raw documents passed in."""
@@ -123,3 +127,135 @@ class ParentDocumentRetriever(MultiVectorRetriever):
        self.vectorstore.add_documents(docs, **kwargs)
        if add_to_docstore:
            self.docstore.mset(full_docs)


## V2 Implementation


class ParentRetrieverV2(BaseRetriever):
    underlying_retriever: BaseRetriever
    """The underlying retriever to use to retrieve the parent documents."""
    id_key: str = "doc_id"
    """The key to use to look up the parent documents."""
    store: BaseIndex[Document]
    transformer: BaseDocumentTransformer

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Get documents relevant to a query.

        Args:
            query: String to find relevant documents for
            run_manager: The callback handler to use

        Returns:
            List of relevant documents
        """
        # Config is a problem for composition?
        sub_docs = self.underlying_retriever.invoke(
            query, config={"callbacks": run_manager}
        )
        ids = []
        for d in sub_docs:
            if self.id_key in d.metadata and d.metadata[self.id_key] not in ids:
                ids.append(d.metadata[self.id_key])
        docs = self.store.get_by_ids(ids)
        return docs


# Needs a better name
class FullDocumentIndex(BaseIndex[Document], BaseModel):
    """A specialized index that stores small chunks of data and their embeddings."""

    vectorstore: VectorStore  # <-- Unnecessarily strict. We should just create a QueryableIndex here
    """The vector store used to index the searchable child chunks."""
    store: BaseIndex[Document]
    """The storage interface for the parent documents"""
    id_key: str = "doc_id"
    """The key to use to look up the parent documents."""
    chunker: BaseDocumentTransformer
    """Used to chunk the source documents into small chunks that will be searched."""
    child_metadata_fields: Optional[Sequence[str]] = None
    """Metadata fields to keep on the child documents (keep all if None)."""

    def upsert(
        self,
        items: Sequence[Document],
        /,
        vector_store_kwargs: Optional[dict] = None,
        **kwargs: Any,
    ) -> UpsertResponse:
        """Upsert documents into the index and vectorstore."""
        for item in items:
            if item.id is None:
                raise ValueError("Document must have an ID.")

        child_docs = []

        # This logic is inefficient since we don't have a good way to keep
        # track of the original document
        for doc in items:
            # Can't do this efficiently because we have to keep track of the
            # original document
            sub_docs = self.chunker.transform_documents([doc])
            for sub_doc in sub_docs:
                # Select the metadata for the child documents
                if self.child_metadata_fields is not None:
                    sub_doc.metadata = {
                        k: sub_doc.metadata[k] for k in self.child_metadata_fields
                    }
                # Add the parent id to the child documents
                sub_doc.metadata[self.id_key] = doc.id

            child_docs.extend(sub_docs)

        # Needs to clean up first to keep things synchronized.
        self.vectorstore.delete_by_filter(
            {
                self.id_key: {
                    "$in": [doc.id for doc in items],
                }
            }
        )

        self.vectorstore.upsert(child_docs, **(vector_store_kwargs or {}))

        return self.store.upsert(items)

    def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
        """Get documents by their ids."""
        return self.store.get_by_ids(ids)

    def delete_by_ids(
        self,
        ids: Sequence[str],
        /,
    ) -> DeleteResponse:
        """Delete documents by their ids."""
        # First delete the child chunks from the vectorstore
        self.vectorstore.delete_by_filter(
            {
                self.id_key: {
                    "$in": ids,
                }
            }
        )
        delete_response = self.store.delete_by_ids(ids)
        return delete_response

    # This could be an argument for accepting kwargs in get_by_ids!
    def get_by_ids_from_vectorstore(self, ids: Sequence[str], /) -> List[Document]:
        """Get child documents by their ids from the vectorstore."""
        return self.vectorstore.get_by_ids(ids)

    # We should inherit from a more generalized version of a retriever
    # so we don't have to do get_retriever()
    def get_retriever(self, **kwargs: Any) -> ParentRetrieverV2:
        """Get a retriever that returns parent documents."""
        # We do this to maintain the order of the ids that are returned
        return ParentRetrieverV2(
            underlying_retriever=self.vectorstore.as_retriever(**kwargs),
            store=self.store,
            transformer=self.chunker,
        )
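
Finally, a hedged sketch of wiring the V2 pieces together. `my_vector_store` is a placeholder for a store that implements the new filter-based methods, `DictIndex` is the toy index sketched after `base_index.py` above, the splitter settings are invented, and pydantic configuration for arbitrary field types is glossed over:

```python
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

full_index = FullDocumentIndex(
    vectorstore=my_vector_store,  # hypothetical: must support upsert/delete_by_filter
    store=DictIndex(),            # parent-document storage
    chunker=RecursiveCharacterTextSplitter(chunk_size=200),
)

# Parent documents must carry IDs; child chunks are re-derived on every upsert.
full_index.upsert([Document(id="parent-1", page_content="...a long document...")])

# Search runs over child chunks, but the retriever returns parent documents.
retriever = full_index.get_retriever(search_kwargs={"k": 4})
docs = retriever.invoke("query about the long document")
```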