community[patch]: Graceful handling of redis errors in RedisCache and AsyncRedisCache (#17171)

- **Description:**
The existing `RedisCache` implementation lacks proper handling for redis
client failures, such as `ConnectionRefusedError`, leading to subsequent
failures in pipeline components like LLM calls. This pull request aims
to improve error handling for redis client issues, ensuring a more
robust and graceful handling of such errors.

  - **Issue:**  Fixes #16866
  - **Dependencies:** No new dependency
  - **Twitter handle:** N/A

Co-authored-by: snsten <>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
This commit is contained in:
Shashank 2024-02-21 22:45:19 +05:30 committed by GitHub
parent e6311d953d
commit 8381f859b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -461,22 +461,31 @@ class RedisCache(_RedisCacheBase):
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""Look up based on prompt and llm_string."""
# Read from a Redis HASH
results = self.redis.hgetall(self._key(prompt, llm_string))
return self._get_generations(results) # type: ignore[arg-type]
try:
results = self.redis.hgetall(self._key(prompt, llm_string))
return self._get_generations(results) # type: ignore[arg-type]
except Exception as e:
logger.error(f"Redis lookup failed: {e}")
return None
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""Update cache based on prompt and llm_string."""
self._ensure_generation_type(return_val)
key = self._key(prompt, llm_string)
with self.redis.pipeline() as pipe:
self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
pipe.execute()
try:
with self.redis.pipeline() as pipe:
self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
pipe.execute()
except Exception as e:
logger.error(f"Redis update failed: {e}")
def clear(self, **kwargs: Any) -> None:
"""Clear cache. If `asynchronous` is True, flush asynchronously."""
asynchronous = kwargs.get("asynchronous", False)
self.redis.flushdb(asynchronous=asynchronous, **kwargs)
try:
asynchronous = kwargs.get("asynchronous", False)
self.redis.flushdb(asynchronous=asynchronous, **kwargs)
except Exception as e:
logger.error(f"Redis clear failed: {e}")
class AsyncRedisCache(_RedisCacheBase):
@ -525,8 +534,12 @@ class AsyncRedisCache(_RedisCacheBase):
async def alookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""Look up based on prompt and llm_string. Async version."""
results = await self.redis.hgetall(self._key(prompt, llm_string))
return self._get_generations(results) # type: ignore[arg-type]
try:
results = await self.redis.hgetall(self._key(prompt, llm_string))
return self._get_generations(results) # type: ignore[arg-type]
except Exception as e:
logger.error(f"Redis async lookup failed: {e}")
return None
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""Update cache based on prompt and llm_string."""
@ -541,10 +554,12 @@ class AsyncRedisCache(_RedisCacheBase):
"""Update cache based on prompt and llm_string. Async version."""
self._ensure_generation_type(return_val)
key = self._key(prompt, llm_string)
async with self.redis.pipeline() as pipe:
self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
await pipe.execute() # type: ignore[attr-defined]
try:
async with self.redis.pipeline() as pipe:
self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
await pipe.execute() # type: ignore[attr-defined]
except Exception as e:
logger.error(f"Redis async update failed: {e}")
def clear(self, **kwargs: Any) -> None:
"""Clear cache. If `asynchronous` is True, flush asynchronously."""
@ -558,8 +573,11 @@ class AsyncRedisCache(_RedisCacheBase):
Clear cache. If `asynchronous` is True, flush asynchronously.
Async version.
"""
asynchronous = kwargs.get("asynchronous", False)
await self.redis.flushdb(asynchronous=asynchronous, **kwargs)
try:
asynchronous = kwargs.get("asynchronous", False)
await self.redis.flushdb(asynchronous=asynchronous, **kwargs)
except Exception as e:
logger.error(f"Redis async clear failed: {e}")
class RedisSemanticCache(BaseCache):