From 26a3256fc6642a2a4f4fa1d9cdc219657e6958a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kacper=20W=C5=82odarczyk?= <49692261+wuodar@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:13:32 +0200 Subject: [PATCH] community[major]: DynamoDBChatMessageHistory bulk add messages, raise errors (#30572) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR addresses two key issues: - **Prevent history errors from failing silently**: Previously, errors in message history were only logged and not raised, which can lead to inconsistent state and downstream failures (e.g., ValidationError from Bedrock due to malformed message history). This change ensures that such errors are raised explicitly, making them easier to detect and debug. (Side note: I’m using AWS Lambda Powertools Logger but hadn’t configured it properly with the standard Python logger—my bad. If the error had been raised, I would’ve seen it in the logs 😄) This is a **BREAKING CHANGE** - **Add messages in bulk instead of iteratively**: This introduces a custom add_messages method to add all messages at once. The previous approach failed silently when individual messages were too large, resulting in partial history updates and inconsistent state. With this change, either all messages are added successfully, or none are—helping avoid obscure history-related errors from Bedrock. --------- Co-authored-by: Kacper Wlodarczyk --- .../chat_message_histories/dynamodb.py | 88 +++++-------------- 1 file changed, 24 insertions(+), 64 deletions(-) diff --git a/libs/community/langchain_community/chat_message_histories/dynamodb.py b/libs/community/langchain_community/chat_message_histories/dynamodb.py index 62beb4895e7..a78e6ae54aa 100644 --- a/libs/community/langchain_community/chat_message_histories/dynamodb.py +++ b/libs/community/langchain_community/chat_message_histories/dynamodb.py @@ -1,13 +1,11 @@ from __future__ import annotations -import logging from decimal import Decimal -from typing import TYPE_CHECKING, Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional, Sequence from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.messages import ( BaseMessage, - message_to_dict, messages_from_dict, messages_to_dict, ) @@ -15,8 +13,6 @@ from langchain_core.messages import ( if TYPE_CHECKING: from boto3.session import Session -logger = logging.getLogger(__name__) - def convert_messages(item: List) -> List: if isinstance(item, list): @@ -131,21 +127,8 @@ class DynamoDBChatMessageHistory(BaseChatMessageHistory): @property def messages(self) -> List[BaseMessage]: """Retrieve the messages from DynamoDB""" - try: - from botocore.exceptions import ClientError - except ImportError as e: - raise ImportError( - "Unable to import botocore, please install with `pip install botocore`." - ) from e - response = None - try: - response = self.table.get_item(Key=self.key) - except ClientError as error: - if error.response["Error"]["Code"] == "ResourceNotFoundException": - logger.warning("No record found with session id: %s", self.session_id) - else: - logger.error(error) + response = self.table.get_item(Key=self.key) if response and "Item" in response: items = response["Item"][self.history_messages_key] @@ -162,57 +145,34 @@ class DynamoDBChatMessageHistory(BaseChatMessageHistory): " Use the 'add_messages' instead." ) - def add_message(self, message: BaseMessage) -> None: + def add_messages(self, messages: Sequence[BaseMessage]) -> None: """Append the message to the record in DynamoDB""" - try: - from botocore.exceptions import ClientError - except ImportError as e: - raise ImportError( - "Unable to import botocore, please install with `pip install botocore`." - ) from e - - messages = messages_to_dict(self.messages) - _message = message_to_dict(message) - messages.append(_message) - + existing_messages = messages_to_dict(self.messages) + existing_messages.extend(messages_to_dict(messages)) if self.coerce_float_to_decimal: - messages = convert_messages(messages) + existing_messages = convert_messages(existing_messages) if self.history_size: - messages = messages[-self.history_size :] + existing_messages = existing_messages[-self.history_size :] - try: - if self.ttl: - import time + if self.ttl: + import time - expireAt = int(time.time()) + self.ttl - self.table.update_item( - Key={**self.key}, - UpdateExpression=( - f"set {self.history_messages_key} = :h, " - f"{self.ttl_key_name} = :t" - ), - ExpressionAttributeValues={":h": messages, ":t": expireAt}, - ) - else: - self.table.update_item( - Key={**self.key}, - UpdateExpression=f"set {self.history_messages_key} = :h", - ExpressionAttributeValues={":h": messages}, - ) - except ClientError as err: - logger.error(err) + expireAt = int(time.time()) + self.ttl + self.table.update_item( + Key={**self.key}, + UpdateExpression=( + f"set {self.history_messages_key} = :h, {self.ttl_key_name} = :t" + ), + ExpressionAttributeValues={":h": existing_messages, ":t": expireAt}, + ) + else: + self.table.update_item( + Key={**self.key}, + UpdateExpression=f"set {self.history_messages_key} = :h", + ExpressionAttributeValues={":h": existing_messages}, + ) def clear(self) -> None: """Clear session memory from DynamoDB""" - try: - from botocore.exceptions import ClientError - except ImportError as e: - raise ImportError( - "Unable to import botocore, please install with `pip install botocore`." - ) from e - - try: - self.table.delete_item(Key=self.key) - except ClientError as err: - logger.error(err) + self.table.delete_item(Key=self.key)