From 428b2409c7208eb3aa52e6b836455974bb52ae31 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Wed, 17 Jul 2024 15:30:10 -0400 Subject: [PATCH] x --- libs/core/langchain_core/indexing/__init__.py | 2 + libs/core/langchain_core/indexing/api.py | 10 +- libs/core/langchain_core/indexing/base.py | 269 ++++++++++++++++-- .../unit_tests/indexing/test_public_api.py | 5 +- 4 files changed, 256 insertions(+), 30 deletions(-) diff --git a/libs/core/langchain_core/indexing/__init__.py b/libs/core/langchain_core/indexing/__init__.py index 305ae7b459d..58e492e083c 100644 --- a/libs/core/langchain_core/indexing/__init__.py +++ b/libs/core/langchain_core/indexing/__init__.py @@ -7,6 +7,7 @@ if it's unchanged. from langchain_core.indexing.api import IndexingResult, aindex, index from langchain_core.indexing.base import ( + DocumentIndexer, InMemoryRecordManager, RecordManager, UpsertResponse, @@ -14,6 +15,7 @@ from langchain_core.indexing.base import ( __all__ = [ "aindex", + "DocumentIndexer", "index", "IndexingResult", "InMemoryRecordManager", diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py index 0462e0838ba..6370ce3a9e9 100644 --- a/libs/core/langchain_core/indexing/api.py +++ b/libs/core/langchain_core/indexing/api.py @@ -7,6 +7,7 @@ import json import uuid from itertools import islice from typing import ( + TYPE_CHECKING, Any, AsyncIterable, AsyncIterator, @@ -29,7 +30,9 @@ from langchain_core.document_loaders.base import BaseLoader from langchain_core.documents import Document from langchain_core.indexing.base import RecordManager from langchain_core.pydantic_v1 import root_validator -from langchain_core.vectorstores import VectorStore + +if TYPE_CHECKING: + from langchain_core.vectorstores import VectorStore # Magic UUID to use as a namespace for hashing. # Used to try and generate a unique UUID for each document @@ -265,6 +268,9 @@ def index( "delete" and "add_documents" required methods. 
ValueError: If source_id_key is not None, but is not a string or callable. """ + # Local scope to avoid circular imports + from langchain_core.vectorstores import VectorStore + if cleanup not in {"incremental", "full", None}: raise ValueError( f"cleanup should be one of 'incremental', 'full' or None. " @@ -478,6 +484,8 @@ async def aindex( "adelete" and "aadd_documents" required methods. ValueError: If source_id_key is not None, but is not a string or callable. """ + # Local scope to avoid circular imports + from langchain_core.vectorstores import VectorStore if cleanup not in {"incremental", "full", None}: raise ValueError( diff --git a/libs/core/langchain_core/indexing/base.py b/libs/core/langchain_core/indexing/base.py index c7e549615e7..0491908967c 100644 --- a/libs/core/langchain_core/indexing/base.py +++ b/libs/core/langchain_core/indexing/base.py @@ -1,8 +1,249 @@ from __future__ import annotations +import abc import time from abc import ABC, abstractmethod -from typing import Dict, List, Optional, Sequence, TypedDict +from typing import ( + Any, + Dict, + List, + Optional, + Sequence, +) + +from typing_extensions import TypedDict + +from langchain_core._api import beta +from langchain_core.documents.base import Document +from langchain_core.runnables import run_in_executor + + +class UpsertResponse(TypedDict): + """A generic response for upsert operations. + + The upsert response will be used by abstractions that implement an upsert + operation for content that can be upserted by ID. + + Upsert APIs that accept inputs with IDs and generate IDs internally + will return a response that includes the IDs that succeeded and the IDs + that failed. + + If there are no failures, the failed list will be empty, and the order + of the IDs in the succeeded list will match the order of the input documents. + + If there are failures, the response becomes ill defined, and a user of the API + cannot determine which generated ID corresponds to which input document. 
+ + It is recommended that users explicitly attach the IDs to the items being + indexed to avoid this issue. + """ + + succeeded: List[str] + """The IDs that were successfully indexed.""" + failed: List[str] + """The IDs that failed to index.""" + + +class DeleteResponse(TypedDict, total=False): + """A generic response for delete operations. + + The fields in this response are optional and whether the vectorstore + returns them or not is up to the implementation. + """ + + num_deleted: int + """The number of items that were successfully deleted.""" + num_failed: int + """The number of items that failed to be deleted.""" + succeeded: Sequence[str] + """The IDs that were successfully deleted. + + Should not be returned when using delete_by_filter. + """ + failed: Sequence[str] + """The IDs that failed to be deleted. + + Should not be returned when using delete_by_filter. + + Please note that deleting an ID that does not exist is **NOT** considered a failure. + """ + + +@beta(message="Added in ___version___. The API is subject to change.") +class DocumentIndexer(abc.ABC): + """An abstraction for indexing documents. + + This indexing interface is designed to be a generic abstraction for storing and + querying documents that have an ID and metadata associated with them. + + The interface is designed to be agnostic to the underlying implementation of the + indexing system. + + The interface is designed to support the following operations: + + 1. Storing content in the index. + 2. Retrieving content by ID. + + .. versionadded:: ___version___ + """ + + @abc.abstractmethod + def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse: + """Upsert documents into the index. + + The upsert functionality should utilize the ID field of the content object + if it is provided. If the ID is not provided, the upsert method is free + to generate an ID for the content. 
+ + When an ID is specified and the content already exists in the vectorstore, + the upsert method should update the content with the new data. If the content + does not exist, the upsert method should add the item to the vectorstore. + + Args: + items: Sequence of documents to add to the vectorstore. + **kwargs: Additional keyword arguments. + + Returns: + UpsertResponse: A response object that contains the list of IDs that were + successfully added or updated in the vectorstore and the list of IDs that + failed to be added or updated. + + .. versionadded:: ___version___ + """ + + async def aupsert( + self, items: Sequence[Document], /, **kwargs: Any + ) -> UpsertResponse: + """Add or update documents in the vectorstore. Async version of upsert. + + The upsert functionality should utilize the ID field of the item + if it is provided. If the ID is not provided, the upsert method is free + to generate an ID for the item. + + When an ID is specified and the item already exists in the vectorstore, + the upsert method should update the item with the new data. If the item + does not exist, the upsert method should add the item to the vectorstore. + + Args: + items: Sequence of documents to add to the vectorstore. + **kwargs: Additional keyword arguments. + + Returns: + UpsertResponse: A response object that contains the list of IDs that were + successfully added or updated in the vectorstore and the list of IDs that + failed to be added or updated. + + .. versionadded:: ___version___ + """ + return await run_in_executor( + None, + self.upsert, + items, + **kwargs, + ) + + @abc.abstractmethod + def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> DeleteResponse: + """Delete by IDs or other criteria. + + Calling delete without any input parameters should raise a ValueError! + + Args: + ids: List of ids to delete. + kwargs: Additional keyword arguments. This is up to the implementation. 
+ For example, can include an option to delete the entire index, + or else issue a non-blocking delete, etc. + + Returns: + DeleteResponse: A response object that contains the list of IDs that were + successfully deleted and the list of IDs that failed to be deleted. + """ + + async def adelete( + self, ids: Optional[List[str]] = None, **kwargs: Any + ) -> DeleteResponse: + """Delete by IDs or other criteria. Async variant. + + Calling adelete without any input parameters should raise a ValueError! + + Args: + ids: List of ids to delete. + kwargs: Additional keyword arguments. This is up to the implementation. + For example, can include an option to delete the entire index. + + Returns: + DeleteResponse: A response object that contains the list of IDs that were + successfully deleted and the list of IDs that failed to be deleted. + """ + return await run_in_executor( + None, + self.delete, + ids, + **kwargs, + ) + + @abc.abstractmethod + def get( + self, + ids: Sequence[str], + /, + **kwargs: Any, + ) -> List[Document]: + """Get documents by id. + + Fewer documents may be returned than requested if some IDs are not found or + if there are duplicated IDs. + + Users should not assume that the order of the returned documents matches + the order of the input IDs. Instead, users should rely on the ID field of the + returned documents. + + This method should **NOT** raise exceptions if no documents are found for + some IDs. + + Args: + ids: List of IDs to get. + kwargs: Additional keyword arguments. These are up to the implementation. + + Returns: + List[Document]: List of documents that were found. + + .. versionadded:: ___version___ + """ + + async def aget( + self, + ids: Sequence[str], + /, + **kwargs: Any, + ) -> List[Document]: + """Get documents by id. Async variant. + + Fewer documents may be returned than requested if some IDs are not found or + if there are duplicated IDs. 
+ + Users should not assume that the order of the returned documents matches + the order of the input IDs. Instead, users should rely on the ID field of the + returned documents. + + This method should **NOT** raise exceptions if no documents are found for + some IDs. + + Args: + ids: List of IDs to get. + kwargs: Additional keyword arguments. These are up to the implementation. + + Returns: + List[Document]: List of documents that were found. + + .. versionadded:: ___version___ + """ + return await run_in_executor( + None, + self.get, + ids, + **kwargs, + ) class RecordManager(ABC): @@ -421,29 +662,3 @@ class InMemoryRecordManager(RecordManager): keys: A list of keys to delete. """ self.delete_keys(keys) - - -class UpsertResponse(TypedDict): - """A generic response for upsert operations. - - The upsert response will be used by abstractions that implement an upsert - operation for content that can be upserted by ID. - - Upsert APIs that accept inputs with IDs and generate IDs internally - will return a response that includes the IDs that succeeded and the IDs - that failed. - - If there are no failures, the failed list will be empty, and the order - of the IDs in the succeeded list will match the order of the input documents. - - If there are failures, the response becomes ill defined, and a user of the API - cannot determine which generated ID corresponds to which input document. - - It is recommended for users explicitly attach the IDs to the items being - indexed to avoid this issue. 
- """ - - succeeded: List[str] - """The IDs that were successfully indexed.""" - failed: List[str] - """The IDs that failed to index.""" diff --git a/libs/core/tests/unit_tests/indexing/test_public_api.py b/libs/core/tests/unit_tests/indexing/test_public_api.py index 0259017a954..abafa21c8b0 100644 --- a/libs/core/tests/unit_tests/indexing/test_public_api.py +++ b/libs/core/tests/unit_tests/indexing/test_public_api.py @@ -4,11 +4,12 @@ from langchain_core.indexing import __all__ def test_all() -> None: """Use to catch obvious breaking changes.""" assert __all__ == sorted(__all__, key=str.lower) - assert __all__ == [ + assert set(__all__) == { "aindex", + "DocumentIndexer", "index", "IndexingResult", "InMemoryRecordManager", "RecordManager", "UpsertResponse", - ] + }