mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-08-03 01:12:15 +00:00
123 lines
4.6 KiB
Python
123 lines
4.6 KiB
Python
from typing import Dict, List, Optional
|
|
|
|
from .base import Span, SpanStorage, _split_span_id
|
|
|
|
try:
|
|
from opentelemetry import trace
|
|
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
|
from opentelemetry.sdk.resources import Resource
|
|
from opentelemetry.sdk.trace import Span as OTSpan
|
|
from opentelemetry.sdk.trace import TracerProvider
|
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
|
from opentelemetry.trace import SpanContext, SpanKind
|
|
except ImportError:
|
|
raise ImportError(
|
|
"To use OpenTelemetrySpanStorage, you must install opentelemetry-api, "
|
|
"opentelemetry-sdk and opentelemetry-exporter-otlp."
|
|
"You can install it via `pip install opentelemetry-api opentelemetry-sdk "
|
|
"opentelemetry-exporter-otlp`"
|
|
)
|
|
|
|
|
|
class OpenTelemetrySpanStorage(SpanStorage):
|
|
"""OpenTelemetry span storage."""
|
|
|
|
def __init__(
|
|
self,
|
|
service_name: str,
|
|
otlp_endpoint: Optional[str] = None,
|
|
otlp_insecure: Optional[bool] = None,
|
|
otlp_timeout: Optional[int] = None,
|
|
):
|
|
super().__init__()
|
|
self.service_name = service_name
|
|
|
|
resource = Resource(attributes={"service.name": service_name})
|
|
self.tracer_provider = TracerProvider(resource=resource)
|
|
self.tracer = self.tracer_provider.get_tracer(__name__)
|
|
# Store the spans that have not ended
|
|
self.spans: Dict[str, OTSpan] = {}
|
|
otlp_exporter = OTLPSpanExporter(
|
|
endpoint=otlp_endpoint,
|
|
insecure=otlp_insecure,
|
|
timeout=otlp_timeout,
|
|
)
|
|
span_processor = BatchSpanProcessor(otlp_exporter)
|
|
self.tracer_provider.add_span_processor(span_processor)
|
|
trace.set_tracer_provider(self.tracer_provider)
|
|
|
|
def append_span(self, span: Span):
|
|
span_id = span.span_id
|
|
|
|
if span_id in self.spans:
|
|
otel_span = self.spans.pop(span_id)
|
|
# Update the end time and attributes of the span
|
|
end_time = int(span.end_time.timestamp() * 1e9) if span.end_time else None
|
|
if span.metadata:
|
|
for key, value in span.metadata.items():
|
|
if isinstance(value, (bool, str, bytes, int, float)) or (
|
|
isinstance(value, list)
|
|
and all(
|
|
isinstance(i, (bool, str, bytes, int, float)) for i in value
|
|
)
|
|
):
|
|
otel_span.set_attribute(key, value)
|
|
if end_time:
|
|
otel_span.end(end_time=end_time)
|
|
else:
|
|
otel_span.end()
|
|
else:
|
|
parent_context = self._create_parent_context(span)
|
|
# Datetime -> int
|
|
start_time = int(span.start_time.timestamp() * 1e9)
|
|
|
|
otel_span = self.tracer.start_span(
|
|
span.operation_name,
|
|
context=parent_context,
|
|
kind=SpanKind.INTERNAL,
|
|
start_time=start_time,
|
|
)
|
|
|
|
otel_span.set_attribute("dbgpt_trace_id", span.trace_id)
|
|
otel_span.set_attribute("dbgpt_span_id", span.span_id)
|
|
|
|
if span.parent_span_id:
|
|
otel_span.set_attribute("dbgpt_parent_span_id", span.parent_span_id)
|
|
|
|
otel_span.set_attribute("span_type", span.span_type.value)
|
|
if span.metadata:
|
|
for key, value in span.metadata.items():
|
|
if isinstance(value, (bool, str, bytes, int, float)) or (
|
|
isinstance(value, list)
|
|
and all(
|
|
isinstance(i, (bool, str, bytes, int, float)) for i in value
|
|
)
|
|
):
|
|
otel_span.set_attribute(key, value)
|
|
|
|
if not span.end_time:
|
|
self.spans[span_id] = otel_span
|
|
|
|
def append_span_batch(self, spans: List[Span]):
|
|
for span in spans:
|
|
self.append_span(span)
|
|
|
|
def _create_parent_context(self, span: Span):
|
|
if not span.parent_span_id:
|
|
return trace.set_span_in_context(trace.INVALID_SPAN)
|
|
|
|
trace_id, parent_span_id = _split_span_id(span.parent_span_id)
|
|
if not trace_id:
|
|
return trace.set_span_in_context(trace.INVALID_SPAN)
|
|
|
|
span_context = SpanContext(
|
|
trace_id=trace_id,
|
|
span_id=parent_span_id,
|
|
is_remote=True,
|
|
trace_flags=trace.TraceFlags(0x01), # Default: SAMPLED
|
|
)
|
|
return trace.set_span_in_context(trace.NonRecordingSpan(span_context))
|
|
|
|
def close(self):
|
|
self.tracer_provider.shutdown()
|