DB-GPT/dbgpt/serve/dbgpts/hub/service/service.py
明天 d7a893e1a7
feat: new dbgpts modules (#1910)
Co-authored-by: 途杨 <tuyang.yhj@antgroup.com>
Co-authored-by: lhwan <1017484907@qq.com>
2024-08-28 21:31:42 +08:00

215 lines
7.3 KiB
Python

import logging
import re
from typing import List, Optional, Tuple
from dbgpt.agent import PluginStorageType
from dbgpt.component import BaseComponent, SystemApp
from dbgpt.serve.core import BaseService
from dbgpt.storage.metadata import BaseDao
from dbgpt.util.dbgpts.repo import _install_default_repos_if_no_repos, list_dbgpts
from dbgpt.util.pagination_utils import PaginationResult
from ..api.schemas import ServeRequest, ServerResponse
from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig
from ..models.models import ServeDao, ServeEntity
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Service(BaseService[ServeEntity, ServeRequest, ServerResponse]):
    """The service class for DbgptsHub.

    Exposes CRUD helpers that delegate to the DbgptsHub DAO and a refresh
    routine that registers the dbgpts packages returned by ``list_dbgpts()``
    in the hub table.
    """

    name = SERVE_SERVICE_COMPONENT_NAME

    def __init__(self, system_app: SystemApp, dao: Optional[ServeDao] = None):
        """Create the service.

        Args:
            system_app (SystemApp): The system app
            dao (Optional[ServeDao]): The DAO to use. When omitted, one is
                created from the serve config in ``init_app``.
        """
        # NOTE(review): attributes are assigned before the parent constructor
        # runs -- presumably because the base class calls ``init_app`` from
        # its ``__init__``; confirm before reordering.
        self._system_app: Optional[SystemApp] = None
        self._serve_config: Optional[ServeConfig] = None
        self._dao: Optional[ServeDao] = dao
        super().__init__(system_app)

    def init_app(self, system_app: SystemApp) -> None:
        """Initialize the service

        Args:
            system_app (SystemApp): The system app
        """
        super().init_app(system_app)
        self._serve_config = ServeConfig.from_app_config(
            system_app.config, SERVE_CONFIG_KEY_PREFIX
        )
        # Keep an injected DAO; otherwise build one from the loaded config.
        self._dao = self._dao or ServeDao(self._serve_config)
        self._system_app = system_app

    @property
    def dao(self) -> BaseDao[ServeEntity, ServeRequest, ServerResponse]:
        """Returns the internal DAO."""
        return self._dao

    @property
    def config(self) -> ServeConfig:
        """Returns the internal ServeConfig."""
        return self._serve_config

    def update(self, request: ServeRequest) -> ServerResponse:
        """Update a DbgptsHub entity

        The entity to update is selected by ``request.id``; the whole request
        is passed to the DAO as the update payload.

        Args:
            request (ServeRequest): The request

        Returns:
            ServerResponse: The response
        """
        # TODO: implement your own logic here
        # Build the query request from the request
        query_request = {"id": request.id}
        return self.dao.update(query_request, update_request=request)

    def get(self, request: ServeRequest) -> Optional[ServerResponse]:
        """Get a DbgptsHub entity

        The request itself is used as the query.

        Args:
            request (ServeRequest): The request

        Returns:
            ServerResponse: The response
        """
        # TODO: implement your own logic here
        # Build the query request from the request
        query_request = request
        return self.dao.get_one(query_request)

    def delete(self, request: ServeRequest) -> None:
        """Delete a DbgptsHub entity

        The entity to delete is selected by ``request.id``, mirroring
        ``update``.

        Args:
            request (ServeRequest): The request
        """
        # The query used to be an empty dict (the id filter was commented
        # out), so the request was ignored and the delete was unfiltered.
        query_request = {"id": request.id}
        self.dao.delete(query_request)

    def get_list(self, request: ServeRequest) -> List[ServerResponse]:
        """Get a list of DbgptsHub entities

        The request itself is used as the query.

        Args:
            request (ServeRequest): The request

        Returns:
            List[ServerResponse]: The response
        """
        # TODO: implement your own logic here
        # Build the query request from the request
        query_request = request
        return self.dao.get_list(query_request)

    def get_list_by_page(
        self, request: ServeRequest, page: int, page_size: int
    ) -> PaginationResult[ServerResponse]:
        """Get a list of DbgptsHub entities by page

        Args:
            request (ServeRequest): The request
            page (int): The page number
            page_size (int): The page size

        Returns:
            PaginationResult[ServerResponse]: The requested page
        """
        # Only these fields are forwarded as filters; anything else set on
        # the incoming request is not part of the query.
        query_request = ServeRequest(
            name=request.name,
            type=request.type,
            version=request.version,
            description=request.description,
            author=request.author,
            storage_channel=request.storage_channel,
            storage_url=request.storage_url,
            installed=request.installed,
        )
        return self.dao.get_list_page(query_request, page, page_size)

    def refresh_hub_from_git(
        self,
        github_repo: Optional[str] = None,
        branch_name: str = "main",
        authorization: Optional[str] = None,
    ) -> None:
        """Synchronise the hub table with the locally known dbgpts packages.

        For every ``(repo, package, name, gpts_path)`` tuple returned by
        ``list_dbgpts()`` the package metadata is parsed; an existing hub
        entry (matched by name and type) gets its version and description
        updated, otherwise a new, not-installed entry is created. A failure
        on one package is logged and does not stop the others.

        Args:
            github_repo (Optional[str]): Not used by this implementation.
            branch_name (str): Not used by this implementation.
            authorization (Optional[str]): Not used by this implementation.

        Raises:
            ValueError: If iterating the package list itself fails.
        """
        logger.info("refresh_hub_by_git start!")
        _install_default_repos_if_no_repos()
        data: List[Tuple[str, str, str, str]] = list_dbgpts()

        # NOTE(review): ``get_repo_path`` is not used below; the import is
        # kept unchanged in case it is relied on for side effects -- confirm
        # and drop.
        from dbgpt.util.dbgpts.base import get_repo_path
        from dbgpt.util.dbgpts.loader import (
            BasePackage,
            InstalledPackage,
            parse_package_metadata,
        )

        try:
            for repo, package, name, gpts_path in data:
                try:
                    if not name:
                        logger.info(
                            f"dbgpts error repo:{repo}, package:{package}, name:{name}, gpts_path:{gpts_path}"
                        )
                        continue
                    old_hub_info = self.get(ServeRequest(name=name, type=package))
                    base_package: BasePackage = parse_package_metadata(
                        InstalledPackage(
                            name=name,
                            repo=repo,
                            root=str(gpts_path),
                            package=package,
                        )
                    )
                    if old_hub_info:
                        # Known package: refresh only version and description.
                        self.dao.update(
                            query_request=ServeRequest(
                                name=old_hub_info.name, type=old_hub_info.type
                            ),
                            update_request=ServeRequest(
                                version=base_package.version,
                                description=base_package.description,
                            ),
                        )
                    else:
                        # New package: register it as not installed.
                        request = ServeRequest()
                        request.type = package
                        request.name = name
                        request.storage_channel = repo
                        request.storage_url = str(gpts_path)
                        request.author = self._get_dbgpts_author(base_package.authors)
                        request.email = self._get_dbgpts_email(base_package.authors)
                        request.download_param = None
                        request.installed = 0
                        request.version = base_package.version
                        request.description = base_package.description
                        self.create(request)
                except Exception:
                    # Best effort per package. The exception is attached via
                    # exc_info; passing it as a bare positional argument with
                    # no placeholder makes logging fail to format the record.
                    logger.warning(
                        "Load from git failed repo:%s, package:%s, name:%s, gpts_path:%s",
                        repo,
                        package,
                        name,
                        gpts_path,
                        exc_info=True,
                    )
        except Exception as e:
            # Chain the original error so its traceback is not lost.
            raise ValueError(f"Update Agent Hub Db Info Faild!{str(e)}") from e

    def _get_dbgpts_author(self, authors):
        """Return the author names of a package as a comma-separated string.

        Each entry is expected to look like ``"Name <email>"``; the text in
        front of ``<`` is taken as the name and stripped of surrounding
        whitespace. An entry without any ``<`` is used as the name as a
        whole. ``None`` or an empty list yields ``""``.

        Args:
            authors: Author entries (strings), or None.

        Returns:
            str: The names joined with ``","``.
        """
        pattern = r"(.+?)<"
        names: List[str] = []
        for item in authors or []:
            found = re.findall(pattern, item)
            if found:
                names.extend(found)
            elif "<" not in item:
                # No e-mail part at all: the entry is just a name.
                names.append(item)
        stripped = (candidate.strip() for candidate in names)
        return ",".join(candidate for candidate in stripped if candidate)

    def _get_dbgpts_email(self, authors):
        """Return the author e-mails of a package as a comma-separated string.

        The text between ``<`` and ``>`` of each entry is taken as an e-mail
        address. Blank matches are dropped; ``None`` or an empty list yields
        ``""``.

        Args:
            authors: Author entries (strings), or None.

        Returns:
            str: The e-mail addresses joined with ``","``.
        """
        pattern = r"<(.*?)>"
        emails: List[str] = []
        for item in authors or []:
            emails.extend(re.findall(pattern, item))
        stripped = (candidate.strip() for candidate in emails)
        return ",".join(candidate for candidate in stripped if candidate)