community[minor]: Add Zep Cloud components + docs + examples (#21671)

Thank you for contributing to LangChain!

- [x] **PR title**: community: Add Zep Cloud components + docs +
examples

- [x] **PR message**: 
We have recently released our new zep-cloud sdks that are compatible
with Zep Cloud (not Zep Open Source). We have also maintained our Cloud
version of langchain components (ChatMessageHistory, VectorStore) as
part of our sdks. This PRs goal is to port these components to langchain
community repo, and close the gap with the existing Zep Open Source
components already present in community repo (added
ZepCloudMemory,ZepCloudVectorStore,ZepCloudRetriever).
Also added a ZepCloudChatMessageHistory components together with an
expression language example ported from our repo. We have left the
original open source components intact on purpose as to not introduce
any breaking changes.
    - **Issue:** -
- **Dependencies:** Added optional dependency of our new cloud sdk
`zep-cloud`
    - **Twitter handle:** @paulpaliychuk51


- [x] **Add tests and docs**


- [x] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified. See contribution
guidelines for more: https://python.langchain.com/docs/contributing/

Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional
ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in
langchain.

If no one reviews your PR within a few days, please @-mention one of
baskaryan, efriis, eyurtsev, hwchase17.
This commit is contained in:
Pavlo Paliychuk
2024-05-27 15:50:13 -04:00
committed by GitHub
parent cccc8fbe2f
commit 342df7cf83
18 changed files with 2843 additions and 27 deletions

View File

@@ -82,6 +82,9 @@ if TYPE_CHECKING:
from langchain_community.chat_message_histories.zep import (
ZepChatMessageHistory,
)
from langchain_community.chat_message_histories.zep_cloud import (
ZepCloudChatMessageHistory,
)
__all__ = [
"AstraDBChatMessageHistory",
@@ -105,6 +108,7 @@ __all__ = [
"UpstashRedisChatMessageHistory",
"XataChatMessageHistory",
"ZepChatMessageHistory",
"ZepCloudChatMessageHistory",
]
_module_lookup = {
@@ -129,6 +133,7 @@ _module_lookup = {
"UpstashRedisChatMessageHistory": "langchain_community.chat_message_histories.upstash_redis", # noqa: E501
"XataChatMessageHistory": "langchain_community.chat_message_histories.xata",
"ZepChatMessageHistory": "langchain_community.chat_message_histories.zep",
"ZepCloudChatMessageHistory": "langchain_community.chat_message_histories.zep_cloud", # noqa: E501
}

View File

@@ -0,0 +1,285 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
)
if TYPE_CHECKING:
from zep_cloud import (
Memory,
MemoryGetRequestMemoryType,
MemorySearchResult,
Message,
NotFoundError,
RoleType,
SearchScope,
SearchType,
)
logger = logging.getLogger(__name__)
def condense_zep_memory_into_human_message(zep_memory: Memory) -> BaseMessage:
prompt = ""
if zep_memory.facts:
prompt = "\n".join(zep_memory.facts)
if zep_memory.summary and zep_memory.summary.content:
prompt += "\n" + zep_memory.summary.content
for msg in zep_memory.messages or []:
prompt += f"\n{msg.role or msg.role_type}: {msg.content}"
return HumanMessage(content=prompt)
def get_zep_message_role_type(role: str) -> RoleType:
if role == "human":
return "user"
elif role == "ai":
return "assistant"
elif role == "system":
return "system"
elif role == "function":
return "function"
elif role == "tool":
return "tool"
else:
return "system"
class ZepCloudChatMessageHistory(BaseChatMessageHistory):
"""Chat message history that uses Zep Cloud as a backend.
Recommended usage::
# Set up Zep Chat History
zep_chat_history = ZepChatMessageHistory(
session_id=session_id,
api_key=<your_api_key>,
)
# Use a standard ConversationBufferMemory to encapsulate the Zep chat history
memory = ConversationBufferMemory(
memory_key="chat_history", chat_memory=zep_chat_history
)
Zep - Recall, understand, and extract data from chat histories.
Power personalized AI experiences.
Zep is a long-term memory service for AI Assistant apps.
With Zep, you can provide AI assistants with the
ability to recall past conversations,
no matter how distant,
while also reducing hallucinations, latency, and cost.
see Zep Cloud Docs: https://help.getzep.com
This class is a thin wrapper around the zep-python package. Additional
Zep functionality is exposed via the `zep_summary`, `zep_messages` and `zep_facts`
properties.
For more information on the zep-python package, see:
https://github.com/getzep/zep-python
"""
def __init__(
self,
session_id: str,
api_key: str,
*,
memory_type: Optional[MemoryGetRequestMemoryType] = None,
lastn: Optional[int] = None,
ai_prefix: Optional[str] = None,
human_prefix: Optional[str] = None,
summary_instruction: Optional[str] = None,
) -> None:
try:
from zep_cloud.client import AsyncZep, Zep
except ImportError:
raise ImportError(
"Could not import zep-cloud package. "
"Please install it with `pip install zep-cloud`."
)
self.zep_client = Zep(api_key=api_key)
self.zep_client_async = AsyncZep(api_key=api_key)
self.session_id = session_id
self.memory_type = memory_type or "perpetual"
self.lastn = lastn
self.ai_prefix = ai_prefix or "ai"
self.human_prefix = human_prefix or "human"
self.summary_instruction = summary_instruction
@property
def messages(self) -> List[BaseMessage]: # type: ignore
"""Retrieve messages from Zep memory"""
zep_memory: Optional[Memory] = self._get_memory()
if not zep_memory:
return []
return [condense_zep_memory_into_human_message(zep_memory)]
@property
def zep_messages(self) -> List[Message]:
"""Retrieve summary from Zep memory"""
zep_memory: Optional[Memory] = self._get_memory()
if not zep_memory:
return []
return zep_memory.messages or []
@property
def zep_summary(self) -> Optional[str]:
"""Retrieve summary from Zep memory"""
zep_memory: Optional[Memory] = self._get_memory()
if not zep_memory or not zep_memory.summary:
return None
return zep_memory.summary.content
@property
def zep_facts(self) -> Optional[List[str]]:
"""Retrieve conversation facts from Zep memory"""
if self.memory_type != "perpetual":
return None
zep_memory: Optional[Memory] = self._get_memory()
if not zep_memory or not zep_memory.facts:
return None
return zep_memory.facts
def _get_memory(self) -> Optional[Memory]:
"""Retrieve memory from Zep"""
from zep_cloud import NotFoundError
try:
zep_memory: Memory = self.zep_client.memory.get(
self.session_id, memory_type=self.memory_type, lastn=self.lastn
)
except NotFoundError:
logger.warning(
f"Session {self.session_id} not found in Zep. Returning None"
)
return None
return zep_memory
def add_user_message( # type: ignore[override]
self, message: str, metadata: Optional[Dict[str, Any]] = None
) -> None:
"""Convenience method for adding a human message string to the store.
Args:
message: The string contents of a human message.
metadata: Optional metadata to attach to the message.
"""
self.add_message(HumanMessage(content=message), metadata=metadata)
def add_ai_message( # type: ignore[override]
self, message: str, metadata: Optional[Dict[str, Any]] = None
) -> None:
"""Convenience method for adding an AI message string to the store.
Args:
message: The string contents of an AI message.
metadata: Optional metadata to attach to the message.
"""
self.add_message(AIMessage(content=message), metadata=metadata)
def add_message(
self, message: BaseMessage, metadata: Optional[Dict[str, Any]] = None
) -> None:
"""Append the message to the Zep memory history"""
from zep_cloud import Message
self.zep_client.memory.add(
self.session_id,
messages=[
Message(
content=str(message.content),
role=message.type,
role_type=get_zep_message_role_type(message.type),
metadata=metadata,
)
],
)
def add_messages(self, messages: Sequence[BaseMessage]) -> None:
"""Append the messages to the Zep memory history"""
from zep_cloud import Message
zep_messages = [
Message(
content=str(message.content),
role=message.type,
role_type=get_zep_message_role_type(message.type),
metadata=message.additional_kwargs.get("metadata", None),
)
for message in messages
]
self.zep_client.memory.add(self.session_id, messages=zep_messages)
async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
"""Append the messages to the Zep memory history asynchronously"""
from zep_cloud import Message
zep_messages = [
Message(
content=str(message.content),
role=message.type,
role_type=get_zep_message_role_type(message.type),
metadata=message.additional_kwargs.get("metadata", None),
)
for message in messages
]
await self.zep_client_async.memory.add(self.session_id, messages=zep_messages)
def search(
self,
query: str,
metadata: Optional[Dict] = None,
search_scope: SearchScope = "messages",
search_type: SearchType = "similarity",
mmr_lambda: Optional[float] = None,
limit: Optional[int] = None,
) -> List[MemorySearchResult]:
"""Search Zep memory for messages matching the query"""
return self.zep_client.memory.search(
self.session_id,
text=query,
metadata=metadata,
search_scope=search_scope,
search_type=search_type,
mmr_lambda=mmr_lambda,
limit=limit,
)
def clear(self) -> None:
"""Clear session memory from Zep. Note that Zep is long-term storage for memory
and this is not advised unless you have specific data retention requirements.
"""
try:
self.zep_client.memory.delete(self.session_id)
except NotFoundError:
logger.warning(
f"Session {self.session_id} not found in Zep. Skipping delete."
)
async def aclear(self) -> None:
"""Clear session memory from Zep asynchronously.
Note that Zep is long-term storage for memory and this is not advised
unless you have specific data retention requirements.
"""
try:
await self.zep_client_async.memory.delete(self.session_id)
except NotFoundError:
logger.warning(
f"Session {self.session_id} not found in Zep. Skipping delete."
)

View File

@@ -0,0 +1,124 @@
from __future__ import annotations
from typing import Any, Dict, Optional
from langchain_community.chat_message_histories import ZepCloudChatMessageHistory
try:
from langchain.memory import ConversationBufferMemory
from zep_cloud import MemoryGetRequestMemoryType
class ZepCloudMemory(ConversationBufferMemory):
"""Persist your chain history to the Zep MemoryStore.
Documentation: https://help.getzep.com
Example:
.. code-block:: python
memory = ZepCloudMemory(
session_id=session_id, # Identifies your user or a user's session
api_key=<your_api_key>, # Your Zep Project API key
memory_key="history", # Ensure this matches the key used in
# chain's prompt template
return_messages=True, # Does your prompt template expect a string
# or a list of Messages?
)
chain = LLMChain(memory=memory,...) # Configure your chain to use the ZepMemory
instance
Note:
To persist metadata alongside your chat history, your will need to create a
custom Chain class that overrides the `prep_outputs` method to include the metadata
in the call to `self.memory.save_context`.
Zep - Recall, understand, and extract data from chat histories. Power personalized AI experiences.
=========
Zep is a long-term memory service for AI Assistant apps. With Zep, you can provide AI assistants with the ability to recall past conversations,
no matter how distant, while also reducing hallucinations, latency, and cost.
For more information on the zep-python package, see:
https://github.com/getzep/zep-python
""" # noqa: E501
chat_memory: ZepCloudChatMessageHistory
def __init__(
self,
session_id: str,
api_key: str,
memory_type: Optional[MemoryGetRequestMemoryType] = None,
lastn: Optional[int] = None,
output_key: Optional[str] = None,
input_key: Optional[str] = None,
return_messages: bool = False,
human_prefix: str = "Human",
ai_prefix: str = "AI",
memory_key: str = "history",
):
"""Initialize ZepMemory.
Args:
session_id (str): Identifies your user or a user's session
api_key (str): Your Zep Project key.
memory_type (Optional[MemoryGetRequestMemoryType], optional): Zep Memory Type, defaults to perpetual
lastn (Optional[int], optional): Number of messages to retrieve. Will add the last summary generated prior to the nth oldest message. Defaults to 6
output_key (Optional[str], optional): The key to use for the output message.
Defaults to None.
input_key (Optional[str], optional): The key to use for the input message.
Defaults to None.
return_messages (bool, optional): Does your prompt template expect a string
or a list of Messages? Defaults to False
i.e. return a string.
human_prefix (str, optional): The prefix to use for human messages.
Defaults to "Human".
ai_prefix (str, optional): The prefix to use for AI messages.
Defaults to "AI".
memory_key (str, optional): The key to use for the memory.
Defaults to "history".
Ensure that this matches the key used in
chain's prompt template.
""" # noqa: E501
chat_message_history = ZepCloudChatMessageHistory(
session_id=session_id,
memory_type=memory_type,
lastn=lastn,
api_key=api_key,
)
super().__init__(
chat_memory=chat_message_history,
output_key=output_key,
input_key=input_key,
return_messages=return_messages,
human_prefix=human_prefix,
ai_prefix=ai_prefix,
memory_key=memory_key,
)
def save_context(
self,
inputs: Dict[str, Any],
outputs: Dict[str, str],
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Save context from this conversation to buffer.
Args:
inputs (Dict[str, Any]): The inputs to the chain.
outputs (Dict[str, str]): The outputs from the chain.
metadata (Optional[Dict[str, Any]], optional): Any metadata to save with
the context. Defaults to None
Returns:
None
"""
input_str, output_str = self._get_input_output(inputs, outputs)
self.chat_memory.add_user_message(input_str, metadata=metadata)
self.chat_memory.add_ai_message(output_str, metadata=metadata)
except ImportError:
# Placeholder object
class ZepCloudMemory: # type: ignore[no-redef]
pass

View File

@@ -136,6 +136,9 @@ if TYPE_CHECKING:
from langchain_community.retrievers.zep import (
ZepRetriever,
)
from langchain_community.retrievers.zep_cloud import (
ZepCloudRetriever,
)
from langchain_community.retrievers.zilliz import (
ZillizRetriever,
)
@@ -183,6 +186,7 @@ _module_lookup = {
"WikipediaRetriever": "langchain_community.retrievers.wikipedia",
"YouRetriever": "langchain_community.retrievers.you",
"ZepRetriever": "langchain_community.retrievers.zep",
"ZepCloudRetriever": "langchain_community.retrievers.zep_cloud",
"ZillizRetriever": "langchain_community.retrievers.zilliz",
"NeuralDBRetriever": "langchain_community.retrievers.thirdai_neuraldb",
}
@@ -238,5 +242,6 @@ __all__ = [
"WikipediaRetriever",
"YouRetriever",
"ZepRetriever",
"ZepCloudRetriever",
"ZillizRetriever",
]

View File

@@ -0,0 +1,162 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from langchain_core.callbacks import (
AsyncCallbackManagerForRetrieverRun,
CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.retrievers import BaseRetriever
if TYPE_CHECKING:
from zep_cloud import MemorySearchResult, SearchScope, SearchType
from zep_cloud.client import AsyncZep, Zep
class ZepCloudRetriever(BaseRetriever):
"""`Zep Cloud` MemoryStore Retriever.
Search your user's long-term chat history with Zep.
Zep offers both simple semantic search and Maximal Marginal Relevance (MMR)
reranking of search results.
Note: You will need to provide the user's `session_id` to use this retriever.
Args:
api_key: Your Zep API key
session_id: Identifies your user or a user's session (required)
top_k: Number of documents to return (default: 3, optional)
search_type: Type of search to perform (similarity / mmr)
(default: similarity, optional)
mmr_lambda: Lambda value for MMR search. Defaults to 0.5 (optional)
Zep - Recall, understand, and extract data from chat histories.
Power personalized AI experiences.
=========
Zep is a long-term memory service for AI Assistant apps.
With Zep, you can provide AI assistants with the ability
to recall past conversations,
no matter how distant, while also reducing hallucinations, latency, and cost.
see Zep Cloud Docs: https://help.getzep.com
"""
api_key: str
"""Your Zep API key."""
zep_client: Zep
"""Zep client used for making API requests."""
zep_client_async: AsyncZep
"""Async Zep client used for making API requests."""
session_id: str
"""Zep session ID."""
top_k: Optional[int]
"""Number of items to return."""
search_scope: SearchScope = "messages"
"""Which documents to search. Messages or Summaries?"""
search_type: SearchType = "similarity"
"""Type of search to perform (similarity / mmr)"""
mmr_lambda: Optional[float] = None
"""Lambda value for MMR search."""
@root_validator(pre=True)
def create_client(cls, values: dict) -> dict:
try:
from zep_cloud.client import AsyncZep, Zep
except ImportError:
raise ImportError(
"Could not import zep-cloud package. "
"Please install it with `pip install zep-cloud`."
)
if values.get("api_key") is None:
raise ValueError("Zep API key is required.")
values["zep_client"] = Zep(api_key=values.get("api_key"))
values["zep_client_async"] = AsyncZep(api_key=values.get("api_key"))
return values
def _messages_search_result_to_doc(
self, results: List[MemorySearchResult]
) -> List[Document]:
return [
Document(
page_content=str(r.message.content),
metadata={
"score": r.score,
"uuid": r.message.uuid_,
"created_at": r.message.created_at,
"token_count": r.message.token_count,
"role": r.message.role or r.message.role_type,
},
)
for r in results or []
if r.message
]
def _summary_search_result_to_doc(
self, results: List[MemorySearchResult]
) -> List[Document]:
return [
Document(
page_content=str(r.summary.content),
metadata={
"score": r.score,
"uuid": r.summary.uuid_,
"created_at": r.summary.created_at,
"token_count": r.summary.token_count,
},
)
for r in results
if r.summary
]
def _get_relevant_documents(
self,
query: str,
*,
run_manager: CallbackManagerForRetrieverRun,
metadata: Optional[Dict[str, Any]] = None,
) -> List[Document]:
if not self.zep_client:
raise RuntimeError("Zep client not initialized.")
results = self.zep_client.memory.search(
self.session_id,
text=query,
metadata=metadata,
search_scope=self.search_scope,
search_type=self.search_type,
mmr_lambda=self.mmr_lambda,
limit=self.top_k,
)
if self.search_scope == "summary":
return self._summary_search_result_to_doc(results)
return self._messages_search_result_to_doc(results)
async def _aget_relevant_documents(
self,
query: str,
*,
run_manager: AsyncCallbackManagerForRetrieverRun,
metadata: Optional[Dict[str, Any]] = None,
) -> List[Document]:
if not self.zep_client_async:
raise RuntimeError("Zep client not initialized.")
results = await self.zep_client_async.memory.search(
self.session_id,
text=query,
metadata=metadata,
search_scope=self.search_scope,
search_type=self.search_type,
mmr_lambda=self.mmr_lambda,
limit=self.top_k,
)
if self.search_scope == "summary":
return self._summary_search_result_to_doc(results)
return self._messages_search_result_to_doc(results)

View File

@@ -294,6 +294,9 @@ if TYPE_CHECKING:
from langchain_community.vectorstores.zep import (
ZepVectorStore,
)
from langchain_community.vectorstores.zep_cloud import (
ZepCloudVectorStore,
)
from langchain_community.vectorstores.zilliz import (
Zilliz,
)
@@ -395,6 +398,7 @@ __all__ = [
"Weaviate",
"Yellowbrick",
"ZepVectorStore",
"ZepCloudVectorStore",
"Zilliz",
]
@@ -495,6 +499,7 @@ _module_lookup = {
"Weaviate": "langchain_community.vectorstores.weaviate",
"Yellowbrick": "langchain_community.vectorstores.yellowbrick",
"ZepVectorStore": "langchain_community.vectorstores.zep",
"ZepCloudVectorStore": "langchain_community.vectorstores.zep_cloud",
"Zilliz": "langchain_community.vectorstores.zilliz",
}

View File

@@ -0,0 +1,477 @@
from __future__ import annotations
import logging
import warnings
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
from zep_cloud import CreateDocumentRequest, DocumentCollectionResponse, SearchType
logger = logging.getLogger()
class ZepCloudVectorStore(VectorStore):
"""`Zep` vector store.
It provides methods for adding texts or documents to the store,
searching for similar documents, and deleting documents.
Search scores are calculated using cosine similarity normalized to [0, 1].
Args:
collection_name (str): The name of the collection in the Zep store.
api_key (str): The API key for the Zep API.
"""
def __init__(
self,
collection_name: str,
api_key: str,
) -> None:
super().__init__()
if not collection_name:
raise ValueError(
"collection_name must be specified when using ZepVectorStore."
)
try:
from zep_cloud.client import AsyncZep, Zep
except ImportError:
raise ImportError(
"Could not import zep-python python package. "
"Please install it with `pip install zep-python`."
)
self._client = Zep(api_key=api_key)
self._client_async = AsyncZep(api_key=api_key)
self.collection_name = collection_name
self._load_collection()
@property
def embeddings(self) -> Optional[Embeddings]:
"""Unavailable for ZepCloud"""
return None
def _load_collection(self) -> DocumentCollectionResponse:
"""
Load the collection from the Zep backend.
"""
from zep_cloud import NotFoundError
try:
collection = self._client.document.get_collection(self.collection_name)
except NotFoundError:
logger.info(
f"Collection {self.collection_name} not found. Creating new collection."
)
collection = self._create_collection()
return collection
def _create_collection(self) -> DocumentCollectionResponse:
"""
Create a new collection in the Zep backend.
"""
self._client.document.add_collection(self.collection_name)
collection = self._client.document.get_collection(self.collection_name)
return collection
def _generate_documents_to_add(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[Any, Any]]] = None,
document_ids: Optional[List[str]] = None,
) -> List[CreateDocumentRequest]:
from zep_cloud import CreateDocumentRequest as ZepDocument
documents: List[ZepDocument] = []
for i, d in enumerate(texts):
documents.append(
ZepDocument(
content=d,
metadata=metadatas[i] if metadatas else None,
document_id=document_ids[i] if document_ids else None,
)
)
return documents
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
document_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
document_ids: Optional list of document ids associated with the texts.
kwargs: vectorstore specific parameters
Returns:
List of ids from adding the texts into the vectorstore.
"""
documents = self._generate_documents_to_add(texts, metadatas, document_ids)
uuids = self._client.document.add_documents(
self.collection_name, request=documents
)
return uuids
async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
document_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore."""
documents = self._generate_documents_to_add(texts, metadatas, document_ids)
uuids = await self._client_async.document.add_documents(
self.collection_name, request=documents
)
return uuids
def search(
self,
query: str,
search_type: SearchType,
metadata: Optional[Dict[str, Any]] = None,
k: int = 3,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to query using specified search type."""
if search_type == "similarity":
return self.similarity_search(query, k=k, metadata=metadata, **kwargs)
elif search_type == "mmr":
return self.max_marginal_relevance_search(
query, k=k, metadata=metadata, **kwargs
)
else:
raise ValueError(
f"search_type of {search_type} not allowed. Expected "
"search_type to be 'similarity' or 'mmr'."
)
async def asearch(
self,
query: str,
search_type: str,
metadata: Optional[Dict[str, Any]] = None,
k: int = 3,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to query using specified search type."""
if search_type == "similarity":
return await self.asimilarity_search(
query, k=k, metadata=metadata, **kwargs
)
elif search_type == "mmr":
return await self.amax_marginal_relevance_search(
query, k=k, metadata=metadata, **kwargs
)
else:
raise ValueError(
f"search_type of {search_type} not allowed. Expected "
"search_type to be 'similarity' or 'mmr'."
)
def similarity_search(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to query."""
results = self._similarity_search_with_relevance_scores(
query, k=k, metadata=metadata, **kwargs
)
return [doc for doc, _ in results]
def similarity_search_with_score(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Run similarity search with distance."""
return self._similarity_search_with_relevance_scores(
query, k=k, metadata=metadata, **kwargs
)
def _similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
Default similarity search with relevance scores. Modify if necessary
in subclass.
Return docs and relevance scores in the range [0, 1].
0 is dissimilar, 1 is most similar.
Args:
query: input text
k: Number of Documents to return. Defaults to 4.
metadata: Optional, metadata filter
**kwargs: kwargs to be passed to similarity search. Should include:
score_threshold: Optional, a floating point value between 0 to 1 and
filter the resulting set of retrieved docs
Returns:
List of Tuples of (doc, similarity_score)
"""
results = self._client.document.search(
collection_name=self.collection_name,
text=query,
limit=k,
metadata=metadata,
**kwargs,
)
return [
(
Document(
page_content=str(doc.content),
metadata=doc.metadata,
),
doc.score or 0.0,
)
for doc in results.results or []
]
async def asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query."""
results = await self._client_async.document.search(
collection_name=self.collection_name,
text=query,
limit=k,
metadata=metadata,
**kwargs,
)
return [
(
Document(
page_content=str(doc.content),
metadata=doc.metadata,
),
doc.score or 0.0,
)
for doc in results.results or []
]
async def asimilarity_search(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to query."""
results = await self.asimilarity_search_with_relevance_scores(
query, k, metadata=metadata, **kwargs
)
return [doc for doc, _ in results]
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Unsupported in Zep Cloud"""
warnings.warn("similarity_search_by_vector is not supported in Zep Cloud")
return []
async def asimilarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Unsupported in Zep Cloud"""
warnings.warn("asimilarity_search_by_vector is not supported in Zep Cloud")
return []
def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
Zep determines this automatically and this parameter is
ignored.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
metadata: Optional, metadata to filter the resulting set of retrieved docs
Returns:
List of Documents selected by maximal marginal relevance.
"""
results = self._client.document.search(
collection_name=self.collection_name,
text=query,
limit=k,
metadata=metadata,
search_type="mmr",
mmr_lambda=lambda_mult,
**kwargs,
)
return [
Document(page_content=str(d.content), metadata=d.metadata)
for d in results.results or []
]
async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance."""
results = await self._client_async.document.search(
collection_name=self.collection_name,
text=query,
limit=k,
metadata=metadata,
search_type="mmr",
mmr_lambda=lambda_mult,
**kwargs,
)
return [
Document(page_content=str(d.content), metadata=d.metadata)
for d in results.results or []
]
def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Unsupported in Zep Cloud"""
warnings.warn(
"max_marginal_relevance_search_by_vector is not supported in Zep Cloud"
)
return []
async def amax_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Unsupported in Zep Cloud"""
warnings.warn(
"amax_marginal_relevance_search_by_vector is not supported in Zep Cloud"
)
return []
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
collection_name: str = "",
api_key: Optional[str] = None,
**kwargs: Any,
) -> ZepCloudVectorStore:
"""
Class method that returns a ZepVectorStore instance initialized from texts.
If the collection does not exist, it will be created.
Args:
texts (List[str]): The list of texts to add to the vectorstore.
metadatas (Optional[List[Dict[str, Any]]]): Optional list of metadata
associated with the texts.
collection_name (str): The name of the collection in the Zep store.
api_key (str): The API key for the Zep API.
**kwargs: Additional parameters specific to the vectorstore.
Returns:
ZepVectorStore: An instance of ZepVectorStore.
"""
if not api_key:
raise ValueError("api_key must be specified when using ZepVectorStore.")
vecstore = cls(
collection_name=collection_name,
api_key=api_key,
)
vecstore.add_texts(texts, metadatas)
return vecstore
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None:
"""Delete by Zep vector UUIDs.
Parameters
----------
ids : Optional[List[str]]
The UUIDs of the vectors to delete.
Raises
------
ValueError
If no UUIDs are provided.
"""
if ids is None or len(ids) == 0:
raise ValueError("No uuids provided to delete.")
for u in ids:
self._client.document.delete_document(self.collection_name, u)

View File

@@ -22,6 +22,7 @@ EXPECTED_ALL = [
"UpstashRedisChatMessageHistory",
"XataChatMessageHistory",
"ZepChatMessageHistory",
"ZepCloudChatMessageHistory",
]

View File

@@ -43,6 +43,7 @@ EXPECTED_ALL = [
"WebResearchRetriever",
"YouRetriever",
"ZepRetriever",
"ZepCloudRetriever",
"ZillizRetriever",
"DocArrayRetriever",
"NeuralDBRetriever",

View File

@@ -100,6 +100,7 @@ EXPECTED_ALL = [
"Weaviate",
"Yellowbrick",
"ZepVectorStore",
"ZepCloudVectorStore",
"Zilliz",
]

View File

@@ -98,6 +98,7 @@ def test_compatible_vectorstore_documentation() -> None:
"Weaviate",
"Yellowbrick",
"ZepVectorStore",
"ZepCloudVectorStore",
"Zilliz",
"Lantern",
"OpenSearchVectorSearch",