Files
Milos Gajdos 3996413f46 Bump google storage module
Also bump the golangci version

Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
2024-10-26 18:19:46 +01:00

276 lines
9.4 KiB
Go

// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"fmt"
"log"
"strings"
"time"
mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/google/uuid"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"google.golang.org/api/option"
"google.golang.org/api/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/stats/opentelemetry"
)
// Identifiers used when exporting client-side metrics.
const (
// monitoredResourceName is used as the "gcp.resource_type" resource
// attribute and as the monitored resource description name handed to the
// exporter.
monitoredResourceName = "storage.googleapis.com/Client"
// metricPrefix is prepended to every metric name by metricFormatter.
metricPrefix = "storage.googleapis.com/client/"
)
// latencyHistogramBoundaries returns the explicit bucket boundaries (in
// seconds) used for latency histograms.
//
// The first 50 boundaries are spaced 2ms apart, starting at 0, which gives
// higher resolution for uploads and downloads in the 100 KiB range. After
// that, up to 150 further boundaries are appended while the boundary stays
// below 300 (5 minutes): the step continues at 2ms and is doubled on every
// tenth iteration of this second phase (iteration 0 excluded).
func latencyHistogramBoundaries() []float64 {
	const (
		fineBuckets   = 50  // boundaries emitted at the fixed 2ms step
		coarseBuckets = 150 // cap on boundaries emitted with a growing step
		upperLimit    = 300 // 5 minutes
	)

	step := 0.002
	edge := 0.0
	edges := make([]float64, 0, fineBuckets+coarseBuckets)

	for n := 0; n < fineBuckets; n++ {
		edges = append(edges, edge)
		edge += step
	}

	for n := 0; n < coarseBuckets; n++ {
		if edge >= upperLimit {
			break
		}
		edges = append(edges, edge)
		// Widen the step before advancing, once every ten boundaries.
		if n > 0 && n%10 == 0 {
			step *= 2
		}
		edge += step
	}
	return edges
}
// sizeHistogramBoundaries returns the explicit bucket boundaries used for
// message-size histograms.
//
// Boundaries start at 0 and advance in 128 KiB steps. Once the next boundary
// reaches 4 MiB the step is doubled after every boundary, giving exponential
// growth. Generation stops at 200 boundaries or when the next boundary would
// exceed 16 GiB, whichever comes first.
func sizeHistogramBoundaries() []float64 {
	const (
		kib        = 1024.0
		mib        = 1024.0 * kib
		gib        = 1024.0 * mib
		maxBuckets = 200
	)

	step := 128 * kib
	edges := []float64{}
	for edge := 0.0; len(edges) < maxBuckets && edge <= 16*gib; {
		edges = append(edges, edge)
		edge += step
		if edge >= 4*mib {
			step *= 2
		}
	}
	return edges
}
// metricFormatter builds the metric descriptor type for m: the metric name
// with every "." replaced by "/", prefixed with metricPrefix.
func metricFormatter(m metricdata.Metrics) string {
	slashed := strings.ReplaceAll(m.Name, ".", "/")
	return metricPrefix + slashed
}
// gcpAttributeExpectedDefaults lists the attributes, with their fallback
// values, that newPreparedResource fills in when resource detection leaves
// them missing or empty.
func gcpAttributeExpectedDefaults() []attribute.KeyValue {
	defaults := make([]attribute.KeyValue, 0, 3)
	defaults = append(defaults,
		attribute.String("location", "global"),
		attribute.String("cloud_platform", "unknown"),
		attribute.String("host_id", "unknown"),
	)
	return defaults
}
// preparedResource is the result of newPreparedResource: the project chosen
// for metrics together with the OTel resource built for it.
// Added to help with tests
type preparedResource struct {
// projectToUse is the detected "cloud.account.id" when present, otherwise
// the project passed to newPreparedResource.
projectToUse string
// resource is the merged resource built by newPreparedResource.
resource *resource.Resource
}
// newPreparedResource runs resource detection with resourceOptions and builds
// the resource attached to client-side metrics.
//
// The project to use is the detected "cloud.account.id" attribute when one is
// present, otherwise project. The returned resource combines, in order, the
// fixed client attributes (resource type, a fresh instance ID, project ID and
// API), every detected attribute, and a default for each attribute from
// gcpAttributeExpectedDefaults that detection left missing or empty.
//
// An error from either resource.New call is returned unchanged.
func newPreparedResource(ctx context.Context, project string, resourceOptions []resource.Option) (*preparedResource, error) {
	detected, err := resource.New(ctx, resourceOptions...)
	if err != nil {
		return nil, err
	}
	attrSet := detected.Set()

	// Fall back to the caller's project unless detection found an account ID.
	projectToUse := project
	if id, ok := attrSet.Value("cloud.account.id"); ok {
		projectToUse = id.AsString()
	}

	// Collect defaults for expected attributes that are absent or empty.
	defaults := gcpAttributeExpectedDefaults()
	overrides := make([]attribute.KeyValue, 0, len(defaults))
	for _, def := range defaults {
		found, ok := attrSet.Value(def.Key)
		if ok && found.AsString() != "" {
			continue
		}
		overrides = append(overrides, attribute.KeyValue{Key: def.Key, Value: def.Value})
	}

	fixed := resource.WithAttributes(
		attribute.KeyValue{Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)},
		attribute.KeyValue{Key: "instance_id", Value: attribute.StringValue(uuid.New().String())},
		attribute.KeyValue{Key: "project_id", Value: attribute.StringValue(project)},
		attribute.KeyValue{Key: "api", Value: attribute.StringValue("grpc")},
	)
	// Last duplicate key / value wins, so the overrides go last.
	merged, err := resource.New(
		ctx,
		fixed,
		resource.WithAttributes(detected.Attributes()...),
		resource.WithAttributes(overrides...),
	)
	if err != nil {
		return nil, err
	}

	return &preparedResource{
		projectToUse: projectToUse,
		resource:     merged,
	}, nil
}
// metricsContext bundles what newGRPCMetricContext sets up for client-side
// metrics: the project, the gRPC client options, the meter provider and the
// function that shuts the provider down.
type metricsContext struct {
// project used by exporter
project string
// client options passed to gRPC channels
clientOpts []option.ClientOption
// instance of metric reader used by gRPC client-side metrics
provider *metric.MeterProvider
// clean func to call when closing gRPC client
close func()
}
// createHistogramView returns a view that matches the histogram instrument
// called name and aggregates it, under the same name, into explicit buckets
// with the given boundaries.
func createHistogramView(name string, boundaries []float64) metric.View {
	criteria := metric.Instrument{
		Name: name,
		Kind: metric.InstrumentKindHistogram,
	}
	stream := metric.Stream{
		Name:        name,
		Aggregation: metric.AggregationExplicitBucketHistogram{Boundaries: boundaries},
	}
	return metric.NewView(criteria, stream)
}
// newGRPCMetricContext wires up gRPC client-side metrics for the storage
// client.
//
// project is the project ID taken from the client's credentials and may be
// empty. Resource detection can supply a different project, in which case the
// detected one is used for exporting and a log line reports the mismatch.
//
// It returns an error when no project can be determined from either source,
// when resource preparation fails, or when the exporter cannot be created.
// On success the returned metricsContext carries the project in use, the
// gRPC dial options that enable metrics, the meter provider, and a close
// function that shuts the provider down.
func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, error) {
	prepared, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
	if err != nil {
		return nil, err
	}
	// The exporter needs a project. Fail only when neither the credentials
	// nor resource detection produced one.
	if project == "" && prepared.projectToUse == "" {
		return nil, fmt.Errorf("google cloud project is required to start client-side metrics")
	}
	// If projectToUse isn't the same as the project provided to the Storage
	// client, log which project the metrics are being written to.
	if project != prepared.projectToUse {
		log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project %s or metrics will not be written.", prepared.projectToUse, project, prepared.projectToUse)
	}

	meOpts := []mexporter.Option{
		mexporter.WithProjectID(prepared.projectToUse),
		mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
		mexporter.WithCreateServiceTimeSeries(),
		mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"}),
	}
	exporter, err := mexporter.New(meOpts...)
	if err != nil {
		return nil, err
	}

	// Metric views update histogram boundaries to be relevant to GCS;
	// otherwise the default OTel histogram boundaries are used.
	metricViews := []metric.View{
		createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()),
		createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()),
		createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()),
	}
	// The exporter is wrapped so repeated export failures do not flood logs.
	provider := metric.NewMeterProvider(
		metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(time.Minute))),
		metric.WithResource(prepared.resource),
		metric.WithView(metricViews...),
	)

	mo := opentelemetry.MetricsOptions{
		MeterProvider: provider,
		Metrics: opentelemetry.DefaultMetrics().Add(
			"grpc.lb.wrr.rr_fallback",
			"grpc.lb.wrr.endpoint_weight_not_yet_usable",
			"grpc.lb.wrr.endpoint_weight_stale",
			"grpc.lb.wrr.endpoint_weights",
			"grpc.lb.rls.cache_entries",
			"grpc.lb.rls.cache_size",
			"grpc.lb.rls.default_target_picks",
			"grpc.lb.rls.target_picks",
			"grpc.lb.rls.failed_picks"),
		OptionalLabels: []string{"grpc.lb.locality"},
	}
	opts := []option.ClientOption{
		option.WithGRPCDialOption(opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})),
		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})),
	}

	// Named mc so the imported context package is not shadowed.
	mc := &metricsContext{
		project:    prepared.projectToUse,
		clientOpts: opts,
		provider:   provider,
		close:      createShutdown(ctx, provider),
	}
	return mc, nil
}
// enableClientMetrics sets up gRPC client-side metrics for the client
// described by s.
//
// The project ID is read from the credentials resolved from s.clientOption.
// A credentials lookup failure is not fatal: the project is left empty and
// newGRPCMetricContext decides whether that is acceptable. Errors from
// newGRPCMetricContext are wrapped with a "gRPC Metrics" prefix.
func enableClientMetrics(ctx context.Context, s *settings) (*metricsContext, error) {
	project := ""
	if creds, credErr := transport.Creds(ctx, s.clientOption...); credErr == nil {
		project = creds.ProjectID
	}

	// Enable client-side metrics for gRPC.
	mc, err := newGRPCMetricContext(ctx, project)
	if err != nil {
		return nil, fmt.Errorf("gRPC Metrics: %w", err)
	}
	return mc, nil
}
// createShutdown returns a function that shuts provider down using ctx. The
// function has no return value, so any shutdown error is discarded.
func createShutdown(ctx context.Context, provider *metric.MeterProvider) func() {
	shutdown := func() {
		// Best effort: there is no channel for reporting a failure here.
		_ = provider.Shutdown(ctx)
	}
	return shutdown
}
// exporterLogSuppressor wraps a metric.Exporter.
// Silences permission errors after initial error is emitted to prevent
// chatty logs.
type exporterLogSuppressor struct {
// exporter is the wrapped exporter that every method delegates to.
exporter metric.Exporter
// emittedFailure is set by Export once it has returned a
// "PermissionDenied" error; Export returns nil for all errors afterwards.
emittedFailure bool
}
// Export implements the OTel SDK metric.Exporter interface to prevent noisy
// logs from lack of credentials after the initial failure.
// https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric@v1.28.0#Exporter
//
// The wrapped exporter is always invoked. Its error is passed through until
// one whose text contains "PermissionDenied" is seen; that error is returned
// wrapped with extra context, and from then on every export error is dropped
// and nil is returned.
func (e *exporterLogSuppressor) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
	err := e.exporter.Export(ctx, rm)
	switch {
	case err == nil, e.emittedFailure:
		// Either success, or a failure was already reported once.
		return nil
	case strings.Contains(err.Error(), "PermissionDenied"):
		e.emittedFailure = true
		return fmt.Errorf("gRPC metrics failed due permission issue: %w", err)
	default:
		return err
	}
}
// Temporality returns the wrapped exporter's temporality for instrument
// kind k.
func (e *exporterLogSuppressor) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return e.exporter.Temporality(k)
}
// Aggregation returns the wrapped exporter's aggregation for instrument
// kind k.
func (e *exporterLogSuppressor) Aggregation(k metric.InstrumentKind) metric.Aggregation {
return e.exporter.Aggregation(k)
}
// ForceFlush delegates to the wrapped exporter and returns its error
// unchanged; no suppression is applied here.
func (e *exporterLogSuppressor) ForceFlush(ctx context.Context) error {
return e.exporter.ForceFlush(ctx)
}
// Shutdown delegates to the wrapped exporter and returns its error
// unchanged; no suppression is applied here.
func (e *exporterLogSuppressor) Shutdown(ctx context.Context) error {
return e.exporter.Shutdown(ctx)
}