mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-26 13:27:46 +00:00
385 lines
14 KiB
Python
385 lines
14 KiB
Python
import logging
|
|
import os
|
|
from typing import Optional
|
|
|
|
from alembic import command
|
|
from alembic.config import Config as AlembicConfig
|
|
from alembic.util.exc import CommandError
|
|
from sqlalchemy import Engine, text
|
|
from sqlalchemy.orm import DeclarativeMeta, Session
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_alembic_config(
|
|
alembic_root_path: str,
|
|
engine: Engine,
|
|
base: DeclarativeMeta,
|
|
session: Session,
|
|
alembic_ini_path: Optional[str] = None,
|
|
script_location: Optional[str] = None,
|
|
) -> AlembicConfig:
|
|
"""Create alembic config.
|
|
|
|
Args:
|
|
alembic_root_path: alembic root path
|
|
engine: sqlalchemy engine
|
|
base: sqlalchemy base
|
|
session: sqlalchemy session
|
|
alembic_ini_path (Optional[str]): alembic ini path
|
|
script_location (Optional[str]): alembic script location
|
|
|
|
Returns:
|
|
alembic config
|
|
"""
|
|
alembic_ini_path = alembic_ini_path or os.path.join(
|
|
alembic_root_path, "alembic.ini"
|
|
)
|
|
alembic_cfg = AlembicConfig(alembic_ini_path)
|
|
alembic_cfg.set_main_option("sqlalchemy.url", str(engine.url))
|
|
script_location = script_location or os.path.join(alembic_root_path, "alembic")
|
|
versions_dir = os.path.join(script_location, "versions")
|
|
|
|
os.makedirs(script_location, exist_ok=True)
|
|
os.makedirs(versions_dir, exist_ok=True)
|
|
|
|
alembic_cfg.set_main_option("script_location", script_location)
|
|
|
|
alembic_cfg.attributes["target_metadata"] = base.metadata
|
|
alembic_cfg.attributes["session"] = session
|
|
return alembic_cfg
|
|
|
|
|
|
def create_migration_script(
|
|
alembic_cfg: AlembicConfig,
|
|
engine: Engine,
|
|
message: str = "New migration",
|
|
create_new_revision_if_noting_to_update: Optional[bool] = True,
|
|
) -> str:
|
|
"""Create migration script.
|
|
|
|
Args:
|
|
alembic_cfg: alembic config
|
|
engine: sqlalchemy engine
|
|
message: migration message
|
|
create_new_revision_if_noting_to_update: Whether to create a new revision if there is nothing to update,
|
|
pass False to avoid creating a new revision if there is nothing to update, default is True
|
|
Returns:
|
|
The path of the generated migration script.
|
|
"""
|
|
from alembic.runtime.migration import MigrationContext
|
|
from alembic.script import ScriptDirectory
|
|
|
|
# Check if the database is up-to-date
|
|
script_dir = ScriptDirectory.from_config(alembic_cfg)
|
|
with engine.connect() as connection:
|
|
context = MigrationContext.configure(connection=connection)
|
|
current_rev = context.get_current_revision()
|
|
head_rev = script_dir.get_current_head()
|
|
|
|
logger.info(
|
|
f"alembic migration current revision: {current_rev}, latest revision: {head_rev}"
|
|
)
|
|
should_create_revision = (
|
|
(current_rev is None and head_rev is None)
|
|
or current_rev != head_rev
|
|
or create_new_revision_if_noting_to_update
|
|
)
|
|
if should_create_revision:
|
|
with engine.connect() as connection:
|
|
alembic_cfg.attributes["connection"] = connection
|
|
revision = command.revision(alembic_cfg, message=message, autogenerate=True)
|
|
# Return the path of the generated migration script
|
|
return revision.path
|
|
elif current_rev == head_rev:
|
|
logger.info("No migration script to generate, database is up-to-date")
|
|
# If no new revision is created, return None or an appropriate message
|
|
return None
|
|
|
|
|
|
def upgrade_database(
|
|
alembic_cfg: AlembicConfig, engine: Engine, target_version: str = "head"
|
|
) -> None:
|
|
"""Upgrade database to target version.
|
|
|
|
Args:
|
|
alembic_cfg: alembic config
|
|
engine: sqlalchemy engine
|
|
target_version: target version, default is head(latest version)
|
|
"""
|
|
with engine.connect() as connection:
|
|
alembic_cfg.attributes["connection"] = connection
|
|
# Will create tables if not exists
|
|
command.upgrade(alembic_cfg, target_version)
|
|
|
|
|
|
def generate_sql_for_upgrade(
|
|
alembic_cfg: AlembicConfig,
|
|
engine: Engine,
|
|
target_version: Optional[str] = "head",
|
|
output_file: Optional[str] = "migration.sql",
|
|
) -> None:
|
|
"""Generate SQL for upgrading database to target version.
|
|
|
|
Args:
|
|
alembic_cfg: alembic config
|
|
engine: sqlalchemy engine
|
|
target_version: target version, default is head (latest version)
|
|
output_file: file to write the SQL script
|
|
|
|
TODO: Can't generate SQL for most of the operations.
|
|
"""
|
|
import contextlib
|
|
import io
|
|
|
|
with engine.connect() as connection, contextlib.redirect_stdout(
|
|
io.StringIO()
|
|
) as stdout:
|
|
alembic_cfg.attributes["connection"] = connection
|
|
# Generating SQL instead of applying changes
|
|
command.upgrade(alembic_cfg, target_version, sql=True)
|
|
|
|
# Write the generated SQL to a file
|
|
with open(output_file, "w", encoding="utf-8") as file:
|
|
file.write(stdout.getvalue())
|
|
|
|
|
|
def downgrade_database(
|
|
alembic_cfg: AlembicConfig, engine: Engine, revision: str = "-1"
|
|
):
|
|
"""Downgrade the database by one revision.
|
|
|
|
Args:
|
|
alembic_cfg: Alembic configuration object.
|
|
engine: SQLAlchemy engine instance.
|
|
revision: Revision identifier, default is "-1" which means one revision back.
|
|
"""
|
|
with engine.connect() as connection:
|
|
alembic_cfg.attributes["connection"] = connection
|
|
command.downgrade(alembic_cfg, revision)
|
|
|
|
|
|
def clean_alembic_migration(alembic_cfg: AlembicConfig, engine: Engine) -> None:
|
|
"""Clean Alembic migration scripts and history.
|
|
|
|
Args:
|
|
alembic_cfg: Alembic config object
|
|
engine: SQLAlchemy engine instance
|
|
|
|
"""
|
|
import shutil
|
|
|
|
# Get migration script location
|
|
script_location = alembic_cfg.get_main_option("script_location")
|
|
print(f"Delete migration script location: {script_location}")
|
|
|
|
# Delete all migration script files
|
|
for file in os.listdir(script_location):
|
|
if file.startswith("versions"):
|
|
filepath = os.path.join(script_location, file)
|
|
print(f"Delete migration script file: {filepath}")
|
|
if os.path.isfile(filepath):
|
|
os.remove(filepath)
|
|
else:
|
|
shutil.rmtree(filepath, ignore_errors=True)
|
|
|
|
# Delete Alembic version table if exists
|
|
version_table = alembic_cfg.get_main_option("version_table") or "alembic_version"
|
|
if version_table:
|
|
with engine.connect() as connection:
|
|
print(f"Delete Alembic version table: {version_table}")
|
|
connection.execute(text(f"DROP TABLE IF EXISTS {version_table}"))
|
|
|
|
print("Cleaned Alembic migration scripts and history")
|
|
|
|
|
|
_MIGRATION_SOLUTION = """
|
|
**Solution 1:**
|
|
|
|
Run the following command to upgrade the database.
|
|
```commandline
|
|
dbgpt db migration upgrade
|
|
```
|
|
|
|
**Solution 2:**
|
|
|
|
Run the following command to clean the migration script and migration history.
|
|
```commandline
|
|
dbgpt db migration clean -y
|
|
```
|
|
|
|
**Solution 3:**
|
|
|
|
If you have already run the above command, but the error still exists,
|
|
you can try the following command to clean the migration script, migration history and your data.
|
|
warning: This command will delete all your data!!! Please use it with caution.
|
|
|
|
```commandline
|
|
dbgpt db migration clean --drop_all_tables -y --confirm_drop_all_tables
|
|
```
|
|
or
|
|
```commandline
|
|
rm -rf pilot/meta_data/alembic/versions/*
|
|
rm -rf pilot/meta_data/alembic/dbgpt.db
|
|
```
|
|
|
|
If your database is a shared database, and you run DB-GPT in multiple instances,
|
|
you should make sure that all migration scripts are same in all instances, in this case,
|
|
wo strongly recommend you close migration feature by setting `--disable_alembic_upgrade`.
|
|
and use `dbgpt db migration` command to manage migration scripts.
|
|
"""
|
|
|
|
|
|
def _check_database_migration_status(alembic_cfg: AlembicConfig, engine: Engine):
|
|
"""Check if the database is at the latest migration revision.
|
|
|
|
If your database is a shared database, and you run DB-GPT in multiple instances,
|
|
you should make sure that all migration scripts are same in all instances, in this case,
|
|
wo strongly recommend you close migration feature by setting `disable_alembic_upgrade` to True.
|
|
and use `dbgpt db migration` command to manage migration scripts.
|
|
|
|
Args:
|
|
alembic_cfg: Alembic configuration object.
|
|
engine: SQLAlchemy engine instance.
|
|
Raises:
|
|
Exception: If the database is not at the latest revision.
|
|
"""
|
|
from alembic.runtime.migration import MigrationContext
|
|
from alembic.script import ScriptDirectory
|
|
|
|
script = ScriptDirectory.from_config(alembic_cfg)
|
|
|
|
def get_current_revision(engine):
|
|
with engine.connect() as connection:
|
|
context = MigrationContext.configure(connection=connection)
|
|
return context.get_current_revision()
|
|
|
|
current_rev = get_current_revision(engine)
|
|
head_rev = script.get_current_head()
|
|
|
|
script_info_msg = "Migration versions and their file paths:"
|
|
script_info_msg += f"\n{'='*40}Migration versions{'='*40}\n"
|
|
for revision in script.walk_revisions(base="base"):
|
|
current_marker = "(current)" if revision.revision == current_rev else ""
|
|
script_path = script.get_revision(revision.revision).path
|
|
script_info_msg += f"\n{revision.revision} {current_marker}: {revision.doc} (Path: {script_path})"
|
|
script_info_msg += f"\n{'='*90}"
|
|
|
|
logger.info(script_info_msg)
|
|
|
|
if current_rev != head_rev:
|
|
logger.error(
|
|
"Database is not at the latest revision. "
|
|
f"Current revision: {current_rev}, latest revision: {head_rev}\n"
|
|
"Please apply existing migration scripts before generating new ones. "
|
|
"Check the listed file paths for migration scripts.\n"
|
|
f"Also you can try the following solutions:\n{_MIGRATION_SOLUTION}\n"
|
|
)
|
|
raise Exception(
|
|
"Check database migration status failed, you can see the error and solutions above"
|
|
)
|
|
|
|
|
|
def _get_latest_revision(alembic_cfg: AlembicConfig, engine: Engine) -> str:
|
|
"""Get the latest revision of the database.
|
|
|
|
Args:
|
|
alembic_cfg: Alembic configuration object.
|
|
engine: SQLAlchemy engine instance.
|
|
|
|
Returns:
|
|
The latest revision as a string.
|
|
"""
|
|
from alembic.runtime.migration import MigrationContext
|
|
|
|
with engine.connect() as connection:
|
|
context = MigrationContext.configure(connection=connection)
|
|
return context.get_current_revision()
|
|
|
|
|
|
def _delete_migration_script(script_path: str):
|
|
"""Delete a migration script.
|
|
|
|
Args:
|
|
script_path: The path of the migration script to delete.
|
|
"""
|
|
if os.path.exists(script_path):
|
|
os.remove(script_path)
|
|
logger.info(f"Deleted migration script at: {script_path}")
|
|
else:
|
|
logger.warning(f"Migration script not found at: {script_path}")
|
|
|
|
|
|
def _ddl_init_and_upgrade(
|
|
default_meta_data_path: str,
|
|
disable_alembic_upgrade: bool,
|
|
alembic_ini_path: Optional[str] = None,
|
|
script_location: Optional[str] = None,
|
|
):
|
|
"""Initialize and upgrade database metadata
|
|
|
|
Args:
|
|
default_meta_data_path (str): default meta data path
|
|
disable_alembic_upgrade (bool): Whether to enable alembic to initialize and upgrade database metadata
|
|
alembic_ini_path (Optional[str]): alembic ini path
|
|
script_location (Optional[str]): alembic script location
|
|
"""
|
|
if disable_alembic_upgrade:
|
|
logger.info(
|
|
"disable_alembic_upgrade is true, not to initialize and upgrade database metadata with alembic"
|
|
)
|
|
return
|
|
else:
|
|
warn_msg = (
|
|
"Initialize and upgrade database metadata with alembic, "
|
|
"just run this in your development environment, if you deploy this in production environment, "
|
|
"please run webserver with --disable_alembic_upgrade(`python dbgpt/app/dbgpt_server.py "
|
|
"--disable_alembic_upgrade`).\n"
|
|
"we suggest you to use `dbgpt db migration` to initialize and upgrade database metadata with alembic, "
|
|
"your can run `dbgpt db migration --help` to get more information."
|
|
)
|
|
logger.warning(warn_msg)
|
|
from dbgpt.storage.metadata.db_manager import db
|
|
|
|
alembic_cfg = create_alembic_config(
|
|
default_meta_data_path,
|
|
db.engine,
|
|
db.Model,
|
|
db.session(),
|
|
alembic_ini_path,
|
|
script_location,
|
|
)
|
|
try:
|
|
_check_database_migration_status(alembic_cfg, db.engine)
|
|
except Exception as e:
|
|
logger.error(f"Failed to check database migration status: {e}")
|
|
raise
|
|
latest_revision_before = "__latest_revision_before__"
|
|
new_script_path = None
|
|
try:
|
|
latest_revision_before = _get_latest_revision(alembic_cfg, db.engine)
|
|
# create_new_revision_if_noting_to_update=False avoid creating a lot of empty migration scripts
|
|
# TODO Set create_new_revision_if_noting_to_update=False, not working now.
|
|
new_script_path = create_migration_script(
|
|
alembic_cfg, db.engine, create_new_revision_if_noting_to_update=True
|
|
)
|
|
upgrade_database(alembic_cfg, db.engine)
|
|
except CommandError as e:
|
|
if "Target database is not up to date" in str(e):
|
|
logger.error(
|
|
f"Initialize and upgrade database metadata with alembic failed, error detail: {str(e)} "
|
|
f"you can try the following solutions:\n{_MIGRATION_SOLUTION}\n"
|
|
)
|
|
raise Exception(
|
|
"Initialize and upgrade database metadata with alembic failed, "
|
|
"you can see the error and solutions above"
|
|
) from e
|
|
else:
|
|
latest_revision_after = _get_latest_revision(alembic_cfg, db.engine)
|
|
if latest_revision_before != latest_revision_after:
|
|
logger.error(
|
|
f"Upgrade database failed. Please review the migration script manually. "
|
|
f"Failed script path: {new_script_path}\nError: {e}"
|
|
)
|
|
raise e
|