fix(langchain): activate mypy warn-unreachable (#34553)

Co-authored-by: Mason Daugherty <mason@langchain.dev>
This commit is contained in:
Christophe Bornet
2026-01-10 04:11:16 +01:00
committed by GitHub
parent 5dc8ba3c99
commit fc417aaf17
9 changed files with 37 additions and 45 deletions

View File

@@ -417,18 +417,15 @@ def _handle_structured_output_error(
return True, STRUCTURED_OUTPUT_ERROR_TEMPLATE.format(error=str(exception))
if isinstance(handle_errors, str):
return True, handle_errors
if isinstance(handle_errors, type) and issubclass(handle_errors, Exception):
if isinstance(exception, handle_errors):
if isinstance(handle_errors, type):
if issubclass(handle_errors, Exception) and isinstance(exception, handle_errors):
return True, STRUCTURED_OUTPUT_ERROR_TEMPLATE.format(error=str(exception))
return False, ""
if isinstance(handle_errors, tuple):
if any(isinstance(exception, exc_type) for exc_type in handle_errors):
return True, STRUCTURED_OUTPUT_ERROR_TEMPLATE.format(error=str(exception))
return False, ""
if callable(handle_errors):
# type narrowing not working appropriately w/ callable check, can fix later
return True, handle_errors(exception) # type: ignore[return-value,call-arg]
return False, ""
return True, handle_errors(exception)
def _chain_tool_call_wrappers(
@@ -547,7 +544,7 @@ def create_agent(
*,
system_prompt: str | SystemMessage | None = None,
middleware: Sequence[AgentMiddleware[StateT_co, ContextT]] = (),
response_format: ResponseFormat[ResponseT] | type[ResponseT] | None = None,
response_format: ResponseFormat[ResponseT] | type[ResponseT] | dict[str, Any] | None = None,
state_schema: type[AgentState[ResponseT]] | None = None,
context_schema: type[ContextT] | None = None,
checkpointer: Checkpointer | None = None,

View File

@@ -15,8 +15,10 @@ from pathlib import Path
try: # pragma: no cover - optional dependency on POSIX platforms
import resource
_HAS_RESOURCE = True
except ImportError: # pragma: no cover - non-POSIX systems
resource = None # type: ignore[assignment]
_HAS_RESOURCE = False
SHELL_TEMP_PREFIX = "langchain-shell-"
@@ -119,7 +121,7 @@ class HostExecutionPolicy(BaseExecutionPolicy):
self._limits_requested = any(
value is not None for value in (self.cpu_time_seconds, self.memory_bytes)
)
if self._limits_requested and resource is None:
if self._limits_requested and not _HAS_RESOURCE:
msg = (
"HostExecutionPolicy cpu/memory limits require the Python 'resource' module. "
"Either remove the limits or run on a POSIX platform."
@@ -163,11 +165,9 @@ class HostExecutionPolicy(BaseExecutionPolicy):
def _apply_post_spawn_limits(self, process: subprocess.Popen[str]) -> None:
if not self._limits_requested or not self._can_use_prlimit():
return
if resource is None: # pragma: no cover - defensive
if not _HAS_RESOURCE: # pragma: no cover - defensive
return
pid = process.pid
if pid is None:
return
try:
prlimit = typing.cast("typing.Any", resource).prlimit
if self.cpu_time_seconds is not None:
@@ -184,11 +184,7 @@ class HostExecutionPolicy(BaseExecutionPolicy):
@staticmethod
def _can_use_prlimit() -> bool:
return (
resource is not None
and hasattr(resource, "prlimit")
and sys.platform.startswith("linux")
)
return _HAS_RESOURCE and hasattr(resource, "prlimit") and sys.platform.startswith("linux")
@dataclass
@@ -251,9 +247,9 @@ class CodexSandboxExecutionPolicy(BaseExecutionPolicy):
return self.platform
if sys.platform.startswith("linux"):
return "linux"
if sys.platform == "darwin":
if sys.platform == "darwin": # type: ignore[unreachable, unused-ignore]
return "macos"
msg = (
msg = ( # type: ignore[unreachable, unused-ignore]
"Codex sandbox policy could not determine a supported platform; "
"set 'platform' explicitly."
)

View File

@@ -332,7 +332,7 @@ def apply_strategy(
return _apply_hash_strategy(content, matches)
if strategy == "block":
raise PIIDetectionError(matches[0]["type"], matches)
msg = f"Unknown redaction strategy: {strategy}"
msg = f"Unknown redaction strategy: {strategy}" # type: ignore[unreachable]
raise ValueError(msg)

View File

@@ -608,7 +608,7 @@ class ShellToolMiddleware(AgentMiddleware[ShellToolState, Any]):
normalized: dict[str, str] = {}
for key, value in env.items():
if not isinstance(key, str):
msg = "Environment variable names must be strings."
msg = "Environment variable names must be strings." # type: ignore[unreachable]
raise TypeError(msg)
normalized[key] = str(value)
return normalized

View File

@@ -189,14 +189,14 @@ class ToolRetryMiddleware(AgentMiddleware):
# Handle backwards compatibility for deprecated on_failure values
if on_failure == "raise": # type: ignore[comparison-overlap]
msg = (
msg = ( # type: ignore[unreachable]
"on_failure='raise' is deprecated and will be removed in a future version. "
"Use on_failure='error' instead."
)
warnings.warn(msg, DeprecationWarning, stacklevel=2)
on_failure = "error"
elif on_failure == "return_message": # type: ignore[comparison-overlap]
msg = (
msg = ( # type: ignore[unreachable]
"on_failure='return_message' is deprecated and will be removed "
"in a future version. Use on_failure='continue' instead."
)

View File

@@ -254,7 +254,7 @@ class ModelRequest:
raise ValueError(msg)
if "system_prompt" in overrides:
system_prompt = cast("str", overrides.pop("system_prompt")) # type: ignore[typeddict-item]
system_prompt = cast("str | None", overrides.pop("system_prompt")) # type: ignore[typeddict-item]
if system_prompt is None:
overrides["system_message"] = None
else:

View File

@@ -106,7 +106,7 @@ def _parse_with_schema(
class _SchemaSpec(Generic[SchemaT]):
"""Describes a structured output schema."""
schema: type[SchemaT]
schema: type[SchemaT] | dict[str, Any]
"""The schema for the response, can be a Pydantic model, `dataclass`, `TypedDict`,
or JSON schema dict."""
@@ -134,7 +134,7 @@ class _SchemaSpec(Generic[SchemaT]):
def __init__(
self,
schema: type[SchemaT],
schema: type[SchemaT] | dict[str, Any],
*,
name: str | None = None,
description: str | None = None,
@@ -257,7 +257,7 @@ class ToolStrategy(Generic[SchemaT]):
class ProviderStrategy(Generic[SchemaT]):
"""Use the model provider's native structured output method."""
schema: type[SchemaT]
schema: type[SchemaT] | dict[str, Any]
"""Schema for native mode."""
schema_spec: _SchemaSpec[SchemaT]
@@ -265,7 +265,7 @@ class ProviderStrategy(Generic[SchemaT]):
def __init__(
self,
schema: type[SchemaT],
schema: type[SchemaT] | dict[str, Any],
*,
strict: bool | None = None,
) -> None:
@@ -309,7 +309,7 @@ class OutputToolBinding(Generic[SchemaT]):
and the corresponding tool implementation used by the tools strategy.
"""
schema: type[SchemaT]
schema: type[SchemaT] | dict[str, Any]
"""The original schema provided for structured output
(Pydantic model, dataclass, TypedDict, or JSON schema dict)."""
@@ -363,7 +363,7 @@ class ProviderStrategyBinding(Generic[SchemaT]):
its type classification, and parsing logic for provider-enforced JSON.
"""
schema: type[SchemaT]
schema: type[SchemaT] | dict[str, Any]
"""The original schema provided for structured output
(Pydantic model, `dataclass`, `TypedDict`, or JSON schema dict)."""
@@ -426,29 +426,27 @@ class ProviderStrategyBinding(Generic[SchemaT]):
content = message.content
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for c in content:
if isinstance(c, dict):
if c.get("type") == "text" and "text" in c:
parts.append(str(c["text"]))
elif "content" in c and isinstance(c["content"], str):
parts.append(c["content"])
else:
parts.append(str(c))
return "".join(parts)
return str(content)
parts: list[str] = []
for c in content:
if isinstance(c, dict):
if c.get("type") == "text" and "text" in c:
parts.append(str(c["text"]))
elif "content" in c and isinstance(c["content"], str):
parts.append(c["content"])
else:
parts.append(str(c))
return "".join(parts)
class AutoStrategy(Generic[SchemaT]):
"""Automatically select the best strategy for structured output."""
schema: type[SchemaT]
schema: type[SchemaT] | dict[str, Any]
"""Schema for automatic mode."""
def __init__(
self,
schema: type[SchemaT],
schema: type[SchemaT] | dict[str, Any],
) -> None:
"""Initialize AutoStrategy with schema."""
self.schema = schema

View File

@@ -92,6 +92,7 @@ line-length = 100
strict = true
ignore_missing_imports = true
enable_error_code = "deprecated"
warn_unreachable = true
exclude = ["tests/unit_tests/agents/*"]
# TODO: activate for 'strict' checking

View File

@@ -64,7 +64,7 @@ def test_host_policy_validations() -> None:
def test_host_policy_requires_resource_for_limits(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(_execution, "resource", None, raising=False)
monkeypatch.setattr(_execution, "_HAS_RESOURCE", False, raising=False)
with pytest.raises(RuntimeError):
HostExecutionPolicy(cpu_time_seconds=1)