Support of OpenAI reasoning summary streaming (#30909)

**langchain_openai: Support for reasoning summary streaming**

**Description:**
The OpenAI API now supports streaming reasoning summaries for reasoning
models (o1, o3, o3-mini, o4-mini). More info:
https://platform.openai.com/docs/guides/reasoning#reasoning-summaries

Reasoning summaries are supported only in the Responses API (not the
Completions API), so you need to create the LangChain OpenAI model as
follows to stream them:

```
llm = ChatOpenAI(
    model="o4-mini",  # o1, o3, and o3-mini also support reasoning streaming
    use_responses_api=True,  # reasoning streaming works only with the Responses API, not the Completions API
    model_kwargs={
        "reasoning": {
            "effort": "high",  # "low" and "medium" are also supported
            "summary": "auto",  # some models support "concise" or "detailed" summaries; "auto" always works
        }
    },
)
```
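As a quick sanity check (illustrative, not part of this PR), a plain `invoke` on a model configured this way should also surface the summary under `additional_kwargs`:

```
response = llm.invoke("What is 3^3?")
print(response.additional_kwargs.get("reasoning", {}).get("summary"))
```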

Now, if you stream events from `llm`:

```
async for event in llm.astream_events(prompt, version="v2"):
    print(event)
```

or

```
for chunk in llm.stream(prompt):
    print(chunk)
```

the OpenAI API will send you new types of events:

- `response.reasoning_summary_part.added`
- `response.reasoning_summary_text.delta`
- `response.reasoning_summary_text.done`
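If you want to see these events on the wire, here is a rough standalone sketch using the raw `openai` SDK (not part of this PR; parameters mirror the config above):

```
from openai import OpenAI

client = OpenAI()
stream = client.responses.create(
    model="o4-mini",
    input="What is 3^3?",
    reasoning={"effort": "high", "summary": "auto"},
    stream=True,
)
for event in stream:
    # only print the reasoning-summary events
    if event.type.startswith("response.reasoning_summary"):
        print(event.type)
```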

These events are new, so they were previously ignored. This PR adds support for
them in `_convert_responses_chunk_to_generation_chunk`, so each streamed
reasoning summary delta is surfaced under the `reasoning` key of the chunk's
`additional_kwargs`, and the full summary aggregates across chunks via the
blocks' `index` key.
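For example, after summing all streamed chunks, the aggregated message should carry something like the following under `additional_kwargs["reasoning"]` (shape per the diff below; values are made up):

```
{
    "type": "reasoning",
    "id": "rs_abc123",
    "summary": [
        {"index": 0, "type": "summary_text", "text": "First, cube 3: 3 * 3 * 3 = 27."}
    ],
}
```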

Example of how this reasoning summary may be printed (keys match the implementation below):

```
async for event in llm.astream_events(prompt, version="v2"):
    if event["event"] == "on_chat_model_stream":
        chunk: AIMessageChunk = event["data"]["chunk"]
        if reasoning := chunk.additional_kwargs.get("reasoning"):
            # each chunk carries a piece of the reasoning summary
            for block in reasoning.get("summary", []):
                print(block.get("text", ""), end="")
        elif chunk.content and chunk.content[0]["type"] == "text":
            print(chunk.content[0]["text"], end="")
```

or

```
for chunk in llm.stream(prompt):
    if reasoning := chunk.additional_kwargs.get("reasoning"):
        for block in reasoning.get("summary", []):
            print(block.get("text", ""), end="")
    elif chunk.content and chunk.content[0]["type"] == "text":
        print(chunk.content[0]["text"], end="")
```

---------

Co-authored-by: Chester Curme <chester.curme@gmail.com>

The first diff adds a helper that strips the aggregation `index` key before reasoning summaries are passed back to OpenAI:

```
@@ -3091,6 +3091,23 @@ def _make_computer_call_output_from_message(message: ToolMessage) -> dict:
     return computer_call_output
 
 
+def _pop_summary_index_from_reasoning(reasoning: dict) -> dict:
+    """When streaming, langchain-core uses the ``index`` key to aggregate reasoning
+    text blocks. OpenAI API does not support this key, so we need to remove it.
+
+    N.B. OpenAI also does not appear to support the ``summary_index`` key when passed
+    back in.
+    """
+    new_reasoning = reasoning.copy()
+    if "summary" in reasoning and isinstance(reasoning["summary"], list):
+        new_summary = []
+        for block in reasoning["summary"]:
+            new_block = {k: v for k, v in block.items() if k != "index"}
+            new_summary.append(new_block)
+        new_reasoning["summary"] = new_summary
+    return new_reasoning
+
+
 def _construct_responses_api_input(messages: Sequence[BaseMessage]) -> list:
     input_ = []
     for lc_msg in messages:
```
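Given the helper above, a quick illustration of what it does (input values made up):

```
reasoning = {
    "type": "reasoning",
    "id": "rs_123",
    "summary": [{"index": 0, "type": "summary_text", "text": "Cube 3."}],
}
_pop_summary_index_from_reasoning(reasoning)
# -> {"type": "reasoning", "id": "rs_123",
#     "summary": [{"type": "summary_text", "text": "Cube 3."}]}
```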
`_construct_responses_api_input` now routes reasoning items through the helper and only replays them when computer calls are present:

```
@@ -3118,7 +3135,7 @@ def _construct_responses_api_input(messages: Sequence[BaseMessage]) -> list:
             # Reasoning items
             reasoning_items = []
             if reasoning := lc_msg.additional_kwargs.get("reasoning"):
-                reasoning_items.append(reasoning)
+                reasoning_items.append(_pop_summary_index_from_reasoning(reasoning))
             # Function calls
             function_calls = []
             if tool_calls := msg.pop("tool_calls", None):
@@ -3178,9 +3195,12 @@ def _construct_responses_api_input(messages: Sequence[BaseMessage]) -> list:
                 msg["content"] = new_blocks
             if msg["content"]:
                 input_.append(msg)
-            input_.extend(reasoning_items)
             input_.extend(function_calls)
-            input_.extend(computer_calls)
+            if computer_calls:
+                # Hack: we only add reasoning items if computer calls are present. See:
+                # https://community.openai.com/t/how-to-solve-badrequesterror-400-item-rs-of-type-reasoning-was-provided-without-its-required-following-item-error-in-responses-api/1151686/5
+                input_.extend(reasoning_items)
+                input_.extend(computer_calls)
         elif msg["role"] == "user":
             if isinstance(msg["content"], list):
                 new_blocks = []
```
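For context, the constraint behind that hack (item shapes below are assumptions for illustration, not verbatim API payloads): the Responses API rejects a reasoning item that is not immediately followed by the item it belongs to, so reasoning items are only replayed together with computer calls.

```
input_ = [
    {"role": "user", "content": "Click the red button."},
    # Sent alone, this reasoning item triggers a 400:
    # "Item 'rs_...' of type 'reasoning' was provided without its
    # required following item."
    {"type": "reasoning", "id": "rs_123", "summary": []},
    # ...so it is only included when its computer call follows:
    {
        "type": "computer_call",
        "call_id": "call_abc",
        "action": {"type": "click", "button": "left", "x": 100, "y": 200},
        "status": "completed",
    },
]
```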
`_convert_responses_chunk_to_generation_chunk` drops the old one-shot passthrough and handles the new summary events incrementally:

```
@@ -3390,8 +3410,6 @@ def _convert_responses_chunk_to_generation_chunk(
         )
         if parsed := msg.additional_kwargs.get("parsed"):
             additional_kwargs["parsed"] = parsed
-        if reasoning := msg.additional_kwargs.get("reasoning"):
-            additional_kwargs["reasoning"] = reasoning
         usage_metadata = msg.usage_metadata
         response_metadata = {
             k: v for k, v in msg.response_metadata.items() if k != "id"
@@ -3432,6 +3450,25 @@ def _convert_responses_chunk_to_generation_chunk(
         )
     elif chunk.type == "response.refusal.done":
         additional_kwargs["refusal"] = chunk.refusal
+    elif chunk.type == "response.reasoning_summary_part.added":
+        additional_kwargs["reasoning"] = {
+            "type": "reasoning",
+            "id": chunk.item_id,
+            # langchain-core uses the `index` key to aggregate text blocks.
+            "summary": [
+                {"index": chunk.summary_index, "type": "summary_text", "text": ""}
+            ],
+        }
+    elif chunk.type == "response.reasoning_summary_text.delta":
+        additional_kwargs["reasoning"] = {
+            "summary": [
+                {
+                    "index": chunk.summary_index,
+                    "type": "summary_text",
+                    "text": chunk.delta,
+                }
+            ]
+        }
     else:
         return None
```
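A minimal sketch (values made up) of why the `index` key matters: when langchain-core adds chunks together, it merges summary blocks that share an `index` and concatenates their text, assuming current `merge_lists` behavior:

```
from langchain_core.messages import AIMessageChunk

a = AIMessageChunk(
    content="",
    additional_kwargs={
        "reasoning": {
            "type": "reasoning",
            "id": "rs_123",
            "summary": [{"index": 0, "type": "summary_text", "text": "Cube 3"}],
        }
    },
)
b = AIMessageChunk(
    content="",
    additional_kwargs={
        "reasoning": {
            "summary": [{"index": 0, "type": "summary_text", "text": " to get 27."}]
        }
    },
)
merged = a + b
print(merged.additional_kwargs["reasoning"]["summary"][0]["text"])
# -> "Cube 3 to get 27."
```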

The second diff adds an integration test:

```
@@ -348,3 +348,31 @@ def test_file_search() -> None:
         full = chunk if full is None else full + chunk
     assert isinstance(full, AIMessageChunk)
     _check_response(full)
+
+
+def test_stream_reasoning_summary() -> None:
+    reasoning = {"effort": "medium", "summary": "auto"}
+    llm = ChatOpenAI(
+        model="o4-mini", use_responses_api=True, model_kwargs={"reasoning": reasoning}
+    )
+    message_1 = {"role": "user", "content": "What is 3^3?"}
+    response_1: Optional[BaseMessageChunk] = None
+    for chunk in llm.stream([message_1]):
+        assert isinstance(chunk, AIMessageChunk)
+        response_1 = chunk if response_1 is None else response_1 + chunk
+    assert isinstance(response_1, AIMessageChunk)
+    reasoning = response_1.additional_kwargs["reasoning"]
+    assert set(reasoning.keys()) == {"id", "type", "summary"}
+    summary = reasoning["summary"]
+    assert isinstance(summary, list)
+    for block in summary:
+        assert isinstance(block, dict)
+        assert isinstance(block["type"], str)
+        assert isinstance(block["text"], str)
+        assert block["text"]
+
+    # Check we can pass back summaries
+    message_2 = {"role": "user", "content": "Thank you."}
+    response_2 = llm.invoke([message_1, response_1, message_2])
+    assert isinstance(response_2, AIMessage)
```
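Note that this is a live integration test, so it presumably requires `OPENAI_API_KEY` to be set; selecting it by name should work, e.g.:

```
OPENAI_API_KEY=sk-... pytest -k test_stream_reasoning_summary
```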