feat:add rag awel operator view metadata. (#1174)

This commit is contained in:
Aries-ckt 2024-02-21 10:24:12 +08:00 committed by GitHub
parent c78bd22fda
commit 32e1554282
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 527 additions and 6 deletions

View File

@ -112,6 +112,7 @@ _OPERATOR_CATEGORY_DETAIL = {
"output_parser": _CategoryDetail("Output Parser", "Parse the output of LLM model"),
"common": _CategoryDetail("Common", "The common operator"),
"agent": _CategoryDetail("Agent", "The agent operator"),
"rag": _CategoryDetail("RAG", "The RAG operator"),
}
@ -124,6 +125,7 @@ class OperatorCategory(str, Enum):
OUTPUT_PARSER = "output_parser"
COMMON = "common"
AGENT = "agent"
RAG = "rag"
def label(self) -> str:
"""Get the label of the category."""
@ -163,6 +165,7 @@ _RESOURCE_CATEGORY_DETAIL = {
"common": _CategoryDetail("Common", "The common resource"),
"prompt": _CategoryDetail("Prompt", "The prompt resource"),
"agent": _CategoryDetail("Agent", "The agent resource"),
"rag": _CategoryDetail("RAG", "The resource"),
}
@ -176,6 +179,7 @@ class ResourceCategory(str, Enum):
COMMON = "common"
PROMPT = "prompt"
AGENT = "agent"
RAG = "rag"
def label(self) -> str:
"""Get the label of the category."""

View File

@ -1031,3 +1031,54 @@ class UserInputParsedOperator(MapOperator[CommonLLMHttpRequestBody, Dict[str, An
async def map(self, request_body: CommonLLMHttpRequestBody) -> Dict[str, Any]:
"""Map the request body to response body."""
return {self._key: request_body.messages}
class RequestedParsedOperator(MapOperator[CommonLLMHttpRequestBody, str]):
"""User input parsed operator."""
metadata = ViewMetadata(
label="Request Body Parsed To String Operator",
name="request_body_to_str__parsed_operator",
category=OperatorCategory.COMMON,
parameters=[
Parameter.build_from(
"Key",
"key",
str,
optional=True,
default="",
description="The key of the dict, link 'user_input'",
)
],
inputs=[
IOField.build_from(
"Request Body",
"request_body",
CommonLLMHttpRequestBody,
description="The request body of the API endpoint",
)
],
outputs=[
IOField.build_from(
"User Input String",
"user_input_str",
str,
description="The user input dict of the API endpoint",
)
],
description="User input parsed operator",
)
def __init__(self, key: str = "user_input", **kwargs):
"""Initialize a UserInputParsedOperator."""
self._key = key
super().__init__(**kwargs)
async def map(self, request_body: CommonLLMHttpRequestBody) -> str:
"""Map the request body to response body."""
dict_value = request_body.dict()
if not self._key or self._key not in dict_value:
raise ValueError(
f"Prefix key {self._key} is not a valid key of the request body"
)
return dict_value[self._key]

View File

@ -457,3 +457,42 @@ class CommonStreamingOutputOperator(TransformStreamAbsOperator[ModelOutput, str]
decoded_unicode = model_output.text.replace("\ufffd", "")
msg = decoded_unicode.replace("\n", "\\n")
yield f"data:{msg}\n\n"
class StringOutput2ModelOutputOperator(MapOperator[str, ModelOutput]):
"""Map String to ModelOutput."""
metadata = ViewMetadata(
label="Map String to ModelOutput",
name="string_2_model_output_operator",
category=OperatorCategory.COMMON,
description="Map String to ModelOutput.",
parameters=[],
inputs=[
IOField.build_from(
"String",
"input_value",
str,
description="The input value of the operator.",
),
],
outputs=[
IOField.build_from(
"Model Output",
"input_value",
ModelOutput,
description="The input value of the operator.",
),
],
)
def __int__(self, **kwargs):
"""Create a new operator."""
super().__init__(**kwargs)
async def map(self, input_value: str) -> ModelOutput:
"""Map the model output to the common response body."""
return ModelOutput(
text=input_value,
error_code=500,
)

View File

@ -1,26 +1,92 @@
from typing import Any, List, Optional
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import (
IOField,
OperatorCategory,
OptionValue,
Parameter,
ViewMetadata,
)
from dbgpt.core.awel.task.base import IN
from dbgpt.rag.knowledge.base import Knowledge, KnowledgeType
from dbgpt.rag.knowledge.factory import KnowledgeFactory
class KnowledgeOperator(MapOperator[Any, Any]):
"""Knowledge Operator."""
"""Knowledge Factory Operator."""
metadata = ViewMetadata(
label="Knowledge Factory Operator",
name="knowledge_operator",
category=OperatorCategory.RAG,
description="The knowledge operator.",
inputs=[
IOField.build_from(
"knowledge datasource",
"knowledge datasource",
dict,
"knowledge datasource",
)
],
outputs=[
IOField.build_from(
"Knowledge",
"Knowledge",
Knowledge,
description="Knowledge",
)
],
parameters=[
Parameter.build_from(
label="datasource",
name="datasource",
type=str,
optional=True,
default="DOCUMENT",
description="datasource",
),
Parameter.build_from(
label="knowledge_type",
name="knowledge type",
type=str,
optional=True,
options=[
OptionValue(
label="DOCUMENT",
name="DOCUMENT",
value=KnowledgeType.DOCUMENT.name,
),
OptionValue(label="URL", name="URL", value=KnowledgeType.URL.name),
OptionValue(
label="TEXT", name="TEXT", value=KnowledgeType.TEXT.name
),
],
default=KnowledgeType.DOCUMENT.name,
description="knowledge type",
),
],
documentation_url="https://github.com/openai/openai-python",
)
def __init__(
self, knowledge_type: Optional[KnowledgeType] = KnowledgeType.DOCUMENT, **kwargs
self,
datasource: Optional[str] = None,
knowledge_type: Optional[str] = KnowledgeType.DOCUMENT.name,
**kwargs
):
"""Init the query rewrite operator.
Args:
knowledge_type: (Optional[KnowledgeType]) The knowledge type.
"""
super().__init__(**kwargs)
self._knowledge_type = knowledge_type
self._datasource = datasource
self._knowledge_type = KnowledgeType.get_by_value(knowledge_type)
async def map(self, datasource: IN) -> Knowledge:
"""knowledge operator."""
if self._datasource:
datasource = self._datasource
return await self.blocking_func_to_async(
KnowledgeFactory.create, datasource, self._knowledge_type
)

View File

@ -2,6 +2,7 @@ from typing import Any, List, Optional
from dbgpt.core import LLMClient
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.core.awel.task.base import IN
from dbgpt.rag.retriever.rewrite import QueryRewrite
@ -9,6 +10,59 @@ from dbgpt.rag.retriever.rewrite import QueryRewrite
class QueryRewriteOperator(MapOperator[Any, Any]):
"""The Rewrite Operator."""
metadata = ViewMetadata(
label="Query Rewrite Operator",
name="query_rewrite_operator",
category=OperatorCategory.RAG,
description="query rewrite operator.",
inputs=[
IOField.build_from("query_context", "query_context", dict, "query context")
],
outputs=[
IOField.build_from(
"rewritten queries",
"queries",
List[str],
description="rewritten queries",
)
],
parameters=[
Parameter.build_from(
"LLM Client",
"llm_client",
LLMClient,
optional=True,
default=None,
description="The LLM Client.",
),
Parameter.build_from(
label="model name",
name="model_name",
type=str,
optional=True,
default="gpt-3.5-turbo",
description="llm model name",
),
Parameter.build_from(
label="prompt language",
name="language",
type=str,
optional=True,
default="en",
description="prompt language",
),
Parameter.build_from(
label="nums",
name="nums",
type=int,
optional=True,
default=5,
description="rewrite query nums",
),
],
documentation_url="https://github.com/openai/openai-python",
)
def __init__(
self,
llm_client: Optional[LLMClient],

View File

@ -1,12 +1,77 @@
from typing import Any, Optional
from dbgpt.core import LLMClient
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.core.awel.task.base import IN
from dbgpt.rag.knowledge.base import Knowledge
from dbgpt.serve.rag.assembler.summary import SummaryAssembler
from dbgpt.serve.rag.operators.base import AssemblerOperator
class SummaryAssemblerOperator(AssemblerOperator[Any, Any]):
metadata = ViewMetadata(
label="Summary Operator",
name="summary_assembler_operator",
category=OperatorCategory.RAG,
description="The summary assembler operator.",
inputs=[
IOField.build_from(
"Knowledge", "knowledge", Knowledge, "knowledge datasource"
)
],
outputs=[
IOField.build_from(
"document summary",
"summary",
str,
description="document summary",
)
],
parameters=[
Parameter.build_from(
"LLM Client",
"llm_client",
LLMClient,
optional=True,
default=None,
description="The LLM Client.",
),
Parameter.build_from(
label="model name",
name="model_name",
type=str,
optional=True,
default="gpt-3.5-turbo",
description="llm model name",
),
Parameter.build_from(
label="prompt language",
name="language",
type=str,
optional=True,
default="en",
description="prompt language",
),
Parameter.build_from(
label="max_iteration_with_llm",
name="max_iteration_with_llm",
type=int,
optional=True,
default=5,
description="prompt language",
),
Parameter.build_from(
label="concurrency_limit_with_llm",
name="concurrency_limit_with_llm",
type=int,
optional=True,
default=3,
description="The concurrency limit with llm",
),
],
documentation_url="https://github.com/openai/openai-python",
)
def __init__(
self,
llm_client: Optional[LLMClient],

View File

@ -0,0 +1,242 @@
from functools import reduce
from typing import List, Optional
from dbgpt.app.knowledge.api import knowledge_space_service
from dbgpt.app.knowledge.request.request import KnowledgeSpaceRequest
from dbgpt.app.knowledge.service import CFG, KnowledgeService
from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG
from dbgpt.core import (
BaseMessage,
ChatPromptTemplate,
HumanPromptTemplate,
ModelMessage,
)
from dbgpt.core.awel import JoinOperator, MapOperator
from dbgpt.core.awel.flow import (
IOField,
OperatorCategory,
OperatorType,
OptionValue,
Parameter,
ViewMetadata,
)
from dbgpt.core.awel.task.base import IN, OUT
from dbgpt.core.interface.operators.prompt_operator import BasePromptBuilderOperator
from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory
from dbgpt.rag.retriever.embedding import EmbeddingRetriever
from dbgpt.storage.vector_store.base import VectorStoreConfig
from dbgpt.storage.vector_store.connector import VectorStoreConnector
from dbgpt.util.function_utils import rearrange_args_by_type
class SpaceRetrieverOperator(MapOperator[IN, OUT]):
"""knowledge space retriever operator."""
metadata = ViewMetadata(
label="Knowledge Space Operator",
name="space_operator",
category=OperatorCategory.RAG,
description="knowledge space retriever operator.",
inputs=[IOField.build_from("query", "query", str, "user query")],
outputs=[
IOField.build_from(
"related chunk content",
"related chunk content",
List,
description="related chunk content",
)
],
parameters=[
Parameter.build_from(
"Space Name",
"space_name",
str,
options=[
OptionValue(label=space.name, name=space.name, value=space.name)
for space in knowledge_space_service.get_knowledge_space(
KnowledgeSpaceRequest()
)
],
optional=False,
default=None,
description="space name.",
)
],
documentation_url="https://github.com/openai/openai-python",
)
def __init__(self, space_name: str, recall_score: Optional[float] = 0.3, **kwargs):
"""
Args:
space_name (str): The space name.
recall_score (Optional[float], optional): The recall score. Defaults to 0.3.
"""
self._space_name = space_name
self._recall_score = recall_score
self._service = KnowledgeService()
embedding_factory = CFG.SYSTEM_APP.get_component(
"embedding_factory", EmbeddingFactory
)
embedding_fn = embedding_factory.create(
model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
)
config = VectorStoreConfig(name=self._space_name, embedding_fn=embedding_fn)
self._vector_store_connector = VectorStoreConnector(
vector_store_type=CFG.VECTOR_STORE_TYPE,
vector_store_config=config,
)
super().__init__(**kwargs)
async def map(self, query: IN) -> OUT:
"""Map input value to output value.
Args:
input_value (IN): The input value.
Returns:
OUT: The output value.
"""
space_context = self._service.get_space_context(self._space_name)
top_k = (
CFG.KNOWLEDGE_SEARCH_TOP_SIZE
if space_context is None
else int(space_context["embedding"]["topk"])
)
recall_score = (
CFG.KNOWLEDGE_SEARCH_RECALL_SCORE
if space_context is None
else float(space_context["embedding"]["recall_score"])
)
embedding_retriever = EmbeddingRetriever(
top_k=top_k,
vector_store_connector=self._vector_store_connector,
)
if isinstance(query, str):
candidates = await embedding_retriever.aretrieve_with_scores(
query, recall_score
)
elif isinstance(query, list):
candidates = [
await embedding_retriever.aretrieve_with_scores(q, recall_score)
for q in query
]
candidates = reduce(lambda x, y: x + y, candidates)
return [candidate.content for candidate in candidates]
class KnowledgeSpacePromptBuilderOperator(
BasePromptBuilderOperator, JoinOperator[List[ModelMessage]]
):
"""The operator to build the prompt with static prompt.
The prompt will pass to this operator.
"""
metadata = ViewMetadata(
label="Knowledge Space Prompt Builder Operator",
name="knowledge_space_prompt_builder_operator",
description="Build messages from prompt template and chat history.",
operator_type=OperatorType.JOIN,
category=OperatorCategory.CONVERSION,
parameters=[
Parameter.build_from(
"Chat Prompt Template",
"prompt",
ChatPromptTemplate,
description="The chat prompt template.",
),
Parameter.build_from(
"History Key",
"history_key",
str,
optional=True,
default="chat_history",
description="The key of history in prompt dict.",
),
Parameter.build_from(
"String History",
"str_history",
bool,
optional=True,
default=False,
description="Whether to convert the history to string.",
),
],
inputs=[
IOField.build_from(
"user input",
"user_input",
str,
is_list=False,
description="user input",
),
IOField.build_from(
"space related context",
"related_context",
List,
is_list=False,
description="context of knowledge space.",
),
IOField.build_from(
"History",
"history",
BaseMessage,
is_list=True,
description="The history.",
),
],
outputs=[
IOField.build_from(
"Formatted Messages",
"formatted_messages",
ModelMessage,
is_list=True,
description="The formatted messages.",
)
],
)
def __init__(
self,
prompt: ChatPromptTemplate,
history_key: str = "chat_history",
check_storage: bool = True,
str_history: bool = False,
**kwargs,
):
"""Create a new history dynamic prompt builder operator.
Args:
prompt (ChatPromptTemplate): The chat prompt template.
history_key (str, optional): The key of history in prompt dict. Defaults to "chat_history".
check_storage (bool, optional): Whether to check the storage. Defaults to True.
str_history (bool, optional): Whether to convert the history to string. Defaults to False.
"""
self._prompt = prompt
self._history_key = history_key
self._str_history = str_history
BasePromptBuilderOperator.__init__(self, check_storage=check_storage)
JoinOperator.__init__(self, combine_function=self.merge_context, **kwargs)
@rearrange_args_by_type
async def merge_context(
self,
user_input: str,
related_context: List[str],
history: Optional[List[BaseMessage]],
) -> List[ModelMessage]:
"""Merge the prompt and history."""
prompt_dict = dict()
prompt_dict["context"] = related_context
for prompt in self._prompt.messages:
if isinstance(prompt, HumanPromptTemplate):
prompt_dict[prompt.input_variables[0]] = user_input
if history:
if self._str_history:
prompt_dict[self._history_key] = BaseMessage.messages_to_string(history)
else:
prompt_dict[self._history_key] = history
return await self.format_prompt(self._prompt, prompt_dict)

View File

@ -59,7 +59,7 @@ with DAG("dbgpt_awel_simple_rag_summary_example") as dag:
request_handle_task = RequestHandleOperator()
path_operator = MapOperator(lambda request: request["url"])
# build knowledge operator
knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL)
knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
# build summary assembler operator
summary_operator = SummaryAssemblerOperator(
llm_client=OpenAILLMClient(), language="en"

View File

@ -76,7 +76,7 @@ with DAG("simple_sdk_rag_embedding_example") as dag:
"/examples/rag/embedding", methods="POST", request_body=TriggerReqBody
)
request_handle_task = RequestHandleOperator()
knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL)
knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
vector_connector = _create_vector_connector()
url_parser_operator = MapOperator(map_function=lambda x: x["url"])
embedding_operator = EmbeddingAssemblerOperator(

View File

@ -39,7 +39,7 @@ from dbgpt.storage.vector_store.connector import VectorStoreConnector
..code-block:: shell
DBGPT_SERVER="http://127.0.0.1:5555"
curl -X POST $DBGPT_SERVER/api/v1/awel/trigger/examples/rag/retrieve \
-H "Content-Type: application/json" -d '{
-H "Content-Type: application/json" -d '{ \
"query": "what is awel talk about?"
}'
"""