feat(benchmark): update benchmark task status & benchmark task info list

alan.cl
2025-10-16 16:21:04 +08:00
parent 2a823ee25c
commit 8e025c8323
8 changed files with 292 additions and 69 deletions

View File

@@ -1,5 +1,4 @@
import asyncio
import json
import logging
from functools import cache
from typing import List, Optional
@@ -210,7 +209,13 @@ async def get_compare_run_detail(evaluate_code: str, limit: int = 200, offset: i
summaries = dao.list_summaries_by_task(evaluate_code, limit=10000, offset=0)
if not summaries:
return Result.succ(
{"evaluate_code": evaluate_code, "summaries": [], "metrics": {}, "cotTokens": {"total": 0, "byModel": {}}})
{
"evaluate_code": evaluate_code,
"summaries": [],
"metrics": {},
"cotTokens": {"total": 0, "byModel": {}},
}
)
detail_list = []
total_counts = {"right": 0, "wrong": 0, "failed": 0, "exception": 0}
@@ -262,7 +267,6 @@ async def execute_benchmark_task(
Returns:
Result: The response
"""
# Use FastAPI's BackgroundTasks to run the benchmark task in the background
background_tasks.add_task(
_run_benchmark_task_sync,
service,
@@ -298,21 +302,19 @@ async def benchmark_task_list(
)
)
@router.get("/benchmark/list_datasets", dependencies=[Depends(check_api_key)])
async def list_benchmark_datasets():
manager = get_benchmark_manager(global_system_app)
info = await manager.get_table_info()
result = [
{
"dataset_id": "1",
"name": "Falcon",
"tableCount": len(info.items())
}
]
result = [{"dataset_id": "1", "name": "Falcon", "tableCount": len(info.items())}]
return Result.succ(result)
@router.get("/benchmark/dataset/{dataset_id}", dependencies=[Depends(check_api_key)])
async def list_benchmark_dataset_tables(dataset_id: str, limit: int = 200, offset: int = 0):
async def list_benchmark_dataset_tables(
dataset_id: str, limit: int = 200, offset: int = 0
):
if dataset_id == "1":
manager = get_benchmark_manager(global_system_app)
info = await manager.get_table_info()
@@ -329,8 +331,11 @@ async def list_benchmark_dataset_tables(dataset_id: str, limit: int = 200, offse
return Result.succ("dataset not found")
@router.get("/benchmark/dataset/{dataset_id}/{table}/rows", dependencies=[Depends(check_api_key)])
async def get_benchmark_table_rows(dataset_id:str, table: str, limit: int = 10):
@router.get(
"/benchmark/dataset/{dataset_id}/{table}/rows",
dependencies=[Depends(check_api_key)],
)
async def get_benchmark_table_rows(dataset_id: str, table: str, limit: int = 10):
if dataset_id == "1":
manager = get_benchmark_manager(global_system_app)
info = await manager.get_table_info()
@@ -342,6 +347,7 @@ async def get_benchmark_table_rows(dataset_id:str, table: str, limit: int = 10):
else:
return Result.succ("dataset not found")
@router.get("/benchmark_result_download", dependencies=[Depends(check_api_key)])
async def download_benchmark_result(
evaluate_code: Optional[str] = Query(default=None, description="evaluate code"),
@@ -418,6 +424,7 @@ async def list_benchmark_tasks(limit: int = 50, offset: int = 0):
)
return Result.succ(result)
def init_endpoints(system_app: SystemApp, config: ServeConfig) -> None:
"""Initialize the endpoints"""
global global_system_app
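
A minimal client-side sketch of the dataset endpoints added above. The route prefix, port, and the header consumed by check_api_key are assumptions, not taken from this diff; "some_table" is a placeholder table name.

import requests

BASE = "http://localhost:5670/api/v2/serve/evaluate"  # assumed mount point of this router
HEADERS = {"Authorization": "Bearer <your-api-key>"}  # assumed auth scheme for check_api_key

# List available benchmark datasets (currently only dataset_id "1" / "Falcon")
datasets = requests.get(f"{BASE}/benchmark/list_datasets", headers=HEADERS).json()

# List the tables of a dataset, then preview a few rows of one table
tables = requests.get(f"{BASE}/benchmark/dataset/1", headers=HEADERS,
                      params={"limit": 200, "offset": 0}).json()
rows = requests.get(f"{BASE}/benchmark/dataset/1/some_table/rows", headers=HEADERS,
                    params={"limit": 10}).json()
print(datasets, tables, rows)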

View File

@@ -70,6 +70,7 @@ class BenchmarkServeRequest(BaseModel):
evaluate_code: Optional[str] = Field(None, description="evaluation code")
scene_key: Optional[str] = Field(None, description="evaluation scene key")
scene_value: Optional[str] = Field(None, description="evaluation scene value")
datasets_name: Optional[str] = Field(None, description="evaluation datasets name")
input_file_path: Optional[str] = Field(
None, description="input benchmark file path"
)
@@ -77,6 +78,7 @@ class BenchmarkServeRequest(BaseModel):
model_list: Optional[List[str]] = Field(
None, description="execute benchmark model name list"
)
context: Optional[dict] = Field(None, description="The context of the evaluate")
user_name: Optional[str] = Field(None, description="user name")
user_id: Optional[str] = Field(None, description="user id")
sys_code: Optional[str] = Field(None, description="system code")
@@ -84,10 +86,19 @@ class BenchmarkServeRequest(BaseModel):
state: Optional[str] = Field(None, description="evaluation state")
temperature: Optional[str] = Field(None, description="evaluation temperature")
max_tokens: Optional[str] = Field(None, description="evaluation max tokens")
log_info: Optional[str] = Field(None, description="evaluation log_info")
gmt_create: Optional[str] = Field(None, description="create time")
gmt_modified: Optional[str] = Field(None, description="modified time")
class BenchmarkServeResponse(BenchmarkServeRequest):
cost_time: Optional[int] = Field(None, description="evaluation cost time")
round_time: Optional[int] = Field(None, description="evaluation round time")
class Config:
title = f"BenchmarkServeResponse for {SERVE_APP_NAME_HUMP}"
class StorageType(Enum):
FILE = "FILE"
OSS = "OSS"
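
For orientation, a hedged sketch of building the request schema with the fields added in this commit (datasets_name, context, log_info); the import path and all field values are assumptions.

from dbgpt_serve.evaluate.api.schemas import BenchmarkServeRequest  # assumed module path

req = BenchmarkServeRequest(
    evaluate_code="eval-20251016-001",        # hypothetical task id
    scene_key="benchmark",
    scene_value="falcon_sql",                 # hypothetical scene value
    datasets_name="Falcon",                   # new in this commit
    model_list=["model_a", "model_b"],        # hypothetical model codes
    context={"benchmark_config": "{}"},       # new in this commit
    log_info=None,                            # new in this commit
)
print(req)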

View File

@@ -1,4 +1,3 @@
import json
import logging
from datetime import datetime
from typing import List, Optional
@@ -87,7 +86,9 @@ class BenchmarkSummaryEntity(Model):
__tablename__ = "benchmark_summary"
__table_args__ = (
UniqueConstraint("round_id", "output_path", "llm_code", name="uk_round_output_llm"),
UniqueConstraint(
"round_id", "output_path", "llm_code", name="uk_round_output_llm"
),
)
id = Column(
@@ -97,7 +98,11 @@ class BenchmarkSummaryEntity(Model):
output_path = Column(
String(512), nullable=False, comment="Original output file path"
)
evaluate_code = Column(String(255), nullable=True, comment="Task evaluate_code (unique id per submitted task)")
evaluate_code = Column(
String(255),
nullable=True,
comment="Task evaluate_code (unique id per submitted task)",
)
llm_code = Column(String(255), nullable=True, comment="LLM code for this summary")
right = Column(Integer, default=0, comment="RIGHT count")
@@ -119,6 +124,7 @@ class BenchmarkSummaryEntity(Model):
class BenchmarkResultDao(BaseDao):
"""DAO for benchmark summary results."""
def upsert_summary(
self,
round_id: int,
@@ -130,7 +136,7 @@ class BenchmarkResultDao(BaseDao):
exception: int,
evaluate_code: Optional[str] = None,
):
"""Upsert summary counts directly into DB (per llm_code), with task serial no."""
"""Upsert summary counts directly into DB (per llm_code) with task"""
with self.session() as session:
existing = (
session.query(BenchmarkSummaryEntity)

View File

@@ -9,8 +9,10 @@ from typing import Any, Dict, Union
from sqlalchemy import Column, DateTime, Integer, String, Text, UniqueConstraint
from dbgpt._private.pydantic import model_to_dict
from dbgpt.agent.core.schema import Status
from dbgpt.storage.metadata import BaseDao, Model
from dbgpt.storage.metadata._base_dao import QUERY_SPEC, REQ, RES
from ..api.schemas import EvaluateServeRequest, EvaluateServeResponse
from ..config import ServeConfig
@@ -164,3 +166,33 @@ class ServeDao(BaseDao[ServeEntity, EvaluateServeRequest, EvaluateServeResponse]
gmt_create=gmt_created_str,
gmt_modified=gmt_modified_str,
)
def update(self, query_request: QUERY_SPEC, update_request: REQ) -> RES:
"""Update an entity object.
Args:
query_request (QUERY_SPEC): The query schema object or dict used to locate the entity.
update_request (REQ): The request schema object for update.
Returns:
RES: The response schema object.
"""
with self.session() as session:
query = self._create_query_object(session, query_request)
entry = query.first()
if entry is None:
raise Exception("Invalid request")
update_request = (
update_request
if isinstance(update_request, dict)
else model_to_dict(update_request)
)
for key, value in update_request.items(): # type: ignore
if isinstance(value, dict) or isinstance(value, list):
value = json.dumps(value, ensure_ascii=False)
if value is not None:
setattr(entry, key, value)
session.merge(entry)
# res = self.get_one(self.to_request(entry))
# if not res:
# raise Exception("Update failed")
return self.to_response(entry)
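
A short sketch of how this new ServeDao.update might be driven, mirroring the call made from _update_benchmark_task_status later in this commit; how the DAO instance is obtained and the concrete field values are assumptions.

dao: ServeDao = service.dao  # however the benchmark service exposes its DAO (assumption)
dao.update(
    {"evaluate_code": "eval-20251016-001"},   # QUERY_SPEC as a plain dict
    EvaluateServeRequest(
        state="RUNNING",                      # hypothetical status value
        context={"benchmark_running_info": '{"cost_time": 0}'},  # dict values are JSON-encoded by update()
    ),
)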

View File

@@ -24,6 +24,7 @@ from ....prompt.service.service import Service as PromptService
from ....rag.service.service import Service as RagService
from ....rag.storage_manager import StorageManager
from ...api.schemas import (
BenchmarkServeResponse,
EvaluateServeRequest,
EvaluateServeResponse,
EvaluationScene,
@@ -254,51 +255,67 @@ class BenchmarkService(
output_file_path,
)
# read input file
input_list: List[BaseInputModel] = (
self.user_input_execute_service.read_input_file(input_file_path)
)
result_list = []
for i in range(1, config.round_time + 1):
round_result_list: List[BenchmarkTaskResult[OutputType]] = []
llm_index = 0
for llm_code, thread_num in config.llm_thread_map.items():
# Offset for each llm_code: llm_index * len(input_list)
offset = len(input_list) * llm_index
llm_result = BenchmarkTaskResult[OutputType]()
try:
llm_result = self.batch_execute(
config,
input_list,
llm_code,
thread_num,
i,
output_file_path,
offset,
)
except Exception as e:
logger.error(f"batch execute error! {e}, llm_code: {llm_code}")
if llm_result is not None:
round_result_list.append(llm_result)
llm_index += 1
self.post_dispatch(
i,
config,
input_list,
round_result_list,
input_file_path,
output_file_path,
start_time = time.time()
try:
# read input file
input_list: List[BaseInputModel] = (
self.user_input_execute_service.read_input_file(input_file_path)
)
for i in range(1, config.round_time + 1):
round_result_list: List[BenchmarkTaskResult[OutputType]] = []
llm_index = 0
for llm_code, thread_num in config.llm_thread_map.items():
# Offset for each llm_code: llm_index * len(input_list)
offset = len(input_list) * llm_index
llm_result = BenchmarkTaskResult[OutputType]()
try:
llm_result = self.batch_execute(
config,
input_list,
llm_code,
thread_num,
i,
output_file_path,
offset,
)
except Exception as e:
logger.error(f"batch execute error! {e}, llm_code: {llm_code}")
if llm_result is not None:
round_result_list.append(llm_result)
llm_index += 1
self.post_dispatch(
i,
config,
input_list,
round_result_list,
input_file_path,
output_file_path,
)
result_list.extend(round_result_list)
cost_time = int(time.time() - start_time)
self._update_benchmark_task_status(
evaluate_code, Status.COMPLETE.value, cost_time
)
except Exception as e:
logger.error(
f"Benchmark execution failed: {e}, evaluate_code: {evaluate_code}"
)
cost_time = int(time.time() - start_time)
self._update_benchmark_task_status(
evaluate_code, Status.FAILED.value, cost_time, error_message=str(e)
)
result_list.extend(round_result_list)
logger.info(
f"Benchmark task completed successfully for evaluate_code:"
f" {evaluate_code}, output_file_path: {output_file_path}"
f" {evaluate_code}, output_file_path: {output_file_path}, "
f"benchmark task costTime: {cost_time}"
)
return result_list
@@ -323,6 +340,61 @@ class BenchmarkService(
return config
def _update_benchmark_task_status(
self,
evaluate_code: str,
status: str,
cost_time: int,
error_message: Optional[str] = None,
) -> None:
"""
Update the status and execution time information of the benchmark task
Args:
evaluate_code: Evaluation code
status: Task status (Status.COMPLETE.value or Status.FAILED.value)
cost_time: Execution time (in seconds)
error_message: Error message
"""
try:
running_info = {"cost_time": cost_time}
# Fetch the existing context data and preserve its original structure
context_data = {}
existing_entity: EvaluateServeResponse = self.dao.get_one(
{"evaluate_code": evaluate_code}
)
if existing_entity and existing_entity.context:
try:
if isinstance(existing_entity.context, dict):
context_data = existing_entity.context.copy()
elif isinstance(existing_entity.context, str):
existing_context = json.loads(existing_entity.context)
if isinstance(existing_context, dict):
context_data = existing_context.copy()
except (json.JSONDecodeError, TypeError):
context_data = {}
context_data["benchmark_running_info"] = json.dumps(
running_info, ensure_ascii=False
)
update_request = EvaluateServeRequest(
state=status,
context=context_data,
log_info=error_message,
)
self.dao.update({"evaluate_code": evaluate_code}, update_request)
logger.info(
f"Successfully updated benchmark task status to {status} "
f"with cost_time: {cost_time}s, evaluate_code: {evaluate_code}"
)
except Exception as e:
logger.error(
f"Failed to update benchmark task status to {status}: {e}, "
f"evaluate_code: {evaluate_code}"
)
def batch_execute(
self,
config: BenchmarkExecuteConfig,
@@ -531,7 +603,7 @@ class BenchmarkService(
def get_list_by_page(
self, request: EvaluateServeRequest, page: int, page_size: int
) -> PaginationResult[EvaluateServeResponse]:
) -> PaginationResult[BenchmarkServeResponse]:
"""Get a list of Evaluate entities by page
Args:
@@ -540,13 +612,95 @@ class BenchmarkService(
page_size (int): The page size
Returns:
List[EvaluateServeResponse]: The response
PaginationResult[BenchmarkServeResponse]: The response
"""
query_request = request
return self.dao.get_list_page(
original_result = self.dao.get_list_page(
query_request, page, page_size, ServeEntity.id.name
)
benchmark_items = []
for item in original_result.items:
benchmark_response = self._convert_to_benchmark_response(item)
benchmark_items.append(benchmark_response)
return PaginationResult[BenchmarkServeResponse](
items=benchmark_items,
total_count=original_result.total_count,
total_pages=original_result.total_pages,
page=original_result.page,
page_size=original_result.page_size,
)
def _convert_to_benchmark_response(
self, evaluate_response: EvaluateServeResponse
) -> BenchmarkServeResponse:
"""Convert EvaluateServeResponse to BenchmarkServeResponse
Args:
evaluate_response: The original EvaluateServeResponse
Returns:
BenchmarkServeResponse: The converted response
"""
cost_time = None
model_list = None
parallel_num = None
round_time = None
# parse context data
if evaluate_response.context:
try:
context_data = evaluate_response.context
if isinstance(context_data, str):
context_data = json.loads(context_data)
if "benchmark_config" in context_data:
benchmark_config_str = context_data["benchmark_config"]
if isinstance(benchmark_config_str, str):
benchmark_config = json.loads(benchmark_config_str)
if "llm_thread_map" in benchmark_config:
llm_thread_map = benchmark_config["llm_thread_map"]
if isinstance(llm_thread_map, dict):
model_list = list(llm_thread_map.keys())
if "thread_num" in benchmark_config:
parallel_num = benchmark_config["thread_num"]
if "round_time" in benchmark_config:
round_time = benchmark_config["round_time"]
if "benchmark_running_info" in context_data:
running_info_str = context_data["benchmark_running_info"]
if isinstance(running_info_str, str):
running_info = json.loads(running_info_str)
if "cost_time" in running_info:
cost_time = running_info["cost_time"]
except (json.JSONDecodeError, TypeError, KeyError) as e:
logger.warning(f"Failed to parse context data: {e}")
return BenchmarkServeResponse(
evaluate_code=evaluate_response.evaluate_code,
scene_key=evaluate_response.scene_key,
scene_value=evaluate_response.scene_value,
datasets_name=evaluate_response.datasets_name,
input_file_path=evaluate_response.datasets_name,
output_file_path=evaluate_response.result,
model_list=model_list,
context=evaluate_response.context,
user_name=evaluate_response.user_name,
user_id=evaluate_response.user_id,
sys_code=evaluate_response.sys_code,
parallel_num=parallel_num,
state=evaluate_response.state,
temperature=None,
max_tokens=None,
log_info=evaluate_response.log_info,
gmt_create=evaluate_response.gmt_create,
gmt_modified=evaluate_response.gmt_modified,
cost_time=cost_time,
round_time=round_time,
)
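
The context column that this conversion reads is a JSON object whose benchmark_config and benchmark_running_info values are themselves JSON-encoded strings; a self-contained sketch of the shape it expects (all values illustrative):

import json

context = json.dumps({
    "benchmark_config": json.dumps({
        "llm_thread_map": {"model_a": 4, "model_b": 4},  # -> model_list
        "thread_num": 8,                                 # -> parallel_num
        "round_time": 2,                                 # -> round_time
    }),
    # written by _update_benchmark_task_status above
    "benchmark_running_info": json.dumps({"cost_time": 137}),  # -> cost_time
})

data = json.loads(context)
config = json.loads(data["benchmark_config"])
print(list(config["llm_thread_map"].keys()), config["thread_num"],
      json.loads(data["benchmark_running_info"])["cost_time"])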
async def get_benchmark_file_stream(
self, evaluate_code: str
) -> Tuple[str, io.BytesIO]:

View File

@@ -81,8 +81,8 @@ class FileParseService(ABC):
"""Compute summary from the Excel file grouped by llmCode and return JSON list.
It reads the '<base>_round{round_id}.xlsx' file and sheet
'benchmark_compare_result', then for each llmCode counts the compareResult column
(RIGHT/WRONG/FAILED/EXCEPTION) to build summary list.
'benchmark_compare_result', then for each llmCode counts the compareResult
column (RIGHT/WRONG/FAILED/EXCEPTION) to build the summary list.
"""
try:
base_name = Path(output_path).stem
@@ -120,7 +120,8 @@ class FileParseService(ABC):
)
logger.info(
f"[summary] computed per llmCode for round={round_id}, output_path={output_path} -> {summaries}"
f"[summary] computed per llmCode for round={round_id},"
f" output_path={output_path} -> {summaries}"
)
return json.dumps(summaries, ensure_ascii=False)
except Exception as e:

View File

@@ -172,6 +172,7 @@ class BenchmarkExecuteConfig:
# file path config
output_file_path: Optional[str] = None
standard_file_path: str = None
input_file_path: Optional[str] = None
# runtime execute config

View File

@@ -3,10 +3,10 @@ import logging
from typing import Dict, List, Optional, Union
from dbgpt.util.benchmarks import StorageUtil
from dbgpt_serve.evaluate.db.benchmark_db import BenchmarkResultDao
from dbgpt_serve.evaluate.service.fetchdata.benchmark_data_manager import (
get_benchmark_manager,
)
from dbgpt_serve.evaluate.db.benchmark_db import BenchmarkResultDao
from .data_compare_service import DataCompareService
from .file_parse_service import FileParseService
@@ -199,8 +199,10 @@ class UserInputExecuteService:
llm_count,
)
try:
summary_json = self.file_service.summary_and_write_multi_round_benchmark_result(
location, round_id
summary_json = (
self.file_service.summary_and_write_multi_round_benchmark_result(
location, round_id
)
)
import json as _json
@@ -212,11 +214,20 @@ class UserInputExecuteService:
wrong = int(item.get("wrong", 0))
failed = int(item.get("failed", 0))
exception = int(item.get("exception", 0))
dao.upsert_summary(round_id, location, llm_code, right, wrong, failed, exception, evaluate_code=config.evaluate_code)
dao.upsert_summary(
round_id,
location,
llm_code,
right,
wrong,
failed,
exception,
evaluate_code=config.evaluate_code,
)
except Exception as e:
logger.error(
f"[execute_llm_compare_result] summary from excel or write db failed: {e}",
exc_info=True,
f"[execute_llm_compare_result] summary from excel"
f" or write db failed: {e}",
)
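
For context, the per-round summary JSON consumed by the loop above is a list with one entry per llmCode; the exact key names here are inferred from the surrounding code and are illustrative only.

import json

summary_json = (
    '[{"llmCode": "model_a", "right": 42, "wrong": 5, "failed": 2, "exception": 1},'
    ' {"llmCode": "model_b", "right": 39, "wrong": 8, "failed": 2, "exception": 1}]'
)
for item in json.loads(summary_json):
    print(item.get("llmCode"), int(item.get("right", 0)), int(item.get("wrong", 0)),
          int(item.get("failed", 0)), int(item.get("exception", 0)))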
def _convert_query_result_to_column_format(