diff --git a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py index 229bdf5de9f..20120f6076f 100644 --- a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py +++ b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py @@ -26,11 +26,7 @@ logger = logging.getLogger(__name__) SocketOption = tuple[int, int, int] -# Linux TCP-level constants. `socket.TCP_KEEPIDLE` etc. exist on Linux builds -# of Python but not on darwin/win32 builds, so we reference the raw protocol -# numbers directly to keep the module import-safe across platforms. These -# values are the Linux kernel UAPI constants from `linux/tcp.h` and have been -# stable for over a decade. +# socket.TCP_KEEPIDLE etc. are absent on darwin/win32; use raw UAPI constants. _LINUX_TCP_KEEPIDLE = 4 _LINUX_TCP_KEEPINTVL = 5 _LINUX_TCP_KEEPCNT = 6 @@ -41,15 +37,8 @@ _DARWIN_TCP_KEEPALIVE = 0x10 # idle seconds before first probe _DARWIN_TCP_KEEPINTVL = 0x101 _DARWIN_TCP_KEEPCNT = 0x102 -# Mirrors openai._constants.DEFAULT_CONNECTION_LIMITS as of openai 2.x. -# The openai SDK writes ``httpx.Limits(max_connections=1000, -# max_keepalive_connections=100)`` without an explicit keepalive_expiry, -# which resolves to httpx's default of 5.0s. We mirror that explicitly here -# so that enabling or disabling the kill-switch (LANGCHAIN_OPENAI_TCP_KEEPALIVE=0) -# never silently changes connection pool retention relative to the pre-PR -# openai SDK default. Hardcoded rather than imported to avoid depending on -# an internal module path that moves across SDK versions. If upstream -# changes, revisit here. +# Mirrors the openai SDK's pool defaults. Hardcoded to avoid depending on +# an internal module path (openai._constants) that can move across SDK versions. _DEFAULT_CONNECTION_LIMITS = httpx.Limits( max_connections=1000, max_keepalive_connections=100, @@ -207,14 +196,8 @@ def _build_sync_httpx_client( "timeout": timeout, } if socket_options: - # Client ignores ``limits=`` when a transport is provided, so configure - # limits explicitly on the transport, mirroring the openai SDK's pool - # config to avoid a silent 10x downgrade. - # NB: passing ``transport=`` also disables httpx's env-proxy - # auto-detection (HTTP_PROXY/HTTPS_PROXY/...). This is a documented - # known limitation; users relying on env proxies for OpenAI traffic - # can opt out via LANGCHAIN_OPENAI_TCP_KEEPALIVE=0 or bring their - # own http_client. + # httpx ignores limits= when transport= is provided; set it explicitly + # on the transport to avoid silently shrinking the connection pool. kwargs["transport"] = httpx.HTTPTransport( socket_options=list(socket_options), limits=_DEFAULT_CONNECTION_LIMITS, @@ -234,8 +217,7 @@ def _build_async_httpx_client( "timeout": timeout, } if socket_options: - # See _build_sync_httpx_client for the rationale on the explicit - # limits= and the env-proxy-detection caveat. + # See _build_sync_httpx_client for the limits= rationale. kwargs["transport"] = httpx.AsyncHTTPTransport( socket_options=list(socket_options), limits=_DEFAULT_CONNECTION_LIMITS, @@ -248,12 +230,10 @@ def _build_proxied_sync_httpx_client( verify: Any, socket_options: tuple[SocketOption, ...] = (), ) -> httpx.Client: - """httpx.Client for the openai_proxy code path, with P2 applied. + """httpx.Client for the openai_proxy code path. - When the user has explicitly disabled socket options (``()``), we return - the original ``httpx.Client(proxy=..., verify=...)`` shape — no transport, - no ``_DEFAULT_CONNECTION_LIMITS`` — so the opt-out is a *strict no-op* on - the proxy path and pre-PR behavior is byte-identical. + When socket options are disabled (``()``), returns a plain + ``httpx.Client(proxy=..., verify=...)`` with no transport injected. """ if not socket_options: return httpx.Client(proxy=proxy, verify=verify) @@ -274,10 +254,10 @@ def _build_proxied_async_httpx_client( verify: Any, socket_options: tuple[SocketOption, ...] = (), ) -> httpx.AsyncClient: - """httpx.AsyncClient for the openai_proxy code path, with P2 applied. + """httpx.AsyncClient for the openai_proxy code path. - See :func:`_build_proxied_sync_httpx_client` for rationale on the opt-out - fallback and the ``httpx.Proxy`` wrapping. + See :func:`_build_proxied_sync_httpx_client` for the opt-out fallback + and the ``httpx.Proxy`` wrapping rationale. """ if not socket_options: return httpx.AsyncClient(proxy=proxy, verify=verify) @@ -372,20 +352,26 @@ def _resolve_sync_and_async_api_keys( return sync_api_key_value, async_api_key_value -# --------------------------------------------------------------------------- -# P1 — per-chunk wall-clock timeout for async streaming iterators. -# --------------------------------------------------------------------------- - T = TypeVar("T") +# On Python ≤3.10, asyncio.TimeoutError and builtins.TimeoutError are distinct +# hierarchies, so subclassing only asyncio.TimeoutError would not be caught by +# ``except TimeoutError:``. On Python ≥3.11 they are the same object, so listing +# both bases would raise TypeError: duplicate base class. We resolve this at +# class-definition time. +_StreamChunkTimeoutBases: tuple[type, ...] = ( + (asyncio.TimeoutError,) + if issubclass(asyncio.TimeoutError, TimeoutError) + else (asyncio.TimeoutError, TimeoutError) +) -class StreamChunkTimeoutError(asyncio.TimeoutError): + +class StreamChunkTimeoutError(*_StreamChunkTimeoutBases): # type: ignore[misc] """Raised when no streaming chunk arrives within ``stream_chunk_timeout``. - Subclasses ``asyncio.TimeoutError`` so existing ``except TimeoutError:`` - handlers keep working, but carries a structured, self-describing message - so the first thing an operator sees in logs names the knob and how to - tune it. + Subclasses both ``asyncio.TimeoutError`` and ``TimeoutError`` on all + supported Python versions, so both ``except asyncio.TimeoutError:`` and + ``except TimeoutError:`` handlers keep working. """ @@ -408,10 +394,8 @@ async def _astream_with_chunk_timeout( connection is released promptly rather than left dangling. """ if not timeout or timeout <= 0: - # Fast path: no wall-clock bound. We don't wrap this in try/finally - # to aclose() the source — if a consumer breaks early, source falls - # back on httpx's GC-driven cleanup, which is pre-existing behavior - # and out of scope for this PR. + # No wall-clock bound. No try/finally here — a consumer breaking early + # falls back on httpx's GC-driven cleanup, same as before this wrapper. async for item in source: yield item return diff --git a/libs/partners/openai/tests/unit_tests/chat_models/test_client_utils.py b/libs/partners/openai/tests/unit_tests/chat_models/test_client_utils.py index e53aaafc89c..62e5345e31a 100644 --- a/libs/partners/openai/tests/unit_tests/chat_models/test_client_utils.py +++ b/libs/partners/openai/tests/unit_tests/chat_models/test_client_utils.py @@ -1,12 +1,9 @@ """Unit tests for ``langchain_openai.chat_models._client_utils``. -These tests assert P2 (socket options) plumbing at the boundary between -our helpers and the httpx layer — *not* on httpx internals. The behavioural -proof that these options actually bound silent connection hangs on -Linux/gVisor lives outside CI (see the Metaview investigation scripts); -here we lock the wiring, the env-driven defaults, the `()` kill-switch -contract, and the precedence between constructor kwargs / env vars / -user-supplied clients. +Asserts socket-options plumbing at the boundary between our helpers and the +httpx layer — not on httpx internals. Locks the wiring, env-driven defaults, +the ``()`` kill-switch contract, and the precedence between constructor kwargs, +env vars, and user-supplied clients. """ from __future__ import annotations @@ -34,11 +31,6 @@ def _clear_langchain_openai_env(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("OPENAI_API_KEY", "sk-test") -# --------------------------------------------------------------------------- -# _default_socket_options / _filter_supported -# --------------------------------------------------------------------------- - - @pytest.mark.skipif( __import__("sys").platform != "linux", reason="Default option set is platform-specific; Linux values asserted here.", @@ -87,11 +79,6 @@ def test_filter_supported_drops_unsupported() -> None: assert bogus not in result -# --------------------------------------------------------------------------- -# _build_{sync,async}_httpx_client boundary kwargs -# --------------------------------------------------------------------------- - - def test_build_async_httpx_client_boundary_kwargs( monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -152,11 +139,6 @@ def test_build_async_httpx_client_transport_carries_socket_options( assert kwargs.get("limits") is _client_utils._DEFAULT_CONNECTION_LIMITS -# --------------------------------------------------------------------------- -# BaseChatOpenAI wiring — None / () / populated trichotomy -# --------------------------------------------------------------------------- - - def test_http_socket_options_none_vs_empty_tuple_vs_populated( monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -203,7 +185,6 @@ def test_http_socket_options_none_vs_empty_tuple_vs_populated( assert recorded, "expected a default-client build" _, _, opts1 = recorded[-1] assert isinstance(opts1, tuple) - # Env-driven defaults should be populated on linux/darwin runners. # (2) Explicit empty tuple -> (). recorded.clear() @@ -225,7 +206,7 @@ def test_http_socket_options_none_vs_empty_tuple_vs_populated( def test_openai_proxy_branch_applies_socket_options( monkeypatch: pytest.MonkeyPatch, ) -> None: - """``openai_proxy`` path must go through the P2-aware proxied helper.""" + """``openai_proxy`` path must go through the socket-options-aware proxied helper.""" recorded: list[dict[str, Any]] = [] def spy(proxy: str, verify: Any, socket_options: tuple = ()) -> httpx.AsyncClient: @@ -315,7 +296,7 @@ def test_default_path_opt_out_is_strict_noop( """With LANGCHAIN_OPENAI_TCP_KEEPALIVE=0 we inject no transport. Boundary assertion on ``_AsyncHttpxClientWrapper.__init__`` kwargs — our - helper passed nothing, so httpx falls back to its own native behaviour + helper passed nothing, so httpx falls back to its own native behavior (env-proxy handling, pool defaults, trust_env, etc.) completely unaffected by this library. """ @@ -346,11 +327,6 @@ def test_default_path_opt_out_is_strict_noop( assert "transport" not in recorded_async[-1] -# --------------------------------------------------------------------------- -# Env-var parse safety -# --------------------------------------------------------------------------- - - def test_invalid_env_values_degrade_safely(monkeypatch: pytest.MonkeyPatch) -> None: """Garbage in LANGCHAIN_OPENAI_TCP_* env vars must not crash model init.""" monkeypatch.setenv("LANGCHAIN_OPENAI_TCP_KEEPIDLE", "not-an-int") diff --git a/libs/partners/openai/tests/unit_tests/chat_models/test_stream_chunk_timeout.py b/libs/partners/openai/tests/unit_tests/chat_models/test_stream_chunk_timeout.py index 7a7c85d6fb7..7bf71eddc01 100644 --- a/libs/partners/openai/tests/unit_tests/chat_models/test_stream_chunk_timeout.py +++ b/libs/partners/openai/tests/unit_tests/chat_models/test_stream_chunk_timeout.py @@ -1,13 +1,12 @@ """Unit tests for ``_astream_with_chunk_timeout`` and ``StreamChunkTimeoutError``. -Covers the P1 wall-clock per-chunk timeout wrapper: - - Pass-through when items arrive in time. - Timeout fires with a self-describing message + subclasses TimeoutError. - Structured WARNING log carries ``source=stream_chunk_timeout`` + - ``timeout_s`` so aggregate logging can split P1 from P2 failures. -- Source iterator's ``aclose()`` is called on early exit (regression test - for the connection-leak bug). + ``timeout_s`` so aggregate logging can distinguish app-layer from + transport-layer timeouts. +- Source iterator's ``aclose()`` is called on early exit to release the + underlying httpx connection promptly. - Garbage in ``LANGCHAIN_OPENAI_STREAM_CHUNK_TIMEOUT_S`` degrades safely. """ @@ -121,11 +120,9 @@ async def test_astream_with_chunk_timeout_logs_on_fire( @pytest.mark.asyncio async def test_astream_with_chunk_timeout_closes_source_on_early_exit() -> None: - """Regression test for the connection-leak bug. + """aclose() is called on early exit so the httpx connection is released promptly. - If the timeout fires, we must ``aclose()`` the underlying iterator so the - httpx streaming connection is released promptly; ditto if a consumer - explicitly closes our wrapper. + Covers both the timeout-fires path and the consumer-closes-wrapper path. """ # Case 1: timeout fires -> aclose() propagates. timed_out_source = _FakeSource(["a"], per_item_sleep=0.2)