mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-23 12:21:08 +00:00
289 lines
10 KiB
Python
289 lines
10 KiB
Python
import logging
import os
import signal
import sys
import threading
from dataclasses import dataclass, field
from typing import Optional

from dbgpt._private.config import Config
from dbgpt.component import SystemApp
from dbgpt.storage import DBType
from dbgpt.util.parameter_utils import BaseServerParameters
|
|
|
|
# Repository root: three directory levels above this file. It is appended to
# sys.path so absolute `dbgpt.*` imports resolve when this module is executed
# directly rather than as an installed package.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_PATH)

logger = logging.getLogger(__name__)
|
|
|
|
|
|
def signal_handler(sig, frame):
    """SIGINT handler that terminates the process immediately.

    Deliberately uses ``os._exit`` (not ``sys.exit``) so that atexit hooks are
    skipped — running them triggers a known chroma db shutdown problem.
    """
    print("in order to avoid chroma db atexit problem")
    os._exit(0)
|
|
|
|
|
|
def async_db_summary(system_app: SystemApp):
    """Summarize database schemas into the vector store in a background thread."""
    from dbgpt.rag.summary.db_summary_client import DBSummaryClient

    summary_client = DBSummaryClient(system_app=system_app)
    # Fire-and-forget: the caller does not wait for summarization to finish.
    worker = threading.Thread(target=summary_client.init_db_summary)
    worker.start()
|
|
|
|
|
|
def server_init(param: "WebServerParameters", system_app: SystemApp):
    """Initialize config, metadata storage and signal handling for the webserver.

    Args:
        param: Parsed webserver parameters.
        system_app: The application-wide component registry.
    """
    # Wire the system app into the global config singleton.
    config = Config()
    config.SYSTEM_APP = system_app

    # Metadata db storage must be ready before anything else uses it.
    _initialize_db_storage(param, system_app)

    # Hard-exit on Ctrl-C to sidestep the chroma db atexit problem.
    signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
|
|
def _create_model_start_listener(system_app: SystemApp):
    """Return a startup hook that schedules async DB schema summarization."""

    def startup_event(worker_handle):
        # Invoked once the app is up; summarization runs on a background thread.
        print("begin run _add_app_startup_event")
        async_db_summary(system_app)

    return startup_event
|
|
|
|
|
|
def _initialize_db_storage(param: "WebServerParameters", system_app: SystemApp):
    """Initialize the db storage.

    Now just support sqlite and mysql. If db type is sqlite, the db path is
    `pilot/meta_data/{db_name}.db`.
    """
    # Database creation is attempted unless alembic upgrade is disabled.
    should_create = not param.disable_alembic_upgrade
    _initialize_db(try_to_create_db=should_create, system_app=system_app)
|
|
|
|
|
|
def _migration_db_storage(param: "WebServerParameters"):
    """Migration the db storage.

    For sqlite the system schema is created/upgraded automatically; for any
    other db type (MySQL) only a warning is logged telling the operator to run
    the schema scripts manually.

    Args:
        param: Webserver parameters; `disable_alembic_upgrade` gates the whole
            migration step.
    """
    # Import all models to make sure they are registered with SQLAlchemy.
    from dbgpt.app.initialization.db_model_initialization import _MODELS  # noqa: F401
    from dbgpt.configs.model_config import PILOT_PATH

    default_meta_data_path = os.path.join(PILOT_PATH, "meta_data")
    if not param.disable_alembic_upgrade:
        from dbgpt.storage.metadata.db_manager import db
        from dbgpt.util._db_migration_utils import _ddl_init_and_upgrade

        # Try to create all tables, when the dbtype is sqlite, it will auto create and upgrade system schema,
        # Otherwise, you need to execute initialization scripts to create schemas.
        CFG = Config()
        if CFG.LOCAL_DB_TYPE == "sqlite":
            try:
                db.create_all()
            except Exception as e:
                # Best-effort: a create_all failure is logged, not fatal; the
                # alembic upgrade below may still bring the schema up to date.
                logger.warning(
                    f"Create all tables stored in this metadata error: {str(e)}"
                )

            # NOTE(review): this branch is only reached when
            # disable_alembic_upgrade is False, so the second argument is
            # always False here — presumably intentional, but worth confirming.
            _ddl_init_and_upgrade(default_meta_data_path, param.disable_alembic_upgrade)
        else:
            warn_msg = """For safety considerations, MySQL Database not support DDL init and upgrade. "
            "1.If you are use DB-GPT firstly, please manually execute the following command to initialize,
            `mysql -h127.0.0.1 -uroot -p{your_password} < ./assets/schema/dbgpt.sql` "
            "2.If there are any changes to the table columns in the DB-GPT database,
            it is necessary to compare with the DB-GPT/assets/schema/dbgpt.sql file
            and manually make the columns changes in the MySQL database instance."""
            logger.warning(warn_msg)
|
|
|
|
|
|
def _initialize_db(
    try_to_create_db: Optional[bool] = False, system_app: Optional[SystemApp] = None
) -> str:
    """Initialize the database

    Now just support sqlite and MySQL. If db type is sqlite, the db path is
    `pilot/meta_data/{db_name}.db`.

    Args:
        try_to_create_db: Whether to try to create the database if it does not
            exist (MySQL-like backends only). Defaults to False.
        system_app: If provided, a `UnifiedDBManagerFactory` wrapping the db is
            registered on it.

    Returns:
        str: The metadata directory path (`pilot/meta_data`).
    """
    from urllib.parse import quote
    from urllib.parse import quote_plus as urlquote

    from dbgpt.configs.model_config import PILOT_PATH
    from dbgpt.datasource.rdbms.dialect.oceanbase.ob_dialect import (  # noqa: F401
        OBDialect,
    )
    from dbgpt.storage.metadata.db_manager import initialize_db

    CFG = Config()
    db_name = CFG.LOCAL_DB_NAME
    default_meta_data_path = os.path.join(PILOT_PATH, "meta_data")
    os.makedirs(default_meta_data_path, exist_ok=True)

    # MySQL and OceanBase share an identical URL layout; only the SQLAlchemy
    # driver prefix differs, so build both from one table instead of two
    # copy-pasted branches.
    mysql_like_drivers = {
        DBType.MySQL.value(): "mysql+pymysql",
        DBType.OceanBase.value(): "mysql+ob",
    }
    driver = mysql_like_drivers.get(CFG.LOCAL_DB_TYPE)
    if driver:
        # Password goes through quote_plus; user through quote (as upstream).
        db_url = (
            f"{driver}://{quote(CFG.LOCAL_DB_USER)}:"
            f"{urlquote(CFG.LOCAL_DB_PASSWORD)}@"
            f"{CFG.LOCAL_DB_HOST}:"
            f"{str(CFG.LOCAL_DB_PORT)}/"
            f"{db_name}?charset=utf8mb4"
        )
        # Try to create database, if failed, will raise exception
        _create_mysql_database(db_name, db_url, try_to_create_db)
    else:
        # Default backend: sqlite file under pilot/meta_data.
        sqlite_db_path = os.path.join(default_meta_data_path, f"{db_name}.db")
        db_url = f"sqlite:///{sqlite_db_path}"
    engine_args = {
        "pool_size": CFG.LOCAL_DB_POOL_SIZE,
        "max_overflow": CFG.LOCAL_DB_POOL_OVERFLOW,
        "pool_timeout": 30,
        "pool_recycle": 3600,
        "pool_pre_ping": True,
    }
    db = initialize_db(db_url, db_name, engine_args)
    if system_app:
        from dbgpt.storage.metadata import UnifiedDBManagerFactory

        system_app.register(UnifiedDBManagerFactory, db)
    return default_meta_data_path
|
|
|
|
|
|
def _create_mysql_database(db_name: str, db_url: str, try_to_create_db: bool = False):
    """Create mysql database if not exists

    Args:
        db_name (str): The database name
        db_url (str): The database url, include host, port, user, password and database name
        try_to_create_db (bool, optional): Whether to try to create database. Defaults to False.

    Raises:
        Exception: Raise exception if database operation failed
    """
    from sqlalchemy import DDL, create_engine
    from sqlalchemy.exc import OperationalError, SQLAlchemyError

    if not try_to_create_db:
        logger.info(f"Skipping creation of database {db_name}")
        return
    engine = create_engine(db_url)
    try:
        # Probe: a successful connection means the database already exists.
        with engine.connect():
            logger.info(f"Database {db_name} already exists")
            return
    except OperationalError as oe:
        # "Unknown database" means the server is reachable but the schema is
        # missing; anything else is a real connection failure.
        if "Unknown database" not in str(oe):
            logger.error(f"Error connecting to database {db_name}: {oe}")
            raise
        # Reconnect without the trailing database name to run CREATE DATABASE.
        no_db_name_url = db_url.rsplit("/", 1)[0]
        engine_no_db = create_engine(no_db_name_url)
        try:
            with engine_no_db.connect() as conn:
                # NOTE(review): db_name is interpolated directly into DDL. It
                # comes from local config (LOCAL_DB_NAME), not untrusted input,
                # but keep it that way — DDL cannot be parameterized.
                conn.execute(
                    DDL(
                        f"CREATE DATABASE {db_name} CHARACTER SET utf8mb4 COLLATE "
                        f"utf8mb4_unicode_ci"
                    )
                )
            logger.info(f"Database {db_name} successfully created")
        except SQLAlchemyError as e:
            logger.error(f"Failed to create database {db_name}: {e}")
            raise
        finally:
            # Fix: release the creation engine's pooled connections.
            engine_no_db.dispose()
    finally:
        # Fix: the probe engine was never disposed, leaking pooled connections.
        engine.dispose()
|
|
|
|
|
|
@dataclass
class WebServerParameters(BaseServerParameters):
    """Command-line / environment parameters for the DB-GPT webserver."""

    host: Optional[str] = field(
        default="0.0.0.0", metadata={"help": "Webserver deploy host"}
    )
    port: Optional[int] = field(
        default=None, metadata={"help": "Webserver deploy port"}
    )
    daemon: Optional[bool] = field(
        default=False, metadata={"help": "Run Webserver in background"}
    )
    controller_addr: Optional[str] = field(
        default=None,
        metadata={
            "help": "The Model controller address to connect. If None, read model "
            "controller address from environment key `MODEL_SERVER`."
        },
    )
    # Annotation fixed: the default is None, so the type must be Optional[str].
    model_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "The default model name to use. If None, read model name from "
            "environment key `LLM_MODEL`.",
            "tags": "fixed",
        },
    )
    share: Optional[bool] = field(
        default=False,
        metadata={
            "help": "Whether to create a publicly shareable link for the interface. "
            "Creates an SSH tunnel to make your UI accessible from anywhere. "
        },
    )
    remote_embedding: Optional[bool] = field(
        default=False,
        metadata={
            "help": "Whether to enable remote embedding models. If it is True, you need"
            " to start a embedding model through `dbgpt start worker --worker_type "
            "text2vec --model_name xxx --model_path xxx`"
        },
    )
    remote_rerank: Optional[bool] = field(
        default=False,
        metadata={
            "help": "Whether to enable remote rerank models. If it is True, you need"
            " to start a rerank model through `dbgpt start worker --worker_type "
            "text2vec --rerank --model_name xxx --model_path xxx`"
        },
    )

    light: Optional[bool] = field(default=False, metadata={"help": "enable light mode"})
    log_file: Optional[str] = field(
        default="dbgpt_webserver.log",
        metadata={
            "help": "The filename to store log",
        },
    )
    tracer_file: Optional[str] = field(
        default="dbgpt_webserver_tracer.jsonl",
        metadata={
            "help": "The filename to store tracer span records",
        },
    )
    tracer_storage_cls: Optional[str] = field(
        default=None,
        metadata={
            "help": "The storage class to storage tracer span records",
        },
    )
    disable_alembic_upgrade: Optional[bool] = field(
        default=False,
        metadata={
            "help": "Whether to disable alembic to initialize and upgrade database metadata",
        },
    )
    awel_dirs: Optional[str] = field(
        default=None,
        metadata={
            "help": "The directories to search awel files, split by `,`",
        },
    )
    default_thread_pool_size: Optional[int] = field(
        default=None,
        metadata={
            "help": "The default thread pool size, If None, "
            "use default config of python thread pool",
        },
    )