From 8e025c832386790a16eda2e2eefdc9c655d6a8fc Mon Sep 17 00:00:00 2001 From: "alan.cl" <1165243776@qq.com> Date: Thu, 16 Oct 2025 16:21:04 +0800 Subject: [PATCH] feat(benchmark): update benchmark task status & benchmark task info list --- .../src/dbgpt_serve/evaluate/api/endpoints.py | 33 ++- .../src/dbgpt_serve/evaluate/api/schemas.py | 11 + .../dbgpt_serve/evaluate/db/benchmark_db.py | 14 +- .../src/dbgpt_serve/evaluate/models/models.py | 32 +++ .../service/benchmark/benchmark_service.py | 240 ++++++++++++++---- .../service/benchmark/file_parse_service.py | 7 +- .../evaluate/service/benchmark/models.py | 1 + .../benchmark/user_input_execute_service.py | 23 +- 8 files changed, 292 insertions(+), 69 deletions(-) diff --git a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/api/endpoints.py b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/api/endpoints.py index 4567ac9fc..d7a58933e 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/api/endpoints.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/api/endpoints.py @@ -1,5 +1,4 @@ import asyncio -import json import logging from functools import cache from typing import List, Optional @@ -210,7 +209,13 @@ async def get_compare_run_detail(evaluate_code: str, limit: int = 200, offset: i summaries = dao.list_summaries_by_task(evaluate_code, limit=10000, offset=0) if not summaries: return Result.succ( - {"evaluate_code": evaluate_code, "summaries": [], "metrics": {}, "cotTokens": {"total": 0, "byModel": {}}}) + { + "evaluate_code": evaluate_code, + "summaries": [], + "metrics": {}, + "cotTokens": {"total": 0, "byModel": {}}, + } + ) detail_list = [] total_counts = {"right": 0, "wrong": 0, "failed": 0, "exception": 0} @@ -262,7 +267,6 @@ async def execute_benchmark_task( Returns: Result: The response """ - # 使用FastAPI的BackgroundTasks来执行后台任务 background_tasks.add_task( _run_benchmark_task_sync, service, @@ -298,21 +302,19 @@ async def benchmark_task_list( ) ) + @router.get("/benchmark/list_datasets", dependencies=[Depends(check_api_key)]) async def list_benchmark_datasets(): manager = get_benchmark_manager(global_system_app) info = await manager.get_table_info() - result = [ - { - "dataset_id": "1", - "name": "Falcon", - "tableCount": len(info.items()) - } - ] + result = [{"dataset_id": "1", "name": "Falcon", "tableCount": len(info.items())}] return Result.succ(result) + @router.get("/benchmark/dataset/{dataset_id}", dependencies=[Depends(check_api_key)]) -async def list_benchmark_dataset_tables(dataset_id: str, limit: int = 200, offset: int = 0): +async def list_benchmark_dataset_tables( + dataset_id: str, limit: int = 200, offset: int = 0 +): if dataset_id == "1": manager = get_benchmark_manager(global_system_app) info = await manager.get_table_info() @@ -329,8 +331,11 @@ async def list_benchmark_dataset_tables(dataset_id: str, limit: int = 200, offse return Result.succ("dataset not found") -@router.get("/benchmark/dataset/{dataset_id}/{table}/rows", dependencies=[Depends(check_api_key)]) -async def get_benchmark_table_rows(dataset_id:str, table: str, limit: int = 10): +@router.get( + "/benchmark/dataset/{dataset_id}/{table}/rows", + dependencies=[Depends(check_api_key)], +) +async def get_benchmark_table_rows(dataset_id: str, table: str, limit: int = 10): if dataset_id == "1": manager = get_benchmark_manager(global_system_app) info = await manager.get_table_info() @@ -342,6 +347,7 @@ async def get_benchmark_table_rows(dataset_id:str, table: str, limit: int = 10): else: return Result.succ("dataset not found") + 
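Editor's note: the dataset-browsing endpoints added above (/benchmark/list_datasets, /benchmark/dataset/{dataset_id}, /benchmark/dataset/{dataset_id}/{table}/rows) can be exercised with a small client sketch like the one below. This is illustrative only: the base URL/route prefix, the Authorization header expected by check_api_key, and the {"success": ..., "data": ...} Result envelope are assumptions based on the surrounding code, not verified against a running service.

# Hypothetical client for the new benchmark dataset endpoints (sketch, not part of the patch).
# Assumed: BASE_URL prefix, bearer-token auth header, and Result.succ(...) returning a
# JSON object whose payload is under the "data" key.
import requests

BASE_URL = "http://localhost:5670/api/v2/serve/evaluate"  # assumed route prefix
HEADERS = {"Authorization": "Bearer EMPTY"}               # assumed API-key header

def list_datasets():
    # GET /benchmark/list_datasets -> [{"dataset_id": "1", "name": "Falcon", "tableCount": N}]
    resp = requests.get(f"{BASE_URL}/benchmark/list_datasets", headers=HEADERS, timeout=30)
    resp.raise_for_status()
    return resp.json().get("data", [])

def list_dataset_tables(dataset_id: str, limit: int = 200, offset: int = 0):
    # GET /benchmark/dataset/{dataset_id} with paging parameters
    resp = requests.get(
        f"{BASE_URL}/benchmark/dataset/{dataset_id}",
        params={"limit": limit, "offset": offset},
        headers=HEADERS,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json().get("data", [])

def preview_table_rows(dataset_id: str, table: str, limit: int = 10):
    # GET /benchmark/dataset/{dataset_id}/{table}/rows to sample a few rows
    resp = requests.get(
        f"{BASE_URL}/benchmark/dataset/{dataset_id}/{table}/rows",
        params={"limit": limit},
        headers=HEADERS,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json().get("data", {})

if __name__ == "__main__":
    for ds in list_datasets():
        print(ds.get("dataset_id"), ds.get("name"), ds.get("tableCount"))

Usage of the paths and query parameters follows the handlers added in this hunk; everything else (host, port, auth scheme) should be adjusted to the actual deployment.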
@router.get("/benchmark_result_download", dependencies=[Depends(check_api_key)]) async def download_benchmark_result( evaluate_code: Optional[str] = Query(default=None, description="evaluate code"), @@ -418,6 +424,7 @@ async def list_benchmark_tasks(limit: int = 50, offset: int = 0): ) return Result.succ(result) + def init_endpoints(system_app: SystemApp, config: ServeConfig) -> None: """Initialize the endpoints""" global global_system_app diff --git a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/api/schemas.py b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/api/schemas.py index 67108f78e..bc54e5e1b 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/api/schemas.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/api/schemas.py @@ -70,6 +70,7 @@ class BenchmarkServeRequest(BaseModel): evaluate_code: Optional[str] = Field(None, description="evaluation code") scene_key: Optional[str] = Field(None, description="evaluation scene key") scene_value: Optional[str] = Field(None, description="evaluation scene value") + datasets_name: Optional[str] = Field(None, description="evaluation datasets name") input_file_path: Optional[str] = Field( None, description="input benchmark file path" ) @@ -77,6 +78,7 @@ class BenchmarkServeRequest(BaseModel): model_list: Optional[List[str]] = Field( None, description="execute benchmark model name list" ) + context: Optional[dict] = Field(None, description="The context of the evaluate") user_name: Optional[str] = Field(None, description="user name") user_id: Optional[str] = Field(None, description="user id") sys_code: Optional[str] = Field(None, description="system code") @@ -84,10 +86,19 @@ class BenchmarkServeRequest(BaseModel): state: Optional[str] = Field(None, description="evaluation state") temperature: Optional[str] = Field(None, description="evaluation state") max_tokens: Optional[str] = Field(None, description="evaluation state") + log_info: Optional[str] = Field(None, description="evaluation log_info") gmt_create: Optional[str] = Field(None, description="create time") gmt_modified: Optional[str] = Field(None, description="create time") +class BenchmarkServeResponse(BenchmarkServeRequest): + cost_time: Optional[int] = Field(None, description="evaluation cost time") + round_time: Optional[int] = Field(None, description="evaluation round time") + + class Config: + title = f"BenchmarkServeResponse for {SERVE_APP_NAME_HUMP}" + + class StorageType(Enum): FILE = "FILE" OSS = "OSS" diff --git a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/db/benchmark_db.py b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/db/benchmark_db.py index 55ec6b4a7..752807111 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/db/benchmark_db.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/db/benchmark_db.py @@ -1,4 +1,3 @@ -import json import logging from datetime import datetime from typing import List, Optional @@ -87,7 +86,9 @@ class BenchmarkSummaryEntity(Model): __tablename__ = "benchmark_summary" __table_args__ = ( - UniqueConstraint("round_id", "output_path", "llm_code", name="uk_round_output_llm"), + UniqueConstraint( + "round_id", "output_path", "llm_code", name="uk_round_output_llm" + ), ) id = Column( @@ -97,7 +98,11 @@ class BenchmarkSummaryEntity(Model): output_path = Column( String(512), nullable=False, comment="Original output file path" ) - evaluate_code = Column(String(255), nullable=True, comment="Task evaluate_code (unique id per submitted task)") + evaluate_code = Column( + String(255), + nullable=True, + comment="Task evaluate_code (unique id 
per submitted task)", + ) llm_code = Column(String(255), nullable=True, comment="LLM code for this summary") right = Column(Integer, default=0, comment="RIGHT count") @@ -119,6 +124,7 @@ class BenchmarkSummaryEntity(Model): class BenchmarkResultDao(BaseDao): """DAO for benchmark summary results.""" + def upsert_summary( self, round_id: int, @@ -130,7 +136,7 @@ class BenchmarkResultDao(BaseDao): exception: int, evaluate_code: Optional[str] = None, ): - """Upsert summary counts directly into DB (per llm_code), with task serial no.""" + """Upsert summary counts directly into DB (per llm_code) with task""" with self.session() as session: existing = ( session.query(BenchmarkSummaryEntity) diff --git a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/models/models.py b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/models/models.py index 5447f2faa..b05c5e9f2 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/models/models.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/models/models.py @@ -9,8 +9,10 @@ from typing import Any, Dict, Union from sqlalchemy import Column, DateTime, Integer, String, Text, UniqueConstraint +from dbgpt._private.pydantic import model_to_dict from dbgpt.agent.core.schema import Status from dbgpt.storage.metadata import BaseDao, Model +from dbgpt.storage.metadata._base_dao import QUERY_SPEC, REQ, RES from ..api.schemas import EvaluateServeRequest, EvaluateServeResponse from ..config import ServeConfig @@ -164,3 +166,33 @@ class ServeDao(BaseDao[ServeEntity, EvaluateServeRequest, EvaluateServeResponse] gmt_create=gmt_created_str, gmt_modified=gmt_modified_str, ) + + def update(self, query_request: QUERY_SPEC, update_request: REQ) -> RES: + """Update an entity object. + + Args: + query_request (REQ): The request schema object or dict for query. + update_request (REQ): The request schema object for update. + Returns: + RES: The response schema object. 
+ """ + with self.session() as session: + query = self._create_query_object(session, query_request) + entry = query.first() + if entry is None: + raise Exception("Invalid request") + update_request = ( + update_request + if isinstance(update_request, dict) + else model_to_dict(update_request) + ) + for key, value in update_request.items(): # type: ignore + if isinstance(value, dict) or isinstance(value, list): + value = json.dumps(value, ensure_ascii=False) + if value is not None: + setattr(entry, key, value) + session.merge(entry) + # res = self.get_one(self.to_request(entry)) + # if not res: + # raise Exception("Update failed") + return self.to_response(entry) diff --git a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/benchmark_service.py b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/benchmark_service.py index fda5a02fd..88707c418 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/benchmark_service.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/benchmark_service.py @@ -24,6 +24,7 @@ from ....prompt.service.service import Service as PromptService from ....rag.service.service import Service as RagService from ....rag.storage_manager import StorageManager from ...api.schemas import ( + BenchmarkServeResponse, EvaluateServeRequest, EvaluateServeResponse, EvaluationScene, @@ -254,51 +255,67 @@ class BenchmarkService( output_file_path, ) - # read input file - input_list: List[BaseInputModel] = ( - self.user_input_execute_service.read_input_file(input_file_path) - ) - result_list = [] - for i in range(1, config.round_time + 1): - round_result_list: List[BenchmarkTaskResult[OutputType]] = [] - - llm_index = 0 - for llm_code, thread_num in config.llm_thread_map.items(): - # 每个llm_code对应的偏移量:llm_index * input_list长度 - offset = len(input_list) * llm_index - - llm_result = BenchmarkTaskResult[OutputType]() - try: - llm_result = self.batch_execute( - config, - input_list, - llm_code, - thread_num, - i, - output_file_path, - offset, - ) - except Exception as e: - logger.error(f"batch execute error! {e}, llm_code: {llm_code}") - - if llm_result is not None: - round_result_list.append(llm_result) - llm_index += 1 - - self.post_dispatch( - i, - config, - input_list, - round_result_list, - input_file_path, - output_file_path, + start_time = time.time() + try: + # read input file + input_list: List[BaseInputModel] = ( + self.user_input_execute_service.read_input_file(input_file_path) + ) + + for i in range(1, config.round_time + 1): + round_result_list: List[BenchmarkTaskResult[OutputType]] = [] + + llm_index = 0 + for llm_code, thread_num in config.llm_thread_map.items(): + # 每个llm_code对应的偏移量:llm_index * input_list长度 + offset = len(input_list) * llm_index + + llm_result = BenchmarkTaskResult[OutputType]() + try: + llm_result = self.batch_execute( + config, + input_list, + llm_code, + thread_num, + i, + output_file_path, + offset, + ) + except Exception as e: + logger.error(f"batch execute error! 
{e}, llm_code: {llm_code}") + + if llm_result is not None: + round_result_list.append(llm_result) + llm_index += 1 + + self.post_dispatch( + i, + config, + input_list, + round_result_list, + input_file_path, + output_file_path, + ) + result_list.extend(round_result_list) + + cost_time = int(time.time() - start_time) + self._update_benchmark_task_status( + evaluate_code, Status.COMPLETE.value, cost_time + ) + except Exception as e: + logger.error( + f"Benchmark execution failed: {e}, evaluate_code: {evaluate_code}" + ) + cost_time = int(time.time() - start_time) + self._update_benchmark_task_status( + evaluate_code, Status.FAILED.value, cost_time, error_message=str(e) ) - result_list.extend(round_result_list) logger.info( f"Benchmark task completed successfully for evaluate_code:" - f" {evaluate_code}, output_file_path: {output_file_path}" + f" {evaluate_code}, output_file_path: {output_file_path}, " + f"benchmark task costTime: {cost_time}" ) return result_list @@ -323,6 +340,61 @@ class BenchmarkService( return config + def _update_benchmark_task_status( + self, + evaluate_code: str, + status: str, + cost_time: int, + error_message: Optional[str] = None, + ) -> None: + """ + Update the status and execution time information of the benchmark task + + Args: + evaluate_code: Evaluation code + status: Task status (Status.COMPLETE.value or Status.FAILED.value) + cost_time: Execution time (in seconds) + error_message: Error message + """ + try: + running_info = {"cost_time": cost_time} + + # 获取现有的context数据并保留原有结构 + context_data = {} + existing_entity: EvaluateServeResponse = self.dao.get_one( + {"evaluate_code": evaluate_code} + ) + if existing_entity and existing_entity.context: + try: + if isinstance(existing_entity.context, dict): + context_data = existing_entity.context.copy() + elif isinstance(existing_entity.context, str): + existing_context = json.loads(existing_entity.context) + if isinstance(existing_context, dict): + context_data = existing_context.copy() + except (json.JSONDecodeError, TypeError): + context_data = {} + + context_data["benchmark_running_info"] = json.dumps( + running_info, ensure_ascii=False + ) + + update_request = EvaluateServeRequest( + state=status, + context=context_data, + log_info=error_message, + ) + self.dao.update({"evaluate_code": evaluate_code}, update_request) + logger.info( + f"Successfully updated benchmark task status to {status} " + f"with cost_time: {cost_time}s, evaluate_code: {evaluate_code}" + ) + except Exception as e: + logger.error( + f"Failed to update benchmark task status to {status}: {e}, " + f"evaluate_code: {evaluate_code}" + ) + def batch_execute( self, config: BenchmarkExecuteConfig, @@ -531,7 +603,7 @@ class BenchmarkService( def get_list_by_page( self, request: EvaluateServeRequest, page: int, page_size: int - ) -> PaginationResult[EvaluateServeResponse]: + ) -> PaginationResult[BenchmarkServeResponse]: """Get a list of Evaluate entities by page Args: @@ -540,13 +612,95 @@ class BenchmarkService( page_size (int): The page size Returns: - List[EvaluateServeResponse]: The response + PaginationResult[BenchmarkServeResponse]: The response """ query_request = request - return self.dao.get_list_page( + original_result = self.dao.get_list_page( query_request, page, page_size, ServeEntity.id.name ) + benchmark_items = [] + for item in original_result.items: + benchmark_response = self._convert_to_benchmark_response(item) + benchmark_items.append(benchmark_response) + + return PaginationResult[BenchmarkServeResponse]( + items=benchmark_items, + 
total_count=original_result.total_count, + total_pages=original_result.total_pages, + page=original_result.page, + page_size=original_result.page_size, + ) + + def _convert_to_benchmark_response( + self, evaluate_response: EvaluateServeResponse + ) -> BenchmarkServeResponse: + """Convert EvaluateServeResponse to BenchmarkServeResponse + + Args: + evaluate_response: The original EvaluateServeResponse + + Returns: + BenchmarkServeResponse: The converted response + """ + cost_time = None + model_list = None + parallel_num = None + round_time = None + + # parse context data + if evaluate_response.context: + try: + context_data = evaluate_response.context + if isinstance(context_data, str): + context_data = json.loads(context_data) + + if "benchmark_config" in context_data: + benchmark_config_str = context_data["benchmark_config"] + if isinstance(benchmark_config_str, str): + benchmark_config = json.loads(benchmark_config_str) + if "llm_thread_map" in benchmark_config: + llm_thread_map = benchmark_config["llm_thread_map"] + if isinstance(llm_thread_map, dict): + model_list = list(llm_thread_map.keys()) + if "thread_num" in benchmark_config: + parallel_num = benchmark_config["thread_num"] + if "round_time" in benchmark_config: + round_time = benchmark_config["round_time"] + + if "benchmark_running_info" in context_data: + running_info_str = context_data["benchmark_running_info"] + if isinstance(running_info_str, str): + running_info = json.loads(running_info_str) + if "cost_time" in running_info: + cost_time = running_info["cost_time"] + + except (json.JSONDecodeError, TypeError, KeyError) as e: + logger.warning(f"Failed to parse context data: {e}") + + return BenchmarkServeResponse( + evaluate_code=evaluate_response.evaluate_code, + scene_key=evaluate_response.scene_key, + scene_value=evaluate_response.scene_value, + datasets_name=evaluate_response.datasets_name, + input_file_path=evaluate_response.datasets_name, + output_file_path=evaluate_response.result, + model_list=model_list, + context=evaluate_response.context, + user_name=evaluate_response.user_name, + user_id=evaluate_response.user_id, + sys_code=evaluate_response.sys_code, + parallel_num=parallel_num, + state=evaluate_response.state, + temperature=None, + max_tokens=None, + log_info=evaluate_response.log_info, + gmt_create=evaluate_response.gmt_create, + gmt_modified=evaluate_response.gmt_modified, + cost_time=cost_time, + round_time=round_time, + ) + async def get_benchmark_file_stream( self, evaluate_code: str ) -> Tuple[str, io.BytesIO]: diff --git a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/file_parse_service.py b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/file_parse_service.py index 8e7fc6fc5..27bc8f881 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/file_parse_service.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/file_parse_service.py @@ -81,8 +81,8 @@ class FileParseService(ABC): """Compute summary from the Excel file grouped by llmCode and return JSON list. It reads the '_round{round_id}.xlsx' file and sheet - 'benchmark_compare_result', then for each llmCode counts the compareResult column - (RIGHT/WRONG/FAILED/EXCEPTION) to build summary list. + 'benchmark_compare_result', then for each llmCode counts the compareResult + column (RIGHT/WRONG/FAILED/EXCEPTION) to build summary list. 
""" try: base_name = Path(output_path).stem @@ -120,7 +120,8 @@ class FileParseService(ABC): ) logger.info( - f"[summary] computed per llmCode for round={round_id}, output_path={output_path} -> {summaries}" + f"[summary] computed per llmCode for round={round_id}," + f" output_path={output_path} -> {summaries}" ) return json.dumps(summaries, ensure_ascii=False) except Exception as e: diff --git a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/models.py b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/models.py index 8020c35e8..9ae85089f 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/models.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/models.py @@ -172,6 +172,7 @@ class BenchmarkExecuteConfig: # file path config output_file_path: Optional[str] = None standard_file_path: str = None + input_file_path: Optional[str] = None # runtime execute config diff --git a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/user_input_execute_service.py b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/user_input_execute_service.py index 8209cdf1f..b17ce06db 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/user_input_execute_service.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/evaluate/service/benchmark/user_input_execute_service.py @@ -3,10 +3,10 @@ import logging from typing import Dict, List, Optional, Union from dbgpt.util.benchmarks import StorageUtil +from dbgpt_serve.evaluate.db.benchmark_db import BenchmarkResultDao from dbgpt_serve.evaluate.service.fetchdata.benchmark_data_manager import ( get_benchmark_manager, ) -from dbgpt_serve.evaluate.db.benchmark_db import BenchmarkResultDao from .data_compare_service import DataCompareService from .file_parse_service import FileParseService @@ -199,8 +199,10 @@ class UserInputExecuteService: llm_count, ) try: - summary_json = self.file_service.summary_and_write_multi_round_benchmark_result( - location, round_id + summary_json = ( + self.file_service.summary_and_write_multi_round_benchmark_result( + location, round_id + ) ) import json as _json @@ -212,11 +214,20 @@ class UserInputExecuteService: wrong = int(item.get("wrong", 0)) failed = int(item.get("failed", 0)) exception = int(item.get("exception", 0)) - dao.upsert_summary(round_id, location, llm_code, right, wrong, failed, exception, evaluate_code=config.evaluate_code) + dao.upsert_summary( + round_id, + location, + llm_code, + right, + wrong, + failed, + exception, + evaluate_code=config.evaluate_code, + ) except Exception as e: logger.error( - f"[execute_llm_compare_result] summary from excel or write db failed: {e}", - exc_info=True, + f"[execute_llm_compare_result] summary from excel" + f" or write db failed: {e}", ) def _convert_query_result_to_column_format(