feat(core): dispatch stream_events(version='v3') on BaseChatModel

This commit is contained in:
Nick Hollon
2026-04-30 14:35:14 -04:00
parent 484a99a7cb
commit 22867ecf68

View File

@@ -10,7 +10,7 @@ from abc import ABC, abstractmethod
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator, Sequence
from functools import cached_property
from operator import itemgetter
from typing import TYPE_CHECKING, Any, Literal, cast
from typing import TYPE_CHECKING, Any, Literal, cast, overload
from langchain_protocol.protocol import MessageFinishData
from pydantic import BaseModel, ConfigDict, Field, model_validator
@@ -100,6 +100,7 @@ if TYPE_CHECKING:
from langchain_core.output_parsers.base import OutputParserLike
from langchain_core.runnables import Runnable, RunnableConfig
from langchain_core.runnables.schema import StreamEvent
from langchain_core.tools import BaseTool
@@ -969,10 +970,10 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
LLMResult(generations=[[generation]]),
)
# --- stream_v2 / astream_v2 ---
# --- stream_events v3 ---
@beta()
def stream_v2(
def _chat_model_stream_v3(
self,
input: LanguageModelInput,
config: RunnableConfig | None = None,
@@ -980,34 +981,7 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
stop: list[str] | None = None,
**kwargs: Any,
) -> ChatModelStream:
"""Stream content-block lifecycle events for a single model call.
Returns a `ChatModelStream` with typed projections
(`.text`, `.reasoning`, `.tool_calls`, `.output`).
!!! warning
This API is experimental and may change.
!!! note "Always produces v1-shaped content"
`ChatModelStream.output.content` is always a list of v1
content blocks (text / reasoning / tool_call / image / …),
regardless of the model's `output_version` attribute. The
setting only affects the legacy `stream()` / `astream()` /
`invoke()` paths. If you're mixing `stream_v2` with those
paths in the same pipeline and need a consistent output
shape across them, set `output_version="v1"` on the model.
Args:
input: The model input.
config: Optional runnable config.
stop: Optional list of stop words.
**kwargs: Additional keyword arguments passed to the model.
Returns:
A `ChatModelStream` with typed projections.
"""
"""Internal v3 sync streaming implementation. Public entry point: stream_events(version='v3')."""
config = ensure_config(config)
messages = self._convert_input(input).to_messages()
input_messages = _normalize_messages(messages)
@@ -1133,7 +1107,7 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
return stream
@beta()
async def astream_v2(
async def _achat_model_stream_v3(
self,
input: LanguageModelInput,
config: RunnableConfig | None = None,
@@ -1141,30 +1115,7 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
stop: list[str] | None = None,
**kwargs: Any,
) -> AsyncChatModelStream:
"""Async variant of `stream_v2`.
Returns an `AsyncChatModelStream` whose projections are
async-iterable and awaitable.
!!! warning
This API is experimental and may change.
!!! note "Always produces v1-shaped content"
The assembled message's content is always a list of v1
content blocks, regardless of the model's `output_version`
attribute — see `stream_v2` for the full rationale.
Args:
input: The model input.
config: Optional runnable config.
stop: Optional list of stop words.
**kwargs: Additional keyword arguments passed to the model.
Returns:
An `AsyncChatModelStream` with typed projections.
"""
"""Internal v3 async streaming implementation. Public entry point: astream_events(version='v3')."""
config = ensure_config(config)
messages = self._convert_input(input).to_messages()
input_messages = _normalize_messages(messages)
@@ -1299,6 +1250,108 @@ class BaseChatModel(BaseLanguageModel[AIMessage], ABC):
stream._on_aclose_fail = _on_aclose_fail # noqa: SLF001
return stream
@overload
def stream_events(
    self,
    input: LanguageModelInput,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    **kwargs: Any,
) -> Iterator[StreamEvent]: ...

@overload
def stream_events(
    self,
    input: LanguageModelInput,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v3"],
    stop: list[str] | None = None,
    **kwargs: Any,
) -> ChatModelStream: ...

def stream_events(  # type: ignore[override]
    self,
    input: LanguageModelInput,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2", "v3"] = "v2",
    stop: list[str] | None = None,
    **kwargs: Any,
) -> Iterator[StreamEvent] | ChatModelStream:
    """Stream events from this chat model.

    With `version="v1"` or `"v2"` this behaves exactly like
    `Runnable.stream_events` and yields `StreamEvent` dicts. With
    `version="v3"` it instead returns a `ChatModelStream`, which exposes
    typed projections over the stream (`.text`, `.reasoning`,
    `.tool_calls`, `.output`).

    Args:
        input: The model input.
        config: Optional runnable config.
        version: Streaming-event schema version. `"v3"` selects the
            content-block-centric streaming protocol.
        stop: Optional stop sequences. Only used for `version="v3"`;
            ignored otherwise.
        **kwargs: Additional keyword arguments. For `version="v3"`,
            forwarded to the model.

    Returns:
        An `Iterator[StreamEvent]` for `version="v1"`/`"v2"`, otherwise
        a `ChatModelStream`.
    """
    if version != "v3":
        # v1/v2 keep the generic Runnable event-streaming machinery.
        return super().stream_events(input, config, version=version, **kwargs)
    # v3: content-block lifecycle protocol with typed projections.
    return self._chat_model_stream_v3(input, config, stop=stop, **kwargs)
@overload
def astream_events(
    self,
    input: LanguageModelInput,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]: ...

@overload
def astream_events(
    self,
    input: LanguageModelInput,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v3"],
    stop: list[str] | None = None,
    **kwargs: Any,
) -> Awaitable[AsyncChatModelStream]: ...

async def _astream_events_v1_v2(
    self,
    input: LanguageModelInput,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"],
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]:
    """Async generator forwarding v1/v2 events to `Runnable.astream_events`."""
    async for ev in super().astream_events(input, config, version=version, **kwargs):
        yield ev

def astream_events(  # type: ignore[override]
    self,
    input: LanguageModelInput,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2", "v3"] = "v2",
    stop: list[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent] | Awaitable[AsyncChatModelStream]:
    """Async variant of `stream_events`. See `stream_events` for full docs.

    For `version="v1"`/`"v2"`, returns an async iterator of `StreamEvent`
    dicts, so the long-standing call pattern keeps working::

        async for event in model.astream_events(prompt):
            ...

    For `version="v3"`, returns an awaitable resolving to an
    `AsyncChatModelStream`::

        stream = await model.astream_events(prompt, version="v3")

    Args:
        input: The model input.
        config: Optional runnable config.
        version: Streaming-event schema version. `"v3"` selects the
            content-block-centric streaming protocol.
        stop: Optional stop sequences. Only used for `version="v3"`;
            ignored otherwise.
        **kwargs: Additional keyword arguments. For `version="v3"`,
            forwarded to the model.

    Returns:
        An `AsyncIterator[StreamEvent]` for `version="v1"`/`"v2"`,
        otherwise an awaitable of `AsyncChatModelStream`.
    """
    # NOTE: this is deliberately a plain (non-async) def. An `async def`
    # override would return a coroutine, and `async for ev in
    # model.astream_events(...)` would then raise TypeError ("'coroutine'
    # object is not async iterable") for every existing v1/v2 caller.
    # Returning the async generator directly preserves that pattern, while
    # v3 callers await the un-awaited coroutine returned here.
    if version == "v3":
        return self._achat_model_stream_v3(input, config, stop=stop, **kwargs)
    return self._astream_events_v1_v2(input, config, version=version, **kwargs)
# --- Custom methods ---
def _combine_llm_outputs(self, _llm_outputs: list[dict | None], /) -> dict: