feat(langchain): add ruff rules PTH (#32008)

See https://docs.astral.sh/ruff/rules/#flake8-use-pathlib-pth
This commit is contained in:
Christophe Bornet 2025-07-14 16:41:37 +02:00 committed by GitHub
parent fd168e1c11
commit 58d4261875
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 41 additions and 47 deletions

View File

@ -209,10 +209,10 @@ class BaseSingleActionAgent(BaseModel):
raise NotImplementedError(msg)
if save_path.suffix == ".json":
with open(file_path, "w") as f:
with save_path.open("w") as f:
json.dump(agent_dict, f, indent=4)
elif save_path.suffix.endswith((".yaml", ".yml")):
with open(file_path, "w") as f:
with save_path.open("w") as f:
yaml.dump(agent_dict, f, default_flow_style=False)
else:
msg = f"{save_path} must be json or yaml"
@ -352,10 +352,10 @@ class BaseMultiActionAgent(BaseModel):
directory_path.mkdir(parents=True, exist_ok=True)
if save_path.suffix == ".json":
with open(file_path, "w") as f:
with save_path.open("w") as f:
json.dump(agent_dict, f, indent=4)
elif save_path.suffix.endswith((".yaml", ".yml")):
with open(file_path, "w") as f:
with save_path.open("w") as f:
yaml.dump(agent_dict, f, default_flow_style=False)
else:
msg = f"{save_path} must be json or yaml"

View File

@ -136,10 +136,10 @@ def _load_agent_from_file(
file_path = Path(file) if isinstance(file, str) else file
# Load from either json or yaml.
if file_path.suffix[1:] == "json":
with open(file_path) as f:
with file_path.open() as f:
config = json.load(f)
elif file_path.suffix[1:] == "yaml":
with open(file_path) as f:
with file_path.open() as f:
config = yaml.safe_load(f)
else:
msg = f"Unsupported file type, must be one of {valid_suffixes}."

View File

@ -781,10 +781,10 @@ class Chain(RunnableSerializable[dict[str, Any], dict[str, Any]], ABC):
directory_path.mkdir(parents=True, exist_ok=True)
if save_path.suffix == ".json":
with open(file_path, "w") as f:
with save_path.open("w") as f:
json.dump(chain_dict, f, indent=4)
elif save_path.suffix.endswith((".yaml", ".yml")):
with open(file_path, "w") as f:
with save_path.open("w") as f:
yaml.dump(chain_dict, f, default_flow_style=False)
else:
msg = f"{save_path} must be json or yaml"

View File

@ -713,10 +713,10 @@ def _load_chain_from_file(file: Union[str, Path], **kwargs: Any) -> Chain:
file_path = Path(file) if isinstance(file, str) else file
# Load from either json or yaml.
if file_path.suffix == ".json":
with open(file_path) as f:
with file_path.open() as f:
config = json.load(f)
elif file_path.suffix.endswith((".yaml", ".yml")):
with open(file_path) as f:
with file_path.open() as f:
config = yaml.safe_load(f)
else:
msg = "File type must be json or yaml"

View File

@ -77,16 +77,17 @@ class LocalFileStore(ByteStore):
if not re.match(r"^[a-zA-Z0-9_.\-/]+$", key):
msg = f"Invalid characters in key: {key}"
raise InvalidKeyException(msg)
full_path = os.path.abspath(self.root_path / key)
common_path = os.path.commonpath([str(self.root_path), full_path])
if common_path != str(self.root_path):
full_path = (self.root_path / key).resolve()
root_path = self.root_path.resolve()
common_path = os.path.commonpath([root_path, full_path])
if common_path != str(root_path):
msg = (
f"Invalid key: {key}. Key should be relative to the full path."
f"{self.root_path} vs. {common_path} and full path of {full_path}"
f"Invalid key: {key}. Key should be relative to the full path. "
f"{root_path} vs. {common_path} and full path of {full_path}"
)
raise InvalidKeyException(msg)
return Path(full_path)
return full_path
def _mkdir_for_store(self, dir_path: Path) -> None:
"""Makes a store directory path (including parents) with specified permissions
@ -104,7 +105,7 @@ class LocalFileStore(ByteStore):
self._mkdir_for_store(dir_path.parent)
dir_path.mkdir(exist_ok=True)
if self.chmod_dir is not None:
os.chmod(dir_path, self.chmod_dir)
dir_path.chmod(self.chmod_dir)
def mget(self, keys: Sequence[str]) -> list[Optional[bytes]]:
"""Get the values associated with the given keys.
@ -124,7 +125,7 @@ class LocalFileStore(ByteStore):
values.append(value)
if self.update_atime:
# update access time only; preserve modified time
os.utime(full_path, (time.time(), os.stat(full_path).st_mtime))
os.utime(full_path, (time.time(), full_path.stat().st_mtime))
else:
values.append(None)
return values
@ -143,7 +144,7 @@ class LocalFileStore(ByteStore):
self._mkdir_for_store(full_path.parent)
full_path.write_bytes(value)
if self.chmod_file is not None:
os.chmod(full_path, self.chmod_file)
full_path.chmod(self.chmod_file)
def mdelete(self, keys: Sequence[str]) -> None:
"""Delete the given keys and their associated values.

View File

@ -165,6 +165,7 @@ select = [
"PGH", # pygrep-hooks
"PIE", # flake8-pie
"PERF", # flake8-perf
"PTH", # flake8-use-pathlib
"PYI", # flake8-pyi
"Q", # flake8-quotes
"RET", # flake8-return

View File

@ -1,19 +1,18 @@
import os
from pathlib import Path
import pytest
# Getting the absolute path of the current file's directory
ABS_PATH = os.path.dirname(os.path.abspath(__file__))
ABS_PATH = Path(__file__).resolve().parent
# Getting the absolute path of the project's root directory
PROJECT_DIR = os.path.abspath(os.path.join(ABS_PATH, os.pardir, os.pardir))
PROJECT_DIR = ABS_PATH.parent.parent
# Loading the .env file if it exists
def _load_env() -> None:
dotenv_path = os.path.join(PROJECT_DIR, "tests", "integration_tests", ".env")
if os.path.exists(dotenv_path):
dotenv_path = PROJECT_DIR / "tests" / "integration_tests" / ".env"
if dotenv_path.exists():
from dotenv import load_dotenv
load_dotenv(dotenv_path)
@ -24,15 +23,12 @@ _load_env()
@pytest.fixture(scope="module")
def test_dir() -> Path:
return Path(os.path.join(PROJECT_DIR, "tests", "integration_tests"))
return PROJECT_DIR / "tests" / "integration_tests"
# This fixture returns a string containing the path to the cassette directory for the
# current module
@pytest.fixture(scope="module")
def vcr_cassette_dir(request: pytest.FixtureRequest) -> str:
return os.path.join(
os.path.dirname(request.module.__file__),
"cassettes",
os.path.basename(request.module.__file__).replace(".py", ""),
)
module = Path(request.module.__file__)
return str(module.parent / "cassettes" / module.stem)

View File

@ -1,6 +1,6 @@
import os
import tempfile
from collections.abc import Generator
from pathlib import Path
import pytest
from langchain_core.stores import InvalidKeyException
@ -41,9 +41,8 @@ def test_mset_chmod(chmod_dir_s: str, chmod_file_s: str) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
# Instantiate the LocalFileStore with a directory inside the temporary directory
# as the root path
temp_dir = os.path.join(temp_dir, "store_dir")
file_store = LocalFileStore(
temp_dir,
Path(temp_dir) / "store_dir",
chmod_dir=chmod_dir,
chmod_file=chmod_file,
)
@ -54,10 +53,10 @@ def test_mset_chmod(chmod_dir_s: str, chmod_file_s: str) -> None:
# verify the permissions are set correctly
# (test only the standard user/group/other bits)
dir_path = str(file_store.root_path)
file_path = os.path.join(dir_path, "key1")
assert (os.stat(dir_path).st_mode & 0o777) == chmod_dir
assert (os.stat(file_path).st_mode & 0o777) == chmod_file
dir_path = file_store.root_path
file_path = file_store.root_path / "key1"
assert (dir_path.stat().st_mode & 0o777) == chmod_dir
assert (file_path.stat().st_mode & 0o777) == chmod_file
def test_mget_update_atime() -> None:
@ -65,23 +64,21 @@ def test_mget_update_atime() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
# Instantiate the LocalFileStore with a directory inside the temporary directory
# as the root path
temp_dir = os.path.join(temp_dir, "store_dir")
file_store = LocalFileStore(temp_dir, update_atime=True)
file_store = LocalFileStore(Path(temp_dir) / "store_dir", update_atime=True)
# Set values for keys
key_value_pairs = [("key1", b"value1"), ("key2", b"value2")]
file_store.mset(key_value_pairs)
# Get original access time
dir_path = str(file_store.root_path)
file_path = os.path.join(dir_path, "key1")
atime1 = os.stat(file_path).st_atime
file_path = file_store.root_path / "key1"
atime1 = file_path.stat().st_atime
# Get values for keys
_ = file_store.mget(["key1", "key2"])
# Make sure the filesystem access time has been updated
atime2 = os.stat(file_path).st_atime
atime2 = file_path.stat().st_atime
assert atime2 != atime1
@ -131,7 +128,7 @@ def test_yield_keys(file_store: LocalFileStore) -> None:
keys = list(file_store.yield_keys())
# Assert that the yielded keys match the expected keys
expected_keys = ["key1", os.path.join("subdir", "key2")]
expected_keys = ["key1", str(Path("subdir") / "key2")]
assert keys == expected_keys

View File

@ -16,7 +16,7 @@ PYPROJECT_TOML = HERE / "../../pyproject.toml"
@pytest.fixture
def uv_conf() -> dict[str, Any]:
"""Load the pyproject.toml file."""
with open(PYPROJECT_TOML) as f:
with PYPROJECT_TOML.open() as f:
return toml.load(f)

View File

@ -118,8 +118,7 @@ def extract_deprecated_lookup(file_path: str) -> Optional[dict[str, Any]]:
Returns:
dict or None: The value of DEPRECATED_LOOKUP if it exists, None otherwise.
"""
with open(file_path) as file:
tree = ast.parse(file.read(), filename=file_path)
tree = ast.parse(Path(file_path).read_text(), filename=file_path)
for node in ast.walk(tree):
if isinstance(node, ast.Assign):