core[path]: Use context manager for FileCallbackHandler (#31813)

Recommend using context manager for FileCallbackHandler to avoid opening
too many file descriptors

---------

Co-authored-by: Mason Daugherty <github@mdrxy.com>
This commit is contained in:
Eugene Yurtsev 2025-07-02 13:31:58 -04:00 committed by GitHub
parent 377e5f5204
commit 73fefe0295
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 198 additions and 77 deletions

View File

@ -243,9 +243,10 @@ class CallbackManagerMixin:
) -> Any: ) -> Any:
"""Run when LLM starts running. """Run when LLM starts running.
**ATTENTION**: This method is called for non-chat models (regular LLMs). If .. ATTENTION::
you're implementing a handler for a chat model, This method is called for non-chat models (regular LLMs). If you're
you should use on_chat_model_start instead. implementing a handler for a chat model, you should use
``on_chat_model_start`` instead.
Args: Args:
serialized (dict[str, Any]): The serialized LLM. serialized (dict[str, Any]): The serialized LLM.
@ -271,7 +272,7 @@ class CallbackManagerMixin:
"""Run when a chat model starts running. """Run when a chat model starts running.
**ATTENTION**: This method is called for chat models. If you're implementing **ATTENTION**: This method is called for chat models. If you're implementing
a handler for a non-chat model, you should use on_llm_start instead. a handler for a non-chat model, you should use ``on_llm_start`` instead.
Args: Args:
serialized (dict[str, Any]): The serialized chat model. serialized (dict[str, Any]): The serialized chat model.
@ -490,9 +491,10 @@ class AsyncCallbackHandler(BaseCallbackHandler):
) -> None: ) -> None:
"""Run when LLM starts running. """Run when LLM starts running.
**ATTENTION**: This method is called for non-chat models (regular LLMs). If .. ATTENTION::
you're implementing a handler for a chat model, This method is called for non-chat models (regular LLMs). If you're
you should use on_chat_model_start instead. implementing a handler for a chat model, you should use
``on_chat_model_start`` instead.
Args: Args:
serialized (dict[str, Any]): The serialized LLM. serialized (dict[str, Any]): The serialized LLM.
@ -518,7 +520,7 @@ class AsyncCallbackHandler(BaseCallbackHandler):
"""Run when a chat model starts running. """Run when a chat model starts running.
**ATTENTION**: This method is called for chat models. If you're implementing **ATTENTION**: This method is called for chat models. If you're implementing
a handler for a non-chat model, you should use on_llm_start instead. a handler for a non-chat model, you should use ``on_llm_start`` instead.
Args: Args:
serialized (dict[str, Any]): The serialized chat model. serialized (dict[str, Any]): The serialized chat model.

View File

@ -5,8 +5,9 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, TextIO, cast from typing import TYPE_CHECKING, Any, Optional, TextIO, cast
from typing_extensions import override from typing_extensions import Self, override
from langchain_core._api import warn_deprecated
from langchain_core.callbacks import BaseCallbackHandler from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.utils.input import print_text from langchain_core.utils.input import print_text
@ -14,78 +15,184 @@ if TYPE_CHECKING:
from langchain_core.agents import AgentAction, AgentFinish from langchain_core.agents import AgentAction, AgentFinish
_GLOBAL_DEPRECATION_WARNED = False
class FileCallbackHandler(BaseCallbackHandler): class FileCallbackHandler(BaseCallbackHandler):
"""Callback Handler that writes to a file. """Callback Handler that writes to a file.
Parameters: This handler supports both context manager usage (recommended) and direct
filename: The file to write to. instantiation (deprecated) for backwards compatibility.
mode: The mode to open the file in. Defaults to "a".
color: The color to use for the text. Examples:
Using as a context manager (recommended):
.. code-block:: python
with FileCallbackHandler("output.txt") as handler:
# Use handler with your chain/agent
chain.invoke(inputs, config={"callbacks": [handler]})
Direct instantiation (deprecated):
.. code-block:: python
handler = FileCallbackHandler("output.txt")
# File remains open until handler is garbage collected
try:
chain.invoke(inputs, config={"callbacks": [handler]})
finally:
handler.close() # Explicit cleanup recommended
Args:
filename: The file path to write to.
mode: The file open mode. Defaults to ``'a'`` (append).
color: Default color for text output. Defaults to ``None``.
Note:
When not used as a context manager, a deprecation warning will be issued
on first use. The file will be opened immediately in ``__init__`` and closed
in ``__del__`` or when ``close()`` is called explicitly.
""" """
def __init__( def __init__(
self, filename: str, mode: str = "a", color: Optional[str] = None self, filename: str, mode: str = "a", color: Optional[str] = None
) -> None: ) -> None:
"""Initialize callback handler. """Initialize the file callback handler.
Args: Args:
filename: The filename to write to. filename: Path to the output file.
mode: The mode to open the file in. Defaults to "a". mode: File open mode (e.g., ``'w'``, ``'a'``, ``'x'``). Defaults to ``'a'``.
color: The color to use for the text. Defaults to None. color: Default text color for output. Defaults to ``None``.
""" """
self.file = cast("TextIO", Path(filename).open(mode, encoding="utf-8")) # noqa: SIM115 self.filename = filename
self.mode = mode
self.color = color self.color = color
self._file_opened_in_context = False
self.file: TextIO = cast(
"TextIO",
# Open the file in the specified mode with UTF-8 encoding.
Path(self.filename).open(self.mode, encoding="utf-8"), # noqa: SIM115
)
def __enter__(self) -> Self:
"""Enter the context manager.
Returns:
The FileCallbackHandler instance.
Note:
The file is already opened in ``__init__``, so this just marks that
the handler is being used as a context manager.
"""
self._file_opened_in_context = True
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: object,
) -> None:
"""Exit the context manager and close the file.
Args:
exc_type: Exception type if an exception occurred.
exc_val: Exception value if an exception occurred.
exc_tb: Exception traceback if an exception occurred.
"""
self.close()
def __del__(self) -> None: def __del__(self) -> None:
"""Destructor to cleanup when done.""" """Destructor to cleanup when done."""
self.close()
def close(self) -> None:
"""Close the file if it's open.
This method is safe to call multiple times and will only close
the file if it's currently open.
"""
if hasattr(self, "file") and self.file and not self.file.closed:
self.file.close() self.file.close()
def _write(
self,
text: str,
color: Optional[str] = None,
end: str = "",
) -> None:
"""Write text to the file with deprecation warning if needed.
Args:
text: The text to write to the file.
color: Optional color for the text. Defaults to ``self.color``.
end: String appended after the text. Defaults to ``""``.
file: Optional file to write to. Defaults to ``self.file``.
Raises:
RuntimeError: If the file is closed or not available.
"""
global _GLOBAL_DEPRECATION_WARNED # noqa: PLW0603
if not self._file_opened_in_context and not _GLOBAL_DEPRECATION_WARNED:
warn_deprecated(
since="0.3.67",
pending=True,
message=(
"Using FileCallbackHandler without a context manager is "
"deprecated. Use 'with FileCallbackHandler(...) as "
"handler:' instead."
),
)
_GLOBAL_DEPRECATION_WARNED = True
if not hasattr(self, "file") or self.file is None or self.file.closed:
msg = "File is not open. Use FileCallbackHandler as a context manager."
raise RuntimeError(msg)
print_text(text, file=self.file, color=color, end=end)
@override @override
def on_chain_start( def on_chain_start(
self, serialized: dict[str, Any], inputs: dict[str, Any], **kwargs: Any self, serialized: dict[str, Any], inputs: dict[str, Any], **kwargs: Any
) -> None: ) -> None:
"""Print out that we are entering a chain. """Print that we are entering a chain.
Args: Args:
serialized (dict[str, Any]): The serialized chain. serialized: The serialized chain information.
inputs (dict[str, Any]): The inputs to the chain. inputs: The inputs to the chain.
**kwargs (Any): Additional keyword arguments. **kwargs: Additional keyword arguments that may contain ``'name'``.
""" """
if "name" in kwargs: name = (
name = kwargs["name"] kwargs.get("name")
elif serialized: or serialized.get("name", serialized.get("id", ["<unknown>"])[-1])
name = serialized.get("name", serialized.get("id", ["<unknown>"])[-1]) or "<unknown>"
else:
name = "<unknown>"
print_text(
f"\n\n\033[1m> Entering new {name} chain...\033[0m",
end="\n",
file=self.file,
) )
self._write(f"\n\n> Entering new {name} chain...", end="\n")
@override @override
def on_chain_end(self, outputs: dict[str, Any], **kwargs: Any) -> None: def on_chain_end(self, outputs: dict[str, Any], **kwargs: Any) -> None:
"""Print out that we finished a chain. """Print that we finished a chain.
Args: Args:
outputs (dict[str, Any]): The outputs of the chain. outputs: The outputs of the chain.
**kwargs (Any): Additional keyword arguments. **kwargs: Additional keyword arguments.
""" """
print_text("\n\033[1m> Finished chain.\033[0m", end="\n", file=self.file) self._write("\n> Finished chain.", end="\n")
@override @override
def on_agent_action( def on_agent_action(
self, action: AgentAction, color: Optional[str] = None, **kwargs: Any self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
) -> Any: ) -> Any:
"""Run on agent action. """Handle agent action by writing the action log.
Args: Args:
action (AgentAction): The agent action. action: The agent action containing the log to write.
color (Optional[str], optional): The color to use for the text. color: Color override for this specific output. If ``None``, uses
Defaults to None. ``self.color``.
**kwargs (Any): Additional keyword arguments. **kwargs: Additional keyword arguments.
""" """
print_text(action.log, color=color or self.color, file=self.file) self._write(action.log, color=color or self.color)
@override @override
def on_tool_end( def on_tool_end(
@ -96,49 +203,47 @@ class FileCallbackHandler(BaseCallbackHandler):
llm_prefix: Optional[str] = None, llm_prefix: Optional[str] = None,
**kwargs: Any, **kwargs: Any,
) -> None: ) -> None:
"""If not the final action, print out observation. """Handle tool end by writing the output with optional prefixes.
Args: Args:
output (str): The output to print. output: The tool output to write.
color (Optional[str], optional): The color to use for the text. color: Color override for this specific output. If ``None``, uses
Defaults to None. ``self.color``.
observation_prefix (Optional[str], optional): The observation prefix. observation_prefix: Optional prefix to write before the output.
Defaults to None. llm_prefix: Optional prefix to write after the output.
llm_prefix (Optional[str], optional): The LLM prefix. **kwargs: Additional keyword arguments.
Defaults to None.
**kwargs (Any): Additional keyword arguments.
""" """
if observation_prefix is not None: if observation_prefix is not None:
print_text(f"\n{observation_prefix}", file=self.file) self._write(f"\n{observation_prefix}")
print_text(output, color=color or self.color, file=self.file) self._write(output)
if llm_prefix is not None: if llm_prefix is not None:
print_text(f"\n{llm_prefix}", file=self.file) self._write(f"\n{llm_prefix}")
@override @override
def on_text( def on_text(
self, text: str, color: Optional[str] = None, end: str = "", **kwargs: Any self, text: str, color: Optional[str] = None, end: str = "", **kwargs: Any
) -> None: ) -> None:
"""Run when the agent ends. """Handle text output.
Args: Args:
text (str): The text to print. text: The text to write.
color (Optional[str], optional): The color to use for the text. color: Color override for this specific output. If ``None``, uses
Defaults to None. ``self.color``.
end (str, optional): The end character. Defaults to "". end: String appended after the text. Defaults to ``""``.
**kwargs (Any): Additional keyword arguments. **kwargs: Additional keyword arguments.
""" """
print_text(text, color=color or self.color, end=end, file=self.file) self._write(text, color=color or self.color, end=end)
@override @override
def on_agent_finish( def on_agent_finish(
self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
) -> None: ) -> None:
"""Run on the agent end. """Handle agent finish by writing the finish log.
Args: Args:
finish (AgentFinish): The agent finish. finish: The agent finish object containing the log to write.
color (Optional[str], optional): The color to use for the text. color: Color override for this specific output. If ``None``, uses
Defaults to None. ``self.color``.
**kwargs (Any): Additional keyword arguments. **kwargs: Additional keyword arguments.
""" """
print_text(finish.log, color=color or self.color, end="\n", file=self.file) self._write(finish.log, color=color or self.color, end="\n")

View File

@ -1,7 +1,6 @@
import pathlib import pathlib
from typing import Any, Optional from typing import Optional
import pytest
from langchain_core.callbacks import CallbackManagerForChainRun from langchain_core.callbacks import CallbackManagerForChainRun
from langchain.callbacks import FileCallbackHandler from langchain.callbacks import FileCallbackHandler
@ -33,14 +32,29 @@ class FakeChain(Chain):
return {"bar": "bar"} return {"bar": "bar"}
def test_filecallback(capsys: pytest.CaptureFixture, tmp_path: pathlib.Path) -> Any: def test_filecallback(tmp_path: pathlib.Path) -> None:
"""Test the file callback handler.""" """Test the file callback handler."""
p = tmp_path / "output.log" log1 = tmp_path / "output.log"
handler = FileCallbackHandler(str(p)) handler = FileCallbackHandler(str(log1))
chain_test = FakeChain(callbacks=[handler]) chain_test = FakeChain(callbacks=[handler])
chain_test.invoke({"foo": "bar"}) chain_test.invoke({"foo": "bar"})
handler.close()
# Assert the output is as expected # Assert the output is as expected
assert p.read_text() == ( assert "Entering new FakeChain chain" in log1.read_text()
"\n\n\x1b[1m> Entering new FakeChain "
"chain...\x1b[0m\n\n\x1b[1m> Finished chain.\x1b[0m\n" # Test using a callback manager
) log2 = tmp_path / "output2.log"
with FileCallbackHandler(str(log2)) as handler_cm:
chain_test = FakeChain(callbacks=[handler_cm])
chain_test.invoke({"foo": "bar"})
assert "Entering new FakeChain chain" in log2.read_text()
# Test passing via invoke callbacks
log3 = tmp_path / "output3.log"
with FileCallbackHandler(str(log3)) as handler_cm:
chain_test.invoke({"foo": "bar"}, {"callbacks": [handler_cm]})
assert "Entering new FakeChain chain" in log3.read_text()