mirror of
https://github.com/hwchase17/langchain.git
synced 2026-05-03 09:56:00 +00:00
fix(openrouter): merge fragmented reasoning_details in streaming (#36401)
## Description Fixes #36400 During streaming, `AIMessageChunk.__add__` list-concatenates `reasoning_details` in `additional_kwargs`, fragmenting a single entry into many. When `_convert_message_to_dict()` serializes conversation history back to the OpenRouter API for the next turn, these fragmented entries cause `BadRequestResponseError`. ### Changes - Add `_merge_reasoning_details()` helper that merges consecutive entries sharing the same `type` and `index` (streaming fragments) while preserving distinct entries (legitimate non-streaming data) - Metadata from later fragments (e.g. `signature`) is preserved in the merged result - Entries without `index` are never merged (safe for non-streaming responses) - Call `_merge_reasoning_details()` in `_convert_message_to_dict()` before serializing `reasoning_details` ### Why merge instead of drop? Non-streaming users (`invoke()`) rely on `reasoning_details` for structured metadata (`type`, `signature`, `format`, `index`). Dropping it entirely would be a regression. This approach fixes streaming while preserving non-streaming functionality, similar to `langchain-openai`'s `_implode_reasoning_blocks()`. ## Test plan - [x] Fragmented entries (same type + same index) are merged into one - [x] Distinct entries (different index) are preserved separately - [x] Entries without index are never merged - [x] Metadata from later fragments (e.g. signature) is preserved - [x] Single-entry lists pass through unchanged - [x] Round-trip (dict → message → dict) works correctly - [x] All 210 unit tests pass --------- Co-authored-by: Mason Daugherty <github@mdrxy.com> Co-authored-by: Mason Daugherty <mason@langchain.dev>
This commit is contained in:
@@ -1166,6 +1166,86 @@ def _format_message_content(content: Any) -> Any:
|
||||
return content
|
||||
|
||||
|
||||
def _merge_reasoning_run(run: list[dict[str, Any]]) -> dict[str, Any]:
|
||||
"""Merge a run of consecutive same-`(type, index)` reasoning fragments."""
|
||||
merged_entry: dict[str, Any] = {}
|
||||
text_parts: list[str] = []
|
||||
has_text = False
|
||||
for frag in run:
|
||||
for k, v in frag.items():
|
||||
if k == "text":
|
||||
has_text = True
|
||||
if v:
|
||||
text_parts.append(v)
|
||||
elif v is not None:
|
||||
merged_entry[k] = v
|
||||
if has_text:
|
||||
merged_entry["text"] = "".join(text_parts)
|
||||
return merged_entry
|
||||
|
||||
|
||||
def _merge_reasoning_details(
|
||||
details: list[dict[str, Any]],
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Merge fragmented `reasoning_details` from streaming chunk concatenation.
|
||||
|
||||
During streaming, `AIMessageChunk.__add__` list-concatenates
|
||||
`reasoning_details` in `additional_kwargs`, fragmenting a single entry
|
||||
into many. When serialized back to the API via
|
||||
`_convert_message_to_dict`, these fragments cause
|
||||
`BadRequestResponseError` on multi-turn conversations (the provider
|
||||
rejects the malformed thinking block with `Invalid signature`).
|
||||
|
||||
Streaming deltas tag each fragment with the `index` of the entry it
|
||||
belongs to in the original (non-streamed) array, so this function groups
|
||||
consecutive entries by `(type, index)` and merges each group into one.
|
||||
Entries without an `index` are preserved as-is, since non-streaming
|
||||
responses can legitimately contain multiple entries.
|
||||
|
||||
Within a merged group, `text` values are concatenated in order. Other
|
||||
metadata fields (e.g. `format`, `signature`) use last-non-`None`-wins
|
||||
semantics, which preserves stable provider metadata without concatenating
|
||||
repeated strings — Anthropic-style reasoning streams emit a single
|
||||
signature-bearing fragment at the end of the block.
|
||||
|
||||
A list with zero or one items passes through unchanged.
|
||||
"""
|
||||
if not isinstance(details, list) or len(details) <= 1:
|
||||
return details
|
||||
|
||||
merged: list[dict[str, Any]] = []
|
||||
i = 0
|
||||
while i < len(details):
|
||||
entry = details[i]
|
||||
# Without an index we cannot distinguish streaming fragments from
|
||||
# distinct non-streaming entries, so leave them alone. Same for any
|
||||
# non-dict items that may have slipped in upstream.
|
||||
if not isinstance(entry, dict) or entry.get("index") is None:
|
||||
merged.append(entry)
|
||||
i += 1
|
||||
continue
|
||||
|
||||
entry_type = entry.get("type", "")
|
||||
entry_index = entry["index"]
|
||||
run = [entry]
|
||||
i += 1
|
||||
while i < len(details):
|
||||
nxt = details[i]
|
||||
if (
|
||||
isinstance(nxt, dict)
|
||||
and nxt.get("type", "") == entry_type
|
||||
and nxt.get("index") == entry_index
|
||||
):
|
||||
run.append(nxt)
|
||||
i += 1
|
||||
else:
|
||||
break
|
||||
|
||||
merged.append(entry if len(run) == 1 else _merge_reasoning_run(run))
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
def _convert_message_to_dict(message: BaseMessage) -> dict[str, Any]: # noqa: C901, PLR0912
|
||||
"""Convert a LangChain message to an OpenRouter-compatible dict payload.
|
||||
|
||||
@@ -1217,14 +1297,15 @@ def _convert_message_to_dict(message: BaseMessage) -> dict[str, Any]: # noqa: C
|
||||
):
|
||||
message_dict["content"] = None
|
||||
# Preserve reasoning content for multi-turn conversations (e.g.
|
||||
# tool-calling loops). OpenRouter stores reasoning in "reasoning" and
|
||||
# optional structured details in "reasoning_details".
|
||||
# tool-calling loops). OpenRouter stores reasoning text in `reasoning`
|
||||
# and structured fragment details in `reasoning_details`; the latter
|
||||
# is merged before serialization to undo streaming fragmentation.
|
||||
if "reasoning_content" in message.additional_kwargs:
|
||||
message_dict["reasoning"] = message.additional_kwargs["reasoning_content"]
|
||||
if "reasoning_details" in message.additional_kwargs:
|
||||
message_dict["reasoning_details"] = message.additional_kwargs[
|
||||
"reasoning_details"
|
||||
]
|
||||
message_dict["reasoning_details"] = _merge_reasoning_details(
|
||||
message.additional_kwargs["reasoning_details"]
|
||||
)
|
||||
elif isinstance(message, SystemMessage):
|
||||
message_dict = {"role": "system", "content": message.content}
|
||||
elif isinstance(message, ToolMessage):
|
||||
|
||||
@@ -3,7 +3,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from langchain_core.messages import AIMessageChunk, BaseMessageChunk
|
||||
from langchain_core.messages import (
|
||||
AIMessage,
|
||||
AIMessageChunk,
|
||||
BaseMessageChunk,
|
||||
HumanMessage,
|
||||
)
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from langchain_openrouter.chat_models import ChatOpenRouter
|
||||
@@ -67,3 +72,44 @@ def test_reasoning_content() -> None:
|
||||
)
|
||||
response = model.invoke("What is 2 + 2?")
|
||||
assert response.content
|
||||
|
||||
|
||||
def test_streaming_reasoning_multi_turn() -> None:
|
||||
"""Multi-turn streaming with reasoning preserves the thinking signature.
|
||||
|
||||
Regression test for #36400. During streaming, `reasoning_details` is
|
||||
fragmented into multiple list entries by `AIMessageChunk.__add__` (because
|
||||
`index` is a float and `langchain_core.utils._merge.merge_lists` only
|
||||
auto-merges int-indexed dicts). When sent back on the next turn, the
|
||||
fragmented entries cause Anthropic via OpenRouter to reject the request
|
||||
with `"Invalid signature in thinking block"`. The fix in
|
||||
`_convert_message_to_dict` merges fragments before serialization.
|
||||
"""
|
||||
model = ChatOpenRouter(
|
||||
model="anthropic/claude-haiku-4.5",
|
||||
reasoning={"effort": "low"},
|
||||
)
|
||||
|
||||
messages: list = [HumanMessage(content="What is 2+2? Think briefly.")]
|
||||
|
||||
full: BaseMessageChunk | None = None
|
||||
for chunk in model.stream(messages):
|
||||
full = chunk if full is None else full + chunk
|
||||
assert isinstance(full, AIMessageChunk)
|
||||
assert full.content
|
||||
assert full.additional_kwargs.get("reasoning_details"), (
|
||||
"expected reasoning_details on the streamed chunk"
|
||||
)
|
||||
|
||||
# Hand-build the AIMessage from the accumulated chunk and continue the
|
||||
# conversation. Pre-fix, this raises a 400 from the provider.
|
||||
assistant_msg = AIMessage(
|
||||
content=full.content,
|
||||
additional_kwargs=full.additional_kwargs,
|
||||
response_metadata=full.response_metadata,
|
||||
)
|
||||
messages.append(assistant_msg)
|
||||
messages.append(HumanMessage(content="Now what is 3+3?"))
|
||||
|
||||
response = model.invoke(messages)
|
||||
assert response.content
|
||||
|
||||
@@ -1193,11 +1193,57 @@ class TestMessageConversion:
|
||||
assert result["content"] == "The answer is 42."
|
||||
assert result["reasoning"] == "Let me think about this..."
|
||||
|
||||
def test_ai_message_with_reasoning_details_to_dict(self) -> None:
|
||||
"""Test that reasoning_details is preserved when converting back to dict."""
|
||||
def test_ai_message_with_fragmented_reasoning_details_merged(self) -> None:
|
||||
"""Fragmented `reasoning_details` are merged before serialization.
|
||||
|
||||
Float `index` values mirror what `ChatOpenRouter.stream()` produces
|
||||
(the OpenRouter SDK coerces `index` via Pydantic). With float
|
||||
`index`, `langchain_core.utils._merge.merge_lists` does not auto-merge
|
||||
list entries (its index-match path requires `int`), so fragments
|
||||
accumulate as separate list items and require this helper to merge
|
||||
them before the next API turn.
|
||||
"""
|
||||
details = [
|
||||
{"type": "reasoning.text", "text": "Step 1: analyze"},
|
||||
{"type": "reasoning.text", "text": "Step 2: solve"},
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"text": "The",
|
||||
"format": "anthropic-claude-v1",
|
||||
"index": 0.0,
|
||||
},
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"text": " user wants",
|
||||
"format": "anthropic-claude-v1",
|
||||
"index": 0.0,
|
||||
},
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"signature": "sig_abc123",
|
||||
"format": "anthropic-claude-v1",
|
||||
"index": 0.0,
|
||||
},
|
||||
]
|
||||
msg = AIMessage(
|
||||
content="Answer",
|
||||
additional_kwargs={"reasoning_details": details},
|
||||
)
|
||||
result = _convert_message_to_dict(msg)
|
||||
assert result["reasoning_details"] == [
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"text": "The user wants",
|
||||
"format": "anthropic-claude-v1",
|
||||
"signature": "sig_abc123",
|
||||
"index": 0.0,
|
||||
}
|
||||
]
|
||||
assert "reasoning" not in result
|
||||
|
||||
def test_ai_message_distinct_reasoning_details_preserved(self) -> None:
|
||||
"""Distinct entries (different `index`) are not merged."""
|
||||
details = [
|
||||
{"type": "reasoning.text", "text": "First thought", "index": 0},
|
||||
{"type": "reasoning.text", "text": "Second thought", "index": 1},
|
||||
]
|
||||
msg = AIMessage(
|
||||
content="Answer",
|
||||
@@ -1205,7 +1251,138 @@ class TestMessageConversion:
|
||||
)
|
||||
result = _convert_message_to_dict(msg)
|
||||
assert result["reasoning_details"] == details
|
||||
assert "reasoning" not in result
|
||||
|
||||
def test_ai_message_unindexed_reasoning_details_not_merged(self) -> None:
|
||||
"""Entries without an `index` are passed through unchanged."""
|
||||
details = [
|
||||
{"type": "reasoning.text", "text": "First"},
|
||||
{"type": "reasoning.text", "text": "Second"},
|
||||
]
|
||||
msg = AIMessage(
|
||||
content="Answer",
|
||||
additional_kwargs={"reasoning_details": details},
|
||||
)
|
||||
result = _convert_message_to_dict(msg)
|
||||
assert result["reasoning_details"] == details
|
||||
|
||||
def test_ai_message_interleaved_index_fragments_preserved(self) -> None:
|
||||
"""Only consecutive same-`index` runs merge; interleaved runs stay split."""
|
||||
details = [
|
||||
{"type": "reasoning.text", "text": "A", "index": 0},
|
||||
{"type": "reasoning.text", "text": "B", "index": 1},
|
||||
{"type": "reasoning.text", "text": "C", "index": 0},
|
||||
{"type": "reasoning.text", "text": "D", "index": 1},
|
||||
]
|
||||
msg = AIMessage(
|
||||
content="Answer",
|
||||
additional_kwargs={"reasoning_details": details},
|
||||
)
|
||||
result = _convert_message_to_dict(msg)
|
||||
assert result["reasoning_details"] == details
|
||||
|
||||
def test_ai_message_fragment_metadata_preserved(self) -> None:
|
||||
"""Test that metadata from later fragments is preserved after merge."""
|
||||
details = [
|
||||
{"type": "reasoning.text", "text": "thinking...", "index": 0},
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"text": " done",
|
||||
"index": 0,
|
||||
"signature": "sig_abc123",
|
||||
},
|
||||
]
|
||||
msg = AIMessage(
|
||||
content="Answer",
|
||||
additional_kwargs={"reasoning_details": details},
|
||||
)
|
||||
result = _convert_message_to_dict(msg)
|
||||
assert len(result["reasoning_details"]) == 1
|
||||
assert result["reasoning_details"][0]["text"] == "thinking... done"
|
||||
assert result["reasoning_details"][0]["signature"] == "sig_abc123"
|
||||
|
||||
def test_streamed_reasoning_details_roundtrip_to_next_turn_payload(self) -> None:
|
||||
"""Test the chunk-merge-to-next-turn serialization path from issue #36400."""
|
||||
chunk_dicts = [
|
||||
{"choices": [{"delta": {"role": "assistant", "content": ""}, "index": 0}]},
|
||||
{
|
||||
"choices": [
|
||||
{
|
||||
"delta": {
|
||||
"reasoning_details": [
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"text": "The",
|
||||
"format": "anthropic-claude-v1",
|
||||
"index": 0.0,
|
||||
}
|
||||
]
|
||||
},
|
||||
"index": 0,
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"choices": [
|
||||
{
|
||||
"delta": {
|
||||
"reasoning_details": [
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"text": " user wants",
|
||||
"format": "anthropic-claude-v1",
|
||||
"index": 0.0,
|
||||
}
|
||||
]
|
||||
},
|
||||
"index": 0,
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"choices": [
|
||||
{
|
||||
"delta": {
|
||||
"reasoning_details": [
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"signature": "sig_abc123",
|
||||
"format": "anthropic-claude-v1",
|
||||
"index": 0.0,
|
||||
}
|
||||
]
|
||||
},
|
||||
"index": 0,
|
||||
}
|
||||
]
|
||||
},
|
||||
{"choices": [{"delta": {"content": "Answer"}, "index": 0}]},
|
||||
]
|
||||
chunks = [
|
||||
_convert_chunk_to_message_chunk(chunk, AIMessageChunk)
|
||||
for chunk in chunk_dicts
|
||||
]
|
||||
merged_chunk = chunks[0]
|
||||
for chunk in chunks[1:]:
|
||||
merged_chunk = merged_chunk + chunk
|
||||
|
||||
assert len(merged_chunk.additional_kwargs["reasoning_details"]) == 3
|
||||
|
||||
msg = AIMessage(
|
||||
content=merged_chunk.content,
|
||||
additional_kwargs=merged_chunk.additional_kwargs,
|
||||
response_metadata=merged_chunk.response_metadata,
|
||||
)
|
||||
|
||||
result = _convert_message_to_dict(msg)
|
||||
assert result["reasoning_details"] == [
|
||||
{
|
||||
"type": "reasoning.text",
|
||||
"text": "The user wants",
|
||||
"format": "anthropic-claude-v1",
|
||||
"signature": "sig_abc123",
|
||||
"index": 0.0,
|
||||
}
|
||||
]
|
||||
|
||||
def test_ai_message_with_both_reasoning_fields_to_dict(self) -> None:
|
||||
"""Test that both reasoning_content and reasoning_details are preserved."""
|
||||
|
||||
Reference in New Issue
Block a user