DB-GPT/dbgpt/util/dbgpts/loader.py
明天 0bc478b7b5
Feat/note book (#2134)
Co-authored-by: sunshinesmilelk <41573506+sunshinesmilelk@users.noreply.github.com>
Co-authored-by: csunny <cfqsunny@163.com>
2024-11-21 20:33:54 +08:00

508 lines
18 KiB
Python

import inspect
import logging
import os
import sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, cast
import schedule
import tomlkit
from dbgpt._private.pydantic import BaseModel, ConfigDict, Field, model_validator
from dbgpt.component import BaseComponent, SystemApp
from dbgpt.core.awel import DAG
from dbgpt.core.awel.flow.flow_factory import FlowPanel
from dbgpt.util.dbgpts.base import (
DBGPTS_METADATA_FILE,
INSTALL_DIR,
INSTALL_METADATA_FILE,
)
logger = logging.getLogger(__name__)
T = TypeVar("T")
class BasePackage(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str = Field(..., description="The name of the package")
label: str = Field(..., description="The label of the package")
package_type: str = Field(..., description="The type of the package")
version: str = Field(..., description="The version of the package")
description: str = Field(..., description="The description of the package")
path: str = Field(..., description="The path of the package")
authors: List[str] = Field(
default_factory=list, description="The authors of the package"
)
definition_type: str = Field(
default="python", description="The type of the package"
)
definition_file: Optional[str] = Field(
default=None, description="The definition " "file of the package"
)
root: str = Field(..., description="The root of the package")
repo: str = Field(..., description="The repository of the package")
package: str = Field(..., description="The package name(like name in pypi)")
@classmethod
def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
return cls(**values)
@model_validator(mode="before")
@classmethod
def pre_fill(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Pre-fill the definition_file"""
if not isinstance(values, dict):
return values
import importlib.resources as pkg_resources
name = values.get("name")
root = values.get("root")
if not name:
raise ValueError("The name is required")
if not root:
raise ValueError("The root is required")
if root not in sys.path:
sys.path.append(root)
with pkg_resources.path(name, "__init__.py") as path:
# Read the file
values["path"] = os.path.dirname(os.path.abspath(path))
return values
def abs_definition_file(self) -> str:
return str(Path(self.path) / self.definition_file)
@classmethod
def load_module_class(
cls,
values: Dict[str, Any],
expected_cls: Type[T],
predicates: Optional[List[Callable[..., bool]]] = None,
) -> Tuple[List[Type[T]], List[Any], List[Any]]:
import importlib.resources as pkg_resources
from dbgpt.core.awel.dag.loader import _load_modules_from_file
name = values.get("name")
root = values.get("root")
if not name:
raise ValueError("The name is required")
if not root:
raise ValueError("The root is required")
if root not in sys.path:
sys.path.append(root)
try:
with pkg_resources.path(name, "__init__.py") as path:
mods = _load_modules_from_file(str(path), name, show_log=False)
all_cls = [_get_classes_from_module(m) for m in mods]
all_predicate_results = []
for m in mods:
all_predicate_results.extend(_get_from_module(m, predicates))
module_cls = []
for list_cls in all_cls:
for c in list_cls:
if issubclass(c, expected_cls):
module_cls.append(c)
return module_cls, all_predicate_results, mods
except Exception as e:
logger.warning(f"load_module_class error!{str(e)}", e)
raise e
class FlowPackage(BasePackage):
package_type: str = "flow"
@classmethod
def build_from(
cls, values: Dict[str, Any], ext_dict: Dict[str, Any]
) -> "FlowPackage":
if values["definition_type"] == "json":
return FlowJsonPackage.build_from(values, ext_dict)
return FlowPythonPackage.build_from(values, ext_dict)
class FlowPythonPackage(FlowPackage):
dag: DAG = Field(..., description="The DAG of the package")
@classmethod
def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
from dbgpt.core.awel.dag.loader import _process_modules
_, _, mods = cls.load_module_class(values, DAG)
dags = _process_modules(mods, show_log=False)
if not dags:
raise ValueError("No DAGs found in the package")
if len(dags) > 1:
raise ValueError("Only support one DAG in the package")
values["dag"] = dags[0]
return cls(**values)
class FlowJsonPackage(FlowPackage):
@classmethod
def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
if "json_config" not in ext_dict:
raise ValueError("The json_config is required")
if "file_path" not in ext_dict["json_config"]:
raise ValueError("The file_path is required")
values["definition_file"] = ext_dict["json_config"]["file_path"]
return cls(**values)
def read_definition_json(self) -> Dict[str, Any]:
import json
with open(self.abs_definition_file(), "r", encoding="utf-8") as f:
return json.loads(f.read())
class OperatorPackage(BasePackage):
package_type: str = "operator"
operators: List[type] = Field(
default_factory=list, description="The operators of the package"
)
@classmethod
def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
from dbgpt.core.awel import BaseOperator
values["operators"], _, _ = cls.load_module_class(values, BaseOperator)
return cls(**values)
class AgentPackage(BasePackage):
package_type: str = "agent"
agents: List[type] = Field(
default_factory=list, description="The agents of the package"
)
@classmethod
def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
from dbgpt.agent import ConversableAgent
values["agents"], _, _ = cls.load_module_class(values, ConversableAgent)
return cls(**values)
class ResourcePackage(BasePackage):
package_type: str = "resource"
resources: List[type] = Field(
default_factory=list, description="The resources of the package"
)
resource_instances: List[Any] = Field(
default_factory=list, description="The resource instances of the package"
)
@classmethod
def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
from dbgpt.agent.resource import Resource
from dbgpt.agent.resource.tool.pack import _is_function_tool
def _predicate(obj):
if not obj:
return False
elif _is_function_tool(obj):
return True
elif isinstance(obj, Resource):
return True
elif isinstance(obj, type) and issubclass(obj, Resource):
return True
else:
return False
_, predicted_cls, _ = cls.load_module_class(values, Resource, [_predicate])
resource_instances = []
resources = []
for o in predicted_cls:
if _is_function_tool(o) or isinstance(o, Resource):
resource_instances.append(o)
elif isinstance(o, type) and issubclass(o, Resource):
resources.append(o)
values["resource_instances"] = resource_instances
values["resources"] = resources
return cls(**values)
class InstalledPackage(BaseModel):
name: str = Field(..., description="The name of the package")
repo: str = Field(..., description="The repository of the package")
root: str = Field(..., description="The root of the package")
package: str = Field(..., description="The package name(like name in pypi)")
def _get_classes_from_module(module):
classes = [
obj
for name, obj in inspect.getmembers(module, inspect.isclass)
if obj.__module__ == module.__name__
]
return classes
def _get_from_module(module, predicates: Optional[List[str]] = None):
if not predicates:
return []
results = []
for predicate in predicates:
for name, obj in inspect.getmembers(module, predicate):
if obj.__module__ == module.__name__:
results.append(obj)
return results
def parse_package_metadata(package: InstalledPackage) -> BasePackage:
with open(
Path(package.root) / DBGPTS_METADATA_FILE, mode="r+", encoding="utf-8"
) as f:
metadata = tomlkit.loads(f.read())
ext_metadata = {}
pkg_dict = {}
for key, value in metadata.items():
if key == "flow":
pkg_dict = {k: v for k, v in value.items()}
pkg_dict["package_type"] = "flow"
elif key == "operator":
pkg_dict = {k: v for k, v in value.items()}
pkg_dict["package_type"] = "operator"
elif key == "agent":
pkg_dict = {k: v for k, v in value.items()}
pkg_dict["package_type"] = "agent"
elif key == "resource":
pkg_dict = {k: v for k, v in value.items()}
pkg_dict["package_type"] = "resource"
else:
ext_metadata[key] = value
pkg_dict["root"] = package.root
pkg_dict["repo"] = package.repo
pkg_dict["package"] = package.package
if pkg_dict["package_type"] == "flow":
return FlowPackage.build_from(pkg_dict, ext_metadata)
elif pkg_dict["package_type"] == "operator":
return OperatorPackage.build_from(pkg_dict, ext_metadata)
elif pkg_dict["package_type"] == "agent":
return AgentPackage.build_from(pkg_dict, ext_metadata)
elif pkg_dict["package_type"] == "resource":
return ResourcePackage.build_from(pkg_dict, ext_metadata)
else:
raise ValueError(
f"Unsupported package package_type: {pkg_dict['package_type']}"
)
def _load_installed_package(path: str) -> List[InstalledPackage]:
packages = []
for package in os.listdir(path):
full_path = Path(path) / package
install_metadata_file = full_path / INSTALL_METADATA_FILE
dbgpts_metadata_file = full_path / DBGPTS_METADATA_FILE
if (
full_path.is_dir()
and install_metadata_file.exists()
and dbgpts_metadata_file.exists()
):
with open(install_metadata_file) as f:
metadata = tomlkit.loads(f.read())
name = metadata["name"]
repo = metadata["repo"]
packages.append(
InstalledPackage(
name=name, repo=repo, root=str(full_path), package=package
)
)
return packages
def _load_package_from_path(path: str):
"""Load the package from the specified path"""
packages = _load_installed_package(path)
parsed_packages = []
for package in packages:
try:
parsed_packages.append(parse_package_metadata(package))
except Exception as e:
logger.warning(f"Load package failed!{str(e)}", e)
return parsed_packages
def _load_flow_package_from_path(
name: str, path: str = INSTALL_DIR, filter_by_name: bool = True
) -> FlowPackage:
raw_packages = _load_installed_package(path)
new_name = name.replace("_", "-")
if filter_by_name:
packages = [p for p in raw_packages if p.package == name or p.name == name]
if not packages:
packages = [
p for p in raw_packages if p.package == new_name or p.name == new_name
]
else:
packages = raw_packages
if not packages:
raise ValueError(f"Can't find the package {name} or {new_name}")
flow_package = parse_package_metadata(packages[0])
if flow_package.package_type != "flow":
raise ValueError(f"Unsupported package type: {flow_package.package_type}")
return cast(FlowPackage, flow_package)
def _load_flow_package_from_zip_path(zip_path: str) -> FlowPanel:
import tempfile
import zipfile
with tempfile.TemporaryDirectory() as temp_dir:
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(temp_dir)
package_names = os.listdir(temp_dir)
if not package_names:
raise ValueError("No package found in the zip file")
if len(package_names) > 1:
raise ValueError("Only support one package in the zip file")
package_name = package_names[0]
with open(
Path(temp_dir) / package_name / INSTALL_METADATA_FILE, mode="w+"
) as f:
# Write the metadata
import tomlkit
install_metadata = {
"name": package_name,
"repo": "local/dbgpts",
}
tomlkit.dump(install_metadata, f)
package = _load_flow_package_from_path("", path=temp_dir, filter_by_name=False)
return _flow_package_to_flow_panel(package)
def _flow_package_to_flow_panel(package: FlowPackage) -> FlowPanel:
dict_value = {
"name": package.name,
"label": package.label,
"version": package.version,
"editable": False,
"description": package.description,
"source": package.repo,
"define_type": "json",
"authors": package.authors,
}
if isinstance(package, FlowJsonPackage):
dict_value["flow_data"] = package.read_definition_json()
elif isinstance(package, FlowPythonPackage):
dict_value["flow_data"] = {
"nodes": [],
"edges": [],
"viewport": {
"x": 213,
"y": 269,
"zoom": 0,
},
}
dict_value["flow_dag"] = package.dag
dict_value["define_type"] = "python"
else:
raise ValueError(f"Unsupported package type: {package}")
return FlowPanel(**dict_value)
class DBGPTsLoader(BaseComponent):
"""The loader of the dbgpts packages"""
name: str = "dbgpt_dbgpts_loader"
def __init__(
self,
system_app: Optional[SystemApp] = None,
install_dir: Optional[str] = None,
load_dbgpts_interval: int = 10,
):
"""Initialize the DBGPTsLoader."""
self._system_app = None
self._install_dir = install_dir or INSTALL_DIR
self._packages: Dict[str, BasePackage] = {}
self._load_dbgpts_interval = load_dbgpts_interval
super().__init__(system_app)
def init_app(self, system_app: SystemApp):
"""Initialize the DBGPTsLoader."""
self._system_app = system_app
def before_start(self):
"""Execute after the application starts."""
self.load_package(is_first=True)
schedule.every(self._load_dbgpts_interval).seconds.do(self.load_package)
def load_package(self, is_first: bool = False) -> None:
"""Load the package by name."""
try:
packages = _load_package_from_path(self._install_dir)
if is_first:
logger.info(
f"Found {len(packages)} dbgpts packages from {self._install_dir}"
)
for package in packages:
self._packages[package.name] = package
self._register_packages(package)
except Exception as e:
logger.warning(f"Load dbgpts package error: {e}", e)
def get_flow_package(self, flow_name: str) -> Optional[BasePackage]:
try:
packages = _load_package_from_path(self._install_dir)
for package in packages:
if package.name == flow_name:
return package
return None
except Exception as e:
logger.warning(f"Get flow package error: {str(e)}", e)
return None
def get_flows(self) -> List[FlowPanel]:
"""Get the flows.
Returns:
List[FlowPanel]: The list of the flows
"""
panels = []
for package in self._packages.values():
if package.package_type != "flow":
continue
package = cast(FlowPackage, package)
flow_panel = _flow_package_to_flow_panel(package)
panels.append(flow_panel)
return panels
def _register_packages(self, package: BasePackage):
if package.package_type == "agent":
from dbgpt.agent import ConversableAgent, get_agent_manager
agent_manager = get_agent_manager(self._system_app)
pkg = cast(AgentPackage, package)
for agent_cls in pkg.agents:
if issubclass(agent_cls, ConversableAgent):
try:
agent_manager.register_agent(agent_cls, ignore_duplicate=True)
except ValueError as e:
logger.warning(f"Register agent {agent_cls} error: {e}")
elif package.package_type == "resource":
from dbgpt.agent.resource import Resource
from dbgpt.agent.resource.manage import get_resource_manager
pkg = cast(ResourcePackage, package)
rm = get_resource_manager(self._system_app)
for inst in pkg.resource_instances:
try:
rm.register_resource(resource_instance=inst, ignore_duplicate=True)
except ValueError as e:
logger.warning(f"Register resource {inst} error: {e}")
for res in pkg.resources:
try:
if issubclass(res, Resource):
rm.register_resource(res, ignore_duplicate=True)
except ValueError as e:
logger.warning(f"Register resource {res} error: {e}")