fix(anthropic): keep initial text on content_block_start (#38442)

## Summary
- Fix `ChatAnthropic._make_message_chunk_from_anthropic_event` dropping
the first text chunk of an assistant turn when Anthropic carries the
opening text on the `content_block_start` event rather than a following
`text_delta`. This most often hits the assistant turn right after a tool
result.
- The dropped content streams to clients but never reaches the
aggregated `AIMessage`, so anything reading message history back (e.g. a
checkpointer) sees a truncated message (`Here's the answer.` → `'s the
answer.`). Reported via Pylon 25478 (Zip), whose `<canvaspreview>`
parser broke because the dropped chunk was the opening `<can` tag.
- Add a `content_block_start` branch for `text` and `thinking` blocks.
Non-empty text on a start event is emitted on both the string
(`coerce_content_to_string=True`) and structured content paths; non-empty
thinking/signature content is emitted as a structured block. Empty starts
still emit no chunk (preserving prior behavior). In every case the branch
updates `block_start_event` so following deltas resolve against the
current block.

---------

Co-authored-by: Mason Daugherty <mason@langchain.dev>
Co-authored-by: Mason Daugherty <github@mdrxy.com>
This commit is contained in:
Christian Bromann
2026-06-26 14:17:00 -07:00
committed by GitHub
parent 00ad96ce85
commit bfc65cc04f
3 changed files with 235 additions and 1 deletions

View File

@@ -1599,6 +1599,39 @@ class ChatAnthropic(BaseChatModel):
)
block_start_event = event
elif (
event.type == "content_block_start"
and event.content_block is not None
and event.content_block.type in ("text", "thinking")
):
# Anthropic can place the opening content of a text or thinking block
# directly on the `content_block_start` event instead of in a
# following delta. This is common for the assistant turn that follows
# a tool result. Emit that initial content here so it is not dropped
# from the aggregated message. The deltas that follow are emitted as
# separate chunks sharing this block's `index`; chunk addition
# (`AIMessageChunk.__add__`) later coalesces them into one block.
block_start_event = event
if event.content_block.type == "text":
text = getattr(event.content_block, "text", "") or ""
if text:
if coerce_content_to_string:
message_chunk = AIMessageChunk(content=text)
else:
content_block = event.content_block.model_dump()
content_block["index"] = event.index
if content_block.get("citations") is None:
content_block.pop("citations", None)
message_chunk = AIMessageChunk(content=[content_block])
else: # thinking
thinking = getattr(event.content_block, "thinking", "") or ""
signature = getattr(event.content_block, "signature", "") or ""
if thinking or signature:
content_block = event.content_block.model_dump()
content_block["index"] = event.index
content_block["type"] = "thinking"
message_chunk = AIMessageChunk(content=[content_block])
# Process incremental content updates
elif event.type == "content_block_delta":
# Text and citation deltas (incremental text content)

View File

@@ -1263,6 +1263,7 @@ def test_structured_output_thinking_enabled() -> None:
assert isinstance(chunk, GenerateUsername)
@pytest.mark.retry(count=3, delay=1)
def test_structured_output_thinking_force_tool_use() -> None:
# Structured output currently relies on forced tool use, which is not supported
# when `thinking` is enabled. When this test fails, it means that the feature

View File

@@ -14,7 +14,13 @@ import pytest
from anthropic.types import Message, TextBlock, Usage
from blockbuster import blockbuster_ctx
from langchain_core.exceptions import ContextOverflowError
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.messages import (
AIMessage,
AIMessageChunk,
HumanMessage,
SystemMessage,
ToolMessage,
)
from langchain_core.runnables import RunnableBinding
from langchain_core.tools import BaseTool, tool
from langchain_core.tracers.base import BaseTracer
@@ -2212,6 +2218,200 @@ def test_streaming_cache_token_reporting() -> None:
assert delta_chunk.usage_metadata["total_tokens"] == 185
def _aggregate_anthropic_events(
    llm: ChatAnthropic,
    events: list[Any],
    *,
    coerce_content_to_string: bool,
) -> AIMessageChunk | None:
    """Feed `events` through the chunk handler and fold the results together.

    Each event is passed to `llm._make_message_chunk_from_anthropic_event`
    (always with `stream_usage=True`), threading the returned
    `block_start_event` into the next call. Chunks are summed like `_stream`
    does.

    Args:
        llm: Model whose event handler is exercised.
        events: Raw streaming events, processed in order.
        coerce_content_to_string: Forwarded unchanged to the handler.

    Returns:
        The sum of every non-`None` chunk, or `None` if no event produced one.
    """
    current_block = None
    total: AIMessageChunk | None = None
    for raw_event in events:
        result = llm._make_message_chunk_from_anthropic_event(
            raw_event,
            stream_usage=True,
            coerce_content_to_string=coerce_content_to_string,
            block_start_event=current_block,
        )
        piece, current_block = result
        if piece is None:
            # Event produced no chunk; only the tracked block may have changed.
            continue
        if total is None:
            total = piece
        else:
            total = total + piece
    return total
def test_text_content_block_start_carries_initial_text() -> None:
    """Text delivered on `content_block_start` must reach the aggregate.

    Anthropic sometimes puts the opening text of a text block on the
    `content_block_start` event instead of a following `text_delta`, most
    often on the assistant turn after a tool result. The handler used to build
    a start-event chunk only for tool / document / redacted_thinking blocks,
    so that leading text was silently missing from the persisted aggregate.
    """
    from anthropic.types import (
        RawContentBlockDeltaEvent,
        RawContentBlockStartEvent,
        RawContentBlockStopEvent,
        RawMessageStartEvent,
        TextDelta,
    )

    initial_message = Message(
        id="msg_repro",
        type="message",
        role="assistant",
        model=MODEL_NAME,
        content=[],
        stop_reason=None,
        stop_sequence=None,
        usage=Usage(input_tokens=10, output_tokens=0),
    )
    message_start = RawMessageStartEvent(message=initial_message, type="message_start")
    # The opening text rides on the start event ...
    block_start = RawContentBlockStartEvent(
        type="content_block_start",
        index=0,
        content_block=TextBlock(type="text", text="Here"),
    )
    # ... and the remainder arrives as an ordinary delta.
    trailing_delta = RawContentBlockDeltaEvent(
        type="content_block_delta",
        index=0,
        delta=TextDelta(type="text_delta", text="'s the answer."),
    )
    block_stop = RawContentBlockStopEvent(type="content_block_stop", index=0)
    stream = [message_start, block_start, trailing_delta, block_stop]

    model = ChatAnthropic(model=MODEL_NAME)  # type: ignore[call-arg]

    # String content path (no tools / thinking).
    as_string = _aggregate_anthropic_events(
        model, stream, coerce_content_to_string=True
    )
    assert as_string is not None
    assert as_string.text == "Here's the answer."

    # Structured content path (e.g. tools / thinking enabled elsewhere).
    as_blocks = _aggregate_anthropic_events(
        model, stream, coerce_content_to_string=False
    )
    assert as_blocks is not None
    assert as_blocks.text == "Here's the answer."

    # Start-event content and the delta must merge into exactly one text block
    # that keeps the block `index` and carries no stray `citations` key.
    assert isinstance(as_blocks.content, list)
    text_blocks = []
    for part in as_blocks.content:
        if isinstance(part, dict) and part.get("type") == "text":
            text_blocks.append(part)
    assert len(text_blocks) == 1
    merged = text_blocks[0]
    assert merged["index"] == 0
    assert "citations" not in merged
def test_empty_text_content_block_start_emits_no_chunk() -> None:
    """A text start with empty text yields no chunk but is still tracked."""
    from anthropic.types import RawContentBlockStartEvent

    model = ChatAnthropic(model=MODEL_NAME)  # type: ignore[call-arg]
    empty_start = RawContentBlockStartEvent(
        type="content_block_start",
        index=0,
        content_block=TextBlock(type="text", text=""),
    )

    result = model._make_message_chunk_from_anthropic_event(
        empty_start,
        stream_usage=True,
        coerce_content_to_string=True,
        block_start_event=None,
    )
    emitted, tracked_block = result

    # Nothing to emit for an empty start ...
    assert emitted is None
    # ... but the block is recorded so later deltas resolve against it.
    assert tracked_block is not None
def test_thinking_content_block_start_carries_initial_thinking() -> None:
    """Thinking and signature delivered on `content_block_start` must survive.

    As with text blocks, Anthropic can put a thinking block's opening
    `thinking` content (and its `signature`) on the `content_block_start`
    event instead of a following `thinking_delta` / `signature_delta`. The
    handler used to build a start-event chunk only for tool / document /
    redacted_thinking blocks, so that leading content was silently missing
    from the aggregated message.
    """
    from anthropic.types import (
        RawContentBlockDeltaEvent,
        RawContentBlockStartEvent,
        RawContentBlockStopEvent,
        RawMessageStartEvent,
        ThinkingBlock,
        ThinkingDelta,
    )

    initial_message = Message(
        id="msg_repro",
        type="message",
        role="assistant",
        model=MODEL_NAME,
        content=[],
        stop_reason=None,
        stop_sequence=None,
        usage=Usage(input_tokens=10, output_tokens=0),
    )
    message_start = RawMessageStartEvent(message=initial_message, type="message_start")
    # Opening thinking text and the signature both ride on the start event.
    block_start = RawContentBlockStartEvent(
        type="content_block_start",
        index=0,
        content_block=ThinkingBlock(
            type="thinking", thinking="Let me ", signature="sig123"
        ),
    )
    trailing_delta = RawContentBlockDeltaEvent(
        type="content_block_delta",
        index=0,
        delta=ThinkingDelta(type="thinking_delta", thinking="think about it."),
    )
    block_stop = RawContentBlockStopEvent(type="content_block_stop", index=0)
    stream = [message_start, block_start, trailing_delta, block_stop]

    model = ChatAnthropic(model=MODEL_NAME)  # type: ignore[call-arg]

    # Thinking is only represented in the structured-content path.
    combined = _aggregate_anthropic_events(
        model, stream, coerce_content_to_string=False
    )
    assert combined is not None
    assert isinstance(combined.content, list)

    thinking_blocks = []
    for part in combined.content:
        if isinstance(part, dict) and part.get("type") == "thinking":
            thinking_blocks.append(part)
    assert len(thinking_blocks) == 1

    # Start-event thinking is joined with the delta, and the signature that
    # arrived on the start event is kept.
    merged = thinking_blocks[0]
    assert merged["thinking"] == "Let me think about it."
    assert merged["signature"] == "sig123"
def test_empty_thinking_content_block_start_emits_no_chunk() -> None:
    """A thinking start with no thinking or signature yields no chunk."""
    from anthropic.types import RawContentBlockStartEvent, ThinkingBlock

    model = ChatAnthropic(model=MODEL_NAME)  # type: ignore[call-arg]
    empty_start = RawContentBlockStartEvent(
        type="content_block_start",
        index=0,
        content_block=ThinkingBlock(type="thinking", thinking="", signature=""),
    )

    result = model._make_message_chunk_from_anthropic_event(
        empty_start,
        stream_usage=True,
        coerce_content_to_string=False,
        block_start_event=None,
    )
    emitted, tracked_block = result

    # Nothing to emit for an empty start ...
    assert emitted is None
    # ... but the block is recorded so later deltas resolve against it.
    assert tracked_block is not None
def test_strict_tool_use() -> None:
model = ChatAnthropic(
model=MODEL_NAME, # type: ignore[call-arg]