docstrings and general improvements

This commit is contained in:
Sydney Runkle
2025-10-14 21:57:36 -04:00
parent 3757a8a555
commit 853a498dc6
3 changed files with 468 additions and 148 deletions

View File

@@ -11,6 +11,12 @@ from typing_extensions import TypedDict
if TYPE_CHECKING:
from collections.abc import Sequence
# Constants
MEMORIES_PREFIX = "/memories/"
EMPTY_CONTENT_WARNING = "System reminder: File exists but has empty contents"
MAX_LINE_LENGTH = 2000
LINE_NUMBER_WIDTH = 6
class FileData(TypedDict):
"""Data structure for storing file contents with metadata."""
@@ -28,14 +34,28 @@ class FileData(TypedDict):
def file_data_reducer(
left: dict[str, FileData] | None, right: dict[str, FileData | None]
) -> dict[str, FileData]:
"""Custom reducer that merges file updates.
"""Merge file updates with support for deletions.
This reducer enables file deletion by treating `None` values in the right
dictionary as deletion markers. It's designed to work with LangGraph's
state management where annotated reducers control how state updates merge.
Args:
left: Existing files dict.
right: New files dict to merge (None values delete files).
left: Existing files dictionary. May be `None` during initialization.
right: New files dictionary to merge. Files with `None` values are
treated as deletion markers and removed from the result.
Returns:
Merged dict where right overwrites left for matching keys.
Merged dictionary where right overwrites left for matching keys,
and `None` values in right trigger deletions.
Example:
```python
existing = {"/file1.txt": FileData(...), "/file2.txt": FileData(...)}
updates = {"/file2.txt": None, "/file3.txt": FileData(...)}
result = file_data_reducer(existing, updates)
# Result: {"/file1.txt": FileData(...), "/file3.txt": FileData(...)}
```
"""
if left is None:
# Filter out None values when initializing
@@ -43,26 +63,41 @@ def file_data_reducer(
# Merge, filtering out None values (deletions)
result = {**left}
for k, v in right.items():
if v is None:
result.pop(k, None)
for key, value in right.items():
if value is None:
result.pop(key, None)
else:
result[k] = v
result[key] = value
return result
def validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) -> str:
"""Validate and normalize file path for security.
Ensures paths are safe to use by preventing directory traversal attacks
and enforcing consistent formatting. All paths are normalized to use
forward slashes and start with a leading slash.
Args:
path: The path to validate.
allowed_prefixes: Optional list of allowed path prefixes.
path: The path to validate and normalize.
allowed_prefixes: Optional list of allowed path prefixes. If provided,
the normalized path must start with one of these prefixes.
Returns:
Normalized canonical path.
Normalized canonical path starting with `/` and using forward slashes.
Raises:
ValueError: If path contains traversal sequences or violates prefix rules.
ValueError: If path contains traversal sequences (`..` or `~`) or does
not start with an allowed prefix when `allowed_prefixes` is specified.
Example:
```python
validate_path("foo/bar") # Returns: "/foo/bar"
validate_path("/./foo//bar") # Returns: "/foo/bar"
validate_path("../etc/passwd") # Raises ValueError
validate_path("/data/file.txt", allowed_prefixes=["/data/"]) # OK
validate_path("/etc/file.txt", allowed_prefixes=["/data/"]) # Raises ValueError
```
"""
# Reject paths with traversal attempts
if ".." in path or path.startswith("~"):
@@ -95,15 +130,30 @@ def format_content_with_line_numbers(
format_style: Literal["pipe", "tab"] = "pipe",
start_line: int = 1,
) -> str:
r"""Format file content with line numbers.
r"""Format file content with line numbers for display.
Converts file content to a numbered format similar to `cat -n` output,
with support for two different formatting styles.
Args:
content: File content as string or list of lines.
format_style: "pipe" for "1|content" or "tab" for " 1\tcontent".
start_line: Starting line number.
content: File content as a string or list of lines.
format_style: Format style for line numbers:
- `"pipe"`: Compact format like `"1|content"`
- `"tab"`: Right-aligned format like `" 1\tcontent"` (lines truncated at 2000 chars)
start_line: Starting line number (default: 1).
Returns:
Formatted content with line numbers.
Formatted content with line numbers prepended to each line.
Example:
```python
content = "Hello\nWorld"
format_content_with_line_numbers(content, format_style="pipe")
# Returns: "1|Hello\n2|World"
format_content_with_line_numbers(content, format_style="tab", start_line=10)
# Returns: " 10\tHello\n 11\tWorld"
```
"""
if isinstance(content, str):
lines = content.split("\n")
@@ -116,7 +166,11 @@ def format_content_with_line_numbers(
if format_style == "pipe":
return "\n".join(f"{i + start_line}|{line}" for i, line in enumerate(lines))
return "\n".join(f"{i + start_line:6d}\t{line[:2000]}" for i, line in enumerate(lines))
# Tab format with defined width and line truncation
return "\n".join(
f"{i + start_line:{LINE_NUMBER_WIDTH}d}\t{line[:MAX_LINE_LENGTH]}"
for i, line in enumerate(lines)
)
def apply_string_replacement(
@@ -126,22 +180,36 @@ def apply_string_replacement(
*,
replace_all: bool = False,
) -> tuple[str, int]:
"""Apply string replacement to content.
"""Apply exact string replacement to content.
Replaces occurrences of a string within content and returns both the
modified content and the number of replacements made.
Args:
content: Original content.
old_string: String to replace.
content: Original content to modify.
old_string: String to find and replace.
new_string: Replacement string.
replace_all: If True, replace all occurrences. Otherwise, replace first.
replace_all: If `True`, replace all occurrences. If `False`, replace
only the first occurrence (default).
Returns:
Tuple of (new_content, replacement_count).
Tuple of `(modified_content, replacement_count)`.
Example:
```python
content = "foo bar foo"
apply_string_replacement(content, "foo", "baz", replace_all=False)
# Returns: ("baz bar foo", 1)
apply_string_replacement(content, "foo", "baz", replace_all=True)
# Returns: ("baz bar baz", 2)
```
"""
if replace_all:
count = content.count(old_string)
new_content = content.replace(old_string, new_string)
else:
count = 1
count = 1 if old_string in content else 0
new_content = content.replace(old_string, new_string, 1)
return new_content, count
@@ -152,17 +220,24 @@ def create_file_data(
*,
created_at: str | None = None,
) -> FileData:
"""Create a FileData object from content.
r"""Create a FileData object with automatic timestamp generation.
Args:
content: File content as string or list of lines.
created_at: Optional creation timestamp. If None, uses current time.
content: File content as a string or list of lines.
created_at: Optional creation timestamp in ISO 8601 format.
If `None`, uses the current UTC time.
Returns:
FileData object.
FileData object with content and timestamps.
Example:
```python
file_data = create_file_data("Hello\nWorld")
# Returns: {"content": ["Hello", "World"], "created_at": "2024-...",
# "modified_at": "2024-..."}
```
"""
lines = content.split("\n") if isinstance(content, str) else content
now = datetime.now(timezone.utc).isoformat()
return {
@@ -176,17 +251,25 @@ def update_file_data(
file_data: FileData,
content: str | list[str],
) -> FileData:
"""Update a FileData object with new content.
"""Update FileData with new content while preserving creation timestamp.
Args:
file_data: Existing FileData object.
content: New file content as string or list of lines.
file_data: Existing FileData object to update.
content: New file content as a string or list of lines.
Returns:
Updated FileData object with new modified_at timestamp.
Updated FileData object with new content and updated `modified_at`
timestamp. The `created_at` timestamp is preserved from the original.
Example:
```python
original = create_file_data("Hello")
updated = update_file_data(original, "Hello World")
# updated["created_at"] == original["created_at"]
# updated["modified_at"] > original["modified_at"]
```
"""
lines = content.split("\n") if isinstance(content, str) else content
now = datetime.now(timezone.utc).isoformat()
return {
@@ -197,26 +280,54 @@ def update_file_data(
def file_data_to_string(file_data: FileData) -> str:
"""Convert FileData to plain string content.
r"""Convert FileData to plain string content.
Joins the lines stored in FileData with newline characters to produce
a single string representation of the file content.
Args:
file_data: FileData object.
file_data: FileData object containing lines of content.
Returns:
File content as string.
File content as a single string with lines joined by newlines.
Example:
```python
file_data = {
"content": ["Hello", "World"],
"created_at": "...",
"modified_at": "...",
}
file_data_to_string(file_data) # Returns: "Hello\nWorld"
```
"""
return "\n".join(file_data["content"])
def list_directory(files: dict[str, FileData], path: str) -> list[str]:
"""List files in a directory.
"""List files in a directory (direct children only).
Returns only the direct children of the specified directory path,
excluding files in subdirectories.
Args:
files: Files dict mapping paths to FileData.
path: Normalized directory path.
files: Dictionary mapping file paths to FileData objects.
path: Normalized directory path to list files from.
Returns:
Sorted list of file paths in the directory.
Sorted list of file paths that are direct children of the directory.
Example:
```python
files = {
"/dir/file1.txt": FileData(...),
"/dir/file2.txt": FileData(...),
"/dir/subdir/file3.txt": FileData(...),
}
list_directory(files, "/dir")
# Returns: ["/dir/file1.txt", "/dir/file2.txt"]
# Note: /dir/subdir/file3.txt is excluded (not a direct child)
```
"""
# Ensure path ends with / for directory matching
dir_path = path if path.endswith("/") else f"{path}/"
@@ -234,50 +345,79 @@ def list_directory(files: dict[str, FileData], path: str) -> list[str]:
def check_empty_content(content: str) -> str | None:
"""Check if file content is empty and return warning message.
"""Check if file content is empty and return a warning message.
Args:
content: File content.
content: File content to check.
Returns:
Warning message if empty, None otherwise.
Warning message string if content is empty or contains only whitespace,
`None` otherwise.
Example:
```python
check_empty_content("") # Returns: "System reminder: File exists but has empty contents"
check_empty_content(" ") # Returns: "System reminder: File exists but has empty contents"
check_empty_content("Hello") # Returns: None
```
"""
if not content or content.strip() == "":
return "System reminder: File exists but has empty contents"
return EMPTY_CONTENT_WARNING
return None
def has_memories_prefix(file_path: str) -> bool:
"""Check if file path has the memories prefix.
"""Check if a file path is in the longterm memory filesystem.
Longterm memory files are distinguished by the `/memories/` path prefix.
Args:
file_path: File path.
file_path: File path to check.
Returns:
True if file path has the memories prefix, False otherwise.
`True` if the file path starts with `/memories/`, `False` otherwise.
Example:
```python
has_memories_prefix("/memories/notes.txt") # Returns: True
has_memories_prefix("/temp/file.txt") # Returns: False
```
"""
return file_path.startswith("/memories/")
return file_path.startswith(MEMORIES_PREFIX)
def append_memories_prefix(file_path: str) -> str:
"""Append the memories prefix to a file path.
"""Add the longterm memory prefix to a file path.
Args:
file_path: File path.
file_path: File path to prefix.
Returns:
File path with the memories prefix.
File path with `/memories` prepended.
Example:
```python
append_memories_prefix("/notes.txt") # Returns: "/memories/notes.txt"
```
"""
return f"/memories{file_path}"
def strip_memories_prefix(file_path: str) -> str:
"""Strip the memories prefix from a file path.
"""Remove the longterm memory prefix from a file path.
Args:
file_path: File path.
file_path: File path potentially containing the memories prefix.
Returns:
File path without the memories prefix.
File path with `/memories` removed if present at the start.
Example:
```python
strip_memories_prefix("/memories/notes.txt") # Returns: "/notes.txt"
strip_memories_prefix("/notes.txt") # Returns: "/notes.txt"
```
"""
return file_path.replace("/memories", "")
if file_path.startswith(MEMORIES_PREFIX):
return file_path[len(MEMORIES_PREFIX) - 1 :] # Keep the leading slash
return file_path

View File

@@ -37,6 +37,11 @@ from langchain.agents.middleware.types import (
)
from langchain.tools.tool_node import InjectedState
# Constants
LONGTERM_MEMORY_PREFIX = "/memories/"
DEFAULT_READ_OFFSET = 0
DEFAULT_READ_LIMIT = 2000
class FilesystemState(AgentState):
"""State for the filesystem middleware."""
@@ -52,9 +57,7 @@ Usage:
- You can optionally provide a path parameter to list files in a specific directory.
- This is very useful for exploring the file system and finding the right file to read or edit.
- You should almost ALWAYS use this tool before using the Read or Edit tools."""
LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = (
"\n- Files from the longterm filesystem will be prefixed with the /memories/ path."
)
LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- Files from the longterm filesystem will be prefixed with the {LONGTERM_MEMORY_PREFIX} path."
READ_FILE_TOOL_DESCRIPTION = """Reads a file from the filesystem. You can access any file directly by using this tool.
Assume this tool is able to read all files on the machine. If the User provides a path to a file assume that path is valid. It is okay to read a file that does not exist; an error will be returned.
@@ -68,9 +71,7 @@ Usage:
- You have the capability to call multiple tools in a single response. It is always better to speculatively read multiple files as a batch that are potentially useful.
- If you read a file that exists but has empty contents you will receive a system reminder warning in place of file contents.
- You should ALWAYS make sure a file has been read before editing it."""
READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = (
"\n- file_paths prefixed with the /memories/ path will be read from the longterm filesystem."
)
READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- file_paths prefixed with the {LONGTERM_MEMORY_PREFIX} path will be read from the longterm filesystem."
EDIT_FILE_TOOL_DESCRIPTION = """Performs exact string replacements in files.
@@ -81,7 +82,7 @@ Usage:
- Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked.
- The edit will FAIL if `old_string` is not unique in the file. Either provide a larger string with more surrounding context to make it unique or use `replace_all` to change every instance of `old_string`.
- Use `replace_all` for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance."""
EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = "\n- You can edit files in the longterm filesystem by prefixing the filename with the /memories/ path."
EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- You can edit files in the longterm filesystem by prefixing the filename with the {LONGTERM_MEMORY_PREFIX} path."
WRITE_FILE_TOOL_DESCRIPTION = """Writes to a new file in the filesystem.
@@ -91,9 +92,7 @@ Usage:
- The write_file tool will create the a new file.
- Prefer to edit existing files over creating new ones when possible.
- file_paths prefixed with the /memories/ path will be written to the longterm filesystem."""
WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = (
"\n- file_paths prefixed with the /memories/ path will be written to the longterm filesystem."
)
WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- file_paths prefixed with the {LONGTERM_MEMORY_PREFIX} path will be written to the longterm filesystem."
FILESYSTEM_SYSTEM_PROMPT = """## Filesystem Tools `ls`, `read_file`, `write_file`, `edit_file`
@@ -104,14 +103,23 @@ All file paths must start with a /.
- read_file: read a file from the filesystem
- write_file: write to a file in the filesystem
- edit_file: edit a file in the filesystem"""
FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT = """
FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT = f"""
You also have access to a longterm filesystem in which you can store files that you want to keep around for longer than the current conversation.
In order to interact with the longterm filesystem, you can use those same tools, but filenames must be prefixed with the /memories/ path.
Remember, to interact with the longterm filesystem, you must prefix the filename with the /memories/ path."""
In order to interact with the longterm filesystem, you can use those same tools, but filenames must be prefixed with the {LONGTERM_MEMORY_PREFIX} path.
Remember, to interact with the longterm filesystem, you must prefix the filename with the {LONGTERM_MEMORY_PREFIX} path."""
def _get_namespace() -> tuple[str] | tuple[str, str]:
"""Get the namespace for longterm filesystem storage.
Returns a tuple for organizing files in the store. If an assistant_id is available
in the config metadata, returns a 2-tuple of (assistant_id, "filesystem") to provide
per-assistant isolation. Otherwise, returns a 1-tuple of ("filesystem",) for shared storage.
Returns:
Namespace tuple for store operations, either `(assistant_id, "filesystem")` or `("filesystem",)`.
"""
namespace = "filesystem"
config = get_config()
if config is None:
@@ -123,6 +131,17 @@ def _get_namespace() -> tuple[str] | tuple[str, str]:
def _get_store(runtime: Runtime[Any]) -> BaseStore:
"""Get the store from the runtime, raising an error if unavailable.
Args:
runtime: The LangGraph runtime containing the store.
Returns:
The BaseStore instance for longterm file storage.
Raises:
ValueError: If longterm memory is enabled but no store is available in runtime.
"""
if runtime.store is None:
msg = "Longterm memory is enabled, but no store is available"
raise ValueError(msg)
@@ -130,16 +149,27 @@ def _get_store(runtime: Runtime[Any]) -> BaseStore:
def _convert_store_item_to_file_data(store_item: Item) -> FileData:
"""Convert a store Item to FileData format.
Args:
store_item: The store Item containing file data.
Returns:
FileData with content, created_at, and modified_at fields.
Raises:
ValueError: If required fields are missing or have incorrect types.
"""
if "content" not in store_item.value or not isinstance(store_item.value["content"], list):
msg = "Store item does not contain content"
msg = f"Store item does not contain valid content field. Got: {store_item.value.keys()}"
raise ValueError(msg)
if "created_at" not in store_item.value or not isinstance(store_item.value["created_at"], str):
msg = "Store item does not contain created_at"
msg = f"Store item does not contain valid created_at field. Got: {store_item.value.keys()}"
raise ValueError(msg)
if "modified_at" not in store_item.value or not isinstance(
store_item.value["modified_at"], str
):
msg = "Store item does not contain modified_at"
msg = f"Store item does not contain valid modified_at field. Got: {store_item.value.keys()}"
raise ValueError(msg)
return FileData(
content=store_item.value["content"],
@@ -149,6 +179,14 @@ def _convert_store_item_to_file_data(store_item: Item) -> FileData:
def _convert_file_data_to_store_item(file_data: FileData) -> dict[str, Any]:
"""Convert FileData to a dict suitable for store.put().
Args:
file_data: The FileData to convert.
Returns:
Dictionary with content, created_at, and modified_at fields.
"""
return {
"content": file_data["content"],
"created_at": file_data["created_at"],
@@ -157,6 +195,18 @@ def _convert_file_data_to_store_item(file_data: FileData) -> dict[str, Any]:
def _get_file_data_from_state(state: FilesystemState, file_path: str) -> FileData:
"""Retrieve file data from the agent's state.
Args:
state: The current filesystem state.
file_path: The path of the file to retrieve.
Returns:
The FileData for the requested file.
Raises:
ValueError: If the file is not found in state.
"""
mock_filesystem = state.get("files", {})
if file_path not in mock_filesystem:
msg = f"File '{file_path}' not found"
@@ -165,25 +215,51 @@ def _get_file_data_from_state(state: FilesystemState, file_path: str) -> FileDat
def _ls_tool_generator(
custom_description: str | None = None, *, has_longterm_memory: bool
custom_description: str | None = None, *, long_term_memory: bool
) -> BaseTool:
"""Generate the ls (list files) tool.
Args:
custom_description: Optional custom description for the tool.
long_term_memory: Whether to enable longterm memory support.
Returns:
Configured ls tool that lists files from state and optionally from longterm store.
"""
tool_description = LIST_FILES_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif has_longterm_memory:
elif long_term_memory:
tool_description += LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _get_filenames_from_state(state: FilesystemState) -> list[str]:
"""Extract list of filenames from the filesystem state.
Args:
state: The current filesystem state.
Returns:
List of file paths in the state.
"""
files_dict = state.get("files", {})
return list(files_dict.keys())
def _filter_files_by_path(filenames: list[str], path: str | None) -> list[str]:
"""Filter filenames by path prefix.
Args:
filenames: List of file paths to filter.
path: Optional path prefix to filter by.
Returns:
Filtered list of file paths matching the prefix.
"""
if path is None:
return filenames
normalized_path = validate_path(path)
return [f for f in filenames if f.startswith(normalized_path)]
if has_longterm_memory:
if long_term_memory:
@tool(description=tool_description)
def ls(
@@ -211,15 +287,34 @@ def _ls_tool_generator(
def _read_file_tool_generator(
custom_description: str | None = None, *, has_longterm_memory: bool
custom_description: str | None = None, *, long_term_memory: bool
) -> BaseTool:
"""Generate the read_file tool.
Args:
custom_description: Optional custom description for the tool.
long_term_memory: Whether to enable longterm memory support.
Returns:
Configured read_file tool that reads files from state and optionally from longterm store.
"""
tool_description = READ_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif has_longterm_memory:
elif long_term_memory:
tool_description += READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _read_file_data_content(file_data: FileData, offset: int, limit: int) -> str:
"""Read and format file content with line numbers.
Args:
file_data: The file data to read.
offset: Line offset to start reading from (0-indexed).
limit: Maximum number of lines to read.
Returns:
Formatted file content with line numbers, or an error message.
"""
content = file_data_to_string(file_data)
empty_msg = check_empty_content(content)
if empty_msg:
@@ -234,14 +329,14 @@ def _read_file_tool_generator(
selected_lines, format_style="tab", start_line=start_idx + 1
)
if has_longterm_memory:
if long_term_memory:
@tool(description=tool_description)
def read_file(
file_path: str,
state: Annotated[FilesystemState, InjectedState],
offset: int = 0,
limit: int = 2000,
offset: int = DEFAULT_READ_OFFSET,
limit: int = DEFAULT_READ_LIMIT,
) -> str:
file_path = validate_path(file_path)
if has_memories_prefix(file_path):
@@ -266,8 +361,8 @@ def _read_file_tool_generator(
def read_file(
file_path: str,
state: Annotated[FilesystemState, InjectedState],
offset: int = 0,
limit: int = 2000,
offset: int = DEFAULT_READ_OFFSET,
limit: int = DEFAULT_READ_LIMIT,
) -> str:
file_path = validate_path(file_path)
try:
@@ -280,17 +375,37 @@ def _read_file_tool_generator(
def _write_file_tool_generator(
custom_description: str | None = None, *, has_longterm_memory: bool
custom_description: str | None = None, *, long_term_memory: bool
) -> BaseTool:
"""Generate the write_file tool.
Args:
custom_description: Optional custom description for the tool.
long_term_memory: Whether to enable longterm memory support.
Returns:
Configured write_file tool that creates new files in state or longterm store.
"""
tool_description = WRITE_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif has_longterm_memory:
elif long_term_memory:
tool_description += WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
def _write_file_to_state(
state: FilesystemState, tool_call_id: str, file_path: str, content: str
) -> Command | str:
"""Write a new file to the filesystem state.
Args:
state: The current filesystem state.
tool_call_id: ID of the tool call for generating ToolMessage.
file_path: The path where the file should be written.
content: The content to write to the file.
Returns:
Command to update state with new file, or error string if file exists.
"""
mock_filesystem = state.get("files", {})
existing = mock_filesystem.get(file_path)
if existing:
@@ -303,7 +418,7 @@ def _write_file_tool_generator(
}
)
if has_longterm_memory:
if long_term_memory:
@tool(description=tool_description)
def write_file(
@@ -343,15 +458,54 @@ def _write_file_tool_generator(
def _edit_file_tool_generator(
custom_description: str | None = None, *, has_longterm_memory: bool
custom_description: str | None = None, *, long_term_memory: bool
) -> BaseTool:
"""Generate the edit_file tool.
Args:
custom_description: Optional custom description for the tool.
long_term_memory: Whether to enable longterm memory support.
Returns:
Configured edit_file tool that performs string replacements in files.
"""
tool_description = EDIT_FILE_TOOL_DESCRIPTION
if custom_description:
tool_description = custom_description
elif has_longterm_memory:
elif long_term_memory:
tool_description += EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT
if has_longterm_memory:
def _perform_file_edit(
file_data: FileData,
old_string: str,
new_string: str,
*,
replace_all: bool = False,
) -> tuple[FileData, str] | str:
"""Perform string replacement on file data.
Args:
file_data: The file data to edit.
old_string: String to find and replace.
new_string: Replacement string.
replace_all: If True, replace all occurrences.
Returns:
Tuple of (updated_file_data, success_message) on success,
or error string on failure.
"""
content = file_data_to_string(file_data)
occurrences = content.count(old_string)
if occurrences == 0:
return f"Error: String not found in file: '{old_string}'"
if occurrences > 1 and not replace_all:
return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context."
new_content = content.replace(old_string, new_string)
new_file_data = update_file_data(file_data, new_content)
result_msg = f"Successfully replaced {occurrences} instance(s) of the string"
return new_file_data, result_msg
if long_term_memory:
@tool(description=tool_description)
def edit_file(
@@ -365,6 +519,8 @@ def _edit_file_tool_generator(
) -> Command | str:
file_path = validate_path(file_path)
is_longterm_memory = has_memories_prefix(file_path)
# Retrieve file data from appropriate storage
if is_longterm_memory:
stripped_file_path = strip_memories_prefix(file_path)
runtime = get_runtime()
@@ -380,26 +536,25 @@ def _edit_file_tool_generator(
except ValueError as e:
return str(e)
content = file_data_to_string(file_data)
occurrences = content.count(old_string)
if occurrences == 0:
return f"Error: String not found in file: '{old_string}'"
if occurrences > 1 and not replace_all:
return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context."
new_content = content.replace(old_string, new_string)
new_file_data = update_file_data(file_data, new_content)
result_msg = (
f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'"
)
# Perform the edit
result = _perform_file_edit(file_data, old_string, new_string, replace_all=replace_all)
if isinstance(result, str): # Error message
return result
new_file_data, result_msg = result
full_msg = f"{result_msg} in '{file_path}'"
# Save to appropriate storage
if is_longterm_memory:
store.put(
namespace, stripped_file_path, _convert_file_data_to_store_item(new_file_data)
)
return result_msg
return full_msg
return Command(
update={
"files": {file_path: new_file_data},
"messages": [ToolMessage(result_msg, tool_call_id=tool_call_id)],
"messages": [ToolMessage(full_msg, tool_call_id=tool_call_id)],
}
)
else:
@@ -415,25 +570,25 @@ def _edit_file_tool_generator(
replace_all: bool = False,
) -> Command | str:
file_path = validate_path(file_path)
# Retrieve file data from state
try:
file_data = _get_file_data_from_state(state, file_path)
except ValueError as e:
return str(e)
content = file_data_to_string(file_data)
occurrences = content.count(old_string)
if occurrences == 0:
return f"Error: String not found in file: '{old_string}'"
if occurrences > 1 and not replace_all:
return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context."
new_content = content.replace(old_string, new_string)
new_file_data = update_file_data(file_data, new_content)
result_msg = (
f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'"
)
# Perform the edit
result = _perform_file_edit(file_data, old_string, new_string, replace_all=replace_all)
if isinstance(result, str): # Error message
return result
new_file_data, result_msg = result
full_msg = f"{result_msg} in '{file_path}'"
return Command(
update={
"files": {file_path: new_file_data},
"messages": [ToolMessage(result_msg, tool_call_id=tool_call_id)],
"messages": [ToolMessage(full_msg, tool_call_id=tool_call_id)],
}
)
@@ -449,23 +604,23 @@ TOOL_GENERATORS = {
def _get_filesystem_tools(
custom_tool_descriptions: dict[str, str] | None = None, *, has_longterm_memory: bool
custom_tool_descriptions: dict[str, str] | None = None, *, long_term_memory: bool
) -> list[BaseTool]:
"""Get filesystem tools.
Args:
has_longterm_memory: Whether to enable longterm memory support.
custom_tool_descriptions: Optional custom descriptions for tools.
long_term_memory: Whether to enable longterm memory support.
Returns:
List of configured filesystem tools.
List of configured filesystem tools (ls, read_file, write_file, edit_file).
"""
if custom_tool_descriptions is None:
custom_tool_descriptions = {}
tools = []
for tool_name, tool_generator in TOOL_GENERATORS.items():
tool = tool_generator(
custom_tool_descriptions.get(tool_name), has_longterm_memory=has_longterm_memory
custom_tool_descriptions.get(tool_name), long_term_memory=long_term_memory
)
tools.append(tool)
return tools
@@ -474,13 +629,15 @@ def _get_filesystem_tools(
class FilesystemMiddleware(AgentMiddleware):
"""Middleware for providing filesystem tools to an agent.
Args:
use_longterm_memory: Whether to enable longterm memory support.
system_prompt_extension: Optional custom system prompt.
custom_tool_descriptions: Optional custom tool descriptions.
This middleware adds four filesystem tools to the agent: ls, read_file, write_file,
and edit_file. Files can be stored in two locations:
- Short-term: In the agent's state (ephemeral, lasts only for the conversation)
- Long-term: In a persistent store (persists across conversations when enabled)
Returns:
List of configured filesystem tools.
Args:
long_term_memory: Whether to enable longterm memory support.
system_prompt_extension: Optional custom system prompt override.
custom_tool_descriptions: Optional custom tool descriptions override.
Raises:
ValueError: If longterm memory is enabled but no store is available.
@@ -490,7 +647,11 @@ class FilesystemMiddleware(AgentMiddleware):
from langchain.agents.middleware.filesystem import FilesystemMiddleware
from langchain.agents import create_agent
agent = create_agent(middleware=[FilesystemMiddleware(use_longterm_memory=False)])
# Short-term memory only
agent = create_agent(middleware=[FilesystemMiddleware(long_term_memory=False)])
# With long-term memory
agent = create_agent(middleware=[FilesystemMiddleware(long_term_memory=True)])
```
"""
@@ -499,31 +660,42 @@ class FilesystemMiddleware(AgentMiddleware):
def __init__(
self,
*,
use_longterm_memory: bool = False,
long_term_memory: bool = False,
system_prompt_extension: str | None = None,
custom_tool_descriptions: dict[str, str] | None = None,
) -> None:
"""Initialize the filesystem middleware.
Args:
use_longterm_memory: Whether to enable longterm memory support.
system_prompt_extension: Optional custom system prompt.
custom_tool_descriptions: Optional custom tool descriptions.
long_term_memory: Whether to enable longterm memory support.
system_prompt_extension: Optional custom system prompt override.
custom_tool_descriptions: Optional custom tool descriptions override.
"""
self.use_longterm_memory = use_longterm_memory
self.long_term_memory = long_term_memory
self.system_prompt_extension = FILESYSTEM_SYSTEM_PROMPT
if system_prompt_extension is not None:
self.system_prompt_extension = system_prompt_extension
elif use_longterm_memory:
elif long_term_memory:
self.system_prompt_extension += FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
self.tools = _get_filesystem_tools(
custom_tool_descriptions, has_longterm_memory=use_longterm_memory
custom_tool_descriptions, long_term_memory=long_term_memory
)
def before_model_call(self, request: ModelRequest, runtime: Runtime[Any]) -> ModelRequest:
"""If use_longterm_memory is True, we must have a store available."""
if self.use_longterm_memory and runtime.store is None:
"""Validate that store is available if longterm memory is enabled.
Args:
request: The model request being processed.
runtime: The LangGraph runtime.
Returns:
The unmodified model request.
Raises:
ValueError: If long_term_memory is True but runtime.store is None.
"""
if self.long_term_memory and runtime.store is None:
msg = "Longterm memory is enabled, but no store is available"
raise ValueError(msg)
return request
@@ -533,7 +705,15 @@ class FilesystemMiddleware(AgentMiddleware):
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
"""Update the system prompt to include instructions on using the filesystem."""
"""Update the system prompt to include instructions on using the filesystem.
Args:
request: The model request being processed.
handler: The handler function to call with the modified request.
Returns:
The model response from the handler.
"""
if self.system_prompt_extension is not None:
request.system_prompt = (
request.system_prompt + "\n\n" + self.system_prompt_extension

View File

@@ -11,14 +11,14 @@ from langgraph.runtime import Runtime
class TestFilesystem:
def test_init_local(self):
middleware = FilesystemMiddleware(use_longterm_memory=False)
assert middleware.use_longterm_memory is False
middleware = FilesystemMiddleware(long_term_memory=False)
assert middleware.long_term_memory is False
assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT
assert len(middleware.tools) == 4
def test_init_longterm(self):
middleware = FilesystemMiddleware(use_longterm_memory=True)
assert middleware.use_longterm_memory is True
middleware = FilesystemMiddleware(long_term_memory=True)
assert middleware.long_term_memory is True
assert middleware.system_prompt_extension == (
FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
)
@@ -26,34 +26,34 @@ class TestFilesystem:
def test_init_custom_system_prompt_shortterm(self):
middleware = FilesystemMiddleware(
use_longterm_memory=False, system_prompt_extension="Custom system prompt"
long_term_memory=False, system_prompt_extension="Custom system prompt"
)
assert middleware.use_longterm_memory is False
assert middleware.long_term_memory is False
assert middleware.system_prompt_extension == "Custom system prompt"
assert len(middleware.tools) == 4
def test_init_custom_system_prompt_longterm(self):
middleware = FilesystemMiddleware(
use_longterm_memory=True, system_prompt_extension="Custom system prompt"
long_term_memory=True, system_prompt_extension="Custom system prompt"
)
assert middleware.use_longterm_memory is True
assert middleware.long_term_memory is True
assert middleware.system_prompt_extension == "Custom system prompt"
assert len(middleware.tools) == 4
def test_init_custom_tool_descriptions_shortterm(self):
middleware = FilesystemMiddleware(
use_longterm_memory=False, custom_tool_descriptions={"ls": "Custom ls tool description"}
long_term_memory=False, custom_tool_descriptions={"ls": "Custom ls tool description"}
)
assert middleware.use_longterm_memory is False
assert middleware.long_term_memory is False
assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
assert ls_tool.description == "Custom ls tool description"
def test_init_custom_tool_descriptions_longterm(self):
middleware = FilesystemMiddleware(
use_longterm_memory=True, custom_tool_descriptions={"ls": "Custom ls tool description"}
long_term_memory=True, custom_tool_descriptions={"ls": "Custom ls tool description"}
)
assert middleware.use_longterm_memory is True
assert middleware.long_term_memory is True
assert middleware.system_prompt_extension == (
FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT
)
@@ -76,7 +76,7 @@ class TestFilesystem:
),
},
)
middleware = FilesystemMiddleware(use_longterm_memory=False)
middleware = FilesystemMiddleware(long_term_memory=False)
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
result = ls_tool.invoke({"state": state})
assert result == ["test.txt", "test2.txt"]
@@ -107,7 +107,7 @@ class TestFilesystem:
),
},
)
middleware = FilesystemMiddleware(use_longterm_memory=False)
middleware = FilesystemMiddleware(long_term_memory=False)
ls_tool = next(tool for tool in middleware.tools if tool.name == "ls")
result = ls_tool.invoke({"state": state, "path": "pokemon/"})
assert "/pokemon/test2.txt" in result