This commit is contained in:
Eugene Yurtsev
2024-07-17 15:30:10 -04:00
parent 9e4a0e76f6
commit 428b2409c7
4 changed files with 256 additions and 30 deletions

View File

@@ -7,6 +7,7 @@ if it's unchanged.
from langchain_core.indexing.api import IndexingResult, aindex, index
from langchain_core.indexing.base import (
DocumentIndexer,
InMemoryRecordManager,
RecordManager,
UpsertResponse,
@@ -14,6 +15,7 @@ from langchain_core.indexing.base import (
__all__ = [
"aindex",
"DocumentIndexer",
"index",
"IndexingResult",
"InMemoryRecordManager",

View File

@@ -7,6 +7,7 @@ import json
import uuid
from itertools import islice
from typing import (
TYPE_CHECKING,
Any,
AsyncIterable,
AsyncIterator,
@@ -29,7 +30,9 @@ from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.indexing.base import RecordManager
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
from langchain_core.vectorstores import VectorStore
# Magic UUID to use as a namespace for hashing.
# Used to try and generate a unique UUID for each document
@@ -265,6 +268,9 @@ def index(
"delete" and "add_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.
"""
# Local scope to avoid circular imports
from langchain_core.vectorstores import VectorStore
if cleanup not in {"incremental", "full", None}:
raise ValueError(
f"cleanup should be one of 'incremental', 'full' or None. "
@@ -478,6 +484,8 @@ async def aindex(
"adelete" and "aadd_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.
"""
# Local scope to avoid circular imports
from langchain_core.vectorstores import VectorStore
if cleanup not in {"incremental", "full", None}:
raise ValueError(

View File

@@ -1,8 +1,249 @@
from __future__ import annotations
import abc
import time
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Sequence, TypedDict
from typing import (
Any,
Dict,
List,
Optional,
Sequence,
)
from typing_extensions import TypedDict
from langchain_core._api import beta
from langchain_core.documents.base import Document
from langchain_core.runnables import run_in_executor
class UpsertResponse(TypedDict):
    """A generic response for upsert operations.

    The upsert response will be used by abstractions that implement an upsert
    operation for content that can be upserted by ID.

    Upsert APIs that accept inputs with IDs and generate IDs internally
    will return a response that includes the IDs that succeeded and the IDs
    that failed.

    If there are no failures, the failed list will be empty, and the order
    of the IDs in the succeeded list will match the order of the input documents.

    If there are failures, the response becomes ill-defined, and a user of the API
    cannot determine which generated ID corresponds to which input document.

    It is recommended that users explicitly attach the IDs to the items being
    indexed to avoid this issue.
    """

    succeeded: List[str]
    """The IDs that were successfully indexed."""
    failed: List[str]
    """The IDs that failed to index."""
class DeleteResponse(TypedDict, total=False):
    """A generic response for delete operations.

    The class is declared with ``total=False``, so every key below may be
    omitted: the fields in this response are optional, and whether the
    vectorstore returns them or not is up to the implementation.
    """

    num_deleted: int
    """The number of items that were successfully deleted."""
    num_failed: int
    """The number of items that failed to be deleted."""
    succeeded: Sequence[str]
    """The IDs that were successfully deleted.
    Should not be returned when using delete_by_filter.
    """
    failed: Sequence[str]
    """The IDs that failed to be deleted.
    Should not be returned when using delete_by_filter.
    Please note that deleting an ID that does not exist is **NOT** considered a failure.
    """
@beta(message="Added in ___version___. The API is subject to change.")
class DocumentIndexer(ABC):
    """Abstract interface for an index of documents addressable by ID.

    The interface is a generic abstraction for storing and querying documents
    that carry an ID and metadata, and it is meant to stay agnostic to the
    indexing system that backs an implementation.

    Operations declared on the interface:

    1. Storing content in the index (``upsert`` / ``aupsert``).
    2. Deleting content from the index (``delete`` / ``adelete``).
    3. Retrieving content by ID (``get`` / ``aget``).

    ``upsert``, ``delete`` and ``get`` are abstract. Each async method ships
    with a default body that hands its synchronous counterpart to
    ``run_in_executor``; subclasses may override it.

    .. versionadded:: ___version___
    """

    @abstractmethod
    def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse:
        """Add documents to the index, replacing any that already exist.

        Implementations should honour the ID carried by each document when one
        is present, and are free to generate an ID when it is missing.

        If a document with the given ID is already stored, it should be updated
        with the new data; otherwise the document should be added.

        Args:
            items: Sequence of documents to write to the index
                (positional-only).
            **kwargs: Additional keyword arguments.

        Returns:
            UpsertResponse: The IDs that were successfully added or updated,
            together with the IDs that failed to be added or updated.

        .. versionadded:: ___version___
        """

    async def aupsert(
        self, items: Sequence[Document], /, **kwargs: Any
    ) -> UpsertResponse:
        """Add documents to the index, replacing existing ones. Async ``upsert``.

        The ID handling contract is the same as for ``upsert``: use the ID of
        an item when it is provided, generate one otherwise, update items that
        already exist and add the ones that do not.

        Args:
            items: Sequence of documents to write to the index
                (positional-only).
            **kwargs: Additional keyword arguments, forwarded to ``upsert``.

        Returns:
            UpsertResponse: The IDs that were successfully added or updated,
            together with the IDs that failed to be added or updated.

        .. versionadded:: ___version___
        """
        # Default implementation: delegate to the synchronous method.
        response = await run_in_executor(None, self.upsert, items, **kwargs)
        return response

    @abstractmethod
    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse:
        """Delete documents by ID or by other, implementation-defined criteria.

        Calling delete without any input parameters should raise a ValueError!

        Args:
            ids: List of IDs to delete.
            kwargs: Additional keyword arguments whose meaning is up to the
                implementation. Examples are an option to delete the entire
                index, or to issue a non-blocking delete.

        Returns:
            DeleteResponse: The IDs that were successfully deleted and the IDs
            that failed to be deleted.
        """

    async def adelete(
        self, ids: Optional[List[str]] = None, **kwargs: Any
    ) -> DeleteResponse:
        """Delete documents by ID or by other criteria. Async ``delete``.

        Calling adelete without any input parameters should raise a ValueError!

        Args:
            ids: List of IDs to delete.
            kwargs: Additional keyword arguments whose meaning is up to the
                implementation, e.g. an option to delete the entire index.
                They are forwarded to ``delete``.

        Returns:
            DeleteResponse: The IDs that were successfully deleted and the IDs
            that failed to be deleted.
        """
        # Default implementation: delegate to the synchronous method.
        response = await run_in_executor(None, self.delete, ids, **kwargs)
        return response

    @abstractmethod
    def get(
        self,
        ids: Sequence[str],
        /,
        **kwargs: Any,
    ) -> List[Document]:
        """Fetch documents by ID.

        The result may hold fewer documents than IDs requested, either because
        some IDs were not found or because the request contained duplicates.

        Callers should not rely on the returned order matching the order of
        ``ids``; use the ID field of each returned document instead.

        Implementations should **NOT** raise when no document exists for some
        of the IDs.

        Args:
            ids: IDs of the documents to fetch (positional-only).
            kwargs: Additional keyword arguments. These are up to the
                implementation.

        Returns:
            List[Document]: The documents that were found.

        .. versionadded:: ___version___
        """

    async def aget(
        self,
        ids: Sequence[str],
        /,
        **kwargs: Any,
    ) -> List[Document]:
        """Fetch documents by ID. Async ``get``.

        The result may hold fewer documents than IDs requested, either because
        some IDs were not found or because the request contained duplicates.

        Callers should not rely on the returned order matching the order of
        ``ids``; use the ID field of each returned document instead.

        Implementations should **NOT** raise when no document exists for some
        of the IDs.

        Args:
            ids: IDs of the documents to fetch (positional-only).
            kwargs: Additional keyword arguments. These are up to the
                implementation and are forwarded to ``get``.

        Returns:
            List[Document]: The documents that were found.

        .. versionadded:: ___version___
        """
        # Default implementation: delegate to the synchronous method.
        documents = await run_in_executor(None, self.get, ids, **kwargs)
        return documents
class RecordManager(ABC):
@@ -421,29 +662,3 @@ class InMemoryRecordManager(RecordManager):
keys: A list of keys to delete.
"""
self.delete_keys(keys)
class UpsertResponse(TypedDict):
    """A generic response for upsert operations.

    The upsert response will be used by abstractions that implement an upsert
    operation for content that can be upserted by ID.

    Upsert APIs that accept inputs with IDs and generate IDs internally
    will return a response that includes the IDs that succeeded and the IDs
    that failed.

    If there are no failures, the failed list will be empty, and the order
    of the IDs in the succeeded list will match the order of the input documents.

    If there are failures, the response becomes ill-defined, and a user of the API
    cannot determine which generated ID corresponds to which input document.

    It is recommended that users explicitly attach the IDs to the items being
    indexed to avoid this issue.
    """

    succeeded: List[str]
    """The IDs that were successfully indexed."""
    failed: List[str]
    """The IDs that failed to index."""

View File

@@ -4,11 +4,12 @@ from langchain_core.indexing import __all__
def test_all() -> None:
"""Use to catch obvious breaking changes."""
assert __all__ == sorted(__all__, key=str.lower)
assert __all__ == [
assert set(__all__) == {
"aindex",
"DocumentIndexer",
"index",
"IndexingResult",
"InMemoryRecordManager",
"RecordManager",
"UpsertResponse",
]
}