feat: add evaluation service module for RAG and Agent (#2070)

This commit is contained in:
Aries-ckt 2024-10-18 17:42:11 +08:00 committed by GitHub
parent 253c367ceb
commit 811ce63493
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 1263 additions and 61 deletions

View File

@ -32,7 +32,7 @@ CREATE TABLE IF NOT EXISTS `knowledge_document`
`id` int NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
`doc_name` varchar(100) NOT NULL COMMENT 'document path name',
`doc_type` varchar(50) NOT NULL COMMENT 'doc type',
`doc_token` varchar(100) NOT NULL COMMENT 'doc token',
`doc_token` varchar(100) NULL COMMENT 'doc token',
`space` varchar(50) NOT NULL COMMENT 'knowledge space',
`chunk_size` int NOT NULL COMMENT 'chunk size',
`last_sync` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'last sync time',
@ -56,7 +56,7 @@ CREATE TABLE IF NOT EXISTS `document_chunk`
`document_id` int NOT NULL COMMENT 'document parent id',
`content` longtext NOT NULL COMMENT 'chunk content',
`questions` text NULL COMMENT 'chunk related questions',
`meta_info` varchar(200) NOT NULL COMMENT 'metadata info',
`meta_info` text NOT NULL COMMENT 'metadata info',
`gmt_created` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'created time',
`gmt_modified` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`),

View File

@ -122,3 +122,11 @@ def register_serve_apps(system_app: SystemApp, cfg: Config, webserver_port: int)
system_app.register(FileServe)
# ################################ File Serve Register End ########################################
# ################################ Evaluate Serve Register Begin #######################################
from dbgpt.serve.evaluate.serve import Serve as EvaluateServe
# Register serve Evaluate
system_app.register(EvaluateServe)
# ################################ Evaluate Serve Register End ########################################

View File

@ -447,7 +447,7 @@ def chunk_list(
"doc_type": query_request.doc_type,
"content": query_request.content,
}
chunk_res = service.get_chunk_list(
chunk_res = service.get_chunk_list_page(
query, query_request.page, query_request.page_size
)
res = ChunkQueryResponse(

View File

@ -0,0 +1,28 @@
"""Evaluation."""
from typing import List
from dbgpt.core.schema.api import Result
from ..core.interface.evaluation import EvaluationResult
from ..serve.evaluate.api.schemas import EvaluateServeRequest
from .client import Client, ClientException
async def run_evaluation(
    client: Client, request: EvaluateServeRequest
) -> List[EvaluationResult]:
    """Run evaluation.

    Args:
        client (Client): The dbgpt client.
        request (EvaluateServeRequest): The Evaluate Request.

    Returns:
        List[EvaluationResult]: The evaluation results.

    Raises:
        ClientException: If the server reports a failure or the request
            itself errors.
    """
    try:
        res = await client.post("/evaluate/evaluation", request.dict())
        result: Result = res.json()
        if result["success"]:
            return list(result["data"])
        raise ClientException(status=result["err_code"], reason=result)
    except ClientException:
        # Fixed: previously the generic handler below re-wrapped the
        # ClientException raised for a server-side failure, losing its
        # status/reason. Re-raise it untouched.
        raise
    except Exception as e:
        raise ClientException(f"Failed to run evaluation: {e}") from e

View File

@ -287,7 +287,7 @@ class MetricManage:
def register_metric(self, cls: Type[EvaluationMetric]):
"""Register metric."""
self.metrics[cls.name] = cls
self.metrics[cls.name()] = cls
def get_by_name(self, name: str) -> Type[EvaluationMetric]:
"""Get by name."""
@ -308,4 +308,4 @@ class MetricManage:
return result
metric_mange = MetricManage()
metric_manage = MetricManage()

View File

@ -287,7 +287,7 @@ class AnswerEvaluatorOperator(JoinOperator[List[EvaluationResult]]):
contexts=contexts,
passing=result.passing,
raw_dataset=raw_dataset,
metric_name=metric.name,
metric_name=metric.name(),
feedback=result.feedback,
)
)

View File

@ -184,6 +184,7 @@ class IndexStoreBase(ABC):
max_threads,
)
@abstractmethod
def similar_search(
self, text: str, topk: int, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
@ -196,16 +197,26 @@ class IndexStoreBase(ABC):
Return:
List[Chunk]: The similar documents.
"""
return self.similar_search_with_scores(text, topk, 0.0, filters)
async def asimilar_search(
self,
query: str,
topk: int,
filters: Optional[MetadataFilters] = None,
) -> List[Chunk]:
"""Async similar_search in vector database."""
return await blocking_func_to_async_no_executor(
self.similar_search, query, topk, filters
)
async def asimilar_search_with_scores(
self,
doc: str,
query: str,
topk: int,
score_threshold: float,
filters: Optional[MetadataFilters] = None,
) -> List[Chunk]:
"""Aynsc similar_search_with_score in vector database."""
"""Async similar_search_with_score in vector database."""
return await blocking_func_to_async_no_executor(
self.similar_search_with_scores, doc, topk, score_threshold, filters
self.similar_search_with_scores, query, topk, score_threshold, filters
)

View File

@ -54,7 +54,7 @@ class RetrieverEvaluatorOperator(JoinOperator[List[EvaluationResult]]):
contexts=contexts,
passing=result.passing,
raw_dataset=raw_dataset,
metric_name=metric.name,
metric_name=metric.name(),
)
)
return results

View File

@ -10,7 +10,6 @@ from dbgpt.rag.retriever.rerank import DefaultRanker, Ranker
from dbgpt.rag.retriever.rewrite import QueryRewrite
from dbgpt.storage.vector_store.filters import MetadataFilters
from dbgpt.util.chat_util import run_async_tasks
from dbgpt.util.executor_utils import blocking_func_to_async_no_executor
from dbgpt.util.tracer import root_tracer
@ -241,9 +240,7 @@ class EmbeddingRetriever(BaseRetriever):
"query": query,
},
):
return await blocking_func_to_async_no_executor(
self._index_store.similar_search, query, self._top_k, filters
)
return await self._index_store.asimilar_search(query, self._top_k, filters)
async def _run_async_tasks(self, tasks) -> List[Chunk]:
"""Run async tasks."""

View File

@ -21,6 +21,7 @@ from dbgpt.agent import (
DefaultAWELLayoutManager,
GptsMemory,
LLMConfig,
ResourceType,
ShortTermMemory,
UserProxyAgent,
get_agent_manager,
@ -43,6 +44,7 @@ from dbgpt.serve.prompt.service import service as PromptService
from dbgpt.util.json_utils import serialize
from dbgpt.util.tracer import TracerManager
from ...rag.retriever.knowledge_space import KnowledgeSpaceRetriever
from ..db import GptsMessagesDao
from ..db.gpts_app import GptsApp, GptsAppDao, GptsAppQuery
from ..db.gpts_conversations_db import GptsConversationsDao, GptsConversationsEntity
@ -602,5 +604,26 @@ class MultiAgents(BaseComponent, ABC):
last_gpts_conversation.conv_id, Status.COMPLETE.value
)
async def get_knowledge_resources(self, app_code: str, question: str):
    """Collect knowledge-space context for a question asked of an app.

    Walks every detail of the app and, for each attached Knowledge
    resource, retrieves the most similar chunks for *question*.

    Args:
        app_code (str): The app code used to look up the GptsApp.
        question (str): The question to retrieve chunks for.

    Returns:
        List[str]: Contents of all retrieved chunks across all knowledge
        resources (empty list when the app has none).
    """
    context = []
    app: GptsApp = self.get_app(app_code)
    if app and app.details and len(app.details) > 0:
        for detail in app.details:
            if detail and detail.resources and len(detail.resources) > 0:
                for resource in detail.resources:
                    if resource.type == ResourceType.Knowledge:
                        # resource.value holds the knowledge space id.
                        retriever = KnowledgeSpaceRetriever(
                            space_id=str(resource.value),
                            top_k=CFG.KNOWLEDGE_SEARCH_TOP_SIZE,
                        )
                        # 0.3 drops low-similarity chunks from the context.
                        chunks = await retriever.aretrieve_with_scores(
                            question, score_threshold=0.3
                        )
                        context.extend([chunk.content for chunk in chunks])
                    else:
                        continue
    return context
multi_agents = MultiAgents(system_app)

View File

@ -116,8 +116,9 @@ class AgentEvaluatorOperator(JoinOperator[List[EvaluationResult]]):
contexts=contexts,
passing=result.passing,
raw_dataset=raw_dataset,
metric_name=metric.name,
metric_name=metric.name(),
prediction_cost=prediction_cost,
feedback=result.feedback,
)
)
return results

View File

@ -6,7 +6,13 @@ from typing import Any, List, Optional
from dbgpt.core.interface.evaluation import (
BaseEvaluationResult,
EvaluationMetric,
metric_mange,
metric_manage,
)
from dbgpt.rag.evaluation.answer import AnswerRelevancyMetric
from dbgpt.rag.evaluation.retriever import (
RetrieverHitRateMetric,
RetrieverMRRMetric,
RetrieverSimilarityMetric,
)
logger = logging.getLogger(__name__)
@ -116,5 +122,7 @@ class IntentMetric(EvaluationMetric[str, str], ABC):
)
metric_mange.register_metric(IntentMetric)
metric_mange.register_metric(AppLinkMetric)
metric_manage.register_metric(RetrieverHitRateMetric)
metric_manage.register_metric(RetrieverMRRMetric)
metric_manage.register_metric(RetrieverSimilarityMetric)
metric_manage.register_metric(AnswerRelevancyMetric)

View File

View File

View File

@ -0,0 +1,155 @@
import logging
from functools import cache
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer
from dbgpt.component import ComponentType, SystemApp
from dbgpt.core.interface.evaluation import metric_manage
from dbgpt.model.cluster import BaseModelController, WorkerManager, WorkerManagerFactory
from dbgpt.rag.evaluation.answer import AnswerRelevancyMetric
from dbgpt.serve.core import Result
from dbgpt.serve.evaluate.api.schemas import EvaluateServeRequest, EvaluateServeResponse
from dbgpt.serve.evaluate.config import SERVE_SERVICE_COMPONENT_NAME
from dbgpt.serve.evaluate.service.service import Service
from ...prompt.service.service import Service as PromptService
router = APIRouter()
# Add your API endpoints here
global_system_app: Optional[SystemApp] = None
logger = logging.getLogger(__name__)
def get_service() -> Service:
    """Get the service instance"""
    # Resolved lazily from the SystemApp stored by init_endpoints().
    return global_system_app.get_component(SERVE_SERVICE_COMPONENT_NAME, Service)
def get_prompt_service() -> PromptService:
    """Get the prompt serve component by its registered component name."""
    return global_system_app.get_component("dbgpt_serve_prompt_service", PromptService)
def get_worker_manager() -> WorkerManager:
    """Get the model worker manager from the worker-manager factory."""
    worker_manager = global_system_app.get_component(
        ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
    ).create()
    return worker_manager
def get_model_controller() -> BaseModelController:
    """Get the model controller component."""
    controller = global_system_app.get_component(
        ComponentType.MODEL_CONTROLLER, BaseModelController
    )
    return controller
get_bearer_token = HTTPBearer(auto_error=False)
@cache
def _parse_api_keys(api_keys: str) -> List[str]:
"""Parse the string api keys to a list
Args:
api_keys (str): The string api keys
Returns:
List[str]: The list of api keys
"""
if not api_keys:
return []
return [key.strip() for key in api_keys.split(",")]
async def check_api_key(
    auth: Optional[HTTPAuthorizationCredentials] = Depends(get_bearer_token),
    service: Service = Depends(get_service),
) -> Optional[str]:
    """Check the api key.

    If no api keys are configured, every request is allowed.

    You can pass the token in your request header like this:

    .. code-block:: python

        import requests

        client_api_key = "your_api_key"
        headers = {"Authorization": "Bearer " + client_api_key}
        res = requests.get("http://test/hello", headers=headers)
        assert res.status_code == 200
    """
    if not service.config.api_keys:
        # api_keys not set; allow all
        return None
    allowed_keys = _parse_api_keys(service.config.api_keys)
    token = auth.credentials if auth is not None else None
    if token not in allowed_keys:
        raise HTTPException(
            status_code=401,
            detail={
                "error": {
                    "message": "",
                    "type": "invalid_request_error",
                    "param": None,
                    "code": "invalid_api_key",
                }
            },
        )
    return token
@router.get("/health", dependencies=[Depends(check_api_key)])
async def health():
"""Health check endpoint"""
return {"status": "ok"}
@router.get("/test_auth", dependencies=[Depends(check_api_key)])
async def test_auth():
"""Test auth endpoint"""
return {"status": "ok"}
@router.get("/scenes")
async def get_scenes():
scene_list = [{"recall": "召回评测"}, {"app": "应用评测"}]
return Result.succ(scene_list)
@router.post("/evaluation")
async def evaluation(
request: EvaluateServeRequest,
service: Service = Depends(get_service),
) -> Result:
"""Evaluate results by the scene
Args:
request (EvaluateServeRequest): The request
service (Service): The service
Returns:
ServerResponse: The response
"""
return Result.succ(
await service.run_evaluation(
request.scene_key,
request.scene_value,
request.datasets,
request.context,
request.evaluate_metrics,
)
)
def init_endpoints(system_app: SystemApp) -> None:
    """Initialize the endpoints"""
    global global_system_app
    # Register the evaluate Service component so get_service() can resolve it.
    system_app.register(Service)
    global_system_app = system_app

View File

@ -0,0 +1,63 @@
from enum import Enum
from typing import Any, Dict, List, Optional
from dbgpt._private.pydantic import BaseModel, Field
from ..config import SERVE_APP_NAME_HUMP
class EvaluationScene(Enum):
    """Supported evaluation scenes: retriever recall vs. whole-app runs."""

    RECALL = "recall"
    APP = "app"
class DatasetStorageType(Enum):
    """Where an evaluation dataset is stored: object storage or database."""

    OSS = "oss"
    DB = "db"
class EvaluateServeRequest(BaseModel):
    """Request body for creating / running an evaluation."""

    evaluate_code: Optional[str] = Field(None, description="evaluation code")
    scene_key: Optional[str] = Field(None, description="evaluation scene key")
    scene_value: Optional[str] = Field(None, description="evaluation scene value")
    datasets_name: Optional[str] = Field(None, description="evaluation datasets name")
    datasets: Optional[List[dict]] = Field(None, description="datasets")
    evaluate_metrics: Optional[List[str]] = Field(
        None, description="evaluation metrics"
    )
    context: Optional[dict] = Field(None, description="The context of the evaluate")
    user_name: Optional[str] = Field(None, description="user name")
    user_id: Optional[str] = Field(None, description="user id")
    sys_code: Optional[str] = Field(None, description="system code")
    # Fixed copy-paste: this was described as "system code".
    parallel_num: Optional[int] = Field(None, description="evaluation parallel num")
    state: Optional[str] = Field(None, description="evaluation state")
    result: Optional[str] = Field(None, description="evaluation result")
    # Fixed: ``comment`` is not a pydantic Field keyword; use ``description``.
    storage_type: Optional[str] = Field(None, description="datasets storage type")
    average_score: Optional[str] = Field(None, description="evaluation average score")
    log_info: Optional[str] = Field(None, description="evaluation log_info")
    gmt_create: Optional[str] = Field(None, description="create time")
    # Fixed copy-paste: this is the modification time, not the create time.
    gmt_modified: Optional[str] = Field(None, description="update time")
class EvaluateServeResponse(EvaluateServeRequest):
    """Response body; mirrors the request fields, only the schema title differs."""

    class Config:
        title = f"EvaluateServeResponse for {SERVE_APP_NAME_HUMP}"
class DatasetServeRequest(BaseModel):
    """Request body describing an evaluation dataset."""

    code: Optional[str] = Field(None, description="dataset code")
    name: Optional[str] = Field(None, description="dataset name")
    file_type: Optional[str] = Field(None, description="dataset file type")
    # Fixed: ``comment`` is not a pydantic Field keyword; use ``description``.
    storage_type: Optional[str] = Field(None, description="datasets storage type")
    storage_position: Optional[str] = Field(None, description="datasets storage position")
    datasets_count: Optional[int] = Field(None, description="datasets row count")
    have_answer: Optional[bool] = Field(None, description="datasets have answer")
    members: Optional[str] = Field(None, description="datasets manager members")
    user_name: Optional[str] = Field(None, description="user name")
    user_id: Optional[str] = Field(None, description="user id")
    sys_code: Optional[str] = Field(None, description="system code")
class DatasetServeResponse(DatasetServeRequest):
    """Response body: the dataset fields plus record timestamps."""

    gmt_create: Optional[str] = Field(None, description="create time")
    # Fixed copy-paste: this is the modification time, not the create time.
    gmt_modified: Optional[str] = Field(None, description="update time")

View File

@ -0,0 +1,31 @@
from dataclasses import dataclass, field
from typing import Optional
from dbgpt.serve.core import BaseServeConfig
APP_NAME = "evaluate"
SERVE_APP_NAME = "dbgpt_serve_evaluate"
SERVE_APP_NAME_HUMP = "dbgpt_serve_evaluate"
SERVE_CONFIG_KEY_PREFIX = "dbgpt.serve.evaluate."
SERVE_SERVICE_COMPONENT_NAME = f"{SERVE_APP_NAME}_service"
# Database table name
SERVER_APP_TABLE_NAME = "dbgpt_serve_evaluate"
@dataclass
class ServeConfig(BaseServeConfig):
    """Parameters for the serve command"""

    # TODO: add your own parameters here
    # Presumably a comma-separated list consumed by the endpoint auth
    # check — confirm against the api module's key parsing.
    api_keys: Optional[str] = field(
        default=None, metadata={"help": "API keys for the endpoint, if None, allow all"}
    )
    default_user: Optional[str] = field(
        default=None,
        metadata={"help": "Default user name for evaluate"},
    )
    default_sys_code: Optional[str] = field(
        default=None,
        metadata={"help": "Default system code for evaluate"},
    )

View File

@ -0,0 +1 @@
# Define your dependencies here

View File

View File

@ -0,0 +1,157 @@
"""This is an auto-generated model file
You can define your own models and DAOs here
"""
import json
import uuid
from datetime import datetime
from typing import Any, Dict, Union
from sqlalchemy import Column, DateTime, Index, Integer, String, Text, UniqueConstraint
from dbgpt.agent.core.schema import Status
from dbgpt.storage.metadata import BaseDao, Model, db
from ..api.schemas import EvaluateServeRequest, EvaluateServeResponse
from ..config import SERVER_APP_TABLE_NAME, ServeConfig
class ServeEntity(Model):
    """ORM entity for one evaluation run (table ``evaluate_manage``)."""

    __tablename__ = "evaluate_manage"
    __table_args__ = (
        UniqueConstraint(
            "evaluate_code",
            name="uk_evaluate_code",
        ),
    )

    id = Column(Integer, primary_key=True, comment="Auto increment id")
    evaluate_code = Column(String(256), comment="evaluate Code")
    scene_key = Column(String(100), comment="evaluate scene key")
    scene_value = Column(String(256), comment="evaluate scene value")
    context = Column(Text, comment="evaluate scene run context")
    evaluate_metrics = Column(String(599), comment="evaluate metrics")
    datasets_name = Column(String(256), comment="datasets name")
    datasets = Column(Text, comment="datasets")
    storage_type = Column(String(256), comment="datasets storage type")
    parallel_num = Column(Integer, comment="datasets run parallel num")
    state = Column(String(100), comment="evaluate state")
    result = Column(Text, comment="evaluate result")
    log_info = Column(Text, comment="evaluate log info")
    average_score = Column(Text, comment="evaluate average score")
    user_id = Column(String(100), index=True, nullable=True, comment="User id")
    user_name = Column(String(128), index=True, nullable=True, comment="User name")
    sys_code = Column(String(128), index=True, nullable=True, comment="System code")
    gmt_create = Column(DateTime, default=datetime.now, comment="Record creation time")
    gmt_modified = Column(
        DateTime,
        default=datetime.now,
        onupdate=datetime.now,
        comment="Record update time",
    )

    def __repr__(self):
        # Fixed: the column is named ``gmt_create``; the previous
        # ``self.gmt_created`` raised AttributeError whenever repr() ran.
        return (
            f"ServeEntity(id={self.id}, evaluate_code='{self.evaluate_code}', "
            f"scene_key='{self.scene_key}', scene_value='{self.scene_value}', "
            f"datasets='{self.datasets}', user_id='{self.user_id}', "
            f"user_name='{self.user_name}', sys_code='{self.sys_code}', "
            f"gmt_create='{self.gmt_create}', gmt_modified='{self.gmt_modified}')"
        )
class ServeDao(BaseDao[ServeEntity, EvaluateServeRequest, EvaluateServeResponse]):
    """The DAO class for Evaluate records (adapted from the Prompt serve DAO)."""

    def __init__(self, serve_config: ServeConfig):
        super().__init__()
        self._serve_config = serve_config

    def from_request(
        self, request: Union[EvaluateServeRequest, Dict[str, Any]]
    ) -> ServeEntity:
        """Convert the request to an entity

        Args:
            request (Union[EvaluateServeRequest, Dict[str, Any]]): The request

        Returns:
            T: The entity
        """
        request_dict = (
            request.dict() if isinstance(request, EvaluateServeRequest) else request
        )
        entity = ServeEntity(
            evaluate_code=request_dict.get("evaluate_code", None),
            scene_key=request_dict.get("scene_key", None),
            scene_value=request_dict.get("scene_value", None),
            # The context dict is JSON-serialized for the Text column.
            context=json.dumps(request_dict.get("context", None))
            if request_dict.get("context", None)
            else None,
            evaluate_metrics=request_dict.get("evaluate_metrics", None),
            datasets_name=request_dict.get("datasets_name", None),
            # NOTE(review): the schema declares datasets as List[dict] but the
            # column is Text and, unlike context, it is not json.dumps-ed here
            # — confirm callers serialize it before insert.
            datasets=request_dict.get("datasets", None),
            storage_type=request_dict.get("storage_type", None),
            parallel_num=request_dict.get("parallel_num", 1),
            state=request_dict.get("state", Status.TODO.value),
            result=request_dict.get("result", None),
            average_score=request_dict.get("average_score", None),
            log_info=request_dict.get("log_info", None),
            user_id=request_dict.get("user_id", None),
            user_name=request_dict.get("user_name", None),
            sys_code=request_dict.get("sys_code", None),
        )
        if not entity.evaluate_code:
            # Generate a unique code when the request does not provide one.
            entity.evaluate_code = uuid.uuid1().hex
        return entity

    def to_request(self, entity: ServeEntity) -> EvaluateServeRequest:
        """Convert the entity to a request

        Args:
            entity (T): The entity

        Returns:
            REQ: The request
        """
        return EvaluateServeRequest(
            evaluate_code=entity.evaluate_code,
            scene_key=entity.scene_key,
            scene_value=entity.scene_value,
            datasets_name=entity.datasets_name,
            datasets=entity.datasets,
            storage_type=entity.storage_type,
            evaluate_metrics=entity.evaluate_metrics,
            context=json.loads(entity.context) if entity.context else None,
            user_name=entity.user_name,
            user_id=entity.user_id,
            sys_code=entity.sys_code,
            state=entity.state,
            result=entity.result,
            average_score=entity.average_score,
            log_info=entity.log_info,
        )

    def to_response(self, entity: ServeEntity) -> EvaluateServeResponse:
        """Convert the entity to a response

        Args:
            entity (T): The entity

        Returns:
            RES: The response
        """
        # Timestamps are rendered as strings for the API response.
        gmt_created_str = entity.gmt_create.strftime("%Y-%m-%d %H:%M:%S")
        gmt_modified_str = entity.gmt_modified.strftime("%Y-%m-%d %H:%M:%S")
        return EvaluateServeResponse(
            evaluate_code=entity.evaluate_code,
            scene_key=entity.scene_key,
            scene_value=entity.scene_value,
            datasets_name=entity.datasets_name,
            datasets=entity.datasets,
            storage_type=entity.storage_type,
            evaluate_metrics=entity.evaluate_metrics,
            context=json.loads(entity.context) if entity.context else None,
            user_name=entity.user_name,
            user_id=entity.user_id,
            sys_code=entity.sys_code,
            state=entity.state,
            result=entity.result,
            average_score=entity.average_score,
            log_info=entity.log_info,
            gmt_create=gmt_created_str,
            gmt_modified=gmt_modified_str,
        )

View File

@ -0,0 +1,122 @@
from datetime import datetime
from typing import Any, Dict, Union
from sqlalchemy import Column, DateTime, Index, Integer, String, Text, UniqueConstraint
from dbgpt.storage.metadata import BaseDao, Model, db
from ..api.schemas import DatasetServeRequest, DatasetServeResponse
from ..config import SERVER_APP_TABLE_NAME, ServeConfig
class DatasetServeEntity(Model):
    """ORM entity for an evaluation dataset (table ``evaluate_datasets``)."""

    __tablename__ = "evaluate_datasets"
    __table_args__ = (
        UniqueConstraint(
            "code",
            name="uk_dataset",
        ),
        UniqueConstraint(
            "name",
            name="uk_dataset_name",
        ),
    )

    id = Column(Integer, primary_key=True, comment="Auto increment id")
    code = Column(String(256), comment="evaluate datasets Code")
    name = Column(String(1000), comment="evaluate datasets Name")
    file_type = Column(String(256), comment="datasets file type")
    storage_type = Column(String(256), comment="datasets storage type")
    storage_position = Column(Text, comment="datasets storage position")
    datasets_count = Column(Integer, comment="datasets row count")
    # NOTE(review): stored as String(10) though the API schema uses bool.
    have_answer = Column(String(10), comment="datasets have answer")
    members = Column(String(1000), comment="evaluate datasets members")
    user_id = Column(String(100), index=True, nullable=True, comment="User id")
    user_name = Column(String(128), index=True, nullable=True, comment="User name")
    sys_code = Column(String(128), index=True, nullable=True, comment="System code")
    gmt_create = Column(DateTime, default=datetime.now, comment="Record creation time")
    gmt_modified = Column(
        DateTime,
        default=datetime.now,
        onupdate=datetime.now,
        comment="Record update time",
    )

    def __repr__(self):
        # NOTE(review): the repr text says "ServeEntity" though this class is
        # DatasetServeEntity — likely a copy-paste; left unchanged here.
        return f"ServeEntity(id={self.id}, code='{self.code}', name='{self.name}', file_type='{self.file_type}', storage_type='{self.storage_type}', storage_position='{self.storage_position}', datasets_count='{self.datasets_count}', user_id='{self.user_id}', user_name='{self.user_name}', sys_code='{self.sys_code}', gmt_create='{self.gmt_create}', gmt_modified='{self.gmt_modified}')"
class DatasetServeDao(
    BaseDao[DatasetServeEntity, DatasetServeRequest, DatasetServeResponse]
):
    """The DAO class for evaluation datasets."""

    def __init__(self, serve_config: ServeConfig):
        super().__init__()
        self._serve_config = serve_config

    def from_request(
        self, request: Union[DatasetServeRequest, Dict[str, Any]]
    ) -> DatasetServeEntity:
        """Convert the request to an entity

        Args:
            request (Union[DatasetServeRequest, Dict[str, Any]]): The request

        Returns:
            T: The entity
        """
        request_dict = (
            request.dict() if isinstance(request, DatasetServeRequest) else request
        )
        # All request fields map 1:1 onto entity columns.
        entity = DatasetServeEntity(**request_dict)
        return entity

    def to_request(self, entity: DatasetServeEntity) -> DatasetServeRequest:
        """Convert the entity to a request

        Args:
            entity (T): The entity

        Returns:
            REQ: The request
        """
        return DatasetServeRequest(
            code=entity.code,
            name=entity.name,
            file_type=entity.file_type,
            storage_type=entity.storage_type,
            storage_position=entity.storage_position,
            datasets_count=entity.datasets_count,
            have_answer=entity.have_answer,
            members=entity.members,
            user_name=entity.user_name,
            user_id=entity.user_id,
            sys_code=entity.sys_code,
        )

    def to_response(self, entity: DatasetServeEntity) -> DatasetServeResponse:
        """Convert the entity to a response

        Args:
            entity (T): The entity

        Returns:
            RES: The response
        """
        # Timestamps are rendered as strings for the API response.
        gmt_created_str = entity.gmt_create.strftime("%Y-%m-%d %H:%M:%S")
        gmt_modified_str = entity.gmt_modified.strftime("%Y-%m-%d %H:%M:%S")
        return DatasetServeResponse(
            code=entity.code,
            name=entity.name,
            file_type=entity.file_type,
            storage_type=entity.storage_type,
            storage_position=entity.storage_position,
            datasets_count=entity.datasets_count,
            have_answer=entity.have_answer,
            members=entity.members,
            user_name=entity.user_name,
            user_id=entity.user_id,
            sys_code=entity.sys_code,
            gmt_create=gmt_created_str,
            gmt_modified=gmt_modified_str,
        )

View File

@ -0,0 +1,119 @@
import logging
from typing import List, Optional, Union
from sqlalchemy import URL
from dbgpt.component import SystemApp
from dbgpt.serve.core import BaseServe
from dbgpt.storage.metadata import DatabaseManager
from .api.endpoints import init_endpoints, router
from .config import APP_NAME, SERVE_APP_NAME, SERVE_APP_NAME_HUMP
logger = logging.getLogger(__name__)
class Serve(BaseServe):
    """Serve component for evaluation (RAG and Agent).

    Mounts the evaluate API router under the v1 and v2 prefixes and
    registers the endpoint wiring on init.

    Examples:

        Register the serve component to the system app:

        .. code-block:: python

            from fastapi import FastAPI
            from dbgpt import SystemApp
            from dbgpt.serve.evaluate.serve import Serve, SERVE_APP_NAME

            app = FastAPI()
            system_app = SystemApp(app)
            system_app.register(Serve, api_prefix="/api/v1/evaluate")
            system_app.on_init()
            # Run before start hook
            system_app.before_start()

        With your database url:

        .. code-block:: python

            system_app.register(
                Serve,
                api_prefix="/api/v1/evaluate",
                db_url_or_db="sqlite:///:memory:",
                try_create_tables=True,
            )
            system_app.on_init()
            system_app.before_start()
    """

    name = SERVE_APP_NAME

    def __init__(
        self,
        system_app: SystemApp,
        api_prefix: Optional[List[str]] = None,
        api_tags: Optional[List[str]] = None,
        db_url_or_db: Optional[Union[str, URL, DatabaseManager]] = None,
        try_create_tables: Optional[bool] = False,
    ):
        # Default prefixes expose the API under both the v1 and v2 routes.
        if api_prefix is None:
            api_prefix = [f"/api/v1/{APP_NAME}", f"/api/v2/serve/{APP_NAME}"]
        if api_tags is None:
            api_tags = [SERVE_APP_NAME_HUMP]
        super().__init__(
            system_app, api_prefix, api_tags, db_url_or_db, try_create_tables
        )

    def init_app(self, system_app: SystemApp):
        """Mount the router once and wire up the endpoint globals."""
        if self._app_has_initiated:
            return
        self._system_app = system_app
        for prefix in self._api_prefix:
            self._system_app.app.include_router(
                router, prefix=prefix, tags=self._api_tags
            )
        init_endpoints(self._system_app)
        self._app_has_initiated = True

    def on_init(self):
        """Called before the start of the application.

        You can do some initialization here.
        """
        # import your own module here to ensure the module is loaded before the application starts

    def before_start(self):
        """Called before the start of the application.

        You can do some initialization here.
        """
        # import your own module here to ensure the module is loaded before the application starts

View File

View File

@ -0,0 +1,186 @@
import asyncio
import io
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional
from dbgpt._private.config import Config
from dbgpt.component import ComponentType, SystemApp
from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG
from dbgpt.core.interface.evaluation import (
EVALUATE_FILE_COL_ANSWER,
EvaluationResult,
metric_manage,
)
from dbgpt.model import DefaultLLMClient
from dbgpt.model.cluster import WorkerManagerFactory
from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory
from dbgpt.rag.evaluation import RetrieverEvaluator
from dbgpt.rag.evaluation.answer import AnswerRelevancyMetric
from dbgpt.rag.evaluation.retriever import RetrieverSimilarityMetric
from dbgpt.serve.core import BaseService
from dbgpt.serve.rag.operators.knowledge_space import SpaceRetrieverOperator
from dbgpt.storage.metadata import BaseDao
from dbgpt.storage.vector_store.base import VectorStoreConfig
from ...agent.agents.controller import multi_agents
from ...agent.evaluation.evaluation import AgentEvaluator, AgentOutputOperator
from ...agent.evaluation.evaluation_metric import IntentMetric
from ...prompt.service.service import Service as PromptService
from ...rag.connector import VectorStoreConnector
from ...rag.service.service import Service as RagService
from ..api.schemas import EvaluateServeRequest, EvaluateServeResponse, EvaluationScene
from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig
from ..models.models import ServeDao, ServeEntity
logger = logging.getLogger(__name__)
CFG = Config()
executor = ThreadPoolExecutor(max_workers=5)
def get_rag_service(system_app) -> RagService:
    """Get the RAG serve component by its registered component name."""
    return system_app.get_component("dbgpt_rag_service", RagService)
def get_prompt_service(system_app) -> PromptService:
    """Get the prompt serve component by its registered component name."""
    return system_app.get_component("dbgpt_serve_prompt_service", PromptService)
class Service(BaseService[ServeEntity, EvaluateServeRequest, EvaluateServeResponse]):
    """The service class for Evaluate"""

    name = SERVE_SERVICE_COMPONENT_NAME

    def __init__(self, system_app: SystemApp, dao: Optional[ServeDao] = None):
        # _serve_config and _system_app are populated in init_app().
        self._system_app = None
        self._serve_config: ServeConfig = None
        self._dao: ServeDao = dao
        super().__init__(system_app)
        self.rag_service = get_rag_service(system_app)
        self.prompt_service = get_prompt_service(system_app)

    def init_app(self, system_app: SystemApp) -> None:
        """Initialize the service

        Args:
            system_app (SystemApp): The system app
        """
        self._serve_config = ServeConfig.from_app_config(
            system_app.config, SERVE_CONFIG_KEY_PREFIX
        )
        self._dao = self._dao or ServeDao(self._serve_config)
        self._system_app = system_app

    @property
    def dao(self) -> BaseDao[ServeEntity, EvaluateServeRequest, EvaluateServeResponse]:
        """Returns the internal DAO."""
        return self._dao

    @property
    def config(self) -> ServeConfig:
        """Returns the internal ServeConfig."""
        return self._serve_config

    async def run_evaluation(
        self,
        scene_key,
        scene_value,
        datasets: List[dict],
        context: Optional[dict] = None,
        evaluate_metrics: Optional[List[str]] = None,
        parallel_num: Optional[int] = 1,
    ) -> List[List[EvaluationResult]]:
        """Evaluate results

        Args:
            scene_key (str): The scene_key ("recall" or "app")
            scene_value (str): The scene_value (knowledge space for recall,
                app code for app evaluation)
            datasets (List[dict]): The datasets
            context (Optional[dict]): The run context; the app scene reads
                the "prompt" and "model" keys from it
            evaluate_metrics (Optional[str]): The metric_names
            parallel_num (Optional[int]): The parallel_num

        Returns:
            List[List[EvaluationResult]]: The results; an empty list when
            scene_key matches no supported scene.
        """
        results = []
        if EvaluationScene.RECALL.value == scene_key:
            # Build an embedding-based retriever evaluator over the
            # knowledge space named by scene_value.
            embedding_factory = CFG.SYSTEM_APP.get_component(
                "embedding_factory", EmbeddingFactory
            )
            embeddings = embedding_factory.create(
                EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
            )
            config = VectorStoreConfig(
                name=scene_value,
                embedding_fn=embeddings,
            )
            vector_store_connector = VectorStoreConnector(
                vector_store_type=CFG.VECTOR_STORE_TYPE,
                vector_store_config=config,
            )
            evaluator = RetrieverEvaluator(
                operator_cls=SpaceRetrieverOperator,
                embeddings=embeddings,
                operator_kwargs={
                    "space_id": str(scene_value),
                    "top_k": CFG.KNOWLEDGE_SEARCH_TOP_SIZE,
                    "vector_store_connector": vector_store_connector,
                },
            )
            metrics = []
            metric_name_list = evaluate_metrics
            for name in metric_name_list:
                if name == "RetrieverSimilarityMetric":
                    # The similarity metric needs the embedding function.
                    metrics.append(RetrieverSimilarityMetric(embeddings=embeddings))
                else:
                    metrics.append(metric_manage.get_by_name(name)())
            for dataset in datasets:
                # Attach the ground-truth contexts (document chunks looked
                # up by doc_name) to every dataset row.
                chunks = self.rag_service.get_chunk_list(
                    {"doc_name": dataset.get("doc_name")}
                )
                contexts = [chunk.content for chunk in chunks]
                dataset["contexts"] = contexts
            results = await evaluator.evaluate(
                datasets, metrics=metrics, parallel_num=parallel_num
            )
        elif EvaluationScene.APP.value == scene_key:
            evaluator = AgentEvaluator(
                operator_cls=AgentOutputOperator,
                operator_kwargs={
                    "app_code": scene_value,
                },
            )
            metrics = []
            metric_name_list = evaluate_metrics
            for name in metric_name_list:
                if name == AnswerRelevancyMetric.name():
                    worker_manager = CFG.SYSTEM_APP.get_component(
                        ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
                    ).create()
                    llm_client = DefaultLLMClient(worker_manager=worker_manager)
                    # The prompt template id comes from context["prompt"].
                    prompt = self.prompt_service.get_template(context.get("prompt"))
                    metrics.append(
                        AnswerRelevancyMetric(
                            llm_client=llm_client,
                            model_name=context.get("model"),
                            prompt_template=prompt.template,
                        )
                    )
                    for dataset in datasets:
                        # NOTE(review): this rebinds the ``context`` parameter
                        # to the retrieved knowledge list, shadowing the run
                        # context dict read above — confirm no later reads of
                        # the original dict are needed.
                        context = await multi_agents.get_knowledge_resources(
                            app_code=scene_value, question=dataset.get("query")
                        )
                        # Presumably the retrieved knowledge acts as the
                        # reference answer column — verify against the
                        # evaluator's dataset schema.
                        dataset[EVALUATE_FILE_COL_ANSWER] = context
                else:
                    metrics.append(metric_manage.get_by_name(name)())
            results = await evaluator.evaluate(
                dataset=datasets, metrics=metrics, parallel_num=parallel_num
            )
        return results

View File

@ -23,10 +23,8 @@ from dbgpt.core.awel.flow import (
)
from dbgpt.core.awel.task.base import IN, OUT
from dbgpt.core.interface.operators.prompt_operator import BasePromptBuilderOperator
from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory
from dbgpt.rag.retriever.embedding import EmbeddingRetriever
from dbgpt.serve.rag.connector import VectorStoreConnector
from dbgpt.storage.vector_store.base import VectorStoreConfig
from dbgpt.core.interface.operators.retriever import RetrieverOperator
from dbgpt.serve.rag.retriever.knowledge_space import KnowledgeSpaceRetriever
from dbgpt.util.function_utils import rearrange_args_by_type
from dbgpt.util.i18n_utils import _
@ -40,7 +38,7 @@ def _load_space_name() -> List[OptionValue]:
]
class SpaceRetrieverOperator(MapOperator[IN, OUT]):
class SpaceRetrieverOperator(RetrieverOperator[IN, OUT]):
"""knowledge space retriever operator."""
metadata = ViewMetadata(
@ -71,64 +69,48 @@ class SpaceRetrieverOperator(MapOperator[IN, OUT]):
documentation_url="https://github.com/openai/openai-python",
)
def __init__(self, space_name: str, recall_score: Optional[float] = 0.3, **kwargs):
def __init__(
self,
space_id: str,
top_k: Optional[int] = 5,
recall_score: Optional[float] = 0.3,
**kwargs,
):
"""
Args:
space_name (str): The space name.
space_id (str): The space name.
top_k (Optional[int]): top k.
recall_score (Optional[float], optional): The recall score. Defaults to 0.3.
"""
self._space_name = space_name
self._space_id = space_id
self._top_k = top_k
self._recall_score = recall_score
self._service = KnowledgeService()
embedding_factory = CFG.SYSTEM_APP.get_component(
"embedding_factory", EmbeddingFactory
)
embedding_fn = embedding_factory.create(
model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
)
config = VectorStoreConfig(name=self._space_name, embedding_fn=embedding_fn)
self._vector_store_connector = VectorStoreConnector(
vector_store_type=CFG.VECTOR_STORE_TYPE,
vector_store_config=config,
)
super().__init__(**kwargs)
async def map(self, query: IN) -> OUT:
def retrieve(self, query: IN) -> OUT:
"""Map input value to output value.
Args:
input_value (IN): The input value.
query (IN): The input value.
Returns:
OUT: The output value.
"""
space_context = self._service.get_space_context(self._space_name)
top_k = (
CFG.KNOWLEDGE_SEARCH_TOP_SIZE
if space_context is None
else int(space_context["embedding"]["topk"])
)
recall_score = (
CFG.KNOWLEDGE_SEARCH_RECALL_SCORE
if space_context is None
else float(space_context["embedding"]["recall_score"])
)
embedding_retriever = EmbeddingRetriever(
top_k=top_k,
vector_store_connector=self._vector_store_connector,
space_retriever = KnowledgeSpaceRetriever(
space_id=self._space_id,
top_k=self._top_k,
)
if isinstance(query, str):
candidates = await embedding_retriever.aretrieve_with_scores(
query, recall_score
)
candidates = space_retriever.retrieve_with_scores(query, self._recall_score)
elif isinstance(query, list):
candidates = [
await embedding_retriever.aretrieve_with_scores(q, recall_score)
space_retriever.retrieve_with_scores(q, self._recall_score)
for q in query
]
candidates = reduce(lambda x, y: x + y, candidates)
return [candidate.content for candidate in candidates]
return candidates
class KnowledgeSpacePromptBuilderOperator(

View File

@ -320,6 +320,10 @@ class Service(BaseService[KnowledgeSpaceEntity, SpaceServeRequest, SpaceServeRes
entity = self._document_dao.from_response(document)
if request.doc_name:
entity.doc_name = request.doc_name
update_chunk = self._chunk_dao.get_one({"document_id": entity.id})
if update_chunk:
update_chunk.doc_name = request.doc_name
self._chunk_dao.update_chunk(update_chunk)
if len(request.questions) == 0:
request.questions = ""
questions = [
@ -411,13 +415,20 @@ class Service(BaseService[KnowledgeSpaceEntity, SpaceServeRequest, SpaceServeRes
"""
return self._document_dao.get_list_page(request, page, page_size)
def get_chunk_list(self, request: QUERY_SPEC, page: int, page_size: int):
"""get document chunks
def get_chunk_list_page(self, request: QUERY_SPEC, page: int, page_size: int):
"""get document chunks with page
Args:
- request: QUERY_SPEC
"""
return self._chunk_dao.get_list_page(request, page, page_size)
def get_chunk_list(self, request: QUERY_SPEC):
"""get document chunks
Args:
- request: QUERY_SPEC
"""
return self._chunk_dao.get_list(request)
def update_chunk(self, request: ChunkServeRequest):
"""update knowledge document chunk"""
if not request.id:

205
docs/docs/api/evaluation.md Normal file
View File

@ -0,0 +1,205 @@
# Evaluation
Get started with the Evaluation API
### Create Evaluation
```python
POST /api/v2/serve/evaluate/evaluation
```
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
<Tabs
defaultValue="curl_evaluation"
groupId="chat1"
values={[
{label: 'Curl', value: 'curl_evaluation'},
{label: 'Python', value: 'python_evaluation'},
]
}>
<TabItem value="curl_evaluation">
```shell
DBGPT_API_KEY=dbgpt
SPACE_ID={YOUR_SPACE_ID}
curl -X POST "http://localhost:5670/api/v2/serve/evaluate/evaluation" \
-H "Authorization: Bearer $DBGPT_API_KEY" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
-d '{
"scene_key": "recall",
"scene_value":"147",
"context":{"top_k":5},
"sys_code":"xx",
"evaluate_metrics":["RetrieverHitRateMetric","RetrieverMRRMetric","RetrieverSimilarityMetric"],
"datasets": [{
"query": "what awel talked about",
"doc_name":"awel.md"
}]
}'
```
</TabItem>
<TabItem value="python_evaluation">
```python
from dbgpt.client import Client
from dbgpt.client.evaluation import run_evaluation
from dbgpt.serve.evaluate.api.schemas import EvaluateServeRequest
DBGPT_API_KEY = "dbgpt"
client = Client(api_key=DBGPT_API_KEY)
request = EvaluateServeRequest(
# The scene type of the evaluation, e.g. support app, recall
scene_key="recall",
# e.g. app id(when scene_key is app), space id(when scene_key is recall)
scene_value="147",
context={"top_k": 5},
evaluate_metrics=[
"RetrieverHitRateMetric",
"RetrieverMRRMetric",
"RetrieverSimilarityMetric",
],
datasets=[
{
"query": "what awel talked about",
"doc_name": "awel.md",
}
],
)
data = await run_evaluation(client, request=request)
```
</TabItem>
</Tabs>
#### Request body
Request <a href="#the-evaluation-request">Evaluation Object</a>
when scene_key is app, the request body should be like this:
```json
{
"scene_key": "app",
"scene_value":"2c76eea2-83b6-11ef-b482-acde48001122",
"context":{"top_k":5, "prompt":"942acd7e33b54ce28565f89f9b278044","model":"zhipu_proxyllm"},
"sys_code":"xx",
"evaluate_metrics":["AnswerRelevancyMetric"],
"datasets": [{
"query": "what awel talked about",
"doc_name":"awel.md"
}]
}
```
when scene_key is recall, the request body should be like this:
```json
{
"scene_key": "recall",
"scene_value":"2c76eea2-83b6-11ef-b482-acde48001122",
"context":{"top_k":5, "prompt":"942acd7e33b54ce28565f89f9b278044","model":"zhipu_proxyllm"},
"evaluate_metrics":["RetrieverHitRateMetric", "RetrieverMRRMetric", "RetrieverSimilarityMetric"],
"datasets": [{
"query": "what awel talked about",
"doc_name":"awel.md"
}]
}
```
#### Response body
Return <a href="#the-evaluation-object">Evaluation Object</a> List
### The Evaluation Request Object
________
<b>scene_key</b> <font color="gray"> string </font> <font color="red"> Required </font>
The scene type of the evaluation, e.g. support app, recall
--------
<b>scene_value</b> <font color="gray"> string </font> <font color="red"> Required </font>
The scene value of the evaluation, e.g. app id(when scene_key is app), space id(when scene_key is recall)
--------
<b>context</b> <font color="gray"> object </font> <font color="red"> Required </font>
The context of the evaluation
- top_k <font color="gray"> int </font> <font color="red"> Required </font>
- prompt <font color="gray"> string </font> prompt code
- model <font color="gray"> string </font> llm model name
--------
<b>evaluate_metrics</b> <font color="gray"> array </font> <font color="red"> Required </font>
The evaluate metrics of the evaluation,
e.g.
- <b>AnswerRelevancyMetric</b>: the answer relevancy metric(when scene_key is app)
- <b>RetrieverHitRateMetric</b>: Hit rate calculates the fraction of queries where the correct answer is found
within the top-k retrieved documents. In simpler terms, it's about how often our
system gets it right within the top few guesses. (when scene_key is recall)
- <b>RetrieverMRRMetric</b>: For each query, MRR evaluates the system's accuracy by looking at the rank of the
highest-placed relevant document. Specifically, it's the average of the reciprocals
of these ranks across all the queries. So, if the first relevant document is the
top result, the reciprocal rank is 1; if it's second, the reciprocal rank is 1/2,
and so on. (when scene_key is recall)
- <b>RetrieverSimilarityMetric</b>: Embedding Similarity Metric (when scene_key is recall)
--------
<b>datasets</b> <font color="gray"> array </font> <font color="red"> Required </font>
The datasets of the evaluation
--------
### The Evaluation Result
________
<b>prediction</b> <font color="gray">string</font>
The prediction result
________
<b>contexts</b> <font color="gray">string</font>
The contexts of RAG Retrieve chunk
________
<b>score</b> <font color="gray">float</font>
The score of the prediction
________
<b>passing</b> <font color="gray">bool</font>
The passing of the prediction
________
<b>metric_name</b> <font color="gray">string</font>
The metric name of the evaluation
________
<b>prediction_cost</b> <font color="gray">int</font>
The prediction cost of the evaluation
________
<b>query</b> <font color="gray">string</font>
The query of the evaluation
________
<b>raw_dataset</b> <font color="gray">object</font>
The raw dataset of the evaluation
________
<b>feedback</b> <font color="gray">string</font>
The feedback of the llm evaluation
________

View File

@ -421,6 +421,9 @@ const sidebars = {
},{
type: 'doc',
id: 'api/datasource'
},{
type: 'doc',
id: 'api/evaluation'
},
],
link: {

View File

@ -0,0 +1,91 @@
"""Client: run evaluation example.
This example demonstrates how to use the dbgpt client to evaluate with the rag recall
and app answer.
Example:
.. code-block:: python
DBGPT_API_KEY = "dbgpt"
client = Client(api_key=DBGPT_API_KEY)
# 1. evaluate with rag recall
request = EvaluateServeRequest(
# The scene type of the evaluation, e.g. support app, recall
scene_key="recall",
# e.g. app id(when scene_key is app), space id(when scene_key is recall)
scene_value="147",
context={"top_k": 5},
evaluate_metrics=[
"RetrieverHitRateMetric",
"RetrieverMRRMetric",
"RetrieverSimilarityMetric",
],
datasets=[
{
"query": "what awel talked about",
"doc_name": "awel.md",
}
],
)
# 2. evaluate with app answer
request = EvaluateServeRequest(
# The scene type of the evaluation, e.g. support app, recall
scene_key="app",
# e.g. app id(when scene_key is app), space id(when scene_key is recall)
scene_value="2c76eea2-83b6-11ef-b482-acde48001122",
"context"={
"top_k": 5,
"prompt": "942acd7e33b54ce28565f89f9b278044",
"model": "zhipu_proxyllm",
},
evaluate_metrics=[
"AnswerRelevancyMetric",
],
datasets=[
{
"query": "what awel talked about",
"doc_name": "awel.md",
}
],
)
data = await run_evaluation(client, request=request)
print(data)
"""
import asyncio
from dbgpt.client import Client
from dbgpt.client.evaluation import run_evaluation
from dbgpt.serve.evaluate.api.schemas import EvaluateServeRequest
async def main():
# initialize client
DBGPT_API_KEY = "dbgpt"
SPACE_ID = "147"
client = Client(api_key=DBGPT_API_KEY)
request = EvaluateServeRequest(
# The scene type of the evaluation, e.g. support app, recall
scene_key="recall",
# e.g. app id(when scene_key is app), space id(when scene_key is recall)
scene_value=SPACE_ID,
context={"top_k": 5},
evaluate_metrics=[
"RetrieverHitRateMetric",
"RetrieverMRRMetric",
"RetrieverSimilarityMetric",
],
datasets=[
{
"query": "what awel talked about",
"doc_name": "awel.md",
}
],
)
data = await run_evaluation(client, request=request)
print(data)
if __name__ == "__main__":
asyncio.run(main())