diff --git a/libs/partners/anthropic/langchain_anthropic/chat_models.py b/libs/partners/anthropic/langchain_anthropic/chat_models.py
index 6ed53ae497e..43904445a74 100644
--- a/libs/partners/anthropic/langchain_anthropic/chat_models.py
+++ b/libs/partners/anthropic/langchain_anthropic/chat_models.py
@@ -1313,7 +1313,7 @@ class ChatAnthropic(BaseChatModel):
             )
             block_start_event = None
             for event in stream:
-                msg, block_start_event = _make_message_chunk_from_anthropic_event(
+                msg, block_start_event = self._make_message_chunk_from_anthropic_event(
                     event,
                     stream_usage=stream_usage,
                     coerce_content_to_string=coerce_content_to_string,
@@ -1350,7 +1350,7 @@ class ChatAnthropic(BaseChatModel):
             )
             block_start_event = None
             async for event in stream:
-                msg, block_start_event = _make_message_chunk_from_anthropic_event(
+                msg, block_start_event = self._make_message_chunk_from_anthropic_event(
                     event,
                     stream_usage=stream_usage,
                     coerce_content_to_string=coerce_content_to_string,
@@ -1364,6 +1364,188 @@ class ChatAnthropic(BaseChatModel):
         except anthropic.BadRequestError as e:
             _handle_anthropic_bad_request(e)
 
+    def _make_message_chunk_from_anthropic_event(
+        self,
+        event: anthropic.types.RawMessageStreamEvent,
+        *,
+        stream_usage: bool = True,
+        coerce_content_to_string: bool,
+        block_start_event: anthropic.types.RawMessageStreamEvent | None = None,
+    ) -> tuple[AIMessageChunk | None, anthropic.types.RawMessageStreamEvent | None]:
+        """Convert Anthropic streaming event to `AIMessageChunk`.
+
+        Args:
+            event: Raw streaming event from Anthropic SDK
+            stream_usage: Whether to include usage metadata in the output chunks.
+            coerce_content_to_string: Whether to convert structured content to plain
+                text strings.
+
+                When `True`, only text content is preserved; when `False`, structured
+                content like tool calls and citations are maintained.
+            block_start_event: Previous content block start event, used for tracking
+                tool use blocks and maintaining context across related events.
+
+        Returns:
+            Tuple with
+
+            - `AIMessageChunk`: Converted message chunk with appropriate content and
+              metadata, or `None` if the event doesn't produce a chunk
+            - `RawMessageStreamEvent`: Updated `block_start_event` for tracking
+              content blocks across sequential events, or `None` if not applicable
+
+        Note:
+            Not all Anthropic events result in message chunks. Events like internal
+            state changes return `None` for the message chunk while potentially
+            updating the `block_start_event` for context tracking.
+        """
+        message_chunk: AIMessageChunk | None = None
+        # Reference: Anthropic SDK streaming implementation
+        # https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/lib/streaming/_messages.py # noqa: E501
+        if event.type == "message_start" and stream_usage:
+            # Capture model name, but don't include usage_metadata yet
+            # as it will be properly reported in message_delta with complete info
+            if hasattr(event.message, "model"):
+                response_metadata: dict[str, Any] = {"model_name": event.message.model}
+            else:
+                response_metadata = {}
+
+            message_chunk = AIMessageChunk(
+                content="" if coerce_content_to_string else [],
+                response_metadata=response_metadata,
+            )
+
+        elif (
+            event.type == "content_block_start"
+            and event.content_block is not None
+            and (
+                "tool_result" in event.content_block.type
+                or "tool_use" in event.content_block.type
+                or "document" in event.content_block.type
+                or "redacted_thinking" in event.content_block.type
+            )
+        ):
+            if coerce_content_to_string:
+                warnings.warn("Received unexpected tool content block.", stacklevel=2)
+
+            content_block = event.content_block.model_dump()
+            if "caller" in content_block and content_block["caller"] is None:
+                content_block.pop("caller")
+            content_block["index"] = event.index
+            if event.content_block.type == "tool_use":
+                if (
+                    parsed_args := getattr(event.content_block, "input", None)
+                ) and isinstance(parsed_args, dict):
+                    # In some cases parsed args are represented in start event, with no
+                    # following input_json_delta events
+                    args = json.dumps(parsed_args)
+                else:
+                    args = ""
+                tool_call_chunk = create_tool_call_chunk(
+                    index=event.index,
+                    id=event.content_block.id,
+                    name=event.content_block.name,
+                    args=args,
+                )
+                tool_call_chunks = [tool_call_chunk]
+            else:
+                tool_call_chunks = []
+            message_chunk = AIMessageChunk(
+                content=[content_block],
+                tool_call_chunks=tool_call_chunks,
+            )
+            block_start_event = event
+
+        # Process incremental content updates
+        elif event.type == "content_block_delta":
+            # Text and citation deltas (incremental text content)
+            if event.delta.type in ("text_delta", "citations_delta"):
+                if coerce_content_to_string and hasattr(event.delta, "text"):
+                    text = getattr(event.delta, "text", "")
+                    message_chunk = AIMessageChunk(content=text)
+                else:
+                    content_block = event.delta.model_dump()
+                    content_block["index"] = event.index
+
+                    # All citation deltas are part of a text block
+                    content_block["type"] = "text"
+                    if "citation" in content_block:
+                        # Assign citations to a list if present
+                        content_block["citations"] = [content_block.pop("citation")]
+                    message_chunk = AIMessageChunk(content=[content_block])
+
+            # Reasoning
+            elif event.delta.type in {"thinking_delta", "signature_delta"}:
+                content_block = event.delta.model_dump()
+                content_block["index"] = event.index
+                content_block["type"] = "thinking"
+                message_chunk = AIMessageChunk(content=[content_block])
+
+            # Tool input JSON (streaming tool arguments)
+            elif event.delta.type == "input_json_delta":
+                content_block = event.delta.model_dump()
+                content_block["index"] = event.index
+                start_event_block = (
+                    getattr(block_start_event, "content_block", None)
+                    if block_start_event
+                    else None
+                )
+                if (
+                    start_event_block is not None
+                    and getattr(start_event_block, "type", None) == "tool_use"
+                ):
+                    tool_call_chunk = create_tool_call_chunk(
+                        index=event.index,
+                        id=None,
+                        name=None,
+                        args=event.delta.partial_json,
+                    )
+                    tool_call_chunks = [tool_call_chunk]
+                else:
+                    tool_call_chunks = []
+                message_chunk = AIMessageChunk(
+                    content=[content_block],
+                    tool_call_chunks=tool_call_chunks,
+                )
+
+            # Compaction block
+            elif event.delta.type == "compaction_delta":
+                content_block = event.delta.model_dump()
+                content_block["index"] = event.index
+                content_block["type"] = "compaction"
+                message_chunk = AIMessageChunk(content=[content_block])
+
+        # Process final usage metadata and completion info
+        elif event.type == "message_delta" and stream_usage:
+            usage_metadata = _create_usage_metadata(event.usage)
+            response_metadata = {
+                "stop_reason": event.delta.stop_reason,
+                "stop_sequence": event.delta.stop_sequence,
+            }
+            if context_management := getattr(event, "context_management", None):
+                response_metadata["context_management"] = (
+                    context_management.model_dump()
+                )
+            message_delta = getattr(event, "delta", None)
+            if message_delta and (
+                container := getattr(message_delta, "container", None)
+            ):
+                response_metadata["container"] = container.model_dump(mode="json")
+            message_chunk = AIMessageChunk(
+                content="" if coerce_content_to_string else [],
+                usage_metadata=usage_metadata,
+                response_metadata=response_metadata,
+            )
+            if message_chunk.response_metadata.get("stop_reason"):
+                # Mark final Anthropic stream chunk
+                message_chunk.chunk_position = "last"
+        # Unhandled event types (e.g., `content_block_stop`, `ping` events)
+        # https://platform.claude.com/docs/en/build-with-claude/streaming#other-events
+        else:
+            pass
+
+        if message_chunk:
+            message_chunk.response_metadata["model_provider"] = "anthropic"
+        return message_chunk, block_start_event
+
     def _format_output(self, data: Any, **kwargs: Any) -> ChatResult:
         """Format the output from the Anthropic API to LC."""
         data_dict = data.model_dump()
@@ -1980,184 +2162,6 @@ def _convert_to_anthropic_output_config_format(schema: dict | type) -> dict[str,
     return {"type": "json_schema", "schema": json_schema}
 
 
-def _make_message_chunk_from_anthropic_event(
-    event: anthropic.types.RawMessageStreamEvent,
-    *,
-    stream_usage: bool = True,
-    coerce_content_to_string: bool,
-    block_start_event: anthropic.types.RawMessageStreamEvent | None = None,
-) -> tuple[AIMessageChunk | None, anthropic.types.RawMessageStreamEvent | None]:
-    """Convert Anthropic streaming event to `AIMessageChunk`.
-
-    Args:
-        event: Raw streaming event from Anthropic SDK
-        stream_usage: Whether to include usage metadata in the output chunks.
-        coerce_content_to_string: Whether to convert structured content to plain
-            text strings.
-
-            When `True`, only text content is preserved; when `False`, structured
-            content like tool calls and citations are maintained.
-        block_start_event: Previous content block start event, used for tracking
-            tool use blocks and maintaining context across related events.
-
-    Returns:
-        Tuple with
-
-        - `AIMessageChunk`: Converted message chunk with appropriate content and
-          metadata, or `None` if the event doesn't produce a chunk
-        - `RawMessageStreamEvent`: Updated `block_start_event` for tracking content
-          blocks across sequential events, or `None` if not applicable
-
-    Note:
-        Not all Anthropic events result in message chunks. Events like internal
-        state changes return `None` for the message chunk while potentially
-        updating the `block_start_event` for context tracking.
-    """
-    message_chunk: AIMessageChunk | None = None
-    # Reference: Anthropic SDK streaming implementation
-    # https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/lib/streaming/_messages.py # noqa: E501
-    if event.type == "message_start" and stream_usage:
-        # Capture model name, but don't include usage_metadata yet
-        # as it will be properly reported in message_delta with complete info
-        if hasattr(event.message, "model"):
-            response_metadata: dict[str, Any] = {"model_name": event.message.model}
-        else:
-            response_metadata = {}
-
-        message_chunk = AIMessageChunk(
-            content="" if coerce_content_to_string else [],
-            response_metadata=response_metadata,
-        )
-
-    elif (
-        event.type == "content_block_start"
-        and event.content_block is not None
-        and (
-            "tool_result" in event.content_block.type
-            or "tool_use" in event.content_block.type
-            or "document" in event.content_block.type
-            or "redacted_thinking" in event.content_block.type
-        )
-    ):
-        if coerce_content_to_string:
-            warnings.warn("Received unexpected tool content block.", stacklevel=2)
-
-        content_block = event.content_block.model_dump()
-        if "caller" in content_block and content_block["caller"] is None:
-            content_block.pop("caller")
-        content_block["index"] = event.index
-        if event.content_block.type == "tool_use":
-            if (
-                parsed_args := getattr(event.content_block, "input", None)
-            ) and isinstance(parsed_args, dict):
-                # In some cases parsed args are represented in start event, with no
-                # following input_json_delta events
-                args = json.dumps(parsed_args)
-            else:
-                args = ""
-            tool_call_chunk = create_tool_call_chunk(
-                index=event.index,
-                id=event.content_block.id,
-                name=event.content_block.name,
-                args=args,
-            )
-            tool_call_chunks = [tool_call_chunk]
-        else:
-            tool_call_chunks = []
-        message_chunk = AIMessageChunk(
-            content=[content_block],
-            tool_call_chunks=tool_call_chunks,
-        )
-        block_start_event = event
-
-    # Process incremental content updates
-    elif event.type == "content_block_delta":
-        # Text and citation deltas (incremental text content)
-        if event.delta.type in ("text_delta", "citations_delta"):
-            if coerce_content_to_string and hasattr(event.delta, "text"):
-                text = getattr(event.delta, "text", "")
-                message_chunk = AIMessageChunk(content=text)
-            else:
-                content_block = event.delta.model_dump()
-                content_block["index"] = event.index
-
-                # All citation deltas are part of a text block
-                content_block["type"] = "text"
-                if "citation" in content_block:
-                    # Assign citations to a list if present
-                    content_block["citations"] = [content_block.pop("citation")]
-                message_chunk = AIMessageChunk(content=[content_block])
-
-        # Reasoning
-        elif event.delta.type in {"thinking_delta", "signature_delta"}:
-            content_block = event.delta.model_dump()
-            content_block["index"] = event.index
-            content_block["type"] = "thinking"
-            message_chunk = AIMessageChunk(content=[content_block])
-
-        # Tool input JSON (streaming tool arguments)
-        elif event.delta.type == "input_json_delta":
-            content_block = event.delta.model_dump()
-            content_block["index"] = event.index
-            start_event_block = (
-                getattr(block_start_event, "content_block", None)
-                if block_start_event
-                else None
-            )
-            if (
-                start_event_block is not None
-                and getattr(start_event_block, "type", None) == "tool_use"
-            ):
-                tool_call_chunk = create_tool_call_chunk(
-                    index=event.index,
-                    id=None,
-                    name=None,
-                    args=event.delta.partial_json,
-                )
-                tool_call_chunks = [tool_call_chunk]
-            else:
-                tool_call_chunks = []
-            message_chunk = AIMessageChunk(
-                content=[content_block],
-                tool_call_chunks=tool_call_chunks,
-            )
-
-        # Compaction block
-        elif event.delta.type == "compaction_delta":
-            content_block = event.delta.model_dump()
-            content_block["index"] = event.index
-            content_block["type"] = "compaction"
-            message_chunk = AIMessageChunk(content=[content_block])
-
-    # Process final usage metadata and completion info
-    elif event.type == "message_delta" and stream_usage:
-        usage_metadata = _create_usage_metadata(event.usage)
-        response_metadata = {
-            "stop_reason": event.delta.stop_reason,
-            "stop_sequence": event.delta.stop_sequence,
-        }
-        if context_management := getattr(event, "context_management", None):
-            response_metadata["context_management"] = context_management.model_dump()
-        message_delta = getattr(event, "delta", None)
-        if message_delta and (container := getattr(message_delta, "container", None)):
-            response_metadata["container"] = container.model_dump(mode="json")
-        message_chunk = AIMessageChunk(
-            content="" if coerce_content_to_string else [],
-            usage_metadata=usage_metadata,
-            response_metadata=response_metadata,
-        )
-        if message_chunk.response_metadata.get("stop_reason"):
-            # Mark final Anthropic stream chunk
-            message_chunk.chunk_position = "last"
-    # Unhandled event types (e.g., `content_block_stop`, `ping` events)
-    # https://platform.claude.com/docs/en/build-with-claude/streaming#other-events
-    else:
-        pass
-
-    if message_chunk:
-        message_chunk.response_metadata["model_provider"] = "anthropic"
-    return message_chunk, block_start_event
-
-
 def _create_usage_metadata(anthropic_usage: BaseModel) -> UsageMetadata:
     """Create LangChain `UsageMetadata` from Anthropic `Usage` data.
 
diff --git a/libs/partners/anthropic/tests/unit_tests/test_chat_models.py b/libs/partners/anthropic/tests/unit_tests/test_chat_models.py
index f361857cee5..1fa7a4b5cf6 100644
--- a/libs/partners/anthropic/tests/unit_tests/test_chat_models.py
+++ b/libs/partners/anthropic/tests/unit_tests/test_chat_models.py
@@ -1690,8 +1690,6 @@ def test_streaming_cache_token_reporting() -> None:
 
     from anthropic.types import MessageDeltaUsage
 
-    from langchain_anthropic.chat_models import _make_message_chunk_from_anthropic_event
-
     # Create a mock message_start event
     mock_message = MagicMock()
     mock_message.model = MODEL_NAME
@@ -1721,8 +1719,10 @@ def test_streaming_cache_token_reporting() -> None:
     message_delta_event.usage = mock_delta_usage
     message_delta_event.delta = mock_delta
 
+    llm = ChatAnthropic(model=MODEL_NAME)  # type: ignore[call-arg]
+
     # Test message_start event
-    start_chunk, _ = _make_message_chunk_from_anthropic_event(
+    start_chunk, _ = llm._make_message_chunk_from_anthropic_event(
         message_start_event,
         stream_usage=True,
         coerce_content_to_string=True,
@@ -1730,7 +1730,7 @@ def test_streaming_cache_token_reporting() -> None:
     )
 
     # Test message_delta event - should contain complete usage metadata (w/ cache)
-    delta_chunk, _ = _make_message_chunk_from_anthropic_event(
+    delta_chunk, _ = llm._make_message_chunk_from_anthropic_event(
         message_delta_event,
         stream_usage=True,
         coerce_content_to_string=True,
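
Usage sketch (not part of the patch): a minimal, self-contained illustration of calling the relocated helper through a `ChatAnthropic` instance, in the style of the updated unit test. The model name, API key, tool name, and `toolu_123` id are placeholder assumptions, and the events are `MagicMock` stand-ins rather than real SDK objects; no network call is made.

    from unittest.mock import MagicMock

    from langchain_anthropic import ChatAnthropic

    # Placeholder model and key, mirroring the unit test's setup.
    llm = ChatAnthropic(
        model="claude-3-5-haiku-latest",  # type: ignore[call-arg]
        anthropic_api_key="test-key",
    )

    # 1. A mocked `message_start` event seeds only `model_name` metadata;
    #    usage is reported later, on `message_delta`.
    start = MagicMock()
    start.type = "message_start"
    start.message.model = "claude-3-5-haiku-latest"
    chunk, _ = llm._make_message_chunk_from_anthropic_event(
        start, stream_usage=True, coerce_content_to_string=True, block_start_event=None
    )
    assert chunk is not None
    assert chunk.response_metadata["model_name"] == "claude-3-5-haiku-latest"
    assert chunk.response_metadata["model_provider"] == "anthropic"

    # 2. A mocked tool-use `content_block_start` followed by an
    #    `input_json_delta`: the returned `block_start_event` must be threaded
    #    back in so the delta is recognized as streamed tool arguments.
    block_start = MagicMock()
    block_start.type = "content_block_start"
    block_start.index = 0
    block_start.content_block.type = "tool_use"
    block_start.content_block.id = "toolu_123"  # hypothetical tool-use id
    block_start.content_block.name = "get_weather"  # hypothetical tool name
    block_start.content_block.input = {}
    block_start.content_block.model_dump.return_value = {
        "type": "tool_use", "id": "toolu_123", "name": "get_weather", "input": {}
    }
    _, tracked = llm._make_message_chunk_from_anthropic_event(
        block_start,
        stream_usage=True,
        coerce_content_to_string=False,
        block_start_event=None,
    )

    delta = MagicMock()
    delta.type = "content_block_delta"
    delta.index = 0
    delta.delta.type = "input_json_delta"
    delta.delta.partial_json = '{"location":'
    delta.delta.model_dump.return_value = {
        "type": "input_json_delta", "partial_json": '{"location":'
    }
    tool_chunk, _ = llm._make_message_chunk_from_anthropic_event(
        delta,
        stream_usage=True,
        coerce_content_to_string=False,
        block_start_event=tracked,
    )
    assert tool_chunk is not None
    assert tool_chunk.tool_call_chunks[0]["args"] == '{"location":'

Note the design choice the sketch exercises: `input_json_delta` events carry no tool id or name, so the helper only emits a `tool_call_chunk` for them when the threaded-through `block_start_event` shows a `tool_use` block is open.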