DB-GPT/dbgpt/util/dbgpts/repo.py
明天 d7a893e1a7
feat: new dbgpts modules (#1910)
Co-authored-by: 途杨 <tuyang.yhj@antgroup.com>
Co-authored-by: lhwan <1017484907@qq.com>
2024-08-28 21:31:42 +08:00

508 lines
16 KiB
Python

import functools
import logging
import os
import shutil
import subprocess
from pathlib import Path
from typing import List, Tuple
from rich.table import Table
from ..console import CliLogger
from ..i18n_utils import _
from .base import (
DBGPTS_METADATA_FILE,
DBGPTS_REPO_HOME,
DEFAULT_PACKAGES,
DEFAULT_REPO_MAP,
INSTALL_DIR,
INSTALL_METADATA_FILE,
_print_path,
)
from .loader import _load_package_from_path
# Console logger used by the CLI-facing functions in this module.
cl = CliLogger()
# Repo name ("<group>/<name>") that list_repos() always reports.
_DEFAULT_REPO = "eosphoros/dbgpts"
# Standard logger used by the non-CLI ("inner") variants in this module.
logger = logging.getLogger(__name__)
@functools.cache
def list_repos() -> List[str]:
    """List the names of all known repos.

    A repo is any second-level directory below ``DBGPTS_REPO_HOME`` and is
    named ``<group>/<name>``. ``_DEFAULT_REPO`` is always part of the result,
    whether or not a directory exists for it.

    The result is memoized by ``functools.cache``; call
    ``list_repos.cache_clear()`` to force a rescan after the repo directories
    have changed on disk.

    Returns:
        List[str]: Sorted repo names.
    """
    repos = {_DEFAULT_REPO}
    # A missing repo home only means that nothing has been added yet, so
    # report the default repo instead of failing in os.listdir().
    if os.path.isdir(DBGPTS_REPO_HOME):
        for group in os.listdir(DBGPTS_REPO_HOME):
            group_dir = os.path.join(DBGPTS_REPO_HOME, group)
            if not os.path.isdir(group_dir):
                continue
            for name in os.listdir(group_dir):
                if os.path.isdir(os.path.join(group_dir, name)):
                    repos.add(f"{group}/{name}")
    return sorted(repos)
def _get_repo_path(repo: str) -> Path:
    """Map a ``<group>/<name>`` repo name to its directory in the repo home.

    A name that does not consist of exactly two '/'-separated parts is
    reported through ``cl.error`` with ``exit_code=1``.
    """
    parts = repo.split("/")
    if len(parts) != 2:
        message = (
            f"Invalid repo name '{repo}', repo name must split by '/', "
            f"eg.(eosphoros/dbgpts)."
        )
        cl.error(message, exit_code=1)
    group, name = parts[0], parts[1]
    return Path(DBGPTS_REPO_HOME).joinpath(group, name)
def _list_repos_details() -> List[Tuple[str, str]]:
    """Pair every repo name from ``list_repos()`` with its directory path.

    Returns:
        List[Tuple[str, str]]: ``(repo name, path below DBGPTS_REPO_HOME)``
            tuples, in the order ``list_repos()`` returns the names.
    """
    details = []
    for repo_id in list_repos():
        # Every repo name is expected to be exactly "<group>/<name>".
        group, name = repo_id.split("/")
        details.append((repo_id, os.path.join(DBGPTS_REPO_HOME, group, name)))
    return details
def _print_repos():
    """Print a table of every known repo and its path to the console."""
    rows = sorted(_list_repos_details(), key=lambda row: (row[0], row[1]))
    table = Table(title=_("Repos"))
    table.add_column(_("Repository"), justify="right", style="cyan", no_wrap=True)
    table.add_column(_("Path"), justify="right", style="green")
    for repo_name, repo_dir in rows:
        table.add_row(repo_name, _print_path(repo_dir))
    cl.print(table)
def _install_default_repos_if_no_repos():
    """Add the default repo when none of the known repo paths exists on disk."""
    known = _list_repos_details()
    if any(os.path.exists(repo_dir) for _name, repo_dir in known):
        # At least one repo is already present; nothing to do.
        return
    repo_url = DEFAULT_REPO_MAP[_DEFAULT_REPO]
    cl.info(
        f"No repos found, installing default repos {_DEFAULT_REPO} from {repo_url}"
    )
    add_repo(_DEFAULT_REPO, repo_url)
def add_repo(repo: str, repo_url: str, branch: str | None = None):
    """Add a new repo.

    A ``repo_url`` starting with "http" or "git" is cloned with git; an
    existing local directory is linked into the repo home instead.

    Args:
        repo (str): The name of the repo, in ``<group>/<name>`` form.
        repo_url (str): The git URL to clone, or the path of a local
            directory to link.
        branch (str | None): The branch to clone; only used for git URLs.
    """
    repo_arr = repo.split("/")
    if len(repo_arr) != 2:
        cl.error(
            f"Invalid repo name '{repo}', repo name must split by '/', "
            "eg.(eosphoros/dbgpts).",
            exit_code=1,
        )
    if repo in list_repos() and repo_url not in DEFAULT_REPO_MAP.values():
        cl.error(f"The repo '{repo}' already exists.", exit_code=1)

    is_git_url = repo_url.startswith(("http", "git"))
    if not is_git_url and not os.path.isdir(repo_url):
        # Previously such a value was silently ignored and nothing was added.
        cl.error(
            f"Invalid repo url '{repo_url}', it must be a git url or an "
            "existing directory.",
            exit_code=1,
        )

    repo_name = repo_arr[1]
    repo_group_dir = os.path.join(DBGPTS_REPO_HOME, repo_arr[0])
    os.makedirs(repo_group_dir, exist_ok=True)
    if is_git_url:
        clone_repo(repo, repo_group_dir, repo_name, repo_url, branch)
    else:
        # Link to the absolute location: a relative symlink target is resolved
        # against the directory containing the link, not the caller's cwd.
        os.symlink(
            os.path.abspath(repo_url), os.path.join(repo_group_dir, repo_name)
        )
    # The repo directories changed, so drop the memoized list_repos() result.
    list_repos.cache_clear()
def remove_repo(repo: str):
    """Remove the specified repo.

    A repo that is a symbolic link is only unlinked (the link target is left
    untouched); any other repo directory is deleted recursively.

    Args:
        repo (str): The name of the repo, in ``<group>/<name>`` form.
    """
    repo_path = _get_repo_path(repo)
    # lexists() is also true for a symlink whose target no longer exists, so
    # a dangling linked repo can still be removed.
    if not os.path.lexists(repo_path):
        cl.error(f"The repo '{repo}' does not exist.", exit_code=1)
    if os.path.islink(repo_path):
        os.unlink(repo_path)
    else:
        shutil.rmtree(repo_path)
    # The repo directories changed, so drop the memoized list_repos() result.
    list_repos.cache_clear()
    cl.info(f"Repo '{repo}' removed successfully.")
def clone_repo(
    repo: str,
    repo_group_dir: str,
    repo_name: str,
    repo_url: str,
    branch: str | None = None,
):
    """Clone the specified repo with ``git clone``.

    Git runs with ``repo_group_dir`` as its working directory; the working
    directory of the current process is not changed. A non-zero git exit
    status is reported through ``cl.error`` with ``exit_code=1`` instead of
    being announced as a success.

    Args:
        repo (str): The name of the repo (used in messages only)
        repo_group_dir (str): The directory of the repo group
        repo_name (str): The name of the directory to clone into
        repo_url (str): The URL of the repo
        branch (str | None): The branch to check out, if any
    """
    clone_command = ["git", "clone"]
    # If the branch is specified, add it to the clone command
    if branch:
        clone_command += ["-b", branch]
    clone_command += [repo_url, repo_name]

    result = subprocess.run(
        clone_command,
        cwd=repo_group_dir,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )
    if result.returncode != 0:
        cl.error(
            f"Failed to clone repo '{repo}' from {repo_url}: "
            f"{result.stderr.strip()}",
            exit_code=1,
        )
        return

    if branch:
        cl.info(
            f"Repo '{repo}' cloned from {repo_url} with branch '{branch}' successfully."
        )
    else:
        cl.info(f"Repo '{repo}' cloned from {repo_url} successfully.")
def update_repo(repo: str):
    """Bring the specified repo up to date with ``git pull``.

    A repo that is missing on disk is first added from ``DEFAULT_REPO_MAP``
    when it is listed there; otherwise an error is reported through
    ``cl.error`` with ``exit_code=1``. A repo directory without a ``.git``
    entry is left alone.

    Args:
        repo (str): The name of the repo
    """
    cl.info(f"Updating repo '{repo}'...")
    repo_path = os.path.join(DBGPTS_REPO_HOME, repo)
    missing_message = f"The repo '{repo}' does not exist."
    if not os.path.exists(repo_path):
        if repo not in DEFAULT_REPO_MAP:
            cl.error(missing_message, exit_code=1)
        else:
            add_repo(repo, DEFAULT_REPO_MAP[repo])
            # Adding may not have produced the directory; check again.
            if not os.path.exists(repo_path):
                cl.error(missing_message, exit_code=1)

    # The ".git" check and the pull below both run relative to the repo.
    os.chdir(repo_path)
    if os.path.exists(".git"):
        cl.info(f"Updating repo '{repo}'...")
        subprocess.run(["git", "pull"], check=False)
    else:
        cl.info(f"Repo '{repo}' is not a git repository.")
def install(
    name: str,
    repo: str | None = None,
    with_update: bool = True,
):
    """Locate the dbgpt called ``name`` and install it.

    The lookup is delegated to ``check_with_retry`` and the installation to
    ``copy_and_install``. When the lookup finds nothing, an error is reported
    through ``cl.error`` with ``exit_code=1``.

    Args:
        name (str): The name of the dbgpt
        repo (str | None): The name of the repo to search; all repos when
            omitted
        with_update (bool): Whether to update the repo before installing
    """
    found = check_with_retry(name, repo, with_update=with_update, is_first=True)
    if not found:
        cl.error(f"The specified dbgpt '{name}' does not exist.", exit_code=1)
    source_repo, package_path = found
    copy_and_install(source_repo, name, package_path)
def uninstall(name: str):
    """Uninstall the specified dbgpt.

    Runs ``pip uninstall`` for the package and then deletes its directory
    below ``INSTALL_DIR``. A package that is not installed is reported through
    ``cl.error`` with ``exit_code=1``.

    Args:
        name (str): The name of the dbgpt

    Raises:
        subprocess.CalledProcessError: If ``pip uninstall`` fails.
    """
    install_path = INSTALL_DIR / name
    if not install_path.exists():
        cl.error(f"The dbgpt '{name}' has not been installed yet.", exit_code=1)
    # Announce the work before doing it rather than after it has finished.
    cl.info(f"Uninstalling dbgpt '{name}'...")
    # Run pip inside the package directory via ``cwd`` instead of os.chdir():
    # the directory is deleted right below, and the current process must not
    # be left with a working directory that no longer exists.
    subprocess.run(["pip", "uninstall", name, "-y"], check=True, cwd=install_path)
    shutil.rmtree(install_path)
def inner_uninstall(name: str):
    """Uninstall the specified dbgpt, reporting through the module logger.

    Unlike ``uninstall`` this variant only logs a warning and returns when the
    package is not installed, and it does not raise when ``pip uninstall``
    exits with an error; pip's output is written to the log.

    Args:
        name (str): The name of the dbgpt
    """
    install_path = INSTALL_DIR / name
    if not install_path.exists():
        logger.warning(f"The dbgpt '{name}' has not been installed yet.")
        return
    logger.info(f"Uninstalling dbgpt '{name}'...")
    # Run pip inside the package directory via ``cwd`` instead of os.chdir():
    # the directory is deleted right below, and the current process must not
    # be left with a working directory that no longer exists.
    result = subprocess.run(
        ["pip", "uninstall", name, "-y"],
        cwd=install_path,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )
    logger.info("%s,%s", result.stdout, result.stderr)
    shutil.rmtree(install_path)
def inner_copy_and_install(repo: str, name: str, package_path: Path):
    """Copy a dbgpt package into ``INSTALL_DIR``, build it and pip-install it.

    This variant reports through the module logger and raises exceptions
    instead of using the console logger. The package is built with
    ``poetry build`` and the first wheel found in ``dist/`` is installed with
    ``pip install``. If anything goes wrong after the copy, the copied
    directory is removed again so that a later attempt starts clean.

    Args:
        repo (str): The name of the repo the package comes from
        name (str): The name of the dbgpt
        package_path (Path): The package directory inside the repo

    Returns:
        ``True`` when the package was already installed (nothing is done);
        ``None`` after a fresh installation.

    Raises:
        ValueError: If ``package_path`` does not exist, no wheel was produced
            by the build, or ``pip install`` failed.
    """
    if not package_path.exists():
        raise ValueError(
            f"The specified dbgpt '{name}' does not exist in the {repo} tap."
        )
    install_path = INSTALL_DIR / name
    if install_path.exists():
        logger.info(
            f"The dbgpt '{name}' has already been installed"
            f"({_print_path(install_path)})."
        )
        return True

    installed = False
    try:
        shutil.copytree(package_path, install_path)
        logger.info(f"Installing dbgpts '{name}' from {repo}...")
        # Run the tools inside the package directory via ``cwd`` rather than
        # os.chdir(): the directory may be deleted by the cleanup below, and
        # the working directory of this process must stay valid.
        build = subprocess.run(
            ["poetry", "build"],
            cwd=install_path,
            capture_output=True,
            text=True,
            errors="replace",
            check=False,
        )
        logger.info("%s,%s", build.stdout, build.stderr)
        wheel_files = list(install_path.glob("dist/*.whl"))
        if not wheel_files:
            logger.error(
                "No wheel file found after building the package.",
            )
            raise ValueError("No wheel file found after building the package.")
        # Install the wheel file using pip
        wheel_file = wheel_files[0]
        logger.info(
            f"Installing dbgpts '{name}' wheel file {_print_path(wheel_file)}..."
        )
        pip_install = subprocess.run(
            ["pip", "install", str(wheel_file)],
            cwd=install_path,
            capture_output=True,
            text=True,
            errors="replace",
            check=False,
        )
        logger.info("%s,%s", pip_install.stdout, pip_install.stderr)
        if pip_install.returncode != 0:
            # Do not record the package as installed when pip rejected it.
            raise ValueError(
                f"Failed to install the wheel file of dbgpts '{name}'."
            )
        _write_install_metadata(name, repo, install_path)
        logger.info(f"Installed dbgpts at {_print_path(install_path)}.")
        logger.info(f"dbgpts '{name}' installed successfully.")
        installed = True
    finally:
        # Runs for every kind of interruption, not only ``Exception``.
        if not installed and install_path.exists():
            shutil.rmtree(install_path)
def copy_and_install(repo: str, name: str, package_path: Path):
    """Copy a dbgpt package into ``INSTALL_DIR``, build it and pip-install it.

    Problems found before the copy (missing package, already installed) and a
    build that produces no wheel are reported through ``cl.error`` with
    ``exit_code=1``. The package is built with ``poetry build`` and the first
    wheel found in ``dist/`` is installed with ``pip install``. If the
    installation is interrupted after the copy, for whatever reason, the
    copied directory is removed again so that a later attempt is not rejected
    as "already installed".

    Args:
        repo (str): The name of the repo the package comes from
        name (str): The name of the dbgpt
        package_path (Path): The package directory inside the repo

    Raises:
        subprocess.CalledProcessError: If ``poetry build`` or ``pip install``
            exits with an error.
    """
    if not package_path.exists():
        cl.error(
            f"The specified dbgpt '{name}' does not exist in the {repo} tap.",
            exit_code=1,
        )
    install_path = INSTALL_DIR / name
    if install_path.exists():
        cl.error(
            f"The dbgpt '{name}' has already been installed"
            f"({_print_path(install_path)}).",
            exit_code=1,
        )

    installed = False
    try:
        shutil.copytree(package_path, install_path)
        cl.info(f"Installing dbgpts '{name}' from {repo}...")
        # Run the tools inside the package directory via ``cwd`` rather than
        # os.chdir(): the directory may be deleted by the cleanup below, and
        # the working directory of this process must stay valid.
        subprocess.run(["poetry", "build"], check=True, cwd=install_path)
        wheel_files = list(install_path.glob("dist/*.whl"))
        if not wheel_files:
            cl.error("No wheel file found after building the package.", exit_code=1)
        # Install the wheel file using pip
        wheel_file = wheel_files[0]
        cl.info(f"Installing dbgpts '{name}' wheel file {_print_path(wheel_file)}...")
        subprocess.run(
            ["pip", "install", str(wheel_file)], check=True, cwd=install_path
        )
        _write_install_metadata(name, repo, install_path)
        cl.success(f"Installed dbgpts at {_print_path(install_path)}.")
        cl.success(f"dbgpts '{name}' installed successfully.")
        installed = True
    finally:
        # ``finally`` (unlike ``except Exception``) also runs when the
        # installation is aborted by SystemExit or KeyboardInterrupt.
        if not installed and install_path.exists():
            shutil.rmtree(install_path)
def _write_install_metadata(name: str, repo: str, install_path: Path):
    """Write the install metadata file of an installed dbgpt.

    The file ``INSTALL_METADATA_FILE`` inside ``install_path`` receives a TOML
    document with the keys ``name`` and ``repo``.

    Args:
        name (str): The name of the dbgpt
        repo (str): The name of the repo the dbgpt was installed from
        install_path (Path): The directory the dbgpt was installed into
    """
    import tomlkit

    install_metadata = {
        "name": name,
        "repo": repo,
    }
    # TOML documents must be UTF-8; do not depend on the platform's default
    # text encoding.
    with open(install_path / INSTALL_METADATA_FILE, "w", encoding="utf-8") as f:
        tomlkit.dump(install_metadata, f)
def check_with_retry(
    name: str,
    spec_repo: str | None = None,
    with_update: bool = False,
    is_first: bool = False,
) -> Tuple[str, Path] | None:
    """Search the repos for the dbgpt called ``name``.

    A match is a directory ``<repo>/<package>/<name>`` (for a package in
    ``DEFAULT_PACKAGES``) that contains ``DBGPTS_METADATA_FILE``. When nothing
    is found on the first pass (``is_first=True``), the search is run once
    more with ``is_first=False``.

    Args:
        name (str): The name of the dbgpt
        spec_repo (str | None): Restrict the search to this repo, if given
        with_update (bool): Whether to update the repos before searching
            (only done on the first pass)
        is_first (bool): Whether this is the first pass

    Returns:
        Tuple[str, Path] | None: The repo name and the path of the dbgpt, or
            ``None`` when it was not found.
    """
    candidates = _list_repos_details()
    if spec_repo:
        candidates = [entry for entry in candidates if entry[0] == spec_repo]
        if not candidates:
            cl.error(f"The specified repo '{spec_repo}' does not exist.", exit_code=1)

    if is_first and with_update:
        for entry in candidates:
            update_repo(entry[0])

    for repo_name, repo_dir in candidates:
        repo_root = Path(repo_dir)
        for package in DEFAULT_PACKAGES:
            dbgpt_path = repo_root / package / name
            if not (dbgpt_path.exists() and dbgpt_path.is_dir()):
                continue
            if (dbgpt_path / DBGPTS_METADATA_FILE).exists():
                return repo_name, dbgpt_path

    if not is_first:
        return None
    # Nothing found on the first pass: look once more.
    return check_with_retry(name, spec_repo, with_update=with_update, is_first=False)
def list_repo_apps(repo: str | None = None, with_update: bool = True):
    """Print a table of the dbgpts that are available in the repos.

    An entry is listed for every ``<repo>/<package>/<app>`` (for a package in
    ``DEFAULT_PACKAGES``) that contains ``DBGPTS_METADATA_FILE``.

    Args:
        repo (str | None): Restrict the listing to this repo, if given
        with_update (bool): Whether to update the repos before listing
    """
    repos = _list_repos_details()
    if repo:
        repos = [item for item in repos if item[0] == repo]
        if not repos:
            cl.error(f"The specified repo '{repo}' does not exist.", exit_code=1)
    if with_update:
        for repo_name, _repo_dir in repos:
            update_repo(repo_name)

    table = Table(title=_("dbgpts In All Repos"))
    table.add_column(_("Repository"), justify="right", style="cyan", no_wrap=True)
    table.add_column(_("Type"), style="magenta")
    table.add_column(_("Name"), justify="right", style="green")

    data = []
    for repo_name, repo_dir in repos:
        repo_path = Path(repo_dir)
        for package in DEFAULT_PACKAGES:
            package_path = repo_path / package
            # Skip package directories that do not exist in this repo instead
            # of failing in os.listdir().
            if not package_path.is_dir():
                continue
            for app in os.listdir(package_path):
                if (package_path / app / DBGPTS_METADATA_FILE).exists():
                    data.append((repo_name, package, app))
    # Sort by repo name, package name, and app name
    data.sort(key=lambda x: (x[0], x[1], x[2]))
    for repo_name, package, app in data:
        table.add_row(repo_name, package, app)
    cl.print(table)
def update_repo_inner(repo: str):
    """Bring the specified repo up to date with ``git pull``.

    This variant reports through the module logger and raises instead of
    using the console logger. Git runs with the repo directory as its working
    directory; the working directory of the current process is not changed.
    A repo that is missing on disk is first added from ``DEFAULT_REPO_MAP``
    when it is listed there. A repo directory without a ``.git`` entry is left
    alone. A failing ``git pull`` is logged as a warning and does not raise.

    Args:
        repo (str): The name of the repo

    Raises:
        ValueError: If the repo does not exist and could not be added.
    """
    logger.info(f"Updating repo '{repo}'...")
    repo_path = os.path.join(DBGPTS_REPO_HOME, repo)
    if not os.path.exists(repo_path):
        if repo not in DEFAULT_REPO_MAP:
            raise ValueError(f"The repo '{repo}' does not exist.")
        add_repo(repo, DEFAULT_REPO_MAP[repo])
        if not os.path.exists(repo_path):
            raise ValueError(f"The repo '{repo}' does not exist.")

    if not os.path.exists(os.path.join(repo_path, ".git")):
        logger.info(f"Repo '{repo}' is not a git repository.")
        return

    result = subprocess.run(
        ["git", "pull"],
        cwd=repo_path,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )
    logger.info("%s,%s", result.stdout, result.stderr)
    if result.returncode != 0:
        logger.warning(
            "git pull for repo '%s' exited with code %s.", repo, result.returncode
        )
def list_dbgpts(
    spec_repo: str | None = None, with_update: bool = True
) -> List[Tuple[str, str, str, Path]]:
    """Scan the repos for available dbgpts.

    An entry is returned for every ``<repo>/<package>/<app>`` (for a package
    in ``DEFAULT_PACKAGES``) that contains ``DBGPTS_METADATA_FILE``.

    Args:
        spec_repo (str | None): Restrict the scan to this repo, if given
        with_update (bool): Whether to update the repos before scanning

    Returns:
        List[Tuple[str, str, str, Path]]: ``(repo name, package, app name,
            app directory)`` tuples.

    Raises:
        ValueError: If ``spec_repo`` is given but no such repo is known.
    """
    repos = _list_repos_details()
    if spec_repo:
        repos = [item for item in repos if item[0] == spec_repo]
        if not repos:
            raise ValueError(f"The specified repo '{spec_repo}' does not exist.")
    if with_update:
        for repo_name, _repo_dir in repos:
            update_repo_inner(repo_name)

    data = []
    for repo_name, repo_dir in repos:
        repo_path = Path(repo_dir)
        for package in DEFAULT_PACKAGES:
            package_path = repo_path / package
            # Skip package directories that do not exist in this repo instead
            # of failing in os.listdir().
            if not package_path.is_dir():
                continue
            for app in os.listdir(package_path):
                gpts_path = package_path / app
                if (gpts_path / DBGPTS_METADATA_FILE).exists():
                    data.append((repo_name, package, app, gpts_path))
    return data
def list_installed_apps():
    """Print a table of the dbgpts loaded from ``INSTALL_DIR``.

    Rows are ordered by package name, package type and repo.
    """
    table = Table(title=_("Installed dbgpts"))
    table.add_column(_("Name"), justify="right", style="cyan", no_wrap=True)
    table.add_column(_("Type"), style="blue")
    table.add_column(_("Repository"), style="magenta")
    table.add_column(_("Path"), justify="right", style="green")

    installed = _load_package_from_path(INSTALL_DIR)
    installed.sort(key=lambda pkg: (pkg.package, pkg.package_type, pkg.repo))
    for pkg in installed:
        table.add_row(
            pkg.package, pkg.package_type, pkg.repo, _print_path(pkg.root)
        )
    cl.print(table)