make StreamChunkTimeoutError catchable via ``except TimeoutError:`` on Python ≤3.10, clean up comments/docstrings

This commit is contained in:
David Asamu
2026-04-22 17:05:56 +01:00
parent 9cf67f7837
commit 02d8b5585c
3 changed files with 41 additions and 84 deletions

View File

@@ -26,11 +26,7 @@ logger = logging.getLogger(__name__)
SocketOption = tuple[int, int, int]
# Linux TCP-level constants. `socket.TCP_KEEPIDLE` etc. exist on Linux builds
# of Python but not on darwin/win32 builds, so we reference the raw protocol
# numbers directly to keep the module import-safe across platforms. These
# values are the Linux kernel UAPI constants from `linux/tcp.h` and have been
# stable for over a decade.
# socket.TCP_KEEPIDLE etc. are absent on darwin/win32; use raw UAPI constants.
_LINUX_TCP_KEEPIDLE = 4
_LINUX_TCP_KEEPINTVL = 5
_LINUX_TCP_KEEPCNT = 6
@@ -41,15 +37,8 @@ _DARWIN_TCP_KEEPALIVE = 0x10 # idle seconds before first probe
_DARWIN_TCP_KEEPINTVL = 0x101
_DARWIN_TCP_KEEPCNT = 0x102
# Mirrors openai._constants.DEFAULT_CONNECTION_LIMITS as of openai 2.x.
# The openai SDK writes ``httpx.Limits(max_connections=1000,
# max_keepalive_connections=100)`` without an explicit keepalive_expiry,
# which resolves to httpx's default of 5.0s. We mirror that explicitly here
# so that enabling or disabling the kill-switch (LANGCHAIN_OPENAI_TCP_KEEPALIVE=0)
# never silently changes connection pool retention relative to the pre-PR
# openai SDK default. Hardcoded rather than imported to avoid depending on
# an internal module path that moves across SDK versions. If upstream
# changes, revisit here.
# Mirrors the openai SDK's pool defaults. Hardcoded to avoid depending on
# an internal module path (openai._constants) that can move across SDK versions.
_DEFAULT_CONNECTION_LIMITS = httpx.Limits(
max_connections=1000,
max_keepalive_connections=100,
@@ -207,14 +196,8 @@ def _build_sync_httpx_client(
"timeout": timeout,
}
if socket_options:
# Client ignores ``limits=`` when a transport is provided, so configure
# limits explicitly on the transport, mirroring the openai SDK's pool
# config to avoid a silent 10x downgrade.
# NB: passing ``transport=`` also disables httpx's env-proxy
# auto-detection (HTTP_PROXY/HTTPS_PROXY/...). This is a documented
# known limitation; users relying on env proxies for OpenAI traffic
# can opt out via LANGCHAIN_OPENAI_TCP_KEEPALIVE=0 or bring their
# own http_client.
# httpx ignores limits= when transport= is provided; set it explicitly
# on the transport to avoid silently shrinking the connection pool.
kwargs["transport"] = httpx.HTTPTransport(
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
@@ -234,8 +217,7 @@ def _build_async_httpx_client(
"timeout": timeout,
}
if socket_options:
# See _build_sync_httpx_client for the rationale on the explicit
# limits= and the env-proxy-detection caveat.
# See _build_sync_httpx_client for the limits= rationale.
kwargs["transport"] = httpx.AsyncHTTPTransport(
socket_options=list(socket_options),
limits=_DEFAULT_CONNECTION_LIMITS,
@@ -248,12 +230,10 @@ def _build_proxied_sync_httpx_client(
verify: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> httpx.Client:
"""httpx.Client for the openai_proxy code path, with P2 applied.
"""httpx.Client for the openai_proxy code path.
When the user has explicitly disabled socket options (``()``), we return
the original ``httpx.Client(proxy=..., verify=...)`` shape — no transport,
no ``_DEFAULT_CONNECTION_LIMITS`` — so the opt-out is a *strict no-op* on
the proxy path and pre-PR behavior is byte-identical.
When socket options are disabled (``()``), returns a plain
``httpx.Client(proxy=..., verify=...)`` with no transport injected.
"""
if not socket_options:
return httpx.Client(proxy=proxy, verify=verify)
@@ -274,10 +254,10 @@ def _build_proxied_async_httpx_client(
verify: Any,
socket_options: tuple[SocketOption, ...] = (),
) -> httpx.AsyncClient:
"""httpx.AsyncClient for the openai_proxy code path, with P2 applied.
"""httpx.AsyncClient for the openai_proxy code path.
See :func:`_build_proxied_sync_httpx_client` for rationale on the opt-out
fallback and the ``httpx.Proxy`` wrapping.
See :func:`_build_proxied_sync_httpx_client` for the opt-out fallback
and the ``httpx.Proxy`` wrapping rationale.
"""
if not socket_options:
return httpx.AsyncClient(proxy=proxy, verify=verify)
@@ -372,20 +352,26 @@ def _resolve_sync_and_async_api_keys(
return sync_api_key_value, async_api_key_value
# ---------------------------------------------------------------------------
# P1 — per-chunk wall-clock timeout for async streaming iterators.
# ---------------------------------------------------------------------------
T = TypeVar("T")
# On Python ≤3.10, asyncio.TimeoutError and builtins.TimeoutError belong to
# distinct exception hierarchies, so a subclass of asyncio.TimeoutError alone
# would not be caught by ``except TimeoutError:``. On Python ≥3.11 they are the
# same object, so listing both bases would raise TypeError: duplicate base
# class. We resolve the correct base tuple at class-definition time.
_StreamChunkTimeoutBases: tuple[type, ...] = (
(asyncio.TimeoutError,)
if issubclass(asyncio.TimeoutError, TimeoutError)
else (asyncio.TimeoutError, TimeoutError)
)
class StreamChunkTimeoutError(asyncio.TimeoutError):
class StreamChunkTimeoutError(*_StreamChunkTimeoutBases): # type: ignore[misc]
"""Raised when no streaming chunk arrives within ``stream_chunk_timeout``.
Subclasses ``asyncio.TimeoutError`` so existing ``except TimeoutError:``
handlers keep working, but carries a structured, self-describing message
so the first thing an operator sees in logs names the knob and how to
tune it.
Subclasses both ``asyncio.TimeoutError`` and ``TimeoutError`` on all
supported Python versions, so both ``except asyncio.TimeoutError:`` and
``except TimeoutError:`` handlers keep working.
"""
@@ -408,10 +394,8 @@ async def _astream_with_chunk_timeout(
connection is released promptly rather than left dangling.
"""
if not timeout or timeout <= 0:
# Fast path: no wall-clock bound. We don't wrap this in try/finally
# to aclose() the source — if a consumer breaks early, source falls
# back on httpx's GC-driven cleanup, which is pre-existing behavior
# and out of scope for this PR.
# No wall-clock bound. No try/finally here — a consumer breaking early
# falls back on httpx's GC-driven cleanup, same as before this wrapper.
async for item in source:
yield item
return

View File

@@ -1,12 +1,9 @@
"""Unit tests for ``langchain_openai.chat_models._client_utils``.
These tests assert P2 (socket options) plumbing at the boundary between
our helpers and the httpx layer — *not* on httpx internals. The behavioural
proof that these options actually bound silent connection hangs on
Linux/gVisor lives outside CI (see the Metaview investigation scripts);
here we lock the wiring, the env-driven defaults, the `()` kill-switch
contract, and the precedence between constructor kwargs / env vars /
user-supplied clients.
Asserts socket-options plumbing at the boundary between our helpers and the
httpx layer — not on httpx internals. Locks the wiring, env-driven defaults,
the ``()`` kill-switch contract, and the precedence between constructor kwargs,
env vars, and user-supplied clients.
"""
from __future__ import annotations
@@ -34,11 +31,6 @@ def _clear_langchain_openai_env(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
# ---------------------------------------------------------------------------
# _default_socket_options / _filter_supported
# ---------------------------------------------------------------------------
@pytest.mark.skipif(
__import__("sys").platform != "linux",
reason="Default option set is platform-specific; Linux values asserted here.",
@@ -87,11 +79,6 @@ def test_filter_supported_drops_unsupported() -> None:
assert bogus not in result
# ---------------------------------------------------------------------------
# _build_{sync,async}_httpx_client boundary kwargs
# ---------------------------------------------------------------------------
def test_build_async_httpx_client_boundary_kwargs(
monkeypatch: pytest.MonkeyPatch,
) -> None:
@@ -152,11 +139,6 @@ def test_build_async_httpx_client_transport_carries_socket_options(
assert kwargs.get("limits") is _client_utils._DEFAULT_CONNECTION_LIMITS
# ---------------------------------------------------------------------------
# BaseChatOpenAI wiring — None / () / populated trichotomy
# ---------------------------------------------------------------------------
def test_http_socket_options_none_vs_empty_tuple_vs_populated(
monkeypatch: pytest.MonkeyPatch,
) -> None:
@@ -203,7 +185,6 @@ def test_http_socket_options_none_vs_empty_tuple_vs_populated(
assert recorded, "expected a default-client build"
_, _, opts1 = recorded[-1]
assert isinstance(opts1, tuple)
# Env-driven defaults should be populated on linux/darwin runners.
# (2) Explicit empty tuple -> ().
recorded.clear()
@@ -225,7 +206,7 @@ def test_http_socket_options_none_vs_empty_tuple_vs_populated(
def test_openai_proxy_branch_applies_socket_options(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""``openai_proxy`` path must go through the P2-aware proxied helper."""
"""``openai_proxy`` path must go through the socket-options-aware proxied helper."""
recorded: list[dict[str, Any]] = []
def spy(proxy: str, verify: Any, socket_options: tuple = ()) -> httpx.AsyncClient:
@@ -315,7 +296,7 @@ def test_default_path_opt_out_is_strict_noop(
"""With LANGCHAIN_OPENAI_TCP_KEEPALIVE=0 we inject no transport.
Boundary assertion on ``_AsyncHttpxClientWrapper.__init__`` kwargs — our
helper passed nothing, so httpx falls back to its own native behaviour
helper passed nothing, so httpx falls back to its own native behavior
(env-proxy handling, pool defaults, trust_env, etc.) completely
unaffected by this library.
"""
@@ -346,11 +327,6 @@ def test_default_path_opt_out_is_strict_noop(
assert "transport" not in recorded_async[-1]
# ---------------------------------------------------------------------------
# Env-var parse safety
# ---------------------------------------------------------------------------
def test_invalid_env_values_degrade_safely(monkeypatch: pytest.MonkeyPatch) -> None:
"""Garbage in LANGCHAIN_OPENAI_TCP_* env vars must not crash model init."""
monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPIDLE", "not-an-int")

View File

@@ -1,13 +1,12 @@
"""Unit tests for ``_astream_with_chunk_timeout`` and ``StreamChunkTimeoutError``.
Covers the P1 wall-clock per-chunk timeout wrapper:
- Pass-through when items arrive in time.
- Timeout fires with a self-describing message + subclasses TimeoutError.
- Structured WARNING log carries ``source=stream_chunk_timeout`` +
``timeout_s`` so aggregate logging can split P1 from P2 failures.
- Source iterator's ``aclose()`` is called on early exit (regression test
for the connection-leak bug).
``timeout_s`` so aggregate logging can distinguish app-layer from
transport-layer timeouts.
- Source iterator's ``aclose()`` is called on early exit to release the
underlying httpx connection promptly.
- Garbage in ``LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S`` degrades safely.
"""
@@ -121,11 +120,9 @@ async def test_astream_with_chunk_timeout_logs_on_fire(
@pytest.mark.asyncio
async def test_astream_with_chunk_timeout_closes_source_on_early_exit() -> None:
"""Regression test for the connection-leak bug.
"""aclose() is called on early exit so the httpx connection is released promptly.
If the timeout fires, we must ``aclose()`` the underlying iterator so the
httpx streaming connection is released promptly; ditto if a consumer
explicitly closes our wrapper.
Covers both the timeout-fires path and the consumer-closes-wrapper path.
"""
# Case 1: timeout fires -> aclose() propagates.
timed_out_source = _FakeSource(["a"], per_item_sleep=0.2)