feat(core): Support opentelemetry exporter (#1690)

This commit is contained in:
Fangyin Cheng
2024-07-05 15:20:21 +08:00
committed by GitHub
parent 84fc1fc7fe
commit bf978d2bf9
39 changed files with 1176 additions and 218 deletions

View File

@@ -24,6 +24,7 @@ from dbgpt.util.executor_utils import (
DefaultExecutorFactory,
blocking_func_to_async,
)
from dbgpt.util.tracer import root_tracer
from ..dag.base import DAG, DAGContext, DAGNode, DAGVar
from ..task.base import EMPTY_DATA, OUT, T, TaskOutput, is_empty_data
@@ -218,10 +219,11 @@ class BaseOperator(DAGNode, ABC, Generic[OUT], metaclass=BaseOperatorMeta):
"""
if not is_empty_data(call_data):
call_data = {"data": call_data}
out_ctx = await self._runner.execute_workflow(
self, call_data, exist_dag_ctx=dag_ctx
)
return out_ctx.current_task_context.task_output.output
with root_tracer.start_span("dbgpt.awel.operator.call"):
out_ctx = await self._runner.execute_workflow(
self, call_data, exist_dag_ctx=dag_ctx
)
return out_ctx.current_task_context.task_output.output
def _blocking_call(
self,
@@ -265,19 +267,27 @@ class BaseOperator(DAGNode, ABC, Generic[OUT], metaclass=BaseOperatorMeta):
"""
if call_data != EMPTY_DATA:
call_data = {"data": call_data}
out_ctx = await self._runner.execute_workflow(
self, call_data, streaming_call=True, exist_dag_ctx=dag_ctx
)
with root_tracer.start_span("dbgpt.awel.operator.call_stream"):
task_output = out_ctx.current_task_context.task_output
if task_output.is_stream:
return out_ctx.current_task_context.task_output.output_stream
else:
out_ctx = await self._runner.execute_workflow(
self, call_data, streaming_call=True, exist_dag_ctx=dag_ctx
)
async def _gen():
yield task_output.output
task_output = out_ctx.current_task_context.task_output
if task_output.is_stream:
stream_generator = (
out_ctx.current_task_context.task_output.output_stream
)
else:
return _gen()
# No stream output, wrap the output in a stream
async def _gen():
yield task_output.output
stream_generator = _gen()
return root_tracer.wrapper_async_stream(
stream_generator, "dbgpt.awel.operator.call_stream.iterate"
)
def _blocking_call_stream(
self,

View File

@@ -9,6 +9,7 @@ import traceback
from typing import Any, Dict, List, Optional, Set, cast
from dbgpt.component import SystemApp
from dbgpt.util.tracer import root_tracer
from ..dag.base import DAGContext, DAGVar
from ..operators.base import CALL_DATA, BaseOperator, WorkflowRunner
@@ -90,9 +91,20 @@ class DefaultWorkflowRunner(WorkflowRunner):
# Save dag context
await node.dag._save_dag_ctx(dag_ctx)
await job_manager.before_dag_run()
await self._execute_node(
job_manager, node, dag_ctx, node_outputs, skip_node_ids, system_app
)
with root_tracer.start_span(
"dbgpt.awel.workflow.run_workflow",
metadata={
"exist_dag_ctx": exist_dag_ctx is not None,
"event_loop_task_id": event_loop_task_id,
"streaming_call": streaming_call,
"awel_node_id": node.node_id,
"awel_node_name": node.node_name,
},
):
await self._execute_node(
job_manager, node, dag_ctx, node_outputs, skip_node_ids, system_app
)
if not streaming_call and node.dag and exist_dag_ctx is None:
# streaming call not work for dag end
# if exist_dag_ctx is not None, it means current dag is a sub dag
@@ -158,9 +170,23 @@ class DefaultWorkflowRunner(WorkflowRunner):
if system_app is not None and node.system_app is None:
node.set_system_app(system_app)
await node._run(dag_ctx, task_ctx.log_id)
node_outputs[node.node_id] = dag_ctx.current_task_context
task_ctx.set_current_state(TaskState.SUCCESS)
run_metadata = {
"awel_node_id": node.node_id,
"awel_node_name": node.node_name,
"awel_node_type": str(node),
"state": TaskState.RUNNING.value,
"task_log_id": task_ctx.log_id,
}
with root_tracer.start_span(
"dbgpt.awel.workflow.run_operator", metadata=run_metadata
) as span:
await node._run(dag_ctx, task_ctx.log_id)
node_outputs[node.node_id] = dag_ctx.current_task_context
task_ctx.set_current_state(TaskState.SUCCESS)
run_metadata["skip_node_ids"] = ",".join(skip_node_ids)
run_metadata["state"] = TaskState.SUCCESS.value
span.metadata = run_metadata
if isinstance(node, BranchOperator):
skip_nodes = task_ctx.metadata.get("skip_node_names", [])

View File

@@ -1,4 +1,5 @@
"""Http trigger for AWEL."""
import json
import logging
from enum import Enum
@@ -24,6 +25,7 @@ from dbgpt._private.pydantic import (
model_to_dict,
)
from dbgpt.util.i18n_utils import _
from dbgpt.util.tracer import root_tracer
from ..dag.base import DAG
from ..flow import (
@@ -650,12 +652,21 @@ async def _trigger_dag(
from fastapi import BackgroundTasks
from fastapi.responses import StreamingResponse
span_id = root_tracer._parse_span_id(body)
leaf_nodes = dag.leaf_nodes
if len(leaf_nodes) != 1:
raise ValueError("HttpTrigger just support one leaf node in dag")
end_node = cast(BaseOperator, leaf_nodes[0])
metadata = {
"awel_node_id": end_node.node_id,
"awel_node_name": end_node.node_name,
}
if not streaming_response:
return await end_node.call(call_data=body)
with root_tracer.start_span(
"dbgpt.core.trigger.http.run_dag", span_id, metadata=metadata
):
return await end_node.call(call_data=body)
else:
headers = response_headers
media_type = response_media_type if response_media_type else "text/event-stream"
@@ -666,7 +677,10 @@ async def _trigger_dag(
"Connection": "keep-alive",
"Transfer-Encoding": "chunked",
}
generator = await end_node.call_stream(call_data=body)
_generator = await end_node.call_stream(call_data=body)
trace_generator = root_tracer.wrapper_async_stream(
_generator, "dbgpt.core.trigger.http.run_dag", span_id, metadata=metadata
)
async def _after_dag_end():
await dag._after_dag_end(end_node.current_event_loop_task_id)
@@ -674,7 +688,7 @@ async def _trigger_dag(
background_tasks = BackgroundTasks()
background_tasks.add_task(_after_dag_end)
return StreamingResponse(
generator,
trace_generator,
headers=headers,
media_type=media_type,
background=background_tasks,