feat: adjust benchmark data construct approach (#2948)
@@ -40,7 +40,7 @@ from ...config import ServeConfig
from ...models.models import ServeDao, ServeEntity
from ..fetchdata.benchmark_data_manager import get_benchmark_manager
from .data_compare_service import DataCompareService
from .ext.excel_file_parse import ExcelFileParseService
from .file_parse_service import ExcelFileParseService
from .models import (
BaseInputModel,
BenchmarkDataSets,

@@ -1,17 +1,16 @@
import asyncio
import csv
import hashlib
import json
import logging
import os
import re
import shutil
import tempfile
import threading
import time
import uuid
import zipfile
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, cast

import aiohttp
@@ -36,12 +35,10 @@ class BenchmarkDataConfig(BaseModel):
db_path: str = os.path.join(
BENCHMARK_DATA_ROOT_PATH, f"{BENCHMARK_DEFAULT_DB_SCHEMA}db"
)
table_mapping_file: str = os.path.join(
BENCHMARK_DATA_ROOT_PATH, "table_mapping.json"
)
table_mapping_file: Optional[str] = None
cache_expiry_days: int = 1
repo_url: str = "https://github.com/eosphoros-ai/Falcon"
data_dir: str = "data/source"
repo_url: str = "https://github.com/eosphoros-ai/Falcon/tree/yifan_1216"
data_dir: str = "dev_data/dev_databases"


class BenchmarkDataManager(BaseComponent):
@@ -56,7 +53,6 @@ class BenchmarkDataManager(BaseComponent):
self._config = config or BenchmarkDataConfig()
self._http_session: Optional[aiohttp.ClientSession] = None
self._connector: Optional[SQLiteConnector] = None
self._table_mappings = self._load_mappings()
self._lock = asyncio.Lock()
self.temp_dir: Optional[str] = None

@@ -142,59 +138,6 @@ class BenchmarkDataManager(BaseComponent):
except Exception as e:
logger.error(f"BenchmarkDataManager: auto load failed: {e}")

def _sanitize_column_name(self, name: str) -> str:
if name is None:
return ""
name = str(name).strip().strip('"').strip("'")
invalid_chars = [
"-",
" ",
".",
",",
";",
":",
"!",
"?",
"'",
'"',
"(",
")",
"[",
"]",
"{",
"}",
"\t",
"\r",
"\n",
"\x00",
]
while name and name[-1] in invalid_chars:
name = name[:-1]
for ch in invalid_chars:
if ch in name:
name = name.replace(ch, "_")
while "__" in name:
name = name.replace("__", "_")
if name and not (name[0].isalpha() or name[0] == "_"):
name = "_" + name
return name.lower()

def _sanitize_and_dedup_headers(self, headers: List[str]) -> List[str]:
sanitized: List[str] = []
used: set = set()
for idx, h in enumerate(headers):
name = self._sanitize_column_name(h)
if not name:
name = f"col_{idx}"
base = name
k = 2
while name in used or not name:
name = f"{base}_{k}"
k += 1
used.add(name)
sanitized.append(name)
return sanitized

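These two helpers belong to the CSV import path that this commit removes. For reference, here is a simplified, standalone sketch of the header clean-up they performed (it is an approximation, not byte-for-byte identical to the methods above; the inputs are made up):

```python
import re

def sanitize_and_dedup(headers):
    # Simplified approximation of _sanitize_column_name + _sanitize_and_dedup_headers:
    # normalize punctuation/whitespace to "_", lower-case, and suffix duplicates.
    out, used = [], set()
    for idx, h in enumerate(headers):
        name = re.sub(r"[^0-9a-zA-Z_]", "_", str(h).strip())
        name = re.sub(r"_+", "_", name).strip("_").lower()
        name = name or f"col_{idx}"
        base, k = name, 2
        while name in used:
            name = f"{base}_{k}"
            k += 1
        used.add(name)
        out.append(name)
    return out

print(sanitize_and_dedup(["Order ID", "order id", "Total.Amount", ""]))
# ['order_id', 'order_id_2', 'total_amount', 'col_3']
```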
# ==========================================================

# General-purpose query (blocking implementation, invoked in a thread pool, supports timeout and interruption)
@@ -292,7 +235,7 @@ class BenchmarkDataManager(BaseComponent):
return result.rowcount

if timeout is not None:
# Use ThreadPoolExecutor for timeout control, similar to the DuckDB implementation in the base class
# Use ThreadPoolExecutor for timeout control
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(_execute_write)
try:
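The hunk above only swaps the comment; the pattern stays the same: the blocking write runs in a single-worker ThreadPoolExecutor and the caller bounds the wait with future.result(timeout=...). A minimal standalone sketch of that pattern follows (the slow function and the timeout handling are illustrative; the hunk does not show what the real code does after a timeout):

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def _execute_write() -> int:
    time.sleep(2)  # stand-in for a blocking SQLite write
    return 1


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(_execute_write)
    try:
        rowcount = future.result(timeout=0.5)
    except FutureTimeoutError:
        rowcount = -1  # illustrative handling; not what the service does

print(rowcount)  # -1, because the write exceeded the 0.5 s budget
```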
@@ -307,13 +250,7 @@ class BenchmarkDataManager(BaseComponent):
async def query(
self, query: str, params: tuple = (), timeout: Optional[float] = None
) -> List[Dict]:
"""Execute query and return results as dict list

Args:
query: SQL query string
params: Query parameters
timeout: Query timeout in seconds (optional)
"""
"""Execute query and return results as dict list"""
await self.init_connector()
cols, rows = await self._run_in_thread(
self._query_blocking, query, params, timeout
@@ -321,7 +258,7 @@ class BenchmarkDataManager(BaseComponent):
return [dict(zip(cols, row)) for row in rows]

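The query helper returns rows as dictionaries by zipping the column names onto each row tuple; a one-line illustration of that expression:

```python
cols = ["id", "name"]
rows = [(1, "alice"), (2, "bob")]
print([dict(zip(cols, row)) for row in rows])
# [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'bob'}]
```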
async def load_from_github(
self, repo_url: str, data_dir: str = "data/source"
self, repo_url: str, data_dir: str = "dev_data/dev_databases"
) -> Dict:
"""Main method to load data from GitHub repository"""
try:
@@ -330,14 +267,14 @@ class BenchmarkDataManager(BaseComponent):
# 1. Download or use cached repository
repo_dir = await self._download_repo_contents(repo_url)

# 2. Find all CSV files recursively
csv_files = self._discover_csv_files(repo_dir, data_dir)
if not csv_files:
raise ValueError("No CSV files found")
logger.info(f"Found {len(csv_files)} CSV files")
# 2. Find all SQLite files recursively in the specified data_dir
sqlite_files = self._discover_sqlite_files(repo_dir, data_dir)
if not sqlite_files:
raise ValueError(f"No SQLite files found in {data_dir}")
logger.info(f"Found {len(sqlite_files)} SQLite files")

# 3. Import to SQLite
result = await self._import_to_database(csv_files)
# 3. Merge all SQLite files into the main database
result = await self._merge_sqlite_databases(sqlite_files)
return result

except Exception as e:
@@ -389,63 +326,8 @@ class BenchmarkDataManager(BaseComponent):
except Exception as e:
logger.error(f"Failed to clear cache: {str(e)}")

def _load_mappings(self) -> Dict[str, str]:
"""Load table name mappings from config file"""
if not self._config.table_mapping_file or not os.path.exists(
self._config.table_mapping_file
):
logger.warning(
f"Table mapping file not found: {self._config.table_mapping_file}"
)
return {}

try:
with open(self._config.table_mapping_file, "r", encoding="utf-8") as f:
mapping = json.load(f)
return {
key: value.split(".")[-1] if "." in value else value
for key, value in mapping.items()
}
except Exception as e:
logger.error(f"Failed to load table mapping: {str(e)}")
return {}

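_load_mappings keeps only the part after the last dot of each mapped value, so qualified names like the ones in the (now removed) table_mapping.json collapse to bare table names. A small illustration using one entry from that file:

```python
mapping = {"data_source_13_sales": "ant_icube_dev.walmart_sales"}
normalized = {
    key: value.split(".")[-1] if "." in value else value
    for key, value in mapping.items()
}
print(normalized)  # {'data_source_13_sales': 'walmart_sales'}
```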
def _sanitize_table_name(self, name: str) -> str:
"""Normalize table names using mappings"""
mapped_name = self._table_mappings.get(name.lower(), name)
if mapped_name is None:
mapped_name = name or ""

invalid_chars = [
"-",
" ",
".",
",",
";",
":",
"!",
"?",
"'",
'"',
"(",
")",
"[",
"]",
"{",
"}",
]
while mapped_name and mapped_name[-1] in invalid_chars:
mapped_name = mapped_name[:-1]
for char in invalid_chars:
if char in mapped_name:
mapped_name = mapped_name.replace(char, "_")
while "__" in mapped_name:
mapped_name = mapped_name.replace("__", "_")

return (mapped_name or "").lower()

async def _download_repo_contents(self, repo_url: str) -> str:
"""Download repository with caching"""
"""Download repository with caching, supporting branch URLs"""
cache_path = self._get_cache_path(repo_url)

# Use cache if valid
@@ -455,21 +337,45 @@ class BenchmarkDataManager(BaseComponent):

# Download fresh copy
self.temp_dir = tempfile.mkdtemp()
zip_url = (
repo_url.replace("github.com", "api.github.com/repos") + "/zipball/main"
)

# Simple parsing for github.com URLs
github_pattern = r"github\.com/([^/]+)/([^/]+)(?:/tree/(.+))?"
match = re.search(github_pattern, repo_url)

if match:
owner, repo, branch = match.groups()
branch = branch or "main" # Default to main if no tree/branch specified
zip_url = f"https://api.github.com/repos/{owner}/{repo}/zipball/{branch}"
else:
# Fallback for generic structure or direct zip links
if repo_url.endswith(".zip"):
zip_url = repo_url
else:
# Default fallback behavior from original code
zip_url = (
repo_url.replace("github.com", "api.github.com/repos")
+ "/zipball/main"
)

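The new parsing accepts repository URLs that include /tree/<branch> (such as the Falcon branch used as the new default repo_url) and turns them into a GitHub API zipball URL. A standalone check of that mapping, using the pattern and URL taken from this diff:

```python
import re

github_pattern = r"github\.com/([^/]+)/([^/]+)(?:/tree/(.+))?"
repo_url = "https://github.com/eosphoros-ai/Falcon/tree/yifan_1216"

match = re.search(github_pattern, repo_url)
if match:
    owner, repo, branch = match.groups()
    branch = branch or "main"  # default branch when no /tree/<branch> is given
    zip_url = f"https://api.github.com/repos/{owner}/{repo}/zipball/{branch}"
    print(zip_url)
    # -> https://api.github.com/repos/eosphoros-ai/Falcon/zipball/yifan_1216
```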
logger.info(f"Downloading from GitHub repo: {zip_url}")

try:
if self._http_session is None:
self._http_session = aiohttp.ClientSession()
async with self._http_session.get(zip_url) as response:
response.raise_for_status()

headers = {"Accept": "application/vnd.github.v3+json"}
async with self._http_session.get(zip_url, headers=headers) as response:
if response.status != 200:
text_resp = await response.text()
raise RuntimeError(
f"GitHub API Error {response.status}: {text_resp}"
)

zip_path = os.path.join(self.temp_dir, "repo.zip")

with open(zip_path, "wb") as f:
while True:
chunk = await response.content.read(1024)
chunk = await response.content.read(1024 * 1024) # 1MB chunks
if not chunk:
break
f.write(chunk)
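The download now checks the HTTP status explicitly, sends an Accept header for the GitHub API, and streams the zipball in 1 MB chunks instead of 1 KB reads. A self-contained sketch of the same streaming pattern (the URL and destination path below are illustrative, not part of the commit):

```python
import asyncio

import aiohttp


async def download_zipball(url: str, dest: str) -> None:
    headers = {"Accept": "application/vnd.github.v3+json"}
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            if response.status != 200:
                raise RuntimeError(
                    f"GitHub API Error {response.status}: {await response.text()}"
                )
            with open(dest, "wb") as f:
                while True:
                    chunk = await response.content.read(1024 * 1024)  # 1MB chunks
                    if not chunk:
                        break
                    f.write(chunk)


# asyncio.run(download_zipball(
#     "https://api.github.com/repos/eosphoros-ai/Falcon/zipball/yifan_1216", "repo.zip"
# ))
```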
@@ -515,297 +421,112 @@ class BenchmarkDataManager(BaseComponent):
raise ValueError("No valid directory found after extraction")
return os.path.join(self.temp_dir, extracted_dirs[0])

def _discover_csv_files(self, base_dir: str, search_dir: str) -> List[Dict]:
"""Find all CSV files recursively"""
def _discover_sqlite_files(self, base_dir: str, search_dir: str) -> List[str]:
"""Find all SQLite files recursively in the search directory"""
full_search_dir = os.path.join(base_dir, search_dir) if search_dir else base_dir
if not os.path.exists(full_search_dir):
raise ValueError(f"Directory not found: {full_search_dir}")

csv_files = []
sqlite_files = []
for root, _, files in os.walk(full_search_dir):
for file in files:
if file.lower().endswith(".csv"):
rel_path = os.path.relpath(root, start=base_dir)
csv_files.append(
{
"full_path": os.path.join(root, file),
"rel_path": rel_path,
"file_name": file,
}
)
return csv_files
if file.lower().endswith(".sqlite"):
full_path = os.path.join(root, file)
sqlite_files.append(full_path)
return sqlite_files

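For reference only, the same recursive discovery can be written with pathlib; this is an equivalent sketch, not the code the service uses:

```python
from pathlib import Path
from typing import List


def discover_sqlite_files(base_dir: str, search_dir: str) -> List[str]:
    # Equivalent pathlib-based sketch of _discover_sqlite_files above.
    root = Path(base_dir) / search_dir if search_dir else Path(base_dir)
    if not root.exists():
        raise ValueError(f"Directory not found: {root}")
    return [str(p) for p in root.rglob("*") if p.suffix.lower() == ".sqlite"]
```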
async def _import_to_database(self, csv_files: List[Dict]) -> Dict:
"""Import CSV data to SQLite"""
async def _merge_sqlite_databases(self, sqlite_files: List[str]) -> Dict:
"""Merge multiple SQLite files into the main database"""
await self.init_connector()
assert self._connector is not None
results = {
"total_files": len(csv_files),
"successful": 0,
"failed": 0,
"tables_created": [],
}

def _process_one_file(file_info: Dict) -> Tuple[bool, Optional[str]]:
table_name = ""
try:
path_parts = [p for p in file_info["rel_path"].split(os.sep) if p]
table_name = "_".join(path_parts + [Path(file_info["file_name"]).stem])
table_name = self._sanitize_table_name(table_name)

with self._connector.session_scope() as session:
session.execute(text(f'DROP TABLE IF EXISTS "{table_name}"'))
session.commit()
encodings = ["utf-8-sig", "utf-8", "latin-1", "iso-8859-1", "cp1252"]

for encoding in encodings:
try:
with open(file_info["full_path"], "r", encoding=encoding) as f:
content = f.read()

if not content.strip():
raise ValueError("File is empty")

content = content.replace("\r\n", "\n").replace("\r", "\n")
lines = [line for line in content.split("\n") if line.strip()]
if not lines:
raise ValueError("No data after normalization")

header_line = lines[0]
data_line = lines[1] if len(lines) > 1 else ""

try:
sample_for_sniff = "\n".join(lines[:10])
sniffer = csv.Sniffer()
try:
dialect = sniffer.sniff(sample_for_sniff)
except Exception:
# Fallback: choose delimiter by counting common
# separators in header/data line
delims = [",", "\t", ";", "|"]
counts = {
d: (header_line.count(d) if header_line else 0)
+ (data_line.count(d) if data_line else 0)
for d in delims
}
best = (
max(counts, key=counts.get)
if any(counts.values())
else ","
)

class _DefaultDialect(csv.Dialect):
delimiter = best
quotechar = '"'
doublequote = True
skipinitialspace = False
lineterminator = "\n"
quoting = csv.QUOTE_MINIMAL

dialect = _DefaultDialect()

try:
has_header = sniffer.has_header("\n".join(lines[:50]))
except Exception:
has_header = True

header_row = (
list(csv.reader([header_line], dialect))[0]
if header_line
else []
)
first_data_row = (
list(csv.reader([data_line], dialect))[0]
if data_line
else []
)

# Heuristic: if has_header is False but header_row looks
# like names (mostly alphabetic), treat as header
if not has_header:

def _looks_like_header(tokens: List[str]) -> bool:
if not tokens:
return False
# Non-empty, few duplicates, mostly alphabetic
cleaned = [
str(t).strip() for t in tokens if str(t).strip()
]
if not cleaned:
return False
# Allow a few numeric tokens, but most should start with a letter
alpha_starts = sum(
1
for t in cleaned
if t and (t[0].isalpha() or t[0] == "_")
)
return alpha_starts >= max(
1, int(0.6 * len(cleaned))
)

if _looks_like_header(header_row):
has_header = True

if not has_header:
num_cols_guess = len(header_row)
headers = [f"col_{i}" for i in range(num_cols_guess)]
first_data_row = header_row
else:
headers = header_row

num_cols = (
len(first_data_row) if first_data_row else len(headers)
)

# no header
if not headers or all(
(not str(h).strip()) for h in headers
):
headers = [f"col_{i}" for i in range(num_cols or 1)]

headers = self._sanitize_and_dedup_headers(headers)

if num_cols <= 0:
num_cols = len(headers)
headers = headers[:num_cols]
if not headers or any(
h is None or h == "" for h in headers
):
raise csv.Error("Invalid headers after sanitization")

create_sql = f'''
CREATE TABLE IF NOT EXISTS "{table_name}" (
{", ".join([f'"{h}" TEXT' for h in headers])}
)
'''
insert_sql = f'''
INSERT INTO "{table_name}" ({
", ".join([f'"{h}"' for h in headers])
})
VALUES ({
", ".join([":" + f"p{i}" for i in range(len(headers))])
})
'''

with self._connector.session_scope() as session:
logger.debug(
f"Table: {table_name}, headers(final): {headers}"
)
session.execute(text(create_sql))

reader = csv.reader(lines, dialect)
if has_header:
next(reader, None)

batch_params: List[Dict[str, Any]] = []
for row in reader:
if not row:
continue
if len(row) != len(headers):
if len(row) < len(headers):
row += [None] * (len(headers) - len(row))
else:
row = row[: len(headers)]
params = {
f"p{i}": (row[i] if i < len(row) else None)
for i in range(len(headers))
}
batch_params.append(params)
if len(batch_params) >= 1000:
session.execute(text(insert_sql), batch_params)
batch_params = []
if batch_params:
session.execute(text(insert_sql), batch_params)
session.commit()

return True, table_name

except csv.Error:
self._import_with_simple_split_blocking(table_name, content)
return True, table_name

except UnicodeDecodeError:
continue
except Exception as e:
logger.warning(f"Error with encoding {encoding}: {str(e)}")
continue
def _worker():
results = {
"total_files": len(sqlite_files),
"successful": 0,
"failed": 0,
"tables_merged": [],
}

with self._connector.session_scope() as session:
# Get the underlying sqlite3 connection object
connection_proxy = session.connection()
# Handle different SQLAlchemy versions when obtaining the underlying connection
try:
with open(file_info["full_path"], "rb") as f:
content = f.read().decode("ascii", errors="ignore")
if content.strip():
self._import_with_simple_split_blocking(table_name, content)
return True, table_name
else:
raise ValueError("File is empty or unreadable")
except Exception as e:
return (
False,
f"Failed to process {file_info['file_name']}: {str(e)}",
)
# SQLAlchemy 1.4+ / 2.0
raw_conn = connection_proxy.connection.dbapi_connection
except AttributeError:
try:
# Older versions or certain drivers
raw_conn = connection_proxy.connection
except AttributeError:
# Last resort
raw_conn = session.get_bind().raw_connection()

except Exception as e:
return (
False,
f"Failed to process {file_info.get('full_path', '')}: {str(e)}",
)
# Ensure raw_conn is a sqlite3 connection object
if not raw_conn:
raise RuntimeError("Failed to get raw sqlite3 connection")

for file_info in csv_files:
ok, info = await self._run_in_thread(_process_one_file, file_info)
if ok:
results["successful"] += 1
if info:
results["tables_created"].append(info)
else:
results["failed"] += 1
logger.error(info)
cursor = raw_conn.cursor()

return results
for db_path in sqlite_files:
src_alias = f"src_db_{uuid.uuid4().hex[:8]}"
try:
try:
cursor.execute("PRAGMA database_list")
attached_dbs = cursor.fetchall()
for _, name, _ in attached_dbs:
if name not in ("main", "temp"):
cursor.execute(f"DETACH DATABASE {name}")
except Exception as cleanup_err:
logger.warning(f"Cleanup warning: {cleanup_err}")

def _import_with_simple_split_blocking(self, table_name: str, content: str):
"""Fallback method for malformed CSV files (blocking, 使用 SQLAlchemy 执行)"""
|
||||
assert self._connector is not None
content = content.replace("\r\n", "\n").replace("\r", "\n")
lines = [line for line in content.split("\n") if line.strip()]
if not lines:
raise ValueError("No data found after cleaning")
cursor.execute(f"ATTACH DATABASE ? AS {src_alias}", (db_path,))

first_line = lines[0]
delimiter = "," if "," in first_line else "\t" if "\t" in first_line else ";"
cursor.execute(
f"SELECT name, sql FROM {src_alias}.sqlite_master "
f"WHERE type='table' AND name NOT LIKE 'sqlite_%'"
)
tables = cursor.fetchall()

raw_headers = first_line.split(delimiter)
headers = self._sanitize_and_dedup_headers(raw_headers)
actual_columns = len(headers)
for table_name, create_sql in tables:
cursor.execute(
"SELECT name FROM sqlite_master "
"WHERE type='table' "
"AND name=?",
(table_name,),
)
if not cursor.fetchone():
cursor.execute(create_sql)
cursor.execute(
f'INSERT INTO main."{table_name}" '
f'SELECT * FROM {src_alias}."{table_name}"'
)
results["tables_merged"].append(table_name)
else:
logger.warning(
f"Table '{table_name}' exists. Skipping."
)

create_sql = f"""
CREATE TABLE IF NOT EXISTS "{table_name}" (
{", ".join([f'"{h}" TEXT' for h in headers])}
)
"""
raw_conn.commit()
results["successful"] += 1

insert_sql = f"""
INSERT INTO "{table_name}" ({", ".join([f'"{h}"' for h in headers])})
VALUES ({", ".join([":" + f"p{i}" for i in range(actual_columns)])})
"""
except Exception as e:
logger.error(f"Failed to merge {db_path}: {e}")
results["failed"] += 1
try:
raw_conn.rollback()
except Exception:
pass
finally:
try:
cursor.execute(f"DETACH DATABASE {src_alias}")
except Exception:
pass

with self._connector.session_scope() as session:
session.execute(text(create_sql))
batch: List[Dict[str, Any]] = []
for line in lines[1:]:
row = line.split(delimiter)
if len(row) != actual_columns:
if len(row) < actual_columns:
row += [None] * (actual_columns - len(row))
else:
row = row[:actual_columns]
params = {f"p{i}": row[i] for i in range(actual_columns)}
batch.append(params)
if len(batch) >= 1000:
session.execute(text(insert_sql), batch)
batch = []
if batch:
session.execute(text(insert_sql), batch)
session.commit()
return results

return await self._run_in_thread(_worker)

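The merge attaches each downloaded .sqlite file under a temporary alias, copies over every user table that does not already exist in the main database, and detaches it again. A self-contained sketch of that ATTACH / INSERT...SELECT pattern using the standard sqlite3 module (paths and table names below are made up for illustration):

```python
import os
import sqlite3
import tempfile

# Build a small source database to merge from.
src_path = os.path.join(tempfile.mkdtemp(), "source.sqlite")
src = sqlite3.connect(src_path)
src.execute("CREATE TABLE users (id INTEGER, name TEXT)")
src.execute("INSERT INTO users VALUES (1, 'alice')")
src.commit()
src.close()

main = sqlite3.connect(":memory:")
cur = main.cursor()
cur.execute("ATTACH DATABASE ? AS src_db", (src_path,))
cur.execute(
    "SELECT name, sql FROM src_db.sqlite_master "
    "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
)
for table_name, create_sql in cur.fetchall():
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,),
    )
    if not cur.fetchone():
        cur.execute(create_sql)  # recreate the table in the main database
        cur.execute(
            f'INSERT INTO main."{table_name}" SELECT * FROM src_db."{table_name}"'
        )
main.commit()
cur.execute("DETACH DATABASE src_db")
print(main.execute("SELECT * FROM users").fetchall())  # [(1, 'alice')]
```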
async def get_table_info_simple(self) -> List[str]:
"""Return simplified table info: table(column1,column2,...)"""

@@ -1,95 +0,0 @@
{
"data_source_10_indexdata": "ant_icube_dev.stock_exchange_index_data",
"data_source_10_indexinfo": "ant_icube_dev.stock_exchange_index_info",
"data_source_11_price": "ant_icube_dev.bakery_sales_price",
"data_source_11_sales": "ant_icube_dev.bakery_sales_sale",
"data_source_12_events1": "ant_icube_dev.google_merchandise_events",
"data_source_12_items": "ant_icube_dev.google_merchandise_items",
"data_source_12_users": "ant_icube_dev.google_merchandise_users",
"data_source_13_features": "ant_icube_dev.walmart_features",
"data_source_13_sales": "ant_icube_dev.walmart_sales",
"data_source_13_stores": "ant_icube_dev.walmart_stores",
"data_source_14_inventory": "ant_icube_dev.mexico_toy_inventory",
"data_source_14_products": "ant_icube_dev.mexico_toy_products",
"data_source_14_sales": "ant_icube_dev.mexico_toy_sales",
"data_source_14_stores": "ant_icube_dev.mexico_toy_stores",
"data_source_15_cardbase": "ant_icube_dev.credit_card_card_base",
"data_source_15_customerbase": "ant_icube_dev.credit_card_customer_base",
"data_source_15_fraudbase": "ant_icube_dev.credit_card_fraud_base",
"data_source_15_transactionbase": "ant_icube_dev.credit_card_transaction_base",
"data_source_16_marks": "ant_icube_dev.school_marks",
"data_source_16_students": "ant_icube_dev.school_students",
"data_source_16_subjects": "ant_icube_dev.school_subject",
"data_source_16_teachers": "ant_icube_dev.school_teachers",
"data_source_17_df_customers": "ant_icube_dev.ecommerce_order_customers",
"data_source_17_df_orderitems": "ant_icube_dev.ecommerce_order_order_items",
"data_source_17_df_orders": "ant_icube_dev.ecommerce_order_orders",
"data_source_17_df_payments": "ant_icube_dev.ecommerce_order_payments",
"data_source_17_df_products": "ant_icube_dev.ecommerce_order_products",
"data_source_18_corruption": "ant_icube_dev.world_economic_corruption",
"data_source_18_cost_of_living": "ant_icube_dev.world_economic_cost_of_living",
"data_source_18_richest_countries": "ant_icube_dev.world_economic_richest_countries",
"data_source_18_tourism": "ant_icube_dev.world_economic_tourism",
"data_source_18_unemployment": "ant_icube_dev.world_economic_unemployment",
"data_source_19_drinks": "ant_icube_dev.alcohol_and_life_expectancy_drinks",
"data_source_19_lifeexpectancy-verbose": "ant_icube_dev.alcohol_and_life_expectancy_verbose",
"data_source_1_finance_data": "ant_icube_dev.di_finance_data",
"data_source_20_drivers_data": "ant_icube_dev.city_ride_data_drivers",
"data_source_20_rides_data": "ant_icube_dev.city_ride_data_rides",
"data_source_21_e_customers": "ant_icube_dev.di_data_cleaning_for_customer_database_e_customers",
"data_source_21_e_orders": "ant_icube_dev.di_data_cleaning_for_customer_database_e_orders",
"data_source_21_e_products": "ant_icube_dev.di_data_cleaning_for_customer_database_e_products",
"data_source_22_ufc_country_data": "ant_icube_dev.ufc_country_data",
"data_source_22_ufc_events_stats": "ant_icube_dev.ufc_events_stats",
"data_source_22_ufc_fighters_stats": "ant_icube_dev.ufc_fighters_stats",
"data_source_23_ben10_aliens": "ant_icube_dev.di_ben10_alien_universe_realistic_battle_dataset_aliens",
"data_source_23_ben10_battles": "ant_icube_dev.di_ben10_alien_universe_realistic_battle_dataset_battles",
"data_source_23_ben10_enemies": "ant_icube_dev.di_ben10_alien_universe_realistic_battle_dataset_enemies",
"data_source_24_blinkit_customer_feedback": "ant_icube_dev.blinkit_customers",
"data_source_24_blinkit_customers": "ant_icube_dev.blinkit_customers",
"data_source_24_blinkit_delivery_performance": "ant_icube_dev.blinkit_delivery_performance",
"data_source_24_blinkit_inventory": "ant_icube_dev.blinkit_inventory",
"data_source_24_blinkit_inventorynew": "ant_icube_dev.blinkit_inventory",
"data_source_24_blinkit_marketing_performance": "ant_icube_dev.blinkit_delivery_performance",
"data_source_24_blinkit_order_items": "ant_icube_dev.blinkit_order_items",
"data_source_24_blinkit_orders": "ant_icube_dev.blinkit_orders",
"data_source_24_blinkit_products": "ant_icube_dev.blinkit_products",
"data_source_25_bakutech_bakutech_product_categories": "ant_icube_dev.tech_sales_product_categories",
"data_source_25_bakutech_bakutech_product_subcategories": "ant_icube_dev.tech_sales_product_subcategories",
"data_source_25_bakutech_bakutech_sales_data": "ant_icube_dev.tech_sales_sales_data",
"data_source_25_bakutech_bakutech_assets": "ant_icube_dev.tech_sales_assets",
"data_source_25_bakutech_bakutech_customer_lookup": "ant_icube_dev.tech_sales_customer_lookup",
"data_source_25_bakutech_bakutech_dates": "ant_icube_dev.tech_sales_dates",
"data_source_25_bakutech_bakutech_product_returns": "ant_icube_dev.tech_sales_product_returns",
"data_source_25_bakutech_bakutech_products_lookup": "ant_icube_dev.tech_sales_product_lookup",
"data_source_26_appearances": "ant_icube_dev.football_appereances",
"data_source_26_games": "ant_icube_dev.football_games",
"data_source_26_leagues": "ant_icube_dev.football_leagues",
"data_source_26_players": "ant_icube_dev.football_players",
"data_source_26_shots": "ant_icube_dev.football_shots",
"data_source_26_teams": "ant_icube_dev.football_teams",
"data_source_26_teamstats": "ant_icube_dev.football_teamstats",
"data_source_27_categories": "ant_icube_dev.grocery_sales_categories",
"data_source_27_cities": "ant_icube_dev.grocery_sales_cities",
"data_source_27_countries": "ant_icube_dev.grocery_sales_countries",
"data_source_27_customers": "ant_icube_dev.grocery_sales_customers",
"data_source_27_employees": "ant_icube_dev.grocery_sales_employees",
"data_source_27_products": "ant_icube_dev.grocery_sales_products",
"data_source_27_sales": "ant_icube_dev.grocery_sales_sales",
"data_source_28_customers": "ant_icube_dev.online_shop_customers",
"data_source_28_order_items": "ant_icube_dev.online_shop_order_items",
"data_source_28_orders": "ant_icube_dev.online_shop_orders",
"data_source_28_payment": "ant_icube_dev.online_shop_payment",
"data_source_28_products": "ant_icube_dev.online_shop_products",
"data_source_28_reviews": "ant_icube_dev.online_shop_reviews",
"data_source_28_shipments": "ant_icube_dev.online_shop_shipments",
"data_source_28_suppliers": "ant_icube_dev.online_shop_suppliers",
"data_source_2_finance_loan_approval_prediction_data": "ant_icube_dev.di_finance_loan_approval_prediction_data",
"data_source_3_stock_details_5_years 3": "ant_icube_dev.di_massive_yahoo_finance_dataset_0805",
"data_source_4_wa_fn-usec_-accounts-receivable 2": "ant_icube_dev.di_finance_factoring_ibm_late_payment_histories",
"data_source_5_unicorns till sep 2022": "ant_icube_dev.di_unicorn_startups",
"data_source_6_sales dataset": "ant_icube_dev.di_sales_dataset",
"data_source_7_vgsales": "ant_icube_dev.di_video_game_sales",
"data_source_8_googleplaystore": "ant_icube_dev.di_google_play_store_apps",
"data_source_9_final": "ant_icube_dev.di_global_lnternet_users"
}