diff --git a/libs/partners/openai/langchain_openai/__init__.py b/libs/partners/openai/langchain_openai/__init__.py index da0c6c2b25d..e55c6042561 100644 --- a/libs/partners/openai/langchain_openai/__init__.py +++ b/libs/partners/openai/langchain_openai/__init__.py @@ -1,6 +1,7 @@ """Module for OpenAI integrations.""" from langchain_openai.chat_models import AzureChatOpenAI, ChatOpenAI +from langchain_openai.chat_models._client_utils import StreamChunkTimeoutError from langchain_openai.embeddings import AzureOpenAIEmbeddings, OpenAIEmbeddings from langchain_openai.llms import AzureOpenAI, OpenAI from langchain_openai.tools import custom_tool @@ -12,5 +13,6 @@ __all__ = [ "ChatOpenAI", "OpenAI", "OpenAIEmbeddings", + "StreamChunkTimeoutError", "custom_tool", ] diff --git a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py index 4a0efce9e1b..1d7acc6472d 100644 --- a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py +++ b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py @@ -1,23 +1,363 @@ -"""Helpers for creating OpenAI API clients. +"""Helpers for OpenAI httpx client construction, transport tuning, and streaming. -This module allows for the caching of httpx clients to avoid creating new instances -for each instance of ChatOpenAI. +Covers cached default client builders, proxy-aware variants for the +`openai_proxy` path, kernel-level TCP keepalive / `TCP_USER_TIMEOUT` socket +options, and the `_astream_with_chunk_timeout` wrapper that bounds per-chunk +wall-clock time on async SSE streams. -Logic is largely replicated from openai._base_client. +Client-builder boilerplate mirrors the patterns in `openai._base_client`; +socket-option tuning and the streaming timeout are original to this module. """ from __future__ import annotations import asyncio import inspect +import logging import os -from collections.abc import Awaitable, Callable +import socket +import sys +import urllib.request +from collections.abc import AsyncIterator, Awaitable, Callable, Sequence from functools import lru_cache -from typing import Any, cast +from typing import Any, TypeVar, cast +import httpx import openai from pydantic import SecretStr +logger = logging.getLogger(__name__) + +SocketOption = tuple[int, int, int] + +# socket.TCP_KEEPIDLE etc. are absent on darwin/win32; use raw UAPI constants. +_LINUX_TCP_KEEPIDLE = 4 +_LINUX_TCP_KEEPINTVL = 5 +_LINUX_TCP_KEEPCNT = 6 +_LINUX_TCP_USER_TIMEOUT = 18 + +# macOS: same semantics, different constants from . +_DARWIN_TCP_KEEPALIVE = 0x10 # idle seconds before first probe +_DARWIN_TCP_KEEPINTVL = 0x101 +_DARWIN_TCP_KEEPCNT = 0x102 + +# Mirrors the openai SDK's pool defaults. Hardcoded to avoid depending on +# an internal module path (openai._constants) that can move across SDK versions. +_DEFAULT_CONNECTION_LIMITS = httpx.Limits( + max_connections=1000, + max_keepalive_connections=100, + keepalive_expiry=5.0, +) + + +def _int_env(name: str, default: int, *, allow_negative: bool = False) -> int: + """Read an int env var with graceful fallback + discoverable warning. + + Unparseable or (by default) negative values fall back to `default` and + emit a single `WARNING` naming the offending variable. A misconfigured + environment still loads, but operators see the fallback in their logs + rather than silently getting a surprising default. + """ + raw = os.environ.get(name) + if raw is None: + return default + try: + value = int(raw) + except (TypeError, ValueError): + logger.warning( + "Invalid value for %s=%r (not an int); falling back to %d.", + name, + raw, + default, + ) + return default + if not allow_negative and value < 0: + logger.warning( + "Invalid value for %s=%r (negative); falling back to %d.", + name, + raw, + default, + ) + return default + return value + + +def _float_env(name: str, default: float, *, allow_negative: bool = False) -> float: + """Read a float env var with graceful fallback + discoverable warning. + + See `_int_env`. Negative values are rejected by default so a typo in + `LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S=-10` can't silently disable the + wrapper it was meant to configure. + """ + raw = os.environ.get(name) + if raw is None: + return default + try: + value = float(raw) + except (TypeError, ValueError): + logger.warning( + "Invalid value for %s=%r (not a float); falling back to %s.", + name, + raw, + default, + ) + return default + if not allow_negative and value < 0: + logger.warning( + "Invalid value for %s=%r (negative); falling back to %s.", + name, + raw, + default, + ) + return default + return value + + +def _filter_supported(opts: list[SocketOption]) -> list[SocketOption]: + """Drop socket options the running platform rejects. + + Probes each option against a throwaway socket via `setsockopt` and keeps + only those the kernel accepts. This keeps the library-computed defaults + non-fatal across platforms that don't implement every Linux option — + `TCP_USER_TIMEOUT` in particular is Linux-only and silently missing on + macOS, some minimal kernels, and older gVisor builds. Dropped options + are logged at `DEBUG` so an operator can confirm whether a kernel-level + knob took effect on their platform. + + If the probe socket cannot be created (sandboxed runtimes, `pytest-socket` + under `--disable-socket`, tight seccomp policies), the input list is + returned unfiltered. This preserves the pass-through behavior used for + explicit user overrides: unsupported options will surface as a clear + `OSError` at the first real `connect()` rather than being silently + dropped during `ChatOpenAI` construction. + """ + try: + probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + except Exception: + # Broad catch is deliberate: `pytest_socket` under `--disable-socket` + # raises `SocketBlockedError` (a `RuntimeError`, not `OSError`), and + # seccomp/sandboxed runtimes have been observed to raise other + # `OSError` subclasses and `PermissionError`. The intent is "any + # inability to create a probe socket -> pass through unfiltered," + # and narrowing the type would silently regress sandboxed CI. + return list(opts) + try: + supported: list[SocketOption] = [] + dropped: list[SocketOption] = [] + for level, optname, optval in opts: + try: + probe.setsockopt(level, optname, optval) + except OSError: + dropped.append((level, optname, optval)) + continue + supported.append((level, optname, optval)) + if dropped: + logger.debug( + "Dropped %d unsupported socket option(s) on %s: %s", + len(dropped), + sys.platform, + dropped, + ) + return supported + finally: + probe.close() + + +def _default_socket_options() -> tuple[SocketOption, ...]: + """Return default TCP socket options, or `()` if disabled via env. + + Always returns a tuple (never None) so callers and `@lru_cache` keys + remain uniform: `()` is the single shape for "no options". + + Target behavior on Linux/gVisor with the full option set: silent peers + are surfaced within ~90-120s via `SO_KEEPALIVE` + `TCP_USER_TIMEOUT` + (keepalive path gives a ~90s floor at the defaults; `TCP_USER_TIMEOUT` + caps at 120s). On platforms that reject some options, + `_filter_supported` drops them and the bound degrades to whatever the + remaining options provide. + """ + if os.environ.get("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "1") == "0": + return () + + keepidle = _int_env("LANGCHAIN_OPENAI_TCP_KEEPIDLE", 60) + keepintvl = _int_env("LANGCHAIN_OPENAI_TCP_KEEPINTVL", 10) + keepcnt = _int_env("LANGCHAIN_OPENAI_TCP_KEEPCNT", 3) + user_timeout_ms = _int_env("LANGCHAIN_OPENAI_TCP_USER_TIMEOUT_MS", 120000) + + opts: list[SocketOption] = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)] + if sys.platform == "linux": + opts += [ + (socket.IPPROTO_TCP, _LINUX_TCP_KEEPIDLE, keepidle), + (socket.IPPROTO_TCP, _LINUX_TCP_KEEPINTVL, keepintvl), + (socket.IPPROTO_TCP, _LINUX_TCP_KEEPCNT, keepcnt), + (socket.IPPROTO_TCP, _LINUX_TCP_USER_TIMEOUT, user_timeout_ms), + ] + elif sys.platform == "darwin": + opts += [ + (socket.IPPROTO_TCP, _DARWIN_TCP_KEEPALIVE, keepidle), + (socket.IPPROTO_TCP, _DARWIN_TCP_KEEPINTVL, keepintvl), + (socket.IPPROTO_TCP, _DARWIN_TCP_KEEPCNT, keepcnt), + ] + # Windows (win32): SO_KEEPALIVE only; per-option tuning requires WSAIoctl. + return tuple(_filter_supported(opts)) + + +_PROXY_ENV_VARS = ( + "HTTP_PROXY", + "HTTPS_PROXY", + "ALL_PROXY", + "http_proxy", + "https_proxy", + "all_proxy", +) +_proxy_env_warning_emitted = False +_proxy_env_bypass_info_emitted = False + + +def _proxy_env_detected() -> bool: + """True when httpx would pick up a proxy from env or system config. + + Mirrors the surface httpx reads (`urllib.request.getproxies()` plus the + uppercase env var names) so a positive result means env-proxy + auto-detection is live on pre-PR code paths. + """ + if any(os.environ.get(name) for name in _PROXY_ENV_VARS): + return True + try: + return bool(urllib.request.getproxies()) + except Exception: + return False + + +def _should_bypass_socket_options_for_proxy_env( + *, + http_socket_options: Sequence[SocketOption] | None, + http_client: Any, + http_async_client: Any, + openai_proxy: str | None, +) -> bool: + """True when default shape + env proxy detected → skip transport injection. + + Preserves pre-PR behavior for apps relying on httpx's env-proxy + auto-detection. Only triggers when the user has made no explicit choice + that would signal they want the custom transport: + + - `http_socket_options` left at `None` (default, not `()` or a sequence) + - `LANGCHAIN_OPENAI_TCP_KEEPALIVE` is not `0` (kill-switch is its own path) + - No `http_client` or `http_async_client` supplied + - No `openai_proxy` supplied + - A proxy env var / system proxy is visible to httpx + + If any of those are set, the user has opted in to the transport path + (directly or via `openai_proxy`) and normal behavior — including the + shadowed-proxy WARNING — applies. When the kill-switch is set, + `_default_socket_options` already returns `()`, so the bypass INFO + would be noise; route through the normal path instead. + """ + if http_socket_options is not None: + return False + if os.environ.get("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "1") == "0": + return False + if http_client is not None or http_async_client is not None: + return False + if openai_proxy: + return False + return _proxy_env_detected() + + +def _log_proxy_env_bypass_once() -> None: + """Emit a one-time INFO when the proxy-env bypass triggers. + + Visibility for operators running with a custom log pipeline: the bypass + is the *safe* outcome (env-proxy auto-detection preserved), but it means + socket-level keepalive / `TCP_USER_TIMEOUT` aren't applied on this + instance. INFO-level, since it's not a problem — just a diagnostic. + """ + global _proxy_env_bypass_info_emitted + if _proxy_env_bypass_info_emitted: + return + _proxy_env_bypass_info_emitted = True + active = [name for name in _PROXY_ENV_VARS if os.environ.get(name)] + source = ", ".join(active) if active else "system proxy configuration" + logger.info( + "langchain-openai detected %s and no explicit `http_socket_options` / " + "`http_client` / `http_async_client` / `openai_proxy`; skipping the " + "custom `httpx` transport so httpx's env-proxy auto-detection applies. " + "Pass `http_socket_options=[...]` to opt back into kernel-level TCP " + "keepalive tuning on top of the env proxy.", + source, + ) + + +def _warn_if_proxy_env_shadowed( + socket_options: tuple[SocketOption, ...], + *, + openai_proxy: str | None, +) -> None: + """Warn once if a custom transport will shadow httpx's proxy auto-detection. + + When `socket_options` is non-empty we pass a custom `httpx` transport, + which disables httpx's native proxy auto-detection — both the uppercase + `HTTP_PROXY` / `HTTPS_PROXY` / `ALL_PROXY` env vars and their lowercase + equivalents, plus macOS/Windows system proxy config. If the user + supplies `openai_proxy` explicitly we route through it and the env-var + handling is moot. Otherwise, a user whose app was transparently relying + on any of those sources will silently stop using them on upgrade — + emit a single WARNING so the behavior change is discoverable. + + Detection uses `urllib.request.getproxies()` — the same surface httpx + reads — so lowercase env vars and macOS/Windows system proxy settings + are caught alongside the uppercase names. + """ + global _proxy_env_warning_emitted + if _proxy_env_warning_emitted or not socket_options or openai_proxy: + return + active = [name for name in _PROXY_ENV_VARS if os.environ.get(name)] + try: + detected = bool(urllib.request.getproxies()) + except Exception: + detected = False + if not active and not detected: + return + _proxy_env_warning_emitted = True + if active: + source = ", ".join(active) + " set in environment" + else: + source = "system proxy configuration detected" + logger.warning( + "langchain-openai injected a custom httpx transport to apply " + "`http_socket_options`, which disables httpx's proxy " + "auto-detection (%s). Set " + "`LANGCHAIN_OPENAI_TCP_KEEPALIVE=0` or pass `http_socket_options=()` " + "to restore default proxy behavior, or supply `openai_proxy` / your " + "own `http_client` / `http_async_client` to take full control.", + source, + ) + + +def _resolve_socket_options( + value: Sequence[SocketOption] | None, +) -> tuple[SocketOption, ...]: + """Normalize the user-facing field to the tuple form builders expect. + + - `None` => env-driven defaults (may itself be `()` if the user set + `LANGCHAIN_OPENAI_TCP_KEEPALIVE=0`). This path runs through + `_filter_supported()` inside `_default_socket_options()` because + the library-computed option set is aspirational and silent degradation + is the right posture. + - Any other sequence (including empty) => retupled for cache hashability. + An empty tuple is the explicit "disabled" signal. A non-empty sequence + is passed verbatim — **not** filtered. The user chose these options + explicitly, so an unsupported constant should surface as a clear + `OSError` at connect time, not be silently dropped. + + Always returns a tuple — never `None` — so downstream signatures take + `tuple[SocketOption, ...]` with `()` as the single "no options" shape. + """ + if value is None: + return _default_socket_options() + return tuple(value) + class _SyncHttpxClientWrapper(openai.DefaultHttpxClient): """Borrowed from openai._base_client.""" @@ -47,43 +387,120 @@ class _AsyncHttpxClientWrapper(openai.DefaultAsyncHttpxClient): def _build_sync_httpx_client( - base_url: str | None, timeout: Any + base_url: str | None, + timeout: Any, + socket_options: tuple[SocketOption, ...] = (), ) -> _SyncHttpxClientWrapper: - return _SyncHttpxClientWrapper( - base_url=base_url + kwargs: dict[str, Any] = { + "base_url": base_url or os.environ.get("OPENAI_BASE_URL") or "https://api.openai.com/v1", - timeout=timeout, - ) + "timeout": timeout, + } + if socket_options: + # httpx ignores limits= when transport= is provided; set it explicitly + # on the transport to avoid silently shrinking the connection pool. + kwargs["transport"] = httpx.HTTPTransport( + socket_options=list(socket_options), + limits=_DEFAULT_CONNECTION_LIMITS, + ) + return _SyncHttpxClientWrapper(**kwargs) def _build_async_httpx_client( - base_url: str | None, timeout: Any + base_url: str | None, + timeout: Any, + socket_options: tuple[SocketOption, ...] = (), ) -> _AsyncHttpxClientWrapper: - return _AsyncHttpxClientWrapper( - base_url=base_url + kwargs: dict[str, Any] = { + "base_url": base_url or os.environ.get("OPENAI_BASE_URL") or "https://api.openai.com/v1", - timeout=timeout, + "timeout": timeout, + } + if socket_options: + # See _build_sync_httpx_client for the limits= rationale. + kwargs["transport"] = httpx.AsyncHTTPTransport( + socket_options=list(socket_options), + limits=_DEFAULT_CONNECTION_LIMITS, + ) + return _AsyncHttpxClientWrapper(**kwargs) + + +def _build_proxied_sync_httpx_client( + proxy: str, + verify: Any, + socket_options: tuple[SocketOption, ...] = (), +) -> httpx.Client: + """httpx.Client for the openai_proxy code path. + + When socket options are disabled (`()`), returns a plain + `httpx.Client(proxy=..., verify=...)` with no transport injected. + """ + if not socket_options: + return httpx.Client(proxy=proxy, verify=verify) + # Mount under `all://` (not `transport=`) so `Client._mounts` mirrors the + # shape produced by httpx's own `proxy=` path — a single-entry dict keyed + # by `URLPattern("all://")`. Callers (and the existing proxy integration + # test) reach into `_mounts` to introspect the proxy URL; a bare + # `transport=` leaves `_mounts` empty. + # + # `httpx.HTTPTransport(proxy=...)` is stricter about string coercion than + # `httpx.Client(proxy=...)`; wrap in the public `httpx.Proxy` type for + # version-stable behavior. + transport = httpx.HTTPTransport( + proxy=httpx.Proxy(proxy), + verify=verify, + socket_options=list(socket_options), + limits=_DEFAULT_CONNECTION_LIMITS, ) + return httpx.Client(mounts={"all://": transport}) + + +def _build_proxied_async_httpx_client( + proxy: str, + verify: Any, + socket_options: tuple[SocketOption, ...] = (), +) -> httpx.AsyncClient: + """httpx.AsyncClient for the openai_proxy code path. + + See `_build_proxied_sync_httpx_client` for the opt-out fallback, + the `mounts={"all://": ...}` shape, and the `httpx.Proxy` wrapping + rationale. + """ + if not socket_options: + return httpx.AsyncClient(proxy=proxy, verify=verify) + transport = httpx.AsyncHTTPTransport( + proxy=httpx.Proxy(proxy), + verify=verify, + socket_options=list(socket_options), + limits=_DEFAULT_CONNECTION_LIMITS, + ) + return httpx.AsyncClient(mounts={"all://": transport}) @lru_cache def _cached_sync_httpx_client( - base_url: str | None, timeout: Any + base_url: str | None, + timeout: Any, + socket_options: tuple[SocketOption, ...] = (), ) -> _SyncHttpxClientWrapper: - return _build_sync_httpx_client(base_url, timeout) + return _build_sync_httpx_client(base_url, timeout, socket_options) @lru_cache def _cached_async_httpx_client( - base_url: str | None, timeout: Any + base_url: str | None, + timeout: Any, + socket_options: tuple[SocketOption, ...] = (), ) -> _AsyncHttpxClientWrapper: - return _build_async_httpx_client(base_url, timeout) + return _build_async_httpx_client(base_url, timeout, socket_options) def _get_default_httpx_client( - base_url: str | None, timeout: Any + base_url: str | None, + timeout: Any, + socket_options: tuple[SocketOption, ...] = (), ) -> _SyncHttpxClientWrapper: """Get default httpx client. @@ -92,13 +509,15 @@ def _get_default_httpx_client( try: hash(timeout) except TypeError: - return _build_sync_httpx_client(base_url, timeout) + return _build_sync_httpx_client(base_url, timeout, socket_options) else: - return _cached_sync_httpx_client(base_url, timeout) + return _cached_sync_httpx_client(base_url, timeout, socket_options) def _get_default_async_httpx_client( - base_url: str | None, timeout: Any + base_url: str | None, + timeout: Any, + socket_options: tuple[SocketOption, ...] = (), ) -> _AsyncHttpxClientWrapper: """Get default httpx client. @@ -107,9 +526,9 @@ def _get_default_async_httpx_client( try: hash(timeout) except TypeError: - return _build_async_httpx_client(base_url, timeout) + return _build_async_httpx_client(base_url, timeout, socket_options) else: - return _cached_async_httpx_client(base_url, timeout) + return _cached_async_httpx_client(base_url, timeout, socket_options) def _resolve_sync_and_async_api_keys( @@ -140,3 +559,127 @@ def _resolve_sync_and_async_api_keys( async_api_key_value = async_api_key_wrapper return sync_api_key_value, async_api_key_value + + +T = TypeVar("T") + +# On Python ≤3.10, asyncio.TimeoutError and builtins.TimeoutError are distinct +# hierarchies, so subclassing only asyncio.TimeoutError would not be caught by +# `except TimeoutError:`. On Python ≥3.11 they are the same object, so listing +# both bases would raise TypeError: duplicate base class. We resolve this at +# class-definition time. +_StreamChunkTimeoutBases: tuple[type, ...] = ( + (asyncio.TimeoutError,) + if issubclass(asyncio.TimeoutError, TimeoutError) + else (asyncio.TimeoutError, TimeoutError) +) + + +class StreamChunkTimeoutError(*_StreamChunkTimeoutBases): # type: ignore[misc] + """Raised when no streaming chunk arrives within `stream_chunk_timeout`. + + `issubclass(StreamChunkTimeoutError, asyncio.TimeoutError)` and + `issubclass(StreamChunkTimeoutError, TimeoutError)` both hold on all + supported Python versions, so existing `except asyncio.TimeoutError:` + and `except TimeoutError:` handlers keep catching the exception. On + Python 3.11+ the two exceptions are the same object, so only + `asyncio.TimeoutError` appears in `__bases__`. + + Structured attributes (`timeout_s`, `model_name`, `chunks_received`) + mirror the WARNING log's `extra=` payload so diagnostic code doesn't + need to regex the message. + """ + + def __init__( + self, + timeout_s: float, + *, + model_name: str | None = None, + chunks_received: int = 0, + ) -> None: + self.timeout_s = timeout_s + self.model_name = model_name + self.chunks_received = chunks_received + context = [] + if model_name: + context.append(f"model={model_name}") + context.append(f"chunks_received={chunks_received}") + suffix = f" ({', '.join(context)})" + super().__init__( + f"No streaming chunk received for {timeout_s:.1f}s{suffix}. The " + f"connection may be alive at the TCP layer but is not producing " + f"content. Tune or disable via the `stream_chunk_timeout` " + f"constructor kwarg (set to None or 0 to disable) or the " + f"`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S` env var. See also " + f"`http_socket_options` for the kernel-level TCP timeout that " + f"catches dead TCP peers." + ) + + +async def _astream_with_chunk_timeout( + source: AsyncIterator[T], + timeout: float | None, + *, + model_name: str | None = None, +) -> AsyncIterator[T]: + """Yield from `source` but bound the per-chunk wait time. + + If `timeout` is None or <=0, yields directly with no wall-clock bound. + Otherwise, each `__anext__` is wrapped in + `asyncio.wait_for(..., timeout)`. A timeout raises + `StreamChunkTimeoutError` (a `TimeoutError` subclass) whose message + names the knob, the env-var override, the model, and how many chunks + were received before the stall. A single-line structured log also + fires at WARNING so the signal is visible in aggregate logging systems + even when the exception is caught upstream. + + When the timeout is active, the source iterator is explicitly + `aclose()`-d on early exit (timeout, consumer break, any exception) so + the underlying httpx streaming connection is released promptly. The + pass-through branch (timeout disabled) relies on httpx's GC-driven + cleanup instead — matching the behavior of unwrapped streams. + """ + if not timeout or timeout <= 0: + async for item in source: + yield item + return + + chunks_received = 0 + it = source.__aiter__() + try: + while True: + try: + chunk = await asyncio.wait_for(it.__anext__(), timeout=timeout) + except StopAsyncIteration: + return + except asyncio.TimeoutError as e: + logger.warning( + "langchain_openai.stream_chunk_timeout fired", + extra={ + "source": "stream_chunk_timeout", + "timeout_s": timeout, + "model_name": model_name, + "chunks_received": chunks_received, + }, + ) + raise StreamChunkTimeoutError( + timeout, + model_name=model_name, + chunks_received=chunks_received, + ) from e + chunks_received += 1 + yield chunk + finally: + aclose = getattr(it, "aclose", None) + if aclose is not None: + try: + await aclose() + except Exception as cleanup_exc: + # Best-effort cleanup; don't mask the original exception, + # but leave a DEBUG trace so pool/transport bugs stay + # discoverable at the right log level. + logger.debug( + "aclose() during _astream_with_chunk_timeout cleanup " + "raised; ignoring", + exc_info=cleanup_exc, + ) diff --git a/libs/partners/openai/langchain_openai/chat_models/base.py b/libs/partners/openai/langchain_openai/chat_models/base.py index 57497158049..478a42a0810 100644 --- a/libs/partners/openai/langchain_openai/chat_models/base.py +++ b/libs/partners/openai/langchain_openai/chat_models/base.py @@ -124,15 +124,24 @@ from pydantic import ( Field, SecretStr, ValidationError, + field_validator, model_validator, ) from pydantic.v1 import BaseModel as BaseModelV1 from typing_extensions import Self from langchain_openai.chat_models._client_utils import ( + _astream_with_chunk_timeout, + _build_proxied_async_httpx_client, + _build_proxied_sync_httpx_client, + _float_env, _get_default_async_httpx_client, _get_default_httpx_client, + _log_proxy_env_bypass_once, + _resolve_socket_options, _resolve_sync_and_async_api_keys, + _should_bypass_socket_options_for_proxy_env, + _warn_if_proxy_env_shadowed, ) from langchain_openai.chat_models._compat import ( _convert_from_v1_to_chat_completions, @@ -790,6 +799,113 @@ class BaseChatOpenAI(BaseChatModel): like a custom client for sync invocations. """ + http_socket_options: Sequence[tuple[int, int, int]] | None = Field( + default=None, exclude=True + ) + """TCP socket options applied to the httpx transports built by this instance. + + Defaults to a conservative TCP-keepalive + `TCP_USER_TIMEOUT` profile that + targets a ~2-minute bound on silent connection hangs (silent mid-stream peer + loss, gVisor/NAT idle timeouts, silent TCP black holes) on platforms that + support the full option set. On platforms that only support a subset + (macOS without `TCP_USER_TIMEOUT`, Windows with only `SO_KEEPALIVE`, + minimal kernels), unsupported options are silently dropped and the bound + degrades to whatever the remaining options + OS defaults provide — still + better than indefinite hang. + + Accepted values: + + - `None` (default): use env-driven defaults. Matches the "unset" convention + used by `http_client` elsewhere on this class. + - `()` (empty): disable socket-option injection entirely. Inherits the OS + defaults and restores httpx's native env-proxy auto-detection. + - A non-empty sequence of `(level, option, value)` tuples: explicit + override; passed verbatim to the transport (not filtered). Unsupported + options raise `OSError` at connect time rather than being silently + dropped — the user chose them explicitly. + + Environment variables (only consulted when this field is `None`): + `LANGCHAIN_OPENAI_TCP_KEEPALIVE` (set to `0` to disable entirely — the + kill-switch), `LANGCHAIN_OPENAI_TCP_KEEPIDLE`, + `LANGCHAIN_OPENAI_TCP_KEEPINTVL`, `LANGCHAIN_OPENAI_TCP_KEEPCNT`, + `LANGCHAIN_OPENAI_TCP_USER_TIMEOUT_MS`. + + Applied per side: if `http_client` is supplied, the sync path uses + that user-owned client's socket options as-is; the async path still + gets `http_socket_options` applied to its default builder (and + vice-versa for `http_async_client`). Supply both to take full control. + + !!! note "Interaction with env-proxy auto-detection" + + When a custom `httpx` transport is active, `httpx` disables its + native env-proxy auto-detection (`HTTP_PROXY` / `HTTPS_PROXY` / + `ALL_PROXY` / `NO_PROXY` and macOS/Windows system proxy settings). + + To keep the default shape safe, `ChatOpenAI` detects the + "proxy-env-shadow" pattern and **skips the custom transport + entirely** when **all** of the following hold: + + - `http_socket_options` is left at its default (`None`) + - No `http_client` or `http_async_client` supplied + - No `openai_proxy` supplied + - A proxy env var or system proxy is visible to httpx + + On that specific shape, the instance falls back to pre-PR behavior + and httpx's env-proxy auto-detection applies (a one-time `INFO` log + records the bypass for observability). + + If you explicitly set `http_socket_options=[...]` while a proxy + env var is also set, no bypass — you opted into the transport, and + a one-time `WARNING` records the shadowing. Set + `http_socket_options=()` or `LANGCHAIN_OPENAI_TCP_KEEPALIVE=0` to + disable transport injection explicitly, or pass a fully-configured + `http_async_client` / `http_client` to take full control. The + `openai_proxy` constructor kwarg is unaffected — socket options + are applied cleanly through the proxied transport on that path. + """ + + stream_chunk_timeout: float | None = Field( + default_factory=lambda: _float_env( + "LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", 120.0 + ), + exclude=True, + ) + """Per-chunk wall-clock timeout (seconds) on async streaming responses. + + Applies to async invocations only (`astream`, `ainvoke` with streaming, + etc.). Sync streaming (`stream`) is not affected. + + Fires between content chunks yielded by the openai SDK's streaming iterator + (i.e., each call to `__anext__` on the response). Crucially, this is + **not** the same as httpx's `timeout.read`: + + - httpx's read timeout is inter-byte and gets reset every time *any* bytes + arrive on the socket — including OpenAI's SSE keepalive comments + (`: keepalive`) that trickle down during long model generations. A + stream that's silent on *content* but still producing keepalives looks + alive forever to httpx. + - `stream_chunk_timeout` measures the gap between *parsed chunks*. The + openai SDK's SSE parser consumes keepalive comments internally and does + not emit them as chunks, so keepalives do *not* reset this timer. It + fires on genuine content silence. + + When it fires, a `StreamChunkTimeoutError` + (subclass of `asyncio.TimeoutError`) is raised with a self-describing + message naming this knob, the env-var override, the model, and the + number of chunks received before the stall. A WARNING log with + `extra={"source": "stream_chunk_timeout", "timeout_s": , + "model_name": , "chunks_received": }` also fires so + aggregate logging can distinguish app-layer timeouts from + transport-layer failures. + + Defaults to 120s. Set to `None` or `0` to disable. Overridable via the + `LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S` env var. Negative values + (from either the env var or the constructor kwarg — e.g., hydrated + from YAML/JSON configs) fall back to the default with a `WARNING` log + rather than silently disabling the wrapper, so a misconfigured value + still boots safely and the fallback is visible. + """ + stop: list[str] | str | None = Field(default=None, alias="stop_sequences") """Default stop sequences.""" @@ -953,6 +1069,27 @@ class BaseChatOpenAI(BaseChatModel): all_required_field_names = get_pydantic_field_names(cls) return _build_model_kwargs(values, all_required_field_names) + @field_validator("stream_chunk_timeout", mode="after") + @classmethod + def _validate_stream_chunk_timeout(cls, value: float | None) -> float | None: + """Reject negative constructor values; fall back to the env-driven default. + + Matches the env-var path in `_float_env`: a negative value is a typo, + not an opt-out (`None`/`0` are the documented off switches). Configs + hydrated from YAML/JSON would otherwise silently disable the wrapper + and reintroduce the indefinite-stream hang the feature prevents. + """ + if value is not None and value < 0: + fallback = _float_env("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", 120.0) + logger.warning( + "Invalid `stream_chunk_timeout=%r` (negative); " + "falling back to %s. Pass `None` or `0` to disable.", + value, + fallback, + ) + return fallback + return value + @model_validator(mode="before") @classmethod def validate_temperature(cls, values: dict[str, Any]) -> Any: @@ -1055,6 +1192,23 @@ class BaseChatOpenAI(BaseChatModel): f"{openai_proxy=}\n{http_client=}\n{http_async_client=}" ) raise ValueError(msg) + if _should_bypass_socket_options_for_proxy_env( + http_socket_options=self.http_socket_options, + http_client=self.http_client, + http_async_client=self.http_async_client, + openai_proxy=self.openai_proxy, + ): + # Default-shape construction + proxy env var visible to httpx: + # skip the custom transport so httpx's env-proxy auto-detection + # still applies. Users who want kernel-level TCP tuning alongside + # an env proxy can opt in explicitly via `http_socket_options`. + resolved_socket_options: tuple[tuple[int, int, int], ...] = () + _log_proxy_env_bypass_once() + else: + resolved_socket_options = _resolve_socket_options(self.http_socket_options) + _warn_if_proxy_env_shadowed( + resolved_socket_options, openai_proxy=self.openai_proxy + ) if not self.client: if sync_api_key_value is None: # No valid sync API key, leave client as None and raise informative @@ -1063,21 +1217,17 @@ class BaseChatOpenAI(BaseChatModel): self.root_client = None else: if self.openai_proxy and not self.http_client: - try: - import httpx - except ImportError as e: - msg = ( - "Could not import httpx python package. " - "Please install it with `pip install httpx`." - ) - raise ImportError(msg) from e - self.http_client = httpx.Client( - proxy=self.openai_proxy, verify=global_ssl_context + self.http_client = _build_proxied_sync_httpx_client( + proxy=self.openai_proxy, + verify=global_ssl_context, + socket_options=resolved_socket_options, ) sync_specific = { "http_client": self.http_client or _get_default_httpx_client( - self.openai_api_base, self.request_timeout + self.openai_api_base, + self.request_timeout, + resolved_socket_options, ), "api_key": sync_api_key_value, } @@ -1085,21 +1235,17 @@ class BaseChatOpenAI(BaseChatModel): self.client = self.root_client.chat.completions if not self.async_client: if self.openai_proxy and not self.http_async_client: - try: - import httpx - except ImportError as e: - msg = ( - "Could not import httpx python package. " - "Please install it with `pip install httpx`." - ) - raise ImportError(msg) from e - self.http_async_client = httpx.AsyncClient( - proxy=self.openai_proxy, verify=global_ssl_context + self.http_async_client = _build_proxied_async_httpx_client( + proxy=self.openai_proxy, + verify=global_ssl_context, + socket_options=resolved_socket_options, ) async_specific = { "http_client": self.http_async_client or _get_default_async_httpx_client( - self.openai_api_base, self.request_timeout + self.openai_api_base, + self.request_timeout, + resolved_socket_options, ), "api_key": async_api_key_value, } @@ -1333,7 +1479,11 @@ class BaseChatOpenAI(BaseChatModel): current_output_index = -1 current_sub_index = -1 has_reasoning = False - async for chunk in response: + async for chunk in _astream_with_chunk_timeout( + response, + self.stream_chunk_timeout, + model_name=self.model_name, + ): metadata = headers if is_first_chunk else {} ( current_index, @@ -1684,7 +1834,11 @@ class BaseChatOpenAI(BaseChatModel): context_manager = response async with context_manager as response: is_first_chunk = True - async for chunk in response: + async for chunk in _astream_with_chunk_timeout( + response, + self.stream_chunk_timeout, + model_name=self.model_name, + ): if not isinstance(chunk, dict): chunk = chunk.model_dump() generation_chunk = self._convert_chunk_to_generation_chunk( diff --git a/libs/partners/openai/tests/unit_tests/chat_models/test_client_utils.py b/libs/partners/openai/tests/unit_tests/chat_models/test_client_utils.py new file mode 100644 index 00000000000..37679c3a8ed --- /dev/null +++ b/libs/partners/openai/tests/unit_tests/chat_models/test_client_utils.py @@ -0,0 +1,812 @@ +"""Unit tests for `langchain_openai.chat_models._client_utils`. + +Asserts socket-options plumbing at the boundary between our helpers and the +httpx layer — not on httpx internals. Locks the wiring, env-driven defaults, +the `()` kill-switch contract, and the precedence between constructor kwargs, +env vars, and user-supplied clients. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import socket +from typing import Any + +import httpx +import pytest + +from langchain_openai import ChatOpenAI +from langchain_openai.chat_models import _client_utils + +SOL_SOCKET = socket.SOL_SOCKET +SO_KEEPALIVE = socket.SO_KEEPALIVE + + +@pytest.fixture(autouse=True) +def _clear_langchain_openai_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Ensure LANGCHAIN_OPENAI_* env vars don't leak between tests.""" + for name in list(os.environ): + if name.startswith("LANGCHAIN_OPENAI_") or name == "OPENAI_API_KEY": + monkeypatch.delenv(name, raising=False) + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + + +@pytest.mark.skipif( + __import__("sys").platform != "linux", + reason="Default option set is platform-specific; Linux values asserted here.", +) +def test_default_socket_options_linux() -> None: + """On Linux, the full option set should be present with default values.""" + opts = _client_utils._default_socket_options() + expected = { + (SOL_SOCKET, SO_KEEPALIVE, 1), + (socket.IPPROTO_TCP, _client_utils._LINUX_TCP_KEEPIDLE, 60), + (socket.IPPROTO_TCP, _client_utils._LINUX_TCP_KEEPINTVL, 10), + (socket.IPPROTO_TCP, _client_utils._LINUX_TCP_KEEPCNT, 3), + (socket.IPPROTO_TCP, _client_utils._LINUX_TCP_USER_TIMEOUT, 120000), + } + assert set(opts) == expected + + +def test_default_socket_options_disabled_returns_empty_tuple( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Kill-switch: `()` is the single 'no options' shape, never None.""" + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "0") + opts = _client_utils._default_socket_options() + assert opts == () + assert isinstance(opts, tuple) + + +@pytest.mark.enable_socket +def test_filter_supported_drops_unsupported() -> None: + """An option with a deliberately-bogus level should be silently dropped. + + Requires a real probe socket, so opt out of the suite-wide + `--disable-socket`. If the probe still cannot be created (unusual + sandboxed runner), the helper falls back to pass-through; assert that + contract explicitly rather than masking the behavior. + """ + good = (SOL_SOCKET, SO_KEEPALIVE, 1) + # Very high level number the kernel will reject. + bogus = (0xDEAD, 0xBEEF, 1) + try: + socket.socket(socket.AF_INET, socket.SOCK_STREAM).close() + except OSError: + pytest.skip("probe socket unavailable in this environment") + result = _client_utils._filter_supported([good, bogus]) + assert good in result + assert bogus not in result + + +def test_build_async_httpx_client_boundary_kwargs( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Did our helper decide to inject a transport or not?""" + recorded: list[dict[str, Any]] = [] + + original = _client_utils._AsyncHttpxClientWrapper.__init__ + + def spy(self: Any, **kwargs: Any) -> None: + recorded.append(kwargs) + original(self, **kwargs) + + monkeypatch.setattr(_client_utils._AsyncHttpxClientWrapper, "__init__", spy) + + _client_utils._build_async_httpx_client( + base_url=None, + timeout=None, + socket_options=((SOL_SOCKET, SO_KEEPALIVE, 1),), + ) + assert recorded, "expected one call when socket_options populated" + assert "transport" in recorded[-1] + + recorded.clear() + _client_utils._build_async_httpx_client( + base_url=None, timeout=None, socket_options=() + ) + assert recorded, "expected one call when socket_options empty" + assert "transport" not in recorded[-1] + + +def test_build_async_httpx_client_transport_carries_socket_options( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Transport should receive our options + the mirrored limits.""" + recorded: list[dict[str, Any]] = [] + + original_cls = _client_utils.httpx.AsyncHTTPTransport + + class Recorder(original_cls): # type: ignore[misc, valid-type] + def __init__(self, *args: Any, **kwargs: Any) -> None: + recorded.append(kwargs) + super().__init__(*args, **kwargs) + + monkeypatch.setattr( + "langchain_openai.chat_models._client_utils.httpx.AsyncHTTPTransport", + Recorder, + ) + + _client_utils._build_async_httpx_client( + base_url=None, + timeout=None, + socket_options=((SOL_SOCKET, SO_KEEPALIVE, 1),), + ) + + assert recorded, "expected httpx.AsyncHTTPTransport to be constructed" + kwargs = recorded[-1] + assert kwargs.get("socket_options") == [(SOL_SOCKET, SO_KEEPALIVE, 1)] + assert kwargs.get("limits") is _client_utils._DEFAULT_CONNECTION_LIMITS + + +def test_http_socket_options_none_vs_empty_tuple_vs_populated( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Discriminates the three input shapes at the builder boundary. + + Also locks the no-filter contract for user overrides: the populated-case + assertion is verbatim, proving `_resolve_socket_options` does not run + user overrides through `_filter_supported`. + """ + recorded: list[tuple[str, tuple, tuple]] = [] + + def spy_async( + base_url: str | None, + timeout: Any, + socket_options: tuple = (), + ) -> Any: + recorded.append(("async", (base_url, timeout), tuple(socket_options))) + # Return a real (but unused) client so init completes. + return _client_utils._AsyncHttpxClientWrapper( + base_url=base_url or "https://api.openai.com/v1", timeout=timeout + ) + + def spy_sync( + base_url: str | None, + timeout: Any, + socket_options: tuple = (), + ) -> Any: + recorded.append(("sync", (base_url, timeout), tuple(socket_options))) + return _client_utils._SyncHttpxClientWrapper( + base_url=base_url or "https://api.openai.com/v1", timeout=timeout + ) + + monkeypatch.setattr( + "langchain_openai.chat_models.base._get_default_async_httpx_client", + spy_async, + ) + monkeypatch.setattr( + "langchain_openai.chat_models.base._get_default_httpx_client", + spy_sync, + ) + + # (1) Unset -> None -> env-driven defaults (non-empty on linux/darwin CI). + ChatOpenAI(model="gpt-4o") + assert recorded, "expected a default-client build" + _, _, opts1 = recorded[-1] + assert isinstance(opts1, tuple) + + # (2) Explicit empty tuple -> (). + recorded.clear() + ChatOpenAI(model="gpt-4o", http_socket_options=()) + assert recorded + assert all(opts == () for _, _, opts in recorded) + + # (3) Populated sequence -> verbatim passthrough (not filtered). + recorded.clear() + ChatOpenAI( + model="gpt-4o", + http_socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)], + ) + assert recorded + for _, _, opts in recorded: + assert opts == ((SOL_SOCKET, SO_KEEPALIVE, 1),) + + +def test_openai_proxy_branch_applies_socket_options( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """`openai_proxy` path must go through the socket-options-aware proxied helper.""" + recorded: list[dict[str, Any]] = [] + + def spy(proxy: str, verify: Any, socket_options: tuple = ()) -> httpx.AsyncClient: + recorded.append( + {"proxy": proxy, "verify": verify, "socket_options": socket_options} + ) + return httpx.AsyncClient() + + monkeypatch.setattr( + "langchain_openai.chat_models.base._build_proxied_async_httpx_client", + spy, + ) + # Sync branch should also be covered — spy on that too. + sync_recorded: list[dict[str, Any]] = [] + + def sync_spy(proxy: str, verify: Any, socket_options: tuple = ()) -> httpx.Client: + sync_recorded.append( + {"proxy": proxy, "verify": verify, "socket_options": socket_options} + ) + return httpx.Client() + + monkeypatch.setattr( + "langchain_openai.chat_models.base._build_proxied_sync_httpx_client", + sync_spy, + ) + + ChatOpenAI( + model="gpt-4o", + openai_proxy="http://proxy.example.com:3128", + http_socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)], + ) + + assert recorded, "expected async proxied helper to be called" + assert recorded[-1]["proxy"] == "http://proxy.example.com:3128" + assert recorded[-1]["socket_options"] == ((SOL_SOCKET, SO_KEEPALIVE, 1),) + + assert sync_recorded, "expected sync proxied helper to be called" + assert sync_recorded[-1]["proxy"] == "http://proxy.example.com:3128" + assert sync_recorded[-1]["socket_options"] == ((SOL_SOCKET, SO_KEEPALIVE, 1),) + + +def test_user_supplied_http_async_client_untouched( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """If the user passes an http_async_client, we must not mutate it.""" + default_calls: list[Any] = [] + proxied_calls: list[Any] = [] + + def default_async_spy(*args: Any, **kwargs: Any) -> Any: + default_calls.append((args, kwargs)) + msg = "default async builder should not run" + raise AssertionError(msg) + + def proxied_async_spy(*args: Any, **kwargs: Any) -> Any: + proxied_calls.append((args, kwargs)) + msg = "proxied async builder should not run" + raise AssertionError(msg) + + monkeypatch.setattr( + "langchain_openai.chat_models.base._get_default_async_httpx_client", + default_async_spy, + ) + monkeypatch.setattr( + "langchain_openai.chat_models.base._build_proxied_async_httpx_client", + proxied_async_spy, + ) + + user_client = httpx.AsyncClient() + user_sync_client = httpx.Client() + + model = ChatOpenAI( + model="gpt-4o", + http_client=user_sync_client, + http_async_client=user_client, + http_socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)], + ) + + assert default_calls == [] + assert proxied_calls == [] + assert model.http_async_client is user_client + assert model.http_client is user_sync_client + + +def test_default_path_opt_out_is_strict_noop( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """With LANGCHAIN_OPENAI_TCP_KEEPALIVE=0 we inject no transport. + + Boundary assertion on `_AsyncHttpxClientWrapper.__init__` kwargs — our + helper passed nothing, so httpx falls back to its own native behavior + (env-proxy handling, pool defaults, trust_env, etc.) completely + unaffected by this library. + """ + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "0") + + recorded_sync: list[dict[str, Any]] = [] + recorded_async: list[dict[str, Any]] = [] + + sync_original = _client_utils._SyncHttpxClientWrapper.__init__ + async_original = _client_utils._AsyncHttpxClientWrapper.__init__ + + def sync_spy(self: Any, **kwargs: Any) -> None: + recorded_sync.append(kwargs) + sync_original(self, **kwargs) + + def async_spy(self: Any, **kwargs: Any) -> None: + recorded_async.append(kwargs) + async_original(self, **kwargs) + + monkeypatch.setattr(_client_utils._SyncHttpxClientWrapper, "__init__", sync_spy) + monkeypatch.setattr(_client_utils._AsyncHttpxClientWrapper, "__init__", async_spy) + + ChatOpenAI(model="gpt-4o") + + assert recorded_sync, "expected the sync default client to be built" + assert "transport" not in recorded_sync[-1] + assert recorded_async, "expected the async default client to be built" + assert "transport" not in recorded_async[-1] + + +def test_invalid_env_values_degrade_safely(monkeypatch: pytest.MonkeyPatch) -> None: + """Garbage in LANGCHAIN_OPENAI_TCP_* env vars must not crash model init.""" + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPIDLE", "not-an-int") + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPINTVL", "") + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPCNT", "NaN") + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_USER_TIMEOUT_MS", "abc") + + opts = _client_utils._default_socket_options() + assert isinstance(opts, tuple) + # Fallback values (60/10/3/120000) are used; on Linux, the full option + # set should still be present because the fallbacks are valid. + # (Windows/darwin may filter some options; at minimum SO_KEEPALIVE + # survives.) + assert (SOL_SOCKET, SO_KEEPALIVE, 1) in opts + + # Instantiating a model doesn't raise. + ChatOpenAI(model="gpt-4o") + + +def test_invalid_stream_chunk_timeout_env_degrades_safely( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Garbage in LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S must not crash init.""" + monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "not-a-float") + model = ChatOpenAI(model="gpt-4o") + assert model.stream_chunk_timeout == 120.0 + + +def test_default_socket_options_darwin(monkeypatch: pytest.MonkeyPatch) -> None: + """macOS: `TCP_USER_TIMEOUT` is unavailable, but keepalive trio maps to darwin.""" + monkeypatch.setattr(_client_utils.sys, "platform", "darwin") + opts = _client_utils._default_socket_options() + assert (SOL_SOCKET, SO_KEEPALIVE, 1) in opts + darwin_keepalive = ( + socket.IPPROTO_TCP, + _client_utils._DARWIN_TCP_KEEPALIVE, + 60, + ) + assert darwin_keepalive in opts or opts == ((SOL_SOCKET, SO_KEEPALIVE, 1),) + + +def test_default_socket_options_other_platform(monkeypatch: pytest.MonkeyPatch) -> None: + """Unknown platform (e.g. win32): `SO_KEEPALIVE` only.""" + monkeypatch.setattr(_client_utils.sys, "platform", "win32") + opts = _client_utils._default_socket_options() + assert opts in (((SOL_SOCKET, SO_KEEPALIVE, 1),), ()) + + +def test_filter_supported_probe_failure_returns_unfiltered( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Contract: probe-socket failure -> input is returned verbatim.""" + + def _raise(*args: Any, **kwargs: Any) -> None: + msg = "sandboxed" + raise OSError(msg) + + monkeypatch.setattr(_client_utils.socket, "socket", _raise) + good = (SOL_SOCKET, SO_KEEPALIVE, 1) + bogus = (0xDEAD, 0xBEEF, 1) + result = _client_utils._filter_supported([good, bogus]) + assert result == [good, bogus] + + +def test_invalid_tcp_env_emits_warning( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Int env fallback must log a WARNING naming the offending variable.""" + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPIDLE", "not-an-int") + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + _client_utils._default_socket_options() + assert any( + "LANGCHAIN_OPENAI_TCP_KEEPIDLE" in r.getMessage() + for r in caplog.records + if r.levelno == logging.WARNING + ) + + +def test_negative_tcp_env_is_rejected( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Negative keepalive counts fall back to the default with a WARNING.""" + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPCNT", "-5") + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + value = _client_utils._int_env("LANGCHAIN_OPENAI_TCP_KEEPCNT", 3) + assert value == 3 + assert any( + "negative" in r.getMessage().lower() + for r in caplog.records + if r.levelno == logging.WARNING + ) + + +@pytest.mark.enable_socket +def test_filter_supported_logs_drops_at_debug( + caplog: pytest.LogCaptureFixture, +) -> None: + """Dropped options are visible at DEBUG so a macOS user can confirm the filter.""" + try: + socket.socket(socket.AF_INET, socket.SOCK_STREAM).close() + except OSError: + pytest.skip("probe socket unavailable in this environment") + caplog.set_level(logging.DEBUG, logger="langchain_openai.chat_models._client_utils") + good = (SOL_SOCKET, SO_KEEPALIVE, 1) + bogus = (0xDEAD, 0xBEEF, 1) + _client_utils._filter_supported([good, bogus]) + assert any( + "Dropped" in r.getMessage() + for r in caplog.records + if r.levelno == logging.DEBUG + ) + + +def test_build_proxied_async_httpx_client_opt_out_returns_plain_client() -> None: + """Empty socket_options -> plain httpx.AsyncClient, no transport injection.""" + client = _client_utils._build_proxied_async_httpx_client( + proxy="http://proxy.example:3128", + verify=True, + socket_options=(), + ) + assert isinstance(client, httpx.AsyncClient) + + +def test_build_proxied_async_httpx_client_wraps_transport() -> None: + """Non-empty socket_options -> real httpx.AsyncHTTPTransport wiring executes. + + Exercises the proxy-wrapping bodies end-to-end so a change to httpx's + `Proxy`/transport signatures would surface here, not at connect time. + """ + client = _client_utils._build_proxied_async_httpx_client( + proxy="http://proxy.example:3128", + verify=True, + socket_options=((SOL_SOCKET, SO_KEEPALIVE, 1),), + ) + assert isinstance(client, httpx.AsyncClient) + + +def test_build_proxied_sync_httpx_client_opt_out_returns_plain_client() -> None: + client = _client_utils._build_proxied_sync_httpx_client( + proxy="http://proxy.example:3128", + verify=True, + socket_options=(), + ) + assert isinstance(client, httpx.Client) + + +def test_build_proxied_sync_httpx_client_wraps_transport() -> None: + client = _client_utils._build_proxied_sync_httpx_client( + proxy="http://proxy.example:3128", + verify=True, + socket_options=((SOL_SOCKET, SO_KEEPALIVE, 1),), + ) + assert isinstance(client, httpx.Client) + + +def test_warn_if_proxy_env_shadowed_emits_once( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """One WARNING per process when a proxy env var is shadowed by our transport.""" + monkeypatch.setenv("HTTP_PROXY", "http://proxy.example:3128") + monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False) + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + opts = ((SOL_SOCKET, SO_KEEPALIVE, 1),) + _client_utils._warn_if_proxy_env_shadowed(opts, openai_proxy=None) + _client_utils._warn_if_proxy_env_shadowed(opts, openai_proxy=None) + warnings = [ + r + for r in caplog.records + if r.levelno == logging.WARNING and "HTTP_PROXY" in r.getMessage() + ] + assert len(warnings) == 1 + + +def test_warn_if_proxy_env_shadowed_detects_lowercase( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Lowercase `http_proxy` is picked up by httpx; the warning must fire for it.""" + for name in ("HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"): + monkeypatch.delenv(name, raising=False) + monkeypatch.setenv("http_proxy", "http://proxy.example:3128") + monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False) + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + opts = ((SOL_SOCKET, SO_KEEPALIVE, 1),) + _client_utils._warn_if_proxy_env_shadowed(opts, openai_proxy=None) + warnings = [ + r + for r in caplog.records + if r.levelno == logging.WARNING and "http_proxy" in r.getMessage() + ] + assert len(warnings) == 1 + + +def test_warn_if_proxy_env_shadowed_detects_system_proxy( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """macOS/Windows system proxies shadow the transport too; warning should fire.""" + for name in _client_utils._PROXY_ENV_VARS: + monkeypatch.delenv(name, raising=False) + monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False) + monkeypatch.setattr( + _client_utils.urllib.request, + "getproxies", + lambda: {"http": "http://system.proxy:3128"}, + ) + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + opts = ((SOL_SOCKET, SO_KEEPALIVE, 1),) + _client_utils._warn_if_proxy_env_shadowed(opts, openai_proxy=None) + warnings = [ + r + for r in caplog.records + if r.levelno == logging.WARNING and "system proxy" in r.getMessage() + ] + assert len(warnings) == 1 + + +def test_warn_if_proxy_env_shadowed_skipped_when_openai_proxy_set( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Explicit `openai_proxy` suppresses the warn (proxy handling is controlled).""" + monkeypatch.setenv("HTTP_PROXY", "http://proxy.example:3128") + monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False) + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + opts = ((SOL_SOCKET, SO_KEEPALIVE, 1),) + _client_utils._warn_if_proxy_env_shadowed( + opts, openai_proxy="http://proxy.example:3128" + ) + assert not [r for r in caplog.records if r.levelno == logging.WARNING] + + +def test_proxy_env_bypass_default_shape_triggers( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Default-shape + env proxy => bypass socket-option transport.""" + monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128") + assert _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=None, + http_client=None, + http_async_client=None, + openai_proxy=None, + ) + + +def test_proxy_env_bypass_no_env_does_not_trigger( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """No proxy env/system proxy => no bypass, even with everything else default.""" + for name in _client_utils._PROXY_ENV_VARS: + monkeypatch.delenv(name, raising=False) + monkeypatch.setattr(_client_utils.urllib.request, "getproxies", dict) + assert not _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=None, + http_client=None, + http_async_client=None, + openai_proxy=None, + ) + + +def test_proxy_env_bypass_blocked_by_explicit_socket_options( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Explicit `http_socket_options` => user opted in, no bypass.""" + monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128") + assert not _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)], + http_client=None, + http_async_client=None, + openai_proxy=None, + ) + # Empty tuple is also an explicit choice (kill-switch), no bypass. + assert not _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=(), + http_client=None, + http_async_client=None, + openai_proxy=None, + ) + + +def test_proxy_env_bypass_blocked_by_kill_switch( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """`LANGCHAIN_OPENAI_TCP_KEEPALIVE=0` => kill-switch owns the disable path.""" + monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128") + monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "0") + assert not _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=None, + http_client=None, + http_async_client=None, + openai_proxy=None, + ) + + +def test_proxy_env_bypass_blocked_by_user_http_client( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Any user-supplied http(_async)_client => user opted in, no bypass.""" + monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128") + user_client = httpx.Client() + try: + assert not _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=None, + http_client=user_client, + http_async_client=None, + openai_proxy=None, + ) + finally: + user_client.close() + + async_client = httpx.AsyncClient() + try: + assert not _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=None, + http_client=None, + http_async_client=async_client, + openai_proxy=None, + ) + finally: + asyncio.run(async_client.aclose()) + + +def test_proxy_env_bypass_blocked_by_openai_proxy( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """`openai_proxy` handles proxying explicitly => no bypass.""" + monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128") + assert not _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=None, + http_client=None, + http_async_client=None, + openai_proxy="http://openai.proxy:3128", + ) + + +def test_proxy_env_bypass_detects_lowercase_env( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Lowercase `https_proxy` also triggers the bypass.""" + for name in ("HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"): + monkeypatch.delenv(name, raising=False) + monkeypatch.setenv("https_proxy", "http://proxy.example:3128") + assert _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=None, + http_client=None, + http_async_client=None, + openai_proxy=None, + ) + + +def test_proxy_env_bypass_detects_system_proxy( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """macOS/Windows system proxy config triggers the bypass too.""" + for name in _client_utils._PROXY_ENV_VARS: + monkeypatch.delenv(name, raising=False) + monkeypatch.setattr( + _client_utils.urllib.request, + "getproxies", + lambda: {"http": "http://system.proxy:3128"}, + ) + assert _client_utils._should_bypass_socket_options_for_proxy_env( + http_socket_options=None, + http_client=None, + http_async_client=None, + openai_proxy=None, + ) + + +def test_log_proxy_env_bypass_once_emits_info_once( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """One INFO per process when the bypass kicks in.""" + monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128") + monkeypatch.setattr(_client_utils, "_proxy_env_bypass_info_emitted", False) + caplog.set_level(logging.INFO, logger="langchain_openai.chat_models._client_utils") + _client_utils._log_proxy_env_bypass_once() + _client_utils._log_proxy_env_bypass_once() + infos = [ + r + for r in caplog.records + if r.levelno == logging.INFO and "HTTPS_PROXY" in r.getMessage() + ] + assert len(infos) == 1 + + +def test_client_build_skips_transport_on_proxy_env_default_shape( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """End-to-end: default-shape ChatOpenAI + HTTPS_PROXY => no custom transport. + + Locks that the bypass wiring in `base.py` actually prevents the default + builder from installing `httpx.HTTPTransport(socket_options=...)`. The + async client's `_transport` (or underlying mount) should be httpx's + default, not ours. + """ + monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128") + # Neutralise module-level latches so repeated runs still exercise logging. + monkeypatch.setattr(_client_utils, "_proxy_env_bypass_info_emitted", False) + monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False) + # Clear cached builder results so env changes take effect. + _client_utils._cached_sync_httpx_client.cache_clear() + _client_utils._cached_async_httpx_client.cache_clear() + + recorded: list[tuple[Any, ...]] = [] + + original_build = _client_utils._build_async_httpx_client + + def spy( + base_url: str | None, + timeout: Any, + socket_options: tuple = (), + ) -> Any: + recorded.append(socket_options) + return original_build(base_url, timeout, socket_options) + + monkeypatch.setattr(_client_utils, "_build_async_httpx_client", spy) + # `_get_default_async_httpx_client` reaches the cached builder directly, + # which ignores our module-level patch; bypass the cache to route through + # the spy. + monkeypatch.setattr( + _client_utils, + "_cached_async_httpx_client", + spy, + ) + + ChatOpenAI(model="gpt-5.1") + + assert recorded, "async builder should have been called" + assert all(opts == () for opts in recorded), ( + f"expected bypass (no socket options), got {recorded!r}" + ) + + +def test_client_build_applies_socket_options_when_user_opts_in( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Explicit `http_socket_options` => transport applied, bypass skipped.""" + monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:3128") + monkeypatch.setattr(_client_utils, "_proxy_env_bypass_info_emitted", False) + monkeypatch.setattr(_client_utils, "_proxy_env_warning_emitted", False) + _client_utils._cached_sync_httpx_client.cache_clear() + _client_utils._cached_async_httpx_client.cache_clear() + + recorded: list[tuple[Any, ...]] = [] + original_build = _client_utils._build_async_httpx_client + + def spy( + base_url: str | None, + timeout: Any, + socket_options: tuple = (), + ) -> Any: + recorded.append(socket_options) + return original_build(base_url, timeout, socket_options) + + monkeypatch.setattr(_client_utils, "_build_async_httpx_client", spy) + monkeypatch.setattr(_client_utils, "_cached_async_httpx_client", spy) + + explicit = [(SOL_SOCKET, SO_KEEPALIVE, 1)] + ChatOpenAI(model="gpt-5.1", http_socket_options=explicit) + + assert recorded, "async builder should have been called" + assert all(tuple(opts) == tuple(explicit) for opts in recorded), ( + f"expected user-supplied opts, got {recorded!r}" + ) diff --git a/libs/partners/openai/tests/unit_tests/chat_models/test_stream_chunk_timeout.py b/libs/partners/openai/tests/unit_tests/chat_models/test_stream_chunk_timeout.py new file mode 100644 index 00000000000..f192a2ed458 --- /dev/null +++ b/libs/partners/openai/tests/unit_tests/chat_models/test_stream_chunk_timeout.py @@ -0,0 +1,437 @@ +"""Unit tests for `_astream_with_chunk_timeout` and `StreamChunkTimeoutError`. + +- Pass-through when items arrive in time. +- Timeout fires with a self-describing message + subclasses TimeoutError. +- Structured WARNING log carries `source=stream_chunk_timeout` + + `timeout_s` so aggregate logging can distinguish app-layer from + transport-layer timeouts. +- Source iterator's `aclose()` is called on early exit to release the + underlying httpx connection promptly. +- Garbage in `LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S` degrades safely. +""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import AsyncGenerator +from types import TracebackType +from typing import Any, cast +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from typing_extensions import Self + +from langchain_openai import ChatOpenAI +from langchain_openai.chat_models._client_utils import ( + StreamChunkTimeoutError, + _astream_with_chunk_timeout, +) + +MODEL = "gpt-5.4" + + +class _FakeSource: + """AsyncIterator with an observable aclose() for leak-testing.""" + + def __init__(self, items: list[Any], per_item_sleep: float = 0.0) -> None: + self._items = list(items) + self._sleep = per_item_sleep + self.aclose_count = 0 + + def __aiter__(self) -> _FakeSource: + return self + + async def __anext__(self) -> Any: + if self._sleep: + await asyncio.sleep(self._sleep) + if not self._items: + raise StopAsyncIteration + return self._items.pop(0) + + async def aclose(self) -> None: + self.aclose_count += 1 + + +@pytest.mark.asyncio +async def test_astream_with_chunk_timeout_passes_through() -> None: + """Fast source + generous timeout: every item should be delivered.""" + source = _FakeSource(["a", "b", "c"], per_item_sleep=0.0) + collected = [item async for item in _astream_with_chunk_timeout(source, 5.0)] + assert collected == ["a", "b", "c"] + + +@pytest.mark.asyncio +async def test_astream_with_chunk_timeout_disabled_passes_through() -> None: + """timeout=None / timeout=0 disables the bound; still iterates normally.""" + source_none = _FakeSource(["a", "b"]) + collected_none = [ + item async for item in _astream_with_chunk_timeout(source_none, None) + ] + assert collected_none == ["a", "b"] + + source_zero = _FakeSource(["x", "y"]) + collected_zero = [ + item async for item in _astream_with_chunk_timeout(source_zero, 0.0) + ] + assert collected_zero == ["x", "y"] + + +@pytest.mark.asyncio +async def test_astream_with_chunk_timeout_fires() -> None: + """Slow source + tight timeout: `StreamChunkTimeoutError` fires.""" + source = _FakeSource(["a", "b"], per_item_sleep=0.2) + with pytest.raises(StreamChunkTimeoutError) as exc_info: + async for _ in _astream_with_chunk_timeout(source, 0.05): + pass + + # Backward-compat: existing `except TimeoutError:` handlers must still catch. + assert isinstance(exc_info.value, asyncio.TimeoutError) + assert isinstance(exc_info.value, TimeoutError) + + # Self-describing message names the knob and env var so operators can act. + msg = str(exc_info.value) + assert "stream_chunk_timeout" in msg + assert "LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S" in msg + + +@pytest.mark.asyncio +async def test_astream_with_chunk_timeout_logs_on_fire( + caplog: pytest.LogCaptureFixture, +) -> None: + """Structured log carries source + timeout_s for aggregate-log filtering.""" + # Pin the logger + level; don't rely on caplog's default or module + # inheritance so the test can't silently no-op. + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + + source = _FakeSource(["a"], per_item_sleep=0.2) + with pytest.raises(StreamChunkTimeoutError): + async for _ in _astream_with_chunk_timeout(source, 0.05): + pass + + records = [ + r + for r in caplog.records + if r.name == "langchain_openai.chat_models._client_utils" + and getattr(r, "source", None) == "stream_chunk_timeout" + ] + assert len(records) == 1, f"expected one structured record, got {len(records)}" + record = records[0] + assert record.levelno == logging.WARNING + assert record.__dict__["timeout_s"] == 0.05 + + +@pytest.mark.asyncio +async def test_astream_with_chunk_timeout_closes_source_on_early_exit() -> None: + """aclose() is called on early exit so the httpx connection is released promptly. + + Covers both the timeout-fires path and the consumer-closes-wrapper path. + """ + # Case 1: timeout fires -> aclose() propagates. + timed_out_source = _FakeSource(["a"], per_item_sleep=0.2) + with pytest.raises(StreamChunkTimeoutError): + async for _ in _astream_with_chunk_timeout(timed_out_source, 0.05): + pass + assert timed_out_source.aclose_count == 1 + + # Case 2: consumer explicitly closes the wrapper after one yield. + closer_source = _FakeSource(["a", "b", "c"], per_item_sleep=0.0) + # Cast to AsyncGenerator so mypy sees the aclose() method; the helper + # is always implemented as an async generator at runtime. + wrapper = cast( + "AsyncGenerator[Any, None]", + _astream_with_chunk_timeout(closer_source, 5.0), + ) + got = await wrapper.__anext__() + assert got == "a" + await wrapper.aclose() + assert closer_source.aclose_count == 1 + + +def test_invalid_stream_chunk_timeout_env_degrades_safely( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Garbage env var -> model init succeeds with the 120s default.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "not-a-float") + model = ChatOpenAI(model=MODEL) + assert model.stream_chunk_timeout == 120.0 + + +def test_stream_chunk_timeout_env_kill_switch_zero( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Env-var kill-switch: `_S=0` should disable the wrapper on the model.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "0") + model = ChatOpenAI(model=MODEL) + assert model.stream_chunk_timeout == 0.0 + + +def test_stream_chunk_timeout_kwarg_none_disables( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Constructor kwarg opt-out: `stream_chunk_timeout=None` persists.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + model = ChatOpenAI(model=MODEL, stream_chunk_timeout=None) + assert model.stream_chunk_timeout is None + + +def test_stream_chunk_timeout_error_has_structured_attrs() -> None: + """Structured payload mirrors the log `extra=`; no message-regex needed.""" + err = StreamChunkTimeoutError(0.5, model_name=MODEL, chunks_received=3) + assert err.timeout_s == 0.5 + assert err.model_name == "gpt-5.4" + assert err.chunks_received == 3 + text = str(err) + assert "gpt-5.4" in text + assert "chunks_received=3" in text + + +@pytest.mark.asyncio +async def test_astream_with_chunk_timeout_threads_model_name( + caplog: pytest.LogCaptureFixture, +) -> None: + """`model_name` flows into both the raised error and the structured log.""" + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + source = _FakeSource(["a", "b"], per_item_sleep=0.2) + with pytest.raises(StreamChunkTimeoutError) as exc_info: + async for _ in _astream_with_chunk_timeout( + source, 0.05, model_name="gpt-4o-mini" + ): + pass + assert exc_info.value.model_name == "gpt-4o-mini" + records = [ + r + for r in caplog.records + if getattr(r, "source", None) == "stream_chunk_timeout" + ] + assert records + assert records[0].__dict__["model_name"] == "gpt-4o-mini" + + +def test_invalid_stream_chunk_timeout_env_emits_warning( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Fallback is logged at WARNING so the typo is discoverable.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "nonsense") + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + ChatOpenAI(model=MODEL) + assert any( + "LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S" in r.getMessage() + for r in caplog.records + if r.levelno == logging.WARNING + ) + + +def test_negative_stream_chunk_timeout_env_rejected( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Negative timeout typo must not silently disable the wrapper.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", "-10") + caplog.set_level( + logging.WARNING, logger="langchain_openai.chat_models._client_utils" + ) + model = ChatOpenAI(model=MODEL) + assert model.stream_chunk_timeout == 120.0 + assert any( + "negative" in r.getMessage().lower() + for r in caplog.records + if r.levelno == logging.WARNING + ) + + +def test_negative_stream_chunk_timeout_kwarg_rejected( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Negative kwarg (e.g., from YAML/JSON configs) must not disable the wrapper. + + Mirrors the env-var path: fall back to the default and emit a WARNING + rather than silently treating a negative value as an opt-out — `None` / + `0` are the documented off switches. + """ + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + caplog.set_level(logging.WARNING, logger="langchain_openai.chat_models.base") + model = ChatOpenAI(model=MODEL, stream_chunk_timeout=-10) + assert model.stream_chunk_timeout == 120.0 + assert any( + "negative" in r.getMessage().lower() + and "stream_chunk_timeout" in r.getMessage() + for r in caplog.records + if r.levelno == logging.WARNING + ) + + +def test_zero_stream_chunk_timeout_kwarg_preserved( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """`stream_chunk_timeout=0` is the documented opt-out and must persist.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + model = ChatOpenAI(model=MODEL, stream_chunk_timeout=0) + assert model.stream_chunk_timeout == 0 + + +class _SlowAsyncContextManager: + """Async context manager that sleeps between streamed items.""" + + def __init__(self, chunks: list[Any], per_item_sleep: float) -> None: + self._chunks = list(chunks) + self._sleep = per_item_sleep + self._iter = iter(chunks) + + async def __aenter__(self) -> Self: + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + return None + + def __aiter__(self) -> Self: + return self + + async def __anext__(self) -> Any: + await asyncio.sleep(self._sleep) + try: + return next(self._iter) + except StopIteration as exc: + raise StopAsyncIteration from exc + + +class _SlowSyncContextManager: + """Sync context manager mirror of `_SlowAsyncContextManager`. + + Sleeps between items in wall-clock time. The sync path never uses + `asyncio.wait_for`, so a tight `stream_chunk_timeout` should have no + effect here — that is the invariant we want to lock. + """ + + def __init__(self, chunks: list[Any], per_item_sleep: float) -> None: + self._chunks = list(chunks) + self._sleep = per_item_sleep + self._iter = iter(chunks) + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + return None + + def __iter__(self) -> Self: + return self + + def __next__(self) -> Any: + import time as _time + + _time.sleep(self._sleep) + try: + return next(self._iter) + except StopIteration: + raise + + +@pytest.mark.asyncio +async def test_astream_integration_raises_stream_chunk_timeout_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """End-to-end: slow async stream + tight timeout must raise. + + Guards against a refactor that drops the `_astream_with_chunk_timeout` + wrapper from the `_astream` path — unit tests on the helper alone + wouldn't catch that regression. + """ + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + llm = ChatOpenAI(model=MODEL, stream_chunk_timeout=0.05) + fake_chunks = [ + { + "id": "c1", + "object": "chat.completion.chunk", + "created": 1, + "model": "gpt-4o", + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": "hi"}, + "finish_reason": None, + } + ], + }, + ] + mock_client = AsyncMock() + + async def mock_create(*args: Any, **kwargs: Any) -> _SlowAsyncContextManager: + return _SlowAsyncContextManager(fake_chunks, per_item_sleep=0.3) + + mock_client.create = mock_create + with ( + patch.object(llm, "async_client", mock_client), + pytest.raises(StreamChunkTimeoutError) as exc_info, + ): + async for _ in llm.astream("hello"): + pass + assert exc_info.value.model_name == MODEL + + +def test_stream_sync_not_wrapped_by_chunk_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Sync `llm.stream()` must not be subject to `stream_chunk_timeout`. + + Setting `stream_chunk_timeout=0.01` with a 100ms-per-chunk sync source + would raise if the wrapper were (incorrectly) applied to the sync path. + Completion without error proves the contract. + """ + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + llm = ChatOpenAI(model=MODEL, stream_chunk_timeout=0.01) + fake_chunks = [ + { + "id": "c1", + "object": "chat.completion.chunk", + "created": 1, + "model": "gpt-4o", + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": "hi"}, + "finish_reason": None, + } + ], + }, + { + "id": "c2", + "object": "chat.completion.chunk", + "created": 1, + "model": "gpt-4o", + "choices": [ + {"index": 0, "delta": {}, "finish_reason": "stop"}, + ], + }, + ] + mock_client = MagicMock() + + def _create(*_args: Any, **_kwargs: Any) -> _SlowSyncContextManager: + return _SlowSyncContextManager(fake_chunks, per_item_sleep=0.1) + + mock_client.create = _create + with patch.object(llm, "client", mock_client): + chunks = list(llm.stream("hello")) + assert chunks, "sync stream should have delivered chunks" diff --git a/libs/partners/openai/tests/unit_tests/test_imports.py b/libs/partners/openai/tests/unit_tests/test_imports.py index 144a394c2ff..2d3255366ba 100644 --- a/libs/partners/openai/tests/unit_tests/test_imports.py +++ b/libs/partners/openai/tests/unit_tests/test_imports.py @@ -7,6 +7,7 @@ EXPECTED_ALL = [ "AzureOpenAI", "AzureChatOpenAI", "AzureOpenAIEmbeddings", + "StreamChunkTimeoutError", "custom_tool", ]