From 22867ecf68f38ed1427c7bfc829f141e491ef4f4 Mon Sep 17 00:00:00 2001
From: Nick Hollon
Date: Thu, 30 Apr 2026 14:35:14 -0400
Subject: [PATCH] feat(core): dispatch stream_events(version='v3') on BaseChatModel

---
Review notes (this commentary area is ignored by `git am`):

* `astream_events` is a plain `def` rather than `async def`, so existing
  `async for event in model.astream_events(...)` call sites (v1/v2) keep
  working. `version="v3"` returns an awaitable:
  `stream = await model.astream_events(..., version="v3")`.
* `stop` is forwarded on the v1/v2 paths instead of being dropped now that
  it is a named parameter.
* The post-image blob id on the `index` line predates these review changes.

 .../language_models/chat_models.py            | 196 +++++++++++++-----
 1 file changed, 139 insertions(+), 57 deletions(-)

diff --git a/libs/core/langchain_core/language_models/chat_models.py b/libs/core/langchain_core/language_models/chat_models.py
index 2f5df7a3e46..c6d614c4493 100644
--- a/libs/core/langchain_core/language_models/chat_models.py
+++ b/libs/core/langchain_core/language_models/chat_models.py
@@ -10,7 +10,7 @@ from abc import ABC, abstractmethod
-from collections.abc import AsyncIterator, Callable, Iterator, Sequence
+from collections.abc import AsyncIterator, Awaitable, Callable, Iterator, Sequence
 from functools import cached_property
 from operator import itemgetter
-from typing import TYPE_CHECKING, Any, Literal, cast
+from typing import TYPE_CHECKING, Any, Literal, cast, overload
 
 from langchain_protocol.protocol import MessageFinishData
 from pydantic import BaseModel, ConfigDict, Field, model_validator
@@ -100,6 +100,7 @@ if TYPE_CHECKING:
 
     from langchain_core.output_parsers.base import OutputParserLike
     from langchain_core.runnables import Runnable, RunnableConfig
+    from langchain_core.runnables.schema import StreamEvent
     from langchain_core.tools import BaseTool
 
 
@@ -969,10 +970,10 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
             LLMResult(generations=[[generation]]),
         )
 
-    # --- stream_v2 / astream_v2 ---
+    # --- stream_events v3 ---
 
     @beta()
-    def stream_v2(
+    def _chat_model_stream_v3(
         self,
         input: LanguageModelInput,
         config: RunnableConfig | None = None,
@@ -980,34 +981,7 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
         stop: list[str] | None = None,
         **kwargs: Any,
     ) -> ChatModelStream:
-        """Stream content-block lifecycle events for a single model call.
-
-        Returns a `ChatModelStream` with typed projections
-        (`.text`, `.reasoning`, `.tool_calls`, `.output`).
-
-        !!! warning
-
-            This API is experimental and may change.
-
-        !!! note "Always produces v1-shaped content"
-
-            `ChatModelStream.output.content` is always a list of v1
-            content blocks (text / reasoning / tool_call / image / …),
-            regardless of the model's `output_version` attribute. The
-            setting only affects the legacy `stream()` / `astream()` /
-            `invoke()` paths. If you're mixing `stream_v2` with those
-            paths in the same pipeline and need a consistent output
-            shape across them, set `output_version="v1"` on the model.
-
-        Args:
-            input: The model input.
-            config: Optional runnable config.
-            stop: Optional list of stop words.
-            **kwargs: Additional keyword arguments passed to the model.
-
-        Returns:
-            A `ChatModelStream` with typed projections.
-        """
+        """Internal v3 sync streaming implementation. Public entry point: stream_events(version='v3')."""
         config = ensure_config(config)
         messages = self._convert_input(input).to_messages()
         input_messages = _normalize_messages(messages)
@@ -1133,7 +1107,7 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
         return stream
 
     @beta()
-    async def astream_v2(
+    async def _achat_model_stream_v3(
         self,
         input: LanguageModelInput,
         config: RunnableConfig | None = None,
@@ -1141,30 +1115,7 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
         stop: list[str] | None = None,
         **kwargs: Any,
     ) -> AsyncChatModelStream:
-        """Async variant of `stream_v2`.
-
-        Returns an `AsyncChatModelStream` whose projections are
-        async-iterable and awaitable.
-
-        !!! warning
-
-            This API is experimental and may change.
-
-        !!! note "Always produces v1-shaped content"
-
-            The assembled message's content is always a list of v1
-            content blocks, regardless of the model's `output_version`
-            attribute — see `stream_v2` for the full rationale.
-
-        Args:
-            input: The model input.
-            config: Optional runnable config.
-            stop: Optional list of stop words.
-            **kwargs: Additional keyword arguments passed to the model.
-
-        Returns:
-            An `AsyncChatModelStream` with typed projections.
-        """
+        """Internal v3 async streaming implementation. Public entry point: astream_events(version='v3')."""
         config = ensure_config(config)
         messages = self._convert_input(input).to_messages()
         input_messages = _normalize_messages(messages)
@@ -1299,6 +1250,137 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
         stream._on_aclose_fail = _on_aclose_fail  # noqa: SLF001
         return stream
 
+    @overload
+    def stream_events(
+        self,
+        input: LanguageModelInput,
+        config: RunnableConfig | None = None,
+        *,
+        version: Literal["v1", "v2"] = "v2",
+        **kwargs: Any,
+    ) -> Iterator[StreamEvent]: ...
+
+    @overload
+    def stream_events(
+        self,
+        input: LanguageModelInput,
+        config: RunnableConfig | None = None,
+        *,
+        version: Literal["v3"],
+        stop: list[str] | None = None,
+        **kwargs: Any,
+    ) -> ChatModelStream: ...
+
+    def stream_events(  # type: ignore[override]
+        self,
+        input: LanguageModelInput,
+        config: RunnableConfig | None = None,
+        *,
+        version: Literal["v1", "v2", "v3"] = "v2",
+        stop: list[str] | None = None,
+        **kwargs: Any,
+    ) -> Iterator[StreamEvent] | ChatModelStream:
+        """Stream events from this chat model.
+
+        For `version="v1"` / `"v2"`, yields `StreamEvent` dicts (see
+        `Runnable.stream_events`). For `version="v3"`, returns a
+        `ChatModelStream` exposing typed projections (`.text`, `.reasoning`,
+        `.tool_calls`, `.output`).
+
+        Args:
+            input: The model input.
+            config: Optional runnable config.
+            version: Streaming-event schema version. `"v3"` selects the
+                content-block-centric streaming protocol.
+            stop: Optional stop sequences. Passed to the v3 implementation
+                directly; for `"v1"` / `"v2"` it is forwarded to
+                `Runnable.stream_events` as a keyword argument when set.
+            **kwargs: Additional keyword arguments, forwarded to the
+                selected implementation.
+
+        Returns:
+            For `version="v3"`, a `ChatModelStream`. Otherwise an
+            `Iterator[StreamEvent]`.
+        """
+        if version == "v3":
+            return self._chat_model_stream_v3(input, config, stop=stop, **kwargs)
+        if stop is not None:
+            # `stop` is a named parameter here, so it no longer arrives in
+            # `**kwargs`; re-attach it so v1/v2 callers keep their stop
+            # sequences instead of having them silently dropped.
+            kwargs["stop"] = stop
+        return super().stream_events(input, config, version=version, **kwargs)
+
+    @overload
+    def astream_events(
+        self,
+        input: LanguageModelInput,
+        config: RunnableConfig | None = None,
+        *,
+        version: Literal["v1", "v2"] = "v2",
+        **kwargs: Any,
+    ) -> AsyncIterator[StreamEvent]: ...
+
+    @overload
+    def astream_events(
+        self,
+        input: LanguageModelInput,
+        config: RunnableConfig | None = None,
+        *,
+        version: Literal["v3"],
+        stop: list[str] | None = None,
+        **kwargs: Any,
+    ) -> Awaitable[AsyncChatModelStream]: ...
+
+    async def _astream_events_v1_v2(
+        self,
+        input: LanguageModelInput,
+        config: RunnableConfig | None = None,
+        *,
+        version: Literal["v1", "v2"],
+        **kwargs: Any,
+    ) -> AsyncIterator[StreamEvent]:
+        """Async generator forwarding v1/v2 events to `Runnable.astream_events`."""
+        events = super().astream_events(input, config, version=version, **kwargs)
+        async for event in events:
+            yield event
+
+    def astream_events(  # type: ignore[override]
+        self,
+        input: LanguageModelInput,
+        config: RunnableConfig | None = None,
+        *,
+        version: Literal["v1", "v2", "v3"] = "v2",
+        stop: list[str] | None = None,
+        **kwargs: Any,
+    ) -> AsyncIterator[StreamEvent] | Awaitable[AsyncChatModelStream]:
+        """Async variant of `stream_events`.
+
+        Deliberately a plain `def`, not `async def`: for `"v1"` / `"v2"` it
+        returns the async iterator itself, so existing
+        `async for event in model.astream_events(...)` call sites keep
+        working. For `"v3"` it returns an awaitable, used as
+        `stream = await model.astream_events(..., version="v3")`.
+
+        Args:
+            input: The model input.
+            config: Optional runnable config.
+            version: Streaming-event schema version.
+            stop: Optional stop sequences; handled as in `stream_events`.
+            **kwargs: Additional keyword arguments, forwarded to the
+                selected implementation.
+
+        Returns:
+            For `version="v3"`, an awaitable resolving to an
+            `AsyncChatModelStream`. Otherwise an `AsyncIterator[StreamEvent]`.
+        """
+        if version == "v3":
+            return self._achat_model_stream_v3(input, config, stop=stop, **kwargs)
+        if stop is not None:
+            # Same re-attachment as in `stream_events`.
+            kwargs["stop"] = stop
+        return self._astream_events_v1_v2(input, config, version=version, **kwargs)
+
     # --- Custom methods ---
 
     def _combine_llm_outputs(self, _llm_outputs: list[dict | None], /) -> dict: