distribution/vendor/google.golang.org/grpc/stats/opentelemetry/client_metrics.go
Milos Gajdos 3996413f46
Bump google storage module
Also bump the golangci version

Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
2024-10-26 18:19:46 +01:00

278 lines
11 KiB
Go

/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package opentelemetry
import (
"context"
"sync/atomic"
"time"
"google.golang.org/grpc"
estats "google.golang.org/grpc/experimental/stats"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
)
// clientStatsHandler carries the state used by the client-side interceptors
// and stats handler callbacks in this file: the configured Options, the
// metric instruments, and an embedded estats.MetricsRecorder.
type clientStatsHandler struct {
// MetricsRecorder is assigned by initializeMetrics; it stays nil when
// initializeMetrics returns early.
estats.MetricsRecorder
// options is the configuration the handler reads (MetricsOptions in
// particular).
options Options
// clientMetrics holds the per-call and per-attempt instruments populated by
// initializeMetrics.
clientMetrics clientMetrics
}
// initializeMetrics creates the client-side instruments and installs the
// registry-backed MetricsRecorder, based on h.options.MetricsOptions.
//
// If no MeterProvider is configured, or the provider hands back a nil meter,
// the method returns without assigning any instrument or the embedded
// MetricsRecorder. If no metric set is configured, DefaultMetrics() is used.
func (h *clientStatsHandler) initializeMetrics() {
	provider := h.options.MetricsOptions.MeterProvider
	if provider == nil {
		// Will set no metrics to record, logically making this stats handler
		// a no-op.
		return
	}
	meter := provider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version))
	if meter == nil {
		return
	}

	enabled := h.options.MetricsOptions.Metrics
	if enabled == nil {
		enabled = DefaultMetrics()
	}

	cm := &h.clientMetrics
	cm.attemptStarted = createInt64Counter(
		enabled.Metrics(), "grpc.client.attempt.started", meter,
		otelmetric.WithUnit("attempt"),
		otelmetric.WithDescription("Number of client call attempts started."),
	)
	cm.attemptDuration = createFloat64Histogram(
		enabled.Metrics(), "grpc.client.attempt.duration", meter,
		otelmetric.WithUnit("s"),
		otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."),
		otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...),
	)
	cm.attemptSentTotalCompressedMessageSize = createInt64Histogram(
		enabled.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter,
		otelmetric.WithUnit("By"),
		otelmetric.WithDescription("Compressed message bytes sent per client call attempt."),
		otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...),
	)
	cm.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(
		enabled.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter,
		otelmetric.WithUnit("By"),
		otelmetric.WithDescription("Compressed message bytes received per call attempt."),
		otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...),
	)
	cm.callDuration = createFloat64Histogram(
		enabled.Metrics(), "grpc.client.call.duration", meter,
		otelmetric.WithUnit("s"),
		otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."),
		otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...),
	)

	// The recorder is installed on the handler before its metrics are
	// registered, matching the original statement order.
	registry := &registryMetrics{
		optionalLabels: h.options.MetricsOptions.OptionalLabels,
	}
	h.MetricsRecorder = registry
	registry.registerMetrics(enabled, meter)
}
// unaryInterceptor wraps a unary RPC for per-call metrics.
//
// It stores a callInfo (canonical target plus the method name chosen by
// determineMethod) in ctx, appends every key/value pair returned by the
// plugin option's GetMetadata (when a plugin option is configured) to the
// outgoing metadata, invokes the RPC, and then passes the invoker's error and
// the call start time to perCallMetrics. The invoker's error is returned
// unchanged.
func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	info := &callInfo{
		target: cc.CanonicalTarget(),
		method: h.determineMethod(method, opts...),
	}
	ctx = setCallInfo(ctx, info)

	if po := h.options.MetricsOptions.pluginOption; po != nil {
		for key, values := range po.GetMetadata() {
			for _, value := range values {
				ctx = metadata.AppendToOutgoingContext(ctx, key, value)
			}
		}
	}

	began := time.Now()
	rpcErr := invoker(ctx, method, req, reply, cc, opts...)
	h.perCallMetrics(ctx, rpcErr, began, info)
	return rpcErr
}
// determineMethod picks the method name to record attributes with. If any of
// opts is a grpc.StaticMethodCallOption, the result is method passed through
// removeLeadingSlash; otherwise it is the fixed string "other".
func (h *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOption) string {
	static := false
	for i := range opts {
		if _, isStatic := opts[i].(grpc.StaticMethodCallOption); isStatic {
			static = true
			break
		}
	}
	if !static {
		return "other"
	}
	return removeLeadingSlash(method)
}
// streamInterceptor wraps a streaming RPC for per-call metrics.
//
// It stores a callInfo (canonical target plus the method name chosen by
// determineMethod) in ctx and appends every key/value pair returned by the
// plugin option's GetMetadata (when a plugin option is configured) to the
// outgoing metadata. Because the call outlives this function, perCallMetrics
// is invoked from a grpc.OnFinish call option, which is placed ahead of the
// caller-supplied options.
func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	info := &callInfo{
		target: cc.CanonicalTarget(),
		method: h.determineMethod(method, opts...),
	}
	ctx = setCallInfo(ctx, info)

	if po := h.options.MetricsOptions.pluginOption; po != nil {
		for key, values := range po.GetMetadata() {
			for _, value := range values {
				ctx = metadata.AppendToOutgoingContext(ctx, key, value)
			}
		}
	}

	began := time.Now()
	onFinish := grpc.OnFinish(func(err error) {
		h.perCallMetrics(ctx, err, began, info)
	})

	// Build a fresh slice so the caller's opts backing array is never
	// written to.
	callOpts := make([]grpc.CallOption, 0, len(opts)+1)
	callOpts = append(callOpts, onFinish)
	callOpts = append(callOpts, opts...)
	return streamer(ctx, desc, cc, method, callOpts...)
}
// perCallMetrics records the call duration histogram for a finished call.
//
// The latency is the time elapsed since startTime, expressed in seconds. The
// measurement is tagged with grpc.method and grpc.target from ci, and
// grpc.status derived from err via status.Code and canonicalString.
//
// NOTE(review): initializeMetrics returns early without assigning
// callDuration when no MeterProvider is configured — confirm that Record is
// safe to call on the zero value in that configuration.
func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
	// Measure first so attribute construction is not counted in the latency.
	elapsed := time.Since(startTime)
	seconds := float64(elapsed) / float64(time.Second)

	set := otelattribute.NewSet(
		otelattribute.String("grpc.method", ci.method),
		otelattribute.String("grpc.target", ci.target),
		otelattribute.String("grpc.status", canonicalString(status.Code(err))),
	)
	h.clientMetrics.callDuration.Record(ctx, seconds, otelmetric.WithAttributeSet(set))
}
// TagConn exists to satisfy stats.Handler. It ignores the connection tag info
// and returns ctx unchanged.
func (h *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
// HandleConn exists to satisfy stats.Handler. It is a no-op: connection-level
// stats are ignored.
func (h *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
// TagRPC implements per RPC attempt context management. It returns a context
// carrying an rpcInfo whose attemptInfo records the attempt start time, the
// full method name, and the telemetry labels map.
func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	// Numerous stats handlers can be used for the same channel. The cluster
	// impl balancer which writes to this will only write once, thus have this
	// stats handler's per attempt scoped context point to the same optional
	// labels map if set.
	labels := istats.GetLabels(ctx)
	if labels == nil {
		// The defaults for all the per call labels from a plugin that
		// executes on the callpath that this OpenTelemetry component
		// currently supports.
		defaults := map[string]string{
			"grpc.lb.locality": "",
		}
		labels = &istats.Labels{TelemetryLabels: defaults}
		ctx = istats.SetLabels(ctx, labels)
	}

	return setRPCInfo(ctx, &rpcInfo{
		ai: &attemptInfo{ // populates information about RPC start.
			startTime: time.Now(),
			xdsLabels: labels.TelemetryLabels,
			method:    info.FullMethodName,
		},
	})
}
// HandleRPC forwards an RPC stats event to processRPCEvent together with the
// attempt info stored in ctx. If ctx carries no rpcInfo, an error is logged
// and the event is dropped.
func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
	info := getRPCInfo(ctx)
	if info == nil {
		logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
		return
	}
	h.processRPCEvent(ctx, rs, info.ai)
}
// processRPCEvent handles a single stats event for one attempt:
//
//   - Begin: adds 1 to the attempt-started counter, tagged with the call's
//     method and target. Logs an error and returns if ctx has no call info.
//   - OutPayload / InPayload: atomically add the payload's compressed length
//     to the attempt's sent / received byte totals.
//   - InHeader / InTrailer: pass the metadata to setLabelsFromPluginOption.
//   - End: delegate to processRPCEnd.
//
// Any other event type is ignored.
func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
	switch ev := s.(type) {
	case *stats.Begin:
		call := getCallInfo(ctx)
		if call == nil {
			logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
			return
		}
		set := otelattribute.NewSet(
			otelattribute.String("grpc.method", call.method),
			otelattribute.String("grpc.target", call.target),
		)
		h.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributeSet(set))
	case *stats.OutPayload:
		atomic.AddInt64(&ai.sentCompressedBytes, int64(ev.CompressedLength))
	case *stats.InPayload:
		atomic.AddInt64(&ai.recvCompressedBytes, int64(ev.CompressedLength))
	case *stats.InHeader:
		h.setLabelsFromPluginOption(ai, ev.Header)
	case *stats.InTrailer:
		h.setLabelsFromPluginOption(ai, ev.Trailer)
	case *stats.End:
		h.processRPCEnd(ctx, ai, ev)
	}
}
// setLabelsFromPluginOption fills ai.pluginOptionLabels from the configured
// plugin option's GetLabels(incomingMetadata). It does nothing when the labels
// were already set for this attempt or when no plugin option is configured.
func (h *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
	if ai.pluginOptionLabels != nil {
		return
	}
	po := h.options.MetricsOptions.pluginOption
	if po == nil {
		return
	}

	fromPlugin := po.GetLabels(incomingMetadata)
	if fromPlugin == nil {
		// Shouldn't return a nil map. Store an empty, non-nil map so later
		// calls for this attempt take the early return above.
		fromPlugin = map[string]string{}
	}
	ai.pluginOptionLabels = fromPlugin
}
// processRPCEnd records the per-attempt metrics when an attempt finishes:
// attempt duration in seconds (since ai.startTime) and the accumulated sent
// and received compressed byte totals.
//
// All three measurements share one attribute set: grpc.method and grpc.target
// from the call info in ctx, grpc.status ("OK" when e.Error is nil, otherwise
// the canonical string of the error's status code), every plugin option label,
// and each configured optional label that is present in ai.xdsLabels. If ctx
// has no call info, an error is logged and nothing is recorded.
func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
	call := getCallInfo(ctx)
	if call == nil {
		logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
		return
	}

	elapsed := time.Since(ai.startTime)
	latency := float64(elapsed) / float64(time.Second)

	statusText := "OK"
	if rpcErr := e.Error; rpcErr != nil {
		parsed, _ := status.FromError(rpcErr)
		statusText = canonicalString(parsed.Code())
	}

	attrs := []otelattribute.KeyValue{
		otelattribute.String("grpc.method", call.method),
		otelattribute.String("grpc.target", call.target),
		otelattribute.String("grpc.status", statusText),
	}
	for name, value := range ai.pluginOptionLabels {
		attrs = append(attrs, otelattribute.String(name, value))
	}
	for _, name := range h.options.MetricsOptions.OptionalLabels {
		// TODO: Add a filter for converting to unknown if not present in the
		// CSM Plugin Option layer by adding an optional labels API.
		value, present := ai.xdsLabels[name]
		if !present {
			continue
		}
		attrs = append(attrs, otelattribute.String(name, value))
	}

	// Allocate the vararg slice once and reuse it for all three instruments.
	recordOpts := []otelmetric.RecordOption{
		otelmetric.WithAttributeSet(otelattribute.NewSet(attrs...)),
	}
	h.clientMetrics.attemptDuration.Record(ctx, latency, recordOpts...)
	h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), recordOpts...)
	h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), recordOpts...)
}
// Names of the client-side metrics, typed as estats.Metric. The string values
// are the same metric names passed to the instrument constructors in
// initializeMetrics.
const (
// ClientAttemptStarted is the number of client call attempts started.
ClientAttemptStarted estats.Metric = "grpc.client.attempt.started"
// ClientAttemptDuration is the end-to-end time taken to complete a client
// call attempt.
ClientAttemptDuration estats.Metric = "grpc.client.attempt.duration"
// ClientAttemptSentCompressedTotalMessageSize is the compressed message
// bytes sent per client call attempt.
ClientAttemptSentCompressedTotalMessageSize estats.Metric = "grpc.client.attempt.sent_total_compressed_message_size"
// ClientAttemptRcvdCompressedTotalMessageSize is the compressed message
// bytes received per client call attempt.
ClientAttemptRcvdCompressedTotalMessageSize estats.Metric = "grpc.client.attempt.rcvd_total_compressed_message_size"
// ClientCallDuration is the time taken by gRPC to complete an RPC from
// application's perspective.
ClientCallDuration estats.Metric = "grpc.client.call.duration"
)