From e91da86efe9f006f79ba9271cab8cd2cf3f59322 Mon Sep 17 00:00:00 2001 From: Mason Daugherty Date: Wed, 4 Mar 2026 15:35:30 -0500 Subject: [PATCH] feat(openrouter): add streaming token usage support (#35559) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Streaming token usage was silently dropped for `ChatOpenRouter`. Both `_stream` and `_astream` skipped any SSE chunk without a `choices` array — which is exactly the shape OpenRouter uses for the final usage-reporting chunk. This meant `usage_metadata` was never populated on streamed responses, causing downstream consumers (like the Deep Agents CLI) to show "unknown" model with 0 tokens. ## Changes - Add `stream_usage: bool = True` field to `ChatOpenRouter`, which passes `stream_options: {"include_usage": True}` to the OpenRouter API when streaming — matching the pattern already established in `langchain-openai`'s `BaseChatOpenAI` - Handle usage-only chunks (no `choices`, just `usage`) in both `_stream` and `_astream` by emitting a `ChatGenerationChunk` with `usage_metadata` via `_create_usage_metadata`, instead of silently `continue`-ing past them --- .../langchain_openai/chat_models/base.py | 65 +++++---- .../langchain_openrouter/chat_models.py | 39 +++++- .../__snapshots__/test_standard.ambr | 1 + .../tests/unit_tests/test_chat_models.py | 132 ++++++++++++++++++ 4 files changed, 208 insertions(+), 29 deletions(-) diff --git a/libs/partners/openai/langchain_openai/chat_models/base.py b/libs/partners/openai/langchain_openai/chat_models/base.py index c74c40f3c56..fb93df89d3d 100644 --- a/libs/partners/openai/langchain_openai/chat_models/base.py +++ b/libs/partners/openai/langchain_openai/chat_models/base.py @@ -577,8 +577,8 @@ class BaseChatOpenAI(BaseChatModel): ) """API key to use. - Can be inferred from the `OPENAI_API_KEY` environment variable, or specified as a - string, or sync or async callable that returns a string. + Can be inferred from the `OPENAI_API_KEY` environment variable, or specified + as a string, or sync or async callable that returns a string. ??? example "Specify with environment variable" @@ -600,6 +600,7 @@ class BaseChatOpenAI(BaseChatModel): ``` ??? example "Specify with a sync callable" + ```python from langchain_openai import ChatOpenAI @@ -611,6 +612,7 @@ class BaseChatOpenAI(BaseChatModel): ``` ??? example "Specify with an async callable" + ```python from langchain_openai import ChatOpenAI @@ -636,16 +638,20 @@ class BaseChatOpenAI(BaseChatModel): request_timeout: float | tuple[float, float] | Any | None = Field( default=None, alias="timeout" ) - """Timeout for requests to OpenAI completion API. Can be float, `httpx.Timeout` or - `None`.""" + """Timeout for requests to OpenAI completion API. + + Can be float, `httpx.Timeout` or `None`. + """ stream_usage: bool | None = None - """Whether to include usage metadata in streaming output. If enabled, an additional - message chunk will be generated during the stream including usage metadata. + """Whether to include usage metadata in streaming output. + + If enabled, an additional message chunk will be generated during the stream + including usage metadata. This parameter is enabled unless `openai_api_base` is set or the model is - initialized with a custom client, as many chat completions APIs do not support - streaming token usage. + initialized with a custom client, as many chat completions APIs do not + support streaming token usage. !!! version-added "Added in `langchain-openai` 0.3.9" @@ -671,8 +677,10 @@ class BaseChatOpenAI(BaseChatModel): top_logprobs: int | None = None """Number of most likely tokens to return at each token position, each with an - associated log probability. `logprobs` must be set to true if this parameter is - used.""" + associated log probability. + + `logprobs` must be set to true if this parameter is used. + """ logit_bias: dict[int, int] | None = None """Modify the likelihood of specified tokens appearing in the completion.""" @@ -690,10 +698,9 @@ class BaseChatOpenAI(BaseChatModel): """Maximum number of tokens to generate.""" reasoning_effort: str | None = None - """Constrains effort on reasoning for reasoning models. For use with the Chat - Completions API. + """Constrains effort on reasoning for reasoning models. - Reasoning models only. + For use with the Chat Completions API. Reasoning models only. Currently supported values are `'minimal'`, `'low'`, `'medium'`, and `'high'`. Reducing reasoning effort can result in faster responses and fewer @@ -701,7 +708,9 @@ class BaseChatOpenAI(BaseChatModel): """ reasoning: dict[str, Any] | None = None - """Reasoning parameters for reasoning models. For use with the Responses API. + """Reasoning parameters for reasoning models. + + For use with the Responses API. ```python reasoning={ @@ -714,8 +723,9 @@ class BaseChatOpenAI(BaseChatModel): """ verbosity: str | None = None - """Controls the verbosity level of responses for reasoning models. For use with the - Responses API. + """Controls the verbosity level of responses for reasoning models. + + For use with the Responses API. Currently supported values are `'low'`, `'medium'`, and `'high'`. @@ -745,35 +755,36 @@ class BaseChatOpenAI(BaseChatModel): http_client: Any | None = Field(default=None, exclude=True) """Optional `httpx.Client`. - Only used for sync invocations. Must specify `http_async_client` as well if you'd - like a custom client for async invocations. + Only used for sync invocations. Must specify `http_async_client` as well if + you'd like a custom client for async invocations. """ http_async_client: Any | None = Field(default=None, exclude=True) """Optional `httpx.AsyncClient`. - Only used for async invocations. Must specify `http_client` as well if you'd like a - custom client for sync invocations. + Only used for async invocations. Must specify `http_client` as well if you'd + like a custom client for sync invocations. """ stop: list[str] | str | None = Field(default=None, alias="stop_sequences") """Default stop sequences.""" extra_body: Mapping[str, Any] | None = None - """Optional additional JSON properties to include in the request parameters when - making requests to OpenAI compatible APIs, such as vLLM, LM Studio, or other - providers. + """Optional additional JSON properties to include in the request parameters + when making requests to OpenAI compatible APIs, such as vLLM, LM Studio, or + other providers. This is the recommended way to pass custom parameters that are specific to your OpenAI-compatible API provider but not part of the standard OpenAI API. Examples: - - [LM Studio](https://lmstudio.ai/) TTL parameter: `extra_body={"ttl": 300}` - - [vLLM](https://github.com/vllm-project/vllm) custom parameters: - `extra_body={"use_beam_search": True}` - - Any other provider-specific parameters + - [LM Studio](https://lmstudio.ai/) TTL parameter: `extra_body={"ttl": 300}` + - [vLLM](https://github.com/vllm-project/vllm) custom parameters: + `extra_body={"use_beam_search": True}` + - Any other provider-specific parameters !!! warning + Do not use `model_kwargs` for custom parameters that are not part of the standard OpenAI API, as this will cause errors when making API calls. Use `extra_body` instead. diff --git a/libs/partners/openrouter/langchain_openrouter/chat_models.py b/libs/partners/openrouter/langchain_openrouter/chat_models.py index e87d88a72d0..13846b4ab03 100644 --- a/libs/partners/openrouter/langchain_openrouter/chat_models.py +++ b/libs/partners/openrouter/langchain_openrouter/chat_models.py @@ -223,6 +223,13 @@ class ChatOpenRouter(BaseChatModel): streaming: bool = False """Whether to stream the results or not.""" + stream_usage: bool = True + """Whether to include usage metadata in streaming output. + + If `True`, additional message chunks will be generated during the stream including + usage metadata. + """ + model_kwargs: dict[str, Any] = Field(default_factory=dict) """Any extra model parameters for the OpenRouter API.""" @@ -443,7 +450,7 @@ class ChatOpenRouter(BaseChatModel): response = await self.client.chat.send_async(messages=sdk_messages, **params) return self._create_chat_result(response) - def _stream( # noqa: C901 + def _stream( # noqa: C901, PLR0912 self, messages: list[BaseMessage], stop: list[str] | None = None, @@ -452,6 +459,8 @@ class ChatOpenRouter(BaseChatModel): ) -> Iterator[ChatGenerationChunk]: message_dicts, params = self._create_message_dicts(messages, stop) params = {**params, **kwargs, "stream": True} + if self.stream_usage: + params["stream_options"] = {"include_usage": True} _strip_internal_kwargs(params) sdk_messages = _wrap_messages_for_sdk(message_dicts) @@ -466,6 +475,18 @@ class ChatOpenRouter(BaseChatModel): f"(code: {error.get('code', 'unknown')})" ) raise ValueError(msg) + # Usage-only chunk (no choices) — emit with usage_metadata + if usage := chunk_dict.get("usage"): + usage_metadata = _create_usage_metadata(usage) + usage_chunk = AIMessageChunk( + content="", usage_metadata=usage_metadata + ) + generation_chunk = ChatGenerationChunk(message=usage_chunk) + if run_manager: + run_manager.on_llm_new_token( + generation_chunk.text, chunk=generation_chunk + ) + yield generation_chunk continue choice = chunk_dict["choices"][0] message_chunk = _convert_chunk_to_message_chunk( @@ -515,7 +536,7 @@ class ChatOpenRouter(BaseChatModel): ) yield generation_chunk - async def _astream( # noqa: C901 + async def _astream( # noqa: C901, PLR0912 self, messages: list[BaseMessage], stop: list[str] | None = None, @@ -524,6 +545,8 @@ class ChatOpenRouter(BaseChatModel): ) -> AsyncIterator[ChatGenerationChunk]: message_dicts, params = self._create_message_dicts(messages, stop) params = {**params, **kwargs, "stream": True} + if self.stream_usage: + params["stream_options"] = {"include_usage": True} _strip_internal_kwargs(params) sdk_messages = _wrap_messages_for_sdk(message_dicts) @@ -540,6 +563,18 @@ class ChatOpenRouter(BaseChatModel): f"(code: {error.get('code', 'unknown')})" ) raise ValueError(msg) + # Usage-only chunk (no choices) — emit with usage_metadata + if usage := chunk_dict.get("usage"): + usage_metadata = _create_usage_metadata(usage) + usage_chunk = AIMessageChunk( + content="", usage_metadata=usage_metadata + ) + generation_chunk = ChatGenerationChunk(message=usage_chunk) + if run_manager: + await run_manager.on_llm_new_token( + token=generation_chunk.text, chunk=generation_chunk + ) + yield generation_chunk continue choice = chunk_dict["choices"][0] message_chunk = _convert_chunk_to_message_chunk( diff --git a/libs/partners/openrouter/tests/unit_tests/__snapshots__/test_standard.ambr b/libs/partners/openrouter/tests/unit_tests/__snapshots__/test_standard.ambr index f0574a19267..20fec727080 100644 --- a/libs/partners/openrouter/tests/unit_tests/__snapshots__/test_standard.ambr +++ b/libs/partners/openrouter/tests/unit_tests/__snapshots__/test_standard.ambr @@ -23,6 +23,7 @@ 'request_timeout': 60, 'stop': list([ ]), + 'stream_usage': True, 'temperature': 0.0, }), 'lc': 1, diff --git a/libs/partners/openrouter/tests/unit_tests/test_chat_models.py b/libs/partners/openrouter/tests/unit_tests/test_chat_models.py index 33668f63a92..5e7bc3a1cef 100644 --- a/libs/partners/openrouter/tests/unit_tests/test_chat_models.py +++ b/libs/partners/openrouter/tests/unit_tests/test_chat_models.py @@ -2831,3 +2831,135 @@ class TestStreamingErrors: assert any( "malformed tool call chunk" in str(warning.message) for warning in w ) + + +class TestStreamUsage: + """Tests for stream_usage and usage-only chunk handling.""" + + def test_stream_options_passed_by_default(self) -> None: + """Test that stream_options with include_usage is sent by default.""" + model = _make_model() + model.client = MagicMock() + model.client.chat.send.return_value = _MockSyncStream( + [dict(c) for c in _STREAM_CHUNKS] + ) + list(model.stream("Hello")) + call_kwargs = model.client.chat.send.call_args[1] + assert call_kwargs["stream_options"] == {"include_usage": True} + + def test_stream_options_not_passed_when_disabled(self) -> None: + """Test that stream_options is omitted when stream_usage=False.""" + model = _make_model(stream_usage=False) + model.client = MagicMock() + model.client.chat.send.return_value = _MockSyncStream( + [dict(c) for c in _STREAM_CHUNKS] + ) + list(model.stream("Hello")) + call_kwargs = model.client.chat.send.call_args[1] + assert "stream_options" not in call_kwargs + + def test_usage_only_chunk_emitted(self) -> None: + """Test that a usage-only chunk (no choices) emits usage_metadata.""" + model = _make_model() + model.client = MagicMock() + # Content chunks followed by a usage-only chunk (no choices key) + chunks_with_separate_usage: list[dict[str, Any]] = [ + { + "choices": [ + {"delta": {"role": "assistant", "content": "Hi"}, "index": 0} + ], + "model": MODEL_NAME, + "object": "chat.completion.chunk", + "created": 1700000000.0, + "id": "gen-1", + }, + { + "choices": [{"delta": {}, "finish_reason": "stop", "index": 0}], + "model": MODEL_NAME, + "object": "chat.completion.chunk", + "created": 1700000000.0, + "id": "gen-1", + }, + # Usage-only final chunk — no choices + { + "usage": { + "prompt_tokens": 10, + "completion_tokens": 5, + "total_tokens": 15, + }, + "model": MODEL_NAME, + "object": "chat.completion.chunk", + "created": 1700000000.0, + "id": "gen-1", + }, + ] + model.client.chat.send.return_value = _MockSyncStream( + chunks_with_separate_usage + ) + chunks = list(model.stream("Hello")) + + # Last chunk should carry usage_metadata + usage_chunks = [c for c in chunks if c.usage_metadata] + assert len(usage_chunks) >= 1 + usage = usage_chunks[-1].usage_metadata + assert usage is not None + assert usage["input_tokens"] == 10 + assert usage["output_tokens"] == 5 + assert usage["total_tokens"] == 15 + + async def test_astream_options_passed_by_default(self) -> None: + """Test that async stream sends stream_options by default.""" + model = _make_model() + model.client = MagicMock() + model.client.chat.send_async = AsyncMock( + return_value=_MockAsyncStream([dict(c) for c in _STREAM_CHUNKS]) + ) + chunks = [c async for c in model.astream("Hello")] # noqa: F841 + call_kwargs = model.client.chat.send_async.call_args[1] + assert call_kwargs["stream_options"] == {"include_usage": True} + + async def test_astream_usage_only_chunk_emitted(self) -> None: + """Test that an async usage-only chunk emits usage_metadata.""" + model = _make_model() + model.client = MagicMock() + chunks_with_separate_usage: list[dict[str, Any]] = [ + { + "choices": [ + {"delta": {"role": "assistant", "content": "Hi"}, "index": 0} + ], + "model": MODEL_NAME, + "object": "chat.completion.chunk", + "created": 1700000000.0, + "id": "gen-1", + }, + { + "choices": [{"delta": {}, "finish_reason": "stop", "index": 0}], + "model": MODEL_NAME, + "object": "chat.completion.chunk", + "created": 1700000000.0, + "id": "gen-1", + }, + { + "usage": { + "prompt_tokens": 10, + "completion_tokens": 5, + "total_tokens": 15, + }, + "model": MODEL_NAME, + "object": "chat.completion.chunk", + "created": 1700000000.0, + "id": "gen-1", + }, + ] + model.client.chat.send_async = AsyncMock( + return_value=_MockAsyncStream(chunks_with_separate_usage) + ) + chunks = [c async for c in model.astream("Hello")] + + usage_chunks = [c for c in chunks if c.usage_metadata] + assert len(usage_chunks) >= 1 + usage = usage_chunks[-1].usage_metadata + assert usage is not None + assert usage["input_tokens"] == 10 + assert usage["output_tokens"] == 5 + assert usage["total_tokens"] == 15