import asyncio
import datetime
import json
import logging
import re
import time
import uuid
from abc import ABC
from typing import Any, Dict, List, Optional, Type

from fastapi import APIRouter, Body, Depends
from fastapi.responses import StreamingResponse

from dbgpt._private.config import Config
from dbgpt.agent import (
    Agent,
    AgentContext,
    AgentMemory,
    AutoPlanChatManager,
    ConversableAgent,
    DefaultAWELLayoutManager,
    GptsMemory,
    LLMConfig,
    ResourceType,
    ShortTermMemory,
    UserProxyAgent,
    get_agent_manager,
)
from dbgpt.agent.core.memory.gpts import GptsMessage
from dbgpt.agent.core.schema import Status
from dbgpt.agent.resource import get_resource_manager
from dbgpt.agent.util.llm.llm import LLMStrategyType
from dbgpt.app.dbgpt_server import system_app
from dbgpt.app.scene.base import ChatScene
from dbgpt.component import BaseComponent, ComponentType, SystemApp
from dbgpt.core import PromptTemplate
from dbgpt.core.awel.flow.flow_factory import FlowCategory
from dbgpt.core.interface.message import StorageConversation
from dbgpt.model.cluster import WorkerManagerFactory
from dbgpt.model.cluster.client import DefaultLLMClient
from dbgpt.serve.conversation.serve import Serve as ConversationServe
from dbgpt.serve.prompt.api.endpoints import get_service
from dbgpt.serve.prompt.service import service as PromptService
from dbgpt.util.json_utils import serialize
from dbgpt.util.tracer import TracerManager

from ...rag.retriever.knowledge_space import KnowledgeSpaceRetriever
from ..db import GptsMessagesDao
from ..db.gpts_app import GptsApp, GptsAppDao, GptsAppQuery
from ..db.gpts_conversations_db import GptsConversationsDao, GptsConversationsEntity
from ..team.base import TeamMode
from .db_gpts_memory import MetaDbGptsMessageMemory, MetaDbGptsPlansMemory

CFG = Config()

router = APIRouter()
logger = logging.getLogger(__name__)
root_tracer: TracerManager = TracerManager()


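# Wrap an agent chat session in the standard conversation storage model so that
# agent sessions are persisted alongside ordinary chat scenes.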
def _build_conversation(
    conv_id: str,
    select_param: Dict[str, Any],
    model_name: str,
    summary: str,
    app_code: str,
    conv_serve: ConversationServe,
    user_name: Optional[str] = "",
    sys_code: Optional[str] = "",
) -> StorageConversation:
    return StorageConversation(
        conv_uid=conv_id,
        chat_mode=ChatScene.ChatAgent.value(),
        user_name=user_name,
        sys_code=sys_code,
        model_name=model_name,
        summary=summary,
        param_type="DbGpts",
        param_value=select_param,
        app_code=app_code,
        conv_storage=conv_serve.conv_storage,
        message_storage=conv_serve.message_storage,
    )


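# Core serve component for multi-agent apps: exposes the /api router, manages
# GPTs conversation and message storage, and runs agent team chats.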
class MultiAgents(BaseComponent, ABC):
    name = ComponentType.MULTI_AGENTS

    def init_app(self, system_app: SystemApp):
        system_app.app.include_router(router, prefix="/api", tags=["Multi-Agents"])
        self.system_app = system_app
        from dbgpt.serve.agent.app.controller import gpts_dao

        gpts_dao.init_native_apps()

    def __init__(self, system_app: SystemApp):
        self.gpts_conversations = GptsConversationsDao()
        self.gpts_messages_dao = GptsMessagesDao()

        self.gpts_app = GptsAppDao()
        self.memory = GptsMemory(
            plans_memory=MetaDbGptsPlansMemory(),
            message_memory=MetaDbGptsMessageMemory(),
        )
        self.agent_memory_map = {}

        super().__init__(system_app)
        self.system_app = system_app

    def get_dbgpts(self, user_code: str = None, sys_code: str = None):
        apps = self.gpts_app.app_list(
            GptsAppQuery(user_code=user_code, sys_code=sys_code)
        ).app_list
        return apps

    def get_app(self, app_code) -> GptsApp:
        """Get the app detail by app code."""
        return self.gpts_app.app_detail(app_code)

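    # Agent memory instances are cached per (app, conversation) pair so that a
    # resumed session reuses the same AgentMemory.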
    def get_or_build_agent_memory(self, conv_id: str, dbgpts_name: str) -> AgentMemory:
        from dbgpt.agent.core.memory.hybrid import HybridMemory
        from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG
        from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory

        memory_key = f"{dbgpts_name}_{conv_id}"
        if memory_key in self.agent_memory_map:
            return self.agent_memory_map[memory_key]

        # embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)
        # embedding_fn = embedding_factory.create(
        #     model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
        # )
        # vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"
        # Just use chroma store now
        # vector_store_connector = VectorStoreConnector(
        #     vector_store_type=CFG.VECTOR_STORE_TYPE,
        #     vector_store_config=VectorStoreConfig(
        #         name=vstore_name, embedding_fn=embedding_fn
        #     ),
        # )
        # memory = HybridMemory[AgentMemoryFragment].from_chroma(
        #     vstore_name=vstore_name,
        #     embeddings=embedding_fn,
        # )

        agent_memory = AgentMemory(gpts_memory=self.memory)
        self.agent_memory_map[memory_key] = agent_memory
        return agent_memory

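    # Top-level chat generator: resumes a WAITING conversation if one exists,
    # otherwise starts a new round, then streams chunks from either a chat-flow
    # app or a multi-agent team run.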
    async def agent_chat_v2(
        self,
        conv_id: str,
        new_order: int,
        gpts_name: str,
        user_query: str,
        user_code: str = None,
        sys_code: str = None,
        enable_verbose: bool = True,
        stream: Optional[bool] = True,
        **ext_info,
    ):
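        """Run one round of an app chat and stream the result.

        Yields tuples of (agent task or None, message chunk, agent conversation id).
        """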
        logger.info(
            f"agent_chat_v2 conv_id:{conv_id},gpts_name:{gpts_name},user_query:{user_query}"
        )
        gpts_conversations: List[
            GptsConversationsEntity
        ] = self.gpts_conversations.get_like_conv_id_asc(conv_id)
        logger.info(
            f"gpts_conversations count:{conv_id},{len(gpts_conversations) if gpts_conversations else 0}"
        )
        gpt_chat_order = (
            "1" if not gpts_conversations else str(len(gpts_conversations) + 1)
        )
        agent_conv_id = conv_id + "_" + gpt_chat_order
        message_round = 0
        history_message_count = 0
        is_retry_chat = False
        last_speaker_name = None
        history_messages = None
        # Check whether the last conversation record is complete. If it is in the
        # WAITING state, resume that conversation instead of starting a new one.
        if gpts_conversations and len(gpts_conversations) > 0:
            last_gpts_conversation: GptsConversationsEntity = gpts_conversations[-1]
            logger.info(f"last conversation status:{last_gpts_conversation.__dict__}")
            if last_gpts_conversation.state == Status.WAITING.value:
                is_retry_chat = True
                agent_conv_id = last_gpts_conversation.conv_id

                gpts_messages: List[
                    GptsMessage
                ] = self.gpts_messages_dao.get_by_conv_id(agent_conv_id)
                history_message_count = len(gpts_messages)
                history_messages = gpts_messages
                last_message = gpts_messages[-1]
                message_round = last_message.rounds + 1

                from dbgpt.serve.agent.agents.expand.app_start_assisant_agent import (
                    StartAppAssistantAgent,
                )

                if last_message.sender == StartAppAssistantAgent().role:
                    last_message = gpts_messages[-2]
                last_speaker_name = last_message.sender

                gpt_app: GptsApp = self.gpts_app.app_detail(last_message.app_code)

                if not gpt_app:
                    raise ValueError(f"App {gpts_name} not found!")

        historical_dialogues: List[GptsMessage] = []
        if not is_retry_chat:
            # Create a new gpts conversation record
            gpt_app: GptsApp = self.gpts_app.app_detail(gpts_name)
            if not gpt_app:
                raise ValueError(f"App {gpts_name} not found!")

            ## When creating a new gpts conversation record, decide whether to
            ## include the history of previous topics according to the app definition.
            ## TODO BEGIN
            # Temporarily managed by system configuration; switch to app-level
            # configuration later.
            if CFG.MESSAGES_KEEP_START_ROUNDS and CFG.MESSAGES_KEEP_START_ROUNDS > 0:
                gpt_app.keep_start_rounds = CFG.MESSAGES_KEEP_START_ROUNDS
            if CFG.MESSAGES_KEEP_END_ROUNDS and CFG.MESSAGES_KEEP_END_ROUNDS > 0:
                gpt_app.keep_end_rounds = CFG.MESSAGES_KEEP_END_ROUNDS
            ## TODO END

            if gpt_app.keep_start_rounds > 0 or gpt_app.keep_end_rounds > 0:
                if gpts_conversations and len(gpts_conversations) > 0:
                    rely_conversations = []
                    if gpt_app.keep_start_rounds + gpt_app.keep_end_rounds < len(
                        gpts_conversations
                    ):
                        if gpt_app.keep_start_rounds > 0:
                            # Keep the first `keep_start_rounds` conversations.
                            front = gpts_conversations[: gpt_app.keep_start_rounds]
                            rely_conversations.extend(front)
                        if gpt_app.keep_end_rounds > 0:
                            back = gpts_conversations[-gpt_app.keep_end_rounds :]
                            rely_conversations.extend(back)
                    else:
                        rely_conversations = gpts_conversations
                    for gpts_conversation in rely_conversations:
                        temps: List[GptsMessage] = await self.memory.get_messages(
                            gpts_conversation.conv_id
                        )
                        if temps and len(temps) > 1:
                            historical_dialogues.append(temps[0])
                            historical_dialogues.append(temps[-1])

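            # Persist the new conversation record before kicking off the agent run.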
            self.gpts_conversations.add(
                GptsConversationsEntity(
                    conv_id=agent_conv_id,
                    user_goal=user_query,
                    gpts_name=gpts_name,
                    team_mode=gpt_app.team_mode,
                    state=Status.RUNNING.value,
                    max_auto_reply_round=0,
                    auto_reply_count=0,
                    user_code=user_code,
                    sys_code=sys_code,
                )
            )

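        # Chat-flow AWEL apps are routed straight to the flow service and
        # streamed back without going through the multi-agent pipeline.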
        if (
            TeamMode.AWEL_LAYOUT.value == gpt_app.team_mode
            and gpt_app.team_context.flow_category == FlowCategory.CHAT_FLOW
        ):
            team_context = gpt_app.team_context
            from dbgpt.core.awel import CommonLLMHttpRequestBody

            flow_req = CommonLLMHttpRequestBody(
                model=ext_info.get("model_name", None),
                messages=user_query,
                stream=True,
                # context=flow_ctx,
                # temperature=
                # max_new_tokens=
                # enable_vis=
                conv_uid=agent_conv_id,
                span_id=root_tracer.get_current_span_id(),
                chat_mode=ext_info.get("chat_mode", None),
                chat_param=team_context.uid,
                user_name=user_code,
                sys_code=sys_code,
                incremental=ext_info.get("incremental", True),
            )
            from dbgpt.app.openapi.api_v1.api_v1 import get_chat_flow

            flow_service = get_chat_flow()
            async for chunk in flow_service.chat_stream_flow_str(
                team_context.uid, flow_req
            ):
                yield None, chunk, agent_conv_id
        else:
            # init gpts memory
            self.memory.init(
                agent_conv_id,
                enable_vis_message=enable_verbose,
                history_messages=history_messages,
                start_round=history_message_count,
            )
            # init agent memory
            agent_memory = self.get_or_build_agent_memory(conv_id, gpts_name)

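            # Run the agent team in a background task and stream messages from
            # the conversation queue while it executes.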
            task = None
            try:
                task = asyncio.create_task(
                    multi_agents.agent_team_chat_new(
                        user_query,
                        agent_conv_id,
                        gpt_app,
                        agent_memory,
                        is_retry_chat,
                        last_speaker_name=last_speaker_name,
                        init_message_rounds=message_round,
                        enable_verbose=enable_verbose,
                        historical_dialogues=historical_dialogues,
                        **ext_info,
                    )
                )
                if enable_verbose:
                    async for chunk in multi_agents.chat_messages(agent_conv_id):
                        if chunk:
                            try:
                                chunk = json.dumps(
                                    {"vis": chunk},
                                    default=serialize,
                                    ensure_ascii=False,
                                )
                                if chunk is None or len(chunk) <= 0:
                                    continue
                                resp = f"data:{chunk}\n\n"
                                yield task, resp, agent_conv_id
                            except Exception as e:
                                logger.exception(
                                    f"Get messages for {gpts_name} failed: {str(e)}"
                                )
                                yield task, f"data: {str(e)}\n\n", agent_conv_id

                    yield task, f'data:{json.dumps({"vis": "[DONE]"}, default=serialize, ensure_ascii=False)} \n\n', agent_conv_id

                else:
                    # Simple message mode: skip the vis protocol wrapping and
                    # stream minimal message chunks directly.
                    logger.info(
                        f"{agent_conv_id} simple message mode enabled: skip vis "
                        "protocol wrapping and stream minimal messages directly"
                    )
                    final_message_chunk = None
                    async for chunk in multi_agents.chat_messages(agent_conv_id):
                        if chunk:
                            try:
                                if chunk is None or len(chunk) <= 0:
                                    continue
                                final_message_chunk = chunk[-1]
                                if stream:
                                    yield task, final_message_chunk, agent_conv_id
                                logger.info(
                                    f"agent_chat_v2 executing, timestamp={int(time.time() * 1000)}"
                                )
                            except Exception as e:
                                logger.exception(
                                    f"Get messages for {gpts_name} failed: {str(e)}"
                                )
                                final_message_chunk = str(e)

                    logger.info(
                        f"agent_chat_v2 finish, timestamp={int(time.time() * 1000)}"
                    )
                    yield task, final_message_chunk, agent_conv_id

            except Exception as e:
                logger.exception(f"Agent chat encountered an error: {str(e)}")
                if enable_verbose:
                    yield task, f'data:{json.dumps({"vis": f"{str(e)}"}, default=serialize, ensure_ascii=False)} \n\n', agent_conv_id
                    yield task, f'data:{json.dumps({"vis": "[DONE]"}, default=serialize, ensure_ascii=False)} \n\n', agent_conv_id
                else:
                    yield task, str(e), agent_conv_id

            finally:
                self.memory.clear(agent_conv_id)

    async def app_agent_chat(
        self,
        conv_uid: str,
        gpts_name: str,
        user_query: str,
        user_code: str = None,
        sys_code: str = None,
        enable_verbose: bool = True,
        stream: Optional[bool] = True,
        **ext_info,
    ):
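        """Chat with an app and persist the session to conversation storage.

        Wraps agent_chat_v2 and yields its message chunks to the caller.
        """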
        # logger.info(f"app_agent_chat:{gpts_name},{user_query},{conv_uid}")

        # Temporary compatibility with scene messages
        conv_serve = ConversationServe.get_instance(CFG.SYSTEM_APP)
        current_message: StorageConversation = _build_conversation(
            conv_id=conv_uid,
            select_param=gpts_name,
            summary=user_query,
            model_name="",
            app_code=gpts_name,
            conv_serve=conv_serve,
            user_name=user_code,
        )
        current_message.save_to_storage()
        current_message.start_new_round()
        current_message.add_user_message(user_query)
        agent_conv_id = None
        agent_task = None
        default_final_message = None
        try:
            async for task, chunk, agent_conv_id in multi_agents.agent_chat_v2(
                conv_uid,
                current_message.chat_order,
                gpts_name,
                user_query,
                user_code,
                sys_code,
                enable_verbose=enable_verbose,
                stream=stream,
                **ext_info,
            ):
                agent_task = task
                default_final_message = chunk
                yield chunk

        except asyncio.CancelledError:
            # The client disconnected; cancel the background agent task.
            logger.info("Client disconnected")
            if agent_task:
                logger.info(f"Chat to App {gpts_name}:{agent_conv_id} cancelled!")
                agent_task.cancel()
        except Exception as e:
            logger.exception(f"Chat to App {gpts_name} failed: {str(e)}")
            raise
        finally:
            logger.info(f"Save agent chat info: {conv_uid}")
            if agent_task:
                final_message = await self.stable_message(agent_conv_id)
                if final_message:
                    current_message.add_view_message(final_message)
            else:
                default_final_message = default_final_message.replace("data:", "")
                current_message.add_view_message(default_final_message)

            current_message.end_current_round()
            current_message.save_to_storage()

    async def agent_team_chat_new(
        self,
        user_query: str,
        conv_uid: str,
        gpts_app: GptsApp,
        agent_memory: AgentMemory,
        is_retry_chat: bool = False,
        last_speaker_name: str = None,
        init_message_rounds: int = 0,
        link_sender: ConversableAgent = None,
        app_link_start: bool = False,
        enable_verbose: bool = True,
        historical_dialogues: Optional[List[GptsMessage]] = None,
        rely_messages: Optional[List[GptsMessage]] = None,
        **ext_info,
    ):
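        """Assemble the agent team for an app and run the conversation.

        Returns the conversation uid when the chat finishes.
        """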
        gpts_status = Status.COMPLETE.value
        try:
            employees: List[Agent] = []

            self.agent_manage = get_agent_manager()

            context: AgentContext = AgentContext(
                conv_id=conv_uid,
                gpts_app_code=gpts_app.app_code,
                gpts_app_name=gpts_app.app_name,
                language=gpts_app.language,
                app_link_start=app_link_start,
                enable_vis_message=enable_verbose,
            )

            prompt_service: PromptService = get_service()
            rm = get_resource_manager()

            # Init the LLM provider from the worker manager.
            worker_manager = CFG.SYSTEM_APP.get_component(
                ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
            ).create()
            self.llm_provider = DefaultLLMClient(
                worker_manager, auto_convert_message=True
            )

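            # Build each agent defined by the app: resolve its class, LLM
            # strategy, optional prompt template, and resources, then bind them.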
            for record in gpts_app.details:
                cls: Type[ConversableAgent] = self.agent_manage.get_by_name(
                    record.agent_name
                )
                llm_config = LLMConfig(
                    llm_client=self.llm_provider,
                    llm_strategy=LLMStrategyType(record.llm_strategy),
                    strategy_context=record.llm_strategy_value,
                )
                prompt_template = None
                if record.prompt_template:
                    prompt_template: PromptTemplate = prompt_service.get_template(
                        prompt_code=record.prompt_template
                    )
                depend_resource = rm.build_resource(record.resources, version="v1")
                agent = (
                    await cls()
                    .bind(context)
                    .bind(agent_memory)
                    .bind(llm_config)
                    .bind(depend_resource)
                    .bind(prompt_template)
                    .build(is_retry_chat=is_retry_chat)
                )
                employees.append(agent)

            team_mode = TeamMode(gpts_app.team_mode)
            if team_mode == TeamMode.SINGLE_AGENT:
                recipient = employees[0]
            else:
                if TeamMode.AUTO_PLAN == team_mode:
                    if not gpts_app.details or len(gpts_app.details) == 0:
                        raise ValueError("App has no available agents!")
                    llm_config = employees[0].llm_config
                    manager = AutoPlanChatManager()
                elif TeamMode.AWEL_LAYOUT == team_mode:
                    if not gpts_app.team_context:
                        raise ValueError(
                            "Your app has not been developed yet, please bind a Flow!"
                        )
                    manager = DefaultAWELLayoutManager(dag=gpts_app.team_context)
                    llm_config = LLMConfig(
                        llm_client=self.llm_provider,
                        llm_strategy=LLMStrategyType.Priority,
                        strategy_context=json.dumps(["bailing_proxyllm"]),
                    )  # TODO
                elif TeamMode.NATIVE_APP == team_mode:
                    raise ValueError("Native app chat is not supported!")
                else:
                    raise ValueError(f"Unknown agent team mode: {team_mode}")
                manager = (
                    await manager.bind(context)
                    .bind(agent_memory)
                    .bind(llm_config)
                    .build()
                )
                manager.hire(employees)
                recipient = manager

            if is_retry_chat:
                # Retry chat: mark the conversation as running again.
                self.gpts_conversations.update(conv_uid, Status.RUNNING.value)

            user_proxy = None
            if link_sender:
                await link_sender.initiate_chat(
                    recipient=recipient,
                    message=user_query,
                    is_retry_chat=is_retry_chat,
                    last_speaker_name=last_speaker_name,
                    message_rounds=init_message_rounds,
                )
            else:
                user_proxy: UserProxyAgent = (
                    await UserProxyAgent().bind(context).bind(agent_memory).build()
                )
                await user_proxy.initiate_chat(
                    recipient=recipient,
                    message=user_query,
                    is_retry_chat=is_retry_chat,
                    last_speaker_name=last_speaker_name,
                    message_rounds=init_message_rounds,
                    historical_dialogues=user_proxy.convert_to_agent_message(
                        historical_dialogues
                    ),
                    rely_messages=rely_messages,
                    **ext_info,
                )

            if user_proxy:
                # Check whether the user was asked a question.
                if user_proxy.have_ask_user():
                    gpts_status = Status.WAITING.value
            if not app_link_start:
                self.gpts_conversations.update(conv_uid, gpts_status)
        except Exception as e:
            logger.exception(f"Chat terminated abnormally: {str(e)}")
            self.gpts_conversations.update(conv_uid, Status.FAILED.value)
        finally:
            if not app_link_start:
                await self.memory.complete(conv_uid)

        return conv_uid

    async def chat_messages(
        self,
        conv_id: str,
        user_code: str = None,
        system_app: str = None,
    ):
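        """Consume message chunks from the conversation queue until [DONE]."""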
        while True:
            queue = self.memory.queue(conv_id)
            if not queue:
                break
            item = await queue.get()
            if item == "[DONE]":
                queue.task_done()
                break
            else:
                yield item
                await asyncio.sleep(0.005)

    async def stable_message(
        self, conv_id: str, user_code: str = None, system_app: str = None
    ):
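        """Return the final message once the conversation reaches a stable state."""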
        gpts_conv = self.gpts_conversations.get_by_conv_id(conv_id)
        if gpts_conv:
            is_complete = gpts_conv.state in [
                Status.COMPLETE.value,
                Status.WAITING.value,
                Status.FAILED.value,
            ]
            if is_complete:
                return await self.memory.app_link_chat_message(conv_id)
            else:
                pass
                # raise ValueError(
                #     "The conversation has not been completed yet, so we cannot directly obtain information."
                # )
        else:
            raise Exception("No conversation record found!")

    def gpts_conv_list(self, user_code: str = None, system_app: str = None):
        return self.gpts_conversations.get_convs(user_code, system_app)

    async def topic_terminate(
        self,
        conv_id: str,
    ):
        gpts_conversations: List[
            GptsConversationsEntity
        ] = self.gpts_conversations.get_like_conv_id_asc(conv_id)
        # If the last conversation record is still in the WAITING state, mark it
        # COMPLETE to terminate the topic.
        if gpts_conversations and len(gpts_conversations) > 0:
            last_gpts_conversation: GptsConversationsEntity = gpts_conversations[-1]
            if last_gpts_conversation.state == Status.WAITING.value:
                self.gpts_conversations.update(
                    last_gpts_conversation.conv_id, Status.COMPLETE.value
                )

    async def get_knowledge_resources(self, app_code: str, question: str):
        """Get the knowledge resources."""
        context = []
        app: GptsApp = self.get_app(app_code)
        if app and app.details and len(app.details) > 0:
            for detail in app.details:
                if detail and detail.resources and len(detail.resources) > 0:
                    for resource in detail.resources:
                        if resource.type == ResourceType.Knowledge:
                            retriever = KnowledgeSpaceRetriever(
                                space_id=str(resource.value),
                                top_k=CFG.KNOWLEDGE_SEARCH_TOP_SIZE,
                            )
                            chunks = await retriever.aretrieve_with_scores(
                                question, score_threshold=0.3
                            )
                            context.extend([chunk.content for chunk in chunks])
                        else:
                            continue
        return context


multi_agents = MultiAgents(system_app)
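
# Illustrative usage only (not part of the module): consuming the app chat
# stream from an async context. The conversation id and app code below are
# hypothetical placeholders.
#
#     async def demo():
#         async for chunk in multi_agents.app_agent_chat(
#             conv_uid="<conv-uid>",
#             gpts_name="<app-code>",
#             user_query="Hello",
#         ):
#             print(chunk)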