DB-GPT/dbgpt/serve/agent/evaluation/evaluation.py
明天 b124ecc10b
feat: (0.6)New UI (#1855)
Co-authored-by: 夏姜 <wenfengjiang.jwf@digital-engine.com>
Co-authored-by: aries_ckt <916701291@qq.com>
Co-authored-by: wb-lh513319 <wb-lh513319@alibaba-inc.com>
Co-authored-by: csunny <cfqsunny@163.com>
2024-08-21 17:37:45 +08:00


import asyncio
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Type, Union

from dbgpt.core import Embeddings, LLMClient
from dbgpt.core.awel import IteratorTrigger, JoinOperator, MapOperator
from dbgpt.core.awel.task.base import IN, OUT
from dbgpt.core.interface.evaluation import (
    EVALUATE_FILE_COL_ANSWER,
    EVALUATE_FILE_COL_PREDICTION,
    EVALUATE_FILE_COL_PREDICTION_COST,
    EVALUATE_FILE_COL_QUESTION,
    DatasetType,
    EvaluationMetric,
    EvaluationResult,
    Evaluator,
)
from dbgpt.rag.evaluation import RetrieverSimilarityMetric
from dbgpt.serve.agent.agents.controller import multi_agents

logger = logging.getLogger(__name__)
class AgentOutputOperator(MapOperator):
    """Run an agent application for a query and collect its final answer."""

    def __init__(self, app_code: str, **kwargs):
        """
        Args:
            app_code (str): The code of the agent application to evaluate.
        """
        self.app_code = app_code
        super().__init__(**kwargs)

    async def map(self, input_value: IN) -> OUT:
        logger.info(f"AgentOutputOperator map:{input_value}")
        final_output = None
        try:
            # Timestamps in milliseconds, so the cost matches the *_ms naming.
            begin_time_ms = int(datetime.now().timestamp() * 1000)
            async for output in multi_agents.app_agent_chat(
                conv_uid=str(uuid.uuid1()),
                gpts_name=self.app_code,
                user_query=input_value,
                user_code="",
                sys_code="",
                enable_verbose=False,
                stream=False,
            ):
                role = "assistant"
                content = output
                if isinstance(output, dict):
                    content = output["markdown"]
                    role = output["sender"]
                    model = output["model"]
                    # Skip intermediate chunks that were not produced by a model.
                    if model is None:
                        continue
                final_output = content
            end_time_ms = int(datetime.now().timestamp() * 1000)
            cost_time_ms = end_time_ms - begin_time_ms

            result = {}
            result[EVALUATE_FILE_COL_PREDICTION] = final_output
            result[EVALUATE_FILE_COL_PREDICTION_COST] = cost_time_ms
            return result
        except Exception as e:
            logger.warning(f"{input_value} agent evaluate failed! {str(e)}")
            return None
class AgentEvaluatorOperator(JoinOperator[List[EvaluationResult]]):
    """Evaluator operator for agent application outputs."""

    def __init__(
        self,
        evaluation_metrics: List[EvaluationMetric],
        llm_client: Optional[LLMClient] = None,
        **kwargs,
    ):
        """Create a new AgentEvaluatorOperator."""
        self.llm_client = llm_client
        self.evaluation_metrics = evaluation_metrics
        super().__init__(combine_function=self._do_evaluation, **kwargs)

    async def _do_evaluation(
        self,
        query: str,
        prediction_result: dict,
        contexts: List[str],
        raw_dataset: Any = None,
    ) -> List[EvaluationResult]:
        """Run evaluation.

        Args:
            query(str): The query string.
            prediction_result(dict): The agent output, containing the prediction
                text and the prediction cost in milliseconds.
            contexts(List[str]): The contexts from the dataset.
            raw_dataset(Any): The raw data (single row) from the dataset.
        """
        if isinstance(contexts, str):
            contexts = [contexts]
        # The upstream agent operator returns None when the agent call fails.
        prediction_result = prediction_result or {}
        prediction = prediction_result.get(EVALUATE_FILE_COL_PREDICTION, None)
        prediction_cost = prediction_result.get(
            EVALUATE_FILE_COL_PREDICTION_COST, None
        )
        tasks = []
        for metric in self.evaluation_metrics:
            tasks.append(metric.compute(prediction, contexts))
        task_results = await asyncio.gather(*tasks)
        results = []
        for result, metric in zip(task_results, self.evaluation_metrics):
            results.append(
                EvaluationResult(
                    query=query,
                    prediction=result.prediction,
                    score=result.score,
                    contexts=contexts,
                    passing=result.passing,
                    raw_dataset=raw_dataset,
                    metric_name=metric.name,
                    prediction_cost=prediction_cost,
                )
            )
        return results
class AgentEvaluator(Evaluator):
    """Evaluator for agent applications.
    Examples:
        .. code-block:: python

            import os
            import asyncio
            from dbgpt.rag.operators import (
                EmbeddingRetrieverOperator,
                RetrieverEvaluatorOperator,
            )
            from dbgpt.rag.evaluation import (
                RetrieverEvaluator,
                RetrieverSimilarityMetric,
            )
            from dbgpt.rag.embedding import DefaultEmbeddingFactory
            from dbgpt.storage.vector_store.chroma_store import ChromaVectorConfig
            from dbgpt.storage.vector_store.connector import VectorStoreConnector
            from dbgpt.configs.model_config import MODEL_PATH, PILOT_PATH

            embeddings = DefaultEmbeddingFactory(
                default_model_name=os.path.join(MODEL_PATH, "text2vec-large-chinese"),
            ).create()
            vector_connector = VectorStoreConnector.from_default(
                "Chroma",
                vector_store_config=ChromaVectorConfig(
                    name="my_test_schema",
                    persist_path=os.path.join(PILOT_PATH, "data"),
                ),
                embedding_fn=embeddings,
            )
            dataset = [
                {
                    "query": "what is awel talk about",
                    "contexts": [
                        "Through the AWEL API, you can focus on the development"
                        " of business logic for LLMs applications without paying "
                        "attention to cumbersome model and environment details."
                    ],
                },
            ]
            evaluator = RetrieverEvaluator(
                operator_cls=EmbeddingRetrieverOperator,
                embeddings=embeddings,
                operator_kwargs={
                    "top_k": 5,
                    "vector_store_connector": vector_connector,
                },
            )
            results = asyncio.run(evaluator.evaluate(dataset))
    """
    def __init__(
        self,
        operator_cls: Type[MapOperator],
        llm_client: Optional[LLMClient] = None,
        embeddings: Optional[Embeddings] = None,
        operator_kwargs: Optional[Dict] = None,
    ):
        """Create a new AgentEvaluator."""
        if not operator_kwargs:
            operator_kwargs = {}
        self._operator_cls = operator_cls
        self._operator_kwargs: Dict[str, Any] = operator_kwargs
        self.embeddings = embeddings
        super().__init__(llm_client=llm_client)

    async def evaluate(
        self,
        dataset: DatasetType,
        metrics: Optional[List[EvaluationMetric]] = None,
        query_key: str = EVALUATE_FILE_COL_QUESTION,
        contexts_key: str = EVALUATE_FILE_COL_ANSWER,
        prediction_key: str = EVALUATE_FILE_COL_PREDICTION,
        parallel_num: int = 1,
        **kwargs,
    ) -> List[List[EvaluationResult]]:
        """Evaluate the dataset."""
        from dbgpt.core.awel import DAG, MapOperator

        if not metrics:
            if not self.embeddings:
                raise ValueError("embeddings are required for SimilarityMetric")
            metrics = [RetrieverSimilarityMetric(self.embeddings)]

        def _query_task_func(x):
            logger.info(x)
            return x[query_key]

        with DAG("agent_evaluation_dag"):
            # Fan out each dataset row: the query feeds the agent operator,
            # while the query, agent output, contexts, and raw row are joined
            # back together in the evaluator operator.
            input_task = IteratorTrigger(dataset)
            query_task: MapOperator = MapOperator(_query_task_func)
            agent_output_task = self._operator_cls(**self._operator_kwargs)
            agent_eva_task = AgentEvaluatorOperator(
                evaluation_metrics=metrics, llm_client=self.llm_client
            )
            input_task >> query_task
            query_task >> agent_eva_task
            query_task >> agent_output_task >> agent_eva_task
            input_task >> MapOperator(lambda x: x[contexts_key]) >> agent_eva_task
            input_task >> agent_eva_task

        # Do not spawn more parallel branches than there are dataset rows.
        if parallel_num > len(dataset):
            parallel_num = len(dataset)
        results = await input_task.trigger(parallel_num=parallel_num)
        return [item for _, item in results]
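

if __name__ == "__main__":
    # A minimal usage sketch, not part of the original module: it assumes an
    # agent application registered under the hypothetical app code "my_app"
    # and a locally available "text2vec-large-chinese" embedding model (as in
    # the docstring example above); both names are placeholders to adjust for
    # your deployment.
    import os

    from dbgpt.configs.model_config import MODEL_PATH
    from dbgpt.rag.embedding import DefaultEmbeddingFactory

    embeddings = DefaultEmbeddingFactory(
        default_model_name=os.path.join(MODEL_PATH, "text2vec-large-chinese"),
    ).create()
    evaluator = AgentEvaluator(
        operator_cls=AgentOutputOperator,
        embeddings=embeddings,
        operator_kwargs={"app_code": "my_app"},
    )
    # One row per evaluation case: the question is sent to the agent, the
    # answer column is used as the reference contexts for the similarity metric.
    dataset = [
        {
            EVALUATE_FILE_COL_QUESTION: "what is awel talk about",
            EVALUATE_FILE_COL_ANSWER: [
                "Through the AWEL API, you can focus on the development"
                " of business logic for LLMs applications without paying"
                " attention to cumbersome model and environment details."
            ],
        },
    ]
    results = asyncio.run(evaluator.evaluate(dataset))
    print(results)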