From dcaf7795a3e6590af55c3ff7bda6add6355e9ea6 Mon Sep 17 00:00:00 2001 From: Nick Hollon Date: Fri, 12 Jun 2026 12:47:23 -0400 Subject: [PATCH] fix(langchain,anthropic): confine file-search results and tighten anthropic `allowed_prefixes` (#38106) --- .../agents/middleware/file_search.py | 88 ++++++--- .../implementations/test_file_search.py | 170 ++++++++++++++++++ .../middleware/anthropic_tools.py | 42 +++-- .../middleware/test_anthropic_tools.py | 27 +++ 4 files changed, 295 insertions(+), 32 deletions(-) diff --git a/libs/langchain_v1/langchain/agents/middleware/file_search.py b/libs/langchain_v1/langchain/agents/middleware/file_search.py index c05bfde7f94..18e6ab12c70 100644 --- a/libs/langchain_v1/langchain/agents/middleware/file_search.py +++ b/libs/langchain_v1/langchain/agents/middleware/file_search.py @@ -8,6 +8,7 @@ from __future__ import annotations import fnmatch import json +import os import re import subprocess from contextlib import suppress @@ -20,6 +21,25 @@ from langchain_core.tools import tool from langchain.agents.middleware.types import AgentMiddleware, AgentState, ContextT, ResponseT +def _is_within_root(candidate: Path, root: Path) -> bool: + """Return True iff `candidate` resolves to a path inside `root` (symlinks resolved). + + Args: + candidate: The path to check. It is resolved (following symlinks and + normalizing `..`) before the containment check. + root: The allowed root directory. Also resolved before comparison. + + Returns: + `True` if the fully resolved `candidate` lies inside the resolved `root`, + using path-segment boundaries; `False` otherwise (including on resolution + errors). + """ + try: + return candidate.resolve().is_relative_to(root.resolve()) + except (OSError, ValueError): + return False + + def _expand_include_patterns(pattern: str) -> list[str] | None: """Expand brace patterns like `*.{py,pyi}` into a list of globs.""" if "}" in pattern and "{" not in pattern: @@ -157,10 +177,21 @@ class FilesystemFileSearchMiddleware(AgentMiddleware[AgentState[ResponseT], Cont if not base_full.exists() or not base_full.is_dir(): return "No files found" + # Reject glob patterns that could escape the root before expanding them. + # `Path.glob` expands a `..` pattern like "../../etc/passwd" against the + # base directory and yields paths outside the allowed root, and raises + # for absolute patterns. (A `~` pattern is never expanduser'd by glob, + # and any escape that slips through is dropped by the per-match + # containment check below.) + if pattern.startswith("/") or any(part == ".." for part in pattern.split("/")): + return "No files found" + # Use pathlib glob matching: list[tuple[str, str]] = [] for match in base_full.glob(pattern): - if match.is_file(): + # Re-check containment after resolving so an in-root symlink that + # points outside the root is never enumerated. + if match.is_file() and _is_within_root(match, self.root_path): # Convert to virtual path virtual_path = "/" + str(match.relative_to(self.root_path)) stat = match.stat() @@ -297,6 +328,10 @@ class FilesystemFileSearchMiddleware(AgentMiddleware[AgentState[ResponseT], Cont data = json.loads(line) if data["type"] == "match": path = data["data"]["path"]["text"] + # Defense in depth: drop any result whose resolved path lies + # outside the root (e.g. surfaced via an in-root symlink). + if not _is_within_root(Path(path), self.root_path): + continue # Convert to virtual path virtual_path = "/" + str(Path(path).relative_to(self.root_path)) line_num = data["data"]["line_number"] @@ -325,31 +360,40 @@ class FilesystemFileSearchMiddleware(AgentMiddleware[AgentState[ResponseT], Cont regex = re.compile(pattern) results: dict[str, list[tuple[int, str]]] = {} - # Walk directory tree - for file_path in base_full.rglob("*"): - if not file_path.is_file(): - continue + # Walk directory tree without following symlinked directories so traversal + # cannot leave the root via a symlinked subdirectory. + for walk_root, _dirs, files in os.walk(base_full, followlinks=False): + for name in files: + file_path = Path(walk_root) / name - # Check include filter - if include and not _match_include_pattern(file_path.name, include): - continue + # Re-check containment after resolving so an in-root symlinked file + # pointing outside the root is never read. + if not _is_within_root(file_path, self.root_path): + continue - # Skip files that are too large - if file_path.stat().st_size > self.max_file_size_bytes: - continue + if not file_path.is_file(): + continue - try: - content = file_path.read_text() - except (UnicodeDecodeError, PermissionError): - continue + # Check include filter + if include and not _match_include_pattern(file_path.name, include): + continue - # Search content - for line_num, line in enumerate(content.splitlines(), 1): - if regex.search(line): - virtual_path = "/" + str(file_path.relative_to(self.root_path)) - if virtual_path not in results: - results[virtual_path] = [] - results[virtual_path].append((line_num, line)) + # Skip files that are too large + if file_path.stat().st_size > self.max_file_size_bytes: + continue + + try: + content = file_path.read_text() + except (UnicodeDecodeError, PermissionError): + continue + + # Search content + for line_num, line in enumerate(content.splitlines(), 1): + if regex.search(line): + virtual_path = "/" + str(file_path.relative_to(self.root_path)) + if virtual_path not in results: + results[virtual_path] = [] + results[virtual_path].append((line_num, line)) return results diff --git a/libs/langchain_v1/tests/unit_tests/agents/middleware/implementations/test_file_search.py b/libs/langchain_v1/tests/unit_tests/agents/middleware/implementations/test_file_search.py index 04a94cc3ef3..c3a58936c2a 100644 --- a/libs/langchain_v1/tests/unit_tests/agents/middleware/implementations/test_file_search.py +++ b/libs/langchain_v1/tests/unit_tests/agents/middleware/implementations/test_file_search.py @@ -11,6 +11,7 @@ from langchain.agents.middleware.file_search import ( FilesystemFileSearchMiddleware, _expand_include_patterns, _is_valid_include_pattern, + _is_within_root, _match_include_pattern, ) @@ -325,6 +326,175 @@ class TestPathTraversalSecurity: assert "secret" not in result +class TestGlobPatternTraversalSecurity: + """Security tests for the glob `pattern` argument (not just the base `path`).""" + + def test_glob_pattern_with_double_dots_rejected(self, tmp_path: Path) -> None: + """A glob pattern containing `..` must not escape the root.""" + root = tmp_path / "root" + root.mkdir() + (root / "inside.txt").write_text("inside", encoding="utf-8") + # Secret lives outside the root. + (tmp_path / "passwd").write_text("secret", encoding="utf-8") + + middleware = FilesystemFileSearchMiddleware(root_path=str(root)) + + assert isinstance(middleware.glob_search, StructuredTool) + assert middleware.glob_search.func is not None + result = middleware.glob_search.func(pattern="../passwd") + + assert result == "No files found" + assert "secret" not in result + assert "passwd" not in result + + def test_glob_pattern_absolute_rejected(self, tmp_path: Path) -> None: + """A glob pattern that is an absolute path must be rejected.""" + root = tmp_path / "root" + root.mkdir() + (root / "inside.txt").write_text("inside", encoding="utf-8") + secret = tmp_path / "secret.txt" + secret.write_text("secret", encoding="utf-8") + + middleware = FilesystemFileSearchMiddleware(root_path=str(root)) + + assert isinstance(middleware.glob_search, StructuredTool) + assert middleware.glob_search.func is not None + result = middleware.glob_search.func(pattern=str(secret)) + + assert result == "No files found" + + def test_glob_pattern_tilde_rejected(self, tmp_path: Path) -> None: + """A glob pattern containing `~` must be rejected.""" + root = tmp_path / "root" + root.mkdir() + (root / "inside.txt").write_text("inside", encoding="utf-8") + + middleware = FilesystemFileSearchMiddleware(root_path=str(root)) + + assert isinstance(middleware.glob_search, StructuredTool) + assert middleware.glob_search.func is not None + result = middleware.glob_search.func(pattern="~/secret") + + assert result == "No files found" + + def test_glob_normal_pattern_still_works(self, tmp_path: Path) -> None: + """A normal in-root pattern must still return expected results.""" + root = tmp_path / "root" + root.mkdir() + (root / "keep.py").write_text("content", encoding="utf-8") + (root / "skip.txt").write_text("content", encoding="utf-8") + + middleware = FilesystemFileSearchMiddleware(root_path=str(root)) + + assert isinstance(middleware.glob_search, StructuredTool) + assert middleware.glob_search.func is not None + result = middleware.glob_search.func(pattern="*.py") + + assert "/keep.py" in result + assert "/skip.txt" not in result + + def test_glob_does_not_follow_inside_symlink_to_outside(self, tmp_path: Path) -> None: + """A symlink inside the root pointing outside must not be enumerated by glob.""" + root = tmp_path / "root" + root.mkdir() + outside = tmp_path / "outside.txt" + outside.write_text("secret-outside", encoding="utf-8") + + try: + (root / "link.txt").symlink_to(outside) + except OSError: + pytest.skip("Symlink creation not supported") + + middleware = FilesystemFileSearchMiddleware(root_path=str(root)) + + assert isinstance(middleware.glob_search, StructuredTool) + assert middleware.glob_search.func is not None + result = middleware.glob_search.func(pattern="*.txt") + + # The symlinked entry resolves outside root, so it must be excluded. + assert "/link.txt" not in result + assert result == "No files found" + + +class TestGrepSymlinkContainment: + """Security tests that grep does not surface out-of-root files via symlinks.""" + + def test_python_grep_does_not_read_inside_symlink_to_outside(self, tmp_path: Path) -> None: + """Python-fallback grep must not read a symlink inside root pointing outside.""" + root = tmp_path / "root" + root.mkdir() + (root / "inside.txt").write_text("nothing-here\n", encoding="utf-8") + outside = tmp_path / "outside.txt" + outside.write_text("TOPSECRET\n", encoding="utf-8") + + try: + (root / "link.txt").symlink_to(outside) + except OSError: + pytest.skip("Symlink creation not supported") + + middleware = FilesystemFileSearchMiddleware(root_path=str(root), use_ripgrep=False) + + assert isinstance(middleware.grep_search, StructuredTool) + assert middleware.grep_search.func is not None + result = middleware.grep_search.func(pattern="TOPSECRET", output_mode="content") + + assert "TOPSECRET" not in result + assert "/link.txt" not in result + assert result == "No matches found" + + def test_python_grep_normal_search_still_works(self, tmp_path: Path) -> None: + """A normal in-root grep must still return expected results.""" + root = tmp_path / "root" + root.mkdir() + (root / "a.py").write_text("needle here\n", encoding="utf-8") + (root / "b.py").write_text("nothing\n", encoding="utf-8") + + middleware = FilesystemFileSearchMiddleware(root_path=str(root), use_ripgrep=False) + + assert isinstance(middleware.grep_search, StructuredTool) + assert middleware.grep_search.func is not None + result = middleware.grep_search.func(pattern="needle") + + assert "/a.py" in result + assert "/b.py" not in result + + +class TestIsWithinRoot: + """Tests for the private `_is_within_root` helper.""" + + def test_in_root_path_is_within(self, tmp_path: Path) -> None: + """A file inside the root is reported as within.""" + root = tmp_path / "root" + root.mkdir() + candidate = root / "file.txt" + candidate.write_text("x", encoding="utf-8") + + assert _is_within_root(candidate, root) is True + + def test_outside_path_is_not_within(self, tmp_path: Path) -> None: + """A file outside the root is reported as not within.""" + root = tmp_path / "root" + root.mkdir() + outside = tmp_path / "outside.txt" + outside.write_text("x", encoding="utf-8") + + assert _is_within_root(outside, root) is False + + def test_symlink_escaping_root_is_not_within(self, tmp_path: Path) -> None: + """A symlink inside the root that resolves outside is not within.""" + root = tmp_path / "root" + root.mkdir() + outside = tmp_path / "outside.txt" + outside.write_text("x", encoding="utf-8") + try: + link = root / "link.txt" + link.symlink_to(outside) + except OSError: + pytest.skip("Symlink creation not supported") + + assert _is_within_root(link, root) is False + + class TestExpandIncludePatterns: """Tests for _expand_include_patterns helper function.""" diff --git a/libs/partners/anthropic/langchain_anthropic/middleware/anthropic_tools.py b/libs/partners/anthropic/langchain_anthropic/middleware/anthropic_tools.py index ffc491a1fe7..ce83f980d5f 100644 --- a/libs/partners/anthropic/langchain_anthropic/middleware/anthropic_tools.py +++ b/libs/partners/anthropic/langchain_anthropic/middleware/anthropic_tools.py @@ -93,6 +93,31 @@ class AnthropicToolsState(AgentState): """Virtual file system for memory tools.""" +def _is_within_allowed_prefix(normalized: str, prefixes: Sequence[str]) -> bool: + """Check whether a normalized path lies within an allowed prefix directory. + + Uses a segment-boundary comparison rather than a raw string prefix test so + that sibling directories sharing a textual prefix cannot escape the allowed + directory. For example, with prefix `/memories` the path `/memories2/evil.txt` + is rejected because it is not the prefix itself nor a descendant of it. + + Args: + normalized: A normalized, forward-slash, absolute-style path. + prefixes: Allowed path prefixes to compare against. + + Returns: + `True` if `normalized` exactly equals one of the prefix directories or is + contained within one of them, `False` otherwise. + """ + for prefix in prefixes: + # Normalize the prefix the same way the path was normalized so the + # comparison is consistent (drop any trailing slash for the boundary). + prefix_dir = prefix.rstrip("/") + if normalized == prefix_dir or normalized.startswith(f"{prefix_dir}/"): + return True + return False + + def _validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) -> str: """Validate and normalize file path for security. @@ -122,8 +147,8 @@ def _validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) normalized = f"/{normalized}" # Check allowed prefixes if specified - if allowed_prefixes is not None and not any( - normalized.startswith(prefix) for prefix in allowed_prefixes + if allowed_prefixes is not None and not _is_within_allowed_prefix( + normalized, allowed_prefixes ): msg = f"Path must start with one of {allowed_prefixes}: {path}" raise ValueError(msg) @@ -847,14 +872,11 @@ class _FilesystemClaudeFileToolMiddleware(AgentMiddleware): # Check allowed prefixes virtual_path = "/" + str(full_path.relative_to(self.root_path)) - if self.allowed_prefixes: - allowed = any( - virtual_path.startswith(prefix) or virtual_path == prefix.rstrip("/") - for prefix in self.allowed_prefixes - ) - if not allowed: - msg = f"Path must start with one of: {self.allowed_prefixes}" - raise ValueError(msg) + if self.allowed_prefixes and not _is_within_allowed_prefix( + virtual_path, self.allowed_prefixes + ): + msg = f"Path must start with one of: {self.allowed_prefixes}" + raise ValueError(msg) return full_path diff --git a/libs/partners/anthropic/tests/unit_tests/middleware/test_anthropic_tools.py b/libs/partners/anthropic/tests/unit_tests/middleware/test_anthropic_tools.py index bd523d17268..212c0eb6594 100644 --- a/libs/partners/anthropic/tests/unit_tests/middleware/test_anthropic_tools.py +++ b/libs/partners/anthropic/tests/unit_tests/middleware/test_anthropic_tools.py @@ -62,6 +62,33 @@ class TestPathValidation: with pytest.raises(ValueError, match="Path must start with"): _validate_path("/other/notes.txt", allowed_prefixes=["/memories"]) + def test_prefix_boundary_bypass_blocked(self) -> None: + """Test that sibling directories sharing a textual prefix are rejected. + + Regression test: a raw `str.startswith` check is not segment-aware, so a + path like `/memories2/evil.txt` would slip past a `/memories` prefix even + though it lives outside the intended directory. + """ + # Sibling directory sharing the textual prefix must be rejected. + with pytest.raises(ValueError, match="Path must start with"): + _validate_path("/memories2/evil.txt", allowed_prefixes=["/memories"]) + + # A genuine descendant of the prefix is accepted. + assert ( + _validate_path("/memories/ok.txt", allowed_prefixes=["/memories"]) + == "/memories/ok.txt" + ) + + # The prefix directory itself is accepted (exact match). + assert ( + _validate_path("/memories", allowed_prefixes=["/memories"]) == "/memories" + ) + + def test_prefix_boundary_traversal_still_blocked(self) -> None: + """Test that traversal within an allowed prefix is still rejected.""" + with pytest.raises(ValueError, match="Path traversal not allowed"): + _validate_path("/memories/../escape", allowed_prefixes=["/memories"]) + class TestTextEditorMiddleware: """Test text editor middleware functionality."""