Compare commits

...

27 Commits

Author SHA1 Message Date
Sydney Runkle
853a498dc6 docstrings and general improvements 2025-10-14 21:57:36 -04:00
Sydney Runkle
3757a8a555 more improvements 2025-10-14 21:22:15 -04:00
Sydney Runkle
b642d8f06f sub agent 2025-10-14 20:55:02 -04:00
Sydney Runkle
24e2a95b67 more custom interface 2025-10-14 17:02:54 -04:00
Sydney Runkle
d642285c5a removing is_async 2025-10-14 15:49:43 -04:00
Sydney Runkle
086c6cc872 Merge branch 'master' into sr/deepagents 2025-10-14 15:23:07 -04:00
Sydney Runkle
83e7c4ea74 linting, renaming 2025-10-14 15:22:31 -04:00
Sydney Runkle
6677d57cdd HITL changes 2025-10-14 15:15:10 -04:00
Sydney Runkle
5e93774258 Merge branch 'master' into sr/deepagents 2025-10-14 15:10:03 -04:00
Sydney Runkle
93108324df remove anthropic stuff 2025-10-14 15:06:54 -04:00
Sydney Runkle
6eb8377e12 excluded keys 2025-10-14 14:30:03 -04:00
Nick Huang
ecbd43b660 Change how namespace is built 2025-10-13 14:45:05 -04:00
Nick Huang
918e2ee567 Update default model 2025-10-10 17:22:22 -04:00
Nick Huang
bcbf2fff5f Fix import in test 2025-10-10 16:05:00 -04:00
Nick Huang
206fa156ac Add dependencies to tests 2025-10-10 16:01:25 -04:00
Nick Huang
1713a149e4 Address comments 2025-10-10 15:45:50 -04:00
Nick Huang
f682a9e0a4 Fix and simplify filesystem substantially - add integration tests for filesystem 2025-10-10 14:48:38 -04:00
Nick Huang
c7a52e8c78 Fix formatting 2025-10-09 23:31:54 -04:00
Nick Huang
9cab0be047 Update imports 2025-10-09 23:16:06 -04:00
Nick Huang
7340cb5353 Share code between anthropic tools and filesystem, add deepagents 2025-10-09 22:09:19 -04:00
Nick Huang
7eedbf2a40 Merge remote-tracking branch 'origin/nc/9oct/file-tools-middleware' into nh/subagent-middleware 2025-10-09 17:31:14 -04:00
Nick Huang
203323d249 Remove claude file 2025-10-09 17:22:39 -04:00
Nick Huang
290e83863f resolve merge conflicts 2025-10-09 17:20:56 -04:00
Nick Huang
6b6057ff1d Resolve merge conflicts 2025-10-09 17:17:20 -04:00
Nick Huang
544a755887 Fix linting 2025-10-09 17:14:40 -04:00
Nick Huang
89d0fff26f Add subagents middleware 2025-10-09 16:42:51 -04:00
Nuno Campos
bd5ea78bfd feat(langchain_v1): Add Anthropic tools middleware with text editor, memory, and file
search

Middleware Classes

Text Editor Tools
- StateClaudeTextEditorToolMiddleware: In-memory text editor using agent state
- FilesystemClaudeTextEditorToolMiddleware: Text editor operating on real filesystem

Implementing Claude's text editor tools
https://docs.claude.com/en/docs/agents-and-tools/tool-use/text-editor-tool Operations:
view, create, str_replace, insert

Memory Tools
- StateClaudeMemoryToolMiddleware: Memory persistence in agent state
- FilesystemClaudeMemoryToolMiddleware: Memory persistence on filesystem

Implementing Claude's memory tools
https://docs.claude.com/en/docs/agents-and-tools/tool-use/memory-tool Operations: Same
as text editor plus delete and rename

File Search Tools
- StateFileSearchMiddleware: Search state-based files
- FilesystemFileSearchMiddleware: Search real filesystem

Provides Glob and Grep tools with same schema as used by Claude Code (but compatible
with any model)
- Glob: Pattern matching (e.g., **/*.py, src/**/*.ts), sorted by modification time
- Grep: Regex content search with output modes (files_with_matches, content, count)

Usage

``` from langchain.agents import create_agent from langchain.agents.middleware import (
StateTextEditorToolMiddleware, StateFileSearchMiddleware, )

agent = create_agent( model=model, tools=[], middleware=[
StateTextEditorToolMiddleware(), StateFileSearchMiddleware(), ], ) ```
2025-10-09 12:58:08 +01:00
16 changed files with 3005 additions and 8 deletions

View File

@@ -0,0 +1,423 @@
"""Shared utility functions for file operations in middleware."""
from __future__ import annotations
import os
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Literal
from typing_extensions import TypedDict
if TYPE_CHECKING:
from collections.abc import Sequence
# Constants
MEMORIES_PREFIX = "/memories/"
EMPTY_CONTENT_WARNING = "System reminder: File exists but has empty contents"
MAX_LINE_LENGTH = 2000
LINE_NUMBER_WIDTH = 6
class FileData(TypedDict):
"""Data structure for storing file contents with metadata."""
content: list[str]
"""Lines of the file."""
created_at: str
"""ISO 8601 timestamp of file creation."""
modified_at: str
"""ISO 8601 timestamp of last modification."""
def file_data_reducer(
left: dict[str, FileData] | None, right: dict[str, FileData | None]
) -> dict[str, FileData]:
"""Merge file updates with support for deletions.
This reducer enables file deletion by treating `None` values in the right
dictionary as deletion markers. It's designed to work with LangGraph's
state management where annotated reducers control how state updates merge.
Args:
left: Existing files dictionary. May be `None` during initialization.
right: New files dictionary to merge. Files with `None` values are
treated as deletion markers and removed from the result.
Returns:
Merged dictionary where right overwrites left for matching keys,
and `None` values in right trigger deletions.
Example:
```python
existing = {"/file1.txt": FileData(...), "/file2.txt": FileData(...)}
updates = {"/file2.txt": None, "/file3.txt": FileData(...)}
result = file_data_reducer(existing, updates)
# Result: {"/file1.txt": FileData(...), "/file3.txt": FileData(...)}
```
"""
if left is None:
# Filter out None values when initializing
return {k: v for k, v in right.items() if v is not None}
# Merge, filtering out None values (deletions)
result = {**left}
for key, value in right.items():
if value is None:
result.pop(key, None)
else:
result[key] = value
return result
def validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) -> str:
"""Validate and normalize file path for security.
Ensures paths are safe to use by preventing directory traversal attacks
and enforcing consistent formatting. All paths are normalized to use
forward slashes and start with a leading slash.
Args:
path: The path to validate and normalize.
allowed_prefixes: Optional list of allowed path prefixes. If provided,
the normalized path must start with one of these prefixes.
Returns:
Normalized canonical path starting with `/` and using forward slashes.
Raises:
ValueError: If path contains traversal sequences (`..` or `~`) or does
not start with an allowed prefix when `allowed_prefixes` is specified.
Example:
```python
validate_path("foo/bar") # Returns: "/foo/bar"
validate_path("/./foo//bar") # Returns: "/foo/bar"
validate_path("../etc/passwd") # Raises ValueError
validate_path("/data/file.txt", allowed_prefixes=["/data/"]) # OK
validate_path("/etc/file.txt", allowed_prefixes=["/data/"]) # Raises ValueError
```
"""
# Reject paths with traversal attempts
if ".." in path or path.startswith("~"):
msg = f"Path traversal not allowed: {path}"
raise ValueError(msg)
# Normalize path (resolve ., //, etc.)
normalized = os.path.normpath(path)
# Convert to forward slashes for consistency
normalized = normalized.replace("\\", "/")
# Ensure path starts with /
if not normalized.startswith("/"):
normalized = f"/{normalized}"
# Check allowed prefixes if specified
if allowed_prefixes is not None and not any(
normalized.startswith(prefix) for prefix in allowed_prefixes
):
msg = f"Path must start with one of {allowed_prefixes}: {path}"
raise ValueError(msg)
return normalized
def format_content_with_line_numbers(
content: str | list[str],
*,
format_style: Literal["pipe", "tab"] = "pipe",
start_line: int = 1,
) -> str:
r"""Format file content with line numbers for display.
Converts file content to a numbered format similar to `cat -n` output,
with support for two different formatting styles.
Args:
content: File content as a string or list of lines.
format_style: Format style for line numbers:
- `"pipe"`: Compact format like `"1|content"`
- `"tab"`: Right-aligned format like `" 1\tcontent"` (lines truncated at 2000 chars)
start_line: Starting line number (default: 1).
Returns:
Formatted content with line numbers prepended to each line.
Example:
```python
content = "Hello\nWorld"
format_content_with_line_numbers(content, format_style="pipe")
# Returns: "1|Hello\n2|World"
format_content_with_line_numbers(content, format_style="tab", start_line=10)
# Returns: " 10\tHello\n 11\tWorld"
```
"""
if isinstance(content, str):
lines = content.split("\n")
# Remove trailing empty line from split
if lines and lines[-1] == "":
lines = lines[:-1]
else:
lines = content
if format_style == "pipe":
return "\n".join(f"{i + start_line}|{line}" for i, line in enumerate(lines))
# Tab format with defined width and line truncation
return "\n".join(
f"{i + start_line:{LINE_NUMBER_WIDTH}d}\t{line[:MAX_LINE_LENGTH]}"
for i, line in enumerate(lines)
)
def apply_string_replacement(
content: str,
old_string: str,
new_string: str,
*,
replace_all: bool = False,
) -> tuple[str, int]:
"""Apply exact string replacement to content.
Replaces occurrences of a string within content and returns both the
modified content and the number of replacements made.
Args:
content: Original content to modify.
old_string: String to find and replace.
new_string: Replacement string.
replace_all: If `True`, replace all occurrences. If `False`, replace
only the first occurrence (default).
Returns:
Tuple of `(modified_content, replacement_count)`.
Example:
```python
content = "foo bar foo"
apply_string_replacement(content, "foo", "baz", replace_all=False)
# Returns: ("baz bar foo", 1)
apply_string_replacement(content, "foo", "baz", replace_all=True)
# Returns: ("baz bar baz", 2)
```
"""
if replace_all:
count = content.count(old_string)
new_content = content.replace(old_string, new_string)
else:
count = 1 if old_string in content else 0
new_content = content.replace(old_string, new_string, 1)
return new_content, count
def create_file_data(
content: str | list[str],
*,
created_at: str | None = None,
) -> FileData:
r"""Create a FileData object with automatic timestamp generation.
Args:
content: File content as a string or list of lines.
created_at: Optional creation timestamp in ISO 8601 format.
If `None`, uses the current UTC time.
Returns:
FileData object with content and timestamps.
Example:
```python
file_data = create_file_data("Hello\nWorld")
# Returns: {"content": ["Hello", "World"], "created_at": "2024-...",
# "modified_at": "2024-..."}
```
"""
lines = content.split("\n") if isinstance(content, str) else content
now = datetime.now(timezone.utc).isoformat()
return {
"content": lines,
"created_at": created_at or now,
"modified_at": now,
}
def update_file_data(
file_data: FileData,
content: str | list[str],
) -> FileData:
"""Update FileData with new content while preserving creation timestamp.
Args:
file_data: Existing FileData object to update.
content: New file content as a string or list of lines.
Returns:
Updated FileData object with new content and updated `modified_at`
timestamp. The `created_at` timestamp is preserved from the original.
Example:
```python
original = create_file_data("Hello")
updated = update_file_data(original, "Hello World")
# updated["created_at"] == original["created_at"]
# updated["modified_at"] > original["modified_at"]
```
"""
lines = content.split("\n") if isinstance(content, str) else content
now = datetime.now(timezone.utc).isoformat()
return {
"content": lines,
"created_at": file_data["created_at"],
"modified_at": now,
}
def file_data_to_string(file_data: FileData) -> str:
r"""Convert FileData to plain string content.
Joins the lines stored in FileData with newline characters to produce
a single string representation of the file content.
Args:
file_data: FileData object containing lines of content.
Returns:
File content as a single string with lines joined by newlines.
Example:
```python
file_data = {
"content": ["Hello", "World"],
"created_at": "...",
"modified_at": "...",
}
file_data_to_string(file_data) # Returns: "Hello\nWorld"
```
"""
return "\n".join(file_data["content"])
def list_directory(files: dict[str, FileData], path: str) -> list[str]:
"""List files in a directory (direct children only).
Returns only the direct children of the specified directory path,
excluding files in subdirectories.
Args:
files: Dictionary mapping file paths to FileData objects.
path: Normalized directory path to list files from.
Returns:
Sorted list of file paths that are direct children of the directory.
Example:
```python
files = {
"/dir/file1.txt": FileData(...),
"/dir/file2.txt": FileData(...),
"/dir/subdir/file3.txt": FileData(...),
}
list_directory(files, "/dir")
# Returns: ["/dir/file1.txt", "/dir/file2.txt"]
# Note: /dir/subdir/file3.txt is excluded (not a direct child)
```
"""
# Ensure path ends with / for directory matching
dir_path = path if path.endswith("/") else f"{path}/"
matching_files = []
for file_path in files:
if file_path.startswith(dir_path):
# Get relative path from directory
relative = file_path[len(dir_path) :]
# Only include direct children (no subdirectories)
if "/" not in relative:
matching_files.append(file_path)
return sorted(matching_files)
def check_empty_content(content: str) -> str | None:
"""Check if file content is empty and return a warning message.
Args:
content: File content to check.
Returns:
Warning message string if content is empty or contains only whitespace,
`None` otherwise.
Example:
```python
check_empty_content("") # Returns: "System reminder: File exists but has empty contents"
check_empty_content(" ") # Returns: "System reminder: File exists but has empty contents"
check_empty_content("Hello") # Returns: None
```
"""
if not content or content.strip() == "":
return EMPTY_CONTENT_WARNING
return None
def has_memories_prefix(file_path: str) -> bool:
"""Check if a file path is in the longterm memory filesystem.
Longterm memory files are distinguished by the `/memories/` path prefix.
Args:
file_path: File path to check.
Returns:
`True` if the file path starts with `/memories/`, `False` otherwise.
Example:
```python
has_memories_prefix("/memories/notes.txt") # Returns: True
has_memories_prefix("/temp/file.txt") # Returns: False
```
"""
return file_path.startswith(MEMORIES_PREFIX)
def append_memories_prefix(file_path: str) -> str:
"""Add the longterm memory prefix to a file path.
Args:
file_path: File path to prefix.
Returns:
File path with `/memories` prepended.
Example:
```python
append_memories_prefix("/notes.txt") # Returns: "/memories/notes.txt"
```
"""
return f"/memories{file_path}"
def strip_memories_prefix(file_path: str) -> str:
"""Remove the longterm memory prefix from a file path.
Args:
file_path: File path potentially containing the memories prefix.
Returns:
File path with `/memories` removed if present at the start.
Example:
```python
strip_memories_prefix("/memories/notes.txt") # Returns: "/notes.txt"
strip_memories_prefix("/notes.txt") # Returns: "/notes.txt"
```
"""
if file_path.startswith(MEMORIES_PREFIX):
return file_path[len(MEMORIES_PREFIX) - 1 :] # Keep the leading slash
return file_path

View File

@@ -4,6 +4,7 @@ from .context_editing import (
ClearToolUsesEdit,
ContextEditingMiddleware,
)
from .filesystem import FilesystemMiddleware
from .human_in_the_loop import (
HumanInTheLoopMiddleware,
InterruptOnConfig,
@@ -11,8 +12,9 @@ from .human_in_the_loop import (
from .model_call_limit import ModelCallLimitMiddleware
from .model_fallback import ModelFallbackMiddleware
from .pii import PIIDetectionError, PIIMiddleware
from .subagents import SubAgentMiddleware
from .summarization import SummarizationMiddleware
from .todo import TodoListMiddleware
from .todo_list import TodoListMiddleware
from .tool_call_limit import ToolCallLimitMiddleware
from .tool_emulator import LLMToolEmulator
from .tool_selection import LLMToolSelectorMiddleware
@@ -36,6 +38,7 @@ __all__ = [
"AgentState",
"ClearToolUsesEdit",
"ContextEditingMiddleware",
"FilesystemMiddleware",
"HumanInTheLoopMiddleware",
"InterruptOnConfig",
"LLMToolEmulator",
@@ -46,6 +49,7 @@ __all__ = [
"ModelResponse",
"PIIDetectionError",
"PIIMiddleware",
"SubAgentMiddleware",
"SummarizationMiddleware",
"TodoListMiddleware",
"ToolCallLimitMiddleware",

View File

@@ -0,0 +1,723 @@
"""Middleware for providing filesystem tools to an agent."""
# ruff: noqa: E501
from collections.abc import Callable
from typing import TYPE_CHECKING, Annotated, Any
from typing_extensions import NotRequired
if TYPE_CHECKING:
from langgraph.runtime import Runtime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, InjectedToolCallId, tool
from langgraph.config import get_config
from langgraph.runtime import Runtime, get_runtime
from langgraph.store.base import BaseStore, Item
from langgraph.types import Command
from langchain.agents._internal.file_utils import (
FileData,
append_memories_prefix,
check_empty_content,
create_file_data,
file_data_reducer,
file_data_to_string,
format_content_with_line_numbers,
has_memories_prefix,
strip_memories_prefix,
update_file_data,
validate_path,
)
from langchain.agents.middleware.types import (
AgentMiddleware,
AgentState,
ModelRequest,
ModelResponse,
)
from langchain.tools.tool_node import InjectedState
# Constants
LONGTERM_MEMORY_PREFIX = "/memories/"
DEFAULT_READ_OFFSET = 0
DEFAULT_READ_LIMIT = 2000
class FilesystemState(AgentState):
"""State for the filesystem middleware."""
files: Annotated[NotRequired[dict[str, FileData]], file_data_reducer]
"""Files in the filesystem."""
LIST_FILES_TOOL_DESCRIPTION = """Lists all files in the filesystem, optionally filtering by directory.
Usage:
- The list_files tool will return a list of all files in the filesystem.
- You can optionally provide a path parameter to list files in a specific directory.
- This is very useful for exploring the file system and finding the right file to read or edit.
- You should almost ALWAYS use this tool before using the Read or Edit tools."""
LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- Files from the longterm filesystem will be prefixed with the {LONGTERM_MEMORY_PREFIX} path."
READ_FILE_TOOL_DESCRIPTION = """Reads a file from the filesystem. You can access any file directly by using this tool.
Assume this tool is able to read all files on the machine. If the User provides a path to a file assume that path is valid. It is okay to read a file that does not exist; an error will be returned.
Usage:
- The file_path parameter must be an absolute path, not a relative path
- By default, it reads up to 2000 lines starting from the beginning of the file
- You can optionally specify a line offset and limit (especially handy for long files), but it's recommended to read the whole file by not providing these parameters
- Any lines longer than 2000 characters will be truncated
- Results are returned using cat -n format, with line numbers starting at 1
- You have the capability to call multiple tools in a single response. It is always better to speculatively read multiple files as a batch that are potentially useful.
- If you read a file that exists but has empty contents you will receive a system reminder warning in place of file contents.
- You should ALWAYS make sure a file has been read before editing it."""
READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- file_paths prefixed with the {LONGTERM_MEMORY_PREFIX} path will be read from the longterm filesystem."
EDIT_FILE_TOOL_DESCRIPTION = """Performs exact string replacements in files.
Usage:
- You must use your `Read` tool at least once in the conversation before editing. This tool will error if you attempt an edit without reading the file.
- When editing text from Read tool output, ensure you preserve the exact indentation (tabs/spaces) as it appears AFTER the line number prefix. The line number prefix format is: spaces + line number + tab. Everything after that tab is the actual file content to match. Never include any part of the line number prefix in the old_string or new_string.
- ALWAYS prefer editing existing files. NEVER write new files unless explicitly required.
- Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked.
- The edit will FAIL if `old_string` is not unique in the file. Either provide a larger string with more surrounding context to make it unique or use `replace_all` to change every instance of `old_string`.
- Use `replace_all` for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance."""
EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- You can edit files in the longterm filesystem by prefixing the filename with the {LONGTERM_MEMORY_PREFIX} path."
WRITE_FILE_TOOL_DESCRIPTION = """Writes to a new file in the filesystem.
Usage:
- The file_path parameter must be an absolute path, not a relative path
- The content parameter must be a string
- The write_file tool will create the a new file.
- Prefer to edit existing files over creating new ones when possible.
- file_paths prefixed with the /memories/ path will be written to the longterm filesystem."""
WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- file_paths prefixed with the {LONGTERM_MEMORY_PREFIX} path will be written to the longterm filesystem."
FILESYSTEM_SYSTEM_PROMPT = """## Filesystem Tools `ls`, `read_file`, `write_file`, `edit_file`
You have access to a filesystem which you can interact with using these tools.
All file paths must start with a /.
- ls: list all files in the filesystem
- read_file: read a file from the filesystem
- write_file: write to a file in the filesystem
- edit_file: edit a file in the filesystem"""
FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT = f"""
You also have access to a longterm filesystem in which you can store files that you want to keep around for longer than the current conversation.
In order to interact with the longterm filesystem, you can use those same tools, but filenames must be prefixed with the {LONGTERM_MEMORY_PREFIX} path.
Remember, to interact with the longterm filesystem, you must prefix the filename with the {LONGTERM_MEMORY_PREFIX} path."""
def _get_namespace() -> tuple[str] | tuple[str, str]:
"""Get the namespace for longterm filesystem storage.
Returns a tuple for organizing files in the store. If an assistant_id is available
in the config metadata, returns a 2-tuple of (assistant_id, "filesystem") to provide
per-assistant isolation. Otherwise, returns a 1-tuple of ("filesystem",) for shared storage.
Returns:
Namespace tuple for store operations, either `(assistant_id, "filesystem")` or `("filesystem",)`.
"""
namespace = "filesystem"
config = get_config()
if config is None:
return (namespace,)
assistant_id = config.get("metadata", {}).get("assistant_id")
if assistant_id is None:
return (namespace,)
return (assistant_id, "filesystem")
def _get_store(runtime: Runtime[Any]) -> BaseStore:
"""Get the store from the runtime, raising an error if unavailable.
Args:
runtime: The LangGraph runtime containing the store.
Returns:
The BaseStore instance for longterm file storage.
Raises:
ValueError: If longterm memory is enabled but no store is available in runtime.
"""
if runtime.store is None:
msg = "Longterm memory is enabled, but no store is available"
raise ValueError(msg)
return runtime.store
def _convert_store_item_to_file_data(store_item: Item) -> FileData:
"""Convert a store Item to FileData format.
Args:
store_item: The store Item containing file data.
Returns:
FileData with content, created_at, and modified_at fields.
Raises:
ValueError: If required fields are missing or have incorrect types.
"""
if "content" not in store_item.value or not isinstance(store_item.value["content"], list):
msg = f"Store item does not contain valid content field. Got: {store_item.value.keys()}"
raise ValueError(msg)
if "created_at" not in store_item.value or not isinstance(store_item.value["created_at"], str):
msg = f"Store item does not contain valid created_at field. Got: {store_item.value.keys()}"
raise ValueError(msg)
if "modified_at" not in store_item.value or not isinstance(
store_item.value["modified_at"], str
):
msg = f"Store item does not contain valid modified_at field. Got: {store_item.value.keys()}"
raise ValueError(msg)
return FileData(
content=store_item.value["content"],
created_at=store_item.value["created_at"],
modified_at=store_item.value["modified_at"],
)
def _convert_file_data_to_store_item(file_data: FileData) -> dict[str, Any]:
"""Convert FileData to a dict suitable for store.put().
Args:
file_data: The FileData to convert.
Returns:
Dictionary with content, created_at, and modified_at fields.
"""
return {
"content": file_data["content"],
"created_at": file_data["created_at"],
"modified_at": file_data["modified_at"],
}
def _get_file_data_from_state(state: FilesystemState, file_path: str) -> FileData:
"""Retrieve file data from the agent's state.
Args:
state: The current filesystem state.
file_path: The path of the file to retrieve.
Returns:
The FileData for the requested file.
Raises:
ValueError: If the file is not found in state.
"""
mock_filesystem = state.get("files", {})
if file_path not in mock_filesystem:
msg = f"File '{file_path}' not found"
raise ValueError(msg)
return mock_filesystem[file_path]
def _ls_tool_generator(
custom_description: str | None = None, *, long_term_memory: bool
) -> BaseTool:
"""Generate the ls (list files) tool.
Args:
custom_description: Optional custom description for the tool.
long_term_memory: Whether to enable longterm memory support.
Returns:
Configured ls tool that lists files from state and optionally from longterm store.
"""
tool_description = LIST_FILES_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif long_term_memory:
tool_description += LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _get_filenames_from_state(state: FilesystemState) -> list[str]:
"""Extract list of filenames from the filesystem state.
Args:
state: The current filesystem state.
Returns:
List of file paths in the state.
"""
files_dict = state.get("files", {})
return list(files_dict.keys())
def _filter_files_by_path(filenames: list[str], path: str | None) -> list[str]:
"""Filter filenames by path prefix.
Args:
filenames: List of file paths to filter.
path: Optional path prefix to filter by.
Returns:
Filtered list of file paths matching the prefix.
"""
if path is None:
return filenames
normalized_path = validate_path(path)
return [f for f in filenames if f.startswith(normalized_path)]
if long_term_memory:
@tool(description=tool_description)
def ls(
state: Annotated[FilesystemState, InjectedState], path: str | None = None
) -> list[str]:
files = _get_filenames_from_state(state)
# Add filenames from longterm memory
runtime = get_runtime()
store = _get_store(runtime)
namespace = _get_namespace()
longterm_files = store.search(namespace)
longterm_files_prefixed = [append_memories_prefix(f.key) for f in longterm_files]
files.extend(longterm_files_prefixed)
return _filter_files_by_path(files, path)
else:
@tool(description=tool_description)
def ls(
state: Annotated[FilesystemState, InjectedState], path: str | None = None
) -> list[str]:
files = _get_filenames_from_state(state)
return _filter_files_by_path(files, path)
return ls
def _read_file_tool_generator(
custom_description: str | None = None, *, long_term_memory: bool
) -> BaseTool:
"""Generate the read_file tool.
Args:
custom_description: Optional custom description for the tool.
long_term_memory: Whether to enable longterm memory support.
Returns:
Configured read_file tool that reads files from state and optionally from longterm store.
"""
tool_description = READ_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif long_term_memory:
tool_description += READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _read_file_data_content(file_data: FileData, offset: int, limit: int) -> str:
"""Read and format file content with line numbers.
Args:
file_data: The file data to read.
offset: Line offset to start reading from (0-indexed).
limit: Maximum number of lines to read.
Returns:
Formatted file content with line numbers, or an error message.
"""
content = file_data_to_string(file_data)
empty_msg = check_empty_content(content)
if empty_msg:
return empty_msg
lines = content.splitlines()
start_idx = offset
end_idx = min(start_idx + limit, len(lines))
if start_idx >= len(lines):
return f"Error: Line offset {offset} exceeds file length ({len(lines)} lines)"
selected_lines = lines[start_idx:end_idx]
return format_content_with_line_numbers(
selected_lines, format_style="tab", start_line=start_idx + 1
)
if long_term_memory:
@tool(description=tool_description)
def read_file(
file_path: str,
state: Annotated[FilesystemState, InjectedState],
offset: int = DEFAULT_READ_OFFSET,
limit: int = DEFAULT_READ_LIMIT,
) -> str:
file_path = validate_path(file_path)
if has_memories_prefix(file_path):
stripped_file_path = strip_memories_prefix(file_path)
runtime = get_runtime()
store = _get_store(runtime)
namespace = _get_namespace()
item: Item | None = store.get(namespace, stripped_file_path)
if item is None:
return f"Error: File '{file_path}' not found"
file_data = _convert_store_item_to_file_data(item)
else:
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
return _read_file_data_content(file_data, offset, limit)
else:
@tool(description=tool_description)
def read_file(
file_path: str,
state: Annotated[FilesystemState, InjectedState],
offset: int = DEFAULT_READ_OFFSET,
limit: int = DEFAULT_READ_LIMIT,
) -> str:
file_path = validate_path(file_path)
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
return _read_file_data_content(file_data, offset, limit)
return read_file
def _write_file_tool_generator(
custom_description: str | None = None, *, long_term_memory: bool
) -> BaseTool:
"""Generate the write_file tool.
Args:
custom_description: Optional custom description for the tool.
long_term_memory: Whether to enable longterm memory support.
Returns:
Configured write_file tool that creates new files in state or longterm store.
"""
tool_description = WRITE_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif long_term_memory:
tool_description += WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _write_file_to_state(
state: FilesystemState, tool_call_id: str, file_path: str, content: str
) -> Command | str:
"""Write a new file to the filesystem state.
Args:
state: The current filesystem state.
tool_call_id: ID of the tool call for generating ToolMessage.
file_path: The path where the file should be written.
content: The content to write to the file.
Returns:
Command to update state with new file, or error string if file exists.
"""
mock_filesystem = state.get("files", {})
existing = mock_filesystem.get(file_path)
if existing:
return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path."
new_file_data = create_file_data(content)
return Command(
update={
"files": {file_path: new_file_data},
"messages": [ToolMessage(f"Updated file {file_path}", tool_call_id=tool_call_id)],
}
)
if long_term_memory:
@tool(description=tool_description)
def write_file(
file_path: str,
content: str,
state: Annotated[FilesystemState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command | str:
file_path = validate_path(file_path)
if has_memories_prefix(file_path):
stripped_file_path = strip_memories_prefix(file_path)
runtime = get_runtime()
store = _get_store(runtime)
namespace = _get_namespace()
if store.get(namespace, stripped_file_path) is not None:
return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path."
new_file_data = create_file_data(content)
store.put(
namespace, stripped_file_path, _convert_file_data_to_store_item(new_file_data)
)
return f"Updated longterm memories file {file_path}"
return _write_file_to_state(state, tool_call_id, file_path, content)
else:
@tool(description=tool_description)
def write_file(
file_path: str,
content: str,
state: Annotated[FilesystemState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command | str:
file_path = validate_path(file_path)
return _write_file_to_state(state, tool_call_id, file_path, content)
return write_file
def _edit_file_tool_generator(
custom_description: str | None = None, *, long_term_memory: bool
) -> BaseTool:
"""Generate the edit_file tool.
Args:
custom_description: Optional custom description for the tool.
long_term_memory: Whether to enable longterm memory support.
Returns:
Configured edit_file tool that performs string replacements in files.
"""
tool_description = EDIT_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif long_term_memory:
tool_description += EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _perform_file_edit(
file_data: FileData,
old_string: str,
new_string: str,
*,
replace_all: bool = False,
) -> tuple[FileData, str] | str:
"""Perform string replacement on file data.
Args:
file_data: The file data to edit.
old_string: String to find and replace.
new_string: Replacement string.
replace_all: If True, replace all occurrences.
Returns:
Tuple of (updated_file_data, success_message) on success,
or error string on failure.
"""
content = file_data_to_string(file_data)
occurrences = content.count(old_string)
if occurrences == 0:
return f"Error: String not found in file: '{old_string}'"
if occurrences > 1 and not replace_all:
return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context."
new_content = content.replace(old_string, new_string)
new_file_data = update_file_data(file_data, new_content)
result_msg = f"Successfully replaced {occurrences} instance(s) of the string"
return new_file_data, result_msg
if long_term_memory:
@tool(description=tool_description)
def edit_file(
file_path: str,
old_string: str,
new_string: str,
state: Annotated[FilesystemState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
*,
replace_all: bool = False,
) -> Command | str:
file_path = validate_path(file_path)
is_longterm_memory = has_memories_prefix(file_path)
# Retrieve file data from appropriate storage
if is_longterm_memory:
stripped_file_path = strip_memories_prefix(file_path)
runtime = get_runtime()
store = _get_store(runtime)
namespace = _get_namespace()
item: Item | None = store.get(namespace, stripped_file_path)
if item is None:
return f"Error: File '{file_path}' not found"
file_data = _convert_store_item_to_file_data(item)
else:
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
# Perform the edit
result = _perform_file_edit(file_data, old_string, new_string, replace_all=replace_all)
if isinstance(result, str): # Error message
return result
new_file_data, result_msg = result
full_msg = f"{result_msg} in '{file_path}'"
# Save to appropriate storage
if is_longterm_memory:
store.put(
namespace, stripped_file_path, _convert_file_data_to_store_item(new_file_data)
)
return full_msg
return Command(
update={
"files": {file_path: new_file_data},
"messages": [ToolMessage(full_msg, tool_call_id=tool_call_id)],
}
)
else:
@tool(description=tool_description)
def edit_file(
file_path: str,
old_string: str,
new_string: str,
state: Annotated[FilesystemState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
*,
replace_all: bool = False,
) -> Command | str:
file_path = validate_path(file_path)
# Retrieve file data from state
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
# Perform the edit
result = _perform_file_edit(file_data, old_string, new_string, replace_all=replace_all)
if isinstance(result, str): # Error message
return result
new_file_data, result_msg = result
full_msg = f"{result_msg} in '{file_path}'"
return Command(
update={
"files": {file_path: new_file_data},
"messages": [ToolMessage(full_msg, tool_call_id=tool_call_id)],
}
)
return edit_file
TOOL_GENERATORS = {
"ls": _ls_tool_generator,
"read_file": _read_file_tool_generator,
"write_file": _write_file_tool_generator,
"edit_file": _edit_file_tool_generator,
}
def _get_filesystem_tools(
custom_tool_descriptions: dict[str, str] | None = None, *, long_term_memory: bool
) -> list[BaseTool]:
"""Get filesystem tools.
Args:
custom_tool_descriptions: Optional custom descriptions for tools.
long_term_memory: Whether to enable longterm memory support.
Returns:
List of configured filesystem tools (ls, read_file, write_file, edit_file).
"""
if custom_tool_descriptions is None:
custom_tool_descriptions = {}
tools = []
for tool_name, tool_generator in TOOL_GENERATORS.items():
tool = tool_generator(
custom_tool_descriptions.get(tool_name), long_term_memory=long_term_memory
)
tools.append(tool)
return tools
class FilesystemMiddleware(AgentMiddleware):
"""Middleware for providing filesystem tools to an agent.
This middleware adds four filesystem tools to the agent: ls, read_file, write_file,
and edit_file. Files can be stored in two locations:
- Short-term: In the agent's state (ephemeral, lasts only for the conversation)
- Long-term: In a persistent store (persists across conversations when enabled)
Args:
long_term_memory: Whether to enable longterm memory support.
system_prompt_extension: Optional custom system prompt override.
custom_tool_descriptions: Optional custom tool descriptions override.
Raises:
ValueError: If longterm memory is enabled but no store is available.
Example:
```python
from langchain.agents.middleware.filesystem import FilesystemMiddleware
from langchain.agents import create_agent
# Short-term memory only
agent = create_agent(middleware=[FilesystemMiddleware(long_term_memory=False)])
# With long-term memory
agent = create_agent(middleware=[FilesystemMiddleware(long_term_memory=True)])
```
"""
state_schema = FilesystemState
def __init__(
self,
*,
long_term_memory: bool = False,
system_prompt_extension: str | None = None,
custom_tool_descriptions: dict[str, str] | None = None,
) -> None:
"""Initialize the filesystem middleware.
Args:
long_term_memory: Whether to enable longterm memory support.
system_prompt_extension: Optional custom system prompt override.
custom_tool_descriptions: Optional custom tool descriptions override.
"""
self.long_term_memory = long_term_memory
self.system_prompt_extension = FILESYSTEM_SYSTEM_PROMPT
if system_prompt_extension is not None:
self.system_prompt_extension = system_prompt_extension
elif long_term_memory:
self.system_prompt_extension += FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
self.tools = _get_filesystem_tools(
custom_tool_descriptions, long_term_memory=long_term_memory
)
def before_model_call(self, request: ModelRequest, runtime: Runtime[Any]) -> ModelRequest:
"""Validate that store is available if longterm memory is enabled.
Args:
request: The model request being processed.
runtime: The LangGraph runtime.
Returns:
The unmodified model request.
Raises:
ValueError: If long_term_memory is True but runtime.store is None.
"""
if self.long_term_memory and runtime.store is None:
msg = "Longterm memory is enabled, but no store is available"
raise ValueError(msg)
return request
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
"""Update the system prompt to include instructions on using the filesystem.
Args:
request: The model request being processed.
handler: The handler function to call with the modified request.
Returns:
The model response from the handler.
"""
if self.system_prompt_extension is not None:
request.system_prompt = (
request.system_prompt + "\n\n" + self.system_prompt_extension
if request.system_prompt
else self.system_prompt_extension
)
return handler(request)

View File

@@ -180,14 +180,14 @@ class HumanInTheLoopMiddleware(AgentMiddleware):
"""
super().__init__()
resolved_configs: dict[str, InterruptOnConfig] = {}
for tool_name, tool_config in interrupt_on.items():
if isinstance(tool_config, bool):
if tool_config is True:
for tool_name, interrupt_on_config in interrupt_on.items():
if isinstance(interrupt_on_config, bool):
if interrupt_on_config is True:
resolved_configs[tool_name] = InterruptOnConfig(
allowed_decisions=["approve", "edit", "reject"]
)
elif tool_config.get("allowed_decisions"):
resolved_configs[tool_name] = tool_config
elif interrupt_on_config.get("allowed_decisions"):
resolved_configs[tool_name] = interrupt_on_config
self.interrupt_on = resolved_configs
self.description_prefix = description_prefix

View File

@@ -0,0 +1,434 @@
"""Middleware for providing subagents to an agent via a `task` tool."""
from collections.abc import Callable, Sequence
from typing import Annotated, Any, TypedDict, cast
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.runnables import Runnable
from langchain_core.tools import StructuredTool
from langgraph.types import Command
from typing_extensions import NotRequired
from langchain.agents.middleware.types import AgentMiddleware, ModelRequest, ModelResponse
from langchain.tools import BaseTool, InjectedState, InjectedToolCallId
try:
from langchain_anthropic.middleware.prompt_caching import AnthropicPromptCachingMiddleware
except ImportError:
AnthropicPromptCachingMiddleware = None
class SubAgent(TypedDict):
"""Specification for an agent.
When specifying custom agents, the `default_middleware` from `SubAgentMiddleware`
will be applied first, followed by any `middleware` specified in this spec.
To use only custom middleware without the defaults, pass `default_middleware=[]`
to `SubAgentMiddleware`.
"""
name: str
"""The name of the agent."""
description: str
"""The description of the agent."""
system_prompt: str
"""The system prompt to use for the agent."""
tools: Sequence[BaseTool | Callable | dict[str, Any]]
"""The tools to use for the agent."""
model: NotRequired[str | BaseChatModel]
"""The model for the agent. Defaults to `default_model`."""
middleware: NotRequired[list[AgentMiddleware]]
"""Additional middleware to append after `default_middleware`."""
class CompiledSubAgent(TypedDict):
"""A pre-compiled agent spec."""
name: str
"""The name of the agent."""
description: str
"""The description of the agent."""
runnable: Runnable
"""The Runnable to use for the agent."""
DEFAULT_SUBAGENT_PROMPT = "In order to complete the objective that the user asks of you, you have access to a number of standard tools." # noqa: E501
# State keys that should be excluded when passing state to subagents
_EXCLUDED_STATE_KEYS = ("messages", "todos")
TASK_TOOL_DESCRIPTION = """Launch an ephemeral subagent to handle complex, multi-step independent tasks with isolated context windows.
Available agent types and the tools they have access to:
{available_agents}
When using the Task tool, you must specify a subagent_type parameter to select which agent type to use.
## Usage notes:
1. Launch multiple agents concurrently whenever possible, to maximize performance; to do that, use a single message with multiple tool uses
2. When the agent is done, it will return a single message back to you. The result returned by the agent is not visible to the user. To show the user the result, you should send a text message back to the user with a concise summary of the result.
3. Each agent invocation is stateless. You will not be able to send additional messages to the agent, nor will the agent be able to communicate with you outside of its final report. Therefore, your prompt should contain a highly detailed task description for the agent to perform autonomously and you should specify exactly what information the agent should return back to you in its final and only message to you.
4. The agent's outputs should generally be trusted
5. Clearly tell the agent whether you expect it to create content, perform analysis, or just do research (search, file reads, web fetches, etc.), since it is not aware of the user's intent
6. If the agent description mentions that it should be used proactively, then you should try your best to use it without the user having to ask for it first. Use your judgement.
7. When only the general-purpose agent is provided, you should use it for all tasks. It is great for isolating context and token usage, and completing specific, complex tasks, as it has all the same capabilities as the main agent.
### Example usage of the general-purpose agent:
<example_agent_descriptions>
"general-purpose": use this agent for general purpose tasks, it has access to all tools as the main agent.
</example_agent_descriptions>
<example>
User: "I want to conduct research on the accomplishments of Lebron James, Michael Jordan, and Kobe Bryant, and then compare them."
Assistant: *Uses the task tool in parallel to conduct isolated research on each of the three players*
Assistant: *Synthesizes the results of the three isolated research tasks and responds to the User*
<commentary>
Research is a complex, multi-step task in it of itself.
The research of each individual player is not dependent on the research of the other players.
The assistant uses the task tool to break down the complex objective into three isolated tasks.
Each research task only needs to worry about context and tokens about one player, then returns synthesized information about each player as the Tool Result.
This means each research task can dive deep and spend tokens and context deeply researching each player, but the final result is synthesized information, and saves us tokens in the long run when comparing the players to each other.
</commentary>
</example>
<example>
User: "Analyze a single large code repository for security vulnerabilities and generate a report."
Assistant: *Launches a single `task` subagent for the repository analysis*
Assistant: *Receives report and integrates results into final summary*
<commentary>
Subagent is used to isolate a large, context-heavy task, even though there is only one. This prevents the main thread from being overloaded with details.
If the user then asks followup questions, we have a concise report to reference instead of the entire history of analysis and tool calls, which is good and saves us time and money.
</commentary>
</example>
<example>
User: "Schedule two meetings for me and prepare agendas for each."
Assistant: *Calls the task tool in parallel to launch two `task` subagents (one per meeting) to prepare agendas*
Assistant: *Returns final schedules and agendas*
<commentary>
Tasks are simple individually, but subagents help silo agenda preparation.
Each subagent only needs to worry about the agenda for one meeting.
</commentary>
</example>
<example>
User: "I want to order a pizza from Dominos, order a burger from McDonald's, and order a salad from Subway."
Assistant: *Calls tools directly in parallel to order a pizza from Dominos, a burger from McDonald's, and a salad from Subway*
<commentary>
The assistant did not use the task tool because the objective is super simple and clear and only requires a few trivial tool calls.
It is better to just complete the task directly and NOT use the `task`tool.
</commentary>
</example>
### Example usage with custom agents:
<example_agent_descriptions>
"content-reviewer": use this agent after you are done creating significant content or documents
"greeting-responder": use this agent when to respond to user greetings with a friendly joke
"research-analyst": use this agent to conduct thorough research on complex topics
</example_agent_description>
<example>
user: "Please write a function that checks if a number is prime"
assistant: Sure let me write a function that checks if a number is prime
assistant: First let me use the Write tool to write a function that checks if a number is prime
assistant: I'm going to use the Write tool to write the following code:
<code>
function isPrime(n) {{
if (n <= 1) return false
for (let i = 2; i * i <= n; i++) {{
if (n % i === 0) return false
}}
return true
}}
</code>
<commentary>
Since significant content was created and the task was completed, now use the content-reviewer agent to review the work
</commentary>
assistant: Now let me use the content-reviewer agent to review the code
assistant: Uses the Task tool to launch with the content-reviewer agent
</example>
<example>
user: "Can you help me research the environmental impact of different renewable energy sources and create a comprehensive report?"
<commentary>
This is a complex research task that would benefit from using the research-analyst agent to conduct thorough analysis
</commentary>
assistant: I'll help you research the environmental impact of renewable energy sources. Let me use the research-analyst agent to conduct comprehensive research on this topic.
assistant: Uses the Task tool to launch with the research-analyst agent, providing detailed instructions about what research to conduct and what format the report should take
</example>
<example>
user: "Hello"
<commentary>
Since the user is greeting, use the greeting-responder agent to respond with a friendly joke
</commentary>
assistant: "I'm going to use the Task tool to launch with the greeting-responder agent"
</example>""" # noqa: E501
DEFAULT_GENERAL_PURPOSE_DESCRIPTION = "General-purpose agent for researching complex questions, searching for files and content, and executing multi-step tasks. When you are searching for a keyword or file and are not confident that you will find the right match in the first few tries use this agent to perform the search for you. This agent has access to all tools as the main agent." # noqa: E501
def _get_subagents(
*,
default_model: str | BaseChatModel,
default_tools: Sequence[BaseTool | Callable | dict[str, Any]],
default_middleware: list[AgentMiddleware] | None,
subagents: list[SubAgent | CompiledSubAgent],
general_purpose_agent: bool,
) -> tuple[dict[str, Any], list[str]]:
"""Create subagent instances from specifications.
Args:
default_model: Default model for subagents that don't specify one.
default_tools: Default tools for subagents that don't specify tools.
default_middleware: Middleware to apply to all subagents. If `None`,
no default middleware is applied.
subagents: List of agent specifications or pre-compiled agents.
general_purpose_agent: Whether to include a general-purpose subagent.
Returns:
Tuple of (agent_dict, description_list) where agent_dict maps agent names
to runnable instances and description_list contains formatted descriptions.
"""
from langchain.agents.factory import create_agent
# Use empty list if None (no default middleware)
default_subagent_middleware = default_middleware or []
agents: dict[str, Any] = {}
subagent_descriptions = []
# Create general-purpose agent if enabled
if general_purpose_agent:
general_purpose_subagent = create_agent(
default_model,
system_prompt=DEFAULT_SUBAGENT_PROMPT,
tools=default_tools,
middleware=default_subagent_middleware,
)
agents["general-purpose"] = general_purpose_subagent
subagent_descriptions.append(f"- general-purpose: {DEFAULT_GENERAL_PURPOSE_DESCRIPTION}")
# Process custom subagents
for agent_ in subagents:
subagent_descriptions.append(f"- {agent_['name']}: {agent_['description']}")
if "runnable" in agent_:
custom_agent = cast("CompiledSubAgent", agent_)
agents[custom_agent["name"]] = custom_agent["runnable"]
continue
_tools = agent_.get("tools", list(default_tools))
subagent_model = agent_.get("model", default_model)
if "middleware" in agent_:
_middleware = [*default_subagent_middleware, *agent_["middleware"]]
else:
_middleware = default_subagent_middleware
agents[agent_["name"]] = create_agent(
subagent_model,
system_prompt=agent_["system_prompt"],
tools=_tools,
middleware=_middleware,
checkpointer=False,
)
return agents, subagent_descriptions
def _create_task_tool(
*,
default_model: str | BaseChatModel,
default_tools: Sequence[BaseTool | Callable | dict[str, Any]],
default_middleware: list[AgentMiddleware] | None,
subagents: list[SubAgent | CompiledSubAgent],
general_purpose_agent: bool,
task_description: str | None = None,
) -> BaseTool:
"""Create a task tool for invoking subagents.
Args:
default_model: Default model for subagents.
default_tools: Default tools for subagents.
default_middleware: Middleware to apply to all subagents.
subagents: List of subagent specifications.
general_purpose_agent: Whether to include general-purpose agent.
task_description: Custom description for the task tool. If `None`,
uses default template. Supports `{available_agents}` placeholder.
Returns:
A StructuredTool that can invoke subagents by type.
"""
subagent_graphs, subagent_descriptions = _get_subagents(
default_model=default_model,
default_tools=default_tools,
default_middleware=default_middleware,
subagents=subagents,
general_purpose_agent=general_purpose_agent,
)
subagent_description_str = "\n".join(subagent_descriptions)
def _return_command_with_state_update(result: dict, tool_call_id: str) -> Command:
state_update = {k: v for k, v in result.items() if k not in _EXCLUDED_STATE_KEYS}
return Command(
update={
**state_update,
"messages": [
ToolMessage(result["messages"][-1].content, tool_call_id=tool_call_id)
],
}
)
def _validate_and_prepare_state(
subagent_type: str, description: str, state: dict
) -> tuple[Runnable, dict]:
"""Validate subagent type and prepare state for invocation."""
if subagent_type not in subagent_graphs:
msg = (
f"Error: invoked agent of type {subagent_type}, "
f"the only allowed types are {[f'`{k}`' for k in subagent_graphs]}"
)
raise ValueError(msg)
subagent = subagent_graphs[subagent_type]
# Create a new state dict to avoid mutating the original
subagent_state = {k: v for k, v in state.items() if k not in _EXCLUDED_STATE_KEYS}
subagent_state["messages"] = [HumanMessage(content=description)]
return subagent, subagent_state
# Use custom description if provided, otherwise use default template
if task_description is None:
task_description = TASK_TOOL_DESCRIPTION.format(available_agents=subagent_description_str)
elif "{available_agents}" in task_description:
# If custom description has placeholder, format with agent descriptions
task_description = task_description.format(available_agents=subagent_description_str)
def task(
description: str,
subagent_type: str,
state: Annotated[dict, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> str | Command:
subagent, subagent_state = _validate_and_prepare_state(subagent_type, description, state)
result = subagent.invoke(subagent_state)
return _return_command_with_state_update(result, tool_call_id)
async def atask(
description: str,
subagent_type: str,
state: Annotated[dict, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> str | Command:
subagent, subagent_state = _validate_and_prepare_state(subagent_type, description, state)
result = await subagent.ainvoke(subagent_state)
return _return_command_with_state_update(result, tool_call_id)
return StructuredTool.from_function(
name="task",
func=task,
coroutine=atask,
description=task_description,
)
class SubAgentMiddleware(AgentMiddleware):
"""Middleware for providing subagents to an agent via a `task` tool.
This middleware adds a `task` tool to the agent that can be used to invoke subagents.
Subagents are useful for handling complex tasks that require multiple steps, or tasks
that require a lot of context to resolve.
A chief benefit of subagents is that they can handle multi-step tasks, and then return
a clean, concise response to the main agent.
Subagents are also great for different domains of expertise that require a narrower
subset of tools and focus.
This middleware comes with a default general-purpose subagent that can be used to
handle the same tasks as the main agent, but with isolated context.
Args:
default_model: The model to use for subagents.
Can be a LanguageModelLike or a dict for init_chat_model.
default_tools: The tools to use for the default general-purpose subagent.
default_middleware: Default middleware to apply to all subagents. If `None` (default),
no default middleware is applied. Pass a list to specify custom middleware.
subagents: A list of additional subagents to provide to the agent.
system_prompt: Full system prompt override. When provided, completely replaces
the agent's system prompt.
general_purpose_agent: Whether to include the general-purpose agent. Defaults to `True`.
task_description: Custom description for the task tool. If `None`, uses the
default description template.
Example:
```python
from langchain.agents.middleware.subagents import SubAgentMiddleware
from langchain.agents import create_agent
# Basic usage with defaults (no default middleware)
agent = create_agent(
"openai:gpt-4o",
middleware=[
SubAgentMiddleware(
default_model="openai:gpt-4o",
subagents=[],
)
],
)
# Add custom middleware to subagents
agent = create_agent(
"openai:gpt-4o",
middleware=[
SubAgentMiddleware(
default_model="openai:gpt-4o",
default_middleware=[TodoListMiddleware()],
subagents=[],
)
],
)
```
"""
def __init__(
self,
*,
default_model: str | BaseChatModel,
default_tools: Sequence[BaseTool | Callable | dict[str, Any]] | None = None,
default_middleware: list[AgentMiddleware] | None = None,
subagents: list[SubAgent | CompiledSubAgent] | None = None,
system_prompt: str | None = None,
general_purpose_agent: bool = True,
task_description: str | None = None,
) -> None:
"""Initialize the SubAgentMiddleware."""
super().__init__()
self.system_prompt = system_prompt
task_tool = _create_task_tool(
default_model=default_model,
default_tools=default_tools or [],
default_middleware=default_middleware,
subagents=subagents or [],
general_purpose_agent=general_purpose_agent,
task_description=task_description,
)
self.tools = [task_tool]
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
"""Update the system prompt to include instructions on using subagents."""
if self.system_prompt is not None:
request.system_prompt = self.system_prompt
return handler(request)

View File

@@ -57,6 +57,7 @@ test = [
"toml>=0.10.2,<1.0.0",
"langchain-tests",
"langchain-openai",
"langchain-anthropic",
]
lint = [
"ruff>=0.12.2,<0.13.0",

View File

@@ -0,0 +1,722 @@
from langchain.agents.middleware.filesystem import (
FilesystemMiddleware,
WRITE_FILE_TOOL_DESCRIPTION,
WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT,
)
from langchain.agents import create_agent
from langchain.agents.deepagents import create_deep_agent
from langchain_core.messages import HumanMessage
from langchain_anthropic import ChatAnthropic
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver
from langchain.agents._internal.file_utils import FileData
import pytest
import uuid
@pytest.mark.requires("langchain_anthropic")
class TestFilesystem:
def test_create_deepagent_without_store_and_with_longterm_memory_should_fail(self):
with pytest.raises(ValueError):
deepagent = create_deep_agent(tools=[], use_longterm_memory=True)
deepagent.invoke(
{"messages": [HumanMessage(content="List all of the files in your filesystem?")]}
)
def test_filesystem_system_prompt_override(self):
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=False,
system_prompt_extension="In every single response, you must say the word 'pokemon'! You love it!",
)
],
)
response = agent.invoke({"messages": [HumanMessage(content="What do you like?")]})
assert "pokemon" in response["messages"][1].text.lower()
def test_filesystem_system_prompt_override_with_longterm_memory(self):
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
system_prompt_extension="In every single response, you must say the word 'pokemon'! You love it!",
)
],
store=InMemoryStore(),
)
response = agent.invoke({"messages": [HumanMessage(content="What do you like?")]})
assert "pokemon" in response["messages"][1].text.lower()
def test_filesystem_tool_prompt_override(self):
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=False,
custom_tool_descriptions={
"ls": "Charmander",
"read_file": "Bulbasaur",
"edit_file": "Squirtle",
},
)
],
)
tools = agent.nodes["tools"].bound._tools_by_name
assert "ls" in tools
assert tools["ls"].description == "Charmander"
assert "read_file" in tools
assert tools["read_file"].description == "Bulbasaur"
assert "write_file" in tools
assert tools["write_file"].description == WRITE_FILE_TOOL_DESCRIPTION
assert "edit_file" in tools
assert tools["edit_file"].description == "Squirtle"
def test_filesystem_tool_prompt_override_with_longterm_memory(self):
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
custom_tool_descriptions={
"ls": "Charmander",
"read_file": "Bulbasaur",
"edit_file": "Squirtle",
},
)
],
store=InMemoryStore(),
)
tools = agent.nodes["tools"].bound._tools_by_name
assert "ls" in tools
assert tools["ls"].description == "Charmander"
assert "read_file" in tools
assert tools["read_file"].description == "Bulbasaur"
assert "write_file" in tools
assert (
tools["write_file"].description
== WRITE_FILE_TOOL_DESCRIPTION + WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
)
assert "edit_file" in tools
assert tools["edit_file"].description == "Squirtle"
def test_ls_longterm_without_path(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
store.put(
("filesystem",),
"/pokemon/charmander.txt",
{
"content": ["Ember"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [HumanMessage(content="List all of your files")],
"files": {
"/pizza.txt": FileData(
content=["Hello world"],
created_at="2021-01-01",
modified_at="2021-01-01",
),
"/pokemon/squirtle.txt": FileData(
content=["Splash"],
created_at="2021-01-01",
modified_at="2021-01-01",
),
},
},
config=config,
)
messages = response["messages"]
ls_message = next(
message for message in messages if message.type == "tool" and message.name == "ls"
)
assert "/pizza.txt" in ls_message.text
assert "/pokemon/squirtle.txt" in ls_message.text
assert "/memories/test.txt" in ls_message.text
assert "/memories/pokemon/charmander.txt" in ls_message.text
def test_ls_longterm_with_path(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
store.put(
("filesystem",),
"/pokemon/charmander.txt",
{
"content": ["Ember"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(content="List all of your files in the /pokemon directory")
],
"files": {
"/pizza.txt": FileData(
content=["Hello world"],
created_at="2021-01-01",
modified_at="2021-01-01",
),
"/pokemon/squirtle.txt": FileData(
content=["Splash"],
created_at="2021-01-01",
modified_at="2021-01-01",
),
},
},
config=config,
)
messages = response["messages"]
ls_message = next(
message for message in messages if message.type == "tool" and message.name == "ls"
)
assert "/pokemon/squirtle.txt" in ls_message.text
assert "/memories/pokemon/charmander.txt" not in ls_message.text
def test_read_file_longterm_local_file(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [HumanMessage(content="Read test.txt from local memory")],
"files": {
"/test.txt": FileData(
content=["Goodbye world"],
created_at="2021-01-01",
modified_at="2021-01-01",
)
},
},
config=config,
)
messages = response["messages"]
read_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "read_file"
)
assert read_file_message is not None
assert "Goodbye world" in read_file_message.content
def test_read_file_longterm_store_file(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [HumanMessage(content="Read test.txt from longterm memory")],
"files": {
"/test.txt": FileData(
content=["Goodbye world"],
created_at="2021-01-01",
modified_at="2021-01-01",
)
},
},
config=config,
)
messages = response["messages"]
read_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "read_file"
)
assert read_file_message is not None
assert "Hello world" in read_file_message.content
def test_read_file_longterm(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/test.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
store.put(
("filesystem",),
"/pokemon/charmander.txt",
{
"content": ["Ember"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Read the contents of the file about charmander from longterm memory."
)
],
"files": {},
},
config=config,
)
messages = response["messages"]
ai_msg_w_toolcall = next(
message
for message in messages
if message.type == "ai"
and any(
tc["name"] == "read_file"
and tc["args"]["file_path"] == "/memories/pokemon/charmander.txt"
for tc in message.tool_calls
)
)
assert ai_msg_w_toolcall is not None
def test_write_file_longterm(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'"
)
],
"files": {},
},
config=config,
)
messages = response["messages"]
write_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "write_file"
)
assert write_file_message is not None
file_item = store.get(("filesystem",), "/charmander.txt")
assert file_item is not None
assert any("fiery" in c for c in file_item.value["content"]) or any(
"Fiery" in c for c in file_item.value["content"]
)
def test_write_file_fail_already_exists_in_store(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/charmander.txt",
{
"content": ["Hello world"],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'"
)
],
"files": {},
},
config=config,
)
messages = response["messages"]
write_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "write_file"
)
assert write_file_message is not None
assert "Cannot write" in write_file_message.content
def test_write_file_fail_already_exists_in_local(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to /charmander.txt, use the word 'fiery'"
)
],
"files": {
"/charmander.txt": FileData(
content=["Hello world"],
created_at="2021-01-01",
modified_at="2021-01-01",
)
},
},
config=config,
)
messages = response["messages"]
write_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "write_file"
)
assert write_file_message is not None
assert "Cannot write" in write_file_message.content
def test_edit_file_longterm(self):
checkpointer = MemorySaver()
store = InMemoryStore()
store.put(
("filesystem",),
"/charmander.txt",
{
"content": ["The fire burns brightly. The fire burns hot."],
"created_at": "2021-01-01",
"modified_at": "2021-01-01",
},
)
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Edit the longterm memory file about charmander, to replace all instances of the word 'fire' with 'embers'"
)
],
"files": {},
},
config=config,
)
messages = response["messages"]
edit_file_message = next(
message
for message in messages
if message.type == "tool" and message.name == "edit_file"
)
assert edit_file_message is not None
assert store.get(("filesystem",), "/charmander.txt").value["content"] == [
"The embers burns brightly. The embers burns hot."
]
def test_longterm_memory_multiple_tools(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_agent(
model=ChatAnthropic(model="claude-3-5-sonnet-20240620"),
middleware=[
FilesystemMiddleware(
use_longterm_memory=True,
)
],
checkpointer=checkpointer,
store=store,
)
assert_longterm_mem_tools(agent, store)
def test_longterm_memory_multiple_tools_deepagent(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_deep_agent(use_longterm_memory=True, checkpointer=checkpointer, store=store)
assert_longterm_mem_tools(agent, store)
def test_shortterm_memory_multiple_tools_deepagent(self):
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_deep_agent(use_longterm_memory=False, checkpointer=checkpointer, store=store)
assert_shortterm_mem_tools(agent)
# Take actions on multiple threads to test longterm memory
def assert_longterm_mem_tools(agent, store):
# Write a longterm memory file
config = {"configurable": {"thread_id": uuid.uuid4()}}
agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'"
)
]
},
config=config,
)
namespaces = store.list_namespaces()
assert len(namespaces) == 1
assert namespaces[0] == ("filesystem",)
file_item = store.get(("filesystem",), "/charmander.txt")
assert file_item is not None
assert file_item.key == "/charmander.txt"
# Read the longterm memory file
config2 = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Read the haiku about Charmander from longterm memory at /charmander.txt"
)
]
},
config=config2,
)
messages = response["messages"]
read_file_message = next(
message for message in messages if message.type == "tool" and message.name == "read_file"
)
assert "fiery" in read_file_message.content or "Fiery" in read_file_message.content
# List all of the files in longterm memory
config3 = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{"messages": [HumanMessage(content="List all of the files in longterm memory")]},
config=config3,
)
messages = response["messages"]
ls_message = next(
message for message in messages if message.type == "tool" and message.name == "ls"
)
assert "/memories/charmander.txt" in ls_message.content
# Edit the longterm memory file
config4 = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Edit the haiku about Charmander in longterm memory to use the word 'ember'"
)
]
},
config=config4,
)
file_item = store.get(("filesystem",), "/charmander.txt")
assert file_item is not None
assert file_item.key == "/charmander.txt"
assert any("ember" in c for c in file_item.value["content"]) or any(
"Ember" in c for c in file_item.value["content"]
)
# Read the longterm memory file
config5 = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Read the haiku about Charmander from longterm memory at /charmander.txt"
)
]
},
config=config5,
)
messages = response["messages"]
read_file_message = next(
message for message in messages if message.type == "tool" and message.name == "read_file"
)
assert "ember" in read_file_message.content or "Ember" in read_file_message.content
def assert_shortterm_mem_tools(agent):
# Write a shortterm memory file
config = {"configurable": {"thread_id": uuid.uuid4()}}
response = agent.invoke(
{
"messages": [
HumanMessage(
content="Write a haiku about Charmander to /charmander.txt, use the word 'fiery'"
)
]
},
config=config,
)
files = response["files"]
assert "/charmander.txt" in files
# Read the shortterm memory file
response = agent.invoke(
{
"messages": [
HumanMessage(content="Read the haiku about Charmander from /charmander.txt")
]
},
config=config,
)
messages = response["messages"]
read_file_message = next(
message
for message in reversed(messages)
if message.type == "tool" and message.name == "read_file"
)
assert "fiery" in read_file_message.content or "Fiery" in read_file_message.content
# List all of the files in shortterm memory
response = agent.invoke(
{"messages": [HumanMessage(content="List all of the files in your filesystem")]},
config=config,
)
messages = response["messages"]
ls_message = next(
message for message in messages if message.type == "tool" and message.name == "ls"
)
assert "/charmander.txt" in ls_message.content
# Edit the shortterm memory file
response = agent.invoke(
{
"messages": [
HumanMessage(content="Edit the haiku about Charmander to use the word 'ember'")
]
},
config=config,
)
files = response["files"]
assert "/charmander.txt" in files
assert any("ember" in c for c in files["/charmander.txt"]["content"]) or any(
"Ember" in c for c in files["/charmander.txt"]["content"]
)
# Read the shortterm memory file
response = agent.invoke(
{"messages": [HumanMessage(content="Read the haiku about Charmander at /charmander.txt")]},
config=config,
)
messages = response["messages"]
read_file_message = next(
message
for message in reversed(messages)
if message.type == "tool" and message.name == "read_file"
)
assert "ember" in read_file_message.content or "Ember" in read_file_message.content

View File

@@ -0,0 +1,226 @@
from langchain.agents.middleware.subagents import SubAgentMiddleware
from langchain.agents.middleware import AgentMiddleware
from langchain_core.tools import tool
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
import pytest
@tool
def get_weather(city: str) -> str:
"""Get the weather in a city."""
return f"The weather in {city} is sunny."
class WeatherMiddleware(AgentMiddleware):
tools = [get_weather]
def assert_expected_subgraph_actions(expected_tool_calls, agent, inputs):
current_idx = 0
for update in agent.stream(
inputs,
subgraphs=True,
stream_mode="updates",
):
if "model" in update[1]:
ai_message = update[1]["model"]["messages"][-1]
tool_calls = ai_message.tool_calls
for tool_call in tool_calls:
if tool_call["name"] == expected_tool_calls[current_idx]["name"]:
if "model" in expected_tool_calls[current_idx]:
assert (
ai_message.response_metadata["model_name"]
== expected_tool_calls[current_idx]["model"]
)
for arg in expected_tool_calls[current_idx]["args"]:
assert arg in tool_call["args"]
assert (
tool_call["args"][arg] == expected_tool_calls[current_idx]["args"][arg]
)
current_idx += 1
assert current_idx == len(expected_tool_calls)
@pytest.mark.requires("langchain_anthropic", "langchain_openai")
class TestSubagentMiddleware:
"""Integration tests for the SubagentMiddleware class."""
def test_general_purpose_subagent(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the general-purpose subagent to get the weather in a city.",
middleware=[
SubAgentMiddleware(
default_model="claude-sonnet-4-20250514",
default_tools=[get_weather],
)
],
)
assert "task" in agent.nodes["tools"].bound._tools_by_name.keys()
response = agent.invoke(
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]}
)
assert response["messages"][1].tool_calls[0]["name"] == "task"
assert response["messages"][1].tool_calls[0]["args"]["subagent_type"] == "general-purpose"
def test_defined_subagent(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_model="claude-sonnet-4-20250514",
default_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"system_prompt": "Use the get_weather tool to get the weather in a city.",
"tools": [get_weather],
}
],
)
],
)
assert "task" in agent.nodes["tools"].bound._tools_by_name.keys()
response = agent.invoke(
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]}
)
assert response["messages"][1].tool_calls[0]["name"] == "task"
assert response["messages"][1].tool_calls[0]["args"]["subagent_type"] == "weather"
def test_defined_subagent_tool_calls(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_model="claude-sonnet-4-20250514",
default_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"system_prompt": "Use the get_weather tool to get the weather in a city.",
"tools": [get_weather],
}
],
)
],
)
expected_tool_calls = [
{"name": "task", "args": {"subagent_type": "weather"}},
{"name": "get_weather", "args": {}},
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)
def test_defined_subagent_custom_model(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_model="claude-sonnet-4-20250514",
default_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"system_prompt": "Use the get_weather tool to get the weather in a city.",
"tools": [get_weather],
"model": "gpt-4.1",
}
],
)
],
)
expected_tool_calls = [
{
"name": "task",
"args": {"subagent_type": "weather"},
"model": "claude-sonnet-4-20250514",
},
{"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"},
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)
def test_defined_subagent_custom_middleware(self):
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_model="claude-sonnet-4-20250514",
default_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"system_prompt": "Use the get_weather tool to get the weather in a city.",
"tools": [], # No tools, only in middleware
"model": "gpt-4.1",
"middleware": [WeatherMiddleware()],
}
],
)
],
)
expected_tool_calls = [
{
"name": "task",
"args": {"subagent_type": "weather"},
"model": "claude-sonnet-4-20250514",
},
{"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"},
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)
def test_defined_subagent_custom_runnable(self):
custom_subagent = create_agent(
model="gpt-4.1-2025-04-14",
system_prompt="Use the get_weather tool to get the weather in a city.",
tools=[get_weather],
)
agent = create_agent(
model="claude-sonnet-4-20250514",
system_prompt="Use the task tool to call a subagent.",
middleware=[
SubAgentMiddleware(
default_model="claude-sonnet-4-20250514",
default_tools=[],
subagents=[
{
"name": "weather",
"description": "This subagent can get weather in cities.",
"runnable": custom_subagent,
}
],
)
],
)
expected_tool_calls = [
{
"name": "task",
"args": {"subagent_type": "weather"},
"model": "claude-sonnet-4-20250514",
},
{"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"},
]
assert_expected_subgraph_actions(
expected_tool_calls,
agent,
{"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
)

View File

@@ -0,0 +1,306 @@
from langchain.agents.deepagents import create_deep_agent
from langchain.agents import create_agent
from langchain_core.tools import tool, InjectedToolCallId
from typing import Annotated
from langchain.tools.tool_node import InjectedState
from langchain.agents.middleware import AgentMiddleware, AgentState
from langgraph.types import Command
from langchain_core.messages import ToolMessage
import pytest
def assert_all_deepagent_qualities(agent):
assert "todos" in agent.stream_channels
assert "files" in agent.stream_channels
assert "write_todos" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "ls" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "read_file" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "write_file" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "edit_file" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "task" in agent.nodes["tools"].bound._tools_by_name.keys()
SAMPLE_MODEL = "claude-3-5-sonnet-20240620"
@tool(description="Use this tool to get the weather")
def get_weather(location: str):
return f"The weather in {location} is sunny."
@tool(description="Use this tool to get the latest soccer scores")
def get_soccer_scores(team: str):
return f"The latest soccer scores for {team} are 2-1."
@tool(description="Sample tool")
def sample_tool(sample_input: str):
return sample_input
@tool(description="Sample tool with injected state")
def sample_tool_with_injected_state(sample_input: str, state: Annotated[dict, InjectedState]):
return sample_input + state["sample_input"]
TOY_BASKETBALL_RESEARCH = "Lebron James is the best basketball player of all time with over 40k points and 21 seasons in the NBA."
@tool(description="Use this tool to conduct research into basketball and save it to state")
def research_basketball(
topic: str,
state: Annotated[dict, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
):
current_research = state.get("research", "")
research = f"{current_research}\n\nResearching on {topic}... Done! {TOY_BASKETBALL_RESEARCH}"
return Command(
update={
"research": research,
"messages": [ToolMessage(research, tool_call_id=tool_call_id)],
}
)
class ResearchState(AgentState):
research: str
class ResearchMiddlewareWithTools(AgentMiddleware):
state_schema = ResearchState
tools = [research_basketball]
class ResearchMiddleware(AgentMiddleware):
state_schema = ResearchState
class SampleMiddlewareWithTools(AgentMiddleware):
tools = [sample_tool]
class SampleState(AgentState):
sample_input: str
class SampleMiddlewareWithToolsAndState(AgentMiddleware):
state_schema = SampleState
tools = [sample_tool]
class WeatherToolMiddleware(AgentMiddleware):
tools = [get_weather]
@pytest.mark.requires("langchain_anthropic")
class TestDeepAgentsFilesystem:
def test_base_deep_agent(self):
agent = create_deep_agent()
assert_all_deepagent_qualities(agent)
def test_deep_agent_with_tool(self):
agent = create_deep_agent(tools=[sample_tool])
assert_all_deepagent_qualities(agent)
assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys()
def test_deep_agent_with_middleware_with_tool(self):
agent = create_deep_agent(middleware=[SampleMiddlewareWithTools()])
assert_all_deepagent_qualities(agent)
assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys()
def test_deep_agent_with_middleware_with_tool_and_state(self):
agent = create_deep_agent(middleware=[SampleMiddlewareWithToolsAndState()])
assert_all_deepagent_qualities(agent)
assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys()
assert "sample_input" in agent.stream_channels
def test_deep_agent_with_subagents(self):
subagents = [
{
"name": "weather_agent",
"description": "Use this agent to get the weather",
"system_prompt": "You are a weather agent.",
"tools": [get_weather],
"model": SAMPLE_MODEL,
}
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{"messages": [{"role": "user", "content": "What is the weather in Tokyo?"}]}
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "weather_agent"
for tool_call in tool_calls
]
)
def test_deep_agent_with_subagents_gen_purpose(self):
subagents = [
{
"name": "weather_agent",
"description": "Use this agent to get the weather",
"system_prompt": "You are a weather agent.",
"tools": [get_weather],
"model": SAMPLE_MODEL,
}
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{
"messages": [
{
"role": "user",
"content": "Use the general purpose subagent to call the sample tool",
}
]
}
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "general-purpose"
for tool_call in tool_calls
]
)
def test_deep_agent_with_subagents_with_middleware(self):
subagents = [
{
"name": "weather_agent",
"description": "Use this agent to get the weather",
"system_prompt": "You are a weather agent.",
"tools": [],
"model": SAMPLE_MODEL,
"middleware": [WeatherToolMiddleware()],
}
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{"messages": [{"role": "user", "content": "What is the weather in Tokyo?"}]}
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "weather_agent"
for tool_call in tool_calls
]
)
def test_deep_agent_with_custom_subagents(self):
subagents = [
{
"name": "weather_agent",
"description": "Use this agent to get the weather",
"system_prompt": "You are a weather agent.",
"tools": [get_weather],
"model": SAMPLE_MODEL,
},
{
"name": "soccer_agent",
"description": "Use this agent to get the latest soccer scores",
"runnable": create_agent(
model=SAMPLE_MODEL,
tools=[get_soccer_scores],
system_prompt="You are a soccer agent.",
),
},
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{
"messages": [
{
"role": "user",
"content": "Look up the weather in Tokyo, and the latest scores for Manchester City!",
}
]
}
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "weather_agent"
for tool_call in tool_calls
]
)
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "soccer_agent"
for tool_call in tool_calls
]
)
def test_deep_agent_with_extended_state_and_subagents(self):
subagents = [
{
"name": "basketball_info_agent",
"description": "Use this agent to get surface level info on any basketball topic",
"system_prompt": "You are a basketball info agent.",
"middleware": [ResearchMiddlewareWithTools()],
}
]
agent = create_deep_agent(
tools=[sample_tool], subagents=subagents, middleware=[ResearchMiddleware()]
)
assert_all_deepagent_qualities(agent)
assert "research" in agent.stream_channels
result = agent.invoke(
{"messages": [{"role": "user", "content": "Get surface level info on lebron james"}]},
config={"recursion_limit": 100},
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "basketball_info_agent"
for tool_call in tool_calls
]
)
assert TOY_BASKETBALL_RESEARCH in result["research"]
def test_deep_agent_with_subagents_no_tools(self):
subagents = [
{
"name": "basketball_info_agent",
"description": "Use this agent to get surface level info on any basketball topic",
"system_prompt": "You are a basketball info agent.",
}
]
agent = create_deep_agent(tools=[sample_tool], subagents=subagents)
assert_all_deepagent_qualities(agent)
result = agent.invoke(
{
"messages": [
{
"role": "user",
"content": "Use the basketball info subagent to call the sample tool",
}
]
},
config={"recursion_limit": 100},
)
agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"]
tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls]
assert any(
[
tool_call["name"] == "task"
and tool_call["args"].get("subagent_type") == "basketball_info_agent"
for tool_call in tool_calls
]
)

View File

@@ -0,0 +1,114 @@
from langchain.agents.middleware.filesystem import (
FileData,
FilesystemState,
FilesystemMiddleware,
FILESYSTEM_SYSTEM_PROMPT,
FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT,
)
from langgraph.store.memory import InMemoryStore
from langgraph.runtime import Runtime
class TestFilesystem:
def test_init_local(self):
middleware = FilesystemMiddleware(long_term_memory=False)
assert middleware.long_term_memory is False
assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT
assert len(middleware.tools) == 4
def test_init_longterm(self):
middleware = FilesystemMiddleware(long_term_memory=True)
assert middleware.long_term_memory is True
assert middleware.system_prompt_extension == (
FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
)
assert len(middleware.tools) == 4
def test_init_custom_system_prompt_shortterm(self):
middleware = FilesystemMiddleware(
long_term_memory=False, system_prompt_extension="Custom system prompt"
)
assert middleware.long_term_memory is False
assert middleware.system_prompt_extension == "Custom system prompt"
assert len(middleware.tools) == 4
def test_init_custom_system_prompt_longterm(self):
middleware = FilesystemMiddleware(
long_term_memory=True, system_prompt_extension="Custom system prompt"
)
assert middleware.long_term_memory is True
assert middleware.system_prompt_extension == "Custom system prompt"
assert len(middleware.tools) == 4
def test_init_custom_tool_descriptions_shortterm(self):
middleware = FilesystemMiddleware(
long_term_memory=False, custom_tool_descriptions={"ls": "Custom ls tool description"}
)
assert middleware.long_term_memory is False
assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
assert ls_tool.description == "Custom ls tool description"
def test_init_custom_tool_descriptions_longterm(self):
middleware = FilesystemMiddleware(
long_term_memory=True, custom_tool_descriptions={"ls": "Custom ls tool description"}
)
assert middleware.long_term_memory is True
assert middleware.system_prompt_extension == (
FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
)
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
assert ls_tool.description == "Custom ls tool description"
def test_ls_shortterm(self):
state = FilesystemState(
messages=[],
files={
"test.txt": FileData(
content=["Hello world"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
"test2.txt": FileData(
content=["Goodbye world"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
},
)
middleware = FilesystemMiddleware(long_term_memory=False)
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
result = ls_tool.invoke({"state": state})
assert result == ["test.txt", "test2.txt"]
def test_ls_shortterm_with_path(self):
state = FilesystemState(
messages=[],
files={
"/test.txt": FileData(
content=["Hello world"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
"/pokemon/test2.txt": FileData(
content=["Goodbye world"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
"/pokemon/charmander.txt": FileData(
content=["Ember"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
"/pokemon/water/squirtle.txt": FileData(
content=["Water"],
modified_at="2021-01-01",
created_at="2021-01-01",
),
},
)
middleware = FilesystemMiddleware(long_term_memory=False)
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
result = ls_tool.invoke({"state": state, "path": "pokemon/"})
assert "/pokemon/test2.txt" in result
assert "/pokemon/charmander.txt" in result

View File

@@ -0,0 +1,42 @@
from langchain.agents.middleware.subagents import (
DEFAULT_GENERAL_PURPOSE_DESCRIPTION,
TASK_TOOL_DESCRIPTION,
SubAgentMiddleware,
)
import pytest
@pytest.mark.requires("langchain_openai")
class TestSubagentMiddleware:
"""Test the SubagentMiddleware class."""
def test_subagent_middleware_init(self):
middleware = SubAgentMiddleware(
default_model="gpt-4o-mini",
)
assert middleware is not None
assert middleware.system_prompt is None
assert len(middleware.tools) == 1
assert middleware.tools[0].name == "task"
expected_desc = TASK_TOOL_DESCRIPTION.format(
available_agents=f"- general-purpose: {DEFAULT_GENERAL_PURPOSE_DESCRIPTION}"
)
assert middleware.tools[0].description == expected_desc
def test_default_subagent_with_tools(self):
middleware = SubAgentMiddleware(
default_model="gpt-4o-mini",
default_tools=[],
)
assert middleware is not None
assert middleware.system_prompt is None
def test_default_subagent_custom_system_prompt(self):
middleware = SubAgentMiddleware(
default_model="gpt-4o-mini",
default_tools=[],
system_prompt="Use the task tool to call a subagent.",
)
assert middleware is not None
assert middleware.system_prompt == "Use the task tool to call a subagent."

View File

@@ -32,7 +32,7 @@ from langchain.agents.middleware.human_in_the_loop import (
Action,
HumanInTheLoopMiddleware,
)
from langchain.agents.middleware.todo import (
from langchain.agents.middleware.todo_list import (
TodoListMiddleware,
PlanningState,
WRITE_TODOS_SYSTEM_PROMPT,

View File

@@ -7,7 +7,7 @@ from typing import cast
from langchain_core.language_models.fake_chat_models import GenericFakeChatModel
from langchain_core.messages import AIMessage
from langchain.agents.middleware.todo import TodoListMiddleware
from langchain.agents.middleware.todo_list import TodoListMiddleware
from langchain.agents.middleware.types import ModelRequest, ModelResponse
from langgraph.runtime import Runtime

View File

@@ -1607,6 +1607,7 @@ lint = [
{ name = "ruff" },
]
test = [
{ name = "langchain-anthropic" },
{ name = "langchain-openai" },
{ name = "langchain-tests" },
{ name = "pytest" },
@@ -1658,6 +1659,7 @@ provides-extras = ["community", "anthropic", "openai", "google-vertexai", "googl
[package.metadata.requires-dev]
lint = [{ name = "ruff", specifier = ">=0.12.2,<0.13.0" }]
test = [
{ name = "langchain-anthropic" },
{ name = "langchain-openai", editable = "../partners/openai" },
{ name = "langchain-tests", editable = "../standard-tests" },
{ name = "pytest", specifier = ">=8.0.0,<9.0.0" },