diff --git a/libs/core/langchain_core/_security/_policy.py b/libs/core/langchain_core/_security/_policy.py index 0440918bbc5..79cf437c899 100644 --- a/libs/core/langchain_core/_security/_policy.py +++ b/libs/core/langchain_core/_security/_policy.py @@ -50,13 +50,23 @@ _BLOCKED_IPV6_NETWORKS: tuple[ipaddress.IPv6Network, ...] = tuple( _CLOUD_METADATA_IPS: frozenset[str] = frozenset( { - "169.254.169.254", - "169.254.170.2", - "100.100.100.200", - "fd00:ec2::254", + "169.254.169.254", # AWS, GCP, Azure, DigitalOcean, Oracle Cloud + "169.254.170.2", # AWS ECS task metadata + "169.254.170.23", # AWS EKS Pod Identity Agent + "100.100.100.200", # Alibaba Cloud metadata + "fd00:ec2::254", # AWS EC2 IMDSv2 over IPv6 (Nitro instances) + "fd00:ec2::23", # AWS EKS Pod Identity Agent (IPv6) + "fe80::a9fe:a9fe", # OpenStack Nova metadata (IPv6 link-local) } ) +# Network ranges that are always blocked when block_cloud_metadata=True, +# independent of block_private_ips. The entire link-local range is used by +# cloud metadata services across providers. +_CLOUD_METADATA_NETWORKS: tuple[ipaddress.IPv4Network | ipaddress.IPv6Network, ...] = ( + ipaddress.IPv4Network("169.254.0.0/16"), +) + _CLOUD_METADATA_HOSTNAMES: frozenset[str] = frozenset( { "metadata.google.internal", @@ -160,9 +170,15 @@ def _ip_in_blocked_networks( if isinstance(addr, ipaddress.IPv6Address) and addr == _LOOPBACK_IPV6: return "localhost address" - # Cloud metadata IP check - if policy.block_cloud_metadata and str(addr) in _CLOUD_METADATA_IPS: - return "cloud metadata endpoint" + # Cloud metadata check — IP set *and* network ranges (e.g. 169.254.0.0/16). + # Independent of block_private_ips so that allow_private=True still blocks + # cloud metadata endpoints. + if policy.block_cloud_metadata: + if str(addr) in _CLOUD_METADATA_IPS: + return "cloud metadata endpoint" + for net in _CLOUD_METADATA_NETWORKS: # type: ignore[assignment] + if addr in net: + return "cloud metadata endpoint" return None @@ -223,7 +239,7 @@ async def validate_url(url: str, policy: SSRFPolicy = SSRFPolicy()) -> None: """Validate a URL against the SSRF policy, including DNS resolution. This is the primary entry-point for async code paths. It delegates - scheme/hostname/allowed-hosts checks to ``validate_url_sync``, then + scheme/hostname/allowed-hosts checks to `validate_url_sync`, then resolves DNS and validates every resolved IP. Raises: @@ -256,7 +272,7 @@ def validate_url_sync(url: str, policy: SSRFPolicy = SSRFPolicy()) -> None: """Synchronous URL validation (no DNS resolution). Suitable for Pydantic validators and other sync contexts. Checks scheme - and hostname patterns only - use ``validate_url`` for full DNS-aware checking. + and hostname patterns only - use `validate_url` for full DNS-aware checking. Raises: SSRFBlockedError: If the URL violates the policy. diff --git a/libs/core/langchain_core/_security/_ssrf_protection.py b/libs/core/langchain_core/_security/_ssrf_protection.py index e1f7b482587..0eb3cd7e11c 100644 --- a/libs/core/langchain_core/_security/_ssrf_protection.py +++ b/libs/core/langchain_core/_security/_ssrf_protection.py @@ -51,9 +51,9 @@ def validate_safe_url( Args: url: The URL to validate (string or Pydantic HttpUrl). - allow_private: If ``True``, allows private IPs and localhost (for development). + allow_private: If `True`, allows private IPs and localhost (for development). Cloud metadata endpoints are ALWAYS blocked. - allow_http: If ``True``, allows both HTTP and HTTPS. If ``False``, only HTTPS. + allow_http: If `True`, allows both HTTP and HTTPS. If `False`, only HTTPS. Returns: The validated URL as a string. diff --git a/libs/core/langchain_core/_security/_transport.py b/libs/core/langchain_core/_security/_transport.py index b6efc9ec017..2bbc8d8989f 100644 --- a/libs/core/langchain_core/_security/_transport.py +++ b/libs/core/langchain_core/_security/_transport.py @@ -31,14 +31,14 @@ class SSRFSafeTransport(httpx.AsyncBaseTransport): """httpx async transport that validates DNS results against an SSRF policy. For every outgoing request the transport: - 1. Checks the URL scheme against ``policy.allowed_schemes``. + 1. Checks the URL scheme against `policy.allowed_schemes`. 2. Validates the hostname against blocked patterns. 3. Resolves DNS and validates **all** returned IPs. 4. Rewrites the request to connect to the first valid IP while - preserving the original ``Host`` header and TLS SNI hostname. + preserving the original `Host` header and TLS SNI hostname. - Redirects are re-validated on each hop because ``follow_redirects`` - is set on the *client*, causing ``handle_async_request`` to be called + Redirects are re-validated on each hop because `follow_redirects` + is set on the *client*, causing `handle_async_request` to be called again for each redirect target. """ @@ -225,12 +225,12 @@ def ssrf_safe_async_client( policy: SSRFPolicy = SSRFPolicy(), **kwargs: object, ) -> httpx.AsyncClient: - """Create an ``httpx.AsyncClient`` with SSRF protection. + """Create an `httpx.AsyncClient` with SSRF protection. - Drop-in replacement for ``httpx.AsyncClient(...)`` - callers just swap - the constructor call. Transport-specific kwargs (``verify``, ``cert``, - ``retries``, etc.) are forwarded to the inner ``AsyncHTTPTransport``; - everything else goes to the ``AsyncClient``. + Drop-in replacement for `httpx.AsyncClient(...)` - callers just swap + the constructor call. Transport-specific kwargs (`verify`, `cert`, + `retries`, etc.) are forwarded to the inner `AsyncHTTPTransport`; + everything else goes to the `AsyncClient`. """ transport_kwargs: dict[str, object] = {} client_kwargs: dict[str, object] = {} diff --git a/libs/core/tests/unit_tests/test_ssrf_policy_transport.py b/libs/core/tests/unit_tests/test_ssrf_policy_transport.py index 2a905c1d2bf..fe174c88da9 100644 --- a/libs/core/tests/unit_tests/test_ssrf_policy_transport.py +++ b/libs/core/tests/unit_tests/test_ssrf_policy_transport.py @@ -190,6 +190,46 @@ def test_k8s_still_blocked_when_private_ips_allowed() -> None: validate_url_sync("http://myservice.default.svc.cluster.local/", policy) +# --------------------------------------------------------------------------- +# Cloud metadata: link-local range and restored IPs blocked even with +# block_private_ips=False (regression test for dropped ranges/IPs) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "ip", + [ + "169.254.169.254", + "169.254.170.2", + "169.254.170.23", # AWS EKS Pod Identity Agent + "100.100.100.200", + "fd00:ec2::254", + "fd00:ec2::23", # AWS EKS Pod Identity Agent (IPv6) + "fe80::a9fe:a9fe", # OpenStack Nova metadata + ], +) +def test_cloud_metadata_ips_blocked_when_private_ips_allowed(ip: str) -> None: + policy = SSRFPolicy(block_private_ips=False) + with pytest.raises(SSRFBlockedError, match="cloud metadata endpoint"): + validate_resolved_ip(ip, policy) + + +@pytest.mark.parametrize( + "ip", + [ + "169.254.1.2", + "169.254.255.254", + "169.254.42.99", + ], +) +def test_link_local_range_blocked_as_cloud_metadata_when_private_ips_allowed( + ip: str, +) -> None: + policy = SSRFPolicy(block_private_ips=False) + with pytest.raises(SSRFBlockedError, match="cloud metadata endpoint"): + validate_resolved_ip(ip, policy) + + # --------------------------------------------------------------------------- # Transport: redirect to private IP blocked # ---------------------------------------------------------------------------