diff --git a/libs/community/langchain_community/chat_message_histories/dynamodb.py b/libs/community/langchain_community/chat_message_histories/dynamodb.py index 62beb4895e7..a78e6ae54aa 100644 --- a/libs/community/langchain_community/chat_message_histories/dynamodb.py +++ b/libs/community/langchain_community/chat_message_histories/dynamodb.py @@ -1,13 +1,11 @@ from __future__ import annotations -import logging from decimal import Decimal -from typing import TYPE_CHECKING, Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional, Sequence from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.messages import ( BaseMessage, - message_to_dict, messages_from_dict, messages_to_dict, ) @@ -15,8 +13,6 @@ from langchain_core.messages import ( if TYPE_CHECKING: from boto3.session import Session -logger = logging.getLogger(__name__) - def convert_messages(item: List) -> List: if isinstance(item, list): @@ -131,21 +127,8 @@ class DynamoDBChatMessageHistory(BaseChatMessageHistory): @property def messages(self) -> List[BaseMessage]: """Retrieve the messages from DynamoDB""" - try: - from botocore.exceptions import ClientError - except ImportError as e: - raise ImportError( - "Unable to import botocore, please install with `pip install botocore`." - ) from e - response = None - try: - response = self.table.get_item(Key=self.key) - except ClientError as error: - if error.response["Error"]["Code"] == "ResourceNotFoundException": - logger.warning("No record found with session id: %s", self.session_id) - else: - logger.error(error) + response = self.table.get_item(Key=self.key) if response and "Item" in response: items = response["Item"][self.history_messages_key] @@ -162,57 +145,34 @@ class DynamoDBChatMessageHistory(BaseChatMessageHistory): " Use the 'add_messages' instead." ) - def add_message(self, message: BaseMessage) -> None: + def add_messages(self, messages: Sequence[BaseMessage]) -> None: """Append the message to the record in DynamoDB""" - try: - from botocore.exceptions import ClientError - except ImportError as e: - raise ImportError( - "Unable to import botocore, please install with `pip install botocore`." - ) from e - - messages = messages_to_dict(self.messages) - _message = message_to_dict(message) - messages.append(_message) - + existing_messages = messages_to_dict(self.messages) + existing_messages.extend(messages_to_dict(messages)) if self.coerce_float_to_decimal: - messages = convert_messages(messages) + existing_messages = convert_messages(existing_messages) if self.history_size: - messages = messages[-self.history_size :] + existing_messages = existing_messages[-self.history_size :] - try: - if self.ttl: - import time + if self.ttl: + import time - expireAt = int(time.time()) + self.ttl - self.table.update_item( - Key={**self.key}, - UpdateExpression=( - f"set {self.history_messages_key} = :h, " - f"{self.ttl_key_name} = :t" - ), - ExpressionAttributeValues={":h": messages, ":t": expireAt}, - ) - else: - self.table.update_item( - Key={**self.key}, - UpdateExpression=f"set {self.history_messages_key} = :h", - ExpressionAttributeValues={":h": messages}, - ) - except ClientError as err: - logger.error(err) + expireAt = int(time.time()) + self.ttl + self.table.update_item( + Key={**self.key}, + UpdateExpression=( + f"set {self.history_messages_key} = :h, {self.ttl_key_name} = :t" + ), + ExpressionAttributeValues={":h": existing_messages, ":t": expireAt}, + ) + else: + self.table.update_item( + Key={**self.key}, + UpdateExpression=f"set {self.history_messages_key} = :h", + ExpressionAttributeValues={":h": existing_messages}, + ) def clear(self) -> None: """Clear session memory from DynamoDB""" - try: - from botocore.exceptions import ClientError - except ImportError as e: - raise ImportError( - "Unable to import botocore, please install with `pip install botocore`." - ) from e - - try: - self.table.delete_item(Key=self.key) - except ClientError as err: - logger.error(err) + self.table.delete_item(Key=self.key)