community[major]: DynamoDBChatMessageHistory bulk add messages, raise errors (#30572)

This PR addresses two key issues:

- **Prevent history errors from failing silently**: Previously, errors
in message history were only logged and not raised, which can lead to
inconsistent state and downstream failures (e.g., ValidationError from
Bedrock due to malformed message history). This change ensures that such
errors are raised explicitly, making them easier to detect and debug.
(Side note: I’m using AWS Lambda Powertools Logger but hadn’t configured
it properly with the standard Python logger—my bad. If the error had
been raised, I would’ve seen it in the logs 😄) This is a **BREAKING
CHANGE**

- **Add messages in bulk instead of iteratively**: This introduces a
custom add_messages method to add all messages at once. The previous
approach failed silently when individual messages were too large,
resulting in partial history updates and inconsistent state. With this
change, either all messages are added successfully, or none are—helping
avoid obscure history-related errors from Bedrock.

---------

Co-authored-by: Kacper Wlodarczyk <kacper.wlodarczyk@chaosgears.com>
This commit is contained in:
Kacper Włodarczyk 2025-04-01 17:13:32 +02:00 committed by GitHub
parent 8c8bca68b2
commit 26a3256fc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,13 +1,11 @@
from __future__ import annotations
import logging
from decimal import Decimal
from typing import TYPE_CHECKING, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
BaseMessage,
message_to_dict,
messages_from_dict,
messages_to_dict,
)
@ -15,8 +13,6 @@ from langchain_core.messages import (
if TYPE_CHECKING:
from boto3.session import Session
logger = logging.getLogger(__name__)
def convert_messages(item: List) -> List:
if isinstance(item, list):
@ -131,21 +127,8 @@ class DynamoDBChatMessageHistory(BaseChatMessageHistory):
@property
def messages(self) -> List[BaseMessage]:
"""Retrieve the messages from DynamoDB"""
try:
from botocore.exceptions import ClientError
except ImportError as e:
raise ImportError(
"Unable to import botocore, please install with `pip install botocore`."
) from e
response = None
try:
response = self.table.get_item(Key=self.key)
except ClientError as error:
if error.response["Error"]["Code"] == "ResourceNotFoundException":
logger.warning("No record found with session id: %s", self.session_id)
else:
logger.error(error)
response = self.table.get_item(Key=self.key)
if response and "Item" in response:
items = response["Item"][self.history_messages_key]
@ -162,57 +145,34 @@ class DynamoDBChatMessageHistory(BaseChatMessageHistory):
" Use the 'add_messages' instead."
)
def add_message(self, message: BaseMessage) -> None:
def add_messages(self, messages: Sequence[BaseMessage]) -> None:
"""Append the message to the record in DynamoDB"""
try:
from botocore.exceptions import ClientError
except ImportError as e:
raise ImportError(
"Unable to import botocore, please install with `pip install botocore`."
) from e
messages = messages_to_dict(self.messages)
_message = message_to_dict(message)
messages.append(_message)
existing_messages = messages_to_dict(self.messages)
existing_messages.extend(messages_to_dict(messages))
if self.coerce_float_to_decimal:
messages = convert_messages(messages)
existing_messages = convert_messages(existing_messages)
if self.history_size:
messages = messages[-self.history_size :]
existing_messages = existing_messages[-self.history_size :]
try:
if self.ttl:
import time
if self.ttl:
import time
expireAt = int(time.time()) + self.ttl
self.table.update_item(
Key={**self.key},
UpdateExpression=(
f"set {self.history_messages_key} = :h, "
f"{self.ttl_key_name} = :t"
),
ExpressionAttributeValues={":h": messages, ":t": expireAt},
)
else:
self.table.update_item(
Key={**self.key},
UpdateExpression=f"set {self.history_messages_key} = :h",
ExpressionAttributeValues={":h": messages},
)
except ClientError as err:
logger.error(err)
expireAt = int(time.time()) + self.ttl
self.table.update_item(
Key={**self.key},
UpdateExpression=(
f"set {self.history_messages_key} = :h, {self.ttl_key_name} = :t"
),
ExpressionAttributeValues={":h": existing_messages, ":t": expireAt},
)
else:
self.table.update_item(
Key={**self.key},
UpdateExpression=f"set {self.history_messages_key} = :h",
ExpressionAttributeValues={":h": existing_messages},
)
def clear(self) -> None:
"""Clear session memory from DynamoDB"""
try:
from botocore.exceptions import ClientError
except ImportError as e:
raise ImportError(
"Unable to import botocore, please install with `pip install botocore`."
) from e
try:
self.table.delete_item(Key=self.key)
except ClientError as err:
logger.error(err)
self.table.delete_item(Key=self.key)