mirror of
https://github.com/distribution/distribution.git
synced 2025-09-03 07:54:41 +00:00
221 lines
8.2 KiB
Go
221 lines
8.2 KiB
Go
/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
|
|
|
|
package rls
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/balancer"
|
|
"google.golang.org/grpc/balancer/rls/internal/adaptive"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/internal"
|
|
internalgrpclog "google.golang.org/grpc/internal/grpclog"
|
|
"google.golang.org/grpc/internal/pretty"
|
|
rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
|
|
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
|
|
)
|
|
|
|
// newAdaptiveThrottler returns the throttler installed on each control channel
// created by newControlChannel; by default it is backed by adaptive.New().
//
// NOTE(review): this is a package-level variable rather than a plain function,
// presumably so that tests can substitute a fake throttler — confirm.
var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }
|
|
|
|
// adaptiveThrottler is the client-side throttling behavior the control channel
// depends on. The control channel consults it before sending an RLS request.
type adaptiveThrottler interface {
	// ShouldThrottle reports whether the next RLS request should be dropped
	// locally instead of being sent to the RLS server.
	ShouldThrottle() bool

	// RegisterBackendResponse feeds the outcome of a request back into the
	// throttler.
	//
	// NOTE(review): this method is not called in this file; throttled
	// presumably means the request was rejected by the backend — confirm
	// against the adaptive package.
	RegisterBackendResponse(throttled bool)
}
|
|
|
|
// controlChannel is a wrapper around the gRPC channel to the RLS server
|
|
// specified in the service config.
|
|
type controlChannel struct {
|
|
// rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB
|
|
// policy receives this value in its service config.
|
|
rpcTimeout time.Duration
|
|
// backToReadyFunc is a callback to be invoked when the connectivity state
|
|
// changes from READY --> TRANSIENT_FAILURE --> READY.
|
|
backToReadyFunc func()
|
|
// throttler in an adaptive throttling implementation used to avoid
|
|
// hammering the RLS service while it is overloaded or down.
|
|
throttler adaptiveThrottler
|
|
|
|
cc *grpc.ClientConn
|
|
client rlsgrpc.RouteLookupServiceClient
|
|
logger *internalgrpclog.PrefixLogger
|
|
}
|
|
|
|
// newControlChannel creates a controlChannel to rlsServerName and uses
|
|
// serviceConfig, if non-empty, as the default service config for the underlying
|
|
// gRPC channel.
|
|
func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
|
|
ctrlCh := &controlChannel{
|
|
rpcTimeout: rpcTimeout,
|
|
backToReadyFunc: backToReadyFunc,
|
|
throttler: newAdaptiveThrottler(),
|
|
}
|
|
ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
|
|
|
|
dopts, err := ctrlCh.dialOpts(bOpts, serviceConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctrlCh.cc, err = grpc.Dial(rlsServerName, dopts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
|
|
ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
|
|
|
|
go ctrlCh.monitorConnectivityState()
|
|
return ctrlCh, nil
|
|
}
|
|
|
|
// dialOpts constructs the dial options for the control plane channel.
|
|
func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig string) ([]grpc.DialOption, error) {
|
|
// The control plane channel will use the same authority as the parent
|
|
// channel for server authorization. This ensures that the identity of the
|
|
// RLS server and the identity of the backends is the same, so if the RLS
|
|
// config is injected by an attacker, it cannot cause leakage of private
|
|
// information contained in headers set by the application.
|
|
dopts := []grpc.DialOption{grpc.WithAuthority(bOpts.Authority)}
|
|
if bOpts.Dialer != nil {
|
|
dopts = append(dopts, grpc.WithContextDialer(bOpts.Dialer))
|
|
}
|
|
|
|
// The control channel will use the channel credentials from the parent
|
|
// channel, including any call creds associated with the channel creds.
|
|
var credsOpt grpc.DialOption
|
|
switch {
|
|
case bOpts.DialCreds != nil:
|
|
credsOpt = grpc.WithTransportCredentials(bOpts.DialCreds.Clone())
|
|
case bOpts.CredsBundle != nil:
|
|
// The "fallback" mode in google default credentials (which is the only
|
|
// type of credentials we expect to be used with RLS) uses TLS/ALTS
|
|
// creds for transport and uses the same call creds as that on the
|
|
// parent bundle.
|
|
bundle, err := bOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
credsOpt = grpc.WithCredentialsBundle(bundle)
|
|
default:
|
|
cc.logger.Warningf("no credentials available, using Insecure")
|
|
credsOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
|
|
}
|
|
dopts = append(dopts, credsOpt)
|
|
|
|
// If the RLS LB policy's configuration specified a service config for the
|
|
// control channel, use that and disable service config fetching via the name
|
|
// resolver for the control channel.
|
|
if serviceConfig != "" {
|
|
cc.logger.Infof("Disabling service config from the name resolver and instead using: %s", serviceConfig)
|
|
dopts = append(dopts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(serviceConfig))
|
|
}
|
|
|
|
return dopts, nil
|
|
}
|
|
|
|
func (cc *controlChannel) monitorConnectivityState() {
|
|
cc.logger.Infof("Starting connectivity state monitoring goroutine")
|
|
// Since we use two mechanisms to deal with RLS server being down:
|
|
// - adaptive throttling for the channel as a whole
|
|
// - exponential backoff on a per-request basis
|
|
// we need a way to avoid double-penalizing requests by counting failures
|
|
// toward both mechanisms when the RLS server is unreachable.
|
|
//
|
|
// To accomplish this, we monitor the state of the control plane channel. If
|
|
// the state has been TRANSIENT_FAILURE since the last time it was in state
|
|
// READY, and it then transitions into state READY, we push on a channel
|
|
// which is being read by the LB policy.
|
|
//
|
|
// The LB the policy will iterate through the cache to reset the backoff
|
|
// timeouts in all cache entries. Specifically, this means that it will
|
|
// reset the backoff state and cancel the pending backoff timer. Note that
|
|
// when cancelling the backoff timer, just like when the backoff timer fires
|
|
// normally, a new picker is returned to the channel, to force it to
|
|
// re-process any wait-for-ready RPCs that may still be queued if we failed
|
|
// them while we were in backoff. However, we should optimize this case by
|
|
// returning only one new picker, regardless of how many backoff timers are
|
|
// cancelled.
|
|
|
|
// Using the background context is fine here since we check for the ClientConn
|
|
// entering SHUTDOWN and return early in that case.
|
|
ctx := context.Background()
|
|
|
|
first := true
|
|
for {
|
|
// Wait for the control channel to become READY.
|
|
for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
|
|
if s == connectivity.Shutdown {
|
|
return
|
|
}
|
|
cc.cc.WaitForStateChange(ctx, s)
|
|
}
|
|
cc.logger.Infof("Connectivity state is READY")
|
|
|
|
if !first {
|
|
cc.logger.Infof("Control channel back to READY")
|
|
cc.backToReadyFunc()
|
|
}
|
|
first = false
|
|
|
|
// Wait for the control channel to move out of READY.
|
|
cc.cc.WaitForStateChange(ctx, connectivity.Ready)
|
|
if cc.cc.GetState() == connectivity.Shutdown {
|
|
return
|
|
}
|
|
cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
|
|
}
|
|
}
|
|
|
|
func (cc *controlChannel) close() {
|
|
cc.logger.Infof("Closing control channel")
|
|
cc.cc.Close()
|
|
}
|
|
|
|
// lookupCallback receives the outcome of a RouteLookup RPC started by
// controlChannel.lookup: the targets and header data read from the response,
// and the error (if any) returned by the RPC.
type lookupCallback func(targets []string, headerData string, err error)
|
|
|
|
// lookup starts a RouteLookup RPC in a separate goroutine and returns the
|
|
// results (and error, if any) in the provided callback.
|
|
//
|
|
// The returned boolean indicates whether the request was throttled by the
|
|
// client-side adaptive throttling algorithm in which case the provided callback
|
|
// will not be invoked.
|
|
func (cc *controlChannel) lookup(reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string, cb lookupCallback) (throttled bool) {
|
|
if cc.throttler.ShouldThrottle() {
|
|
cc.logger.Infof("RLS request throttled by client-side adaptive throttling")
|
|
return true
|
|
}
|
|
go func() {
|
|
req := &rlspb.RouteLookupRequest{
|
|
TargetType: "grpc",
|
|
KeyMap: reqKeys,
|
|
Reason: reason,
|
|
StaleHeaderData: staleHeaders,
|
|
}
|
|
cc.logger.Infof("Sending RLS request %+v", pretty.ToJSON(req))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), cc.rpcTimeout)
|
|
defer cancel()
|
|
resp, err := cc.client.RouteLookup(ctx, req)
|
|
cb(resp.GetTargets(), resp.GetHeaderData(), err)
|
|
}()
|
|
return false
|
|
}
|