From bfc65cc04f7ee712fd60fb5ccb3d1f7343710448 Mon Sep 17 00:00:00 2001 From: Christian Bromann Date: Fri, 26 Jun 2026 14:17:00 -0700 Subject: [PATCH] fix(anthropic): keep initial text on `content_block_start` (#38442) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary - Fix `ChatAnthropic._make_message_chunk_from_anthropic_event` dropping the first text chunk of an assistant turn when Anthropic carries the opening text on the `content_block_start` event rather than a following `text_delta`. This most often hits the assistant turn right after a tool result. - The dropped content streams to clients but never reaches the aggregated `AIMessage`, so anything reading message history back (e.g. a checkpointer) sees a truncated message (`Here's the answer.` → `'s the answer.`). Reported via Pylon 25478 (Zip), whose `` parser broke because the dropped chunk was the opening ` Co-authored-by: Mason Daugherty --- .../langchain_anthropic/chat_models.py | 33 +++ .../integration_tests/test_chat_models.py | 1 + .../tests/unit_tests/test_chat_models.py | 202 +++++++++++++++++- 3 files changed, 235 insertions(+), 1 deletion(-) diff --git a/libs/partners/anthropic/langchain_anthropic/chat_models.py b/libs/partners/anthropic/langchain_anthropic/chat_models.py index c11143f2ac9..9def0e668ef 100644 --- a/libs/partners/anthropic/langchain_anthropic/chat_models.py +++ b/libs/partners/anthropic/langchain_anthropic/chat_models.py @@ -1599,6 +1599,39 @@ class ChatAnthropic(BaseChatModel): ) block_start_event = event + elif ( + event.type == "content_block_start" + and event.content_block is not None + and event.content_block.type in ("text", "thinking") + ): + # Anthropic can place the opening content of a text or thinking block + # directly on the `content_block_start` event instead of in a + # following delta. This is common for the assistant turn that follows + # a tool result. Emit that initial content here so it is not dropped + # from the aggregated message. The deltas that follow are emitted as + # separate chunks sharing this block's `index`; chunk addition + # (`AIMessageChunk.__add__`) later coalesces them into one block. + block_start_event = event + if event.content_block.type == "text": + text = getattr(event.content_block, "text", "") or "" + if text: + if coerce_content_to_string: + message_chunk = AIMessageChunk(content=text) + else: + content_block = event.content_block.model_dump() + content_block["index"] = event.index + if content_block.get("citations") is None: + content_block.pop("citations", None) + message_chunk = AIMessageChunk(content=[content_block]) + else: # thinking + thinking = getattr(event.content_block, "thinking", "") or "" + signature = getattr(event.content_block, "signature", "") or "" + if thinking or signature: + content_block = event.content_block.model_dump() + content_block["index"] = event.index + content_block["type"] = "thinking" + message_chunk = AIMessageChunk(content=[content_block]) + # Process incremental content updates elif event.type == "content_block_delta": # Text and citation deltas (incremental text content) diff --git a/libs/partners/anthropic/tests/integration_tests/test_chat_models.py b/libs/partners/anthropic/tests/integration_tests/test_chat_models.py index 3c63ddd5d15..79b5ad2de8c 100644 --- a/libs/partners/anthropic/tests/integration_tests/test_chat_models.py +++ b/libs/partners/anthropic/tests/integration_tests/test_chat_models.py @@ -1263,6 +1263,7 @@ def test_structured_output_thinking_enabled() -> None: assert isinstance(chunk, GenerateUsername) +@pytest.mark.retry(count=3, delay=1) def test_structured_output_thinking_force_tool_use() -> None: # Structured output currently relies on forced tool use, which is not supported # when `thinking` is enabled. When this test fails, it means that the feature diff --git a/libs/partners/anthropic/tests/unit_tests/test_chat_models.py b/libs/partners/anthropic/tests/unit_tests/test_chat_models.py index fd24730ade4..f08adc528c8 100644 --- a/libs/partners/anthropic/tests/unit_tests/test_chat_models.py +++ b/libs/partners/anthropic/tests/unit_tests/test_chat_models.py @@ -14,7 +14,13 @@ import pytest from anthropic.types import Message, TextBlock, Usage from blockbuster import blockbuster_ctx from langchain_core.exceptions import ContextOverflowError -from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage +from langchain_core.messages import ( + AIMessage, + AIMessageChunk, + HumanMessage, + SystemMessage, + ToolMessage, +) from langchain_core.runnables import RunnableBinding from langchain_core.tools import BaseTool, tool from langchain_core.tracers.base import BaseTracer @@ -2212,6 +2218,200 @@ def test_streaming_cache_token_reporting() -> None: assert delta_chunk.usage_metadata["total_tokens"] == 185 +def _aggregate_anthropic_events( + llm: ChatAnthropic, + events: list[Any], + *, + coerce_content_to_string: bool, +) -> AIMessageChunk | None: + """Drive the handler over `events` and sum chunks like `_stream` does.""" + block_start_event = None + aggregate: AIMessageChunk | None = None + for event in events: + chunk, block_start_event = llm._make_message_chunk_from_anthropic_event( + event, + stream_usage=True, + coerce_content_to_string=coerce_content_to_string, + block_start_event=block_start_event, + ) + if chunk is not None: + aggregate = chunk if aggregate is None else aggregate + chunk + return aggregate + + +def test_text_content_block_start_carries_initial_text() -> None: + """Regression test: text on `content_block_start` must not be dropped. + + Anthropic sometimes places the opening text of a text block directly on the + `content_block_start` event (rather than in a following `text_delta`), + most often on the assistant turn that follows a tool result. The handler + previously only built a chunk on `content_block_start` for tool / document + / redacted_thinking blocks, so the leading text was silently dropped from the + aggregated message that gets persisted. + """ + from anthropic.types import ( + RawContentBlockDeltaEvent, + RawContentBlockStartEvent, + RawContentBlockStopEvent, + RawMessageStartEvent, + TextDelta, + ) + + msg = Message( + id="msg_repro", + content=[], + model=MODEL_NAME, + role="assistant", + stop_reason=None, + stop_sequence=None, + usage=Usage(input_tokens=10, output_tokens=0), + type="message", + ) + events = [ + RawMessageStartEvent(message=msg, type="message_start"), + # The first text rides the START event; the rest arrives as a delta. + RawContentBlockStartEvent( + content_block=TextBlock(text="Here", type="text"), + index=0, + type="content_block_start", + ), + RawContentBlockDeltaEvent( + delta=TextDelta(text="'s the answer.", type="text_delta"), + index=0, + type="content_block_delta", + ), + RawContentBlockStopEvent(index=0, type="content_block_stop"), + ] + + llm = ChatAnthropic(model=MODEL_NAME) # type: ignore[call-arg] + + # String content path (no tools / thinking). + aggregate = _aggregate_anthropic_events(llm, events, coerce_content_to_string=True) + assert aggregate is not None + assert aggregate.text == "Here's the answer." + + # Structured content path (e.g. tools / thinking enabled elsewhere). + aggregate = _aggregate_anthropic_events(llm, events, coerce_content_to_string=False) + assert aggregate is not None + assert aggregate.text == "Here's the answer." + # The start-event block and the following delta must coalesce into a single + # text block that carries the block `index` and no spurious `citations`. + assert isinstance(aggregate.content, list) + text_blocks = [ + block + for block in aggregate.content + if isinstance(block, dict) and block.get("type") == "text" + ] + assert len(text_blocks) == 1 + assert text_blocks[0]["index"] == 0 + assert "citations" not in text_blocks[0] + + +def test_empty_text_content_block_start_emits_no_chunk() -> None: + """An empty `TextBlock` start must not change behavior (no spurious chunk).""" + from anthropic.types import RawContentBlockStartEvent + + llm = ChatAnthropic(model=MODEL_NAME) # type: ignore[call-arg] + chunk, block_start_event = llm._make_message_chunk_from_anthropic_event( + RawContentBlockStartEvent( + content_block=TextBlock(text="", type="text"), + index=0, + type="content_block_start", + ), + stream_usage=True, + coerce_content_to_string=True, + block_start_event=None, + ) + assert chunk is None + # The block is still tracked so subsequent deltas resolve against it. + assert block_start_event is not None + + +def test_thinking_content_block_start_carries_initial_thinking() -> None: + """Regression test: thinking/signature on `content_block_start` must survive. + + As with text blocks, Anthropic can place a thinking block's opening + `thinking` content (and its `signature`) directly on the + `content_block_start` event rather than in a following `thinking_delta` / + `signature_delta`. The handler previously only built a chunk on + `content_block_start` for tool / document / redacted_thinking blocks, so the + leading thinking content was silently dropped from the aggregated message. + """ + from anthropic.types import ( + RawContentBlockDeltaEvent, + RawContentBlockStartEvent, + RawContentBlockStopEvent, + RawMessageStartEvent, + ThinkingBlock, + ThinkingDelta, + ) + + msg = Message( + id="msg_repro", + content=[], + model=MODEL_NAME, + role="assistant", + stop_reason=None, + stop_sequence=None, + usage=Usage(input_tokens=10, output_tokens=0), + type="message", + ) + events = [ + RawMessageStartEvent(message=msg, type="message_start"), + # The opening thinking + signature ride the START event. + RawContentBlockStartEvent( + content_block=ThinkingBlock( + thinking="Let me ", signature="sig123", type="thinking" + ), + index=0, + type="content_block_start", + ), + RawContentBlockDeltaEvent( + delta=ThinkingDelta(thinking="think about it.", type="thinking_delta"), + index=0, + type="content_block_delta", + ), + RawContentBlockStopEvent(index=0, type="content_block_stop"), + ] + + llm = ChatAnthropic(model=MODEL_NAME) # type: ignore[call-arg] + + # Thinking is only represented in the structured-content path. + aggregate = _aggregate_anthropic_events(llm, events, coerce_content_to_string=False) + assert aggregate is not None + assert isinstance(aggregate.content, list) + thinking_blocks = [ + block + for block in aggregate.content + if isinstance(block, dict) and block.get("type") == "thinking" + ] + assert len(thinking_blocks) == 1 + # The leading thinking from the start event merges with the following delta, + # and the signature carried on the start event is preserved. + assert thinking_blocks[0]["thinking"] == "Let me think about it." + assert thinking_blocks[0]["signature"] == "sig123" + + +def test_empty_thinking_content_block_start_emits_no_chunk() -> None: + """An empty `ThinkingBlock` start must not emit a spurious chunk.""" + from anthropic.types import RawContentBlockStartEvent, ThinkingBlock + + llm = ChatAnthropic(model=MODEL_NAME) # type: ignore[call-arg] + chunk, block_start_event = llm._make_message_chunk_from_anthropic_event( + RawContentBlockStartEvent( + content_block=ThinkingBlock(thinking="", signature="", type="thinking"), + index=0, + type="content_block_start", + ), + stream_usage=True, + coerce_content_to_string=False, + block_start_event=None, + ) + assert chunk is None + # The block is still tracked so subsequent deltas resolve against it. + assert block_start_event is not None + + def test_strict_tool_use() -> None: model = ChatAnthropic( model=MODEL_NAME, # type: ignore[call-arg]