feat(agent): Multi agents v0.1 (#1044)

Co-authored-by: qidanrui <qidanrui@gmail.com>
Co-authored-by: csunny <cfqsunny@163.com>
Co-authored-by: Fangyin Cheng <staneyffer@gmail.com>
This commit is contained in:
明天
2024-01-09 11:09:52 +08:00
committed by GitHub
parent 409556d31d
commit e11b72c724
41 changed files with 1441 additions and 397 deletions

View File

@@ -3,15 +3,14 @@ import logging
import uuid
from abc import ABC
from collections import defaultdict
from typing import Dict, List
from fastapi import APIRouter, Body
from fastapi.responses import StreamingResponse
from dbgpt._private.config import Config
from dbgpt.agent.agents.agent import AgentContext
from dbgpt.agent.agents.agent import Agent, AgentContext
from dbgpt.agent.agents.agents_mange import agent_mange
from dbgpt.agent.agents.plan_group_chat import PlanChat, PlanChatManager
from dbgpt.agent.agents.planner_agent import PlannerAgent
from dbgpt.agent.agents.user_proxy_agent import UserProxyAgent
from dbgpt.agent.common.schema import Status
from dbgpt.agent.memory.gpts_memory import GptsMemory
@@ -20,9 +19,12 @@ from dbgpt.component import BaseComponent, ComponentType, SystemApp
from dbgpt.model.cluster import WorkerManagerFactory
from dbgpt.model.cluster.client import DefaultLLMClient
from dbgpt.serve.agent.model import PagenationFilter, PluginHubFilter
from dbgpt.serve.agent.team.plan.team_auto_plan import AutoPlanChatManager
from ..db.gpts_conversations_db import GptsConversationsDao, GptsConversationsEntity
from ..db.gpts_mange_db import GptsInstanceDao, GptsInstanceEntity
from ..team.base import TeamMode
from ..team.layout.team_awel_layout import AwelLayoutChatManger
from .db_gpts_memory import MetaDbGptsMessageMemory, MetaDbGptsPlansMemory
from .dbgpts import DbGptsInstance
@@ -51,14 +53,11 @@ class MultiAgents(BaseComponent, ABC):
def gpts_create(self, entity: GptsInstanceEntity):
self.gpts_intance.add(entity)
async def plan_chat(
async def _build_agent_context(
self,
name: str,
user_query: str,
conv_id: str,
user_code: str = None,
sys_code: str = None,
):
) -> AgentContext:
gpts_instance: GptsInstanceEntity = self.gpts_intance.get_by_name(name)
if gpts_instance is None:
raise ValueError(f"can't find dbgpts!{name}")
@@ -77,7 +76,6 @@ class MultiAgents(BaseComponent, ABC):
if gpts_instance.resource_internet
else None
)
### init chat param
worker_manager = CFG.SYSTEM_APP.get_component(
ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
@@ -92,38 +90,66 @@ class MultiAgents(BaseComponent, ABC):
context.llm_models = await llm_task.models()
context.model_priority = llm_models_priority
return context
async def _build_chat_manger(
self, context: AgentContext, mode: TeamMode, agents: List[Agent]
):
if mode == TeamMode.SINGLE_AGENT:
manager = agents[0]
else:
if TeamMode.AUTO_PLAN == mode:
manager = AutoPlanChatManager(
agent_context=context,
memory=self.memory,
plan_chat=groupchat,
planner=planner,
)
elif TeamMode.AWEL_LAYOUT == mode:
manager = AwelLayoutChatManger(
agent_context=context,
memory=self.memory,
)
else:
raise ValueError(f"Unknown Agent Team Mode!{mode}")
manager.hire(agents)
return manager
async def agent_team_chat(
self,
name: str,
mode: TeamMode,
user_query: str,
conv_id: str,
user_code: str = None,
sys_code: str = None,
):
"""Initiate an Agent-based conversation
Args:
name:
mode:
user_query:
conv_id:
user_code:
sys_code:
Returns:
"""
context = await self._build_agent_context(name, conv_id)
agent_map = defaultdict()
### default plan excute mode
agents = []
for name in agents_names:
for name in context.agents:
cls = agent_mange.get_by_name(name)
agent = cls(
agent_context=context,
memory=self.memory,
)
agents.append(agent)
agent_map[name] = agent
groupchat = PlanChat(agents=agents, messages=[], max_round=50)
planner = PlannerAgent(
agent_context=context,
memory=self.memory,
plan_chat=groupchat,
)
agent_map[planner.name] = planner
manager = PlanChatManager(
agent_context=context,
memory=self.memory,
plan_chat=groupchat,
planner=planner,
)
agent_map[manager.name] = manager
manager = await self._build_chat_manger(context, mode, agents)
user_proxy = UserProxyAgent(memory=self.memory, agent_context=context)
agent_map[user_proxy.name] = user_proxy
gpts_conversation = self.gpts_conversations.get_by_conv_id(conv_id)
if gpts_conversation is None:
@@ -131,7 +157,77 @@ class MultiAgents(BaseComponent, ABC):
GptsConversationsEntity(
conv_id=conv_id,
user_goal=user_query,
gpts_name=gpts_instance.gpts_name,
gpts_name=name,
state=Status.RUNNING.value,
max_auto_reply_round=context.max_chat_round,
auto_reply_count=0,
user_code=user_code,
sys_code=sys_code,
)
)
## dbgpts conversation save
try:
await user_proxy.a_initiate_chat(
recipient=manager,
message=user_query,
memory=self.memory,
)
except Exception as e:
logger.error(f"chat abnormal termination{str(e)}", e)
self.gpts_conversations.update(conv_id, Status.FAILED.value)
else:
# retry chat
self.gpts_conversations.update(conv_id, Status.RUNNING.value)
try:
await user_proxy.a_retry_chat(
recipient=manager,
memory=self.memory,
)
except Exception as e:
logger.error(f"chat abnormal termination{str(e)}", e)
self.gpts_conversations.update(conv_id, Status.FAILED.value)
self.gpts_conversations.update(conv_id, Status.COMPLETE.value)
return conv_id
async def plan_chat(
self,
name: str,
user_query: str,
conv_id: str,
user_code: str = None,
sys_code: str = None,
):
context = await self._build_agent_context(name, conv_id)
### default plan excute mode
agents = []
for name in context.agents:
cls = agent_mange.get_by_name(name)
agent = cls(
agent_context=context,
memory=self.memory,
)
agents.append(agent)
agent_map[name] = agent
manager = AutoPlanChatManager(
agent_context=context,
memory=self.memory,
)
manager.hire(agents)
user_proxy = UserProxyAgent(memory=self.memory, agent_context=context)
gpts_conversation = self.gpts_conversations.get_by_conv_id(conv_id)
if gpts_conversation is None:
self.gpts_conversations.add(
GptsConversationsEntity(
conv_id=conv_id,
user_goal=user_query,
gpts_name=name,
state=Status.RUNNING.value,
max_auto_reply_round=context.max_chat_round,
auto_reply_count=0,
@@ -157,7 +253,6 @@ class MultiAgents(BaseComponent, ABC):
try:
await user_proxy.a_retry_chat(
recipient=manager,
agent_map=agent_map,
memory=self.memory,
)
except Exception as e:

View File

@@ -7,7 +7,7 @@ from dbgpt.agent.memory.gpts_memory import (
GptsPlansMemory,
)
from ..db.gpts_messages_db import GptsMessagesDao, GptsMessagesEntity
from ..db.gpts_messages_db import GptsMessagesDao
from ..db.gpts_plans_db import GptsPlansDao, GptsPlansEntity
@@ -15,7 +15,7 @@ class MetaDbGptsPlansMemory(GptsPlansMemory):
def __init__(self):
self.gpts_plan = GptsPlansDao()
def batch_save(self, plans: list[GptsPlan]):
def batch_save(self, plans: List[GptsPlan]):
self.gpts_plan.batch_save([item.to_dict() for item in plans])
def get_by_conv_id(self, conv_id: str) -> List[GptsPlan]:

View File

View File

@@ -0,0 +1,58 @@
import logging
from enum import Enum
from typing import List, Union
logger = logging.getLogger(__name__)
class TeamMode(Enum):
AUTO_PLAN = "auto_plan"
AWEL_LAYOUT = "awel_layout"
SINGLE_AGENT = "singe_agent"
def content_str(content: Union[str, List, None]) -> str:
"""Converts `content` into a string format.
This function processes content that may be a string, a list of mixed text and image URLs, or None,
and converts it into a string. Text is directly appended to the result string, while image URLs are
represented by a placeholder image token. If the content is None, an empty string is returned.
Args:
- content (Union[str, List, None]): The content to be processed. Can be a string, a list of dictionaries
representing text and image URLs, or None.
Returns:
str: A string representation of the input content. Image URLs are replaced with an image token.
Note:
- The function expects each dictionary in the list to have a "type" key that is either "text" or "image_url".
For "text" type, the "text" key's value is appended to the result. For "image_url", an image token is appended.
- This function is useful for handling content that may include both text and image references, especially
in contexts where images need to be represented as placeholders.
"""
if content is None:
return ""
if isinstance(content, str):
return content
if not isinstance(content, list):
raise TypeError(f"content must be None, str, or list, but got {type(content)}")
rst = ""
for item in content:
if not isinstance(item, dict):
raise TypeError(
"Wrong content format: every element should be dict if the content is a list."
)
assert (
"type" in item
), "Wrong content format. Missing 'type' key in content's dict."
if item["type"] == "text":
rst += item["text"]
elif item["type"] == "image_url":
rst += "<image>"
else:
raise ValueError(
f"Wrong content format: unknown type {item['type']} within the content"
)
return rst

View File

@@ -0,0 +1,72 @@
from abc import ABC
from typing import Dict, List, Optional
from dbgpt.agent.agents.agent import Agent, AgentGenerateContext
from dbgpt.core.awel import BranchFunc, BranchOperator, MapOperator
from dbgpt.core.interface.message import ModelMessageRoleType
class BaseAgentOperator:
"""The abstract operator for a Agent."""
SHARE_DATA_KEY_MODEL_NAME = "share_data_key_agent_name"
def __init__(self, agent: Optional[Agent] = None):
self._agent = agent
@property
def agent(self) -> Agent:
"""Return the Agent."""
if not self._agent:
raise ValueError("agent is not set")
return self._agent
class AgentOperator(
BaseAgentOperator, MapOperator[AgentGenerateContext, AgentGenerateContext], ABC
):
def __init__(self, agent: Agent, **kwargs):
super().__init__(agent=agent)
MapOperator.__init__(self, **kwargs)
async def map(self, input_value: AgentGenerateContext) -> AgentGenerateContext:
now_rely_messages: List[Dict] = []
input_value.message["current_gogal"] = (
self._agent.name + ":" + input_value.message["current_gogal"]
)
###What was received was the User message
human_message = input_value.message.copy()
human_message["role"] = ModelMessageRoleType.HUMAN
now_rely_messages.append(human_message)
###Send a message (no reply required) and pass the message content
now_message = input_value.message
if input_value.rely_messages and len(input_value.rely_messages) > 0:
now_message = input_value.rely_messages[-1]
await input_value.sender.a_send(
now_message, self._agent, input_value.reviewer, False
)
verify_paas, reply_message = await self._agent.a_generate_reply(
message=input_value.message,
sender=input_value.sender,
reviewer=input_value.reviewer,
silent=input_value.silent,
rely_messages=input_value.rely_messages,
)
### Retry on failure
###What is sent is an AI message
ai_message = reply_message
ai_message["role"] = ModelMessageRoleType.AI
now_rely_messages.append(ai_message)
### Handle user goals and outcome dependencies
return AgentGenerateContext(
message=input_value.message,
sender=self._agent,
reviewer=input_value.reviewer,
rely_messages=now_rely_messages, ## Default single step transfer of information
silent=input_value.silent,
)

View File

@@ -0,0 +1,89 @@
import logging
import sys
from typing import Any, Optional
from dbgpt.agent.agents.agent import Agent, AgentContext, AgentGenerateContext
from dbgpt.agent.agents.base_team import MangerAgent
from dbgpt.agent.memory.gpts_memory import GptsMemory
from dbgpt.core.awel import DAG
from dbgpt.serve.agent.team.layout.agent_operator import AgentOperator
logger = logging.getLogger(__name__)
class AwelLayoutChatManger(MangerAgent):
NAME = "layout_manager"
def __init__(
self,
memory: GptsMemory,
agent_context: AgentContext,
# unlimited consecutive auto reply by default
max_consecutive_auto_reply: Optional[int] = sys.maxsize,
human_input_mode: Optional[str] = "NEVER",
describe: Optional[str] = "layout chat manager.",
**kwargs,
):
super().__init__(
name=self.NAME,
describe=describe,
memory=memory,
max_consecutive_auto_reply=max_consecutive_auto_reply,
human_input_mode=human_input_mode,
agent_context=agent_context,
**kwargs,
)
# Allow async chat if initiated using a_initiate_chat
self.register_reply(
Agent,
AwelLayoutChatManger.a_run_chat,
)
async def a_run_chat(
self,
message: Optional[str] = None,
sender: Optional[Agent] = None,
reviewer: Agent = None,
config: Optional[Any] = None,
):
try:
last_node: AgentOperator = None
with DAG(
f"layout_agents_{self.agent_context.gpts_name}_{self.agent_context.conv_id}"
) as dag:
for agent in self.agents:
now_node = AgentOperator(agent=agent)
if not last_node:
last_node = now_node
else:
last_node >> now_node
last_node = now_node
start_message = {
"content": message,
"current_gogal": message,
}
start_message_context: AgentGenerateContext = AgentGenerateContext(
message=start_message, sender=self, reviewer=reviewer
)
final_generate_context: AgentGenerateContext = await last_node.call(
call_data={"data": start_message_context}
)
last_message = final_generate_context.rely_messages[-1]
last_agent = last_node.agent
await last_agent.a_send(
last_message, self, start_message_context.reviewer, False
)
return True, {
"is_exe_success": True,
"content": last_message.get("content", None),
"view": last_message.get("view", None),
}
except Exception as e:
logger.exception("DAG run failed!")
return True, {
"content": f"AWEL task process [{dag.dag_id}] execution exception! {str(e)}",
"is_exe_success": False,
}

View File

View File

@@ -0,0 +1,189 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from dbgpt._private.config import Config
from dbgpt.agent.agents.agent import Agent, AgentContext
from dbgpt.agent.agents.base_agent import ConversableAgent
from dbgpt.agent.common.schema import Status
from dbgpt.agent.memory.base import GptsPlan
from dbgpt.agent.memory.gpts_memory import GptsMemory
from dbgpt.util.json_utils import find_json_objects
CFG = Config()
class PlannerAgent(ConversableAgent):
"""Planner agent, realizing task goal planning decomposition through LLM"""
DEFAULT_SYSTEM_MESSAGE = """
你是一个任务规划专家!您需要理解下面每个智能代理和他们的能力,却确保在没有用户帮助下,使用给出的资源,通过协调下面可用智能代理来回答用户问题。
请发挥你LLM的知识和理解能力理解用户问题的意图和目标生成一个可用智能代理协作的任务计划解决用户问题。
可用资源:
{all_resources}
可用智能代理:
{agents}
*** 重要的提醒 ***
- 充分理解用户目标然后进行必要的步骤拆分,拆分需要保证逻辑顺序和精简,尽量把可以一起完成的内容合并再一个步骤,拆分后每个子任务步骤都将是一个需要智能代理独立完成的目标, 请确保每个子任务目标内容简洁明了
- 请确保只使用上面提到的智能代理,并且可以只使用其中需要的部分,严格根据描述能力和限制分配给合适的步骤,每个智能代理都可以重复使用
- 给子任务分配智能代理是需要考虑整体计划,确保和前后依赖步骤的关系,数据可以被传递使用
- 根据用户目标的实际需要使用提供的资源来协助生成计划步骤,不要使用不需要的资源
- 每个步骤最好是使用一种资源完成一个子目标,如果当前目标可以分解为同类型的多个子任务,可以生成相互不依赖的并行任务
- 数据库资源只需要使用结构生成SQL数据获取交给用户执行
- 尽量合并有顺序依赖的连续相同步骤,如果用户目标无拆分必要,可以生成内容为用户目标的单步任务
- 仔细检查计划,确保计划完整的包含了用户问题所涉及的所有信息,并且最终能完成目标,确认每个步骤是否包含了需要用到的资源信息,如URL、资源名等.
具体任务计划的生成可参考如下例子:
user:help me build a sales report summarizing our key metrics and trends
assisant:[
{{
"serial_number": "1",
"agent": "DataScientist",
"content": "Retrieve total sales, average sales, and number of transactions grouped by "product_category"'.",
"rely": ""
}},
{{
"serial_number": "2",
"agent": "DataScientist",
"content": "Retrieve monthly sales and transaction number trends.",
"rely": ""
}},
{{
"serial_number": "3",
"agent": "DataScientist",
"content": "Count the number of transactions with "pay_status" as "paid" among all transactions to retrieve the sales conversion rate.",
"rely": ""
}},
{{
"serial_number": "4",
"agent": "Reporter",
"content": "Integrate analytical data into the format required to build sales reports.",
"rely": "1,2,3"
}}
]
请一步步思考并以如下json格式返回你的行动计划内容:
[{{
"serial_number":"0",
"agent": "用来完成当前步骤的智能代理",
"content": "当前步骤的任务内容,确保可以被智能代理执行",
"rely":"当前任务执行依赖的其他任务serial_number, 如:1,2,3, 无依赖为空"
}}]
确保回答的json可以被Python代码的json.loads函数加载解析.
"""
REPAIR_SYSTEM_MESSAGE = """
You are a planning expert! Now you need to use your professional knowledge to carefully check the generated plan, re-evaluate and analyze it, and ensure that each step of the plan is clear and complete and can be understood by the intelligent agent to solve the current plan Problems encountered! and return new program content as requested.
"""
NAME = "Planner"
def __init__(
self,
memory: GptsMemory,
agent_context: AgentContext,
agents: Optional[List[Agent]] = None,
is_termination_msg: Optional[Callable[[Dict], bool]] = None,
max_consecutive_auto_reply: Optional[int] = None,
human_input_mode: Optional[str] = "NEVER",
**kwargs,
):
super().__init__(
name=self.NAME,
memory=memory,
system_message=self.DEFAULT_SYSTEM_MESSAGE,
is_termination_msg=is_termination_msg,
max_consecutive_auto_reply=max_consecutive_auto_reply,
human_input_mode=human_input_mode,
agent_context=agent_context,
**kwargs,
)
self._agents = agents
### register planning funtion
self.register_reply(Agent, PlannerAgent._a_planning)
def build_param(self, agent_context: AgentContext):
resources = []
if agent_context.resource_db is not None:
db_connect = CFG.LOCAL_DB_MANAGE.get_connect(
agent_context.resource_db.get("name")
)
resources.append(
f"{agent_context.resource_db.get('type')}:{agent_context.resource_db.get('name')}\n{db_connect.get_table_info()}"
)
if agent_context.resource_knowledge is not None:
resources.append(
f"{agent_context.resource_knowledge.get('type')}:{agent_context.resource_knowledge.get('name')}\n{agent_context.resource_knowledge.get('introduce')}"
)
if agent_context.resource_internet is not None:
resources.append(
f"{agent_context.resource_internet.get('type')}:{agent_context.resource_internet.get('name')}\n{agent_context.resource_internet.get('introduce')}"
)
return {
"all_resources": "\n".join([f"- {item}" for item in resources]),
"agents": "\n".join(
[f"- {item.name}:{item.describe}" for item in self._agents]
),
}
async def a_system_fill_param(self):
params = self.build_param(self.agent_context)
self.update_system_message(self.DEFAULT_SYSTEM_MESSAGE.format(**params))
async def _a_planning(
self,
message: Optional[str] = None,
sender: Optional[Agent] = None,
reviewer: Optional[Agent] = None,
config: Optional[Any] = None,
) -> Tuple[bool, Union[str, Dict, None]]:
json_objects = find_json_objects(message)
plan_objects = []
fail_reason = (
"Please recheck your answerno usable plans generated in correct format"
)
json_count = len(json_objects)
rensponse_succ = True
if json_count != 1:
### Answer failed, turn on automatic repair
fail_reason += f"There are currently {json_count} json contents"
rensponse_succ = False
else:
try:
for item in json_objects[0]:
plan = GptsPlan(
conv_id=self.agent_context.conv_id,
sub_task_num=item.get("serial_number"),
sub_task_content=item.get("content"),
)
plan.resource_name = item.get("resource")
plan.max_retry_times = self.agent_context.max_retry_round
plan.sub_task_agent = item.get("agent")
plan.sub_task_title = item.get("content")
plan.rely = item.get("rely")
plan.retry_times = 0
plan.status = Status.TODO.value
plan_objects.append(plan)
except Exception as e:
fail_reason += f"Return json structure error and cannot be converted to a usable plan{str(e)}"
rensponse_succ = False
if rensponse_succ:
if len(plan_objects) > 0:
### Delete the old plan every time before saving it
self.memory.plans_memory.remove_by_conv_id(self.agent_context.conv_id)
self.memory.plans_memory.batch_save(plan_objects)
content = "\n".join(
[
"{},{}".format(index + 1, item.get("content"))
for index, item in enumerate(json_objects[0])
]
)
else:
content = fail_reason
return True, {
"is_exe_success": rensponse_succ,
"content": content,
"view": content,
}

View File

@@ -0,0 +1,284 @@
import logging
import sys
from typing import Any, List, Optional
from dbgpt.agent.agents.agent import Agent, AgentContext
from dbgpt.agent.agents.agents_mange import mentioned_agents, participant_roles
from dbgpt.agent.agents.base_agent import ConversableAgent
from dbgpt.agent.agents.base_team import MangerAgent
from dbgpt.agent.common.schema import Status
from dbgpt.agent.memory.base import GptsPlan
from dbgpt.agent.memory.gpts_memory import GptsMemory
from dbgpt.core.interface.message import ModelMessageRoleType
from .planner_agent import PlannerAgent
logger = logging.getLogger(__name__)
class AutoPlanChatManager(MangerAgent):
"""(In preview) A chat manager agent that can manage a team chat of multiple agents."""
NAME = "plan_manager"
def __init__(
self,
memory: GptsMemory,
agent_context: AgentContext,
# unlimited consecutive auto reply by default
max_consecutive_auto_reply: Optional[int] = sys.maxsize,
human_input_mode: Optional[str] = "NEVER",
describe: Optional[str] = "Plan chat manager.",
**kwargs,
):
super().__init__(
name=self.NAME,
describe=describe,
memory=memory,
max_consecutive_auto_reply=max_consecutive_auto_reply,
human_input_mode=human_input_mode,
agent_context=agent_context,
**kwargs,
)
# Order of register_reply is important.
# Allow async chat if initiated using a_initiate_chat
self.register_reply(Agent, AutoPlanChatManager.a_run_chat)
async def a_process_rely_message(
self, conv_id: str, now_plan: GptsPlan, speaker: ConversableAgent
):
rely_prompt = ""
speaker.reset_rely_message()
if now_plan.rely and len(now_plan.rely) > 0:
rely_tasks_list = now_plan.rely.split(",")
rely_tasks = self.memory.plans_memory.get_by_conv_id_and_num(
conv_id, rely_tasks_list
)
if rely_tasks:
rely_prompt = "Read the result data of the dependent steps in the above historical message to complete the current goal:"
for rely_task in rely_tasks:
speaker.append_rely_message(
{"content": rely_task.sub_task_content},
ModelMessageRoleType.HUMAN,
)
speaker.append_rely_message(
{"content": rely_task.result}, ModelMessageRoleType.AI
)
return rely_prompt
def select_speaker_msg(self, agents: List[Agent]):
"""Return the message for selecting the next speaker."""
return f"""You are in a role play game. The following roles are available:
{participant_roles(agents)}.
Read the following conversation.
Then select the next role from {[agent.name for agent in agents]} to play. The role can be selected repeatedly.Only return the role."""
async def a_select_speaker(
self,
last_speaker: Agent,
selector: ConversableAgent,
now_goal_context: str = None,
pre_allocated: str = None,
):
"""Select the next speaker."""
agents = self.agents
if pre_allocated:
# Preselect speakers
logger.info(f"Preselect speakers:{pre_allocated}")
name = pre_allocated
model = None
else:
# auto speaker selection
selector.update_system_message(self.select_speaker_msg(agents))
final, name, model = await selector.a_reasoning_reply(
self.messages
+ [
{
"role": ModelMessageRoleType.HUMAN,
"content": f"""Read and understand the following task content and assign the appropriate role to complete the task.
Task content: {now_goal_context}
select the role from: {[agent.name for agent in agents]},
Please only return the role, such as: {agents[0].name}""",
}
]
)
if not final:
raise ValueError("Unable to select next speaker!")
# If exactly one agent is mentioned, use it. Otherwise, leave the OAI response unmodified
mentions = mentioned_agents(name, agents)
if len(mentions) == 1:
name = next(iter(mentions))
else:
logger.warning(
f"GroupChat select_speaker failed to resolve the next speaker's name. This is because the speaker selection OAI call returned:\n{name}"
)
# Return the result
try:
return self.agent_by_name(name), model
except Exception as e:
logger.exception(f"auto select speaker failed!{str(e)}")
raise ValueError("Unable to select next speaker!")
async def a_generate_speech_process(
self,
message: Optional[str],
reviewer: Agent,
agents: Optional[List[Agent]] = None,
) -> None:
planner = PlannerAgent(
agent_context=self.agent_context,
memory=self.memory,
agents=agents,
is_terminal_agent=True,
)
await self.a_initiate_chat(
message=message, recipient=planner, reviewer=reviewer
)
async def a_run_chat(
self,
message: Optional[str] = None,
sender: Optional[Agent] = None,
reviewer: Agent = None,
config: Optional[Any] = None,
):
"""Run a team chat asynchronously."""
speaker = sender
for i in range(self.max_round):
plans = self.memory.plans_memory.get_by_conv_id(self.agent_context.conv_id)
if not plans or len(plans) <= 0:
###Have no plan, generate a new plan
await self.a_generate_speech_process(message, reviewer, self.agents)
else:
todo_plans = [
plan
for plan in plans
if plan.state in [Status.TODO.value, Status.RETRYING.value]
]
if not todo_plans or len(todo_plans) <= 0:
### The plan has been fully executed and a success message is sent to the user.
# complete
complete_message = {"content": f"TERMINATE", "is_exe_success": True}
return True, complete_message
else:
now_plan: GptsPlan = todo_plans[0]
# There is no need to broadcast the message to other agents, it will be automatically obtained from the collective memory according to the dependency relationship.
try:
if Status.RETRYING.value == now_plan.state:
if now_plan.retry_times <= now_plan.max_retry_times:
current_goal_message = {
"content": now_plan.result,
"current_gogal": now_plan.sub_task_content,
"context": {
"plan_task": now_plan.sub_task_content,
"plan_task_num": now_plan.sub_task_num,
},
}
else:
self.memory.plans_memory.update_task(
self.agent_context.conv_id,
now_plan.sub_task_num,
Status.FAILED.value,
now_plan.retry_times + 1,
speaker.name,
"",
plan_result,
)
faild_report = {
"content": f"ReTask [{now_plan.sub_task_content}] was retried more than the maximum number of times and still failed.{now_plan.result}",
"is_exe_success": False,
}
return True, faild_report
else:
current_goal_message = {
"content": now_plan.sub_task_content,
"current_gogal": now_plan.sub_task_content,
"context": {
"plan_task": now_plan.sub_task_content,
"plan_task_num": now_plan.sub_task_num,
},
}
# select the next speaker
speaker, model = await self.a_select_speaker(
speaker,
self,
now_plan.sub_task_content,
now_plan.sub_task_agent,
)
# Tell the speaker the dependent history information
rely_prompt = await self.a_process_rely_message(
conv_id=self.agent_context.conv_id,
now_plan=now_plan,
speaker=speaker,
)
current_goal_message["content"] = (
rely_prompt + current_goal_message["content"]
)
is_recovery = False
if message == current_goal_message["content"]:
is_recovery = True
await self.a_send(
message=current_goal_message,
recipient=speaker,
reviewer=reviewer,
request_reply=False,
is_recovery=is_recovery,
)
verify_pass, reply = await speaker.a_generate_reply(
current_goal_message, self, reviewer
)
plan_result = ""
if verify_pass:
if reply:
action_report = reply.get("action_report", None)
if action_report:
plan_result = action_report.get("content", "")
### The current planned Agent generation verification is successful
##Plan executed successfully
self.memory.plans_memory.complete_task(
self.agent_context.conv_id,
now_plan.sub_task_num,
plan_result,
)
await speaker.a_send(
reply, self, reviewer, request_reply=False
)
else:
plan_result = reply["content"]
self.memory.plans_memory.update_task(
self.agent_context.conv_id,
now_plan.sub_task_num,
Status.RETRYING.value,
now_plan.retry_times + 1,
speaker.name,
"",
plan_result,
)
except Exception as e:
logger.exception(
f"An exception was encountered during the execution of the current plan step.{str(e)}"
)
error_report = {
"content": f"An exception was encountered during the execution of the current plan step.{str(e)}",
"is_exe_success": False,
}
return True, error_report
return True, {
"content": f"Maximum number of dialogue rounds exceeded.{self.MAX_CONSECUTIVE_AUTO_REPLY}",
"is_exe_success": False,
}