langchain/libs/partners/openai/langchain_openai/chat_models/_compat.py
ccurme e02eed5489
feat: standard outputs (#32287)
Co-authored-by: Mason Daugherty <mason@langchain.dev>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Co-authored-by: Mason Daugherty <github@mdrxy.com>
Co-authored-by: Nuno Campos <nuno@langchain.dev>
2025-08-05 15:17:32 -04:00

708 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
This module converts between AIMessage output formats, which are governed by the
``output_version`` attribute on ChatOpenAI. Supported values are ``"v0"`` and
``"responses/v1"``.
``"v0"`` corresponds to the format as of ChatOpenAI v0.3. For the Responses API, it
stores reasoning and tool outputs in AIMessage.additional_kwargs:
.. code-block:: python
AIMessage(
content=[
{"type": "text", "text": "Hello, world!", "annotations": [{"type": "foo"}]}
],
additional_kwargs={
"reasoning": {
"type": "reasoning",
"id": "rs_123",
"summary": [{"type": "summary_text", "text": "Reasoning summary"}],
},
"tool_outputs": [
{
"type": "web_search_call",
"id": "websearch_123",
"status": "completed",
}
],
"refusal": "I cannot assist with that.",
},
response_metadata={"id": "resp_123"},
id="msg_123",
)
``"responses/v1"`` is only applicable to the Responses API. It retains information
about response item sequencing and accommodates multiple reasoning items by
representing these items in the content sequence:
.. code-block:: python
AIMessage(
content=[
{
"type": "reasoning",
"summary": [{"type": "summary_text", "text": "Reasoning summary"}],
"id": "rs_123",
},
{
"type": "text",
"text": "Hello, world!",
"annotations": [{"type": "foo"}],
"id": "msg_123",
},
{"type": "refusal", "refusal": "I cannot assist with that."},
{"type": "web_search_call", "id": "websearch_123", "status": "completed"},
],
response_metadata={"id": "resp_123"},
id="resp_123",
)
There are other small improvements as well; e.g., message IDs are stored on text
content blocks rather than on AIMessage.id, which now stores the response ID.
For backwards compatibility, this module provides functions to convert between the
formats. The functions are used internally by ChatOpenAI.
""" # noqa: E501
import copy
import json
from collections.abc import Iterable, Iterator
from typing import Any, Literal, Optional, Union, cast
from langchain_core.messages import AIMessage, is_data_content_block
from langchain_core.messages import content_blocks as types
from langchain_core.v1.messages import AIMessage as AIMessageV1
_FUNCTION_CALL_IDS_MAP_KEY = "__openai_function_call_ids__"
# v0.3 / Responses
def _convert_to_v03_ai_message(
    message: AIMessage, has_reasoning: bool = False
) -> AIMessage:
    """Rewrite ``message`` in place into the old-style v0.3 layout.

    Reasoning, built-in tool calls, refusals and function-call item IDs are
    moved out of ``message.content`` and into ``message.additional_kwargs``.
    Messages whose content is not a list are returned untouched.

    Args:
        message: The message to convert. It is mutated, not copied.
        has_reasoning: When true, the ``id`` and ``type`` keys are stripped from
            each reasoning block before it is stored.

    Returns:
        The same ``message`` object that was passed in.
    """
    if not isinstance(message.content, list):
        return message

    built_in_tool_types = (
        "web_search_call",
        "file_search_call",
        "computer_call",
        "code_interpreter_call",
        "mcp_call",
        "mcp_list_tools",
        "mcp_approval_request",
        "image_generation_call",
    )
    legacy_kwargs = message.additional_kwargs
    kept: list[Union[dict, str]] = []

    for item in message.content:
        if not isinstance(item, dict):
            kept.append(item)
            continue

        item_type = item.get("type")
        if item_type == "reasoning":
            # Store a reasoning item in additional_kwargs (overwriting as in
            # v0.3)
            item.pop("index", None)
            if has_reasoning:
                item.pop("id", None)
                item.pop("type", None)
            legacy_kwargs["reasoning"] = item
        elif item_type in built_in_tool_types:
            # Store built-in tool calls in additional_kwargs
            legacy_kwargs.setdefault("tool_outputs", []).append(item)
        elif item_type == "function_call":
            # Store function call item IDs in additional_kwargs, otherwise
            # discard function call items.
            id_map = legacy_kwargs.setdefault(_FUNCTION_CALL_IDS_MAP_KEY, {})
            call_id = item.get("call_id")
            if call_id:
                function_call_id = item.get("id")
                if function_call_id:
                    id_map[call_id] = function_call_id
        elif item_type == "refusal" and item.get("refusal"):
            # Store a refusal item in additional_kwargs (overwriting as in
            # v0.3)
            legacy_kwargs["refusal"] = item["refusal"]
        elif item_type == "text":
            # Store a message item ID on AIMessage.id
            if "id" in item:
                message.id = item["id"]
            kept.append({key: value for key, value in item.items() if key != "id"})
        elif (
            item.keys() == {"id", "index"}
            and isinstance(item["id"], str)
            and item["id"].startswith("msg_")
        ):
            # Drop message IDs in streaming case
            kept.append({"index": item["index"]})
        else:
            kept.append(item)

    message.content = kept

    # A response ID must not remain on AIMessage.id in the v0.3 layout.
    current_id = message.id
    if isinstance(current_id, str) and current_id.startswith("resp_"):
        message.id = None

    return message
def _convert_from_v03_ai_message(message: AIMessage) -> AIMessage:
    """Return a content-block-format copy of a ChatOpenAI v0.3 ``AIMessage``.

    A message is treated as v0.3 output when its content is a list of dicts and
    either ``additional_kwargs`` holds one of the v0.3 keys or the message ID /
    response ID pair follows the ``msg_`` / ``resp_`` pattern. Any other message
    is returned as-is (the same object).

    Args:
        message: The message to inspect.

    Returns:
        ``message`` itself when it is not recognised as v0.3 output, otherwise a
        shallow ``model_copy`` with rebuilt ``content``, trimmed
        ``additional_kwargs`` and an updated ``id``.
    """
    # Only update ChatOpenAI v0.3 AIMessages
    # TODO: structure provenance into AIMessage
    original_blocks = message.content
    if not isinstance(original_blocks, list):
        return message
    if not all(isinstance(entry, dict) for entry in original_blocks):
        return message

    legacy_keys = [
        "reasoning",
        "tool_outputs",
        "refusal",
        _FUNCTION_CALL_IDS_MAP_KEY,
    ]
    if not any(key in message.additional_kwargs for key in legacy_keys):
        # No v0.3 kwargs present: fall back to the message/response ID pattern.
        message_id = message.id
        if not (isinstance(message_id, str) and message_id.startswith("msg_")):
            return message
        response_id = message.response_metadata.get("id")
        if not (
            response_id
            and isinstance(response_id, str)
            and response_id.startswith("resp_")
        ):
            return message

    content_order = [
        "reasoning",
        "code_interpreter_call",
        "mcp_call",
        "image_generation_call",
        "text",
        "refusal",
        "function_call",
        "computer_call",
        "mcp_list_tools",
        "mcp_approval_request",
        # N. B. "web_search_call" and "file_search_call" were not passed back in
        # in v0.3
    ]

    # One list per known block type, plus a catch-all for everything else
    grouped: dict[str, list] = {name: [] for name in content_order}
    leftovers = []
    legacy_kwargs = message.additional_kwargs

    # Reasoning
    reasoning = legacy_kwargs.get("reasoning")
    if reasoning:
        grouped["reasoning"].append(reasoning)

    # Refusal
    refusal = legacy_kwargs.get("refusal")
    if refusal:
        grouped["refusal"].append({"type": "refusal", "refusal": refusal})

    # Text
    carries_message_id = isinstance(message.id, str) and message.id.startswith(
        "msg_"
    )
    for entry in original_blocks:
        if isinstance(entry, dict) and entry.get("type") == "text":
            text_block = entry.copy()
            if carries_message_id:
                text_block["id"] = message.id
            grouped["text"].append(text_block)
        else:
            leftovers.append(entry)

    # Function calls
    item_ids = legacy_kwargs.get(_FUNCTION_CALL_IDS_MAP_KEY)
    for call in message.tool_calls:
        call_block = {
            "type": "function_call",
            "name": call["name"],
            "arguments": json.dumps(call["args"], ensure_ascii=False),
            "call_id": call["id"],
        }
        if item_ids is not None:
            item_id = item_ids.get(call["id"])
            if item_id:
                call_block["id"] = item_id
        grouped["function_call"].append(call_block)

    # Tool outputs
    for output in legacy_kwargs.get("tool_outputs", []):
        output_type = output.get("type") if isinstance(output, dict) else None
        if output_type and output_type in grouped:
            grouped[output_type].append(output)
        else:
            leftovers.append(output)

    # Re-assemble the content list in the canonical order
    rebuilt = [item for name in content_order for item in grouped[name]]
    rebuilt.extend(leftovers)

    moved_keys = ("reasoning", "refusal", "tool_outputs")
    trimmed_kwargs = {
        key: value for key, value in legacy_kwargs.items() if key not in moved_keys
    }

    metadata = message.response_metadata
    resolved_id = metadata["id"] if "id" in metadata else message.id

    return message.model_copy(
        update={
            "content": rebuilt,
            "additional_kwargs": trimmed_kwargs,
            "id": resolved_id,
        },
        deep=False,
    )
# v1 / Chat Completions
def _convert_from_v1_to_chat_completions(message: AIMessageV1) -> AIMessageV1:
    """Build a Chat Completions-compatible shallow copy of a v1 message.

    Text blocks are reduced to their ``type`` and ``text`` keys, ``reasoning``
    and ``tool_call`` blocks are left out, and every other block is carried over
    unchanged. The input message is not modified.
    """
    omitted_types = ("reasoning", "tool_call")
    kept: list[types.ContentBlock] = []

    for item in message.content:
        item_type = item["type"]
        if item_type in omitted_types:
            continue
        if item_type == "text":
            # Strip annotations
            kept.append({"type": "text", "text": item["text"]})
        else:
            kept.append(item)

    converted = copy.copy(message)
    converted.content = kept
    return converted
# v1 / Responses
def _convert_annotation_to_v1(annotation: dict[str, Any]) -> types.Annotation:
    """Convert a Responses API annotation into a v1 annotation.

    ``url_citation`` and ``file_citation`` annotations become ``citation``
    annotations; fields without a dedicated slot are collected under
    ``extras``. Any other annotation type is wrapped unchanged in a
    ``non_standard_annotation``.

    Args:
        annotation: The Responses-format annotation. It is only read, never
            modified.

    Returns:
        The v1 annotation.

    Raises:
        KeyError: If a ``url_citation`` annotation has no ``url`` key.
    """
    annotation_type = annotation.get("type")

    if annotation_type == "url_citation":
        known_fields = {
            "type",
            "url",
            "title",
            "cited_text",
            "start_index",
            "end_index",
        }
        url_citation = cast(types.Citation, {})
        for field in ("end_index", "start_index", "title"):
            if field in annotation:
                url_citation[field] = annotation[field]
        url_citation["type"] = "citation"
        url_citation["url"] = annotation["url"]
        for field in annotation:
            if field not in known_fields:
                if "extras" not in url_citation:
                    url_citation["extras"] = {}
                url_citation["extras"][field] = annotation[field]
        return url_citation

    elif annotation_type == "file_citation":
        # "filename" is consumed as the citation title below, so it is listed as
        # known to keep it out of ``extras``. It is read rather than popped so
        # the caller's annotation dict is left intact.
        known_fields = {
            "type",
            "title",
            "cited_text",
            "start_index",
            "end_index",
            "filename",
        }
        document_citation: types.Citation = {"type": "citation"}
        if "filename" in annotation:
            document_citation["title"] = annotation["filename"]
        for field in annotation:
            if field not in known_fields:
                if "extras" not in document_citation:
                    document_citation["extras"] = {}
                document_citation["extras"][field] = annotation[field]
        return document_citation

    # TODO: standardise container_file_citation?
    else:
        non_standard_annotation: types.NonStandardAnnotation = {
            "type": "non_standard_annotation",
            "value": annotation,
        }
        return non_standard_annotation
def _explode_reasoning(block: dict[str, Any]) -> Iterable[types.ReasoningContentBlock]:
    """Split a Responses reasoning item into one v1 block per summary part.

    A block without a ``summary`` key is yielded unchanged. Otherwise every
    field other than ``type``, ``reasoning``, ``id``, ``index`` and ``summary``
    is moved under ``extras``. With an empty summary the block is yielded once
    with ``summary`` removed. With a non-empty summary one block is yielded per
    part, each carrying the shared fields and the part's ``text`` as
    ``reasoning``; the entries of ``extras`` are merged into the first yielded
    block only.

    Args:
        block: The reasoning item. It is mutated while being processed.

    Yields:
        The resulting reasoning content blocks.
    """
    if "summary" not in block:
        yield cast(types.ReasoningContentBlock, block)
        return

    known_fields = {"type", "reasoning", "id", "index"}
    # "extras" is the container the unknown fields are moved into, so it must
    # not be treated as a field to move: doing so would discard a pre-existing
    # ``extras`` dict and then fail when it is popped out from under itself.
    unknown_fields = [
        field
        for field in block
        if field not in known_fields and field not in ("summary", "extras")
    ]
    if unknown_fields:
        extras = block.setdefault("extras", {})
        for field in unknown_fields:
            extras[field] = block.pop(field)

    if not block["summary"]:
        _ = block.pop("summary", None)
        yield cast(types.ReasoningContentBlock, block)
        return

    # Common part for every exploded line, except 'summary'
    common = {k: v for k, v in block.items() if k in known_fields}

    # Optional keys that must appear only in the first exploded item
    first_only = block.pop("extras", None)

    for idx, part in enumerate(block["summary"]):
        new_block = dict(common)
        new_block["reasoning"] = part.get("text", "")
        if idx == 0 and first_only:
            new_block.update(first_only)
        yield cast(types.ReasoningContentBlock, new_block)
def _convert_to_v1_from_responses(
    content: list[dict[str, Any]],
    tool_calls: Optional[list[types.ToolCall]] = None,
    invalid_tool_calls: Optional[list[types.InvalidToolCall]] = None,
) -> list[types.ContentBlock]:
    """Mutate a Responses message to v1 format.

    Args:
        content: Responses-format content blocks. Entries that are not dicts are
            skipped. Some blocks are modified in place (annotations are
            replaced, reasoning blocks are restructured, ``index`` is popped
            from unrecognised blocks).
        tool_calls: Parsed tool calls, matched to ``function_call`` blocks by
            ``call_id``.
        invalid_tool_calls: Tool calls that failed to parse; consulted only when
            no entry in ``tool_calls`` matches.

    Returns:
        The v1 content blocks. ``web_search_call`` and ``code_interpreter_call``
        items each produce a call block followed by a result block. A
        ``function_call`` block with no matching tool call produces nothing.

    Raises:
        KeyError: If a ``web_search_call`` or ``code_interpreter_call`` block
            has no ``id``.
    """

    def _iter_blocks() -> Iterable[types.ContentBlock]:
        for block in content:
            if not isinstance(block, dict):
                continue
            block_type = block.get("type")

            if block_type == "text":
                if "annotations" in block:
                    block["annotations"] = [
                        _convert_annotation_to_v1(a) for a in block["annotations"]
                    ]
                yield cast(types.TextContentBlock, block)

            elif block_type == "reasoning":
                yield from _explode_reasoning(block)

            elif block_type == "image_generation_call" and (
                result := block.get("result")
            ):
                # Without a result the block falls through to the
                # non-standard branch at the bottom.
                new_block = {"type": "image", "base64": result}
                if output_format := block.get("output_format"):
                    new_block["mime_type"] = f"image/{output_format}"
                if "id" in block:
                    new_block["id"] = block["id"]
                if "index" in block:
                    new_block["index"] = block["index"]
                for extra_key in (
                    "status",
                    "background",
                    "output_format",
                    "quality",
                    "revised_prompt",
                    "size",
                ):
                    if extra_key in block:
                        new_block[extra_key] = block[extra_key]
                yield cast(types.ImageContentBlock, new_block)

            elif block_type == "function_call":
                tool_call_block: Optional[types.ContentBlock] = None
                call_id = block.get("call_id", "")
                if call_id:
                    for tool_call in tool_calls or []:
                        if tool_call.get("id") == call_id:
                            tool_call_block = cast(types.ToolCall, tool_call.copy())
                            break
                    else:
                        # No valid tool call matched; try the invalid ones.
                        for invalid_tool_call in invalid_tool_calls or []:
                            if invalid_tool_call.get("id") == call_id:
                                tool_call_block = cast(
                                    types.InvalidToolCall, invalid_tool_call.copy()
                                )
                                break
                if tool_call_block:
                    if "id" in block:
                        if "extras" not in tool_call_block:
                            tool_call_block["extras"] = {}
                        tool_call_block["extras"]["item_id"] = block["id"]  # type: ignore[typeddict-item]
                    if "index" in block:
                        tool_call_block["index"] = block["index"]
                    yield tool_call_block

            elif block_type == "web_search_call":
                web_search_call = {"type": "web_search_call", "id": block["id"]}
                if "index" in block:
                    web_search_call["index"] = block["index"]

                if (
                    "action" in block
                    and isinstance(block["action"], dict)
                    and block["action"].get("type") == "search"
                    and "query" in block["action"]
                ):
                    web_search_call["query"] = block["action"]["query"]

                for key in block:
                    if key not in ("type", "id"):
                        web_search_call[key] = block[key]

                web_search_result = {"type": "web_search_result", "id": block["id"]}
                if "index" in block:
                    # The result block takes the index right after the call.
                    web_search_result["index"] = block["index"] + 1
                yield cast(types.WebSearchCall, web_search_call)
                yield cast(types.WebSearchResult, web_search_result)

            elif block_type == "code_interpreter_call":
                code_interpreter_call = {
                    "type": "code_interpreter_call",
                    "id": block["id"],
                }
                if "code" in block:
                    code_interpreter_call["code"] = block["code"]
                if "container_id" in block:
                    code_interpreter_call["container_id"] = block["container_id"]
                if "index" in block:
                    code_interpreter_call["index"] = block["index"]

                code_interpreter_result = {
                    "type": "code_interpreter_result",
                    "id": block["id"],
                }
                if "outputs" in block:
                    code_interpreter_result["outputs"] = block["outputs"]
                    # ``outputs`` may be present with a ``None`` value; treat
                    # that as "no outputs" instead of failing on iteration.
                    for output in block["outputs"] or []:
                        if (
                            isinstance(output, dict)
                            and (output_type := output.get("type"))
                            and output_type == "logs"
                        ):
                            if "output" not in code_interpreter_result:
                                code_interpreter_result["output"] = []
                            code_interpreter_result["output"].append(
                                {
                                    "type": "code_interpreter_output",
                                    "stdout": output.get("logs", ""),
                                }
                            )

                if "status" in block:
                    code_interpreter_result["status"] = block["status"]
                if "index" in block:
                    # The result block takes the index right after the call.
                    code_interpreter_result["index"] = block["index"] + 1

                yield cast(types.CodeInterpreterCall, code_interpreter_call)
                yield cast(types.CodeInterpreterResult, code_interpreter_result)

            else:
                # Anything unrecognised is wrapped; its ``index`` is hoisted
                # from the wrapped value onto the wrapper.
                new_block = {"type": "non_standard", "value": block}
                if "index" in new_block["value"]:
                    new_block["index"] = new_block["value"].pop("index")
                yield cast(types.NonStandardContentBlock, new_block)

    return list(_iter_blocks())
def _convert_annotation_from_v1(annotation: types.Annotation) -> dict[str, Any]:
    """Convert a v1 annotation back into a Responses API annotation.

    A ``citation`` becomes a ``url_citation`` when it has a ``url`` and a
    ``file_citation`` otherwise, with the entries of ``extras`` merged into the
    top level. A ``non_standard_annotation`` yields its wrapped ``value``
    object. Any other annotation is returned as a plain ``dict`` copy.
    """
    kind = annotation["type"]

    if kind == "non_standard_annotation":
        return annotation["value"]
    if kind != "citation":
        return dict(annotation)

    converted: dict[str, Any] = {
        key: annotation[key]
        for key in ("end_index", "start_index")
        if key in annotation
    }

    has_title = "title" in annotation
    if "url" in annotation:
        # URL citation
        if has_title:
            converted["title"] = annotation["title"]
        converted.update(type="url_citation", url=annotation["url"])
    else:
        # Document citation
        converted["type"] = "file_citation"
        if has_title:
            converted["filename"] = annotation["title"]

    extras = annotation.get("extras")
    if extras:
        converted.update(extras)

    return converted
def _implode_reasoning_blocks(blocks: list[dict[str, Any]]) -> Iterable[dict[str, Any]]:
    """Merge exploded v1 reasoning blocks back into Responses reasoning items.

    Non-reasoning blocks, and reasoning blocks that already have a ``summary``,
    are yielded as shallow copies. A reasoning block with neither ``reasoning``
    nor ``summary`` becomes an item with an empty ``summary``. A run of
    consecutive reasoning blocks that carry ``reasoning`` text is merged into a
    single item whose ``summary`` holds one ``summary_text`` entry per block;
    the remaining fields come from the first block of the run. In both
    conversions the entries of ``extras`` are merged into the top level.

    A run ends when the next block has an ``id`` that differs from the first
    block's ``id`` (both being present), so adjacent reasoning items are not
    collapsed into one.

    Args:
        blocks: Content blocks to process. The input dicts are not modified.

    Yields:
        The blocks with reasoning in Responses format.
    """
    i = 0
    n = len(blocks)

    while i < n:
        block = blocks[i]

        # Skip non-reasoning blocks or blocks already in Responses format
        if block.get("type") != "reasoning" or "summary" in block:
            yield dict(block)
            i += 1
            continue

        if "reasoning" not in block:
            # {"type": "reasoning", "id": "rs_..."}
            oai_format = {**block, "summary": []}
            if "extras" in oai_format:
                oai_format.update(oai_format.pop("extras") or {})
            # Popping and re-inserting moves these keys to the end of the dict;
            # the values are unchanged.
            oai_format["type"] = oai_format.pop("type", "reasoning")
            if "encrypted_content" in oai_format:
                oai_format["encrypted_content"] = oai_format.pop("encrypted_content")
            yield oai_format
            i += 1
            continue

        summary: list[dict[str, str]] = [
            {"type": "summary_text", "text": block.get("reasoning", "")}
        ]
        # 'common' is every field except the exploded 'reasoning'
        common = {k: v for k, v in block.items() if k != "reasoning"}
        if "extras" in common:
            common.update(common.pop("extras") or {})

        first_id = block.get("id")
        i += 1
        while i < n:
            next_ = blocks[i]
            if next_.get("type") != "reasoning" or "reasoning" not in next_:
                break
            next_id = next_.get("id")
            if first_id is not None and next_id is not None and next_id != first_id:
                # Belongs to a different reasoning item; it starts its own run.
                break
            summary.append({"type": "summary_text", "text": next_.get("reasoning", "")})
            i += 1

        merged = dict(common)
        merged["summary"] = summary
        # Re-inserted so that "type" ends up as the last key.
        merged["type"] = merged.pop("type", "reasoning")
        yield merged
def _consolidate_calls(
    items: Iterable[dict[str, Any]],
    call_name: Literal["web_search_call", "code_interpreter_call"],
    result_name: Literal["web_search_result", "code_interpreter_result"],
) -> Iterator[dict[str, Any]]:
    """Collapse adjacent call/result block pairs into single call items.

    Walks through *items* and, whenever it meets the pair

        {"type": "web_search_call", "id": X, ...}
        {"type": "web_search_result", "id": X}

    merges them into

        {"id": X,
         "action": ...,
         "status": ...,
         "type": "web_search_call"}

    keeping every other element untouched. For ``code_interpreter_call`` the
    merged item takes ``code`` and ``container_id`` from the call and
    ``outputs`` and ``status`` from the result. Optional keys are copied only
    when present.

    Args:
        items: The blocks to scan.
        call_name: The block type that opens a pair.
        result_name: The block type that closes a pair.

    Yields:
        The blocks, with each matching pair replaced by one merged item.
    """
    iterator = iter(items)  # make sure we have a true iterator

    current: Optional[dict[str, Any]] = next(iterator, None)
    while current is not None:
        # Only a call can start a pair worth collapsing
        if current.get("type") != call_name:
            yield current
            current = next(iterator, None)
            continue

        nxt = next(iterator, None)  # look ahead one element
        if nxt is None:
            # No "result" follows - just yield the call back
            yield current
            return

        # If this really is the matching "result" - collapse
        if nxt.get("type") == result_name and nxt.get("id") == current.get("id"):
            collapsed = {"id": current["id"]}
            if call_name == "web_search_call":
                for key in ("action", "status"):
                    if key in current:
                        collapsed[key] = current[key]
            elif call_name == "code_interpreter_call":
                for key in ("code", "container_id"):
                    if key in current:
                        collapsed[key] = current[key]
                for key in ("outputs", "status"):
                    if key in nxt:
                        collapsed[key] = nxt[key]
            collapsed["type"] = call_name
            yield collapsed
            current = next(iterator, None)
        else:
            # Not a matching pair - emit the call and re-examine the look-ahead
            # element, since it may itself be a call that opens the next pair.
            yield current
            current = nxt
def _convert_from_v1_to_responses(
    content: list[types.ContentBlock], tool_calls: list[types.ToolCall]
) -> list[dict[str, Any]]:
    """Convert v1 content blocks into Responses API content.

    Text annotations are converted back, ``tool_call`` blocks become
    ``function_call`` items, generated images (``image`` blocks with base64 data
    and an ``ig_`` ID) become ``image_generation_call`` items, and
    ``non_standard`` blocks are unwrapped. Afterwards exploded reasoning blocks
    are merged and web-search / code-interpreter call/result pairs are
    consolidated.

    Args:
        content: The v1 content blocks.
        tool_calls: The message's tool calls, used to fill in a
            ``function_call`` item's ``name`` and ``arguments`` when the block
            itself does not provide them.

    Returns:
        The Responses-format content.
    """
    new_content: list = []
    for block in content:
        if block["type"] == "text" and "annotations" in block:
            # Need a copy because we're changing the annotations list
            new_block = dict(block)
            new_block["annotations"] = [
                _convert_annotation_from_v1(a) for a in block["annotations"]
            ]
            new_content.append(new_block)
        elif block["type"] == "tool_call":
            new_block = {"type": "function_call", "call_id": block["id"]}
            if "extras" in block and "item_id" in block["extras"]:
                new_block["id"] = block["extras"]["item_id"]
            if "name" in block:
                new_block["name"] = block["name"]
            if "extras" in block and "arguments" in block["extras"]:
                new_block["arguments"] = block["extras"]["arguments"]
            # Fill in whatever is still missing from the matching tool call.
            # The check is made against ``new_block`` so that arguments taken
            # from ``extras`` above are kept rather than overwritten.
            if "name" not in new_block or "arguments" not in new_block:
                matching_tool_call = next(
                    (call for call in tool_calls if call["id"] == block["id"]),
                    None,
                )
                if matching_tool_call is not None:
                    if "name" not in new_block:
                        new_block["name"] = matching_tool_call["name"]
                    if "arguments" not in new_block:
                        new_block["arguments"] = json.dumps(
                            matching_tool_call["args"], ensure_ascii=False
                        )
            new_content.append(new_block)
        elif (
            is_data_content_block(cast(dict, block))
            and block["type"] == "image"
            and "base64" in block
            and isinstance(block.get("id"), str)
            and block["id"].startswith("ig_")
        ):
            new_block = {"type": "image_generation_call", "result": block["base64"]}
            for extra_key in ("id", "status"):
                if extra_key in block:
                    new_block[extra_key] = block[extra_key]  # type: ignore[typeddict-item]
            new_content.append(new_block)
        elif block["type"] == "non_standard" and "value" in block:
            new_content.append(block["value"])
        else:
            new_content.append(block)

    new_content = list(_implode_reasoning_blocks(new_content))
    new_content = list(
        _consolidate_calls(new_content, "web_search_call", "web_search_result")
    )
    new_content = list(
        _consolidate_calls(
            new_content, "code_interpreter_call", "code_interpreter_result"
        )
    )

    return new_content