diff --git a/docker/base/build_image.sh b/docker/base/build_image.sh
index 9ecd9db9d..32846936f 100755
--- a/docker/base/build_image.sh
+++ b/docker/base/build_image.sh
@@ -15,7 +15,7 @@ IMAGE_NAME_ARGS=""
 PIP_INDEX_URL="https://pypi.org/simple"
 # en or zh
 LANGUAGE="en"
-BUILD_LOCAL_CODE="false"
+BUILD_LOCAL_CODE="true"
 LOAD_EXAMPLES="true"
 BUILD_NETWORK=""
 DB_GPT_INSTALL_MODEL="default"
@@ -26,7 +26,7 @@ usage () {
     echo " [-n|--image-name image name] Current image name, default: db-gpt"
     echo " [-i|--pip-index-url pip index url] Pip index url, default: https://pypi.org/simple"
     echo " [--language en or zh] You language, default: en"
-    echo " [--build-local-code true or false] Whether to use the local project code to package the image, default: false"
+    echo " [--build-local-code true or false] Whether to use the local project code to package the image, default: true"
     echo " [--load-examples true or false] Whether to load examples to default database default: true"
     echo " [--network network name] The network of docker build"
     echo " [--install-mode mode name] Installation mode name, default: default, If you completely use openai's service, you can set the mode name to 'openai'"
diff --git a/docs/getting_started/observability.md b/docs/getting_started/observability.md
new file mode 100644
index 000000000..33b2a4017
--- /dev/null
+++ b/docs/getting_started/observability.md
@@ -0,0 +1,309 @@
+# Debugging
+-------------
+
+DB-GPT provides a set of tools to help you troubleshoot and resolve some of the issues you may encounter.
+
+
+## Trace Logs
+
+DB-GPT writes some critical system runtime information to trace logs. By default, these are located in `logs/dbgpt*.jsonl`.
+
+DB-GPT also offers a command-line tool, `dbgpt trace`, to help you analyze these trace logs. You can see its specific usage with the command `dbgpt trace --help`.
+
+
+## Viewing Chat Details
+
+You can use the `dbgpt trace chat` command to view chat details. By default, it will display the latest chat message.
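+
+For example, you can point the command at a specific trace log file (an illustrative invocation; `dbgpt_webserver_tracer.jsonl` is the webserver trace log introduced by this change, assuming the default `logs/` directory):
+
+```bash
+dbgpt trace chat logs/dbgpt_webserver_tracer.jsonl
+```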
+ +### Viewing Service Runtime Information + +```bash +dbgpt trace chat --hide_conv +``` + +You will see an output like: + +``` ++------------------------+--------------------------+-----------------------------+------------------------------------+ +| Config Key (Webserver) | Config Value (Webserver) | Config Key (EmbeddingModel) | Config Value (EmbeddingModel) | ++------------------------+--------------------------+-----------------------------+------------------------------------+ +| host | 0.0.0.0 | model_name | text2vec | +| port | 5000 | model_path | /app/models/text2vec-large-chinese | +| daemon | False | device | cuda | +| share | False | normalize_embeddings | None | +| remote_embedding | False | | | +| log_level | None | | | +| light | False | | | ++------------------------+--------------------------+-----------------------------+------------------------------------+ ++--------------------------+-----------------------------+----------------------------+------------------------------+ +| Config Key (ModelWorker) | Config Value (ModelWorker) | Config Key (WorkerManager) | Config Value (WorkerManager) | ++--------------------------+-----------------------------+----------------------------+------------------------------+ +| model_name | vicuna-13b-v1.5 | model_name | vicuna-13b-v1.5 | +| model_path | /app/models/vicuna-13b-v1.5 | model_path | /app/models/vicuna-13b-v1.5 | +| device | cuda | worker_type | None | +| model_type | huggingface | worker_class | None | +| prompt_template | None | model_type | huggingface | +| max_context_size | 4096 | host | 0.0.0.0 | +| num_gpus | None | port | 5000 | +| max_gpu_memory | None | daemon | False | +| cpu_offloading | False | limit_model_concurrency | 5 | +| load_8bit | False | standalone | True | +| load_4bit | False | register | True | +| quant_type | nf4 | worker_register_host | None | +| use_double_quant | True | controller_addr | http://127.0.0.1:5000 | +| compute_dtype | None | send_heartbeat | True | +| trust_remote_code | True | heartbeat_interval | 20 | +| verbose | False | log_level | None | ++--------------------------+-----------------------------+----------------------------+------------------------------+ +``` + +### Viewing the Latest Chat Message + +```bash +dbgpt trace chat --hide_run_params +``` + +You will see an output like: + +``` ++-------------------------------------------------------------------------------------------------------------------------------------------+ +| Chat Trace Details | ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +| Key | Value Value | ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +| trace_id | 5d1900c3-5aad-4159-9946-fbb600666530 | +| span_id | 5d1900c3-5aad-4159-9946-fbb600666530:14772034-bed4-4b4e-b43f-fcf3a8aad6a7 | +| conv_uid | 5e456272-68ac-11ee-9fba-0242ac150003 | +| user_input | Who are you? | +| chat_mode | chat_normal | +| select_param | None | +| model_name | vicuna-13b-v1.5 | +| temperature | 0.6 | +| max_new_tokens | 1024 | +| echo | False | +| llm_adapter | FastChatLLMModelAdaperWrapper(fastchat.model.model_adapter.VicunaAdapter) | +| User prompt | A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polit | +| | e answers to the user's questions. USER: Who are you? 
ASSISTANT: | +| Model output | You can call me Vicuna, and I was trained by Large Model Systems Organization (LMSYS) researchers as a language model. | ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +``` + + +### Viewing Chat Details and Call Chain + +```bash +dbgpt trace chat --hide_run_params --tree +``` + +You will see an output like: + +``` + +Invoke Trace Tree: + +Operation: DB-GPT-Web-Entry (Start: 2023-10-12 03:06:43.180, End: None) + Operation: get_chat_instance (Start: 2023-10-12 03:06:43.258, End: None) + Operation: get_chat_instance (Start: 2023-10-12 03:06:43.258, End: 2023-10-12 03:06:43.424) + Operation: stream_generator (Start: 2023-10-12 03:06:43.425, End: None) + Operation: BaseChat.stream_call (Start: 2023-10-12 03:06:43.426, End: None) + Operation: WorkerManager.generate_stream (Start: 2023-10-12 03:06:43.426, End: None) + Operation: DefaultModelWorker.generate_stream (Start: 2023-10-12 03:06:43.428, End: None) + Operation: DefaultModelWorker_call.generate_stream_func (Start: 2023-10-12 03:06:43.430, End: None) + Operation: DefaultModelWorker_call.generate_stream_func (Start: 2023-10-12 03:06:43.430, End: 2023-10-12 03:06:48.518) + Operation: DefaultModelWorker.generate_stream (Start: 2023-10-12 03:06:43.428, End: 2023-10-12 03:06:48.518) + Operation: WorkerManager.generate_stream (Start: 2023-10-12 03:06:43.426, End: 2023-10-12 03:06:48.518) + Operation: BaseChat.stream_call (Start: 2023-10-12 03:06:43.426, End: 2023-10-12 03:06:48.519) + Operation: stream_generator (Start: 2023-10-12 03:06:43.425, End: 2023-10-12 03:06:48.519) +Operation: DB-GPT-Web-Entry (Start: 2023-10-12 03:06:43.180, End: 2023-10-12 03:06:43.257) ++-------------------------------------------------------------------------------------------------------------------------------------------+ +| Chat Trace Details | ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +| Key | Value Value | ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +| trace_id | 5d1900c3-5aad-4159-9946-fbb600666530 | +| span_id | 5d1900c3-5aad-4159-9946-fbb600666530:14772034-bed4-4b4e-b43f-fcf3a8aad6a7 | +| conv_uid | 5e456272-68ac-11ee-9fba-0242ac150003 | +| user_input | Who are you? | +| chat_mode | chat_normal | +| select_param | None | +| model_name | vicuna-13b-v1.5 | +| temperature | 0.6 | +| max_new_tokens | 1024 | +| echo | False | +| llm_adapter | FastChatLLMModelAdaperWrapper(fastchat.model.model_adapter.VicunaAdapter) | +| User prompt | A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polit | +| | e answers to the user's questions. USER: Who are you? ASSISTANT: | +| Model output | You can call me Vicuna, and I was trained by Large Model Systems Organization (LMSYS) researchers as a language model. 
| ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +``` + +### Viewing Chat Details Based on trace_id + +```bash +dbgpt trace chat --hide_run_params --trace_id ec30d733-7b35-4d61-b02e-2832fd2e29ff +``` + +You will see an output like: + +``` ++-------------------------------------------------------------------------------------------------------------------------------------------+ +| Chat Trace Details | ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +| Key | Value Value | ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +| trace_id | ec30d733-7b35-4d61-b02e-2832fd2e29ff | +| span_id | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | +| conv_uid | 87a722de-68ae-11ee-9fba-0242ac150003 | +| user_input | Hello | +| chat_mode | chat_normal | +| select_param | None | +| model_name | vicuna-13b-v1.5 | +| temperature | 0.6 | +| max_new_tokens | 1024 | +| echo | False | +| llm_adapter | FastChatLLMModelAdaperWrapper(fastchat.model.model_adapter.VicunaAdapter) | +| User prompt | A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polit | +| | e answers to the user's questions. USER: Hello ASSISTANT: | +| Model output | Hello! How can I help you today? Is there something specific you want to know or talk about? I'm here to answer any ques | +| | tions you might have, to the best of my ability. | ++----------------+--------------------------------------------------------------------------------------------------------------------------+ +``` + +### More `chat` Usage + +```bash +dbgpt trace chat --help +``` + +``` +Usage: dbgpt trace chat [OPTIONS] [FILES]... + + Show conversation details + +Options: + --trace_id TEXT Specify the trace ID to analyze. If None, + show latest conversation details + --tree Display trace spans as a tree + --hide_conv Hide your conversation details + --hide_run_params Hide run params + --output [text|html|csv|latex|json] + The output format + --help Show this message and exit. 
+``` + +## Viewing Call Tree Based on `trace_id` + +```bash +dbgpt trace tree --trace_id ec30d733-7b35-4d61-b02e-2832fd2e29ff +``` + +You will see an output like: + +``` +Operation: DB-GPT-Web-Entry (Start: 2023-10-12 03:22:10.592, End: None) + Operation: get_chat_instance (Start: 2023-10-12 03:22:10.594, End: None) + Operation: get_chat_instance (Start: 2023-10-12 03:22:10.594, End: 2023-10-12 03:22:10.658) + Operation: stream_generator (Start: 2023-10-12 03:22:10.659, End: None) + Operation: BaseChat.stream_call (Start: 2023-10-12 03:22:10.659, End: None) + Operation: WorkerManager.generate_stream (Start: 2023-10-12 03:22:10.660, End: None) + Operation: DefaultModelWorker.generate_stream (Start: 2023-10-12 03:22:10.675, End: None) + Operation: DefaultModelWorker_call.generate_stream_func (Start: 2023-10-12 03:22:10.676, End: None) + Operation: DefaultModelWorker_call.generate_stream_func (Start: 2023-10-12 03:22:10.676, End: 2023-10-12 03:22:16.130) + Operation: DefaultModelWorker.generate_stream (Start: 2023-10-12 03:22:10.675, End: 2023-10-12 03:22:16.130) + Operation: WorkerManager.generate_stream (Start: 2023-10-12 03:22:10.660, End: 2023-10-12 03:22:16.130) + Operation: BaseChat.stream_call (Start: 2023-10-12 03:22:10.659, End: 2023-10-12 03:22:16.130) + Operation: stream_generator (Start: 2023-10-12 03:22:10.659, End: 2023-10-12 03:22:16.130) +Operation: DB-GPT-Web-Entry (Start: 2023-10-12 03:22:10.592, End: 2023-10-12 03:22:10.673) +``` + + +## Listing Trace Information + +### Listing All Trace Information + + +```bash +dbgpt trace list +``` + +You will see an output like: +``` ++--------------------------------------+---------------------------------------------------------------------------+-----------------------------------+------------------+ +| Trace ID | Span ID | Operation Name | Conversation UID | ++--------------------------------------+---------------------------------------------------------------------------+-----------------------------------+------------------+ +| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:f650065f-f761-4790-99f7-8109c15f756a | run_webserver | None | +| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:b2ff279e-0557-4b2d-8959-85e25dcfe94e | EmbeddingLoader.load | None | +| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:b2ff279e-0557-4b2d-8959-85e25dcfe94e | EmbeddingLoader.load | None | +| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:3e8b1b9d-5ef2-4382-af62-6b2b21cc04fd | WorkerManager._start_local_worker | None | +| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:3e8b1b9d-5ef2-4382-af62-6b2b21cc04fd | WorkerManager._start_local_worker | None | +| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:4c280ec9-0fd6-4ee8-b79f-1afcab0f9901 | DefaultModelWorker.start | None | ++--------------------------------------+---------------------------------------------------------------------------+-----------------------------------+------------------+ +``` + +### Listing Trace Information by Trace Type + +```bash +dbgpt trace list --span_type chat +``` + +You will see an output like: +``` ++--------------------------------------+---------------------------------------------------------------------------+-------------------+--------------------------------------+ +| Trace ID | Span ID | Operation Name | Conversation UID | 
++--------------------------------------+---------------------------------------------------------------------------+-------------------+--------------------------------------+ +| 5d1900c3-5aad-4159-9946-fbb600666530 | 5d1900c3-5aad-4159-9946-fbb600666530:14772034-bed4-4b4e-b43f-fcf3a8aad6a7 | get_chat_instance | 5e456272-68ac-11ee-9fba-0242ac150003 | +| 5d1900c3-5aad-4159-9946-fbb600666530 | 5d1900c3-5aad-4159-9946-fbb600666530:14772034-bed4-4b4e-b43f-fcf3a8aad6a7 | get_chat_instance | 5e456272-68ac-11ee-9fba-0242ac150003 | +| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | get_chat_instance | 87a722de-68ae-11ee-9fba-0242ac150003 | +| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | get_chat_instance | 87a722de-68ae-11ee-9fba-0242ac150003 | ++--------------------------------------+---------------------------------------------------------------------------+-------------------+--------------------------------------+ +``` + +### Searching Trace Information + +```bash +dbgpt trace list --search Hello +``` + +You will see an output like: +``` ++--------------------------------------+---------------------------------------------------------------------------+----------------------------------------------+--------------------------------------+ +| Trace ID | Span ID | Operation Name | Conversation UID | ++--------------------------------------+---------------------------------------------------------------------------+----------------------------------------------+--------------------------------------+ +| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | get_chat_instance | 87a722de-68ae-11ee-9fba-0242ac150003 | +| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | get_chat_instance | 87a722de-68ae-11ee-9fba-0242ac150003 | +| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:03de6c87-34d6-426a-85e8-7d46d475411e | BaseChat.stream_call | None | +| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:03de6c87-34d6-426a-85e8-7d46d475411e | BaseChat.stream_call | None | +| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:19593596-b4c7-4d15-a3c1-0924d86098dd | DefaultModelWorker_call.generate_stream_func | None | +| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:19593596-b4c7-4d15-a3c1-0924d86098dd | DefaultModelWorker_call.generate_stream_func | None | ++--------------------------------------+---------------------------------------------------------------------------+----------------------------------------------+--------------------------------------+ +``` + +### More `list` Usage + +```bash +dbgpt trace list --help +``` + +``` +Usage: dbgpt trace list [OPTIONS] [FILES]... + + List your trace spans + +Options: + --trace_id TEXT Specify the trace ID to list + --span_id TEXT Specify the Span ID to list. + --span_type TEXT Specify the Span Type to list. + --parent_span_id TEXT Specify the Parent Span ID to list. + --search TEXT Search trace_id, span_id, parent_span_id, + operation_name or content in metadata. + -l, --limit INTEGER Limit the number of recent span displayed. + --start_time TEXT Filter by start time. Format: "YYYY-MM-DD + HH:MM:SS.mmm" + --end_time TEXT Filter by end time. 
                                  Format: "YYYY-MM-DD
                                  HH:MM:SS.mmm"
  --desc                          Whether to use reverse sorting. By default,
                                  sorting is based on start time.
  --output [text|html|csv|latex|json]
                                  The output format
  --help                          Show this message and exit.
```
\ No newline at end of file
diff --git a/docs/index.rst b/docs/index.rst
index b343dc9f7..423f4a9a9 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -53,6 +53,7 @@ Getting Started
    getting_started/concepts.md
    getting_started/tutorials.md
    getting_started/faq.rst
+   getting_started/observability.md
 
 
 Modules
diff --git a/docs/locales/zh_CN/LC_MESSAGES/getting_started/observability.po b/docs/locales/zh_CN/LC_MESSAGES/getting_started/observability.po
new file mode 100644
index 000000000..e7654ecae
--- /dev/null
+++ b/docs/locales/zh_CN/LC_MESSAGES/getting_started/observability.po
@@ -0,0 +1,121 @@
+# SOME DESCRIPTIVE TITLE.
+# Copyright (C) 2023, csunny
+# This file is distributed under the same license as the DB-GPT package.
+# FIRST AUTHOR <EMAIL@ADDRESS>, 2023.
+#
+#, fuzzy
+msgid ""
+msgstr ""
+"Project-Id-Version: DB-GPT 👏👏 0.3.9\n"
+"Report-Msgid-Bugs-To: \n"
+"POT-Creation-Date: 2023-10-12 11:54+0800\n"
+"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
+"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
+"Language: zh_CN\n"
+"Language-Team: zh_CN <LL@li.org>\n"
+"Plural-Forms: nplurals=1; plural=0;\n"
+"MIME-Version: 1.0\n"
+"Content-Type: text/plain; charset=utf-8\n"
+"Content-Transfer-Encoding: 8bit\n"
+"Generated-By: Babel 2.12.1\n"
+
+#: ../../getting_started/observability.md:1 c88ce18295444597baa7355efc79ab15
+msgid "Debugging"
+msgstr ""
+
+#: ../../getting_started/observability.md:4 522a4e83c62c493381ca3a452ced8ecf
+msgid ""
+"DB-GPT provides a set of tools to help you troubleshoot and resolve some "
+"of the issues you may encounter."
+msgstr ""
+"DB-GPT 提供了一套工具来帮助你排查和解决一些遇到的问题。"
+
+#: ../../getting_started/observability.md:7 7fe5b8ab29194e42b74a8ab3e77006c7
+msgid "Trace Logs"
+msgstr "追踪日志"
+
+#: ../../getting_started/observability.md:9 8a55f7a2b5a247d49728969f179bf50d
+msgid ""
+"DB-GPT writes some critical system runtime information to trace logs. By "
+"default, these are located in `logs/dbgpt*.jsonl`."
+msgstr ""
+"DB-GPT 会将一些系统运行的关键的信息写入到追踪日志中,默认情况下,在 `logs/dbgpt*.jsonl` 中。"
+
+#: ../../getting_started/observability.md:11 5e2b847179e9427a8ae022b5338cbbd9
+msgid ""
+"DB-GPT also offers a command-line tool, `dbgpt trace`, to help you "
+"analyze these trace logs. You can see its specific usage with the command"
+" `dbgpt trace --help`."
+msgstr ""
+"DB-GPT 也提供了命令行工具 `dbgpt trace` 来帮助你分析追踪日志,你可以使用命令 `dbgpt trace --help` 来查看具体的用法。"
+
+#: ../../getting_started/observability.md:14 6e0c9c4ba6ac4eb49d9289b0d63f77fb
+msgid "Viewing Chat Details"
+msgstr "查看对话详情"
+
+#: ../../getting_started/observability.md:16 b9badbab74de47f192ff117d7d36fa72
+msgid ""
+"You can use the `dbgpt trace chat` command to view chat details. By "
+"default, it will display the latest chat message."
+msgstr "" +"你可以使用 `dbgpt trace chat` 命令来查看对话信息,默认情况会显示你最新的一条对话信息。" + +#: ../../getting_started/observability.md:18 55c7466bd80d43c9a355d87daf2a2be7 +msgid "Viewing Service Runtime Information" +msgstr "查看服务运行信息" + +#: ../../getting_started/observability.md:24 +#: ../../getting_started/observability.md:66 +#: ../../getting_started/observability.md:98 +#: ../../getting_started/observability.md:146 +#: ../../getting_started/observability.md:200 +#: ../../getting_started/observability.md:229 +#: ../../getting_started/observability.md:249 +#: ../../getting_started/observability.md:267 18907a58a0c3493aa24c17e367309471 +#: 387f08b6cd864a7682522b5a40863e79 5fe0baa5803d4ca5ad0e8cbd8a859c8c +#: 7abbfdd996444999a24cbad852d2e545 957bdf6826e045608c8c3ebd06c8fe76 +#: acddf2cdf8c94bd6864d66739fa26459 f3fcaf2b47774779bad2feb3ef4318c4 +#: f99bb110024443f68cc8b7f19956eff4 +msgid "You will see an output like:" +msgstr "你将会看到类似的输出:" + +#: ../../getting_started/observability.md:60 5c8c213a5bac434bb3defe6611a03813 +msgid "Viewing the Latest Chat Message" +msgstr "查看最近的一条对话信息" + +#: ../../getting_started/observability.md:92 ce19873d8e754173849d14eaeab963d2 +msgid "Viewing Chat Details and Call Chain" +msgstr "查看对话信息和调用链路" + +#: ../../getting_started/observability.md:140 36bcc37971ce4d6682f1ea32e2e9a980 +msgid "Viewing Chat Details Based on trace_id" +msgstr "根据 `trace_id` 查看对应的对话信息" + +#: ../../getting_started/observability.md:172 c74968492f7544758c9d95fa831c4fcf +msgid "More `chat` Usage" +msgstr "更多 `chat` 用法" + +#: ../../getting_started/observability.md:194 c2e5a7e7b1ee40fea15790d66b79eb11 +msgid "Viewing Call Tree Based on `trace_id`" +msgstr "根据 `trace_id` 查看调用树" + +#: ../../getting_started/observability.md:220 6bd64d2ad0ce442e8e81aa1ae7dd2189 +msgid "Listing Trace Information" +msgstr "列出追踪信息" + +#: ../../getting_started/observability.md:222 ce643441e8744ab09fcbd4081d2adb4a +msgid "Listing All Trace Information" +msgstr "列出全部追踪信息" + +#: ../../getting_started/observability.md:243 374376d81ed54bc2a450505abfe7dc6d +msgid "Listing Trace Information by Trace Type" +msgstr "根据追踪类型列出追踪信息" + +#: ../../getting_started/observability.md:261 8fc08a9c924d47309dc7062811c4fb62 +msgid "Searching Trace Information" +msgstr "搜索追踪信息" + +#: ../../getting_started/observability.md:281 3681413d196144389431422010a7e30f +msgid "More `list` Usage" +msgstr "更多 `list` 用法" + diff --git a/pilot/component.py b/pilot/component.py index 3179fa696..88e48a422 100644 --- a/pilot/component.py +++ b/pilot/component.py @@ -47,6 +47,8 @@ class ComponentType(str, Enum): WORKER_MANAGER_FACTORY = "dbgpt_worker_manager_factory" MODEL_CONTROLLER = "dbgpt_model_controller" EXECUTOR_DEFAULT = "dbgpt_thread_pool_default" + TRACER = "dbgpt_tracer" + TRACER_SPAN_STORAGE = "dbgpt_tracer_span_storage" class BaseComponent(LifeCycle, ABC): @@ -70,6 +72,8 @@ class BaseComponent(LifeCycle, ABC): T = TypeVar("T", bound=BaseComponent) +_EMPTY_DEFAULT_COMPONENT = "_EMPTY_DEFAULT_COMPONENT" + class SystemApp(LifeCycle): """Main System Application class that manages the lifecycle and registration of components.""" @@ -104,13 +108,18 @@ class SystemApp(LifeCycle): instance.init_app(self) def get_component( - self, name: Union[str, ComponentType], component_type: Type[T] + self, + name: Union[str, ComponentType], + component_type: Type[T], + default_component=_EMPTY_DEFAULT_COMPONENT, ) -> T: """Retrieve a registered component by its name and type.""" if isinstance(name, ComponentType): name = name.value component = self.components.get(name) if not component: + if 
default_component != _EMPTY_DEFAULT_COMPONENT: + return default_component raise ValueError(f"No component found with name {name}") if not isinstance(component, component_type): raise TypeError(f"Component {name} is not of type {component_type}") diff --git a/pilot/model/adapter.py b/pilot/model/adapter.py index 763d27059..69b159a13 100644 --- a/pilot/model/adapter.py +++ b/pilot/model/adapter.py @@ -354,7 +354,7 @@ class LlamaCppAdapater(BaseLLMAdaper): if not path.is_file(): model_paths = list(path.glob("*ggml*.gguf")) if not model_paths: - return False + return False, None model_path = str(model_paths[0]) logger.warn( f"Model path {model_path} is not single file, use first *gglm*.gguf model file: {model_path}" diff --git a/pilot/model/base.py b/pilot/model/base.py index 697253f05..035cee044 100644 --- a/pilot/model/base.py +++ b/pilot/model/base.py @@ -53,6 +53,9 @@ class ModelOutput: error_code: int model_context: Dict = None + def to_dict(self) -> Dict: + return asdict(self) + @dataclass class WorkerApplyOutput: diff --git a/pilot/model/cluster/base.py b/pilot/model/cluster/base.py index 7d97e6bd9..9d22161b1 100644 --- a/pilot/model/cluster/base.py +++ b/pilot/model/cluster/base.py @@ -18,11 +18,13 @@ class PromptRequest(BaseModel): max_new_tokens: int = None stop: str = None echo: bool = True + span_id: str = None class EmbeddingsRequest(BaseModel): model: str input: List[str] + span_id: str = None class WorkerApplyRequest(BaseModel): diff --git a/pilot/model/cluster/embedding/loader.py b/pilot/model/cluster/embedding/loader.py index 63f6c452d..caf4bda9a 100644 --- a/pilot/model/cluster/embedding/loader.py +++ b/pilot/model/cluster/embedding/loader.py @@ -3,6 +3,8 @@ from __future__ import annotations from typing import TYPE_CHECKING from pilot.model.parameter import BaseEmbeddingModelParameters +from pilot.utils.parameter_utils import _get_dict_from_obj +from pilot.utils.tracer import root_tracer, SpanType, SpanTypeRunName if TYPE_CHECKING: from langchain.embeddings.base import Embeddings @@ -15,13 +17,21 @@ class EmbeddingLoader: def load( self, model_name: str, param: BaseEmbeddingModelParameters ) -> "Embeddings": - # add more models - if model_name in ["proxy_openai", "proxy_azure"]: - from langchain.embeddings import OpenAIEmbeddings + metadata = { + "model_name": model_name, + "run_service": SpanTypeRunName.EMBEDDING_MODEL.value, + "params": _get_dict_from_obj(param), + } + with root_tracer.start_span( + "EmbeddingLoader.load", span_type=SpanType.RUN, metadata=metadata + ): + # add more models + if model_name in ["proxy_openai", "proxy_azure"]: + from langchain.embeddings import OpenAIEmbeddings - return OpenAIEmbeddings(**param.build_kwargs()) - else: - from langchain.embeddings import HuggingFaceEmbeddings + return OpenAIEmbeddings(**param.build_kwargs()) + else: + from langchain.embeddings import HuggingFaceEmbeddings - kwargs = param.build_kwargs(model_name=param.model_path) - return HuggingFaceEmbeddings(**kwargs) + kwargs = param.build_kwargs(model_name=param.model_path) + return HuggingFaceEmbeddings(**kwargs) diff --git a/pilot/model/cluster/worker/default_worker.py b/pilot/model/cluster/worker/default_worker.py index 9ccf18b52..378fee2ea 100644 --- a/pilot/model/cluster/worker/default_worker.py +++ b/pilot/model/cluster/worker/default_worker.py @@ -9,7 +9,8 @@ from pilot.model.loader import ModelLoader, _get_model_real_path from pilot.model.parameter import ModelParameters from pilot.model.cluster.worker_base import ModelWorker from pilot.utils.model_utils import 
_clear_model_cache -from pilot.utils.parameter_utils import EnvArgumentParser +from pilot.utils.parameter_utils import EnvArgumentParser, _get_dict_from_obj +from pilot.utils.tracer import root_tracer, SpanType, SpanTypeRunName logger = logging.getLogger(__name__) @@ -94,9 +95,20 @@ class DefaultModelWorker(ModelWorker): model_params = self.parse_parameters(command_args) self._model_params = model_params logger.info(f"Begin load model, model params: {model_params}") - self.model, self.tokenizer = self.ml.loader_with_params( - model_params, self.llm_adapter - ) + metadata = { + "model_name": self.model_name, + "model_path": self.model_path, + "model_type": self.llm_adapter.model_type(), + "llm_adapter": str(self.llm_adapter), + "run_service": SpanTypeRunName.MODEL_WORKER, + "params": _get_dict_from_obj(model_params), + } + with root_tracer.start_span( + "DefaultModelWorker.start", span_type=SpanType.RUN, metadata=metadata + ): + self.model, self.tokenizer = self.ml.loader_with_params( + model_params, self.llm_adapter + ) def stop(self) -> None: if not self.model: @@ -109,9 +121,18 @@ class DefaultModelWorker(ModelWorker): _clear_model_cache(self._model_params.device) def generate_stream(self, params: Dict) -> Iterator[ModelOutput]: + span = root_tracer.start_span( + "DefaultModelWorker.generate_stream", params.get("span_id") + ) try: - params, model_context, generate_stream_func = self._prepare_generate_stream( - params + ( + params, + model_context, + generate_stream_func, + model_span, + ) = self._prepare_generate_stream( + params, + span_operation_name="DefaultModelWorker_call.generate_stream_func", ) previous_response = "" @@ -127,8 +148,12 @@ class DefaultModelWorker(ModelWorker): print( f"\n\nfull stream output:\n{previous_response}\n\nmodel generate_stream params:\n{params}" ) + model_span.end(metadata={"output": previous_response}) + span.end() except Exception as e: - yield self._handle_exception(e) + output = self._handle_exception(e) + yield output + span.end(metadata={"error": output.to_dict()}) def generate(self, params: Dict) -> ModelOutput: """Generate non stream result""" @@ -141,9 +166,18 @@ class DefaultModelWorker(ModelWorker): raise NotImplementedError async def async_generate_stream(self, params: Dict) -> Iterator[ModelOutput]: + span = root_tracer.start_span( + "DefaultModelWorker.async_generate_stream", params.get("span_id") + ) try: - params, model_context, generate_stream_func = self._prepare_generate_stream( - params + ( + params, + model_context, + generate_stream_func, + model_span, + ) = self._prepare_generate_stream( + params, + span_operation_name="DefaultModelWorker_call.generate_stream_func", ) previous_response = "" @@ -159,8 +193,12 @@ class DefaultModelWorker(ModelWorker): print( f"\n\nfull stream output:\n{previous_response}\n\nmodel generate_stream params:\n{params}" ) + model_span.end(metadata={"output": previous_response}) + span.end() except Exception as e: - yield self._handle_exception(e) + output = self._handle_exception(e) + yield output + span.end(metadata={"error": output.to_dict()}) async def async_generate(self, params: Dict) -> ModelOutput: output = None @@ -168,7 +206,7 @@ class DefaultModelWorker(ModelWorker): output = out return output - def _prepare_generate_stream(self, params: Dict): + def _prepare_generate_stream(self, params: Dict, span_operation_name: str): params, model_context = self.llm_adapter.model_adaptation( params, self.model_name, @@ -190,7 +228,30 @@ class DefaultModelWorker(ModelWorker): ) str_prompt = 
params.get("prompt") print(f"model prompt: \n\n{str_prompt}\n\n{stream_type}stream output:\n") - return params, model_context, generate_stream_func + + generate_stream_func_str_name = "{}.{}".format( + generate_stream_func.__module__, generate_stream_func.__name__ + ) + + span_params = {k: v for k, v in params.items()} + if "messages" in span_params: + span_params["messages"] = list( + map(lambda m: m.dict(), span_params["messages"]) + ) + + model_span = root_tracer.start_span( + span_operation_name, + metadata={ + "prompt": str_prompt, + "params": span_params, + "is_async_func": self.support_async(), + "llm_adapter": str(self.llm_adapter), + "generate_stream_func": generate_stream_func_str_name, + "model_context": model_context, + }, + ) + + return params, model_context, generate_stream_func, model_span def _handle_output(self, output, previous_response, model_context): if isinstance(output, dict): diff --git a/pilot/model/cluster/worker/manager.py b/pilot/model/cluster/worker/manager.py index b7b9515c5..5648c8e01 100644 --- a/pilot/model/cluster/worker/manager.py +++ b/pilot/model/cluster/worker/manager.py @@ -8,12 +8,13 @@ import sys import time from concurrent.futures import ThreadPoolExecutor from dataclasses import asdict -from typing import Awaitable, Callable, Dict, Iterator, List, Optional +from typing import Awaitable, Callable, Dict, Iterator, List from fastapi import APIRouter, FastAPI from fastapi.responses import StreamingResponse from pilot.component import SystemApp +from pilot.configs.model_config import LOGDIR from pilot.model.base import ( ModelInstance, ModelOutput, @@ -35,8 +36,10 @@ from pilot.utils.parameter_utils import ( EnvArgumentParser, ParameterDescription, _dict_to_command_args, + _get_dict_from_obj, ) from pilot.utils.utils import setup_logging +from pilot.utils.tracer import initialize_tracer, root_tracer, SpanType, SpanTypeRunName logger = logging.getLogger(__name__) @@ -293,60 +296,72 @@ class LocalWorkerManager(WorkerManager): self, params: Dict, async_wrapper=None, **kwargs ) -> Iterator[ModelOutput]: """Generate stream result, chat scene""" - try: - worker_run_data = await self._get_model(params) - except Exception as e: - yield ModelOutput( - text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}", - error_code=0, - ) - return - async with worker_run_data.semaphore: - if worker_run_data.worker.support_async(): - async for outout in worker_run_data.worker.async_generate_stream( - params - ): - yield outout - else: - if not async_wrapper: - from starlette.concurrency import iterate_in_threadpool + with root_tracer.start_span( + "WorkerManager.generate_stream", params.get("span_id") + ) as span: + params["span_id"] = span.span_id + try: + worker_run_data = await self._get_model(params) + except Exception as e: + yield ModelOutput( + text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}", + error_code=0, + ) + return + async with worker_run_data.semaphore: + if worker_run_data.worker.support_async(): + async for outout in worker_run_data.worker.async_generate_stream( + params + ): + yield outout + else: + if not async_wrapper: + from starlette.concurrency import iterate_in_threadpool - async_wrapper = iterate_in_threadpool - async for output in async_wrapper( - worker_run_data.worker.generate_stream(params) - ): - yield output + async_wrapper = iterate_in_threadpool + async for output in async_wrapper( + worker_run_data.worker.generate_stream(params) + ): + yield output async def generate(self, params: Dict) -> ModelOutput: """Generate 
non stream result""" - try: - worker_run_data = await self._get_model(params) - except Exception as e: - return ModelOutput( - text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}", - error_code=0, - ) - async with worker_run_data.semaphore: - if worker_run_data.worker.support_async(): - return await worker_run_data.worker.async_generate(params) - else: - return await self.run_blocking_func( - worker_run_data.worker.generate, params + with root_tracer.start_span( + "WorkerManager.generate", params.get("span_id") + ) as span: + params["span_id"] = span.span_id + try: + worker_run_data = await self._get_model(params) + except Exception as e: + return ModelOutput( + text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}", + error_code=0, ) + async with worker_run_data.semaphore: + if worker_run_data.worker.support_async(): + return await worker_run_data.worker.async_generate(params) + else: + return await self.run_blocking_func( + worker_run_data.worker.generate, params + ) async def embeddings(self, params: Dict) -> List[List[float]]: """Embed input""" - try: - worker_run_data = await self._get_model(params, worker_type="text2vec") - except Exception as e: - raise e - async with worker_run_data.semaphore: - if worker_run_data.worker.support_async(): - return await worker_run_data.worker.async_embeddings(params) - else: - return await self.run_blocking_func( - worker_run_data.worker.embeddings, params - ) + with root_tracer.start_span( + "WorkerManager.embeddings", params.get("span_id") + ) as span: + params["span_id"] = span.span_id + try: + worker_run_data = await self._get_model(params, worker_type="text2vec") + except Exception as e: + raise e + async with worker_run_data.semaphore: + if worker_run_data.worker.support_async(): + return await worker_run_data.worker.async_embeddings(params) + else: + return await self.run_blocking_func( + worker_run_data.worker.embeddings, params + ) def sync_embeddings(self, params: Dict) -> List[List[float]]: worker_run_data = self._sync_get_model(params, worker_type="text2vec") @@ -608,6 +623,9 @@ async def generate_json_stream(params): @router.post("/worker/generate_stream") async def api_generate_stream(request: PromptRequest): params = request.dict(exclude_none=True) + span_id = root_tracer.get_current_span_id() + if "span_id" not in params and span_id: + params["span_id"] = span_id generator = generate_json_stream(params) return StreamingResponse(generator) @@ -615,12 +633,18 @@ async def api_generate_stream(request: PromptRequest): @router.post("/worker/generate") async def api_generate(request: PromptRequest): params = request.dict(exclude_none=True) + span_id = root_tracer.get_current_span_id() + if "span_id" not in params and span_id: + params["span_id"] = span_id return await worker_manager.generate(params) @router.post("/worker/embeddings") async def api_embeddings(request: EmbeddingsRequest): params = request.dict(exclude_none=True) + span_id = root_tracer.get_current_span_id() + if "span_id" not in params and span_id: + params["span_id"] = span_id return await worker_manager.embeddings(params) @@ -705,8 +729,15 @@ def _parse_worker_params( model_name: str = None, model_path: str = None, **kwargs ) -> ModelWorkerParameters: worker_args = EnvArgumentParser() + env_prefix = None + if model_name: + env_prefix = EnvArgumentParser.get_env_prefix(model_name) worker_params: ModelWorkerParameters = worker_args.parse_args_into_dataclass( - ModelWorkerParameters, model_name=model_name, model_path=model_path, **kwargs + 
ModelWorkerParameters, + env_prefix=env_prefix, + model_name=model_name, + model_path=model_path, + **kwargs, ) env_prefix = EnvArgumentParser.get_env_prefix(worker_params.model_name) # Read parameters agein with prefix of model name. @@ -801,10 +832,18 @@ def _build_worker(worker_params: ModelWorkerParameters): def _start_local_worker( worker_manager: WorkerManagerAdapter, worker_params: ModelWorkerParameters ): - worker = _build_worker(worker_params) - if not worker_manager.worker_manager: - worker_manager.worker_manager = _create_local_model_manager(worker_params) - worker_manager.worker_manager.add_worker(worker, worker_params) + with root_tracer.start_span( + "WorkerManager._start_local_worker", + span_type=SpanType.RUN, + metadata={ + "run_service": SpanTypeRunName.WORKER_MANAGER, + "params": _get_dict_from_obj(worker_params), + }, + ): + worker = _build_worker(worker_params) + if not worker_manager.worker_manager: + worker_manager.worker_manager = _create_local_model_manager(worker_params) + worker_manager.worker_manager.add_worker(worker, worker_params) def _start_local_embedding_worker( @@ -928,17 +967,17 @@ def run_worker_manager( # Run worker manager independently embedded_mod = False app = _setup_fastapi(worker_params) - _start_local_worker(worker_manager, worker_params) - _start_local_embedding_worker( - worker_manager, embedding_model_name, embedding_model_path - ) - else: - _start_local_worker(worker_manager, worker_params) - _start_local_embedding_worker( - worker_manager, embedding_model_name, embedding_model_path - ) - loop = asyncio.get_event_loop() - loop.run_until_complete(worker_manager.start()) + + system_app = SystemApp(app) + initialize_tracer( + system_app, + os.path.join(LOGDIR, "dbgpt_model_worker_manager_tracer.jsonl"), + root_operation_name="DB-GPT-WorkerManager-Entry", + ) + _start_local_worker(worker_manager, worker_params) + _start_local_embedding_worker( + worker_manager, embedding_model_name, embedding_model_path + ) if include_router: app.include_router(router, prefix="/api") @@ -946,6 +985,8 @@ def run_worker_manager( if not embedded_mod: import uvicorn + loop = asyncio.get_event_loop() + loop.run_until_complete(worker_manager.start()) uvicorn.run( app, host=worker_params.host, port=worker_params.port, log_level="info" ) diff --git a/pilot/model/model_adapter.py b/pilot/model/model_adapter.py index 6b354ddfb..b8f4f0982 100644 --- a/pilot/model/model_adapter.py +++ b/pilot/model/model_adapter.py @@ -186,6 +186,13 @@ class OldLLMModelAdaperWrapper(LLMModelAdaper): def get_generate_stream_function(self, model, model_path: str): return self._chat_adapter.get_generate_stream_func(model_path) + def __str__(self) -> str: + return "{}({}.{})".format( + self.__class__.__name__, + self._adapter.__class__.__module__, + self._adapter.__class__.__name__, + ) + class FastChatLLMModelAdaperWrapper(LLMModelAdaper): """Wrapping fastchat adapter""" @@ -206,6 +213,13 @@ class FastChatLLMModelAdaperWrapper(LLMModelAdaper): ) -> "Conversation": return self._adapter.get_default_conv_template(model_path) + def __str__(self) -> str: + return "{}({}.{})".format( + self.__class__.__name__, + self._adapter.__class__.__module__, + self._adapter.__class__.__name__, + ) + def get_conv_template(name: str) -> "Conversation": """Get a conversation template.""" @@ -412,6 +426,9 @@ class VLLMModelAdaperWrapper(LLMModelAdaper): ) -> "Conversation": return _auto_get_conv_template(model_name, model_path) + def __str__(self) -> str: + return "{}.{}".format(self.__class__.__module__, 
self.__class__.__name__) + # Covering the configuration of fastcaht, we will regularly feedback the code here to fastchat. # We also recommend that you modify it directly in the fastchat repository. diff --git a/pilot/openapi/api_v1/api_v1.py b/pilot/openapi/api_v1/api_v1.py index ec71f50ac..aba2b627f 100644 --- a/pilot/openapi/api_v1/api_v1.py +++ b/pilot/openapi/api_v1/api_v1.py @@ -46,6 +46,7 @@ from pilot.summary.db_summary_client import DBSummaryClient from pilot.model.cluster import BaseModelController, WorkerManager, WorkerManagerFactory from pilot.model.base import FlatSupportedModel +from pilot.utils.tracer import root_tracer, SpanType router = APIRouter() CFG = Config() @@ -366,7 +367,10 @@ async def chat_completions(dialogue: ConversationVo = Body()): print( f"chat_completions:{dialogue.chat_mode},{dialogue.select_param},{dialogue.model_name}" ) - chat: BaseChat = get_chat_instance(dialogue) + with root_tracer.start_span( + "get_chat_instance", span_type=SpanType.CHAT, metadata=dialogue.dict() + ): + chat: BaseChat = get_chat_instance(dialogue) # background_tasks = BackgroundTasks() # background_tasks.add_task(release_model_semaphore) headers = { @@ -417,9 +421,10 @@ async def model_supports(worker_manager: WorkerManager = Depends(get_worker_mana async def no_stream_generator(chat): - msg = await chat.nostream_call() - msg = msg.replace("\n", "\\n") - yield f"data: {msg}\n\n" + with root_tracer.start_span("no_stream_generator"): + msg = await chat.nostream_call() + msg = msg.replace("\n", "\\n") + yield f"data: {msg}\n\n" async def stream_generator(chat, incremental: bool, model_name: str): @@ -436,6 +441,7 @@ async def stream_generator(chat, incremental: bool, model_name: str): Yields: _type_: streaming responses """ + span = root_tracer.start_span("stream_generator") msg = "[LLM_ERROR]: llm server has no output, maybe your prompt template is wrong." 
stream_id = f"chatcmpl-{str(uuid.uuid1())}" @@ -464,6 +470,7 @@ async def stream_generator(chat, incremental: bool, model_name: str): await asyncio.sleep(0.02) if incremental: yield "data: [DONE]\n\n" + span.end() chat.current_message.add_ai_message(msg) chat.current_message.add_view_message(msg) chat.memory.append(chat.current_message) diff --git a/pilot/scene/base_chat.py b/pilot/scene/base_chat.py index 3e5b51275..30ec078fc 100644 --- a/pilot/scene/base_chat.py +++ b/pilot/scene/base_chat.py @@ -15,6 +15,7 @@ from pilot.prompts.prompt_new import PromptTemplate from pilot.scene.base_message import ModelMessage, ModelMessageRoleType from pilot.scene.message import OnceConversation from pilot.utils import get_or_create_event_loop +from pilot.utils.tracer import root_tracer from pydantic import Extra logger = logging.getLogger(__name__) @@ -135,6 +136,14 @@ class BaseChat(ABC): } return payload + def _get_span_metadata(self, payload: Dict) -> Dict: + metadata = {k: v for k, v in payload.items()} + del metadata["prompt"] + metadata["messages"] = list( + map(lambda m: m if isinstance(m, dict) else m.dict(), metadata["messages"]) + ) + return metadata + async def stream_call(self): # TODO Retry when server connection error payload = self.__call_base() @@ -142,6 +151,10 @@ class BaseChat(ABC): self.skip_echo_len = len(payload.get("prompt").replace("", " ")) + 11 logger.info(f"Request: \n{payload}") ai_response_text = "" + span = root_tracer.start_span( + "BaseChat.stream_call", metadata=self._get_span_metadata(payload) + ) + payload["span_id"] = span.span_id try: from pilot.model.cluster import WorkerManagerFactory @@ -150,6 +163,7 @@ class BaseChat(ABC): ).create() async for output in worker_manager.generate_stream(payload): yield output + span.end() except Exception as e: print(traceback.format_exc()) logger.error("model response parase faild!" + str(e)) @@ -158,11 +172,16 @@ class BaseChat(ABC): ) ### store current conversation self.memory.append(self.current_message) + span.end(metadata={"error": str(e)}) async def nostream_call(self): payload = self.__call_base() logger.info(f"Request: \n{payload}") ai_response_text = "" + span = root_tracer.start_span( + "BaseChat.nostream_call", metadata=self._get_span_metadata(payload) + ) + payload["span_id"] = span.span_id try: from pilot.model.cluster import WorkerManagerFactory @@ -170,7 +189,8 @@ class BaseChat(ABC): ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory ).create() - model_output = await worker_manager.generate(payload) + with root_tracer.start_span("BaseChat.invoke_worker_manager.generate"): + model_output = await worker_manager.generate(payload) ### output parse ai_response_text = ( @@ -185,8 +205,14 @@ class BaseChat(ABC): ai_response_text ) ) - ### run - result = self.do_action(prompt_define_response) + metadata = { + "model_output": model_output.to_dict(), + "ai_response_text": ai_response_text, + "prompt_define_response": prompt_define_response, + } + with root_tracer.start_span("BaseChat.do_action", metadata=metadata): + ### run + result = self.do_action(prompt_define_response) ### llm speaker speak_to_user = self.get_llm_speak(prompt_define_response) @@ -195,12 +221,14 @@ class BaseChat(ABC): speak_to_user, result ) self.current_message.add_view_message(view_message) + span.end() except Exception as e: print(traceback.format_exc()) logger.error("model response parase faild!" 
+ str(e)) self.current_message.add_view_message( f"""ERROR!{str(e)}\n {ai_response_text} """ ) + span.end(metadata={"error": str(e)}) ### store dialogue self.memory.append(self.current_message) return self.current_ai_response() diff --git a/pilot/scripts/cli_scripts.py b/pilot/scripts/cli_scripts.py index a0a7f029e..a51c2b343 100644 --- a/pilot/scripts/cli_scripts.py +++ b/pilot/scripts/cli_scripts.py @@ -119,6 +119,14 @@ except ImportError as e: logging.warning(f"Integrating dbgpt knowledge command line tool failed: {e}") +try: + from pilot.utils.tracer.tracer_cli import trace_cli_group + + add_command_alias(trace_cli_group, name="trace", parent_group=cli) +except ImportError as e: + logging.warning(f"Integrating dbgpt trace command line tool failed: {e}") + + def main(): return cli() diff --git a/pilot/server/component_configs.py b/pilot/server/component_configs.py index 71ef797d9..91fb9de6e 100644 --- a/pilot/server/component_configs.py +++ b/pilot/server/component_configs.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging from typing import TYPE_CHECKING, Any, Type +import os from pilot.component import ComponentType, SystemApp from pilot.utils.executor_utils import DefaultExecutorFactory diff --git a/pilot/server/dbgpt_server.py b/pilot/server/dbgpt_server.py index 0a5b19933..e58c61756 100644 --- a/pilot/server/dbgpt_server.py +++ b/pilot/server/dbgpt_server.py @@ -7,7 +7,7 @@ ROOT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__fi sys.path.append(ROOT_PATH) import signal from pilot.configs.config import Config -from pilot.configs.model_config import LLM_MODEL_CONFIG, EMBEDDING_MODEL_CONFIG +from pilot.configs.model_config import LLM_MODEL_CONFIG, EMBEDDING_MODEL_CONFIG, LOGDIR from pilot.component import SystemApp from pilot.server.base import ( @@ -38,6 +38,8 @@ from pilot.utils.utils import ( _get_logging_level, logging_str_to_uvicorn_level, ) +from pilot.utils.tracer import root_tracer, initialize_tracer, SpanType, SpanTypeRunName +from pilot.utils.parameter_utils import _get_dict_from_obj static_file_path = os.path.join(os.getcwd(), "server/static") @@ -98,17 +100,21 @@ def mount_static_files(app): app.add_exception_handler(RequestValidationError, validation_exception_handler) +def _get_webserver_params(args: List[str] = None): + from pilot.utils.parameter_utils import EnvArgumentParser + + parser: argparse.ArgumentParser = EnvArgumentParser.create_argparse_option( + WebWerverParameters + ) + return WebWerverParameters(**vars(parser.parse_args(args=args))) + + def initialize_app(param: WebWerverParameters = None, args: List[str] = None): """Initialize app If you use gunicorn as a process manager, initialize_app can be invoke in `on_starting` hook. 
""" if not param: - from pilot.utils.parameter_utils import EnvArgumentParser - - parser: argparse.ArgumentParser = EnvArgumentParser.create_argparse_option( - WebWerverParameters - ) - param = WebWerverParameters(**vars(parser.parse_args(args=args))) + param = _get_webserver_params(args) if not param.log_level: param.log_level = _get_logging_level() @@ -127,7 +133,7 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None): model_start_listener = _create_model_start_listener(system_app) initialize_components(param, system_app, embedding_model_name, embedding_model_path) - model_path = LLM_MODEL_CONFIG[CFG.LLM_MODEL] + model_path = LLM_MODEL_CONFIG.get(CFG.LLM_MODEL) if not param.light: print("Model Unified Deployment Mode!") if not param.remote_embedding: @@ -174,8 +180,20 @@ def run_uvicorn(param: WebWerverParameters): def run_webserver(param: WebWerverParameters = None): - param = initialize_app(param) - run_uvicorn(param) + if not param: + param = _get_webserver_params() + initialize_tracer(system_app, os.path.join(LOGDIR, "dbgpt_webserver_tracer.jsonl")) + + with root_tracer.start_span( + "run_webserver", + span_type=SpanType.RUN, + metadata={ + "run_service": SpanTypeRunName.WEBSERVER, + "params": _get_dict_from_obj(param), + }, + ): + param = initialize_app(param) + run_uvicorn(param) if __name__ == "__main__": diff --git a/pilot/server/llmserver.py b/pilot/server/llmserver.py index d521a062d..1a2dd49ce 100644 --- a/pilot/server/llmserver.py +++ b/pilot/server/llmserver.py @@ -13,7 +13,7 @@ from pilot.model.cluster import run_worker_manager CFG = Config() -model_path = LLM_MODEL_CONFIG[CFG.LLM_MODEL] +model_path = LLM_MODEL_CONFIG.get(CFG.LLM_MODEL) if __name__ == "__main__": run_worker_manager( diff --git a/pilot/utils/parameter_utils.py b/pilot/utils/parameter_utils.py index 8acba8881..fbf9c5fb5 100644 --- a/pilot/utils/parameter_utils.py +++ b/pilot/utils/parameter_utils.py @@ -1,6 +1,6 @@ import argparse import os -from dataclasses import dataclass, fields, MISSING, asdict, field +from dataclasses import dataclass, fields, MISSING, asdict, field, is_dataclass from typing import Any, List, Optional, Type, Union, Callable, Dict from collections import OrderedDict @@ -590,6 +590,20 @@ def _extract_parameter_details( return descriptions +def _get_dict_from_obj(obj, default_value=None) -> Optional[Dict]: + if not obj: + return None + if is_dataclass(type(obj)): + params = {} + for field_info in fields(obj): + value = _get_simple_privacy_field_value(obj, field_info) + params[field_info.name] = value + return params + if isinstance(obj, dict): + return obj + return default_value + + class _SimpleArgParser: def __init__(self, *args): self.params = {arg.replace("_", "-"): None for arg in args} diff --git a/pilot/utils/tracer/__init__.py b/pilot/utils/tracer/__init__.py new file mode 100644 index 000000000..16509ff43 --- /dev/null +++ b/pilot/utils/tracer/__init__.py @@ -0,0 +1,32 @@ +from pilot.utils.tracer.base import ( + SpanType, + Span, + SpanTypeRunName, + Tracer, + SpanStorage, + SpanStorageType, + TracerContext, +) +from pilot.utils.tracer.span_storage import MemorySpanStorage, FileSpanStorage +from pilot.utils.tracer.tracer_impl import ( + root_tracer, + initialize_tracer, + DefaultTracer, + TracerManager, +) + +__all__ = [ + "SpanType", + "Span", + "SpanTypeRunName", + "Tracer", + "SpanStorage", + "SpanStorageType", + "TracerContext", + "MemorySpanStorage", + "FileSpanStorage", + "root_tracer", + "initialize_tracer", + "DefaultTracer", + "TracerManager", 
+] diff --git a/pilot/utils/tracer/base.py b/pilot/utils/tracer/base.py new file mode 100644 index 000000000..e227d6314 --- /dev/null +++ b/pilot/utils/tracer/base.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +from typing import Dict, Callable, Optional +from dataclasses import dataclass +from abc import ABC, abstractmethod +from enum import Enum +import uuid +from datetime import datetime + +from pilot.component import BaseComponent, SystemApp, ComponentType + + +class SpanType(str, Enum): + BASE = "base" + RUN = "run" + CHAT = "chat" + + +class SpanTypeRunName(str, Enum): + WEBSERVER = "Webserver" + WORKER_MANAGER = "WorkerManager" + MODEL_WORKER = "ModelWorker" + EMBEDDING_MODEL = "EmbeddingModel" + + @staticmethod + def values(): + return [item.value for item in SpanTypeRunName] + + +class Span: + """Represents a unit of work that is being traced. + This can be any operation like a function call or a database query. + """ + + def __init__( + self, + trace_id: str, + span_id: str, + span_type: SpanType = None, + parent_span_id: str = None, + operation_name: str = None, + metadata: Dict = None, + end_caller: Callable[[Span], None] = None, + ): + if not span_type: + span_type = SpanType.BASE + self.span_type = span_type + # The unique identifier for the entire trace + self.trace_id = trace_id + # Unique identifier for this span within the trace + self.span_id = span_id + # Identifier of the parent span, if this is a child span + self.parent_span_id = parent_span_id + # Descriptive name for the operation being traced + self.operation_name = operation_name + # Timestamp when this span started + self.start_time = datetime.now() + # Timestamp when this span ended, initially None + self.end_time = None + # Additional metadata associated with the span + self.metadata = metadata + self._end_callers = [] + if end_caller: + self._end_callers.append(end_caller) + + def end(self, **kwargs): + """Mark the end of this span by recording the current time.""" + self.end_time = datetime.now() + if "metadata" in kwargs: + self.metadata = kwargs.get("metadata") + for caller in self._end_callers: + caller(self) + + def add_end_caller(self, end_caller: Callable[[Span], None]): + if end_caller: + self._end_callers.append(end_caller) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.end() + return False + + def to_dict(self) -> Dict: + return { + "span_type": self.span_type.value, + "trace_id": self.trace_id, + "span_id": self.span_id, + "parent_span_id": self.parent_span_id, + "operation_name": self.operation_name, + "start_time": None + if not self.start_time + else self.start_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], + "end_time": None + if not self.end_time + else self.end_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], + "metadata": self.metadata, + } + + +class SpanStorageType(str, Enum): + ON_CREATE = "on_create" + ON_END = "on_end" + ON_CREATE_END = "on_create_end" + + +class SpanStorage(BaseComponent, ABC): + """Abstract base class for storing spans. + + This allows different storage mechanisms (e.g., in-memory, database) to be implemented. + """ + + name = ComponentType.TRACER_SPAN_STORAGE.value + + def init_app(self, system_app: SystemApp): + """Initialize the storage with the given application context.""" + pass + + @abstractmethod + def append_span(self, span: Span): + """Store the given span. This needs to be implemented by subclasses.""" + + +class Tracer(BaseComponent, ABC): + """Abstract base class for tracing operations. 
+ Provides the core logic for starting, ending, and retrieving spans. + """ + + name = ComponentType.TRACER.value + + def __init__(self, system_app: SystemApp | None = None): + super().__init__(system_app) + self.system_app = system_app # Application context + + def init_app(self, system_app: SystemApp): + """Initialize the tracer with the given application context.""" + self.system_app = system_app + + @abstractmethod + def append_span(self, span: Span): + """Append the given span to storage. This needs to be implemented by subclasses.""" + + @abstractmethod + def start_span( + self, + operation_name: str, + parent_span_id: str = None, + span_type: SpanType = None, + metadata: Dict = None, + ) -> Span: + """Begin a new span for the given operation. If provided, the span will be + a child of the span with the given parent_span_id. + """ + + @abstractmethod + def end_span(self, span: Span, **kwargs): + """ + End the given span. + """ + + @abstractmethod + def get_current_span(self) -> Optional[Span]: + """ + Retrieve the span that is currently being traced. + """ + + @abstractmethod + def _get_current_storage(self) -> SpanStorage: + """ + Get the storage mechanism currently in use for storing spans. + This needs to be implemented by subclasses. + """ + + def _new_uuid(self) -> str: + """ + Generate a new unique identifier. + """ + return str(uuid.uuid4()) + + +@dataclass +class TracerContext: + span_id: Optional[str] = None diff --git a/pilot/utils/tracer/span_storage.py b/pilot/utils/tracer/span_storage.py new file mode 100644 index 000000000..8967f9ee5 --- /dev/null +++ b/pilot/utils/tracer/span_storage.py @@ -0,0 +1,79 @@ +import os +import json +import time +import threading +import queue +import logging + +from pilot.component import SystemApp +from pilot.utils.tracer.base import Span, SpanStorage + + +logger = logging.getLogger(__name__) + + +class MemorySpanStorage(SpanStorage): + def __init__(self, system_app: SystemApp | None = None): + super().__init__(system_app) + self.spans = [] + self._lock = threading.Lock() + + def append_span(self, span: Span): + with self._lock: + self.spans.append(span) + + +class FileSpanStorage(SpanStorage): + def __init__(self, filename: str, batch_size=10, flush_interval=10): + super().__init__() + self.filename = filename + self.queue = queue.Queue() + self.batch_size = batch_size + self.flush_interval = flush_interval + self.last_flush_time = time.time() + self.flush_signal_queue = queue.Queue() + + if not os.path.exists(filename): + with open(filename, "w") as _: + pass + self.flush_thread = threading.Thread(target=self._flush_to_file, daemon=True) + self.flush_thread.start() + + def append_span(self, span: Span): + span_data = span.to_dict() + logger.debug(f"append span: {span_data}") + self.queue.put(span_data) + + if self.queue.qsize() >= self.batch_size: + try: + self.flush_signal_queue.put_nowait(True) + except queue.Full: + pass # If the signal queue is full, it's okay. The flush thread will handle it. 
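+    # Design note: spans are buffered in an unbounded in-memory queue and
+    # persisted by the daemon thread below, either when the queue reaches
+    # batch_size or after flush_interval seconds, so append_span() stays
+    # non-blocking on the hot path. A minimal usage sketch (the file name
+    # here is only an example):
+    #
+    #   storage = FileSpanStorage("logs/my_spans.jsonl", batch_size=10, flush_interval=10)
+    #   storage.append_span(span)  # returns immediately; the flush thread persists it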
+
+    def _write_to_file(self):
+        spans_to_write = []
+        while not self.queue.empty():
+            spans_to_write.append(self.queue.get())
+
+        with open(self.filename, "a") as file:
+            for span_data in spans_to_write:
+                try:
+                    file.write(json.dumps(span_data, ensure_ascii=False) + "\n")
+                except Exception as e:
+                    logger.warning(
+                        f"Write span to file failed: {str(e)}, span_data: {span_data}"
+                    )
+
+    def _flush_to_file(self):
+        while True:
+            interval = time.time() - self.last_flush_time
+            if interval < self.flush_interval:
+                try:
+                    self.flush_signal_queue.get(
+                        block=True, timeout=self.flush_interval - interval
+                    )
+                except Exception:
+                    # Timeout
+                    pass
+            self._write_to_file()
+            self.last_flush_time = time.time()
diff --git a/pilot/utils/tracer/tests/__init__.py b/pilot/utils/tracer/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pilot/utils/tracer/tests/test_base.py b/pilot/utils/tracer/tests/test_base.py
new file mode 100644
index 000000000..4a8061cb1
--- /dev/null
+++ b/pilot/utils/tracer/tests/test_base.py
@@ -0,0 +1,122 @@
+from typing import Dict
+from pilot.component import SystemApp
+
+from pilot.utils.tracer import Span, SpanStorage, SpanType, Tracer
+
+
+# Mock implementations
+
+
+class MockSpanStorage(SpanStorage):
+    def __init__(self):
+        self.spans = []
+
+    def append_span(self, span: Span):
+        self.spans.append(span)
+
+
+class MockTracer(Tracer):
+    def __init__(self, system_app: SystemApp | None = None):
+        super().__init__(system_app)
+        self.current_span = None
+        self.storage = MockSpanStorage()
+
+    def append_span(self, span: Span):
+        self.storage.append_span(span)
+
+    def start_span(
+        self,
+        operation_name: str,
+        parent_span_id: str = None,
+        span_type: SpanType = None,
+        metadata: Dict = None,
+    ) -> Span:
+        trace_id = (
+            self._new_uuid() if parent_span_id is None else parent_span_id.split(":")[0]
+        )
+        span_id = f"{trace_id}:{self._new_uuid()}"
+        # Span's third positional parameter is span_type, so parent_span_id,
+        # operation_name and metadata must be passed as keyword arguments.
+        span = Span(
+            trace_id,
+            span_id,
+            span_type=span_type,
+            parent_span_id=parent_span_id,
+            operation_name=operation_name,
+            metadata=metadata,
+        )
+        self.current_span = span
+        return span
+
+    def end_span(self, span: Span):
+        span.end()
+        self.append_span(span)
+
+    def get_current_span(self) -> Span:
+        return self.current_span
+
+    def _get_current_storage(self) -> SpanStorage:
+        return self.storage
+
+
+# Tests
+
+
+def test_span_creation():
+    span = Span(
+        "trace_id",
+        "span_id",
+        parent_span_id="parent_span_id",
+        operation_name="operation",
+        metadata={"key": "value"},
+    )
+    assert span.trace_id == "trace_id"
+    assert span.span_id == "span_id"
+    assert span.parent_span_id == "parent_span_id"
+    assert span.operation_name == "operation"
+    assert span.metadata == {"key": "value"}
+
+
+def test_span_end():
+    span = Span("trace_id", "span_id")
+    assert span.end_time is None
+    span.end()
+    assert span.end_time is not None
+
+
+def test_mock_tracer_start_span():
+    tracer = MockTracer()
+    span = tracer.start_span("operation")
+    assert span.operation_name == "operation"
+    assert tracer.get_current_span() == span
+
+
+def test_mock_tracer_end_span():
+    tracer = MockTracer()
+    span = tracer.start_span("operation")
+    tracer.end_span(span)
+    assert span in tracer._get_current_storage().spans
+
+
+def test_mock_tracer_append_span():
+    tracer = MockTracer()
+    span = Span("trace_id", "span_id")
+    tracer.append_span(span)
+    assert span in tracer._get_current_storage().spans
+
+
+def test_parent_child_span_relation():
+    tracer = MockTracer()
+
+    # Start a parent span
+    parent_span = tracer.start_span("parent_operation")
+
+    # Start a child span with parent span's ID
+    child_span = tracer.start_span(
+        "child_operation", parent_span_id=parent_span.span_id
+    )
+
+    # Assert the relationships
assert child_span.parent_span_id == parent_span.span_id + assert ( + child_span.trace_id == parent_span.trace_id + ) # Assuming children share the same trace ID + + # End spans + tracer.end_span(child_span) + tracer.end_span(parent_span) + + # Assert they are in the storage + assert child_span in tracer._get_current_storage().spans + assert parent_span in tracer._get_current_storage().spans + + +# This test checks if unique UUIDs are being generated. +# Note: This is a simple test and doesn't guarantee uniqueness for large numbers of UUIDs. + + +def test_new_uuid_unique(): + tracer = MockTracer() + uuid_set = {tracer._new_uuid() for _ in range(1000)} + assert len(uuid_set) == 1000 diff --git a/pilot/utils/tracer/tests/test_span_storage.py b/pilot/utils/tracer/tests/test_span_storage.py new file mode 100644 index 000000000..5ad518b15 --- /dev/null +++ b/pilot/utils/tracer/tests/test_span_storage.py @@ -0,0 +1,124 @@ +import os +import pytest +import asyncio +import json +import tempfile +import time + +from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span + + +@pytest.fixture +def storage(request): + if not request or not hasattr(request, "param"): + batch_size = 10 + flush_interval = 10 + file_does_not_exist = False + else: + batch_size = request.param.get("batch_size", 10) + flush_interval = request.param.get("flush_interval", 10) + file_does_not_exist = request.param.get("file_does_not_exist", False) + + if file_does_not_exist: + with tempfile.TemporaryDirectory() as tmp_dir: + filename = os.path.join(tmp_dir, "non_existent_file.jsonl") + storage_instance = FileSpanStorage( + filename, batch_size=batch_size, flush_interval=flush_interval + ) + yield storage_instance + else: + with tempfile.NamedTemporaryFile(delete=True) as tmp_file: + filename = tmp_file.name + storage_instance = FileSpanStorage( + filename, batch_size=batch_size, flush_interval=flush_interval + ) + yield storage_instance + + +def read_spans_from_file(filename): + with open(filename, "r") as f: + return [json.loads(line) for line in f.readlines()] + + +@pytest.mark.parametrize( + "storage", [{"batch_size": 1, "flush_interval": 5}], indirect=True +) +def test_write_span(storage: SpanStorage): + span = Span("1", "a", "b", "op1") + storage.append_span(span) + time.sleep(0.1) + + spans_in_file = read_spans_from_file(storage.filename) + assert len(spans_in_file) == 1 + assert spans_in_file[0]["trace_id"] == "1" + + +@pytest.mark.parametrize( + "storage", [{"batch_size": 1, "flush_interval": 5}], indirect=True +) +def test_incremental_write(storage: SpanStorage): + span1 = Span("1", "a", "b", "op1") + span2 = Span("2", "c", "d", "op2") + + storage.append_span(span1) + storage.append_span(span2) + time.sleep(0.1) + + spans_in_file = read_spans_from_file(storage.filename) + assert len(spans_in_file) == 2 + + +@pytest.mark.parametrize( + "storage", [{"batch_size": 2, "flush_interval": 5}], indirect=True +) +def test_sync_and_async_append(storage: SpanStorage): + span = Span("1", "a", "b", "op1") + + storage.append_span(span) + + async def async_append(): + storage.append_span(span) + + asyncio.run(async_append()) + + time.sleep(0.1) + spans_in_file = read_spans_from_file(storage.filename) + assert len(spans_in_file) == 2 + + +@pytest.mark.asyncio +async def test_flush_policy(storage: SpanStorage): + span = Span("1", "a", "b", "op1") + + for _ in range(storage.batch_size - 1): + storage.append_span(span) + + spans_in_file = read_spans_from_file(storage.filename) + assert len(spans_in_file) == 0 + + # Trigger batch 
write + storage.append_span(span) + await asyncio.sleep(0.1) + + spans_in_file = read_spans_from_file(storage.filename) + assert len(spans_in_file) == storage.batch_size + + +@pytest.mark.parametrize( + "storage", [{"batch_size": 2, "file_does_not_exist": True}], indirect=True +) +def test_non_existent_file(storage: SpanStorage): + span = Span("1", "a", "b", "op1") + span2 = Span("2", "c", "d", "op2") + storage.append_span(span) + time.sleep(0.1) + + spans_in_file = read_spans_from_file(storage.filename) + assert len(spans_in_file) == 0 + + storage.append_span(span2) + time.sleep(0.1) + spans_in_file = read_spans_from_file(storage.filename) + assert len(spans_in_file) == 2 + assert spans_in_file[0]["trace_id"] == "1" + assert spans_in_file[1]["trace_id"] == "2" diff --git a/pilot/utils/tracer/tests/test_tracer_impl.py b/pilot/utils/tracer/tests/test_tracer_impl.py new file mode 100644 index 000000000..fea42afc6 --- /dev/null +++ b/pilot/utils/tracer/tests/test_tracer_impl.py @@ -0,0 +1,103 @@ +import pytest +from pilot.utils.tracer import ( + Span, + SpanStorageType, + SpanStorage, + DefaultTracer, + TracerManager, + Tracer, + MemorySpanStorage, +) +from pilot.component import SystemApp + + +@pytest.fixture +def system_app(): + return SystemApp() + + +@pytest.fixture +def storage(system_app: SystemApp): + ms = MemorySpanStorage(system_app) + system_app.register_instance(ms) + return ms + + +@pytest.fixture +def tracer(request, system_app: SystemApp): + if not request or not hasattr(request, "param"): + return DefaultTracer(system_app) + else: + span_storage_type = request.param.get( + "span_storage_type", SpanStorageType.ON_CREATE_END + ) + return DefaultTracer(system_app, span_storage_type=span_storage_type) + + +@pytest.fixture +def tracer_manager(system_app: SystemApp, tracer: Tracer): + system_app.register_instance(tracer) + manager = TracerManager() + manager.initialize(system_app) + return manager + + +def test_start_and_end_span(tracer: Tracer): + span = tracer.start_span("operation") + assert isinstance(span, Span) + assert span.operation_name == "operation" + + tracer.end_span(span) + assert span.end_time is not None + + stored_span = tracer._get_current_storage().spans[0] + assert stored_span == span + + +def test_start_and_end_span_with_tracer_manager(tracer_manager: TracerManager): + span = tracer_manager.start_span("operation") + assert isinstance(span, Span) + assert span.operation_name == "operation" + + tracer_manager.end_span(span) + assert span.end_time is not None + + +def test_parent_child_span_relation(tracer: Tracer): + parent_span = tracer.start_span("parent_operation") + child_span = tracer.start_span( + "child_operation", parent_span_id=parent_span.span_id + ) + + assert child_span.parent_span_id == parent_span.span_id + assert child_span.trace_id == parent_span.trace_id + + tracer.end_span(child_span) + tracer.end_span(parent_span) + + assert parent_span in tracer._get_current_storage().spans + assert child_span in tracer._get_current_storage().spans + + +@pytest.mark.parametrize( + "tracer, expected_count, after_create_inc_count", + [ + ({"span_storage_type": SpanStorageType.ON_CREATE}, 1, 1), + ({"span_storage_type": SpanStorageType.ON_END}, 1, 0), + ({"span_storage_type": SpanStorageType.ON_CREATE_END}, 2, 1), + ], + indirect=["tracer"], +) +def test_tracer_span_storage_type_and_with( + tracer: Tracer, + expected_count: int, + after_create_inc_count: int, + storage: SpanStorage, +): + span = tracer.start_span("new_span") + span.end() + assert len(storage.spans) 
== expected_count
+
+    with tracer.start_span("with_span") as ws:
+        assert len(storage.spans) == expected_count + after_create_inc_count
+
+    assert len(storage.spans) == expected_count + expected_count
diff --git a/pilot/utils/tracer/tracer_cli.py b/pilot/utils/tracer/tracer_cli.py
new file mode 100644
index 000000000..822b039ee
--- /dev/null
+++ b/pilot/utils/tracer/tracer_cli.py
@@ -0,0 +1,572 @@
+import os
+import click
+import logging
+import glob
+import json
+from datetime import datetime
+from typing import Iterable, Dict, Callable
+from pilot.configs.model_config import LOGDIR
+from pilot.utils.tracer import SpanType, SpanTypeRunName
+
+logger = logging.getLogger("dbgpt_cli")
+
+
+_DEFAULT_FILE_PATTERN = os.path.join(LOGDIR, "dbgpt*.jsonl")
+
+
+@click.group("trace")
+def trace_cli_group():
+    """Analyze and visualize trace spans."""
+    pass
+
+
+@trace_cli_group.command()
+@click.option(
+    "--trace_id",
+    required=False,
+    type=str,
+    default=None,
+    show_default=True,
+    help="Specify the trace ID to list",
+)
+@click.option(
+    "--span_id",
+    required=False,
+    type=str,
+    default=None,
+    show_default=True,
+    help="Specify the Span ID to list.",
+)
+@click.option(
+    "--span_type",
+    required=False,
+    type=str,
+    default=None,
+    show_default=True,
+    help="Specify the Span Type to list.",
+)
+@click.option(
+    "--parent_span_id",
+    required=False,
+    type=str,
+    default=None,
+    show_default=True,
+    help="Specify the Parent Span ID to list.",
+)
+@click.option(
+    "--search",
+    required=False,
+    type=str,
+    default=None,
+    show_default=True,
+    help="Search trace_id, span_id, parent_span_id, operation_name or content in metadata.",
+)
+@click.option(
+    "-l",
+    "--limit",
+    type=int,
+    default=20,
+    help="Limit the number of recent spans displayed.",
+)
+@click.option(
+    "--start_time",
+    type=str,
+    help='Filter by start time. Format: "YYYY-MM-DD HH:MM:SS.mmm"',
+)
+@click.option(
+    "--end_time", type=str, help='Filter by end time. Format: "YYYY-MM-DD HH:MM:SS.mmm"'
+)
+@click.option(
+    "--desc",
+    required=False,
+    type=bool,
+    default=False,
+    is_flag=True,
+    help="Whether to use reverse sorting. By default, sorting is based on start time.",
+)
+@click.option(
+    "--output",
+    required=False,
+    type=click.Choice(["text", "html", "csv", "latex", "json"]),
+    default="text",
+    help="The output format",
+)
+@click.argument("files", nargs=-1, type=click.Path(exists=True, readable=True))
+def list(
+    trace_id: str,
+    span_id: str,
+    span_type: str,
+    parent_span_id: str,
+    search: str,
+    limit: int,
+    start_time: str,
+    end_time: str,
+    desc: bool,
+    output: str,
+    files=None,
+):
+    """List your trace spans"""
+    from prettytable import PrettyTable
+
+    # If no files are explicitly specified, use the default pattern to get them
+    spans = read_spans_from_files(files)
+
+    if trace_id:
+        spans = filter(lambda s: s["trace_id"] == trace_id, spans)
+    if span_id:
+        spans = filter(lambda s: s["span_id"] == span_id, spans)
+    if span_type:
+        spans = filter(lambda s: s["span_type"] == span_type, spans)
+    if parent_span_id:
+        spans = filter(lambda s: s["parent_span_id"] == parent_span_id, spans)
+    # Filter spans based on the start and end times
+    if start_time:
+        start_dt = _parse_datetime(start_time)
+        spans = filter(
+            lambda span: _parse_datetime(span["start_time"]) >= start_dt, spans
+        )
+
+    if end_time:
+        end_dt = _parse_datetime(end_time)
+        spans = filter(
+            lambda span: _parse_datetime(span["start_time"]) <= end_dt, spans
+        )
+
+    if search:
+        spans = filter(_new_search_span_func(search), spans)
+
+    # Sort spans based on the start time
+    spans = sorted(
+        spans, key=lambda span: _parse_datetime(span["start_time"]), reverse=desc
+    )[:limit]
+
+    table = PrettyTable(
+        ["Trace ID", "Span ID", "Operation Name", "Conversation UID"],
+    )
+
+    for sp in spans:
+        conv_uid = None
+        metadata = sp.get("metadata")
+        if isinstance(metadata, dict):
+            conv_uid = metadata.get("conv_uid")
+        table.add_row(
+            [
+                sp.get("trace_id"),
+                sp.get("span_id"),
+                sp.get("operation_name"),
+                conv_uid,
+            ]
+        )
+    out_kwargs = {"ensure_ascii": False} if output == "json" else {}
+    print(table.get_formatted_string(out_format=output, **out_kwargs))
+
+
+@trace_cli_group.command()
+@click.option(
+    "--trace_id",
+    required=True,
+    type=str,
+    help="Specify the trace ID to display",
+)
+@click.argument("files", nargs=-1, type=click.Path(exists=True, readable=True))
+def tree(trace_id: str, files):
+    """Display trace links as a tree"""
+    hierarchy = _view_trace_hierarchy(trace_id, files)
+    if not hierarchy:
+        _print_empty_message(files)
+        return
+    _print_trace_hierarchy(hierarchy)
+
+
+@trace_cli_group.command()
+@click.option(
+    "--trace_id",
+    required=False,
+    type=str,
+    default=None,
+    help="Specify the trace ID to analyze. If None, show latest conversation details",
+)
+@click.option(
+    "--tree",
+    required=False,
+    type=bool,
+    default=False,
+    is_flag=True,
+    help="Display trace spans as a tree",
+)
+@click.option(
+    "--hide_conv",
+    required=False,
+    type=bool,
+    default=False,
+    is_flag=True,
+    help="Hide your conversation details",
+)
+@click.option(
+    "--hide_run_params",
+    required=False,
+    type=bool,
+    default=False,
+    is_flag=True,
+    help="Hide run params",
+)
+@click.option(
+    "--output",
+    required=False,
+    type=click.Choice(["text", "html", "csv", "latex", "json"]),
+    default="text",
+    help="The output format",
+)
+@click.argument("files", nargs=-1, type=click.Path(exists=False, readable=True))
+def chat(
+    trace_id: str,
+    tree: bool,
+    hide_conv: bool,
+    hide_run_params: bool,
+    output: str,
+    files,
+):
+    """Show conversation details"""
+    from prettytable import PrettyTable
+
+    spans = read_spans_from_files(files)
+
+    # Sort by start time; sorted() also materializes the span generator
+    spans = sorted(
+        spans, key=lambda span: _parse_datetime(span["start_time"]), reverse=True
+    )
+    if not spans:
+        _print_empty_message(files)
+        return
+    service_spans = {}
+    service_names = set(SpanTypeRunName.values())
+    found_trace_id = None
+    for sp in spans:
+        span_type = sp["span_type"]
+        metadata = sp.get("metadata")
+        if span_type == SpanType.RUN:
+            service_name = metadata["run_service"]
+            service_spans[service_name] = sp.copy()
+            if set(service_spans.keys()) == service_names and found_trace_id:
+                break
+        elif span_type == SpanType.CHAT and not found_trace_id:
+            if not trace_id:
+                found_trace_id = sp["trace_id"]
+            if trace_id and trace_id == sp["trace_id"]:
+                found_trace_id = trace_id
+
+    service_tables = {}
+    out_kwargs = {"ensure_ascii": False} if output == "json" else {}
+    for service_name, sp in service_spans.items():
+        metadata = sp["metadata"]
+        table = PrettyTable(["Config Key", "Config Value"], title=service_name)
+        for k, v in metadata["params"].items():
+            table.add_row([k, v])
+        service_tables[service_name] = table
+
+    if not hide_run_params:
+        merged_table1 = merge_tables_horizontally(
+            [
+                service_tables.get(SpanTypeRunName.WEBSERVER.value),
+                service_tables.get(SpanTypeRunName.EMBEDDING_MODEL.value),
+            ]
+        )
+        merged_table2 = merge_tables_horizontally(
+            [
+                service_tables.get(SpanTypeRunName.MODEL_WORKER.value),
+                service_tables.get(SpanTypeRunName.WORKER_MANAGER.value),
+            ]
+        )
+        if output == "text":
+            print(merged_table1)
+            print(merged_table2)
+        else:
+            for service_name, table in service_tables.items():
+                print(table.get_formatted_string(out_format=output, **out_kwargs))
+    if hide_conv:
+        return
+
+    if not found_trace_id:
+        print(f"Can't find conversation with trace_id: {trace_id}")
+        return
+    trace_id = found_trace_id
+
+    trace_spans = [span for span in spans if span["trace_id"] == trace_id]
+    trace_spans = list(reversed(trace_spans))
+    hierarchy = _build_trace_hierarchy(trace_spans)
+    if tree:
+        print("\nInvoke Trace Tree:\n")
+        _print_trace_hierarchy(hierarchy)
+
+    trace_spans = _get_ordered_trace_from(hierarchy)
+    table = PrettyTable(["Key", "Value Value"], title="Chat Trace Details")
+    split_long_text = output == "text"
+
+    for sp in trace_spans:
+        op = sp["operation_name"]
+        metadata = sp.get("metadata")
+        if op == "get_chat_instance" and not sp["end_time"]:
+            table.add_row(["trace_id", trace_id])
+            table.add_row(["span_id", sp["span_id"]])
+            table.add_row(["conv_uid", metadata.get("conv_uid")])
+            table.add_row(["user_input", metadata.get("user_input")])
+            table.add_row(["chat_mode", metadata.get("chat_mode")])
+            table.add_row(["select_param", metadata.get("select_param")])
+            table.add_row(["model_name", metadata.get("model_name")])
+        if op in ["BaseChat.stream_call", "BaseChat.nostream_call"]:
+            if not sp["end_time"]:
+                table.add_row(["temperature", metadata.get("temperature")])
+                table.add_row(["max_new_tokens", metadata.get("max_new_tokens")])
+                table.add_row(["echo", metadata.get("echo")])
+            elif "error" in metadata:
+                table.add_row(["BaseChat Error", metadata.get("error")])
+        if op == "BaseChat.nostream_call" and not sp["end_time"]:
+            if "model_output" in metadata:
+                table.add_row(
+                    [
+                        "BaseChat model_output",
+                        split_string_by_terminal_width(
+                            metadata.get("model_output").get("text"),
+                            split=split_long_text,
+                        ),
+                    ]
+                )
+            if "ai_response_text" in metadata:
+                table.add_row(
+                    [
+                        "BaseChat ai_response_text",
+                        split_string_by_terminal_width(
+                            metadata.get("ai_response_text"), split=split_long_text
+                        ),
+                    ]
+                )
+            if "prompt_define_response" in metadata:
+                table.add_row(
+                    [
+                        "BaseChat prompt_define_response",
+                        split_string_by_terminal_width(
+                            metadata.get("prompt_define_response"),
+                            split=split_long_text,
+                        ),
+                    ]
+                )
+        if op == "DefaultModelWorker_call.generate_stream_func":
+            if not sp["end_time"]:
+                table.add_row(["llm_adapter", metadata.get("llm_adapter")])
+                table.add_row(
+                    [
+                        "User prompt",
+                        split_string_by_terminal_width(
+                            metadata.get("prompt"), split=split_long_text
+                        ),
+                    ]
+                )
+            else:
+                table.add_row(
+                    [
+                        "Model output",
+                        split_string_by_terminal_width(
+                            metadata.get("output"), split=split_long_text
+                        ),
+                    ]
+                )
+        if (
+            op
+            in [
+                "DefaultModelWorker.async_generate_stream",
+                "DefaultModelWorker.generate_stream",
+            ]
+            and metadata
+            and "error" in metadata
+        ):
+            table.add_row(["Model Error", metadata.get("error")])
+    print(table.get_formatted_string(out_format=output, **out_kwargs))
+
+
+def read_spans_from_files(files=None) -> Iterable[Dict]:
+    """
+    Reads spans from multiple files based on the provided file paths.
+ """ + if not files: + files = [_DEFAULT_FILE_PATTERN] + + for filepath in files: + for filename in glob.glob(filepath): + with open(filename, "r") as file: + for line in file: + yield json.loads(line) + + +def _print_empty_message(files=None): + if not files: + files = [_DEFAULT_FILE_PATTERN] + file_names = ",".join(files) + print(f"No trace span records found in your tracer files: {file_names}") + + +def _new_search_span_func(search: str): + def func(span: Dict) -> bool: + items = [span["trace_id"], span["span_id"], span["parent_span_id"]] + if "operation_name" in span: + items.append(span["operation_name"]) + if "metadata" in span: + metadata = span["metadata"] + if isinstance(metadata, dict): + for k, v in metadata.items(): + items.append(k) + items.append(v) + return any(search in str(item) for item in items if item) + + return func + + +def _parse_datetime(dt_str): + """Parse a datetime string to a datetime object.""" + return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S.%f") + + +def _build_trace_hierarchy(spans, parent_span_id=None, indent=0): + # Current spans + current_level_spans = [ + span + for span in spans + if span["parent_span_id"] == parent_span_id and span["end_time"] is None + ] + + hierarchy = [] + + for start_span in current_level_spans: + # Find end span + end_span = next( + ( + span + for span in spans + if span["span_id"] == start_span["span_id"] + and span["end_time"] is not None + ), + None, + ) + entry = { + "operation_name": start_span["operation_name"], + "parent_span_id": start_span["parent_span_id"], + "span_id": start_span["span_id"], + "start_time": start_span["start_time"], + "end_time": start_span["end_time"], + "metadata": start_span["metadata"], + "children": _build_trace_hierarchy( + spans, start_span["span_id"], indent + 1 + ), + } + hierarchy.append(entry) + + # Append end span + if end_span: + entry_end = { + "operation_name": end_span["operation_name"], + "parent_span_id": end_span["parent_span_id"], + "span_id": end_span["span_id"], + "start_time": end_span["start_time"], + "end_time": end_span["end_time"], + "metadata": end_span["metadata"], + "children": [], + } + hierarchy.append(entry_end) + + return hierarchy + + +def _view_trace_hierarchy(trace_id, files=None): + """Find and display the calls of the entire link based on the given trace_id""" + spans = read_spans_from_files(files) + trace_spans = [span for span in spans if span["trace_id"] == trace_id] + if not trace_spans: + return None + hierarchy = _build_trace_hierarchy(trace_spans) + return hierarchy + + +def _print_trace_hierarchy(hierarchy, indent=0): + """Print link hierarchy""" + for entry in hierarchy: + print( + " " * indent + + f"Operation: {entry['operation_name']} (Start: {entry['start_time']}, End: {entry['end_time']})" + ) + _print_trace_hierarchy(entry["children"], indent + 1) + + +def _get_ordered_trace_from(hierarchy): + traces = [] + + def func(items): + for item in items: + traces.append(item) + func(item["children"]) + + func(hierarchy) + return traces + + +def _print(service_spans: Dict): + for names in [ + [SpanTypeRunName.WEBSERVER.name, SpanTypeRunName.EMBEDDING_MODEL], + [SpanTypeRunName.WORKER_MANAGER.name, SpanTypeRunName.MODEL_WORKER], + ]: + pass + + +def merge_tables_horizontally(tables): + from prettytable import PrettyTable + + if not tables: + return None + + tables = [t for t in tables if t] + if not tables: + return None + + max_rows = max(len(table._rows) for table in tables) + + merged_table = PrettyTable() + + new_field_names = [] + for table in 
tables: + new_field_names.extend( + [ + f"{name} ({table.title})" if table.title else f"{name}" + for name in table.field_names + ] + ) + + merged_table.field_names = new_field_names + + for i in range(max_rows): + merged_row = [] + for table in tables: + if i < len(table._rows): + merged_row.extend(table._rows[i]) + else: + # Fill empty cells for shorter tables + merged_row.extend([""] * len(table.field_names)) + merged_table.add_row(merged_row) + + return merged_table + + +def split_string_by_terminal_width(s, split=True, max_len=None, sp="\n"): + """ + Split a string into substrings based on the current terminal width. + + Parameters: + - s: the input string + """ + if not split: + return s + if not max_len: + try: + max_len = int(os.get_terminal_size().columns * 0.8) + except OSError: + # Default to 80 columns if the terminal size can't be determined + max_len = 100 + return sp.join([s[i : i + max_len] for i in range(0, len(s), max_len)]) diff --git a/pilot/utils/tracer/tracer_impl.py b/pilot/utils/tracer/tracer_impl.py new file mode 100644 index 000000000..bda25ab4d --- /dev/null +++ b/pilot/utils/tracer/tracer_impl.py @@ -0,0 +1,195 @@ +from typing import Dict, Optional +from contextvars import ContextVar +from functools import wraps + +from pilot.component import SystemApp, ComponentType +from pilot.utils.tracer.base import ( + SpanType, + Span, + Tracer, + SpanStorage, + SpanStorageType, + TracerContext, +) +from pilot.utils.tracer.span_storage import MemorySpanStorage + + +class DefaultTracer(Tracer): + def __init__( + self, + system_app: SystemApp | None = None, + default_storage: SpanStorage = None, + span_storage_type: SpanStorageType = SpanStorageType.ON_CREATE_END, + ): + super().__init__(system_app) + self._span_stack_var = ContextVar("span_stack", default=[]) + + if not default_storage: + default_storage = MemorySpanStorage(system_app) + self._default_storage = default_storage + self._span_storage_type = span_storage_type + + def append_span(self, span: Span): + self._get_current_storage().append_span(span) + + def start_span( + self, + operation_name: str, + parent_span_id: str = None, + span_type: SpanType = None, + metadata: Dict = None, + ) -> Span: + trace_id = ( + self._new_uuid() if parent_span_id is None else parent_span_id.split(":")[0] + ) + span_id = f"{trace_id}:{self._new_uuid()}" + span = Span( + trace_id, + span_id, + span_type, + parent_span_id, + operation_name, + metadata=metadata, + ) + + if self._span_storage_type in [ + SpanStorageType.ON_END, + SpanStorageType.ON_CREATE_END, + ]: + span.add_end_caller(self.append_span) + + if self._span_storage_type in [ + SpanStorageType.ON_CREATE, + SpanStorageType.ON_CREATE_END, + ]: + self.append_span(span) + current_stack = self._span_stack_var.get() + current_stack.append(span) + self._span_stack_var.set(current_stack) + + span.add_end_caller(self._remove_from_stack_top) + return span + + def end_span(self, span: Span, **kwargs): + """""" + span.end(**kwargs) + + def _remove_from_stack_top(self, span: Span): + current_stack = self._span_stack_var.get() + if current_stack: + current_stack.pop() + self._span_stack_var.set(current_stack) + + def get_current_span(self) -> Optional[Span]: + current_stack = self._span_stack_var.get() + return current_stack[-1] if current_stack else None + + def _get_current_storage(self) -> SpanStorage: + return self.system_app.get_component( + ComponentType.TRACER_SPAN_STORAGE, SpanStorage, self._default_storage + ) + + +class TracerManager: + """The manager of current tracer""" + + 
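+
+    # Typical usage (mirrors run_webserver in this patch; the operation name
+    # and metadata keys are illustrative only):
+    #
+    #   with root_tracer.start_span(
+    #       "run_webserver", span_type=SpanType.RUN, metadata={"params": {}}
+    #   ):
+    #       ...  # spans started inside inherit this span as their parent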
+    def __init__(self) -> None:
+        self._system_app: Optional[SystemApp] = None
+        self._trace_context_var: ContextVar[TracerContext] = ContextVar(
+            "trace_context",
+            default=TracerContext(),
+        )
+
+    def initialize(
+        self, system_app: SystemApp, trace_context_var: ContextVar[TracerContext] = None
+    ) -> None:
+        self._system_app = system_app
+        if trace_context_var:
+            self._trace_context_var = trace_context_var
+
+    def _get_tracer(self) -> Optional[Tracer]:
+        if not self._system_app:
+            return None
+        return self._system_app.get_component(ComponentType.TRACER, Tracer, None)
+
+    def start_span(
+        self,
+        operation_name: str,
+        parent_span_id: str = None,
+        span_type: SpanType = None,
+        metadata: Dict = None,
+    ) -> Span:
+        """Start a new span with the given operation_name.
+
+        This method must never raise and should avoid blocking as much as possible.
+        """
+        tracer = self._get_tracer()
+        if not tracer:
+            return Span("empty_span", "empty_span")
+        if not parent_span_id:
+            parent_span_id = self.get_current_span_id()
+        return tracer.start_span(
+            operation_name, parent_span_id, span_type=span_type, metadata=metadata
+        )
+
+    def end_span(self, span: Span, **kwargs):
+        tracer = self._get_tracer()
+        if not tracer or not span:
+            return
+        tracer.end_span(span, **kwargs)
+
+    def get_current_span(self) -> Optional[Span]:
+        tracer = self._get_tracer()
+        if not tracer:
+            return None
+        return tracer.get_current_span()
+
+    def get_current_span_id(self) -> Optional[str]:
+        current_span = self.get_current_span()
+        if current_span:
+            return current_span.span_id
+        ctx = self._trace_context_var.get()
+        return ctx.span_id if ctx else None
+
+
+root_tracer: TracerManager = TracerManager()
+
+
+def trace(operation_name: str, **trace_kwargs):
+    """Decorator that wraps an async function in a new span."""
+
+    def decorator(func):
+        @wraps(func)
+        async def wrapper(*args, **kwargs):
+            with root_tracer.start_span(operation_name, **trace_kwargs):
+                return await func(*args, **kwargs)
+
+        return wrapper
+
+    return decorator
+
+
+def initialize_tracer(
+    system_app: SystemApp,
+    tracer_filename: str,
+    root_operation_name: str = "DB-GPT-Web-Entry",
+):
+    if not system_app:
+        return
+    from pilot.utils.tracer.span_storage import FileSpanStorage
+
+    trace_context_var = ContextVar(
+        "trace_context",
+        default=TracerContext(),
+    )
+    tracer = DefaultTracer(system_app)
+
+    system_app.register_instance(FileSpanStorage(tracer_filename))
+    system_app.register_instance(tracer)
+    root_tracer.initialize(system_app, trace_context_var)
+    if system_app.app:
+        from pilot.utils.tracer.tracer_middleware import TraceIDMiddleware
+
+        system_app.app.add_middleware(
+            TraceIDMiddleware,
+            trace_context_var=trace_context_var,
+            tracer=tracer,
+            root_operation_name=root_operation_name,
+        )
diff --git a/pilot/utils/tracer/tracer_middleware.py b/pilot/utils/tracer/tracer_middleware.py
new file mode 100644
index 000000000..41f1b64dc
--- /dev/null
+++ b/pilot/utils/tracer/tracer_middleware.py
@@ -0,0 +1,45 @@
+from contextvars import ContextVar
+
+from starlette.middleware.base import BaseHTTPMiddleware
+from starlette.requests import Request
+from starlette.types import ASGIApp
+from pilot.utils.tracer import TracerContext, Tracer
+
+
+_DEFAULT_EXCLUDE_PATHS = ["/api/controller/heartbeat"]
+
+
+class TraceIDMiddleware(BaseHTTPMiddleware):
+    def __init__(
+        self,
+        app: ASGIApp,
+        trace_context_var: ContextVar[TracerContext],
+        tracer: Tracer,
+        root_operation_name: str = "DB-GPT-Web-Entry",
+        include_prefix: str = "/api",
+        exclude_paths=_DEFAULT_EXCLUDE_PATHS,
+    ):
+        super().__init__(app)
+        self.trace_context_var = trace_context_var
+        self.tracer = tracer
+        self.root_operation_name = root_operation_name
+        self.include_prefix = include_prefix
+        self.exclude_paths = exclude_paths
+
+    async def dispatch(self, request: Request, call_next):
+        if request.url.path in self.exclude_paths or not request.url.path.startswith(
+            self.include_prefix
+        ):
+            return await call_next(request)
+
+        # If the client did not send a span ID, start_span receives None and
+        # opens a new root trace for this request.
+        span_id = request.headers.get("DBGPT_TRACER_SPAN_ID")
+
+        with self.tracer.start_span(
+            self.root_operation_name, span_id, metadata={"path": request.url.path}
+        ):
+            response = await call_next(request)
+            return response
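+
+# Usage note: a client can join an existing trace by sending the span ID of
+# its current span in the request header. A minimal sketch with the `requests`
+# library (URL and span ID are placeholders, not values from this codebase):
+#
+#   import requests
+#
+#   requests.post(
+#       "http://127.0.0.1:5000/api/v1/chat/completions",
+#       headers={"DBGPT_TRACER_SPAN_ID": "<trace_id>:<span_id>"},
+#   )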