fix(anthropic): move _make_message_chunk_from_anthropic_event to instance method (#35670)

This commit is contained in:
ccurme
2026-03-08 14:52:07 -04:00
committed by GitHub
parent fbfe4b812d
commit f11a105023
2 changed files with 188 additions and 184 deletions

View File

@@ -1313,7 +1313,7 @@ class ChatAnthropic(BaseChatModel):
)
block_start_event = None
for event in stream:
msg, block_start_event = _make_message_chunk_from_anthropic_event(
msg, block_start_event = self._make_message_chunk_from_anthropic_event(
event,
stream_usage=stream_usage,
coerce_content_to_string=coerce_content_to_string,
@@ -1350,7 +1350,7 @@ class ChatAnthropic(BaseChatModel):
)
block_start_event = None
async for event in stream:
msg, block_start_event = _make_message_chunk_from_anthropic_event(
msg, block_start_event = self._make_message_chunk_from_anthropic_event(
event,
stream_usage=stream_usage,
coerce_content_to_string=coerce_content_to_string,
@@ -1364,6 +1364,188 @@ class ChatAnthropic(BaseChatModel):
except anthropic.BadRequestError as e:
_handle_anthropic_bad_request(e)
def _make_message_chunk_from_anthropic_event(
    self,
    event: anthropic.types.RawMessageStreamEvent,
    *,
    stream_usage: bool = True,
    coerce_content_to_string: bool,
    block_start_event: anthropic.types.RawMessageStreamEvent | None = None,
) -> tuple[AIMessageChunk | None, anthropic.types.RawMessageStreamEvent | None]:
    """Convert Anthropic streaming event to `AIMessageChunk`.

    Args:
        event: Raw streaming event from Anthropic SDK
        stream_usage: Whether to include usage metadata in the output chunks.
        coerce_content_to_string: Whether to convert structured content to plain
            text strings.
            When `True`, only text content is preserved; when `False`, structured
            content like tool calls and citations are maintained.
        block_start_event: Previous content block start event, used for tracking
            tool use blocks and maintaining context across related events.

    Returns:
        Tuple with

        - `AIMessageChunk`: Converted message chunk with appropriate content and
          metadata, or `None` if the event doesn't produce a chunk
        - `RawMessageStreamEvent`: Updated `block_start_event` for tracking
          content blocks across sequential events, or `None` if not applicable

    Note:
        Not all Anthropic events result in message chunks. Events like internal
        state changes return `None` for the message chunk while potentially
        updating the `block_start_event` for context tracking.
    """
    message_chunk: AIMessageChunk | None = None
    # Reference: Anthropic SDK streaming implementation
    # https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/lib/streaming/_messages.py # noqa: E501
    if event.type == "message_start" and stream_usage:
        # Capture model name, but don't include usage_metadata yet
        # as it will be properly reported in message_delta with complete info
        if hasattr(event.message, "model"):
            response_metadata: dict[str, Any] = {"model_name": event.message.model}
        else:
            response_metadata = {}
        message_chunk = AIMessageChunk(
            content="" if coerce_content_to_string else [],
            response_metadata=response_metadata,
        )
    elif (
        event.type == "content_block_start"
        and event.content_block is not None
        and (
            "tool_result" in event.content_block.type
            or "tool_use" in event.content_block.type
            or "document" in event.content_block.type
            or "redacted_thinking" in event.content_block.type
        )
    ):
        # Structured (non-text) blocks can't be represented as a plain string,
        # so warn if the caller asked for string coercion.
        if coerce_content_to_string:
            warnings.warn("Received unexpected tool content block.", stacklevel=2)
        content_block = event.content_block.model_dump()
        # Drop a null `caller` field rather than surfacing it in content
        if "caller" in content_block and content_block["caller"] is None:
            content_block.pop("caller")
        content_block["index"] = event.index
        if event.content_block.type == "tool_use":
            if (
                parsed_args := getattr(event.content_block, "input", None)
            ) and isinstance(parsed_args, dict):
                # In some cases parsed args are represented in start event, with no
                # following input_json_delta events
                args = json.dumps(parsed_args)
            else:
                args = ""
            tool_call_chunk = create_tool_call_chunk(
                index=event.index,
                id=event.content_block.id,
                name=event.content_block.name,
                args=args,
            )
            tool_call_chunks = [tool_call_chunk]
        else:
            tool_call_chunks = []
        message_chunk = AIMessageChunk(
            content=[content_block],
            tool_call_chunks=tool_call_chunks,
        )
        # Remember this start event so later input_json_delta events can be
        # matched to the tool-use block they belong to.
        block_start_event = event
    # Process incremental content updates
    elif event.type == "content_block_delta":
        # Text and citation deltas (incremental text content)
        if event.delta.type in ("text_delta", "citations_delta"):
            if coerce_content_to_string and hasattr(event.delta, "text"):
                text = getattr(event.delta, "text", "")
                message_chunk = AIMessageChunk(content=text)
            else:
                content_block = event.delta.model_dump()
                content_block["index"] = event.index
                # All citation deltas are part of a text block
                content_block["type"] = "text"
                if "citation" in content_block:
                    # Assign citations to a list if present
                    content_block["citations"] = [content_block.pop("citation")]
                message_chunk = AIMessageChunk(content=[content_block])
        # Reasoning
        elif event.delta.type in {"thinking_delta", "signature_delta"}:
            content_block = event.delta.model_dump()
            content_block["index"] = event.index
            content_block["type"] = "thinking"
            message_chunk = AIMessageChunk(content=[content_block])
        # Tool input JSON (streaming tool arguments)
        elif event.delta.type == "input_json_delta":
            content_block = event.delta.model_dump()
            content_block["index"] = event.index
            start_event_block = (
                getattr(block_start_event, "content_block", None)
                if block_start_event
                else None
            )
            # Only emit a tool_call_chunk when the preceding start event was a
            # tool_use block; other JSON deltas carry no tool-call payload.
            if (
                start_event_block is not None
                and getattr(start_event_block, "type", None) == "tool_use"
            ):
                tool_call_chunk = create_tool_call_chunk(
                    index=event.index,
                    id=None,
                    name=None,
                    args=event.delta.partial_json,
                )
                tool_call_chunks = [tool_call_chunk]
            else:
                tool_call_chunks = []
            message_chunk = AIMessageChunk(
                content=[content_block],
                tool_call_chunks=tool_call_chunks,
            )
        # Compaction block
        elif event.delta.type == "compaction_delta":
            content_block = event.delta.model_dump()
            content_block["index"] = event.index
            content_block["type"] = "compaction"
            message_chunk = AIMessageChunk(content=[content_block])
    # Process final usage metadata and completion info
    elif event.type == "message_delta" and stream_usage:
        usage_metadata = _create_usage_metadata(event.usage)
        response_metadata = {
            "stop_reason": event.delta.stop_reason,
            "stop_sequence": event.delta.stop_sequence,
        }
        if context_management := getattr(event, "context_management", None):
            response_metadata["context_management"] = (
                context_management.model_dump()
            )
        message_delta = getattr(event, "delta", None)
        if message_delta and (
            container := getattr(message_delta, "container", None)
        ):
            response_metadata["container"] = container.model_dump(mode="json")
        message_chunk = AIMessageChunk(
            content="" if coerce_content_to_string else [],
            usage_metadata=usage_metadata,
            response_metadata=response_metadata,
        )
        if message_chunk.response_metadata.get("stop_reason"):
            # Mark final Anthropic stream chunk
            message_chunk.chunk_position = "last"
    # Unhandled event types (e.g., `content_block_stop`, `ping` events)
    # https://platform.claude.com/docs/en/build-with-claude/streaming#other-events
    else:
        pass
    if message_chunk:
        message_chunk.response_metadata["model_provider"] = "anthropic"
    return message_chunk, block_start_event
def _format_output(self, data: Any, **kwargs: Any) -> ChatResult:
"""Format the output from the Anthropic API to LC."""
data_dict = data.model_dump()
@@ -1980,184 +2162,6 @@ def _convert_to_anthropic_output_config_format(schema: dict | type) -> dict[str,
return {"type": "json_schema", "schema": json_schema}
def _make_message_chunk_from_anthropic_event(
    event: anthropic.types.RawMessageStreamEvent,
    *,
    stream_usage: bool = True,
    coerce_content_to_string: bool,
    block_start_event: anthropic.types.RawMessageStreamEvent | None = None,
) -> tuple[AIMessageChunk | None, anthropic.types.RawMessageStreamEvent | None]:
    """Translate one Anthropic streaming event into an `AIMessageChunk`.

    Args:
        event: Raw streaming event from the Anthropic SDK.
        stream_usage: Whether to include usage metadata in the output chunks.
        coerce_content_to_string: When `True`, only plain text content is kept;
            when `False`, structured content (tool calls, citations, thinking)
            is preserved as content blocks.
        block_start_event: The most recent `content_block_start` event, used to
            associate `input_json_delta` events with their tool-use block.

    Returns:
        A `(chunk, block_start_event)` pair. `chunk` is `None` for events that
        produce no output (e.g. `content_block_stop`, `ping`);
        `block_start_event` is the (possibly updated) start event to carry into
        the next call.
    """
    chunk: AIMessageChunk | None = None
    # See the Anthropic SDK streaming implementation:
    # https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/lib/streaming/_messages.py # noqa: E501
    if event.type == "message_start" and stream_usage:
        # Only the model name is recorded here; full usage arrives later in
        # the message_delta event.
        metadata: dict[str, Any] = {}
        if hasattr(event.message, "model"):
            metadata["model_name"] = event.message.model
        chunk = AIMessageChunk(
            content="" if coerce_content_to_string else [],
            response_metadata=metadata,
        )
    elif (
        event.type == "content_block_start"
        and event.content_block is not None
        and any(
            marker in event.content_block.type
            for marker in ("tool_result", "tool_use", "document", "redacted_thinking")
        )
    ):
        if coerce_content_to_string:
            warnings.warn("Received unexpected tool content block.", stacklevel=2)
        payload = event.content_block.model_dump()
        if "caller" in payload and payload["caller"] is None:
            del payload["caller"]
        payload["index"] = event.index
        call_chunks = []
        if event.content_block.type == "tool_use":
            # Occasionally the start event already carries fully-parsed args,
            # with no input_json_delta events to follow.
            raw_args = getattr(event.content_block, "input", None)
            serialized = json.dumps(raw_args) if isinstance(raw_args, dict) and raw_args else ""
            call_chunks.append(
                create_tool_call_chunk(
                    index=event.index,
                    id=event.content_block.id,
                    name=event.content_block.name,
                    args=serialized,
                )
            )
        chunk = AIMessageChunk(content=[payload], tool_call_chunks=call_chunks)
        # Remember this start event so later JSON deltas can find their block.
        block_start_event = event
    elif event.type == "content_block_delta":
        delta_type = event.delta.type
        if delta_type in {"text_delta", "citations_delta"}:
            # Incremental text (and citation) content.
            if coerce_content_to_string and hasattr(event.delta, "text"):
                chunk = AIMessageChunk(content=getattr(event.delta, "text", ""))
            else:
                payload = event.delta.model_dump()
                payload["index"] = event.index
                # Citation deltas always belong to a text block.
                payload["type"] = "text"
                if "citation" in payload:
                    # Citations are surfaced as a list.
                    payload["citations"] = [payload.pop("citation")]
                chunk = AIMessageChunk(content=[payload])
        elif delta_type in {"thinking_delta", "signature_delta"}:
            # Reasoning content.
            payload = event.delta.model_dump()
            payload["index"] = event.index
            payload["type"] = "thinking"
            chunk = AIMessageChunk(content=[payload])
        elif delta_type == "input_json_delta":
            # Streaming tool-call arguments.
            payload = event.delta.model_dump()
            payload["index"] = event.index
            started = (
                getattr(block_start_event, "content_block", None)
                if block_start_event
                else None
            )
            call_chunks = []
            if started is not None and getattr(started, "type", None) == "tool_use":
                call_chunks.append(
                    create_tool_call_chunk(
                        index=event.index,
                        id=None,
                        name=None,
                        args=event.delta.partial_json,
                    )
                )
            chunk = AIMessageChunk(content=[payload], tool_call_chunks=call_chunks)
        elif delta_type == "compaction_delta":
            # Compaction block.
            payload = event.delta.model_dump()
            payload["index"] = event.index
            payload["type"] = "compaction"
            chunk = AIMessageChunk(content=[payload])
    elif event.type == "message_delta" and stream_usage:
        # Final usage metadata and completion info.
        metadata = {
            "stop_reason": event.delta.stop_reason,
            "stop_sequence": event.delta.stop_sequence,
        }
        context_management = getattr(event, "context_management", None)
        if context_management:
            metadata["context_management"] = context_management.model_dump()
        delta = getattr(event, "delta", None)
        container = getattr(delta, "container", None) if delta else None
        if container:
            metadata["container"] = container.model_dump(mode="json")
        chunk = AIMessageChunk(
            content="" if coerce_content_to_string else [],
            usage_metadata=_create_usage_metadata(event.usage),
            response_metadata=metadata,
        )
        if chunk.response_metadata.get("stop_reason"):
            # A stop reason marks the final chunk of the Anthropic stream.
            chunk.chunk_position = "last"
    # All other event types (`content_block_stop`, `ping`, ...) yield no chunk:
    # https://platform.claude.com/docs/en/build-with-claude/streaming#other-events
    if chunk:
        chunk.response_metadata["model_provider"] = "anthropic"
    return chunk, block_start_event
def _create_usage_metadata(anthropic_usage: BaseModel) -> UsageMetadata:
"""Create LangChain `UsageMetadata` from Anthropic `Usage` data.

View File

@@ -1690,8 +1690,6 @@ def test_streaming_cache_token_reporting() -> None:
from anthropic.types import MessageDeltaUsage
from langchain_anthropic.chat_models import _make_message_chunk_from_anthropic_event
# Create a mock message_start event
mock_message = MagicMock()
mock_message.model = MODEL_NAME
@@ -1721,8 +1719,10 @@ def test_streaming_cache_token_reporting() -> None:
message_delta_event.usage = mock_delta_usage
message_delta_event.delta = mock_delta
llm = ChatAnthropic(model=MODEL_NAME) # type: ignore[call-arg]
# Test message_start event
start_chunk, _ = _make_message_chunk_from_anthropic_event(
start_chunk, _ = llm._make_message_chunk_from_anthropic_event(
message_start_event,
stream_usage=True,
coerce_content_to_string=True,
@@ -1730,7 +1730,7 @@ def test_streaming_cache_token_reporting() -> None:
)
# Test message_delta event - should contain complete usage metadata (w/ cache)
delta_chunk, _ = _make_message_chunk_from_anthropic_event(
delta_chunk, _ = llm._make_message_chunk_from_anthropic_event(
message_delta_event,
stream_usage=True,
coerce_content_to_string=True,