fix(ollama): respect scheme-less base_url (#34042)

Fixes #33986.

Summary:
- Normalize scheme-less `base_url` values (e.g., `ollama:11434`) by
defaulting to `http://` when the input resembles `host:port`.
- Preserve and merge `Authorization` headers when `userinfo` credentials
are present, both for sync and async clients.
- Add unit tests covering scheme-less host:port and scheme-less userinfo
credentials.

Implementation details:
- Update `parse_url_with_auth` to accept scheme-less endpoints,
producing a cleaned URL with explicit scheme and extracted auth headers.
- No changes required in `OllamaLLM`, `ChatOllama`, or
`OllamaEmbeddings`—they already consume the cleaned URL and headers.

Why:
- Previously, scheme-less inputs caused `parse_url_with_auth` to return
`(None, None)`, leading Ollama clients to fall back to defaults and
ignore the provided `base_url`.

Tests:
- Extended `libs/partners/ollama/tests/unit_tests/test_auth.py` to cover
the new cases.

Notes:
- Default scheme chosen is `http` to match common Ollama local
deployments. Users can still explicitly provide `https://` when
appropriate.

---------

Co-authored-by: Mason Daugherty <mason@langchain.dev>
Co-authored-by: Mason Daugherty <github@mdrxy.com>
This commit is contained in:
Amber Shen
2026-04-06 18:39:33 -07:00
committed by GitHub
parent 0aa482d0cd
commit 050b779d97
2 changed files with 137 additions and 19 deletions

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
import base64
from urllib.parse import unquote, urlparse
from urllib.parse import ParseResult, unquote, urlparse, urlunparse
from httpx import ConnectError
from ollama import Client, ResponseError
@@ -47,6 +47,33 @@ def validate_model(client: Client, model_name: str) -> None:
raise ValueError(msg) from e
def _build_cleaned_url(parsed: ParseResult) -> str:
"""Reconstruct a URL from parsed components without userinfo.
Args:
parsed: Parsed URL components.
Returns:
Cleaned URL string with userinfo removed.
"""
hostname = parsed.hostname or ""
if ":" in hostname: # IPv6 — re-add brackets stripped by urlparse
hostname = f"[{hostname}]"
cleaned_netloc = hostname
if parsed.port is not None:
cleaned_netloc += f":{parsed.port}"
return urlunparse(
(
parsed.scheme,
cleaned_netloc,
parsed.path,
parsed.params,
parsed.query,
parsed.fragment,
)
)
def parse_url_with_auth(
url: str | None,
) -> tuple[str | None, dict[str, str] | None]:
@@ -54,23 +81,50 @@ def parse_url_with_auth(
Handles URLs of the form: `https://user:password@host:port/path`
Scheme-less URLs (e.g., `host:port`) are also accepted and will be
given a default `http://` scheme.
Args:
url: The URL to parse.
Returns:
A tuple of `(cleaned_url, headers_dict)` where:
- `cleaned_url` is the URL without authentication credentials if any were
found. Otherwise, returns the original URL.
- `cleaned_url` is a normalized URL with credentials stripped (if any
were present) and a scheme guaranteed (defaulting to `http://` for
scheme-less inputs). Returns the original URL unchanged when it
already has a valid scheme and no credentials.
- `headers_dict` contains Authorization header if credentials were found.
"""
if not url:
return None, None
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc or not parsed.hostname:
needs_reconstruction = False
valid = False
if parsed.scheme in {"http", "https"} and parsed.netloc and parsed.hostname:
valid = True
elif not (parsed.scheme and parsed.netloc) and ":" in url:
# No valid scheme but contains colon — try as scheme-less host:port
parsed_with_scheme = urlparse(f"http://{url}")
if parsed_with_scheme.netloc and parsed_with_scheme.hostname:
parsed = parsed_with_scheme
needs_reconstruction = True
valid = True
# Validate port is numeric (urlparse raises ValueError for non-numeric ports)
if valid:
try:
_ = parsed.port
except ValueError:
valid = False
if not valid:
return None, None
if not parsed.username:
return url, None
cleaned = _build_cleaned_url(parsed) if needs_reconstruction else url
return cleaned, None
# Handle case where password might be empty string or None
password = parsed.password or ""
@@ -82,20 +136,7 @@ def parse_url_with_auth(
encoded_credentials = base64.b64encode(credentials.encode()).decode()
headers = {"Authorization": f"Basic {encoded_credentials}"}
# Strip credentials from URL
cleaned_netloc = parsed.hostname or ""
if parsed.port:
cleaned_netloc += f":{parsed.port}"
cleaned_url = f"{parsed.scheme}://{cleaned_netloc}"
if parsed.path:
cleaned_url += parsed.path
if parsed.query:
cleaned_url += f"?{parsed.query}"
if parsed.fragment:
cleaned_url += f"#{parsed.fragment}"
return cleaned_url, headers
return _build_cleaned_url(parsed), headers
def merge_auth_headers(

View File

@@ -25,6 +25,13 @@ class TestParseUrlWithAuth:
result = parse_url_with_auth(url)
assert result == (url, None)
def test_parse_url_with_auth_no_scheme_host_port(self) -> None:
"""Test scheme-less host:port is accepted with default http scheme."""
url = "ollama:11434"
cleaned_url, headers = parse_url_with_auth(url)
assert cleaned_url == "http://ollama:11434"
assert headers is None
def test_parse_url_with_auth_with_credentials(self) -> None:
"""Test URLs with authentication credentials."""
url = "https://user:password@ollama.example.com:11434"
@@ -37,6 +44,32 @@ class TestParseUrlWithAuth:
assert cleaned_url == expected_url
assert headers == expected_headers
def test_parse_url_with_auth_localhost_no_scheme(self) -> None:
"""Test scheme-less localhost:port is accepted."""
url = "localhost:11434"
cleaned_url, headers = parse_url_with_auth(url)
assert cleaned_url == "http://localhost:11434"
assert headers is None
def test_parse_url_with_auth_no_scheme_with_path(self) -> None:
"""Test scheme-less host:port with path and query."""
url = "ollama:11434/v1/chat?timeout=30#section"
cleaned_url, headers = parse_url_with_auth(url)
assert cleaned_url == "http://ollama:11434/v1/chat?timeout=30#section"
assert headers is None
def test_parse_url_with_auth_no_scheme_with_credentials(self) -> None:
"""Test scheme-less URL with authentication credentials."""
url = "user:password@ollama.example.com:11434"
cleaned_url, headers = parse_url_with_auth(url)
expected_url = "http://ollama.example.com:11434"
expected_credentials = base64.b64encode(b"user:password").decode()
expected_headers = {"Authorization": f"Basic {expected_credentials}"}
assert cleaned_url == expected_url
assert headers == expected_headers
def test_parse_url_with_auth_with_path_and_query(self) -> None:
"""Test URLs with auth, path, and query parameters."""
url = "https://user:pass@ollama.example.com:11434/api/v1?timeout=30"
@@ -216,6 +249,50 @@ class TestUrlAuthEdgeCases:
assert cleaned_url == expected_url
assert headers == expected_headers
def test_parse_url_with_auth_empty_string(self) -> None:
"""Test that empty string input returns None, None."""
result = parse_url_with_auth("")
assert result == (None, None)
def test_parse_url_with_auth_bare_hostname(self) -> None:
"""Test that bare hostname without port or scheme is rejected."""
result = parse_url_with_auth("my-ollama-host")
assert result == (None, None)
def test_parse_url_with_auth_model_name_with_colon(self) -> None:
"""Test that model names with colons are rejected, not treated as URLs."""
assert parse_url_with_auth("llama3:latest") == (None, None)
assert parse_url_with_auth("mistral:7b") == (None, None)
def test_parse_url_with_auth_non_http_scheme(self) -> None:
"""Test that non-http/https schemes are rejected."""
assert parse_url_with_auth("ftp://ollama:11434") == (None, None)
def test_parse_url_with_auth_ipv6_no_auth(self) -> None:
"""Test that IPv6 addresses are preserved correctly."""
url = "http://[::1]:11434"
result = parse_url_with_auth(url)
assert result == (url, None)
def test_parse_url_with_auth_ipv6_with_auth(self) -> None:
"""Test that IPv6 addresses with credentials are handled correctly."""
url = "https://user:password@[::1]:11434"
cleaned_url, headers = parse_url_with_auth(url)
expected_url = "https://[::1]:11434"
expected_credentials = base64.b64encode(b"user:password").decode()
expected_headers = {"Authorization": f"Basic {expected_credentials}"}
assert cleaned_url == expected_url
assert headers == expected_headers
def test_parse_url_with_auth_port_zero(self) -> None:
"""Test that port 0 is preserved, not silently dropped."""
url = "http://host:0"
cleaned_url, headers = parse_url_with_auth(url)
assert cleaned_url == url
assert headers is None
def test_parse_url_with_auth_complex_password(self) -> None:
"""Test with complex passwords containing special characters."""
# Test password with colon, which is the delimiter