From 8381f859b41f92f9bb172c59f98735ebc928ed4c Mon Sep 17 00:00:00 2001 From: Shashank <42868640+snsten@users.noreply.github.com> Date: Wed, 21 Feb 2024 22:45:19 +0530 Subject: [PATCH] community[patch]: Graceful handling of redis errors in RedisCache and AsyncRedisCache (#17171) - **Description:** The existing `RedisCache` implementation lacks proper handling for redis client failures, such as `ConnectionRefusedError`, leading to subsequent failures in pipeline components like LLM calls. This pull request aims to improve error handling for redis client issues, ensuring a more robust and graceful handling of such errors. - **Issue:** Fixes #16866 - **Dependencies:** No new dependency - **Twitter handle:** N/A Co-authored-by: snsten <> Co-authored-by: Eugene Yurtsev --- libs/community/langchain_community/cache.py | 50 ++++++++++++++------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/libs/community/langchain_community/cache.py b/libs/community/langchain_community/cache.py index 6b5844e463a..d25f4865907 100644 --- a/libs/community/langchain_community/cache.py +++ b/libs/community/langchain_community/cache.py @@ -461,22 +461,31 @@ class RedisCache(_RedisCacheBase): def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]: """Look up based on prompt and llm_string.""" # Read from a Redis HASH - results = self.redis.hgetall(self._key(prompt, llm_string)) - return self._get_generations(results) # type: ignore[arg-type] + try: + results = self.redis.hgetall(self._key(prompt, llm_string)) + return self._get_generations(results) # type: ignore[arg-type] + except Exception as e: + logger.error(f"Redis lookup failed: {e}") + return None def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None: """Update cache based on prompt and llm_string.""" self._ensure_generation_type(return_val) key = self._key(prompt, llm_string) - - with self.redis.pipeline() as pipe: - self._configure_pipeline_for_update(key, pipe, return_val, self.ttl) - pipe.execute() + try: + with self.redis.pipeline() as pipe: + self._configure_pipeline_for_update(key, pipe, return_val, self.ttl) + pipe.execute() + except Exception as e: + logger.error(f"Redis update failed: {e}") def clear(self, **kwargs: Any) -> None: """Clear cache. If `asynchronous` is True, flush asynchronously.""" - asynchronous = kwargs.get("asynchronous", False) - self.redis.flushdb(asynchronous=asynchronous, **kwargs) + try: + asynchronous = kwargs.get("asynchronous", False) + self.redis.flushdb(asynchronous=asynchronous, **kwargs) + except Exception as e: + logger.error(f"Redis clear failed: {e}") class AsyncRedisCache(_RedisCacheBase): @@ -525,8 +534,12 @@ class AsyncRedisCache(_RedisCacheBase): async def alookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]: """Look up based on prompt and llm_string. Async version.""" - results = await self.redis.hgetall(self._key(prompt, llm_string)) - return self._get_generations(results) # type: ignore[arg-type] + try: + results = await self.redis.hgetall(self._key(prompt, llm_string)) + return self._get_generations(results) # type: ignore[arg-type] + except Exception as e: + logger.error(f"Redis async lookup failed: {e}") + return None def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None: """Update cache based on prompt and llm_string.""" @@ -541,10 +554,12 @@ class AsyncRedisCache(_RedisCacheBase): """Update cache based on prompt and llm_string. Async version.""" self._ensure_generation_type(return_val) key = self._key(prompt, llm_string) - - async with self.redis.pipeline() as pipe: - self._configure_pipeline_for_update(key, pipe, return_val, self.ttl) - await pipe.execute() # type: ignore[attr-defined] + try: + async with self.redis.pipeline() as pipe: + self._configure_pipeline_for_update(key, pipe, return_val, self.ttl) + await pipe.execute() # type: ignore[attr-defined] + except Exception as e: + logger.error(f"Redis async update failed: {e}") def clear(self, **kwargs: Any) -> None: """Clear cache. If `asynchronous` is True, flush asynchronously.""" @@ -558,8 +573,11 @@ class AsyncRedisCache(_RedisCacheBase): Clear cache. If `asynchronous` is True, flush asynchronously. Async version. """ - asynchronous = kwargs.get("asynchronous", False) - await self.redis.flushdb(asynchronous=asynchronous, **kwargs) + try: + asynchronous = kwargs.get("asynchronous", False) + await self.redis.flushdb(asynchronous=asynchronous, **kwargs) + except Exception as e: + logger.error(f"Redis async clear failed: {e}") class RedisSemanticCache(BaseCache):