Bump google storage module

Also bump the golangci version

Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
This commit is contained in:
Milos Gajdos
2024-10-26 18:06:01 +01:00
parent 1c26d98fbe
commit 3996413f46
1501 changed files with 448261 additions and 47001 deletions

196
vendor/google.golang.org/grpc/orca/call_metrics.go generated vendored Normal file
View File

@@ -0,0 +1,196 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package orca
import (
"context"
"sync"
"google.golang.org/grpc"
grpcinternal "google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca/internal"
"google.golang.org/protobuf/proto"
)
// CallMetricsRecorder allows a service method handler to record per-RPC
// metrics. It contains all utilization-based metrics from
// ServerMetricsRecorder as well as additional request cost metrics.
type CallMetricsRecorder interface {
ServerMetricsRecorder
// SetRequestCost sets the relevant server metric.
SetRequestCost(name string, val float64)
// DeleteRequestCost deletes the relevant server metric to prevent it
// from being sent.
DeleteRequestCost(name string)
// SetNamedMetric sets the relevant server metric.
SetNamedMetric(name string, val float64)
// DeleteNamedMetric deletes the relevant server metric to prevent it
// from being sent.
DeleteNamedMetric(name string)
}
type callMetricsRecorderCtxKey struct{}
// CallMetricsRecorderFromContext returns the RPC-specific custom metrics
// recorder embedded in the provided RPC context.
//
// Returns nil if no custom metrics recorder is found in the provided context,
// which will be the case when custom metrics reporting is not enabled.
func CallMetricsRecorderFromContext(ctx context.Context) CallMetricsRecorder {
rw, ok := ctx.Value(callMetricsRecorderCtxKey{}).(*recorderWrapper)
if !ok {
return nil
}
return rw.recorder()
}
// recorderWrapper is a wrapper around a CallMetricsRecorder to ensure that
// concurrent calls to CallMetricsRecorderFromContext() results in only one
// allocation of the underlying metrics recorder, while also allowing for lazy
// initialization of the recorder itself.
type recorderWrapper struct {
once sync.Once
r CallMetricsRecorder
smp ServerMetricsProvider
}
func (rw *recorderWrapper) recorder() CallMetricsRecorder {
rw.once.Do(func() {
rw.r = newServerMetricsRecorder()
})
return rw.r
}
// setTrailerMetadata adds a trailer metadata entry with key being set to
// `internal.TrailerMetadataKey` and value being set to the binary-encoded
// orca.OrcaLoadReport protobuf message.
//
// This function is called from the unary and streaming interceptors defined
// above. Any errors encountered here are not propagated to the caller because
// they are ignored there. Hence we simply log any errors encountered here at
// warning level, and return nothing.
func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) {
var sm *ServerMetrics
if rw.smp != nil {
sm = rw.smp.ServerMetrics()
sm.merge(rw.r.ServerMetrics())
} else {
sm = rw.r.ServerMetrics()
}
b, err := proto.Marshal(sm.toLoadReportProto())
if err != nil {
logger.Warningf("Failed to marshal load report: %v", err)
return
}
if err := grpc.SetTrailer(ctx, metadata.Pairs(internal.TrailerMetadataKey, string(b))); err != nil {
logger.Warningf("Failed to set trailer metadata: %v", err)
}
}
var joinServerOptions = grpcinternal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
// CallMetricsServerOption returns a server option which enables the reporting
// of per-RPC custom backend metrics for unary and streaming RPCs.
//
// Server applications interested in injecting custom backend metrics should
// pass the server option returned from this function as the first argument to
// grpc.NewServer().
//
// Subsequently, server RPC handlers can retrieve a reference to the RPC
// specific custom metrics recorder [CallMetricsRecorder] to be used, via a call
// to CallMetricsRecorderFromContext(), and inject custom metrics at any time
// during the RPC lifecycle.
//
// The injected custom metrics will be sent as part of trailer metadata, as a
// binary-encoded [ORCA LoadReport] protobuf message, with the metadata key
// being set be "endpoint-load-metrics-bin".
//
// If a non-nil ServerMetricsProvider is provided, the gRPC server will
// transmit the metrics it provides, overwritten by any per-RPC metrics given
// to the CallMetricsRecorder. A ServerMetricsProvider is typically obtained
// by calling NewServerMetricsRecorder.
//
// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
func CallMetricsServerOption(smp ServerMetricsProvider) grpc.ServerOption {
return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt(smp)), grpc.ChainStreamInterceptor(streamInt(smp)))
}
func unaryInt(smp ServerMetricsProvider) func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
// We don't allocate the metric recorder here. It will be allocated the
// first time the user calls CallMetricsRecorderFromContext().
rw := &recorderWrapper{smp: smp}
ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw)
resp, err := handler(ctxWithRecorder, req)
// It is safe to access the underlying metric recorder inside the wrapper at
// this point, as the user's RPC handler is done executing, and therefore
// there will be no more calls to CallMetricsRecorderFromContext(), which is
// where the metric recorder is lazy allocated.
if rw.r != nil {
rw.setTrailerMetadata(ctx)
}
return resp, err
}
}
func streamInt(smp ServerMetricsProvider) func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// We don't allocate the metric recorder here. It will be allocated the
// first time the user calls CallMetricsRecorderFromContext().
rw := &recorderWrapper{smp: smp}
ws := &wrappedStream{
ServerStream: ss,
ctx: newContextWithRecorderWrapper(ss.Context(), rw),
}
err := handler(srv, ws)
// It is safe to access the underlying metric recorder inside the wrapper at
// this point, as the user's RPC handler is done executing, and therefore
// there will be no more calls to CallMetricsRecorderFromContext(), which is
// where the metric recorder is lazy allocated.
if rw.r != nil {
rw.setTrailerMetadata(ss.Context())
}
return err
}
}
func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context {
return context.WithValue(ctx, callMetricsRecorderCtxKey{}, r)
}
// wrappedStream wraps the grpc.ServerStream received by the streaming
// interceptor. Overrides only the Context() method to return a context which
// contains a reference to the CallMetricsRecorder corresponding to this
// stream.
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}
func (w *wrappedStream) Context() context.Context {
return w.ctx
}

View File

@@ -0,0 +1,71 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package internal contains orca-internal code, for testing purposes and to
// avoid polluting the godoc of the top-level orca package.
package internal
import (
"errors"
"fmt"
ibackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
// AllowAnyMinReportingInterval prevents clamping of the MinReportingInterval
// configured via ServiceOptions, to a minimum of 30s.
//
// For testing purposes only.
var AllowAnyMinReportingInterval any // func(*ServiceOptions)
// DefaultBackoffFunc is used by the producer to control its backoff behavior.
//
// For testing purposes only.
var DefaultBackoffFunc = ibackoff.DefaultExponential.Backoff
// TrailerMetadataKey is the key in which the per-call backend metrics are
// transmitted.
const TrailerMetadataKey = "endpoint-load-metrics-bin"
// ToLoadReport unmarshals a binary encoded [ORCA LoadReport] protobuf message
// from md and returns the corresponding struct. The load report is expected to
// be stored as the value for key "endpoint-load-metrics-bin".
//
// If no load report was found in the provided metadata, if multiple load
// reports are found, or if the load report found cannot be parsed, an error is
// returned.
//
// [ORCA LoadReport]: (https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15)
func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) {
vs := md.Get(TrailerMetadataKey)
if len(vs) == 0 {
return nil, nil
}
if len(vs) != 1 {
return nil, errors.New("multiple orca load reports found in provided metadata")
}
ret := new(v3orcapb.OrcaLoadReport)
if err := proto.Unmarshal([]byte(vs[0]), ret); err != nil {
return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err)
}
return ret, nil
}

57
vendor/google.golang.org/grpc/orca/orca.go generated vendored Normal file
View File

@@ -0,0 +1,57 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package orca implements Open Request Cost Aggregation, which is an open
// standard for request cost aggregation and reporting by backends and the
// corresponding aggregation of such reports by L7 load balancers (such as
// Envoy) on the data plane. In a proxyless world with gRPC enabled
// applications, aggregation of such reports will be done by the gRPC client.
//
// # Experimental
//
// Notice: All APIs is this package are EXPERIMENTAL and may be changed or
// removed in a later release.
package orca
import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca/internal"
)
var logger = grpclog.Component("orca-backend-metrics")
// loadParser implements the Parser interface defined in `internal/balancerload`
// package. This interface is used by the client stream to parse load reports
// sent by the server in trailer metadata. The parsed loads are then sent to
// balancers via balancer.DoneInfo.
//
// The grpc package cannot directly call toLoadReport() as that would cause an
// import cycle. Hence this roundabout method is used.
type loadParser struct{}
func (loadParser) Parse(md metadata.MD) any {
lr, err := internal.ToLoadReport(md)
if err != nil {
logger.Infof("Parse failed: %v", err)
}
return lr
}
func init() {
balancerload.SetParser(loadParser{})
}

223
vendor/google.golang.org/grpc/orca/producer.go generated vendored Normal file
View File

@@ -0,0 +1,223 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package orca
import (
"context"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
"google.golang.org/protobuf/types/known/durationpb"
)
type producerBuilder struct{}
// Build constructs and returns a producer and its cleanup function
func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
p := &producer{
client: v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)),
intervals: make(map[time.Duration]int),
listeners: make(map[OOBListener]struct{}),
backoff: internal.DefaultBackoffFunc,
}
return p, func() {
<-p.stopped
}
}
var producerBuilderSingleton = &producerBuilder{}
// OOBListener is used to receive out-of-band load reports as they arrive.
type OOBListener interface {
// OnLoadReport is called when a load report is received.
OnLoadReport(*v3orcapb.OrcaLoadReport)
}
// OOBListenerOptions contains options to control how an OOBListener is called.
type OOBListenerOptions struct {
// ReportInterval specifies how often to request the server to provide a
// load report. May be provided less frequently if the server requires a
// longer interval, or may be provided more frequently if another
// subscriber requests a shorter interval.
ReportInterval time.Duration
}
// RegisterOOBListener registers an out-of-band load report listener on sc.
// Any OOBListener may only be registered once per subchannel at a time. The
// returned stop function must be called when no longer needed. Do not
// register a single OOBListener more than once per SubConn.
func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
pr, close := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*producer)
p.registerListener(l, opts.ReportInterval)
// TODO: When we can register for SubConn state updates, automatically call
// stop() on SHUTDOWN.
// If stop is called multiple times, prevent it from having any effect on
// subsequent calls.
return grpcsync.OnceFunc(func() {
p.unregisterListener(l, opts.ReportInterval)
close()
})
}
type producer struct {
client v3orcaservicegrpc.OpenRcaServiceClient
// backoff is called between stream attempts to determine how long to delay
// to avoid overloading a server experiencing problems. The attempt count
// is incremented when stream errors occur and is reset when the stream
// reports a result.
backoff func(int) time.Duration
mu sync.Mutex
intervals map[time.Duration]int // map from interval time to count of listeners requesting that time
listeners map[OOBListener]struct{} // set of registered listeners
minInterval time.Duration
stop func() // stops the current run goroutine
stopped chan struct{} // closed when the run goroutine exits
}
// registerListener adds the listener and its requested report interval to the
// producer.
func (p *producer) registerListener(l OOBListener, interval time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
p.listeners[l] = struct{}{}
p.intervals[interval]++
if len(p.listeners) == 1 || interval < p.minInterval {
p.minInterval = interval
p.updateRunLocked()
}
}
// registerListener removes the listener and its requested report interval to
// the producer.
func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.listeners, l)
p.intervals[interval]--
if p.intervals[interval] == 0 {
delete(p.intervals, interval)
if p.minInterval == interval {
p.recomputeMinInterval()
p.updateRunLocked()
}
}
}
// recomputeMinInterval sets p.minInterval to the minimum key's value in
// p.intervals.
func (p *producer) recomputeMinInterval() {
first := true
for interval := range p.intervals {
if first || interval < p.minInterval {
p.minInterval = interval
first = false
}
}
}
// updateRunLocked is called whenever the run goroutine needs to be started /
// stopped / restarted due to: 1. the initial listener being registered, 2. the
// final listener being unregistered, or 3. the minimum registered interval
// changing.
func (p *producer) updateRunLocked() {
if p.stop != nil {
p.stop()
p.stop = nil
}
if len(p.listeners) > 0 {
var ctx context.Context
ctx, p.stop = context.WithCancel(context.Background())
p.stopped = make(chan struct{})
go p.run(ctx, p.stopped, p.minInterval)
}
}
// run manages the ORCA OOB stream on the subchannel.
func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) {
defer close(done)
runStream := func() error {
resetBackoff, err := p.runStream(ctx, interval)
if status.Code(err) == codes.Unimplemented {
// Unimplemented; do not retry.
logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.")
return err
}
// Retry for all other errors.
if code := status.Code(err); code != codes.Unavailable && code != codes.Canceled {
// TODO: Unavailable and Canceled should also ideally log an error,
// but for now we receive them when shutting down the ClientConn
// (Unavailable if the stream hasn't started yet, and Canceled if it
// happens mid-stream). Once we can determine the state or ensure
// the producer is stopped before the stream ends, we can log an
// error when it's not a natural shutdown.
logger.Error("Received unexpected stream error:", err)
}
if resetBackoff {
return backoff.ErrResetBackoff
}
return nil
}
backoff.RunF(ctx, runStream, p.backoff)
}
// runStream runs a single stream on the subchannel and returns the resulting
// error, if any, and whether or not the run loop should reset the backoff
// timer to zero or advance it.
func (p *producer) runStream(ctx context.Context, interval time.Duration) (resetBackoff bool, err error) {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := p.client.StreamCoreMetrics(streamCtx, &v3orcaservicepb.OrcaLoadReportRequest{
ReportInterval: durationpb.New(interval),
})
if err != nil {
return false, err
}
for {
report, err := stream.Recv()
if err != nil {
return resetBackoff, err
}
resetBackoff = true
p.mu.Lock()
for l := range p.listeners {
l.OnLoadReport(report)
}
p.mu.Unlock()
}
}

352
vendor/google.golang.org/grpc/orca/server_metrics.go generated vendored Normal file
View File

@@ -0,0 +1,352 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package orca
import (
"sync/atomic"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
// ServerMetrics is the data returned from a server to a client to describe the
// current state of the server and/or the cost of a request when used per-call.
type ServerMetrics struct {
CPUUtilization float64 // CPU utilization: [0, inf); unset=-1
MemUtilization float64 // Memory utilization: [0, 1.0]; unset=-1
AppUtilization float64 // Application utilization: [0, inf); unset=-1
QPS float64 // queries per second: [0, inf); unset=-1
EPS float64 // errors per second: [0, inf); unset=-1
// The following maps must never be nil.
Utilization map[string]float64 // Custom fields: [0, 1.0]
RequestCost map[string]float64 // Custom fields: [0, inf); not sent OOB
NamedMetrics map[string]float64 // Custom fields: [0, inf); not sent OOB
}
// toLoadReportProto dumps sm as an OrcaLoadReport proto.
func (sm *ServerMetrics) toLoadReportProto() *v3orcapb.OrcaLoadReport {
ret := &v3orcapb.OrcaLoadReport{
Utilization: sm.Utilization,
RequestCost: sm.RequestCost,
NamedMetrics: sm.NamedMetrics,
}
if sm.CPUUtilization != -1 {
ret.CpuUtilization = sm.CPUUtilization
}
if sm.MemUtilization != -1 {
ret.MemUtilization = sm.MemUtilization
}
if sm.AppUtilization != -1 {
ret.ApplicationUtilization = sm.AppUtilization
}
if sm.QPS != -1 {
ret.RpsFractional = sm.QPS
}
if sm.EPS != -1 {
ret.Eps = sm.EPS
}
return ret
}
// merge merges o into sm, overwriting any values present in both.
func (sm *ServerMetrics) merge(o *ServerMetrics) {
mergeMap(sm.Utilization, o.Utilization)
mergeMap(sm.RequestCost, o.RequestCost)
mergeMap(sm.NamedMetrics, o.NamedMetrics)
if o.CPUUtilization != -1 {
sm.CPUUtilization = o.CPUUtilization
}
if o.MemUtilization != -1 {
sm.MemUtilization = o.MemUtilization
}
if o.AppUtilization != -1 {
sm.AppUtilization = o.AppUtilization
}
if o.QPS != -1 {
sm.QPS = o.QPS
}
if o.EPS != -1 {
sm.EPS = o.EPS
}
}
func mergeMap(a, b map[string]float64) {
for k, v := range b {
a[k] = v
}
}
// ServerMetricsRecorder allows for recording and providing out of band server
// metrics.
type ServerMetricsRecorder interface {
ServerMetricsProvider
// SetCPUUtilization sets the CPU utilization server metric. Must be
// greater than zero.
SetCPUUtilization(float64)
// DeleteCPUUtilization deletes the CPU utilization server metric to
// prevent it from being sent.
DeleteCPUUtilization()
// SetMemoryUtilization sets the memory utilization server metric. Must be
// in the range [0, 1].
SetMemoryUtilization(float64)
// DeleteMemoryUtilization deletes the memory utilization server metric to
// prevent it from being sent.
DeleteMemoryUtilization()
// SetApplicationUtilization sets the application utilization server
// metric. Must be greater than zero.
SetApplicationUtilization(float64)
// DeleteApplicationUtilization deletes the application utilization server
// metric to prevent it from being sent.
DeleteApplicationUtilization()
// SetQPS sets the Queries Per Second server metric. Must be greater than
// zero.
SetQPS(float64)
// DeleteQPS deletes the Queries Per Second server metric to prevent it
// from being sent.
DeleteQPS()
// SetEPS sets the Errors Per Second server metric. Must be greater than
// zero.
SetEPS(float64)
// DeleteEPS deletes the Errors Per Second server metric to prevent it from
// being sent.
DeleteEPS()
// SetNamedUtilization sets the named utilization server metric for the
// name provided. val must be in the range [0, 1].
SetNamedUtilization(name string, val float64)
// DeleteNamedUtilization deletes the named utilization server metric for
// the name provided to prevent it from being sent.
DeleteNamedUtilization(name string)
}
type serverMetricsRecorder struct {
state atomic.Pointer[ServerMetrics] // the current metrics
}
// NewServerMetricsRecorder returns an in-memory store for ServerMetrics and
// allows for safe setting and retrieving of ServerMetrics. Also implements
// ServerMetricsProvider for use with NewService.
func NewServerMetricsRecorder() ServerMetricsRecorder {
return newServerMetricsRecorder()
}
func newServerMetricsRecorder() *serverMetricsRecorder {
s := new(serverMetricsRecorder)
s.state.Store(&ServerMetrics{
CPUUtilization: -1,
MemUtilization: -1,
AppUtilization: -1,
QPS: -1,
EPS: -1,
Utilization: make(map[string]float64),
RequestCost: make(map[string]float64),
NamedMetrics: make(map[string]float64),
})
return s
}
// ServerMetrics returns a copy of the current ServerMetrics.
func (s *serverMetricsRecorder) ServerMetrics() *ServerMetrics {
return copyServerMetrics(s.state.Load())
}
func copyMap(m map[string]float64) map[string]float64 {
ret := make(map[string]float64, len(m))
for k, v := range m {
ret[k] = v
}
return ret
}
func copyServerMetrics(sm *ServerMetrics) *ServerMetrics {
return &ServerMetrics{
CPUUtilization: sm.CPUUtilization,
MemUtilization: sm.MemUtilization,
AppUtilization: sm.AppUtilization,
QPS: sm.QPS,
EPS: sm.EPS,
Utilization: copyMap(sm.Utilization),
RequestCost: copyMap(sm.RequestCost),
NamedMetrics: copyMap(sm.NamedMetrics),
}
}
// SetCPUUtilization records a measurement for the CPU utilization metric.
func (s *serverMetricsRecorder) SetCPUUtilization(val float64) {
if val < 0 {
if logger.V(2) {
logger.Infof("Ignoring CPU Utilization value out of range: %v", val)
}
return
}
smCopy := copyServerMetrics(s.state.Load())
smCopy.CPUUtilization = val
s.state.Store(smCopy)
}
// DeleteCPUUtilization deletes the relevant server metric to prevent it from
// being sent.
func (s *serverMetricsRecorder) DeleteCPUUtilization() {
smCopy := copyServerMetrics(s.state.Load())
smCopy.CPUUtilization = -1
s.state.Store(smCopy)
}
// SetMemoryUtilization records a measurement for the memory utilization metric.
func (s *serverMetricsRecorder) SetMemoryUtilization(val float64) {
if val < 0 || val > 1 {
if logger.V(2) {
logger.Infof("Ignoring Memory Utilization value out of range: %v", val)
}
return
}
smCopy := copyServerMetrics(s.state.Load())
smCopy.MemUtilization = val
s.state.Store(smCopy)
}
// DeleteMemoryUtilization deletes the relevant server metric to prevent it
// from being sent.
func (s *serverMetricsRecorder) DeleteMemoryUtilization() {
smCopy := copyServerMetrics(s.state.Load())
smCopy.MemUtilization = -1
s.state.Store(smCopy)
}
// SetApplicationUtilization records a measurement for a generic utilization
// metric.
func (s *serverMetricsRecorder) SetApplicationUtilization(val float64) {
if val < 0 {
if logger.V(2) {
logger.Infof("Ignoring Application Utilization value out of range: %v", val)
}
return
}
smCopy := copyServerMetrics(s.state.Load())
smCopy.AppUtilization = val
s.state.Store(smCopy)
}
// DeleteApplicationUtilization deletes the relevant server metric to prevent
// it from being sent.
func (s *serverMetricsRecorder) DeleteApplicationUtilization() {
smCopy := copyServerMetrics(s.state.Load())
smCopy.AppUtilization = -1
s.state.Store(smCopy)
}
// SetQPS records a measurement for the QPS metric.
func (s *serverMetricsRecorder) SetQPS(val float64) {
if val < 0 {
if logger.V(2) {
logger.Infof("Ignoring QPS value out of range: %v", val)
}
return
}
smCopy := copyServerMetrics(s.state.Load())
smCopy.QPS = val
s.state.Store(smCopy)
}
// DeleteQPS deletes the relevant server metric to prevent it from being sent.
func (s *serverMetricsRecorder) DeleteQPS() {
smCopy := copyServerMetrics(s.state.Load())
smCopy.QPS = -1
s.state.Store(smCopy)
}
// SetEPS records a measurement for the EPS metric.
func (s *serverMetricsRecorder) SetEPS(val float64) {
if val < 0 {
if logger.V(2) {
logger.Infof("Ignoring EPS value out of range: %v", val)
}
return
}
smCopy := copyServerMetrics(s.state.Load())
smCopy.EPS = val
s.state.Store(smCopy)
}
// DeleteEPS deletes the relevant server metric to prevent it from being sent.
func (s *serverMetricsRecorder) DeleteEPS() {
smCopy := copyServerMetrics(s.state.Load())
smCopy.EPS = -1
s.state.Store(smCopy)
}
// SetNamedUtilization records a measurement for a utilization metric uniquely
// identifiable by name.
func (s *serverMetricsRecorder) SetNamedUtilization(name string, val float64) {
if val < 0 || val > 1 {
if logger.V(2) {
logger.Infof("Ignoring Named Utilization value out of range: %v", val)
}
return
}
smCopy := copyServerMetrics(s.state.Load())
smCopy.Utilization[name] = val
s.state.Store(smCopy)
}
// DeleteNamedUtilization deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteNamedUtilization(name string) {
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.Utilization, name)
s.state.Store(smCopy)
}
// SetRequestCost records a measurement for a utilization metric uniquely
// identifiable by name.
func (s *serverMetricsRecorder) SetRequestCost(name string, val float64) {
smCopy := copyServerMetrics(s.state.Load())
smCopy.RequestCost[name] = val
s.state.Store(smCopy)
}
// DeleteRequestCost deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteRequestCost(name string) {
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.RequestCost, name)
s.state.Store(smCopy)
}
// SetNamedMetric records a measurement for a utilization metric uniquely
// identifiable by name.
func (s *serverMetricsRecorder) SetNamedMetric(name string, val float64) {
smCopy := copyServerMetrics(s.state.Load())
smCopy.NamedMetrics[name] = val
s.state.Store(smCopy)
}
// DeleteNamedMetric deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteNamedMetric(name string) {
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.NamedMetrics, name)
s.state.Store(smCopy)
}

163
vendor/google.golang.org/grpc/orca/service.go generated vendored Normal file
View File

@@ -0,0 +1,163 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package orca
import (
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
ointernal "google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"
v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
)
func init() {
ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
so.allowAnyMinReportingInterval = true
}
internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval
}
// minReportingInterval is the absolute minimum value supported for
// out-of-band metrics reporting from the ORCA service implementation
// provided by the orca package.
const minReportingInterval = 30 * time.Second
// Service provides an implementation of the OpenRcaService as defined in the
// [ORCA] service protos. Instances of this type must be created via calls to
// Register() or NewService().
//
// Server applications can use the SetXxx() and DeleteXxx() methods to record
// measurements corresponding to backend metrics, which eventually get pushed to
// clients who have initiated the SteamCoreMetrics streaming RPC.
//
// [ORCA]: https://github.com/cncf/xds/blob/main/xds/service/orca/v3/orca.proto
type Service struct {
v3orcaservicegrpc.UnimplementedOpenRcaServiceServer
// Minimum reporting interval, as configured by the user, or the default.
minReportingInterval time.Duration
smProvider ServerMetricsProvider
}
// ServiceOptions contains options to configure the ORCA service implementation.
type ServiceOptions struct {
// ServerMetricsProvider is the provider to be used by the service for
// reporting OOB server metrics to clients. Typically obtained via
// NewServerMetricsRecorder. This field is required.
ServerMetricsProvider ServerMetricsProvider
// MinReportingInterval sets the lower bound for how often out-of-band
// metrics are reported on the streaming RPC initiated by the client. If
// unspecified, negative or less than the default value of 30s, the default
// is used. Clients may request a higher value as part of the
// StreamCoreMetrics streaming RPC.
MinReportingInterval time.Duration
// Allow a minReportingInterval which is less than the default of 30s.
// Used for testing purposes only.
allowAnyMinReportingInterval bool
}
// A ServerMetricsProvider provides ServerMetrics upon request.
type ServerMetricsProvider interface {
// ServerMetrics returns the current set of server metrics. It should
// return a read-only, immutable copy of the data that is active at the
// time of the call.
ServerMetrics() *ServerMetrics
}
// NewService creates a new ORCA service implementation configured using the
// provided options.
func NewService(opts ServiceOptions) (*Service, error) {
// The default minimum supported reporting interval value can be overridden
// for testing purposes through the orca internal package.
if opts.ServerMetricsProvider == nil {
return nil, fmt.Errorf("ServerMetricsProvider not specified")
}
if !opts.allowAnyMinReportingInterval {
if opts.MinReportingInterval < 0 || opts.MinReportingInterval < minReportingInterval {
opts.MinReportingInterval = minReportingInterval
}
}
service := &Service{
minReportingInterval: opts.MinReportingInterval,
smProvider: opts.ServerMetricsProvider,
}
return service, nil
}
// Register creates a new ORCA service implementation configured using the
// provided options and registers the same on the provided grpc Server.
func Register(s *grpc.Server, opts ServiceOptions) error {
// TODO(https://github.com/cncf/xds/issues/41): replace *grpc.Server with
// grpc.ServiceRegistrar when possible.
service, err := NewService(opts)
if err != nil {
return err
}
v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, service)
return nil
}
// determineReportingInterval determines the reporting interval for out-of-band
// metrics. If the reporting interval is not specified in the request, or is
// negative or is less than the configured minimum (via
// ServiceOptions.MinReportingInterval), the latter is used. Else the value from
// the incoming request is used.
func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReportRequest) time.Duration {
if req.GetReportInterval() == nil {
return s.minReportingInterval
}
dur := req.GetReportInterval().AsDuration()
if dur < s.minReportingInterval {
logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using minimum", dur, s.minReportingInterval)
return s.minReportingInterval
}
return dur
}
func (s *Service) sendMetricsResponse(stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
return stream.Send(s.smProvider.ServerMetrics().toLoadReportProto())
}
// StreamCoreMetrics streams custom backend metrics injected by the server
// application.
func (s *Service) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
ticker := time.NewTicker(s.determineReportingInterval(req))
defer ticker.Stop()
for {
if err := s.sendMetricsResponse(stream); err != nil {
return err
}
// Send a response containing the currently recorded metrics
select {
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Stream has ended.")
case <-ticker.C:
}
}
}