DB-GPT/dbgpt/util/tracer/opentelemetry.py
2024-07-05 15:20:21 +08:00

123 lines
4.6 KiB
Python

from typing import Dict, List, Optional
from .base import Span, SpanStorage, _split_span_id
try:
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Span as OTSpan
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import SpanContext, SpanKind
except ImportError:
raise ImportError(
"To use OpenTelemetrySpanStorage, you must install opentelemetry-api, "
"opentelemetry-sdk and opentelemetry-exporter-otlp."
"You can install it via `pip install opentelemetry-api opentelemetry-sdk "
"opentelemetry-exporter-otlp`"
)
class OpenTelemetrySpanStorage(SpanStorage):
"""OpenTelemetry span storage."""
def __init__(
self,
service_name: str,
otlp_endpoint: Optional[str] = None,
otlp_insecure: Optional[bool] = None,
otlp_timeout: Optional[int] = None,
):
super().__init__()
self.service_name = service_name
resource = Resource(attributes={"service.name": service_name})
self.tracer_provider = TracerProvider(resource=resource)
self.tracer = self.tracer_provider.get_tracer(__name__)
# Store the spans that have not ended
self.spans: Dict[str, OTSpan] = {}
otlp_exporter = OTLPSpanExporter(
endpoint=otlp_endpoint,
insecure=otlp_insecure,
timeout=otlp_timeout,
)
span_processor = BatchSpanProcessor(otlp_exporter)
self.tracer_provider.add_span_processor(span_processor)
trace.set_tracer_provider(self.tracer_provider)
def append_span(self, span: Span):
span_id = span.span_id
if span_id in self.spans:
otel_span = self.spans.pop(span_id)
# Update the end time and attributes of the span
end_time = int(span.end_time.timestamp() * 1e9) if span.end_time else None
if span.metadata:
for key, value in span.metadata.items():
if isinstance(value, (bool, str, bytes, int, float)) or (
isinstance(value, list)
and all(
isinstance(i, (bool, str, bytes, int, float)) for i in value
)
):
otel_span.set_attribute(key, value)
if end_time:
otel_span.end(end_time=end_time)
else:
otel_span.end()
else:
parent_context = self._create_parent_context(span)
# Datetime -> int
start_time = int(span.start_time.timestamp() * 1e9)
otel_span = self.tracer.start_span(
span.operation_name,
context=parent_context,
kind=SpanKind.INTERNAL,
start_time=start_time,
)
otel_span.set_attribute("dbgpt_trace_id", span.trace_id)
otel_span.set_attribute("dbgpt_span_id", span.span_id)
if span.parent_span_id:
otel_span.set_attribute("dbgpt_parent_span_id", span.parent_span_id)
otel_span.set_attribute("span_type", span.span_type.value)
if span.metadata:
for key, value in span.metadata.items():
if isinstance(value, (bool, str, bytes, int, float)) or (
isinstance(value, list)
and all(
isinstance(i, (bool, str, bytes, int, float)) for i in value
)
):
otel_span.set_attribute(key, value)
if not span.end_time:
self.spans[span_id] = otel_span
def append_span_batch(self, spans: List[Span]):
for span in spans:
self.append_span(span)
def _create_parent_context(self, span: Span):
if not span.parent_span_id:
return trace.set_span_in_context(trace.INVALID_SPAN)
trace_id, parent_span_id = _split_span_id(span.parent_span_id)
if not trace_id:
return trace.set_span_in_context(trace.INVALID_SPAN)
span_context = SpanContext(
trace_id=trace_id,
span_id=parent_span_id,
is_remote=True,
trace_flags=trace.TraceFlags(0x01), # Default: SAMPLED
)
return trace.set_span_in_context(trace.NonRecordingSpan(span_context))
def close(self):
self.tracer_provider.shutdown()