# DB-GPT/dbgpt/app/base.py (last modified 2024-07-05 15:20:21 +08:00)
import logging
import os
import signal
import sys
import threading
from dataclasses import dataclass, field
from typing import Optional
from dbgpt._private.config import Config
from dbgpt.component import SystemApp
from dbgpt.storage import DBType
from dbgpt.util.parameter_utils import BaseServerParameters
# Repository root: three levels up from this file (dbgpt/app/base.py -> repo
# root). Appended to sys.path so absolute `dbgpt.*` imports resolve when the
# app is launched directly from a source checkout.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_PATH)

# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
def signal_handler(sig, frame):
    """SIGINT handler: terminate the process immediately.

    Args:
        sig: Received signal number (unused).
        frame: Current stack frame (unused).
    """
    print("in order to avoid chroma db atexit problem")
    # os._exit skips atexit hooks entirely — deliberate, per the message above,
    # to avoid a hang/error in chroma db's atexit cleanup.
    os._exit(0)
def async_db_summary(system_app: SystemApp):
    """Sync database schemas into the vector store on a background thread.

    Args:
        system_app (SystemApp): The running system app, passed through to the
            summary client.
    """
    from dbgpt.rag.summary.db_summary_client import DBSummaryClient

    summary_client = DBSummaryClient(system_app=system_app)
    worker = threading.Thread(target=summary_client.init_db_summary)
    worker.start()
def server_init(param: "WebServerParameters", system_app: SystemApp):
    """Initialize config, metadata storage and signal handling for the webserver.

    Args:
        param (WebServerParameters): Parsed webserver parameters.
        system_app (SystemApp): The system app to attach to the global config.
    """
    # Bind the global Config singleton to the running SystemApp.
    cfg = Config()
    cfg.SYSTEM_APP = system_app

    # Storage must be ready before any component touches the metadata db.
    _initialize_db_storage(param, system_app)

    # Install the fast-exit SIGINT handler (avoids chroma db atexit issues).
    signal.signal(signal.SIGINT, signal_handler)
def _create_model_start_listener(system_app: SystemApp):
    """Build a startup callback that kicks off the async db-summary job.

    Args:
        system_app (SystemApp): Captured by the returned closure.

    Returns:
        A callable taking one argument (the worker handle) suitable for
        registration as a startup event listener.
    """
    def startup_event(wh):
        print("begin run _add_app_startup_event")
        # Fire-and-forget: schema summarization runs on its own thread.
        async_db_summary(system_app)
    return startup_event
def _initialize_db_storage(param: "WebServerParameters", system_app: SystemApp):
    """Initialize the db storage.

    Now just support sqlite and mysql. If db type is sqlite, the db path is
    `pilot/meta_data/{db_name}.db`.
    """
    # Only attempt database creation when alembic upgrade is enabled.
    create_db = not param.disable_alembic_upgrade
    _initialize_db(try_to_create_db=create_db, system_app=system_app)
def _migration_db_storage(param: "WebServerParameters"):
    """Migrate the db storage.

    For sqlite, tables are auto-created and alembic upgrade runs; for other
    db types (MySQL), only a warning with manual-init instructions is logged.
    """
    # Import all models to make sure they are registered with SQLAlchemy.
    from dbgpt.app.initialization.db_model_initialization import _MODELS
    from dbgpt.configs.model_config import PILOT_PATH

    default_meta_data_path = os.path.join(PILOT_PATH, "meta_data")
    if not param.disable_alembic_upgrade:
        from dbgpt.storage.metadata.db_manager import db
        from dbgpt.util._db_migration_utils import _ddl_init_and_upgrade

        # Try to create all tables, when the dbtype is sqlite, it will auto
        # create and upgrade system schema. Otherwise, you need to execute
        # initialization scripts to create schemas.
        CFG = Config()
        if CFG.LOCAL_DB_TYPE == "sqlite":
            try:
                db.create_all()
            except Exception as e:
                # Best-effort: log the failure and still attempt the alembic
                # init/upgrade below.
                logger.warning(
                    f"Create all tables stored in this metadata error: {str(e)}"
                )
            _ddl_init_and_upgrade(default_meta_data_path, param.disable_alembic_upgrade)
        else:
            warn_msg = """For safety considerations, MySQL Database not support DDL init and upgrade. "
"1.If you are use DB-GPT firstly, please manually execute the following command to initialize,
`mysql -h127.0.0.1 -uroot -p{your_password} < ./assets/schema/dbgpt.sql` "
"2.If there are any changes to the table columns in the DB-GPT database,
it is necessary to compare with the DB-GPT/assets/schema/dbgpt.sql file
and manually make the columns changes in the MySQL database instance."""
            logger.warning(warn_msg)
def _initialize_db(
    try_to_create_db: Optional[bool] = False, system_app: Optional[SystemApp] = None
) -> str:
    """Initialize the database

    Now just support sqlite and MySQL. If db type is sqlite, the db path is
    `pilot/meta_data/{db_name}.db`.

    Args:
        try_to_create_db (Optional[bool]): Whether to attempt creating the
            database on MySQL-compatible servers. Defaults to False.
        system_app (Optional[SystemApp]): When given, the db manager factory
            is registered on it.

    Returns:
        str: The default meta data path.
    """
    from urllib.parse import quote
    from urllib.parse import quote_plus as urlquote

    from dbgpt.configs.model_config import PILOT_PATH
    from dbgpt.datasource.rdbms.dialect.oceanbase.ob_dialect import (  # noqa: F401
        OBDialect,
    )
    from dbgpt.storage.metadata.db_manager import initialize_db

    CFG = Config()
    db_name = CFG.LOCAL_DB_NAME
    default_meta_data_path = os.path.join(PILOT_PATH, "meta_data")
    os.makedirs(default_meta_data_path, exist_ok=True)

    # Pick the SQLAlchemy scheme for MySQL-compatible backends; None -> sqlite.
    db_type = CFG.LOCAL_DB_TYPE
    if db_type == DBType.MySQL.value():
        scheme = "mysql+pymysql"
    elif db_type == DBType.OceanBase.value():
        scheme = "mysql+ob"
    else:
        scheme = None

    if scheme is not None:
        # User is URL-quoted; password uses quote_plus to survive special chars.
        db_url = (
            f"{scheme}://{quote(CFG.LOCAL_DB_USER)}:"
            f"{urlquote(CFG.LOCAL_DB_PASSWORD)}@"
            f"{CFG.LOCAL_DB_HOST}:"
            f"{CFG.LOCAL_DB_PORT}/"
            f"{db_name}?charset=utf8mb4"
        )
        # Try to create database, if failed, will raise exception
        _create_mysql_database(db_name, db_url, try_to_create_db)
    else:
        sqlite_db_path = os.path.join(default_meta_data_path, f"{db_name}.db")
        db_url = f"sqlite:///{sqlite_db_path}"

    engine_args = {
        "pool_size": CFG.LOCAL_DB_POOL_SIZE,
        "max_overflow": CFG.LOCAL_DB_POOL_OVERFLOW,
        "pool_timeout": 30,
        "pool_recycle": 3600,
        "pool_pre_ping": True,
    }
    db = initialize_db(db_url, db_name, engine_args)
    if system_app:
        from dbgpt.storage.metadata import UnifiedDBManagerFactory

        system_app.register(UnifiedDBManagerFactory, db)
    return default_meta_data_path
def _create_mysql_database(db_name: str, db_url: str, try_to_create_db: bool = False):
    """Create mysql database if not exists

    Fix over previous version: both engines are now disposed in ``finally``
    blocks, so their connection pools are released instead of leaking on
    every call.

    Args:
        db_name (str): The database name
        db_url (str): The database url, include host, port, user, password and database name
        try_to_create_db (bool, optional): Whether to try to create database. Defaults to False.

    Raises:
        Exception: Raise exception if database operation failed
    """
    from sqlalchemy import DDL, create_engine
    from sqlalchemy.exc import OperationalError, SQLAlchemyError

    if not try_to_create_db:
        logger.info(f"Skipping creation of database {db_name}")
        return
    engine = create_engine(db_url)
    try:
        # Probe the target database with a real connection; success means
        # it already exists and there is nothing to do.
        with engine.connect():
            logger.info(f"Database {db_name} already exists")
            return
    except OperationalError as oe:
        # MySQL reports a missing schema with an "Unknown database" error;
        # in that case, try to create it.
        if "Unknown database" in str(oe):
            engine_no_db = None
            try:
                # Reconnect without the database name (the trailing
                # "/{db_name}?..." segment) so CREATE DATABASE can be issued.
                no_db_name_url = db_url.rsplit("/", 1)[0]
                engine_no_db = create_engine(no_db_name_url)
                with engine_no_db.connect() as conn:
                    # NOTE: db_name comes from local config, not untrusted
                    # user input, so direct interpolation into DDL is
                    # acceptable here.
                    conn.execute(
                        DDL(
                            f"CREATE DATABASE {db_name} CHARACTER SET utf8mb4 COLLATE "
                            f"utf8mb4_unicode_ci"
                        )
                    )
                logger.info(f"Database {db_name} successfully created")
            except SQLAlchemyError as e:
                logger.error(f"Failed to create database {db_name}: {e}")
                raise
            finally:
                # Release the temporary engine's pooled connections.
                if engine_no_db is not None:
                    engine_no_db.dispose()
        else:
            logger.error(f"Error connecting to database {db_name}: {oe}")
            raise
    finally:
        # Always release the probe engine's connection pool (runs on the
        # early return above as well).
        engine.dispose()
@dataclass
class WebServerParameters(BaseServerParameters):
    """Command-line/environment parameters for the DB-GPT webserver.

    Each field's ``metadata["help"]`` string is surfaced by the argument
    parser built from ``BaseServerParameters``.
    """

    # Bind address for the webserver.
    host: Optional[str] = field(
        default="0.0.0.0", metadata={"help": "Webserver deploy host"}
    )
    # Listen port; None lets the caller/framework pick the default.
    port: Optional[int] = field(
        default=None, metadata={"help": "Webserver deploy port"}
    )
    daemon: Optional[bool] = field(
        default=False, metadata={"help": "Run Webserver in background"}
    )
    controller_addr: Optional[str] = field(
        default=None,
        metadata={
            "help": "The Model controller address to connect. If None, read model "
            "controller address from environment key `MODEL_SERVER`."
        },
    )
    # NOTE(review): annotated `str` but defaults to None — effectively
    # Optional[str]; confirm the parameter parser's type handling before
    # tightening the annotation.
    model_name: str = field(
        default=None,
        metadata={
            "help": "The default model name to use. If None, read model name from "
            "environment key `LLM_MODEL`.",
            "tags": "fixed",
        },
    )
    share: Optional[bool] = field(
        default=False,
        metadata={
            "help": "Whether to create a publicly shareable link for the interface. "
            "Creates an SSH tunnel to make your UI accessible from anywhere. "
        },
    )
    remote_embedding: Optional[bool] = field(
        default=False,
        metadata={
            "help": "Whether to enable remote embedding models. If it is True, you need"
            " to start a embedding model through `dbgpt start worker --worker_type "
            "text2vec --model_name xxx --model_path xxx`"
        },
    )
    remote_rerank: Optional[bool] = field(
        default=False,
        metadata={
            "help": "Whether to enable remote rerank models. If it is True, you need"
            " to start a rerank model through `dbgpt start worker --worker_type "
            "text2vec --rerank --model_name xxx --model_path xxx`"
        },
    )
    light: Optional[bool] = field(default=False, metadata={"help": "enable light mode"})
    log_file: Optional[str] = field(
        default="dbgpt_webserver.log",
        metadata={
            "help": "The filename to store log",
        },
    )
    tracer_file: Optional[str] = field(
        default="dbgpt_webserver_tracer.jsonl",
        metadata={
            "help": "The filename to store tracer span records",
        },
    )
    tracer_storage_cls: Optional[str] = field(
        default=None,
        metadata={
            "help": "The storage class to storage tracer span records",
        },
    )
    # When True, skips db creation and alembic migration at startup.
    disable_alembic_upgrade: Optional[bool] = field(
        default=False,
        metadata={
            "help": "Whether to disable alembic to initialize and upgrade database metadata",
        },
    )
    awel_dirs: Optional[str] = field(
        default=None,
        metadata={
            "help": "The directories to search awel files, split by `,`",
        },
    )
    default_thread_pool_size: Optional[int] = field(
        default=None,
        metadata={
            "help": "The default thread pool size, If None, "
            "use default config of python thread pool",
        },
    )