openai[patch]: refactor handling of Responses API (#31587)

ccurme
2025-06-16 14:01:39 -04:00
committed by GitHub
parent 532e6455e9
commit b9357d456e
10 changed files with 1423 additions and 209 deletions


@@ -106,6 +106,10 @@ from langchain_openai.chat_models._client_utils import (
_get_default_async_httpx_client,
_get_default_httpx_client,
)
from langchain_openai.chat_models._compat import (
_convert_from_v03_ai_message,
_convert_to_v03_ai_message,
)
if TYPE_CHECKING:
from openai.types.responses import Response
@@ -116,8 +120,6 @@ logger = logging.getLogger(__name__)
# https://www.python-httpx.org/advanced/ssl/#configuring-client-instances
global_ssl_context = ssl.create_default_context(cafile=certifi.where())
_FUNCTION_CALL_IDS_MAP_KEY = "__openai_function_call_ids__"
WellKnownTools = (
"file_search",
"web_search_preview",
@@ -797,15 +799,27 @@ class BaseChatOpenAI(BaseChatModel):
with context_manager as response:
is_first_chunk = True
current_index = -1
current_output_index = -1
current_sub_index = -1
has_reasoning = False
for chunk in response:
metadata = headers if is_first_chunk else {}
if generation_chunk := _convert_responses_chunk_to_generation_chunk(
(
current_index,
current_output_index,
current_sub_index,
generation_chunk,
) = _convert_responses_chunk_to_generation_chunk(
chunk,
current_index,
current_output_index,
current_sub_index,
schema=original_schema_obj,
metadata=metadata,
has_reasoning=has_reasoning,
):
)
if generation_chunk:
if run_manager:
run_manager.on_llm_new_token(
generation_chunk.text, chunk=generation_chunk
@@ -839,15 +853,27 @@ class BaseChatOpenAI(BaseChatModel):
async with context_manager as response:
is_first_chunk = True
current_index = -1
current_output_index = -1
current_sub_index = -1
has_reasoning = False
async for chunk in response:
metadata = headers if is_first_chunk else {}
if generation_chunk := _convert_responses_chunk_to_generation_chunk(
(
current_index,
current_output_index,
current_sub_index,
generation_chunk,
) = _convert_responses_chunk_to_generation_chunk(
chunk,
current_index,
current_output_index,
current_sub_index,
schema=original_schema_obj,
metadata=metadata,
has_reasoning=has_reasoning,
):
)
if generation_chunk:
if run_manager:
await run_manager.on_llm_new_token(
generation_chunk.text, chunk=generation_chunk
@@ -3209,27 +3235,26 @@ def _make_computer_call_output_from_message(message: ToolMessage) -> dict:
return computer_call_output
def _pop_summary_index_from_reasoning(reasoning: dict) -> dict:
"""When streaming, langchain-core uses the ``index`` key to aggregate reasoning
def _pop_index_and_sub_index(block: dict) -> dict:
"""When streaming, langchain-core uses the ``index`` key to aggregate
text blocks. OpenAI API does not support this key, so we need to remove it.
N.B. OpenAI also does not appear to support the ``summary_index`` key when passed
back in.
"""
new_reasoning = reasoning.copy()
if "summary" in reasoning and isinstance(reasoning["summary"], list):
new_block = {k: v for k, v in block.items() if k != "index"}
if "summary" in new_block and isinstance(new_block["summary"], list):
new_summary = []
for block in reasoning["summary"]:
new_block = {k: v for k, v in block.items() if k != "index"}
new_summary.append(new_block)
new_reasoning["summary"] = new_summary
return new_reasoning
for sub_block in new_block["summary"]:
new_sub_block = {k: v for k, v in sub_block.items() if k != "index"}
new_summary.append(new_sub_block)
new_block["summary"] = new_summary
return new_block
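For illustration, a minimal standalone sketch (not part of this diff) of what _pop_index_and_sub_index does to a streamed reasoning block before it is sent back to the Responses API; the sample block and its field values are assumed:

# Hypothetical reasoning block as aggregated by langchain-core during streaming;
# the ``index`` keys exist only for client-side chunk aggregation.
streamed_block = {
    "type": "reasoning",
    "id": "rs_123",  # assumed example ID
    "index": 2,
    "summary": [{"type": "summary_text", "text": "Weighing options.", "index": 0}],
}

# Equivalent logic: drop the top-level ``index`` and any nested summary ``index``
# keys, since the Responses API does not accept them on input.
cleaned = {k: v for k, v in streamed_block.items() if k != "index"}
if isinstance(cleaned.get("summary"), list):
    cleaned["summary"] = [
        {k: v for k, v in sub.items() if k != "index"} for sub in cleaned["summary"]
    ]
assert cleaned == {
    "type": "reasoning",
    "id": "rs_123",
    "summary": [{"type": "summary_text", "text": "Weighing options."}],
}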
def _construct_responses_api_input(messages: Sequence[BaseMessage]) -> list:
"""Construct the input for the OpenAI Responses API."""
input_ = []
for lc_msg in messages:
if isinstance(lc_msg, AIMessage):
lc_msg = _convert_from_v03_ai_message(lc_msg)
msg = _convert_message_to_dict(lc_msg)
# "name" parameter unsupported
if "name" in msg:
@@ -3251,97 +3276,85 @@ def _construct_responses_api_input(messages: Sequence[BaseMessage]) -> list:
}
input_.append(function_call_output)
elif msg["role"] == "assistant":
# Reasoning items
reasoning_items = []
if reasoning := lc_msg.additional_kwargs.get("reasoning"):
reasoning_items.append(_pop_summary_index_from_reasoning(reasoning))
input_.extend(reasoning_items)
# Function calls
function_calls = []
if tool_calls := msg.pop("tool_calls", None):
# TODO: should you be able to preserve the function call object id on
# the langchain tool calls themselves?
function_call_ids = lc_msg.additional_kwargs.get(
_FUNCTION_CALL_IDS_MAP_KEY
)
for tool_call in tool_calls:
function_call = {
"type": "function_call",
"name": tool_call["function"]["name"],
"arguments": tool_call["function"]["arguments"],
"call_id": tool_call["id"],
}
if function_call_ids is not None and (
_id := function_call_ids.get(tool_call["id"])
):
function_call["id"] = _id
function_calls.append(function_call)
# Built-in tool calls
computer_calls = []
code_interpreter_calls = []
mcp_calls = []
image_generation_calls = []
tool_outputs = lc_msg.additional_kwargs.get("tool_outputs", [])
for tool_output in tool_outputs:
if tool_output.get("type") == "computer_call":
computer_calls.append(tool_output)
elif tool_output.get("type") == "code_interpreter_call":
code_interpreter_calls.append(tool_output)
elif tool_output.get("type") == "mcp_call":
mcp_calls.append(tool_output)
elif tool_output.get("type") == "image_generation_call":
image_generation_calls.append(tool_output)
else:
pass
input_.extend(code_interpreter_calls)
input_.extend(mcp_calls)
# A previous image generation call can be referenced by ID
input_.extend(
[
{"type": "image_generation_call", "id": image_generation_call["id"]}
for image_generation_call in image_generation_calls
]
)
msg["content"] = msg.get("content") or []
if lc_msg.additional_kwargs.get("refusal"):
if isinstance(msg["content"], str):
msg["content"] = [
{
"type": "output_text",
"text": msg["content"],
"annotations": [],
}
]
msg["content"] = msg["content"] + [
{"type": "refusal", "refusal": lc_msg.additional_kwargs["refusal"]}
]
if isinstance(msg["content"], list):
new_blocks = []
if isinstance(msg.get("content"), list):
for block in msg["content"]:
# chat api: {"type": "text", "text": "..."}
# responses api: {"type": "output_text", "text": "...", "annotations": [...]} # noqa: E501
if block["type"] == "text":
new_blocks.append(
{
"type": "output_text",
"text": block["text"],
"annotations": block.get("annotations") or [],
}
)
elif block["type"] in ("output_text", "refusal"):
new_blocks.append(block)
else:
pass
msg["content"] = new_blocks
if msg["content"]:
if lc_msg.id and lc_msg.id.startswith("msg_"):
msg["id"] = lc_msg.id
input_.append(msg)
input_.extend(function_calls)
input_.extend(computer_calls)
if isinstance(block, dict) and (block_type := block.get("type")):
# Aggregate content blocks for a single message
if block_type in ("text", "output_text", "refusal"):
msg_id = block.get("id")
if block_type in ("text", "output_text"):
new_block = {
"type": "output_text",
"text": block["text"],
"annotations": block.get("annotations") or [],
}
elif block_type == "refusal":
new_block = {
"type": "refusal",
"refusal": block["refusal"],
}
for item in input_:
if (item_id := item.get("id")) and item_id == msg_id:
# If existing block with this ID, append to it
if "content" not in item:
item["content"] = []
item["content"].append(new_block)
break
else:
# If no block with this ID, create a new one
input_.append(
{
"type": "message",
"content": [new_block],
"role": "assistant",
"id": msg_id,
}
)
elif block_type in (
"reasoning",
"web_search_call",
"file_search_call",
"function_call",
"computer_call",
"code_interpreter_call",
"mcp_call",
"mcp_list_tools",
"mcp_approval_request",
):
input_.append(_pop_index_and_sub_index(block))
elif block_type == "image_generation_call":
# A previous image generation call can be referenced by ID
input_.append(
{"type": "image_generation_call", "id": block["id"]}
)
else:
pass
elif isinstance(msg.get("content"), str):
input_.append(
{
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": msg["content"]}],
}
)
# Add function calls from tool calls if not already present
if tool_calls := msg.pop("tool_calls", None):
content_call_ids = {
block["call_id"]
for block in input_
if block.get("type") == "function_call" and "call_id" in block
}
for tool_call in tool_calls:
if tool_call["id"] not in content_call_ids:
function_call = {
"type": "function_call",
"name": tool_call["function"]["name"],
"arguments": tool_call["function"]["arguments"],
"call_id": tool_call["id"],
}
input_.append(function_call)
elif msg["role"] in ("user", "system", "developer"):
if isinstance(msg["content"], list):
new_blocks = []
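A standalone sketch of the tool-call dedup step in the assistant branch above (assumed message values, not part of this diff): tool calls already represented as function_call content blocks are skipped, and only the missing ones are appended to the Responses API input:

# Hypothetical input already built from the AIMessage content (assumed values).
input_ = [
    {"type": "function_call", "name": "get_weather",
     "arguments": '{"city": "Paris"}', "call_id": "call_1"},
]
tool_calls = [
    {"id": "call_1", "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'}},
    {"id": "call_2", "function": {"name": "get_time", "arguments": '{"tz": "UTC"}'}},
]

# Same idea as the diff: collect call_ids already present, then append the rest.
content_call_ids = {
    block["call_id"]
    for block in input_
    if block.get("type") == "function_call" and "call_id" in block
}
for tool_call in tool_calls:
    if tool_call["id"] not in content_call_ids:
        input_.append({
            "type": "function_call",
            "name": tool_call["function"]["name"],
            "arguments": tool_call["function"]["arguments"],
            "call_id": tool_call["id"],
        })
# Only call_2 is appended; call_1 was already represented in content.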
@@ -3396,6 +3409,8 @@ def _construct_lc_result_from_responses_api(
if k
in (
"created_at",
# backwards compatibility: keep response ID in response_metadata as well as
# the top-level id
"id",
"incomplete_details",
"metadata",
@@ -3419,7 +3434,6 @@ def _construct_lc_result_from_responses_api(
tool_calls = []
invalid_tool_calls = []
additional_kwargs: dict = {}
msg_id = None
for output in response.output:
if output.type == "message":
for content in output.content:
@@ -3431,14 +3445,17 @@ def _construct_lc_result_from_responses_api(
annotation.model_dump()
for annotation in content.annotations
],
"id": output.id,
}
content_blocks.append(block)
if hasattr(content, "parsed"):
additional_kwargs["parsed"] = content.parsed
if content.type == "refusal":
additional_kwargs["refusal"] = content.refusal
msg_id = output.id
content_blocks.append(
{"type": "refusal", "refusal": content.refusal, "id": output.id}
)
elif output.type == "function_call":
content_blocks.append(output.model_dump(exclude_none=True, mode="json"))
try:
args = json.loads(output.arguments, strict=False)
error = None
@@ -3462,19 +3479,19 @@ def _construct_lc_result_from_responses_api(
"error": error,
}
invalid_tool_calls.append(tool_call)
if _FUNCTION_CALL_IDS_MAP_KEY not in additional_kwargs:
additional_kwargs[_FUNCTION_CALL_IDS_MAP_KEY] = {}
additional_kwargs[_FUNCTION_CALL_IDS_MAP_KEY][output.call_id] = output.id
elif output.type == "reasoning":
additional_kwargs["reasoning"] = output.model_dump(
exclude_none=True, mode="json"
)
else:
tool_output = output.model_dump(exclude_none=True, mode="json")
if "tool_outputs" in additional_kwargs:
additional_kwargs["tool_outputs"].append(tool_output)
else:
additional_kwargs["tool_outputs"] = [tool_output]
elif output.type in (
"reasoning",
"web_search_call",
"file_search_call",
"computer_call",
"code_interpreter_call",
"mcp_call",
"mcp_list_tools",
"mcp_approval_request",
"image_generation_call",
):
content_blocks.append(output.model_dump(exclude_none=True, mode="json"))
# Workaround for parsing structured output in the streaming case.
# from openai import OpenAI
# from pydantic import BaseModel
@@ -3510,22 +3527,70 @@ def _construct_lc_result_from_responses_api(
pass
message = AIMessage(
content=content_blocks,
id=msg_id,
id=response.id,
usage_metadata=usage_metadata,
response_metadata=response_metadata,
additional_kwargs=additional_kwargs,
tool_calls=tool_calls,
invalid_tool_calls=invalid_tool_calls,
)
message = _convert_to_v03_ai_message(message)
return ChatResult(generations=[ChatGeneration(message=message)])
def _convert_responses_chunk_to_generation_chunk(
chunk: Any,
current_index: int, # index in content
current_output_index: int, # index in Response output
current_sub_index: int, # index of content block in output item
schema: Optional[type[_BM]] = None,
metadata: Optional[dict] = None,
has_reasoning: bool = False,
) -> Optional[ChatGenerationChunk]:
) -> tuple[int, int, int, Optional[ChatGenerationChunk]]:
def _advance(output_idx: int, sub_idx: Optional[int] = None) -> None:
"""Advance indexes tracked during streaming.
Example: we stream a response item of the form:
.. code-block:: python
{
"type": "message", # output_index 0
"role": "assistant",
"id": "msg_123",
"content": [
{"type": "output_text", "text": "foo"}, # sub_index 0
{"type": "output_text", "text": "bar"}, # sub_index 1
],
}
This is a single item with a shared ``output_index`` and two sub-indexes, one
for each content block.
This will be processed into an AIMessage with two text blocks:
.. code-block:: python
AIMessage(
[
{"type": "text", "text": "foo", "id": "msg_123"}, # index 0
{"type": "text", "text": "bar", "id": "msg_123"}, # index 1
]
)
This function just identifies updates in output or sub-indexes and increments
the current index accordingly.
"""
nonlocal current_index, current_output_index, current_sub_index
if sub_idx is None:
if current_output_index != output_idx:
current_index += 1
else:
if (current_output_index != output_idx) or (current_sub_index != sub_idx):
current_index += 1
current_sub_index = sub_idx
current_output_index = output_idx
content = []
tool_call_chunks: list = []
additional_kwargs: dict = {}
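To make the _advance bookkeeping above concrete, here is a simplified standalone re-implementation (illustration only, with an assumed event order): a new content index is allocated whenever the output index changes, or when the sub-index changes within the same output item.

# Simplified re-implementation of the advancing logic, for illustration only.
def advance(state, output_idx, sub_idx=None):
    index, out_idx, s_idx = state
    if sub_idx is None:
        if out_idx != output_idx:
            index += 1
    else:
        if out_idx != output_idx or s_idx != sub_idx:
            index += 1
        s_idx = sub_idx
    return index, output_idx, s_idx

state = (-1, -1, -1)
state = advance(state, 0, 0)  # first text block in output item 0 -> index 0
state = advance(state, 0, 0)  # further deltas for the same block -> still index 0
state = advance(state, 0, 1)  # second content block in item 0 -> index 1
state = advance(state, 1)     # next output item (e.g. a tool call) -> index 2
assert state[0] == 2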
@@ -3536,16 +3601,18 @@ def _convert_responses_chunk_to_generation_chunk(
usage_metadata = None
id = None
if chunk.type == "response.output_text.delta":
content.append(
{"type": "text", "text": chunk.delta, "index": chunk.content_index}
)
_advance(chunk.output_index, chunk.content_index)
content.append({"type": "text", "text": chunk.delta, "index": current_index})
elif chunk.type == "response.output_text.annotation.added":
_advance(chunk.output_index, chunk.content_index)
if isinstance(chunk.annotation, dict):
# Appears to be a breaking change in openai==1.82.0
annotation = chunk.annotation
else:
annotation = chunk.annotation.model_dump(exclude_none=True, mode="json")
content.append({"annotations": [annotation], "index": chunk.content_index})
content.append({"annotations": [annotation], "index": current_index})
elif chunk.type == "response.output_text.done":
content.append({"id": chunk.item_id, "index": current_index})
elif chunk.type == "response.created":
response_metadata["id"] = chunk.response.id
elif chunk.type == "response.completed":
@@ -3569,18 +3636,26 @@ def _convert_responses_chunk_to_generation_chunk(
chunk.type == "response.output_item.added"
and chunk.item.type == "function_call"
):
_advance(chunk.output_index)
tool_call_chunks.append(
{
"type": "tool_call_chunk",
"name": chunk.item.name,
"args": chunk.item.arguments,
"id": chunk.item.call_id,
"index": chunk.output_index,
"index": current_index,
}
)
content.append(
{
"type": "function_call",
"name": chunk.item.name,
"arguments": chunk.item.arguments,
"call_id": chunk.item.call_id,
"id": chunk.item.id,
"index": current_index,
}
)
additional_kwargs[_FUNCTION_CALL_IDS_MAP_KEY] = {
chunk.item.call_id: chunk.item.id
}
elif chunk.type == "response.output_item.done" and chunk.item.type in (
"web_search_call",
"file_search_call",
@@ -3591,55 +3666,70 @@ def _convert_responses_chunk_to_generation_chunk(
"mcp_approval_request",
"image_generation_call",
):
additional_kwargs["tool_outputs"] = [
chunk.item.model_dump(exclude_none=True, mode="json")
]
_advance(chunk.output_index)
tool_output = chunk.item.model_dump(exclude_none=True, mode="json")
tool_output["index"] = current_index
content.append(tool_output)
elif chunk.type == "response.function_call_arguments.delta":
_advance(chunk.output_index)
tool_call_chunks.append(
{
"type": "tool_call_chunk",
"args": chunk.delta,
"index": chunk.output_index,
}
{"type": "tool_call_chunk", "args": chunk.delta, "index": current_index}
)
content.append(
{"type": "function_call", "arguments": chunk.delta, "index": current_index}
)
elif chunk.type == "response.refusal.done":
additional_kwargs["refusal"] = chunk.refusal
content.append({"type": "refusal", "refusal": chunk.refusal})
elif chunk.type == "response.output_item.added" and chunk.item.type == "reasoning":
if not has_reasoning:
# Hack until breaking release: store first reasoning item ID.
additional_kwargs["reasoning"] = chunk.item.model_dump(
exclude_none=True, mode="json"
)
_advance(chunk.output_index)
reasoning = chunk.item.model_dump(exclude_none=True, mode="json")
reasoning["index"] = current_index
content.append(reasoning)
elif chunk.type == "response.reasoning_summary_part.added":
additional_kwargs["reasoning"] = {
# langchain-core uses the `index` key to aggregate text blocks.
"summary": [
{"index": chunk.summary_index, "type": "summary_text", "text": ""}
]
}
_advance(chunk.output_index)
content.append(
{
# langchain-core uses the `index` key to aggregate text blocks.
"summary": [
{"index": chunk.summary_index, "type": "summary_text", "text": ""}
],
"index": current_index,
}
)
elif chunk.type == "response.image_generation_call.partial_image":
# Partial images are not supported yet.
pass
elif chunk.type == "response.reasoning_summary_text.delta":
additional_kwargs["reasoning"] = {
"summary": [
{
"index": chunk.summary_index,
"type": "summary_text",
"text": chunk.delta,
}
]
}
else:
return None
return ChatGenerationChunk(
message=AIMessageChunk(
content=content, # type: ignore[arg-type]
tool_call_chunks=tool_call_chunks,
usage_metadata=usage_metadata,
response_metadata=response_metadata,
additional_kwargs=additional_kwargs,
id=id,
_advance(chunk.output_index)
content.append(
{
"summary": [
{
"index": chunk.summary_index,
"type": "summary_text",
"text": chunk.delta,
}
],
"index": current_index,
}
)
else:
return current_index, current_output_index, current_sub_index, None
message = AIMessageChunk(
content=content, # type: ignore[arg-type]
tool_call_chunks=tool_call_chunks,
usage_metadata=usage_metadata,
response_metadata=response_metadata,
additional_kwargs=additional_kwargs,
id=id,
)
message = cast(
AIMessageChunk, _convert_to_v03_ai_message(message, has_reasoning=has_reasoning)
)
return (
current_index,
current_output_index,
current_sub_index,
ChatGenerationChunk(message=message),
)
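Context for why the running ``index`` is threaded through every emitted block: as the docstrings above note, langchain-core uses the ``index`` key to aggregate blocks, so when generation chunks are concatenated, list-content blocks that share an ``index`` merge and deltas for the same block coalesce rather than duplicate. A rough sketch with assumed chunk contents:

from langchain_core.messages import AIMessageChunk

# Two streamed deltas for the same content block (index 0); values are assumed.
left = AIMessageChunk(content=[{"type": "text", "text": "foo", "index": 0}])
right = AIMessageChunk(content=[{"type": "text", "text": "bar", "index": 0}])

merged = left + right
# Blocks sharing an index merge, e.g. the text deltas coalesce to "foobar".
print(merged.content)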