mirror of
https://github.com/distribution/distribution.git
synced 2025-08-31 22:48:01 +00:00
276 lines
9.4 KiB
Go
276 lines
9.4 KiB
Go
// Copyright 2024 Google LLC
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
|
|
mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
|
|
"github.com/google/uuid"
|
|
"go.opentelemetry.io/contrib/detectors/gcp"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/sdk/metric"
|
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
"google.golang.org/api/option"
|
|
"google.golang.org/api/transport"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/stats/opentelemetry"
|
|
)
|
|
|
|
// Identifiers used when exporting client-side metrics.
const (
	// monitoredResourceName is the monitored resource type attached to the
	// exported resource and registered with the metric exporter.
	monitoredResourceName = "storage.googleapis.com/Client"
	// metricPrefix is prepended to every formatted metric name.
	metricPrefix = "storage.googleapis.com/client/"
)
|
|
|
|
// latencyHistogramBoundaries returns the explicit bucket boundaries, in
// seconds, used for latency histograms.
//
// The first 50 boundaries are 2ms apart (covering the first 100ms) to give
// higher resolution for uploads and downloads in the 100 KiB range. After
// that, up to 150 more boundaries are added; the step doubles every 10
// boundaries and generation stops once a boundary would reach 300s (5 minutes).
//
// NOTE(review): an earlier comment described the second phase as ten 10ms
// buckets, ten 20ms buckets, and so on, but the step is not reset and keeps
// growing from 2ms (2ms, 4ms, 8ms, ...). Confirm which one is intended.
func latencyHistogramBoundaries() []float64 {
	const (
		fineBuckets   = 50
		coarseBuckets = 150
		maxLatency    = 300.0
	)
	var (
		bounds []float64
		edge   float64
		step   = 0.002
	)

	// Fixed-width phase: 2ms per bucket.
	for n := 0; n < fineBuckets; n++ {
		bounds = append(bounds, edge)
		edge += step
	}

	// Growing phase: the step doubles on every tenth boundary (except the first).
	for n := 0; n < coarseBuckets; n++ {
		if edge >= maxLatency {
			break
		}
		bounds = append(bounds, edge)
		if n > 0 && n%10 == 0 {
			step *= 2
		}
		edge += step
	}
	return bounds
}
|
|
|
|
// sizeHistogramBoundaries returns the explicit bucket boundaries, in bytes,
// used for message-size histograms.
//
// Boundaries start at 0 and advance in 128 KiB steps until 4 MiB is reached;
// from then on the step doubles after every boundary. Generation stops after
// 200 boundaries or once a boundary would exceed 16 GiB, whichever is first.
func sizeHistogramBoundaries() []float64 {
	const (
		kib        = 1024.0
		mib        = 1024.0 * kib
		gib        = 1024.0 * mib
		maxBuckets = 200
	)
	var bounds []float64
	step := 128 * kib
	for edge := 0.0; len(bounds) < maxBuckets && edge <= 16*gib; {
		bounds = append(bounds, edge)
		edge += step
		// Past the linear range the step grows exponentially.
		if edge >= 4*mib {
			step *= 2
		}
	}
	return bounds
}
|
|
|
|
func metricFormatter(m metricdata.Metrics) string {
|
|
return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/")
|
|
}
|
|
|
|
func gcpAttributeExpectedDefaults() []attribute.KeyValue {
|
|
return []attribute.KeyValue{
|
|
{Key: "location", Value: attribute.StringValue("global")},
|
|
{Key: "cloud_platform", Value: attribute.StringValue("unknown")},
|
|
{Key: "host_id", Value: attribute.StringValue("unknown")}}
|
|
}
|
|
|
|
// Added to help with tests
|
|
type preparedResource struct {
|
|
projectToUse string
|
|
resource *resource.Resource
|
|
}
|
|
|
|
func newPreparedResource(ctx context.Context, project string, resourceOptions []resource.Option) (*preparedResource, error) {
|
|
detectedAttrs, err := resource.New(ctx, resourceOptions...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
preparedResource := &preparedResource{}
|
|
s := detectedAttrs.Set()
|
|
p, present := s.Value("cloud.account.id")
|
|
if present {
|
|
preparedResource.projectToUse = p.AsString()
|
|
} else {
|
|
preparedResource.projectToUse = project
|
|
}
|
|
updates := []attribute.KeyValue{}
|
|
for _, kv := range gcpAttributeExpectedDefaults() {
|
|
if val, present := s.Value(kv.Key); !present || val.AsString() == "" {
|
|
updates = append(updates, attribute.KeyValue{Key: kv.Key, Value: kv.Value})
|
|
}
|
|
}
|
|
r, err := resource.New(
|
|
ctx,
|
|
resource.WithAttributes(
|
|
attribute.KeyValue{Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)},
|
|
attribute.KeyValue{Key: "instance_id", Value: attribute.StringValue(uuid.New().String())},
|
|
attribute.KeyValue{Key: "project_id", Value: attribute.StringValue(project)},
|
|
attribute.KeyValue{Key: "api", Value: attribute.StringValue("grpc")},
|
|
),
|
|
resource.WithAttributes(detectedAttrs.Attributes()...),
|
|
// Last duplicate key / value wins
|
|
resource.WithAttributes(updates...),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
preparedResource.resource = r
|
|
return preparedResource, nil
|
|
}
|
|
|
|
type metricsContext struct {
|
|
// project used by exporter
|
|
project string
|
|
// client options passed to gRPC channels
|
|
clientOpts []option.ClientOption
|
|
// instance of metric reader used by gRPC client-side metrics
|
|
provider *metric.MeterProvider
|
|
// clean func to call when closing gRPC client
|
|
close func()
|
|
}
|
|
|
|
func createHistogramView(name string, boundaries []float64) metric.View {
|
|
return metric.NewView(metric.Instrument{
|
|
Name: name,
|
|
Kind: metric.InstrumentKindHistogram,
|
|
}, metric.Stream{
|
|
Name: name,
|
|
Aggregation: metric.AggregationExplicitBucketHistogram{Boundaries: boundaries},
|
|
})
|
|
}
|
|
|
|
func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, error) {
|
|
preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Implementation requires a project, if one is not determined possibly user
|
|
// credentials. Then we will fail stating gRPC Metrics require a project-id.
|
|
if project == "" && preparedResource.projectToUse != "" {
|
|
return nil, fmt.Errorf("google cloud project is required to start client-side metrics")
|
|
}
|
|
// If projectTouse isn't the same as project provided to Storage client, then
|
|
// emit a log stating which project is being used to emit metrics to.
|
|
if project != preparedResource.projectToUse {
|
|
log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project)
|
|
}
|
|
meOpts := []mexporter.Option{
|
|
mexporter.WithProjectID(preparedResource.projectToUse),
|
|
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
|
|
mexporter.WithCreateServiceTimeSeries(),
|
|
mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})}
|
|
exporter, err := mexporter.New(meOpts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Metric views update histogram boundaries to be relevant to GCS
|
|
// otherwise default OTel histogram boundaries are used.
|
|
metricViews := []metric.View{
|
|
createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()),
|
|
createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()),
|
|
createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()),
|
|
}
|
|
provider := metric.NewMeterProvider(
|
|
metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(time.Minute))),
|
|
metric.WithResource(preparedResource.resource),
|
|
metric.WithView(metricViews...),
|
|
)
|
|
mo := opentelemetry.MetricsOptions{
|
|
MeterProvider: provider,
|
|
Metrics: opentelemetry.DefaultMetrics().Add(
|
|
"grpc.lb.wrr.rr_fallback",
|
|
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
|
|
"grpc.lb.wrr.endpoint_weight_stale",
|
|
"grpc.lb.wrr.endpoint_weights",
|
|
"grpc.lb.rls.cache_entries",
|
|
"grpc.lb.rls.cache_size",
|
|
"grpc.lb.rls.default_target_picks",
|
|
"grpc.lb.rls.target_picks",
|
|
"grpc.lb.rls.failed_picks"),
|
|
OptionalLabels: []string{"grpc.lb.locality"},
|
|
}
|
|
opts := []option.ClientOption{
|
|
option.WithGRPCDialOption(opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})),
|
|
option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})),
|
|
}
|
|
context := &metricsContext{
|
|
project: preparedResource.projectToUse,
|
|
clientOpts: opts,
|
|
provider: provider,
|
|
close: createShutdown(ctx, provider),
|
|
}
|
|
return context, nil
|
|
}
|
|
|
|
func enableClientMetrics(ctx context.Context, s *settings) (*metricsContext, error) {
|
|
var project string
|
|
c, err := transport.Creds(ctx, s.clientOption...)
|
|
if err == nil {
|
|
project = c.ProjectID
|
|
}
|
|
// Enable client-side metrics for gRPC
|
|
metricsContext, err := newGRPCMetricContext(ctx, project)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("gRPC Metrics: %w", err)
|
|
}
|
|
return metricsContext, nil
|
|
}
|
|
|
|
func createShutdown(ctx context.Context, provider *metric.MeterProvider) func() {
|
|
return func() {
|
|
provider.Shutdown(ctx)
|
|
}
|
|
}
|
|
|
|
// Silences permission errors after initial error is emitted to prevent
|
|
// chatty logs.
|
|
type exporterLogSuppressor struct {
|
|
exporter metric.Exporter
|
|
emittedFailure bool
|
|
}
|
|
|
|
// Implements OTel SDK metric.Exporter interface to prevent noisy logs from
|
|
// lack of credentials after initial failure.
|
|
// https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric@v1.28.0#Exporter
|
|
func (e *exporterLogSuppressor) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
|
|
if err := e.exporter.Export(ctx, rm); err != nil && !e.emittedFailure {
|
|
if strings.Contains(err.Error(), "PermissionDenied") {
|
|
e.emittedFailure = true
|
|
return fmt.Errorf("gRPC metrics failed due permission issue: %w", err)
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *exporterLogSuppressor) Temporality(k metric.InstrumentKind) metricdata.Temporality {
|
|
return e.exporter.Temporality(k)
|
|
}
|
|
|
|
func (e *exporterLogSuppressor) Aggregation(k metric.InstrumentKind) metric.Aggregation {
|
|
return e.exporter.Aggregation(k)
|
|
}
|
|
|
|
func (e *exporterLogSuppressor) ForceFlush(ctx context.Context) error {
|
|
return e.exporter.ForceFlush(ctx)
|
|
}
|
|
|
|
func (e *exporterLogSuppressor) Shutdown(ctx context.Context) error {
|
|
return e.exporter.Shutdown(ctx)
|
|
}
|