openai[patch]: support Responses API (#30231)

Co-authored-by: Bagatur <baskaryan@gmail.com>
Author: ccurme
Date: 2025-03-12 12:25:46 -04:00
Committed by: GitHub
parent 49bdd3b6fe
commit cd1ea8e94d
12 changed files with 1933 additions and 74 deletions


@@ -12,9 +12,11 @@ import sys
import warnings
from functools import partial
from io import BytesIO
from json import JSONDecodeError
from math import ceil
from operator import itemgetter
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Callable,
@@ -89,6 +91,7 @@ from langchain_core.runnables import (
)
from langchain_core.runnables.config import run_in_executor
from langchain_core.tools import BaseTool
from langchain_core.tools.base import _stringify
from langchain_core.utils import get_pydantic_field_names
from langchain_core.utils.function_calling import (
convert_to_openai_function,
@@ -104,12 +107,17 @@ from pydantic import BaseModel, ConfigDict, Field, SecretStr, model_validator
from pydantic.v1 import BaseModel as BaseModelV1
from typing_extensions import Self
if TYPE_CHECKING:
from openai.types.responses import Response
logger = logging.getLogger(__name__)
# This SSL context is equivalent to the default `verify=True`.
# https://www.python-httpx.org/advanced/ssl/#configuring-client-instances
global_ssl_context = ssl.create_default_context(cafile=certifi.where())
_FUNCTION_CALL_IDS_MAP_KEY = "__openai_function_call_ids__"
def _convert_dict_to_message(_dict: Mapping[str, Any]) -> BaseMessage:
"""Convert a dictionary to a LangChain message.
@@ -528,6 +536,14 @@ class BaseChatOpenAI(BaseChatModel):
invocation.
"""
use_responses_api: Optional[bool] = None
"""Whether to use the Responses API instead of the Chat API.
If not specified, it will be inferred from the invocation params.
.. versionadded:: 0.3.9
"""
model_config = ConfigDict(populate_by_name=True)
@model_validator(mode="before")
@@ -654,7 +670,7 @@ class BaseChatOpenAI(BaseChatModel):
if output is None:
# Happens in streaming
continue
-            token_usage = output["token_usage"]
+            token_usage = output.get("token_usage")
if token_usage is not None:
for k, v in token_usage.items():
if v is None:
@@ -725,6 +741,50 @@ class BaseChatOpenAI(BaseChatModel):
)
return generation_chunk
def _stream_responses(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
kwargs["stream"] = True
payload = self._get_request_payload(messages, stop=stop, **kwargs)
context_manager = self.root_client.responses.create(**payload)
with context_manager as response:
for chunk in response:
if generation_chunk := _convert_responses_chunk_to_generation_chunk(
chunk
):
if run_manager:
run_manager.on_llm_new_token(
generation_chunk.text, chunk=generation_chunk
)
yield generation_chunk
async def _astream_responses(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
kwargs["stream"] = True
payload = self._get_request_payload(messages, stop=stop, **kwargs)
context_manager = await self.root_async_client.responses.create(**payload)
async with context_manager as response:
async for chunk in response:
if generation_chunk := _convert_responses_chunk_to_generation_chunk(
chunk
):
if run_manager:
await run_manager.on_llm_new_token(
generation_chunk.text, chunk=generation_chunk
)
yield generation_chunk
def _stream(
self,
messages: List[BaseMessage],
@@ -819,10 +879,19 @@ class BaseChatOpenAI(BaseChatModel):
raw_response = self.client.with_raw_response.create(**payload)
response = raw_response.parse()
generation_info = {"headers": dict(raw_response.headers)}
elif self._use_responses_api(payload):
response = self.root_client.responses.create(**payload)
return _construct_lc_result_from_responses_api(response)
else:
response = self.client.create(**payload)
return self._create_chat_result(response, generation_info)
def _use_responses_api(self, payload: dict) -> bool:
if isinstance(self.use_responses_api, bool):
return self.use_responses_api
else:
return _use_responses_api(payload)
def _get_request_payload(
self,
input_: LanguageModelInput,
@@ -834,11 +903,12 @@ class BaseChatOpenAI(BaseChatModel):
if stop is not None:
kwargs["stop"] = stop
-        return {
-            "messages": [_convert_message_to_dict(m) for m in messages],
-            **self._default_params,
-            **kwargs,
-        }
+        payload = {**self._default_params, **kwargs}
+        if self._use_responses_api(payload):
+            payload = _construct_responses_api_payload(messages, payload)
+        else:
+            payload["messages"] = [_convert_message_to_dict(m) for m in messages]
+        return payload
def _create_chat_result(
self,
@@ -877,6 +947,8 @@ class BaseChatOpenAI(BaseChatModel):
"model_name": response_dict.get("model", self.model_name),
"system_fingerprint": response_dict.get("system_fingerprint", ""),
}
if "id" in response_dict:
llm_output["id"] = response_dict["id"]
if isinstance(response, openai.BaseModel) and getattr(
response, "choices", None
@@ -989,6 +1061,9 @@ class BaseChatOpenAI(BaseChatModel):
raw_response = await self.async_client.with_raw_response.create(**payload)
response = raw_response.parse()
generation_info = {"headers": dict(raw_response.headers)}
elif self._use_responses_api(payload):
response = await self.root_async_client.responses.create(**payload)
return _construct_lc_result_from_responses_api(response)
else:
response = await self.async_client.create(**payload)
return await run_in_executor(
@@ -1258,33 +1333,38 @@ class BaseChatOpenAI(BaseChatModel):
        formatted_tools = [
            convert_to_openai_tool(tool, strict=strict) for tool in tools
        ]
+        tool_names = []
+        for tool in formatted_tools:
+            if "function" in tool:
+                tool_names.append(tool["function"]["name"])
+            elif "name" in tool:
+                tool_names.append(tool["name"])
+            else:
+                pass
        if tool_choice:
            if isinstance(tool_choice, str):
                # tool_choice is a tool/function name
-                if tool_choice not in ("auto", "none", "any", "required"):
+                if tool_choice in tool_names:
                    tool_choice = {
                        "type": "function",
                        "function": {"name": tool_choice},
                    }
+                elif tool_choice in (
+                    "file_search",
+                    "web_search_preview",
+                    "computer_use_preview",
+                ):
+                    tool_choice = {"type": tool_choice}
                # 'any' is not natively supported by OpenAI API.
                # We support 'any' since other models use this instead of 'required'.
-                if tool_choice == "any":
+                elif tool_choice == "any":
                    tool_choice = "required"
+                else:
+                    pass
            elif isinstance(tool_choice, bool):
                tool_choice = "required"
            elif isinstance(tool_choice, dict):
-                tool_names = [
-                    formatted_tool["function"]["name"]
-                    for formatted_tool in formatted_tools
-                ]
-                if not any(
-                    tool_name == tool_choice["function"]["name"]
-                    for tool_name in tool_names
-                ):
-                    raise ValueError(
-                        f"Tool choice {tool_choice} was specified, but the only "
-                        f"provided tools were {tool_names}."
-                    )
+                pass
            else:
                raise ValueError(
                    f"Unrecognized tool_choice type. Expected str, bool or dict. "
@@ -1562,6 +1642,8 @@ class ChatOpenAI(BaseChatOpenAI): # type: ignore[override]
stream_options: Dict
Configure streaming outputs, like whether to return token usage when
streaming (``{"include_usage": True}``).
use_responses_api: Optional[bool]
Whether to use the Responses API.
See full list of supported init args and their descriptions in the params section.
@@ -1805,6 +1887,79 @@ class ChatOpenAI(BaseChatOpenAI): # type: ignore[override]
See ``ChatOpenAI.bind_tools()`` method for more.
.. dropdown:: Built-in tools
.. versionadded:: 0.3.9
You can access `built-in tools <https://platform.openai.com/docs/guides/tools?api-mode=responses>`_
supported by the OpenAI Responses API. See LangChain
`docs <https://python.langchain.com/docs/integrations/chat/openai/>`_ for more
detail.
.. code-block:: python
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
tool = {"type": "web_search_preview"}
llm_with_tools = llm.bind_tools([tool])
response = llm_with_tools.invoke("What was a positive news story from today?")
response.content
.. code-block:: python
[
{
"type": "text",
"text": "Today, a heartwarming story emerged from ...",
"annotations": [
{
"end_index": 778,
"start_index": 682,
"title": "Title of story",
"type": "url_citation",
"url": "<url of story>",
}
],
}
]
.. dropdown:: Managing conversation state
.. versionadded:: 0.3.9
OpenAI's Responses API supports management of
`conversation state <https://platform.openai.com/docs/guides/conversation-state?api-mode=responses>`_.
Passing in response IDs from previous messages will continue a conversational
thread. See LangChain
`docs <https://python.langchain.com/docs/integrations/chat/openai/>`_ for more
detail.
.. code-block:: python
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", use_responses_api=True)
response = llm.invoke("Hi, I'm Bob.")
response.text()
.. code-block:: python
"Hi Bob! How can I assist you today?"
.. code-block:: python
second_response = llm.invoke(
"What is my name?", previous_response_id=response.response_metadata["id"]
)
second_response.text()
.. code-block:: python
"Your name is Bob. How can I help you today, Bob?"
.. dropdown:: Structured output
.. code-block:: python
@@ -2082,27 +2237,34 @@ class ChatOpenAI(BaseChatOpenAI): # type: ignore[override]
self, *args: Any, stream_usage: Optional[bool] = None, **kwargs: Any
) -> Iterator[ChatGenerationChunk]:
"""Set default stream_options."""
-        stream_usage = self._should_stream_usage(stream_usage, **kwargs)
-        # Note: stream_options is not a valid parameter for Azure OpenAI.
-        # To support users proxying Azure through ChatOpenAI, here we only specify
-        # stream_options if include_usage is set to True.
-        # See https://learn.microsoft.com/en-us/azure/ai-services/openai/whats-new
-        # for release notes.
-        if stream_usage:
-            kwargs["stream_options"] = {"include_usage": stream_usage}
-        return super()._stream(*args, **kwargs)
+        if self._use_responses_api(kwargs):
+            return super()._stream_responses(*args, **kwargs)
+        else:
+            stream_usage = self._should_stream_usage(stream_usage, **kwargs)
+            # Note: stream_options is not a valid parameter for Azure OpenAI.
+            # To support users proxying Azure through ChatOpenAI, here we only specify
+            # stream_options if include_usage is set to True.
+            # See https://learn.microsoft.com/en-us/azure/ai-services/openai/whats-new
+            # for release notes.
+            if stream_usage:
+                kwargs["stream_options"] = {"include_usage": stream_usage}
+            return super()._stream(*args, **kwargs)
async def _astream(
self, *args: Any, stream_usage: Optional[bool] = None, **kwargs: Any
) -> AsyncIterator[ChatGenerationChunk]:
"""Set default stream_options."""
-        stream_usage = self._should_stream_usage(stream_usage, **kwargs)
-        if stream_usage:
-            kwargs["stream_options"] = {"include_usage": stream_usage}
-        async for chunk in super()._astream(*args, **kwargs):
-            yield chunk
+        if self._use_responses_api(kwargs):
+            async for chunk in super()._astream_responses(*args, **kwargs):
+                yield chunk
+        else:
+            stream_usage = self._should_stream_usage(stream_usage, **kwargs)
+            if stream_usage:
+                kwargs["stream_options"] = {"include_usage": stream_usage}
+            async for chunk in super()._astream(*args, **kwargs):
+                yield chunk
def with_structured_output(
self,
@@ -2617,3 +2779,355 @@ def _create_usage_metadata(oai_token_usage: dict) -> UsageMetadata:
**{k: v for k, v in output_token_details.items() if v is not None}
),
)
def _create_usage_metadata_responses(oai_token_usage: dict) -> UsageMetadata:
input_tokens = oai_token_usage.get("input_tokens", 0)
output_tokens = oai_token_usage.get("output_tokens", 0)
total_tokens = oai_token_usage.get("total_tokens", input_tokens + output_tokens)
output_token_details: dict = {
"audio": (oai_token_usage.get("completion_tokens_details") or {}).get(
"audio_tokens"
),
"reasoning": (oai_token_usage.get("output_token_details") or {}).get(
"reasoning_tokens"
),
}
return UsageMetadata(
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=total_tokens,
output_token_details=OutputTokenDetails(
**{k: v for k, v in output_token_details.items() if v is not None}
),
)
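# Illustrative (not part of this commit): a Responses-style usage dict and the
# fields it maps to under the helper above; values are made up.
#
#     _create_usage_metadata_responses(
#         {
#             "input_tokens": 12,
#             "output_tokens": 40,
#             "total_tokens": 52,
#             "output_token_details": {"reasoning_tokens": 8},
#         }
#     )
#     # -> input_tokens=12, output_tokens=40, total_tokens=52,
#     #    output_token_details={"reasoning": 8}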
def _is_builtin_tool(tool: dict) -> bool:
return "type" in tool and tool["type"] != "function"
def _use_responses_api(payload: dict) -> bool:
uses_builtin_tools = "tools" in payload and any(
_is_builtin_tool(tool) for tool in payload["tools"]
)
responses_only_args = {"previous_response_id", "text", "truncation", "include"}
return bool(uses_builtin_tools or responses_only_args.intersection(payload))
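# Illustrative (not part of this commit): how the heuristic above routes payloads.
# Built-in tools or Responses-only kwargs select the Responses API; plain
# function tools stay on Chat Completions.
#
#     _use_responses_api({"previous_response_id": "resp_123"})         # True
#     _use_responses_api({"tools": [{"type": "web_search_preview"}]})  # True
#     _use_responses_api(
#         {"tools": [{"type": "function", "function": {"name": "f"}}]}
#     )                                                                 # False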
def _construct_responses_api_payload(
messages: Sequence[BaseMessage], payload: dict
) -> dict:
payload["input"] = _construct_responses_api_input(messages)
if tools := payload.pop("tools", None):
new_tools: list = []
for tool in tools:
# chat api: {"type": "function", "function": {"name": "...", "description": "...", "parameters": {...}, "strict": ...}} # noqa: E501
# responses api: {"type": "function", "name": "...", "description": "...", "parameters": {...}, "strict": ...} # noqa: E501
if tool["type"] == "function" and "function" in tool:
new_tools.append({"type": "function", **tool["function"]})
else:
new_tools.append(tool)
payload["tools"] = new_tools
if tool_choice := payload.pop("tool_choice", None):
# chat api: {"type": "function", "function": {"name": "..."}}
# responses api: {"type": "function", "name": "..."}
if tool_choice["type"] == "function" and "function" in tool_choice:
payload["tool_choice"] = {"type": "function", **tool_choice["function"]}
else:
payload["tool_choice"] = tool_choice
if response_format := payload.pop("response_format", None):
if payload.get("text"):
text = payload["text"]
raise ValueError(
"Can specify at most one of 'response_format' or 'text', received both:"
f"\n{response_format=}\n{text=}"
)
# chat api: {"type": "json_schema, "json_schema": {"schema": {...}, "name": "...", "description": "...", "strict": ...}} # noqa: E501
# responses api: {"type": "json_schema, "schema": {...}, "name": "...", "description": "...", "strict": ...} # noqa: E501
if response_format["type"] == "json_schema":
payload["text"] = {"type": "json_schema", **response_format["json_schema"]}
else:
payload["text"] = response_format
return payload
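# Illustrative (not part of this commit): the tool/tool_choice rewrites above on
# a hypothetical "get_weather" function tool.
#
#     Chat Completions shape in:
#         {"type": "function", "function": {"name": "get_weather", "parameters": {...}}}
#     Responses shape out:
#         {"type": "function", "name": "get_weather", "parameters": {...}}
#
#     tool_choice {"type": "function", "function": {"name": "get_weather"}}
#     becomes     {"type": "function", "name": "get_weather"}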
def _construct_responses_api_input(messages: Sequence[BaseMessage]) -> list:
input_ = []
for lc_msg in messages:
msg = _convert_message_to_dict(lc_msg)
if msg["role"] == "tool":
tool_output = msg["content"]
if not isinstance(tool_output, str):
tool_output = _stringify(tool_output)
function_call_output = {
"type": "function_call_output",
"output": tool_output,
"call_id": msg["tool_call_id"],
}
input_.append(function_call_output)
elif msg["role"] == "assistant":
function_calls = []
if tool_calls := msg.pop("tool_calls", None):
# TODO: should you be able to preserve the function call object id on
# the langchain tool calls themselves?
if not lc_msg.additional_kwargs.get(_FUNCTION_CALL_IDS_MAP_KEY):
raise ValueError("")
function_call_ids = lc_msg.additional_kwargs[_FUNCTION_CALL_IDS_MAP_KEY]
for tool_call in tool_calls:
function_call = {
"type": "function_call",
"name": tool_call["function"]["name"],
"arguments": tool_call["function"]["arguments"],
"call_id": tool_call["id"],
"id": function_call_ids[tool_call["id"]],
}
function_calls.append(function_call)
msg["content"] = msg.get("content") or []
if lc_msg.additional_kwargs.get("refusal"):
if isinstance(msg["content"], str):
msg["content"] = [
{
"type": "output_text",
"text": msg["content"],
"annotations": [],
}
]
msg["content"] = msg["content"] + [
{"type": "refusal", "refusal": lc_msg.additional_kwargs["refusal"]}
]
if isinstance(msg["content"], list):
new_blocks = []
for block in msg["content"]:
# chat api: {"type": "text", "text": "..."}
# responses api: {"type": "output_text", "text": "...", "annotations": [...]} # noqa: E501
if block["type"] == "text":
new_blocks.append(
{
"type": "output_text",
"text": block["text"],
"annotations": block.get("annotations") or [],
}
)
elif block["type"] in ("output_text", "refusal"):
new_blocks.append(block)
else:
pass
msg["content"] = new_blocks
if msg["content"]:
input_.append(msg)
input_.extend(function_calls)
elif msg["role"] == "user":
if isinstance(msg["content"], list):
new_blocks = []
for block in msg["content"]:
# chat api: {"type": "text", "text": "..."}
# responses api: {"type": "input_text", "text": "..."}
if block["type"] == "text":
new_blocks.append({"type": "input_text", "text": block["text"]})
# chat api: {"type": "image_url", "image_url": {"url": "...", "detail": "..."}} # noqa: E501
# responses api: {"type": "image_url", "image_url": "...", "detail": "...", "file_id": "..."} # noqa: E501
elif block["type"] == "image_url":
new_block = {
"type": "input_image",
"image_url": block["image_url"]["url"],
}
if block["image_url"].get("detail"):
new_block["detail"] = block["image_url"]["detail"]
new_blocks.append(new_block)
elif block["type"] in ("input_text", "input_image", "input_file"):
new_blocks.append(block)
else:
pass
msg["content"] = new_blocks
input_.append(msg)
else:
input_.append(msg)
return input_
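# Illustrative (not part of this commit): a user message with text and image
# blocks as converted by the function above; the URL is a placeholder.
#
#     In (Chat Completions style):
#         [{"type": "text", "text": "Describe this"},
#          {"type": "image_url", "image_url": {"url": "https://example.com/cat.png", "detail": "low"}}]
#     Out (Responses style):
#         [{"type": "input_text", "text": "Describe this"},
#          {"type": "input_image", "image_url": "https://example.com/cat.png", "detail": "low"}]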
def _construct_lc_result_from_responses_api(response: Response) -> ChatResult:
"""Construct ChatResponse from OpenAI Response API response."""
if response.error:
raise ValueError(response.error)
response_metadata = {
k: v
for k, v in response.model_dump(exclude_none=True, mode="json").items()
if k
in (
"created_at",
"id",
"incomplete_details",
"metadata",
"object",
"status",
"user",
"model",
)
}
# for compatibility with chat completion calls.
response_metadata["model_name"] = response_metadata.get("model")
if response.usage:
usage_metadata = _create_usage_metadata_responses(response.usage.model_dump())
else:
usage_metadata = None
content_blocks: list = []
tool_calls = []
invalid_tool_calls = []
additional_kwargs: dict = {}
msg_id = None
for output in response.output:
if output.type == "message":
for content in output.content:
if content.type == "output_text":
block = {
"type": "text",
"text": content.text,
"annotations": [
annotation.model_dump()
for annotation in content.annotations
],
}
content_blocks.append(block)
if content.type == "refusal":
additional_kwargs["refusal"] = content.refusal
msg_id = output.id
elif output.type == "function_call":
try:
args = json.loads(output.arguments, strict=False)
error = None
except JSONDecodeError as e:
args = output.arguments
error = str(e)
if error is None:
tool_call = {
"type": "tool_call",
"name": output.name,
"args": args,
"id": output.call_id,
}
tool_calls.append(tool_call)
else:
tool_call = {
"type": "invalid_tool_call",
"name": output.name,
"args": args,
"id": output.call_id,
"error": error,
}
invalid_tool_calls.append(tool_call)
if _FUNCTION_CALL_IDS_MAP_KEY not in additional_kwargs:
additional_kwargs[_FUNCTION_CALL_IDS_MAP_KEY] = {}
additional_kwargs[_FUNCTION_CALL_IDS_MAP_KEY][output.call_id] = output.id
elif output.type == "reasoning":
additional_kwargs["reasoning"] = output.model_dump(
exclude_none=True, mode="json"
)
else:
tool_output = output.model_dump(exclude_none=True, mode="json")
if "tool_outputs" in additional_kwargs:
additional_kwargs["tool_outputs"].append(tool_output)
else:
additional_kwargs["tool_outputs"] = [tool_output]
message = AIMessage(
content=content_blocks,
id=msg_id,
usage_metadata=usage_metadata,
response_metadata=response_metadata,
additional_kwargs=additional_kwargs,
tool_calls=tool_calls,
invalid_tool_calls=invalid_tool_calls,
)
return ChatResult(generations=[ChatGeneration(message=message)])
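# Illustrative (not part of this commit): rough shape of the AIMessage built
# above for a response with one text output and one function call; ids and
# values are made up.
#
#     message.content
#     # [{"type": "text", "text": "...", "annotations": []}]
#     message.tool_calls
#     # [{"type": "tool_call", "name": "get_weather", "args": {...}, "id": "call_abc"}]
#     message.additional_kwargs[_FUNCTION_CALL_IDS_MAP_KEY]
#     # {"call_abc": "<Responses function_call item id>"}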
def _convert_responses_chunk_to_generation_chunk(
chunk: Any,
) -> Optional[ChatGenerationChunk]:
content = []
tool_call_chunks: list = []
additional_kwargs: dict = {}
response_metadata = {}
usage_metadata = None
id = None
if chunk.type == "response.output_text.delta":
content.append(
{"type": "text", "text": chunk.delta, "index": chunk.content_index}
)
elif chunk.type == "response.output_text.annotation.added":
content.append(
{
"annotations": [
chunk.annotation.model_dump(exclude_none=True, mode="json")
],
"index": chunk.content_index,
}
)
elif chunk.type == "response.created":
response_metadata["id"] = chunk.response.id
elif chunk.type == "response.completed":
msg = cast(
AIMessage,
(
_construct_lc_result_from_responses_api(chunk.response)
.generations[0]
.message
),
)
usage_metadata = msg.usage_metadata
response_metadata = {
k: v for k, v in msg.response_metadata.items() if k != "id"
}
elif chunk.type == "response.output_item.added" and chunk.item.type == "message":
id = chunk.item.id
elif (
chunk.type == "response.output_item.added"
and chunk.item.type == "function_call"
):
tool_call_chunks.append(
{
"type": "tool_call_chunk",
"name": chunk.item.name,
"args": chunk.item.arguments,
"id": chunk.item.call_id,
"index": chunk.output_index,
}
)
additional_kwargs[_FUNCTION_CALL_IDS_MAP_KEY] = {
chunk.item.call_id: chunk.item.id
}
elif chunk.type == "response.output_item.done" and chunk.item.type in (
"web_search_call",
"file_search_call",
):
additional_kwargs["tool_outputs"] = [
chunk.item.model_dump(exclude_none=True, mode="json")
]
elif chunk.type == "response.function_call_arguments.delta":
tool_call_chunks.append(
{
"type": "tool_call_chunk",
"args": chunk.delta,
"index": chunk.output_index,
}
)
elif chunk.type == "response.refusal.done":
additional_kwargs["refusal"] = chunk.refusal
else:
return None
return ChatGenerationChunk(
message=AIMessageChunk(
content=content, # type: ignore[arg-type]
tool_call_chunks=tool_call_chunks,
usage_metadata=usage_metadata,
response_metadata=response_metadata,
additional_kwargs=additional_kwargs,
id=id,
)
)
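# Illustrative (not part of this commit): a minimal consumer of the streaming
# conversion above. Generation chunks support addition, so text and tool-call
# deltas can be aggregated as they arrive.
#
#     llm = ChatOpenAI(model="gpt-4o-mini", use_responses_api=True)
#     full = None
#     for chunk in llm.stream("Hi, I'm Bob."):
#         full = chunk if full is None else full + chunk
#     full.text()              # accumulated text
#     full.response_metadata   # includes the response "id" from response.created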