Compare commits

...

17 Commits

Author SHA1 Message Date
Bagatur
1fbac7a902 undo 2024-06-12 16:41:06 -07:00
Bagatur
9f822a257e undp 2024-06-12 16:17:32 -07:00
Bagatur
8f5856ec6a Merge branch 'master' into bagatur/retrieval_v2_scratch 2024-06-12 16:16:43 -07:00
Bagatur
9c43b5cc6b wip 2024-06-05 12:14:37 -07:00
Bagatur
decad84ac6 wip 2024-06-05 11:47:10 -07:00
Bagatur
c421645c07 fmt 2024-06-05 10:26:07 -07:00
Bagatur
803cbb2474 fmt 2024-06-04 17:13:15 -07:00
Bagatur
53982c43e1 Merge branch 'master' into bagatur/retrieval_v2_scratch 2024-06-04 16:09:52 -07:00
Bagatur
4b1599b726 fmt 2024-05-28 12:00:45 -07:00
Bagatur
f30dbc8c78 fmt 2024-05-27 17:38:34 -07:00
Bagatur
c5751de3b1 fmt 2024-05-27 17:38:02 -07:00
Bagatur
187bb7a98e Merge branch 'master' into bagatur/retrieval_v2_scratch 2024-05-27 15:51:20 -07:00
Bagatur
8f9dbe0d88 fmt 2024-05-24 15:08:08 -07:00
Bagatur
d766549756 fmt 2024-05-24 15:00:08 -07:00
Bagatur
822de04a4f fmt 2024-05-23 18:23:25 -07:00
Bagatur
027fff680d fmt 2024-05-23 18:21:07 -07:00
Bagatur
66c5ddbd9e scratch 2024-05-23 18:14:43 -07:00
3 changed files with 384 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from langchain_core.indexes.base import Index
from langchain_core.indexes.types import DeleteResponse, UpsertResponse
__all__ = ["UpsertResponse", "DeleteResponse", "Index"]

View File

@@ -0,0 +1,369 @@
import warnings
from abc import ABC, abstractmethod
from typing import (
Any,
AsyncIterable,
Dict,
Iterable,
List,
Optional,
Sequence,
Tuple,
Union,
cast,
)
from langchain_core.documents import Document
from langchain_core.indexes.types import DeleteResponse, UpsertResponse
from langchain_core.runnables import run_in_executor
from langchain_core.stores import BaseStore
from langchain_core.structured_query import StructuredQuery
class Index(BaseStore[str, Document], ABC):
"""Interface for a document index.
Example:
.. code-block:: python
from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union, Iterable
from uuid import uuid4
from langchain_core.documents import Document
from langchain_core.indexes import UpsertResponse, DeleteResponse, Index
def uuid4_generator() -> Iterable[str]:
while True:
yield str(uuid4())
class DictIndex(Index):
def __init__(self) -> None:
self.store = {}
def upsert(
self,
documents: Iterable[Document],
*,
ids: Optional[Iterable[str]] = None,
**kwargs: Any,
) -> UpsertResponse:
ids = ids or uuid4_generator()
succeeded = []
for id_, doc in zip(ids, documents):
self.store[id_] = doc
succeeded.append(id_)
return UpsertResponse(succeeded=succeeded, failed=[])
def delete_by_ids(self, ids: Iterable[str]) -> DeleteResponse:
succeeded = []
failed = []
for id_ in ids:
try:
del self.store[id_]
except Exception:
failed.append(id_)
else:
succeeded.append(id_)
return DeleteResponse(succeeded=succeeded, failed=failed)
def lazy_get_by_ids(self, ids: Iterable[str]) -> Iterable[Document]:
for id in ids:
yield self.store[id]
def yield_keys(
self, *, prefix: Optional[str] = None
) -> Union[Iterator[str]]:
prefix = prefix or ""
for key in self.store:
if key.startswith(prefix):
yield key
""" # noqa: E501
@abstractmethod
def upsert(
self,
# TODO: Iterable or Iterator?
documents: Iterable[Document],
*,
ids: Optional[Iterable[str]] = None,
**kwargs: Any,
) -> UpsertResponse:
"""Upsert documents to index."""
# FOR CONTRIBUTORS: Overwrite this in Index child classes that support native async upsertions.
async def aupsert(
self,
documents: AsyncIterable[Document],
*,
ids: Optional[AsyncIterable[str]] = None,
**kwargs: Any,
) -> UpsertResponse:
"""Upsert documents to index. Default implementation, runs sync upsert() in async executor."""
# TODO: how to convert AsyncIterable -> Iterable
return await run_in_executor(None, self.upsert, documents, ids=ids, **kwargs)
@abstractmethod
def delete_by_ids(self, ids: Iterable[str]) -> DeleteResponse:
"""Delete documents by id.
Args:
ids: IDs of the documents to delete.
Returns:
A dict ``{"succeeded": [...], "failed": [...]}`` with the IDs of the
documents that were successfully deleted and the ones that failed to be
deleted.
"""
# FOR CONTRIBUTORS: Overwrite this in Index child classes that support native async deletions.
async def adelete_by_ids(self, ids: AsyncIterable[str]) -> DeleteResponse:
"""Upsert documents to index. Default implementation, runs sync delete_by_ids() in async executor."""
return await run_in_executor(None, self.delete_by_ids, ids)
@abstractmethod
def lazy_get_by_ids(self, ids: Iterable[str]) -> Iterable[Document]:
"""Lazily get documents by id.
Args:
ids: IDs of the documents to get.
Yields:
Document
"""
# FOR CONTRIBUTORS: Overwrite this in Index child classes that support native async get.
async def alazy_get_by_ids(
self, ids: AsyncIterable[str]
) -> AsyncIterable[Document]:
"""Lazily get documents by id.
Default implementation, runs sync () in async executor.
Args:
ids: IDs of the documents to get.
Yields:
Document
"""
return await run_in_executor(None, self.lazy_get_by_ids, ids)
def get_by_ids(self, ids: Iterable[str]) -> List[Document]:
"""Get documents by id.
Args:
ids: IDs of the documents to get.
Returns:
A list of the requested Documents.
"""
return list(self.lazy_get_by_ids(ids))
async def aget_by_ids(self, ids: AsyncIterable[str]) -> List[Document]:
docs = []
async for doc in await self.alazy_get_by_ids(ids):
docs.append(doc)
return docs
def delete(
self,
*,
ids: Optional[Iterable[str]] = None,
filters: Union[
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
] = None,
**kwargs: Any,
) -> DeleteResponse:
"""Default implementation only supports deletion by id.
Override this method if the integration supports deletion by other parameters.
Args:
ids: IDs of the documents to delete. Must be specified.
**kwargs: Other keywords args not supported by default. Will be ignored.
Returns:
A dict ``{"succeeded": [...], "failed": [...]}`` with the IDs of the
documents that were successfully deleted and the ones that failed to be
deleted.
Raises:
ValueError: if ids are not provided.
"""
if ids is None:
raise ValueError("Must provide ids to delete.")
if filters:
kwargs = {"filters": filters, **kwargs}
if kwargs:
warnings.warn(
"Only deletion by ids is supported for this integration, all other "
f"arguments are ignored. Received {kwargs=}"
)
return self.delete_by_ids(ids)
async def adelete(
self,
*,
ids: Optional[AsyncIterable[str]] = None,
filters: Union[
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
] = None,
**kwargs: Any,
) -> DeleteResponse:
"""Default implementation only supports deletion by id.
Override this method if the integration supports deletion by other parameters.
Args:
ids: IDs of the documents to delete. Must be specified.
**kwargs: Other keywords args not supported by default. Will be ignored.
Returns:
A dict ``{"succeeded": [...], "failed": [...]}`` with the IDs of the
documents that were successfully deleted and the ones that failed to be
deleted.
Raises:
ValueError: if ids are not provided.
"""
if ids is None:
raise ValueError("Must provide ids to delete.")
if filters:
kwargs = {"filters": filters, **kwargs}
if kwargs:
warnings.warn(
"Only deletion by ids is supported for this integration, all other "
f"arguments are ignored. Received {kwargs=}"
)
return await self.adelete_by_ids(ids)
def lazy_get(
self,
*,
ids: Optional[Iterable[str]] = None,
filters: Union[
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
] = None,
**kwargs: Any,
) -> Iterable[Document]:
"""Default implementation only supports get by id.
Override this method if the integration supports get by other parameters.
Args:
ids: IDs of the documents to get. Must be specified.
**kwargs: Other keywords args not supported by default. Will be ignored.
Yields:
Document.
Raises:
ValueError: if ids are not provided.
"""
if ids is None:
raise ValueError("Must provide ids to get.")
if filters:
kwargs = {"filters": filters, **kwargs}
if kwargs:
warnings.warn(
"Only deletion by ids is supported for this integration, all other "
f"arguments are ignored. Received {kwargs=}"
)
return self.lazy_get_by_ids(ids)
async def alazy_get(
self,
*,
ids: Optional[AsyncIterable[str]] = None,
filters: Union[
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
] = None,
**kwargs: Any,
) -> AsyncIterable[Document]:
"""Default implementation only supports get by id.
Override this method if the integration supports get by other parameters.
Args:
ids: IDs of the documents to get. Must be specified.
**kwargs: Other keywords args not supported by default. Will be ignored.
Yields:
Document.
Raises:
ValueError: if ids are not provided.
"""
if ids is None:
raise ValueError("Must provide ids to get.")
if filters:
kwargs = {"filters": filters, **kwargs}
if kwargs:
warnings.warn(
"Only deletion by ids is supported for this integration, all other "
f"arguments are ignored. Received {kwargs=}"
)
return await self.alazy_get_by_ids(ids)
def get(
self,
*,
ids: Optional[Iterable[str]] = None,
filters: Union[
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
] = None,
**kwargs: Any,
) -> List[Document]:
"""Default implementation only supports get by id.
Override this method if the integration supports get by other parameters.
Args:
ids: IDs of the documents to get. Must be specified.
**kwargs: Other keywords args not supported by default. Will be ignored.
Returns:
A list of the requested Documents.
Raises:
ValueError: if ids are not provided.
"""
return list(self.lazy_get(ids=ids, filters=filters, **kwargs))
async def aget(
self,
*,
ids: Optional[AsyncIterable[str]] = None,
filters: Union[
StructuredQuery, Dict[str, Any], List[Dict[str, Any]], None
] = None,
**kwargs: Any,
) -> List[Document]:
"""Default implementation only supports get by id.
Override this method if the integration supports get by other parameters.
Args:
ids: IDs of the documents to get. Must be specified.
**kwargs: Other keywords args not supported by default. Will be ignored.
Returns:
A list of the requested Documents.
Raises:
ValueError: if ids are not provided.
"""
docs = []
async for doc in await self.alazy_get(ids=ids, filters=filters, **kwargs):
docs.append(doc)
return docs
def mget(self, keys: Sequence[str]) -> List[Optional[Document]]:
return cast(List[Optional[Document]], self.get_by_ids(keys)) # type: ignore[arg-type]
def mset(self, key_value_pairs: Sequence[Tuple[str, Document]]) -> None:
ids, documents = zip(*key_value_pairs)
self.add(documents, ids=ids)
def mdelete(self, keys: Sequence[str]) -> None:
self.delete_by_ids(keys) # type: ignore[arg-type]

View File

@@ -0,0 +1,11 @@
from typing import List, TypedDict
class UpsertResponse(TypedDict):
succeeded: List[str]
failed: List[str]
class DeleteResponse(TypedDict):
succeeded: List[str]
failed: List[str]