From 853a498dc6dcd2201cf507d03971abe7cb151f14 Mon Sep 17 00:00:00 2001 From: Sydney Runkle Date: Tue, 14 Oct 2025 21:57:36 -0400 Subject: [PATCH] docstrings and general improvements --- .../langchain/agents/_internal/file_utils.py | 252 ++++++++++--- .../langchain/agents/middleware/filesystem.py | 336 ++++++++++++++---- .../middleware/test_filesystem_middleware.py | 28 +- 3 files changed, 468 insertions(+), 148 deletions(-) diff --git a/libs/langchain_v1/langchain/agents/_internal/file_utils.py b/libs/langchain_v1/langchain/agents/_internal/file_utils.py index 9449a803a24..2db1e440487 100644 --- a/libs/langchain_v1/langchain/agents/_internal/file_utils.py +++ b/libs/langchain_v1/langchain/agents/_internal/file_utils.py @@ -11,6 +11,12 @@ from typing_extensions import TypedDict if TYPE_CHECKING: from collections.abc import Sequence +# Constants +MEMORIES_PREFIX = "/memories/" +EMPTY_CONTENT_WARNING = "System reminder: File exists but has empty contents" +MAX_LINE_LENGTH = 2000 +LINE_NUMBER_WIDTH = 6 + class FileData(TypedDict): """Data structure for storing file contents with metadata.""" @@ -28,14 +34,28 @@ class FileData(TypedDict): def file_data_reducer( left: dict[str, FileData] | None, right: dict[str, FileData | None] ) -> dict[str, FileData]: - """Custom reducer that merges file updates. + """Merge file updates with support for deletions. + + This reducer enables file deletion by treating `None` values in the right + dictionary as deletion markers. It's designed to work with LangGraph's + state management where annotated reducers control how state updates merge. Args: - left: Existing files dict. - right: New files dict to merge (None values delete files). + left: Existing files dictionary. May be `None` during initialization. + right: New files dictionary to merge. Files with `None` values are + treated as deletion markers and removed from the result. Returns: - Merged dict where right overwrites left for matching keys. + Merged dictionary where right overwrites left for matching keys, + and `None` values in right trigger deletions. + + Example: + ```python + existing = {"/file1.txt": FileData(...), "/file2.txt": FileData(...)} + updates = {"/file2.txt": None, "/file3.txt": FileData(...)} + result = file_data_reducer(existing, updates) + # Result: {"/file1.txt": FileData(...), "/file3.txt": FileData(...)} + ``` """ if left is None: # Filter out None values when initializing @@ -43,26 +63,41 @@ def file_data_reducer( # Merge, filtering out None values (deletions) result = {**left} - for k, v in right.items(): - if v is None: - result.pop(k, None) + for key, value in right.items(): + if value is None: + result.pop(key, None) else: - result[k] = v + result[key] = value return result def validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) -> str: """Validate and normalize file path for security. + Ensures paths are safe to use by preventing directory traversal attacks + and enforcing consistent formatting. All paths are normalized to use + forward slashes and start with a leading slash. + Args: - path: The path to validate. - allowed_prefixes: Optional list of allowed path prefixes. + path: The path to validate and normalize. + allowed_prefixes: Optional list of allowed path prefixes. If provided, + the normalized path must start with one of these prefixes. Returns: - Normalized canonical path. + Normalized canonical path starting with `/` and using forward slashes. Raises: - ValueError: If path contains traversal sequences or violates prefix rules. + ValueError: If path contains traversal sequences (`..` or `~`) or does + not start with an allowed prefix when `allowed_prefixes` is specified. + + Example: + ```python + validate_path("foo/bar") # Returns: "/foo/bar" + validate_path("/./foo//bar") # Returns: "/foo/bar" + validate_path("../etc/passwd") # Raises ValueError + validate_path("/data/file.txt", allowed_prefixes=["/data/"]) # OK + validate_path("/etc/file.txt", allowed_prefixes=["/data/"]) # Raises ValueError + ``` """ # Reject paths with traversal attempts if ".." in path or path.startswith("~"): @@ -95,15 +130,30 @@ def format_content_with_line_numbers( format_style: Literal["pipe", "tab"] = "pipe", start_line: int = 1, ) -> str: - r"""Format file content with line numbers. + r"""Format file content with line numbers for display. + + Converts file content to a numbered format similar to `cat -n` output, + with support for two different formatting styles. Args: - content: File content as string or list of lines. - format_style: "pipe" for "1|content" or "tab" for " 1\tcontent". - start_line: Starting line number. + content: File content as a string or list of lines. + format_style: Format style for line numbers: + - `"pipe"`: Compact format like `"1|content"` + - `"tab"`: Right-aligned format like `" 1\tcontent"` (lines truncated at 2000 chars) + start_line: Starting line number (default: 1). Returns: - Formatted content with line numbers. + Formatted content with line numbers prepended to each line. + + Example: + ```python + content = "Hello\nWorld" + format_content_with_line_numbers(content, format_style="pipe") + # Returns: "1|Hello\n2|World" + + format_content_with_line_numbers(content, format_style="tab", start_line=10) + # Returns: " 10\tHello\n 11\tWorld" + ``` """ if isinstance(content, str): lines = content.split("\n") @@ -116,7 +166,11 @@ def format_content_with_line_numbers( if format_style == "pipe": return "\n".join(f"{i + start_line}|{line}" for i, line in enumerate(lines)) - return "\n".join(f"{i + start_line:6d}\t{line[:2000]}" for i, line in enumerate(lines)) + # Tab format with defined width and line truncation + return "\n".join( + f"{i + start_line:{LINE_NUMBER_WIDTH}d}\t{line[:MAX_LINE_LENGTH]}" + for i, line in enumerate(lines) + ) def apply_string_replacement( @@ -126,22 +180,36 @@ def apply_string_replacement( *, replace_all: bool = False, ) -> tuple[str, int]: - """Apply string replacement to content. + """Apply exact string replacement to content. + + Replaces occurrences of a string within content and returns both the + modified content and the number of replacements made. Args: - content: Original content. - old_string: String to replace. + content: Original content to modify. + old_string: String to find and replace. new_string: Replacement string. - replace_all: If True, replace all occurrences. Otherwise, replace first. + replace_all: If `True`, replace all occurrences. If `False`, replace + only the first occurrence (default). Returns: - Tuple of (new_content, replacement_count). + Tuple of `(modified_content, replacement_count)`. + + Example: + ```python + content = "foo bar foo" + apply_string_replacement(content, "foo", "baz", replace_all=False) + # Returns: ("baz bar foo", 1) + + apply_string_replacement(content, "foo", "baz", replace_all=True) + # Returns: ("baz bar baz", 2) + ``` """ if replace_all: count = content.count(old_string) new_content = content.replace(old_string, new_string) else: - count = 1 + count = 1 if old_string in content else 0 new_content = content.replace(old_string, new_string, 1) return new_content, count @@ -152,17 +220,24 @@ def create_file_data( *, created_at: str | None = None, ) -> FileData: - """Create a FileData object from content. + r"""Create a FileData object with automatic timestamp generation. Args: - content: File content as string or list of lines. - created_at: Optional creation timestamp. If None, uses current time. + content: File content as a string or list of lines. + created_at: Optional creation timestamp in ISO 8601 format. + If `None`, uses the current UTC time. Returns: - FileData object. + FileData object with content and timestamps. + + Example: + ```python + file_data = create_file_data("Hello\nWorld") + # Returns: {"content": ["Hello", "World"], "created_at": "2024-...", + # "modified_at": "2024-..."} + ``` """ lines = content.split("\n") if isinstance(content, str) else content - now = datetime.now(timezone.utc).isoformat() return { @@ -176,17 +251,25 @@ def update_file_data( file_data: FileData, content: str | list[str], ) -> FileData: - """Update a FileData object with new content. + """Update FileData with new content while preserving creation timestamp. Args: - file_data: Existing FileData object. - content: New file content as string or list of lines. + file_data: Existing FileData object to update. + content: New file content as a string or list of lines. Returns: - Updated FileData object with new modified_at timestamp. + Updated FileData object with new content and updated `modified_at` + timestamp. The `created_at` timestamp is preserved from the original. + + Example: + ```python + original = create_file_data("Hello") + updated = update_file_data(original, "Hello World") + # updated["created_at"] == original["created_at"] + # updated["modified_at"] > original["modified_at"] + ``` """ lines = content.split("\n") if isinstance(content, str) else content - now = datetime.now(timezone.utc).isoformat() return { @@ -197,26 +280,54 @@ def update_file_data( def file_data_to_string(file_data: FileData) -> str: - """Convert FileData to plain string content. + r"""Convert FileData to plain string content. + + Joins the lines stored in FileData with newline characters to produce + a single string representation of the file content. Args: - file_data: FileData object. + file_data: FileData object containing lines of content. Returns: - File content as string. + File content as a single string with lines joined by newlines. + + Example: + ```python + file_data = { + "content": ["Hello", "World"], + "created_at": "...", + "modified_at": "...", + } + file_data_to_string(file_data) # Returns: "Hello\nWorld" + ``` """ return "\n".join(file_data["content"]) def list_directory(files: dict[str, FileData], path: str) -> list[str]: - """List files in a directory. + """List files in a directory (direct children only). + + Returns only the direct children of the specified directory path, + excluding files in subdirectories. Args: - files: Files dict mapping paths to FileData. - path: Normalized directory path. + files: Dictionary mapping file paths to FileData objects. + path: Normalized directory path to list files from. Returns: - Sorted list of file paths in the directory. + Sorted list of file paths that are direct children of the directory. + + Example: + ```python + files = { + "/dir/file1.txt": FileData(...), + "/dir/file2.txt": FileData(...), + "/dir/subdir/file3.txt": FileData(...), + } + list_directory(files, "/dir") + # Returns: ["/dir/file1.txt", "/dir/file2.txt"] + # Note: /dir/subdir/file3.txt is excluded (not a direct child) + ``` """ # Ensure path ends with / for directory matching dir_path = path if path.endswith("/") else f"{path}/" @@ -234,50 +345,79 @@ def list_directory(files: dict[str, FileData], path: str) -> list[str]: def check_empty_content(content: str) -> str | None: - """Check if file content is empty and return warning message. + """Check if file content is empty and return a warning message. Args: - content: File content. + content: File content to check. Returns: - Warning message if empty, None otherwise. + Warning message string if content is empty or contains only whitespace, + `None` otherwise. + + Example: + ```python + check_empty_content("") # Returns: "System reminder: File exists but has empty contents" + check_empty_content(" ") # Returns: "System reminder: File exists but has empty contents" + check_empty_content("Hello") # Returns: None + ``` """ if not content or content.strip() == "": - return "System reminder: File exists but has empty contents" + return EMPTY_CONTENT_WARNING return None def has_memories_prefix(file_path: str) -> bool: - """Check if file path has the memories prefix. + """Check if a file path is in the longterm memory filesystem. + + Longterm memory files are distinguished by the `/memories/` path prefix. Args: - file_path: File path. + file_path: File path to check. Returns: - True if file path has the memories prefix, False otherwise. + `True` if the file path starts with `/memories/`, `False` otherwise. + + Example: + ```python + has_memories_prefix("/memories/notes.txt") # Returns: True + has_memories_prefix("/temp/file.txt") # Returns: False + ``` """ - return file_path.startswith("/memories/") + return file_path.startswith(MEMORIES_PREFIX) def append_memories_prefix(file_path: str) -> str: - """Append the memories prefix to a file path. + """Add the longterm memory prefix to a file path. Args: - file_path: File path. + file_path: File path to prefix. Returns: - File path with the memories prefix. + File path with `/memories` prepended. + + Example: + ```python + append_memories_prefix("/notes.txt") # Returns: "/memories/notes.txt" + ``` """ return f"/memories{file_path}" def strip_memories_prefix(file_path: str) -> str: - """Strip the memories prefix from a file path. + """Remove the longterm memory prefix from a file path. Args: - file_path: File path. + file_path: File path potentially containing the memories prefix. Returns: - File path without the memories prefix. + File path with `/memories` removed if present at the start. + + Example: + ```python + strip_memories_prefix("/memories/notes.txt") # Returns: "/notes.txt" + strip_memories_prefix("/notes.txt") # Returns: "/notes.txt" + ``` """ - return file_path.replace("/memories", "") + if file_path.startswith(MEMORIES_PREFIX): + return file_path[len(MEMORIES_PREFIX) - 1 :] # Keep the leading slash + return file_path diff --git a/libs/langchain_v1/langchain/agents/middleware/filesystem.py b/libs/langchain_v1/langchain/agents/middleware/filesystem.py index df54d6d4ab5..83d148781de 100644 --- a/libs/langchain_v1/langchain/agents/middleware/filesystem.py +++ b/libs/langchain_v1/langchain/agents/middleware/filesystem.py @@ -37,6 +37,11 @@ from langchain.agents.middleware.types import ( ) from langchain.tools.tool_node import InjectedState +# Constants +LONGTERM_MEMORY_PREFIX = "/memories/" +DEFAULT_READ_OFFSET = 0 +DEFAULT_READ_LIMIT = 2000 + class FilesystemState(AgentState): """State for the filesystem middleware.""" @@ -52,9 +57,7 @@ Usage: - You can optionally provide a path parameter to list files in a specific directory. - This is very useful for exploring the file system and finding the right file to read or edit. - You should almost ALWAYS use this tool before using the Read or Edit tools.""" -LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = ( - "\n- Files from the longterm filesystem will be prefixed with the /memories/ path." -) +LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- Files from the longterm filesystem will be prefixed with the {LONGTERM_MEMORY_PREFIX} path." READ_FILE_TOOL_DESCRIPTION = """Reads a file from the filesystem. You can access any file directly by using this tool. Assume this tool is able to read all files on the machine. If the User provides a path to a file assume that path is valid. It is okay to read a file that does not exist; an error will be returned. @@ -68,9 +71,7 @@ Usage: - You have the capability to call multiple tools in a single response. It is always better to speculatively read multiple files as a batch that are potentially useful. - If you read a file that exists but has empty contents you will receive a system reminder warning in place of file contents. - You should ALWAYS make sure a file has been read before editing it.""" -READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = ( - "\n- file_paths prefixed with the /memories/ path will be read from the longterm filesystem." -) +READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- file_paths prefixed with the {LONGTERM_MEMORY_PREFIX} path will be read from the longterm filesystem." EDIT_FILE_TOOL_DESCRIPTION = """Performs exact string replacements in files. @@ -81,7 +82,7 @@ Usage: - Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked. - The edit will FAIL if `old_string` is not unique in the file. Either provide a larger string with more surrounding context to make it unique or use `replace_all` to change every instance of `old_string`. - Use `replace_all` for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance.""" -EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = "\n- You can edit files in the longterm filesystem by prefixing the filename with the /memories/ path." +EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- You can edit files in the longterm filesystem by prefixing the filename with the {LONGTERM_MEMORY_PREFIX} path." WRITE_FILE_TOOL_DESCRIPTION = """Writes to a new file in the filesystem. @@ -91,9 +92,7 @@ Usage: - The write_file tool will create the a new file. - Prefer to edit existing files over creating new ones when possible. - file_paths prefixed with the /memories/ path will be written to the longterm filesystem.""" -WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = ( - "\n- file_paths prefixed with the /memories/ path will be written to the longterm filesystem." -) +WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = f"\n- file_paths prefixed with the {LONGTERM_MEMORY_PREFIX} path will be written to the longterm filesystem." FILESYSTEM_SYSTEM_PROMPT = """## Filesystem Tools `ls`, `read_file`, `write_file`, `edit_file` @@ -104,14 +103,23 @@ All file paths must start with a /. - read_file: read a file from the filesystem - write_file: write to a file in the filesystem - edit_file: edit a file in the filesystem""" -FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT = """ +FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT = f""" You also have access to a longterm filesystem in which you can store files that you want to keep around for longer than the current conversation. -In order to interact with the longterm filesystem, you can use those same tools, but filenames must be prefixed with the /memories/ path. -Remember, to interact with the longterm filesystem, you must prefix the filename with the /memories/ path.""" +In order to interact with the longterm filesystem, you can use those same tools, but filenames must be prefixed with the {LONGTERM_MEMORY_PREFIX} path. +Remember, to interact with the longterm filesystem, you must prefix the filename with the {LONGTERM_MEMORY_PREFIX} path.""" def _get_namespace() -> tuple[str] | tuple[str, str]: + """Get the namespace for longterm filesystem storage. + + Returns a tuple for organizing files in the store. If an assistant_id is available + in the config metadata, returns a 2-tuple of (assistant_id, "filesystem") to provide + per-assistant isolation. Otherwise, returns a 1-tuple of ("filesystem",) for shared storage. + + Returns: + Namespace tuple for store operations, either `(assistant_id, "filesystem")` or `("filesystem",)`. + """ namespace = "filesystem" config = get_config() if config is None: @@ -123,6 +131,17 @@ def _get_namespace() -> tuple[str] | tuple[str, str]: def _get_store(runtime: Runtime[Any]) -> BaseStore: + """Get the store from the runtime, raising an error if unavailable. + + Args: + runtime: The LangGraph runtime containing the store. + + Returns: + The BaseStore instance for longterm file storage. + + Raises: + ValueError: If longterm memory is enabled but no store is available in runtime. + """ if runtime.store is None: msg = "Longterm memory is enabled, but no store is available" raise ValueError(msg) @@ -130,16 +149,27 @@ def _get_store(runtime: Runtime[Any]) -> BaseStore: def _convert_store_item_to_file_data(store_item: Item) -> FileData: + """Convert a store Item to FileData format. + + Args: + store_item: The store Item containing file data. + + Returns: + FileData with content, created_at, and modified_at fields. + + Raises: + ValueError: If required fields are missing or have incorrect types. + """ if "content" not in store_item.value or not isinstance(store_item.value["content"], list): - msg = "Store item does not contain content" + msg = f"Store item does not contain valid content field. Got: {store_item.value.keys()}" raise ValueError(msg) if "created_at" not in store_item.value or not isinstance(store_item.value["created_at"], str): - msg = "Store item does not contain created_at" + msg = f"Store item does not contain valid created_at field. Got: {store_item.value.keys()}" raise ValueError(msg) if "modified_at" not in store_item.value or not isinstance( store_item.value["modified_at"], str ): - msg = "Store item does not contain modified_at" + msg = f"Store item does not contain valid modified_at field. Got: {store_item.value.keys()}" raise ValueError(msg) return FileData( content=store_item.value["content"], @@ -149,6 +179,14 @@ def _convert_store_item_to_file_data(store_item: Item) -> FileData: def _convert_file_data_to_store_item(file_data: FileData) -> dict[str, Any]: + """Convert FileData to a dict suitable for store.put(). + + Args: + file_data: The FileData to convert. + + Returns: + Dictionary with content, created_at, and modified_at fields. + """ return { "content": file_data["content"], "created_at": file_data["created_at"], @@ -157,6 +195,18 @@ def _convert_file_data_to_store_item(file_data: FileData) -> dict[str, Any]: def _get_file_data_from_state(state: FilesystemState, file_path: str) -> FileData: + """Retrieve file data from the agent's state. + + Args: + state: The current filesystem state. + file_path: The path of the file to retrieve. + + Returns: + The FileData for the requested file. + + Raises: + ValueError: If the file is not found in state. + """ mock_filesystem = state.get("files", {}) if file_path not in mock_filesystem: msg = f"File '{file_path}' not found" @@ -165,25 +215,51 @@ def _get_file_data_from_state(state: FilesystemState, file_path: str) -> FileDat def _ls_tool_generator( - custom_description: str | None = None, *, has_longterm_memory: bool + custom_description: str | None = None, *, long_term_memory: bool ) -> BaseTool: + """Generate the ls (list files) tool. + + Args: + custom_description: Optional custom description for the tool. + long_term_memory: Whether to enable longterm memory support. + + Returns: + Configured ls tool that lists files from state and optionally from longterm store. + """ tool_description = LIST_FILES_TOOL_DESCRIPTION if custom_description: tool_description = custom_description - elif has_longterm_memory: + elif long_term_memory: tool_description += LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT def _get_filenames_from_state(state: FilesystemState) -> list[str]: + """Extract list of filenames from the filesystem state. + + Args: + state: The current filesystem state. + + Returns: + List of file paths in the state. + """ files_dict = state.get("files", {}) return list(files_dict.keys()) def _filter_files_by_path(filenames: list[str], path: str | None) -> list[str]: + """Filter filenames by path prefix. + + Args: + filenames: List of file paths to filter. + path: Optional path prefix to filter by. + + Returns: + Filtered list of file paths matching the prefix. + """ if path is None: return filenames normalized_path = validate_path(path) return [f for f in filenames if f.startswith(normalized_path)] - if has_longterm_memory: + if long_term_memory: @tool(description=tool_description) def ls( @@ -211,15 +287,34 @@ def _ls_tool_generator( def _read_file_tool_generator( - custom_description: str | None = None, *, has_longterm_memory: bool + custom_description: str | None = None, *, long_term_memory: bool ) -> BaseTool: + """Generate the read_file tool. + + Args: + custom_description: Optional custom description for the tool. + long_term_memory: Whether to enable longterm memory support. + + Returns: + Configured read_file tool that reads files from state and optionally from longterm store. + """ tool_description = READ_FILE_TOOL_DESCRIPTION if custom_description: tool_description = custom_description - elif has_longterm_memory: + elif long_term_memory: tool_description += READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT def _read_file_data_content(file_data: FileData, offset: int, limit: int) -> str: + """Read and format file content with line numbers. + + Args: + file_data: The file data to read. + offset: Line offset to start reading from (0-indexed). + limit: Maximum number of lines to read. + + Returns: + Formatted file content with line numbers, or an error message. + """ content = file_data_to_string(file_data) empty_msg = check_empty_content(content) if empty_msg: @@ -234,14 +329,14 @@ def _read_file_tool_generator( selected_lines, format_style="tab", start_line=start_idx + 1 ) - if has_longterm_memory: + if long_term_memory: @tool(description=tool_description) def read_file( file_path: str, state: Annotated[FilesystemState, InjectedState], - offset: int = 0, - limit: int = 2000, + offset: int = DEFAULT_READ_OFFSET, + limit: int = DEFAULT_READ_LIMIT, ) -> str: file_path = validate_path(file_path) if has_memories_prefix(file_path): @@ -266,8 +361,8 @@ def _read_file_tool_generator( def read_file( file_path: str, state: Annotated[FilesystemState, InjectedState], - offset: int = 0, - limit: int = 2000, + offset: int = DEFAULT_READ_OFFSET, + limit: int = DEFAULT_READ_LIMIT, ) -> str: file_path = validate_path(file_path) try: @@ -280,17 +375,37 @@ def _read_file_tool_generator( def _write_file_tool_generator( - custom_description: str | None = None, *, has_longterm_memory: bool + custom_description: str | None = None, *, long_term_memory: bool ) -> BaseTool: + """Generate the write_file tool. + + Args: + custom_description: Optional custom description for the tool. + long_term_memory: Whether to enable longterm memory support. + + Returns: + Configured write_file tool that creates new files in state or longterm store. + """ tool_description = WRITE_FILE_TOOL_DESCRIPTION if custom_description: tool_description = custom_description - elif has_longterm_memory: + elif long_term_memory: tool_description += WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT def _write_file_to_state( state: FilesystemState, tool_call_id: str, file_path: str, content: str ) -> Command | str: + """Write a new file to the filesystem state. + + Args: + state: The current filesystem state. + tool_call_id: ID of the tool call for generating ToolMessage. + file_path: The path where the file should be written. + content: The content to write to the file. + + Returns: + Command to update state with new file, or error string if file exists. + """ mock_filesystem = state.get("files", {}) existing = mock_filesystem.get(file_path) if existing: @@ -303,7 +418,7 @@ def _write_file_tool_generator( } ) - if has_longterm_memory: + if long_term_memory: @tool(description=tool_description) def write_file( @@ -343,15 +458,54 @@ def _write_file_tool_generator( def _edit_file_tool_generator( - custom_description: str | None = None, *, has_longterm_memory: bool + custom_description: str | None = None, *, long_term_memory: bool ) -> BaseTool: + """Generate the edit_file tool. + + Args: + custom_description: Optional custom description for the tool. + long_term_memory: Whether to enable longterm memory support. + + Returns: + Configured edit_file tool that performs string replacements in files. + """ tool_description = EDIT_FILE_TOOL_DESCRIPTION if custom_description: tool_description = custom_description - elif has_longterm_memory: + elif long_term_memory: tool_description += EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT - if has_longterm_memory: + def _perform_file_edit( + file_data: FileData, + old_string: str, + new_string: str, + *, + replace_all: bool = False, + ) -> tuple[FileData, str] | str: + """Perform string replacement on file data. + + Args: + file_data: The file data to edit. + old_string: String to find and replace. + new_string: Replacement string. + replace_all: If True, replace all occurrences. + + Returns: + Tuple of (updated_file_data, success_message) on success, + or error string on failure. + """ + content = file_data_to_string(file_data) + occurrences = content.count(old_string) + if occurrences == 0: + return f"Error: String not found in file: '{old_string}'" + if occurrences > 1 and not replace_all: + return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context." + new_content = content.replace(old_string, new_string) + new_file_data = update_file_data(file_data, new_content) + result_msg = f"Successfully replaced {occurrences} instance(s) of the string" + return new_file_data, result_msg + + if long_term_memory: @tool(description=tool_description) def edit_file( @@ -365,6 +519,8 @@ def _edit_file_tool_generator( ) -> Command | str: file_path = validate_path(file_path) is_longterm_memory = has_memories_prefix(file_path) + + # Retrieve file data from appropriate storage if is_longterm_memory: stripped_file_path = strip_memories_prefix(file_path) runtime = get_runtime() @@ -380,26 +536,25 @@ def _edit_file_tool_generator( except ValueError as e: return str(e) - content = file_data_to_string(file_data) - occurrences = content.count(old_string) - if occurrences == 0: - return f"Error: String not found in file: '{old_string}'" - if occurrences > 1 and not replace_all: - return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context." - new_content = content.replace(old_string, new_string) - new_file_data = update_file_data(file_data, new_content) - result_msg = ( - f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'" - ) + # Perform the edit + result = _perform_file_edit(file_data, old_string, new_string, replace_all=replace_all) + if isinstance(result, str): # Error message + return result + + new_file_data, result_msg = result + full_msg = f"{result_msg} in '{file_path}'" + + # Save to appropriate storage if is_longterm_memory: store.put( namespace, stripped_file_path, _convert_file_data_to_store_item(new_file_data) ) - return result_msg + return full_msg + return Command( update={ "files": {file_path: new_file_data}, - "messages": [ToolMessage(result_msg, tool_call_id=tool_call_id)], + "messages": [ToolMessage(full_msg, tool_call_id=tool_call_id)], } ) else: @@ -415,25 +570,25 @@ def _edit_file_tool_generator( replace_all: bool = False, ) -> Command | str: file_path = validate_path(file_path) + + # Retrieve file data from state try: file_data = _get_file_data_from_state(state, file_path) except ValueError as e: return str(e) - content = file_data_to_string(file_data) - occurrences = content.count(old_string) - if occurrences == 0: - return f"Error: String not found in file: '{old_string}'" - if occurrences > 1 and not replace_all: - return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context." - new_content = content.replace(old_string, new_string) - new_file_data = update_file_data(file_data, new_content) - result_msg = ( - f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'" - ) + + # Perform the edit + result = _perform_file_edit(file_data, old_string, new_string, replace_all=replace_all) + if isinstance(result, str): # Error message + return result + + new_file_data, result_msg = result + full_msg = f"{result_msg} in '{file_path}'" + return Command( update={ "files": {file_path: new_file_data}, - "messages": [ToolMessage(result_msg, tool_call_id=tool_call_id)], + "messages": [ToolMessage(full_msg, tool_call_id=tool_call_id)], } ) @@ -449,23 +604,23 @@ TOOL_GENERATORS = { def _get_filesystem_tools( - custom_tool_descriptions: dict[str, str] | None = None, *, has_longterm_memory: bool + custom_tool_descriptions: dict[str, str] | None = None, *, long_term_memory: bool ) -> list[BaseTool]: """Get filesystem tools. Args: - has_longterm_memory: Whether to enable longterm memory support. custom_tool_descriptions: Optional custom descriptions for tools. + long_term_memory: Whether to enable longterm memory support. Returns: - List of configured filesystem tools. + List of configured filesystem tools (ls, read_file, write_file, edit_file). """ if custom_tool_descriptions is None: custom_tool_descriptions = {} tools = [] for tool_name, tool_generator in TOOL_GENERATORS.items(): tool = tool_generator( - custom_tool_descriptions.get(tool_name), has_longterm_memory=has_longterm_memory + custom_tool_descriptions.get(tool_name), long_term_memory=long_term_memory ) tools.append(tool) return tools @@ -474,13 +629,15 @@ def _get_filesystem_tools( class FilesystemMiddleware(AgentMiddleware): """Middleware for providing filesystem tools to an agent. - Args: - use_longterm_memory: Whether to enable longterm memory support. - system_prompt_extension: Optional custom system prompt. - custom_tool_descriptions: Optional custom tool descriptions. + This middleware adds four filesystem tools to the agent: ls, read_file, write_file, + and edit_file. Files can be stored in two locations: + - Short-term: In the agent's state (ephemeral, lasts only for the conversation) + - Long-term: In a persistent store (persists across conversations when enabled) - Returns: - List of configured filesystem tools. + Args: + long_term_memory: Whether to enable longterm memory support. + system_prompt_extension: Optional custom system prompt override. + custom_tool_descriptions: Optional custom tool descriptions override. Raises: ValueError: If longterm memory is enabled but no store is available. @@ -490,7 +647,11 @@ class FilesystemMiddleware(AgentMiddleware): from langchain.agents.middleware.filesystem import FilesystemMiddleware from langchain.agents import create_agent - agent = create_agent(middleware=[FilesystemMiddleware(use_longterm_memory=False)]) + # Short-term memory only + agent = create_agent(middleware=[FilesystemMiddleware(long_term_memory=False)]) + + # With long-term memory + agent = create_agent(middleware=[FilesystemMiddleware(long_term_memory=True)]) ``` """ @@ -499,31 +660,42 @@ class FilesystemMiddleware(AgentMiddleware): def __init__( self, *, - use_longterm_memory: bool = False, + long_term_memory: bool = False, system_prompt_extension: str | None = None, custom_tool_descriptions: dict[str, str] | None = None, ) -> None: """Initialize the filesystem middleware. Args: - use_longterm_memory: Whether to enable longterm memory support. - system_prompt_extension: Optional custom system prompt. - custom_tool_descriptions: Optional custom tool descriptions. + long_term_memory: Whether to enable longterm memory support. + system_prompt_extension: Optional custom system prompt override. + custom_tool_descriptions: Optional custom tool descriptions override. """ - self.use_longterm_memory = use_longterm_memory + self.long_term_memory = long_term_memory self.system_prompt_extension = FILESYSTEM_SYSTEM_PROMPT if system_prompt_extension is not None: self.system_prompt_extension = system_prompt_extension - elif use_longterm_memory: + elif long_term_memory: self.system_prompt_extension += FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT self.tools = _get_filesystem_tools( - custom_tool_descriptions, has_longterm_memory=use_longterm_memory + custom_tool_descriptions, long_term_memory=long_term_memory ) def before_model_call(self, request: ModelRequest, runtime: Runtime[Any]) -> ModelRequest: - """If use_longterm_memory is True, we must have a store available.""" - if self.use_longterm_memory and runtime.store is None: + """Validate that store is available if longterm memory is enabled. + + Args: + request: The model request being processed. + runtime: The LangGraph runtime. + + Returns: + The unmodified model request. + + Raises: + ValueError: If long_term_memory is True but runtime.store is None. + """ + if self.long_term_memory and runtime.store is None: msg = "Longterm memory is enabled, but no store is available" raise ValueError(msg) return request @@ -533,7 +705,15 @@ class FilesystemMiddleware(AgentMiddleware): request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse], ) -> ModelResponse: - """Update the system prompt to include instructions on using the filesystem.""" + """Update the system prompt to include instructions on using the filesystem. + + Args: + request: The model request being processed. + handler: The handler function to call with the modified request. + + Returns: + The model response from the handler. + """ if self.system_prompt_extension is not None: request.system_prompt = ( request.system_prompt + "\n\n" + self.system_prompt_extension diff --git a/libs/langchain_v1/tests/unit_tests/agents/middleware/test_filesystem_middleware.py b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_filesystem_middleware.py index b958c39928c..f5e7a77aade 100644 --- a/libs/langchain_v1/tests/unit_tests/agents/middleware/test_filesystem_middleware.py +++ b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_filesystem_middleware.py @@ -11,14 +11,14 @@ from langgraph.runtime import Runtime class TestFilesystem: def test_init_local(self): - middleware = FilesystemMiddleware(use_longterm_memory=False) - assert middleware.use_longterm_memory is False + middleware = FilesystemMiddleware(long_term_memory=False) + assert middleware.long_term_memory is False assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT assert len(middleware.tools) == 4 def test_init_longterm(self): - middleware = FilesystemMiddleware(use_longterm_memory=True) - assert middleware.use_longterm_memory is True + middleware = FilesystemMiddleware(long_term_memory=True) + assert middleware.long_term_memory is True assert middleware.system_prompt_extension == ( FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT ) @@ -26,34 +26,34 @@ class TestFilesystem: def test_init_custom_system_prompt_shortterm(self): middleware = FilesystemMiddleware( - use_longterm_memory=False, system_prompt_extension="Custom system prompt" + long_term_memory=False, system_prompt_extension="Custom system prompt" ) - assert middleware.use_longterm_memory is False + assert middleware.long_term_memory is False assert middleware.system_prompt_extension == "Custom system prompt" assert len(middleware.tools) == 4 def test_init_custom_system_prompt_longterm(self): middleware = FilesystemMiddleware( - use_longterm_memory=True, system_prompt_extension="Custom system prompt" + long_term_memory=True, system_prompt_extension="Custom system prompt" ) - assert middleware.use_longterm_memory is True + assert middleware.long_term_memory is True assert middleware.system_prompt_extension == "Custom system prompt" assert len(middleware.tools) == 4 def test_init_custom_tool_descriptions_shortterm(self): middleware = FilesystemMiddleware( - use_longterm_memory=False, custom_tool_descriptions={"ls": "Custom ls tool description"} + long_term_memory=False, custom_tool_descriptions={"ls": "Custom ls tool description"} ) - assert middleware.use_longterm_memory is False + assert middleware.long_term_memory is False assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT ls_tool = next(tool for tool in middleware.tools if tool.name == "ls") assert ls_tool.description == "Custom ls tool description" def test_init_custom_tool_descriptions_longterm(self): middleware = FilesystemMiddleware( - use_longterm_memory=True, custom_tool_descriptions={"ls": "Custom ls tool description"} + long_term_memory=True, custom_tool_descriptions={"ls": "Custom ls tool description"} ) - assert middleware.use_longterm_memory is True + assert middleware.long_term_memory is True assert middleware.system_prompt_extension == ( FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT ) @@ -76,7 +76,7 @@ class TestFilesystem: ), }, ) - middleware = FilesystemMiddleware(use_longterm_memory=False) + middleware = FilesystemMiddleware(long_term_memory=False) ls_tool = next(tool for tool in middleware.tools if tool.name == "ls") result = ls_tool.invoke({"state": state}) assert result == ["test.txt", "test2.txt"] @@ -107,7 +107,7 @@ class TestFilesystem: ), }, ) - middleware = FilesystemMiddleware(use_longterm_memory=False) + middleware = FilesystemMiddleware(long_term_memory=False) ls_tool = next(tool for tool in middleware.tools if tool.name == "ls") result = ls_tool.invoke({"state": state, "path": "pokemon/"}) assert "/pokemon/test2.txt" in result