mirror of
https://github.com/hwchase17/langchain.git
synced 2026-07-01 14:47:02 +00:00
fix(langchain,anthropic): confine file-search results and tighten anthropic allowed_prefixes (#38106)
This commit is contained in:
@@ -8,6 +8,7 @@ from __future__ import annotations
|
||||
|
||||
import fnmatch
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from contextlib import suppress
|
||||
@@ -20,6 +21,25 @@ from langchain_core.tools import tool
|
||||
from langchain.agents.middleware.types import AgentMiddleware, AgentState, ContextT, ResponseT
|
||||
|
||||
|
||||
def _is_within_root(candidate: Path, root: Path) -> bool:
|
||||
"""Return True iff `candidate` resolves to a path inside `root` (symlinks resolved).
|
||||
|
||||
Args:
|
||||
candidate: The path to check. It is resolved (following symlinks and
|
||||
normalizing `..`) before the containment check.
|
||||
root: The allowed root directory. Also resolved before comparison.
|
||||
|
||||
Returns:
|
||||
`True` if the fully resolved `candidate` lies inside the resolved `root`,
|
||||
using path-segment boundaries; `False` otherwise (including on resolution
|
||||
errors).
|
||||
"""
|
||||
try:
|
||||
return candidate.resolve().is_relative_to(root.resolve())
|
||||
except (OSError, ValueError):
|
||||
return False
|
||||
|
||||
|
||||
def _expand_include_patterns(pattern: str) -> list[str] | None:
|
||||
"""Expand brace patterns like `*.{py,pyi}` into a list of globs."""
|
||||
if "}" in pattern and "{" not in pattern:
|
||||
@@ -157,10 +177,21 @@ class FilesystemFileSearchMiddleware(AgentMiddleware[AgentState[ResponseT], Cont
|
||||
if not base_full.exists() or not base_full.is_dir():
|
||||
return "No files found"
|
||||
|
||||
# Reject glob patterns that could escape the root before expanding them.
|
||||
# `Path.glob` expands a `..` pattern like "../../etc/passwd" against the
|
||||
# base directory and yields paths outside the allowed root, and raises
|
||||
# for absolute patterns. (A `~` pattern is never expanduser'd by glob,
|
||||
# and any escape that slips through is dropped by the per-match
|
||||
# containment check below.)
|
||||
if pattern.startswith("/") or any(part == ".." for part in pattern.split("/")):
|
||||
return "No files found"
|
||||
|
||||
# Use pathlib glob
|
||||
matching: list[tuple[str, str]] = []
|
||||
for match in base_full.glob(pattern):
|
||||
if match.is_file():
|
||||
# Re-check containment after resolving so an in-root symlink that
|
||||
# points outside the root is never enumerated.
|
||||
if match.is_file() and _is_within_root(match, self.root_path):
|
||||
# Convert to virtual path
|
||||
virtual_path = "/" + str(match.relative_to(self.root_path))
|
||||
stat = match.stat()
|
||||
@@ -297,6 +328,10 @@ class FilesystemFileSearchMiddleware(AgentMiddleware[AgentState[ResponseT], Cont
|
||||
data = json.loads(line)
|
||||
if data["type"] == "match":
|
||||
path = data["data"]["path"]["text"]
|
||||
# Defense in depth: drop any result whose resolved path lies
|
||||
# outside the root (e.g. surfaced via an in-root symlink).
|
||||
if not _is_within_root(Path(path), self.root_path):
|
||||
continue
|
||||
# Convert to virtual path
|
||||
virtual_path = "/" + str(Path(path).relative_to(self.root_path))
|
||||
line_num = data["data"]["line_number"]
|
||||
@@ -325,31 +360,40 @@ class FilesystemFileSearchMiddleware(AgentMiddleware[AgentState[ResponseT], Cont
|
||||
regex = re.compile(pattern)
|
||||
results: dict[str, list[tuple[int, str]]] = {}
|
||||
|
||||
# Walk directory tree
|
||||
for file_path in base_full.rglob("*"):
|
||||
if not file_path.is_file():
|
||||
continue
|
||||
# Walk directory tree without following symlinked directories so traversal
|
||||
# cannot leave the root via a symlinked subdirectory.
|
||||
for walk_root, _dirs, files in os.walk(base_full, followlinks=False):
|
||||
for name in files:
|
||||
file_path = Path(walk_root) / name
|
||||
|
||||
# Check include filter
|
||||
if include and not _match_include_pattern(file_path.name, include):
|
||||
continue
|
||||
# Re-check containment after resolving so an in-root symlinked file
|
||||
# pointing outside the root is never read.
|
||||
if not _is_within_root(file_path, self.root_path):
|
||||
continue
|
||||
|
||||
# Skip files that are too large
|
||||
if file_path.stat().st_size > self.max_file_size_bytes:
|
||||
continue
|
||||
if not file_path.is_file():
|
||||
continue
|
||||
|
||||
try:
|
||||
content = file_path.read_text()
|
||||
except (UnicodeDecodeError, PermissionError):
|
||||
continue
|
||||
# Check include filter
|
||||
if include and not _match_include_pattern(file_path.name, include):
|
||||
continue
|
||||
|
||||
# Search content
|
||||
for line_num, line in enumerate(content.splitlines(), 1):
|
||||
if regex.search(line):
|
||||
virtual_path = "/" + str(file_path.relative_to(self.root_path))
|
||||
if virtual_path not in results:
|
||||
results[virtual_path] = []
|
||||
results[virtual_path].append((line_num, line))
|
||||
# Skip files that are too large
|
||||
if file_path.stat().st_size > self.max_file_size_bytes:
|
||||
continue
|
||||
|
||||
try:
|
||||
content = file_path.read_text()
|
||||
except (UnicodeDecodeError, PermissionError):
|
||||
continue
|
||||
|
||||
# Search content
|
||||
for line_num, line in enumerate(content.splitlines(), 1):
|
||||
if regex.search(line):
|
||||
virtual_path = "/" + str(file_path.relative_to(self.root_path))
|
||||
if virtual_path not in results:
|
||||
results[virtual_path] = []
|
||||
results[virtual_path].append((line_num, line))
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ from langchain.agents.middleware.file_search import (
|
||||
FilesystemFileSearchMiddleware,
|
||||
_expand_include_patterns,
|
||||
_is_valid_include_pattern,
|
||||
_is_within_root,
|
||||
_match_include_pattern,
|
||||
)
|
||||
|
||||
@@ -325,6 +326,175 @@ class TestPathTraversalSecurity:
|
||||
assert "secret" not in result
|
||||
|
||||
|
||||
class TestGlobPatternTraversalSecurity:
|
||||
"""Security tests for the glob `pattern` argument (not just the base `path`)."""
|
||||
|
||||
def test_glob_pattern_with_double_dots_rejected(self, tmp_path: Path) -> None:
|
||||
"""A glob pattern containing `..` must not escape the root."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
(root / "inside.txt").write_text("inside", encoding="utf-8")
|
||||
# Secret lives outside the root.
|
||||
(tmp_path / "passwd").write_text("secret", encoding="utf-8")
|
||||
|
||||
middleware = FilesystemFileSearchMiddleware(root_path=str(root))
|
||||
|
||||
assert isinstance(middleware.glob_search, StructuredTool)
|
||||
assert middleware.glob_search.func is not None
|
||||
result = middleware.glob_search.func(pattern="../passwd")
|
||||
|
||||
assert result == "No files found"
|
||||
assert "secret" not in result
|
||||
assert "passwd" not in result
|
||||
|
||||
def test_glob_pattern_absolute_rejected(self, tmp_path: Path) -> None:
|
||||
"""A glob pattern that is an absolute path must be rejected."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
(root / "inside.txt").write_text("inside", encoding="utf-8")
|
||||
secret = tmp_path / "secret.txt"
|
||||
secret.write_text("secret", encoding="utf-8")
|
||||
|
||||
middleware = FilesystemFileSearchMiddleware(root_path=str(root))
|
||||
|
||||
assert isinstance(middleware.glob_search, StructuredTool)
|
||||
assert middleware.glob_search.func is not None
|
||||
result = middleware.glob_search.func(pattern=str(secret))
|
||||
|
||||
assert result == "No files found"
|
||||
|
||||
def test_glob_pattern_tilde_rejected(self, tmp_path: Path) -> None:
|
||||
"""A glob pattern containing `~` must be rejected."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
(root / "inside.txt").write_text("inside", encoding="utf-8")
|
||||
|
||||
middleware = FilesystemFileSearchMiddleware(root_path=str(root))
|
||||
|
||||
assert isinstance(middleware.glob_search, StructuredTool)
|
||||
assert middleware.glob_search.func is not None
|
||||
result = middleware.glob_search.func(pattern="~/secret")
|
||||
|
||||
assert result == "No files found"
|
||||
|
||||
def test_glob_normal_pattern_still_works(self, tmp_path: Path) -> None:
|
||||
"""A normal in-root pattern must still return expected results."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
(root / "keep.py").write_text("content", encoding="utf-8")
|
||||
(root / "skip.txt").write_text("content", encoding="utf-8")
|
||||
|
||||
middleware = FilesystemFileSearchMiddleware(root_path=str(root))
|
||||
|
||||
assert isinstance(middleware.glob_search, StructuredTool)
|
||||
assert middleware.glob_search.func is not None
|
||||
result = middleware.glob_search.func(pattern="*.py")
|
||||
|
||||
assert "/keep.py" in result
|
||||
assert "/skip.txt" not in result
|
||||
|
||||
def test_glob_does_not_follow_inside_symlink_to_outside(self, tmp_path: Path) -> None:
|
||||
"""A symlink inside the root pointing outside must not be enumerated by glob."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
outside = tmp_path / "outside.txt"
|
||||
outside.write_text("secret-outside", encoding="utf-8")
|
||||
|
||||
try:
|
||||
(root / "link.txt").symlink_to(outside)
|
||||
except OSError:
|
||||
pytest.skip("Symlink creation not supported")
|
||||
|
||||
middleware = FilesystemFileSearchMiddleware(root_path=str(root))
|
||||
|
||||
assert isinstance(middleware.glob_search, StructuredTool)
|
||||
assert middleware.glob_search.func is not None
|
||||
result = middleware.glob_search.func(pattern="*.txt")
|
||||
|
||||
# The symlinked entry resolves outside root, so it must be excluded.
|
||||
assert "/link.txt" not in result
|
||||
assert result == "No files found"
|
||||
|
||||
|
||||
class TestGrepSymlinkContainment:
|
||||
"""Security tests that grep does not surface out-of-root files via symlinks."""
|
||||
|
||||
def test_python_grep_does_not_read_inside_symlink_to_outside(self, tmp_path: Path) -> None:
|
||||
"""Python-fallback grep must not read a symlink inside root pointing outside."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
(root / "inside.txt").write_text("nothing-here\n", encoding="utf-8")
|
||||
outside = tmp_path / "outside.txt"
|
||||
outside.write_text("TOPSECRET\n", encoding="utf-8")
|
||||
|
||||
try:
|
||||
(root / "link.txt").symlink_to(outside)
|
||||
except OSError:
|
||||
pytest.skip("Symlink creation not supported")
|
||||
|
||||
middleware = FilesystemFileSearchMiddleware(root_path=str(root), use_ripgrep=False)
|
||||
|
||||
assert isinstance(middleware.grep_search, StructuredTool)
|
||||
assert middleware.grep_search.func is not None
|
||||
result = middleware.grep_search.func(pattern="TOPSECRET", output_mode="content")
|
||||
|
||||
assert "TOPSECRET" not in result
|
||||
assert "/link.txt" not in result
|
||||
assert result == "No matches found"
|
||||
|
||||
def test_python_grep_normal_search_still_works(self, tmp_path: Path) -> None:
|
||||
"""A normal in-root grep must still return expected results."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
(root / "a.py").write_text("needle here\n", encoding="utf-8")
|
||||
(root / "b.py").write_text("nothing\n", encoding="utf-8")
|
||||
|
||||
middleware = FilesystemFileSearchMiddleware(root_path=str(root), use_ripgrep=False)
|
||||
|
||||
assert isinstance(middleware.grep_search, StructuredTool)
|
||||
assert middleware.grep_search.func is not None
|
||||
result = middleware.grep_search.func(pattern="needle")
|
||||
|
||||
assert "/a.py" in result
|
||||
assert "/b.py" not in result
|
||||
|
||||
|
||||
class TestIsWithinRoot:
|
||||
"""Tests for the private `_is_within_root` helper."""
|
||||
|
||||
def test_in_root_path_is_within(self, tmp_path: Path) -> None:
|
||||
"""A file inside the root is reported as within."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
candidate = root / "file.txt"
|
||||
candidate.write_text("x", encoding="utf-8")
|
||||
|
||||
assert _is_within_root(candidate, root) is True
|
||||
|
||||
def test_outside_path_is_not_within(self, tmp_path: Path) -> None:
|
||||
"""A file outside the root is reported as not within."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
outside = tmp_path / "outside.txt"
|
||||
outside.write_text("x", encoding="utf-8")
|
||||
|
||||
assert _is_within_root(outside, root) is False
|
||||
|
||||
def test_symlink_escaping_root_is_not_within(self, tmp_path: Path) -> None:
|
||||
"""A symlink inside the root that resolves outside is not within."""
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
outside = tmp_path / "outside.txt"
|
||||
outside.write_text("x", encoding="utf-8")
|
||||
try:
|
||||
link = root / "link.txt"
|
||||
link.symlink_to(outside)
|
||||
except OSError:
|
||||
pytest.skip("Symlink creation not supported")
|
||||
|
||||
assert _is_within_root(link, root) is False
|
||||
|
||||
|
||||
class TestExpandIncludePatterns:
|
||||
"""Tests for _expand_include_patterns helper function."""
|
||||
|
||||
|
||||
@@ -93,6 +93,31 @@ class AnthropicToolsState(AgentState):
|
||||
"""Virtual file system for memory tools."""
|
||||
|
||||
|
||||
def _is_within_allowed_prefix(normalized: str, prefixes: Sequence[str]) -> bool:
|
||||
"""Check whether a normalized path lies within an allowed prefix directory.
|
||||
|
||||
Uses a segment-boundary comparison rather than a raw string prefix test so
|
||||
that sibling directories sharing a textual prefix cannot escape the allowed
|
||||
directory. For example, with prefix `/memories` the path `/memories2/evil.txt`
|
||||
is rejected because it is not the prefix itself nor a descendant of it.
|
||||
|
||||
Args:
|
||||
normalized: A normalized, forward-slash, absolute-style path.
|
||||
prefixes: Allowed path prefixes to compare against.
|
||||
|
||||
Returns:
|
||||
`True` if `normalized` exactly equals one of the prefix directories or is
|
||||
contained within one of them, `False` otherwise.
|
||||
"""
|
||||
for prefix in prefixes:
|
||||
# Normalize the prefix the same way the path was normalized so the
|
||||
# comparison is consistent (drop any trailing slash for the boundary).
|
||||
prefix_dir = prefix.rstrip("/")
|
||||
if normalized == prefix_dir or normalized.startswith(f"{prefix_dir}/"):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) -> str:
|
||||
"""Validate and normalize file path for security.
|
||||
|
||||
@@ -122,8 +147,8 @@ def _validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None)
|
||||
normalized = f"/{normalized}"
|
||||
|
||||
# Check allowed prefixes if specified
|
||||
if allowed_prefixes is not None and not any(
|
||||
normalized.startswith(prefix) for prefix in allowed_prefixes
|
||||
if allowed_prefixes is not None and not _is_within_allowed_prefix(
|
||||
normalized, allowed_prefixes
|
||||
):
|
||||
msg = f"Path must start with one of {allowed_prefixes}: {path}"
|
||||
raise ValueError(msg)
|
||||
@@ -847,14 +872,11 @@ class _FilesystemClaudeFileToolMiddleware(AgentMiddleware):
|
||||
|
||||
# Check allowed prefixes
|
||||
virtual_path = "/" + str(full_path.relative_to(self.root_path))
|
||||
if self.allowed_prefixes:
|
||||
allowed = any(
|
||||
virtual_path.startswith(prefix) or virtual_path == prefix.rstrip("/")
|
||||
for prefix in self.allowed_prefixes
|
||||
)
|
||||
if not allowed:
|
||||
msg = f"Path must start with one of: {self.allowed_prefixes}"
|
||||
raise ValueError(msg)
|
||||
if self.allowed_prefixes and not _is_within_allowed_prefix(
|
||||
virtual_path, self.allowed_prefixes
|
||||
):
|
||||
msg = f"Path must start with one of: {self.allowed_prefixes}"
|
||||
raise ValueError(msg)
|
||||
|
||||
return full_path
|
||||
|
||||
|
||||
@@ -62,6 +62,33 @@ class TestPathValidation:
|
||||
with pytest.raises(ValueError, match="Path must start with"):
|
||||
_validate_path("/other/notes.txt", allowed_prefixes=["/memories"])
|
||||
|
||||
def test_prefix_boundary_bypass_blocked(self) -> None:
|
||||
"""Test that sibling directories sharing a textual prefix are rejected.
|
||||
|
||||
Regression test: a raw `str.startswith` check is not segment-aware, so a
|
||||
path like `/memories2/evil.txt` would slip past a `/memories` prefix even
|
||||
though it lives outside the intended directory.
|
||||
"""
|
||||
# Sibling directory sharing the textual prefix must be rejected.
|
||||
with pytest.raises(ValueError, match="Path must start with"):
|
||||
_validate_path("/memories2/evil.txt", allowed_prefixes=["/memories"])
|
||||
|
||||
# A genuine descendant of the prefix is accepted.
|
||||
assert (
|
||||
_validate_path("/memories/ok.txt", allowed_prefixes=["/memories"])
|
||||
== "/memories/ok.txt"
|
||||
)
|
||||
|
||||
# The prefix directory itself is accepted (exact match).
|
||||
assert (
|
||||
_validate_path("/memories", allowed_prefixes=["/memories"]) == "/memories"
|
||||
)
|
||||
|
||||
def test_prefix_boundary_traversal_still_blocked(self) -> None:
|
||||
"""Test that traversal within an allowed prefix is still rejected."""
|
||||
with pytest.raises(ValueError, match="Path traversal not allowed"):
|
||||
_validate_path("/memories/../escape", allowed_prefixes=["/memories"])
|
||||
|
||||
|
||||
class TestTextEditorMiddleware:
|
||||
"""Test text editor middleware functionality."""
|
||||
|
||||
Reference in New Issue
Block a user