Fix:#2859 (#2944)

This commit is contained in:
坐山客
2026-01-05 20:18:47 +08:00
committed by GitHub
parent 7751b07a6f
commit a5c7296ea0
5 changed files with 282 additions and 174 deletions

View File

@@ -535,6 +535,7 @@ async def chat_completions(
user_query=dialogue.user_input,
user_code=dialogue.user_name,
sys_code=dialogue.sys_code,
app_code=dialogue.app_code,
**dialogue.ext_info,
),
headers=headers,
@@ -551,6 +552,7 @@ async def chat_completions(
chat_param=dialogue.select_param,
user_name=dialogue.user_name,
sys_code=dialogue.sys_code,
app_code=dialogue.app_code,
incremental=dialogue.incremental,
)
return StreamingResponse(

View File

@@ -215,6 +215,7 @@ class BaseHOLLMOperator(
chat_mode=req.chat_mode,
user_name=req.user_name,
sys_code=req.sys_code,
app_code=req.app_code,
conv_storage=self.storage,
message_storage=self.message_storage,
param_type="",

View File

@@ -255,6 +255,7 @@ class CommonLLMHttpRequestBody(BaseHttpBody):
conv_uid: Optional[str] = Field(
default=None, description="The conversation id of the model inference"
)
app_code: Optional[str] = Field(default=None, description="The app id of the app")
span_id: Optional[str] = Field(
default=None, description="The span id of the model inference"
)

View File

@@ -277,135 +277,186 @@ class MultiAgents(BaseComponent, ABC):
)
)
if (
TeamMode.AWEL_LAYOUT.value == gpt_app.team_mode
and gpt_app.team_context.flow_category == FlowCategory.CHAT_FLOW
):
team_context = gpt_app.team_context
from dbgpt.core.awel import CommonLLMHttpRequestBody
# if (
# TeamMode.AWEL_LAYOUT.value == gpt_app.team_mode
# and gpt_app.team_context.flow_category == FlowCategory.CHAT_FLOW
# ):
# team_context = gpt_app.team_context
# from dbgpt.core.awel import CommonLLMHttpRequestBody
flow_req = CommonLLMHttpRequestBody(
model=ext_info.get("model_name", None),
messages=user_query,
stream=True,
# context=flow_ctx,
# temperature=
# max_new_tokens=
# enable_vis=
conv_uid=agent_conv_id,
span_id=root_tracer.get_current_span_id(),
chat_mode=ext_info.get("chat_mode", None),
chat_param=team_context.uid,
user_name=user_code,
sys_code=sys_code,
incremental=ext_info.get("incremental", True),
)
from dbgpt_app.openapi.api_v1.api_v1 import get_chat_flow
# flow_req = CommonLLMHttpRequestBody(
# model=ext_info.get("model_name", None),
# messages=user_query,
# stream=True,
# # context=flow_ctx,
# # temperature=
# # max_new_tokens=
# # enable_vis=
# conv_uid=agent_conv_id,
# app_code=gpts_name,
# span_id=root_tracer.get_current_span_id(),
# chat_mode=ext_info.get("chat_mode", None),
# chat_param=team_context.uid,
# user_name=user_code,
# sys_code=sys_code,
# incremental=ext_info.get("incremental", True),
# )
# from dbgpt_app.openapi.api_v1.api_v1 import get_chat_flow
flow_service = get_chat_flow()
async for chunk in flow_service.chat_stream_flow_str(
team_context.uid, flow_req
):
yield None, chunk, agent_conv_id
else:
# init gpts memory
self.memory.init(
agent_conv_id,
enable_vis_message=enable_verbose,
history_messages=history_messages,
start_round=history_message_count,
)
# init agent memory
agent_memory = self.get_or_build_agent_memory(conv_id, gpts_name)
# flow_service = get_chat_flow()
# async for chunk in flow_service.chat_stream_flow_str(
# team_context.uid, flow_req
# ):
# yield None, chunk, agent_conv_id
# else:
# init gpts memory
self.memory.init(
agent_conv_id,
enable_vis_message=enable_verbose,
history_messages=history_messages,
start_round=history_message_count,
)
# init agent memory
agent_memory = self.get_or_build_agent_memory(conv_id, gpts_name)
task = None
try:
task = asyncio.create_task(
multi_agents.agent_team_chat_new(
user_query,
agent_conv_id,
gpt_app,
agent_memory,
is_retry_chat,
last_speaker_name=last_speaker_name,
init_message_rounds=message_round,
enable_verbose=enable_verbose,
historical_dialogues=historical_dialogues,
**ext_info,
)
task = None
try:
task = asyncio.create_task(
multi_agents.agent_team_chat_new(
user_query,
agent_conv_id,
gpt_app,
agent_memory,
is_retry_chat,
last_speaker_name=last_speaker_name,
init_message_rounds=message_round,
enable_verbose=enable_verbose,
historical_dialogues=historical_dialogues,
**ext_info,
)
if enable_verbose:
async for chunk in multi_agents.chat_messages(agent_conv_id):
if chunk:
try:
chunk = json.dumps(
{"vis": chunk},
default=serialize,
ensure_ascii=False,
)
if chunk is None or len(chunk) <= 0:
continue
resp = f"data:{chunk}\n\n"
yield task, resp, agent_conv_id
except Exception as e:
logger.exception(
f"get messages {gpts_name} Exception!" + str(e)
)
yield f"data: {str(e)}\n\n"
)
if enable_verbose:
async for chunk in multi_agents.chat_messages(agent_conv_id):
if chunk:
try:
chunk = json.dumps(
{"vis": chunk},
default=serialize,
ensure_ascii=False,
)
if chunk is None or len(chunk) <= 0:
continue
resp = f"data:{chunk}\n\n"
yield task, resp, agent_conv_id
except Exception as e:
logger.exception(
f"get messages {gpts_name} Exception!" + str(e)
)
yield f"data: {str(e)}\n\n"
yield (
task,
_format_vis_msg("[DONE]"),
agent_conv_id,
)
yield (
task,
_format_vis_msg("[DONE]"),
agent_conv_id,
)
else:
logger.info(
f"{agent_conv_id}开启简略消息模式不进行vis协议封装获取极简流式消息直接输出"
)
# 开启简略消息模式不进行vis协议封装获取极简流式消息直接输出
final_message_chunk = None
async for chunk in multi_agents.chat_messages(agent_conv_id):
if chunk:
try:
if chunk is None or len(chunk) <= 0:
continue
final_message_chunk = chunk[-1]
if stream:
yield task, final_message_chunk, agent_conv_id
logger.info(
"agent_chat_v2 executing, timestamp="
f"{int(time.time() * 1000)}"
)
except Exception as e:
logger.exception(
f"get messages {gpts_name} Exception!" + str(e)
)
final_message_chunk = str(e)
else:
logger.info(
f"{agent_conv_id}开启简略消息模式不进行vis协议封装获取极简流式消息直接输出"
)
# 开启简略消息模式不进行vis协议封装获取极简流式消息直接输出
final_message_chunk = None
async for chunk in multi_agents.chat_messages(agent_conv_id):
if chunk:
try:
if chunk is None or len(chunk) <= 0:
continue
final_message_chunk = chunk[-1]
if stream:
yield task, final_message_chunk, agent_conv_id
logger.info(
"agent_chat_v2 executing, timestamp="
f"{int(time.time() * 1000)}"
)
except Exception as e:
logger.exception(
f"get messages {gpts_name} Exception!" + str(e)
)
final_message_chunk = str(e)
logger.info(
f"agent_chat_v2 finish, timestamp={int(time.time() * 1000)}"
)
yield task, final_message_chunk, agent_conv_id
logger.info(
f"agent_chat_v2 finish, timestamp={int(time.time() * 1000)}"
)
yield task, final_message_chunk, agent_conv_id
except Exception as e:
logger.exception(f"Agent chat have error!{str(e)}")
if enable_verbose:
yield (
task,
_format_vis_msg("[DONE]"),
agent_conv_id,
)
yield (
task,
_format_vis_msg("[DONE]"),
agent_conv_id,
)
else:
yield task, str(e), agent_conv_id
except Exception as e:
logger.exception(f"Agent chat have error!{str(e)}")
if enable_verbose:
yield (
task,
_format_vis_msg("[DONE]"),
agent_conv_id,
)
yield (
task,
_format_vis_msg("[DONE]"),
agent_conv_id,
)
else:
yield task, str(e), agent_conv_id
finally:
self.memory.clear(agent_conv_id)
finally:
self.memory.clear(agent_conv_id)
def is_flow_chat(self, gpts_name: str):
    """Tell whether the app named ``gpts_name`` is an AWEL chat-flow app.

    Looks the app up through ``self.gpts_app.app_detail`` and returns
    ``True`` only when the app exists, its ``team_mode`` equals
    ``TeamMode.AWEL_LAYOUT.value`` and its team context's
    ``flow_category`` is ``FlowCategory.CHAT_FLOW``. Returns ``False``
    in every other case, including when no app is found.
    """
    app: GptsApp = self.gpts_app.app_detail(gpts_name)
    # No app detail available -> cannot be a flow chat.
    if not app:
        return False
    # team_context is only inspected once the team mode matches,
    # mirroring the short-circuit of a combined ``and`` condition.
    if TeamMode.AWEL_LAYOUT.value == app.team_mode:
        if app.team_context.flow_category == FlowCategory.CHAT_FLOW:
            return True
    return False
async def app_agent_flow_chat(
    self,
    conv_uid: str,
    gpts_name: str,
    user_query: str,
    user_code: str = None,
    sys_code: str = None,
    enable_verbose: bool = True,
    stream: Optional[bool] = True,
    **ext_info,
):
    """Stream a chat turn through the flow bound to the app ``gpts_name``.

    The app detail is fetched from ``self.gpts_app``; its team context's
    ``uid`` is used both as the ``chat_param`` of the request and as the
    flow identifier passed to ``chat_stream_flow_str``.

    Args:
        conv_uid: Conversation id, forwarded in the request and echoed
            back as the third element of every yielded tuple.
        gpts_name: App name; also sent as the request's ``app_code``.
        user_query: User input, sent as the request's ``messages``.
        user_code: Sent as the request's ``user_name``.
        sys_code: Sent as the request's ``sys_code``.
        enable_verbose: Accepted for signature compatibility; not read
            inside this method.
        stream: Sent as the request's ``stream`` flag.
        **ext_info: Optional extras; ``model_name``, ``chat_mode`` and
            ``incremental`` (defaults to ``True``) are read from it.

    Yields:
        ``(None, chunk, conv_uid)`` for each chunk produced by the flow
        service. The first element is always ``None``.
    """
    app_detail: GptsApp = self.gpts_app.app_detail(gpts_name)
    flow_context = app_detail.team_context

    from dbgpt.core.awel import CommonLLMHttpRequestBody

    # Keyword arguments are collected in the same order they are evaluated
    # when passed inline; context / temperature / max_new_tokens /
    # enable_vis are intentionally not set here.
    request_kwargs = {
        "model": ext_info.get("model_name", None),
        "messages": user_query,
        "stream": stream,
        "conv_uid": conv_uid,
        "app_code": gpts_name,
        "span_id": root_tracer.get_current_span_id(),
        "chat_mode": ext_info.get("chat_mode", None),
        "chat_param": flow_context.uid,
        "user_name": user_code,
        "sys_code": sys_code,
        "incremental": ext_info.get("incremental", True),
    }
    flow_req = CommonLLMHttpRequestBody(**request_kwargs)

    from dbgpt_app.openapi.api_v1.api_v1 import get_chat_flow

    chunk_stream = get_chat_flow().chat_stream_flow_str(flow_context.uid, flow_req)
    async for piece in chunk_stream:
        yield None, piece, conv_uid
async def app_agent_chat(
self,
@@ -420,60 +471,102 @@ class MultiAgents(BaseComponent, ABC):
):
# logger.info(f"app_agent_chat:{gpts_name},{user_query},{conv_uid}")
# Temporary compatible scenario messages
conv_serve = ConversationServe.get_instance(CFG.SYSTEM_APP)
current_message: StorageConversation = _build_conversation(
conv_id=conv_uid,
select_param=gpts_name,
summary=user_query,
model_name="",
app_code=gpts_name,
conv_serve=conv_serve,
user_name=user_code,
)
current_message.save_to_storage()
current_message.start_new_round()
current_message.add_user_message(user_query)
agent_conv_id = None
agent_task = None
default_final_message = None
try:
async for task, chunk, agent_conv_id in multi_agents.agent_chat_v2(
conv_uid,
current_message.chat_order,
gpts_name,
user_query,
user_code,
sys_code,
enable_verbose=enable_verbose,
stream=stream,
**ext_info,
):
agent_task = task
default_final_message = chunk
yield chunk
if self.is_flow_chat(gpts_name=gpts_name):
try:
async for (
task,
chunk,
agent_conv_id,
) in multi_agents.app_agent_flow_chat(
conv_uid,
gpts_name,
user_query,
user_code,
sys_code,
enable_verbose=enable_verbose,
stream=stream,
**ext_info,
):
agent_task = task
default_final_message = chunk
yield chunk
except asyncio.CancelledError:
# Client disconnects
print("Client disconnected")
if agent_task:
logger.info(f"Chat to App {gpts_name}:{agent_conv_id} Cancel!")
agent_task.cancel()
except Exception as e:
logger.exception(f"Chat to App {gpts_name} Failed!" + str(e))
raise
finally:
logger.info(f"save agent chat info{conv_uid}")
if agent_task:
final_message = await self.stable_message(agent_conv_id)
if final_message:
current_message.add_view_message(final_message)
else:
default_final_message = default_final_message.replace("data:", "")
current_message.add_view_message(default_final_message)
except asyncio.CancelledError:
# Client disconnects
print("Client disconnected")
if agent_task:
logger.info(f"Chat to App {gpts_name}:{agent_conv_id} Cancel!")
agent_task.cancel()
except Exception as e:
logger.exception(f"Chat to App {gpts_name} Failed!" + str(e))
raise
# finally:
# logger.info(f"save agent chat info{conv_uid}")
# if agent_task:
# final_message = await self.stable_message(agent_conv_id)
# if final_message:
# current_message.add_view_message(final_message)
# else:
# default_final_message = default_final_message.replace("data:", "")
# current_message.add_view_message(default_final_message)
current_message.end_current_round()
# current_message.end_current_round()
# current_message.save_to_storage()
else:
# Temporary compatible scenario messages
conv_serve = ConversationServe.get_instance(CFG.SYSTEM_APP)
current_message: StorageConversation = _build_conversation(
conv_id=conv_uid,
select_param=gpts_name,
summary=user_query,
model_name="",
app_code=gpts_name,
conv_serve=conv_serve,
user_name=user_code,
)
current_message.save_to_storage()
current_message.start_new_round()
current_message.add_user_message(user_query)
agent_conv_id = None
agent_task = None
default_final_message = None
try:
async for task, chunk, agent_conv_id in multi_agents.agent_chat_v2(
conv_uid,
current_message.chat_order,
gpts_name,
user_query,
user_code,
sys_code,
enable_verbose=enable_verbose,
stream=stream,
**ext_info,
):
agent_task = task
default_final_message = chunk
yield chunk
except asyncio.CancelledError:
# Client disconnects
print("Client disconnected")
if agent_task:
logger.info(f"Chat to App {gpts_name}:{agent_conv_id} Cancel!")
agent_task.cancel()
except Exception as e:
logger.exception(f"Chat to App {gpts_name} Failed!" + str(e))
raise
finally:
logger.info(f"save agent chat info{conv_uid}")
if agent_task:
final_message = await self.stable_message(agent_conv_id)
if final_message:
current_message.add_view_message(final_message)
else:
default_final_message = default_final_message.replace("data:", "")
current_message.add_view_message(default_final_message)
current_message.end_current_round()
current_message.save_to_storage()
async def agent_team_chat_new(
self,

View File

@@ -77,17 +77,28 @@ const useChat = ({ queryAgentURL = '/api/v1/chat/completions', app_code }: Props
},
onmessage: event => {
let message = event.data;
let needReplaceNewline = false;
let parsedData;
try {
parsedData = JSON.parse(message);
if (scene === 'chat_agent') {
message = JSON.parse(message).vis;
if (parsedData.vis) {
message = parsedData.vis;
} else {
needReplaceNewline = true;
message = parsedData.choices?.[0]?.message?.content;
}
} else {
data = JSON.parse(event.data);
message = data.choices?.[0]?.message?.content;
message = parsedData.choices?.[0]?.message?.content;
}
} catch {
message.replaceAll('\\n', '\n');
}
if (typeof message === 'string') {
if (needReplaceNewline) {
message = message.replaceAll('\\n', '\n');
}
if (message === '[DONE]') {
onDone?.();
} else if (message?.startsWith('[ERROR]')) {