mirror of
https://github.com/hwchase17/langchain.git
synced 2026-01-21 21:56:38 +00:00
Compare commits
17 Commits
sr/looser-
...
bagatur/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1fbac7a902 | ||
|
|
9f822a257e | ||
|
|
8f5856ec6a | ||
|
|
9c43b5cc6b | ||
|
|
decad84ac6 | ||
|
|
c421645c07 | ||
|
|
803cbb2474 | ||
|
|
53982c43e1 | ||
|
|
4b1599b726 | ||
|
|
f30dbc8c78 | ||
|
|
c5751de3b1 | ||
|
|
187bb7a98e | ||
|
|
8f9dbe0d88 | ||
|
|
d766549756 | ||
|
|
822de04a4f | ||
|
|
027fff680d | ||
|
|
66c5ddbd9e |
4
libs/core/langchain_core/indexes/__init__.py
Normal file
4
libs/core/langchain_core/indexes/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from langchain_core.indexes.base import Index
|
||||
from langchain_core.indexes.types import DeleteResponse, UpsertResponse
|
||||
|
||||
__all__ = ["UpsertResponse", "DeleteResponse", "Index"]
|
||||
369
libs/core/langchain_core/indexes/base.py
Normal file
369
libs/core/langchain_core/indexes/base.py
Normal file
@@ -0,0 +1,369 @@
|
||||
import warnings
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncIterable,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
from langchain_core.documents import Document
|
||||
from langchain_core.indexes.types import DeleteResponse, UpsertResponse
|
||||
from langchain_core.runnables import run_in_executor
|
||||
from langchain_core.stores import BaseStore
|
||||
from langchain_core.structured_query import StructuredQuery
|
||||
|
||||
|
||||
class Index(BaseStore[str, Document], ABC):
|
||||
"""Interface for a document index.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union, Iterable
|
||||
from uuid import uuid4
|
||||
|
||||
from langchain_core.documents import Document
|
||||
from langchain_core.indexes import UpsertResponse, DeleteResponse, Index
|
||||
|
||||
def uuid4_generator() -> Iterable[str]:
|
||||
while True:
|
||||
yield str(uuid4())
|
||||
|
||||
class DictIndex(Index):
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.store = {}
|
||||
|
||||
def upsert(
|
||||
self,
|
||||
documents: Iterable[Document],
|
||||
*,
|
||||
ids: Optional[Iterable[str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> UpsertResponse:
|
||||
ids = ids or uuid4_generator()
|
||||
succeeded = []
|
||||
for id_, doc in zip(ids, documents):
|
||||
self.store[id_] = doc
|
||||
succeeded.append(id_)
|
||||
return UpsertResponse(succeeded=succeeded, failed=[])
|
||||
|
||||
def delete_by_ids(self, ids: Iterable[str]) -> DeleteResponse:
|
||||
succeeded = []
|
||||
failed = []
|
||||
for id_ in ids:
|
||||
try:
|
||||
del self.store[id_]
|
||||
except Exception:
|
||||
failed.append(id_)
|
||||
else:
|
||||
succeeded.append(id_)
|
||||
return DeleteResponse(succeeded=succeeded, failed=failed)
|
||||
|
||||
def lazy_get_by_ids(self, ids: Iterable[str]) -> Iterable[Document]:
|
||||
for id in ids:
|
||||
yield self.store[id]
|
||||
|
||||
def yield_keys(
|
||||
self, *, prefix: Optional[str] = None
|
||||
) -> Union[Iterator[str]]:
|
||||
prefix = prefix or ""
|
||||
for key in self.store:
|
||||
if key.startswith(prefix):
|
||||
yield key
|
||||
""" # noqa: E501
|
||||
|
||||
@abstractmethod
|
||||
def upsert(
|
||||
self,
|
||||
# TODO: Iterable or Iterator?
|
||||
documents: Iterable[Document],
|
||||
*,
|
||||
ids: Optional[Iterable[str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> UpsertResponse:
|
||||
"""Upsert documents to index."""
|
||||
|
||||
# FOR CONTRIBUTORS: Overwrite this in Index child classes that support native async upsertions.
|
||||
async def aupsert(
|
||||
self,
|
||||
documents: AsyncIterable[Document],
|
||||
*,
|
||||
ids: Optional[AsyncIterable[str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> UpsertResponse:
|
||||
"""Upsert documents to index. Default implementation, runs sync upsert() in async executor."""
|
||||
# TODO: how to convert AsyncIterable -> Iterable
|
||||
return await run_in_executor(None, self.upsert, documents, ids=ids, **kwargs)
|
||||
|
||||
@abstractmethod
|
||||
def delete_by_ids(self, ids: Iterable[str]) -> DeleteResponse:
|
||||
"""Delete documents by id.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to delete.
|
||||
|
||||
Returns:
|
||||
A dict ``{"succeeded": [...], "failed": [...]}`` with the IDs of the
|
||||
documents that were successfully deleted and the ones that failed to be
|
||||
deleted.
|
||||
"""
|
||||
|
||||
# FOR CONTRIBUTORS: Overwrite this in Index child classes that support native async deletions.
|
||||
async def adelete_by_ids(self, ids: AsyncIterable[str]) -> DeleteResponse:
|
||||
"""Upsert documents to index. Default implementation, runs sync delete_by_ids() in async executor."""
|
||||
return await run_in_executor(None, self.delete_by_ids, ids)
|
||||
|
||||
@abstractmethod
|
||||
def lazy_get_by_ids(self, ids: Iterable[str]) -> Iterable[Document]:
|
||||
"""Lazily get documents by id.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to get.
|
||||
|
||||
Yields:
|
||||
Document
|
||||
"""
|
||||
|
||||
# FOR CONTRIBUTORS: Overwrite this in Index child classes that support native async get.
|
||||
async def alazy_get_by_ids(
|
||||
self, ids: AsyncIterable[str]
|
||||
) -> AsyncIterable[Document]:
|
||||
"""Lazily get documents by id.
|
||||
|
||||
Default implementation, runs sync () in async executor.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to get.
|
||||
|
||||
Yields:
|
||||
Document
|
||||
"""
|
||||
return await run_in_executor(None, self.lazy_get_by_ids, ids)
|
||||
|
||||
def get_by_ids(self, ids: Iterable[str]) -> List[Document]:
|
||||
"""Get documents by id.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to get.
|
||||
|
||||
Returns:
|
||||
A list of the requested Documents.
|
||||
"""
|
||||
return list(self.lazy_get_by_ids(ids))
|
||||
|
||||
async def aget_by_ids(self, ids: AsyncIterable[str]) -> List[Document]:
|
||||
docs = []
|
||||
async for doc in await self.alazy_get_by_ids(ids):
|
||||
docs.append(doc)
|
||||
return docs
|
||||
|
||||
def delete(
|
||||
self,
|
||||
*,
|
||||
ids: Optional[Iterable[str]] = None,
|
||||
filters: Union[
|
||||
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
|
||||
] = None,
|
||||
**kwargs: Any,
|
||||
) -> DeleteResponse:
|
||||
"""Default implementation only supports deletion by id.
|
||||
|
||||
Override this method if the integration supports deletion by other parameters.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to delete. Must be specified.
|
||||
**kwargs: Other keywords args not supported by default. Will be ignored.
|
||||
|
||||
Returns:
|
||||
A dict ``{"succeeded": [...], "failed": [...]}`` with the IDs of the
|
||||
documents that were successfully deleted and the ones that failed to be
|
||||
deleted.
|
||||
|
||||
Raises:
|
||||
ValueError: if ids are not provided.
|
||||
"""
|
||||
if ids is None:
|
||||
raise ValueError("Must provide ids to delete.")
|
||||
if filters:
|
||||
kwargs = {"filters": filters, **kwargs}
|
||||
if kwargs:
|
||||
warnings.warn(
|
||||
"Only deletion by ids is supported for this integration, all other "
|
||||
f"arguments are ignored. Received {kwargs=}"
|
||||
)
|
||||
return self.delete_by_ids(ids)
|
||||
|
||||
async def adelete(
|
||||
self,
|
||||
*,
|
||||
ids: Optional[AsyncIterable[str]] = None,
|
||||
filters: Union[
|
||||
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
|
||||
] = None,
|
||||
**kwargs: Any,
|
||||
) -> DeleteResponse:
|
||||
"""Default implementation only supports deletion by id.
|
||||
|
||||
Override this method if the integration supports deletion by other parameters.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to delete. Must be specified.
|
||||
**kwargs: Other keywords args not supported by default. Will be ignored.
|
||||
|
||||
Returns:
|
||||
A dict ``{"succeeded": [...], "failed": [...]}`` with the IDs of the
|
||||
documents that were successfully deleted and the ones that failed to be
|
||||
deleted.
|
||||
|
||||
Raises:
|
||||
ValueError: if ids are not provided.
|
||||
"""
|
||||
if ids is None:
|
||||
raise ValueError("Must provide ids to delete.")
|
||||
if filters:
|
||||
kwargs = {"filters": filters, **kwargs}
|
||||
if kwargs:
|
||||
warnings.warn(
|
||||
"Only deletion by ids is supported for this integration, all other "
|
||||
f"arguments are ignored. Received {kwargs=}"
|
||||
)
|
||||
return await self.adelete_by_ids(ids)
|
||||
|
||||
def lazy_get(
|
||||
self,
|
||||
*,
|
||||
ids: Optional[Iterable[str]] = None,
|
||||
filters: Union[
|
||||
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
|
||||
] = None,
|
||||
**kwargs: Any,
|
||||
) -> Iterable[Document]:
|
||||
"""Default implementation only supports get by id.
|
||||
|
||||
Override this method if the integration supports get by other parameters.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to get. Must be specified.
|
||||
**kwargs: Other keywords args not supported by default. Will be ignored.
|
||||
|
||||
Yields:
|
||||
Document.
|
||||
|
||||
Raises:
|
||||
ValueError: if ids are not provided.
|
||||
"""
|
||||
if ids is None:
|
||||
raise ValueError("Must provide ids to get.")
|
||||
if filters:
|
||||
kwargs = {"filters": filters, **kwargs}
|
||||
if kwargs:
|
||||
warnings.warn(
|
||||
"Only deletion by ids is supported for this integration, all other "
|
||||
f"arguments are ignored. Received {kwargs=}"
|
||||
)
|
||||
return self.lazy_get_by_ids(ids)
|
||||
|
||||
async def alazy_get(
|
||||
self,
|
||||
*,
|
||||
ids: Optional[AsyncIterable[str]] = None,
|
||||
filters: Union[
|
||||
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
|
||||
] = None,
|
||||
**kwargs: Any,
|
||||
) -> AsyncIterable[Document]:
|
||||
"""Default implementation only supports get by id.
|
||||
|
||||
Override this method if the integration supports get by other parameters.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to get. Must be specified.
|
||||
**kwargs: Other keywords args not supported by default. Will be ignored.
|
||||
|
||||
Yields:
|
||||
Document.
|
||||
|
||||
Raises:
|
||||
ValueError: if ids are not provided.
|
||||
"""
|
||||
if ids is None:
|
||||
raise ValueError("Must provide ids to get.")
|
||||
if filters:
|
||||
kwargs = {"filters": filters, **kwargs}
|
||||
if kwargs:
|
||||
warnings.warn(
|
||||
"Only deletion by ids is supported for this integration, all other "
|
||||
f"arguments are ignored. Received {kwargs=}"
|
||||
)
|
||||
return await self.alazy_get_by_ids(ids)
|
||||
|
||||
def get(
|
||||
self,
|
||||
*,
|
||||
ids: Optional[Iterable[str]] = None,
|
||||
filters: Union[
|
||||
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
|
||||
] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Default implementation only supports get by id.
|
||||
|
||||
Override this method if the integration supports get by other parameters.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to get. Must be specified.
|
||||
**kwargs: Other keywords args not supported by default. Will be ignored.
|
||||
|
||||
Returns:
|
||||
A list of the requested Documents.
|
||||
|
||||
Raises:
|
||||
ValueError: if ids are not provided.
|
||||
"""
|
||||
return list(self.lazy_get(ids=ids, filters=filters, **kwargs))
|
||||
|
||||
async def aget(
|
||||
self,
|
||||
*,
|
||||
ids: Optional[AsyncIterable[str]] = None,
|
||||
filters: Union[
|
||||
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
|
||||
] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Default implementation only supports get by id.
|
||||
|
||||
Override this method if the integration supports get by other parameters.
|
||||
|
||||
Args:
|
||||
ids: IDs of the documents to get. Must be specified.
|
||||
**kwargs: Other keywords args not supported by default. Will be ignored.
|
||||
|
||||
Returns:
|
||||
A list of the requested Documents.
|
||||
|
||||
Raises:
|
||||
ValueError: if ids are not provided.
|
||||
"""
|
||||
docs = []
|
||||
async for doc in await self.alazy_get(ids=ids, filters=filters, **kwargs):
|
||||
docs.append(doc)
|
||||
return docs
|
||||
|
||||
def mget(self, keys: Sequence[str]) -> List[Optional[Document]]:
|
||||
return cast(List[Optional[Document]], self.get_by_ids(keys)) # type: ignore[arg-type]
|
||||
|
||||
def mset(self, key_value_pairs: Sequence[Tuple[str, Document]]) -> None:
|
||||
ids, documents = zip(*key_value_pairs)
|
||||
self.add(documents, ids=ids)
|
||||
|
||||
def mdelete(self, keys: Sequence[str]) -> None:
|
||||
self.delete_by_ids(keys) # type: ignore[arg-type]
|
||||
11
libs/core/langchain_core/indexes/types.py
Normal file
11
libs/core/langchain_core/indexes/types.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from typing import List, TypedDict
|
||||
|
||||
|
||||
class UpsertResponse(TypedDict):
|
||||
succeeded: List[str]
|
||||
failed: List[str]
|
||||
|
||||
|
||||
class DeleteResponse(TypedDict):
|
||||
succeeded: List[str]
|
||||
failed: List[str]
|
||||
Reference in New Issue
Block a user