"""Base agent class for conversable agents."""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from concurrent.futures import Executor, ThreadPoolExecutor
|
||
from datetime import datetime
|
||
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, final
|
||
|
||
from jinja2 import Template
|
||
|
||
from dbgpt._private.pydantic import ConfigDict, Field
|
||
from dbgpt.core import LLMClient, ModelMessageRoleType, PromptTemplate
|
||
from dbgpt.util.error_types import LLMChatError
|
||
from dbgpt.util.executor_utils import blocking_func_to_async
|
||
from dbgpt.util.tracer import SpanType, root_tracer
|
||
from dbgpt.util.utils import colored
|
||
|
||
from ..resource.base import Resource
|
||
from ..util.llm.llm import LLMConfig, LLMStrategyType
|
||
from ..util.llm.llm_client import AIWrapper
|
||
from .action.base import Action, ActionOutput
|
||
from .agent import Agent, AgentContext, AgentMessage, AgentReviewInfo
|
||
from .memory.agent_memory import AgentMemory
|
||
from .memory.gpts.base import GptsMessage
|
||
from .memory.gpts.gpts_memory import GptsMemory
|
||
from .profile.base import ProfileConfig
|
||
from .role import Role
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class ConversableAgent(Role, Agent):
    """ConversableAgent is an agent that can communicate with other agents."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    agent_context: Optional[AgentContext] = Field(None, description="Agent context")
    actions: List[Action] = Field(default_factory=list)
    resource: Optional[Resource] = Field(None, description="Resource")
    llm_config: Optional[LLMConfig] = None
    bind_prompt: Optional[PromptTemplate] = None
    max_retry_count: int = 3
    llm_client: Optional[AIWrapper] = None
    # Whether this agent should stream its output
    stream_out: bool = True
    # Whether this agent should display its reference resources
    show_reference: bool = False

    executor: Executor = Field(
        default_factory=lambda: ThreadPoolExecutor(max_workers=1),
        description="Executor for running tasks",
    )

    def __init__(self, **kwargs):
        """Create a new agent."""
        Role.__init__(self, **kwargs)
        Agent.__init__(self)

    def check_available(self) -> None:
        """Check if the agent is available.

        Raises:
            ValueError: If the agent is not available.
        """
        self.identity_check()
        # check run context
        if self.agent_context is None:
            raise ValueError(
                f"{self.name}[{self.role}] Missing context in which agent is "
                f"running!"
            )

        # action check
        if self.actions and len(self.actions) > 0:
            for action in self.actions:
                if action.resource_need and (
                    not self.resource
                    or not self.resource.get_resource_by_type(action.resource_need)
                ):
                    raise ValueError(
                        f"{self.name}[{self.role}] Missing resources required for "
                        "runtime!"
                    )
        else:
            if not self.is_human and not self.is_team:
                raise ValueError(
                    f"This agent {self.name}[{self.role}] is missing action modules."
                )
        # llm check
        if not self.is_human and (
            self.llm_config is None or self.llm_config.llm_client is None
        ):
            raise ValueError(
                f"{self.name}[{self.role}] Model configuration is missing or model "
                "service is unavailable!"
            )

    @property
    def not_null_agent_context(self) -> AgentContext:
        """Get the agent context.

        Returns:
            AgentContext: The agent context.

        Raises:
            ValueError: If the agent context is not initialized.
        """
        if not self.agent_context:
            raise ValueError("Agent context is not initialized!")
        return self.agent_context

    @property
    def not_null_llm_config(self) -> LLMConfig:
        """Get the LLM config."""
        if not self.llm_config:
            raise ValueError("LLM config is not initialized!")
        return self.llm_config

    @property
    def not_null_llm_client(self) -> LLMClient:
        """Get the LLM client."""
        llm_client = self.not_null_llm_config.llm_client
        if not llm_client:
            raise ValueError("LLM client is not initialized!")
        return llm_client

    async def blocking_func_to_async(
        self, func: Callable[..., Any], *args, **kwargs
    ) -> Any:
        """Run a potentially blocking function within an executor."""
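        # Plain callables are offloaded to self.executor so they do not block
        # the event loop; coroutine functions are awaited directly.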
        if not asyncio.iscoroutinefunction(func):
            return await blocking_func_to_async(self.executor, func, *args, **kwargs)
        return await func(*args, **kwargs)

    async def preload_resource(self) -> None:
        """Preload resources before agent initialization."""
        if self.resource:
            await self.blocking_func_to_async(self.resource.preload_resource)

    async def build(self, is_retry_chat: bool = False) -> "ConversableAgent":
        """Build the agent."""
        # Preload resources
        await self.preload_resource()
        # Check if agent is available
        self.check_available()
        _language = self.not_null_agent_context.language
        if _language:
            self.language = _language

        # Initialize resource loader
        for action in self.actions:
            action.init_resource(self.resource)

        # Initialize LLM Server
        if not self.is_human:
            if not self.llm_config or not self.llm_config.llm_client:
                raise ValueError("LLM client is not initialized!")
            self.llm_client = AIWrapper(llm_client=self.llm_config.llm_client)
            self.memory.initialize(
                self.name,
                self.llm_config.llm_client,
                importance_scorer=self.memory_importance_scorer,
                insight_extractor=self.memory_insight_extractor,
            )
            # Clone the memory structure
            self.memory = self.memory.structure_clone()
            # init agent memory
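            # When resuming an interrupted conversation, replay this agent's
            # persisted history messages back into its working memory.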
            if is_retry_chat:
                # recover agent memory message
                agent_history_memories = (
                    await self.memory.gpts_memory.get_agent_history_memory(
                        self.not_null_agent_context.conv_id, self.role
                    )
                )
                for agent_history_memory in agent_history_memories:
                    await self.write_memories(**agent_history_memory)
        return self

    def bind(self, target: Any) -> "ConversableAgent":
        """Bind the resources to the agent."""
        if isinstance(target, LLMConfig):
            self.llm_config = target
        elif isinstance(target, GptsMemory):
            raise ValueError("GptsMemory is not supported! Please use AgentMemory.")
        elif isinstance(target, AgentContext):
            self.agent_context = target
        elif isinstance(target, Resource):
            self.resource = target
        elif isinstance(target, AgentMemory):
            self.memory = target
        elif isinstance(target, ProfileConfig):
            self.profile = target
        elif isinstance(target, type) and issubclass(target, Action):
            self.actions.append(target())
        elif isinstance(target, PromptTemplate):
            self.bind_prompt = target

        return self

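    # A hedged usage sketch (`MyAgent` is illustrative, not a name from this
    # module): dependencies are typically chained onto an agent before build():
    #
    #     agent = (
    #         await MyAgent()
    #         .bind(agent_context)
    #         .bind(llm_config)
    #         .bind(agent_memory)
    #         .build()
    #     )
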
    async def send(
        self,
        message: AgentMessage,
        recipient: Agent,
        reviewer: Optional[Agent] = None,
        request_reply: Optional[bool] = True,
        is_recovery: Optional[bool] = False,
        silent: Optional[bool] = False,
        is_retry_chat: bool = False,
        last_speaker_name: Optional[str] = None,
    ) -> None:
        """Send a message to the recipient agent."""
        with root_tracer.start_span(
            "agent.send",
            metadata={
                "sender": self.name,
                "recipient": recipient.name,
                "reviewer": reviewer.name if reviewer else None,
                "agent_message": json.dumps(message.to_dict(), ensure_ascii=False),
                "request_reply": request_reply,
                "is_recovery": is_recovery,
                "conv_uid": self.not_null_agent_context.conv_id,
            },
        ):
            await recipient.receive(
                message=message,
                sender=self,
                reviewer=reviewer,
                request_reply=request_reply,
                is_recovery=is_recovery,
                silent=silent,
                is_retry_chat=is_retry_chat,
                last_speaker_name=last_speaker_name,
            )

    async def receive(
        self,
        message: AgentMessage,
        sender: Agent,
        reviewer: Optional[Agent] = None,
        request_reply: Optional[bool] = None,
        silent: Optional[bool] = False,
        is_recovery: Optional[bool] = False,
        is_retry_chat: bool = False,
        last_speaker_name: Optional[str] = None,
    ) -> None:
        """Receive a message from another agent."""
        with root_tracer.start_span(
            "agent.receive",
            metadata={
                "sender": sender.name,
                "recipient": self.name,
                "reviewer": reviewer.name if reviewer else None,
                "agent_message": json.dumps(message.to_dict(), ensure_ascii=False),
                "request_reply": request_reply,
                "silent": silent,
                "is_recovery": is_recovery,
                "conv_uid": self.not_null_agent_context.conv_id,
                "is_human": self.is_human,
            },
        ):
            await self._a_process_received_message(message, sender)
            if request_reply is False or request_reply is None:
                return
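            # Only a message from a human sender forwards last_speaker_name, so
            # a retried conversation can resume from the correct speaker.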
            if not self.is_human:
                if isinstance(sender, ConversableAgent) and sender.is_human:
                    reply = await self.generate_reply(
                        received_message=message,
                        sender=sender,
                        reviewer=reviewer,
                        is_retry_chat=is_retry_chat,
                        last_speaker_name=last_speaker_name,
                    )
                else:
                    reply = await self.generate_reply(
                        received_message=message,
                        sender=sender,
                        reviewer=reviewer,
                        is_retry_chat=is_retry_chat,
                    )

                if reply is not None:
                    await self.send(reply, sender)

    def prepare_act_param(
        self,
        received_message: Optional[AgentMessage],
        sender: Agent,
        rely_messages: Optional[List[AgentMessage]] = None,
    ) -> Dict[str, Any]:
        """Prepare the parameters for the act method."""
        return {}

    @final
    async def generate_reply(
        self,
        received_message: AgentMessage,
        sender: Agent,
        reviewer: Optional[Agent] = None,
        rely_messages: Optional[List[AgentMessage]] = None,
        is_retry_chat: bool = False,
        last_speaker_name: Optional[str] = None,
        **kwargs,
    ) -> AgentMessage:
        """Generate a reply based on the received messages."""
        logger.info(
            f"Generate agent reply! sender={sender}, "
            f"rely_messages_len={len(rely_messages) if rely_messages else 0}"
        )
        root_span = root_tracer.start_span(
            "agent.generate_reply",
            metadata={
                "sender": sender.name,
                "recipient": self.name,
                "reviewer": reviewer.name if reviewer else None,
                "received_message": json.dumps(received_message.to_dict()),
                "conv_uid": self.not_null_agent_context.conv_id,
                "rely_messages": (
                    [msg.to_dict() for msg in rely_messages] if rely_messages else None
                ),
            },
        )

        try:
            with root_tracer.start_span(
                "agent.generate_reply._init_reply_message",
            ) as span:
                # initialize the reply message
                reply_message: AgentMessage = self._init_reply_message(
                    received_message=received_message
                )
                span.metadata["reply_message"] = reply_message.to_dict()

            fail_reason = None
            current_retry_counter = 0
            is_success = True
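            # Retry loop: each pass runs think -> review -> act -> verify. On a
            # failed verification, the failure reason is fed back as the next
            # round's input, up to max_retry_count attempts.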
            while current_retry_counter < self.max_retry_count:
                if current_retry_counter > 0:
                    retry_message = self._init_reply_message(
                        received_message=received_message,
                        rely_messages=rely_messages,
                    )
                    retry_message.rounds = reply_message.rounds + 1
                    retry_message.content = fail_reason
                    retry_message.current_goal = received_message.current_goal

                    # The retry message is a self-optimization message that
                    # must be recorded. For now it is attributed to the
                    # originating sender, to keep the historical memory
                    # context organized.
                    await sender.send(
                        retry_message, self, reviewer, request_reply=False
                    )
                    reply_message.rounds = retry_message.rounds + 1

                # In manual retry mode, load all messages of the last speaker
                # as dependent messages.
                logger.info(
                    "Depends on the number of historical messages:"
                    f"{len(rely_messages) if rely_messages else 0}!"
                )
                thinking_messages, resource_info = await self._load_thinking_messages(
                    received_message,
                    sender,
                    rely_messages,
                    context=reply_message.get_dict_context(),
                    is_retry_chat=is_retry_chat,
                )
                with root_tracer.start_span(
                    "agent.generate_reply.thinking",
                    metadata={
                        "thinking_messages": json.dumps(
                            [msg.to_dict() for msg in thinking_messages],
                            ensure_ascii=False,
                        )
                    },
                ) as span:
                    # 1. Think about how to do things
                    llm_reply, model_name = await self.thinking(
                        thinking_messages, sender
                    )
                    reply_message.model_name = model_name
                    reply_message.content = llm_reply
                    reply_message.resource_info = resource_info
                    span.metadata["llm_reply"] = llm_reply
                    span.metadata["model_name"] = model_name
                with root_tracer.start_span(
                    "agent.generate_reply.review",
                    metadata={"llm_reply": llm_reply, "censored": self.name},
                ) as span:
                    # 2. Review whether what is being done is legal
                    approve, comments = await self.review(llm_reply, self)
                    reply_message.review_info = AgentReviewInfo(
                        approve=approve,
                        comments=comments,
                    )
                    span.metadata["approve"] = approve
                    span.metadata["comments"] = comments

                act_extent_param = self.prepare_act_param(
                    received_message, sender, rely_messages
                )
                with root_tracer.start_span(
                    "agent.generate_reply.act",
                    metadata={
                        "llm_reply": llm_reply,
                        "sender": sender.name,
                        "reviewer": reviewer.name if reviewer else None,
                        "act_extent_param": act_extent_param,
                    },
                ) as span:
                    # 3. Act based on the results of the thinking step
                    act_out: ActionOutput = await self.act(
                        message=reply_message,
                        sender=sender,
                        reviewer=reviewer,
                        is_retry_chat=is_retry_chat,
                        last_speaker_name=last_speaker_name,
                        **act_extent_param,
                    )
                    if act_out:
                        reply_message.action_report = act_out
                    span.metadata["action_report"] = (
                        act_out.to_dict() if act_out else None
                    )

                with root_tracer.start_span(
                    "agent.generate_reply.verify",
                    metadata={
                        "llm_reply": llm_reply,
                        "sender": sender.name,
                        "reviewer": reviewer.name if reviewer else None,
                    },
                ) as span:
                    # 4. Verify the reply information
                    check_pass, reason = await self.verify(
                        reply_message, sender, reviewer
                    )
                    is_success = check_pass
                    span.metadata["check_pass"] = check_pass
                    span.metadata["reason"] = reason
                question: str = received_message.content or ""
                ai_message: str = llm_reply or ""
                # 5. Self-optimize wrong answers
                if not check_pass:
                    if not act_out.have_retry:
                        break
                    current_retry_counter += 1
                    # Send the error message and issue new problem-solving
                    # instructions
                    if current_retry_counter < self.max_retry_count:
                        await self.send(
                            reply_message, sender, reviewer, request_reply=False
                        )
                    fail_reason = reason
                    await self.write_memories(
                        question=question,
                        ai_message=ai_message,
                        action_output=act_out,
                        check_pass=check_pass,
                        check_fail_reason=fail_reason,
                    )
                else:
                    await self.write_memories(
                        question=question,
                        ai_message=ai_message,
                        action_output=act_out,
                        check_pass=check_pass,
                    )
                    break
            reply_message.success = is_success
            # 6. Final message adjustment
            await self.adjust_final_message(is_success, reply_message)
            return reply_message
        except Exception as e:
            logger.exception("Generate reply exception!")
            err_message = AgentMessage(content=str(e))
            err_message.success = False
            return err_message
        finally:
            root_span.metadata["reply_message"] = reply_message.to_dict()
            root_span.end()

    async def thinking(
        self,
        messages: List[AgentMessage],
        sender: Optional[Agent] = None,
        prompt: Optional[str] = None,
    ) -> Tuple[Optional[str], Optional[str]]:
        """Think about and reason over the current task goal.

        Args:
            messages(List[AgentMessage]): the messages to reason over
            sender(Optional[Agent]): the agent that sent the messages
            prompt(Optional[str]): an extra system prompt to prepend
        """
        last_model = None
        last_err = None
        retry_count = 0
        llm_messages = [message.to_llm_message() for message in messages]
        if prompt:
            # Prepend the extra prompt once, as a system message (prepending it
            # inside the retry loop would duplicate it on every retry).
            llm_messages = _new_system_message(prompt) + llm_messages
        # LLM inference automatically retries 3 times to reduce the chance of
        # interruption caused by rate limits and network instability.
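        # Each failed attempt remembers the failing model (last_model) and
        # excludes it in _a_select_llm_model, so the next retry prefers a
        # different model when one is available.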
        while retry_count < 3:
            # last_model is wrapped in a list because _a_select_llm_model
            # expects a list of excluded model names.
            llm_model = await self._a_select_llm_model(
                [last_model] if last_model else None
            )
            try:
                if not self.llm_client:
                    raise ValueError("LLM client is not initialized!")
                response = await self.llm_client.create(
                    context=llm_messages[-1].pop("context", None),
                    messages=llm_messages,
                    llm_model=llm_model,
                    max_new_tokens=self.not_null_agent_context.max_new_tokens,
                    temperature=self.not_null_agent_context.temperature,
                    verbose=self.not_null_agent_context.verbose,
                    memory=self.memory.gpts_memory,
                    conv_id=self.not_null_agent_context.conv_id,
                    sender=sender.role if sender else "?",
                    stream_out=self.stream_out,
                )
                return response, llm_model
            except LLMChatError as e:
                logger.error(f"Model {llm_model} generation failed! {str(e)}")
                retry_count += 1
                last_model = llm_model
                last_err = str(e)
                await asyncio.sleep(10)

        if last_err:
            raise ValueError(last_err)
        else:
            raise ValueError("LLM model inference failed!")

    async def review(self, message: Optional[str], censored: Agent) -> Tuple[bool, Any]:
        """Review the message via the censoring agent; approve by default."""
        return True, None

    async def act(
        self,
        message: AgentMessage,
        sender: Agent,
        reviewer: Optional[Agent] = None,
        is_retry_chat: bool = False,
        last_speaker_name: Optional[str] = None,
        **kwargs,
    ) -> ActionOutput:
        """Perform actions."""
        last_out: Optional[ActionOutput] = None
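        # Actions run sequentially; each action receives the previous action's
        # output via rely_action_out, forming a simple pipeline.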
        for i, action in enumerate(self.actions):
            if not message:
                raise ValueError("The message content is empty!")

            with root_tracer.start_span(
                "agent.act.run",
                metadata={
                    "message": message,
                    "sender": sender.name if sender else None,
                    "recipient": self.name,
                    "reviewer": reviewer.name if reviewer else None,
                    "rely_action_out": last_out.to_dict() if last_out else None,
                    "conv_uid": self.not_null_agent_context.conv_id,
                    "action_index": i,
                    "total_action": len(self.actions),
                },
            ) as span:
                last_out = await action.run(
                    ai_message=message.content if message.content else "",
                    resource=None,
                    rely_action_out=last_out,
                    **kwargs,
                )
                span.metadata["action_out"] = last_out.to_dict() if last_out else None
        if not last_out:
            raise ValueError("Action should return a value!")
        return last_out

    async def correctness_check(
        self, message: AgentMessage
    ) -> Tuple[bool, Optional[str]]:
        """Verify the correctness of the results."""
        return True, None

    async def verify(
        self,
        message: AgentMessage,
        sender: Agent,
        reviewer: Optional[Agent] = None,
        **kwargs,
    ) -> Tuple[bool, Optional[str]]:
        """Verify the current execution results."""
        # Check the approval result
        if message.review_info and not message.review_info.approve:
            return False, message.review_info.comments

        # Check the action run result
        action_output: Optional[ActionOutput] = message.action_report
        if action_output:
            if not action_output.is_exe_success:
                return False, action_output.content
            elif not action_output.content or len(action_output.content.strip()) < 1:
                return (
                    False,
                    "The current execution result is empty. Please rethink the "
                    "question and background and generate a new answer.",
                )

        # Check the correctness of the agent output
        return await self.correctness_check(message)

    async def initiate_chat(
        self,
        recipient: Agent,
        reviewer: Optional[Agent] = None,
        message: Optional[str] = None,
        request_reply: bool = True,
        is_retry_chat: bool = False,
        last_speaker_name: Optional[str] = None,
        message_rounds: int = 0,
        **context,
    ):
        """Initiate a chat with another agent.

        Args:
            recipient (Agent): The recipient agent.
            reviewer (Agent): The reviewer agent.
            message (str): The message to send.
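
        Example:
            An illustrative sketch (``user_proxy`` and ``assistant`` stand for
            already-built agents; they are not names from this module)::

                await user_proxy.initiate_chat(
                    recipient=assistant,
                    message="Summarize yesterday's sales.",
                )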
"""
|
||
agent_message = AgentMessage(
|
||
content=message,
|
||
current_goal=message,
|
||
rounds=message_rounds,
|
||
context=context,
|
||
)
|
||
with root_tracer.start_span(
|
||
"agent.initiate_chat",
|
||
span_type=SpanType.AGENT,
|
||
metadata={
|
||
"sender": self.name,
|
||
"recipient": recipient.name,
|
||
"reviewer": reviewer.name if reviewer else None,
|
||
"agent_message": json.dumps(
|
||
agent_message.to_dict(), ensure_ascii=False
|
||
),
|
||
"conv_uid": self.not_null_agent_context.conv_id,
|
||
},
|
||
):
|
||
await self.send(
|
||
agent_message,
|
||
recipient,
|
||
reviewer,
|
||
request_reply=request_reply,
|
||
is_retry_chat=is_retry_chat,
|
||
last_speaker_name=last_speaker_name,
|
||
)
|
||
|
||
    async def adjust_final_message(
        self,
        is_success: bool,
        reply_message: AgentMessage,
    ):
        """Adjust the final message after the agent reply."""
        return is_success, reply_message

    #######################################################################
    # Private Function Begin
    #######################################################################

    def _init_actions(self, actions: List[Type[Action]]):
        self.actions = []
        for action in actions:
            if issubclass(action, Action):
                self.actions.append(action())

    async def _a_append_message(
        self, message: AgentMessage, role, sender: Agent
    ) -> bool:
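        # Convert the in-flight AgentMessage into a persistent GptsMessage
        # record; nested structures (context, review info, action report) are
        # serialized as JSON strings.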
        gpts_message: GptsMessage = GptsMessage(
            conv_id=self.not_null_agent_context.conv_id,
            sender=sender.role,
            receiver=self.role,
            role=role,
            rounds=message.rounds,
            is_success=message.success,
            app_code=(
                sender.not_null_agent_context.gpts_app_code
                if isinstance(sender, ConversableAgent)
                else None
            ),
            app_name=(
                sender.not_null_agent_context.gpts_app_name
                if isinstance(sender, ConversableAgent)
                else None
            ),
            current_goal=message.current_goal,
            content=message.content if message.content else "",
            context=(
                json.dumps(message.context, ensure_ascii=False)
                if message.context
                else None
            ),
            review_info=(
                json.dumps(message.review_info.to_dict(), ensure_ascii=False)
                if message.review_info
                else None
            ),
            action_report=(
                json.dumps(message.action_report.to_dict(), ensure_ascii=False)
                if message.action_report
                else None
            ),
            model_name=message.model_name,
            resource_info=(
                json.dumps(message.resource_info) if message.resource_info else None
            ),
        )

        with root_tracer.start_span(
            "agent.save_message_to_memory",
            metadata={
                "gpts_message": gpts_message.to_dict(),
                "conv_uid": self.not_null_agent_context.conv_id,
            },
        ):
            await self.memory.gpts_memory.append_message(
                self.not_null_agent_context.conv_id, gpts_message
            )
        return True

    def _print_received_message(self, message: AgentMessage, sender: Agent):
        # Print the received message
        print("\n", "-" * 80, flush=True, sep="")
        _print_name = self.name if self.name else self.role
        print(
            colored(
                sender.name if sender.name else sender.role,
                "yellow",
            ),
            "(to",
            f"{_print_name})-[{message.model_name or ''}]:\n",
            flush=True,
        )

        # json.dumps never returns None, so check the message content itself.
        if message.content is not None:
            print(json.dumps(message.content, ensure_ascii=False), flush=True)

        review_info = message.review_info
        if review_info:
            name = sender.name if sender.name else sender.role
            pass_msg = "Pass" if review_info.approve else "Reject"
            review_msg = f"{pass_msg}({review_info.comments})"
            approve_print = f">>>>>>>>{name} Review info: \n{review_msg}"
            print(colored(approve_print, "green"), flush=True)

        action_report = message.action_report
        if action_report:
            name = sender.name if sender.name else sender.role
            action_msg = (
                "execution succeeded"
                if action_report.is_exe_success
                else "execution failed"
            )
            action_report_msg = f"{action_msg},\n{action_report.content}"
            action_print = f">>>>>>>>{name} Action report: \n{action_report_msg}"
            print(colored(action_print, "blue"), flush=True)

        print("\n", "-" * 80, flush=True, sep="")

    async def _a_process_received_message(self, message: AgentMessage, sender: Agent):
        valid = await self._a_append_message(message, None, sender)
        if not valid:
            raise ValueError(
                "Received message can't be converted into a valid ChatCompletion"
                " message. Either content or function_call must be provided."
            )

        self._print_received_message(message, sender)

    async def load_resource(self, question: str, is_retry_chat: bool = False):
        """Load the agent's bound resource."""
        if self.resource:
            resource_prompt, resource_reference = await self.resource.get_prompt(
                lang=self.language, question=question
            )
            return resource_prompt, resource_reference
        return None, None

    async def generate_resource_variables(
        self, resource_prompt: Optional[str] = None
    ) -> Dict[str, Any]:
        """Generate the resource variables."""
        out_schema: Optional[str] = ""
        if self.actions and len(self.actions) > 0:
            out_schema = self.actions[0].ai_out_schema
        if not resource_prompt:
            resource_prompt = ""
        now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {
            "resource_prompt": resource_prompt,
            "out_schema": out_schema,
            "now_time": now_time,
        }

    def _excluded_models(
        self,
        all_models: List[str],
        order_llms: Optional[List[str]] = None,
        excluded_models: Optional[List[str]] = None,
    ):
        if not order_llms:
            order_llms = []
        if not excluded_models:
            excluded_models = []
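        # Preserve the caller's ordering when order_llms is given; otherwise
        # fall back to the full model list. Anything in excluded_models is
        # dropped in both cases.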
        can_uses = []
        if order_llms and len(order_llms) > 0:
            for llm_name in order_llms:
                if llm_name in all_models and (
                    not excluded_models or llm_name not in excluded_models
                ):
                    can_uses.append(llm_name)
        else:
            for llm_name in all_models:
                if not excluded_models or llm_name not in excluded_models:
                    can_uses.append(llm_name)

        return can_uses

    async def _a_select_llm_model(
        self, excluded_models: Optional[List[str]] = None
    ) -> str:
        logger.info(f"_a_select_llm_model:{excluded_models}")
        try:
            all_models = await self.not_null_llm_client.models()
            all_model_names = [item.model for item in all_models]
            # TODO: Only two strategies, priority and default, are implemented
            # so far.
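            # For the priority strategy, strategy_context is expected to hold a
            # JSON list of model names, e.g. '["gpt-4o", "vicuna-13b-v1.5"]'
            # (the example names here are illustrative).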
            if self.not_null_llm_config.llm_strategy == LLMStrategyType.Priority:
                priority: List[str] = []
                strategy_context = self.not_null_llm_config.strategy_context
                if strategy_context is not None:
                    priority = json.loads(strategy_context)  # type: ignore
                can_uses = self._excluded_models(
                    all_model_names, priority, excluded_models
                )
            else:
                can_uses = self._excluded_models(all_model_names, None, excluded_models)
            if can_uses and len(can_uses) > 0:
                return can_uses[0]
            else:
                raise ValueError("No model service available!")
        except Exception as e:
            logger.error(f"{self.role} failed to get the next LLM! {str(e)}")
            raise ValueError(f"Failed to allocate a model service: {str(e)}!")

    def _init_reply_message(
        self,
        received_message: AgentMessage,
        rely_messages: Optional[List[AgentMessage]] = None,
    ) -> AgentMessage:
        """Initialize a new reply message from the received message.

        Args:
            received_message(AgentMessage): The received message

        Returns:
            AgentMessage: A new message
        """
        return AgentMessage(
            content=received_message.content,
            current_goal=received_message.current_goal,
            context=received_message.context,
            rounds=received_message.rounds + 1,
        )

    def _convert_to_ai_message(
        self,
        gpts_messages: List[GptsMessage],
        is_retry_chat: bool = False,
    ) -> List[AgentMessage]:
        oai_messages: List[AgentMessage] = []
        # From the current agent's point of view, all received messages are
        # "user" messages and all sent messages are "assistant" messages.
        for item in gpts_messages:
            if item.role:
                role = item.role
            else:
                if item.receiver == self.role:
                    role = ModelMessageRoleType.HUMAN
                elif item.sender == self.role:
                    role = ModelMessageRoleType.AI
                else:
                    continue

            # During message conversion, execution results take priority; the
            # raw model output is used only when no result is available.
            content = item.content
            if item.action_report:
                action_out = ActionOutput.from_dict(json.loads(item.action_report))
                if is_retry_chat:
                    if action_out is not None and action_out.content:
                        content = action_out.content
                else:
                    if (
                        action_out is not None
                        and action_out.is_exe_success
                        and action_out.content is not None
                    ):
                        content = action_out.content
            oai_messages.append(
                AgentMessage(
                    content=content,
                    role=role,
                    context=(
                        json.loads(item.context) if item.context is not None else None
                    ),
                )
            )
        return oai_messages

    async def build_system_prompt(
        self,
        question: Optional[str] = None,
        most_recent_memories: Optional[str] = None,
        resource_vars: Optional[Dict] = None,
        context: Optional[Dict[str, Any]] = None,
        is_retry_chat: bool = False,
    ):
        """Build the system prompt."""
        system_prompt = None
        if self.bind_prompt:
            prompt_param = {}
            if resource_vars:
                prompt_param.update(resource_vars)
            if context:
                prompt_param.update(context)
            if self.bind_prompt.template_format == "f-string":
                system_prompt = self.bind_prompt.template.format(
                    **prompt_param,
                )
            elif self.bind_prompt.template_format == "jinja2":
                system_prompt = Template(self.bind_prompt.template).render(prompt_param)
            else:
                logger.warning(
                    "The bound prompt template does not exist or its format is "
                    "not supported!"
                )
        if not system_prompt:
            param: Dict = context if context else {}
            system_prompt = await self.build_prompt(
                question=question,
                is_system=True,
                most_recent_memories=most_recent_memories,
                resource_vars=resource_vars,
                is_retry_chat=is_retry_chat,
                **param,
            )
        return system_prompt

    async def _load_thinking_messages(
        self,
        received_message: AgentMessage,
        sender: Agent,
        rely_messages: Optional[List[AgentMessage]] = None,
        context: Optional[Dict[str, Any]] = None,
        is_retry_chat: bool = False,
    ) -> Tuple[List[AgentMessage], Optional[Dict]]:
        observation = received_message.content
        if not observation:
            raise ValueError("The received message content is empty!")
        memories = await self.read_memories(observation)
        reply_message_str = ""
        if context is None:
            context = {}
        if rely_messages:
            copied_rely_messages = [m.copy() for m in rely_messages]
            # When relying directly on historical messages, use the execution
            # result content as the dependency
            for message in copied_rely_messages:
                action_report: Optional[ActionOutput] = message.action_report
                if action_report:
                    # TODO: Modified in-place; needs to be optimized
                    message.content = action_report.content
                if message.name != self.role:
                    # TODO: use name
                    # Rely messages that are not from the current agent
                    if message.role == ModelMessageRoleType.HUMAN:
                        reply_message_str += f"Question: {message.content}\n"
                    elif message.role == ModelMessageRoleType.AI:
                        reply_message_str += f"Observation: {message.content}\n"
        if reply_message_str:
            memories += "\n" + reply_message_str
        try:
            resource_prompt_str, resource_references = await self.load_resource(
                observation, is_retry_chat=is_retry_chat
            )
        except Exception as e:
            logger.exception(f"Load resource error! {str(e)}")
            raise ValueError(f"Load resource error! {str(e)}")

        resource_vars = await self.generate_resource_variables(resource_prompt_str)
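        # Assemble the final LLM input: the system prompt folds in resource
        # context and recent memories, while the user prompt carries the
        # observation itself.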
        system_prompt = await self.build_system_prompt(
            question=observation,
            most_recent_memories=memories,
            resource_vars=resource_vars,
            context=context,
            is_retry_chat=is_retry_chat,
        )
        user_prompt = await self.build_prompt(
            question=observation,
            is_system=False,
            most_recent_memories=memories,
            resource_vars=resource_vars,
            **context,
        )

        agent_messages = []
        if system_prompt:
            agent_messages.append(
                AgentMessage(
                    content=system_prompt,
                    role=ModelMessageRoleType.SYSTEM,
                )
            )
        if user_prompt:
            agent_messages.append(
                AgentMessage(
                    content=user_prompt,
                    role=ModelMessageRoleType.HUMAN,
                )
            )

        return agent_messages, resource_references

def _new_system_message(content):
    """Return the system message."""
    return [{"content": content, "role": ModelMessageRoleType.SYSTEM}]


def _is_list_of_type(lst: List[Any], type_cls: type) -> bool:
    return all(isinstance(item, type_cls) for item in lst)