mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-24 20:47:46 +00:00
Co-authored-by: hzh97 <2976151305@qq.com> Co-authored-by: Fangyin Cheng <staneyffer@gmail.com> Co-authored-by: licunxing <864255598@qq.com>
186 lines
6.0 KiB
Python
186 lines
6.0 KiB
Python
import logging
|
|
from typing import List, Optional
|
|
|
|
from fastapi import HTTPException
|
|
|
|
from dbgpt._private.config import Config
|
|
from dbgpt._private.pydantic import model_to_dict
|
|
from dbgpt.component import ComponentType, SystemApp
|
|
from dbgpt.core.awel.dag.dag_manager import DAGManager
|
|
from dbgpt.datasource.db_conn_info import DBConfig
|
|
from dbgpt.datasource.manages.connect_config_db import (
|
|
ConnectConfigDao,
|
|
ConnectConfigEntity,
|
|
)
|
|
from dbgpt.serve.core import BaseService
|
|
from dbgpt.serve.rag.connector import VectorStoreConnector
|
|
from dbgpt.storage.metadata import BaseDao
|
|
from dbgpt.storage.schema import DBType
|
|
from dbgpt.storage.vector_store.base import VectorStoreConfig
|
|
from dbgpt.util.executor_utils import ExecutorFactory
|
|
|
|
from ..api.schemas import DatasourceServeRequest, DatasourceServeResponse
|
|
from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
CFG = Config()
|
|
|
|
|
|
class Service(
|
|
BaseService[ConnectConfigEntity, DatasourceServeRequest, DatasourceServeResponse]
|
|
):
|
|
"""The service class for Flow"""
|
|
|
|
name = SERVE_SERVICE_COMPONENT_NAME
|
|
|
|
def __init__(
|
|
self,
|
|
system_app: SystemApp,
|
|
dao: Optional[ConnectConfigDao] = None,
|
|
):
|
|
self._system_app = None
|
|
self._dao: ConnectConfigDao = dao
|
|
self._dag_manager: Optional[DAGManager] = None
|
|
self._db_summary_client = None
|
|
self._vector_connector = None
|
|
|
|
super().__init__(system_app)
|
|
|
|
def init_app(self, system_app: SystemApp) -> None:
|
|
"""Initialize the service
|
|
|
|
Args:
|
|
system_app (SystemApp): The system app
|
|
"""
|
|
super().init_app(system_app)
|
|
|
|
self._serve_config = ServeConfig.from_app_config(
|
|
system_app.config, SERVE_CONFIG_KEY_PREFIX
|
|
)
|
|
self._dao = self._dao or ConnectConfigDao()
|
|
self._system_app = system_app
|
|
|
|
def before_start(self):
|
|
"""Execute before the application starts"""
|
|
from dbgpt.rag.summary.db_summary_client import DBSummaryClient
|
|
|
|
super().before_start()
|
|
self._db_summary_client = DBSummaryClient(self._system_app)
|
|
|
|
def after_start(self):
|
|
"""Execute after the application starts"""
|
|
|
|
@property
|
|
def dao(
|
|
self,
|
|
) -> BaseDao[ConnectConfigEntity, DatasourceServeRequest, DatasourceServeResponse]:
|
|
"""Returns the internal DAO."""
|
|
return self._dao
|
|
|
|
@property
|
|
def config(self) -> ServeConfig:
|
|
"""Returns the internal ServeConfig."""
|
|
return self._serve_config
|
|
|
|
def create(self, request: DatasourceServeRequest) -> DatasourceServeResponse:
|
|
"""Create a new Datasource entity
|
|
|
|
Args:
|
|
request (DatasourceServeRequest): The request
|
|
|
|
Returns:
|
|
DatasourceServeResponse: The response
|
|
"""
|
|
datasource = self._dao.get_by_names(request.db_name)
|
|
if datasource:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"datasource name:{request.db_name} already exists",
|
|
)
|
|
try:
|
|
db_type = DBType.of_db_type(request.db_type)
|
|
if not db_type:
|
|
raise HTTPException(
|
|
status_code=400, detail=f"Unsupported Db Type, {request.db_type}"
|
|
)
|
|
res = self._dao.create(request)
|
|
|
|
# async embedding
|
|
executor = self._system_app.get_component(
|
|
ComponentType.EXECUTOR_DEFAULT, ExecutorFactory
|
|
).create() # type: ignore
|
|
executor.submit(
|
|
self._db_summary_client.db_summary_embedding,
|
|
request.db_name,
|
|
request.db_type,
|
|
)
|
|
except Exception as e:
|
|
raise ValueError("Add db connect info error!" + str(e))
|
|
return res
|
|
|
|
def update(self, request: DatasourceServeRequest) -> DatasourceServeResponse:
|
|
"""Create a new Datasource entity
|
|
|
|
Args:
|
|
request (DatasourceServeRequest): The request
|
|
|
|
Returns:
|
|
DatasourceServeResponse: The response
|
|
"""
|
|
datasources = self._dao.get_by_names(request.db_name)
|
|
if datasources is None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"there is no datasource name:{request.db_name} exists",
|
|
)
|
|
db_config = DBConfig(**model_to_dict(request))
|
|
if CFG.local_db_manager.edit_db(db_config):
|
|
return DatasourceServeResponse(**model_to_dict(db_config))
|
|
else:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"update datasource name:{request.db_name} failed",
|
|
)
|
|
|
|
def get(self, datasource_id: str) -> Optional[DatasourceServeResponse]:
|
|
"""Get a Flow entity
|
|
|
|
Args:
|
|
request (DatasourceServeRequest): The request
|
|
|
|
Returns:
|
|
DatasourceServeResponse: The response
|
|
"""
|
|
return self._dao.get_one({"id": datasource_id})
|
|
|
|
def delete(self, datasource_id: str) -> Optional[DatasourceServeResponse]:
|
|
"""Delete a Flow entity
|
|
|
|
Args:
|
|
datasource_id (str): The datasource_id
|
|
|
|
Returns:
|
|
DatasourceServeResponse: The data after deletion
|
|
"""
|
|
db_config = self._dao.get_one({"id": datasource_id})
|
|
vector_name = db_config.db_name + "_profile"
|
|
vector_store_config = VectorStoreConfig(name=vector_name)
|
|
self._vector_connector = VectorStoreConnector(
|
|
vector_store_type=CFG.VECTOR_STORE_TYPE,
|
|
vector_store_config=vector_store_config,
|
|
)
|
|
self._vector_connector.delete_vector_name(vector_name)
|
|
if db_config:
|
|
self._dao.delete({"id": datasource_id})
|
|
return db_config
|
|
|
|
def list(self) -> List[DatasourceServeResponse]:
|
|
"""List the Flow entities.
|
|
|
|
Returns:
|
|
List[DatasourceServeResponse]: The list of responses
|
|
"""
|
|
|
|
db_list = CFG.local_db_manager.get_db_list()
|
|
return [DatasourceServeResponse(**db) for db in db_list]
|