feat(openrouter): add streaming token usage support (#35559)

Streaming token usage was silently dropped for `ChatOpenRouter`. Both
`_stream` and `_astream` skipped any SSE chunk without a `choices` array
— which is exactly the shape OpenRouter uses for the final
usage-reporting chunk. This meant `usage_metadata` was never populated
on streamed responses, causing downstream consumers (like the Deep
Agents CLI) to show "unknown" model with 0 tokens.

## Changes
- Add `stream_usage: bool = True` field to `ChatOpenRouter`, which
passes `stream_options: {"include_usage": True}` to the OpenRouter API
when streaming — matching the pattern already established in
`langchain-openai`'s `BaseChatOpenAI`
- Handle usage-only chunks (no `choices`, just `usage`) in both
`_stream` and `_astream` by emitting a `ChatGenerationChunk` with
`usage_metadata` via `_create_usage_metadata`, rather than silently
skipping over them with `continue`
This commit is contained in:
Mason Daugherty
2026-03-04 15:35:30 -05:00
committed by GitHub
parent e50625e7c3
commit e91da86efe
4 changed files with 208 additions and 29 deletions

View File

@@ -577,8 +577,8 @@ class BaseChatOpenAI(BaseChatModel):
)
"""API key to use.
Can be inferred from the `OPENAI_API_KEY` environment variable, or specified as a
string, or sync or async callable that returns a string.
Can be inferred from the `OPENAI_API_KEY` environment variable, or specified
as a string, or sync or async callable that returns a string.
??? example "Specify with environment variable"
@@ -600,6 +600,7 @@ class BaseChatOpenAI(BaseChatModel):
```
??? example "Specify with a sync callable"
```python
from langchain_openai import ChatOpenAI
@@ -611,6 +612,7 @@ class BaseChatOpenAI(BaseChatModel):
```
??? example "Specify with an async callable"
```python
from langchain_openai import ChatOpenAI
@@ -636,16 +638,20 @@ class BaseChatOpenAI(BaseChatModel):
request_timeout: float | tuple[float, float] | Any | None = Field(
default=None, alias="timeout"
)
"""Timeout for requests to OpenAI completion API. Can be float, `httpx.Timeout` or
`None`."""
"""Timeout for requests to OpenAI completion API.
Can be float, `httpx.Timeout` or `None`.
"""
stream_usage: bool | None = None
"""Whether to include usage metadata in streaming output. If enabled, an additional
message chunk will be generated during the stream including usage metadata.
"""Whether to include usage metadata in streaming output.
If enabled, an additional message chunk will be generated during the stream
including usage metadata.
This parameter is enabled unless `openai_api_base` is set or the model is
initialized with a custom client, as many chat completions APIs do not support
streaming token usage.
initialized with a custom client, as many chat completions APIs do not
support streaming token usage.
!!! version-added "Added in `langchain-openai` 0.3.9"
@@ -671,8 +677,10 @@ class BaseChatOpenAI(BaseChatModel):
top_logprobs: int | None = None
"""Number of most likely tokens to return at each token position, each with an
associated log probability. `logprobs` must be set to true if this parameter is
used."""
associated log probability.
`logprobs` must be set to true if this parameter is used.
"""
logit_bias: dict[int, int] | None = None
"""Modify the likelihood of specified tokens appearing in the completion."""
@@ -690,10 +698,9 @@ class BaseChatOpenAI(BaseChatModel):
"""Maximum number of tokens to generate."""
reasoning_effort: str | None = None
"""Constrains effort on reasoning for reasoning models. For use with the Chat
Completions API.
"""Constrains effort on reasoning for reasoning models.
Reasoning models only.
For use with the Chat Completions API. Reasoning models only.
Currently supported values are `'minimal'`, `'low'`, `'medium'`, and
`'high'`. Reducing reasoning effort can result in faster responses and fewer
@@ -701,7 +708,9 @@ class BaseChatOpenAI(BaseChatModel):
"""
reasoning: dict[str, Any] | None = None
"""Reasoning parameters for reasoning models. For use with the Responses API.
"""Reasoning parameters for reasoning models.
For use with the Responses API.
```python
reasoning={
@@ -714,8 +723,9 @@ class BaseChatOpenAI(BaseChatModel):
"""
verbosity: str | None = None
"""Controls the verbosity level of responses for reasoning models. For use with the
Responses API.
"""Controls the verbosity level of responses for reasoning models.
For use with the Responses API.
Currently supported values are `'low'`, `'medium'`, and `'high'`.
@@ -745,35 +755,36 @@ class BaseChatOpenAI(BaseChatModel):
http_client: Any | None = Field(default=None, exclude=True)
"""Optional `httpx.Client`.
Only used for sync invocations. Must specify `http_async_client` as well if you'd
like a custom client for async invocations.
Only used for sync invocations. Must specify `http_async_client` as well if
you'd like a custom client for async invocations.
"""
http_async_client: Any | None = Field(default=None, exclude=True)
"""Optional `httpx.AsyncClient`.
Only used for async invocations. Must specify `http_client` as well if you'd like a
custom client for sync invocations.
Only used for async invocations. Must specify `http_client` as well if you'd
like a custom client for sync invocations.
"""
stop: list[str] | str | None = Field(default=None, alias="stop_sequences")
"""Default stop sequences."""
extra_body: Mapping[str, Any] | None = None
"""Optional additional JSON properties to include in the request parameters when
making requests to OpenAI compatible APIs, such as vLLM, LM Studio, or other
providers.
"""Optional additional JSON properties to include in the request parameters
when making requests to OpenAI compatible APIs, such as vLLM, LM Studio, or
other providers.
This is the recommended way to pass custom parameters that are specific to your
OpenAI-compatible API provider but not part of the standard OpenAI API.
Examples:
- [LM Studio](https://lmstudio.ai/) TTL parameter: `extra_body={"ttl": 300}`
- [vLLM](https://github.com/vllm-project/vllm) custom parameters:
`extra_body={"use_beam_search": True}`
- Any other provider-specific parameters
- [LM Studio](https://lmstudio.ai/) TTL parameter: `extra_body={"ttl": 300}`
- [vLLM](https://github.com/vllm-project/vllm) custom parameters:
`extra_body={"use_beam_search": True}`
- Any other provider-specific parameters
!!! warning
Do not use `model_kwargs` for custom parameters that are not part of the
standard OpenAI API, as this will cause errors when making API calls. Use
`extra_body` instead.

View File

@@ -223,6 +223,13 @@ class ChatOpenRouter(BaseChatModel):
streaming: bool = False
"""Whether to stream the results or not."""
stream_usage: bool = True
"""Whether to include usage metadata in streaming output.
If `True`, additional message chunks will be generated during the stream including
usage metadata.
"""
model_kwargs: dict[str, Any] = Field(default_factory=dict)
"""Any extra model parameters for the OpenRouter API."""
@@ -443,7 +450,7 @@ class ChatOpenRouter(BaseChatModel):
response = await self.client.chat.send_async(messages=sdk_messages, **params)
return self._create_chat_result(response)
def _stream( # noqa: C901
def _stream( # noqa: C901, PLR0912
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
@@ -452,6 +459,8 @@ class ChatOpenRouter(BaseChatModel):
) -> Iterator[ChatGenerationChunk]:
message_dicts, params = self._create_message_dicts(messages, stop)
params = {**params, **kwargs, "stream": True}
if self.stream_usage:
params["stream_options"] = {"include_usage": True}
_strip_internal_kwargs(params)
sdk_messages = _wrap_messages_for_sdk(message_dicts)
@@ -466,6 +475,18 @@ class ChatOpenRouter(BaseChatModel):
f"(code: {error.get('code', 'unknown')})"
)
raise ValueError(msg)
# Usage-only chunk (no choices) — emit with usage_metadata
if usage := chunk_dict.get("usage"):
usage_metadata = _create_usage_metadata(usage)
usage_chunk = AIMessageChunk(
content="", usage_metadata=usage_metadata
)
generation_chunk = ChatGenerationChunk(message=usage_chunk)
if run_manager:
run_manager.on_llm_new_token(
generation_chunk.text, chunk=generation_chunk
)
yield generation_chunk
continue
choice = chunk_dict["choices"][0]
message_chunk = _convert_chunk_to_message_chunk(
@@ -515,7 +536,7 @@ class ChatOpenRouter(BaseChatModel):
)
yield generation_chunk
async def _astream( # noqa: C901
async def _astream( # noqa: C901, PLR0912
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
@@ -524,6 +545,8 @@ class ChatOpenRouter(BaseChatModel):
) -> AsyncIterator[ChatGenerationChunk]:
message_dicts, params = self._create_message_dicts(messages, stop)
params = {**params, **kwargs, "stream": True}
if self.stream_usage:
params["stream_options"] = {"include_usage": True}
_strip_internal_kwargs(params)
sdk_messages = _wrap_messages_for_sdk(message_dicts)
@@ -540,6 +563,18 @@ class ChatOpenRouter(BaseChatModel):
f"(code: {error.get('code', 'unknown')})"
)
raise ValueError(msg)
# Usage-only chunk (no choices) — emit with usage_metadata
if usage := chunk_dict.get("usage"):
usage_metadata = _create_usage_metadata(usage)
usage_chunk = AIMessageChunk(
content="", usage_metadata=usage_metadata
)
generation_chunk = ChatGenerationChunk(message=usage_chunk)
if run_manager:
await run_manager.on_llm_new_token(
token=generation_chunk.text, chunk=generation_chunk
)
yield generation_chunk
continue
choice = chunk_dict["choices"][0]
message_chunk = _convert_chunk_to_message_chunk(

View File

@@ -23,6 +23,7 @@
'request_timeout': 60,
'stop': list([
]),
'stream_usage': True,
'temperature': 0.0,
}),
'lc': 1,

View File

@@ -2831,3 +2831,135 @@ class TestStreamingErrors:
assert any(
"malformed tool call chunk" in str(warning.message) for warning in w
)
class TestStreamUsage:
    """Tests covering `stream_usage` and handling of usage-only stream chunks."""

    @staticmethod
    def _chunks_with_trailing_usage() -> list[dict[str, Any]]:
        """Build two content chunks followed by a usage-only chunk.

        The final chunk mirrors OpenRouter's shape for usage reporting: it has a
        `usage` payload but no `choices` key at all.
        """
        return [
            {
                "choices": [
                    {"delta": {"role": "assistant", "content": "Hi"}, "index": 0}
                ],
                "model": MODEL_NAME,
                "object": "chat.completion.chunk",
                "created": 1700000000.0,
                "id": "gen-1",
            },
            {
                "choices": [{"delta": {}, "finish_reason": "stop", "index": 0}],
                "model": MODEL_NAME,
                "object": "chat.completion.chunk",
                "created": 1700000000.0,
                "id": "gen-1",
            },
            # Final chunk carries only token usage — no choices.
            {
                "usage": {
                    "prompt_tokens": 10,
                    "completion_tokens": 5,
                    "total_tokens": 15,
                },
                "model": MODEL_NAME,
                "object": "chat.completion.chunk",
                "created": 1700000000.0,
                "id": "gen-1",
            },
        ]

    def test_stream_options_passed_by_default(self) -> None:
        """By default the request carries stream_options with include_usage."""
        model = _make_model()
        model.client = MagicMock()
        model.client.chat.send.return_value = _MockSyncStream(
            [dict(c) for c in _STREAM_CHUNKS]
        )

        # Drain the stream so the mocked client is actually invoked.
        for _ in model.stream("Hello"):
            pass

        sent_kwargs = model.client.chat.send.call_args[1]
        assert sent_kwargs["stream_options"] == {"include_usage": True}

    def test_stream_options_not_passed_when_disabled(self) -> None:
        """With stream_usage=False, no stream_options key is sent at all."""
        model = _make_model(stream_usage=False)
        model.client = MagicMock()
        model.client.chat.send.return_value = _MockSyncStream(
            [dict(c) for c in _STREAM_CHUNKS]
        )

        for _ in model.stream("Hello"):
            pass

        sent_kwargs = model.client.chat.send.call_args[1]
        assert "stream_options" not in sent_kwargs

    def test_usage_only_chunk_emitted(self) -> None:
        """A trailing usage-only chunk surfaces as usage_metadata on the stream."""
        model = _make_model()
        model.client = MagicMock()
        model.client.chat.send.return_value = _MockSyncStream(
            self._chunks_with_trailing_usage()
        )

        received = list(model.stream("Hello"))

        with_usage = [c for c in received if c.usage_metadata]
        assert with_usage
        final_usage = with_usage[-1].usage_metadata
        assert final_usage is not None
        assert final_usage["input_tokens"] == 10
        assert final_usage["output_tokens"] == 5
        assert final_usage["total_tokens"] == 15

    async def test_astream_options_passed_by_default(self) -> None:
        """Async streaming also sends stream_options by default."""
        model = _make_model()
        model.client = MagicMock()
        model.client.chat.send_async = AsyncMock(
            return_value=_MockAsyncStream([dict(c) for c in _STREAM_CHUNKS])
        )

        async for _ in model.astream("Hello"):
            pass

        sent_kwargs = model.client.chat.send_async.call_args[1]
        assert sent_kwargs["stream_options"] == {"include_usage": True}

    async def test_astream_usage_only_chunk_emitted(self) -> None:
        """An async usage-only chunk surfaces usage_metadata as well."""
        model = _make_model()
        model.client = MagicMock()
        model.client.chat.send_async = AsyncMock(
            return_value=_MockAsyncStream(self._chunks_with_trailing_usage())
        )

        received = [c async for c in model.astream("Hello")]

        with_usage = [c for c in received if c.usage_metadata]
        assert with_usage
        final_usage = with_usage[-1].usage_metadata
        assert final_usage is not None
        assert final_usage["input_tokens"] == 10
        assert final_usage["output_tokens"] == 5
        assert final_usage["total_tokens"] == 15