From a54eefefa2de2a8038aae34de6cc8295be27f807 Mon Sep 17 00:00:00 2001 From: aries_ckt <916701291@qq.com> Date: Fri, 4 Jul 2025 21:53:25 +0800 Subject: [PATCH] feat:deepsearch agent --- .../expand/actions/deep_search_action.py | 198 ++++++++ .../src/dbgpt/agent/expand/deep_search.py | 426 ++++++++++++++++++ .../dbgpt_serve/agent/resource/knowledge.py | 6 + .../rag/retriever/knowledge_space.py | 2 +- 4 files changed, 631 insertions(+), 1 deletion(-) create mode 100644 packages/dbgpt-core/src/dbgpt/agent/expand/actions/deep_search_action.py create mode 100644 packages/dbgpt-core/src/dbgpt/agent/expand/deep_search.py diff --git a/packages/dbgpt-core/src/dbgpt/agent/expand/actions/deep_search_action.py b/packages/dbgpt-core/src/dbgpt/agent/expand/actions/deep_search_action.py new file mode 100644 index 000000000..6bd9ed16d --- /dev/null +++ b/packages/dbgpt-core/src/dbgpt/agent/expand/actions/deep_search_action.py @@ -0,0 +1,198 @@ +import json +import logging +from typing import Optional, List + +from dbgpt.agent import Action, ActionOutput, AgentResource, Resource, ResourceType +from dbgpt.util.json_utils import parse_or_raise_error + +from ...resource.tool.base import BaseTool, ToolParameter +from ...util.react_parser import ReActOutputParser, ReActStep +from .tool_action import ToolAction, run_tool +from dbgpt._private.pydantic import BaseModel, Field, model_to_dict + +logger = logging.getLogger(__name__) + + +class DeepSearchModel(BaseModel): + """Chart item model.""" + status: str = Field( + ..., + description="The status of the current action, can be split_query, summary, or reflection.", + ) + tools: List[dict] = Field( + default_factory=list, + description="List of tools to be used in the action.", + ) + intention: str = Field( + ..., + description="The intention of the current action, describing what you want to achieve.", + ) + sub_queries: List[str] = Field( + default_factory=list, + description="List of sub-queries generated from the current action.", + ) 
+
+    def to_dict(self):
+        """Convert to dict."""
+        return model_to_dict(self)
+
+
+class DeepSearchAction(ToolAction):
+    """React action class."""
+
+    def __init__(self, **kwargs):
+        """Tool action init."""
+        super().__init__(**kwargs)
+
+    @property
+    def resource_need(self) -> Optional[ResourceType]:
+        """Return the resource type needed for the action."""
+        return None
+
+    @classmethod
+    def parse_action(
+        cls,
+        ai_message: str,
+        default_action: "ReActAction",
+        resource: Optional[Resource] = None,
+        **kwargs,
+    ) -> Optional["ReActAction"]:
+        """Parse the action from the message.
+
+        If you want skip the action, return None.
+        """
+        return default_action
+
+    async def run(
+        self,
+        ai_message: str,
+        resource: Optional[AgentResource] = None,
+        rely_action_out: Optional[ActionOutput] = None,
+        need_vis_render: bool = True,
+        **kwargs,
+    ) -> ActionOutput:
+        """Perform the action."""
+        try:
+            action_param: DeepSearchModel = self._input_convert(
+                ai_message, DeepSearchModel
+            )
+        except Exception as e:
+            logger.exception(str(e))
+            return ActionOutput(
+                is_exe_success=False,
+                content="The requested correctly structured answer could not be found.",
+            )
+        act_out = ActionOutput(is_exe_success=False, content="No executable action was produced.")
+        if action_param.status == "split_query":
+            sub_queries = action_param.sub_queries
+            # execute knowledge search
+            if not action_param.tools:
+                return ActionOutput(
+                    is_exe_success=False,
+                    content="No tools available for knowledge search.",
+                )
+            if action_param.tools:
+                for tool in action_param.tools:
+                    if tool.get("tool_type") == "KnowledgeRetrieve":
+                        knowledge_args = tool.get("args", {})
+                        if not knowledge_args:
+                            return ActionOutput(
+                                is_exe_success=False,
+                                content="No arguments provided for knowledge search.",
+                            )
+                        act_out = await self.knowledge_retrieve(
+                            sub_queries,
+                            knowledge_args,
+                            self.resource,
+                        )
+
+
+        # if "parser" in kwargs and isinstance(kwargs["parser"], ReActOutputParser):
+        #     parser = kwargs["parser"]
+        # else:
+        #     parser = ReActOutputParser()
+        # steps = 
parser.parse(ai_message) + # if len(steps) != 1: + # raise ValueError("Only one action is allowed each time.") + # step = steps[0] + # act_out = await self._do_run(ai_message, step, need_vis_render=need_vis_render) + # if not act_out.action: + # act_out.action = step.action + # if step.thought: + # act_out.thoughts = step.thought + # if ( + # not act_out.action_input + # and step.action_input + # and isinstance(step.action_input, str) + # ): + # act_out.action_input = step.action_input + return act_out + + async def knowledge_retrieve( + self, sub_queries: List[str], knowledge_args: List[str], resource: Resource + ) -> ActionOutput: + """Perform knowledge retrieval.""" + query_context_map = {} + for query in sub_queries: + resource_prompt, resource_reference = await resource.get_prompt( + lang=self.language, question=query + ) + query_context_map[query] = resource_prompt + action_output = ActionOutput( + is_exe_success=True, + content="\n".join([ + f"{query}:{context}" for query, context in query_context_map.items()] + ), + view="\n".join([ + f"{query}:{context}" for query, context in query_context_map.items()] + ), + observations=query_context_map, + ) + return action_output + + + + + async def _do_run( + self, + ai_message: str, + parsed_step: ReActStep, + need_vis_render: bool = True, + ) -> ActionOutput: + """Perform the action.""" + tool_args = {} + name = parsed_step.action + action_input = parsed_step.action_input + action_input_str = action_input + + if not name: + terminal_content = str(action_input_str if action_input_str else ai_message) + return ActionOutput( + is_exe_success=True, + content=terminal_content, + observations=terminal_content, + terminate=True, + ) + + try: + # Try to parse the action input to dict + if action_input and isinstance(action_input, str): + tool_args = parse_or_raise_error(action_input) + elif isinstance(action_input, dict) or isinstance(action_input, list): + tool_args = action_input + action_input_str = 
json.dumps(action_input, ensure_ascii=False) + except json.JSONDecodeError: + if parsed_step.action == "terminate": + tool_args = {"output": action_input} + logger.warning(f"Failed to parse the args: {action_input}") + act_out = await run_tool( + name, + tool_args, + self.resource, + self.render_protocol, + need_vis_render=need_vis_render, + raw_tool_input=action_input_str, + ) + if not act_out.action_input: + act_out.action_input = action_input_str + return act_out diff --git a/packages/dbgpt-core/src/dbgpt/agent/expand/deep_search.py b/packages/dbgpt-core/src/dbgpt/agent/expand/deep_search.py new file mode 100644 index 000000000..b324ee76b --- /dev/null +++ b/packages/dbgpt-core/src/dbgpt/agent/expand/deep_search.py @@ -0,0 +1,426 @@ +import json +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional, Type, Union + +from dbgpt._private.pydantic import Field +from dbgpt.agent import ( + ActionOutput, + Agent, + AgentMemoryFragment, + AgentMessage, + ConversableAgent, + ProfileConfig, + Resource, + ResourceType, + StructuredAgentMemoryFragment, +) +from dbgpt.agent.core.role import AgentRunMode +from dbgpt.agent.resource import BaseTool, ResourcePack, ToolPack +from dbgpt.agent.util.react_parser import ReActOutputParser +from dbgpt.util.configure import DynConfig +from .actions.deep_search_action import DeepSearchAction + +from ...core import ModelMessageRoleType +from .actions.react_action import ReActAction, Terminate +from ...util.tracer import root_tracer + +logger = logging.getLogger(__name__) + +_DEEPSEARCH_GOAL = """Answer the following questions or solve the tasks by \ +selecting the right search tools. +""" + +# _DEEPSEARCH_SYSTEM_TEMPLATE = """ +# You are a DeepSearch Assistant. Your task is to answer questions or solve problems by utilizing a combination of knowledge retrieve tools and search tools. You should break down the task, search for information, reflect on the results, and synthesize a comprehensive answer. 
+# +# +# 1. Knowledge Tools: Query the internal knowledge base for information. \n {{knowledge_tools}} +# 2. WebSearch Tools: Perform an internet search for up-to-date or additional information. \n {{search_tools}} +# 3. Summarize: Summarize and synthesize information from multiple sources. +# +# +# # PROCESS # +# 1. Analyze the task and create a search plan. +# 2. Use one or more tools to gather information. +# 3. Reflect on the gathered information and determine if it's sufficient to answer the question. +# 4. If the information is insufficient, revise your plan and continue searching. +# 5. Once you have enough information, synthesize a final answer. +# +# +# For each step in your process, your response should contain: +# 1. Analysis of the current state and reasoning for your next action (prefix "Thought: "). +# 2. One or more tool uses, each containing: +# - Tool name (prefix "Tool: "). +# - Tool input (prefix "Input: "). +# 3. After receiving tool output, a reflection on the information (prefix "Reflection: "). +# +# +# +# Human: Who won the Nobel Prize in Literature in 2022? +# +# DeepSearch: +# Thought: To answer this question, I need to search for recent information about the Nobel Prize in Literature. I'll start with a web search as it's likely to have the most up-to-date information. +# +# Tool: WebSearch +# Input: Nobel Prize in Literature 2022 winner +# +# +# +# Please Solve this task: +# {{ question }} +# +# +# The current time is: {{ now_time }}. +# """ +_DEEPSEARCH_SYSTEM_TEMPLATE = """ +你是一个深度搜索助手。你的任务是你将用户原始问题一个或者多个子问题,并且给出可用知识库工具和搜索工具来回答问题或解决问题。 + +<可用工具> +1. KnowledgeRetrieve: 查询内部知识库以获取信息。\n可用知识库: {{knowledge_tools}} +2. WebSearch: 进行互联网搜索以获取最新或额外信息。\n 可用搜索工具: {{search_tools}} +3. 总结: 对多个来源的信息进行总结和综合。 + + +<流程> +1. 分析任务并创建搜索计划。 +2. 
选择使用一个或多个工具收集信息。 + + +<回复格式> +严格按以下JSON格式输出,确保可直接解析: +{ + "status": "split_query (拆解搜索计划) | summary (仅当可用知识可以回答用户问题) | reflection (反思) " + "tools": [{ + "tool_type": "工具类型" + "args": "args1", + }], + "intention": "当前你的意图, + "sub_queries": [], +} + + +<示例> +人类: 谁在2022年获得了诺贝尔文学奖? + +深度搜索: +思考: 要回答这个问题,我需要搜索关于2022年诺贝尔文学奖的最新信息。我会从网络搜索开始,因为这可能会有最新的信息。 +<可用工具> +1. KnowledgeRetrieve: 查询内部知识库以获取信息。\n可用知识库: {{knowledge_tools}} +2. WebSearch: 进行互联网搜索以获取最新或额外信息。\n 可用搜索工具: {{search_tools}} + +3. 总结: 对多个来源的信息进行总结和综合。 + +工具类型: KnowledgeRetrieve +工具参数: +: 2022年诺贝尔文学奖得主 +返回 +{ + "status": "split_query" + "tools"?: [{ + "tool_type": "KnowledgeRetrieve" + "args": "knowledge_name", + }], + "intention": "你的拆解意图, + "sub_queries": [], +} + + +<任务> +请解决这个任务: +{{ question }} + + +当前时间是: {{ now_time }}。 +""" + +_DEEPSEARCH_USER_TEMPLATE = """""" + + +_REACT_WRITE_MEMORY_TEMPLATE = """\ +{% if question %}Question: {{ question }} {% endif %} +{% if thought %}Thought: {{ thought }} {% endif %} +{% if action %}Action: {{ action }} {% endif %} +{% if action_input %}Action Input: {{ action_input }} {% endif %} +{% if observation %}Observation: {{ observation }} {% endif %} +""" + + +class DeepSearchAgent(ConversableAgent): + max_retry_count: int = 15 + run_mode: AgentRunMode = AgentRunMode.LOOP + + profile: ProfileConfig = ProfileConfig( + name=DynConfig( + "DeepSearchAssistant", + category="agent", + key="dbgpt_agent_expand_plugin_assistant_agent_name", + ), + role=DynConfig( + "DeepSearchAssistant", + category="agent", + key="dbgpt_agent_expand_plugin_assistant_agent_role", + ), + goal=DynConfig( + _DEEPSEARCH_GOAL, + category="agent", + key="dbgpt_agent_expand_plugin_assistant_agent_goal", + ), + system_prompt_template=_DEEPSEARCH_SYSTEM_TEMPLATE, + user_prompt_template=_DEEPSEARCH_USER_TEMPLATE, + write_memory_template=_REACT_WRITE_MEMORY_TEMPLATE, + ) + parser: ReActOutputParser = Field(default_factory=ReActOutputParser) + + def __init__(self, **kwargs): + """Init indicator 
AssistantAgent.""" + super().__init__(**kwargs) + + self._init_actions([DeepSearchAction, Terminate]) + + # async def _a_init_reply_message( + # self, + # received_message: AgentMessage, + # rely_messages: Optional[List[AgentMessage]] = None, + # ) -> AgentMessage: + # reply_message = super()._init_reply_message(received_message, rely_messages) + # + # tool_packs = ToolPack.from_resource(self.resource) + # action_space = [] + # action_space_names = [] + # action_space_simple_desc = [] + # if tool_packs: + # tool_pack = tool_packs[0] + # for tool in tool_pack.sub_resources: + # tool_desc, _ = await tool.get_prompt(lang=self.language) + # action_space_names.append(tool.name) + # action_space.append(tool_desc) + # if isinstance(tool, BaseTool): + # tool_simple_desc = tool.description + # else: + # tool_simple_desc = tool.get_prompt() + # action_space_simple_desc.append(f"{tool.name}: {tool_simple_desc}") + # else: + # for action in self.actions: + # action_space_names.append(action.name) + # action_space.append(action.get_action_description()) + # # self.actions + # reply_message.context = { + # "max_steps": self.max_retry_count, + # "action_space": "\n".join(action_space), + # "action_space_names": ", ".join(action_space_names), + # "action_space_simple_desc": "\n".join(action_space_simple_desc), + # } + # return reply_message + + async def preload_resource(self) -> None: + await super().preload_resource() + self._check_and_add_terminate() + + async def generate_resource_variables( + self, resource_prompt: Optional[str] = None + ) -> Dict[str, Any]: + """Generate the resource variables.""" + out_schema: Optional[str] = "" + if self.actions and len(self.actions) > 0: + out_schema = self.actions[0].ai_out_schema + if not resource_prompt: + resource_prompt = "" + now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + return { + "knowledge_tools": resource_prompt, + "search_tools": "", + "out_schema": out_schema, + "now_time": now_time, + } + + def 
_check_and_add_terminate(self): + if not self.resource: + return + _is_has_terminal = False + + def _has_terminal(r: Resource): + nonlocal _is_has_terminal + if r.type() == ResourceType.Tool and isinstance(r, Terminate): + _is_has_terminal = True + return r + + _has_add_terminal = False + + def _add_terminate(r: Resource): + nonlocal _has_add_terminal + if not _has_add_terminal and isinstance(r, ResourcePack): + terminal = Terminate() + r._resources[terminal.name] = terminal + _has_add_terminal = True + return r + + self.resource.apply(apply_func=_has_terminal) + if not _is_has_terminal: + # Add terminal action to the resource + self.resource.apply(apply_pack_func=_add_terminate) + + async def load_resource(self, question: str, is_retry_chat: bool = False): + """Load agent bind resource.""" + abilities = [] + if self.resource: + def _remove_tool(r: Resource): + if r.type() == ResourceType.Tool: + return None + return r + + # Remove all tools from the resource + # We will handle tools separately + + if isinstance(self.resource, ResourcePack): + for resource in self.resource.sub_resources: + from dbgpt_serve.agent.resource.knowledge import \ + KnowledgeSpaceRetrieverResource + if isinstance(resource, KnowledgeSpaceRetrieverResource): + abilities.append({ + "knowledge_name": resource.retriever_name, + "knowledge_desc": resource.retriever_desc, + }) + else: + from dbgpt_serve.agent.resource.knowledge import KnowledgeSpaceRetrieverResource + if isinstance(self.resource, KnowledgeSpaceRetrieverResource): + abilities.append({ + "knowledge_name": self.resource.retriever_name, + "knowledge_desc": self.resource.retriever_desc, + }) + + # new_resource = self.resource.apply(apply_func=_remove_tool) + # if new_resource: + # resource_prompt, resource_reference = await new_resource.get_prompt( + # lang=self.language, question=question + # ) + # return resource_prompt, resource_reference + return json.dumps(abilities, ensure_ascii=False), [] + + def prepare_act_param( + self, + 
received_message: Optional[AgentMessage], + sender: Agent, + rely_messages: Optional[List[AgentMessage]] = None, + **kwargs, + ) -> Dict[str, Any]: + """Prepare the parameters for the act method.""" + return { + "parser": self.parser, + } + + async def act( + self, + message: AgentMessage, + sender: Agent, + reviewer: Optional[Agent] = None, + is_retry_chat: bool = False, + last_speaker_name: Optional[str] = None, + **kwargs, + ) -> ActionOutput: + """Perform actions.""" + last_out: Optional[ActionOutput] = None + for i, action in enumerate(self.actions): + if not message: + raise ValueError("The message content is empty!") + + with root_tracer.start_span( + "agent.act.run", + metadata={ + "message": message, + "sender": sender.name if sender else None, + "recipient": self.name, + "reviewer": reviewer.name if reviewer else None, + "rely_action_out": last_out.to_dict() if last_out else None, + "conv_uid": self.not_null_agent_context.conv_id, + "action_index": i, + "total_action": len(self.actions), + }, + ) as span: + ai_message = message.content if message.content else "" + # real_action = action.parse_action( + # ai_message, default_action=action, **kwargs + # ) + # if real_action is None: + # continue + + last_out = await action.run( + ai_message=message.content if message.content else "", + resource=None, + rely_action_out=last_out, + **kwargs, + ) + span.metadata["action_out"] = last_out.to_dict() if last_out else None + if not last_out: + raise ValueError("Action should return value!") + return last_out + + @property + def memory_fragment_class(self) -> Type[AgentMemoryFragment]: + """Return the memory fragment class.""" + return StructuredAgentMemoryFragment + + async def read_memories( + self, + observation: str, + ) -> Union[str, List["AgentMessage"]]: + memories = await self.memory.read(observation) + not_json_memories = [] + messages = [] + structured_memories = [] + for m in memories: + if m.raw_observation: + try: + mem_dict = 
json.loads(m.raw_observation) + if isinstance(mem_dict, dict): + structured_memories.append(mem_dict) + elif isinstance(mem_dict, list): + structured_memories.extend(mem_dict) + else: + raise ValueError("Invalid memory format.") + except Exception: + not_json_memories.append(m.raw_observation) + + for mem_dict in structured_memories: + question = mem_dict.get("question") + thought = mem_dict.get("thought") + action = mem_dict.get("action") + action_input = mem_dict.get("action_input") + observation = mem_dict.get("observation") + if question: + messages.append( + AgentMessage( + content=f"Question: {question}", + role=ModelMessageRoleType.HUMAN, + ) + ) + ai_content = [] + if thought: + ai_content.append(f"Thought: {thought}") + if action: + ai_content.append(f"Action: {action}") + if action_input: + ai_content.append(f"Action Input: {action_input}") + messages.append( + AgentMessage( + content="\n".join(ai_content), + role=ModelMessageRoleType.AI, + ) + ) + + if observation: + messages.append( + AgentMessage( + content=f"Observation: {observation}", + role=ModelMessageRoleType.HUMAN, + ) + ) + + if not messages and not_json_memories: + messages.append( + AgentMessage( + content="\n".join(not_json_memories), + role=ModelMessageRoleType.HUMAN, + ) + ) + return messages diff --git a/packages/dbgpt-serve/src/dbgpt_serve/agent/resource/knowledge.py b/packages/dbgpt-serve/src/dbgpt_serve/agent/resource/knowledge.py index f5f5542aa..39c5d1a37 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/agent/resource/knowledge.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/agent/resource/knowledge.py @@ -95,6 +95,12 @@ class KnowledgeSpaceRetrieverResource(RetrieverResource): """Return the retriever desc.""" return self._retriever_desc + # @property + # def abilities(self) -> List[dict]: + # abilities = [] + # if self.reso + # return abilities + @classmethod def resource_parameters_class( cls, **kwargs diff --git 
a/packages/dbgpt-serve/src/dbgpt_serve/rag/retriever/knowledge_space.py b/packages/dbgpt-serve/src/dbgpt_serve/rag/retriever/knowledge_space.py
index 0461f9582..239ddcd27 100644
--- a/packages/dbgpt-serve/src/dbgpt_serve/rag/retriever/knowledge_space.py
+++ b/packages/dbgpt-serve/src/dbgpt_serve/rag/retriever/knowledge_space.py
@@ -182,7 +182,7 @@ class KnowledgeSpaceRetriever(BaseRetriever):
             logger.info("Starting Full Text retrieval")
             return await self.full_text_retrieve(query, self._top_k, filters)
         elif self._retrieve_mode == RetrieverStrategy.Tree.value:
-            logger.info("Starting Doc Tree retrieval")
+            logger.info("Starting Doc Tree retrieval")
             return await self.tree_index_retrieve(query, self._top_k, filters)
         elif self._retrieve_mode == RetrieverStrategy.HYBRID.value:
             logger.info("Starting Hybrid retrieval")