mirror of
https://github.com/hwchase17/langchain.git
synced 2025-08-01 09:04:03 +00:00
test type narrowing option
This commit is contained in:
parent
bc5e8e0c17
commit
81a4a051ab
@ -8,6 +8,7 @@ from typing import Any, Literal, Optional, Union, cast
|
||||
from pydantic import model_validator
|
||||
from typing_extensions import NotRequired, Self, TypedDict, override
|
||||
|
||||
from langchain_core.messages import ContentBlock
|
||||
from langchain_core.messages.base import (
|
||||
BaseMessage,
|
||||
BaseMessageChunk,
|
||||
@ -178,7 +179,7 @@ class AIMessage(BaseMessage):
|
||||
"""The type of the message (used for deserialization). Defaults to "ai"."""
|
||||
|
||||
def __init__(
|
||||
self, content: Union[str, list[Union[str, dict]]], **kwargs: Any
|
||||
self, content: Union[str, list[Union[str, ContentBlock, dict]]], **kwargs: Any
|
||||
) -> None:
|
||||
"""Pass in content as positional arg.
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
from typing import Any, Literal, Union
|
||||
|
||||
from langchain_core.messages import ContentBlock
|
||||
from langchain_core.messages.base import BaseMessage, BaseMessageChunk
|
||||
|
||||
|
||||
@ -41,7 +42,7 @@ class HumanMessage(BaseMessage):
|
||||
"""The type of the message (used for serialization). Defaults to "human"."""
|
||||
|
||||
def __init__(
|
||||
self, content: Union[str, list[Union[str, dict]]], **kwargs: Any
|
||||
self, content: Union[str, list[Union[str, ContentBlock, dict]]], **kwargs: Any
|
||||
) -> None:
|
||||
"""Pass in content as positional arg.
|
||||
|
||||
|
@ -31,7 +31,10 @@ from typing import (
|
||||
from pydantic import Discriminator, Field, Tag
|
||||
|
||||
from langchain_core.exceptions import ErrorCode, create_message
|
||||
from langchain_core.messages import convert_to_openai_data_block, is_data_content_block
|
||||
from langchain_core.messages import (
|
||||
convert_to_openai_data_block,
|
||||
is_data_content_block,
|
||||
)
|
||||
from langchain_core.messages.ai import AIMessage, AIMessageChunk
|
||||
from langchain_core.messages.base import BaseMessage, BaseMessageChunk
|
||||
from langchain_core.messages.chat import ChatMessage, ChatMessageChunk
|
||||
@ -1011,8 +1014,6 @@ def convert_to_openai_messages(
|
||||
|
||||
for i, message in enumerate(messages):
|
||||
oai_msg: dict = {"role": _get_message_openai_role(message)}
|
||||
tool_messages: list = []
|
||||
content: Union[str, list[dict]]
|
||||
|
||||
if message.name:
|
||||
oai_msg["name"] = message.name
|
||||
@ -1023,257 +1024,7 @@ def convert_to_openai_messages(
|
||||
if isinstance(message, ToolMessage):
|
||||
oai_msg["tool_call_id"] = message.tool_call_id
|
||||
|
||||
if not message.content:
|
||||
content = "" if text_format == "string" else []
|
||||
elif isinstance(message.content, str):
|
||||
if text_format == "string":
|
||||
content = message.content
|
||||
else:
|
||||
content = [{"type": "text", "text": message.content}]
|
||||
elif text_format == "string" and all(
|
||||
isinstance(block, str) or block.get("type") == "text"
|
||||
for block in message.content
|
||||
):
|
||||
content = "\n".join(
|
||||
block if isinstance(block, str) else block["text"]
|
||||
for block in message.content
|
||||
)
|
||||
else:
|
||||
content = []
|
||||
for j, block in enumerate(message.content):
|
||||
# OpenAI format
|
||||
if isinstance(block, str):
|
||||
content.append({"type": "text", "text": block})
|
||||
elif block.get("type") == "text":
|
||||
if missing := [k for k in ("text",) if k not in block]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': 'text' "
|
||||
f"but is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
content.append({"type": block["type"], "text": block["text"]})
|
||||
elif block.get("type") == "image_url":
|
||||
if missing := [k for k in ("image_url",) if k not in block]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': 'image_url' "
|
||||
f"but is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
content.append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": block["image_url"],
|
||||
}
|
||||
)
|
||||
# Standard multi-modal content block
|
||||
elif is_data_content_block(block):
|
||||
formatted_block = convert_to_openai_data_block(block)
|
||||
if (
|
||||
formatted_block.get("type") == "file"
|
||||
and "file" in formatted_block
|
||||
and "filename" not in formatted_block["file"]
|
||||
):
|
||||
logger.info("Generating a fallback filename.")
|
||||
formatted_block["file"]["filename"] = "LC_AUTOGENERATED"
|
||||
content.append(formatted_block)
|
||||
# Anthropic and Bedrock converse format
|
||||
elif (block.get("type") == "image") or "image" in block:
|
||||
# Anthropic
|
||||
if source := block.get("source"):
|
||||
if missing := [
|
||||
k for k in ("media_type", "type", "data") if k not in source
|
||||
]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': 'image' "
|
||||
f"but 'source' is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
content.append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": (
|
||||
f"data:{source['media_type']};"
|
||||
f"{source['type']},{source['data']}"
|
||||
)
|
||||
},
|
||||
}
|
||||
)
|
||||
# Bedrock converse
|
||||
elif image := block.get("image"):
|
||||
if missing := [
|
||||
k for k in ("source", "format") if k not in image
|
||||
]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has key 'image', "
|
||||
f"but 'image' is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
b64_image = _bytes_to_b64_str(image["source"]["bytes"])
|
||||
content.append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": (
|
||||
f"data:image/{image['format']};base64,{b64_image}"
|
||||
)
|
||||
},
|
||||
}
|
||||
)
|
||||
else:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': 'image' "
|
||||
f"but does not have a 'source' or 'image' key. Full "
|
||||
f"content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
# OpenAI file format
|
||||
elif (
|
||||
block.get("type") == "file"
|
||||
and isinstance(block.get("file"), dict)
|
||||
and isinstance(block.get("file", {}).get("file_data"), str)
|
||||
):
|
||||
if block.get("file", {}).get("filename") is None:
|
||||
logger.info("Generating a fallback filename.")
|
||||
block["file"]["filename"] = "LC_AUTOGENERATED"
|
||||
content.append(block)
|
||||
# OpenAI audio format
|
||||
elif (
|
||||
block.get("type") == "input_audio"
|
||||
and isinstance(block.get("input_audio"), dict)
|
||||
and isinstance(block.get("input_audio", {}).get("data"), str)
|
||||
and isinstance(block.get("input_audio", {}).get("format"), str)
|
||||
):
|
||||
content.append(block)
|
||||
elif block.get("type") == "tool_use":
|
||||
if missing := [
|
||||
k for k in ("id", "name", "input") if k not in block
|
||||
]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': "
|
||||
f"'tool_use', but is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
if not any(
|
||||
tool_call["id"] == block["id"]
|
||||
for tool_call in cast("AIMessage", message).tool_calls
|
||||
):
|
||||
oai_msg["tool_calls"] = oai_msg.get("tool_calls", [])
|
||||
oai_msg["tool_calls"].append(
|
||||
{
|
||||
"type": "function",
|
||||
"id": block["id"],
|
||||
"function": {
|
||||
"name": block["name"],
|
||||
"arguments": json.dumps(block["input"]),
|
||||
},
|
||||
}
|
||||
)
|
||||
elif block.get("type") == "tool_result":
|
||||
if missing := [
|
||||
k for k in ("content", "tool_use_id") if k not in block
|
||||
]:
|
||||
msg = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': "
|
||||
f"'tool_result', but is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(msg)
|
||||
tool_message = ToolMessage(
|
||||
block["content"],
|
||||
tool_call_id=block["tool_use_id"],
|
||||
status="error" if block.get("is_error") else "success",
|
||||
)
|
||||
# Recurse to make sure tool message contents are OpenAI format.
|
||||
tool_messages.extend(
|
||||
convert_to_openai_messages(
|
||||
[tool_message], text_format=text_format
|
||||
)
|
||||
)
|
||||
elif (block.get("type") == "json") or "json" in block:
|
||||
if "json" not in block:
|
||||
msg = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': 'json' "
|
||||
f"but does not have a 'json' key. Full "
|
||||
f"content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(msg)
|
||||
content.append(
|
||||
{
|
||||
"type": "text",
|
||||
"text": json.dumps(block["json"]),
|
||||
}
|
||||
)
|
||||
elif (block.get("type") == "guard_content") or "guard_content" in block:
|
||||
if (
|
||||
"guard_content" not in block
|
||||
or "text" not in block["guard_content"]
|
||||
):
|
||||
msg = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': "
|
||||
f"'guard_content' but does not have a "
|
||||
f"messages[{i}].content[{j}]['guard_content']['text'] "
|
||||
f"key. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(msg)
|
||||
text = block["guard_content"]["text"]
|
||||
if isinstance(text, dict):
|
||||
text = text["text"]
|
||||
content.append({"type": "text", "text": text})
|
||||
# VertexAI format
|
||||
elif block.get("type") == "media":
|
||||
if missing := [k for k in ("mime_type", "data") if k not in block]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] has 'type': "
|
||||
f"'media' but does not have key(s) {missing}. Full "
|
||||
f"content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
if "image" not in block["mime_type"]:
|
||||
err = (
|
||||
f"OpenAI messages can only support text and image data."
|
||||
f" Received content block with media of type:"
|
||||
f" {block['mime_type']}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
b64_image = _bytes_to_b64_str(block["data"])
|
||||
content.append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": (f"data:{block['mime_type']};base64,{b64_image}")
|
||||
},
|
||||
}
|
||||
)
|
||||
elif block.get("type") == "thinking":
|
||||
content.append(block)
|
||||
else:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{i}].content[{j}] does not match OpenAI, "
|
||||
f"Anthropic, Bedrock Converse, or VertexAI format. Full "
|
||||
f"content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
if text_format == "string" and not any(
|
||||
block["type"] != "text" for block in content
|
||||
):
|
||||
content = "\n".join(block["text"] for block in content)
|
||||
content, tool_messages = _extract_content(i, message, oai_msg, text_format)
|
||||
oai_msg["content"] = content
|
||||
if message.content and not oai_msg["content"] and tool_messages:
|
||||
oai_messages.extend(tool_messages)
|
||||
@ -1285,6 +1036,263 @@ def convert_to_openai_messages(
|
||||
return oai_messages
|
||||
|
||||
|
||||
def _extract_content(
|
||||
idx: int,
|
||||
message: BaseMessage,
|
||||
oai_msg: dict,
|
||||
text_format: Literal["string", "block"],
|
||||
) -> tuple[Union[str, list[dict]], list]:
|
||||
"""Extract content from a message and format it according to OpenAI standards."""
|
||||
content: Union[str, list[dict]]
|
||||
tool_messages: list = []
|
||||
if not message.content:
|
||||
content = "" if text_format == "string" else []
|
||||
return content, tool_messages
|
||||
if isinstance(message.content, str):
|
||||
if text_format == "string":
|
||||
content = message.content
|
||||
else:
|
||||
content = [{"type": "text", "text": message.content}]
|
||||
return content, tool_messages
|
||||
if text_format == "string" and all(
|
||||
isinstance(block, str) or block.get("type") == "text"
|
||||
for block in message.content
|
||||
):
|
||||
content = "\n".join(
|
||||
block if isinstance(block, str) else block["text"]
|
||||
for block in message.content
|
||||
)
|
||||
return content, tool_messages
|
||||
|
||||
content = []
|
||||
for block_idx, block in enumerate(message.content):
|
||||
# OpenAI format
|
||||
if isinstance(block, str):
|
||||
content.append({"type": "text", "text": block})
|
||||
continue
|
||||
|
||||
block = cast("dict", block)
|
||||
|
||||
if block.get("type") == "text":
|
||||
if missing := [k for k in ("text",) if k not in block]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': 'text' "
|
||||
f"but is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
content.append({"type": block["type"], "text": block["text"]})
|
||||
elif block.get("type") == "image_url":
|
||||
if missing := [k for k in ("image_url",) if k not in block]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': 'image_url' "
|
||||
f"but is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
content.append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": block["image_url"],
|
||||
}
|
||||
)
|
||||
# Standard multi-modal content block
|
||||
elif is_data_content_block(block):
|
||||
formatted_block = convert_to_openai_data_block(block)
|
||||
if (
|
||||
formatted_block.get("type") == "file"
|
||||
and "file" in formatted_block
|
||||
and "filename" not in formatted_block["file"]
|
||||
):
|
||||
logger.info("Generating a fallback filename.")
|
||||
formatted_block["file"]["filename"] = "LC_AUTOGENERATED"
|
||||
content.append(formatted_block)
|
||||
# Anthropic and Bedrock converse format
|
||||
elif (block.get("type") == "image") or "image" in block:
|
||||
# Anthropic
|
||||
if source := block.get("source"):
|
||||
if missing := [
|
||||
k for k in ("media_type", "type", "data") if k not in source
|
||||
]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': 'image' "
|
||||
f"but 'source' is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
content.append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": (
|
||||
f"data:{source['media_type']};"
|
||||
f"{source['type']},{source['data']}"
|
||||
)
|
||||
},
|
||||
}
|
||||
)
|
||||
# Bedrock converse
|
||||
elif image := block.get("image"):
|
||||
if missing := [k for k in ("source", "format") if k not in image]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has key 'image', "
|
||||
f"but 'image' is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
b64_image = _bytes_to_b64_str(image["source"]["bytes"])
|
||||
content.append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": (f"data:image/{image['format']};base64,{b64_image}")
|
||||
},
|
||||
}
|
||||
)
|
||||
else:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': 'image' "
|
||||
f"but does not have a 'source' or 'image' key. Full "
|
||||
f"content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
# OpenAI file format
|
||||
elif (
|
||||
block.get("type") == "file"
|
||||
and isinstance(block.get("file"), dict)
|
||||
and isinstance(block.get("file", {}).get("file_data"), str)
|
||||
):
|
||||
if block.get("file", {}).get("filename") is None:
|
||||
logger.info("Generating a fallback filename.")
|
||||
block["file"]["filename"] = "LC_AUTOGENERATED"
|
||||
content.append(block)
|
||||
# OpenAI audio format
|
||||
elif (
|
||||
block.get("type") == "input_audio"
|
||||
and isinstance(block.get("input_audio"), dict)
|
||||
and isinstance(block.get("input_audio", {}).get("data"), str)
|
||||
and isinstance(block.get("input_audio", {}).get("format"), str)
|
||||
):
|
||||
content.append(block)
|
||||
elif block.get("type") == "tool_use":
|
||||
if missing := [k for k in ("id", "name", "input") if k not in block]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': "
|
||||
f"'tool_use', but is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
if not any(
|
||||
tool_call["id"] == block["id"]
|
||||
for tool_call in cast("AIMessage", message).tool_calls
|
||||
):
|
||||
oai_msg["tool_calls"] = oai_msg.get("tool_calls", [])
|
||||
oai_msg["tool_calls"].append(
|
||||
{
|
||||
"type": "function",
|
||||
"id": block["id"],
|
||||
"function": {
|
||||
"name": block["name"],
|
||||
"arguments": json.dumps(block["input"]),
|
||||
},
|
||||
}
|
||||
)
|
||||
elif block.get("type") == "tool_result":
|
||||
if missing := [k for k in ("content", "tool_use_id") if k not in block]:
|
||||
msg = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': "
|
||||
f"'tool_result', but is missing expected key(s) "
|
||||
f"{missing}. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(msg)
|
||||
tool_message = ToolMessage(
|
||||
block["content"],
|
||||
tool_call_id=block["tool_use_id"],
|
||||
status="error" if block.get("is_error") else "success",
|
||||
)
|
||||
# Recurse to make sure tool message contents are OpenAI format.
|
||||
tool_messages.extend(
|
||||
convert_to_openai_messages([tool_message], text_format=text_format)
|
||||
)
|
||||
elif (block.get("type") == "json") or "json" in block:
|
||||
if "json" not in block:
|
||||
msg = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': 'json' "
|
||||
f"but does not have a 'json' key. Full "
|
||||
f"content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(msg)
|
||||
content.append(
|
||||
{
|
||||
"type": "text",
|
||||
"text": json.dumps(block["json"]),
|
||||
}
|
||||
)
|
||||
elif (block.get("type") == "guard_content") or "guard_content" in block:
|
||||
if "guard_content" not in block or "text" not in block["guard_content"]:
|
||||
msg = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': "
|
||||
f"'guard_content' but does not have a "
|
||||
f"messages[{idx}].content[{block_idx}]['guard_content']['text'] "
|
||||
f"key. Full content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(msg)
|
||||
text = block["guard_content"]["text"]
|
||||
if isinstance(text, dict):
|
||||
text = text["text"]
|
||||
content.append({"type": "text", "text": text})
|
||||
# VertexAI format
|
||||
elif block.get("type") == "media":
|
||||
if missing := [k for k in ("mime_type", "data") if k not in block]:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] has 'type': "
|
||||
f"'media' but does not have key(s) {missing}. Full "
|
||||
f"content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
if "image" not in block["mime_type"]:
|
||||
err = (
|
||||
f"OpenAI messages can only support text and image data."
|
||||
f" Received content block with media of type:"
|
||||
f" {block['mime_type']}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
b64_image = _bytes_to_b64_str(block["data"])
|
||||
content.append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": (f"data:{block['mime_type']};base64,{b64_image}")
|
||||
},
|
||||
}
|
||||
)
|
||||
elif block.get("type") == "thinking":
|
||||
content.append(block)
|
||||
else:
|
||||
err = (
|
||||
f"Unrecognized content block at "
|
||||
f"messages[{idx}].content[{block_idx}] does not match OpenAI, "
|
||||
f"Anthropic, Bedrock Converse, or VertexAI format. Full "
|
||||
f"content block:\n\n{block}"
|
||||
)
|
||||
raise ValueError(err)
|
||||
if text_format == "string" and not any(
|
||||
block["type"] != "text" for block in content
|
||||
):
|
||||
content = "\n".join(block["text"] for block in content)
|
||||
return content, tool_messages
|
||||
|
||||
|
||||
def _first_max_tokens(
|
||||
messages: Sequence[BaseMessage],
|
||||
*,
|
||||
|
@ -1,18 +1,20 @@
|
||||
import base64
|
||||
import json
|
||||
import re
|
||||
from collections.abc import Sequence
|
||||
from typing import Any, Callable, Optional, Union
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import Any, Callable, Optional, Union, cast
|
||||
|
||||
import pytest
|
||||
from typing_extensions import override
|
||||
from typing_extensions import TypeGuard, override
|
||||
|
||||
from langchain_core.language_models.fake_chat_models import FakeChatModel
|
||||
from langchain_core.messages import (
|
||||
AIMessage,
|
||||
BaseMessage,
|
||||
HumanMessage,
|
||||
ReasoningContentBlock,
|
||||
SystemMessage,
|
||||
TextContentBlock,
|
||||
ToolCall,
|
||||
ToolMessage,
|
||||
)
|
||||
@ -1457,3 +1459,32 @@ def test_get_buffer_string_with_empty_content() -> None:
|
||||
expected = "Human: \nAI: \nSystem: "
|
||||
actual = get_buffer_string(messages)
|
||||
assert actual == expected
|
||||
|
||||
|
||||
def is_reasoning_block(block: Mapping[str, Any]) -> TypeGuard[ReasoningContentBlock]:
|
||||
"""Check if a block is a ReasoningContentBlock."""
|
||||
return block.get("type") == "reasoning"
|
||||
|
||||
|
||||
def is_text_block(block: Mapping[str, Any]) -> TypeGuard[TextContentBlock]:
|
||||
"""Check if a block is a TextContentBlock."""
|
||||
return block.get("type") == "text"
|
||||
|
||||
|
||||
def test_typing() -> None:
|
||||
"""Test typing on things"""
|
||||
message = AIMessage(
|
||||
content="Hello",
|
||||
)
|
||||
if isinstance(message.content, str):
|
||||
# This should not raise an error
|
||||
message.content = message.content + " world"
|
||||
elif isinstance(message.content, list):
|
||||
all_contents = []
|
||||
for block in message.content:
|
||||
if isinstance(block, dict):
|
||||
block = cast("dict", block)
|
||||
if is_text_block(block):
|
||||
all_contents.append(block["text"])
|
||||
if is_reasoning_block(block):
|
||||
all_contents.append(block.get("reasoning", "foo"))
|
||||
|
Loading…
Reference in New Issue
Block a user