cli: Add ruff rule UP (pyupgrade) (#31843)

See https://docs.astral.sh/ruff/rules/#pyupgrade-up
All auto-fixed

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
This commit is contained in:
Christophe Bornet 2025-07-03 16:12:46 +02:00 committed by GitHub
parent cd7dce687a
commit b8e9b4adfc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 61 additions and 66 deletions

View File

@ -1,7 +1,6 @@
from typing import Optional from typing import Annotated, Optional
import typer import typer
from typing_extensions import Annotated
from langchain_cli._version import __version__ from langchain_cli._version import __version__
from langchain_cli.namespaces import app as app_namespace from langchain_cli.namespaces import app as app_namespace

View File

@ -3,7 +3,7 @@
Development Scripts for template packages Development Scripts for template packages
""" """
from typing import Sequence from collections.abc import Sequence
from fastapi import FastAPI from fastapi import FastAPI
from langserve import add_routes from langserve import add_routes

View File

@ -7,10 +7,9 @@ import subprocess
import sys import sys
import warnings import warnings
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Tuple from typing import Annotated, Optional
import typer import typer
from typing_extensions import Annotated
from langchain_cli.utils.events import create_events from langchain_cli.utils.events import create_events
from langchain_cli.utils.git import ( from langchain_cli.utils.git import (
@ -44,7 +43,7 @@ def new(
] = None, ] = None,
*, *,
package: Annotated[ package: Annotated[
Optional[List[str]], Optional[list[str]],
typer.Option(help="Packages to seed the project with"), typer.Option(help="Packages to seed the project with"),
] = None, ] = None,
pip: Annotated[ pip: Annotated[
@ -132,19 +131,19 @@ def new(
@app_cli.command() @app_cli.command()
def add( def add(
dependencies: Annotated[ dependencies: Annotated[
Optional[List[str]], typer.Argument(help="The dependency to add") Optional[list[str]], typer.Argument(help="The dependency to add")
] = None, ] = None,
*, *,
api_path: Annotated[List[str], typer.Option(help="API paths to add")] = [], api_path: Annotated[list[str], typer.Option(help="API paths to add")] = [],
project_dir: Annotated[ project_dir: Annotated[
Optional[Path], typer.Option(help="The project directory") Optional[Path], typer.Option(help="The project directory")
] = None, ] = None,
repo: Annotated[ repo: Annotated[
List[str], list[str],
typer.Option(help="Install templates from a specific github repo instead"), typer.Option(help="Install templates from a specific github repo instead"),
] = [], ] = [],
branch: Annotated[ branch: Annotated[
List[str], typer.Option(help="Install templates from a specific branch") list[str], typer.Option(help="Install templates from a specific branch")
] = [], ] = [],
pip: Annotated[ pip: Annotated[
bool, bool,
@ -181,16 +180,16 @@ def add(
) )
# group by repo/ref # group by repo/ref
grouped: Dict[Tuple[str, Optional[str]], List[DependencySource]] = {} grouped: dict[tuple[str, Optional[str]], list[DependencySource]] = {}
for dep in parsed_deps: for dep in parsed_deps:
key_tup = (dep["git"], dep["ref"]) key_tup = (dep["git"], dep["ref"])
lst = grouped.get(key_tup, []) lst = grouped.get(key_tup, [])
lst.append(dep) lst.append(dep)
grouped[key_tup] = lst grouped[key_tup] = lst
installed_destination_paths: List[Path] = [] installed_destination_paths: list[Path] = []
installed_destination_names: List[str] = [] installed_destination_names: list[str] = []
installed_exports: List[LangServeExport] = [] installed_exports: list[LangServeExport] = []
for (git, ref), group_deps in grouped.items(): for (git, ref), group_deps in grouped.items():
if len(group_deps) == 1: if len(group_deps) == 1:
@ -295,7 +294,7 @@ def add(
@app_cli.command() @app_cli.command()
def remove( def remove(
api_paths: Annotated[List[str], typer.Argument(help="The API paths to remove")], api_paths: Annotated[list[str], typer.Argument(help="The API paths to remove")],
*, *,
project_dir: Annotated[ project_dir: Annotated[
Optional[Path], typer.Option(help="The project directory") Optional[Path], typer.Option(help="The project directory")
@ -311,7 +310,7 @@ def remove(
package_root = project_root / "packages" package_root = project_root / "packages"
remove_deps: List[str] = [] remove_deps: list[str] = []
for api_path in api_paths: for api_path in api_paths:
package_dir = package_root / api_path package_dir = package_root / api_path

View File

@ -6,10 +6,10 @@ import re
import shutil import shutil
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Dict, Optional, cast from typing import Annotated, Optional, cast
import typer import typer
from typing_extensions import Annotated, TypedDict from typing_extensions import TypedDict
from langchain_cli.utils.find_replace import replace_file, replace_glob from langchain_cli.utils.find_replace import replace_file, replace_glob
@ -123,7 +123,7 @@ def new(
shutil.move(destination_dir / "integration_template", package_dir) shutil.move(destination_dir / "integration_template", package_dir)
# replacements in files # replacements in files
replace_glob(destination_dir, "**/*", cast(Dict[str, str], replacements)) replace_glob(destination_dir, "**/*", cast(dict[str, str], replacements))
# poetry install # poetry install
subprocess.run( subprocess.run(
@ -167,7 +167,7 @@ def new(
for src_path, dst_path in zip(src_paths, dst_paths): for src_path, dst_path in zip(src_paths, dst_paths):
shutil.copy(src_path, dst_path) shutil.copy(src_path, dst_path)
replace_file(dst_path, cast(Dict[str, str], replacements)) replace_file(dst_path, cast(dict[str, str], replacements))
TEMPLATE_MAP: dict[str, str] = { TEMPLATE_MAP: dict[str, str] = {

View File

@ -3,12 +3,11 @@
import importlib import importlib
import inspect import inspect
import pkgutil import pkgutil
from typing import List, Tuple
def generate_raw_migrations( def generate_raw_migrations(
from_package: str, to_package: str, filter_by_all: bool = False from_package: str, to_package: str, filter_by_all: bool = False
) -> List[Tuple[str, str]]: ) -> list[tuple[str, str]]:
"""Scan the `langchain` package and generate migrations for all modules.""" """Scan the `langchain` package and generate migrations for all modules."""
package = importlib.import_module(from_package) package = importlib.import_module(from_package)
@ -56,7 +55,7 @@ def generate_raw_migrations(
return items return items
def generate_top_level_imports(pkg: str) -> List[Tuple[str, str]]: def generate_top_level_imports(pkg: str) -> list[tuple[str, str]]:
"""This code will look at all the top level modules in langchain_community. """This code will look at all the top level modules in langchain_community.
It'll attempt to import everything from each __init__ file It'll attempt to import everything from each __init__ file
@ -121,7 +120,7 @@ def generate_top_level_imports(pkg: str) -> List[Tuple[str, str]]:
def generate_simplified_migrations( def generate_simplified_migrations(
from_package: str, to_package: str, filter_by_all: bool = True from_package: str, to_package: str, filter_by_all: bool = True
) -> List[Tuple[str, str]]: ) -> list[tuple[str, str]]:
"""Get all the raw migrations, then simplify them if possible.""" """Get all the raw migrations, then simplify them if possible."""
raw_migrations = generate_raw_migrations( raw_migrations = generate_raw_migrations(
from_package, to_package, filter_by_all=filter_by_all from_package, to_package, filter_by_all=filter_by_all

View File

@ -1,13 +1,10 @@
from typing import List, Tuple def split_package(package: str) -> tuple[str, str]:
def split_package(package: str) -> Tuple[str, str]:
"""Split a package name into the containing package and the final name""" """Split a package name into the containing package and the final name"""
parts = package.split(".") parts = package.split(".")
return ".".join(parts[:-1]), parts[-1] return ".".join(parts[:-1]), parts[-1]
def dump_migrations_as_grit(name: str, migration_pairs: List[Tuple[str, str]]): def dump_migrations_as_grit(name: str, migration_pairs: list[tuple[str, str]]):
"""Dump the migration pairs as a Grit file.""" """Dump the migration pairs as a Grit file."""
output = "language python" output = "language python"
remapped = ",\n".join( remapped = ",\n".join(

View File

@ -1,7 +1,6 @@
"""Generate migrations for partner packages.""" """Generate migrations for partner packages."""
import importlib import importlib
from typing import List, Tuple
from langchain_core.documents import BaseDocumentCompressor, BaseDocumentTransformer from langchain_core.documents import BaseDocumentCompressor, BaseDocumentTransformer
from langchain_core.embeddings import Embeddings from langchain_core.embeddings import Embeddings
@ -19,7 +18,7 @@ from langchain_cli.namespaces.migrate.generate.utils import (
# PUBLIC API # PUBLIC API
def get_migrations_for_partner_package(pkg_name: str) -> List[Tuple[str, str]]: def get_migrations_for_partner_package(pkg_name: str) -> list[tuple[str, str]]:
"""Generate migrations from community package to partner package. """Generate migrations from community package to partner package.
This code works This code works

View File

@ -3,7 +3,7 @@ import inspect
import os import os
import pathlib import pathlib
from pathlib import Path from pathlib import Path
from typing import Any, List, Optional, Tuple, Type from typing import Any, Optional
HERE = Path(__file__).parent HERE = Path(__file__).parent
# Should bring us to [root]/src # Should bring us to [root]/src
@ -29,7 +29,7 @@ class ImportExtractor(ast.NodeVisitor):
self.generic_visit(node) self.generic_visit(node)
def _get_class_names(code: str) -> List[str]: def _get_class_names(code: str) -> list[str]:
"""Extract class names from a code string.""" """Extract class names from a code string."""
# Parse the content of the file into an AST # Parse the content of the file into an AST
tree = ast.parse(code) tree = ast.parse(code)
@ -49,7 +49,7 @@ def _get_class_names(code: str) -> List[str]:
return class_names return class_names
def is_subclass(class_obj: Any, classes_: List[Type]) -> bool: def is_subclass(class_obj: Any, classes_: list[type]) -> bool:
"""Check if the given class object is a subclass of any class in list classes.""" """Check if the given class object is a subclass of any class in list classes."""
return any( return any(
issubclass(class_obj, kls) issubclass(class_obj, kls)
@ -58,7 +58,7 @@ def is_subclass(class_obj: Any, classes_: List[Type]) -> bool:
) )
def find_subclasses_in_module(module, classes_: List[Type]) -> List[str]: def find_subclasses_in_module(module, classes_: list[type]) -> list[str]:
"""Find all classes in the module that inherit from one of the classes.""" """Find all classes in the module that inherit from one of the classes."""
subclasses = [] subclasses = []
# Iterate over all attributes of the module that are classes # Iterate over all attributes of the module that are classes
@ -68,7 +68,7 @@ def find_subclasses_in_module(module, classes_: List[Type]) -> List[str]:
return subclasses return subclasses
def _get_all_classnames_from_file(file: Path, pkg: str) -> List[Tuple[str, str]]: def _get_all_classnames_from_file(file: Path, pkg: str) -> list[tuple[str, str]]:
"""Extract all class names from a file.""" """Extract all class names from a file."""
with open(file, encoding="utf-8") as f: with open(file, encoding="utf-8") as f:
code = f.read() code = f.read()
@ -80,7 +80,7 @@ def _get_all_classnames_from_file(file: Path, pkg: str) -> List[Tuple[str, str]]
def identify_all_imports_in_file( def identify_all_imports_in_file(
file: str, *, from_package: Optional[str] = None file: str, *, from_package: Optional[str] = None
) -> List[Tuple[str, str]]: ) -> list[tuple[str, str]]:
"""Let's also identify all the imports in the given file.""" """Let's also identify all the imports in the given file."""
with open(file, encoding="utf-8") as f: with open(file, encoding="utf-8") as f:
code = f.read() code = f.read()
@ -103,7 +103,7 @@ def identify_pkg_source(pkg_root: str) -> pathlib.Path:
return matching_dirs[0] return matching_dirs[0]
def list_classes_by_package(pkg_root: str) -> List[Tuple[str, str]]: def list_classes_by_package(pkg_root: str) -> list[tuple[str, str]]:
"""List all classes in a package.""" """List all classes in a package."""
module_classes = [] module_classes = []
pkg_source = identify_pkg_source(pkg_root) pkg_source = identify_pkg_source(pkg_root)
@ -117,7 +117,7 @@ def list_classes_by_package(pkg_root: str) -> List[Tuple[str, str]]:
return module_classes return module_classes
def list_init_imports_by_package(pkg_root: str) -> List[Tuple[str, str]]: def list_init_imports_by_package(pkg_root: str) -> list[tuple[str, str]]:
"""List all the things that are being imported in a package by module.""" """List all the things that are being imported in a package by module."""
imports = [] imports = []
pkg_source = identify_pkg_source(pkg_root) pkg_source = identify_pkg_source(pkg_root)
@ -135,7 +135,7 @@ def list_init_imports_by_package(pkg_root: str) -> List[Tuple[str, str]]:
def find_imports_from_package( def find_imports_from_package(
code: str, *, from_package: Optional[str] = None code: str, *, from_package: Optional[str] = None
) -> List[Tuple[str, str]]: ) -> list[tuple[str, str]]:
# Parse the code into an AST # Parse the code into an AST
tree = ast.parse(code) tree = ast.parse(code)
# Create an instance of the visitor # Create an instance of the visitor

View File

@ -6,10 +6,9 @@ import re
import shutil import shutil
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Annotated, Optional
import typer import typer
from typing_extensions import Annotated
from langchain_cli.utils.packages import get_langserve_export, get_package_root from langchain_cli.utils.packages import get_langserve_export, get_package_root

View File

@ -1,16 +1,16 @@
import http.client import http.client
import json import json
from typing import Any, Dict, List, Optional, TypedDict from typing import Any, Optional, TypedDict
WRITE_KEY = "310apTK0HUFl4AOv" WRITE_KEY = "310apTK0HUFl4AOv"
class EventDict(TypedDict): class EventDict(TypedDict):
event: str event: str
properties: Optional[Dict[str, Any]] properties: Optional[dict[str, Any]]
def create_events(events: List[EventDict]) -> Optional[Any]: def create_events(events: list[EventDict]) -> Optional[Any]:
try: try:
data = { data = {
"events": [ "events": [

View File

@ -1,8 +1,7 @@
from pathlib import Path from pathlib import Path
from typing import Dict
def find_and_replace(source: str, replacements: Dict[str, str]) -> str: def find_and_replace(source: str, replacements: dict[str, str]) -> str:
rtn = source rtn = source
# replace keys in deterministic alphabetical order # replace keys in deterministic alphabetical order

View File

@ -1,8 +1,9 @@
import hashlib import hashlib
import re import re
import shutil import shutil
from collections.abc import Sequence
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Sequence, TypedDict from typing import Optional, TypedDict
from git import Repo from git import Repo
@ -18,7 +19,7 @@ class DependencySource(TypedDict):
ref: Optional[str] ref: Optional[str]
subdirectory: Optional[str] subdirectory: Optional[str]
api_path: Optional[str] api_path: Optional[str]
event_metadata: Dict event_metadata: dict
# use poetry dependency string format # use poetry dependency string format
@ -104,7 +105,7 @@ def parse_dependency_string(
) )
def _list_arg_to_length(arg: Optional[List[str]], num: int) -> Sequence[Optional[str]]: def _list_arg_to_length(arg: Optional[list[str]], num: int) -> Sequence[Optional[str]]:
if not arg: if not arg:
return [None] * num return [None] * num
elif len(arg) == 1: elif len(arg) == 1:
@ -116,11 +117,11 @@ def _list_arg_to_length(arg: Optional[List[str]], num: int) -> Sequence[Optional
def parse_dependencies( def parse_dependencies(
dependencies: Optional[List[str]], dependencies: Optional[list[str]],
repo: List[str], repo: list[str],
branch: List[str], branch: list[str],
api_path: List[str], api_path: list[str],
) -> List[DependencySource]: ) -> list[DependencySource]:
num_deps = max( num_deps = max(
len(dependencies) if dependencies is not None else 0, len(repo), len(branch) len(dependencies) if dependencies is not None else 0, len(repo), len(branch)
) )
@ -150,7 +151,7 @@ def parse_dependencies(
def _get_repo_path(gitstring: str, ref: Optional[str], repo_dir: Path) -> Path: def _get_repo_path(gitstring: str, ref: Optional[str], repo_dir: Path) -> Path:
# only based on git for now # only based on git for now
ref_str = ref if ref is not None else "" ref_str = ref if ref is not None else ""
hashed = hashlib.sha256((f"{gitstring}:{ref_str}").encode("utf-8")).hexdigest()[:8] hashed = hashlib.sha256((f"{gitstring}:{ref_str}").encode()).hexdigest()[:8]
removed_protocol = gitstring.split("://")[-1] removed_protocol = gitstring.split("://")[-1]
removed_basename = re.split(r"[/:]", removed_protocol, 1)[-1] removed_basename = re.split(r"[/:]", removed_protocol, 1)[-1]

View File

@ -1,5 +1,5 @@
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional, Set, TypedDict from typing import Any, Optional, TypedDict
from tomlkit import load from tomlkit import load
@ -7,7 +7,7 @@ from tomlkit import load
def get_package_root(cwd: Optional[Path] = None) -> Path: def get_package_root(cwd: Optional[Path] = None) -> Path:
# traverse path for routes to host (any directory holding a pyproject.toml file) # traverse path for routes to host (any directory holding a pyproject.toml file)
package_root = Path.cwd() if cwd is None else cwd package_root = Path.cwd() if cwd is None else cwd
visited: Set[Path] = set() visited: set[Path] = set()
while package_root not in visited: while package_root not in visited:
visited.add(package_root) visited.add(package_root)
@ -35,7 +35,7 @@ class LangServeExport(TypedDict):
def get_langserve_export(filepath: Path) -> LangServeExport: def get_langserve_export(filepath: Path) -> LangServeExport:
with open(filepath) as f: with open(filepath) as f:
data: Dict[str, Any] = load(f) data: dict[str, Any] = load(f)
try: try:
module = data["tool"]["langserve"]["export_module"] module = data["tool"]["langserve"]["export_module"]
attr = data["tool"]["langserve"]["export_attr"] attr = data["tool"]["langserve"]["export_attr"]

View File

@ -1,5 +1,6 @@
from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, Tuple from typing import Any
from tomlkit import dump, inline_table, load from tomlkit import dump, inline_table, load
from tomlkit.items import InlineTable from tomlkit.items import InlineTable
@ -12,12 +13,12 @@ def _get_dep_inline_table(path: Path) -> InlineTable:
def add_dependencies_to_pyproject_toml( def add_dependencies_to_pyproject_toml(
pyproject_toml: Path, local_editable_dependencies: Iterable[Tuple[str, Path]] pyproject_toml: Path, local_editable_dependencies: Iterable[tuple[str, Path]]
) -> None: ) -> None:
"""Add dependencies to pyproject.toml.""" """Add dependencies to pyproject.toml."""
with open(pyproject_toml, encoding="utf-8") as f: with open(pyproject_toml, encoding="utf-8") as f:
# tomlkit types aren't amazing - treat as Dict instead # tomlkit types aren't amazing - treat as Dict instead
pyproject: Dict[str, Any] = load(f) pyproject: dict[str, Any] = load(f)
pyproject["tool"]["poetry"]["dependencies"].update( pyproject["tool"]["poetry"]["dependencies"].update(
{ {
name: _get_dep_inline_table(loc.relative_to(pyproject_toml.parent)) name: _get_dep_inline_table(loc.relative_to(pyproject_toml.parent))
@ -33,7 +34,7 @@ def remove_dependencies_from_pyproject_toml(
) -> None: ) -> None:
"""Remove dependencies from pyproject.toml.""" """Remove dependencies from pyproject.toml."""
with open(pyproject_toml, encoding="utf-8") as f: with open(pyproject_toml, encoding="utf-8") as f:
pyproject: Dict[str, Any] = load(f) pyproject: dict[str, Any] = load(f)
# tomlkit types aren't amazing - treat as Dict instead # tomlkit types aren't amazing - treat as Dict instead
dependencies = pyproject["tool"]["poetry"]["dependencies"] dependencies = pyproject["tool"]["poetry"]["dependencies"]
for name in local_editable_dependencies: for name in local_editable_dependencies:

View File

@ -45,7 +45,9 @@ select = [
"F", # pyflakes "F", # pyflakes
"I", # isort "I", # isort
"T201", # print "T201", # print
"UP", # pyupgrade
] ]
ignore = ["UP007",]
[tool.mypy] [tool.mypy]
exclude = [ exclude = [

View File

@ -100,7 +100,7 @@ def partner(pkg: str, output: str) -> None:
@click.argument("json_file") @click.argument("json_file")
def json_to_grit(json_file: str) -> None: def json_to_grit(json_file: str) -> None:
"""Generate a Grit migration from an old JSON migration file.""" """Generate a Grit migration from an old JSON migration file."""
with open(json_file, "r") as f: with open(json_file) as f:
migrations = json.load(f) migrations = json.load(f)
name = os.path.basename(json_file).removesuffix(".json").removesuffix(".grit") name = os.path.basename(json_file).removesuffix(".json").removesuffix(".grit")
data = dump_migrations_as_grit(name, migrations) data = dump_migrations_as_grit(name, migrations)

View File

@ -1,4 +1,4 @@
from typing import Dict, Optional from typing import Optional
import pytest import pytest
@ -16,7 +16,7 @@ def _assert_dependency_equals(
git: Optional[str] = None, git: Optional[str] = None,
ref: Optional[str] = None, ref: Optional[str] = None,
subdirectory: Optional[str] = None, subdirectory: Optional[str] = None,
event_metadata: Optional[Dict] = None, event_metadata: Optional[dict] = None,
) -> None: ) -> None:
assert dep["git"] == git assert dep["git"] == git
assert dep["ref"] == ref assert dep["ref"] == ref