Compare commits

...

16 Commits

Author SHA1 Message Date
Nick Huang
ecbd43b660 Change how namespace is built 2025-10-13 14:45:05 -04:00
Nick Huang
918e2ee567 Update default model 2025-10-10 17:22:22 -04:00
Nick Huang
bcbf2fff5f Fix import in test 2025-10-10 16:05:00 -04:00
Nick Huang
206fa156ac Add dependencies to tests 2025-10-10 16:01:25 -04:00
Nick Huang
1713a149e4 Address comments 2025-10-10 15:45:50 -04:00
Nick Huang
f682a9e0a4 Fix and simplify filesystem substantially - add integration tests for filesystem 2025-10-10 14:48:38 -04:00
Nick Huang
c7a52e8c78 Fix formatting 2025-10-09 23:31:54 -04:00
Nick Huang
9cab0be047 Update imports 2025-10-09 23:16:06 -04:00
Nick Huang
7340cb5353 Share code between anthropic tools and filesystem, add deepagents 2025-10-09 22:09:19 -04:00
Nick Huang
7eedbf2a40 Merge remote-tracking branch 'origin/nc/9oct/file-tools-middleware' into nh/subagent-middleware 2025-10-09 17:31:14 -04:00
Nick Huang
203323d249 Remove claude file 2025-10-09 17:22:39 -04:00
Nick Huang
290e83863f resolve merge conflicts 2025-10-09 17:20:56 -04:00
Nick Huang
6b6057ff1d Resolve merge conflicts 2025-10-09 17:17:20 -04:00
Nick Huang
544a755887 Fix linting 2025-10-09 17:14:40 -04:00
Nick Huang
89d0fff26f Add subagents middleware 2025-10-09 16:42:51 -04:00
Nuno Campos
bd5ea78bfd feat(langchain_v1): Add Anthropic tools middleware with text editor, memory, and file
search

Middleware Classes

Text Editor Tools
- StateClaudeTextEditorToolMiddleware: In-memory text editor using agent state
- FilesystemClaudeTextEditorToolMiddleware: Text editor operating on real filesystem

Implementing Claude's text editor tools
https://docs.claude.com/en/docs/agents-and-tools/tool-use/text-editor-tool Operations:
view, create, str_replace, insert

Memory Tools
- StateClaudeMemoryToolMiddleware: Memory persistence in agent state
- FilesystemClaudeMemoryToolMiddleware: Memory persistence on filesystem

Implementing Claude's memory tools
https://docs.claude.com/en/docs/agents-and-tools/tool-use/memory-tool Operations: Same
as text editor plus delete and rename

File Search Tools
- StateFileSearchMiddleware: Search state-based files
- FilesystemFileSearchMiddleware: Search real filesystem

Provides Glob and Grep tools with same schema as used by Claude Code (but compatible
with any model)
- Glob: Pattern matching (e.g., **/*.py, src/**/*.ts), sorted by modification time
- Grep: Regex content search with output modes (files_with_matches, content, count)

Usage

``` from langchain.agents import create_agent from langchain.agents.middleware import (
StateTextEditorToolMiddleware, StateFileSearchMiddleware, )

agent = create_agent( model=model, tools=[], middleware=[
StateTextEditorToolMiddleware(), StateFileSearchMiddleware(), ], ) ```
2025-10-09 12:58:08 +01:00
18 changed files with 4944 additions and 2 deletions

View File

@@ -0,0 +1,283 @@
"""Shared utility functions for file operations in middleware."""
from __future__ import annotations
import os
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Literal
from typing_extensions import TypedDict
if TYPE_CHECKING:
from collections.abc import Sequence
class FileData(TypedDict):
"""Data structure for storing file contents with metadata."""
content: list[str]
"""Lines of the file."""
created_at: str
"""ISO 8601 timestamp of file creation."""
modified_at: str
"""ISO 8601 timestamp of last modification."""
def file_data_reducer(
left: dict[str, FileData] | None, right: dict[str, FileData | None]
) -> dict[str, FileData]:
"""Custom reducer that merges file updates.
Args:
left: Existing files dict.
right: New files dict to merge (None values delete files).
Returns:
Merged dict where right overwrites left for matching keys.
"""
if left is None:
# Filter out None values when initializing
return {k: v for k, v in right.items() if v is not None}
# Merge, filtering out None values (deletions)
result = {**left}
for k, v in right.items():
if v is None:
result.pop(k, None)
else:
result[k] = v
return result
def validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) -> str:
"""Validate and normalize file path for security.
Args:
path: The path to validate.
allowed_prefixes: Optional list of allowed path prefixes.
Returns:
Normalized canonical path.
Raises:
ValueError: If path contains traversal sequences or violates prefix rules.
"""
# Reject paths with traversal attempts
if ".." in path or path.startswith("~"):
msg = f"Path traversal not allowed: {path}"
raise ValueError(msg)
# Normalize path (resolve ., //, etc.)
normalized = os.path.normpath(path)
# Convert to forward slashes for consistency
normalized = normalized.replace("\\", "/")
# Ensure path starts with /
if not normalized.startswith("/"):
normalized = f"/{normalized}"
# Check allowed prefixes if specified
if allowed_prefixes is not None and not any(
normalized.startswith(prefix) for prefix in allowed_prefixes
):
msg = f"Path must start with one of {allowed_prefixes}: {path}"
raise ValueError(msg)
return normalized
def format_content_with_line_numbers(
content: str | list[str],
*,
format_style: Literal["pipe", "tab"] = "pipe",
start_line: int = 1,
) -> str:
r"""Format file content with line numbers.
Args:
content: File content as string or list of lines.
format_style: "pipe" for "1|content" or "tab" for " 1\tcontent".
start_line: Starting line number.
Returns:
Formatted content with line numbers.
"""
if isinstance(content, str):
lines = content.split("\n")
# Remove trailing empty line from split
if lines and lines[-1] == "":
lines = lines[:-1]
else:
lines = content
if format_style == "pipe":
return "\n".join(f"{i + start_line}|{line}" for i, line in enumerate(lines))
return "\n".join(f"{i + start_line:6d}\t{line[:2000]}" for i, line in enumerate(lines))
def apply_string_replacement(
content: str,
old_string: str,
new_string: str,
*,
replace_all: bool = False,
) -> tuple[str, int]:
"""Apply string replacement to content.
Args:
content: Original content.
old_string: String to replace.
new_string: Replacement string.
replace_all: If True, replace all occurrences. Otherwise, replace first.
Returns:
Tuple of (new_content, replacement_count).
"""
if replace_all:
count = content.count(old_string)
new_content = content.replace(old_string, new_string)
else:
count = 1
new_content = content.replace(old_string, new_string, 1)
return new_content, count
def create_file_data(
content: str | list[str],
*,
created_at: str | None = None,
) -> FileData:
"""Create a FileData object from content.
Args:
content: File content as string or list of lines.
created_at: Optional creation timestamp. If None, uses current time.
Returns:
FileData object.
"""
lines = content.split("\n") if isinstance(content, str) else content
now = datetime.now(timezone.utc).isoformat()
return {
"content": lines,
"created_at": created_at or now,
"modified_at": now,
}
def update_file_data(
file_data: FileData,
content: str | list[str],
) -> FileData:
"""Update a FileData object with new content.
Args:
file_data: Existing FileData object.
content: New file content as string or list of lines.
Returns:
Updated FileData object with new modified_at timestamp.
"""
lines = content.split("\n") if isinstance(content, str) else content
now = datetime.now(timezone.utc).isoformat()
return {
"content": lines,
"created_at": file_data["created_at"],
"modified_at": now,
}
def file_data_to_string(file_data: FileData) -> str:
"""Convert FileData to plain string content.
Args:
file_data: FileData object.
Returns:
File content as string.
"""
return "\n".join(file_data["content"])
def list_directory(files: dict[str, FileData], path: str) -> list[str]:
"""List files in a directory.
Args:
files: Files dict mapping paths to FileData.
path: Normalized directory path.
Returns:
Sorted list of file paths in the directory.
"""
# Ensure path ends with / for directory matching
dir_path = path if path.endswith("/") else f"{path}/"
matching_files = []
for file_path in files:
if file_path.startswith(dir_path):
# Get relative path from directory
relative = file_path[len(dir_path) :]
# Only include direct children (no subdirectories)
if "/" not in relative:
matching_files.append(file_path)
return sorted(matching_files)
def check_empty_content(content: str) -> str | None:
"""Check if file content is empty and return warning message.
Args:
content: File content.
Returns:
Warning message if empty, None otherwise.
"""
if not content or content.strip() == "":
return "System reminder: File exists but has empty contents"
return None
def has_memories_prefix(file_path: str) -> bool:
"""Check if file path has the memories prefix.
Args:
file_path: File path.
Returns:
True if file path has the memories prefix, False otherwise.
"""
return file_path.startswith("/memories/")
def append_memories_prefix(file_path: str) -> str:
"""Append the memories prefix to a file path.
Args:
file_path: File path.
Returns:
File path with the memories prefix.
"""
return f"/memories{file_path}"
def strip_memories_prefix(file_path: str) -> str:
"""Strip the memories prefix from a file path.
Args:
file_path: File path.
Returns:
File path without the memories prefix.
"""
return file_path.replace("/memories", "")

View File

@@ -0,0 +1,137 @@
"""Deepagents come with planning, filesystem, and subagents."""
from collections.abc import Callable, Sequence
from typing import Any
from langchain_anthropic import ChatAnthropic
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from langgraph.cache.base import BaseCache
from langgraph.graph.state import CompiledStateGraph
from langgraph.store.base import BaseStore
from langgraph.types import Checkpointer
from langchain.agents.factory import create_agent
from langchain.agents.middleware.filesystem import FilesystemMiddleware
from langchain.agents.middleware.human_in_the_loop import HumanInTheLoopMiddleware, ToolConfig
from langchain.agents.middleware.planning import PlanningMiddleware
from langchain.agents.middleware.prompt_caching import AnthropicPromptCachingMiddleware
from langchain.agents.middleware.subagents import (
CompiledSubAgent,
SubAgent,
SubAgentMiddleware,
)
from langchain.agents.middleware.summarization import SummarizationMiddleware
from langchain.agents.middleware.types import AgentMiddleware
BASE_AGENT_PROMPT = "In order to complete the objective that the user asks of you, you have access to a number of standard tools." # noqa: E501
def get_default_model() -> ChatAnthropic:
"""Get the default model for deep agents.
Returns:
ChatAnthropic instance configured with Claude Sonnet 4.
"""
return ChatAnthropic(
model_name="claude-sonnet-4-20250514",
timeout=None,
stop=None,
max_tokens=64000,
)
def create_deep_agent(
model: str | BaseChatModel | None = None,
tools: Sequence[BaseTool | Callable | dict[str, Any]] | None = None,
*,
system_prompt: str | None = None,
middleware: Sequence[AgentMiddleware] = (),
subagents: list[SubAgent | CompiledSubAgent] | None = None,
context_schema: type[Any] | None = None,
checkpointer: Checkpointer | None = None,
store: BaseStore | None = None,
use_longterm_memory: bool = False,
tool_configs: dict[str, bool | ToolConfig] | None = None,
is_async: bool = False,
debug: bool = False,
name: str | None = None,
cache: BaseCache | None = None,
) -> CompiledStateGraph:
"""Create a deep agent.
This agent will by default have access to a tool to write todos (write_todos),
four file editing tools: write_file, ls, read_file, edit_file, and a tool to call
subagents.
Args:
tools: The tools the agent should have access to.
system_prompt: The additional instructions the agent should have. Will go in
the system prompt.
middleware: Additional middleware to apply after standard middleware.
model: The model to use.
subagents: The subagents to use. Each subagent should be a dictionary with the
following keys:
- `name`
- `description` (used by the main agent to decide whether to call the
sub agent)
- `prompt` (used as the system prompt in the subagent)
- (optional) `tools`
- (optional) `model` (either a LanguageModelLike instance or dict
settings)
- (optional) `middleware` (list of AgentMiddleware)
context_schema: The schema of the deep agent.
checkpointer: Optional checkpointer for persisting agent state between runs.
store: Optional store for persisting longterm memories.
use_longterm_memory: Whether to use longterm memory - you must provide a store
in order to use longterm memory.
tool_configs: Optional Dict[str, HumanInTheLoopConfig] mapping tool names to
interrupt configs.
is_async: Whether to use async mode. If True, the agent will use async tools.
debug: Whether to enable debug mode. Passed through to create_agent.
name: The name of the agent. Passed through to create_agent.
cache: The cache to use for the agent. Passed through to create_agent.
Returns:
A configured deep agent.
"""
if model is None:
model = get_default_model()
deepagent_middleware = [
PlanningMiddleware(),
FilesystemMiddleware(
use_longterm_memory=use_longterm_memory,
),
SubAgentMiddleware(
default_subagent_tools=tools,
default_subagent_model=model,
subagents=subagents if subagents is not None else [],
is_async=is_async,
),
SummarizationMiddleware(
model=model,
max_tokens_before_summary=120000,
messages_to_keep=20,
),
AnthropicPromptCachingMiddleware(ttl="5m", unsupported_model_behavior="ignore"),
]
if tool_configs is not None:
deepagent_middleware.append(HumanInTheLoopMiddleware(interrupt_on=tool_configs))
if middleware is not None:
deepagent_middleware.extend(middleware)
return create_agent(
model,
system_prompt=system_prompt + "\n\n" + BASE_AGENT_PROMPT
if system_prompt
else BASE_AGENT_PROMPT,
tools=tools,
middleware=deepagent_middleware,
context_schema=context_schema,
checkpointer=checkpointer,
store=store,
debug=debug,
name=name,
cache=cache,
)

View File

@@ -1,15 +1,26 @@
"""Middleware plugins for agents."""
from .anthropic_tools import (
AnthropicToolsState,
FileData,
FilesystemClaudeMemoryMiddleware,
FilesystemClaudeTextEditorMiddleware,
StateClaudeMemoryMiddleware,
StateClaudeTextEditorMiddleware,
)
from .context_editing import (
ClearToolUsesEdit,
ContextEditingMiddleware,
)
from .file_search import FilesystemFileSearchMiddleware, StateFileSearchMiddleware
from .filesystem import FilesystemMiddleware
from .human_in_the_loop import HumanInTheLoopMiddleware
from .model_call_limit import ModelCallLimitMiddleware
from .model_fallback import ModelFallbackMiddleware
from .pii import PIIDetectionError, PIIMiddleware
from .planning import PlanningMiddleware
from .prompt_caching import AnthropicPromptCachingMiddleware
from .subagents import SubAgentMiddleware
from .summarization import SummarizationMiddleware
from .tool_call_limit import ToolCallLimitMiddleware
from .tool_selection import LLMToolSelectorMiddleware
@@ -31,8 +42,14 @@ __all__ = [
"AgentState",
# should move to langchain-anthropic if we decide to keep it
"AnthropicPromptCachingMiddleware",
"AnthropicToolsState",
"ClearToolUsesEdit",
"ContextEditingMiddleware",
"FileData",
"FilesystemClaudeMemoryMiddleware",
"FilesystemClaudeTextEditorMiddleware",
"FilesystemFileSearchMiddleware",
"FilesystemMiddleware",
"HumanInTheLoopMiddleware",
"LLMToolSelectorMiddleware",
"ModelCallLimitMiddleware",
@@ -41,6 +58,10 @@ __all__ = [
"PIIDetectionError",
"PIIMiddleware",
"PlanningMiddleware",
"StateClaudeMemoryMiddleware",
"StateClaudeTextEditorMiddleware",
"StateFileSearchMiddleware",
"SubAgentMiddleware",
"SummarizationMiddleware",
"ToolCallLimitMiddleware",
"after_agent",

View File

@@ -0,0 +1,910 @@
"""Anthropic text editor and memory tool middleware.
This module provides client-side implementations of Anthropic's text editor and
memory tools using schema-less tool definitions and tool call interception.
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, cast
from langchain_core.messages import AIMessage, ToolMessage
from langgraph.types import Command
from typing_extensions import NotRequired
from langchain.agents._internal.file_utils import (
FileData,
apply_string_replacement,
create_file_data,
file_data_reducer,
file_data_to_string,
format_content_with_line_numbers,
list_directory,
update_file_data,
validate_path,
)
from langchain.agents.middleware.types import AgentMiddleware, AgentState, ModelRequest
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from langchain.tools.tool_node import ToolCallRequest
# Tool type constants
TEXT_EDITOR_TOOL_TYPE = "text_editor_20250728"
TEXT_EDITOR_TOOL_NAME = "str_replace_based_edit_tool"
MEMORY_TOOL_TYPE = "memory_20250818"
MEMORY_TOOL_NAME = "memory"
MEMORY_SYSTEM_PROMPT = """IMPORTANT: ALWAYS VIEW YOUR MEMORY DIRECTORY BEFORE \
DOING ANYTHING ELSE.
MEMORY PROTOCOL:
1. Use the `view` command of your `memory` tool to check for earlier progress.
2. ... (work on the task) ...
- As you make progress, record status / progress / thoughts etc in your memory.
ASSUME INTERRUPTION: Your context window might be reset at any moment, so you risk \
losing any progress that is not recorded in your memory directory."""
class AnthropicToolsState(AgentState):
"""State schema for Anthropic text editor and memory tools."""
text_editor_files: NotRequired[Annotated[dict[str, FileData], file_data_reducer]]
"""Virtual file system for text editor tools."""
memory_files: NotRequired[Annotated[dict[str, FileData], file_data_reducer]]
"""Virtual file system for memory tools."""
class _StateClaudeFileToolMiddleware(AgentMiddleware):
"""Base class for state-based file tool middleware (internal)."""
state_schema = AnthropicToolsState
def __init__(
self,
*,
tool_type: str,
tool_name: str,
state_key: str,
allowed_path_prefixes: Sequence[str] | None = None,
system_prompt: str | None = None,
) -> None:
"""Initialize the middleware.
Args:
tool_type: Tool type identifier.
tool_name: Tool name.
state_key: State key for file storage.
allowed_path_prefixes: Optional list of allowed path prefixes.
system_prompt: Optional system prompt to inject.
"""
self.tool_type = tool_type
self.tool_name = tool_name
self.state_key = state_key
self.allowed_prefixes = allowed_path_prefixes
self.system_prompt = system_prompt
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], AIMessage],
) -> AIMessage:
"""Inject tool and optional system prompt."""
# Add tool
tools = list(request.tools or [])
tools.append(
{
"type": self.tool_type,
"name": self.tool_name,
}
)
request.tools = tools
# Inject system prompt if provided
if self.system_prompt:
request.system_prompt = (
request.system_prompt + "\n\n" + self.system_prompt
if request.system_prompt
else self.system_prompt
)
return handler(request)
def wrap_tool_call(
self, request: ToolCallRequest, handler: Callable[[ToolCallRequest], ToolMessage | Command]
) -> ToolMessage | Command:
"""Intercept tool calls."""
tool_call = request.tool_call
tool_name = tool_call.get("name")
if tool_name != self.tool_name:
return handler(request)
# Handle tool call
try:
args = tool_call.get("args", {})
command = args.get("command")
state = request.state
if command == "view":
return self._handle_view(args, state, tool_call["id"])
if command == "create":
return self._handle_create(args, state, tool_call["id"])
if command == "str_replace":
return self._handle_str_replace(args, state, tool_call["id"])
if command == "insert":
return self._handle_insert(args, state, tool_call["id"])
if command == "delete":
return self._handle_delete(args, state, tool_call["id"])
if command == "rename":
return self._handle_rename(args, state, tool_call["id"])
msg = f"Unknown command: {command}"
return ToolMessage(
content=msg,
tool_call_id=tool_call["id"],
name=tool_name,
status="error",
)
except (ValueError, FileNotFoundError) as e:
return ToolMessage(
content=str(e),
tool_call_id=tool_call["id"],
name=tool_name,
status="error",
)
def _handle_view(
self, args: dict, state: AnthropicToolsState, tool_call_id: str | None
) -> Command:
"""Handle view command."""
path = args["path"]
normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes)
files = cast("dict[str, Any]", state.get(self.state_key, {}))
file_data = files.get(normalized_path)
if file_data is None:
# Try directory listing
matching = list_directory(files, normalized_path)
if matching:
content = "\n".join(matching)
return Command(
update={
"messages": [
ToolMessage(
content=content,
tool_call_id=tool_call_id,
name=self.tool_name,
)
]
}
)
msg = f"File not found: {path}"
raise FileNotFoundError(msg)
# Format file content with line numbers
content = format_content_with_line_numbers(file_data["content"], format_style="pipe")
return Command(
update={
"messages": [
ToolMessage(
content=content,
tool_call_id=tool_call_id,
name=self.tool_name,
)
]
}
)
def _handle_create(
self, args: dict, state: AnthropicToolsState, tool_call_id: str | None
) -> Command:
"""Handle create command."""
path = args["path"]
file_text = args["file_text"]
normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes)
# Get existing files
files = cast("dict[str, Any]", state.get(self.state_key, {}))
existing = files.get(normalized_path)
# Create or update file data
if existing:
new_file_data = update_file_data(existing, file_text)
else:
new_file_data = create_file_data(file_text)
return Command(
update={
self.state_key: {
normalized_path: new_file_data,
},
"messages": [
ToolMessage(
content=f"File created: {path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
],
}
)
def _handle_str_replace(
self, args: dict, state: AnthropicToolsState, tool_call_id: str | None
) -> Command:
"""Handle str_replace command."""
path = args["path"]
old_str = args["old_str"]
new_str = args.get("new_str", "")
normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes)
# Read file
files = cast("dict[str, Any]", state.get(self.state_key, {}))
file_data = files.get(normalized_path)
if file_data is None:
msg = f"File not found: {path}"
raise FileNotFoundError(msg)
content = file_data_to_string(file_data)
# Replace string
if old_str not in content:
msg = f"String not found in file: {old_str}"
raise ValueError(msg)
new_content, _ = apply_string_replacement(content, old_str, new_str, replace_all=False)
# Update file
new_file_data = update_file_data(file_data, new_content)
return Command(
update={
self.state_key: {
normalized_path: new_file_data,
},
"messages": [
ToolMessage(
content=f"String replaced in {path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
],
}
)
def _handle_insert(
self, args: dict, state: AnthropicToolsState, tool_call_id: str | None
) -> Command:
"""Handle insert command."""
path = args["path"]
insert_line = args["insert_line"]
text_to_insert = args["new_str"]
normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes)
# Read file
files = cast("dict[str, Any]", state.get(self.state_key, {}))
file_data = files.get(normalized_path)
if file_data is None:
msg = f"File not found: {path}"
raise FileNotFoundError(msg)
lines_content = file_data["content"]
new_lines = text_to_insert.split("\n")
# Insert after insert_line (0-indexed)
updated_lines = lines_content[:insert_line] + new_lines + lines_content[insert_line:]
# Update file
new_file_data = update_file_data(file_data, updated_lines)
return Command(
update={
self.state_key: {
normalized_path: new_file_data,
},
"messages": [
ToolMessage(
content=f"Text inserted in {path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
],
}
)
def _handle_delete(
self,
args: dict,
state: AnthropicToolsState, # noqa: ARG002
tool_call_id: str | None,
) -> Command:
"""Handle delete command."""
path = args["path"]
normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes)
return Command(
update={
self.state_key: {normalized_path: None},
"messages": [
ToolMessage(
content=f"File deleted: {path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
],
}
)
def _handle_rename(
self, args: dict, state: AnthropicToolsState, tool_call_id: str | None
) -> Command:
"""Handle rename command."""
old_path = args["old_path"]
new_path = args["new_path"]
normalized_old = validate_path(old_path, allowed_prefixes=self.allowed_prefixes)
normalized_new = validate_path(new_path, allowed_prefixes=self.allowed_prefixes)
# Read file
files = cast("dict[str, Any]", state.get(self.state_key, {}))
file_data = files.get(normalized_old)
if file_data is None:
msg = f"File not found: {old_path}"
raise ValueError(msg)
# Update timestamp
content = file_data["content"]
new_file_data = update_file_data(file_data, content)
return Command(
update={
self.state_key: {
normalized_old: None,
normalized_new: new_file_data,
},
"messages": [
ToolMessage(
content=f"File renamed: {old_path} -> {new_path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
],
}
)
class StateClaudeTextEditorMiddleware(_StateClaudeFileToolMiddleware):
"""State-based text editor tool middleware.
Provides Anthropic's text_editor tool using LangGraph state for storage.
Files persist for the conversation thread.
Example:
```python
from langchain.agents import create_agent
from langchain.agents.middleware import StateTextEditorToolMiddleware
agent = create_agent(
model=model,
tools=[],
middleware=[StateTextEditorToolMiddleware()],
)
```
"""
def __init__(
self,
*,
allowed_path_prefixes: Sequence[str] | None = None,
) -> None:
"""Initialize the text editor middleware.
Args:
allowed_path_prefixes: Optional list of allowed path prefixes.
If specified, only paths starting with these prefixes are allowed.
"""
super().__init__(
tool_type=TEXT_EDITOR_TOOL_TYPE,
tool_name=TEXT_EDITOR_TOOL_NAME,
state_key="text_editor_files",
allowed_path_prefixes=allowed_path_prefixes,
)
class StateClaudeMemoryMiddleware(_StateClaudeFileToolMiddleware):
"""State-based memory tool middleware.
Provides Anthropic's memory tool using LangGraph state for storage.
Files persist for the conversation thread. Enforces /memories prefix
and injects Anthropic's recommended system prompt.
Example:
```python
from langchain.agents import create_agent
from langchain.agents.middleware import StateMemoryToolMiddleware
agent = create_agent(
model=model,
tools=[],
middleware=[StateMemoryToolMiddleware()],
)
```
"""
def __init__(
self,
*,
allowed_path_prefixes: Sequence[str] | None = None,
system_prompt: str = MEMORY_SYSTEM_PROMPT,
) -> None:
"""Initialize the memory middleware.
Args:
allowed_path_prefixes: Optional list of allowed path prefixes.
Defaults to ["/memories"].
system_prompt: System prompt to inject. Defaults to Anthropic's
recommended memory prompt.
"""
super().__init__(
tool_type=MEMORY_TOOL_TYPE,
tool_name=MEMORY_TOOL_NAME,
state_key="memory_files",
allowed_path_prefixes=allowed_path_prefixes or ["/memories"],
system_prompt=system_prompt,
)
class _FilesystemClaudeFileToolMiddleware(AgentMiddleware):
"""Base class for filesystem-based file tool middleware (internal)."""
def __init__(
self,
*,
tool_type: str,
tool_name: str,
root_path: str,
allowed_prefixes: list[str] | None = None,
max_file_size_mb: int = 10,
system_prompt: str | None = None,
) -> None:
"""Initialize the middleware.
Args:
tool_type: Tool type identifier.
tool_name: Tool name.
root_path: Root directory for file operations.
allowed_prefixes: Optional list of allowed virtual path prefixes.
max_file_size_mb: Maximum file size in MB.
system_prompt: Optional system prompt to inject.
"""
self.tool_type = tool_type
self.tool_name = tool_name
self.root_path = Path(root_path).resolve()
self.allowed_prefixes = allowed_prefixes or ["/"]
self.max_file_size_bytes = max_file_size_mb * 1024 * 1024
self.system_prompt = system_prompt
# Create root directory if it doesn't exist
self.root_path.mkdir(parents=True, exist_ok=True)
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], AIMessage],
) -> AIMessage:
"""Inject tool and optional system prompt."""
# Add tool
tools = list(request.tools or [])
tools.append(
{
"type": self.tool_type,
"name": self.tool_name,
}
)
request.tools = tools
# Inject system prompt if provided
if self.system_prompt:
request.system_prompt = (
request.system_prompt + "\n\n" + self.system_prompt
if request.system_prompt
else self.system_prompt
)
return handler(request)
def wrap_tool_call(
self, request: ToolCallRequest, handler: Callable[[ToolCallRequest], ToolMessage | Command]
) -> ToolMessage | Command:
"""Intercept tool calls."""
tool_call = request.tool_call
tool_name = tool_call.get("name")
if tool_name != self.tool_name:
return handler(request)
# Handle tool call
try:
args = tool_call.get("args", {})
command = args.get("command")
if command == "view":
return self._handle_view(args, tool_call["id"])
if command == "create":
return self._handle_create(args, tool_call["id"])
if command == "str_replace":
return self._handle_str_replace(args, tool_call["id"])
if command == "insert":
return self._handle_insert(args, tool_call["id"])
if command == "delete":
return self._handle_delete(args, tool_call["id"])
if command == "rename":
return self._handle_rename(args, tool_call["id"])
msg = f"Unknown command: {command}"
return ToolMessage(
content=msg,
tool_call_id=tool_call["id"],
name=tool_name,
status="error",
)
except (ValueError, FileNotFoundError) as e:
return ToolMessage(
content=str(e),
tool_call_id=tool_call["id"],
name=tool_name,
status="error",
)
def _validate_and_resolve_path(self, path: str) -> Path:
"""Validate and resolve a virtual path to filesystem path.
Args:
path: Virtual path (e.g., /file.txt or /src/main.py).
Returns:
Resolved absolute filesystem path within root_path.
Raises:
ValueError: If path contains traversal attempts, escapes root directory,
or violates allowed_prefixes restrictions.
"""
# Normalize path
if not path.startswith("/"):
path = "/" + path
# Check for path traversal
if ".." in path or "~" in path:
msg = "Path traversal not allowed"
raise ValueError(msg)
# Convert virtual path to filesystem path
# Remove leading / and resolve relative to root
relative = path.lstrip("/")
full_path = (self.root_path / relative).resolve()
# Ensure path is within root
try:
full_path.relative_to(self.root_path)
except ValueError:
msg = f"Path outside root directory: {path}"
raise ValueError(msg) from None
# Check allowed prefixes
virtual_path = "/" + str(full_path.relative_to(self.root_path))
if self.allowed_prefixes:
allowed = any(
virtual_path.startswith(prefix) or virtual_path == prefix.rstrip("/")
for prefix in self.allowed_prefixes
)
if not allowed:
msg = f"Path must start with one of: {self.allowed_prefixes}"
raise ValueError(msg)
return full_path
def _handle_view(self, args: dict, tool_call_id: str | None) -> Command:
"""Handle view command."""
path = args["path"]
full_path = self._validate_and_resolve_path(path)
if not full_path.exists() or not full_path.is_file():
msg = f"File not found: {path}"
raise FileNotFoundError(msg)
# Check file size
if full_path.stat().st_size > self.max_file_size_bytes:
msg = f"File too large: {path} exceeds {self.max_file_size_bytes / 1024 / 1024}MB"
raise ValueError(msg)
# Read file
try:
content = full_path.read_text()
except UnicodeDecodeError as e:
msg = f"Cannot decode file {path}: {e}"
raise ValueError(msg) from e
# Format with line numbers
formatted_content = format_content_with_line_numbers(content, format_style="pipe")
return Command(
update={
"messages": [
ToolMessage(
content=formatted_content,
tool_call_id=tool_call_id,
name=self.tool_name,
)
]
}
)
def _handle_create(self, args: dict, tool_call_id: str | None) -> Command:
"""Handle create command."""
path = args["path"]
file_text = args["file_text"]
full_path = self._validate_and_resolve_path(path)
# Create parent directories
full_path.parent.mkdir(parents=True, exist_ok=True)
# Write file
full_path.write_text(file_text + "\n")
return Command(
update={
"messages": [
ToolMessage(
content=f"File created: {path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
]
}
)
def _handle_str_replace(self, args: dict, tool_call_id: str | None) -> Command:
"""Handle str_replace command."""
path = args["path"]
old_str = args["old_str"]
new_str = args.get("new_str", "")
full_path = self._validate_and_resolve_path(path)
if not full_path.exists():
msg = f"File not found: {path}"
raise FileNotFoundError(msg)
# Read file
content = full_path.read_text()
# Replace string
if old_str not in content:
msg = f"String not found in file: {old_str}"
raise ValueError(msg)
new_content, _ = apply_string_replacement(content, old_str, new_str, replace_all=False)
# Write back
full_path.write_text(new_content)
return Command(
update={
"messages": [
ToolMessage(
content=f"String replaced in {path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
]
}
)
def _handle_insert(self, args: dict, tool_call_id: str | None) -> Command:
"""Handle insert command."""
path = args["path"]
insert_line = args["insert_line"]
text_to_insert = args["new_str"]
full_path = self._validate_and_resolve_path(path)
if not full_path.exists():
msg = f"File not found: {path}"
raise FileNotFoundError(msg)
# Read file
content = full_path.read_text()
lines = content.split("\n")
# Handle trailing newline
if lines and lines[-1] == "":
lines = lines[:-1]
had_trailing_newline = True
else:
had_trailing_newline = False
new_lines = text_to_insert.split("\n")
# Insert after insert_line (0-indexed)
updated_lines = lines[:insert_line] + new_lines + lines[insert_line:]
# Write back
new_content = "\n".join(updated_lines)
if had_trailing_newline:
new_content += "\n"
full_path.write_text(new_content)
return Command(
update={
"messages": [
ToolMessage(
content=f"Text inserted in {path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
]
}
)
def _handle_delete(self, args: dict, tool_call_id: str | None) -> Command:
"""Handle delete command."""
import shutil
path = args["path"]
full_path = self._validate_and_resolve_path(path)
if full_path.is_file():
full_path.unlink()
elif full_path.is_dir():
shutil.rmtree(full_path)
# If doesn't exist, silently succeed
return Command(
update={
"messages": [
ToolMessage(
content=f"File deleted: {path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
]
}
)
def _handle_rename(self, args: dict, tool_call_id: str | None) -> Command:
"""Handle rename command."""
old_path = args["old_path"]
new_path = args["new_path"]
old_full = self._validate_and_resolve_path(old_path)
new_full = self._validate_and_resolve_path(new_path)
if not old_full.exists():
msg = f"File not found: {old_path}"
raise ValueError(msg)
# Create parent directory for new path
new_full.parent.mkdir(parents=True, exist_ok=True)
# Rename
old_full.rename(new_full)
return Command(
update={
"messages": [
ToolMessage(
content=f"File renamed: {old_path} -> {new_path}",
tool_call_id=tool_call_id,
name=self.tool_name,
)
]
}
)
class FilesystemClaudeTextEditorMiddleware(_FilesystemClaudeFileToolMiddleware):
"""Filesystem-based text editor tool middleware.
Provides Anthropic's text_editor tool using local filesystem for storage.
User handles persistence via volumes, git, or other mechanisms.
Example:
```python
from langchain.agents import create_agent
from langchain.agents.middleware import FilesystemTextEditorToolMiddleware
agent = create_agent(
model=model,
tools=[],
middleware=[FilesystemTextEditorToolMiddleware(root_path="/workspace")],
)
```
"""
def __init__(
self,
*,
root_path: str,
allowed_prefixes: list[str] | None = None,
max_file_size_mb: int = 10,
) -> None:
"""Initialize the text editor middleware.
Args:
root_path: Root directory for file operations.
allowed_prefixes: Optional list of allowed virtual path prefixes (default: ["/"]).
max_file_size_mb: Maximum file size in MB (default: 10).
"""
super().__init__(
tool_type=TEXT_EDITOR_TOOL_TYPE,
tool_name=TEXT_EDITOR_TOOL_NAME,
root_path=root_path,
allowed_prefixes=allowed_prefixes,
max_file_size_mb=max_file_size_mb,
)
class FilesystemClaudeMemoryMiddleware(_FilesystemClaudeFileToolMiddleware):
"""Filesystem-based memory tool middleware.
Provides Anthropic's memory tool using local filesystem for storage.
User handles persistence via volumes, git, or other mechanisms.
Enforces /memories prefix and injects Anthropic's recommended system prompt.
Example:
```python
from langchain.agents import create_agent
from langchain.agents.middleware import FilesystemMemoryToolMiddleware
agent = create_agent(
model=model,
tools=[],
middleware=[FilesystemMemoryToolMiddleware(root_path="/workspace")],
)
```
"""
def __init__(
self,
*,
root_path: str,
allowed_prefixes: list[str] | None = None,
max_file_size_mb: int = 10,
system_prompt: str = MEMORY_SYSTEM_PROMPT,
) -> None:
"""Initialize the memory middleware.
Args:
root_path: Root directory for file operations.
allowed_prefixes: Optional list of allowed virtual path prefixes.
Defaults to ["/memories"].
max_file_size_mb: Maximum file size in MB (default: 10).
system_prompt: System prompt to inject. Defaults to Anthropic's
recommended memory prompt.
"""
super().__init__(
tool_type=MEMORY_TOOL_TYPE,
tool_name=MEMORY_TOOL_NAME,
root_path=root_path,
allowed_prefixes=allowed_prefixes or ["/memories"],
max_file_size_mb=max_file_size_mb,
system_prompt=system_prompt,
)
__all__ = [
"AnthropicToolsState",
"FileData",
"FilesystemClaudeMemoryMiddleware",
"FilesystemClaudeTextEditorMiddleware",
"StateClaudeMemoryMiddleware",
"StateClaudeTextEditorMiddleware",
]

View File

@@ -0,0 +1,550 @@
"""File search middleware for Anthropic text editor and memory tools.
This module provides Glob and Grep search tools that operate on files stored
in state or filesystem.
"""
from __future__ import annotations
import fnmatch
import json
import re
import subprocess
from contextlib import suppress
from datetime import datetime, timezone
from pathlib import Path, PurePosixPath
from typing import Annotated, Any, Literal, cast
from langchain_core.tools import InjectedToolArg, tool
from langchain.agents.middleware.anthropic_tools import AnthropicToolsState
from langchain.agents.middleware.types import AgentMiddleware
class StateFileSearchMiddleware(AgentMiddleware):
"""Provides Glob and Grep search over state-based files.
This middleware adds two tools that search through virtual files in state:
- Glob: Fast file pattern matching by file path
- Grep: Fast content search using regular expressions
Example:
```python
from langchain.agents import create_agent
from langchain.agents.middleware import (
StateTextEditorToolMiddleware,
StateFileSearchMiddleware,
)
agent = create_agent(
model=model,
tools=[],
middleware=[
StateTextEditorToolMiddleware(),
StateFileSearchMiddleware(),
],
)
```
"""
state_schema = AnthropicToolsState
def __init__(
self,
*,
state_key: str = "text_editor_files",
) -> None:
"""Initialize the search middleware.
Args:
state_key: State key to search (default: "text_editor_files").
Use "memory_files" to search memory tool files.
"""
self.state_key = state_key
# Create tool instances
@tool
def glob_search( # noqa: D417
pattern: str,
path: str = "/",
state: Annotated[AnthropicToolsState, InjectedToolArg] = None, # type: ignore[assignment]
) -> str:
"""Fast file pattern matching tool that works with any codebase size.
Supports glob patterns like **/*.js or src/**/*.ts.
Returns matching file paths sorted by modification time.
Use this tool when you need to find files by name patterns.
Args:
pattern: The glob pattern to match files against.
path: The directory to search in. If not specified, searches from root.
Returns:
Newline-separated list of matching file paths, sorted by modification
time (most recently modified first). Returns "No files found" if no
matches.
"""
# Normalize base path
base_path = path if path.startswith("/") else "/" + path
# Get files from state
files = cast("dict[str, Any]", state.get(self.state_key, {}))
# Match files
matches = []
for file_path, file_data in files.items():
if file_path.startswith(base_path):
# Get relative path from base
if base_path == "/":
relative = file_path[1:] # Remove leading /
elif file_path == base_path:
relative = Path(file_path).name
elif file_path.startswith(base_path + "/"):
relative = file_path[len(base_path) + 1 :]
else:
continue
# Match against pattern
# Handle ** pattern which requires special care
# PurePosixPath.match doesn't match single-level paths against **/pattern
is_match = PurePosixPath(relative).match(pattern)
if not is_match and pattern.startswith("**/"):
# Also try matching without the **/ prefix for files in base dir
is_match = PurePosixPath(relative).match(pattern[3:])
if is_match:
matches.append((file_path, file_data["modified_at"]))
if not matches:
return "No files found"
# Sort by modification time
matches.sort(key=lambda x: x[1], reverse=True)
file_paths = [path for path, _ in matches]
return "\n".join(file_paths)
@tool
def grep_search( # noqa: D417
pattern: str,
path: str = "/",
include: str | None = None,
output_mode: Literal["files_with_matches", "content", "count"] = "files_with_matches",
state: Annotated[AnthropicToolsState, InjectedToolArg] = None, # type: ignore[assignment]
) -> str:
"""Fast content search tool that works with any codebase size.
Searches file contents using regular expressions. Supports full regex
syntax and filters files by pattern with the include parameter.
Args:
pattern: The regular expression pattern to search for in file contents.
path: The directory to search in. If not specified, searches from root.
include: File pattern to filter (e.g., "*.js", "*.{ts,tsx}").
output_mode: Output format:
- "files_with_matches": Only file paths containing matches (default)
- "content": Matching lines with file:line:content format
- "count": Count of matches per file
Returns:
Search results formatted according to output_mode. Returns "No matches
found" if no results.
"""
# Normalize base path
base_path = path if path.startswith("/") else "/" + path
# Compile regex pattern (for validation)
try:
regex = re.compile(pattern)
except re.error as e:
return f"Invalid regex pattern: {e}"
# Search files
files = cast("dict[str, Any]", state.get(self.state_key, {}))
results: dict[str, list[tuple[int, str]]] = {}
for file_path, file_data in files.items():
if not file_path.startswith(base_path):
continue
# Check include filter
if include:
basename = Path(file_path).name
if not self._match_include(basename, include):
continue
# Search file content
for line_num, line in enumerate(file_data["content"], 1):
if regex.search(line):
if file_path not in results:
results[file_path] = []
results[file_path].append((line_num, line))
if not results:
return "No matches found"
# Format output based on mode
return self._format_grep_results(results, output_mode)
self.glob_search = glob_search
self.grep_search = grep_search
self.tools = [glob_search, grep_search]
def _match_include(self, basename: str, pattern: str) -> bool:
"""Match filename against include pattern."""
# Handle brace expansion {a,b,c}
if "{" in pattern and "}" in pattern:
start = pattern.index("{")
end = pattern.index("}")
prefix = pattern[:start]
suffix = pattern[end + 1 :]
alternatives = pattern[start + 1 : end].split(",")
for alt in alternatives:
expanded = prefix + alt + suffix
if fnmatch.fnmatch(basename, expanded):
return True
return False
return fnmatch.fnmatch(basename, pattern)
def _format_grep_results(
self,
results: dict[str, list[tuple[int, str]]],
output_mode: str,
) -> str:
"""Format grep results based on output mode."""
if output_mode == "files_with_matches":
# Just return file paths
return "\n".join(sorted(results.keys()))
if output_mode == "content":
# Return file:line:content format
lines = []
for file_path in sorted(results.keys()):
for line_num, line in results[file_path]:
lines.append(f"{file_path}:{line_num}:{line}")
return "\n".join(lines)
if output_mode == "count":
# Return file:count format
lines = []
for file_path in sorted(results.keys()):
count = len(results[file_path])
lines.append(f"{file_path}:{count}")
return "\n".join(lines)
# Default to files_with_matches
return "\n".join(sorted(results.keys()))
class FilesystemFileSearchMiddleware(AgentMiddleware):
"""Provides Glob and Grep search over filesystem files.
This middleware adds two tools that search through local filesystem:
- Glob: Fast file pattern matching by file path
- Grep: Fast content search using ripgrep or Python fallback
Example:
```python
from langchain.agents import create_agent
from langchain.agents.middleware import (
FilesystemTextEditorToolMiddleware,
FilesystemFileSearchMiddleware,
)
agent = create_agent(
model=model,
tools=[],
middleware=[
FilesystemTextEditorToolMiddleware(root_path="/workspace"),
FilesystemFileSearchMiddleware(root_path="/workspace"),
],
)
```
"""
def __init__(
self,
*,
root_path: str,
use_ripgrep: bool = True,
max_file_size_mb: int = 10,
) -> None:
"""Initialize the search middleware.
Args:
root_path: Root directory to search.
use_ripgrep: Whether to use ripgrep for search (default: True).
Falls back to Python if ripgrep unavailable.
max_file_size_mb: Maximum file size to search in MB (default: 10).
"""
self.root_path = Path(root_path).resolve()
self.use_ripgrep = use_ripgrep
self.max_file_size_bytes = max_file_size_mb * 1024 * 1024
# Create tool instances as closures that capture self
@tool
def glob_search(pattern: str, path: str = "/") -> str:
"""Fast file pattern matching tool that works with any codebase size.
Supports glob patterns like **/*.js or src/**/*.ts.
Returns matching file paths sorted by modification time.
Use this tool when you need to find files by name patterns.
Args:
pattern: The glob pattern to match files against.
path: The directory to search in. If not specified, searches from root.
Returns:
Newline-separated list of matching file paths, sorted by modification
time (most recently modified first). Returns "No files found" if no
matches.
"""
try:
base_full = self._validate_and_resolve_path(path)
except ValueError:
return "No files found"
if not base_full.exists() or not base_full.is_dir():
return "No files found"
# Use pathlib glob
matching: list[tuple[str, str]] = []
for match in base_full.glob(pattern):
if match.is_file():
# Convert to virtual path
virtual_path = "/" + str(match.relative_to(self.root_path))
stat = match.stat()
modified_at = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
matching.append((virtual_path, modified_at))
if not matching:
return "No files found"
file_paths = [p for p, _ in matching]
return "\n".join(file_paths)
@tool
def grep_search(
pattern: str,
path: str = "/",
include: str | None = None,
output_mode: Literal["files_with_matches", "content", "count"] = "files_with_matches",
) -> str:
"""Fast content search tool that works with any codebase size.
Searches file contents using regular expressions. Supports full regex
syntax and filters files by pattern with the include parameter.
Args:
pattern: The regular expression pattern to search for in file contents.
path: The directory to search in. If not specified, searches from root.
include: File pattern to filter (e.g., "*.js", "*.{ts,tsx}").
output_mode: Output format:
- "files_with_matches": Only file paths containing matches (default)
- "content": Matching lines with file:line:content format
- "count": Count of matches per file
Returns:
Search results formatted according to output_mode. Returns "No matches
found" if no results.
"""
# Compile regex pattern (for validation)
try:
re.compile(pattern)
except re.error as e:
return f"Invalid regex pattern: {e}"
# Try ripgrep first if enabled
results = None
if self.use_ripgrep:
with suppress(
FileNotFoundError,
subprocess.CalledProcessError,
subprocess.TimeoutExpired,
):
results = self._ripgrep_search(pattern, path, include)
# Python fallback if ripgrep failed or is disabled
if results is None:
results = self._python_search(pattern, path, include)
if not results:
return "No matches found"
# Format output based on mode
return self._format_grep_results(results, output_mode)
self.glob_search = glob_search
self.grep_search = grep_search
self.tools = [glob_search, grep_search]
def _validate_and_resolve_path(self, path: str) -> Path:
"""Validate and resolve a virtual path to filesystem path."""
# Normalize path
if not path.startswith("/"):
path = "/" + path
# Check for path traversal
if ".." in path or "~" in path:
msg = "Path traversal not allowed"
raise ValueError(msg)
# Convert virtual path to filesystem path
relative = path.lstrip("/")
full_path = (self.root_path / relative).resolve()
# Ensure path is within root
try:
full_path.relative_to(self.root_path)
except ValueError:
msg = f"Path outside root directory: {path}"
raise ValueError(msg) from None
return full_path
def _ripgrep_search(
self, pattern: str, base_path: str, include: str | None
) -> dict[str, list[tuple[int, str]]]:
"""Search using ripgrep subprocess."""
try:
base_full = self._validate_and_resolve_path(base_path)
except ValueError:
return {}
if not base_full.exists():
return {}
# Build ripgrep command
cmd = ["rg", "--json", pattern, str(base_full)]
if include:
# Convert glob pattern to ripgrep glob
cmd.extend(["--glob", include])
try:
result = subprocess.run( # noqa: S603
cmd,
capture_output=True,
text=True,
timeout=30,
check=False,
)
except (subprocess.TimeoutExpired, FileNotFoundError):
# Fallback to Python search if ripgrep unavailable or times out
return self._python_search(pattern, base_path, include)
# Parse ripgrep JSON output
results: dict[str, list[tuple[int, str]]] = {}
for line in result.stdout.splitlines():
try:
data = json.loads(line)
if data["type"] == "match":
path = data["data"]["path"]["text"]
# Convert to virtual path
virtual_path = "/" + str(Path(path).relative_to(self.root_path))
line_num = data["data"]["line_number"]
line_text = data["data"]["lines"]["text"].rstrip("\n")
if virtual_path not in results:
results[virtual_path] = []
results[virtual_path].append((line_num, line_text))
except (json.JSONDecodeError, KeyError):
continue
return results
def _python_search(
self, pattern: str, base_path: str, include: str | None
) -> dict[str, list[tuple[int, str]]]:
"""Search using Python regex (fallback)."""
try:
base_full = self._validate_and_resolve_path(base_path)
except ValueError:
return {}
if not base_full.exists():
return {}
regex = re.compile(pattern)
results: dict[str, list[tuple[int, str]]] = {}
# Walk directory tree
for file_path in base_full.rglob("*"):
if not file_path.is_file():
continue
# Check include filter
if include and not self._match_include(file_path.name, include):
continue
# Skip files that are too large
if file_path.stat().st_size > self.max_file_size_bytes:
continue
try:
content = file_path.read_text()
except (UnicodeDecodeError, PermissionError):
continue
# Search content
for line_num, line in enumerate(content.splitlines(), 1):
if regex.search(line):
virtual_path = "/" + str(file_path.relative_to(self.root_path))
if virtual_path not in results:
results[virtual_path] = []
results[virtual_path].append((line_num, line))
return results
def _match_include(self, basename: str, pattern: str) -> bool:
"""Match filename against include pattern."""
# Handle brace expansion {a,b,c}
if "{" in pattern and "}" in pattern:
start = pattern.index("{")
end = pattern.index("}")
prefix = pattern[:start]
suffix = pattern[end + 1 :]
alternatives = pattern[start + 1 : end].split(",")
for alt in alternatives:
expanded = prefix + alt + suffix
if fnmatch.fnmatch(basename, expanded):
return True
return False
return fnmatch.fnmatch(basename, pattern)
def _format_grep_results(
self,
results: dict[str, list[tuple[int, str]]],
output_mode: str,
) -> str:
"""Format grep results based on output mode."""
if output_mode == "files_with_matches":
# Just return file paths
return "\n".join(sorted(results.keys()))
if output_mode == "content":
# Return file:line:content format
lines = []
for file_path in sorted(results.keys()):
for line_num, line in results[file_path]:
lines.append(f"{file_path}:{line_num}:{line}")
return "\n".join(lines)
if output_mode == "count":
# Return file:count format
lines = []
for file_path in sorted(results.keys()):
count = len(results[file_path])
lines.append(f"{file_path}:{count}")
return "\n".join(lines)
# Default to files_with_matches
return "\n".join(sorted(results.keys()))
__all__ = [
"FilesystemFileSearchMiddleware",
"StateFileSearchMiddleware",
]

View File

@@ -0,0 +1,538 @@
"""Middleware for providing filesystem tools to an agent."""
# ruff: noqa: E501
from collections.abc import Callable
from typing import TYPE_CHECKING, Annotated, Any
from typing_extensions import NotRequired
if TYPE_CHECKING:
from langgraph.runtime import Runtime
from langchain_core.messages import AIMessage, ToolMessage
from langchain_core.tools import BaseTool, InjectedToolCallId, tool
from langgraph.config import get_config
from langgraph.runtime import Runtime, get_runtime
from langgraph.store.base import BaseStore, Item
from langgraph.types import Command
from langchain.agents._internal.file_utils import (
FileData,
append_memories_prefix,
check_empty_content,
create_file_data,
file_data_reducer,
file_data_to_string,
format_content_with_line_numbers,
has_memories_prefix,
strip_memories_prefix,
update_file_data,
validate_path,
)
from langchain.agents.middleware.types import AgentMiddleware, AgentState, ModelRequest
from langchain.tools.tool_node import InjectedState
class FilesystemState(AgentState):
"""State for the filesystem middleware."""
files: Annotated[NotRequired[dict[str, FileData]], file_data_reducer]
"""Files in the filesystem."""
LIST_FILES_TOOL_DESCRIPTION = """Lists all files in the filesystem, optionally filtering by directory.
Usage:
- The list_files tool will return a list of all files in the filesystem.
- You can optionally provide a path parameter to list files in a specific directory.
- This is very useful for exploring the file system and finding the right file to read or edit.
- You should almost ALWAYS use this tool before using the Read or Edit tools."""
LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = (
"\n- Files from the longterm filesystem will be prefixed with the /memories/ path."
)
READ_FILE_TOOL_DESCRIPTION = """Reads a file from the filesystem. You can access any file directly by using this tool.
Assume this tool is able to read all files on the machine. If the User provides a path to a file assume that path is valid. It is okay to read a file that does not exist; an error will be returned.
Usage:
- The file_path parameter must be an absolute path, not a relative path
- By default, it reads up to 2000 lines starting from the beginning of the file
- You can optionally specify a line offset and limit (especially handy for long files), but it's recommended to read the whole file by not providing these parameters
- Any lines longer than 2000 characters will be truncated
- Results are returned using cat -n format, with line numbers starting at 1
- You have the capability to call multiple tools in a single response. It is always better to speculatively read multiple files as a batch that are potentially useful.
- If you read a file that exists but has empty contents you will receive a system reminder warning in place of file contents.
- You should ALWAYS make sure a file has been read before editing it."""
READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = (
"\n- file_paths prefixed with the /memories/ path will be read from the longterm filesystem."
)
EDIT_FILE_TOOL_DESCRIPTION = """Performs exact string replacements in files.
Usage:
- You must use your `Read` tool at least once in the conversation before editing. This tool will error if you attempt an edit without reading the file.
- When editing text from Read tool output, ensure you preserve the exact indentation (tabs/spaces) as it appears AFTER the line number prefix. The line number prefix format is: spaces + line number + tab. Everything after that tab is the actual file content to match. Never include any part of the line number prefix in the old_string or new_string.
- ALWAYS prefer editing existing files. NEVER write new files unless explicitly required.
- Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked.
- The edit will FAIL if `old_string` is not unique in the file. Either provide a larger string with more surrounding context to make it unique or use `replace_all` to change every instance of `old_string`.
- Use `replace_all` for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance."""
EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = "\n- You can edit files in the longterm filesystem by prefixing the filename with the /memories/ path."
WRITE_FILE_TOOL_DESCRIPTION = """Writes to a new file in the filesystem.
Usage:
- The file_path parameter must be an absolute path, not a relative path
- The content parameter must be a string
- The write_file tool will create the a new file.
- Prefer to edit existing files over creating new ones when possible.
- file_paths prefixed with the /memories/ path will be written to the longterm filesystem."""
WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = (
"\n- file_paths prefixed with the /memories/ path will be written to the longterm filesystem."
)
FILESYSTEM_SYSTEM_PROMPT = """## Filesystem Tools `ls`, `read_file`, `write_file`, `edit_file`
You have access to a filesystem which you can interact with using these tools.
All file paths must start with a /.
- ls: list all files in the filesystem
- read_file: read a file from the filesystem
- write_file: write to a file in the filesystem
- edit_file: edit a file in the filesystem"""
FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT = """
You also have access to a longterm filesystem in which you can store files that you want to keep around for longer than the current conversation.
In order to interact with the longterm filesystem, you can use those same tools, but filenames must be prefixed with the /memories/ path.
Remember, to interact with the longterm filesystem, you must prefix the filename with the /memories/ path."""
def _get_namespace() -> tuple[str] | tuple[str, str]:
namespace = "filesystem"
config = get_config()
if config is None:
return (namespace,)
assistant_id = config.get("metadata", {}).get("assistant_id")
if assistant_id is None:
return (namespace,)
return (assistant_id, "filesystem")
def _get_store(runtime: Runtime[Any]) -> BaseStore:
if runtime.store is None:
msg = "Longterm memory is enabled, but no store is available"
raise ValueError(msg)
return runtime.store
def _convert_store_item_to_file_data(store_item: Item) -> FileData:
if "content" not in store_item.value or not isinstance(store_item.value["content"], list):
msg = "Store item does not contain content"
raise ValueError(msg)
if "created_at" not in store_item.value or not isinstance(store_item.value["created_at"], str):
msg = "Store item does not contain created_at"
raise ValueError(msg)
if "modified_at" not in store_item.value or not isinstance(
store_item.value["modified_at"], str
):
msg = "Store item does not contain modified_at"
raise ValueError(msg)
return FileData(
content=store_item.value["content"],
created_at=store_item.value["created_at"],
modified_at=store_item.value["modified_at"],
)
def _convert_file_data_to_store_item(file_data: FileData) -> dict[str, Any]:
return {
"content": file_data["content"],
"created_at": file_data["created_at"],
"modified_at": file_data["modified_at"],
}
def _get_file_data_from_state(state: FilesystemState, file_path: str) -> FileData:
mock_filesystem = state.get("files", {})
if file_path not in mock_filesystem:
msg = f"File '{file_path}' not found"
raise ValueError(msg)
return mock_filesystem[file_path]
def _ls_tool_generator(
custom_description: str | None = None, *, has_longterm_memory: bool
) -> BaseTool:
tool_description = LIST_FILES_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif has_longterm_memory:
tool_description += LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _get_filenames_from_state(state: FilesystemState) -> list[str]:
files_dict = state.get("files", {})
return list(files_dict.keys())
def _filter_files_by_path(filenames: list[str], path: str | None) -> list[str]:
if path is None:
return filenames
normalized_path = validate_path(path)
return [f for f in filenames if f.startswith(normalized_path)]
if has_longterm_memory:
@tool(description=tool_description)
def ls(
state: Annotated[FilesystemState, InjectedState], path: str | None = None
) -> list[str]:
files = _get_filenames_from_state(state)
# Add filenames from longterm memory
runtime = get_runtime()
store = _get_store(runtime)
namespace = _get_namespace()
longterm_files = store.search(namespace)
longterm_files_prefixed = [append_memories_prefix(f.key) for f in longterm_files]
files.extend(longterm_files_prefixed)
return _filter_files_by_path(files, path)
else:
@tool(description=tool_description)
def ls(
state: Annotated[FilesystemState, InjectedState], path: str | None = None
) -> list[str]:
files = _get_filenames_from_state(state)
return _filter_files_by_path(files, path)
return ls
def _read_file_tool_generator(
custom_description: str | None = None, *, has_longterm_memory: bool
) -> BaseTool:
tool_description = READ_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif has_longterm_memory:
tool_description += READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _read_file_data_content(file_data: FileData, offset: int, limit: int) -> str:
content = file_data_to_string(file_data)
empty_msg = check_empty_content(content)
if empty_msg:
return empty_msg
lines = content.splitlines()
start_idx = offset
end_idx = min(start_idx + limit, len(lines))
if start_idx >= len(lines):
return f"Error: Line offset {offset} exceeds file length ({len(lines)} lines)"
selected_lines = lines[start_idx:end_idx]
return format_content_with_line_numbers(
selected_lines, format_style="tab", start_line=start_idx + 1
)
if has_longterm_memory:
@tool(description=tool_description)
def read_file(
file_path: str,
state: Annotated[FilesystemState, InjectedState],
offset: int = 0,
limit: int = 2000,
) -> str:
file_path = validate_path(file_path)
if has_memories_prefix(file_path):
stripped_file_path = strip_memories_prefix(file_path)
runtime = get_runtime()
store = _get_store(runtime)
namespace = _get_namespace()
item: Item | None = store.get(namespace, stripped_file_path)
if item is None:
return f"Error: File '{file_path}' not found"
file_data = _convert_store_item_to_file_data(item)
else:
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
return _read_file_data_content(file_data, offset, limit)
else:
@tool(description=tool_description)
def read_file(
file_path: str,
state: Annotated[FilesystemState, InjectedState],
offset: int = 0,
limit: int = 2000,
) -> str:
file_path = validate_path(file_path)
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
return _read_file_data_content(file_data, offset, limit)
return read_file
def _write_file_tool_generator(
custom_description: str | None = None, *, has_longterm_memory: bool
) -> BaseTool:
tool_description = WRITE_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif has_longterm_memory:
tool_description += WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _write_file_to_state(
state: FilesystemState, tool_call_id: str, file_path: str, content: str
) -> Command | str:
mock_filesystem = state.get("files", {})
existing = mock_filesystem.get(file_path)
if existing:
return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path."
new_file_data = create_file_data(content)
return Command(
update={
"files": {file_path: new_file_data},
"messages": [ToolMessage(f"Updated file {file_path}", tool_call_id=tool_call_id)],
}
)
if has_longterm_memory:
@tool(description=tool_description)
def write_file(
file_path: str,
content: str,
state: Annotated[FilesystemState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command | str:
file_path = validate_path(file_path)
if has_memories_prefix(file_path):
stripped_file_path = strip_memories_prefix(file_path)
runtime = get_runtime()
store = _get_store(runtime)
namespace = _get_namespace()
if store.get(namespace, stripped_file_path) is not None:
return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path."
new_file_data = create_file_data(content)
store.put(
namespace, stripped_file_path, _convert_file_data_to_store_item(new_file_data)
)
return f"Updated longterm memories file {file_path}"
return _write_file_to_state(state, tool_call_id, file_path, content)
else:
@tool(description=tool_description)
def write_file(
file_path: str,
content: str,
state: Annotated[FilesystemState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command | str:
file_path = validate_path(file_path)
return _write_file_to_state(state, tool_call_id, file_path, content)
return write_file
def _edit_file_tool_generator(
custom_description: str | None = None, *, has_longterm_memory: bool
) -> BaseTool:
tool_description = EDIT_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif has_longterm_memory:
tool_description += EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
if has_longterm_memory:
@tool(description=tool_description)
def edit_file(
file_path: str,
old_string: str,
new_string: str,
state: Annotated[FilesystemState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
*,
replace_all: bool = False,
) -> Command | str:
file_path = validate_path(file_path)
is_longterm_memory = has_memories_prefix(file_path)
if is_longterm_memory:
stripped_file_path = strip_memories_prefix(file_path)
runtime = get_runtime()
store = _get_store(runtime)
namespace = _get_namespace()
item: Item | None = store.get(namespace, stripped_file_path)
if item is None:
return f"Error: File '{file_path}' not found"
file_data = _convert_store_item_to_file_data(item)
else:
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
content = file_data_to_string(file_data)
occurrences = content.count(old_string)
if occurrences == 0:
return f"Error: String not found in file: '{old_string}'"
if occurrences > 1 and not replace_all:
return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context."
new_content = content.replace(old_string, new_string)
new_file_data = update_file_data(file_data, new_content)
result_msg = (
f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'"
)
if is_longterm_memory:
store.put(
namespace, stripped_file_path, _convert_file_data_to_store_item(new_file_data)
)
return result_msg
return Command(
update={
"files": {file_path: new_file_data},
"messages": [ToolMessage(result_msg, tool_call_id=tool_call_id)],
}
)
else:
@tool(description=tool_description)
def edit_file(
file_path: str,
old_string: str,
new_string: str,
state: Annotated[FilesystemState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
*,
replace_all: bool = False,
) -> Command | str:
file_path = validate_path(file_path)
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
content = file_data_to_string(file_data)
occurrences = content.count(old_string)
if occurrences == 0:
return f"Error: String not found in file: '{old_string}'"
if occurrences > 1 and not replace_all:
return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context."
new_content = content.replace(old_string, new_string)
new_file_data = update_file_data(file_data, new_content)
result_msg = (
f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'"
)
return Command(
update={
"files": {file_path: new_file_data},
"messages": [ToolMessage(result_msg, tool_call_id=tool_call_id)],
}
)
return edit_file
TOOL_GENERATORS = {
"ls": _ls_tool_generator,
"read_file": _read_file_tool_generator,
"write_file": _write_file_tool_generator,
"edit_file": _edit_file_tool_generator,
}
def _get_filesystem_tools(
custom_tool_descriptions: dict[str, str] | None = None, *, has_longterm_memory: bool
) -> list[BaseTool]:
"""Get filesystem tools.
Args:
has_longterm_memory: Whether to enable longterm memory support.
custom_tool_descriptions: Optional custom descriptions for tools.
Returns:
List of configured filesystem tools.
"""
if custom_tool_descriptions is None:
custom_tool_descriptions = {}
tools = []
for tool_name, tool_generator in TOOL_GENERATORS.items():
tool = tool_generator(
custom_tool_descriptions.get(tool_name), has_longterm_memory=has_longterm_memory
)
tools.append(tool)
return tools
class FilesystemMiddleware(AgentMiddleware):
"""Middleware for providing filesystem tools to an agent.
Args:
use_longterm_memory: Whether to enable longterm memory support.
system_prompt_extension: Optional custom system prompt.
custom_tool_descriptions: Optional custom tool descriptions.
Returns:
List of configured filesystem tools.
Raises:
ValueError: If longterm memory is enabled but no store is available.
Example:
```python
from langchain.agents.middleware.filesystem import FilesystemMiddleware
from langchain.agents import create_agent
agent = create_agent(middleware=[FilesystemMiddleware(use_longterm_memory=False)])
```
"""
state_schema = FilesystemState
def __init__(
self,
*,
use_longterm_memory: bool = False,
system_prompt_extension: str | None = None,
custom_tool_descriptions: dict[str, str] | None = None,
) -> None:
"""Initialize the filesystem middleware.
Args:
use_longterm_memory: Whether to enable longterm memory support.
system_prompt_extension: Optional custom system prompt.
custom_tool_descriptions: Optional custom tool descriptions.
"""
self.use_longterm_memory = use_longterm_memory
self.system_prompt_extension = FILESYSTEM_SYSTEM_PROMPT
if system_prompt_extension is not None:
self.system_prompt_extension = system_prompt_extension
elif use_longterm_memory:
self.system_prompt_extension += FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
self.tools = _get_filesystem_tools(
custom_tool_descriptions, has_longterm_memory=use_longterm_memory
)
def before_model_call(self, request: ModelRequest, runtime: Runtime[Any]) -> ModelRequest:
"""If use_longterm_memory is True, we must have a store available."""
if self.use_longterm_memory and runtime.store is None:
msg = "Longterm memory is enabled, but no store is available"
raise ValueError(msg)
return request
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], AIMessage],
) -> AIMessage:
"""Update the system prompt to include instructions on using the filesystem."""
if self.system_prompt_extension is not None:
request.system_prompt = (
request.system_prompt + "\n\n" + self.system_prompt_extension
if request.system_prompt
else self.system_prompt_extension
)
return handler(request)

View File

@@ -51,7 +51,7 @@ class AnthropicPromptCachingMiddleware(AgentMiddleware):
try:
from langchain_anthropic import ChatAnthropic
except ImportError:
ChatAnthropic = None # noqa: N806
ChatAnthropic = None # type: ignore[assignment,misc] # noqa: N806
msg: str | None = None

View File

@@ -0,0 +1,360 @@
"""Middleware for providing subagents to an agent via a `task` tool."""
from collections.abc import Callable, Sequence
from typing import Annotated, Any, TypedDict, cast
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from langchain_core.runnables import Runnable
from langchain_core.tools import BaseTool, tool
from langgraph.types import Command
from typing_extensions import NotRequired
from langchain.agents.middleware.filesystem import FilesystemMiddleware
from langchain.agents.middleware.planning import PlanningMiddleware
from langchain.agents.middleware.prompt_caching import AnthropicPromptCachingMiddleware
from langchain.agents.middleware.summarization import SummarizationMiddleware
from langchain.agents.middleware.types import AgentMiddleware, ModelRequest
from langchain.tools import InjectedState, InjectedToolCallId
class SubAgent(TypedDict):
"""A subagent constructed with user-defined parameters."""
name: str
"""The name of the subagent."""
description: str
"""The description of the subagent."""
system_prompt: str
"""The system prompt to use for the subagent."""
tools: Sequence[BaseTool | Callable | dict[str, Any]]
"""The tools to use for the subagent."""
model: NotRequired[str | BaseChatModel]
"""The model for the subagent."""
middleware: NotRequired[list[AgentMiddleware]]
"""The middleware to use for the subagent."""
class CompiledSubAgent(TypedDict):
"""A Runnable passed in as a subagent."""
name: str
"""The name of the subagent."""
description: str
"""The description of the subagent."""
runnable: Runnable
"""The Runnable to use for the subagent."""
DEFAULT_SUBAGENT_PROMPT = "In order to complete the objective that the user asks of you, you have access to a number of standard tools." # noqa: E501
TASK_TOOL_DESCRIPTION = """Launch an ephemeral subagent to handle complex, multi-step independent tasks with isolated context windows.
Available agent types and the tools they have access to:
- general-purpose: General-purpose agent for researching complex questions, searching for files and content, and executing multi-step tasks. When you are searching for a keyword or file and are not confident that you will find the right match in the first few tries use this agent to perform the search for you. This agent has access to all tools as the main agent.
{other_agents}
When using the Task tool, you must specify a subagent_type parameter to select which agent type to use.
## Usage notes:
1. Launch multiple agents concurrently whenever possible, to maximize performance; to do that, use a single message with multiple tool uses
2. When the agent is done, it will return a single message back to you. The result returned by the agent is not visible to the user. To show the user the result, you should send a text message back to the user with a concise summary of the result.
3. Each agent invocation is stateless. You will not be able to send additional messages to the agent, nor will the agent be able to communicate with you outside of its final report. Therefore, your prompt should contain a highly detailed task description for the agent to perform autonomously and you should specify exactly what information the agent should return back to you in its final and only message to you.
4. The agent's outputs should generally be trusted
5. Clearly tell the agent whether you expect it to create content, perform analysis, or just do research (search, file reads, web fetches, etc.), since it is not aware of the user's intent
6. If the agent description mentions that it should be used proactively, then you should try your best to use it without the user having to ask for it first. Use your judgement.
7. When only the general-purpose agent is provided, you should use it for all tasks. It is great for isolating context and token usage, and completing specific, complex tasks, as it has all the same capabilities as the main agent.
### Example usage of the general-purpose agent:
<example_agent_descriptions>
"general-purpose": use this agent for general purpose tasks, it has access to all tools as the main agent.
</example_agent_descriptions>
<example>
User: "I want to conduct research on the accomplishments of Lebron James, Michael Jordan, and Kobe Bryant, and then compare them."
Assistant: *Uses the task tool in parallel to conduct isolated research on each of the three players*
Assistant: *Synthesizes the results of the three isolated research tasks and responds to the User*
<commentary>
Research is a complex, multi-step task in it of itself.
The research of each individual player is not dependent on the research of the other players.
The assistant uses the task tool to break down the complex objective into three isolated tasks.
Each research task only needs to worry about context and tokens about one player, then returns synthesized information about each player as the Tool Result.
This means each research task can dive deep and spend tokens and context deeply researching each player, but the final result is synthesized information, and saves us tokens in the long run when comparing the players to each other.
</commentary>
</example>
<example>
User: "Analyze a single large code repository for security vulnerabilities and generate a report."
Assistant: *Launches a single `task` subagent for the repository analysis*
Assistant: *Receives report and integrates results into final summary*
<commentary>
Subagent is used to isolate a large, context-heavy task, even though there is only one. This prevents the main thread from being overloaded with details.
If the user then asks followup questions, we have a concise report to reference instead of the entire history of analysis and tool calls, which is good and saves us time and money.
</commentary>
</example>
<example>
User: "Schedule two meetings for me and prepare agendas for each."
Assistant: *Calls the task tool in parallel to launch two `task` subagents (one per meeting) to prepare agendas*
Assistant: *Returns final schedules and agendas*
<commentary>
Tasks are simple individually, but subagents help silo agenda preparation.
Each subagent only needs to worry about the agenda for one meeting.
</commentary>
</example>
<example>
User: "I want to order a pizza from Dominos, order a burger from McDonald's, and order a salad from Subway."
Assistant: *Calls tools directly in parallel to order a pizza from Dominos, a burger from McDonald's, and a salad from Subway*
<commentary>
The assistant did not use the task tool because the objective is super simple and clear and only requires a few trivial tool calls.
It is better to just complete the task directly and NOT use the `task`tool.
</commentary>
</example>
### Example usage with custom agents:
<example_agent_descriptions>
"content-reviewer": use this agent after you are done creating significant content or documents
"greeting-responder": use this agent when to respond to user greetings with a friendly joke
"research-analyst": use this agent to conduct thorough research on complex topics
</example_agent_description>
<example>
user: "Please write a function that checks if a number is prime"
assistant: Sure let me write a function that checks if a number is prime
assistant: First let me use the Write tool to write a function that checks if a number is prime
assistant: I'm going to use the Write tool to write the following code:
<code>
function isPrime(n) {{
if (n <= 1) return false
for (let i = 2; i * i <= n; i++) {{
if (n % i === 0) return false
}}
return true
}}
</code>
<commentary>
Since significant content was created and the task was completed, now use the content-reviewer agent to review the work
</commentary>
assistant: Now let me use the content-reviewer agent to review the code
assistant: Uses the Task tool to launch with the content-reviewer agent
</example>
<example>
user: "Can you help me research the environmental impact of different renewable energy sources and create a comprehensive report?"
<commentary>
This is a complex research task that would benefit from using the research-analyst agent to conduct thorough analysis
</commentary>
assistant: I'll help you research the environmental impact of renewable energy sources. Let me use the research-analyst agent to conduct comprehensive research on this topic.
assistant: Uses the Task tool to launch with the research-analyst agent, providing detailed instructions about what research to conduct and what format the report should take
</example>
<example>
user: "Hello"
<commentary>
Since the user is greeting, use the greeting-responder agent to respond with a friendly joke
</commentary>
assistant: "I'm going to use the Task tool to launch with the greeting-responder agent"
</example>""" # noqa: E501
def _get_subagents(
default_subagent_model: str | BaseChatModel,
default_subagent_tools: Sequence[BaseTool | Callable | dict[str, Any]],
subagents: list[SubAgent | CompiledSubAgent],
) -> tuple[dict[str, Any], list[str]]:
from langchain.agents.factory import create_agent
default_subagent_middleware = [
PlanningMiddleware(),
FilesystemMiddleware(),
SummarizationMiddleware(
model=default_subagent_model,
max_tokens_before_summary=120000,
messages_to_keep=20,
),
AnthropicPromptCachingMiddleware(ttl="5m", unsupported_model_behavior="ignore"),
]
# Create the general-purpose subagent
general_purpose_subagent = create_agent(
model=default_subagent_model,
system_prompt=DEFAULT_SUBAGENT_PROMPT,
tools=default_subagent_tools,
middleware=default_subagent_middleware,
)
agents: dict[str, Any] = {"general-purpose": general_purpose_subagent}
subagent_descriptions = []
for _agent in subagents:
subagent_descriptions.append(f"- {_agent['name']}: {_agent['description']}")
if "runnable" in _agent:
custom_agent = cast("CompiledSubAgent", _agent)
agents[custom_agent["name"]] = custom_agent["runnable"]
continue
_tools = _agent.get("tools", list(default_subagent_tools))
subagent_model = _agent.get("model", default_subagent_model)
if "middleware" in _agent:
_middleware = [*default_subagent_middleware, *_agent["middleware"]]
else:
_middleware = default_subagent_middleware
agents[_agent["name"]] = create_agent(
subagent_model,
system_prompt=_agent["system_prompt"],
tools=_tools,
middleware=_middleware,
checkpointer=False,
)
return agents, subagent_descriptions
def _create_task_tool(
default_subagent_model: str | BaseChatModel,
default_subagent_tools: Sequence[BaseTool | Callable | dict[str, Any]],
subagents: list[SubAgent | CompiledSubAgent],
*,
is_async: bool = False,
) -> BaseTool:
subagent_graphs, subagent_descriptions = _get_subagents(
default_subagent_model, default_subagent_tools, subagents
)
subagent_description_str = "\n".join(subagent_descriptions)
def _return_command_with_state_update(result: dict, tool_call_id: str) -> Command:
state_update = {k: v for k, v in result.items() if k not in ["todos", "messages"]}
return Command(
update={
**state_update,
"messages": [
ToolMessage(result["messages"][-1].content, tool_call_id=tool_call_id)
],
}
)
task_tool_description = TASK_TOOL_DESCRIPTION.format(other_agents=subagent_description_str)
if is_async:
@tool(description=task_tool_description)
async def task(
description: str,
subagent_type: str,
state: Annotated[dict, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> str | Command:
if subagent_type not in subagent_graphs:
msg = (
f"Error: invoked agent of type {subagent_type}, "
f"the only allowed types are {[f'`{k}`' for k in subagent_graphs]}"
)
raise ValueError(msg)
subagent = subagent_graphs[subagent_type]
state["messages"] = [HumanMessage(content=description)]
if "todos" in state:
del state["todos"]
result = await subagent.ainvoke(state)
return _return_command_with_state_update(result, tool_call_id)
else:
@tool(description=task_tool_description)
def task(
description: str,
subagent_type: str,
state: Annotated[dict, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> str | Command:
if subagent_type not in subagent_graphs:
msg = (
f"Error: invoked agent of type {subagent_type}, "
f"the only allowed types are {[f'`{k}`' for k in subagent_graphs]}"
)
raise ValueError(msg)
subagent = subagent_graphs[subagent_type]
state["messages"] = [HumanMessage(content=description)]
if "todos" in state:
del state["todos"]
result = subagent.invoke(state)
return _return_command_with_state_update(result, tool_call_id)
return task
class SubAgentMiddleware(AgentMiddleware):
"""Middleware for providing subagents to an agent via a `task` tool.
This middleware adds a `task` tool to the agent that can be used to invoke subagents.
Subagents are useful for handling complex tasks that require multiple steps, or tasks
that require a lot of context to resolve.
A chief benefit of subagents is that they can handle multi-step tasks, and then return
a clean, concise response to the main agent.
Subagents are also great for different domains of expertise that require a narrower
subset of tools and focus.
This middleware comes with a default general-purpose subagent that can be used to
handle the same tasks as the main agent, but with isolated context.
Args:
default_subagent_model: The model to use for the general-purpose subagent.
Can be a LanguageModelLike or a dict for init_chat_model.
default_subagent_tools: The tools to use for the general-purpose subagent.
subagents: A list of additional subagents to provide to the agent.
system_prompt_extension: Additional instructions on how the main agent should use subagents.
is_async: Whether the `task` tool should be asynchronous.
Example:
```python
from langchain.agents.middleware.subagents import SubAgentMiddleware
from langchain.agents import create_agent
agent = create_agent("openai:gpt-4o", middleware=[SubAgentMiddleware(subagents=[])])
# Agent now has access to the `task` tool
result = await agent.invoke({"messages": [HumanMessage("Help me refactor my codebase")]})
```
"""
def __init__(
self,
*,
default_subagent_model: str | BaseChatModel,
default_subagent_tools: Sequence[BaseTool | Callable | dict[str, Any]] | None = None,
subagents: list[SubAgent | CompiledSubAgent] | None = None,
system_prompt_extension: str | None = None,
is_async: bool = False,
) -> None:
"""Initialize the SubAgentMiddleware."""
super().__init__()
self.system_prompt_extension = system_prompt_extension
task_tool = _create_task_tool(
default_subagent_model,
default_subagent_tools or [],
subagents or [],
is_async=is_async,
)
self.tools = [task_tool]
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], AIMessage],
) -> AIMessage:
"""Update the system prompt to include instructions on using subagents."""
if self.system_prompt_extension is not None:
request.system_prompt = (
request.system_prompt + "\n\n" + self.system_prompt_extension
if request.system_prompt
else self.system_prompt_extension
)
return handler(request)

View File

@@ -57,7 +57,8 @@ test = [
"toml>=0.10.2,<1.0.0",
"langchain-tests",
"langchain-text-splitters",
"langchain-openai"
"langchain-openai",
"langchain-anthropic",
]
lint = [
"ruff>=0.12.2,<0.13.0",

View File

@@ -0,0 +1,722 @@
from langchain.agents.middleware.filesystem import (
FilesystemMiddleware,
WRITE_FILE_TOOL_DESCRIPTION,
WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT,
)
from langchain.agents import create_agent
from langchain.agents.deepagents import create_deep_agent
from langchain_core.messages import HumanMessage
from langchain_anthropic import ChatAnthropic
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver
from langchain.agents._internal.file_utils import FileData
import pytest
import uuid
@pytest.mark.requires("langchain_anthropic")
class TestFilesystem:
def test_create_deepagent_without_store_and_with_longterm_memory_should_fail(self):
with pytest.raises(ValueError):
deepagent = create_deep_agent(tools=[], use_longterm_memory=True)
deepagent.invoke(
{"messages": [HumanMessage(content="List all of the files in your filesystem?")]}
)
def test_filesystem_system_prompt_override(self):
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=False,
system_prompt_extension="In every single response, you must say the word 'pokemon'! You love it!",
)
],
)
response = agent.invoke({"messages": [HumanMessage(content="What do you like?")]})
assert "pokemon" in response["messages"][1].text.lower()
def test_filesystem_system_prompt_override_with_longterm_memory(self):
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
system_prompt_extension="In every single response, you must say the word 'pokemon'! You love it!",
)
],
store=InMemoryStore(),
)
response = agent.invoke({"messages": [HumanMessage(content="What do you like?")]})
assert "pokemon" in response["messages"][1].text.lower()
def test_filesystem_tool_prompt_override(self):
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=False,
custom_tool_descriptions={
"ls": "Charmander",
"read_file": "Bulbasaur",
"edit_file": "Squirtle",
},
)
],
)
tools = agent.nodes["tools"].bound._tools_by_name
assert "ls" in tools
assert tools["ls"].description == "Charmander"
assert "read_file" in tools
assert tools["read_file"].description == "Bulbasaur"
assert "write_file" in tools
assert tools["write_file"].description == WRITE_FILE_TOOL_DESCRIPTION
assert "edit_file" in tools
assert tools["edit_file"].description == "Squirtle"
def test_filesystem_tool_prompt_override_with_longterm_memory(self):
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
custom_tool_descriptions={
"ls": "Charmander",
"read_file": "Bulbasaur",
"edit_file": "Squirtle",
},
)
],
store=InMemoryStore(),
)
tools = agent.nodes["tools"].bound._tools_by_name
assert "ls" in tools
assert tools["ls"].description == "Charmander"
assert "read_file" in tools
assert tools["read_file"].description == "Bulbasaur"
assert "write_file" in tools
assert (
tools["write_file"].description
== WRITE_FILE_TOOL_DESCRIPTION + WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
)
assert "edit_file" in tools
assert tools["edit_file"].description == "Squirtle"
def test_ls_longterm_without_path(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
store.put(
("filesystem",),
"/pokemon/charmander.txt",
{
"content": ["Ember"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [HumanMessage(content="List all of your files")],
"files": {
"/pizza.txt": FileData(
content=["Hello world"],
created_at="2021-01-01",
modified_at="2021-01-01",
),
"/pokemon/squirtle.txt": FileData(
content=["Splash"],
created_at="2021-01-01",
modified_at="2021-01-01",
),
},
},
config=config,
)
messages = response["messages"]
ls_message = next(
message for message in messages if message.type == "tool" and message.name == "ls"
)
assert "/pizza.txt" in ls_message.text
assert "/pokemon/squirtle.txt" in ls_message.text
assert "/memories/test.txt" in ls_message.text
assert "/memories/pokemon/charmander.txt" in ls_message.text
def test_ls_longterm_with_path(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
store.put(
("filesystem",),
"/pokemon/charmander.txt",
{
"content": ["Ember"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(content="List all of your files in the /pokemon directory")
],
"files": {
"/pizza.txt": FileData(
content=["Hello world"],
created_at="2021-01-01",
modified_at="2021-01-01",
),
"/pokemon/squirtle.txt": FileData(
content=["Splash"],
created_at="2021-01-01",
modified_at="2021-01-01",
),
},
},
config=config,
)
messages = response["messages"]
ls_message = next(
message for message in messages if message.type == "tool" and message.name == "ls"
)
assert "/pokemon/squirtle.txt" in ls_message.text
assert "/memories/pokemon/charmander.txt" not in ls_message.text
def test_read_file_longterm_local_file(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [HumanMessage(content="Read test.txt from local memory")],
"files": {
"/test.txt": FileData(
content=["Goodbye world"],
created_at="2021-01-01",
modified_at="2021-01-01",
)
},
},
config=config,
)
messages = response["messages"]
read_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "read_file"
)
assert read_file_message is not None
assert "Goodbye world" in read_file_message.content
def test_read_file_longterm_store_file(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [HumanMessage(content="Read test.txt from longterm memory")],
"files": {
"/test.txt": FileData(
content=["Goodbye world"],
created_at="2021-01-01",
modified_at="2021-01-01",
)
},
},
config=config,
)
messages = response["messages"]
read_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "read_file"
)
assert read_file_message is not None
assert "Hello world" in read_file_message.content
def test_read_file_longterm(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
store.put(
("filesystem",),
"/pokemon/charmander.txt",
{
"content": ["Ember"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Read the contents of the file about charmander from longterm memory."
)
],
"files": {},
},
config=config,
)
messages = response["messages"]
ai_msg_w_toolcall = next(
message
for message in messages
if message.type == "ai"
and any(
tc["name"] == "read_file"
and tc["args"]["file_path"] == "/memories/pokemon/charmander.txt"
for tc in message.tool_calls
)
)
assert ai_msg_w_toolcall is not None
def test_write_file_longterm(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'"
)
],
"files": {},
},
config=config,
)
messages = response["messages"]
write_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "write_file"
)
assert write_file_message is not None
file_item = store.get(("filesystem",), "/charmander.txt")
assert file_item is not None
assert any("fiery" in c for c in file_item.value["content"]) or any(
"Fiery" in c for c in file_item.value["content"]
)
def test_write_file_fail_already_exists_in_store(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/charmander.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'"
)
],
"files": {},
},
config=config,
)
messages = response["messages"]
write_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "write_file"
)
assert write_file_message is not None
assert "Cannot write" in write_file_message.content
def test_write_file_fail_already_exists_in_local(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to /charmander.txt, use the word 'fiery'"
)
],
"files": {
"/charmander.txt": FileData(
content=["Hello world"],
created_at="2021-01-01",
modified_at="2021-01-01",
)
},
},
config=config,
)
messages = response["messages"]
write_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "write_file"
)
assert write_file_message is not None
assert "Cannot write" in write_file_message.content
def test_edit_file_longterm(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/charmander.txt",
{
"content": ["The fire burns brightly. The fire burns hot."],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Edit the longterm memory file about charmander, to replace all instances of the word 'fire' with 'embers'"
)
],
"files": {},
},
config=config,
)
messages = response["messages"]
edit_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "edit_file"
)
assert edit_file_message is not None
assert store.get(("filesystem",), "/charmander.txt").value["content"] == [
"The embers burns brightly. The embers burns hot."
]
def test_longterm_memory_multiple_tools(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
assert_longterm_mem_tools(agent, store)
def test_longterm_memory_multiple_tools_deepagent(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_deep_agent(use_longterm_memory=True, checkpointer=checkpointer, store=store)
assert_longterm_mem_tools(agent, store)
def test_shortterm_memory_multiple_tools_deepagent(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_deep_agent(use_longterm_memory=False, checkpointer=checkpointer, store=store)
assert_shortterm_mem_tools(agent)
# Take actions on multiple threads to test longterm memory
def assert_longterm_mem_tools(agent, store):
# Write a longterm memory file
config = {"configurable": {"thread_id": uuid.uuid4()}}
agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'"
)
]
},
config=config,
)
namespaces = store.list_namespaces()
assert len(namespaces) == 1
assert namespaces[0] == ("filesystem",)
file_item = store.get(("filesystem",), "/charmander.txt")
assert file_item is not None
assert file_item.key == "/charmander.txt"
# Read the longterm memory file
config2 = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Read the haiku about Charmander from longterm memory at /charmander.txt"
)
]
},
config=config2,
)
messages = response["messages"]
read_file_message = next(
message for message in messages if message.type == "tool" and message.name == "read_file"
)
assert "fiery" in read_file_message.content or "Fiery" in read_file_message.content
# List all of the files in longterm memory
config3 = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{"messages": [HumanMessage(content="List all of the files in longterm memory")]},
config=config3,
)
messages = response["messages"]
ls_message = next(
message for message in messages if message.type == "tool" and message.name == "ls"
)
assert "/memories/charmander.txt" in ls_message.content
# Edit the longterm memory file
config4 = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Edit the haiku about Charmander in longterm memory to use the word 'ember'"
)
]
},
config=config4,
)
file_item = store.get(("filesystem",), "/charmander.txt")
assert file_item is not None
assert file_item.key == "/charmander.txt"
assert any("ember" in c for c in file_item.value["content"]) or any(
"Ember" in c for c in file_item.value["content"]
)
# Read the longterm memory file
config5 = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Read the haiku about Charmander from longterm memory at /charmander.txt"
)
]
},
config=config5,
)
messages = response["messages"]
read_file_message = next(
message for message in messages if message.type == "tool" and message.name == "read_file"
)
assert "ember" in read_file_message.content or "Ember" in read_file_message.content
def assert_shortterm_mem_tools(agent):
# Write a shortterm memory file
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to /charmander.txt, use the word 'fiery'"
)
]
},
config=config,
)
files = response["files"]
assert "/charmander.txt" in files
# Read the shortterm memory file
response = agent.invoke(
{
"messages": [
HumanMessage(content="Read the haiku about Charmander from /charmander.txt")
]
},
config=config,
)
messages = response["messages"]
read_file_message = next(
message
for message in reversed(messages)
if message.type == "tool" and message.name == "read_file"
)
assert "fiery" in read_file_message.content or "Fiery" in read_file_message.content
# List all of the files in shortterm memory
response = agent.invoke(
{"messages": [HumanMessage(content="List all of the files in your filesystem")]},
config=config,
)
messages = response["messages"]
ls_message = next(
message for message in messages if message.type == "tool" and message.name == "ls"
)
assert "/charmander.txt" in ls_message.content
# Edit the shortterm memory file
response = agent.invoke(
{
"messages": [
HumanMessage(content="Edit the haiku about Charmander to use the word 'ember'")
]
},
config=config,
)
files = response["files"]
assert "/charmander.txt" in files
assert any("ember" in c for c in files["/charmander.txt"]["content"]) or any(
"Ember" in c for c in files["/charmander.txt"]["content"]
)
# Read the shortterm memory file
response = agent.invoke(
{"messages": [HumanMessage(content="Read the haiku about Charmander at /charmander.txt")]},
config=config,
)
messages = response["messages"]
read_file_message = next(
message
for message in reversed(messages)
if message.type == "tool" and message.name == "read_file"
)
assert "ember" in read_file_message.content or "Ember" in read_file_message.content

View File

@@ -0,0 +1,226 @@
from langchain.agents.middleware.subagents import SubAgentMiddleware
from langchain.agents.middleware import AgentMiddleware
from langchain_core.tools import tool
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
import pytest
@tool
def get_weather(city: str) -> str:
"""Get the weather in a city."""
return f"The weather in {city} is sunny."
class WeatherMiddleware(AgentMiddleware):
tools = [get_weather]
def assert_expected_subgraph_actions(expected_tool_calls, agent, inputs):
current_idx = 0
for update in agent.stream(
inputs,
subgraphs=True,
stream_mode="updates",
):
if "model" in update[1]:
ai_message = update[1]["model"]["messages"][-1]
tool_calls = ai_message.tool_calls
for tool_call in tool_calls:
if tool_call["name"] == expected_tool_calls[current_idx]["name"]:
if "model" in expected_tool_calls[current_idx]:
assert (
ai_message.response_metadata["model_name"]
== expected_tool_calls[current_idx]["model"]
)
for arg in expected_tool_calls[current_idx]["args"]:
assert arg in tool_call["args"]
assert (
tool_call["args"][arg] == expected_tool_calls[current_idx]["args"][arg]
)
current_idx += 1
assert current_idx == len(expected_tool_calls)
@pytest.mark.requires("langchain_anthropic", "langchain_openai")
class TestSubagentMiddleware:
"""Integration tests for the SubagentMiddleware class."""
def test_general_purpose_subagent(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the general-purpose subagent to get the weather in a city.",
middleware=[
SubAgentMiddleware(
default_subagent_model="claude-sonnet-4-20250514",
default_subagent_tools=[get_weather],
)
],
)
assert "task" in agent.nodes["tools"].bound._tools_by_name.keys()
response = agent.invoke(
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]}
)
assert response["messages"][1].tool_calls[0]["name"] == "task"
assert response["messages"][1].tool_calls[0]["args"]["subagent_type"] == "general-purpose"
def test_defined_subagent(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_subagent_model="claude-sonnet-4-20250514",
default_subagent_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"system_prompt": "Use the get_weather tool to get the weather in a city.",
"tools": [get_weather],
}
],
)
],
)
assert "task" in agent.nodes["tools"].bound._tools_by_name.keys()
response = agent.invoke(
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]}
)
assert response["messages"][1].tool_calls[0]["name"] == "task"
assert response["messages"][1].tool_calls[0]["args"]["subagent_type"] == "weather"
def test_defined_subagent_tool_calls(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_subagent_model="claude-sonnet-4-20250514",
default_subagent_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"system_prompt": "Use the get_weather tool to get the weather in a city.",
"tools": [get_weather],
}
],
)
],
)
expected_tool_calls = [
{"name": "task", "args": {"subagent_type": "weather"}},
{"name": "get_weather", "args": {}},
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)
def test_defined_subagent_custom_model(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_subagent_model="claude-sonnet-4-20250514",
default_subagent_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"system_prompt": "Use the get_weather tool to get the weather in a city.",
"tools": [get_weather],
"model": "gpt-4.1",
}
],
)
],
)
expected_tool_calls = [
{
"name": "task",
"args": {"subagent_type": "weather"},
"model": "claude-sonnet-4-20250514",
},
{"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"},
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)
def test_defined_subagent_custom_middleware(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_subagent_model="claude-sonnet-4-20250514",
default_subagent_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"system_prompt": "Use the get_weather tool to get the weather in a city.",
"tools": [], # No tools, only in middleware
"model": "gpt-4.1",
"middleware": [WeatherMiddleware()],
}
],
)
],
)
expected_tool_calls = [
{
"name": "task",
"args": {"subagent_type": "weather"},
"model": "claude-sonnet-4-20250514",
},
{"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"},
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)
def test_defined_subagent_custom_runnable(self):
custom_subagent = create_agent(
model="gpt-4.1-2025-04-14",
system_prompt="Use the get_weather tool to get the weather in a city.",
tools=[get_weather],
)
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_subagent_model="claude-sonnet-4-20250514",
default_subagent_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"runnable": custom_subagent,
}
],
)
],
)
expected_tool_calls = [
{
"name": "task",
"args": {"subagent_type": "weather"},
"model": "claude-sonnet-4-20250514",
},
{"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"},
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)

View File

@@ -0,0 +1,306 @@
from langchain.agents.deepagents import create_deep_agent
from langchain.agents import create_agent
from langchain_core.tools import tool, InjectedToolCallId
from typing import Annotated
from langchain.tools.tool_node import InjectedState
from langchain.agents.middleware import AgentMiddleware, AgentState
from langgraph.types import Command
from langchain_core.messages import ToolMessage
import pytest
def assert_all_deepagent_qualities(agent):
assert "todos" in agent.stream_channels
assert "files" in agent.stream_channels
assert "write_todos" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "ls" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "read_file" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "write_file" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "edit_file" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "task" in agent.nodes["tools"].bound._tools_by_name.keys()
SAMPLE_MODEL = "claude-3-5-sonnet-20240620"
@tool(description="Use this tool to get the weather")
def get_weather(location: str):
return f"The weather in {location} is sunny."
@tool(description="Use this tool to get the latest soccer scores")
def get_soccer_scores(team: str):
return f"The latest soccer scores for {team} are 2-1."
@tool(description="Sample tool")
def sample_tool(sample_input: str):
return sample_input
@tool(description="Sample tool with injected state")
def sample_tool_with_injected_state(sample_input: str, state: Annotated[dict, InjectedState]):
return sample_input + state["sample_input"]
TOY_BASKETBALL_RESEARCH = "Lebron James is the best basketball player of all time with over 40k points and 21 seasons in the NBA."
@tool(description="Use this tool to conduct research into basketball and save it to state")
def research_basketball(
topic: str,
state: Annotated[dict, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
):
current_research = state.get("research", "")
research = f"{current_research}\n\nResearching on {topic}... Done! {TOY_BASKETBALL_RESEARCH}"
return Command(
update={
"research": research,
"messages": [ToolMessage(research, tool_call_id=tool_call_id)],
}
)
class ResearchState(AgentState):
research: str
class ResearchMiddlewareWithTools(AgentMiddleware):
state_schema = ResearchState
tools = [research_basketball]
class ResearchMiddleware(AgentMiddleware):
state_schema = ResearchState
class SampleMiddlewareWithTools(AgentMiddleware):
tools = [sample_tool]
class SampleState(AgentState):
sample_input: str
class SampleMiddlewareWithToolsAndState(AgentMiddleware):
state_schema = SampleState
tools = [sample_tool]
class WeatherToolMiddleware(AgentMiddleware):
tools = [get_weather]
@pytest.mark.requires("langchain_anthropic")
class TestDeepAgentsFilesystem:
def test_base_deep_agent(self):
agent = create_deep_agent()
assert_all_deepagent_qualities(agent)
def test_deep_agent_with_tool(self):
agent = create_deep_agent(tools=[sample_tool])
assert_all_deepagent_qualities(agent)
assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys()
def test_deep_agent_with_middleware_with_tool(self):
agent = create_deep_agent(middleware=[SampleMiddlewareWithTools()])
assert_all_deepagent_qualities(agent)
assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys()
def test_deep_agent_with_middleware_with_tool_and_state(self):
agent = create_deep_agent(middleware=[SampleMiddlewareWithToolsAndState()])
assert_all_deepagent_qualities(agent)
assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "sample_input" in agent.stream_channels
def test_deep_agent_with_subagents(self):
subagents = [
{
"name": "weather_agent",
"description": "Use this agent to get the weather",
"system_prompt": "You are a weather agent.",
"tools": [get_weather],
"model": SAMPLE_MODEL,
}
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{"messages": [{"role": "user", "content": "What is the weather in Tokyo?"}]}
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "weather_agent"
for tool_call in tool_calls
]
)
def test_deep_agent_with_subagents_gen_purpose(self):
subagents = [
{
"name": "weather_agent",
"description": "Use this agent to get the weather",
"system_prompt": "You are a weather agent.",
"tools": [get_weather],
"model": SAMPLE_MODEL,
}
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{
"messages": [
{
"role": "user",
"content": "Use the general purpose subagent to call the sample tool",
}
]
}
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "general-purpose"
for tool_call in tool_calls
]
)
def test_deep_agent_with_subagents_with_middleware(self):
subagents = [
{
"name": "weather_agent",
"description": "Use this agent to get the weather",
"system_prompt": "You are a weather agent.",
"tools": [],
"model": SAMPLE_MODEL,
"middleware": [WeatherToolMiddleware()],
}
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{"messages": [{"role": "user", "content": "What is the weather in Tokyo?"}]}
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "weather_agent"
for tool_call in tool_calls
]
)
def test_deep_agent_with_custom_subagents(self):
subagents = [
{
"name": "weather_agent",
"description": "Use this agent to get the weather",
"system_prompt": "You are a weather agent.",
"tools": [get_weather],
"model": SAMPLE_MODEL,
},
{
"name": "soccer_agent",
"description": "Use this agent to get the latest soccer scores",
"runnable": create_agent(
model=SAMPLE_MODEL,
tools=[get_soccer_scores],
system_prompt="You are a soccer agent.",
),
},
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{
"messages": [
{
"role": "user",
"content": "Look up the weather in Tokyo, and the latest scores for Manchester City!",
}
]
}
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "weather_agent"
for tool_call in tool_calls
]
)
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "soccer_agent"
for tool_call in tool_calls
]
)
def test_deep_agent_with_extended_state_and_subagents(self):
subagents = [
{
"name": "basketball_info_agent",
"description": "Use this agent to get surface level info on any basketball topic",
"system_prompt": "You are a basketball info agent.",
"middleware": [ResearchMiddlewareWithTools()],
}
]
agent = create_deep_agent(
tools=[sample_tool], subagents=subagents, middleware=[ResearchMiddleware()]
)
assert_all_deepagent_qualities(agent)
assert "research" in agent.stream_channels
result = agent.invoke(
{"messages": [{"role": "user", "content": "Get surface level info on lebron james"}]},
config={"recursion_limit": 100},
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "basketball_info_agent"
for tool_call in tool_calls
]
)
assert TOY_BASKETBALL_RESEARCH in result["research"]
def test_deep_agent_with_subagents_no_tools(self):
subagents = [
{
"name": "basketball_info_agent",
"description": "Use this agent to get surface level info on any basketball topic",
"system_prompt": "You are a basketball info agent.",
}
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{
"messages": [
{
"role": "user",
"content": "Use the basketball info subagent to call the sample tool",
}
]
},
config={"recursion_limit": 100},
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "basketball_info_agent"
for tool_call in tool_calls
]
)

View File

@@ -0,0 +1,276 @@
"""Unit tests for Anthropic text editor and memory tool middleware."""
import pytest
from langchain.agents.middleware.anthropic_tools import (
AnthropicToolsState,
StateClaudeMemoryMiddleware,
StateClaudeTextEditorMiddleware,
)
from langchain.agents._internal.file_utils import validate_path
from langchain_core.messages import ToolMessage
from langgraph.types import Command
class TestPathValidation:
"""Test path validation and security."""
def test_basic_path_normalization(self) -> None:
"""Test basic path normalization."""
assert validate_path("/foo/bar") == "/foo/bar"
assert validate_path("foo/bar") == "/foo/bar"
assert validate_path("/foo//bar") == "/foo/bar"
assert validate_path("/foo/./bar") == "/foo/bar"
def test_path_traversal_blocked(self) -> None:
"""Test that path traversal attempts are blocked."""
with pytest.raises(ValueError, match="Path traversal not allowed"):
validate_path("/foo/../etc/passwd")
with pytest.raises(ValueError, match="Path traversal not allowed"):
validate_path("../etc/passwd")
with pytest.raises(ValueError, match="Path traversal not allowed"):
validate_path("~/.ssh/id_rsa")
def test_allowed_prefixes(self) -> None:
"""Test path prefix validation."""
# Should pass
assert (
validate_path("/workspace/file.txt", allowed_prefixes=["/workspace"])
== "/workspace/file.txt"
)
# Should fail
with pytest.raises(ValueError, match="Path must start with"):
validate_path("/etc/passwd", allowed_prefixes=["/workspace"])
with pytest.raises(ValueError, match="Path must start with"):
validate_path("/workspacemalicious/file.txt", allowed_prefixes=["/workspace/"])
def test_memories_prefix(self) -> None:
"""Test /memories prefix validation for memory tools."""
assert (
validate_path("/memories/notes.txt", allowed_prefixes=["/memories"])
== "/memories/notes.txt"
)
with pytest.raises(ValueError, match="Path must start with"):
validate_path("/other/notes.txt", allowed_prefixes=["/memories"])
class TestTextEditorMiddleware:
"""Test text editor middleware functionality."""
def test_middleware_initialization(self) -> None:
"""Test middleware initializes correctly."""
middleware = StateClaudeTextEditorMiddleware()
assert middleware.state_schema == AnthropicToolsState
assert middleware.tool_type == "text_editor_20250728"
assert middleware.tool_name == "str_replace_based_edit_tool"
assert middleware.state_key == "text_editor_files"
# With path restrictions
middleware = StateClaudeTextEditorMiddleware(allowed_path_prefixes=["/workspace"])
assert middleware.allowed_prefixes == ["/workspace"]
class TestMemoryMiddleware:
"""Test memory middleware functionality."""
def test_middleware_initialization(self) -> None:
"""Test middleware initializes correctly."""
middleware = StateClaudeMemoryMiddleware()
assert middleware.state_schema == AnthropicToolsState
assert middleware.tool_type == "memory_20250818"
assert middleware.tool_name == "memory"
assert middleware.state_key == "memory_files"
assert middleware.system_prompt # Should have default prompt
def test_custom_system_prompt(self) -> None:
"""Test custom system prompt can be set."""
custom_prompt = "Custom memory instructions"
middleware = StateClaudeMemoryMiddleware(system_prompt=custom_prompt)
assert middleware.system_prompt == custom_prompt
class TestFileOperations:
"""Test file operation implementations via wrap_tool_call."""
def test_view_operation(self) -> None:
"""Test view command execution."""
middleware = StateClaudeTextEditorMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/test.txt": {
"content": ["line1", "line2", "line3"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
}
},
}
args = {"command": "view", "path": "/test.txt"}
result = middleware._handle_view(args, state, "test_id")
assert isinstance(result, Command)
assert result.update is not None
messages = result.update.get("messages", [])
assert len(messages) == 1
assert isinstance(messages[0], ToolMessage)
assert messages[0].content == "1|line1\n2|line2\n3|line3"
assert messages[0].tool_call_id == "test_id"
def test_create_operation(self) -> None:
"""Test create command execution."""
middleware = StateClaudeTextEditorMiddleware()
state: AnthropicToolsState = {"messages": []}
args = {"command": "create", "path": "/test.txt", "file_text": "line1\nline2"}
result = middleware._handle_create(args, state, "test_id")
assert isinstance(result, Command)
assert result.update is not None
files = result.update.get("text_editor_files", {})
assert "/test.txt" in files
assert files["/test.txt"]["content"] == ["line1", "line2"]
def test_path_prefix_enforcement(self) -> None:
"""Test that path prefixes are enforced."""
middleware = StateClaudeTextEditorMiddleware(allowed_path_prefixes=["/workspace"])
state: AnthropicToolsState = {"messages": []}
# Should fail with /etc/passwd
args = {"command": "create", "path": "/etc/passwd", "file_text": "test"}
try:
middleware._handle_create(args, state, "test_id")
assert False, "Should have raised ValueError"
except ValueError as e:
assert "Path must start with" in str(e)
def test_memories_prefix_enforcement(self) -> None:
"""Test that /memories prefix is enforced for memory middleware."""
middleware = StateClaudeMemoryMiddleware()
state: AnthropicToolsState = {"messages": []}
# Should fail with /other/path
args = {"command": "create", "path": "/other/path.txt", "file_text": "test"}
try:
middleware._handle_create(args, state, "test_id")
assert False, "Should have raised ValueError"
except ValueError as e:
assert "/memories" in str(e)
def test_str_replace_operation(self) -> None:
"""Test str_replace command execution."""
middleware = StateClaudeTextEditorMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/test.txt": {
"content": ["Hello world", "Goodbye world"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
}
},
}
args = {
"command": "str_replace",
"path": "/test.txt",
"old_str": "world",
"new_str": "universe",
}
result = middleware._handle_str_replace(args, state, "test_id")
assert isinstance(result, Command)
files = result.update.get("text_editor_files", {})
# Should only replace first occurrence
assert files["/test.txt"]["content"] == ["Hello universe", "Goodbye world"]
def test_insert_operation(self) -> None:
"""Test insert command execution."""
middleware = StateClaudeTextEditorMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/test.txt": {
"content": ["line1", "line2"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
}
},
}
args = {
"command": "insert",
"path": "/test.txt",
"insert_line": 0,
"new_str": "inserted",
}
result = middleware._handle_insert(args, state, "test_id")
assert isinstance(result, Command)
files = result.update.get("text_editor_files", {})
assert files["/test.txt"]["content"] == ["inserted", "line1", "line2"]
def test_delete_operation(self) -> None:
"""Test delete command execution (memory only)."""
middleware = StateClaudeMemoryMiddleware()
state: AnthropicToolsState = {
"messages": [],
"memory_files": {
"/memories/test.txt": {
"content": ["line1"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
}
},
}
args = {"command": "delete", "path": "/memories/test.txt"}
result = middleware._handle_delete(args, state, "test_id")
assert isinstance(result, Command)
files = result.update.get("memory_files", {})
# Deleted files are marked as None in state
assert files.get("/memories/test.txt") is None
def test_rename_operation(self) -> None:
"""Test rename command execution (memory only)."""
middleware = StateClaudeMemoryMiddleware()
state: AnthropicToolsState = {
"messages": [],
"memory_files": {
"/memories/old.txt": {
"content": ["line1"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
}
},
}
args = {
"command": "rename",
"old_path": "/memories/old.txt",
"new_path": "/memories/new.txt",
}
result = middleware._handle_rename(args, state, "test_id")
assert isinstance(result, Command)
files = result.update.get("memory_files", {})
# Old path is marked as None (deleted)
assert files.get("/memories/old.txt") is None
# New path has the file data
assert files.get("/memories/new.txt") is not None
assert files["/memories/new.txt"]["content"] == ["line1"]

View File

@@ -0,0 +1,461 @@
"""Unit tests for file search middleware."""
import pytest
from langchain.agents.middleware.anthropic_tools import AnthropicToolsState
from langchain.agents.middleware.file_search import StateFileSearchMiddleware
from langchain_core.messages import ToolMessage
class TestSearchMiddlewareInitialization:
"""Test search middleware initialization."""
def test_middleware_initialization(self) -> None:
"""Test middleware initializes correctly."""
middleware = StateFileSearchMiddleware()
assert middleware.state_schema == AnthropicToolsState
assert middleware.state_key == "text_editor_files"
def test_custom_state_key(self) -> None:
"""Test middleware with custom state key."""
middleware = StateFileSearchMiddleware(state_key="memory_files")
assert middleware.state_key == "memory_files"
class TestGlobSearch:
"""Test Glob file pattern matching."""
def test_glob_basic_pattern(self) -> None:
"""Test basic glob pattern matching."""
middleware = StateFileSearchMiddleware()
test_state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["print('hello')"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/src/utils.py": {
"content": ["def helper(): pass"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/README.md": {
"content": ["# Project"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
# Call tool function directly (state is injected in real usage)
result = middleware.glob_search.func(pattern="*.py", state=test_state)
assert isinstance(result, str)
assert "/src/main.py" in result
assert "/src/utils.py" in result
assert "/README.md" not in result
def test_glob_recursive_pattern(self) -> None:
"""Test recursive glob pattern matching."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/src/utils/helper.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/tests/test_main.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.glob_search.func(pattern="**/*.py", state=state)
assert isinstance(result, str)
lines = result.split("\n")
assert len(lines) == 3
assert all(".py" in line for line in lines)
def test_glob_with_base_path(self) -> None:
"""Test glob with base path restriction."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/tests/test.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.glob_search.func(pattern="**/*.py", path="/src", state=state)
assert isinstance(result, str)
assert "/src/main.py" in result
assert "/tests/test.py" not in result
def test_glob_no_matches(self) -> None:
"""Test glob with no matching files."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.glob_search.func(pattern="*.ts", state=state)
assert isinstance(result, str)
assert result == "No files found"
def test_glob_sorts_by_modified_time(self) -> None:
"""Test that glob results are sorted by modification time."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/old.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/new.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-02T00:00:00",
},
},
}
result = middleware.glob_search.func(pattern="*.py", state=state)
lines = result.split("\n")
# Most recent first
assert lines[0] == "/new.py"
assert lines[1] == "/old.py"
class TestGrepSearch:
"""Test Grep content search."""
def test_grep_files_with_matches_mode(self) -> None:
"""Test grep with files_with_matches output mode."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["def foo():", " pass"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/src/utils.py": {
"content": ["def bar():", " return None"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/README.md": {
"content": ["# Documentation", "No code here"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(pattern=r"def \w+\(\):", state=state)
assert isinstance(result, str)
assert "/src/main.py" in result
assert "/src/utils.py" in result
assert "/README.md" not in result
# Should only have file paths, not line content
assert "def foo():" not in result
def test_grep_content_mode(self) -> None:
"""Test grep with content output mode."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["def foo():", " pass", "def bar():"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(
pattern=r"def \w+\(\):", output_mode="content", state=state
)
assert isinstance(result, str)
lines = result.split("\n")
assert len(lines) == 2
assert lines[0] == "/src/main.py:1:def foo():"
assert lines[1] == "/src/main.py:3:def bar():"
def test_grep_count_mode(self) -> None:
"""Test grep with count output mode."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["TODO: fix this", "print('hello')", "TODO: add tests"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/src/utils.py": {
"content": ["TODO: implement"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(pattern=r"TODO", output_mode="count", state=state)
assert isinstance(result, str)
lines = result.split("\n")
assert "/src/main.py:2" in lines
assert "/src/utils.py:1" in lines
def test_grep_with_include_filter(self) -> None:
"""Test grep with include file pattern filter."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["import os"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/src/main.ts": {
"content": ["import os from 'os'"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(pattern="import", include="*.py", state=state)
assert isinstance(result, str)
assert "/src/main.py" in result
assert "/src/main.ts" not in result
def test_grep_with_brace_expansion_filter(self) -> None:
"""Test grep with brace expansion in include filter."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.ts": {
"content": ["const x = 1"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/src/App.tsx": {
"content": ["const y = 2"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/src/main.py": {
"content": ["z = 3"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(pattern="const", include="*.{ts,tsx}", state=state)
assert isinstance(result, str)
assert "/src/main.ts" in result
assert "/src/App.tsx" in result
assert "/src/main.py" not in result
def test_grep_with_base_path(self) -> None:
"""Test grep with base path restriction."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["import foo"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
"/tests/test.py": {
"content": ["import foo"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(pattern="import", path="/src", state=state)
assert isinstance(result, str)
assert "/src/main.py" in result
assert "/tests/test.py" not in result
def test_grep_no_matches(self) -> None:
"""Test grep with no matching content."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["print('hello')"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(pattern=r"TODO", state=state)
assert isinstance(result, str)
assert result == "No matches found"
def test_grep_invalid_regex(self) -> None:
"""Test grep with invalid regex pattern."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {},
}
result = middleware.grep_search.func(pattern=r"[unclosed", state=state)
assert isinstance(result, str)
assert "Invalid regex pattern" in result
class TestSearchWithDifferentBackends:
"""Test searching with different backend configurations."""
def test_glob_default_backend(self) -> None:
"""Test that glob searches the default backend (text_editor_files)."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
"memory_files": {
"/memories/notes.txt": {
"content": [],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.glob_search.func(pattern="**/*", state=state)
assert isinstance(result, str)
assert "/src/main.py" in result
# Should NOT find memory_files since default backend is text_editor_files
assert "/memories/notes.txt" not in result
def test_grep_default_backend(self) -> None:
"""Test that grep searches the default backend (text_editor_files)."""
middleware = StateFileSearchMiddleware()
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["TODO: implement"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
"memory_files": {
"/memories/tasks.txt": {
"content": ["TODO: review"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(pattern=r"TODO", state=state)
assert isinstance(result, str)
assert "/src/main.py" in result
# Should NOT find memory_files since default backend is text_editor_files
assert "/memories/tasks.txt" not in result
def test_search_with_single_store(self) -> None:
"""Test searching with a specific state key."""
middleware = StateFileSearchMiddleware(state_key="text_editor_files")
state: AnthropicToolsState = {
"messages": [],
"text_editor_files": {
"/src/main.py": {
"content": ["code"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
"memory_files": {
"/memories/notes.txt": {
"content": ["notes"],
"created_at": "2025-01-01T00:00:00",
"modified_at": "2025-01-01T00:00:00",
},
},
}
result = middleware.grep_search.func(pattern=r".*", state=state)
assert isinstance(result, str)
assert "/src/main.py" in result
assert "/memories/notes.txt" not in result

View File

@@ -0,0 +1,114 @@
from langchain.agents.middleware.filesystem import (
FileData,
FilesystemState,
FilesystemMiddleware,
FILESYSTEM_SYSTEM_PROMPT,
FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT,
)
from langgraph.store.memory import InMemoryStore
from langgraph.runtime import Runtime
class TestFilesystem:
def test_init_local(self):
middleware = FilesystemMiddleware(use_longterm_memory=False)
assert middleware.use_longterm_memory is False
assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT
assert len(middleware.tools) == 4
def test_init_longterm(self):
middleware = FilesystemMiddleware(use_longterm_memory=True)
assert middleware.use_longterm_memory is True
assert middleware.system_prompt_extension == (
FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
)
assert len(middleware.tools) == 4
def test_init_custom_system_prompt_shortterm(self):
middleware = FilesystemMiddleware(
use_longterm_memory=False, system_prompt_extension="Custom system prompt"
)
assert middleware.use_longterm_memory is False
assert middleware.system_prompt_extension == "Custom system prompt"
assert len(middleware.tools) == 4
def test_init_custom_system_prompt_longterm(self):
middleware = FilesystemMiddleware(
use_longterm_memory=True, system_prompt_extension="Custom system prompt"
)
assert middleware.use_longterm_memory is True
assert middleware.system_prompt_extension == "Custom system prompt"
assert len(middleware.tools) == 4
def test_init_custom_tool_descriptions_shortterm(self):
middleware = FilesystemMiddleware(
use_longterm_memory=False, custom_tool_descriptions={"ls": "Custom ls tool description"}
)
assert middleware.use_longterm_memory is False
assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
assert ls_tool.description == "Custom ls tool description"
def test_init_custom_tool_descriptions_longterm(self):
middleware = FilesystemMiddleware(
use_longterm_memory=True, custom_tool_descriptions={"ls": "Custom ls tool description"}
)
assert middleware.use_longterm_memory is True
assert middleware.system_prompt_extension == (
FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
)
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
assert ls_tool.description == "Custom ls tool description"
def test_ls_shortterm(self):
state = FilesystemState(
messages=[],
files={
"test.txt": FileData(
content=["Hello world"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
"test2.txt": FileData(
content=["Goodbye world"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
},
)
middleware = FilesystemMiddleware(use_longterm_memory=False)
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
result = ls_tool.invoke({"state": state})
assert result == ["test.txt", "test2.txt"]
def test_ls_shortterm_with_path(self):
state = FilesystemState(
messages=[],
files={
"/test.txt": FileData(
content=["Hello world"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
"/pokemon/test2.txt": FileData(
content=["Goodbye world"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
"/pokemon/charmander.txt": FileData(
content=["Ember"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
"/pokemon/water/squirtle.txt": FileData(
content=["Water"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
},
)
middleware = FilesystemMiddleware(use_longterm_memory=False)
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
result = ls_tool.invoke({"state": state, "path": "pokemon/"})
assert "/pokemon/test2.txt" in result
assert "/pokemon/charmander.txt" in result

View File

@@ -0,0 +1,35 @@
from langchain.agents.middleware.subagents import TASK_TOOL_DESCRIPTION, SubAgentMiddleware
import pytest
@pytest.mark.requires("langchain_openai")
class TestSubagentMiddleware:
"""Test the SubagentMiddleware class."""
def test_subagent_middleware_init(self):
middleware = SubAgentMiddleware(
default_subagent_model="gpt-4o-mini",
)
assert middleware is not None
assert middleware.system_prompt_extension == None
assert len(middleware.tools) == 1
assert middleware.tools[0].name == "task"
assert middleware.tools[0].description == TASK_TOOL_DESCRIPTION.format(other_agents="")
def test_default_subagent_with_tools(self):
middleware = SubAgentMiddleware(
default_subagent_model="gpt-4o-mini",
default_subagent_tools=[],
)
assert middleware is not None
assert middleware.system_prompt_extension == None
def test_default_subagent_custom_system_prompt_extension(self):
middleware = SubAgentMiddleware(
default_subagent_model="gpt-4o-mini",
default_subagent_tools=[],
system_prompt_extension="Use the task tool to call a subagent.",
)
assert middleware is not None
assert middleware.system_prompt_extension == "Use the task tool to call a subagent."

View File

@@ -1607,6 +1607,7 @@ lint = [
{ name = "ruff" },
]
test = [
{ name = "langchain-anthropic" },
{ name = "langchain-openai" },
{ name = "langchain-tests" },
{ name = "langchain-text-splitters" },
@@ -1659,6 +1660,7 @@ provides-extras = ["community", "anthropic", "openai", "google-vertexai", "googl
[package.metadata.requires-dev]
lint = [{ name = "ruff", specifier = ">=0.12.2,<0.13.0" }]
test = [
{ name = "langchain-anthropic" },
{ name = "langchain-openai", editable = "../partners/openai" },
{ name = "langchain-tests", editable = "../standard-tests" },
{ name = "langchain-text-splitters", editable = "../text-splitters" },