feat(core): Support cross-service data recording and analyze (#665)

Close #659
Aries-ckt 2023-10-12 09:11:27 +05:00 committed by GitHub
commit 1acb8da2dd
29 changed files with 2212 additions and 106 deletions

View File

@ -15,7 +15,7 @@ IMAGE_NAME_ARGS=""
PIP_INDEX_URL="https://pypi.org/simple"
# en or zh
LANGUAGE="en"
BUILD_LOCAL_CODE="false"
BUILD_LOCAL_CODE="true"
LOAD_EXAMPLES="true"
BUILD_NETWORK=""
DB_GPT_INSTALL_MODEL="default"
@ -26,7 +26,7 @@ usage () {
echo " [-n|--image-name image name] Current image name, default: db-gpt"
echo " [-i|--pip-index-url pip index url] Pip index url, default: https://pypi.org/simple"
echo " [--language en or zh] You language, default: en"
echo " [--build-local-code true or false] Whether to use the local project code to package the image, default: false"
echo " [--build-local-code true or false] Whether to use the local project code to package the image, default: true"
echo " [--load-examples true or false] Whether to load examples to default database default: true"
echo " [--network network name] The network of docker build"
echo " [--install-mode mode name] Installation mode name, default: default, If you completely use openai's service, you can set the mode name to 'openai'"

View File

@ -0,0 +1,309 @@
# Debugging
-------------
DB-GPT provides a set of tools to help you troubleshoot and resolve some of the issues you may encounter.
## Trace Logs
DB-GPT writes some critical system runtime information to trace logs. By default, these are located in `logs/dbgpt*.jsonl`.
DB-GPT also offers a command-line tool, `dbgpt trace`, to help you analyze these trace logs. You can see its specific usage with the command `dbgpt trace --help`.
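Each line in these files is one JSON-encoded span. If you want to inspect them without the CLI, a minimal sketch like the following works (the field names match `Span.to_dict()` in `pilot/utils/tracer/base.py`; the file path is the default webserver trace file and may differ in your deployment):
```python
import json

# Minimal sketch: read a trace file directly and print one line per span.
with open("logs/dbgpt_webserver_tracer.jsonl") as f:
    for line in f:
        span = json.loads(line)
        print(
            f"{span['operation_name']}: "
            f"start={span['start_time']}, end={span['end_time']}, "
            f"trace_id={span['trace_id']}"
        )
```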
## Viewing Chat Details
You can use the `dbgpt trace chat` command to view chat details. By default, it will display the latest chat message.
### Viewing Service Runtime Information
```bash
dbgpt trace chat --hide_conv
```
You will see an output like:
```
+------------------------+--------------------------+-----------------------------+------------------------------------+
| Config Key (Webserver) | Config Value (Webserver) | Config Key (EmbeddingModel) | Config Value (EmbeddingModel) |
+------------------------+--------------------------+-----------------------------+------------------------------------+
| host | 0.0.0.0 | model_name | text2vec |
| port | 5000 | model_path | /app/models/text2vec-large-chinese |
| daemon | False | device | cuda |
| share | False | normalize_embeddings | None |
| remote_embedding | False | | |
| log_level | None | | |
| light | False | | |
+------------------------+--------------------------+-----------------------------+------------------------------------+
+--------------------------+-----------------------------+----------------------------+------------------------------+
| Config Key (ModelWorker) | Config Value (ModelWorker) | Config Key (WorkerManager) | Config Value (WorkerManager) |
+--------------------------+-----------------------------+----------------------------+------------------------------+
| model_name | vicuna-13b-v1.5 | model_name | vicuna-13b-v1.5 |
| model_path | /app/models/vicuna-13b-v1.5 | model_path | /app/models/vicuna-13b-v1.5 |
| device | cuda | worker_type | None |
| model_type | huggingface | worker_class | None |
| prompt_template | None | model_type | huggingface |
| max_context_size | 4096 | host | 0.0.0.0 |
| num_gpus | None | port | 5000 |
| max_gpu_memory | None | daemon | False |
| cpu_offloading | False | limit_model_concurrency | 5 |
| load_8bit | False | standalone | True |
| load_4bit | False | register | True |
| quant_type | nf4 | worker_register_host | None |
| use_double_quant | True | controller_addr | http://127.0.0.1:5000 |
| compute_dtype | None | send_heartbeat | True |
| trust_remote_code | True | heartbeat_interval | 20 |
| verbose | False | log_level | None |
+--------------------------+-----------------------------+----------------------------+------------------------------+
```
### Viewing the Latest Chat Message
```bash
dbgpt trace chat --hide_run_params
```
You will see an output like:
```
+-------------------------------------------------------------------------------------------------------------------------------------------+
| Chat Trace Details |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
| Key            | Value                                                                                                                    |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
| trace_id | 5d1900c3-5aad-4159-9946-fbb600666530 |
| span_id | 5d1900c3-5aad-4159-9946-fbb600666530:14772034-bed4-4b4e-b43f-fcf3a8aad6a7 |
| conv_uid | 5e456272-68ac-11ee-9fba-0242ac150003 |
| user_input | Who are you? |
| chat_mode | chat_normal |
| select_param | None |
| model_name | vicuna-13b-v1.5 |
| temperature | 0.6 |
| max_new_tokens | 1024 |
| echo | False |
| llm_adapter | FastChatLLMModelAdaperWrapper(fastchat.model.model_adapter.VicunaAdapter) |
| User prompt | A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polit |
| | e answers to the user's questions. USER: Who are you? ASSISTANT: |
| Model output | You can call me Vicuna, and I was trained by Large Model Systems Organization (LMSYS) researchers as a language model. |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
```
### Viewing Chat Details and Call Chain
```bash
dbgpt trace chat --hide_run_params --tree
```
You will see an output like:
```
Invoke Trace Tree:
Operation: DB-GPT-Web-Entry (Start: 2023-10-12 03:06:43.180, End: None)
Operation: get_chat_instance (Start: 2023-10-12 03:06:43.258, End: None)
Operation: get_chat_instance (Start: 2023-10-12 03:06:43.258, End: 2023-10-12 03:06:43.424)
Operation: stream_generator (Start: 2023-10-12 03:06:43.425, End: None)
Operation: BaseChat.stream_call (Start: 2023-10-12 03:06:43.426, End: None)
Operation: WorkerManager.generate_stream (Start: 2023-10-12 03:06:43.426, End: None)
Operation: DefaultModelWorker.generate_stream (Start: 2023-10-12 03:06:43.428, End: None)
Operation: DefaultModelWorker_call.generate_stream_func (Start: 2023-10-12 03:06:43.430, End: None)
Operation: DefaultModelWorker_call.generate_stream_func (Start: 2023-10-12 03:06:43.430, End: 2023-10-12 03:06:48.518)
Operation: DefaultModelWorker.generate_stream (Start: 2023-10-12 03:06:43.428, End: 2023-10-12 03:06:48.518)
Operation: WorkerManager.generate_stream (Start: 2023-10-12 03:06:43.426, End: 2023-10-12 03:06:48.518)
Operation: BaseChat.stream_call (Start: 2023-10-12 03:06:43.426, End: 2023-10-12 03:06:48.519)
Operation: stream_generator (Start: 2023-10-12 03:06:43.425, End: 2023-10-12 03:06:48.519)
Operation: DB-GPT-Web-Entry (Start: 2023-10-12 03:06:43.180, End: 2023-10-12 03:06:43.257)
+-------------------------------------------------------------------------------------------------------------------------------------------+
| Chat Trace Details |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
| Key            | Value                                                                                                                    |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
| trace_id | 5d1900c3-5aad-4159-9946-fbb600666530 |
| span_id | 5d1900c3-5aad-4159-9946-fbb600666530:14772034-bed4-4b4e-b43f-fcf3a8aad6a7 |
| conv_uid | 5e456272-68ac-11ee-9fba-0242ac150003 |
| user_input | Who are you? |
| chat_mode | chat_normal |
| select_param | None |
| model_name | vicuna-13b-v1.5 |
| temperature | 0.6 |
| max_new_tokens | 1024 |
| echo | False |
| llm_adapter | FastChatLLMModelAdaperWrapper(fastchat.model.model_adapter.VicunaAdapter) |
| User prompt | A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polit |
| | e answers to the user's questions. USER: Who are you? ASSISTANT: |
| Model output | You can call me Vicuna, and I was trained by Large Model Systems Organization (LMSYS) researchers as a language model. |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
```
### Viewing Chat Details Based on trace_id
```bash
dbgpt trace chat --hide_run_params --trace_id ec30d733-7b35-4d61-b02e-2832fd2e29ff
```
You will see an output like:
```
+-------------------------------------------------------------------------------------------------------------------------------------------+
| Chat Trace Details |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
| Key            | Value                                                                                                                    |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
| trace_id | ec30d733-7b35-4d61-b02e-2832fd2e29ff |
| span_id | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd |
| conv_uid | 87a722de-68ae-11ee-9fba-0242ac150003 |
| user_input | Hello |
| chat_mode | chat_normal |
| select_param | None |
| model_name | vicuna-13b-v1.5 |
| temperature | 0.6 |
| max_new_tokens | 1024 |
| echo | False |
| llm_adapter | FastChatLLMModelAdaperWrapper(fastchat.model.model_adapter.VicunaAdapter) |
| User prompt | A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polit |
| | e answers to the user's questions. USER: Hello ASSISTANT: |
| Model output | Hello! How can I help you today? Is there something specific you want to know or talk about? I'm here to answer any ques |
| | tions you might have, to the best of my ability. |
+----------------+--------------------------------------------------------------------------------------------------------------------------+
```
### More `chat` Usage
```bash
dbgpt trace chat --help
```
```
Usage: dbgpt trace chat [OPTIONS] [FILES]...
Show conversation details
Options:
--trace_id TEXT Specify the trace ID to analyze. If None,
show latest conversation details
--tree Display trace spans as a tree
--hide_conv Hide your conversation details
--hide_run_params Hide run params
--output [text|html|csv|latex|json]
The output format
--help Show this message and exit.
```
## Viewing Call Tree Based on `trace_id`
```bash
dbgpt trace tree --trace_id ec30d733-7b35-4d61-b02e-2832fd2e29ff
```
You will see an output like:
```
Operation: DB-GPT-Web-Entry (Start: 2023-10-12 03:22:10.592, End: None)
Operation: get_chat_instance (Start: 2023-10-12 03:22:10.594, End: None)
Operation: get_chat_instance (Start: 2023-10-12 03:22:10.594, End: 2023-10-12 03:22:10.658)
Operation: stream_generator (Start: 2023-10-12 03:22:10.659, End: None)
Operation: BaseChat.stream_call (Start: 2023-10-12 03:22:10.659, End: None)
Operation: WorkerManager.generate_stream (Start: 2023-10-12 03:22:10.660, End: None)
Operation: DefaultModelWorker.generate_stream (Start: 2023-10-12 03:22:10.675, End: None)
Operation: DefaultModelWorker_call.generate_stream_func (Start: 2023-10-12 03:22:10.676, End: None)
Operation: DefaultModelWorker_call.generate_stream_func (Start: 2023-10-12 03:22:10.676, End: 2023-10-12 03:22:16.130)
Operation: DefaultModelWorker.generate_stream (Start: 2023-10-12 03:22:10.675, End: 2023-10-12 03:22:16.130)
Operation: WorkerManager.generate_stream (Start: 2023-10-12 03:22:10.660, End: 2023-10-12 03:22:16.130)
Operation: BaseChat.stream_call (Start: 2023-10-12 03:22:10.659, End: 2023-10-12 03:22:16.130)
Operation: stream_generator (Start: 2023-10-12 03:22:10.659, End: 2023-10-12 03:22:16.130)
Operation: DB-GPT-Web-Entry (Start: 2023-10-12 03:22:10.592, End: 2023-10-12 03:22:10.673)
```
## Listing Trace Information
### Listing All Trace Information
```bash
dbgpt trace list
```
You will see an output like:
```
+--------------------------------------+---------------------------------------------------------------------------+-----------------------------------+------------------+
| Trace ID | Span ID | Operation Name | Conversation UID |
+--------------------------------------+---------------------------------------------------------------------------+-----------------------------------+------------------+
| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:f650065f-f761-4790-99f7-8109c15f756a | run_webserver | None |
| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:b2ff279e-0557-4b2d-8959-85e25dcfe94e | EmbeddingLoader.load | None |
| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:b2ff279e-0557-4b2d-8959-85e25dcfe94e | EmbeddingLoader.load | None |
| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:3e8b1b9d-5ef2-4382-af62-6b2b21cc04fd | WorkerManager._start_local_worker | None |
| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:3e8b1b9d-5ef2-4382-af62-6b2b21cc04fd | WorkerManager._start_local_worker | None |
| eaf4830f-976f-45a4-9a50-244f3ab6f9e1 | eaf4830f-976f-45a4-9a50-244f3ab6f9e1:4c280ec9-0fd6-4ee8-b79f-1afcab0f9901 | DefaultModelWorker.start | None |
+--------------------------------------+---------------------------------------------------------------------------+-----------------------------------+------------------+
```
### Listing Trace Information by Trace Type
```bash
dbgpt trace list --span_type chat
```
You will see an output like:
```
+--------------------------------------+---------------------------------------------------------------------------+-------------------+--------------------------------------+
| Trace ID | Span ID | Operation Name | Conversation UID |
+--------------------------------------+---------------------------------------------------------------------------+-------------------+--------------------------------------+
| 5d1900c3-5aad-4159-9946-fbb600666530 | 5d1900c3-5aad-4159-9946-fbb600666530:14772034-bed4-4b4e-b43f-fcf3a8aad6a7 | get_chat_instance | 5e456272-68ac-11ee-9fba-0242ac150003 |
| 5d1900c3-5aad-4159-9946-fbb600666530 | 5d1900c3-5aad-4159-9946-fbb600666530:14772034-bed4-4b4e-b43f-fcf3a8aad6a7 | get_chat_instance | 5e456272-68ac-11ee-9fba-0242ac150003 |
| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | get_chat_instance | 87a722de-68ae-11ee-9fba-0242ac150003 |
| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | get_chat_instance | 87a722de-68ae-11ee-9fba-0242ac150003 |
+--------------------------------------+---------------------------------------------------------------------------+-------------------+--------------------------------------+
```
### Searching Trace Information
```bash
dbgpt trace list --search Hello
```
You will see an output like:
```
+--------------------------------------+---------------------------------------------------------------------------+----------------------------------------------+--------------------------------------+
| Trace ID | Span ID | Operation Name | Conversation UID |
+--------------------------------------+---------------------------------------------------------------------------+----------------------------------------------+--------------------------------------+
| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | get_chat_instance | 87a722de-68ae-11ee-9fba-0242ac150003 |
| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:0482a0c5-38b3-4b38-8101-e42489f90ccd | get_chat_instance | 87a722de-68ae-11ee-9fba-0242ac150003 |
| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:03de6c87-34d6-426a-85e8-7d46d475411e | BaseChat.stream_call | None |
| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:03de6c87-34d6-426a-85e8-7d46d475411e | BaseChat.stream_call | None |
| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:19593596-b4c7-4d15-a3c1-0924d86098dd | DefaultModelWorker_call.generate_stream_func | None |
| ec30d733-7b35-4d61-b02e-2832fd2e29ff | ec30d733-7b35-4d61-b02e-2832fd2e29ff:19593596-b4c7-4d15-a3c1-0924d86098dd | DefaultModelWorker_call.generate_stream_func | None |
+--------------------------------------+---------------------------------------------------------------------------+----------------------------------------------+--------------------------------------+
```
### More `list` Usage
```bash
dbgpt trace list --help
```
```
Usage: dbgpt trace list [OPTIONS] [FILES]...
List your trace spans
Options:
--trace_id TEXT Specify the trace ID to list
--span_id TEXT Specify the Span ID to list.
--span_type TEXT Specify the Span Type to list.
--parent_span_id TEXT Specify the Parent Span ID to list.
--search TEXT Search trace_id, span_id, parent_span_id,
operation_name or content in metadata.
-l, --limit INTEGER Limit the number of recent span displayed.
--start_time TEXT Filter by start time. Format: "YYYY-MM-DD
HH:MM:SS.mmm"
--end_time TEXT Filter by end time. Format: "YYYY-MM-DD
HH:MM:SS.mmm"
--desc Whether to use reverse sorting. By default,
sorting is based on start time.
--output [text|html|csv|latex|json]
The output format
--help Show this message and exit.
```

View File

@ -53,6 +53,7 @@ Getting Started
getting_started/concepts.md
getting_started/tutorials.md
getting_started/faq.rst
getting_started/observability.md
Modules

View File

@ -0,0 +1,121 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) 2023, csunny
# This file is distributed under the same license as the DB-GPT package.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2023.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: DB-GPT 👏👏 0.3.9\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2023-10-12 11:54+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language: zh_CN\n"
"Language-Team: zh_CN <LL@li.org>\n"
"Plural-Forms: nplurals=1; plural=0;\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.12.1\n"
#: ../../getting_started/observability.md:1 c88ce18295444597baa7355efc79ab15
msgid "Debugging"
msgstr ""
#: ../../getting_started/observability.md:4 522a4e83c62c493381ca3a452ced8ecf
msgid ""
"DB-GPT provides a set of tools to help you troubleshoot and resolve some "
"of the issues you may encounter."
msgstr ""
"DB-GPT 提供了一套工具来帮助你排查和解决一些遇到的问题。"
#: ../../getting_started/observability.md:7 7fe5b8ab29194e42b74a8ab3e77006c7
msgid "Trace Logs"
msgstr "追踪日志"
#: ../../getting_started/observability.md:9 8a55f7a2b5a247d49728969f179bf50d
msgid ""
"DB-GPT writes some critical system runtime information to trace logs. By "
"default, these are located in `logs/dbgpt*.jsonl`."
msgstr ""
"DB-GPT 会将一些系统运行的关键的信息写入到追踪日志中,默认情况下,在 `logs/dbgpt*.jsonl` 中。"
#: ../../getting_started/observability.md:11 5e2b847179e9427a8ae022b5338cbbd9
msgid ""
"DB-GPT also offers a command-line tool, `dbgpt trace`, to help you "
"analyze these trace logs. You can see its specific usage with the command"
" `dbgpt trace --help`."
msgstr ""
"DB-GPT 也提供了命令工具 `dbgpt trace` 命令来帮助你分析追踪日志,你可以使用命令 `dbgpt trace --help` 来查看具体的用法。"
#: ../../getting_started/observability.md:14 6e0c9c4ba6ac4eb49d9289b0d63f77fb
msgid "查看对话详情"
msgstr ""
#: ../../getting_started/observability.md:16 b9badbab74de47f192ff117d7d36fa72
msgid ""
"You can use the `dbgpt trace chat` command to view chat details. By "
"default, it will display the latest chat message."
msgstr ""
"你可以使用 `dbgpt trace chat` 命令来查看对话信息,默认情况会显示你最新的一条对话信息。"
#: ../../getting_started/observability.md:18 55c7466bd80d43c9a355d87daf2a2be7
msgid "Viewing Service Runtime Information"
msgstr "查看服务运行信息"
#: ../../getting_started/observability.md:24
#: ../../getting_started/observability.md:66
#: ../../getting_started/observability.md:98
#: ../../getting_started/observability.md:146
#: ../../getting_started/observability.md:200
#: ../../getting_started/observability.md:229
#: ../../getting_started/observability.md:249
#: ../../getting_started/observability.md:267 18907a58a0c3493aa24c17e367309471
#: 387f08b6cd864a7682522b5a40863e79 5fe0baa5803d4ca5ad0e8cbd8a859c8c
#: 7abbfdd996444999a24cbad852d2e545 957bdf6826e045608c8c3ebd06c8fe76
#: acddf2cdf8c94bd6864d66739fa26459 f3fcaf2b47774779bad2feb3ef4318c4
#: f99bb110024443f68cc8b7f19956eff4
msgid "You will see an output like:"
msgstr "你将会看到类似的输出:"
#: ../../getting_started/observability.md:60 5c8c213a5bac434bb3defe6611a03813
msgid "Viewing the Latest Chat Message"
msgstr "查看最近的一条对话信息"
#: ../../getting_started/observability.md:92 ce19873d8e754173849d14eaeab963d2
msgid "Viewing Chat Details and Call Chain"
msgstr "查看对话信息和调用链路"
#: ../../getting_started/observability.md:140 36bcc37971ce4d6682f1ea32e2e9a980
msgid "Viewing Chat Details Based on trace_id"
msgstr "根据 `trace_id` 查看对应的对话信息"
#: ../../getting_started/observability.md:172 c74968492f7544758c9d95fa831c4fcf
msgid "More `chat` Usage"
msgstr "更多 `chat` 用法"
#: ../../getting_started/observability.md:194 c2e5a7e7b1ee40fea15790d66b79eb11
msgid "Viewing Call Tree Based on `trace_id`"
msgstr "根据 `trace_id` 查看调用树"
#: ../../getting_started/observability.md:220 6bd64d2ad0ce442e8e81aa1ae7dd2189
msgid "Listing Trace Information"
msgstr "列出追踪信息"
#: ../../getting_started/observability.md:222 ce643441e8744ab09fcbd4081d2adb4a
msgid "Listing All Trace Information"
msgstr "列出全部追踪信息"
#: ../../getting_started/observability.md:243 374376d81ed54bc2a450505abfe7dc6d
msgid "Listing Trace Information by Trace Type"
msgstr "根据追踪类型列出追踪信息"
#: ../../getting_started/observability.md:261 8fc08a9c924d47309dc7062811c4fb62
msgid "Searching Trace Information"
msgstr "搜索追踪信息"
#: ../../getting_started/observability.md:281 3681413d196144389431422010a7e30f
msgid "More `list` Usage"
msgstr "更多 `list` 用法"

View File

@ -47,6 +47,8 @@ class ComponentType(str, Enum):
WORKER_MANAGER_FACTORY = "dbgpt_worker_manager_factory"
MODEL_CONTROLLER = "dbgpt_model_controller"
EXECUTOR_DEFAULT = "dbgpt_thread_pool_default"
TRACER = "dbgpt_tracer"
TRACER_SPAN_STORAGE = "dbgpt_tracer_span_storage"
class BaseComponent(LifeCycle, ABC):
@ -70,6 +72,8 @@ class BaseComponent(LifeCycle, ABC):
T = TypeVar("T", bound=BaseComponent)
_EMPTY_DEFAULT_COMPONENT = "_EMPTY_DEFAULT_COMPONENT"
class SystemApp(LifeCycle):
"""Main System Application class that manages the lifecycle and registration of components."""
@ -104,13 +108,18 @@ class SystemApp(LifeCycle):
instance.init_app(self)
def get_component(
self, name: Union[str, ComponentType], component_type: Type[T]
self,
name: Union[str, ComponentType],
component_type: Type[T],
default_component=_EMPTY_DEFAULT_COMPONENT,
) -> T:
"""Retrieve a registered component by its name and type."""
if isinstance(name, ComponentType):
name = name.value
component = self.components.get(name)
if not component:
if default_component != _EMPTY_DEFAULT_COMPONENT:
return default_component
raise ValueError(f"No component found with name {name}")
if not isinstance(component, component_type):
raise TypeError(f"Component {name} is not of type {component_type}")
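The new optional `default_component` argument lets callers treat a missing component as a soft failure instead of catching `ValueError`. A minimal sketch of the new call pattern, assuming the component types registered in this commit:
```python
from pilot.component import ComponentType, SystemApp
from pilot.utils.tracer import Tracer

def get_optional_tracer(system_app: SystemApp):
    # With default_component, a missing tracer no longer raises ValueError;
    # callers can simply skip span recording when tracing is not initialized.
    return system_app.get_component(
        ComponentType.TRACER, Tracer, default_component=None
    )
```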

View File

@ -354,7 +354,7 @@ class LlamaCppAdapater(BaseLLMAdaper):
if not path.is_file():
model_paths = list(path.glob("*ggml*.gguf"))
if not model_paths:
return False
return False, None
model_path = str(model_paths[0])
logger.warn(
f"Model path {model_path} is not single file, use first *gglm*.gguf model file: {model_path}"

View File

@ -53,6 +53,9 @@ class ModelOutput:
error_code: int
model_context: Dict = None
def to_dict(self) -> Dict:
return asdict(self)
@dataclass
class WorkerApplyOutput:

View File

@ -18,11 +18,13 @@ class PromptRequest(BaseModel):
max_new_tokens: int = None
stop: str = None
echo: bool = True
span_id: str = None
class EmbeddingsRequest(BaseModel):
model: str
input: List[str]
span_id: str = None
class WorkerApplyRequest(BaseModel):

View File

@ -3,6 +3,8 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from pilot.model.parameter import BaseEmbeddingModelParameters
from pilot.utils.parameter_utils import _get_dict_from_obj
from pilot.utils.tracer import root_tracer, SpanType, SpanTypeRunName
if TYPE_CHECKING:
from langchain.embeddings.base import Embeddings
@ -15,13 +17,21 @@ class EmbeddingLoader:
def load(
self, model_name: str, param: BaseEmbeddingModelParameters
) -> "Embeddings":
# add more models
if model_name in ["proxy_openai", "proxy_azure"]:
from langchain.embeddings import OpenAIEmbeddings
metadata = {
"model_name": model_name,
"run_service": SpanTypeRunName.EMBEDDING_MODEL.value,
"params": _get_dict_from_obj(param),
}
with root_tracer.start_span(
"EmbeddingLoader.load", span_type=SpanType.RUN, metadata=metadata
):
# add more models
if model_name in ["proxy_openai", "proxy_azure"]:
from langchain.embeddings import OpenAIEmbeddings
return OpenAIEmbeddings(**param.build_kwargs())
else:
from langchain.embeddings import HuggingFaceEmbeddings
return OpenAIEmbeddings(**param.build_kwargs())
else:
from langchain.embeddings import HuggingFaceEmbeddings
kwargs = param.build_kwargs(model_name=param.model_path)
return HuggingFaceEmbeddings(**kwargs)
kwargs = param.build_kwargs(model_name=param.model_path)
return HuggingFaceEmbeddings(**kwargs)

View File

@ -9,7 +9,8 @@ from pilot.model.loader import ModelLoader, _get_model_real_path
from pilot.model.parameter import ModelParameters
from pilot.model.cluster.worker_base import ModelWorker
from pilot.utils.model_utils import _clear_model_cache
from pilot.utils.parameter_utils import EnvArgumentParser
from pilot.utils.parameter_utils import EnvArgumentParser, _get_dict_from_obj
from pilot.utils.tracer import root_tracer, SpanType, SpanTypeRunName
logger = logging.getLogger(__name__)
@ -94,9 +95,20 @@ class DefaultModelWorker(ModelWorker):
model_params = self.parse_parameters(command_args)
self._model_params = model_params
logger.info(f"Begin load model, model params: {model_params}")
self.model, self.tokenizer = self.ml.loader_with_params(
model_params, self.llm_adapter
)
metadata = {
"model_name": self.model_name,
"model_path": self.model_path,
"model_type": self.llm_adapter.model_type(),
"llm_adapter": str(self.llm_adapter),
"run_service": SpanTypeRunName.MODEL_WORKER,
"params": _get_dict_from_obj(model_params),
}
with root_tracer.start_span(
"DefaultModelWorker.start", span_type=SpanType.RUN, metadata=metadata
):
self.model, self.tokenizer = self.ml.loader_with_params(
model_params, self.llm_adapter
)
def stop(self) -> None:
if not self.model:
@ -109,9 +121,18 @@ class DefaultModelWorker(ModelWorker):
_clear_model_cache(self._model_params.device)
def generate_stream(self, params: Dict) -> Iterator[ModelOutput]:
span = root_tracer.start_span(
"DefaultModelWorker.generate_stream", params.get("span_id")
)
try:
params, model_context, generate_stream_func = self._prepare_generate_stream(
params
(
params,
model_context,
generate_stream_func,
model_span,
) = self._prepare_generate_stream(
params,
span_operation_name="DefaultModelWorker_call.generate_stream_func",
)
previous_response = ""
@ -127,8 +148,12 @@ class DefaultModelWorker(ModelWorker):
print(
f"\n\nfull stream output:\n{previous_response}\n\nmodel generate_stream params:\n{params}"
)
model_span.end(metadata={"output": previous_response})
span.end()
except Exception as e:
yield self._handle_exception(e)
output = self._handle_exception(e)
yield output
span.end(metadata={"error": output.to_dict()})
def generate(self, params: Dict) -> ModelOutput:
"""Generate non stream result"""
@ -141,9 +166,18 @@ class DefaultModelWorker(ModelWorker):
raise NotImplementedError
async def async_generate_stream(self, params: Dict) -> Iterator[ModelOutput]:
span = root_tracer.start_span(
"DefaultModelWorker.async_generate_stream", params.get("span_id")
)
try:
params, model_context, generate_stream_func = self._prepare_generate_stream(
params
(
params,
model_context,
generate_stream_func,
model_span,
) = self._prepare_generate_stream(
params,
span_operation_name="DefaultModelWorker_call.generate_stream_func",
)
previous_response = ""
@ -159,8 +193,12 @@ class DefaultModelWorker(ModelWorker):
print(
f"\n\nfull stream output:\n{previous_response}\n\nmodel generate_stream params:\n{params}"
)
model_span.end(metadata={"output": previous_response})
span.end()
except Exception as e:
yield self._handle_exception(e)
output = self._handle_exception(e)
yield output
span.end(metadata={"error": output.to_dict()})
async def async_generate(self, params: Dict) -> ModelOutput:
output = None
@ -168,7 +206,7 @@ class DefaultModelWorker(ModelWorker):
output = out
return output
def _prepare_generate_stream(self, params: Dict):
def _prepare_generate_stream(self, params: Dict, span_operation_name: str):
params, model_context = self.llm_adapter.model_adaptation(
params,
self.model_name,
@ -190,7 +228,30 @@ class DefaultModelWorker(ModelWorker):
)
str_prompt = params.get("prompt")
print(f"model prompt: \n\n{str_prompt}\n\n{stream_type}stream output:\n")
return params, model_context, generate_stream_func
generate_stream_func_str_name = "{}.{}".format(
generate_stream_func.__module__, generate_stream_func.__name__
)
span_params = {k: v for k, v in params.items()}
if "messages" in span_params:
span_params["messages"] = list(
map(lambda m: m.dict(), span_params["messages"])
)
model_span = root_tracer.start_span(
span_operation_name,
metadata={
"prompt": str_prompt,
"params": span_params,
"is_async_func": self.support_async(),
"llm_adapter": str(self.llm_adapter),
"generate_stream_func": generate_stream_func_str_name,
"model_context": model_context,
},
)
return params, model_context, generate_stream_func, model_span
def _handle_output(self, output, previous_response, model_context):
if isinstance(output, dict):

View File

@ -8,12 +8,13 @@ import sys
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict
from typing import Awaitable, Callable, Dict, Iterator, List, Optional
from typing import Awaitable, Callable, Dict, Iterator, List
from fastapi import APIRouter, FastAPI
from fastapi.responses import StreamingResponse
from pilot.component import SystemApp
from pilot.configs.model_config import LOGDIR
from pilot.model.base import (
ModelInstance,
ModelOutput,
@ -35,8 +36,10 @@ from pilot.utils.parameter_utils import (
EnvArgumentParser,
ParameterDescription,
_dict_to_command_args,
_get_dict_from_obj,
)
from pilot.utils.utils import setup_logging
from pilot.utils.tracer import initialize_tracer, root_tracer, SpanType, SpanTypeRunName
logger = logging.getLogger(__name__)
@ -293,60 +296,72 @@ class LocalWorkerManager(WorkerManager):
self, params: Dict, async_wrapper=None, **kwargs
) -> Iterator[ModelOutput]:
"""Generate stream result, chat scene"""
try:
worker_run_data = await self._get_model(params)
except Exception as e:
yield ModelOutput(
text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}",
error_code=0,
)
return
async with worker_run_data.semaphore:
if worker_run_data.worker.support_async():
async for outout in worker_run_data.worker.async_generate_stream(
params
):
yield outout
else:
if not async_wrapper:
from starlette.concurrency import iterate_in_threadpool
with root_tracer.start_span(
"WorkerManager.generate_stream", params.get("span_id")
) as span:
params["span_id"] = span.span_id
try:
worker_run_data = await self._get_model(params)
except Exception as e:
yield ModelOutput(
text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}",
error_code=0,
)
return
async with worker_run_data.semaphore:
if worker_run_data.worker.support_async():
async for outout in worker_run_data.worker.async_generate_stream(
params
):
yield outout
else:
if not async_wrapper:
from starlette.concurrency import iterate_in_threadpool
async_wrapper = iterate_in_threadpool
async for output in async_wrapper(
worker_run_data.worker.generate_stream(params)
):
yield output
async_wrapper = iterate_in_threadpool
async for output in async_wrapper(
worker_run_data.worker.generate_stream(params)
):
yield output
async def generate(self, params: Dict) -> ModelOutput:
"""Generate non stream result"""
try:
worker_run_data = await self._get_model(params)
except Exception as e:
return ModelOutput(
text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}",
error_code=0,
)
async with worker_run_data.semaphore:
if worker_run_data.worker.support_async():
return await worker_run_data.worker.async_generate(params)
else:
return await self.run_blocking_func(
worker_run_data.worker.generate, params
with root_tracer.start_span(
"WorkerManager.generate", params.get("span_id")
) as span:
params["span_id"] = span.span_id
try:
worker_run_data = await self._get_model(params)
except Exception as e:
return ModelOutput(
text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}",
error_code=0,
)
async with worker_run_data.semaphore:
if worker_run_data.worker.support_async():
return await worker_run_data.worker.async_generate(params)
else:
return await self.run_blocking_func(
worker_run_data.worker.generate, params
)
async def embeddings(self, params: Dict) -> List[List[float]]:
"""Embed input"""
try:
worker_run_data = await self._get_model(params, worker_type="text2vec")
except Exception as e:
raise e
async with worker_run_data.semaphore:
if worker_run_data.worker.support_async():
return await worker_run_data.worker.async_embeddings(params)
else:
return await self.run_blocking_func(
worker_run_data.worker.embeddings, params
)
with root_tracer.start_span(
"WorkerManager.embeddings", params.get("span_id")
) as span:
params["span_id"] = span.span_id
try:
worker_run_data = await self._get_model(params, worker_type="text2vec")
except Exception as e:
raise e
async with worker_run_data.semaphore:
if worker_run_data.worker.support_async():
return await worker_run_data.worker.async_embeddings(params)
else:
return await self.run_blocking_func(
worker_run_data.worker.embeddings, params
)
def sync_embeddings(self, params: Dict) -> List[List[float]]:
worker_run_data = self._sync_get_model(params, worker_type="text2vec")
@ -608,6 +623,9 @@ async def generate_json_stream(params):
@router.post("/worker/generate_stream")
async def api_generate_stream(request: PromptRequest):
params = request.dict(exclude_none=True)
span_id = root_tracer.get_current_span_id()
if "span_id" not in params and span_id:
params["span_id"] = span_id
generator = generate_json_stream(params)
return StreamingResponse(generator)
@ -615,12 +633,18 @@ async def api_generate_stream(request: PromptRequest):
@router.post("/worker/generate")
async def api_generate(request: PromptRequest):
params = request.dict(exclude_none=True)
span_id = root_tracer.get_current_span_id()
if "span_id" not in params and span_id:
params["span_id"] = span_id
return await worker_manager.generate(params)
@router.post("/worker/embeddings")
async def api_embeddings(request: EmbeddingsRequest):
params = request.dict(exclude_none=True)
span_id = root_tracer.get_current_span_id()
if "span_id" not in params and span_id:
params["span_id"] = span_id
return await worker_manager.embeddings(params)
@ -705,8 +729,15 @@ def _parse_worker_params(
model_name: str = None, model_path: str = None, **kwargs
) -> ModelWorkerParameters:
worker_args = EnvArgumentParser()
env_prefix = None
if model_name:
env_prefix = EnvArgumentParser.get_env_prefix(model_name)
worker_params: ModelWorkerParameters = worker_args.parse_args_into_dataclass(
ModelWorkerParameters, model_name=model_name, model_path=model_path, **kwargs
ModelWorkerParameters,
env_prefix=env_prefix,
model_name=model_name,
model_path=model_path,
**kwargs,
)
env_prefix = EnvArgumentParser.get_env_prefix(worker_params.model_name)
# Read parameters again with prefix of model name.
@ -801,10 +832,18 @@ def _build_worker(worker_params: ModelWorkerParameters):
def _start_local_worker(
worker_manager: WorkerManagerAdapter, worker_params: ModelWorkerParameters
):
worker = _build_worker(worker_params)
if not worker_manager.worker_manager:
worker_manager.worker_manager = _create_local_model_manager(worker_params)
worker_manager.worker_manager.add_worker(worker, worker_params)
with root_tracer.start_span(
"WorkerManager._start_local_worker",
span_type=SpanType.RUN,
metadata={
"run_service": SpanTypeRunName.WORKER_MANAGER,
"params": _get_dict_from_obj(worker_params),
},
):
worker = _build_worker(worker_params)
if not worker_manager.worker_manager:
worker_manager.worker_manager = _create_local_model_manager(worker_params)
worker_manager.worker_manager.add_worker(worker, worker_params)
def _start_local_embedding_worker(
@ -928,17 +967,17 @@ def run_worker_manager(
# Run worker manager independently
embedded_mod = False
app = _setup_fastapi(worker_params)
_start_local_worker(worker_manager, worker_params)
_start_local_embedding_worker(
worker_manager, embedding_model_name, embedding_model_path
)
else:
_start_local_worker(worker_manager, worker_params)
_start_local_embedding_worker(
worker_manager, embedding_model_name, embedding_model_path
)
loop = asyncio.get_event_loop()
loop.run_until_complete(worker_manager.start())
system_app = SystemApp(app)
initialize_tracer(
system_app,
os.path.join(LOGDIR, "dbgpt_model_worker_manager_tracer.jsonl"),
root_operation_name="DB-GPT-WorkerManager-Entry",
)
_start_local_worker(worker_manager, worker_params)
_start_local_embedding_worker(
worker_manager, embedding_model_name, embedding_model_path
)
if include_router:
app.include_router(router, prefix="/api")
@ -946,6 +985,8 @@ def run_worker_manager(
if not embedded_mod:
import uvicorn
loop = asyncio.get_event_loop()
loop.run_until_complete(worker_manager.start())
uvicorn.run(
app, host=worker_params.host, port=worker_params.port, log_level="info"
)

View File

@ -186,6 +186,13 @@ class OldLLMModelAdaperWrapper(LLMModelAdaper):
def get_generate_stream_function(self, model, model_path: str):
return self._chat_adapter.get_generate_stream_func(model_path)
def __str__(self) -> str:
return "{}({}.{})".format(
self.__class__.__name__,
self._adapter.__class__.__module__,
self._adapter.__class__.__name__,
)
class FastChatLLMModelAdaperWrapper(LLMModelAdaper):
"""Wrapping fastchat adapter"""
@ -206,6 +213,13 @@ class FastChatLLMModelAdaperWrapper(LLMModelAdaper):
) -> "Conversation":
return self._adapter.get_default_conv_template(model_path)
def __str__(self) -> str:
return "{}({}.{})".format(
self.__class__.__name__,
self._adapter.__class__.__module__,
self._adapter.__class__.__name__,
)
def get_conv_template(name: str) -> "Conversation":
"""Get a conversation template."""
@ -412,6 +426,9 @@ class VLLMModelAdaperWrapper(LLMModelAdaper):
) -> "Conversation":
return _auto_get_conv_template(model_name, model_path)
def __str__(self) -> str:
return "{}.{}".format(self.__class__.__module__, self.__class__.__name__)
# Covering the configuration of fastchat, we will regularly feed back the code here to fastchat.
# We also recommend that you modify it directly in the fastchat repository.

View File

@ -46,6 +46,7 @@ from pilot.summary.db_summary_client import DBSummaryClient
from pilot.model.cluster import BaseModelController, WorkerManager, WorkerManagerFactory
from pilot.model.base import FlatSupportedModel
from pilot.utils.tracer import root_tracer, SpanType
router = APIRouter()
CFG = Config()
@ -366,7 +367,10 @@ async def chat_completions(dialogue: ConversationVo = Body()):
print(
f"chat_completions:{dialogue.chat_mode},{dialogue.select_param},{dialogue.model_name}"
)
chat: BaseChat = get_chat_instance(dialogue)
with root_tracer.start_span(
"get_chat_instance", span_type=SpanType.CHAT, metadata=dialogue.dict()
):
chat: BaseChat = get_chat_instance(dialogue)
# background_tasks = BackgroundTasks()
# background_tasks.add_task(release_model_semaphore)
headers = {
@ -417,9 +421,10 @@ async def model_supports(worker_manager: WorkerManager = Depends(get_worker_mana
async def no_stream_generator(chat):
msg = await chat.nostream_call()
msg = msg.replace("\n", "\\n")
yield f"data: {msg}\n\n"
with root_tracer.start_span("no_stream_generator"):
msg = await chat.nostream_call()
msg = msg.replace("\n", "\\n")
yield f"data: {msg}\n\n"
async def stream_generator(chat, incremental: bool, model_name: str):
@ -436,6 +441,7 @@ async def stream_generator(chat, incremental: bool, model_name: str):
Yields:
_type_: streaming responses
"""
span = root_tracer.start_span("stream_generator")
msg = "[LLM_ERROR]: llm server has no output, maybe your prompt template is wrong."
stream_id = f"chatcmpl-{str(uuid.uuid1())}"
@ -464,6 +470,7 @@ async def stream_generator(chat, incremental: bool, model_name: str):
await asyncio.sleep(0.02)
if incremental:
yield "data: [DONE]\n\n"
span.end()
chat.current_message.add_ai_message(msg)
chat.current_message.add_view_message(msg)
chat.memory.append(chat.current_message)

View File

@ -15,6 +15,7 @@ from pilot.prompts.prompt_new import PromptTemplate
from pilot.scene.base_message import ModelMessage, ModelMessageRoleType
from pilot.scene.message import OnceConversation
from pilot.utils import get_or_create_event_loop
from pilot.utils.tracer import root_tracer
from pydantic import Extra
logger = logging.getLogger(__name__)
@ -135,6 +136,14 @@ class BaseChat(ABC):
}
return payload
def _get_span_metadata(self, payload: Dict) -> Dict:
metadata = {k: v for k, v in payload.items()}
del metadata["prompt"]
metadata["messages"] = list(
map(lambda m: m if isinstance(m, dict) else m.dict(), metadata["messages"])
)
return metadata
async def stream_call(self):
# TODO Retry when server connection error
payload = self.__call_base()
@ -142,6 +151,10 @@ class BaseChat(ABC):
self.skip_echo_len = len(payload.get("prompt").replace("</s>", " ")) + 11
logger.info(f"Request: \n{payload}")
ai_response_text = ""
span = root_tracer.start_span(
"BaseChat.stream_call", metadata=self._get_span_metadata(payload)
)
payload["span_id"] = span.span_id
try:
from pilot.model.cluster import WorkerManagerFactory
@ -150,6 +163,7 @@ class BaseChat(ABC):
).create()
async for output in worker_manager.generate_stream(payload):
yield output
span.end()
except Exception as e:
print(traceback.format_exc())
logger.error("model response parase faild" + str(e))
@ -158,11 +172,16 @@ class BaseChat(ABC):
)
### store current conversation
self.memory.append(self.current_message)
span.end(metadata={"error": str(e)})
async def nostream_call(self):
payload = self.__call_base()
logger.info(f"Request: \n{payload}")
ai_response_text = ""
span = root_tracer.start_span(
"BaseChat.nostream_call", metadata=self._get_span_metadata(payload)
)
payload["span_id"] = span.span_id
try:
from pilot.model.cluster import WorkerManagerFactory
@ -170,7 +189,8 @@ class BaseChat(ABC):
ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
).create()
model_output = await worker_manager.generate(payload)
with root_tracer.start_span("BaseChat.invoke_worker_manager.generate"):
model_output = await worker_manager.generate(payload)
### output parse
ai_response_text = (
@ -185,8 +205,14 @@ class BaseChat(ABC):
ai_response_text
)
)
### run
result = self.do_action(prompt_define_response)
metadata = {
"model_output": model_output.to_dict(),
"ai_response_text": ai_response_text,
"prompt_define_response": prompt_define_response,
}
with root_tracer.start_span("BaseChat.do_action", metadata=metadata):
### run
result = self.do_action(prompt_define_response)
### llm speaker
speak_to_user = self.get_llm_speak(prompt_define_response)
@ -195,12 +221,14 @@ class BaseChat(ABC):
speak_to_user, result
)
self.current_message.add_view_message(view_message)
span.end()
except Exception as e:
print(traceback.format_exc())
logger.error("model response parase faild" + str(e))
self.current_message.add_view_message(
f"""<span style=\"color:red\">ERROR!</span>{str(e)}\n {ai_response_text} """
)
span.end(metadata={"error": str(e)})
### store dialogue
self.memory.append(self.current_message)
return self.current_ai_response()

View File

@ -119,6 +119,14 @@ except ImportError as e:
logging.warning(f"Integrating dbgpt knowledge command line tool failed: {e}")
try:
from pilot.utils.tracer.tracer_cli import trace_cli_group
add_command_alias(trace_cli_group, name="trace", parent_group=cli)
except ImportError as e:
logging.warning(f"Integrating dbgpt trace command line tool failed: {e}")
def main():
return cli()

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, Type
import os
from pilot.component import ComponentType, SystemApp
from pilot.utils.executor_utils import DefaultExecutorFactory

View File

@ -7,7 +7,7 @@ ROOT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_PATH)
import signal
from pilot.configs.config import Config
from pilot.configs.model_config import LLM_MODEL_CONFIG, EMBEDDING_MODEL_CONFIG
from pilot.configs.model_config import LLM_MODEL_CONFIG, EMBEDDING_MODEL_CONFIG, LOGDIR
from pilot.component import SystemApp
from pilot.server.base import (
@ -38,6 +38,8 @@ from pilot.utils.utils import (
_get_logging_level,
logging_str_to_uvicorn_level,
)
from pilot.utils.tracer import root_tracer, initialize_tracer, SpanType, SpanTypeRunName
from pilot.utils.parameter_utils import _get_dict_from_obj
static_file_path = os.path.join(os.getcwd(), "server/static")
@ -98,17 +100,21 @@ def mount_static_files(app):
app.add_exception_handler(RequestValidationError, validation_exception_handler)
def _get_webserver_params(args: List[str] = None):
from pilot.utils.parameter_utils import EnvArgumentParser
parser: argparse.ArgumentParser = EnvArgumentParser.create_argparse_option(
WebWerverParameters
)
return WebWerverParameters(**vars(parser.parse_args(args=args)))
def initialize_app(param: WebWerverParameters = None, args: List[str] = None):
"""Initialize app
If you use gunicorn as a process manager, initialize_app can be invoke in `on_starting` hook.
"""
if not param:
from pilot.utils.parameter_utils import EnvArgumentParser
parser: argparse.ArgumentParser = EnvArgumentParser.create_argparse_option(
WebWerverParameters
)
param = WebWerverParameters(**vars(parser.parse_args(args=args)))
param = _get_webserver_params(args)
if not param.log_level:
param.log_level = _get_logging_level()
@ -127,7 +133,7 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None):
model_start_listener = _create_model_start_listener(system_app)
initialize_components(param, system_app, embedding_model_name, embedding_model_path)
model_path = LLM_MODEL_CONFIG[CFG.LLM_MODEL]
model_path = LLM_MODEL_CONFIG.get(CFG.LLM_MODEL)
if not param.light:
print("Model Unified Deployment Mode!")
if not param.remote_embedding:
@ -174,8 +180,20 @@ def run_uvicorn(param: WebWerverParameters):
def run_webserver(param: WebWerverParameters = None):
param = initialize_app(param)
run_uvicorn(param)
if not param:
param = _get_webserver_params()
initialize_tracer(system_app, os.path.join(LOGDIR, "dbgpt_webserver_tracer.jsonl"))
with root_tracer.start_span(
"run_webserver",
span_type=SpanType.RUN,
metadata={
"run_service": SpanTypeRunName.WEBSERVER,
"params": _get_dict_from_obj(param),
},
):
param = initialize_app(param)
run_uvicorn(param)
if __name__ == "__main__":

View File

@ -13,7 +13,7 @@ from pilot.model.cluster import run_worker_manager
CFG = Config()
model_path = LLM_MODEL_CONFIG[CFG.LLM_MODEL]
model_path = LLM_MODEL_CONFIG.get(CFG.LLM_MODEL)
if __name__ == "__main__":
run_worker_manager(

View File

@ -1,6 +1,6 @@
import argparse
import os
from dataclasses import dataclass, fields, MISSING, asdict, field
from dataclasses import dataclass, fields, MISSING, asdict, field, is_dataclass
from typing import Any, List, Optional, Type, Union, Callable, Dict
from collections import OrderedDict
@ -590,6 +590,20 @@ def _extract_parameter_details(
return descriptions
def _get_dict_from_obj(obj, default_value=None) -> Optional[Dict]:
if not obj:
return None
if is_dataclass(type(obj)):
params = {}
for field_info in fields(obj):
value = _get_simple_privacy_field_value(obj, field_info)
params[field_info.name] = value
return params
if isinstance(obj, dict):
return obj
return default_value
class _SimpleArgParser:
def __init__(self, *args):
self.params = {arg.replace("_", "-"): None for arg in args}

View File

@ -0,0 +1,32 @@
from pilot.utils.tracer.base import (
SpanType,
Span,
SpanTypeRunName,
Tracer,
SpanStorage,
SpanStorageType,
TracerContext,
)
from pilot.utils.tracer.span_storage import MemorySpanStorage, FileSpanStorage
from pilot.utils.tracer.tracer_impl import (
root_tracer,
initialize_tracer,
DefaultTracer,
TracerManager,
)
__all__ = [
"SpanType",
"Span",
"SpanTypeRunName",
"Tracer",
"SpanStorage",
"SpanStorageType",
"TracerContext",
"MemorySpanStorage",
"FileSpanStorage",
"root_tracer",
"initialize_tracer",
"DefaultTracer",
"TracerManager",
]
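Wiring the tracer into a service mirrors what `run_webserver` and `run_worker_manager` do in this commit: build a `SystemApp`, then call `initialize_tracer` with the target trace file. A sketch only; the file name and root operation name below are illustrative:
```python
import os

from pilot.component import SystemApp
from pilot.configs.model_config import LOGDIR
from pilot.utils.tracer import initialize_tracer

# Register the tracer and its span storage on the application context
# before any spans are started.
system_app = SystemApp(None)  # normally constructed with the ASGI app
initialize_tracer(
    system_app,
    os.path.join(LOGDIR, "dbgpt_my_service_tracer.jsonl"),
    root_operation_name="DB-GPT-MyService-Entry",  # illustrative name
)
```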

pilot/utils/tracer/base.py (new file, 184 lines)
View File

@ -0,0 +1,184 @@
from __future__ import annotations
from typing import Dict, Callable, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
from enum import Enum
import uuid
from datetime import datetime
from pilot.component import BaseComponent, SystemApp, ComponentType
class SpanType(str, Enum):
BASE = "base"
RUN = "run"
CHAT = "chat"
class SpanTypeRunName(str, Enum):
WEBSERVER = "Webserver"
WORKER_MANAGER = "WorkerManager"
MODEL_WORKER = "ModelWorker"
EMBEDDING_MODEL = "EmbeddingModel"
@staticmethod
def values():
return [item.value for item in SpanTypeRunName]
class Span:
"""Represents a unit of work that is being traced.
This can be any operation like a function call or a database query.
"""
def __init__(
self,
trace_id: str,
span_id: str,
span_type: SpanType = None,
parent_span_id: str = None,
operation_name: str = None,
metadata: Dict = None,
end_caller: Callable[[Span], None] = None,
):
if not span_type:
span_type = SpanType.BASE
self.span_type = span_type
# The unique identifier for the entire trace
self.trace_id = trace_id
# Unique identifier for this span within the trace
self.span_id = span_id
# Identifier of the parent span, if this is a child span
self.parent_span_id = parent_span_id
# Descriptive name for the operation being traced
self.operation_name = operation_name
# Timestamp when this span started
self.start_time = datetime.now()
# Timestamp when this span ended, initially None
self.end_time = None
# Additional metadata associated with the span
self.metadata = metadata
self._end_callers = []
if end_caller:
self._end_callers.append(end_caller)
def end(self, **kwargs):
"""Mark the end of this span by recording the current time."""
self.end_time = datetime.now()
if "metadata" in kwargs:
self.metadata = kwargs.get("metadata")
for caller in self._end_callers:
caller(self)
def add_end_caller(self, end_caller: Callable[[Span], None]):
if end_caller:
self._end_callers.append(end_caller)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.end()
return False
def to_dict(self) -> Dict:
return {
"span_type": self.span_type.value,
"trace_id": self.trace_id,
"span_id": self.span_id,
"parent_span_id": self.parent_span_id,
"operation_name": self.operation_name,
"start_time": None
if not self.start_time
else self.start_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
"end_time": None
if not self.end_time
else self.end_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
"metadata": self.metadata,
}
class SpanStorageType(str, Enum):
ON_CREATE = "on_create"
ON_END = "on_end"
ON_CREATE_END = "on_create_end"
class SpanStorage(BaseComponent, ABC):
"""Abstract base class for storing spans.
This allows different storage mechanisms (e.g., in-memory, database) to be implemented.
"""
name = ComponentType.TRACER_SPAN_STORAGE.value
def init_app(self, system_app: SystemApp):
"""Initialize the storage with the given application context."""
pass
@abstractmethod
def append_span(self, span: Span):
"""Store the given span. This needs to be implemented by subclasses."""
class Tracer(BaseComponent, ABC):
"""Abstract base class for tracing operations.
Provides the core logic for starting, ending, and retrieving spans.
"""
name = ComponentType.TRACER.value
def __init__(self, system_app: SystemApp | None = None):
super().__init__(system_app)
self.system_app = system_app # Application context
def init_app(self, system_app: SystemApp):
"""Initialize the tracer with the given application context."""
self.system_app = system_app
@abstractmethod
def append_span(self, span: Span):
"""Append the given span to storage. This needs to be implemented by subclasses."""
@abstractmethod
def start_span(
self,
operation_name: str,
parent_span_id: str = None,
span_type: SpanType = None,
metadata: Dict = None,
) -> Span:
"""Begin a new span for the given operation. If provided, the span will be
a child of the span with the given parent_span_id.
"""
@abstractmethod
def end_span(self, span: Span, **kwargs):
"""
End the given span.
"""
@abstractmethod
def get_current_span(self) -> Optional[Span]:
"""
Retrieve the span that is currently being traced.
"""
@abstractmethod
def _get_current_storage(self) -> SpanStorage:
"""
Get the storage mechanism currently in use for storing spans.
This needs to be implemented by subclasses.
"""
def _new_uuid(self) -> str:
"""
Generate a new unique identifier.
"""
return str(uuid.uuid4())
@dataclass
class TracerContext:
span_id: Optional[str] = None
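A minimal usage sketch, modeled on the call sites added in this commit: spans can be opened as context managers (so `end()` runs automatically), and the current `span_id` is put into the request parameters so a downstream service can attach its spans as children of the caller's span.
```python
from pilot.utils.tracer import root_tracer, SpanType

# Sketch only; assumes initialize_tracer(...) was already called at start-up.
def handle_chat(params: dict):
    with root_tracer.start_span(
        "my_service.handle_chat",
        params.get("span_id"),          # parent span id, if the caller sent one
        span_type=SpanType.CHAT,
        metadata={"params": params},
    ) as span:
        # Propagate the span id cross-service (see the /worker/* endpoints).
        params["span_id"] = span.span_id
        ...  # call the model worker / do the real work
```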

View File

@ -0,0 +1,79 @@
import os
import json
import time
import threading
import queue
import logging
from pilot.component import SystemApp
from pilot.utils.tracer.base import Span, SpanStorage
logger = logging.getLogger(__name__)
class MemorySpanStorage(SpanStorage):
def __init__(self, system_app: SystemApp | None = None):
super().__init__(system_app)
self.spans = []
self._lock = threading.Lock()
def append_span(self, span: Span):
with self._lock:
self.spans.append(span)
class FileSpanStorage(SpanStorage):
def __init__(self, filename: str, batch_size=10, flush_interval=10):
super().__init__()
self.filename = filename
self.queue = queue.Queue()
self.batch_size = batch_size
self.flush_interval = flush_interval
self.last_flush_time = time.time()
self.flush_signal_queue = queue.Queue()
if not os.path.exists(filename):
with open(filename, "w") as _:
pass
self.flush_thread = threading.Thread(target=self._flush_to_file, daemon=True)
self.flush_thread.start()
def append_span(self, span: Span):
span_data = span.to_dict()
logger.debug(f"append span: {span_data}")
self.queue.put(span_data)
if self.queue.qsize() >= self.batch_size:
try:
self.flush_signal_queue.put_nowait(True)
except queue.Full:
pass # If the signal queue is full, it's okay. The flush thread will handle it.
def _write_to_file(self):
spans_to_write = []
while not self.queue.empty():
spans_to_write.append(self.queue.get())
with open(self.filename, "a") as file:
for span_data in spans_to_write:
try:
file.write(json.dumps(span_data, ensure_ascii=False) + "\n")
except Exception as e:
logger.warning(
f"Write span to file failed: {str(e)}, span_data: {span_data}"
)
def _flush_to_file(self):
while True:
interval = time.time() - self.last_flush_time
if interval < self.flush_interval:
try:
self.flush_signal_queue.get(
block=True, timeout=self.flush_interval - interval
)
except Exception:
# Timeout
pass
self._write_to_file()
self.last_flush_time = time.time()
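A small sketch of using `FileSpanStorage` on its own (the file name is illustrative): spans are queued by `append_span`, and a daemon thread flushes them to the JSONL file once `batch_size` spans are queued or `flush_interval` seconds elapse.
```python
import time

from pilot.utils.tracer import FileSpanStorage, Span

# Illustrative path; the storage creates the file if it does not exist.
storage = FileSpanStorage("dbgpt_demo_tracer.jsonl", batch_size=2, flush_interval=5)

for i in range(2):
    span = Span(f"trace-{i}", f"trace-{i}:span-0", operation_name="demo_operation")
    span.end()
    storage.append_span(span)

time.sleep(0.1)  # give the background flush thread a moment to write
```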

View File

View File

@ -0,0 +1,122 @@
from typing import Dict
from pilot.component import SystemApp
from pilot.utils.tracer import Span, SpanStorage, Tracer
# Mock implementations
class MockSpanStorage(SpanStorage):
def __init__(self):
self.spans = []
def append_span(self, span: Span):
self.spans.append(span)
class MockTracer(Tracer):
def __init__(self, system_app: SystemApp | None = None):
super().__init__(system_app)
self.current_span = None
self.storage = MockSpanStorage()
def append_span(self, span: Span):
self.storage.append_span(span)
def start_span(
self, operation_name: str, parent_span_id: str = None, metadata: Dict = None
) -> Span:
trace_id = (
self._new_uuid() if parent_span_id is None else parent_span_id.split(":")[0]
)
span_id = f"{trace_id}:{self._new_uuid()}"
span = Span(trace_id, span_id, parent_span_id, operation_name, metadata)
self.current_span = span
return span
def end_span(self, span: Span):
span.end()
self.append_span(span)
def get_current_span(self) -> Span:
return self.current_span
def _get_current_storage(self) -> SpanStorage:
return self.storage
# Tests
def test_span_creation():
span = Span("trace_id", "span_id", "parent_span_id", "operation", {"key": "value"})
assert span.trace_id == "trace_id"
assert span.span_id == "span_id"
assert span.parent_span_id == "parent_span_id"
assert span.operation_name == "operation"
assert span.metadata == {"key": "value"}
def test_span_end():
span = Span("trace_id", "span_id")
assert span.end_time is None
span.end()
assert span.end_time is not None
def test_mock_tracer_start_span():
tracer = MockTracer()
span = tracer.start_span("operation")
assert span.operation_name == "operation"
assert tracer.get_current_span() == span
def test_mock_tracer_end_span():
tracer = MockTracer()
span = tracer.start_span("operation")
tracer.end_span(span)
assert span in tracer._get_current_storage().spans
def test_mock_tracer_append_span():
tracer = MockTracer()
span = Span("trace_id", "span_id")
tracer.append_span(span)
assert span in tracer._get_current_storage().spans
def test_parent_child_span_relation():
tracer = MockTracer()
# Start a parent span
parent_span = tracer.start_span("parent_operation")
# Start a child span with parent span's ID
child_span = tracer.start_span(
"child_operation", parent_span_id=parent_span.span_id
)
# Assert the relationships
assert child_span.parent_span_id == parent_span.span_id
assert (
child_span.trace_id == parent_span.trace_id
) # Assuming children share the same trace ID
# End spans
tracer.end_span(child_span)
tracer.end_span(parent_span)
# Assert they are in the storage
assert child_span in tracer._get_current_storage().spans
assert parent_span in tracer._get_current_storage().spans
# This test checks if unique UUIDs are being generated.
# Note: This is a simple test and doesn't guarantee uniqueness for large numbers of UUIDs.
def test_new_uuid_unique():
tracer = MockTracer()
uuid_set = {tracer._new_uuid() for _ in range(1000)}
assert len(uuid_set) == 1000

View File

@ -0,0 +1,124 @@
import os
import pytest
import asyncio
import json
import tempfile
import time
from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span
@pytest.fixture
def storage(request):
if not request or not hasattr(request, "param"):
batch_size = 10
flush_interval = 10
file_does_not_exist = False
else:
batch_size = request.param.get("batch_size", 10)
flush_interval = request.param.get("flush_interval", 10)
file_does_not_exist = request.param.get("file_does_not_exist", False)
if file_does_not_exist:
with tempfile.TemporaryDirectory() as tmp_dir:
filename = os.path.join(tmp_dir, "non_existent_file.jsonl")
storage_instance = FileSpanStorage(
filename, batch_size=batch_size, flush_interval=flush_interval
)
yield storage_instance
else:
with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
filename = tmp_file.name
storage_instance = FileSpanStorage(
filename, batch_size=batch_size, flush_interval=flush_interval
)
yield storage_instance
def read_spans_from_file(filename):
with open(filename, "r") as f:
return [json.loads(line) for line in f.readlines()]
@pytest.mark.parametrize(
"storage", [{"batch_size": 1, "flush_interval": 5}], indirect=True
)
def test_write_span(storage: SpanStorage):
span = Span("1", "a", "b", "op1")
storage.append_span(span)
time.sleep(0.1)
spans_in_file = read_spans_from_file(storage.filename)
assert len(spans_in_file) == 1
assert spans_in_file[0]["trace_id"] == "1"
@pytest.mark.parametrize(
"storage", [{"batch_size": 1, "flush_interval": 5}], indirect=True
)
def test_incremental_write(storage: SpanStorage):
span1 = Span("1", "a", "b", "op1")
span2 = Span("2", "c", "d", "op2")
storage.append_span(span1)
storage.append_span(span2)
time.sleep(0.1)
spans_in_file = read_spans_from_file(storage.filename)
assert len(spans_in_file) == 2
@pytest.mark.parametrize(
"storage", [{"batch_size": 2, "flush_interval": 5}], indirect=True
)
def test_sync_and_async_append(storage: SpanStorage):
span = Span("1", "a", "b", "op1")
storage.append_span(span)
async def async_append():
storage.append_span(span)
asyncio.run(async_append())
time.sleep(0.1)
spans_in_file = read_spans_from_file(storage.filename)
assert len(spans_in_file) == 2
@pytest.mark.asyncio
async def test_flush_policy(storage: SpanStorage):
span = Span("1", "a", "b", "op1")
for _ in range(storage.batch_size - 1):
storage.append_span(span)
spans_in_file = read_spans_from_file(storage.filename)
assert len(spans_in_file) == 0
# Trigger batch write
storage.append_span(span)
await asyncio.sleep(0.1)
spans_in_file = read_spans_from_file(storage.filename)
assert len(spans_in_file) == storage.batch_size
@pytest.mark.parametrize(
"storage", [{"batch_size": 2, "file_does_not_exist": True}], indirect=True
)
def test_non_existent_file(storage: SpanStorage):
span = Span("1", "a", "b", "op1")
span2 = Span("2", "c", "d", "op2")
storage.append_span(span)
time.sleep(0.1)
spans_in_file = read_spans_from_file(storage.filename)
assert len(spans_in_file) == 0
storage.append_span(span2)
time.sleep(0.1)
spans_in_file = read_spans_from_file(storage.filename)
assert len(spans_in_file) == 2
assert spans_in_file[0]["trace_id"] == "1"
assert spans_in_file[1]["trace_id"] == "2"

View File

@ -0,0 +1,103 @@
import pytest
from pilot.utils.tracer import (
Span,
SpanStorageType,
SpanStorage,
DefaultTracer,
TracerManager,
Tracer,
MemorySpanStorage,
)
from pilot.component import SystemApp
@pytest.fixture
def system_app():
return SystemApp()
@pytest.fixture
def storage(system_app: SystemApp):
ms = MemorySpanStorage(system_app)
system_app.register_instance(ms)
return ms
@pytest.fixture
def tracer(request, system_app: SystemApp):
if not request or not hasattr(request, "param"):
return DefaultTracer(system_app)
else:
span_storage_type = request.param.get(
"span_storage_type", SpanStorageType.ON_CREATE_END
)
return DefaultTracer(system_app, span_storage_type=span_storage_type)
@pytest.fixture
def tracer_manager(system_app: SystemApp, tracer: Tracer):
system_app.register_instance(tracer)
manager = TracerManager()
manager.initialize(system_app)
return manager
def test_start_and_end_span(tracer: Tracer):
span = tracer.start_span("operation")
assert isinstance(span, Span)
assert span.operation_name == "operation"
tracer.end_span(span)
assert span.end_time is not None
stored_span = tracer._get_current_storage().spans[0]
assert stored_span == span
def test_start_and_end_span_with_tracer_manager(tracer_manager: TracerManager):
span = tracer_manager.start_span("operation")
assert isinstance(span, Span)
assert span.operation_name == "operation"
tracer_manager.end_span(span)
assert span.end_time is not None
def test_parent_child_span_relation(tracer: Tracer):
parent_span = tracer.start_span("parent_operation")
child_span = tracer.start_span(
"child_operation", parent_span_id=parent_span.span_id
)
assert child_span.parent_span_id == parent_span.span_id
assert child_span.trace_id == parent_span.trace_id
tracer.end_span(child_span)
tracer.end_span(parent_span)
assert parent_span in tracer._get_current_storage().spans
assert child_span in tracer._get_current_storage().spans
@pytest.mark.parametrize(
"tracer, expected_count, after_create_inc_count",
[
({"span_storage_type": SpanStorageType.ON_CREATE}, 1, 1),
({"span_storage_type": SpanStorageType.ON_END}, 1, 0),
({"span_storage_type": SpanStorageType.ON_CREATE_END}, 2, 1),
],
indirect=["tracer"],
)
def test_tracer_span_storage_type_and_with(
tracer: Tracer,
expected_count: int,
after_create_inc_count: int,
storage: SpanStorage,
):
span = tracer.start_span("new_span")
span.end()
assert len(storage.spans) == expected_count
with tracer.start_span("with_span") as ws:
assert len(storage.spans) == expected_count + after_create_inc_count
assert len(storage.spans) == expected_count + expected_count
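
As a plain restatement of the parametrization above, a short sketch (not part of this commit, names illustrative) of the `ON_CREATE_END` policy: the span is stored once when `start_span` creates it and once more when it ends.

```python
from pilot.component import SystemApp
from pilot.utils.tracer import DefaultTracer, MemorySpanStorage, SpanStorageType

system_app = SystemApp()
storage = MemorySpanStorage(system_app)
system_app.register_instance(storage)

tracer = DefaultTracer(system_app, span_storage_type=SpanStorageType.ON_CREATE_END)
with tracer.start_span("demo"):
    pass

# One record stored on creation, one on end.
assert len(storage.spans) == 2
```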

View File

@ -0,0 +1,572 @@
import os
import click
import logging
import glob
import json
from datetime import datetime
from typing import Iterable, Dict, Callable
from pilot.configs.model_config import LOGDIR
from pilot.utils.tracer import SpanType, SpanTypeRunName
logger = logging.getLogger("dbgpt_cli")
_DEFAULT_FILE_PATTERN = os.path.join(LOGDIR, "dbgpt*.jsonl")
@click.group("trace")
def trace_cli_group():
"""Analyze and visualize trace spans."""
pass
@trace_cli_group.command()
@click.option(
"--trace_id",
required=False,
type=str,
default=None,
show_default=True,
help="Specify the trace ID to list",
)
@click.option(
"--span_id",
required=False,
type=str,
default=None,
show_default=True,
help="Specify the Span ID to list.",
)
@click.option(
"--span_type",
required=False,
type=str,
default=None,
show_default=True,
help="Specify the Span Type to list.",
)
@click.option(
"--parent_span_id",
required=False,
type=str,
default=None,
show_default=True,
help="Specify the Parent Span ID to list.",
)
@click.option(
"--search",
required=False,
type=str,
default=None,
show_default=True,
help="Search trace_id, span_id, parent_span_id, operation_name or content in metadata.",
)
@click.option(
"-l",
"--limit",
type=int,
default=20,
help="Limit the number of recent span displayed.",
)
@click.option(
"--start_time",
type=str,
help='Filter by start time. Format: "YYYY-MM-DD HH:MM:SS.mmm"',
)
@click.option(
"--end_time", type=str, help='Filter by end time. Format: "YYYY-MM-DD HH:MM:SS.mmm"'
)
@click.option(
"--desc",
required=False,
type=bool,
default=False,
is_flag=True,
help="Whether to use reverse sorting. By default, sorting is based on start time.",
)
@click.option(
"--output",
required=False,
type=click.Choice(["text", "html", "csv", "latex", "json"]),
default="text",
help="The output format",
)
@click.argument("files", nargs=-1, type=click.Path(exists=True, readable=True))
def list(
trace_id: str,
span_id: str,
span_type: str,
parent_span_id: str,
search: str,
limit: int,
start_time: str,
end_time: str,
desc: bool,
output: str,
files=None,
):
"""List your trace spans"""
from prettytable import PrettyTable
# If no files are explicitly specified, use the default pattern to get them
spans = read_spans_from_files(files)
if trace_id:
spans = filter(lambda s: s["trace_id"] == trace_id, spans)
if span_id:
spans = filter(lambda s: s["span_id"] == span_id, spans)
if span_type:
spans = filter(lambda s: s["span_type"] == span_type, spans)
if parent_span_id:
spans = filter(lambda s: s["parent_span_id"] == parent_span_id, spans)
# Filter spans based on the start and end times
if start_time:
start_dt = _parse_datetime(start_time)
spans = filter(
lambda span: _parse_datetime(span["start_time"]) >= start_dt, spans
)
if end_time:
end_dt = _parse_datetime(end_time)
spans = filter(
lambda span: _parse_datetime(span["start_time"]) <= end_dt, spans
)
if search:
spans = filter(_new_search_span_func(search), spans)
# Sort spans based on the start time
spans = sorted(
spans, key=lambda span: _parse_datetime(span["start_time"]), reverse=desc
)[:limit]
table = PrettyTable(
["Trace ID", "Span ID", "Operation Name", "Conversation UID"],
)
for sp in spans:
conv_uid = None
if "metadata" in sp and sp:
metadata = sp["metadata"]
if isinstance(metadata, dict):
conv_uid = metadata.get("conv_uid")
table.add_row(
[
sp.get("trace_id"),
sp.get("span_id"),
# sp.get("parent_span_id"),
sp.get("operation_name"),
conv_uid,
]
)
out_kwargs = {"ensure_ascii": False} if output == "json" else {}
print(table.get_formatted_string(out_format=output, **out_kwargs))
@trace_cli_group.command()
@click.option(
"--trace_id",
required=True,
type=str,
help="Specify the trace ID to list",
)
@click.argument("files", nargs=-1, type=click.Path(exists=True, readable=True))
def tree(trace_id: str, files):
"""Display trace links as a tree"""
hierarchy = _view_trace_hierarchy(trace_id, files)
if not hierarchy:
_print_empty_message(files)
return
_print_trace_hierarchy(hierarchy)
@trace_cli_group.command()
@click.option(
"--trace_id",
required=False,
type=str,
default=None,
help="Specify the trace ID to analyze. If None, show latest conversation details",
)
@click.option(
"--tree",
required=False,
type=bool,
default=False,
is_flag=True,
help="Display trace spans as a tree",
)
@click.option(
"--hide_conv",
required=False,
type=bool,
default=False,
is_flag=True,
help="Hide your conversation details",
)
@click.option(
"--hide_run_params",
required=False,
type=bool,
default=False,
is_flag=True,
help="Hide run params",
)
@click.option(
"--output",
required=False,
type=click.Choice(["text", "html", "csv", "latex", "json"]),
default="text",
help="The output format",
)
@click.argument("files", nargs=-1, type=click.Path(exists=False, readable=True))
def chat(
trace_id: str,
tree: bool,
hide_conv: bool,
hide_run_params: bool,
output: str,
files,
):
"""Show conversation details"""
from prettytable import PrettyTable
spans = read_spans_from_files(files)
    # Sort by start time, newest first (sorted() already returns a list)
    spans = sorted(
        spans, key=lambda span: _parse_datetime(span["start_time"]), reverse=True
    )
if not spans:
_print_empty_message(files)
return
service_spans = {}
service_names = set(SpanTypeRunName.values())
found_trace_id = None
for sp in spans:
span_type = sp["span_type"]
metadata = sp.get("metadata")
if span_type == SpanType.RUN:
service_name = metadata["run_service"]
service_spans[service_name] = sp.copy()
if set(service_spans.keys()) == service_names and found_trace_id:
break
elif span_type == SpanType.CHAT and not found_trace_id:
if not trace_id:
found_trace_id = sp["trace_id"]
if trace_id and trace_id == sp["trace_id"]:
found_trace_id = trace_id
service_tables = {}
out_kwargs = {"ensure_ascii": False} if output == "json" else {}
for service_name, sp in service_spans.items():
metadata = sp["metadata"]
table = PrettyTable(["Config Key", "Config Value"], title=service_name)
for k, v in metadata["params"].items():
table.add_row([k, v])
service_tables[service_name] = table
if not hide_run_params:
merged_table1 = merge_tables_horizontally(
[
service_tables.get(SpanTypeRunName.WEBSERVER.value),
service_tables.get(SpanTypeRunName.EMBEDDING_MODEL.value),
]
)
merged_table2 = merge_tables_horizontally(
[
                service_tables.get(SpanTypeRunName.MODEL_WORKER.value),
                service_tables.get(SpanTypeRunName.WORKER_MANAGER.value),
]
)
if output == "text":
print(merged_table1)
print(merged_table2)
else:
for service_name, table in service_tables.items():
print(table.get_formatted_string(out_format=output, **out_kwargs))
if hide_conv:
return
if not found_trace_id:
print(f"Can't found conversation with trace_id: {trace_id}")
return
trace_id = found_trace_id
trace_spans = [span for span in spans if span["trace_id"] == trace_id]
trace_spans = [s for s in reversed(trace_spans)]
hierarchy = _build_trace_hierarchy(trace_spans)
if tree:
print("\nInvoke Trace Tree:\n")
_print_trace_hierarchy(hierarchy)
trace_spans = _get_ordered_trace_from(hierarchy)
table = PrettyTable(["Key", "Value Value"], title="Chat Trace Details")
split_long_text = output == "text"
for sp in trace_spans:
op = sp["operation_name"]
metadata = sp.get("metadata")
if op == "get_chat_instance" and not sp["end_time"]:
table.add_row(["trace_id", trace_id])
table.add_row(["span_id", sp["span_id"]])
table.add_row(["conv_uid", metadata.get("conv_uid")])
table.add_row(["user_input", metadata.get("user_input")])
table.add_row(["chat_mode", metadata.get("chat_mode")])
table.add_row(["select_param", metadata.get("select_param")])
table.add_row(["model_name", metadata.get("model_name")])
if op in ["BaseChat.stream_call", "BaseChat.nostream_call"]:
if not sp["end_time"]:
table.add_row(["temperature", metadata.get("temperature")])
table.add_row(["max_new_tokens", metadata.get("max_new_tokens")])
table.add_row(["echo", metadata.get("echo")])
elif "error" in metadata:
table.add_row(["BaseChat Error", metadata.get("error")])
if op == "BaseChat.nostream_call" and not sp["end_time"]:
if "model_output" in metadata:
table.add_row(
[
"BaseChat model_output",
split_string_by_terminal_width(
metadata.get("model_output").get("text"),
split=split_long_text,
),
]
)
if "ai_response_text" in metadata:
table.add_row(
[
"BaseChat ai_response_text",
split_string_by_terminal_width(
metadata.get("ai_response_text"), split=split_long_text
),
]
)
if "prompt_define_response" in metadata:
table.add_row(
[
"BaseChat prompt_define_response",
split_string_by_terminal_width(
metadata.get("prompt_define_response"),
split=split_long_text,
),
]
)
if op == "DefaultModelWorker_call.generate_stream_func":
if not sp["end_time"]:
table.add_row(["llm_adapter", metadata.get("llm_adapter")])
table.add_row(
[
"User prompt",
split_string_by_terminal_width(
metadata.get("prompt"), split=split_long_text
),
]
)
else:
table.add_row(
[
"Model output",
split_string_by_terminal_width(metadata.get("output")),
]
)
if (
op
in [
"DefaultModelWorker.async_generate_stream",
"DefaultModelWorker.generate_stream",
]
and metadata
and "error" in metadata
):
table.add_row(["Model Error", metadata.get("error")])
print(table.get_formatted_string(out_format=output, **out_kwargs))
def read_spans_from_files(files=None) -> Iterable[Dict]:
"""
Reads spans from multiple files based on the provided file paths.
"""
if not files:
files = [_DEFAULT_FILE_PATTERN]
for filepath in files:
for filename in glob.glob(filepath):
with open(filename, "r") as file:
for line in file:
yield json.loads(line)
def _print_empty_message(files=None):
if not files:
files = [_DEFAULT_FILE_PATTERN]
file_names = ",".join(files)
print(f"No trace span records found in your tracer files: {file_names}")
def _new_search_span_func(search: str):
def func(span: Dict) -> bool:
items = [span["trace_id"], span["span_id"], span["parent_span_id"]]
if "operation_name" in span:
items.append(span["operation_name"])
if "metadata" in span:
metadata = span["metadata"]
if isinstance(metadata, dict):
for k, v in metadata.items():
items.append(k)
items.append(v)
return any(search in str(item) for item in items if item)
return func
def _parse_datetime(dt_str):
"""Parse a datetime string to a datetime object."""
return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S.%f")
def _build_trace_hierarchy(spans, parent_span_id=None, indent=0):
# Current spans
current_level_spans = [
span
for span in spans
if span["parent_span_id"] == parent_span_id and span["end_time"] is None
]
hierarchy = []
for start_span in current_level_spans:
# Find end span
end_span = next(
(
span
for span in spans
if span["span_id"] == start_span["span_id"]
and span["end_time"] is not None
),
None,
)
entry = {
"operation_name": start_span["operation_name"],
"parent_span_id": start_span["parent_span_id"],
"span_id": start_span["span_id"],
"start_time": start_span["start_time"],
"end_time": start_span["end_time"],
"metadata": start_span["metadata"],
"children": _build_trace_hierarchy(
spans, start_span["span_id"], indent + 1
),
}
hierarchy.append(entry)
# Append end span
if end_span:
entry_end = {
"operation_name": end_span["operation_name"],
"parent_span_id": end_span["parent_span_id"],
"span_id": end_span["span_id"],
"start_time": end_span["start_time"],
"end_time": end_span["end_time"],
"metadata": end_span["metadata"],
"children": [],
}
hierarchy.append(entry_end)
return hierarchy
def _view_trace_hierarchy(trace_id, files=None):
"""Find and display the calls of the entire link based on the given trace_id"""
spans = read_spans_from_files(files)
trace_spans = [span for span in spans if span["trace_id"] == trace_id]
if not trace_spans:
return None
hierarchy = _build_trace_hierarchy(trace_spans)
return hierarchy
def _print_trace_hierarchy(hierarchy, indent=0):
"""Print link hierarchy"""
for entry in hierarchy:
print(
" " * indent
+ f"Operation: {entry['operation_name']} (Start: {entry['start_time']}, End: {entry['end_time']})"
)
_print_trace_hierarchy(entry["children"], indent + 1)
def _get_ordered_trace_from(hierarchy):
traces = []
def func(items):
for item in items:
traces.append(item)
func(item["children"])
func(hierarchy)
return traces
def _print(service_spans: Dict):
for names in [
[SpanTypeRunName.WEBSERVER.name, SpanTypeRunName.EMBEDDING_MODEL],
[SpanTypeRunName.WORKER_MANAGER.name, SpanTypeRunName.MODEL_WORKER],
]:
pass
def merge_tables_horizontally(tables):
from prettytable import PrettyTable
if not tables:
return None
tables = [t for t in tables if t]
if not tables:
return None
max_rows = max(len(table._rows) for table in tables)
merged_table = PrettyTable()
new_field_names = []
for table in tables:
new_field_names.extend(
[
f"{name} ({table.title})" if table.title else f"{name}"
for name in table.field_names
]
)
merged_table.field_names = new_field_names
for i in range(max_rows):
merged_row = []
for table in tables:
if i < len(table._rows):
merged_row.extend(table._rows[i])
else:
# Fill empty cells for shorter tables
merged_row.extend([""] * len(table.field_names))
merged_table.add_row(merged_row)
return merged_table
def split_string_by_terminal_width(s, split=True, max_len=None, sp="\n"):
"""
Split a string into substrings based on the current terminal width.
    Parameters:
    - s: the input string
    - split: if False, return the string unchanged
    - max_len: maximum chunk length; defaults to 80% of the terminal width, or 100 if it cannot be determined
    - sp: separator joined between the chunks
    """
if not split:
return s
if not max_len:
try:
max_len = int(os.get_terminal_size().columns * 0.8)
except OSError:
            # Fall back to 100 columns if the terminal size can't be determined
max_len = 100
return sp.join([s[i : i + max_len] for i in range(0, len(s), max_len)])
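
For readers who want the raw records rather than the CLI tables, a small sketch (not part of this commit) of reading the same trace files by hand; it repeats what `read_spans_from_files` and `_parse_datetime` above do.

```python
import glob
import json
import os
from datetime import datetime

from pilot.configs.model_config import LOGDIR

# Collect every span from the default trace files (the same glob the CLI uses).
spans = []
for filename in glob.glob(os.path.join(LOGDIR, "dbgpt*.jsonl")):
    with open(filename) as f:
        spans.extend(json.loads(line) for line in f)

# Print the five most recent spans, newest first.
spans.sort(
    key=lambda sp: datetime.strptime(sp["start_time"], "%Y-%m-%d %H:%M:%S.%f"),
    reverse=True,
)
for sp in spans[:5]:
    print(sp["trace_id"], sp["span_id"], sp["operation_name"])
```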

View File

@ -0,0 +1,195 @@
from typing import Dict, Optional
from contextvars import ContextVar
from functools import wraps
from pilot.component import SystemApp, ComponentType
from pilot.utils.tracer.base import (
SpanType,
Span,
Tracer,
SpanStorage,
SpanStorageType,
TracerContext,
)
from pilot.utils.tracer.span_storage import MemorySpanStorage
class DefaultTracer(Tracer):
def __init__(
self,
system_app: SystemApp | None = None,
default_storage: SpanStorage = None,
span_storage_type: SpanStorageType = SpanStorageType.ON_CREATE_END,
):
super().__init__(system_app)
self._span_stack_var = ContextVar("span_stack", default=[])
if not default_storage:
default_storage = MemorySpanStorage(system_app)
self._default_storage = default_storage
self._span_storage_type = span_storage_type
def append_span(self, span: Span):
self._get_current_storage().append_span(span)
def start_span(
self,
operation_name: str,
parent_span_id: str = None,
span_type: SpanType = None,
metadata: Dict = None,
) -> Span:
trace_id = (
self._new_uuid() if parent_span_id is None else parent_span_id.split(":")[0]
)
span_id = f"{trace_id}:{self._new_uuid()}"
span = Span(
trace_id,
span_id,
span_type,
parent_span_id,
operation_name,
metadata=metadata,
)
if self._span_storage_type in [
SpanStorageType.ON_END,
SpanStorageType.ON_CREATE_END,
]:
span.add_end_caller(self.append_span)
if self._span_storage_type in [
SpanStorageType.ON_CREATE,
SpanStorageType.ON_CREATE_END,
]:
self.append_span(span)
current_stack = self._span_stack_var.get()
current_stack.append(span)
self._span_stack_var.set(current_stack)
span.add_end_caller(self._remove_from_stack_top)
return span
def end_span(self, span: Span, **kwargs):
""""""
span.end(**kwargs)
def _remove_from_stack_top(self, span: Span):
current_stack = self._span_stack_var.get()
if current_stack:
current_stack.pop()
self._span_stack_var.set(current_stack)
def get_current_span(self) -> Optional[Span]:
current_stack = self._span_stack_var.get()
return current_stack[-1] if current_stack else None
def _get_current_storage(self) -> SpanStorage:
return self.system_app.get_component(
ComponentType.TRACER_SPAN_STORAGE, SpanStorage, self._default_storage
)
class TracerManager:
"""The manager of current tracer"""
def __init__(self) -> None:
self._system_app: Optional[SystemApp] = None
self._trace_context_var: ContextVar[TracerContext] = ContextVar(
"trace_context",
default=TracerContext(),
)
def initialize(
self, system_app: SystemApp, trace_context_var: ContextVar[TracerContext] = None
) -> None:
self._system_app = system_app
if trace_context_var:
self._trace_context_var = trace_context_var
def _get_tracer(self) -> Tracer:
if not self._system_app:
return None
return self._system_app.get_component(ComponentType.TRACER, Tracer, None)
def start_span(
self,
operation_name: str,
parent_span_id: str = None,
span_type: SpanType = None,
metadata: Dict = None,
) -> Span:
"""Start a new span with operation_name
This method must not throw an exception under any case and try not to block as much as possible
"""
tracer = self._get_tracer()
if not tracer:
return Span("empty_span", "empty_span")
if not parent_span_id:
parent_span_id = self.get_current_span_id()
return tracer.start_span(
operation_name, parent_span_id, span_type=span_type, metadata=metadata
)
def end_span(self, span: Span, **kwargs):
tracer = self._get_tracer()
if not tracer or not span:
return
tracer.end_span(span, **kwargs)
def get_current_span(self) -> Optional[Span]:
tracer = self._get_tracer()
if not tracer:
return None
return tracer.get_current_span()
def get_current_span_id(self) -> Optional[str]:
current_span = self.get_current_span()
if current_span:
return current_span.span_id
ctx = self._trace_context_var.get()
return ctx.span_id if ctx else None
root_tracer: TracerManager = TracerManager()
def trace(operation_name: str, **trace_kwargs):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
with root_tracer.start_span(operation_name, **trace_kwargs):
return await func(*args, **kwargs)
return wrapper
return decorator
def initialize_tracer(
system_app: SystemApp,
tracer_filename: str,
root_operation_name: str = "DB-GPT-Web-Entry",
):
if not system_app:
return
from pilot.utils.tracer.span_storage import FileSpanStorage
trace_context_var = ContextVar(
"trace_context",
default=TracerContext(),
)
tracer = DefaultTracer(system_app)
system_app.register_instance(FileSpanStorage(tracer_filename))
system_app.register_instance(tracer)
root_tracer.initialize(system_app, trace_context_var)
if system_app.app:
from pilot.utils.tracer.tracer_middleware import TraceIDMiddleware
system_app.app.add_middleware(
TraceIDMiddleware,
trace_context_var=trace_context_var,
tracer=tracer,
root_operation_name=root_operation_name,
)
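
The module-level `root_tracer` and the `trace` decorator are the entry points intended for application code. Below is a minimal sketch, not part of this commit, assuming `root_tracer` and `trace` are exported alongside the other tracer names used in the tests; the SystemApp wiring mirrors the test fixtures earlier in this commit and the span names are illustrative.

```python
import asyncio

from pilot.component import SystemApp
from pilot.utils.tracer import DefaultTracer, MemorySpanStorage, root_tracer, trace

system_app = SystemApp()
system_app.register_instance(MemorySpanStorage(system_app))
system_app.register_instance(DefaultTracer(system_app))
root_tracer.initialize(system_app)


@trace("load_knowledge")  # wraps the async callable in a span
async def load_knowledge():
    # Nested spans pick up the current span as parent, so they share its trace_id
    # (span ids have the form "<trace_id>:<uuid>").
    with root_tracer.start_span("embed_documents", metadata={"doc_count": 3}):
        await asyncio.sleep(0)


async def main():
    with root_tracer.start_span("request", metadata={"path": "/api/demo"}):
        await load_knowledge()


asyncio.run(main())
```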

View File

@ -0,0 +1,45 @@
import uuid
from contextvars import ContextVar
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.types import ASGIApp
from pilot.utils.tracer import TracerContext, Tracer
_DEFAULT_EXCLUDE_PATHS = ["/api/controller/heartbeat"]
class TraceIDMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app: ASGIApp,
trace_context_var: ContextVar[TracerContext],
tracer: Tracer,
root_operation_name: str = "DB-GPT-Web-Entry",
include_prefix: str = "/api",
exclude_paths=_DEFAULT_EXCLUDE_PATHS,
):
super().__init__(app)
self.trace_context_var = trace_context_var
self.tracer = tracer
self.root_operation_name = root_operation_name
self.include_prefix = include_prefix
self.exclude_paths = exclude_paths
async def dispatch(self, request: Request, call_next):
if request.url.path in self.exclude_paths or not request.url.path.startswith(
self.include_prefix
):
return await call_next(request)
span_id = request.headers.get("DBGPT_TRACER_SPAN_ID")
# if not span_id:
# span_id = str(uuid.uuid4())
# self.trace_context_var.set(TracerContext(span_id=span_id))
with self.tracer.start_span(
self.root_operation_name, span_id, metadata={"path": request.url.path}
):
response = await call_next(request)
return response
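
A hand-wired sketch, not part of this commit, of installing `TraceIDMiddleware` on a FastAPI app; in the webserver this is what `initialize_tracer` does when `system_app.app` is set, and the variable names here are illustrative.

```python
from contextvars import ContextVar

from fastapi import FastAPI

from pilot.component import SystemApp
from pilot.utils.tracer import DefaultTracer, TracerContext
from pilot.utils.tracer.tracer_middleware import TraceIDMiddleware

app = FastAPI()
system_app = SystemApp()  # assumption: in the real server, SystemApp wraps this FastAPI app
tracer = DefaultTracer(system_app)
trace_context_var: ContextVar[TracerContext] = ContextVar(
    "trace_context", default=TracerContext()
)

app.add_middleware(
    TraceIDMiddleware,
    trace_context_var=trace_context_var,
    tracer=tracer,
    root_operation_name="DB-GPT-Web-Entry",
)
# Requests under /api (except /api/controller/heartbeat) now run inside a root span;
# a client can attach to an existing trace by sending the DBGPT_TRACER_SPAN_ID header.
```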