From 63e516c2b0434f15c967a8b527cd37ee8624580e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Y=C4=B1lmaz?= <46003469+yilmaz-burak@users.noreply.github.com> Date: Fri, 13 Oct 2023 03:36:51 +0300 Subject: [PATCH] Upstash redis integration (#10871) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - **Description:** Introduced Upstash provider with following wrappers: UpstashRedisCache, UpstashRedisEntityStore, UpstashRedisChatMessageHistory, UpstashRedisStore - **Issue:** -, - **Dependencies:** upstash-redis python package is needed, - **Tag maintainer:** @baskaryan - **Twitter handle:** @BurakY744 --------- Co-authored-by: Burak Yılmaz Co-authored-by: Bagatur --- docs/docs/integrations/llms/llm_caching.ipynb | 105 +++++++++++++- .../upstash_redis_chat_message_history.ipynb | 61 ++++++++ docs/docs/integrations/providers/upstash.mdx | 42 ++++++ .../text_embedding/caching_embeddings.ipynb | 134 ++++++++++++++---- libs/langchain/langchain/cache.py | 89 +++++++++++- libs/langchain/langchain/memory/__init__.py | 4 + .../memory/chat_message_histories/__init__.py | 4 + .../chat_message_histories/upstash_redis.py | 67 +++++++++ libs/langchain/langchain/memory/entity.py | 78 ++++++++++ libs/langchain/langchain/storage/__init__.py | 2 + .../langchain/storage/upstash_redis.py | 119 ++++++++++++++++ libs/langchain/poetry.lock | 19 ++- libs/langchain/pyproject.toml | 2 + .../cache/test_upstash_redis_cache.py | 91 ++++++++++++ .../memory/test_upstash_redis.py | 38 +++++ .../storage/test_upstash_redis.py | 95 +++++++++++++ .../unit_tests/storage/test_upstash_redis.py | 8 ++ 17 files changed, 923 insertions(+), 35 deletions(-) create mode 100644 docs/docs/integrations/memory/upstash_redis_chat_message_history.ipynb create mode 100644 docs/docs/integrations/providers/upstash.mdx create mode 100644 libs/langchain/langchain/memory/chat_message_histories/upstash_redis.py create mode 100644 libs/langchain/langchain/storage/upstash_redis.py create mode 100644 libs/langchain/tests/integration_tests/cache/test_upstash_redis_cache.py create mode 100644 libs/langchain/tests/integration_tests/memory/test_upstash_redis.py create mode 100644 libs/langchain/tests/integration_tests/storage/test_upstash_redis.py create mode 100644 libs/langchain/tests/unit_tests/storage/test_upstash_redis.py diff --git a/docs/docs/integrations/llms/llm_caching.ipynb b/docs/docs/integrations/llms/llm_caching.ipynb index be15915c941..4683177d946 100644 --- a/docs/docs/integrations/llms/llm_caching.ipynb +++ b/docs/docs/integrations/llms/llm_caching.ipynb @@ -12,7 +12,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": 3, "id": "10ad9224", "metadata": {}, "outputs": [], @@ -37,7 +37,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 4, "id": "426ff912", "metadata": {}, "outputs": [], @@ -207,6 +207,101 @@ "llm(\"Tell me a joke\")" ] }, + { + "cell_type": "markdown", + "id": "e71273ab", + "metadata": {}, + "source": [ + "## `Upstash Redis` Cache" + ] + }, + { + "cell_type": "markdown", + "id": "f10dabef", + "metadata": {}, + "source": [ + "### Standard Cache\n", + "Use [Upstash Redis](https://upstash.com) to cache prompts and responses with a serverless HTTP API." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "f3920f25", + "metadata": {}, + "outputs": [], + "source": [ + "from upstash_redis import Redis\n", + "from langchain.cache import UpstashRedisCache\n", + "\n", + "URL = \"\"\n", + "TOKEN = \"\"\n", + "\n", + "langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN))" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "3bf7d959", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 7.56 ms, sys: 2.98 ms, total: 10.5 ms\n", + "Wall time: 1.14 s\n" + ] + }, + { + "data": { + "text/plain": [ + "'\\n\\nWhy did the chicken cross the road?\\n\\nTo get to the other side!'" + ] + }, + "execution_count": 39, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%time\n", + "# The first time, it is not yet in cache, so it should take longer\n", + "llm(\"Tell me a joke\")" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "id": "00fc3a34", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 2.78 ms, sys: 1.95 ms, total: 4.73 ms\n", + "Wall time: 82.9 ms\n" + ] + }, + { + "data": { + "text/plain": [ + "'\\n\\nTwo guys stole a calendar. They got six months each.'" + ] + }, + "execution_count": 50, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%time\n", + "# The second time it is, so it goes faster\n", + "llm(\"Tell me a joke\")" + ] + }, { "cell_type": "markdown", "id": "278ad7ae", @@ -229,7 +324,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 9, "id": "39f6eb0b", "metadata": {}, "outputs": [], @@ -244,7 +339,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "id": "28920749", "metadata": {}, "outputs": [ @@ -440,7 +535,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "id": "9e4ecfd1", "metadata": {}, "outputs": [ diff --git a/docs/docs/integrations/memory/upstash_redis_chat_message_history.ipynb b/docs/docs/integrations/memory/upstash_redis_chat_message_history.ipynb new file mode 100644 index 00000000000..bf87de7d38d --- /dev/null +++ b/docs/docs/integrations/memory/upstash_redis_chat_message_history.ipynb @@ -0,0 +1,61 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Upstash Redis Chat Message History\n", + "\n", + "This notebook goes over how to use Upstash Redis to store chat message history."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.memory.chat_message_histories.upstash_redis import UpstashRedisChatMessageHistory\n", + "\n", + "URL = \"\"\n", + "TOKEN = \"\"\n", + "\n", + "history = UpstashRedisChatMessageHistory(url=URL, token=TOKEN, ttl=10, session_id=\"my-test-session\")\n", + "\n", + "history.add_user_message(\"hello llm!\")\n", + "history.add_ai_message(\"hello user!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "history.messages" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.3" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/docs/docs/integrations/providers/upstash.mdx b/docs/docs/integrations/providers/upstash.mdx new file mode 100644 index 00000000000..66f4c3df468 --- /dev/null +++ b/docs/docs/integrations/providers/upstash.mdx @@ -0,0 +1,42 @@ +# Upstash Redis + +Upstash offers developers serverless databases and messaging platforms to build powerful applications without having to worry about the operational complexity of running databases at scale. + +This page covers how to use [Upstash Redis](https://upstash.com/redis) with LangChain. + +## Installation and Setup +- The Upstash Redis Python SDK can be installed with `pip install upstash-redis` +- A globally distributed, low-latency and highly available database can be created at the [Upstash Console](https://console.upstash.com) + + +## Integrations +All Upstash-LangChain integrations are built on the `upstash-redis` Python SDK, which the LangChain wrappers use under the hood. +The SDK connects to your Upstash Redis database with the UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN values from the console. +A significant advantage is that the SDK uses a REST API, so it also works on serverless platforms, at the edge, and on any other platform that does not support TCP connections. + + +### Cache + +[Upstash Redis](https://upstash.com/redis) can be used as a cache for LLM prompts and responses. + +To import this cache: +```python +from langchain.cache import UpstashRedisCache +``` + +To use with your LLMs: +```python +import langchain +from upstash_redis import Redis + +URL = "" +TOKEN = "" + +langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN)) +``` + +### Memory +Upstash Redis can be used to persist LLM conversations. + +#### Chat Message History Memory +An example of using Upstash Redis to store conversation message history can be seen in [this notebook](/docs/integrations/memory/upstash_redis_chat_message_history.html).
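For quick reference, here is a minimal sketch of the chat message history wrapper introduced in this PR, mirroring the notebook cell above (the blank URL and TOKEN placeholders follow the convention used throughout this PR; fill in the REST credentials from the Upstash Console):

```python
from langchain.memory import UpstashRedisChatMessageHistory

URL = ""    # UPSTASH_REDIS_REST_URL from the Upstash Console
TOKEN = ""  # UPSTASH_REDIS_REST_TOKEN from the Upstash Console

# Messages are stored in a Redis list under "message_store:<session_id>"
# and optionally expire after `ttl` seconds.
history = UpstashRedisChatMessageHistory(
    url=URL, token=TOKEN, ttl=10, session_id="my-test-session"
)
history.add_user_message("hello llm!")
history.add_ai_message("hello user!")
history.messages
```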
diff --git a/docs/docs/modules/data_connection/text_embedding/caching_embeddings.ipynb b/docs/docs/modules/data_connection/text_embedding/caching_embeddings.ipynb index c00f70908a1..c9bbb3fe6c2 100644 --- a/docs/docs/modules/data_connection/text_embedding/caching_embeddings.ipynb +++ b/docs/docs/modules/data_connection/text_embedding/caching_embeddings.ipynb @@ -23,14 +23,14 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 7, "id": "a463c3c2-749b-40d1-a433-84f68a1cd1c7", "metadata": { "tags": [] }, "outputs": [], "source": [ - "from langchain.storage import InMemoryStore, LocalFileStore, RedisStore\n", + "from langchain.storage import InMemoryStore, LocalFileStore, RedisStore, UpstashRedisStore\n", "from langchain.embeddings import OpenAIEmbeddings, CacheBackedEmbeddings" ] }, @@ -46,7 +46,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "id": "9e4314d8-88ef-4f52-81ae-0be771168bb6", "metadata": {}, "outputs": [], @@ -59,7 +59,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "id": "3e751f26-9b5b-4c10-843a-d784b5ea8538", "metadata": {}, "outputs": [], @@ -69,7 +69,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "id": "30743664-38f5-425d-8216-772b64e7f348", "metadata": {}, "outputs": [], @@ -91,7 +91,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "id": "f9ad627f-ced2-4277-b336-2434f22f2c8a", "metadata": {}, "outputs": [ @@ -120,7 +120,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": null, "id": "cf958ac2-e60e-4668-b32c-8bb2d78b3c61", "metadata": {}, "outputs": [], @@ -140,7 +140,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": null, "id": "3a1d7bb8-3b72-4bb5-9013-cf7729caca61", "metadata": {}, "outputs": [ @@ -168,7 +168,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": null, "id": "714cb2e2-77ba-41a8-bb83-84e75342af2d", "metadata": {}, "outputs": [ @@ -196,7 +196,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": null, "id": "f2ca32dd-3712-4093-942b-4122f3dc8a8e", "metadata": {}, "outputs": [ @@ -232,7 +232,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": null, "id": "13bd1c5b-b7ba-4394-957c-7d5b5a841972", "metadata": { "tags": [] @@ -244,7 +244,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": null, "id": "9d99885f-99e1-498c-904d-6db539ac9466", "metadata": { "tags": [] @@ -259,7 +259,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": null, "id": "682eb5d4-0b7a-4dac-b8fb-3de4ca6e421c", "metadata": { "tags": [] @@ -289,7 +289,7 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": null, "id": "f819c3ff-a212-4d06-a5f7-5eb1435c1feb", "metadata": { "tags": [] @@ -311,7 +311,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": null, "id": "ec38fb72-90a9-4687-a483-c62c87d1f4dd", "metadata": { "tags": [] @@ -344,7 +344,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": null, "id": "a0070271-0809-4528-97e0-2a88216846f3", "metadata": { "tags": [] @@ -356,7 +356,7 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": null, "id": "0b20e9fe-f57f-4d7c-9f81-105c5f8726f4", "metadata": { "tags": [] @@ -370,7 +370,7 @@ }, { "cell_type": "code", - "execution_count": 21, + "execution_count": null, "id": "630515fd-bf5c-4d9c-a404-9705308f3a2c", "metadata": { "tags": [] @@ -392,7 
+392,7 @@ }, { "cell_type": "code", - "execution_count": 22, + "execution_count": null, "id": "30e6bb87-42c9-4d08-88ac-0d22c9c449a1", "metadata": { "tags": [] @@ -424,7 +424,7 @@ }, { "cell_type": "code", - "execution_count": 23, + "execution_count": null, "id": "658e2914-05e9-44a3-a8fe-3fe17ca84039", "metadata": {}, "outputs": [ @@ -444,6 +444,86 @@ "list(fs.yield_keys())" ] }, + { + "cell_type": "markdown", + "id": "904c1d47", + "metadata": {}, + "source": [ + "## Upstash Redis Store" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d0f9f212", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.storage.upstash_redis import UpstashRedisStore" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "45bf62e4", + "metadata": {}, + "outputs": [], + "source": [ + "from upstash_redis import Redis\n", + "URL = \"\"\n", + "TOKEN = \"\"\n", + "\n", + "redis_client = Redis(url=URL, token=TOKEN)\n", + "store = UpstashRedisStore(client=redis_client, ttl=None, namespace=\"test-ns\")\n", + "\n", + "underlying_embeddings = OpenAIEmbeddings()\n", + "embedder = CacheBackedEmbeddings.from_bytes_store(\n", + " underlying_embeddings, store, namespace=underlying_embeddings.model\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3eac3504", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "embeddings = embedder.embed_documents([\"welcome\", \"goodbye\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "085dcd30", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "embeddings = embedder.embed_documents([\"welcome\", \"goodbye\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3570e83f", + "metadata": {}, + "outputs": [], + "source": [ + "list(store.yield_keys())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d7dc8e51", + "metadata": {}, + "outputs": [], + "source": [ + "list(store.client.scan(0))" + ] + }, { "cell_type": "markdown", "id": "cd5f5a96-6ffa-429d-aa82-00b3f6532871", @@ -455,7 +535,7 @@ }, { "cell_type": "code", - "execution_count": 24, + "execution_count": null, "id": "4879c134-141f-48a0-acfe-7d6f30253af0", "metadata": {}, "outputs": [], @@ -465,7 +545,7 @@ }, { "cell_type": "code", - "execution_count": 25, + "execution_count": null, "id": "8b2bb9a0-6549-4487-8532-29ab4ab7336f", "metadata": {}, "outputs": [], @@ -482,7 +562,7 @@ }, { "cell_type": "code", - "execution_count": 26, + "execution_count": null, "id": "eca3cb99-2bb3-49d5-81f9-1dee03da4b8c", "metadata": {}, "outputs": [ @@ -502,7 +582,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": null, "id": "317ba5d8-89f9-462c-b807-ad4ef26e518b", "metadata": {}, "outputs": [ @@ -522,7 +602,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": null, "id": "8a540317-5142-4491-9062-a097932b56e3", "metadata": {}, "outputs": [ @@ -544,7 +624,7 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": null, "id": "cd9b0d4a-f816-4dce-9dde-cde1ad9a65fb", "metadata": {}, "outputs": [ @@ -581,7 +661,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.1" + "version": "3.11.3" } }, "nbformat": 4, diff --git a/libs/langchain/langchain/cache.py b/libs/langchain/langchain/cache.py index be96bdf4411..1073dc74067 100644 --- a/libs/langchain/langchain/cache.py +++ b/libs/langchain/langchain/cache.py @@ -26,6 +26,7 @@ import inspect import json import logging import 
uuid +import warnings from datetime import timedelta from functools import lru_cache from typing import ( @@ -53,7 +54,7 @@ except ImportError: from langchain.llms.base import LLM, get_prompts from langchain.load.dump import dumps from langchain.load.load import loads -from langchain.schema import Generation +from langchain.schema import ChatGeneration, Generation from langchain.schema.cache import RETURN_VAL_TYPE, BaseCache from langchain.schema.embeddings import Embeddings from langchain.utils import get_from_env @@ -260,6 +261,92 @@ class SQLiteCache(SQLAlchemyCache): super().__init__(engine) +class UpstashRedisCache(BaseCache): + """Cache that uses Upstash Redis as a backend.""" + + def __init__(self, redis_: Any, *, ttl: Optional[int] = None): + """ + Initialize an instance of UpstashRedisCache. + + This method initializes an object with Upstash Redis caching capabilities. + It takes a `redis_` parameter, which should be an instance of an Upstash Redis + client class, allowing the object to interact with Upstash Redis + server for caching purposes. + + Parameters: + redis_: An instance of Upstash Redis client class + (e.g., Redis) used for caching. + This allows the object to communicate with + Redis server for caching operations on. + ttl (int, optional): Time-to-live (TTL) for cached items in seconds. + If provided, it sets the time duration for how long cached + items will remain valid. If not provided, cached items will not + have an automatic expiration. + """ + try: + from upstash_redis import Redis + except ImportError: + raise ValueError( + "Could not import upstash_redis python package. " + "Please install it with `pip install upstash_redis`." + ) + if not isinstance(redis_, Redis): + raise ValueError("Please pass in Upstash Redis object.") + self.redis = redis_ + self.ttl = ttl + + def _key(self, prompt: str, llm_string: str) -> str: + """Compute key from prompt and llm_string""" + return _hash(prompt + llm_string) + + def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]: + """Look up based on prompt and llm_string.""" + generations = [] + # Read from a HASH + results = self.redis.hgetall(self._key(prompt, llm_string)) + if results: + for _, text in results.items(): + generations.append(Generation(text=text)) + return generations if generations else None + + def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None: + """Update cache based on prompt and llm_string.""" + for gen in return_val: + if not isinstance(gen, Generation): + raise ValueError( + "UpstashRedisCache supports caching of normal LLM generations, " + f"got {type(gen)}" + ) + if isinstance(gen, ChatGeneration): + warnings.warn( + "NOTE: Generation has not been cached. UpstashRedisCache does not" + " support caching ChatModel outputs." + ) + return + # Write to a HASH + key = self._key(prompt, llm_string) + + mapping = { + str(idx): generation.text for idx, generation in enumerate(return_val) + } + self.redis.hset(key=key, values=mapping) + + if self.ttl is not None: + self.redis.expire(key, self.ttl) + + def clear(self, **kwargs: Any) -> None: + """ + Clear cache. If `asynchronous` is True, flush asynchronously. + This flushes the *whole* db. 
+ """ + asynchronous = kwargs.get("asynchronous", False) + if asynchronous: + asynchronous = "ASYNC" + else: + asynchronous = "SYNC" + self.redis.flushdb(flush_type=asynchronous) + + class RedisCache(BaseCache): """Cache that uses Redis as a backend.""" diff --git a/libs/langchain/langchain/memory/__init__.py b/libs/langchain/langchain/memory/__init__.py index 026225ddb05..6db58403470 100644 --- a/libs/langchain/langchain/memory/__init__.py +++ b/libs/langchain/langchain/memory/__init__.py @@ -44,6 +44,7 @@ from langchain.memory.chat_message_histories import ( RedisChatMessageHistory, SQLChatMessageHistory, StreamlitChatMessageHistory, + UpstashRedisChatMessageHistory, XataChatMessageHistory, ZepChatMessageHistory, ) @@ -53,6 +54,7 @@ from langchain.memory.entity import ( InMemoryEntityStore, RedisEntityStore, SQLiteEntityStore, + UpstashRedisEntityStore, ) from langchain.memory.kg import ConversationKGMemory from langchain.memory.motorhead_memory import MotorheadMemory @@ -96,4 +98,6 @@ __all__ = [ "XataChatMessageHistory", "ZepChatMessageHistory", "ZepMemory", + "UpstashRedisEntityStore", + "UpstashRedisChatMessageHistory", ] diff --git a/libs/langchain/langchain/memory/chat_message_histories/__init__.py b/libs/langchain/langchain/memory/chat_message_histories/__init__.py index 772f07f084f..760e9d2eff3 100644 --- a/libs/langchain/langchain/memory/chat_message_histories/__init__.py +++ b/libs/langchain/langchain/memory/chat_message_histories/__init__.py @@ -20,6 +20,9 @@ from langchain.memory.chat_message_histories.sql import SQLChatMessageHistory from langchain.memory.chat_message_histories.streamlit import ( StreamlitChatMessageHistory, ) +from langchain.memory.chat_message_histories.upstash_redis import ( + UpstashRedisChatMessageHistory, +) from langchain.memory.chat_message_histories.xata import XataChatMessageHistory from langchain.memory.chat_message_histories.zep import ZepChatMessageHistory @@ -40,4 +43,5 @@ __all__ = [ "StreamlitChatMessageHistory", "XataChatMessageHistory", "ZepChatMessageHistory", + "UpstashRedisChatMessageHistory", ] diff --git a/libs/langchain/langchain/memory/chat_message_histories/upstash_redis.py b/libs/langchain/langchain/memory/chat_message_histories/upstash_redis.py new file mode 100644 index 00000000000..8cc3ac00f40 --- /dev/null +++ b/libs/langchain/langchain/memory/chat_message_histories/upstash_redis.py @@ -0,0 +1,67 @@ +import json +import logging +from typing import List, Optional + +from langchain.schema import ( + BaseChatMessageHistory, +) +from langchain.schema.messages import BaseMessage, _message_to_dict, messages_from_dict + +logger = logging.getLogger(__name__) + + +class UpstashRedisChatMessageHistory(BaseChatMessageHistory): + """Chat message history stored in an Upstash Redis database.""" + + def __init__( + self, + session_id: str, + url: str = "", + token: str = "", + key_prefix: str = "message_store:", + ttl: Optional[int] = None, + ): + try: + from upstash_redis import Redis + except ImportError: + raise ImportError( + "Could not import upstash redis python package. " + "Please install it with `pip install upstash_redis`." + ) + + if url == "" or token == "": + raise ValueError( + "UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN are needed." 
+ ) + + try: + self.redis_client = Redis(url=url, token=token) + except Exception: + logger.error("Upstash Redis instance could not be initiated.") + + self.session_id = session_id + self.key_prefix = key_prefix + self.ttl = ttl + + @property + def key(self) -> str: + """Construct the record key to use""" + return self.key_prefix + self.session_id + + @property + def messages(self) -> List[BaseMessage]: # type: ignore + """Retrieve the messages from Upstash Redis""" + _items = self.redis_client.lrange(self.key, 0, -1) + items = [json.loads(m) for m in _items[::-1]] + messages = messages_from_dict(items) + return messages + + def add_message(self, message: BaseMessage) -> None: + """Append the message to the record in Upstash Redis""" + self.redis_client.lpush(self.key, json.dumps(_message_to_dict(message))) + if self.ttl: + self.redis_client.expire(self.key, self.ttl) + + def clear(self) -> None: + """Clear session memory from Upstash Redis""" + self.redis_client.delete(self.key) diff --git a/libs/langchain/langchain/memory/entity.py b/libs/langchain/langchain/memory/entity.py index cc48026f8c4..a4ae4c132cd 100644 --- a/libs/langchain/langchain/memory/entity.py +++ b/libs/langchain/langchain/memory/entity.py @@ -69,6 +69,84 @@ class InMemoryEntityStore(BaseEntityStore): return self.store.clear() +class UpstashRedisEntityStore(BaseEntityStore): + """Upstash Redis backed Entity store. + + Entities get a TTL of 1 day by default, and + that TTL is extended by 3 days every time the entity is read back. + """ + + def __init__( + self, + session_id: str = "default", + url: str = "", + token: str = "", + key_prefix: str = "memory_store", + ttl: Optional[int] = 60 * 60 * 24, + recall_ttl: Optional[int] = 60 * 60 * 24 * 3, + *args: Any, + **kwargs: Any, + ): + try: + from upstash_redis import Redis + except ImportError: + raise ImportError( + "Could not import upstash_redis python package. " + "Please install it with `pip install upstash_redis`." + ) + + super().__init__(*args, **kwargs) + + try: + self.redis_client = Redis(url=url, token=token) + except Exception: + logger.error("Upstash Redis instance could not be initiated.") + + self.session_id = session_id + self.key_prefix = key_prefix + self.ttl = ttl + self.recall_ttl = recall_ttl or ttl + + @property + def full_key_prefix(self) -> str: + return f"{self.key_prefix}:{self.session_id}" + + def get(self, key: str, default: Optional[str] = None) -> Optional[str]: + res = ( + self.redis_client.getex(f"{self.full_key_prefix}:{key}", ex=self.recall_ttl) + or default + or "" + ) + logger.debug(f"Upstash Redis MEM get '{self.full_key_prefix}:{key}': '{res}'") + return res + + def set(self, key: str, value: Optional[str]) -> None: + if not value: + return self.delete(key) + self.redis_client.set(f"{self.full_key_prefix}:{key}", value, ex=self.ttl) + logger.debug( + f"Redis MEM set '{self.full_key_prefix}:{key}': '{value}' EX {self.ttl}" + ) + + def delete(self, key: str) -> None: + self.redis_client.delete(f"{self.full_key_prefix}:{key}") + + def exists(self, key: str) -> bool: + return self.redis_client.exists(f"{self.full_key_prefix}:{key}") == 1 + + def clear(self) -> None: + def scan_and_delete(cursor: int) -> int: + cursor, keys_to_delete = self.redis_client.scan( + cursor, f"{self.full_key_prefix}:*" + ) + self.redis_client.delete(*keys_to_delete) + return cursor + + cursor = scan_and_delete(0) + while cursor != 0: + scan_and_delete(cursor) + + class RedisEntityStore(BaseEntityStore): """Redis-backed Entity store. 
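To show how the entity store added above is intended to be called, here is a minimal sketch based only on the constructor and methods defined in this PR (the blank credential placeholders are assumptions, matching the convention used elsewhere in the PR; the store can also be handed to an entity memory class that accepts a `BaseEntityStore`):

```python
from langchain.memory import UpstashRedisEntityStore

# Blank placeholders; supply real Upstash REST credentials.
store = UpstashRedisEntityStore(
    session_id="my-session",
    url="",
    token="",
    ttl=60 * 60 * 24,  # entities expire after one day unless read again
)

store.set("Alice", "Alice is an engineer who likes Upstash Redis.")
store.get("Alice")     # reading refreshes the key's TTL using `recall_ttl`
store.exists("Alice")  # True while the key is present
store.delete("Alice")
store.clear()          # scans and deletes every key under this session's prefix
```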
diff --git a/libs/langchain/langchain/storage/__init__.py b/libs/langchain/langchain/storage/__init__.py index 49a721b59ab..bf95c8b9d39 100644 --- a/libs/langchain/langchain/storage/__init__.py +++ b/libs/langchain/langchain/storage/__init__.py @@ -11,6 +11,7 @@ from langchain.storage.encoder_backed import EncoderBackedStore from langchain.storage.file_system import LocalFileStore from langchain.storage.in_memory import InMemoryStore from langchain.storage.redis import RedisStore +from langchain.storage.upstash_redis import UpstashRedisStore __all__ = [ "EncoderBackedStore", @@ -19,4 +20,5 @@ __all__ = [ "RedisStore", "create_lc_store", "create_kv_docstore", + "UpstashRedisStore", ] diff --git a/libs/langchain/langchain/storage/upstash_redis.py b/libs/langchain/langchain/storage/upstash_redis.py new file mode 100644 index 00000000000..e9dc4fd6557 --- /dev/null +++ b/libs/langchain/langchain/storage/upstash_redis.py @@ -0,0 +1,119 @@ +from typing import Any, Iterator, List, Optional, Sequence, Tuple, cast + +from langchain.schema import BaseStore + + +class UpstashRedisStore(BaseStore[str, str]): + """BaseStore implementation using Upstash Redis as the underlying store.""" + + def __init__( + self, + *, + client: Any = None, + url: Optional[str] = None, + token: Optional[str] = None, + ttl: Optional[int] = None, + namespace: Optional[str] = None, + ) -> None: + """Initialize the UpstashRedisStore with HTTP API. + + Must provide either an Upstash Redis client or a url. + + Args: + client: An Upstash Redis instance + url: UPSTASH_REDIS_REST_URL + token: UPSTASH_REDIS_REST_TOKEN + ttl: time to expire keys in seconds if provided, + if None keys will never expire + namespace: if provided, all keys will be prefixed with this namespace + """ + try: + from upstash_redis import Redis + except ImportError as e: + raise ImportError( + "UpstashRedisStore requires the upstash_redis library to be installed. " + "pip install upstash_redis" + ) from e + + if client and url: + raise ValueError( + "Either an Upstash Redis client or a url must be provided, not both." + ) + + if client: + if not isinstance(client, Redis): + raise TypeError( + f"Expected Upstash Redis client, got {type(client).__name__}." + ) + _client = client + else: + if not url or not token: + raise ValueError( + "Either an Upstash Redis client or url and token must be provided." + ) + _client = Redis(url=url, token=token) + + self.client = _client + + if not isinstance(ttl, int) and ttl is not None: + raise TypeError(f"Expected int or None, got {type(ttl)} instead.") + + self.ttl = ttl + self.namespace = namespace + + def _get_prefixed_key(self, key: str) -> str: + """Get the key with the namespace prefix. + + Args: + key (str): The original key. + + Returns: + str: The key with the namespace prefix. 
+ """ + delimiter = "/" + if self.namespace: + return f"{self.namespace}{delimiter}{key}" + return key + + def mget(self, keys: Sequence[str]) -> List[Optional[str]]: + """Get the values associated with the given keys.""" + + keys = [self._get_prefixed_key(key) for key in keys] + return cast( + List[Optional[str]], + self.client.mget(*keys), + ) + + def mset(self, key_value_pairs: Sequence[Tuple[str, str]]) -> None: + """Set the given key-value pairs.""" + for key, value in key_value_pairs: + self.client.set(self._get_prefixed_key(key), value, ex=self.ttl) + + def mdelete(self, keys: Sequence[str]) -> None: + """Delete the given keys.""" + _keys = [self._get_prefixed_key(key) for key in keys] + self.client.delete(*_keys) + + def yield_keys(self, *, prefix: Optional[str] = None) -> Iterator[str]: + """Yield keys in the store.""" + if prefix: + pattern = self._get_prefixed_key(prefix) + else: + pattern = self._get_prefixed_key("*") + + cursor, keys = self.client.scan(0, match=pattern) + for key in keys: + if self.namespace: + relative_key = key[len(self.namespace) + 1 :] + yield relative_key + else: + yield key + + while cursor != 0: + cursor, keys = self.client.scan(cursor, match=pattern) + for key in keys: + if self.namespace: + relative_key = key[len(self.namespace) + 1 :] + yield relative_key + else: + yield key diff --git a/libs/langchain/poetry.lock b/libs/langchain/poetry.lock index b2e4d99b071..e713936b69a 100644 --- a/libs/langchain/poetry.lock +++ b/libs/langchain/poetry.lock @@ -10131,6 +10131,21 @@ tzdata = {version = "*", markers = "platform_system == \"Windows\""} [package.extras] devenv = ["black", "check-manifest", "flake8", "pyroma", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"] +[[package]] +name = "upstash-redis" +version = "0.15.0" +description = "Serverless Redis SDK from Upstash" +optional = true +python-versions = ">=3.8,<4.0" +files = [ + {file = "upstash_redis-0.15.0-py3-none-any.whl", hash = "sha256:4a89913cb2bb2422610bc2a9c8d6b9a9d75d0674c22c5ea8037d35d343ee5846"}, + {file = "upstash_redis-0.15.0.tar.gz", hash = "sha256:910f6a567142167b742c38efecfabf23f47e24fcbddb00a6b5845cb11064c3af"}, +] + +[package.dependencies] +aiohttp = ">=3.8.4,<4.0.0" +requests = ">=2.31.0,<3.0.0" + [[package]] name = "uri-template" version = "1.3.0" @@ -10883,7 +10898,7 @@ cli = ["typer"] cohere = ["cohere"] docarray = ["docarray"] embeddings = ["sentence-transformers"] -extended-testing = ["aiosqlite", "amazon-textract-caller", "anthropic", "arxiv", "assemblyai", "atlassian-python-api", "beautifulsoup4", "bibtexparser", "cassio", "chardet", "dashvector", "esprima", "faiss-cpu", "feedparser", "geopandas", "gitpython", "gql", "html2text", "jinja2", "jq", "lxml", "markdownify", "motor", "mwparserfromhell", "mwxml", "newspaper3k", "numexpr", "openai", "openai", "openapi-schema-pydantic", "pandas", "pdfminer-six", "pgvector", "psychicapi", "py-trello", "pymupdf", "pypdf", "pypdfium2", "pyspark", "rank-bm25", "rapidfuzz", "rapidocr-onnxruntime", "requests-toolbelt", "rspace_client", "scikit-learn", "sqlite-vss", "streamlit", "sympy", "telethon", "timescale-vector", "tqdm", "xata", "xmltodict"] +extended-testing = ["aiosqlite", "amazon-textract-caller", "anthropic", "arxiv", "assemblyai", "atlassian-python-api", "beautifulsoup4", "bibtexparser", "cassio", "chardet", "dashvector", "esprima", "faiss-cpu", "feedparser", "geopandas", "gitpython", "gql", "html2text", "jinja2", "jq", "lxml", "markdownify", "motor", "mwparserfromhell", "mwxml", "newspaper3k", "numexpr", 
"openai", "openai", "openapi-schema-pydantic", "pandas", "pdfminer-six", "pgvector", "psychicapi", "py-trello", "pymupdf", "pypdf", "pypdfium2", "pyspark", "rank-bm25", "rapidfuzz", "rapidocr-onnxruntime", "requests-toolbelt", "rspace_client", "scikit-learn", "sqlite-vss", "streamlit", "sympy", "telethon", "timescale-vector", "tqdm", "upstash-redis", "xata", "xmltodict"] javascript = ["esprima"] llms = ["clarifai", "cohere", "huggingface_hub", "manifest-ml", "nlpcloud", "openai", "openlm", "torch", "transformers"] openai = ["openai", "tiktoken"] @@ -10893,4 +10908,4 @@ text-helpers = ["chardet"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "3a5bca34a60eaa9b66a4d1f9ec14de5e6a0e5ca1071a0a874499fe122cc0ee36" +content-hash = "6205031e342d6e4640b47b0b5a37fa7d11ea1915e8a3ee05c00e2e49fdec071e" diff --git a/libs/langchain/pyproject.toml b/libs/langchain/pyproject.toml index 8d9451e7c14..5514b35e51f 100644 --- a/libs/langchain/pyproject.toml +++ b/libs/langchain/pyproject.toml @@ -139,6 +139,7 @@ typer = {version= "^0.9.0", optional = true} anthropic = {version = "^0.3.11", optional = true} aiosqlite = {version = "^0.19.0", optional = true} rspace_client = {version = "^2.5.0", optional = true} +upstash-redis = {version = "^0.15.0", optional = true} [tool.poetry.group.test.dependencies] @@ -367,6 +368,7 @@ extended_testing = [ "motor", "timescale-vector", "anthropic", + "upstash-redis", "rspace_client", ] diff --git a/libs/langchain/tests/integration_tests/cache/test_upstash_redis_cache.py b/libs/langchain/tests/integration_tests/cache/test_upstash_redis_cache.py new file mode 100644 index 00000000000..1a78d0ae85f --- /dev/null +++ b/libs/langchain/tests/integration_tests/cache/test_upstash_redis_cache.py @@ -0,0 +1,91 @@ +"""Test Upstash Redis cache functionality.""" +import uuid + +import pytest + +import langchain +from langchain.cache import UpstashRedisCache +from langchain.schema import Generation, LLMResult +from tests.unit_tests.llms.fake_chat_model import FakeChatModel +from tests.unit_tests.llms.fake_llm import FakeLLM + +URL = "" +TOKEN = "" + + +def random_string() -> str: + return str(uuid.uuid4()) + + +@pytest.mark.requires("upstash_redis") +def test_redis_cache_ttl() -> None: + from upstash_redis import Redis + + langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN), ttl=1) + langchain.llm_cache.update("foo", "bar", [Generation(text="fizz")]) + key = langchain.llm_cache._key("foo", "bar") + assert langchain.llm_cache.redis.pttl(key) > 0 + + +@pytest.mark.requires("upstash_redis") +def test_redis_cache() -> None: + from upstash_redis import Redis + + langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN), ttl=1) + llm = FakeLLM() + params = llm.dict() + params["stop"] = None + llm_string = str(sorted([(k, v) for k, v in params.items()])) + langchain.llm_cache.update("foo", llm_string, [Generation(text="fizz")]) + output = llm.generate(["foo"]) + expected_output = LLMResult( + generations=[[Generation(text="fizz")]], + llm_output={}, + ) + assert output == expected_output + + lookup_output = langchain.llm_cache.lookup("foo", llm_string) + if lookup_output and len(lookup_output) > 0: + assert lookup_output == expected_output.generations[0] + + langchain.llm_cache.clear() + output = llm.generate(["foo"]) + + assert output != expected_output + langchain.llm_cache.redis.flushall() + + +def test_redis_cache_multi() -> None: + from upstash_redis import Redis + + langchain.llm_cache = 
UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN), ttl=1) + llm = FakeLLM() + params = llm.dict() + params["stop"] = None + llm_string = str(sorted([(k, v) for k, v in params.items()])) + langchain.llm_cache.update( + "foo", llm_string, [Generation(text="fizz"), Generation(text="Buzz")] + ) + output = llm.generate( + ["foo"] + ) # foo and bar will have the same embedding produced by FakeEmbeddings + expected_output = LLMResult( + generations=[[Generation(text="fizz"), Generation(text="Buzz")]], + llm_output={}, + ) + assert output == expected_output + # clear the cache + langchain.llm_cache.clear() + + +@pytest.mark.requires("upstash_redis") +def test_redis_cache_chat() -> None: + from upstash_redis import Redis + + langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN), ttl=1) + llm = FakeChatModel() + params = llm.dict() + params["stop"] = None + with pytest.warns(): + llm.predict("foo") + langchain.llm_cache.redis.flushall() diff --git a/libs/langchain/tests/integration_tests/memory/test_upstash_redis.py b/libs/langchain/tests/integration_tests/memory/test_upstash_redis.py new file mode 100644 index 00000000000..ff9b9a445ea --- /dev/null +++ b/libs/langchain/tests/integration_tests/memory/test_upstash_redis.py @@ -0,0 +1,38 @@ +import json + +import pytest + +from langchain.memory import ConversationBufferMemory +from langchain.memory.chat_message_histories.upstash_redis import ( + UpstashRedisChatMessageHistory, +) +from langchain.schema.messages import _message_to_dict + +URL = "" +TOKEN = "" + + +@pytest.mark.requires("upstash_redis") +def test_memory_with_message_store() -> None: + """Test the memory with a message store.""" + # setup Upstash Redis as a message store + message_history = UpstashRedisChatMessageHistory( + url=URL, token=TOKEN, ttl=10, session_id="my-test-session" + ) + memory = ConversationBufferMemory( + memory_key="baz", chat_memory=message_history, return_messages=True + ) + + # add some messages + memory.chat_memory.add_ai_message("This is me, the AI") + memory.chat_memory.add_user_message("This is me, the human") + + # get the message history from the memory store and turn it into a json + messages = memory.chat_memory.messages + messages_json = json.dumps([_message_to_dict(msg) for msg in messages]) + + assert "This is me, the AI" in messages_json + assert "This is me, the human" in messages_json + + # remove the record from Redis, so the next test run won't pick it up + memory.chat_memory.clear() diff --git a/libs/langchain/tests/integration_tests/storage/test_upstash_redis.py b/libs/langchain/tests/integration_tests/storage/test_upstash_redis.py new file mode 100644 index 00000000000..ead1cd06fcc --- /dev/null +++ b/libs/langchain/tests/integration_tests/storage/test_upstash_redis.py @@ -0,0 +1,95 @@ +"""Implement integration tests for Redis storage.""" + +import pytest +from upstash_redis import Redis + +from langchain.storage.upstash_redis import UpstashRedisStore + +pytest.importorskip("upstash_redis") + +URL = "" +TOKEN = "" + + +@pytest.fixture +def redis_client() -> Redis: + """Yield redis client.""" + from upstash_redis import Redis + + # This fixture flushes the database! + + client = Redis(url=URL, token=TOKEN) + try: + client.ping() + except Exception: + pytest.skip("Ping request failed. 
Verify that credentials are correct.") + + client.flushdb() + return client + + +def test_mget(redis_client: Redis) -> None: + store = UpstashRedisStore(client=redis_client, ttl=None) + keys = ["key1", "key2"] + redis_client.mset({"key1": "value1", "key2": "value2"}) + result = store.mget(keys) + assert result == ["value1", "value2"] + + +def test_mset(redis_client: Redis) -> None: + store = UpstashRedisStore(client=redis_client, ttl=None) + key_value_pairs = [("key1", "value1"), ("key2", "value2")] + store.mset(key_value_pairs) + result = redis_client.mget("key1", "key2") + assert result == ["value1", "value2"] + + +def test_mdelete(redis_client: Redis) -> None: + """Test that deletion works as expected.""" + store = UpstashRedisStore(client=redis_client, ttl=None) + keys = ["key1", "key2"] + redis_client.mset({"key1": "value1", "key2": "value2"}) + store.mdelete(keys) + result = redis_client.mget(*keys) + assert result == [None, None] + + +def test_yield_keys(redis_client: Redis) -> None: + store = UpstashRedisStore(client=redis_client, ttl=None) + redis_client.mset({"key1": "value2", "key2": "value2"}) + assert sorted(store.yield_keys()) == ["key1", "key2"] + assert sorted(store.yield_keys(prefix="key*")) == ["key1", "key2"] + assert sorted(store.yield_keys(prefix="lang*")) == [] + + +def test_namespace(redis_client: Redis) -> None: + store = UpstashRedisStore(client=redis_client, ttl=None, namespace="meow") + key_value_pairs = [("key1", "value1"), ("key2", "value2")] + store.mset(key_value_pairs) + + cursor, all_keys = redis_client.scan(0) + while cursor != 0: + cursor, keys = redis_client.scan(cursor) + if len(keys) != 0: + all_keys.extend(keys) + + assert sorted(all_keys) == [ + "meow/key1", + "meow/key2", + ] + + store.mdelete(["key1"]) + + cursor, all_keys = redis_client.scan(0, match="*") + while cursor != 0: + cursor, keys = redis_client.scan(cursor, match="*") + if len(keys) != 0: + all_keys.extend(keys) + + assert sorted(all_keys) == [ + "meow/key2", + ] + + assert list(store.yield_keys()) == ["key2"] + assert list(store.yield_keys(prefix="key*")) == ["key2"] + assert list(store.yield_keys(prefix="key1")) == [] diff --git a/libs/langchain/tests/unit_tests/storage/test_upstash_redis.py b/libs/langchain/tests/unit_tests/storage/test_upstash_redis.py new file mode 100644 index 00000000000..0e7ba2da65c --- /dev/null +++ b/libs/langchain/tests/unit_tests/storage/test_upstash_redis.py @@ -0,0 +1,8 @@ +"""Light weight unit test that attempts to import UpstashRedisStore. +""" +import pytest + + +@pytest.mark.requires("upstash_redis") +def test_import_storage() -> None: + from langchain.storage.upstash_redis import UpstashRedisStore # noqa
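As a companion to the storage tests above, here is a minimal usage sketch of the `UpstashRedisStore` introduced in this PR (blank credential placeholders follow the convention used throughout; the behavior noted in the comments reflects the implementation in `langchain/storage/upstash_redis.py` above):

```python
from langchain.storage import UpstashRedisStore

# Blank placeholders; supply real Upstash REST credentials.
store = UpstashRedisStore(url="", token="", ttl=None, namespace="test-ns")

# Keys are written as "test-ns/<key>"; mget/mdelete take the un-prefixed keys.
store.mset([("key1", "value1"), ("key2", "value2")])
store.mget(["key1", "key2"])   # ['value1', 'value2']
list(store.yield_keys())       # ['key1', 'key2'] (namespace prefix stripped)
store.mdelete(["key1", "key2"])
```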