core: Add ruff rules PTH (pathlib) (#29338)

See https://docs.astral.sh/ruff/rules/#flake8-use-pathlib-pth

Co-authored-by: ccurme <chester.curme@gmail.com>
This commit is contained in:
Christophe Bornet
2025-02-28 19:22:20 +01:00
committed by GitHub
parent 86b364de3b
commit 9e6ffd1264
11 changed files with 40 additions and 53 deletions

View File

@@ -354,7 +354,7 @@ def test_prompt_from_file_with_partial_variables() -> None:
template = "This is a {foo} test {bar}."
partial_variables = {"bar": "baz"}
# when
with mock.patch("builtins.open", mock.mock_open(read_data=template)):
with mock.patch("pathlib.Path.open", mock.mock_open(read_data=template)):
prompt = PromptTemplate.from_file(
"mock_file_name", partial_variables=partial_variables
)

View File

@@ -1,20 +1,17 @@
import concurrent.futures
import glob
import importlib
import subprocess
from pathlib import Path
def test_importable_all() -> None:
for path in glob.glob("../core/langchain_core/*"):
relative_path = Path(path).parts[-1]
if relative_path.endswith(".typed"):
continue
module_name = relative_path.split(".")[0]
module = importlib.import_module("langchain_core." + module_name)
all_ = getattr(module, "__all__", [])
for cls_ in all_:
getattr(module, cls_)
for path in Path("../core/langchain_core/").glob("*"):
module_name = path.stem
if not module_name.startswith(".") and path.suffix != ".typed":
module = importlib.import_module("langchain_core." + module_name)
all_ = getattr(module, "__all__", [])
for cls_ in all_:
getattr(module, cls_)
def try_to_import(module_name: str) -> tuple[int, str]:
@@ -37,12 +34,10 @@ def test_importable_all_via_subprocess() -> None:
for one sequence of imports but not another.
"""
module_names = []
for path in glob.glob("../core/langchain_core/*"):
relative_path = Path(path).parts[-1]
if relative_path.endswith(".typed"):
continue
module_name = relative_path.split(".")[0]
module_names.append(module_name)
for path in Path("../core/langchain_core/").glob("*"):
module_name = path.stem
if not module_name.startswith(".") and path.suffix != ".typed":
module_names.append(module_name)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [