From 49f7c8cdd802f76de497daaf78dd0b07ad56a606 Mon Sep 17 00:00:00 2001 From: Bagatur Date: Wed, 28 Aug 2024 16:31:30 -0700 Subject: [PATCH] core[minor]: Add msg content formatting util --- libs/core/langchain_core/messages/utils.py | 735 +++++++++++++++++- .../tests/unit_tests/messages/test_utils.py | 221 ++++++ 2 files changed, 931 insertions(+), 25 deletions(-) diff --git a/libs/core/langchain_core/messages/utils.py b/libs/core/langchain_core/messages/utils.py index eb0b7df86a2..07d84cc260e 100644 --- a/libs/core/langchain_core/messages/utils.py +++ b/libs/core/langchain_core/messages/utils.py @@ -9,8 +9,10 @@ Some examples of what you can do with these functions include: from __future__ import annotations +import base64 import inspect import json +import re from functools import partial from typing import ( TYPE_CHECKING, @@ -37,6 +39,9 @@ from langchain_core.messages.human import HumanMessage, HumanMessageChunk from langchain_core.messages.modifier import RemoveMessage from langchain_core.messages.system import SystemMessage, SystemMessageChunk from langchain_core.messages.tool import ToolMessage, ToolMessageChunk +from langchain_core.messages.tool import ( + tool_call as create_tool_call, +) if TYPE_CHECKING: from langchain_text_splitters import TextSplitter @@ -252,7 +257,9 @@ def _create_message_from_message_type( return message -def _convert_to_message(message: MessageLikeRepresentation) -> BaseMessage: +def _convert_to_message( + message: MessageLikeRepresentation, *, copy: bool = False +) -> BaseMessage: """Instantiate a message from a variety of message formats. The message format can be one of the following: @@ -274,7 +281,10 @@ def _convert_to_message(message: MessageLikeRepresentation) -> BaseMessage: ValueError: if the message dict does not contain the required keys. """ if isinstance(message, BaseMessage): - _message = message + if copy: + _message = message.__class__(**message.dict()) + else: + _message = message elif isinstance(message, str): _message = _create_message_from_message_type("human", message) elif isinstance(message, Sequence) and len(message) == 2: @@ -305,6 +315,8 @@ def _convert_to_message(message: MessageLikeRepresentation) -> BaseMessage: def convert_to_messages( messages: Union[Iterable[MessageLikeRepresentation], PromptValue], + *, + copy: bool = False, ) -> List[BaseMessage]: """Convert a sequence of messages to a list of messages. @@ -319,35 +331,87 @@ def convert_to_messages( if isinstance(messages, PromptValue): return messages.to_messages() - return [_convert_to_message(m) for m in messages] + return [_convert_to_message(m, copy=copy) for m in messages] -def _runnable_support(func: Callable) -> Callable: - @overload - def wrapped( - messages: Literal[None] = None, **kwargs: Any - ) -> Runnable[Sequence[MessageLikeRepresentation], List[BaseMessage]]: ... +def _runnable_support(*args: Callable, supports_single: bool = False) -> Callable: + if supports_single: - @overload - def wrapped( - messages: Sequence[MessageLikeRepresentation], **kwargs: Any - ) -> List[BaseMessage]: ... + def runnable_support(func: Callable) -> Callable: + @overload + def wrapped( + messages: Literal[None] = None, **kwargs: Any + ) -> Runnable[ + Union[MessageLikeRepresentation, Sequence[MessageLikeRepresentation]], + Union[BaseMessage, List[BaseMessage]], + ]: ... - def wrapped( - messages: Optional[Sequence[MessageLikeRepresentation]] = None, **kwargs: Any - ) -> Union[ - List[BaseMessage], - Runnable[Sequence[MessageLikeRepresentation], List[BaseMessage]], - ]: - from langchain_core.runnables.base import RunnableLambda + @overload + def wrapped( + messages: Sequence[Union[BaseMessage, Dict, Tuple]], **kwargs: Any + ) -> List[BaseMessage]: ... - if messages is not None: - return func(messages, **kwargs) - else: - return RunnableLambda(partial(func, **kwargs), name=func.__name__) + @overload + def wrapped( + messages: MessageLikeRepresentation, **kwargs: Any + ) -> BaseMessage: ... - wrapped.__doc__ = func.__doc__ - return wrapped + def wrapped( + messages: Union[ + MessageLikeRepresentation, Sequence[MessageLikeRepresentation], None + ] = None, + **kwargs: Any, + ) -> Union[ + BaseMessage, + List[BaseMessage], + Runnable[ + Union[ + MessageLikeRepresentation, Sequence[MessageLikeRepresentation] + ], + Union[BaseMessage, List[BaseMessage]], + ], + ]: + from langchain_core.runnables.base import RunnableLambda + + if messages is not None: + return func(messages, **kwargs) + else: + return RunnableLambda(partial(func, **kwargs), name=func.__name__) + + wrapped.__doc__ = func.__doc__ + return wrapped + + else: + + def runnable_support(func: Callable) -> Callable: + @overload + def wrapped( + messages: Literal[None] = None, **kwargs: Any + ) -> Runnable[Sequence[MessageLikeRepresentation], List[BaseMessage]]: ... + + @overload + def wrapped( + messages: Sequence[MessageLikeRepresentation], **kwargs: Any + ) -> List[BaseMessage]: ... + + def wrapped( + messages: Union[Sequence[MessageLikeRepresentation], None] = None, + **kwargs: Any, + ) -> Union[ + Runnable[Sequence[MessageLikeRepresentation], List[BaseMessage]], + List[BaseMessage], + ]: + from langchain_core.runnables.base import RunnableLambda + + if messages is not None: + return func(messages, **kwargs) + else: + return RunnableLambda(partial(func, **kwargs), name=func.__name__) + + wrapped.__doc__ = func.__doc__ + return wrapped + + return runnable_support(*args) if args else cast(Callable, runnable_support) @_runnable_support @@ -845,6 +909,571 @@ def trim_messages( ) +@_runnable_support(supports_single=True) +def format_content_as( + messages: Union[MessageLikeRepresentation, Iterable[MessageLikeRepresentation]], + *, + format: Literal["openai", "anthropic"], + text: Literal["string", "block"], +) -> Union[BaseMessage, List[BaseMessage]]: + """Convert message contents into a standard format. + + .. versionadded:: 0.2.36 + + Args: + messages: Message-like object or iterable of objects whose contents are already + in OpenAI, Anthropic, Bedrock Converse, or VertexAI formats. + format: Format to convert message contents to. + text: How to format text contents. If ``text='string'`` then any string + contents are left as strings. If a message has content blocks that are all + of type 'text', these are joined with a newline to make a single string. If + a message has content blocks and at least one isn't of type 'text', then + all blocks are left as dicts. If ``text='block'`` then all contents are + turned into a list of dicts. + + Returns: + A single BaseMessage is a single message-like object was passed in, else list + of BaseMessages. + + .. dropdown:: Basic usage + :open: + + .. code-block:: python + + from langchain_core.messages import format_content_as + + messages = [ + SystemMessage, + {}, + (), + AIMessage(), + ToolMessage(), + ] + oai_strings = format_content_as(messages, format="openai", text="string") + anthropic_blocks = format_content_as(messages, format="anthropic", text="block") + + .. dropdown:: Chain usage + :open: + + .. code-block:: python + + from langchain_core.messages import format_content_as + from langchain.chat_models import init_chat_model + + formatter = format_content_as(format="openai", text="block") + llm = init_chat_model() | formatter + + llm.invoke( + [{"role": "user", "content": "how are you"}], + config={"model": "gpt-4o"}, + ) + # -> AIMessage([{"type": "text", "text": ""}], ...) + + llm.invoke( + [{"role": "user", "content": "whats your name"}], + config={"model": "claude-3-5-sonnet-20240620"}) + # -> AIMessage([{"type": "text", "text": ""}], ...) + + .. note:: Doesn't support streaming + + This util does not support formatting streamed chunks on the fly (i.e. + "transforming" chunks). This means if you pipe the outputs of a model to this + formatter in a chain, the chain will not have token-level streaming when + using ``chain.stream()/.astream()``. You'll still see the + token stream when using ``chat.astream_events()`` but the message chunks will + not yet be formatted. + + .. code-block:: python + + from langchain_core.messages import format_content_as + from langchain.chat_models import init_chat_model + + formatter = format_content_as(format="openai", text="block") + llm = init_chat_model() | formatter + + # Will contain a single, completed chunk. + list(llm.stream( + [{"role": "user", "content": "how are you"}], + config={"model": "gpt-4o"}, + )) + + # Will include token-level events, but the streamed chunks will not yet be + # formatted. + async for chunk in llm.astream_events( + [{"role": "user", "content": "how are you"}], + config={"model": "gpt-4o"}, + version="v2", + ): + ... + + + """ # noqa: E501 + if is_single := isinstance(messages, (BaseMessage, dict)): + messages = [messages] + messages = convert_to_messages(messages, copy=True) + if format.lower() == "openai": + formatted = _format_contents_as_openai(messages, text=text) + elif format.lower() == "anthropic": + formatted = _format_contents_as_anthropic(messages, text=text) + else: + raise ValueError( + f"Unrecognized {format=}. Expected one of ('openai', 'anthropic')." + ) + if is_single: + return formatted[0] + else: + return formatted + + +def _format_contents_as_openai( + messages: Sequence[BaseMessage], *, text: Literal["string", "block"] +) -> List[BaseMessage]: + """Mutates messages so their contents match OpenAI messages API.""" + updated_messages: list = [] + for i, message in enumerate(messages): + tool_messages: list = [] + if not message.content: + message.content = "" if text == "string" else [] + elif isinstance(message.content, str): + if text == "string": + pass + else: + message.content = [{"type": "text", "text": message.content}] + else: + if text == "string" and all( + isinstance(block, str) or block.get("type") == "text" + for block in message.content + ): + message.content = "\n".join( + block if isinstance(block, str) else block["text"] + for block in message.content + ) + else: + content: List[dict] = [] + for j, block in enumerate(message.content): + # OpenAI format + if isinstance(block, str): + content.append({"type": "text", "text": block}) + elif block.get("type") == "text": + if missing := [k for k in ("text",) if k not in block]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'text' " + f"but is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append({"type": block["type"], "text": block["text"]}) + elif block.get("type") == "image_url": + if missing := [k for k in ("image_url",) if k not in block]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'image_url' " + f"but is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append( + {"type": "image_url", "image_url": block["image_url"]} + ) + # Anthropic and Bedrock converse format + elif (block.get("type") == "image") or "image" in block: + # Anthropic + if source := block.get("source"): + if missing := [ + k + for k in ("media_type", "type", "data") + if k not in source + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'image' " + f"but 'source' is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append( + { + "type": "image_url", + "image_url": { + "url": ( + f"data:{source['media_type']};" + f"{source['type']},{source['data']}" + ) + }, + } + ) + # Bedrock converse + elif image := block.get("image"): + raise ValueError("1064") + if missing := [ + k for k in ("source", "format") if k not in image + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has key 'image', " + f"but 'image' is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + b64_image = _bytes_to_b64_str(image["source"]["bytes"]) + content.append( + { + "type": "image_url", + "image_url": { + "url": ( + f"data:image/{image['format']};" + f"base64,{b64_image}" + ) + }, + } + ) + else: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'image' " + f"but does not have a 'source' or 'image' key. Full " + f"content block:\n\n{block}" + ) + elif block.get("type") == "tool_use": + if missing := [ + k for k in ("id", "name", "input") if k not in block + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'tool_use', " + f"but is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + if not any( + tool_call["id"] == block["id"] + for tool_call in cast(AIMessage, message).tool_calls + ): + cast(AIMessage, message).tool_calls.append( + create_tool_call( + name=block["name"], + id=block["id"], + args=block["input"], + ) + ) + elif block.get("type") == "tool_result": + if missing := [ + k for k in ("content", "tool_use_id") if k not in block + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': " + f"'tool_result', but is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + tool_message = ToolMessage( + block["content"], + tool_call_id=block["tool_use_id"], + status="error" if block.get("is_error") else "success", + ) + # Recurse to make sure tool message contents are OpenAI format. + tool_messages.extend( + _format_contents_as_openai([tool_message], text=text) + ) + elif (block.get("type") == "json") or "json" in block: + if "json" not in block: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'json' " + f"but does not have a 'json' key. Full " + f"content block:\n\n{block}" + ) + content.append( + {"type": "text", "text": json.dumps(block["json"])} + ) + elif ( + block.get("type") == "guard_content" + ) or "guard_content" in block: + if ( + "guard_content" not in block + or "text" not in block["guard_content"] + ): + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': " + f"'guard_content' but does not have a " + f"messages[{i}].content[{j}]['guard_content']['text'] " + f"key. Full content block:\n\n{block}" + ) + text = block["guard_content"]["text"] + if isinstance(text, dict): + text = text["text"] + content.append({"type": "text", "text": text}) + # VertexAI format + elif block.get("type") == "media": + if missing := [ + k for k in ("mime_type", "data") if k not in block + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': " + f"'media' but does not have key(s) {missing}. Full " + f"content block:\n\n{block}" + ) + if "image" not in block["mime_type"]: + raise ValueError( + f"OpenAI messages can only support text and image data." + f" Received content block with media of type:" + f" {block['mime_type']}" + ) + b64_image = _bytes_to_b64_str(block["data"]) + content.append( + { + "type": "image_url", + "image_url": { + "url": ( + f"data:{block['mime_type']};base64,{b64_image}" + ) + }, + } + ) + else: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] does not match OpenAI, " + f"Anthropic, Bedrock Converse, or VertexAI format. Full " + f"content block:\n\n{block}" + ) + message.content = content # type: ignore[assignment] + updated_messages.extend([message, *tool_messages]) + return updated_messages + + +_OPTIONAL_ANTHROPIC_KEYS = ("cache_control", "is_error") + + +def _format_contents_as_anthropic( + messages: Sequence[BaseMessage], *, text: Literal["string", "block"] +) -> List[BaseMessage]: + """Mutates messages so their contents match Anthropic messages API.""" + updated_messages: List = [] + for i, message in enumerate(messages): + if isinstance(message, ToolMessage): + tool_result_block = { + "type": "tool_result", + "content": message.content, + "tool_use_id": message.tool_call_id, + "is_error": message.status == "error", + } + if updated_messages and isinstance(updated_messages[-1], HumanMessage): + if isinstance(updated_messages[-1].content, str): + updated_messages[-1].content = [ + {"type": "text", "text": updated_messages[-1].content} + ] + updated_messages[-1].content.append(tool_result_block) + else: + updated_messages.append(HumanMessage([tool_result_block])) + continue + elif not message.content: + message.content = "" if text == "string" else [] + elif isinstance(message.content, str): + if text == "string": + pass + else: + message.content = [{"type": "text", "text": message.content}] + else: + if text == "string" and all( + isinstance(block, str) + or (block.get("type") == "text" and "cache_control" not in block) + for block in message.content + ): + message.content = "\n".join( + block if isinstance(block, str) else block["text"] + for block in message.content + ) + else: + content = [] + for j, block in enumerate(message.content): + # OpenAI format + if isinstance(block, str): + content.append({"type": "text", "text": block}) + elif block.get("type") == "text": + block_extra = { + k: block[k] for k in _OPTIONAL_ANTHROPIC_KEYS if k in block + } + if missing := [k for k in ("text",) if k not in block]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'text' " + f"but is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append( + {"type": "text", "text": block["text"], **block_extra} + ) + elif block.get("type") == "image_url": + if missing := [k for k in ("image_url",) if k not in block]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'image_url' " + f"but is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append( + {**_openai_image_to_anthropic(block), **block_extra} + ) + # Anthropic and Bedrock converse format + elif (block.get("type") == "image") or "image" in block: + # Anthropic + if source := block.get("source"): + if missing := [ + k + for k in ("media_type", "type", "data") + if k not in source + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'image' " + f"but 'source' is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append( + { + "type": "image", + "source": block["source"], + **block_extra, + } + ) + # Bedrock converse + elif image := block.get("image"): + if missing := [ + k for k in ("source", "format") if k not in image + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has key 'image', " + f"but 'image' is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append( + { + **_bedrock_converse_image_to_anthropic( + block["image"] + ), + **block_extra, + } + ) + else: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'image' " + f"but does not have a 'source' or 'image' key. Full " + f"content block:\n\n{block}" + ) + elif block.get("type") == "tool_use": + if missing := [ + k for k in ("id", "name", "input") if k not in block + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'tool_use', " + f"but is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append( + { + "type": "tool_use", + "name": block["name"], + "id": block["id"], + "input": block["input"], + **block_extra, + } + ) + if not any( + tool_call["id"] == block["id"] + for tool_call in cast(AIMessage, message).tool_calls + ): + cast(AIMessage, message).tool_calls.append( + create_tool_call( + name=block["name"], + id=block["id"], + args=block["input"], + ) + ) + elif block.get("type") == "tool_result": + if missing := [ + k for k in ("content", "tool_use_id") if k not in block + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': " + f"'tool_result', but is missing expected key(s) " + f"{missing}. Full content block:\n\n{block}" + ) + content.append( + { + "type": "tool_result", + "content": block["content"], + "tool_use_id": block["tool_use_id"], + **block_extra, + } + ) + elif (block.get("type") == "json") or "json" in block: + if "json" not in block: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': 'json' " + f"but does not have a 'json' key. Full " + f"content block:\n\n{block}" + ) + content.append( + { + "type": "text", + "text": json.dumps(block["json"]), + **block_extra, + } + ) + elif ( + block.get("type") == "guard_content" + ) or "guard_content" in block: + if ( + "guard_content" not in block + or "text" not in block["guard_content"] + ): + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': " + f"'guard_content' but does not have a " + f"messages[{i}].content[{j}]['guard_content']['text'] " + f"key. Full content block:\n\n{block}" + ) + text = block["guard_content"]["text"] + if isinstance(text, dict): + text = text["text"] + content.append({"type": "text", "text": text, **block_extra}) + # VertexAI format + elif block.get("type") == "media": + if missing := [ + k for k in ("mime_type", "data") if k not in block + ]: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] has 'type': " + f"'media' but does not have key(s) {missing}. Full " + f"content block:\n\n{block}" + ) + if "image" not in block["mime_type"]: + raise ValueError( + f"Anthropic messages can only support text and image " + f"data. Received content block with media of type: " + f"{block['mime_type']}" + ) + content.append( + {**_vertexai_image_to_anthropic(block), **block_extra} + ) + else: + raise ValueError( + f"Unrecognized content block at " + f"messages[{i}].content[{j}] does not match OpenAI, " + f"Anthropic, Bedrock Converse, or VertexAI format. Full " + f"content block:\n\n{block}" + ) + message.content = content # type: ignore[assignment] + updated_messages.append(message) + return merge_message_runs(updated_messages) + + def _first_max_tokens( messages: Sequence[BaseMessage], *, @@ -1012,3 +1641,59 @@ def _is_message_type( types_types = tuple(t for t in types if isinstance(t, type)) return message.type in types_str or isinstance(message, types_types) + + +def _bytes_to_b64_str(bytes_: bytes) -> str: + return base64.b64encode(bytes_).decode("utf-8") + + +def _openai_image_to_anthropic(image: dict) -> Dict: + """ + Formats an image of format data:image/jpeg;base64,{b64_string} + to a dict for anthropic api + + { + "type": "base64", + "media_type": "image/jpeg", + "data": "/9j/4AAQSkZJRg...", + } + + And throws an error if it's not a b64 image + """ + regex = r"^data:(?Pimage/.+);base64,(?P.+)$" + match = re.match(regex, image["image_url"]) + if match is None: + raise ValueError( + "Anthropic only supports base64-encoded images currently." + " Example: data:image/png;base64,'/9j/4AAQSk'..." + ) + return { + "type": "image", + "source": { + "type": "base64", + "media_type": match.group("media_type"), + "data": match.group("data"), + }, + } + + +def _bedrock_converse_image_to_anthropic(image: dict) -> dict: + return { + "type": "image", + "source": { + "media_type": f"image/{image['format']}", + "type": "base64", + "data": _bytes_to_b64_str(image["source"]["bytes"]), + }, + } + + +def _vertexai_image_to_anthropic(image: dict) -> dict: + return { + "type": "image", + "source": { + "media_type": image["mime_type"], + "type": "base64", + "data": _bytes_to_b64_str(image["data"]), + }, + } diff --git a/libs/core/tests/unit_tests/messages/test_utils.py b/libs/core/tests/unit_tests/messages/test_utils.py index 56b8c0df7be..72bde93cacf 100644 --- a/libs/core/tests/unit_tests/messages/test_utils.py +++ b/libs/core/tests/unit_tests/messages/test_utils.py @@ -13,8 +13,10 @@ from langchain_core.messages import ( ToolMessage, ) from langchain_core.messages.utils import ( + _bytes_to_b64_str, convert_to_messages, filter_messages, + format_content_as, merge_message_runs, trim_messages, ) @@ -556,3 +558,222 @@ def test_convert_to_messages() -> None: @pytest.mark.xfail(reason="AI message does not support refusal key yet.") def test_convert_to_messages_openai_refusal() -> None: convert_to_messages([{"role": "assistant", "refusal": "9.1"}]) + + +def create_base64_image(format: str = "jpeg") -> str: + return f"data:image/{format};base64,/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/2wBDAQkJCQwLDBgNDRgyIRwhMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjIyMjL/wAARCAABAAEDASIAAhEBAxEB/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/8QAHwEAAwEBAQEBAQEBAQAAAAAAAAECAwQFBgcICQoL/8QAtREAAgECBAQDBAcFBAQAAQJ3AAECAxEEBSExBhJBUQdhcRMiMoEIFEKRobHBCSMzUvAVYnLRChYkNOEl8RcYGRomJygpKjU2Nzg5OkNERUZHSElKU1RVVldYWVpjZGVmZ2hpanN0dXZ3eHl6goOEhYaHiImKkpOUlZaXmJmaoqOkpaanqKmqsrO0tba3uLm6wsPExcbHyMnK0tPU1dbX2Nna4uPk5ebn6Onq8vP09fb3+Pn6/9oADAMBAAIRAxEAPwD3+iiigD//2Q==" # noqa: E501 + + +def test_format_content_as_single_message() -> None: + message = HumanMessage(content="Hello") + result = format_content_as(message, format="openai", text="string") + assert isinstance(result, BaseMessage) + assert result.content == "Hello" + + +def test_format_content_as_multiple_messages() -> None: + messages = [ + SystemMessage(content="System message"), + HumanMessage(content="Human message"), + AIMessage(content="AI message"), + ] + result = format_content_as(messages, format="openai", text="string") + assert isinstance(result, list) + assert len(result) == 3 + assert all(isinstance(msg, BaseMessage) for msg in result) + assert [msg.content for msg in result] == [ + "System message", + "Human message", + "AI message", + ] + + +def test_format_content_as_openai_string() -> None: + messages = [ + HumanMessage( + content=[ + {"type": "text", "text": "Hello"}, + {"type": "text", "text": "World"}, + ] + ), + AIMessage( + content=[{"type": "text", "text": "Hi"}, {"type": "text", "text": "there"}] + ), + ] + result = format_content_as(messages, format="openai", text="string") + assert [msg.content for msg in result] == ["Hello\nWorld", "Hi\nthere"] + + +def test_format_content_as_openai_block() -> None: + messages = [ + HumanMessage(content="Hello"), + AIMessage(content="Hi there"), + ] + result = format_content_as(messages, format="openai", text="block") + assert [msg.content for msg in result] == [ + [{"type": "text", "text": "Hello"}], + [{"type": "text", "text": "Hi there"}], + ] + + +def test_format_content_as_anthropic_string() -> None: + messages = [ + HumanMessage( + content=[ + {"type": "text", "text": "Hello"}, + {"type": "text", "text": "World"}, + ] + ), + AIMessage( + content=[{"type": "text", "text": "Hi"}, {"type": "text", "text": "there"}] + ), + ] + result = format_content_as(messages, format="anthropic", text="string") + assert [msg.content for msg in result] == ["Hello\nWorld", "Hi\nthere"] + + +def test_format_content_as_anthropic_block() -> None: + messages = [ + HumanMessage(content="Hello"), + AIMessage(content="Hi there"), + ] + result = format_content_as(messages, format="anthropic", text="block") + assert [msg.content for msg in result] == [ + [{"type": "text", "text": "Hello"}], + [{"type": "text", "text": "Hi there"}], + ] + + +def test_format_content_as_invalid_format() -> None: + with pytest.raises(ValueError, match="Unrecognized format="): + format_content_as( + [HumanMessage(content="Hello")], format="invalid", text="string" + ) + + +def test_format_content_as_openai_image() -> None: + base64_image = create_base64_image() + messages = [ + HumanMessage( + content=[ + {"type": "text", "text": "Here's an image:"}, + {"type": "image_url", "image_url": {"url": base64_image}}, + ] + ) + ] + result = format_content_as(messages, format="openai", text="block") + assert result[0].content[1]["type"] == "image_url" + assert result[0].content[1]["image_url"]["url"] == base64_image + + +def test_format_content_as_anthropic_image() -> None: + base64_image = create_base64_image() + messages = [ + HumanMessage( + content=[ + {"type": "text", "text": "Here's an image:"}, + {"type": "image_url", "image_url": base64_image}, + ] + ) + ] + result = format_content_as(messages, format="anthropic", text="block") + assert result[0].content[1]["type"] == "image" + assert result[0].content[1]["source"]["type"] == "base64" + assert result[0].content[1]["source"]["media_type"] == "image/jpeg" + + +def test_format_content_as_tool_message() -> None: + tool_message = ToolMessage(content="Tool result", tool_call_id="123") + result = format_content_as([tool_message], format="openai", text="block") + assert isinstance(result[0], ToolMessage) + assert result[0].content == [{"type": "text", "text": "Tool result"}] + assert result[0].tool_call_id == "123" + + +def test_format_content_as_tool_use() -> None: + messages = [ + AIMessage( + content=[ + {"type": "tool_use", "id": "123", "name": "calculator", "input": "2+2"} + ] + ) + ] + result = format_content_as(messages, format="openai", text="block") + assert result[0].tool_calls[0]["id"] == "123" + assert result[0].tool_calls[0]["name"] == "calculator" + assert result[0].tool_calls[0]["args"] == "2+2" + + +def test_format_content_as_json() -> None: + json_data = {"key": "value"} + messages = [HumanMessage(content=[{"type": "json", "json": json_data}])] + result = format_content_as(messages, format="openai", text="block") + assert result[0].content[0]["type"] == "text" + assert json.loads(result[0].content[0]["text"]) == json_data + + +def test_format_content_as_guard_content() -> None: + messages = [ + HumanMessage( + content=[ + { + "type": "guard_content", + "guard_content": {"text": "Protected content"}, + } + ] + ) + ] + result = format_content_as(messages, format="openai", text="block") + assert result[0].content[0]["type"] == "text" + assert result[0].content[0]["text"] == "Protected content" + + +def test_format_content_as_vertexai_image() -> None: + messages = [ + HumanMessage( + content=[ + {"type": "media", "mime_type": "image/jpeg", "data": b"image_bytes"} + ] + ) + ] + result = format_content_as(messages, format="openai", text="block") + assert result[0].content[0]["type"] == "image_url" + assert ( + result[0].content[0]["image_url"]["url"] + == f"data:image/jpeg;base64,{_bytes_to_b64_str(b'image_bytes')}" + ) + + +def test_format_content_as_invalid_block() -> None: + messages = [HumanMessage(content=[{"type": "invalid", "foo": "bar"}])] + with pytest.raises(ValueError, match="Unrecognized content block"): + format_content_as(messages, format="openai", text="block") + with pytest.raises(ValueError, match="Unrecognized content block"): + format_content_as(messages, format="anthropic", text="block") + + +def test_format_content_as_empty_message() -> None: + result = format_content_as(HumanMessage(content=""), format="openai", text="string") + assert result.content == "" + + +def test_format_content_as_empty_list() -> None: + result = format_content_as([], format="openai", text="string") + assert result == [] + + +def test_format_content_as_mixed_content_types() -> None: + messages = [ + HumanMessage( + content=[ + "Text message", + {"type": "text", "text": "Structured text"}, + {"type": "image_url", "image_url": create_base64_image()}, + ] + ) + ] + result = format_content_as(messages, format="openai", text="block") + assert len(result[0].content) == 3 + assert isinstance(result[0].content[0], dict) + assert isinstance(result[0].content[1], dict) + assert isinstance(result[0].content[2], dict)