diff --git a/packages/dbgpt-app/src/dbgpt_app/openapi/api_v1/api_v1.py b/packages/dbgpt-app/src/dbgpt_app/openapi/api_v1/api_v1.py index 3c42b9f03..154fdf57e 100644 --- a/packages/dbgpt-app/src/dbgpt_app/openapi/api_v1/api_v1.py +++ b/packages/dbgpt-app/src/dbgpt_app/openapi/api_v1/api_v1.py @@ -535,6 +535,7 @@ async def chat_completions( user_query=dialogue.user_input, user_code=dialogue.user_name, sys_code=dialogue.sys_code, + app_code=dialogue.app_code, **dialogue.ext_info, ), headers=headers, @@ -551,6 +552,7 @@ async def chat_completions( chat_param=dialogue.select_param, user_name=dialogue.user_name, sys_code=dialogue.sys_code, + app_code=dialogue.app_code, incremental=dialogue.incremental, ) return StreamingResponse( diff --git a/packages/dbgpt-app/src/dbgpt_app/operators/llm.py b/packages/dbgpt-app/src/dbgpt_app/operators/llm.py index f8bb48181..cdaa96c8e 100644 --- a/packages/dbgpt-app/src/dbgpt_app/operators/llm.py +++ b/packages/dbgpt-app/src/dbgpt_app/operators/llm.py @@ -215,6 +215,7 @@ class BaseHOLLMOperator( chat_mode=req.chat_mode, user_name=req.user_name, sys_code=req.sys_code, + app_code=req.app_code, conv_storage=self.storage, message_storage=self.message_storage, param_type="", diff --git a/packages/dbgpt-core/src/dbgpt/core/awel/trigger/http_trigger.py b/packages/dbgpt-core/src/dbgpt/core/awel/trigger/http_trigger.py index 1e2449952..0e02df8be 100644 --- a/packages/dbgpt-core/src/dbgpt/core/awel/trigger/http_trigger.py +++ b/packages/dbgpt-core/src/dbgpt/core/awel/trigger/http_trigger.py @@ -255,6 +255,7 @@ class CommonLLMHttpRequestBody(BaseHttpBody): conv_uid: Optional[str] = Field( default=None, description="The conversation id of the model inference" ) + app_code: Optional[str] = Field(default=None, description="The app id of the app") span_id: Optional[str] = Field( default=None, description="The span id of the model inference" ) diff --git a/packages/dbgpt-serve/src/dbgpt_serve/agent/agents/controller.py b/packages/dbgpt-serve/src/dbgpt_serve/agent/agents/controller.py index 2cde0f370..8476d25a6 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/agent/agents/controller.py +++ b/packages/dbgpt-serve/src/dbgpt_serve/agent/agents/controller.py @@ -277,135 +277,186 @@ class MultiAgents(BaseComponent, ABC): ) ) - if ( - TeamMode.AWEL_LAYOUT.value == gpt_app.team_mode - and gpt_app.team_context.flow_category == FlowCategory.CHAT_FLOW - ): - team_context = gpt_app.team_context - from dbgpt.core.awel import CommonLLMHttpRequestBody + # if ( + # TeamMode.AWEL_LAYOUT.value == gpt_app.team_mode + # and gpt_app.team_context.flow_category == FlowCategory.CHAT_FLOW + # ): + # team_context = gpt_app.team_context + # from dbgpt.core.awel import CommonLLMHttpRequestBody - flow_req = CommonLLMHttpRequestBody( - model=ext_info.get("model_name", None), - messages=user_query, - stream=True, - # context=flow_ctx, - # temperature= - # max_new_tokens= - # enable_vis= - conv_uid=agent_conv_id, - span_id=root_tracer.get_current_span_id(), - chat_mode=ext_info.get("chat_mode", None), - chat_param=team_context.uid, - user_name=user_code, - sys_code=sys_code, - incremental=ext_info.get("incremental", True), - ) - from dbgpt_app.openapi.api_v1.api_v1 import get_chat_flow + # flow_req = CommonLLMHttpRequestBody( + # model=ext_info.get("model_name", None), + # messages=user_query, + # stream=True, + # # context=flow_ctx, + # # temperature= + # # max_new_tokens= + # # enable_vis= + # conv_uid=agent_conv_id, + # app_code=gpts_name, + # span_id=root_tracer.get_current_span_id(), + # chat_mode=ext_info.get("chat_mode", None), + # chat_param=team_context.uid, + # user_name=user_code, + # sys_code=sys_code, + # incremental=ext_info.get("incremental", True), + # ) + # from dbgpt_app.openapi.api_v1.api_v1 import get_chat_flow - flow_service = get_chat_flow() - async for chunk in flow_service.chat_stream_flow_str( - team_context.uid, flow_req - ): - yield None, chunk, agent_conv_id - else: - # init gpts memory - self.memory.init( - agent_conv_id, - enable_vis_message=enable_verbose, - history_messages=history_messages, - start_round=history_message_count, - ) - # init agent memory - agent_memory = self.get_or_build_agent_memory(conv_id, gpts_name) + # flow_service = get_chat_flow() + # async for chunk in flow_service.chat_stream_flow_str( + # team_context.uid, flow_req + # ): + # yield None, chunk, agent_conv_id + # else: + # init gpts memory + self.memory.init( + agent_conv_id, + enable_vis_message=enable_verbose, + history_messages=history_messages, + start_round=history_message_count, + ) + # init agent memory + agent_memory = self.get_or_build_agent_memory(conv_id, gpts_name) - task = None - try: - task = asyncio.create_task( - multi_agents.agent_team_chat_new( - user_query, - agent_conv_id, - gpt_app, - agent_memory, - is_retry_chat, - last_speaker_name=last_speaker_name, - init_message_rounds=message_round, - enable_verbose=enable_verbose, - historical_dialogues=historical_dialogues, - **ext_info, - ) + task = None + try: + task = asyncio.create_task( + multi_agents.agent_team_chat_new( + user_query, + agent_conv_id, + gpt_app, + agent_memory, + is_retry_chat, + last_speaker_name=last_speaker_name, + init_message_rounds=message_round, + enable_verbose=enable_verbose, + historical_dialogues=historical_dialogues, + **ext_info, ) - if enable_verbose: - async for chunk in multi_agents.chat_messages(agent_conv_id): - if chunk: - try: - chunk = json.dumps( - {"vis": chunk}, - default=serialize, - ensure_ascii=False, - ) - if chunk is None or len(chunk) <= 0: - continue - resp = f"data:{chunk}\n\n" - yield task, resp, agent_conv_id - except Exception as e: - logger.exception( - f"get messages {gpts_name} Exception!" + str(e) - ) - yield f"data: {str(e)}\n\n" + ) + if enable_verbose: + async for chunk in multi_agents.chat_messages(agent_conv_id): + if chunk: + try: + chunk = json.dumps( + {"vis": chunk}, + default=serialize, + ensure_ascii=False, + ) + if chunk is None or len(chunk) <= 0: + continue + resp = f"data:{chunk}\n\n" + yield task, resp, agent_conv_id + except Exception as e: + logger.exception( + f"get messages {gpts_name} Exception!" + str(e) + ) + yield f"data: {str(e)}\n\n" - yield ( - task, - _format_vis_msg("[DONE]"), - agent_conv_id, - ) + yield ( + task, + _format_vis_msg("[DONE]"), + agent_conv_id, + ) - else: - logger.info( - f"{agent_conv_id}开启简略消息模式,不进行vis协议封装,获取极简流式消息直接输出" - ) - # 开启简略消息模式,不进行vis协议封装,获取极简流式消息直接输出 - final_message_chunk = None - async for chunk in multi_agents.chat_messages(agent_conv_id): - if chunk: - try: - if chunk is None or len(chunk) <= 0: - continue - final_message_chunk = chunk[-1] - if stream: - yield task, final_message_chunk, agent_conv_id - logger.info( - "agent_chat_v2 executing, timestamp=" - f"{int(time.time() * 1000)}" - ) - except Exception as e: - logger.exception( - f"get messages {gpts_name} Exception!" + str(e) - ) - final_message_chunk = str(e) + else: + logger.info( + f"{agent_conv_id}开启简略消息模式,不进行vis协议封装,获取极简流式消息直接输出" + ) + # 开启简略消息模式,不进行vis协议封装,获取极简流式消息直接输出 + final_message_chunk = None + async for chunk in multi_agents.chat_messages(agent_conv_id): + if chunk: + try: + if chunk is None or len(chunk) <= 0: + continue + final_message_chunk = chunk[-1] + if stream: + yield task, final_message_chunk, agent_conv_id + logger.info( + "agent_chat_v2 executing, timestamp=" + f"{int(time.time() * 1000)}" + ) + except Exception as e: + logger.exception( + f"get messages {gpts_name} Exception!" + str(e) + ) + final_message_chunk = str(e) - logger.info( - f"agent_chat_v2 finish, timestamp={int(time.time() * 1000)}" - ) - yield task, final_message_chunk, agent_conv_id + logger.info( + f"agent_chat_v2 finish, timestamp={int(time.time() * 1000)}" + ) + yield task, final_message_chunk, agent_conv_id - except Exception as e: - logger.exception(f"Agent chat have error!{str(e)}") - if enable_verbose: - yield ( - task, - _format_vis_msg("[DONE]"), - agent_conv_id, - ) - yield ( - task, - _format_vis_msg("[DONE]"), - agent_conv_id, - ) - else: - yield task, str(e), agent_conv_id + except Exception as e: + logger.exception(f"Agent chat have error!{str(e)}") + if enable_verbose: + yield ( + task, + _format_vis_msg("[DONE]"), + agent_conv_id, + ) + yield ( + task, + _format_vis_msg("[DONE]"), + agent_conv_id, + ) + else: + yield task, str(e), agent_conv_id - finally: - self.memory.clear(agent_conv_id) + finally: + self.memory.clear(agent_conv_id) + + def is_flow_chat(self, gpts_name: str): + gpt_app: GptsApp = self.gpts_app.app_detail(gpts_name) + if gpt_app: + if ( + TeamMode.AWEL_LAYOUT.value == gpt_app.team_mode + and gpt_app.team_context.flow_category == FlowCategory.CHAT_FLOW + ): + return True + return False + + async def app_agent_flow_chat( + self, + conv_uid: str, + gpts_name: str, + user_query: str, + user_code: str = None, + sys_code: str = None, + enable_verbose: bool = True, + stream: Optional[bool] = True, + **ext_info, + ): + gpt_app: GptsApp = self.gpts_app.app_detail(gpts_name) + team_context = gpt_app.team_context + from dbgpt.core.awel import CommonLLMHttpRequestBody + + flow_req = CommonLLMHttpRequestBody( + model=ext_info.get("model_name", None), + messages=user_query, + stream=stream, + # context=flow_ctx, + # temperature= + # max_new_tokens= + # enable_vis= + conv_uid=conv_uid, + app_code=gpts_name, + span_id=root_tracer.get_current_span_id(), + chat_mode=ext_info.get("chat_mode", None), + chat_param=team_context.uid, + user_name=user_code, + sys_code=sys_code, + incremental=ext_info.get("incremental", True), + ) + from dbgpt_app.openapi.api_v1.api_v1 import get_chat_flow + + flow_service = get_chat_flow() + async for chunk in flow_service.chat_stream_flow_str( + team_context.uid, flow_req + ): + yield None, chunk, conv_uid async def app_agent_chat( self, @@ -420,60 +471,102 @@ class MultiAgents(BaseComponent, ABC): ): # logger.info(f"app_agent_chat:{gpts_name},{user_query},{conv_uid}") - # Temporary compatible scenario messages - conv_serve = ConversationServe.get_instance(CFG.SYSTEM_APP) - current_message: StorageConversation = _build_conversation( - conv_id=conv_uid, - select_param=gpts_name, - summary=user_query, - model_name="", - app_code=gpts_name, - conv_serve=conv_serve, - user_name=user_code, - ) - current_message.save_to_storage() - current_message.start_new_round() - current_message.add_user_message(user_query) - agent_conv_id = None - agent_task = None - default_final_message = None - try: - async for task, chunk, agent_conv_id in multi_agents.agent_chat_v2( - conv_uid, - current_message.chat_order, - gpts_name, - user_query, - user_code, - sys_code, - enable_verbose=enable_verbose, - stream=stream, - **ext_info, - ): - agent_task = task - default_final_message = chunk - yield chunk + if self.is_flow_chat(gpts_name=gpts_name): + try: + async for ( + task, + chunk, + agent_conv_id, + ) in multi_agents.app_agent_flow_chat( + conv_uid, + gpts_name, + user_query, + user_code, + sys_code, + enable_verbose=enable_verbose, + stream=stream, + **ext_info, + ): + agent_task = task + default_final_message = chunk + yield chunk - except asyncio.CancelledError: - # Client disconnects - print("Client disconnected") - if agent_task: - logger.info(f"Chat to App {gpts_name}:{agent_conv_id} Cancel!") - agent_task.cancel() - except Exception as e: - logger.exception(f"Chat to App {gpts_name} Failed!" + str(e)) - raise - finally: - logger.info(f"save agent chat info!{conv_uid}") - if agent_task: - final_message = await self.stable_message(agent_conv_id) - if final_message: - current_message.add_view_message(final_message) - else: - default_final_message = default_final_message.replace("data:", "") - current_message.add_view_message(default_final_message) + except asyncio.CancelledError: + # Client disconnects + print("Client disconnected") + if agent_task: + logger.info(f"Chat to App {gpts_name}:{agent_conv_id} Cancel!") + agent_task.cancel() + except Exception as e: + logger.exception(f"Chat to App {gpts_name} Failed!" + str(e)) + raise + # finally: + # logger.info(f"save agent chat info!{conv_uid}") + # if agent_task: + # final_message = await self.stable_message(agent_conv_id) + # if final_message: + # current_message.add_view_message(final_message) + # else: + # default_final_message = default_final_message.replace("data:", "") + # current_message.add_view_message(default_final_message) - current_message.end_current_round() + # current_message.end_current_round() + # current_message.save_to_storage() + else: + # Temporary compatible scenario messages + conv_serve = ConversationServe.get_instance(CFG.SYSTEM_APP) + current_message: StorageConversation = _build_conversation( + conv_id=conv_uid, + select_param=gpts_name, + summary=user_query, + model_name="", + app_code=gpts_name, + conv_serve=conv_serve, + user_name=user_code, + ) current_message.save_to_storage() + current_message.start_new_round() + current_message.add_user_message(user_query) + agent_conv_id = None + agent_task = None + default_final_message = None + try: + async for task, chunk, agent_conv_id in multi_agents.agent_chat_v2( + conv_uid, + current_message.chat_order, + gpts_name, + user_query, + user_code, + sys_code, + enable_verbose=enable_verbose, + stream=stream, + **ext_info, + ): + agent_task = task + default_final_message = chunk + yield chunk + + except asyncio.CancelledError: + # Client disconnects + print("Client disconnected") + if agent_task: + logger.info(f"Chat to App {gpts_name}:{agent_conv_id} Cancel!") + agent_task.cancel() + except Exception as e: + logger.exception(f"Chat to App {gpts_name} Failed!" + str(e)) + raise + finally: + logger.info(f"save agent chat info!{conv_uid}") + if agent_task: + final_message = await self.stable_message(agent_conv_id) + if final_message: + current_message.add_view_message(final_message) + else: + default_final_message = default_final_message.replace("data:", "") + current_message.add_view_message(default_final_message) + + current_message.end_current_round() + current_message.save_to_storage() async def agent_team_chat_new( self, diff --git a/web/hooks/use-chat.ts b/web/hooks/use-chat.ts index 1f7e42bef..67a30816b 100644 --- a/web/hooks/use-chat.ts +++ b/web/hooks/use-chat.ts @@ -77,17 +77,28 @@ const useChat = ({ queryAgentURL = '/api/v1/chat/completions', app_code }: Props }, onmessage: event => { let message = event.data; + let needReplaceNewline = false; + let parsedData; + try { + parsedData = JSON.parse(message); if (scene === 'chat_agent') { - message = JSON.parse(message).vis; + if (parsedData.vis) { + message = parsedData.vis; + } else { + needReplaceNewline = true; + message = parsedData.choices?.[0]?.message?.content; + } } else { - data = JSON.parse(event.data); - message = data.choices?.[0]?.message?.content; + message = parsedData.choices?.[0]?.message?.content; } } catch { message.replaceAll('\\n', '\n'); } if (typeof message === 'string') { + if (needReplaceNewline) { + message = message.replaceAll('\\n', '\n'); + } if (message === '[DONE]') { onDone?.(); } else if (message?.startsWith('[ERROR]')) {