DB-GPT/dbgpt/rag/summary/rdbms_db_summary.py
Cooper 9b0161e521
Feat rdb summary wide table (#2035)
Co-authored-by: dongzhancai1 <dongzhancai1@jd.com>
Co-authored-by: dong <dongzhancai@iie2.com>
2024-12-18 20:34:21 +08:00

258 lines
8.9 KiB
Python

"""Summary for rdbms database."""
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from dbgpt._private.config import Config
from dbgpt.datasource import BaseConnector
from dbgpt.rag.summary.db_summary import DBSummary
if TYPE_CHECKING:
from dbgpt.datasource.manages import ConnectorManager
CFG = Config()
class RdbmsSummary(DBSummary):
"""Get rdbms db table summary template.
Summary example:
table_name(column1(column1 comment),column2(column2 comment),
column3(column3 comment) and index keys, and table comment is {table_comment})
"""
def __init__(
self, name: str, type: str, manager: Optional["ConnectorManager"] = None
):
"""Create a new RdbmsSummary."""
self.name = name
self.type = type
self.summary_template = "{table_name}({columns})"
self.tables = {}
# self.tables_info = []
# self.vector_tables_info = []
# TODO: Don't use the global variable.
db_manager = manager or CFG.local_db_manager
if not db_manager:
raise ValueError("Local db manage is not initialized.")
self.db = db_manager.get_connector(name)
self.metadata = """user info :{users}, grant info:{grant}, charset:{charset},
collation:{collation}""".format(
users=self.db.get_users(),
grant=self.db.get_grants(),
charset=self.db.get_charset(),
collation=self.db.get_collation(),
)
tables = self.db.get_table_names()
self.table_info_summaries = [
self.get_table_summary(table_name) for table_name in tables
]
def get_table_summary(self, table_name):
"""Get table summary for table.
example:
table_name(column1(column1 comment),column2(column2 comment),
column3(column3 comment) and index keys, and table comment: {table_comment})
"""
return _parse_table_summary(self.db, self.summary_template, table_name)
def table_summaries(self):
"""Get table summaries."""
return self.table_info_summaries
def _parse_db_summary(
conn: BaseConnector, summary_template: str = "{table_name}({columns})"
) -> List[str]:
"""Get db summary for database.
Args:
conn (BaseConnector): database connection
summary_template (str): summary template
"""
tables = conn.get_table_names()
table_info_summaries = [
_parse_table_summary(conn, summary_template, table_name)
for table_name in tables
]
return table_info_summaries
def _parse_db_summary_with_metadata(
conn: BaseConnector,
summary_template: str = "table_name: {table_name}",
separator: str = "--table-field-separator--",
model_dimension: int = 512,
) -> List[Tuple[str, Dict[str, Any]]]:
"""Get db summary for database.
Args:
conn (BaseConnector): database connection
summary_template (str): summary template
separator(str, optional): separator used to separate table's
basic info and fields. defaults to `-- table-field-separator--`
model_dimension(int, optional): The threshold for splitting field string
"""
tables = conn.get_table_names()
table_info_summaries = [
_parse_table_summary_with_metadata(
conn, summary_template, separator, table_name, model_dimension
)
for table_name in tables
]
return table_info_summaries
def _split_columns_str(columns: List[str], model_dimension: int):
"""Split columns str.
Args:
columns (List[str]): fields string
model_dimension (int, optional): The threshold for splitting field string.
"""
result = []
current_string = ""
current_length = 0
for element_str in columns:
element_length = len(element_str)
# If adding the current element's length would exceed the threshold,
# add the current string to results and reset
if current_length + element_length > model_dimension:
result.append(current_string.strip()) # Remove trailing spaces
current_string = element_str
current_length = element_length
else:
# If current string is empty, add element directly
if current_string:
current_string += "," + element_str
else:
current_string = element_str
current_length += element_length + 1 # Add length of space
# Handle the last string segment
if current_string:
result.append(current_string.strip())
return result
def _parse_table_summary_with_metadata(
conn: BaseConnector,
summary_template: str,
separator,
table_name: str,
model_dimension=512,
) -> Tuple[str, Dict[str, Any]]:
"""Get table summary for table.
Args:
conn (BaseConnector): database connection
summary_template (str): summary template
separator(str, optional): separator used to separate table's
basic info and fields. defaults to `-- table-field-separator--`
model_dimension(int, optional): The threshold for splitting field string
Examples:
metadata: {'table_name': 'asd', 'separated': 0/1}
table_name: table1
table_comment: comment
index_keys: keys
--table-field-separator--
(column1,comment), (column2, comment), (column3, comment)
(column4,comment), (column5, comment), (column6, comment)
"""
columns = []
metadata = {"table_name": table_name, "separated": 0}
for column in conn.get_columns(table_name):
if column.get("comment"):
columns.append(f"{column['name']} ({column.get('comment')})")
else:
columns.append(f"{column['name']}")
metadata.update({"field_num": len(columns)})
separated_columns = _split_columns_str(columns, model_dimension=model_dimension)
if len(separated_columns) > 1:
metadata["separated"] = 1
column_str = "\n".join(separated_columns)
# Obtain index information
index_keys = []
raw_indexes = conn.get_indexes(table_name)
for index in raw_indexes:
if isinstance(index, tuple): # Process tuple type index information
index_name, index_creation_command = index
# Extract column names using re
matched_columns = re.findall(r"\(([^)]+)\)", index_creation_command)
if matched_columns:
key_str = ", ".join(matched_columns)
index_keys.append(f"{index_name}(`{key_str}`) ")
else:
key_str = ", ".join(index["column_names"])
index_keys.append(f"{index['name']}(`{key_str}`) ")
table_str = summary_template.format(table_name=table_name)
try:
comment = conn.get_table_comment(table_name)
except Exception:
comment = dict(text=None)
if comment.get("text"):
table_str += f"\ntable_comment: {comment.get('text')}"
if len(index_keys) > 0:
index_key_str = ", ".join(index_keys)
table_str += f"\nindex_keys: {index_key_str}"
table_str += f"\n{separator}\n{column_str}"
return table_str, metadata
def _parse_table_summary(
conn: BaseConnector, summary_template: str, table_name: str
) -> str:
"""Get table summary for table.
Args:
conn (BaseConnector): database connection
summary_template (str): summary template
table_name (str): table name
Examples:
table_name(column1(column1 comment),column2(column2 comment),
column3(column3 comment) and index keys, and table comment: {table_comment})
"""
columns = []
for column in conn.get_columns(table_name):
if column.get("comment"):
columns.append(f"{column['name']} ({column.get('comment')})")
else:
columns.append(f"{column['name']}")
column_str = ", ".join(columns)
# Obtain index information
index_keys = []
raw_indexes = conn.get_indexes(table_name)
for index in raw_indexes:
if isinstance(index, tuple): # Process tuple type index information
index_name, index_creation_command = index
# Extract column names using re
matched_columns = re.findall(r"\(([^)]+)\)", index_creation_command)
if matched_columns:
key_str = ", ".join(matched_columns)
index_keys.append(f"{index_name}(`{key_str}`) ")
else:
key_str = ", ".join(index["column_names"])
index_keys.append(f"{index['name']}(`{key_str}`) ")
table_str = summary_template.format(table_name=table_name, columns=column_str)
if len(index_keys) > 0:
index_key_str = ", ".join(index_keys)
table_str += f", and index keys: {index_key_str}"
try:
comment = conn.get_table_comment(table_name)
except Exception:
comment = dict(text=None)
if comment.get("text"):
table_str += f", and table comment: {comment.get('text')}"
return table_str