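"""Loader for installed dbgpts packages.

This module parses the install metadata and dbgpts metadata of packages found under
the install directory, builds them into flow, operator, agent, or resource packages,
and provides the ``DBGPTsLoader`` component that registers them with the running
system.
"""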
import inspect
import logging
import os
import sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, cast

import schedule
import tomlkit

from dbgpt._private.pydantic import BaseModel, ConfigDict, Field, model_validator
from dbgpt.component import BaseComponent, SystemApp
from dbgpt.core.awel import DAG
from dbgpt.core.awel.flow.flow_factory import FlowPanel
from dbgpt.util.dbgpts.base import (
    DBGPTS_METADATA_FILE,
    INSTALL_DIR,
    INSTALL_METADATA_FILE,
)

logger = logging.getLogger(__name__)
T = TypeVar("T")


class BasePackage(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str = Field(..., description="The name of the package")
    label: str = Field(..., description="The label of the package")
    package_type: str = Field(..., description="The type of the package")
    version: str = Field(..., description="The version of the package")
    description: str = Field(..., description="The description of the package")
    path: str = Field(..., description="The path of the package")
    authors: List[str] = Field(
        default_factory=list, description="The authors of the package"
    )
    definition_type: str = Field(
        default="python", description="The definition type of the package"
    )
    definition_file: Optional[str] = Field(
        default=None, description="The definition file of the package"
    )
    root: str = Field(..., description="The root of the package")
    repo: str = Field(..., description="The repository of the package")
    package: str = Field(..., description="The package name (like the name on PyPI)")

    @classmethod
    def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
        return cls(**values)

    @model_validator(mode="before")
    @classmethod
    def pre_fill(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Pre-fill the definition_file."""
        if not isinstance(values, dict):
            return values
        import importlib.resources as pkg_resources

        name = values.get("name")
        root = values.get("root")
        if not name:
            raise ValueError("The name is required")
        if not root:
            raise ValueError("The root is required")
        if root not in sys.path:
            sys.path.append(root)
        with pkg_resources.path(name, "__init__.py") as path:
            # Resolve the package directory from the location of its __init__.py
            values["path"] = os.path.dirname(os.path.abspath(path))
        return values

    def abs_definition_file(self) -> str:
        return str(Path(self.path) / self.definition_file)

    @classmethod
    def load_module_class(
        cls,
        values: Dict[str, Any],
        expected_cls: Type[T],
        predicates: Optional[List[Callable[..., bool]]] = None,
    ) -> Tuple[List[Type[T]], List[Any], List[Any]]:
        import importlib.resources as pkg_resources

        from dbgpt.core.awel.dag.loader import _load_modules_from_file

        name = values.get("name")
        root = values.get("root")
        if not name:
            raise ValueError("The name is required")
        if not root:
            raise ValueError("The root is required")
        if root not in sys.path:
            sys.path.append(root)
        try:
            with pkg_resources.path(name, "__init__.py") as path:
                mods = _load_modules_from_file(str(path), name, show_log=False)
                all_cls = [_get_classes_from_module(m) for m in mods]
                all_predicate_results = []
                for m in mods:
                    all_predicate_results.extend(_get_from_module(m, predicates))
                module_cls = []
                for list_cls in all_cls:
                    for c in list_cls:
                        if issubclass(c, expected_cls):
                            module_cls.append(c)
                return module_cls, all_predicate_results, mods
        except Exception as e:
            logger.warning(f"load_module_class error: {str(e)}", exc_info=True)
            raise e


class FlowPackage(BasePackage):
    package_type: str = "flow"

    @classmethod
    def build_from(
        cls, values: Dict[str, Any], ext_dict: Dict[str, Any]
    ) -> "FlowPackage":
        if values["definition_type"] == "json":
            return FlowJsonPackage.build_from(values, ext_dict)
        return FlowPythonPackage.build_from(values, ext_dict)


class FlowPythonPackage(FlowPackage):
    dag: DAG = Field(..., description="The DAG of the package")

    @classmethod
    def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
        from dbgpt.core.awel.dag.loader import _process_modules

        _, _, mods = cls.load_module_class(values, DAG)

        dags = _process_modules(mods, show_log=False)
        if not dags:
            raise ValueError("No DAGs found in the package")
        if len(dags) > 1:
            raise ValueError("Only one DAG per package is supported")
        values["dag"] = dags[0]
        return cls(**values)


class FlowJsonPackage(FlowPackage):
    @classmethod
    def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
        if "json_config" not in ext_dict:
            raise ValueError("The json_config is required")
        if "file_path" not in ext_dict["json_config"]:
            raise ValueError("The file_path is required")
        values["definition_file"] = ext_dict["json_config"]["file_path"]
        return cls(**values)

    def read_definition_json(self) -> Dict[str, Any]:
        import json

        with open(self.abs_definition_file(), "r", encoding="utf-8") as f:
            return json.loads(f.read())


class OperatorPackage(BasePackage):
    package_type: str = "operator"

    operators: List[type] = Field(
        default_factory=list, description="The operators of the package"
    )

    @classmethod
    def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
        from dbgpt.core.awel import BaseOperator

        values["operators"], _, _ = cls.load_module_class(values, BaseOperator)
        return cls(**values)


class AgentPackage(BasePackage):
    package_type: str = "agent"

    agents: List[type] = Field(
        default_factory=list, description="The agents of the package"
    )

    @classmethod
    def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
        from dbgpt.agent import ConversableAgent

        values["agents"], _, _ = cls.load_module_class(values, ConversableAgent)
        return cls(**values)


class ResourcePackage(BasePackage):
    package_type: str = "resource"

    resources: List[type] = Field(
        default_factory=list, description="The resources of the package"
    )
    resource_instances: List[Any] = Field(
        default_factory=list, description="The resource instances of the package"
    )

    @classmethod
    def build_from(cls, values: Dict[str, Any], ext_dict: Dict[str, Any]):
        from dbgpt.agent.resource import Resource
        from dbgpt.agent.resource.tool.pack import _is_function_tool

        def _predicate(obj):
            if not obj:
                return False
            elif _is_function_tool(obj):
                return True
            elif isinstance(obj, Resource):
                return True
            elif isinstance(obj, type) and issubclass(obj, Resource):
                return True
            else:
                return False

        _, predicted_cls, _ = cls.load_module_class(values, Resource, [_predicate])
        resource_instances = []
        resources = []
        for o in predicted_cls:
            if _is_function_tool(o) or isinstance(o, Resource):
                resource_instances.append(o)
            elif isinstance(o, type) and issubclass(o, Resource):
                resources.append(o)
        values["resource_instances"] = resource_instances
        values["resources"] = resources
        return cls(**values)


class InstalledPackage(BaseModel):
    name: str = Field(..., description="The name of the package")
    repo: str = Field(..., description="The repository of the package")
    root: str = Field(..., description="The root of the package")
    package: str = Field(..., description="The package name (like the name on PyPI)")


def _get_classes_from_module(module):
    classes = [
        obj
        for name, obj in inspect.getmembers(module, inspect.isclass)
        if obj.__module__ == module.__name__
    ]
    return classes


def _get_from_module(module, predicates: Optional[List[Callable[..., bool]]] = None):
    if not predicates:
        return []
    results = []
    for predicate in predicates:
        for name, obj in inspect.getmembers(module, predicate):
            if obj.__module__ == module.__name__:
                results.append(obj)
    return results


def parse_package_metadata(package: InstalledPackage) -> BasePackage:
    with open(
        Path(package.root) / DBGPTS_METADATA_FILE, mode="r+", encoding="utf-8"
    ) as f:
        metadata = tomlkit.loads(f.read())
    ext_metadata = {}
    pkg_dict = {}
    for key, value in metadata.items():
        if key == "flow":
            pkg_dict = {k: v for k, v in value.items()}
            pkg_dict["package_type"] = "flow"
        elif key == "operator":
            pkg_dict = {k: v for k, v in value.items()}
            pkg_dict["package_type"] = "operator"
        elif key == "agent":
            pkg_dict = {k: v for k, v in value.items()}
            pkg_dict["package_type"] = "agent"
        elif key == "resource":
            pkg_dict = {k: v for k, v in value.items()}
            pkg_dict["package_type"] = "resource"
        else:
            ext_metadata[key] = value
    pkg_dict["root"] = package.root
    pkg_dict["repo"] = package.repo
    pkg_dict["package"] = package.package
    if pkg_dict["package_type"] == "flow":
        return FlowPackage.build_from(pkg_dict, ext_metadata)
    elif pkg_dict["package_type"] == "operator":
        return OperatorPackage.build_from(pkg_dict, ext_metadata)
    elif pkg_dict["package_type"] == "agent":
        return AgentPackage.build_from(pkg_dict, ext_metadata)
    elif pkg_dict["package_type"] == "resource":
        return ResourcePackage.build_from(pkg_dict, ext_metadata)
    else:
        raise ValueError(
            f"Unsupported package type: {pkg_dict['package_type']}"
        )
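

# For illustration only: a hypothetical dbgpts metadata file (DBGPTS_METADATA_FILE)
# accepted by the parser above might look like the sketch below. The field names
# follow the ``BasePackage`` model; any other top-level section (for example the
# ``json_config`` section consumed by ``FlowJsonPackage``) is passed through as
# ``ext_metadata``. Real packages may differ.
#
#   [flow]
#   name = "awel-flow-example"
#   label = "Example Flow"
#   version = "0.1.0"
#   description = "An example flow package"
#   authors = []
#   definition_type = "json"
#
#   [json_config]
#   file_path = "definition/flow_definition.json"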


def _load_installed_package(path: str) -> List[InstalledPackage]:
    packages = []
    for package in os.listdir(path):
        full_path = Path(path) / package
        install_metadata_file = full_path / INSTALL_METADATA_FILE
        dbgpts_metadata_file = full_path / DBGPTS_METADATA_FILE
        if (
            full_path.is_dir()
            and install_metadata_file.exists()
            and dbgpts_metadata_file.exists()
        ):
            with open(install_metadata_file) as f:
                metadata = tomlkit.loads(f.read())
                name = metadata["name"]
                repo = metadata["repo"]
                packages.append(
                    InstalledPackage(
                        name=name, repo=repo, root=str(full_path), package=package
                    )
                )
    return packages
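

# For illustration only: the install metadata file (INSTALL_METADATA_FILE) read above
# is a small TOML document. A minimal, hypothetical example consistent with the keys
# accessed here ("name" and "repo"):
#
#   name = "awel-flow-example"
#   repo = "local/dbgpts"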


def _load_package_from_path(path: str):
    """Load and parse the installed packages from the specified path."""
    packages = _load_installed_package(path)
    parsed_packages = []
    for package in packages:
        try:
            parsed_packages.append(parse_package_metadata(package))
        except Exception as e:
            logger.warning(f"Load package failed: {str(e)}", exc_info=True)

    return parsed_packages


def _load_flow_package_from_path(
    name: str, path: str = INSTALL_DIR, filter_by_name: bool = True
) -> FlowPackage:
    raw_packages = _load_installed_package(path)
    new_name = name.replace("_", "-")
    if filter_by_name:
        packages = [p for p in raw_packages if p.package == name or p.name == name]
        if not packages:
            packages = [
                p for p in raw_packages if p.package == new_name or p.name == new_name
            ]
    else:
        packages = raw_packages
    if not packages:
        raise ValueError(f"Can't find the package {name} or {new_name}")
    flow_package = parse_package_metadata(packages[0])
    if flow_package.package_type != "flow":
        raise ValueError(f"Unsupported package type: {flow_package.package_type}")
    return cast(FlowPackage, flow_package)


def _load_flow_package_from_zip_path(zip_path: str) -> FlowPanel:
    import tempfile
    import zipfile

    with tempfile.TemporaryDirectory() as temp_dir:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(temp_dir)
        package_names = os.listdir(temp_dir)
        if not package_names:
            raise ValueError("No package found in the zip file")
        if len(package_names) > 1:
            raise ValueError("Only one package per zip file is supported")
        package_name = package_names[0]
        with open(
            Path(temp_dir) / package_name / INSTALL_METADATA_FILE, mode="w+"
        ) as f:
            # Write the install metadata for the extracted package
            install_metadata = {
                "name": package_name,
                "repo": "local/dbgpts",
            }
            tomlkit.dump(install_metadata, f)

        package = _load_flow_package_from_path("", path=temp_dir, filter_by_name=False)
        return _flow_package_to_flow_panel(package)


def _flow_package_to_flow_panel(package: FlowPackage) -> FlowPanel:
    dict_value = {
        "name": package.name,
        "label": package.label,
        "version": package.version,
        "editable": False,
        "description": package.description,
        "source": package.repo,
        "define_type": "json",
        "authors": package.authors,
    }
    if isinstance(package, FlowJsonPackage):
        dict_value["flow_data"] = package.read_definition_json()
    elif isinstance(package, FlowPythonPackage):
        dict_value["flow_data"] = {
            "nodes": [],
            "edges": [],
            "viewport": {
                "x": 213,
                "y": 269,
                "zoom": 0,
            },
        }
        dict_value["flow_dag"] = package.dag
        dict_value["define_type"] = "python"
    else:
        raise ValueError(f"Unsupported package type: {package}")
    return FlowPanel(**dict_value)
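

# For illustration only: a hypothetical end-to-end use of the helpers above, turning
# an installed flow package into a ``FlowPanel`` (the package name is made up):
#
#   package = _load_flow_package_from_path("awel-flow-example")
#   panel = _flow_package_to_flow_panel(package)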


class DBGPTsLoader(BaseComponent):
    """The loader of the dbgpts packages."""

    name: str = "dbgpt_dbgpts_loader"

    def __init__(
        self,
        system_app: Optional[SystemApp] = None,
        install_dir: Optional[str] = None,
        load_dbgpts_interval: int = 10,
    ):
        """Initialize the DBGPTsLoader."""
        self._system_app = None
        self._install_dir = install_dir or INSTALL_DIR
        self._packages: Dict[str, BasePackage] = {}
        self._load_dbgpts_interval = load_dbgpts_interval
        super().__init__(system_app)

    def init_app(self, system_app: SystemApp):
        """Initialize the DBGPTsLoader with the system app."""
        self._system_app = system_app

    def before_start(self):
        """Execute before the application starts."""
        self.load_package(is_first=True)

        schedule.every(self._load_dbgpts_interval).seconds.do(self.load_package)
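    # Note: ``schedule.every(...).do(...)`` above only registers the reload job; the
    # surrounding application is expected to call ``schedule.run_pending()``
    # periodically for ``load_package`` to be re-invoked.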

    def load_package(self, is_first: bool = False) -> None:
        """Load the installed packages from the install directory."""
        try:
            packages = _load_package_from_path(self._install_dir)
            if is_first:
                logger.info(
                    f"Found {len(packages)} dbgpts packages from {self._install_dir}"
                )
            for package in packages:
                self._packages[package.name] = package
                self._register_packages(package)
        except Exception as e:
            logger.warning(f"Load dbgpts package error: {e}", exc_info=True)

    def get_flow_package(self, flow_name: str) -> Optional[BasePackage]:
        """Get an installed package by flow name."""
        try:
            packages = _load_package_from_path(self._install_dir)
            for package in packages:
                if package.name == flow_name:
                    return package
            return None
        except Exception as e:
            logger.warning(f"Get flow package error: {str(e)}", exc_info=True)
            return None

    def get_flows(self) -> List[FlowPanel]:
        """Get the flows.

        Returns:
            List[FlowPanel]: The list of the flows
        """
        panels = []
        for package in self._packages.values():
            if package.package_type != "flow":
                continue
            package = cast(FlowPackage, package)
            flow_panel = _flow_package_to_flow_panel(package)
            panels.append(flow_panel)
        return panels

    def _register_packages(self, package: BasePackage):
        """Register the agents and resources contained in the package."""
        if package.package_type == "agent":
            from dbgpt.agent import ConversableAgent, get_agent_manager

            agent_manager = get_agent_manager(self._system_app)
            pkg = cast(AgentPackage, package)
            for agent_cls in pkg.agents:
                if issubclass(agent_cls, ConversableAgent):
                    try:
                        agent_manager.register_agent(agent_cls, ignore_duplicate=True)
                    except ValueError as e:
                        logger.warning(f"Register agent {agent_cls} error: {e}")
        elif package.package_type == "resource":
            from dbgpt.agent.resource import Resource
            from dbgpt.agent.resource.manage import get_resource_manager

            pkg = cast(ResourcePackage, package)
            rm = get_resource_manager(self._system_app)
            for inst in pkg.resource_instances:
                try:
                    rm.register_resource(resource_instance=inst, ignore_duplicate=True)
                except ValueError as e:
                    logger.warning(f"Register resource {inst} error: {e}")
            for res in pkg.resources:
                try:
                    if issubclass(res, Resource):
                        rm.register_resource(res, ignore_duplicate=True)
                except ValueError as e:
                    logger.warning(f"Register resource {res} error: {e}")
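

# For illustration only: a hypothetical wiring of the loader into a ``SystemApp``,
# assuming the standard component registration API; the real bootstrap code in
# DB-GPT may differ.
#
#   from dbgpt.component import SystemApp
#
#   system_app = SystemApp()
#   system_app.register(DBGPTsLoader, load_dbgpts_interval=10)
#   system_app.before_start()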