Compare commits

...

6 Commits

Author SHA1 Message Date
Mason Daugherty
2715a7499a fix(fireworks): swap undeployed Kimi K2 slug in integration tests (#36975)
Replace `accounts/fireworks/models/kimi-k2-instruct-0905` with
`accounts/fireworks/models/kimi-k2p6` across the Fireworks integration
tests. Fireworks appears to have pulled the 0905 slug from serverless
(returns 404 `NOT_FOUND` despite still appearing "Ready" in their UI);
`kimi-k2p6` is the current deployed successor and supports the same
capabilities used by these tests (tool calls, streaming, structured
output).
2026-04-23 16:08:55 -04:00
Mason Daugherty
2d3b49162c ci(infra): shorten working-directory dropdown labels (#36974)
Clean up the `workflow_dispatch` dropdowns for the release and scheduled
integration-test workflows. Showing short package names (`openai`,
`langchain_v1`, ...) instead of `libs/partners/openai` makes the UI in
the Actions tab easier to scan; the prefix now lives in the resolver
rather than every dropdown entry.
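The expansion rule can be sketched as a tiny function (a hypothetical mirror of the workflow's `EFFECTIVE_WORKING_DIR` expression and the test helper below, not the workflow itself):

```python
# Short dropdown names expand back to full paths; the top-level allowlist gets
# `libs/`, and everything else is assumed to be a partner package.
TOP_LEVEL = {
    "core", "langchain", "langchain_v1",
    "text-splitters", "standard-tests", "model-profiles",
}

def expand(option: str) -> str:
    if option.startswith("libs/"):
        # Full paths (e.g. from workflow_call callers or the override
        # input) pass through unchanged.
        return option
    if option in TOP_LEVEL:
        return f"libs/{option}"
    return f"libs/partners/{option}"
```

For example, `expand("openai")` resolves to `"libs/partners/openai"` while `expand("langchain_v1")` resolves to `"libs/langchain_v1"`.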
2026-04-23 15:53:17 -04:00
ccurme
3f382a9e20 release(core): 1.3.1 (#36972) 2026-04-23 14:50:43 -04:00
Hunter Lovell
9a671d7919 feat(core): allow _format_output to pass through list of ToolOutputMixin instances (#36963) 2026-04-23 13:49:46 -04:00
Mason Daugherty
bb77a4229f release(openai): 1.2.0 (#36961) 2026-04-22 20:34:21 -04:00
Asamu David
4000c22376 feat(openai): prevent silent streaming hangs in ChatOpenAI (#36949)
> [!IMPORTANT]
> **Behavior change on upgrade — minor bump (`1.1.16` → `1.2.0`).**
>
> Streaming calls now raise `StreamChunkTimeoutError` (a `TimeoutError`
subclass — existing `except TimeoutError:` / `except
asyncio.TimeoutError:` handlers catch it) after 120s of content silence
instead of hanging forever. Opt out with `stream_chunk_timeout=None` or
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S=0`.
>
> Kernel-level TCP keepalive / `TCP_USER_TIMEOUT` are applied via a
custom `httpx` transport. `httpx` disables its env-proxy auto-detection
(`HTTP_PROXY` / `HTTPS_PROXY` / `ALL_PROXY` / `NO_PROXY` and
macOS/Windows system proxy) whenever a transport is supplied, so to
avoid silently breaking enterprise proxy users, `ChatOpenAI` now detects
the "proxy-env-shadow" shape at construction and **skips the custom
transport entirely** when **all** of these hold:
>
> - `http_socket_options` left at default (`None`)
> - No `http_client` or `http_async_client` supplied
> - No `openai_proxy` supplied
> - A proxy env var / system proxy is visible to httpx
>
> On that shape the instance falls back to pre-PR behavior and env-proxy
auto-detection still applies. A one-time `INFO` records the bypass.
>
> Users who explicitly set `http_socket_options=[...]` alongside an env
proxy still get the shadowed behavior with a one-time `WARNING` log —
they opted in. Full opt-outs below.

---

Streaming chat completions can hang forever when the underlying TCP
connection silently dies mid-stream (idle NAT/LB timeouts, sandboxed
runtimes killing long-lived connections, peer gone without a FIN or
RST). httpx's read timeout doesn't help here because it's reset by any
bytes arriving on the socket, including OpenAI's SSE keepalive comments,
so a stream that's quiet on content but still producing keepalives looks
alive forever.
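A minimal sketch of why byte-level read timeouts are blind here: SSE comment lines (leading `:`) are exactly what keepalives look like on the wire, and they deliver bytes without ever becoming a parsed chunk. The parser below is illustrative only, not the one httpx or the openai SDK uses:

```python
def parsed_chunks(sse_lines):
    """Yield only data payloads; comment lines are dropped.

    Every line, including ':' keepalive comments, delivers bytes on the
    socket and therefore resets a byte-level read timeout, but only
    'data:' lines ever surface as parsed chunks, which is the layer the
    new timer measures.
    """
    for line in sse_lines:
        if line.startswith(":"):
            continue  # keepalive comment: bytes on the wire, no chunk
        if line.startswith("data:"):
            yield line[len("data:"):].strip()
```

Feeding it a quiet-but-kept-alive stream like `[": keep-alive", 'data: {"x": 1}', ": keep-alive"]` yields a single parsed chunk; the two keepalives vanish before the chunk layer.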

This PR adds two knobs to `ChatOpenAI`, both on by default with
opt-outs:

- `stream_chunk_timeout` (default 120s): wraps the async streaming
iterator in `asyncio.wait_for` per chunk. Measures the gap between
*parsed* SSE chunks, so keepalives don't reset it. Fires on genuine
content silence and raises `StreamChunkTimeoutError` — a `TimeoutError`
subclass carrying `timeout_s`, `model_name`, and `chunks_received` as
structured attributes (mirrored in the WARNING log's `extra=`) for
alerting without message-regex. Override with the kwarg or
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S`.
- `http_socket_options`: applies `SO_KEEPALIVE` + `TCP_KEEPIDLE` /
`TCP_KEEPINTVL` / `TCP_KEEPCNT` + `TCP_USER_TIMEOUT` on Linux (macOS
equivalents where available). On platforms missing some options, they're
dropped silently and the remaining set still does useful work.
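The per-chunk timeout idea can be sketched as a thin wrapper over any async iterator (illustrative; the shipped `_astream_with_chunk_timeout` differs in details, and the error class here is a hypothetical mirror):

```python
import asyncio
from collections.abc import AsyncIterator

class StreamChunkTimeoutError(TimeoutError):
    """Hypothetical mirror of the exported error, with structured attributes."""

    def __init__(self, timeout_s: float, chunks_received: int) -> None:
        super().__init__(
            f"no parsed chunk within {timeout_s}s ({chunks_received} received)"
        )
        self.timeout_s = timeout_s
        self.chunks_received = chunks_received

async def astream_with_chunk_timeout(chunks: AsyncIterator, timeout_s: float):
    """Bound the gap between *parsed* chunks with asyncio.wait_for per chunk.

    The timer restarts only when __anext__ returns a chunk, so transport-level
    keepalives (which never reach the parsed-chunk layer) cannot reset it.
    """
    received = 0
    while True:
        try:
            chunk = await asyncio.wait_for(chunks.__anext__(), timeout=timeout_s)
        except StopAsyncIteration:
            return
        except asyncio.TimeoutError:
            raise StreamChunkTimeoutError(timeout_s, received) from None
        received += 1
        yield chunk
```

A stream that goes quiet mid-flight raises with `chunks_received` reflecting how far it got, which is what makes the structured attributes useful for alerting.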

Pool limits are set explicitly on the custom transport to mirror the
`openai` SDK — without that, passing `transport=` to `httpx.AsyncClient`
silently shrinks the connection pool.

## Behavior change

The default-shape proxy-env bypass (above) covers the common enterprise
case. Beyond that:

- Connections that would previously have hung forever will now error out
via `StreamChunkTimeoutError`.
- Users who explicitly opt into `http_socket_options` while also relying
on env proxies will see a one-time `WARNING` and lose env-proxy
auto-detection — the custom transport shadows it. This is the original
shipped behavior, retained for anyone who *wants* socket tuning on top
of an env-proxied setup.

Full opt-outs:

- `stream_chunk_timeout=None` or
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S=0`
- `http_socket_options=()` or `LANGCHAIN_OPENAI_TCP_KEEPALIVE=0`
- Supply your own `http_client` **and** `http_async_client`.
`http_socket_options` is applied per side: passing only one client still
leaves the other side's default builder applying socket options. Supply
both (or combine with `http_socket_options=()`) to take full control.

Unparseable or negative values for the `LANGCHAIN_OPENAI_*` env vars
fall back to the default with a `WARNING` log rather than failing
silently, so a misconfigured environment still boots and the fallback is
discoverable.
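That fallback rule can be sketched as follows (a hypothetical mirror of the module's env readers, not the shipped code):

```python
import logging
import os

logger = logging.getLogger(__name__)

def float_env(name: str, default: float) -> float:
    """Parse a float env var; warn and fall back on bad or negative values."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        logger.warning(
            "Invalid value for %s=%r; falling back to %s", name, raw, default
        )
        return default
    if value < 0:
        logger.warning(
            "Negative value for %s=%r; falling back to %s", name, raw, default
        )
        return default
    return value
```

With `LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S=not-a-number` or `=-5` in the environment, `float_env` returns the default and logs the offending variable by name; a valid `=30` parses normally.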

---------

Co-authored-by: Mason Daugherty <github@mdrxy.com>
Co-authored-by: Mason Daugherty <mason@langchain.dev>
2026-04-22 20:28:43 -04:00
21 changed files with 2200 additions and 112 deletions

View File

@@ -1,4 +1,10 @@
"""Verify _release.yml dropdown options match actual package directories."""
"""Verify _release.yml dropdown options match actual package directories.
Dropdown options are short names (e.g. `openai`, `core`). The workflow's
`EFFECTIVE_WORKING_DIR` expression re-adds the `libs/` prefix for top-level
packages and `libs/partners/` for everything else. This test reconstructs the
full path for each short name and compares against packages on disk.
"""
from pathlib import Path
@@ -6,6 +12,12 @@ import yaml
REPO_ROOT = Path(__file__).resolve().parents[2]
# Keep in sync with the non-partner allowlist in `EFFECTIVE_WORKING_DIR`
# in `.github/workflows/_release.yml`.
TOP_LEVEL_PACKAGES = frozenset(
{"core", "langchain", "langchain_v1", "text-splitters", "standard-tests", "model-profiles"}
)
def _get_release_options() -> list[str]:
workflow = REPO_ROOT / ".github" / "workflows" / "_release.yml"
@@ -19,6 +31,12 @@ def _get_release_options() -> list[str]:
raise AssertionError(msg) from e
def _expand_option(option: str) -> str:
if option in TOP_LEVEL_PACKAGES:
return f"libs/{option}"
return f"libs/partners/{option}"
def _get_package_dirs() -> set[str]:
libs = REPO_ROOT / "libs"
dirs: set[str] = set()
@@ -36,7 +54,7 @@ def _get_package_dirs() -> set[str]:
def test_release_options_match_packages() -> None:
options = set(_get_release_options())
options = {_expand_option(o) for o in _get_release_options()}
packages = _get_package_dirs()
missing_from_dropdown = packages - options
extra_in_dropdown = options - packages

View File

@@ -19,29 +19,33 @@ on:
required: true
type: choice
description: "From which folder this pipeline executes"
default: "libs/langchain_v1"
default: "langchain_v1"
# Short names only — `EFFECTIVE_WORKING_DIR` below re-adds the `libs/`
# or `libs/partners/` prefix. When adding a new option, also update the
# non-partner allowlist in `EFFECTIVE_WORKING_DIR` if it isn't a partner
# package (partners are the default branch).
options:
- libs/core
- libs/langchain
- libs/langchain_v1
- libs/text-splitters
- libs/standard-tests
- libs/model-profiles
- libs/partners/anthropic
- libs/partners/chroma
- libs/partners/deepseek
- libs/partners/exa
- libs/partners/fireworks
- libs/partners/groq
- libs/partners/huggingface
- libs/partners/mistralai
- libs/partners/nomic
- libs/partners/ollama
- libs/partners/openai
- libs/partners/openrouter
- libs/partners/perplexity
- libs/partners/qdrant
- libs/partners/xai
- core
- langchain
- langchain_v1
- text-splitters
- standard-tests
- model-profiles
- anthropic
- chroma
- deepseek
- exa
- fireworks
- groq
- huggingface
- mistralai
- nomic
- ollama
- openai
- openrouter
- perplexity
- qdrant
- xai
working-directory-override:
required: false
type: string
@@ -61,7 +65,17 @@ env:
PYTHON_VERSION: "3.11"
UV_FROZEN: "true"
UV_NO_SYNC: "true"
EFFECTIVE_WORKING_DIR: ${{ inputs.working-directory-override || inputs.working-directory }}
# Resolves to a full path. Accepts either:
# - `working-directory-override` as a full path (e.g. `libs/partners/partner-xyz`)
# - `working-directory` as a full path (from `workflow_call` callers)
# - `working-directory` as a short dropdown name (from `workflow_dispatch`)
EFFECTIVE_WORKING_DIR: >-
${{
inputs.working-directory-override
|| (startsWith(inputs.working-directory, 'libs/') && inputs.working-directory)
|| (contains(fromJSON('["core","langchain","langchain_v1","text-splitters","standard-tests","model-profiles"]'), inputs.working-directory) && format('libs/{0}', inputs.working-directory))
|| format('libs/partners/{0}', inputs.working-directory)
}}
permissions:
contents: read # Job-level overrides grant write only where needed (mark-release)

View File

@@ -14,29 +14,33 @@ on:
type: choice
description: "Library to test (select from dropdown)"
default: "all"
# Short names only — the `compute-matrix` job re-adds the `libs/` or
# `libs/partners/` prefix. When adding a new option, also update the
# `case` statement in `compute-matrix` if it isn't a partner package
# (partners are the default branch).
options:
- "all"
- "libs/core"
- "libs/langchain"
- "libs/langchain_v1"
- "libs/text-splitters"
- "libs/standard-tests"
- "libs/model-profiles"
- "libs/partners/anthropic"
- "libs/partners/chroma"
- "libs/partners/deepseek"
- "libs/partners/exa"
- "libs/partners/fireworks"
- "libs/partners/groq"
- "libs/partners/huggingface"
- "libs/partners/mistralai"
- "libs/partners/nomic"
- "libs/partners/ollama"
- "libs/partners/openai"
- "libs/partners/openrouter"
- "libs/partners/perplexity"
- "libs/partners/qdrant"
- "libs/partners/xai"
- "core"
- "langchain"
- "langchain_v1"
- "text-splitters"
- "standard-tests"
- "model-profiles"
- "anthropic"
- "chroma"
- "deepseek"
- "exa"
- "fireworks"
- "groq"
- "huggingface"
- "mistralai"
- "nomic"
- "ollama"
- "openai"
- "openrouter"
- "perplexity"
- "qdrant"
- "xai"
working-directory-override:
type: string
description: "Manual override — takes precedence over dropdown (e.g. libs/partners/partner-xyz)"
@@ -101,7 +105,15 @@ jobs:
if [ -n "$WORKING_DIRECTORY_OVERRIDE" ]; then
working_directory="[\"$WORKING_DIRECTORY_OVERRIDE\"]"
elif [ "$WORKING_DIRECTORY_CHOICE" != "all" ]; then
working_directory="[\"$WORKING_DIRECTORY_CHOICE\"]"
# Map short dropdown name back to full path
case "$WORKING_DIRECTORY_CHOICE" in
core|langchain|langchain_v1|text-splitters|standard-tests|model-profiles)
working_directory="[\"libs/$WORKING_DIRECTORY_CHOICE\"]"
;;
*)
working_directory="[\"libs/partners/$WORKING_DIRECTORY_CHOICE\"]"
;;
esac
fi
matrix="{\"python-version\": $python_version, \"working-directory\": $working_directory}"
echo "$matrix"

View File

@@ -1265,8 +1265,15 @@ def _format_output(
status: The execution status.
Returns:
The formatted output, either as a `ToolMessage` or the original content.
The formatted output, either as a `ToolMessage`, the original content,
or an unchanged list of `ToolOutputMixin` instances.
"""
if (
isinstance(content, list)
and content
and all(isinstance(item, ToolOutputMixin) for item in content)
):
return content
if isinstance(content, ToolOutputMixin) or tool_call_id is None:
return content
if not _is_message_content_type(content):

View File

@@ -1,3 +1,3 @@
"""langchain-core version information and utilities."""
VERSION = "1.3.0"
VERSION = "1.3.1"

View File

@@ -21,7 +21,7 @@ classifiers = [
"Topic :: Software Development :: Libraries :: Python Modules",
]
version = "1.3.0"
version = "1.3.1"
requires-python = ">=3.10.0,<4.0.0"
dependencies = [
"langsmith>=0.3.45,<1.0.0",

View File

@@ -58,6 +58,7 @@ from langchain_core.tools.base import (
InjectedToolCallId,
SchemaAnnotationError,
_DirectlyInjectedToolArg,
_format_output,
_is_message_content_block,
_is_message_content_type,
get_all_basemodel_annotations,
@@ -128,6 +129,22 @@ class _MockStructuredTool(BaseTool):
raise NotImplementedError
class _FakeOutput(ToolOutputMixin):
"""Minimal ToolOutputMixin subclass used only in tests."""
def __init__(self, value: int) -> None:
self.value = value
def __eq__(self, other: object) -> bool:
return isinstance(other, _FakeOutput) and self.value == other.value
def __hash__(self) -> int:
return hash(self.value)
def __repr__(self) -> str:
return f"_FakeOutput({self.value})"
def test_structured_args() -> None:
"""Test functionality with structured arguments."""
structured_api = _MockStructuredTool()
@@ -3653,3 +3670,74 @@ def test_tool_default_factory_not_required() -> None:
schema = convert_to_openai_tool(some_func)
params = schema["function"]["parameters"]
assert "names" not in params.get("required", [])
def test_format_output_list_of_tool_messages() -> None:
"""A list of ToolMessages passes through unchanged."""
msgs = [
ToolMessage("a", tool_call_id="1", name="t"),
ToolMessage("b", tool_call_id="2", name="t"),
]
result = _format_output(
msgs, artifact=None, tool_call_id="0", name="t", status="success"
)
assert result is msgs
def test_format_output_list_of_custom_mixin_instances() -> None:
"""A list of custom ToolOutputMixin subclass instances passes through."""
items = [_FakeOutput(1), _FakeOutput(2)]
result = _format_output(
items, artifact=None, tool_call_id="0", name="t", status="success"
)
assert result is items
def test_format_output_mixed_mixin_subclasses() -> None:
"""A list mixing ToolMessage and custom ToolOutputMixin passes through."""
items: list[ToolOutputMixin] = [
ToolMessage("a", tool_call_id="1", name="t"),
_FakeOutput(42),
]
result = _format_output(
items, artifact=None, tool_call_id="0", name="t", status="success"
)
assert result is items
def test_format_output_list_with_non_mixin_element() -> None:
"""A list containing a non-ToolOutputMixin falls through to stringify."""
items = [ToolMessage("a", tool_call_id="1", name="t"), "oops"]
result = _format_output(
items, artifact=None, tool_call_id="0", name="t", status="success"
)
assert isinstance(result, ToolMessage)
assert result.tool_call_id == "0"
def test_format_output_empty_list() -> None:
"""An empty list falls through to stringify-and-wrap."""
result = _format_output(
[], artifact=None, tool_call_id="0", name="t", status="success"
)
assert isinstance(result, ToolMessage)
assert result.tool_call_id == "0"
def test_tool_invoke_returns_list_of_mixin() -> None:
"""End-to-end: a tool returning a list of ToolOutputMixin via invoke."""
@tool
def multi(x: int) -> list:
"""Return multiple outputs."""
return [
ToolMessage(f"result-{i}", tool_call_id=f"sub-{i}", name="multi")
for i in range(x)
]
result = multi.invoke(
{"type": "tool_call", "args": {"x": 3}, "name": "multi", "id": "outer"}
)
assert isinstance(result, list)
assert len(result) == 3
assert all(isinstance(m, ToolMessage) for m in result)

libs/core/uv.lock generated
View File

@@ -1,5 +1,5 @@
version = 1
revision = 3
revision = 2
requires-python = ">=3.10.0, <4.0.0"
resolution-markers = [
"python_full_version >= '3.14' and platform_python_implementation == 'PyPy'",
@@ -995,7 +995,7 @@ wheels = [
[[package]]
name = "langchain-core"
version = "1.3.0"
version = "1.3.1"
source = { editable = "." }
dependencies = [
{ name = "jsonpatch" },

View File

@@ -2784,7 +2784,7 @@ wheels = [
[[package]]
name = "langchain-openai"
version = "1.1.15"
version = "1.2.0"
source = { editable = "../partners/openai" }
dependencies = [
{ name = "langchain-core" },

View File

@@ -2391,7 +2391,7 @@ wheels = [
[[package]]
name = "langchain-openai"
version = "1.1.14"
version = "1.2.0"
source = { editable = "../partners/openai" }
dependencies = [
{ name = "langchain-core" },

View File

@@ -532,7 +532,7 @@ typing = [
[[package]]
name = "langchain-core"
version = "1.3.0a2"
version = "1.3.0"
source = { editable = "../core" }
dependencies = [
{ name = "jsonpatch" },
@@ -660,7 +660,7 @@ typing = [
[[package]]
name = "langchain-openai"
version = "1.1.13"
version = "1.2.0"
source = { editable = "../partners/openai" }
dependencies = [
{ name = "langchain-core" },

View File

@@ -21,7 +21,7 @@ _MODEL = "accounts/fireworks/models/gpt-oss-120b"
@pytest.mark.parametrize("strict", [None, True, False])
def test_tool_choice_bool(strict: bool | None) -> None: # noqa: FBT001
"""Test that tool choice is respected with different strict values."""
llm = ChatFireworks(model="accounts/fireworks/models/kimi-k2-instruct-0905")
llm = ChatFireworks(model="accounts/fireworks/models/kimi-k2p6")
class MyTool(BaseModel):
name: str
@@ -59,7 +59,7 @@ def test_tool_choice_bool(strict: bool | None) -> None: # noqa: FBT001
async def test_astream() -> None:
"""Test streaming tokens from ChatFireworks."""
llm = ChatFireworks(model="accounts/fireworks/models/kimi-k2-instruct-0905")
llm = ChatFireworks(model="accounts/fireworks/models/kimi-k2p6")
full: BaseMessageChunk | None = None
chunks_with_token_counts = 0
@@ -157,7 +157,7 @@ def _get_joke_class(
@pytest.mark.parametrize("schema_type", ["pydantic", "typeddict", "json_schema"])
def test_structured_output_json_schema(schema_type: str) -> None:
llm = ChatFireworks(model="accounts/fireworks/models/kimi-k2-instruct-0905")
llm = ChatFireworks(model="accounts/fireworks/models/kimi-k2p6")
schema, validation_function = _get_joke_class(schema_type) # type: ignore[arg-type]
chat = llm.with_structured_output(schema, method="json_schema")

View File

@@ -18,7 +18,7 @@ class TestFireworksStandard(ChatModelIntegrationTests):
@property
def chat_model_params(self) -> dict:
return {
"model": "accounts/fireworks/models/kimi-k2-instruct-0905",
"model": "accounts/fireworks/models/kimi-k2p6",
"temperature": 0,
}

View File

@@ -1,6 +1,7 @@
"""Module for OpenAI integrations."""
from langchain_openai.chat_models import AzureChatOpenAI, ChatOpenAI
from langchain_openai.chat_models._client_utils import StreamChunkTimeoutError
from langchain_openai.embeddings import AzureOpenAIEmbeddings, OpenAIEmbeddings
from langchain_openai.llms import AzureOpenAI, OpenAI
from langchain_openai.tools import custom_tool
@@ -12,5 +13,6 @@ __all__ = [
"ChatOpenAI",
"OpenAI",
"OpenAIEmbeddings",
"StreamChunkTimeoutError",
"custom_tool",
]

View File

@@ -1,23 +1,363 @@
"""Helpers for creating OpenAI API clients.
"""Helpers for OpenAI httpx client construction, transport tuning, and streaming.
This module allows for the caching of httpx clients to avoid creating new instances
for each instance of ChatOpenAI.
Covers cached default client builders, proxy-aware variants for the
`openai_proxy` path, kernel-level TCP keepalive / `TCP_USER_TIMEOUT` socket
options, and the `_astream_with_chunk_timeout` wrapper that bounds per-chunk
wall-clock time on async SSE streams.
Logic is largely replicated from openai._base_client.
Client-builder boilerplate mirrors the patterns in `openai._base_client`;
socket-option tuning and the streaming timeout are original to this module.
"""
from __future__ import annotations
import asyncio
import inspect
import logging
import os
from collections.abc import Awaitable, Callable
import socket
import sys
import urllib.request
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from functools import lru_cache
from typing import Any, cast
from typing import Any, TypeVar, cast
import httpx
import openai
from pydantic import SecretStr
logger = logging.getLogger(__name__)
SocketOption = tuple[int, int, int]
# socket.TCP_KEEPIDLE etc. are absent on darwin/win32; use raw UAPI constants.
_LINUX_TCP_KEEPIDLE = 4
_LINUX_TCP_KEEPINTVL = 5
_LINUX_TCP_KEEPCNT = 6
_LINUX_TCP_USER_TIMEOUT = 18
# macOS: same semantics, different constants from <netinet/tcp.h>.
_DARWIN_TCP_KEEPALIVE = 0x10 # idle seconds before first probe
_DARWIN_TCP_KEEPINTVL = 0x101
_DARWIN_TCP_KEEPCNT = 0x102
# Mirrors the openai SDK's pool defaults. Hardcoded to avoid depending on
# an internal module path (openai._constants) that can move across SDK versions.
_DEFAULT_CONNECTION_LIMITS = httpx.Limits(
max_connections=1000,
max_keepalive_connections=100,
keepalive_expiry=5.0,
)
def _int_env(name: str, default: int, *, allow_negative: bool = False) -> int:
"""Read an int env var with graceful fallback + discoverable warning.
Unparseable or (by default) negative values fall back to `default` and
emit a single `WARNING` naming the offending variable. A misconfigured
environment still loads, but operators see the fallback in their logs
rather than silently getting a surprising default.
"""
raw = os.environ.get(name)
if raw is None:
return default
try:
value = int(raw)
except (TypeError, ValueError):
logger.warning(
"Invalid value for %s=%r (not an int); falling back to %d.",
name,
raw,
default,
)
return default
if not allow_negative and value < 0:
logger.warning(
"Invalid value for %s=%r (negative); falling back to %d.",
name,
raw,
default,
)
return default
return value
def _float_env(name: str, default: float, *, allow_negative: bool = False) -> float:
"""Read a float env var with graceful fallback + discoverable warning.
See `_int_env`. Negative values are rejected by default so a typo in
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S=-10` can't silently disable the
wrapper it was meant to configure.
"""
raw = os.environ.get(name)
if raw is None:
return default
try:
value = float(raw)
except (TypeError, ValueError):
logger.warning(
"Invalid value for %s=%r (not a float); falling back to %s.",
name,
raw,
default,
)
return default
if not allow_negative and value < 0:
logger.warning(
"Invalid value for %s=%r (negative); falling back to %s.",
name,
raw,
default,
)
return default
return value
def _filter_supported(opts: list[SocketOption]) -> list[SocketOption]:
"""Drop socket options the running platform rejects.
Probes each option against a throwaway socket via `setsockopt` and keeps
only those the kernel accepts. This keeps the library-computed defaults
non-fatal across platforms that don't implement every Linux option —
`TCP_USER_TIMEOUT` in particular is Linux-only and silently missing on
macOS, some minimal kernels, and older gVisor builds. Dropped options
are logged at `DEBUG` so an operator can confirm whether a kernel-level
knob took effect on their platform.
If the probe socket cannot be created (sandboxed runtimes, `pytest-socket`
under `--disable-socket`, tight seccomp policies), the input list is
returned unfiltered. This preserves the pass-through behavior used for
explicit user overrides: unsupported options will surface as a clear
`OSError` at the first real `connect()` rather than being silently
dropped during `ChatOpenAI` construction.
"""
try:
probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except Exception:
# Broad catch is deliberate: `pytest_socket` under `--disable-socket`
# raises `SocketBlockedError` (a `RuntimeError`, not `OSError`), and
# seccomp/sandboxed runtimes have been observed to raise other
# `OSError` subclasses and `PermissionError`. The intent is "any
# inability to create a probe socket -> pass through unfiltered,"
# and narrowing the type would silently regress sandboxed CI.
return list(opts)
try:
supported: list[SocketOption] = []
dropped: list[SocketOption] = []
for level, optname, optval in opts:
try:
probe.setsockopt(level, optname, optval)
except OSError:
dropped.append((level, optname, optval))
continue
supported.append((level, optname, optval))
if dropped:
logger.debug(
"Dropped %d unsupported socket option(s) on %s: %s",
len(dropped),
sys.platform,
dropped,
)
return supported
finally:
probe.close()
def _default_socket_options() -> tuple[SocketOption, ...]:
"""Return default TCP socket options, or `()` if disabled via env.
Always returns a tuple (never None) so callers and `@lru_cache` keys
remain uniform: `()` is the single shape for "no options".
Target behavior on Linux/gVisor with the full option set: silent peers
are surfaced within ~90-120s via `SO_KEEPALIVE` + `TCP_USER_TIMEOUT`
(keepalive path gives a ~90s floor at the defaults; `TCP_USER_TIMEOUT`
caps at 120s). On platforms that reject some options,
`_filter_supported` drops them and the bound degrades to whatever the
remaining options provide.
"""
if os.environ.get("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "1") == "0":
return ()
keepidle = _int_env("LANGCHAIN_OPENAI_TCP_KEEPIDLE", 60)
keepintvl = _int_env("LANGCHAIN_OPENAI_TCP_KEEPINTVL", 10)
keepcnt = _int_env("LANGCHAIN_OPENAI_TCP_KEEPCNT", 3)
user_timeout_ms = _int_env("LANGCHAIN_OPENAI_TCP_USER_TIMEOUT_MS", 120000)
opts: list[SocketOption] = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
if sys.platform == "linux":
opts += [
(socket.IPPROTO_TCP, _LINUX_TCP_KEEPIDLE, keepidle),
(socket.IPPROTO_TCP, _LINUX_TCP_KEEPINTVL, keepintvl),
(socket.IPPROTO_TCP, _LINUX_TCP_KEEPCNT, keepcnt),
(socket.IPPROTO_TCP, _LINUX_TCP_USER_TIMEOUT, user_timeout_ms),
]
elif sys.platform == "darwin":
opts += [
(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPALIVE, keepidle),
(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPINTVL, keepintvl),
(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPCNT, keepcnt),
]
# Windows (win32): SO_KEEPALIVE only; per-option tuning requires WSAIoctl.
return tuple(_filter_supported(opts))
_PROXY_ENV_VARS = (
"HTTP_PROXY",
"HTTPS_PROXY",
"ALL_PROXY",
"http_proxy",
"https_proxy",
"all_proxy",
)
_proxy_env_warning_emitted = False
_proxy_env_bypass_info_emitted = False
def _proxy_env_detected() -> bool:
"""True when httpx would pick up a proxy from env or system config.
Mirrors the surface httpx reads (`urllib.request.getproxies()` plus the
uppercase env var names) so a positive result means env-proxy
auto-detection is live on pre-PR code paths.
"""
if any(os.environ.get(name) for name in _PROXY_ENV_VARS):
return True
try:
return bool(urllib.request.getproxies())
except Exception:
return False
def _should_bypass_socket_options_for_proxy_env(
*,
http_socket_options: Sequence[SocketOption] | None,
http_client: Any,
http_async_client: Any,
openai_proxy: str | None,
) -> bool:
"""True when default shape + env proxy detected → skip transport injection.
Preserves pre-PR behavior for apps relying on httpx's env-proxy
auto-detection. Only triggers when the user has made no explicit choice
that would signal they want the custom transport:
- `http_socket_options` left at `None` (default, not `()` or a sequence)
- `LANGCHAIN_OPENAI_TCP_KEEPALIVE` is not `0` (kill-switch is its own path)
- No `http_client` or `http_async_client` supplied
- No `openai_proxy` supplied
- A proxy env var / system proxy is visible to httpx
If any of those are set, the user has opted in to the transport path
(directly or via `openai_proxy`) and normal behavior — including the
shadowed-proxy WARNING — applies. When the kill-switch is set,
`_default_socket_options` already returns `()`, so the bypass INFO
would be noise; route through the normal path instead.
"""
if http_socket_options is not None:
return False
if os.environ.get("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "1") == "0":
return False
if http_client is not None or http_async_client is not None:
return False
if openai_proxy:
return False
return _proxy_env_detected()
def _log_proxy_env_bypass_once() -> None:
"""Emit a one-time INFO when the proxy-env bypass triggers.
Visibility for operators running with a custom log pipeline: the bypass
is the *safe* outcome (env-proxy auto-detection preserved), but it means
socket-level keepalive / `TCP_USER_TIMEOUT` aren't applied on this
instance. INFO-level, since it's not a problem — just a diagnostic.
"""
global _proxy_env_bypass_info_emitted
if _proxy_env_bypass_info_emitted:
return
_proxy_env_bypass_info_emitted = True
active = [name for name in _PROXY_ENV_VARS if os.environ.get(name)]
source = ", ".join(active) if active else "system proxy configuration"
logger.info(
"langchain-openai detected %s and no explicit `http_socket_options` / "
"`http_client` / `http_async_client` / `openai_proxy`; skipping the "
"custom `httpx` transport so httpx's env-proxy auto-detection applies. "
"Pass `http_socket_options=[...]` to opt back into kernel-level TCP "
"keepalive tuning on top of the env proxy.",
source,
)
def _warn_if_proxy_env_shadowed(
socket_options: tuple[SocketOption, ...],
*,
openai_proxy: str | None,
) -> None:
"""Warn once if a custom transport will shadow httpx's proxy auto-detection.
When `socket_options` is non-empty we pass a custom `httpx` transport,
which disables httpx's native proxy auto-detection — both the uppercase
`HTTP_PROXY` / `HTTPS_PROXY` / `ALL_PROXY` env vars and their lowercase
equivalents, plus macOS/Windows system proxy config. If the user
supplies `openai_proxy` explicitly we route through it and the env-var
handling is moot. Otherwise, a user whose app was transparently relying
on any of those sources will silently stop using them on upgrade —
emit a single WARNING so the behavior change is discoverable.
Detection uses `urllib.request.getproxies()` — the same surface httpx
reads — so lowercase env vars and macOS/Windows system proxy settings
are caught alongside the uppercase names.
"""
global _proxy_env_warning_emitted
if _proxy_env_warning_emitted or not socket_options or openai_proxy:
return
active = [name for name in _PROXY_ENV_VARS if os.environ.get(name)]
try:
detected = bool(urllib.request.getproxies())
except Exception:
detected = False
if not active and not detected:
return
_proxy_env_warning_emitted = True
if active:
source = ", ".join(active) + " set in environment"
else:
source = "system proxy configuration detected"
logger.warning(
"langchain-openai injected a custom httpx transport to apply "
"`http_socket_options`, which disables httpx's proxy "
"auto-detection (%s). Set "
"`LANGCHAIN_OPENAI_TCP_KEEPALIVE=0` or pass `http_socket_options=()` "
"to restore default proxy behavior, or supply `openai_proxy` / your "
"own `http_client` / `http_async_client` to take full control.",
source,
)
def _resolve_socket_options(
value: Sequence[SocketOption] | None,
) -> tuple[SocketOption, ...]:
"""Normalize the user-facing field to the tuple form builders expect.
- `None` => env-driven defaults (may itself be `()` if the user set
`LANGCHAIN_OPENAI_TCP_KEEPALIVE=0`). This path runs through
`_filter_supported()` inside `_default_socket_options()` because
the library-computed option set is aspirational and silent degradation
is the right posture.
- Any other sequence (including empty) => retupled for cache hashability.
An empty tuple is the explicit "disabled" signal. A non-empty sequence
is passed verbatim — **not** filtered. The user chose these options
explicitly, so an unsupported constant should surface as a clear
`OSError` at connect time, not be silently dropped.
Always returns a tuple — never `None` — so downstream signatures take
`tuple[SocketOption, ...]` with `()` as the single "no options" shape.
"""
if value is None:
return _default_socket_options()
return tuple(value)
class _SyncHttpxClientWrapper(openai.DefaultHttpxClient):
"""Borrowed from openai._base_client."""
@@ -47,43 +387,120 @@ class _AsyncHttpxClientWrapper(openai.DefaultAsyncHttpxClient):
def _build_sync_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _SyncHttpxClientWrapper:
return _SyncHttpxClientWrapper(
base_url=base_url
kwargs: dict[str, Any] = {
"base_url": base_url
or os.environ.get("OPENAI_BASE_URL")
or "https://api.openai.com/v1",
timeout=timeout,
)
"timeout": timeout,
}
if socket_options:
# httpx ignores limits= when transport= is provided; set it explicitly
# on the transport to avoid silently shrinking the connection pool.
kwargs["transport"] = httpx.HTTPTransport(
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
)
return _SyncHttpxClientWrapper(**kwargs)
def _build_async_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _AsyncHttpxClientWrapper:
return _AsyncHttpxClientWrapper(
base_url=base_url
kwargs: dict[str, Any] = {
"base_url": base_url
or os.environ.get("OPENAI_BASE_URL")
or "https://api.openai.com/v1",
timeout=timeout,
"timeout": timeout,
}
if socket_options:
# See _build_sync_httpx_client for the limits= rationale.
kwargs["transport"] = httpx.AsyncHTTPTransport(
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
)
return _AsyncHttpxClientWrapper(**kwargs)
def _build_proxied_sync_httpx_client(
proxy: str,
verify: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> httpx.Client:
"""httpx.Client for the openai_proxy code path.
When socket options are disabled (`()`), returns a plain
`httpx.Client(proxy=..., verify=...)` with no transport injected.
"""
if not socket_options:
return httpx.Client(proxy=proxy, verify=verify)
# Mount under `all://` (not `transport=`) so `Client._mounts` mirrors the
# shape produced by httpx's own `proxy=` path — a single-entry dict keyed
# by `URLPattern("all://")`. Callers (and the existing proxy integration
# test) reach into `_mounts` to introspect the proxy URL; a bare
# `transport=` leaves `_mounts` empty.
#
# `httpx.HTTPTransport(proxy=...)` is stricter about string coercion than
# `httpx.Client(proxy=...)`; wrap in the public `httpx.Proxy` type for
# version-stable behavior.
transport = httpx.HTTPTransport(
proxy=httpx.Proxy(proxy),
verify=verify,
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
)
return httpx.Client(mounts={"all://": transport})
def _build_proxied_async_httpx_client(
proxy: str,
verify: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> httpx.AsyncClient:
"""httpx.AsyncClient for the openai_proxy code path.
See `_build_proxied_sync_httpx_client` for the opt-out fallback,
the `mounts={"all://": ...}` shape, and the `httpx.Proxy` wrapping
rationale.
"""
if not socket_options:
return httpx.AsyncClient(proxy=proxy, verify=verify)
transport = httpx.AsyncHTTPTransport(
proxy=httpx.Proxy(proxy),
verify=verify,
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
)
return httpx.AsyncClient(mounts={"all://": transport})
@lru_cache
def _cached_sync_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _SyncHttpxClientWrapper:
return _build_sync_httpx_client(base_url, timeout)
return _build_sync_httpx_client(base_url, timeout, socket_options)
@lru_cache
def _cached_async_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _AsyncHttpxClientWrapper:
return _build_async_httpx_client(base_url, timeout)
return _build_async_httpx_client(base_url, timeout, socket_options)
def _get_default_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _SyncHttpxClientWrapper:
"""Get default httpx client.
@@ -92,13 +509,15 @@ def _get_default_httpx_client(
try:
hash(timeout)
except TypeError:
return _build_sync_httpx_client(base_url, timeout)
return _build_sync_httpx_client(base_url, timeout, socket_options)
else:
return _cached_sync_httpx_client(base_url, timeout)
return _cached_sync_httpx_client(base_url, timeout, socket_options)
def _get_default_async_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _AsyncHttpxClientWrapper:
"""Get default httpx client.
@@ -107,9 +526,9 @@ def _get_default_async_httpx_client(
try:
hash(timeout)
except TypeError:
return _build_async_httpx_client(base_url, timeout)
return _build_async_httpx_client(base_url, timeout, socket_options)
else:
return _cached_async_httpx_client(base_url, timeout)
return _cached_async_httpx_client(base_url, timeout, socket_options)
def _resolve_sync_and_async_api_keys(
@@ -140,3 +559,127 @@ def _resolve_sync_and_async_api_keys(
async_api_key_value = async_api_key_wrapper
return sync_api_key_value, async_api_key_value
T = TypeVar("T")
# On Python ≤3.10, asyncio.TimeoutError and builtins.TimeoutError are distinct
# hierarchies, so subclassing only asyncio.TimeoutError would not be caught by
# `except TimeoutError:`. On Python ≥3.11 they are the same object, so listing
# both bases would raise TypeError: duplicate base class. We resolve this at
# class-definition time.
_StreamChunkTimeoutBases: tuple[type, ...] = (
(asyncio.TimeoutError,)
if issubclass(asyncio.TimeoutError, TimeoutError)
else (asyncio.TimeoutError, TimeoutError)
)
class StreamChunkTimeoutError(*_StreamChunkTimeoutBases): # type: ignore[misc]
"""Raised when no streaming chunk arrives within `stream_chunk_timeout`.
`issubclass(StreamChunkTimeoutError, asyncio.TimeoutError)` and
`issubclass(StreamChunkTimeoutError, TimeoutError)` both hold on all
supported Python versions, so existing `except asyncio.TimeoutError:`
and `except TimeoutError:` handlers keep catching the exception. On
Python 3.11+ the two exceptions are the same object, so only
`asyncio.TimeoutError` appears in `__bases__`.
Structured attributes (`timeout_s`, `model_name`, `chunks_received`)
mirror the WARNING log's `extra=` payload so diagnostic code doesn't
need to regex the message.
"""
def __init__(
self,
timeout_s: float,
*,
model_name: str | None = None,
chunks_received: int = 0,
) -> None:
self.timeout_s = timeout_s
self.model_name = model_name
self.chunks_received = chunks_received
context = []
if model_name:
context.append(f"model={model_name}")
context.append(f"chunks_received={chunks_received}")
suffix = f" ({', '.join(context)})"
super().__init__(
f"No streaming chunk received for {timeout_s:.1f}s{suffix}. The "
f"connection may be alive at the TCP layer but is not producing "
f"content. Tune or disable via the `stream_chunk_timeout` "
f"constructor kwarg (set to None or 0 to disable) or the "
f"`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S` env var. See also "
f"`http_socket_options` for the kernel-level TCP timeout that "
f"catches dead TCP peers."
)
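The version-dependent base tuple resolved above can be verified standalone. This sketch mirrors the class-definition-time branching with a hypothetical `ChunkTimeout` stand-in:

```python
import asyncio

# On Python <=3.10 the two TimeoutErrors are distinct; on >=3.11 they are
# the same object, so listing both would raise "duplicate base class".
_bases: tuple[type, ...] = (
    (asyncio.TimeoutError,)
    if issubclass(asyncio.TimeoutError, TimeoutError)
    else (asyncio.TimeoutError, TimeoutError)
)

class ChunkTimeout(*_bases):  # hypothetical stand-in, not the library class
    pass

# Both handler spellings catch it on every supported Python version:
assert issubclass(ChunkTimeout, TimeoutError)
assert issubclass(ChunkTimeout, asyncio.TimeoutError)
try:
    raise ChunkTimeout("stalled")
except TimeoutError:
    caught = True
assert caught
```

On 3.10, C3 linearization orders the two distinct hierarchies without conflict; on 3.11+ the single-base branch sidesteps the duplicate-base `TypeError`.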
async def _astream_with_chunk_timeout(
source: AsyncIterator[T],
timeout: float | None,
*,
model_name: str | None = None,
) -> AsyncIterator[T]:
"""Yield from `source` but bound the per-chunk wait time.
If `timeout` is None or <=0, yields directly with no wall-clock bound.
Otherwise, each `__anext__` is wrapped in
`asyncio.wait_for(..., timeout)`. A timeout raises
`StreamChunkTimeoutError` (a `TimeoutError` subclass) whose message
names the knob, the env-var override, the model, and how many chunks
were received before the stall. A single-line structured log also
fires at WARNING so the signal is visible in aggregate logging systems
even when the exception is caught upstream.
When the timeout is active, the source iterator is explicitly
`aclose()`-d on early exit (timeout, consumer break, any exception) so
the underlying httpx streaming connection is released promptly. The
pass-through branch (timeout disabled) relies on httpx's GC-driven
cleanup instead — matching the behavior of unwrapped streams.
"""
if not timeout or timeout <= 0:
async for item in source:
yield item
return
chunks_received = 0
it = source.__aiter__()
try:
while True:
try:
chunk = await asyncio.wait_for(it.__anext__(), timeout=timeout)
except StopAsyncIteration:
return
except asyncio.TimeoutError as e:
logger.warning(
"langchain_openai.stream_chunk_timeout fired",
extra={
"source": "stream_chunk_timeout",
"timeout_s": timeout,
"model_name": model_name,
"chunks_received": chunks_received,
},
)
raise StreamChunkTimeoutError(
timeout,
model_name=model_name,
chunks_received=chunks_received,
) from e
chunks_received += 1
yield chunk
finally:
aclose = getattr(it, "aclose", None)
if aclose is not None:
try:
await aclose()
except Exception as cleanup_exc:
# Best-effort cleanup; don't mask the original exception,
# but leave a DEBUG trace so pool/transport bugs stay
# discoverable at the right log level.
logger.debug(
"aclose() during _astream_with_chunk_timeout cleanup "
"raised; ignoring",
exc_info=cleanup_exc,
)
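The wrapper above can be exercised against a deliberately stalling source. This self-contained sketch reproduces the core `wait_for`-per-`__anext__` pattern (it raises a plain `TimeoutError`, not the library's `StreamChunkTimeoutError`, and omits the logging):

```python
import asyncio
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")

async def with_chunk_timeout(
    source: AsyncIterator[T], timeout: float
) -> AsyncIterator[T]:
    it = source.__aiter__()
    try:
        while True:
            try:
                # Bound each chunk wait, not the whole stream.
                yield await asyncio.wait_for(it.__anext__(), timeout=timeout)
            except StopAsyncIteration:
                return
    finally:
        # Release the source promptly on timeout or consumer break.
        aclose = getattr(it, "aclose", None)
        if aclose is not None:
            await aclose()

async def stalling() -> AsyncIterator[str]:
    yield "first"
    await asyncio.sleep(10)  # silent stall; never yields again
    yield "never"

async def main() -> list[str]:
    got: list[str] = []
    try:
        async for chunk in with_chunk_timeout(stalling(), timeout=0.05):
            got.append(chunk)
    except asyncio.TimeoutError:
        got.append("<timeout>")
    return got

assert asyncio.run(main()) == ["first", "<timeout>"]
```

The consumer sees the first chunk immediately; the second `__anext__` exceeds the 0.05s bound, `wait_for` cancels it, and the timeout surfaces instead of hanging for the full 10s sleep.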


@@ -124,15 +124,24 @@ from pydantic import (
Field,
SecretStr,
ValidationError,
field_validator,
model_validator,
)
from pydantic.v1 import BaseModel as BaseModelV1
from typing_extensions import Self
from langchain_openai.chat_models._client_utils import (
_astream_with_chunk_timeout,
_build_proxied_async_httpx_client,
_build_proxied_sync_httpx_client,
_float_env,
_get_default_async_httpx_client,
_get_default_httpx_client,
_log_proxy_env_bypass_once,
_resolve_socket_options,
_resolve_sync_and_async_api_keys,
_should_bypass_socket_options_for_proxy_env,
_warn_if_proxy_env_shadowed,
)
from langchain_openai.chat_models._compat import (
_convert_from_v1_to_chat_completions,
@@ -790,6 +799,113 @@ class BaseChatOpenAI(BaseChatModel):
like a custom client for sync invocations.
"""
http_socket_options: Sequence[tuple[int, int, int]] | None = Field(
default=None, exclude=True
)
"""TCP socket options applied to the httpx transports built by this instance.
Defaults to a conservative TCP-keepalive + `TCP_USER_TIMEOUT` profile that
targets a ~2-minute bound on silent connection hangs (silent mid-stream peer
loss, gVisor/NAT idle timeouts, silent TCP black holes) on platforms that
support the full option set. On platforms that only support a subset
(macOS without `TCP_USER_TIMEOUT`, Windows with only `SO_KEEPALIVE`,
minimal kernels), unsupported options are silently dropped and the bound
degrades to whatever the remaining options + OS defaults provide — still
better than indefinite hang.
Accepted values:
- `None` (default): use env-driven defaults. Matches the "unset" convention
used by `http_client` elsewhere on this class.
- `()` (empty): disable socket-option injection entirely. Inherits the OS
defaults and restores httpx's native env-proxy auto-detection.
- A non-empty sequence of `(level, option, value)` tuples: explicit
override; passed verbatim to the transport (not filtered). Unsupported
options raise `OSError` at connect time rather than being silently
dropped — the user chose them explicitly.
Environment variables (only consulted when this field is `None`):
`LANGCHAIN_OPENAI_TCP_KEEPALIVE` (set to `0` to disable entirely — the
kill-switch), `LANGCHAIN_OPENAI_TCP_KEEPIDLE`,
`LANGCHAIN_OPENAI_TCP_KEEPINTVL`, `LANGCHAIN_OPENAI_TCP_KEEPCNT`,
`LANGCHAIN_OPENAI_TCP_USER_TIMEOUT_MS`.
Applied per side: if `http_client` is supplied, the sync path uses
that user-owned client's socket options as-is; the async path still
gets `http_socket_options` applied to its default builder (and
vice-versa for `http_async_client`). Supply both to take full control.
!!! note "Interaction with env-proxy auto-detection"
When a custom `httpx` transport is active, `httpx` disables its
native env-proxy auto-detection (`HTTP_PROXY` / `HTTPS_PROXY` /
`ALL_PROXY` / `NO_PROXY` and macOS/Windows system proxy settings).
To keep the default shape safe, `ChatOpenAI` detects the
"proxy-env-shadow" pattern and **skips the custom transport
entirely** when **all** of the following hold:
- `http_socket_options` is left at its default (`None`)
- No `http_client` or `http_async_client` supplied
- No `openai_proxy` supplied
- A proxy env var or system proxy is visible to httpx
On that specific shape, the instance falls back to pre-PR behavior
and httpx's env-proxy auto-detection applies (a one-time `INFO` log
records the bypass for observability).
If you explicitly set `http_socket_options=[...]` while a proxy
env var is also set, no bypass — you opted into the transport, and
a one-time `WARNING` records the shadowing. Set
`http_socket_options=()` or `LANGCHAIN_OPENAI_TCP_KEEPALIVE=0` to
disable transport injection explicitly, or pass a fully-configured
`http_async_client` / `http_client` to take full control. The
`openai_proxy` constructor kwarg is unaffected — socket options
are applied cleanly through the proxied transport on that path.
"""
stream_chunk_timeout: float | None = Field(
default_factory=lambda: _float_env(
"LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", 120.0
),
exclude=True,
)
"""Per-chunk wall-clock timeout (seconds) on async streaming responses.
Applies to async invocations only (`astream`, `ainvoke` with streaming,
etc.). Sync streaming (`stream`) is not affected.
Fires between content chunks yielded by the openai SDK's streaming iterator
(i.e., each call to `__anext__` on the response). Crucially, this is
**not** the same as httpx's `timeout.read`:
- httpx's read timeout is inter-byte and gets reset every time *any* bytes
arrive on the socket — including OpenAI's SSE keepalive comments
(`: keepalive`) that trickle down during long model generations. A
stream that's silent on *content* but still producing keepalives looks
alive forever to httpx.
- `stream_chunk_timeout` measures the gap between *parsed chunks*. The
openai SDK's SSE parser consumes keepalive comments internally and does
not emit them as chunks, so keepalives do *not* reset this timer. It
fires on genuine content silence.
When it fires, a `StreamChunkTimeoutError`
(subclass of `asyncio.TimeoutError`) is raised with a self-describing
message naming this knob, the env-var override, the model, and the
number of chunks received before the stall. A WARNING log with
`extra={"source": "stream_chunk_timeout", "timeout_s": <value>,
"model_name": <value>, "chunks_received": <value>}` also fires so
aggregate logging can distinguish app-layer timeouts from
transport-layer failures.
Defaults to 120s. Set to `None` or `0` to disable. Overridable via the
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S` env var. Negative values
(from either the env var or the constructor kwarg — e.g., hydrated
from YAML/JSON configs) fall back to the default with a `WARNING` log
rather than silently disabling the wrapper, so a misconfigured value
still boots safely and the fallback is visible.
"""
stop: list[str] | str | None = Field(default=None, alias="stop_sequences")
"""Default stop sequences."""
@@ -953,6 +1069,27 @@ class BaseChatOpenAI(BaseChatModel):
all_required_field_names = get_pydantic_field_names(cls)
return _build_model_kwargs(values, all_required_field_names)
@field_validator("stream_chunk_timeout", mode="after")
@classmethod
def _validate_stream_chunk_timeout(cls, value: float | None) -> float | None:
"""Reject negative constructor values; fall back to the env-driven default.
Matches the env-var path in `_float_env`: a negative value is a typo,
not an opt-out (`None`/`0` are the documented off switches). Configs
hydrated from YAML/JSON would otherwise silently disable the wrapper
and reintroduce the indefinite-stream hang the feature prevents.
"""
if value is not None and value < 0:
fallback = _float_env("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", 120.0)
logger.warning(
"Invalid `stream_chunk_timeout=%r` (negative); "
"falling back to %s. Pass `None` or `0` to disable.",
value,
fallback,
)
return fallback
return value
@model_validator(mode="before")
@classmethod
def validate_temperature(cls, values: dict[str, Any]) -> Any:
@@ -1055,6 +1192,23 @@ class BaseChatOpenAI(BaseChatModel):
f"{openai_proxy=}\n{http_client=}\n{http_async_client=}"
)
raise ValueError(msg)
if _should_bypass_socket_options_for_proxy_env(
http_socket_options=self.http_socket_options,
http_client=self.http_client,
http_async_client=self.http_async_client,
openai_proxy=self.openai_proxy,
):
# Default-shape construction + proxy env var visible to httpx:
# skip the custom transport so httpx's env-proxy auto-detection
# still applies. Users who want kernel-level TCP tuning alongside
# an env proxy can opt in explicitly via `http_socket_options`.
resolved_socket_options: tuple[tuple[int, int, int], ...] = ()
_log_proxy_env_bypass_once()
else:
resolved_socket_options = _resolve_socket_options(self.http_socket_options)
_warn_if_proxy_env_shadowed(
resolved_socket_options, openai_proxy=self.openai_proxy
)
if not self.client:
if sync_api_key_value is None:
# No valid sync API key, leave client as None and raise informative
@@ -1063,21 +1217,17 @@ class BaseChatOpenAI(BaseChatModel):
self.root_client = None
else:
if self.openai_proxy and not self.http_client:
try:
import httpx
except ImportError as e:
msg = (
"Could not import httpx python package. "
"Please install it with `pip install httpx`."
)
raise ImportError(msg) from e
self.http_client = httpx.Client(
proxy=self.openai_proxy, verify=global_ssl_context
self.http_client = _build_proxied_sync_httpx_client(
proxy=self.openai_proxy,
verify=global_ssl_context,
socket_options=resolved_socket_options,
)
sync_specific = {
"http_client": self.http_client
or _get_default_httpx_client(
self.openai_api_base, self.request_timeout
self.openai_api_base,
self.request_timeout,
resolved_socket_options,
),
"api_key": sync_api_key_value,
}
@@ -1085,21 +1235,17 @@ class BaseChatOpenAI(BaseChatModel):
self.client = self.root_client.chat.completions
if not self.async_client:
if self.openai_proxy and not self.http_async_client:
try:
import httpx
except ImportError as e:
msg = (
"Could not import httpx python package. "
"Please install it with `pip install httpx`."
)
raise ImportError(msg) from e
self.http_async_client = httpx.AsyncClient(
proxy=self.openai_proxy, verify=global_ssl_context
self.http_async_client = _build_proxied_async_httpx_client(
proxy=self.openai_proxy,
verify=global_ssl_context,
socket_options=resolved_socket_options,
)
async_specific = {
"http_client": self.http_async_client
or _get_default_async_httpx_client(
self.openai_api_base, self.request_timeout
self.openai_api_base,
self.request_timeout,
resolved_socket_options,
),
"api_key": async_api_key_value,
}
@@ -1333,7 +1479,11 @@ class BaseChatOpenAI(BaseChatModel):
current_output_index = -1
current_sub_index = -1
has_reasoning = False
async for chunk in response:
async for chunk in _astream_with_chunk_timeout(
response,
self.stream_chunk_timeout,
model_name=self.model_name,
):
metadata = headers if is_first_chunk else {}
(
current_index,
@@ -1684,7 +1834,11 @@ class BaseChatOpenAI(BaseChatModel):
context_manager = response
async with context_manager as response:
is_first_chunk = True
async for chunk in response:
async for chunk in _astream_with_chunk_timeout(
response,
self.stream_chunk_timeout,
model_name=self.model_name,
):
if not isinstance(chunk, dict):
chunk = chunk.model_dump()
generation_chunk = self._convert_chunk_to_generation_chunk(


@@ -20,7 +20,7 @@ classifiers = [
"Topic :: Scientific/Engineering :: Artificial Intelligence",
]
version = "1.1.16"
version = "1.2.0"
requires-python = ">=3.10.0,<4.0.0"
dependencies = [
"langchain-core>=1.3.0,<2.0.0",


@@ -0,0 +1,812 @@
"""Unit tests for `langchain_openai.chat_models._client_utils`.
Asserts socket-options plumbing at the boundary between our helpers and the
httpx layer — not on httpx internals. Locks the wiring, env-driven defaults,
the `()` kill-switch contract, and the precedence between constructor kwargs,
env vars, and user-supplied clients.
"""
from __future__ import annotations
import asyncio
import logging
import os
import socket
from typing import Any
import httpx
import pytest
from langchain_openai import ChatOpenAI
from langchain_openai.chat_models import _client_utils
SOL_SOCKET = socket.SOL_SOCKET
SO_KEEPALIVE = socket.SO_KEEPALIVE
@pytest.fixture(autouse=True)
def _clear_langchain_openai_env(monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure LANGCHAIN_OPENAI_* env vars don't leak between tests."""
for name in list(os.environ):
if name.startswith("LANGCHAIN_OPENAI_") or name == "OPENAI_API_KEY":
monkeypatch.delenv(name, raising=False)
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
@pytest.mark.skipif(
__import__("sys").platform != "linux",
reason="Default option set is platform-specific; Linux values asserted here.",
)
def test_default_socket_options_linux() -> None:
"""On Linux, the full option set should be present with default values."""
opts = _client_utils._default_socket_options()
expected = {
(SOL_SOCKET, SO_KEEPALIVE, 1),
(socket.IPPROTO_TCP, _client_utils._LINUX_TCP_KEEPIDLE, 60),
(socket.IPPROTO_TCP, _client_utils._LINUX_TCP_KEEPINTVL, 10),
(socket.IPPROTO_TCP, _client_utils._LINUX_TCP_KEEPCNT, 3),
(socket.IPPROTO_TCP, _client_utils._LINUX_TCP_USER_TIMEOUT, 120000),
}
assert set(opts) == expected
def test_default_socket_options_disabled_returns_empty_tuple(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Kill-switch: `()` is the single 'no options' shape, never None."""
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "0")
opts = _client_utils._default_socket_options()
assert opts == ()
assert isinstance(opts, tuple)
@pytest.mark.enable_socket
def test_filter_supported_drops_unsupported() -> None:
"""An option with a deliberately-bogus level should be silently dropped.
Requires a real probe socket, so opt out of the suite-wide
`--disable-socket`. If the probe still cannot be created (unusual
sandboxed runner), the helper falls back to pass-through; assert that
contract explicitly rather than masking the behavior.
"""
good = (SOL_SOCKET, SO_KEEPALIVE, 1)
# Very high level number the kernel will reject.
bogus = (0xDEAD, 0xBEEF, 1)
try:
socket.socket(socket.AF_INET, socket.SOCK_STREAM).close()
except OSError:
pytest.skip("probe socket unavailable in this environment")
result = _client_utils._filter_supported([good, bogus])
assert good in result
assert bogus not in result
def test_build_async_httpx_client_boundary_kwargs(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Did our helper decide to inject a transport or not?"""
recorded: list[dict[str, Any]] = []
original = _client_utils._AsyncHttpxClientWrapper.__init__
def spy(self: Any, **kwargs: Any) -> None:
recorded.append(kwargs)
original(self, **kwargs)
monkeypatch.setattr(_client_utils._AsyncHttpxClientWrapper, "__init__", spy)
_client_utils._build_async_httpx_client(
base_url=None,
timeout=None,
socket_options=((SOL_SOCKET, SO_KEEPALIVE, 1),),
)
assert recorded, "expected one call when socket_options populated"
assert "transport" in recorded[-1]
recorded.clear()
_client_utils._build_async_httpx_client(
base_url=None, timeout=None, socket_options=()
)
assert recorded, "expected one call when socket_options empty"
assert "transport" not in recorded[-1]
def test_build_async_httpx_client_transport_carries_socket_options(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Transport should receive our options + the mirrored limits."""
recorded: list[dict[str, Any]] = []
original_cls = _client_utils.httpx.AsyncHTTPTransport
class Recorder(original_cls): # type: ignore[misc, valid-type]
def __init__(self, *args: Any, **kwargs: Any) -> None:
recorded.append(kwargs)
super().__init__(*args, **kwargs)
monkeypatch.setattr(
"langchain_openai.chat_models._client_utils.httpx.AsyncHTTPTransport",
Recorder,
)
_client_utils._build_async_httpx_client(
base_url=None,
timeout=None,
socket_options=((SOL_SOCKET, SO_KEEPALIVE, 1),),
)
assert recorded, "expected httpx.AsyncHTTPTransport to be constructed"
kwargs = recorded[-1]
assert kwargs.get("socket_options") == [(SOL_SOCKET, SO_KEEPALIVE, 1)]
assert kwargs.get("limits") is _client_utils._DEFAULT_CONNECTION_LIMITS
def test_http_socket_options_none_vs_empty_tuple_vs_populated(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Discriminates the three input shapes at the builder boundary.
Also locks the no-filter contract for user overrides: the populated-case
assertion is verbatim, proving `_resolve_socket_options` does not run
user overrides through `_filter_supported`.
"""
recorded: list[tuple[str, tuple, tuple]] = []
def spy_async(
base_url: str | None,
timeout: Any,
socket_options: tuple = (),
) -> Any:
recorded.append(("async", (base_url, timeout), tuple(socket_options)))
# Return a real (but unused) client so init completes.
return _client_utils._AsyncHttpxClientWrapper(
base_url=base_url or "https://api.openai.com/v1", timeout=timeout
)
def spy_sync(
base_url: str | None,
timeout: Any,
socket_options: tuple = (),
) -> Any:
recorded.append(("sync", (base_url, timeout), tuple(socket_options)))
return _client_utils._SyncHttpxClientWrapper(
base_url=base_url or "https://api.openai.com/v1", timeout=timeout
)
monkeypatch.setattr(
"langchain_openai.chat_models.base._get_default_async_httpx_client",
spy_async,
)
monkeypatch.setattr(
"langchain_openai.chat_models.base._get_default_httpx_client",
spy_sync,
)
# (1) Unset -> None -> env-driven defaults (non-empty on linux/darwin CI).
ChatOpenAI(model="gpt-4o")
assert recorded, "expected a default-client build"
_, _, opts1 = recorded[-1]
assert isinstance(opts1, tuple)
# (2) Explicit empty tuple -> ().
recorded.clear()
ChatOpenAI(model="gpt-4o", http_socket_options=())
assert recorded
assert all(opts == () for _, _, opts in recorded)
# (3) Populated sequence -> verbatim passthrough (not filtered).
recorded.clear()
ChatOpenAI(
model="gpt-4o",
http_socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)],
)
assert recorded
for _, _, opts in recorded:
assert opts == ((SOL_SOCKET, SO_KEEPALIVE, 1),)
def test_openai_proxy_branch_applies_socket_options(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""`openai_proxy` path must go through the socket-options-aware proxied helper."""
recorded: list[dict[str, Any]] = []
def spy(proxy: str, verify: Any, socket_options: tuple = ()) -> httpx.AsyncClient:
recorded.append(
{"proxy": proxy, "verify": verify, "socket_options": socket_options}
)
return httpx.AsyncClient()
monkeypatch.setattr(
"langchain_openai.chat_models.base._build_proxied_async_httpx_client",
spy,
)
# Sync branch should also be covered — spy on that too.
sync_recorded: list[dict[str, Any]] = []
def sync_spy(proxy: str, verify: Any, socket_options: tuple = ()) -> httpx.Client:
sync_recorded.append(
{"proxy": proxy, "verify": verify, "socket_options": socket_options}
)
return httpx.Client()
monkeypatch.setattr(
"langchain_openai.chat_models.base._build_proxied_sync_httpx_client",
sync_spy,
)
ChatOpenAI(
model="gpt-4o",
openai_proxy="http://proxy.example.com:3128",
http_socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)],
)
assert recorded, "expected async proxied helper to be called"
assert recorded[-1]["proxy"] == "http://proxy.example.com:3128"
assert recorded[-1]["socket_options"] == ((SOL_SOCKET, SO_KEEPALIVE, 1),)
assert sync_recorded, "expected sync proxied helper to be called"
assert sync_recorded[-1]["proxy"] == "http://proxy.example.com:3128"
assert sync_recorded[-1]["socket_options"] == ((SOL_SOCKET, SO_KEEPALIVE, 1),)
def test_user_supplied_http_async_client_untouched(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""If the user passes an http_async_client, we must not mutate it."""
default_calls: list[Any] = []
proxied_calls: list[Any] = []
def default_async_spy(*args: Any, **kwargs: Any) -> Any:
default_calls.append((args, kwargs))
msg = "default async builder should not run"
raise AssertionError(msg)
def proxied_async_spy(*args: Any, **kwargs: Any) -> Any:
proxied_calls.append((args, kwargs))
msg = "proxied async builder should not run"
raise AssertionError(msg)
monkeypatch.setattr(
"langchain_openai.chat_models.base._get_default_async_httpx_client",
default_async_spy,
)
monkeypatch.setattr(
"langchain_openai.chat_models.base._build_proxied_async_httpx_client",
proxied_async_spy,
)
user_client = httpx.AsyncClient()
user_sync_client = httpx.Client()
model = ChatOpenAI(
model="gpt-4o",
http_client=user_sync_client,
http_async_client=user_client,
http_socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)],
)
assert default_calls == []
assert proxied_calls == []
assert model.http_async_client is user_client
assert model.http_client is user_sync_client
def test_default_path_opt_out_is_strict_noop(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""With LANGCHAIN_OPENAI_TCP_KEEPALIVE=0 we inject no transport.
Boundary assertion on `_AsyncHttpxClientWrapper.__init__` kwargs — our
helper passed nothing, so httpx falls back to its own native behavior
(env-proxy handling, pool defaults, trust_env, etc.) completely
unaffected by this library.
"""
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "0")
recorded_sync: list[dict[str, Any]] = []
recorded_async: list[dict[str, Any]] = []
sync_original = _client_utils._SyncHttpxClientWrapper.__init__
async_original = _client_utils._AsyncHttpxClientWrapper.__init__
def sync_spy(self: Any, **kwargs: Any) -> None:
recorded_sync.append(kwargs)
sync_original(self, **kwargs)
def async_spy(self: Any, **kwargs: Any) -> None:
recorded_async.append(kwargs)
async_original(self, **kwargs)
monkeypatch.setattr(_client_utils._SyncHttpxClientWrapper, "__init__", sync_spy)
monkeypatch.setattr(_client_utils._AsyncHttpxClientWrapper, "__init__", async_spy)
ChatOpenAI(model="gpt-4o")
assert recorded_sync, "expected the sync default client to be built"
assert "transport" not in recorded_sync[-1]
assert recorded_async, "expected the async default client to be built"
assert "transport" not in recorded_async[-1]
def test_invalid_env_values_degrade_safely(monkeypatch: pytest.MonkeyPatch) -> None:
"""Garbage in LANGCHAIN_OPENAI_TCP_* env vars must not crash model init."""
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPIDLE", "not-an-int")
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPINTVL", "")
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPCNT", "NaN")
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_USER_TIMEOUT_MS", "abc")
opts = _client_utils._default_socket_options()
assert isinstance(opts, tuple)
# Fallback values (60/10/3/120000) are used; on Linux, the full option
# set should still be present because the fallbacks are valid.
# (Windows/darwin may filter some options; at minimum SO_KEEPALIVE
# survives.)
assert (SOL_SOCKET, SO_KEEPALIVE, 1) in opts
# Instantiating a model doesn't raise.
ChatOpenAI(model="gpt-4o")
def test_invalid_stream_chunk_timeout_env_degrades_safely(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Garbage in LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S must not crash init."""
monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "not-a-float")
model = ChatOpenAI(model="gpt-4o")
assert model.stream_chunk_timeout == 120.0
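The degradation this test locks in follows from the documented `_float_env` contract (the helper's body is not shown in this diff). A hypothetical sketch of that contract — unparseable or negative values warn and fall back rather than crash init:

```python
import logging
import os

logger = logging.getLogger(__name__)

def float_env(name: str, default: float) -> float:
    # Hypothetical sketch of the documented contract, not the real helper:
    # garbage and negative values fall back to the default with a WARNING;
    # 0 remains a valid (documented) "disable" value.
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        value = float(raw)
    except ValueError:
        logger.warning("Invalid %s=%r; falling back to %s", name, raw, default)
        return default
    if value < 0:
        logger.warning("Negative %s=%r; falling back to %s", name, raw, default)
        return default
    return value

os.environ["EXAMPLE_TIMEOUT_S"] = "not-a-float"
assert float_env("EXAMPLE_TIMEOUT_S", 120.0) == 120.0   # garbage -> default
os.environ["EXAMPLE_TIMEOUT_S"] = "30"
assert float_env("EXAMPLE_TIMEOUT_S", 120.0) == 30.0    # valid -> parsed
os.environ["EXAMPLE_TIMEOUT_S"] = "0"
assert float_env("EXAMPLE_TIMEOUT_S", 120.0) == 0.0     # explicit off switch
```

`EXAMPLE_TIMEOUT_S` is an illustrative variable name; the real helper reads `LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S`.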
def test_default_socket_options_darwin(monkeypatch: pytest.MonkeyPatch) -> None:
"""macOS: `TCP_USER_TIMEOUT` is unavailable, but keepalive trio maps to darwin."""
monkeypatch.setattr(_client_utils.sys, "platform", "darwin")
opts = _client_utils._default_socket_options()
assert (SOL_SOCKET, SO_KEEPALIVE, 1) in opts
darwin_keepalive = (
socket.IPPROTO_TCP,
_client_utils._DARWIN_TCP_KEEPALIVE,
60,
)
assert darwin_keepalive in opts or opts == ((SOL_SOCKET, SO_KEEPALIVE, 1),)
def test_default_socket_options_other_platform(monkeypatch: pytest.MonkeyPatch) -> None:
"""Unknown platform (e.g. win32): `SO_KEEPALIVE` only."""
monkeypatch.setattr(_client_utils.sys, "platform", "win32")
opts = _client_utils._default_socket_options()
assert opts in (((SOL_SOCKET, SO_KEEPALIVE, 1),), ())
def test_filter_supported_probe_failure_returns_unfiltered(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Contract: probe-socket failure -> input is returned verbatim."""
def _raise(*args: Any, **kwargs: Any) -> None:
msg = "sandboxed"
raise OSError(msg)
monkeypatch.setattr(_client_utils.socket, "socket", _raise)
good = (SOL_SOCKET, SO_KEEPALIVE, 1)
bogus = (0xDEAD, 0xBEEF, 1)
result = _client_utils._filter_supported([good, bogus])
assert result == [good, bogus]
def test_invalid_tcp_env_emits_warning(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Int env fallback must log a WARNING naming the offending variable."""
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPIDLE", "not-an-int")
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
_client_utils._default_socket_options()
assert any(
"LANGCHAIN_OPENAI_TCP_KEEPIDLE" in r.getMessage()
for r in caplog.records
if r.levelno == logging.WARNING
)
def test_negative_tcp_env_is_rejected(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Negative keepalive counts fall back to the default with a WARNING."""
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPCNT", "-5")
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
value = _client_utils._int_env("LANGCHAIN_OPENAI_TCP_KEEPCNT", 3)
assert value == 3
assert any(
"negative" in r.getMessage().lower()
for r in caplog.records
if r.levelno == logging.WARNING
)
@pytest.mark.enable_socket
def test_filter_supported_logs_drops_at_debug(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Dropped options are visible at DEBUG so a macOS user can confirm the filter."""
try:
socket.socket(socket.AF_INET, socket.SOCK_STREAM).close()
except OSError:
pytest.skip("probe socket unavailable in this environment")
caplog.set_level(logging.DEBUG, logger="langchain_openai.chat_models._client_utils")
good = (SOL_SOCKET, SO_KEEPALIVE, 1)
bogus = (0xDEAD, 0xBEEF, 1)
_client_utils._filter_supported([good, bogus])
assert any(
"Dropped" in r.getMessage()
for r in caplog.records
if r.levelno == logging.DEBUG
)
def test_build_proxied_async_httpx_client_opt_out_returns_plain_client() -> None:
"""Empty socket_options -> plain httpx.AsyncClient, no transport injection."""
client = _client_utils._build_proxied_async_httpx_client(
proxy="http://proxy.example:3128",
verify=True,
socket_options=(),
)
assert isinstance(client, httpx.AsyncClient)
def test_build_proxied_async_httpx_client_wraps_transport() -> None:
"""Non-empty socket_options -> real httpx.AsyncHTTPTransport wiring executes.
Exercises the proxy-wrapping bodies end-to-end so a change to httpx's
`Proxy`/transport signatures would surface here, not at connect time.
"""
client = _client_utils._build_proxied_async_httpx_client(
proxy="http://proxy.example:3128",
verify=True,
socket_options=((SOL_SOCKET, SO_KEEPALIVE, 1),),
)
assert isinstance(client, httpx.AsyncClient)
def test_build_proxied_sync_httpx_client_opt_out_returns_plain_client() -> None:
client = _client_utils._build_proxied_sync_httpx_client(
proxy="http://proxy.example:3128",
verify=True,
socket_options=(),
)
assert isinstance(client, httpx.Client)
def test_build_proxied_sync_httpx_client_wraps_transport() -> None:
client = _client_utils._build_proxied_sync_httpx_client(
proxy="http://proxy.example:3128",
verify=True,
socket_options=((SOL_SOCKET, SO_KEEPALIVE, 1),),
)
assert isinstance(client, httpx.Client)
def test_warn_if_proxy_env_shadowed_emits_once(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""One WARNING per process when a proxy env var is shadowed by our transport."""
monkeypatch.setenv("HTTP_PROXY", "http://proxy.example:3128")
monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False)
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
opts = ((SOL_SOCKET, SO_KEEPALIVE, 1),)
_client_utils._warn_if_proxy_env_shadowed(opts, openai_proxy=None)
_client_utils._warn_if_proxy_env_shadowed(opts, openai_proxy=None)
warnings = [
r
for r in caplog.records
if r.levelno == logging.WARNING and "HTTP_PROXY" in r.getMessage()
]
assert len(warnings) == 1
def test_warn_if_proxy_env_shadowed_detects_lowercase(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Lowercase `http_proxy` is picked up by httpx; the warning must fire for it."""
for name in ("HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"):
monkeypatch.delenv(name, raising=False)
monkeypatch.setenv("http_proxy", "http://proxy.example:3128")
monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False)
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
opts = ((SOL_SOCKET, SO_KEEPALIVE, 1),)
_client_utils._warn_if_proxy_env_shadowed(opts, openai_proxy=None)
warnings = [
r
for r in caplog.records
if r.levelno == logging.WARNING and "http_proxy" in r.getMessage()
]
assert len(warnings) == 1
def test_warn_if_proxy_env_shadowed_detects_system_proxy(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""macOS/Windows system proxies shadow the transport too; warning should fire."""
for name in _client_utils._PROXY_ENV_VARS:
monkeypatch.delenv(name, raising=False)
monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False)
monkeypatch.setattr(
_client_utils.urllib.request,
"getproxies",
lambda: {"http": "http://system.proxy:3128"},
)
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
opts = ((SOL_SOCKET, SO_KEEPALIVE, 1),)
_client_utils._warn_if_proxy_env_shadowed(opts, openai_proxy=None)
warnings = [
r
for r in caplog.records
if r.levelno == logging.WARNING and "system proxy" in r.getMessage()
]
assert len(warnings) == 1
def test_warn_if_proxy_env_shadowed_skipped_when_openai_proxy_set(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Explicit `openai_proxy` suppresses the warn (proxy handling is controlled)."""
monkeypatch.setenv("HTTP_PROXY", "http://proxy.example:3128")
monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False)
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
opts = ((SOL_SOCKET, SO_KEEPALIVE, 1),)
_client_utils._warn_if_proxy_env_shadowed(
opts, openai_proxy="http://proxy.example:3128"
)
assert not [r for r in caplog.records if r.levelno == logging.WARNING]
def test_proxy_env_bypass_default_shape_triggers(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Default-shape + env proxy => bypass socket-option transport."""
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128")
assert _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=None,
http_client=None,
http_async_client=None,
openai_proxy=None,
)
def test_proxy_env_bypass_no_env_does_not_trigger(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""No proxy env/system proxy => no bypass, even with everything else default."""
for name in _client_utils._PROXY_ENV_VARS:
monkeypatch.delenv(name, raising=False)
monkeypatch.setattr(_client_utils.urllib.request, "getproxies", dict)
assert not _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=None,
http_client=None,
http_async_client=None,
openai_proxy=None,
)
def test_proxy_env_bypass_blocked_by_explicit_socket_options(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Explicit `http_socket_options` => user opted in, no bypass."""
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128")
assert not _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)],
http_client=None,
http_async_client=None,
openai_proxy=None,
)
# Empty tuple is also an explicit choice (kill-switch), no bypass.
assert not _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=(),
http_client=None,
http_async_client=None,
openai_proxy=None,
)
def test_proxy_env_bypass_blocked_by_kill_switch(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""`LANGCHAIN_OPENAI_TCP_KEEPALIVE=0` => kill-switch owns the disable path."""
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128")
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "0")
assert not _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=None,
http_client=None,
http_async_client=None,
openai_proxy=None,
)
def test_proxy_env_bypass_blocked_by_user_http_client(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Any user-supplied http(_async)_client => user opted in, no bypass."""
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128")
user_client = httpx.Client()
try:
assert not _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=None,
http_client=user_client,
http_async_client=None,
openai_proxy=None,
)
finally:
user_client.close()
async_client = httpx.AsyncClient()
try:
assert not _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=None,
http_client=None,
http_async_client=async_client,
openai_proxy=None,
)
finally:
asyncio.run(async_client.aclose())
def test_proxy_env_bypass_blocked_by_openai_proxy(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""`openai_proxy` handles proxying explicitly => no bypass."""
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128")
assert not _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=None,
http_client=None,
http_async_client=None,
openai_proxy="http://openai.proxy:3128",
)
def test_proxy_env_bypass_detects_lowercase_env(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Lowercase `https_proxy` also triggers the bypass."""
for name in ("HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"):
monkeypatch.delenv(name, raising=False)
monkeypatch.setenv("https_proxy", "http://proxy.example:3128")
assert _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=None,
http_client=None,
http_async_client=None,
openai_proxy=None,
)
def test_proxy_env_bypass_detects_system_proxy(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""macOS/Windows system proxy config triggers the bypass too."""
for name in _client_utils._PROXY_ENV_VARS:
monkeypatch.delenv(name, raising=False)
monkeypatch.setattr(
_client_utils.urllib.request,
"getproxies",
lambda: {"http": "http://system.proxy:3128"},
)
assert _client_utils._should_bypass_socket_options_for_proxy_env(
http_socket_options=None,
http_client=None,
http_async_client=None,
openai_proxy=None,
)
def test_log_proxy_env_bypass_once_emits_info_once(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""One INFO per process when the bypass kicks in."""
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128")
monkeypatch.setattr(_client_utils, "_proxy_env_bypass_info_emitted", False)
caplog.set_level(logging.INFO, logger="langchain_openai.chat_models._client_utils")
_client_utils._log_proxy_env_bypass_once()
_client_utils._log_proxy_env_bypass_once()
infos = [
r
for r in caplog.records
if r.levelno == logging.INFO and "HTTPS_PROXY" in r.getMessage()
]
assert len(infos) == 1
def test_client_build_skips_transport_on_proxy_env_default_shape(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""End-to-end: default-shape ChatOpenAI + HTTPS_PROXY => no custom transport.
Locks that the bypass wiring in `base.py` actually prevents the default
builder from installing `httpx.HTTPTransport(socket_options=...)`. The
async client's `_transport` (or underlying mount) should be httpx's
default, not ours.
"""
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128")
# Neutralise module-level latches so repeated runs still exercise logging.
monkeypatch.setattr(_client_utils, "_proxy_env_bypass_info_emitted", False)
monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False)
# Clear cached builder results so env changes take effect.
_client_utils._cached_sync_httpx_client.cache_clear()
_client_utils._cached_async_httpx_client.cache_clear()
recorded: list[tuple[Any, ...]] = []
original_build = _client_utils._build_async_httpx_client
def spy(
base_url: str | None,
timeout: Any,
socket_options: tuple = (),
) -> Any:
recorded.append(socket_options)
return original_build(base_url, timeout, socket_options)
monkeypatch.setattr(_client_utils, "_build_async_httpx_client", spy)
# `_get_default_async_httpx_client` reaches the cached builder directly,
# which ignores our module-level patch; bypass the cache to route through
# the spy.
monkeypatch.setattr(
_client_utils,
"_cached_async_httpx_client",
spy,
)
ChatOpenAI(model="gpt-5.1")
assert recorded, "async builder should have been called"
assert all(opts == () for opts in recorded), (
f"expected bypass (no socket options), got {recorded!r}"
)
def test_client_build_applies_socket_options_when_user_opts_in(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Explicit `http_socket_options` => transport applied, bypass skipped."""
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128")
monkeypatch.setattr(_client_utils, "_proxy_env_bypass_info_emitted", False)
monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False)
_client_utils._cached_sync_httpx_client.cache_clear()
_client_utils._cached_async_httpx_client.cache_clear()
recorded: list[tuple[Any, ...]] = []
original_build = _client_utils._build_async_httpx_client
def spy(
base_url: str | None,
timeout: Any,
socket_options: tuple = (),
) -> Any:
recorded.append(socket_options)
return original_build(base_url, timeout, socket_options)
monkeypatch.setattr(_client_utils, "_build_async_httpx_client", spy)
monkeypatch.setattr(_client_utils, "_cached_async_httpx_client", spy)
explicit = [(SOL_SOCKET, SO_KEEPALIVE, 1)]
ChatOpenAI(model="gpt-5.1", http_socket_options=explicit)
assert recorded, "async builder should have been called"
assert all(tuple(opts) == tuple(explicit) for opts in recorded), (
f"expected user-supplied opts, got {recorded!r}"
)


@@ -0,0 +1,437 @@
"""Unit tests for `_astream_with_chunk_timeout` and `StreamChunkTimeoutError`.
- Pass-through when items arrive in time.
- Timeout fires with a self-describing message + subclasses TimeoutError.
- Structured WARNING log carries `source=stream_chunk_timeout` +
`timeout_s` so aggregate logging can distinguish app-layer from
transport-layer timeouts.
- Source iterator's `aclose()` is called on early exit to release the
underlying httpx connection promptly.
- Garbage in `LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S` degrades safely.
"""
from __future__ import annotations
import asyncio
import logging
from collections.abc import AsyncGenerator
from types import TracebackType
from typing import Any, cast
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from typing_extensions import Self
from langchain_openai import ChatOpenAI
from langchain_openai.chat_models._client_utils import (
StreamChunkTimeoutError,
_astream_with_chunk_timeout,
)
MODEL = "gpt-5.4"
class _FakeSource:
"""AsyncIterator with an observable aclose() for leak-testing."""
def __init__(self, items: list[Any], per_item_sleep: float = 0.0) -> None:
self._items = list(items)
self._sleep = per_item_sleep
self.aclose_count = 0
def __aiter__(self) -> _FakeSource:
return self
async def __anext__(self) -> Any:
if self._sleep:
await asyncio.sleep(self._sleep)
if not self._items:
raise StopAsyncIteration
return self._items.pop(0)
async def aclose(self) -> None:
self.aclose_count += 1
@pytest.mark.asyncio
async def test_astream_with_chunk_timeout_passes_through() -> None:
"""Fast source + generous timeout: every item should be delivered."""
source = _FakeSource(["a", "b", "c"], per_item_sleep=0.0)
collected = [item async for item in _astream_with_chunk_timeout(source, 5.0)]
assert collected == ["a", "b", "c"]
@pytest.mark.asyncio
async def test_astream_with_chunk_timeout_disabled_passes_through() -> None:
"""timeout=None / timeout=0 disables the bound; still iterates normally."""
source_none = _FakeSource(["a", "b"])
collected_none = [
item async for item in _astream_with_chunk_timeout(source_none, None)
]
assert collected_none == ["a", "b"]
source_zero = _FakeSource(["x", "y"])
collected_zero = [
item async for item in _astream_with_chunk_timeout(source_zero, 0.0)
]
assert collected_zero == ["x", "y"]
@pytest.mark.asyncio
async def test_astream_with_chunk_timeout_fires() -> None:
"""Slow source + tight timeout: `StreamChunkTimeoutError` fires."""
source = _FakeSource(["a", "b"], per_item_sleep=0.2)
with pytest.raises(StreamChunkTimeoutError) as exc_info:
async for _ in _astream_with_chunk_timeout(source, 0.05):
pass
# Backward-compat: existing `except TimeoutError:` handlers must still catch.
assert isinstance(exc_info.value, asyncio.TimeoutError)
assert isinstance(exc_info.value, TimeoutError)
# Self-describing message names the knob and env var so operators can act.
msg = str(exc_info.value)
assert "stream_chunk_timeout" in msg
assert "LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S" in msg
@pytest.mark.asyncio
async def test_astream_with_chunk_timeout_logs_on_fire(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Structured log carries source + timeout_s for aggregate-log filtering."""
# Pin the logger + level; don't rely on caplog's default or module
# inheritance so the test can't silently no-op.
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
source = _FakeSource(["a"], per_item_sleep=0.2)
with pytest.raises(StreamChunkTimeoutError):
async for _ in _astream_with_chunk_timeout(source, 0.05):
pass
records = [
r
for r in caplog.records
if r.name == "langchain_openai.chat_models._client_utils"
and getattr(r, "source", None) == "stream_chunk_timeout"
]
assert len(records) == 1, f"expected one structured record, got {len(records)}"
record = records[0]
assert record.levelno == logging.WARNING
assert record.__dict__["timeout_s"] == 0.05
@pytest.mark.asyncio
async def test_astream_with_chunk_timeout_closes_source_on_early_exit() -> None:
"""aclose() is called on early exit so the httpx connection is released promptly.
Covers both the timeout-fires path and the consumer-closes-wrapper path.
"""
# Case 1: timeout fires -> aclose() propagates.
timed_out_source = _FakeSource(["a"], per_item_sleep=0.2)
with pytest.raises(StreamChunkTimeoutError):
async for _ in _astream_with_chunk_timeout(timed_out_source, 0.05):
pass
assert timed_out_source.aclose_count == 1
# Case 2: consumer explicitly closes the wrapper after one yield.
closer_source = _FakeSource(["a", "b", "c"], per_item_sleep=0.0)
# Cast to AsyncGenerator so mypy sees the aclose() method; the helper
# is always implemented as an async generator at runtime.
wrapper = cast(
"AsyncGenerator[Any, None]",
_astream_with_chunk_timeout(closer_source, 5.0),
)
got = await wrapper.__anext__()
assert got == "a"
await wrapper.aclose()
assert closer_source.aclose_count == 1
def test_invalid_stream_chunk_timeout_env_degrades_safely(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Garbage env var -> model init succeeds with the 120s default."""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "not-a-float")
model = ChatOpenAI(model=MODEL)
assert model.stream_chunk_timeout == 120.0
def test_stream_chunk_timeout_env_kill_switch_zero(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Env-var kill-switch: `_S=0` should disable the wrapper on the model."""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "0")
model = ChatOpenAI(model=MODEL)
assert model.stream_chunk_timeout == 0.0
def test_stream_chunk_timeout_kwarg_none_disables(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Constructor kwarg opt-out: `stream_chunk_timeout=None` persists."""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
model = ChatOpenAI(model=MODEL, stream_chunk_timeout=None)
assert model.stream_chunk_timeout is None
def test_stream_chunk_timeout_error_has_structured_attrs() -> None:
"""Structured payload mirrors the log `extra=`; no message-regex needed."""
err = StreamChunkTimeoutError(0.5, model_name=MODEL, chunks_received=3)
assert err.timeout_s == 0.5
assert err.model_name == "gpt-5.4"
assert err.chunks_received == 3
text = str(err)
assert "gpt-5.4" in text
assert "chunks_received=3" in text
@pytest.mark.asyncio
async def test_astream_with_chunk_timeout_threads_model_name(
caplog: pytest.LogCaptureFixture,
) -> None:
"""`model_name` flows into both the raised error and the structured log."""
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
source = _FakeSource(["a", "b"], per_item_sleep=0.2)
with pytest.raises(StreamChunkTimeoutError) as exc_info:
async for _ in _astream_with_chunk_timeout(
source, 0.05, model_name="gpt-4o-mini"
):
pass
assert exc_info.value.model_name == "gpt-4o-mini"
records = [
r
for r in caplog.records
if getattr(r, "source", None) == "stream_chunk_timeout"
]
assert records
assert records[0].__dict__["model_name"] == "gpt-4o-mini"
def test_invalid_stream_chunk_timeout_env_emits_warning(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Fallback is logged at WARNING so the typo is discoverable."""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "nonsense")
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
ChatOpenAI(model=MODEL)
assert any(
"LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S" in r.getMessage()
for r in caplog.records
if r.levelno == logging.WARNING
)
def test_negative_stream_chunk_timeout_env_rejected(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Negative timeout typo must not silently disable the wrapper."""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "-10")
caplog.set_level(
logging.WARNING, logger="langchain_openai.chat_models._client_utils"
)
model = ChatOpenAI(model=MODEL)
assert model.stream_chunk_timeout == 120.0
assert any(
"negative" in r.getMessage().lower()
for r in caplog.records
if r.levelno == logging.WARNING
)
def test_negative_stream_chunk_timeout_kwarg_rejected(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Negative kwarg (e.g., from YAML/JSON configs) must not disable the wrapper.
Mirrors the env-var path: fall back to the default and emit a WARNING
rather than silently treating a negative value as an opt-out — `None` /
`0` are the documented off switches.
"""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
caplog.set_level(logging.WARNING, logger="langchain_openai.chat_models.base")
model = ChatOpenAI(model=MODEL, stream_chunk_timeout=-10)
assert model.stream_chunk_timeout == 120.0
assert any(
"negative" in r.getMessage().lower()
and "stream_chunk_timeout" in r.getMessage()
for r in caplog.records
if r.levelno == logging.WARNING
)
def test_zero_stream_chunk_timeout_kwarg_preserved(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""`stream_chunk_timeout=0` is the documented opt-out and must persist."""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
model = ChatOpenAI(model=MODEL, stream_chunk_timeout=0)
assert model.stream_chunk_timeout == 0
class _SlowAsyncContextManager:
"""Async context manager that sleeps between streamed items."""
def __init__(self, chunks: list[Any], per_item_sleep: float) -> None:
self._chunks = list(chunks)
self._sleep = per_item_sleep
self._iter = iter(chunks)
async def __aenter__(self) -> Self:
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
return None
def __aiter__(self) -> Self:
return self
async def __anext__(self) -> Any:
await asyncio.sleep(self._sleep)
try:
return next(self._iter)
except StopIteration as exc:
raise StopAsyncIteration from exc
class _SlowSyncContextManager:
"""Sync context manager mirror of `_SlowAsyncContextManager`.
Sleeps between items in wall-clock time. The sync path never uses
`asyncio.wait_for`, so a tight `stream_chunk_timeout` should have no
effect here — that is the invariant we want to lock.
"""
def __init__(self, chunks: list[Any], per_item_sleep: float) -> None:
self._chunks = list(chunks)
self._sleep = per_item_sleep
self._iter = iter(chunks)
def __enter__(self) -> Self:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
return None
def __iter__(self) -> Self:
return self
def __next__(self) -> Any:
import time as _time  # local import keeps the file's top-level imports unchanged
_time.sleep(self._sleep)
return next(self._iter)
@pytest.mark.asyncio
async def test_astream_integration_raises_stream_chunk_timeout_error(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""End-to-end: slow async stream + tight timeout must raise.
Guards against a refactor that drops the `_astream_with_chunk_timeout`
wrapper from the `_astream` path — unit tests on the helper alone
wouldn't catch that regression.
"""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
llm = ChatOpenAI(model=MODEL, stream_chunk_timeout=0.05)
fake_chunks = [
{
"id": "c1",
"object": "chat.completion.chunk",
"created": 1,
"model": "gpt-4o",
"choices": [
{
"index": 0,
"delta": {"role": "assistant", "content": "hi"},
"finish_reason": None,
}
],
},
]
mock_client = AsyncMock()
async def mock_create(*args: Any, **kwargs: Any) -> _SlowAsyncContextManager:
return _SlowAsyncContextManager(fake_chunks, per_item_sleep=0.3)
mock_client.create = mock_create
with (
patch.object(llm, "async_client", mock_client),
pytest.raises(StreamChunkTimeoutError) as exc_info,
):
async for _ in llm.astream("hello"):
pass
assert exc_info.value.model_name == MODEL
def test_stream_sync_not_wrapped_by_chunk_timeout(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Sync `llm.stream()` must not be subject to `stream_chunk_timeout`.
Setting `stream_chunk_timeout=0.01` with a 100ms-per-chunk sync source
would raise if the wrapper were (incorrectly) applied to the sync path.
Completion without error proves the contract.
"""
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
llm = ChatOpenAI(model=MODEL, stream_chunk_timeout=0.01)
fake_chunks = [
{
"id": "c1",
"object": "chat.completion.chunk",
"created": 1,
"model": "gpt-4o",
"choices": [
{
"index": 0,
"delta": {"role": "assistant", "content": "hi"},
"finish_reason": None,
}
],
},
{
"id": "c2",
"object": "chat.completion.chunk",
"created": 1,
"model": "gpt-4o",
"choices": [
{"index": 0, "delta": {}, "finish_reason": "stop"},
],
},
]
mock_client = MagicMock()
def _create(*_args: Any, **_kwargs: Any) -> _SlowSyncContextManager:
return _SlowSyncContextManager(fake_chunks, per_item_sleep=0.1)
mock_client.create = _create
with patch.object(llm, "client", mock_client):
chunks = list(llm.stream("hello"))
assert chunks, "sync stream should have delivered chunks"

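The helper under test can be approximated by a small `asyncio.wait_for` wrapper. `with_chunk_timeout` and `ChunkTimeout` below are hypothetical names sketching the assumed shape of `_astream_with_chunk_timeout`; the shipped helper additionally supports the `None`/`0` opt-out, structured logging, and model-name threading, all omitted here.

```python
import asyncio


class ChunkTimeout(TimeoutError):
    """Raised when no chunk arrives within the per-chunk bound."""


async def with_chunk_timeout(source, timeout):
    # Bound each __anext__() with wait_for; close the source on any exit
    # so the underlying connection is released promptly.
    iterator = source.__aiter__()
    try:
        while True:
            try:
                item = await asyncio.wait_for(iterator.__anext__(), timeout)
            except StopAsyncIteration:
                return
            except asyncio.TimeoutError:
                raise ChunkTimeout(f"no chunk within {timeout}s") from None
            yield item
    finally:
        aclose = getattr(iterator, "aclose", None)
        if aclose is not None:
            await aclose()
```

Because `ChunkTimeout` subclasses `TimeoutError`, existing `except TimeoutError:` handlers keep working, which is the backward-compat contract the tests above assert.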

@@ -7,6 +7,7 @@ EXPECTED_ALL = [
"AzureOpenAI",
"AzureChatOpenAI",
"AzureOpenAIEmbeddings",
"StreamChunkTimeoutError",
"custom_tool",
]

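The integer env knobs exercised throughout these tests (`LANGCHAIN_OPENAI_TCP_KEEPIDLE`, `LANGCHAIN_OPENAI_TCP_KEEPCNT`, ...) share one parsing contract: garbage or negative values fall back to the default with a WARNING naming the variable, never a crash at init. A hedged sketch, where `int_env` is a hypothetical stand-in for the private `_int_env` helper:

```python
import logging
import os

logger = logging.getLogger(__name__)


def int_env(name: str, default: int) -> int:
    """Hypothetical stand-in for `_int_env`: fall back loudly, never crash."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        logger.warning("Invalid %s=%r; using default %d", name, raw, default)
        return default
    if value < 0:
        logger.warning("Negative %s=%d; using default %d", name, value, default)
        return default
    return value
```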

@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.10.0, <4.0.0"
resolution-markers = [
"python_full_version >= '3.13' and platform_python_implementation == 'PyPy'",
@@ -684,7 +684,7 @@ typing = [
[[package]]
name = "langchain-openai"
version = "1.1.16"
version = "1.2.0"
source = { editable = "." }
dependencies = [
{ name = "langchain-core" },