mirror of https://github.com/csunny/DB-GPT.git (synced 2025-09-18 07:30:40 +00:00)
feat(model): Add LLM benchmarks code
@@ -3,7 +3,9 @@
from enum import Enum
from typing import TypedDict, Optional, Dict, List, Any

from dataclasses import dataclass, asdict
import time
from datetime import datetime
from pilot.utils.parameter_utils import ParameterDescription

@@ -47,6 +49,79 @@ class WorkerApplyType(str, Enum):
    UPDATE_PARAMS = "update_params"


+@dataclass
+class ModelInferenceMetrics:
+    """A class to represent metrics for assessing the inference performance of an LLM."""

+    start_time_ms: Optional[int] = None
+    """The timestamp (in milliseconds) when the model inference starts."""

+    end_time_ms: Optional[int] = None
+    """The timestamp (in milliseconds) when the model inference ends."""

+    current_time_ms: Optional[int] = None
+    """The current timestamp (in milliseconds) when the model inference returns partial output (stream)."""

+    first_token_time_ms: Optional[int] = None
+    """The timestamp (in milliseconds) when the first token is generated."""

+    first_completion_time_ms: Optional[int] = None
+    """The timestamp (in milliseconds) when the first completion is generated."""

+    first_completion_tokens: Optional[int] = None
+    """The number of tokens when the first completion is generated."""

+    prompt_tokens: Optional[int] = None
+    """The number of tokens in the input prompt."""

+    completion_tokens: Optional[int] = None
+    """The number of tokens in the generated completion."""

+    total_tokens: Optional[int] = None
+    """The total number of tokens (prompt plus completion)."""

+    speed_per_second: Optional[float] = None
+    """The average number of tokens generated per second."""

+    @staticmethod
+    def create_metrics(
+        last_metrics: Optional["ModelInferenceMetrics"] = None,
+    ) -> "ModelInferenceMetrics":
+        start_time_ms = last_metrics.start_time_ms if last_metrics else None
+        first_token_time_ms = last_metrics.first_token_time_ms if last_metrics else None
+        first_completion_time_ms = (
+            last_metrics.first_completion_time_ms if last_metrics else None
+        )
+        first_completion_tokens = (
+            last_metrics.first_completion_tokens if last_metrics else None
+        )
+        prompt_tokens = last_metrics.prompt_tokens if last_metrics else None
+        completion_tokens = last_metrics.completion_tokens if last_metrics else None
+        total_tokens = last_metrics.total_tokens if last_metrics else None
+        speed_per_second = last_metrics.speed_per_second if last_metrics else None

+        if not start_time_ms:
+            start_time_ms = time.time_ns() // 1_000_000
+        current_time_ms = time.time_ns() // 1_000_000
+        end_time_ms = current_time_ms

+        return ModelInferenceMetrics(
+            start_time_ms=start_time_ms,
+            end_time_ms=end_time_ms,
+            current_time_ms=current_time_ms,
+            first_token_time_ms=first_token_time_ms,
+            first_completion_time_ms=first_completion_time_ms,
+            first_completion_tokens=first_completion_tokens,
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+            total_tokens=total_tokens,
+            speed_per_second=speed_per_second,
+        )

+    def to_dict(self) -> Dict:
+        return asdict(self)


@dataclass
class ModelOutput:
    text: str

@@ -54,7 +129,8 @@ class ModelOutput:
    model_context: Dict = None
    finish_reason: str = None
    usage: Dict[str, Any] = None
-    metrics: Dict[str, Any] = None
+    metrics: Optional[ModelInferenceMetrics] = None
+    """Some metrics for model inference"""

    def to_dict(self) -> Dict:
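A minimal sketch of how the new dataclass is meant to be used (assuming it is importable as pilot.model.base.ModelInferenceMetrics, matching the import added further down in this commit): create_metrics carries timing and token counts forward from a previous snapshot, and to_dict flattens a snapshot for logging or CSV output.

# --- illustration (not part of the commit) ---
import time

from pilot.model.base import ModelInferenceMetrics  # module path taken from the imports below

# First snapshot: no previous metrics, so start_time_ms is set to "now".
first = ModelInferenceMetrics.create_metrics()
time.sleep(0.05)  # stand-in for the model producing another stream chunk

# Later snapshots keep the original start_time_ms (and any token counts already
# recorded) while end_time_ms / current_time_ms move to the new "now".
later = ModelInferenceMetrics.create_metrics(first)

assert later.start_time_ms == first.start_time_ms
assert later.end_time_ms >= first.end_time_ms
print(later.to_dict())  # plain dict, ready for e.g. csv.DictWriter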
@@ -2,9 +2,12 @@ import os
import logging
from typing import Dict, Iterator, List, Optional

+import time
+import copy

from pilot.configs.model_config import get_device
from pilot.model.model_adapter import get_llm_model_adapter, LLMModelAdaper
-from pilot.model.base import ModelOutput
+from pilot.model.base import ModelOutput, ModelInferenceMetrics
from pilot.model.loader import ModelLoader, _get_model_real_path
from pilot.model.parameter import ModelParameters
from pilot.model.cluster.worker_base import ModelWorker

@@ -144,14 +147,29 @@ class DefaultModelWorker(ModelWorker):
        )

        previous_response = ""
+        last_metrics = ModelInferenceMetrics.create_metrics()
+        is_first_generate = True

        context_len = params.get("context_len") or self.context_len
        for output in generate_stream_func(
            self.model, self.tokenizer, params, get_device(), context_len
        ):
-            model_output, incremental_output, output_str = self._handle_output(
-                output, previous_response, model_context
+            (
+                model_output,
+                incremental_output,
+                output_str,
+                current_metrics,
+            ) = self._handle_output(
+                output,
+                previous_response,
+                model_context,
+                last_metrics,
+                is_first_generate,
            )
+            if is_first_generate:
+                is_first_generate = False
            previous_response = output_str
+            last_metrics = current_metrics
            yield model_output
        print(
            f"\n\nfull stream output:\n{previous_response}\n\nmodel generate_stream params:\n{params}"

@@ -191,13 +209,28 @@ class DefaultModelWorker(ModelWorker):
        previous_response = ""
        context_len = params.get("context_len") or self.context_len

+        last_metrics = ModelInferenceMetrics.create_metrics()
+        is_first_generate = True
        async for output in generate_stream_func(
            self.model, self.tokenizer, params, get_device(), context_len
        ):
-            model_output, incremental_output, output_str = self._handle_output(
-                output, previous_response, model_context
+            (
+                model_output,
+                incremental_output,
+                output_str,
+                current_metrics,
+            ) = self._handle_output(
+                output,
+                previous_response,
+                model_context,
+                last_metrics,
+                is_first_generate,
            )
+            if is_first_generate:
+                is_first_generate = False

            previous_response = output_str
+            last_metrics = current_metrics
            yield model_output
        print(
            f"\n\nfull stream output:\n{previous_response}\n\nmodel generate_stream params:\n{params}"
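Both streaming paths above follow the same pattern: create a baseline metrics object before the loop, derive a fresh snapshot from it on every chunk, and flip is_first_generate after the first chunk. A stripped-down sketch of that pattern against a fake token stream (an illustration only; _handle_output itself is not reproduced here):

# --- illustration (not part of the commit) ---
from pilot.model.base import ModelInferenceMetrics

def fake_stream():
    # Stand-in for generate_stream_func: yields the accumulated text so far.
    for chunk in ["Hello", "Hello,", "Hello, world"]:
        yield chunk

last_metrics = ModelInferenceMetrics.create_metrics()
is_first_generate = True
previous_response = ""
for output in fake_stream():
    # Each chunk gets a snapshot chained to the previous one.
    current_metrics = ModelInferenceMetrics.create_metrics(last_metrics)
    if is_first_generate:
        # The worker records first_completion_time_ms at this point
        # (see _new_metrics_from_model_output below).
        is_first_generate = False
    previous_response = output
    last_metrics = current_metrics
print(previous_response, last_metrics.to_dict())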
@@ -262,7 +295,14 @@ class DefaultModelWorker(ModelWorker):

        return params, model_context, generate_stream_func, model_span

-    def _handle_output(self, output, previous_response, model_context):
+    def _handle_output(
+        self,
+        output,
+        previous_response,
+        model_context,
+        last_metrics: ModelInferenceMetrics,
+        is_first_generate: bool,
+    ):
        finish_reason = None
        usage = None
        if isinstance(output, dict):

@@ -273,14 +313,17 @@ class DefaultModelWorker(ModelWorker):
                logger.info(f"finish_reason: {finish_reason}")
        incremental_output = output[len(previous_response) :]
        print(incremental_output, end="", flush=True)

+        metrics = _new_metrics_from_model_output(last_metrics, is_first_generate, usage)
        model_output = ModelOutput(
            text=output,
            error_code=0,
            model_context=model_context,
            finish_reason=finish_reason,
            usage=usage,
+            metrics=metrics,
        )
-        return model_output, incremental_output, output
+        return model_output, incremental_output, output, metrics

    def _handle_exception(self, e):
        # Check if the exception is a torch.cuda.CudaError and if torch was imported.
@@ -310,3 +353,49 @@ def _parse_model_max_length(model, tokenizer) -> Optional[int]:
                return model_config.max_position_embeddings
    except Exception:
        return None


+def _new_metrics_from_model_output(
+    last_metric: ModelInferenceMetrics,
+    is_first_generate: bool,
+    usage: Optional[Dict] = None,
+) -> ModelInferenceMetrics:
+    metrics = ModelInferenceMetrics.create_metrics(last_metric)
+    if is_first_generate:
+        logger.info(f"is_first_generate, usage: {usage}")
+        metrics.first_completion_time_ms = time.time_ns() // 1_000_000

+    if not usage or not isinstance(usage, dict):
+        return metrics
+    prompt_tokens = usage.get("prompt_tokens")
+    completion_tokens = usage.get("completion_tokens")
+    total_tokens = usage.get("total_tokens")

+    if prompt_tokens is None:
+        prompt_tokens = metrics.prompt_tokens
+    if completion_tokens is None:
+        completion_tokens = metrics.completion_tokens
+    if total_tokens is None:
+        total_tokens = metrics.total_tokens

+    if is_first_generate and (completion_tokens is not None):
+        # completion_tokens == 0 is prefill
+        metrics.first_completion_tokens = completion_tokens
+        if completion_tokens == 1:
+            metrics.first_token_time_ms = metrics.first_completion_time_ms

+    if prompt_tokens:
+        metrics.prompt_tokens = prompt_tokens
+    if completion_tokens:
+        metrics.completion_tokens = completion_tokens
+    if total_tokens:
+        metrics.total_tokens = total_tokens
+    elif prompt_tokens and completion_tokens:
+        total_tokens = prompt_tokens + completion_tokens
+        metrics.total_tokens = total_tokens

+    if total_tokens:
+        # time cost(seconds)
+        duration = (metrics.current_time_ms - metrics.start_time_ms) / 1000.0
+        metrics.speed_per_second = total_tokens / duration
+    return metrics
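The throughput figure at the end of _new_metrics_from_model_output is plain tokens-per-second arithmetic; a worked example with made-up numbers (not from a real run):

# --- illustration (not part of the commit): toy numbers only ---
start_time_ms = 1_700_000_000_000          # when the request started
current_time_ms = start_time_ms + 8_000    # 8 seconds later
prompt_tokens, completion_tokens = 64, 256

total_tokens = prompt_tokens + completion_tokens        # 320
duration = (current_time_ms - start_time_ms) / 1000.0   # 8.0 seconds
speed_per_second = total_tokens / duration              # 40.0 tokens/s

# first_token_time_ms is only filled in on the first chunk, and only when the
# backend reports completion_tokens == 1 (completion_tokens == 0 means prefill).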
@@ -1000,11 +1000,16 @@ def run_worker_manager(
    embedding_model_name: str = None,
    embedding_model_path: str = None,
    start_listener: Callable[["WorkerManager"], None] = None,
+    **kwargs,
):
    global worker_manager

    worker_params: ModelWorkerParameters = _parse_worker_params(
-        model_name=model_name, model_path=model_path, standalone=standalone, port=port
+        model_name=model_name,
+        model_path=model_path,
+        standalone=standalone,
+        port=port,
+        **kwargs,
    )

    setup_logging(
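With **kwargs now forwarded into _parse_worker_params, extra worker options can be passed straight through run_worker_manager; this is what lets the benchmark script below supply limit_model_concurrency and model_type. A hedged sketch of such a call (the model name and path are placeholders, and the listener here is a stand-in for the script's run_model):

# --- illustration (not part of the commit); placeholder model name/path ---
from pilot.model.cluster.worker.manager import run_worker_manager

async def on_started(worker_manager):
    print("worker manager is ready")

run_worker_manager(
    model_name="vicuna-7b-v1.5",
    model_path="/models/vicuna-7b-v1.5",
    start_listener=on_started,
    limit_model_concurrency=200,   # forwarded via **kwargs
    model_type="huggingface",      # forwarded via **kwargs
)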
0    pilot/utils/benchmarks/__init__.py    Normal file
0    pilot/utils/benchmarks/llm/__init__.py    Normal file
237  pilot/utils/benchmarks/llm/llm_benchmarks.py    Normal file
@@ -0,0 +1,237 @@
from typing import Dict, List
import asyncio
import os
import sys
import time
import csv
import argparse
from pilot.configs.model_config import ROOT_PATH, LLM_MODEL_CONFIG

from pilot.model.cluster.worker.manager import (
    run_worker_manager,
    initialize_worker_manager_in_client,
    worker_manager,
    WorkerManager,
)

from pilot.model.base import ModelOutput, ModelInferenceMetrics
from pilot.model.cluster import PromptRequest
from pilot.scene.base_message import ModelMessage, ModelMessageRoleType


# model_name = "chatglm2-6b"
# model_name = "vicuna-7b-v1.5"
model_name = "baichuan2-7b"
model_path = LLM_MODEL_CONFIG[model_name]
# or vllm
model_type = "huggingface"

controller_addr = "http://127.0.0.1:5005"

result_csv_file = None

parallel_nums = [1, 2, 4, 16, 32]
# parallel_nums = [1, 2, 4]


def get_result_csv_file() -> str:
    return os.path.join(
        ROOT_PATH, f"pilot/data/{model_name}_{model_type}_benchmarks_llm.csv"
    )


input_output_length_pair = [
    [64, 256],
    [64, 512],
    [64, 1024],
    [512, 1024],
    [1024, 1024],
    [1024, 2048],
    [2048, 2048],
]
input_lens = [64, 64]
output_lens = [256, 512]


prompt_file_map = {
    "11k": os.path.join(
        ROOT_PATH, "docker/examples/benchmarks/benchmarks_llm_11k_prompt.txt"
    )
}

METRICS_HEADERS = [
    # Params
    "model_name",
    "parallel_nums",
    "input_length",
    "output_length",
    # Merge parallel result
    "test_time_cost_ms",
    "test_total_tokens",
    "test_speed_per_second",
    # Detail for each task
    "start_time_ms",
    "end_time_ms",
    "current_time_ms",
    "first_token_time_ms",
    "first_completion_time_ms",
    "first_completion_tokens",
    "prompt_tokens",
    "completion_tokens",
    "total_tokens",
    "speed_per_second",
]


def read_prompt_from_file(file_key: str) -> str:
    full_path = prompt_file_map[file_key]
    with open(full_path, "r+", encoding="utf-8") as f:
        return f.read()


def build_param(
    input_len: int,
    output_len: int,
    user_input: str,
    system_prompt: str = None,
) -> Dict:
    hist = []
    if system_prompt:
        hist.append(
            ModelMessage(role=ModelMessageRoleType.SYSTEM, content=system_prompt)
        )
    hist.append(ModelMessage(role=ModelMessageRoleType.HUMAN, content=user_input))
    hist = list(h.dict() for h in hist)
    context_len = input_len + output_len
    params = {
        "prompt": user_input,
        "messages": hist,
        "model": model_name,
        "echo": False,
        "max_new_tokens": output_len,
        "context_len": context_len,
    }
    return params


async def run_batch(
    wh, input_len: int, output_len: int, parallel_num: int, output_file: str
):
    tasks = []
    prompt = read_prompt_from_file("11k")
    for _ in range(parallel_num):
        params = build_param(input_len, output_len, prompt)
        tasks.append(wh.generate(params))
    print(
        f"Begin run benchmarks, model name: {model_name}, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}"
    )
    start_time_ms = time.time_ns() // 1_000_000
    results: List[ModelOutput] = await asyncio.gather(*tasks)
    end_time_ms = time.time_ns() // 1_000_000

    test_time_cost_ms = end_time_ms - start_time_ms
    test_total_tokens = 0
    rows = []
    for r in results:
        metrics = r.metrics
        if isinstance(metrics, dict):
            metrics = ModelInferenceMetrics(**metrics)
        test_total_tokens += metrics.total_tokens
        row_data = metrics.to_dict()
        rows.append(row_data)
    test_speed_per_second = test_total_tokens / (test_time_cost_ms / 1000.0)

    with open(output_file, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=METRICS_HEADERS)
        if f.tell() == 0:
            # First time: write the header row
            writer.writeheader()
        for row in rows:
            row["model_name"] = model_name
            row["parallel_nums"] = parallel_num
            row["input_length"] = input_len
            row["output_length"] = output_len
            row["test_time_cost_ms"] = test_time_cost_ms
            row["test_total_tokens"] = test_total_tokens
            row["test_speed_per_second"] = test_speed_per_second
            writer.writerow(row)
    print(
        f"input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}"
    )


async def run_model(wh: WorkerManager) -> None:
    global result_csv_file
    if not result_csv_file:
        result_csv_file = get_result_csv_file()
    if os.path.exists(result_csv_file):
        os.rename(result_csv_file, f"{result_csv_file}.bak.csv")
    for parallel_num in parallel_nums:
        for input_len, output_len in zip(input_lens, output_lens):
            await run_batch(wh, input_len, output_len, parallel_num, result_csv_file)

    sys.exit(0)


def startup_llm_env():
    from fastapi import FastAPI

    app = FastAPI()
    initialize_worker_manager_in_client(
        app=app,
        model_name=model_name,
        model_path=model_path,
        run_locally=False,
        controller_addr=controller_addr,
        local_port=6000,
        start_listener=run_model,
        # system_app=system_app,
    )


def connect_to_remote_model():
    startup_llm_env()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", type=str, default=model_name)
    parser.add_argument("--model_path", type=str, default=None)
    parser.add_argument("--model_type", type=str, default="huggingface")
    parser.add_argument("--result_csv_file", type=str, default=None)
    parser.add_argument("--input_lens", type=str, default="64,64,64,512,1024,1024,2048")
    parser.add_argument(
        "--output_lens", type=str, default="256,512,1024,1024,1024,2048,2048"
    )
    parser.add_argument("--parallel_nums", type=str, default="1,2,4,16,32")
    parser.add_argument(
        "--remote_model", type=bool, default=False, help="Connect to remote model"
    )
    parser.add_argument("--controller_addr", type=str, default="http://127.0.0.1:8000")
    parser.add_argument("--limit_model_concurrency", type=int, default=200)

    args = parser.parse_args()
    print(f"args: {args}")
    model_name = args.model_name
    model_path = args.model_path or LLM_MODEL_CONFIG[model_name]
    result_csv_file = args.result_csv_file
    input_lens = [int(i) for i in args.input_lens.strip().split(",")]
    output_lens = [int(i) for i in args.output_lens.strip().split(",")]
    parallel_nums = [int(i) for i in args.parallel_nums.strip().split(",")]
    remote_model = args.remote_model
    controller_addr = args.controller_addr
    limit_model_concurrency = args.limit_model_concurrency
    model_type = args.model_type
    if len(input_lens) != len(output_lens):
        raise ValueError("input_lens size must equal output_lens size")

    if remote_model:
        connect_to_remote_model()
    else:
        run_worker_manager(
            model_name=model_name,
            model_path=model_path,
            start_listener=run_model,
            limit_model_concurrency=limit_model_concurrency,
            model_type=model_type,
        )
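Each request in a batch ends up as one CSV row with the METRICS_HEADERS columns above. A small sketch for loading the results back afterwards; the file path mirrors get_result_csv_file() for the default settings and is relative to ROOT_PATH, so adjust it to your run:

# --- illustration (not part of the commit) ---
import csv

# Default location produced by get_result_csv_file() with the default model/type.
result_file = "pilot/data/baichuan2-7b_huggingface_benchmarks_llm.csv"

with open(result_file, newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        print(
            row["model_name"],
            row["parallel_nums"],
            row["input_length"],
            row["output_length"],
            row["test_speed_per_second"],  # aggregate tokens/s for the whole batch
        )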