DB-GPT/dbgpt/util/_db_migration_utils.py
2024-01-10 10:39:04 +08:00

385 lines
14 KiB
Python

import logging
import os
from typing import Optional
from alembic import command
from alembic.config import Config as AlembicConfig
from alembic.util.exc import CommandError
from sqlalchemy import Engine, text
from sqlalchemy.orm import DeclarativeMeta, Session
logger = logging.getLogger(__name__)
def create_alembic_config(
alembic_root_path: str,
engine: Engine,
base: DeclarativeMeta,
session: Session,
alembic_ini_path: Optional[str] = None,
script_location: Optional[str] = None,
) -> AlembicConfig:
"""Create alembic config.
Args:
alembic_root_path: alembic root path
engine: sqlalchemy engine
base: sqlalchemy base
session: sqlalchemy session
alembic_ini_path (Optional[str]): alembic ini path
script_location (Optional[str]): alembic script location
Returns:
alembic config
"""
alembic_ini_path = alembic_ini_path or os.path.join(
alembic_root_path, "alembic.ini"
)
alembic_cfg = AlembicConfig(alembic_ini_path)
alembic_cfg.set_main_option("sqlalchemy.url", str(engine.url))
script_location = script_location or os.path.join(alembic_root_path, "alembic")
versions_dir = os.path.join(script_location, "versions")
os.makedirs(script_location, exist_ok=True)
os.makedirs(versions_dir, exist_ok=True)
alembic_cfg.set_main_option("script_location", script_location)
alembic_cfg.attributes["target_metadata"] = base.metadata
alembic_cfg.attributes["session"] = session
return alembic_cfg
def create_migration_script(
alembic_cfg: AlembicConfig,
engine: Engine,
message: str = "New migration",
create_new_revision_if_noting_to_update: Optional[bool] = True,
) -> str:
"""Create migration script.
Args:
alembic_cfg: alembic config
engine: sqlalchemy engine
message: migration message
create_new_revision_if_noting_to_update: Whether to create a new revision if there is nothing to update,
pass False to avoid creating a new revision if there is nothing to update, default is True
Returns:
The path of the generated migration script.
"""
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
# Check if the database is up-to-date
script_dir = ScriptDirectory.from_config(alembic_cfg)
with engine.connect() as connection:
context = MigrationContext.configure(connection=connection)
current_rev = context.get_current_revision()
head_rev = script_dir.get_current_head()
logger.info(
f"alembic migration current revision: {current_rev}, latest revision: {head_rev}"
)
should_create_revision = (
(current_rev is None and head_rev is None)
or current_rev != head_rev
or create_new_revision_if_noting_to_update
)
if should_create_revision:
with engine.connect() as connection:
alembic_cfg.attributes["connection"] = connection
revision = command.revision(alembic_cfg, message=message, autogenerate=True)
# Return the path of the generated migration script
return revision.path
elif current_rev == head_rev:
logger.info("No migration script to generate, database is up-to-date")
# If no new revision is created, return None or an appropriate message
return None
def upgrade_database(
alembic_cfg: AlembicConfig, engine: Engine, target_version: str = "head"
) -> None:
"""Upgrade database to target version.
Args:
alembic_cfg: alembic config
engine: sqlalchemy engine
target_version: target version, default is head(latest version)
"""
with engine.connect() as connection:
alembic_cfg.attributes["connection"] = connection
# Will create tables if not exists
command.upgrade(alembic_cfg, target_version)
def generate_sql_for_upgrade(
alembic_cfg: AlembicConfig,
engine: Engine,
target_version: Optional[str] = "head",
output_file: Optional[str] = "migration.sql",
) -> None:
"""Generate SQL for upgrading database to target version.
Args:
alembic_cfg: alembic config
engine: sqlalchemy engine
target_version: target version, default is head (latest version)
output_file: file to write the SQL script
TODO: Can't generate SQL for most of the operations.
"""
import contextlib
import io
with engine.connect() as connection, contextlib.redirect_stdout(
io.StringIO()
) as stdout:
alembic_cfg.attributes["connection"] = connection
# Generating SQL instead of applying changes
command.upgrade(alembic_cfg, target_version, sql=True)
# Write the generated SQL to a file
with open(output_file, "w", encoding="utf-8") as file:
file.write(stdout.getvalue())
def downgrade_database(
alembic_cfg: AlembicConfig, engine: Engine, revision: str = "-1"
):
"""Downgrade the database by one revision.
Args:
alembic_cfg: Alembic configuration object.
engine: SQLAlchemy engine instance.
revision: Revision identifier, default is "-1" which means one revision back.
"""
with engine.connect() as connection:
alembic_cfg.attributes["connection"] = connection
command.downgrade(alembic_cfg, revision)
def clean_alembic_migration(alembic_cfg: AlembicConfig, engine: Engine) -> None:
"""Clean Alembic migration scripts and history.
Args:
alembic_cfg: Alembic config object
engine: SQLAlchemy engine instance
"""
import shutil
# Get migration script location
script_location = alembic_cfg.get_main_option("script_location")
print(f"Delete migration script location: {script_location}")
# Delete all migration script files
for file in os.listdir(script_location):
if file.startswith("versions"):
filepath = os.path.join(script_location, file)
print(f"Delete migration script file: {filepath}")
if os.path.isfile(filepath):
os.remove(filepath)
else:
shutil.rmtree(filepath, ignore_errors=True)
# Delete Alembic version table if exists
version_table = alembic_cfg.get_main_option("version_table") or "alembic_version"
if version_table:
with engine.connect() as connection:
print(f"Delete Alembic version table: {version_table}")
connection.execute(text(f"DROP TABLE IF EXISTS {version_table}"))
print("Cleaned Alembic migration scripts and history")
_MIGRATION_SOLUTION = """
**Solution 1:**
Run the following command to upgrade the database.
```commandline
dbgpt db migration upgrade
```
**Solution 2:**
Run the following command to clean the migration script and migration history.
```commandline
dbgpt db migration clean -y
```
**Solution 3:**
If you have already run the above command, but the error still exists,
you can try the following command to clean the migration script, migration history and your data.
warning: This command will delete all your data!!! Please use it with caution.
```commandline
dbgpt db migration clean --drop_all_tables -y --confirm_drop_all_tables
```
or
```commandline
rm -rf pilot/meta_data/alembic/versions/*
rm -rf pilot/meta_data/alembic/dbgpt.db
```
If your database is a shared database, and you run DB-GPT in multiple instances,
you should make sure that all migration scripts are same in all instances, in this case,
wo strongly recommend you close migration feature by setting `--disable_alembic_upgrade`.
and use `dbgpt db migration` command to manage migration scripts.
"""
def _check_database_migration_status(alembic_cfg: AlembicConfig, engine: Engine):
"""Check if the database is at the latest migration revision.
If your database is a shared database, and you run DB-GPT in multiple instances,
you should make sure that all migration scripts are same in all instances, in this case,
wo strongly recommend you close migration feature by setting `disable_alembic_upgrade` to True.
and use `dbgpt db migration` command to manage migration scripts.
Args:
alembic_cfg: Alembic configuration object.
engine: SQLAlchemy engine instance.
Raises:
Exception: If the database is not at the latest revision.
"""
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
script = ScriptDirectory.from_config(alembic_cfg)
def get_current_revision(engine):
with engine.connect() as connection:
context = MigrationContext.configure(connection=connection)
return context.get_current_revision()
current_rev = get_current_revision(engine)
head_rev = script.get_current_head()
script_info_msg = "Migration versions and their file paths:"
script_info_msg += f"\n{'='*40}Migration versions{'='*40}\n"
for revision in script.walk_revisions(base="base"):
current_marker = "(current)" if revision.revision == current_rev else ""
script_path = script.get_revision(revision.revision).path
script_info_msg += f"\n{revision.revision} {current_marker}: {revision.doc} (Path: {script_path})"
script_info_msg += f"\n{'='*90}"
logger.info(script_info_msg)
if current_rev != head_rev:
logger.error(
"Database is not at the latest revision. "
f"Current revision: {current_rev}, latest revision: {head_rev}\n"
"Please apply existing migration scripts before generating new ones. "
"Check the listed file paths for migration scripts.\n"
f"Also you can try the following solutions:\n{_MIGRATION_SOLUTION}\n"
)
raise Exception(
"Check database migration status failed, you can see the error and solutions above"
)
def _get_latest_revision(alembic_cfg: AlembicConfig, engine: Engine) -> str:
"""Get the latest revision of the database.
Args:
alembic_cfg: Alembic configuration object.
engine: SQLAlchemy engine instance.
Returns:
The latest revision as a string.
"""
from alembic.runtime.migration import MigrationContext
with engine.connect() as connection:
context = MigrationContext.configure(connection=connection)
return context.get_current_revision()
def _delete_migration_script(script_path: str):
"""Delete a migration script.
Args:
script_path: The path of the migration script to delete.
"""
if os.path.exists(script_path):
os.remove(script_path)
logger.info(f"Deleted migration script at: {script_path}")
else:
logger.warning(f"Migration script not found at: {script_path}")
def _ddl_init_and_upgrade(
default_meta_data_path: str,
disable_alembic_upgrade: bool,
alembic_ini_path: Optional[str] = None,
script_location: Optional[str] = None,
):
"""Initialize and upgrade database metadata
Args:
default_meta_data_path (str): default meta data path
disable_alembic_upgrade (bool): Whether to enable alembic to initialize and upgrade database metadata
alembic_ini_path (Optional[str]): alembic ini path
script_location (Optional[str]): alembic script location
"""
if disable_alembic_upgrade:
logger.info(
"disable_alembic_upgrade is true, not to initialize and upgrade database metadata with alembic"
)
return
else:
warn_msg = (
"Initialize and upgrade database metadata with alembic, "
"just run this in your development environment, if you deploy this in production environment, "
"please run webserver with --disable_alembic_upgrade(`python dbgpt/app/dbgpt_server.py "
"--disable_alembic_upgrade`).\n"
"we suggest you to use `dbgpt db migration` to initialize and upgrade database metadata with alembic, "
"your can run `dbgpt db migration --help` to get more information."
)
logger.warning(warn_msg)
from dbgpt.storage.metadata.db_manager import db
alembic_cfg = create_alembic_config(
default_meta_data_path,
db.engine,
db.Model,
db.session(),
alembic_ini_path,
script_location,
)
try:
_check_database_migration_status(alembic_cfg, db.engine)
except Exception as e:
logger.error(f"Failed to check database migration status: {e}")
raise
latest_revision_before = "__latest_revision_before__"
new_script_path = None
try:
latest_revision_before = _get_latest_revision(alembic_cfg, db.engine)
# create_new_revision_if_noting_to_update=False avoid creating a lot of empty migration scripts
# TODO Set create_new_revision_if_noting_to_update=False, not working now.
new_script_path = create_migration_script(
alembic_cfg, db.engine, create_new_revision_if_noting_to_update=True
)
upgrade_database(alembic_cfg, db.engine)
except CommandError as e:
if "Target database is not up to date" in str(e):
logger.error(
f"Initialize and upgrade database metadata with alembic failed, error detail: {str(e)} "
f"you can try the following solutions:\n{_MIGRATION_SOLUTION}\n"
)
raise Exception(
"Initialize and upgrade database metadata with alembic failed, "
"you can see the error and solutions above"
) from e
else:
latest_revision_after = _get_latest_revision(alembic_cfg, db.engine)
if latest_revision_before != latest_revision_after:
logger.error(
f"Upgrade database failed. Please review the migration script manually. "
f"Failed script path: {new_script_path}\nError: {e}"
)
raise e