feat(openai): prevent silent streaming hangs in ChatOpenAI (#36949)

> [!IMPORTANT]
> **Behavior change on upgrade — minor bump (`1.1.16` → `1.2.0`).**
>
> Async streaming calls now raise `StreamChunkTimeoutError` (a `TimeoutError`
subclass, so existing `except TimeoutError:` / `except
asyncio.TimeoutError:` handlers catch it) after 120s of content silence
instead of hanging forever. Sync streaming (`stream`) is not affected.
Opt out with `stream_chunk_timeout=None` or
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S=0`.
>
> Kernel-level TCP keepalive / `TCP_USER_TIMEOUT` are applied via a
custom `httpx` transport. `httpx` disables its env-proxy auto-detection
(`HTTP_PROXY` / `HTTPS_PROXY` / `ALL_PROXY` / `NO_PROXY` and
macOS/Windows system proxy) whenever a transport is supplied, so to
avoid silently breaking enterprise proxy users, `ChatOpenAI` now detects
the "proxy-env-shadow" shape at construction and **skips the custom
transport entirely** when **all** of these hold:
>
> - `http_socket_options` left at default (`None`)
> - No `http_client` or `http_async_client` supplied
> - No `openai_proxy` supplied
> - A proxy env var / system proxy is visible to httpx
>
> On that shape the instance falls back to pre-PR behavior and env-proxy
auto-detection still applies. A one-time `INFO` records the bypass.
>
> Users who explicitly set `http_socket_options=[...]` alongside an env
proxy still get the shadowed behavior with a one-time `WARNING` log —
they opted in. Full opt-outs below.

---

Streaming chat completions can hang forever when the underlying TCP
connection silently dies mid-stream (idle NAT/LB timeouts, sandboxed
runtimes killing long-lived connections, peer gone without a FIN or
RST). httpx's read timeout doesn't help here because it's reset by any
bytes arriving on the socket, including OpenAI's SSE keepalive comments,
so a stream that's quiet on content but still producing keepalives looks
alive forever.

This PR adds two knobs to `ChatOpenAI`, both on by default with
opt-outs:

- `stream_chunk_timeout` (default 120s): wraps the async streaming
iterator in `asyncio.wait_for` per chunk. Measures the gap between
*parsed* SSE chunks, so keepalives don't reset it. Fires on genuine
content silence and raises `StreamChunkTimeoutError` — a `TimeoutError`
subclass carrying `timeout_s`, `model_name`, and `chunks_received` as
structured attributes (mirrored in the WARNING log's `extra=`) for
alerting without message-regex. Override with the kwarg or
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S`.
- `http_socket_options`: applies `SO_KEEPALIVE` + `TCP_KEEPIDLE` /
`TCP_KEEPINTVL` / `TCP_KEEPCNT` + `TCP_USER_TIMEOUT` on Linux (macOS
equivalents where available). On platforms missing some options, they're
dropped silently and the remaining set still does useful work.
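
The per-chunk timeout from the first bullet can be illustrated with a minimal, stdlib-only sketch. It is not the code in this PR: `fake_stream`, `with_chunk_timeout`, and `collect` are hypothetical names, and the stalled source is simulated with a long `asyncio.sleep` rather than a real SSE connection.

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_stream(stall_after: int) -> AsyncIterator[str]:
    """Hypothetical source: yields `stall_after` chunks, then goes silent."""
    for i in range(stall_after):
        yield f"chunk-{i}"
    await asyncio.sleep(3600)  # stands in for a stream that stops producing content
    yield "never reached"


async def with_chunk_timeout(
    source: AsyncIterator[str], timeout: float
) -> AsyncIterator[str]:
    """Bound the wait for each chunk instead of the whole stream."""
    it = source.__aiter__()
    try:
        while True:
            try:
                # The timer restarts for every chunk, so a long but healthy
                # stream is never cut off; only a gap longer than `timeout` is.
                chunk = await asyncio.wait_for(it.__anext__(), timeout=timeout)
            except StopAsyncIteration:
                return
            yield chunk
    finally:
        # Release the underlying iterator on timeout or early exit.
        aclose = getattr(it, "aclose", None)
        if aclose is not None:
            await aclose()


async def collect(stall_after: int, timeout: float) -> tuple[list[str], bool]:
    """Drain the wrapped stream; report the chunks seen and whether it timed out."""
    received: list[str] = []
    try:
        async for chunk in with_chunk_timeout(fake_stream(stall_after), timeout):
            received.append(chunk)
    except asyncio.TimeoutError:
        return received, True
    return received, False
```

Running `asyncio.run(collect(2, 0.05))` in this sketch delivers the two chunks and then reports a timeout, which is the shape of failure the new default turns a silent hang into.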

Pool limits are set explicitly on the custom transport to mirror the
`openai` SDK — without that, passing `transport=` to `httpx.AsyncClient`
silently shrinks the connection pool.
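
As a rough worked example of what the `http_socket_options` defaults imply on Linux, the sketch below uses the default values and raw option numbers that appear in this diff. The helper name `keepalive_detection_s` is hypothetical, and the arithmetic assumes the usual keepalive semantics: an idle period, followed by `count` unanswered probes spaced `interval` seconds apart.

```python
import socket

# Defaults read from the diff (each overridable via a LANGCHAIN_OPENAI_TCP_* env var).
KEEPIDLE_S = 60
KEEPINTVL_S = 10
KEEPCNT = 3
USER_TIMEOUT_MS = 120_000


def keepalive_detection_s(idle: int, interval: int, count: int) -> int:
    """Seconds until keepalive declares a silent peer dead (hypothetical helper)."""
    return idle + interval * count


# (level, option, value) tuples in the shape httpx transports accept.
# The raw numbers are the Linux option constants used in the diff.
linux_options = [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    (socket.IPPROTO_TCP, 4, KEEPIDLE_S),        # TCP_KEEPIDLE
    (socket.IPPROTO_TCP, 5, KEEPINTVL_S),       # TCP_KEEPINTVL
    (socket.IPPROTO_TCP, 6, KEEPCNT),           # TCP_KEEPCNT
    (socket.IPPROTO_TCP, 18, USER_TIMEOUT_MS),  # TCP_USER_TIMEOUT (milliseconds)
]

keepalive_bound_s = keepalive_detection_s(KEEPIDLE_S, KEEPINTVL_S, KEEPCNT)
user_timeout_s = USER_TIMEOUT_MS / 1000
```

Under these assumptions the keepalive path takes 60 + 10 × 3 = 90 seconds and `TCP_USER_TIMEOUT` caps the wait at 120 seconds, which matches the "~90-120s" target stated in the `_default_socket_options` docstring.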

## Behavior change

The default-shape proxy-env bypass (above) covers the common enterprise
case. Beyond that:

- Async streaming connections that would previously have hung forever on
content silence now fail with `StreamChunkTimeoutError`.
- Users who explicitly opt into `http_socket_options` while also relying
on env proxies will see a one-time `WARNING` and lose env-proxy
auto-detection — the custom transport shadows it. This is the original
shipped behavior, retained for anyone who *wants* socket tuning on top
of an env-proxied setup.

Full opt-outs:

- `stream_chunk_timeout=None` or
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S=0`
- `http_socket_options=()` or `LANGCHAIN_OPENAI_TCP_KEEPALIVE=0`
- Supply your own `http_client` **and** `http_async_client`.
`http_socket_options` is applied per side: if you pass only one client, the
default builder for the other side still applies socket options. Supply both (or
combine with `http_socket_options=()`) to take full control.

Unparseable or negative values for the `LANGCHAIN_OPENAI_*` env vars
fall back to the default with a `WARNING` log rather than silently being
accepted, so a misconfigured environment still boots but the fallback is
discoverable.

---------

Co-authored-by: Mason Daugherty <github@mdrxy.com>
Co-authored-by: Mason Daugherty <mason@langchain.dev>

Author: Asamu David
Date: 2026-04-23 01:28:43 +01:00
Committed by: GitHub
Parent: b57eea2aed
Commit: 4000c22376
6 changed files with 1998 additions and 49 deletions


@@ -1,6 +1,7 @@
"""Module for OpenAI integrations."""
from langchain_openai.chat_models import AzureChatOpenAI, ChatOpenAI
from langchain_openai.chat_models._client_utils import StreamChunkTimeoutError
from langchain_openai.embeddings import AzureOpenAIEmbeddings, OpenAIEmbeddings
from langchain_openai.llms import AzureOpenAI, OpenAI
from langchain_openai.tools import custom_tool
@@ -12,5 +13,6 @@ __all__ = [
"ChatOpenAI",
"OpenAI",
"OpenAIEmbeddings",
"StreamChunkTimeoutError",
"custom_tool",
]


@@ -1,23 +1,363 @@
"""Helpers for creating OpenAI API clients.
"""Helpers for OpenAI httpx client construction, transport tuning, and streaming.
This module allows for the caching of httpx clients to avoid creating new instances
for each instance of ChatOpenAI.
Covers cached default client builders, proxy-aware variants for the
`openai_proxy` path, kernel-level TCP keepalive / `TCP_USER_TIMEOUT` socket
options, and the `_astream_with_chunk_timeout` wrapper that bounds per-chunk
wall-clock time on async SSE streams.
Logic is largely replicated from openai._base_client.
Client-builder boilerplate mirrors the patterns in `openai._base_client`;
socket-option tuning and the streaming timeout are original to this module.
"""
from __future__ import annotations
import asyncio
import inspect
import logging
import os
from collections.abc import Awaitable, Callable
import socket
import sys
import urllib.request
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from functools import lru_cache
from typing import Any, cast
from typing import Any, TypeVar, cast
import httpx
import openai
from pydantic import SecretStr
logger = logging.getLogger(__name__)
SocketOption = tuple[int, int, int]
# socket.TCP_KEEPIDLE etc. are absent on darwin/win32; use raw UAPI constants.
_LINUX_TCP_KEEPIDLE = 4
_LINUX_TCP_KEEPINTVL = 5
_LINUX_TCP_KEEPCNT = 6
_LINUX_TCP_USER_TIMEOUT = 18
# macOS: same semantics, different constants from <netinet/tcp.h>.
_DARWIN_TCP_KEEPALIVE = 0x10 # idle seconds before first probe
_DARWIN_TCP_KEEPINTVL = 0x101
_DARWIN_TCP_KEEPCNT = 0x102
# Mirrors the openai SDK's pool defaults. Hardcoded to avoid depending on
# an internal module path (openai._constants) that can move across SDK versions.
_DEFAULT_CONNECTION_LIMITS = httpx.Limits(
max_connections=1000,
max_keepalive_connections=100,
keepalive_expiry=5.0,
)
def _int_env(name: str, default: int, *, allow_negative: bool = False) -> int:
"""Read an int env var with graceful fallback + discoverable warning.
Unparseable or (by default) negative values fall back to `default` and
emit a single `WARNING` naming the offending variable. A misconfigured
environment still loads, but operators see the fallback in their logs
rather than silently getting a surprising default.
"""
raw = os.environ.get(name)
if raw is None:
return default
try:
value = int(raw)
except (TypeError, ValueError):
logger.warning(
"Invalid value for %s=%r (not an int); falling back to %d.",
name,
raw,
default,
)
return default
if not allow_negative and value < 0:
logger.warning(
"Invalid value for %s=%r (negative); falling back to %d.",
name,
raw,
default,
)
return default
return value
def _float_env(name: str, default: float, *, allow_negative: bool = False) -> float:
"""Read a float env var with graceful fallback + discoverable warning.
See `_int_env`. Negative values are rejected by default so a typo in
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S=-10` can't silently disable the
wrapper it was meant to configure.
"""
raw = os.environ.get(name)
if raw is None:
return default
try:
value = float(raw)
except (TypeError, ValueError):
logger.warning(
"Invalid value for %s=%r (not a float); falling back to %s.",
name,
raw,
default,
)
return default
if not allow_negative and value < 0:
logger.warning(
"Invalid value for %s=%r (negative); falling back to %s.",
name,
raw,
default,
)
return default
return value
def _filter_supported(opts: list[SocketOption]) -> list[SocketOption]:
"""Drop socket options the running platform rejects.
Probes each option against a throwaway socket via `setsockopt` and keeps
only those the kernel accepts. This keeps the library-computed defaults
non-fatal across platforms that don't implement every Linux option —
`TCP_USER_TIMEOUT` in particular is Linux-only and silently missing on
macOS, some minimal kernels, and older gVisor builds. Dropped options
are logged at `DEBUG` so an operator can confirm whether a kernel-level
knob took effect on their platform.
If the probe socket cannot be created (sandboxed runtimes, `pytest-socket`
under `--disable-socket`, tight seccomp policies), the input list is
returned unfiltered. This preserves the pass-through behavior used for
explicit user overrides: unsupported options will surface as a clear
`OSError` at the first real `connect()` rather than being silently
dropped during `ChatOpenAI` construction.
"""
try:
probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except Exception:
# Broad catch is deliberate: `pytest_socket` under `--disable-socket`
# raises `SocketBlockedError` (a `RuntimeError`, not `OSError`), and
# seccomp/sandboxed runtimes have been observed to raise other
# `OSError` subclasses such as `PermissionError`. The intent is "any
# inability to create a probe socket -> pass through unfiltered,"
# and narrowing the type would silently regress sandboxed CI.
return list(opts)
try:
supported: list[SocketOption] = []
dropped: list[SocketOption] = []
for level, optname, optval in opts:
try:
probe.setsockopt(level, optname, optval)
except OSError:
dropped.append((level, optname, optval))
continue
supported.append((level, optname, optval))
if dropped:
logger.debug(
"Dropped %d unsupported socket option(s) on %s: %s",
len(dropped),
sys.platform,
dropped,
)
return supported
finally:
probe.close()
def _default_socket_options() -> tuple[SocketOption, ...]:
"""Return default TCP socket options, or `()` if disabled via env.
Always returns a tuple (never None) so callers and `@lru_cache` keys
remain uniform: `()` is the single shape for "no options".
Target behavior on Linux/gVisor with the full option set: silent peers
are surfaced within ~90-120s via `SO_KEEPALIVE` + `TCP_USER_TIMEOUT`
(keepalive path gives a ~90s floor at the defaults; `TCP_USER_TIMEOUT`
caps at 120s). On platforms that reject some options,
`_filter_supported` drops them and the bound degrades to whatever the
remaining options provide.
"""
if os.environ.get("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "1") == "0":
return ()
keepidle = _int_env("LANGCHAIN_OPENAI_TCP_KEEPIDLE", 60)
keepintvl = _int_env("LANGCHAIN_OPENAI_TCP_KEEPINTVL", 10)
keepcnt = _int_env("LANGCHAIN_OPENAI_TCP_KEEPCNT", 3)
user_timeout_ms = _int_env("LANGCHAIN_OPENAI_TCP_USER_TIMEOUT_MS", 120000)
opts: list[SocketOption] = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
if sys.platform == "linux":
opts += [
(socket.IPPROTO_TCP, _LINUX_TCP_KEEPIDLE, keepidle),
(socket.IPPROTO_TCP, _LINUX_TCP_KEEPINTVL, keepintvl),
(socket.IPPROTO_TCP, _LINUX_TCP_KEEPCNT, keepcnt),
(socket.IPPROTO_TCP, _LINUX_TCP_USER_TIMEOUT, user_timeout_ms),
]
elif sys.platform == "darwin":
opts += [
(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPALIVE, keepidle),
(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPINTVL, keepintvl),
(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPCNT, keepcnt),
]
# Windows (win32): SO_KEEPALIVE only; per-option tuning requires WSAIoctl.
return tuple(_filter_supported(opts))
_PROXY_ENV_VARS = (
"HTTP_PROXY",
"HTTPS_PROXY",
"ALL_PROXY",
"http_proxy",
"https_proxy",
"all_proxy",
)
_proxy_env_warning_emitted = False
_proxy_env_bypass_info_emitted = False
def _proxy_env_detected() -> bool:
"""True when httpx would pick up a proxy from env or system config.
Mirrors the surface httpx reads (`urllib.request.getproxies()` plus the
upper- and lowercase env var names in `_PROXY_ENV_VARS`) so a positive
result means env-proxy auto-detection is live on pre-PR code paths.
"""
if any(os.environ.get(name) for name in _PROXY_ENV_VARS):
return True
try:
return bool(urllib.request.getproxies())
except Exception:
return False
def _should_bypass_socket_options_for_proxy_env(
*,
http_socket_options: Sequence[SocketOption] | None,
http_client: Any,
http_async_client: Any,
openai_proxy: str | None,
) -> bool:
"""True when default shape + env proxy detected → skip transport injection.
Preserves pre-PR behavior for apps relying on httpx's env-proxy
auto-detection. Only triggers when the user has made no explicit choice
that would signal they want the custom transport:
- `http_socket_options` left at `None` (default, not `()` or a sequence)
- `LANGCHAIN_OPENAI_TCP_KEEPALIVE` is not `0` (kill-switch is its own path)
- No `http_client` or `http_async_client` supplied
- No `openai_proxy` supplied
- A proxy env var / system proxy is visible to httpx
If any of those are set, the user has opted in to the transport path
(directly or via `openai_proxy`) and normal behavior — including the
shadowed-proxy WARNING — applies. When the kill-switch is set,
`_default_socket_options` already returns `()`, so the bypass INFO
would be noise; route through the normal path instead.
"""
if http_socket_options is not None:
return False
if os.environ.get("LANGCHAIN_OPENAI_TCP_KEEPALIVE", "1") == "0":
return False
if http_client is not None or http_async_client is not None:
return False
if openai_proxy:
return False
return _proxy_env_detected()
def _log_proxy_env_bypass_once() -> None:
"""Emit a one-time INFO when the proxy-env bypass triggers.
Visibility for operators running with a custom log pipeline: the bypass
is the *safe* outcome (env-proxy auto-detection preserved), but it means
socket-level keepalive / `TCP_USER_TIMEOUT` aren't applied on this
instance. INFO-level, since it's not a problem — just a diagnostic.
"""
global _proxy_env_bypass_info_emitted
if _proxy_env_bypass_info_emitted:
return
_proxy_env_bypass_info_emitted = True
active = [name for name in _PROXY_ENV_VARS if os.environ.get(name)]
source = ", ".join(active) if active else "system proxy configuration"
logger.info(
"langchain-openai detected %s and no explicit `http_socket_options` / "
"`http_client` / `http_async_client` / `openai_proxy`; skipping the "
"custom `httpx` transport so httpx's env-proxy auto-detection applies. "
"Pass `http_socket_options=[...]` to opt back into kernel-level TCP "
"keepalive tuning on top of the env proxy.",
source,
)
def _warn_if_proxy_env_shadowed(
socket_options: tuple[SocketOption, ...],
*,
openai_proxy: str | None,
) -> None:
"""Warn once if a custom transport will shadow httpx's proxy auto-detection.
When `socket_options` is non-empty we pass a custom `httpx` transport,
which disables httpx's native proxy auto-detection — both the uppercase
`HTTP_PROXY` / `HTTPS_PROXY` / `ALL_PROXY` env vars and their lowercase
equivalents, plus macOS/Windows system proxy config. If the user
supplies `openai_proxy` explicitly we route through it and the env-var
handling is moot. Otherwise, a user whose app was transparently relying
on any of those sources will silently stop using them on upgrade —
emit a single WARNING so the behavior change is discoverable.
Detection uses `urllib.request.getproxies()` — the same surface httpx
reads — so lowercase env vars and macOS/Windows system proxy settings
are caught alongside the uppercase names.
"""
global _proxy_env_warning_emitted
if _proxy_env_warning_emitted or not socket_options or openai_proxy:
return
active = [name for name in _PROXY_ENV_VARS if os.environ.get(name)]
try:
detected = bool(urllib.request.getproxies())
except Exception:
detected = False
if not active and not detected:
return
_proxy_env_warning_emitted = True
if active:
source = ", ".join(active) + " set in environment"
else:
source = "system proxy configuration detected"
logger.warning(
"langchain-openai injected a custom httpx transport to apply "
"`http_socket_options`, which disables httpx's proxy "
"auto-detection (%s). Set "
"`LANGCHAIN_OPENAI_TCP_KEEPALIVE=0` or pass `http_socket_options=()` "
"to restore default proxy behavior, or supply `openai_proxy` / your "
"own `http_client` / `http_async_client` to take full control.",
source,
)
def _resolve_socket_options(
value: Sequence[SocketOption] | None,
) -> tuple[SocketOption, ...]:
"""Normalize the user-facing field to the tuple form builders expect.
- `None` => env-driven defaults (may itself be `()` if the user set
`LANGCHAIN_OPENAI_TCP_KEEPALIVE=0`). This path runs through
`_filter_supported()` inside `_default_socket_options()` because
the library-computed option set is aspirational and silent degradation
is the right posture.
- Any other sequence (including empty) => retupled for cache hashability.
An empty tuple is the explicit "disabled" signal. A non-empty sequence
is passed verbatim — **not** filtered. The user chose these options
explicitly, so an unsupported constant should surface as a clear
`OSError` at connect time, not be silently dropped.
Always returns a tuple — never `None` — so downstream signatures take
`tuple[SocketOption, ...]` with `()` as the single "no options" shape.
"""
if value is None:
return _default_socket_options()
return tuple(value)
class _SyncHttpxClientWrapper(openai.DefaultHttpxClient):
"""Borrowed from openai._base_client."""
@@ -47,43 +387,120 @@ class _AsyncHttpxClientWrapper(openai.DefaultAsyncHttpxClient):
def _build_sync_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _SyncHttpxClientWrapper:
return _SyncHttpxClientWrapper(
base_url=base_url
kwargs: dict[str, Any] = {
"base_url": base_url
or os.environ.get("OPENAI_BASE_URL")
or "https://api.openai.com/v1",
timeout=timeout,
)
"timeout": timeout,
}
if socket_options:
# httpx ignores limits= when transport= is provided; set it explicitly
# on the transport to avoid silently shrinking the connection pool.
kwargs["transport"] = httpx.HTTPTransport(
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
)
return _SyncHttpxClientWrapper(**kwargs)
def _build_async_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _AsyncHttpxClientWrapper:
return _AsyncHttpxClientWrapper(
base_url=base_url
kwargs: dict[str, Any] = {
"base_url": base_url
or os.environ.get("OPENAI_BASE_URL")
or "https://api.openai.com/v1",
timeout=timeout,
"timeout": timeout,
}
if socket_options:
# See _build_sync_httpx_client for the limits= rationale.
kwargs["transport"] = httpx.AsyncHTTPTransport(
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
)
return _AsyncHttpxClientWrapper(**kwargs)
def _build_proxied_sync_httpx_client(
proxy: str,
verify: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> httpx.Client:
"""httpx.Client for the openai_proxy code path.
When socket options are disabled (`()`), returns a plain
`httpx.Client(proxy=..., verify=...)` with no transport injected.
"""
if not socket_options:
return httpx.Client(proxy=proxy, verify=verify)
# Mount under `all://` (not `transport=`) so `Client._mounts` mirrors the
# shape produced by httpx's own `proxy=` path — a single-entry dict keyed
# by `URLPattern("all://")`. Callers (and the existing proxy integration
# test) reach into `_mounts` to introspect the proxy URL; a bare
# `transport=` leaves `_mounts` empty.
#
# `httpx.HTTPTransport(proxy=...)` is stricter about string coercion than
# `httpx.Client(proxy=...)`; wrap in the public `httpx.Proxy` type for
# version-stable behavior.
transport = httpx.HTTPTransport(
proxy=httpx.Proxy(proxy),
verify=verify,
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
)
return httpx.Client(mounts={"all://": transport})
def _build_proxied_async_httpx_client(
proxy: str,
verify: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> httpx.AsyncClient:
"""httpx.AsyncClient for the openai_proxy code path.
See `_build_proxied_sync_httpx_client` for the opt-out fallback,
the `mounts={"all://": ...}` shape, and the `httpx.Proxy` wrapping
rationale.
"""
if not socket_options:
return httpx.AsyncClient(proxy=proxy, verify=verify)
transport = httpx.AsyncHTTPTransport(
proxy=httpx.Proxy(proxy),
verify=verify,
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
)
return httpx.AsyncClient(mounts={"all://": transport})
@lru_cache
def _cached_sync_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _SyncHttpxClientWrapper:
return _build_sync_httpx_client(base_url, timeout)
return _build_sync_httpx_client(base_url, timeout, socket_options)
@lru_cache
def _cached_async_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _AsyncHttpxClientWrapper:
return _build_async_httpx_client(base_url, timeout)
return _build_async_httpx_client(base_url, timeout, socket_options)
def _get_default_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _SyncHttpxClientWrapper:
"""Get default httpx client.
@@ -92,13 +509,15 @@ def _get_default_httpx_client(
try:
hash(timeout)
except TypeError:
return _build_sync_httpx_client(base_url, timeout)
return _build_sync_httpx_client(base_url, timeout, socket_options)
else:
return _cached_sync_httpx_client(base_url, timeout)
return _cached_sync_httpx_client(base_url, timeout, socket_options)
def _get_default_async_httpx_client(
base_url: str | None, timeout: Any
base_url: str | None,
timeout: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> _AsyncHttpxClientWrapper:
"""Get default httpx client.
@@ -107,9 +526,9 @@ def _get_default_async_httpx_client(
try:
hash(timeout)
except TypeError:
return _build_async_httpx_client(base_url, timeout)
return _build_async_httpx_client(base_url, timeout, socket_options)
else:
return _cached_async_httpx_client(base_url, timeout)
return _cached_async_httpx_client(base_url, timeout, socket_options)
def _resolve_sync_and_async_api_keys(
@@ -140,3 +559,127 @@ def _resolve_sync_and_async_api_keys(
async_api_key_value = async_api_key_wrapper
return sync_api_key_value, async_api_key_value
T = TypeVar("T")
# On Python ≤3.10, asyncio.TimeoutError and builtins.TimeoutError are distinct
# hierarchies, so subclassing only asyncio.TimeoutError would not be caught by
# `except TimeoutError:`. On Python ≥3.11 they are the same object, so listing
# both bases would raise TypeError: duplicate base class. We resolve this at
# class-definition time.
_StreamChunkTimeoutBases: tuple[type, ...] = (
(asyncio.TimeoutError,)
if issubclass(asyncio.TimeoutError, TimeoutError)
else (asyncio.TimeoutError, TimeoutError)
)
class StreamChunkTimeoutError(*_StreamChunkTimeoutBases): # type: ignore[misc]
"""Raised when no streaming chunk arrives within `stream_chunk_timeout`.
`issubclass(StreamChunkTimeoutError, asyncio.TimeoutError)` and
`issubclass(StreamChunkTimeoutError, TimeoutError)` both hold on all
supported Python versions, so existing `except asyncio.TimeoutError:`
and `except TimeoutError:` handlers keep catching the exception. On
Python 3.11+ the two exceptions are the same object, so only
`asyncio.TimeoutError` appears in `__bases__`.
Structured attributes (`timeout_s`, `model_name`, `chunks_received`)
mirror the WARNING log's `extra=` payload so diagnostic code doesn't
need to regex the message.
"""
def __init__(
self,
timeout_s: float,
*,
model_name: str | None = None,
chunks_received: int = 0,
) -> None:
self.timeout_s = timeout_s
self.model_name = model_name
self.chunks_received = chunks_received
context = []
if model_name:
context.append(f"model={model_name}")
context.append(f"chunks_received={chunks_received}")
suffix = f" ({', '.join(context)})"
super().__init__(
f"No streaming chunk received for {timeout_s:.1f}s{suffix}. The "
f"connection may be alive at the TCP layer but is not producing "
f"content. Tune or disable via the `stream_chunk_timeout` "
f"constructor kwarg (set to None or 0 to disable) or the "
f"`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S` env var. See also "
f"`http_socket_options` for the kernel-level TCP timeout that "
f"catches dead TCP peers."
)
async def _astream_with_chunk_timeout(
source: AsyncIterator[T],
timeout: float | None,
*,
model_name: str | None = None,
) -> AsyncIterator[T]:
"""Yield from `source` but bound the per-chunk wait time.
If `timeout` is None or <=0, yields directly with no wall-clock bound.
Otherwise, each `__anext__` is wrapped in
`asyncio.wait_for(..., timeout)`. A timeout raises
`StreamChunkTimeoutError` (a `TimeoutError` subclass) whose message
names the knob, the env-var override, the model, and how many chunks
were received before the stall. A single-line structured log also
fires at WARNING so the signal is visible in aggregate logging systems
even when the exception is caught upstream.
When the timeout is active, the source iterator is explicitly
`aclose()`-d on early exit (timeout, consumer break, any exception) so
the underlying httpx streaming connection is released promptly. The
pass-through branch (timeout disabled) relies on httpx's GC-driven
cleanup instead — matching the behavior of unwrapped streams.
"""
if not timeout or timeout <= 0:
async for item in source:
yield item
return
chunks_received = 0
it = source.__aiter__()
try:
while True:
try:
chunk = await asyncio.wait_for(it.__anext__(), timeout=timeout)
except StopAsyncIteration:
return
except asyncio.TimeoutError as e:
logger.warning(
"langchain_openai.stream_chunk_timeout fired",
extra={
"source": "stream_chunk_timeout",
"timeout_s": timeout,
"model_name": model_name,
"chunks_received": chunks_received,
},
)
raise StreamChunkTimeoutError(
timeout,
model_name=model_name,
chunks_received=chunks_received,
) from e
chunks_received += 1
yield chunk
finally:
aclose = getattr(it, "aclose", None)
if aclose is not None:
try:
await aclose()
except Exception as cleanup_exc:
# Best-effort cleanup; don't mask the original exception,
# but leave a DEBUG trace so pool/transport bugs stay
# discoverable at the right log level.
logger.debug(
"aclose() during _astream_with_chunk_timeout cleanup "
"raised; ignoring",
exc_info=cleanup_exc,
)


@@ -124,15 +124,24 @@ from pydantic import (
Field,
SecretStr,
ValidationError,
field_validator,
model_validator,
)
from pydantic.v1 import BaseModel as BaseModelV1
from typing_extensions import Self
from langchain_openai.chat_models._client_utils import (
_astream_with_chunk_timeout,
_build_proxied_async_httpx_client,
_build_proxied_sync_httpx_client,
_float_env,
_get_default_async_httpx_client,
_get_default_httpx_client,
_log_proxy_env_bypass_once,
_resolve_socket_options,
_resolve_sync_and_async_api_keys,
_should_bypass_socket_options_for_proxy_env,
_warn_if_proxy_env_shadowed,
)
from langchain_openai.chat_models._compat import (
_convert_from_v1_to_chat_completions,
@@ -790,6 +799,113 @@ class BaseChatOpenAI(BaseChatModel):
like a custom client for sync invocations.
"""
http_socket_options: Sequence[tuple[int, int, int]] | None = Field(
default=None, exclude=True
)
"""TCP socket options applied to the httpx transports built by this instance.
Defaults to a conservative TCP-keepalive + `TCP_USER_TIMEOUT` profile that
targets a ~2-minute bound on silent connection hangs (silent mid-stream peer
loss, gVisor/NAT idle timeouts, silent TCP black holes) on platforms that
support the full option set. On platforms that only support a subset
(macOS without `TCP_USER_TIMEOUT`, Windows with only `SO_KEEPALIVE`,
minimal kernels), unsupported options are silently dropped and the bound
degrades to whatever the remaining options + OS defaults provide — still
better than indefinite hang.
Accepted values:
- `None` (default): use env-driven defaults. Matches the "unset" convention
used by `http_client` elsewhere on this class.
- `()` (empty): disable socket-option injection entirely. Inherits the OS
defaults and restores httpx's native env-proxy auto-detection.
- A non-empty sequence of `(level, option, value)` tuples: explicit
override; passed verbatim to the transport (not filtered). Unsupported
options raise `OSError` at connect time rather than being silently
dropped — the user chose them explicitly.
Environment variables (only consulted when this field is `None`):
`LANGCHAIN_OPENAI_TCP_KEEPALIVE` (set to `0` to disable entirely — the
kill-switch), `LANGCHAIN_OPENAI_TCP_KEEPIDLE`,
`LANGCHAIN_OPENAI_TCP_KEEPINTVL`, `LANGCHAIN_OPENAI_TCP_KEEPCNT`,
`LANGCHAIN_OPENAI_TCP_USER_TIMEOUT_MS`.
Applied per side: if `http_client` is supplied, the sync path uses
that user-owned client's socket options as-is; the async path still
gets `http_socket_options` applied to its default builder (and
vice-versa for `http_async_client`). Supply both to take full control.
!!! note "Interaction with env-proxy auto-detection"
When a custom `httpx` transport is active, `httpx` disables its
native env-proxy auto-detection (`HTTP_PROXY` / `HTTPS_PROXY` /
`ALL_PROXY` / `NO_PROXY` and macOS/Windows system proxy settings).
To keep the default shape safe, `ChatOpenAI` detects the
"proxy-env-shadow" pattern and **skips the custom transport
entirely** when **all** of the following hold:
- `http_socket_options` is left at its default (`None`)
- No `http_client` or `http_async_client` supplied
- No `openai_proxy` supplied
- A proxy env var or system proxy is visible to httpx
On that specific shape, the instance falls back to pre-PR behavior
and httpx's env-proxy auto-detection applies (a one-time `INFO` log
records the bypass for observability).
If you explicitly set `http_socket_options=[...]` while a proxy
env var is also set, no bypass — you opted into the transport, and
a one-time `WARNING` records the shadowing. Set
`http_socket_options=()` or `LANGCHAIN_OPENAI_TCP_KEEPALIVE=0` to
disable transport injection explicitly, or pass a fully-configured
`http_async_client` / `http_client` to take full control. The
`openai_proxy` constructor kwarg is unaffected — socket options
are applied cleanly through the proxied transport on that path.
"""
stream_chunk_timeout: float | None = Field(
default_factory=lambda: _float_env(
"LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", 120.0
),
exclude=True,
)
"""Per-chunk wall-clock timeout (seconds) on async streaming responses.
Applies to async invocations only (`astream`, `ainvoke` with streaming,
etc.). Sync streaming (`stream`) is not affected.
Fires between content chunks yielded by the openai SDK's streaming iterator
(i.e., each call to `__anext__` on the response). Crucially, this is
**not** the same as httpx's `timeout.read`:
- httpx's read timeout is inter-byte and gets reset every time *any* bytes
arrive on the socket — including OpenAI's SSE keepalive comments
(`: keepalive`) that trickle down during long model generations. A
stream that's silent on *content* but still producing keepalives looks
alive forever to httpx.
- `stream_chunk_timeout` measures the gap between *parsed chunks*. The
openai SDK's SSE parser consumes keepalive comments internally and does
not emit them as chunks, so keepalives do *not* reset this timer. It
fires on genuine content silence.
When it fires, a `StreamChunkTimeoutError`
(subclass of `asyncio.TimeoutError`) is raised with a self-describing
message naming this knob, the env-var override, the model, and the
number of chunks received before the stall. A `WARNING` log with
`extra={"source": "stream_chunk_timeout", "timeout_s": <value>,
"model_name": <value>, "chunks_received": <value>}` is also emitted so
log aggregation can distinguish app-layer timeouts from
transport-layer failures.
Defaults to 120s. Set to `None` or `0` to disable. Overridable via the
`LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S` env var. Negative values
(from either the env var or the constructor kwarg — e.g., hydrated
from YAML/JSON configs) fall back to the default with a `WARNING` log
rather than silently disabling the wrapper, so a misconfigured value
still boots safely and the fallback is visible.
"""
stop: list[str] | str | None = Field(default=None, alias="stop_sequences")
"""Default stop sequences."""
@@ -953,6 +1069,27 @@ class BaseChatOpenAI(BaseChatModel):
all_required_field_names = get_pydantic_field_names(cls)
return _build_model_kwargs(values, all_required_field_names)
@field_validator("stream_chunk_timeout", mode="after")
@classmethod
def _validate_stream_chunk_timeout(cls, value: float | None) -> float | None:
"""Reject negative constructor values; fall back to the env-driven default.
Matches the env-var path in `_float_env`: a negative value is a typo,
not an opt-out (`None`/`0` are the documented off switches). Configs
hydrated from YAML/JSON would otherwise silently disable the wrapper
and reintroduce the indefinite-stream hang the feature prevents.
"""
if value is not None and value < 0:
fallback = _float_env("LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S", 120.0)
logger.warning(
"Invalid `stream_chunk_timeout=%r` (negative); "
"falling back to %s. Pass `None` or `0` to disable.",
value,
fallback,
)
return fallback
return value
@model_validator(mode="before")
@classmethod
def validate_temperature(cls, values: dict[str, Any]) -> Any:
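The resolution rules described above (env var supplies the default, `None`/`0` disable, negatives fall back with a warning) can be sketched in isolation. This is a minimal illustration, not the PR's code: `env_default` is a hypothetical stand-in for `_float_env`, `normalize_timeout` is a hypothetical stand-in for the validator, and the handling of unparseable env values is an assumption that the source does not spell out.

```python
from __future__ import annotations

import logging
import os
from collections.abc import Mapping

logger = logging.getLogger(__name__)

ENV_VAR = "LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S"
DEFAULT_S = 120.0


def env_default(environ: Mapping[str, str] | None = None) -> float:
    """Hypothetical stand-in for `_float_env`: env value, else the default."""
    environ = os.environ if environ is None else environ
    raw = environ.get(ENV_VAR)
    if raw is None:
        return DEFAULT_S
    try:
        value = float(raw)
    except ValueError:
        # Assumption: an unparseable value is treated like a negative one.
        logger.warning("Invalid %s=%r; using %s", ENV_VAR, raw, DEFAULT_S)
        return DEFAULT_S
    if value < 0:
        # A negative value is treated as a typo, not as an opt-out.
        logger.warning("Negative %s=%r; using %s", ENV_VAR, raw, DEFAULT_S)
        return DEFAULT_S
    return value


def normalize_timeout(
    value: float | None, environ: Mapping[str, str] | None = None
) -> float | None:
    """Hypothetical stand-in for the validator: only negatives are rewritten."""
    if value is not None and value < 0:
        fallback = env_default(environ)
        logger.warning("Invalid timeout %r (negative); using %s", value, fallback)
        return fallback
    # `None` and `0` pass through unchanged; both mean "disabled" downstream.
    return value
```

Passing the environment as a mapping keeps the sketch easy to exercise without mutating `os.environ`.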
@@ -1055,6 +1192,23 @@ class BaseChatOpenAI(BaseChatModel):
f"{openai_proxy=}\n{http_client=}\n{http_async_client=}"
)
raise ValueError(msg)
if _should_bypass_socket_options_for_proxy_env(
http_socket_options=self.http_socket_options,
http_client=self.http_client,
http_async_client=self.http_async_client,
openai_proxy=self.openai_proxy,
):
# Default-shape construction + proxy env var visible to httpx:
# skip the custom transport so httpx's env-proxy auto-detection
# still applies. Users who want kernel-level TCP tuning alongside
# an env proxy can opt in explicitly via `http_socket_options`.
resolved_socket_options: tuple[tuple[int, int, int], ...] = ()
_log_proxy_env_bypass_once()
else:
resolved_socket_options = _resolve_socket_options(self.http_socket_options)
_warn_if_proxy_env_shadowed(
resolved_socket_options, openai_proxy=self.openai_proxy
)
if not self.client:
if sync_api_key_value is None:
# No valid sync API key, leave client as None and raise informative
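The bypass decision in this hunk depends on four conditions holding at once. The sketch below restates them as a standalone predicate. It is not `_should_bypass_socket_options_for_proxy_env` itself: the function name and the `visible_proxies` parameter are hypothetical, and the stdlib's `urllib.request.getproxies()` is used only as an approximation of what httpx would detect.

```python
from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import Any
from urllib.request import getproxies


def should_bypass_for_proxy_env(
    *,
    http_socket_options: Sequence[tuple[int, int, int]] | None,
    http_client: Any | None,
    http_async_client: Any | None,
    openai_proxy: str | None,
    visible_proxies: Mapping[str, str] | None = None,
) -> bool:
    """Return True only for default-shape construction with a proxy visible."""
    if http_socket_options is not None:
        # Explicit value, including an explicit `()`: the caller decided.
        return False
    if http_client is not None or http_async_client is not None:
        # The caller owns the client, so no transport is injected anyway.
        return False
    if openai_proxy:
        # Socket options are applied through the proxied transport instead.
        return False
    # Assumption: stdlib lookup stands in for httpx's env/system proxy detection.
    proxies = getproxies() if visible_proxies is None else visible_proxies
    return bool(proxies)
```

Injecting `visible_proxies` makes the predicate deterministic in tests; in real use the default lookup would consult the process environment and, on macOS/Windows, the system proxy settings.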
@@ -1063,21 +1217,17 @@ class BaseChatOpenAI(BaseChatModel):
self.root_client = None
else:
if self.openai_proxy and not self.http_client:
-try:
-import httpx
-except ImportError as e:
-msg = (
-"Could not import httpx python package. "
-"Please install it with `pip install httpx`."
-)
-raise ImportError(msg) from e
-self.http_client = httpx.Client(
-proxy=self.openai_proxy, verify=global_ssl_context
+self.http_client = _build_proxied_sync_httpx_client(
+proxy=self.openai_proxy,
+verify=global_ssl_context,
+socket_options=resolved_socket_options,
)
sync_specific = {
"http_client": self.http_client
or _get_default_httpx_client(
-self.openai_api_base, self.request_timeout
+self.openai_api_base,
+self.request_timeout,
+resolved_socket_options,
),
"api_key": sync_api_key_value,
}
@@ -1085,21 +1235,17 @@ class BaseChatOpenAI(BaseChatModel):
self.client = self.root_client.chat.completions
if not self.async_client:
if self.openai_proxy and not self.http_async_client:
-try:
-import httpx
-except ImportError as e:
-msg = (
-"Could not import httpx python package. "
-"Please install it with `pip install httpx`."
-)
-raise ImportError(msg) from e
-self.http_async_client = httpx.AsyncClient(
-proxy=self.openai_proxy, verify=global_ssl_context
+self.http_async_client = _build_proxied_async_httpx_client(
+proxy=self.openai_proxy,
+verify=global_ssl_context,
+socket_options=resolved_socket_options,
)
async_specific = {
"http_client": self.http_async_client
or _get_default_async_httpx_client(
-self.openai_api_base, self.request_timeout
+self.openai_api_base,
+self.request_timeout,
+resolved_socket_options,
),
"api_key": async_api_key_value,
}
@@ -1333,7 +1479,11 @@ class BaseChatOpenAI(BaseChatModel):
current_output_index = -1
current_sub_index = -1
has_reasoning = False
-async for chunk in response:
+async for chunk in _astream_with_chunk_timeout(
+response,
+self.stream_chunk_timeout,
+model_name=self.model_name,
+):
metadata = headers if is_first_chunk else {}
(
current_index,
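The per-chunk semantics of `stream_chunk_timeout` can be illustrated with a small standalone sketch. It is not the PR's `_astream_with_chunk_timeout`: the names `iter_with_chunk_timeout`, `stalling_stream`, and `collect` are hypothetical, the sketch raises the builtin `TimeoutError` rather than `StreamChunkTimeoutError`, and it assumes nothing beyond `asyncio.wait_for` applied to each `__anext__` call.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")


async def iter_with_chunk_timeout(
    source: AsyncIterator[T], timeout: float | None
) -> AsyncIterator[T]:
    """Yield items from `source`, bounding the wait for each one (hypothetical)."""
    if not timeout:  # `None` or `0` disables the timer
        async for item in source:
            yield item
        return
    iterator = source.__aiter__()
    received = 0
    while True:
        try:
            # The timer restarts for every chunk, so a long stream is fine
            # as long as no single gap exceeds `timeout`.
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except StopAsyncIteration:
            return
        except asyncio.TimeoutError:
            msg = f"no chunk for {timeout}s after {received} chunk(s)"
            raise TimeoutError(msg) from None
        received += 1
        yield item


async def stalling_stream() -> AsyncIterator[str]:
    """Stand-in for a stream that goes silent after two chunks."""
    yield "a"
    yield "b"
    await asyncio.sleep(3600)  # simulates a connection that died silently
    yield "never"


async def collect(timeout: float | None) -> tuple[list[str], bool]:
    """Drain the stalling stream; report the chunks seen and whether it timed out."""
    chunks: list[str] = []
    try:
        async for chunk in iter_with_chunk_timeout(stalling_stream(), timeout):
            chunks.append(chunk)
    except TimeoutError:
        return chunks, True
    return chunks, False
```

Running `asyncio.run(collect(0.05))` delivers the two chunks that arrived and then reports a timeout instead of waiting out the stall. Because the timer wraps iteration over parsed items rather than socket reads, bytes that never surface as an item would not reset it, which is the distinction the docstring draws against httpx's read timeout.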
@@ -1684,7 +1834,11 @@ class BaseChatOpenAI(BaseChatModel):
context_manager = response
async with context_manager as response:
is_first_chunk = True
-async for chunk in response:
+async for chunk in _astream_with_chunk_timeout(
+response,
+self.stream_chunk_timeout,
+model_name=self.model_name,
+):
if not isinstance(chunk, dict):
chunk = chunk.model_dump()
generation_chunk = self._convert_chunk_to_generation_chunk(