Mirror of https://github.com/distribution/distribution.git, synced 2025-09-15 22:59:24 +00:00
Upgrade go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp
Signed-off-by: krynju <krystian.gulinski@juliahub.com>
2 vendor/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/cdsbalancer.go generated vendored
@@ -364,7 +364,7 @@ func (b *cdsBalancer) closeAllWatchers() {
 // Close cancels the CDS watch, closes the child policy and closes the
 // cdsBalancer.
 func (b *cdsBalancer) Close() {
-	b.serializer.TrySchedule(func(ctx context.Context) {
+	b.serializer.TrySchedule(func(context.Context) {
 		b.closeAllWatchers()
 
 		if b.childLB != nil {
18 vendor/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/cluster_watcher.go generated vendored
@@ -32,19 +32,19 @@ type clusterWatcher struct {
 	parent *cdsBalancer
 }
 
-func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
-	handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone.OnDone() }
-	cw.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
+func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
+	handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
+	cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
 }
 
-func (cw *clusterWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
-	handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone.OnDone() }
-	cw.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
+func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
+	handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
+	cw.parent.serializer.ScheduleOr(handleError, onDone)
 }
 
-func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
-	handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone.OnDone() }
-	cw.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
+func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
+	handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
+	cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
 }
 
 // watcherState groups the state associated with a clusterWatcher.
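The cluster_watcher.go hunks above migrate the watcher callbacks from the xdsresource.DoneNotifier interface to the xdsresource.OnDoneFunc function type. A minimal, self-contained Go sketch of that migration pattern follows; the types and functions below are illustrative stand-ins, not the vendored grpc-go internals.

package main

import "fmt"

// DoneNotifier mirrors the older single-method interface style: callers had
// to invoke onDone.OnDone() when processing finished.
type DoneNotifier interface {
	OnDone()
}

// OnDoneFunc mirrors the newer style: a plain function type, invoked as onDone().
type OnDoneFunc func()

// processWithInterface is the older shape of a watcher callback.
func processWithInterface(onDone DoneNotifier) {
	fmt.Println("update handled (interface style)")
	onDone.OnDone()
}

// processWithFunc is the newer shape; a no-op completion is just func() {}.
func processWithFunc(onDone OnDoneFunc) {
	fmt.Println("update handled (function style)")
	onDone()
}

type nopNotifier struct{}

func (nopNotifier) OnDone() {}

func main() {
	processWithInterface(nopNotifier{})
	processWithFunc(func() {})
}

A function type removes the need for a separate no-op notifier value such as NopDoneNotifier{}; the no-op completion is simply func() {}, which is exactly the substitution made in the later clusterresolver hunks.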
219 vendor/google.golang.org/grpc/xds/internal/balancer/clusterimpl/clusterimpl.go generated vendored
@@ -24,6 +24,7 @@
 package clusterimpl
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"sync"
@@ -33,7 +34,6 @@ import (
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/balancer/gracefulswitch"
-	"google.golang.org/grpc/internal/buffer"
 	"google.golang.org/grpc/internal/grpclog"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/pretty"
@@ -53,7 +53,10 @@ const (
 	defaultRequestCountMax = 1024
 )
 
-var connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
+var (
+	connectedAddress  = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
+	errBalancerClosed = fmt.Errorf("%s LB policy is closed", Name)
+)
 
 func init() {
 	balancer.Register(bb{})
@@ -62,18 +65,17 @@ func init() {
 type bb struct{}
 
 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+	ctx, cancel := context.WithCancel(context.Background())
 	b := &clusterImplBalancer{
-		ClientConn:      cc,
-		bOpts:           bOpts,
-		closed:          grpcsync.NewEvent(),
-		done:            grpcsync.NewEvent(),
-		loadWrapper:     loadstore.NewWrapper(),
-		pickerUpdateCh:  buffer.NewUnbounded(),
-		requestCountMax: defaultRequestCountMax,
+		ClientConn:       cc,
+		bOpts:            bOpts,
+		loadWrapper:      loadstore.NewWrapper(),
+		requestCountMax:  defaultRequestCountMax,
+		serializer:       grpcsync.NewCallbackSerializer(ctx),
+		serializerCancel: cancel,
 	}
 	b.logger = prefixLogger(b)
 	b.child = gracefulswitch.NewBalancer(b, bOpts)
-	go b.run()
 	b.logger.Infof("Created")
 	return b
 }
@@ -89,18 +91,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
 type clusterImplBalancer struct {
 	balancer.ClientConn
 
-	// mu guarantees mutual exclusion between Close() and handling of picker
-	// update to the parent ClientConn in run(). It's to make sure that the
-	// run() goroutine doesn't send picker update to parent after the balancer
-	// is closed.
-	//
-	// It's only used by the run() goroutine, but not the other exported
-	// functions. Because the exported functions are guaranteed to be
-	// synchronized with Close().
-	mu     sync.Mutex
-	closed *grpcsync.Event
-	done   *grpcsync.Event
-
 	bOpts     balancer.BuildOptions
 	logger    *grpclog.PrefixLogger
 	xdsClient xdsclient.XDSClient
@@ -115,10 +105,11 @@ type clusterImplBalancer struct {
 	clusterNameMu sync.Mutex
 	clusterName   string
 
+	serializer       *grpcsync.CallbackSerializer
+	serializerCancel context.CancelFunc
+
 	// childState/drops/requestCounter keeps the state used by the most recently
-	// generated picker. All fields can only be accessed in run(). And run() is
-	// the only goroutine that sends picker to the parent ClientConn. All
-	// requests to update picker need to be sent to pickerUpdateCh.
+	// generated picker.
 	childState      balancer.State
 	dropCategories  []DropConfig // The categories for drops.
 	drops           []*dropper
@@ -127,7 +118,6 @@ type clusterImplBalancer struct {
 	requestCounter  *xdsclient.ClusterRequestsCounter
 	requestCountMax uint32
 	telemetryLabels map[string]string
-	pickerUpdateCh  *buffer.Unbounded
 }
 
 // updateLoadStore checks the config for load store, and decides whether it
@@ -208,14 +198,9 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
 	return nil
 }
 
-func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
-		return nil
-	}
-
+func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) error {
 	if b.logger.V(2) {
-		b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
+		b.logger.Infof("Received configuration: %s", pretty.ToJSON(s.BalancerConfig))
 	}
 	newConfig, ok := s.BalancerConfig.(*LBConfig)
 	if !ok {
@@ -227,7 +212,7 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 	// it.
 	bb := balancer.Get(newConfig.ChildPolicy.Name)
 	if bb == nil {
-		return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
+		return fmt.Errorf("child policy %q not registered", newConfig.ChildPolicy.Name)
 	}
 
 	if b.xdsClient == nil {
@@ -246,38 +231,56 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 		return err
 	}
 
-	if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
-		if err := b.child.SwitchTo(bb); err != nil {
-			return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err)
-		}
+	// Build config for the gracefulswitch balancer. It is safe to ignore JSON
+	// marshaling errors here, since the config was already validated as part of
+	// ParseConfig().
+	cfg := []map[string]any{{newConfig.ChildPolicy.Name: newConfig.ChildPolicy.Config}}
+	cfgJSON, _ := json.Marshal(cfg)
+	parsedCfg, err := gracefulswitch.ParseConfig(cfgJSON)
+	if err != nil {
+		return err
 	}
 
 	b.config = newConfig
 
-	// Notify run() of this new config, in case drop and request counter need
-	// update (which means a new picker needs to be generated).
-	b.pickerUpdateCh.Put(newConfig)
+	b.telemetryLabels = newConfig.TelemetryLabels
+	dc := b.handleDropAndRequestCount(newConfig)
+	if dc != nil && b.childState.Picker != nil {
+		b.ClientConn.UpdateState(balancer.State{
+			ConnectivityState: b.childState.ConnectivityState,
+			Picker:            b.newPicker(dc),
+		})
+	}
 
 	// Addresses and sub-balancer config are sent to sub-balancer.
 	return b.child.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState:  s.ResolverState,
-		BalancerConfig: b.config.ChildPolicy.Config,
+		BalancerConfig: parsedCfg,
 	})
 }
 
-func (b *clusterImplBalancer) ResolverError(err error) {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
-		return
+func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	// Handle the update in a blocking fashion.
+	errCh := make(chan error, 1)
+	callback := func(context.Context) {
+		errCh <- b.updateClientConnState(s)
 	}
-	b.child.ResolverError(err)
+	onFailure := func() {
+		// An attempt to schedule callback fails only when an update is received
+		// after Close().
+		errCh <- errBalancerClosed
+	}
+	b.serializer.ScheduleOr(callback, onFailure)
+	return <-errCh
 }
 
-func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
-		return
-	}
+func (b *clusterImplBalancer) ResolverError(err error) {
+	b.serializer.TrySchedule(func(context.Context) {
+		b.child.ResolverError(err)
+	})
+}
+
+func (b *clusterImplBalancer) updateSubConnState(_ balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
 	// Trigger re-resolution when a SubConn turns transient failure. This is
 	// necessary for the LogicalDNS in cluster_resolver policy to re-resolve.
 	//
@@ -299,26 +302,40 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer
 	}
 
 func (b *clusterImplBalancer) Close() {
-	b.mu.Lock()
-	b.closed.Fire()
-	b.mu.Unlock()
+	b.serializer.TrySchedule(func(_ context.Context) {
+		b.child.Close()
+		b.childState = balancer.State{}
 
-	b.child.Close()
-	b.childState = balancer.State{}
-	b.pickerUpdateCh.Close()
-	<-b.done.Done()
-	b.logger.Infof("Shutdown")
+		if b.cancelLoadReport != nil {
+			b.cancelLoadReport()
+			b.cancelLoadReport = nil
+		}
+		b.logger.Infof("Shutdown")
+	})
+	b.serializerCancel()
+	<-b.serializer.Done()
 }
 
 func (b *clusterImplBalancer) ExitIdle() {
-	b.child.ExitIdle()
+	b.serializer.TrySchedule(func(context.Context) {
+		b.child.ExitIdle()
+	})
 }
 
 // Override methods to accept updates from the child LB.
 
 func (b *clusterImplBalancer) UpdateState(state balancer.State) {
-	// Instead of updating parent ClientConn inline, send state to run().
-	b.pickerUpdateCh.Put(state)
+	b.serializer.TrySchedule(func(context.Context) {
+		b.childState = state
+		b.ClientConn.UpdateState(balancer.State{
+			ConnectivityState: b.childState.ConnectivityState,
+			Picker: b.newPicker(&dropConfigs{
+				drops:           b.drops,
+				requestCounter:  b.requestCounter,
+				requestCountMax: b.requestCountMax,
+			}),
+		})
+	})
 }
 
 func (b *clusterImplBalancer) setClusterName(n string) {
@@ -370,21 +387,23 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
 	scw := &scWrapper{}
 	oldListener := opts.StateListener
 	opts.StateListener = func(state balancer.SubConnState) {
-		b.updateSubConnState(sc, state, oldListener)
-		if state.ConnectivityState != connectivity.Ready {
-			return
-		}
-		// Read connected address and call updateLocalityID() based on the connected
-		// address's locality. https://github.com/grpc/grpc-go/issues/7339
-		addr := connectedAddress(state)
-		lID := xdsinternal.GetLocalityID(addr)
-		if lID.Empty() {
-			if b.logger.V(2) {
-				b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
+		b.serializer.TrySchedule(func(context.Context) {
+			b.updateSubConnState(sc, state, oldListener)
+			if state.ConnectivityState != connectivity.Ready {
+				return
 			}
-			return
-		}
-		scw.updateLocalityID(lID)
+			// Read connected address and call updateLocalityID() based on the connected
+			// address's locality. https://github.com/grpc/grpc-go/issues/7339
+			addr := connectedAddress(state)
+			lID := xdsinternal.GetLocalityID(addr)
+			if lID.Empty() {
+				if b.logger.V(2) {
+					b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
+				}
+				return
+			}
+			scw.updateLocalityID(lID)
+		})
 	}
 	sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
 	if err != nil {
@@ -464,49 +483,3 @@ func (b *clusterImplBalancer) handleDropAndRequestCount(newConfig *LBConfig) *dr
 		requestCountMax: b.requestCountMax,
 	}
 }
-
-func (b *clusterImplBalancer) run() {
-	defer b.done.Fire()
-	for {
-		select {
-		case update, ok := <-b.pickerUpdateCh.Get():
-			if !ok {
-				return
-			}
-			b.pickerUpdateCh.Load()
-			b.mu.Lock()
-			if b.closed.HasFired() {
-				b.mu.Unlock()
-				return
-			}
-			switch u := update.(type) {
-			case balancer.State:
-				b.childState = u
-				b.ClientConn.UpdateState(balancer.State{
-					ConnectivityState: b.childState.ConnectivityState,
-					Picker: b.newPicker(&dropConfigs{
-						drops:           b.drops,
-						requestCounter:  b.requestCounter,
-						requestCountMax: b.requestCountMax,
-					}),
-				})
-			case *LBConfig:
-				b.telemetryLabels = u.TelemetryLabels
-				dc := b.handleDropAndRequestCount(u)
-				if dc != nil && b.childState.Picker != nil {
-					b.ClientConn.UpdateState(balancer.State{
-						ConnectivityState: b.childState.ConnectivityState,
-						Picker:            b.newPicker(dc),
-					})
-				}
-			}
-			b.mu.Unlock()
-		case <-b.closed.Done():
-			if b.cancelLoadReport != nil {
-				b.cancelLoadReport()
-				b.cancelLoadReport = nil
-			}
-			return
-		}
-	}
-}
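The clusterimpl.go hunks above drop the run() goroutine, the pickerUpdateCh buffer, and the closed/done events in favor of a grpcsync.CallbackSerializer, with UpdateClientConnState now blocking on an error channel until its serialized callback has run. A rough, self-contained sketch of that pattern follows; the serializer type, newSerializer, and updateBlocking below are illustrative stand-ins (assumptions), not the actual grpcsync API.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// serializer is a toy stand-in for the callback-serializer pattern: callbacks
// run one at a time, in order, on a single goroutine, which removes the need
// for an explicit run() loop and its mutex/event bookkeeping.
type serializer struct {
	mu     sync.Mutex
	closed bool
	ch     chan func(context.Context)
	done   chan struct{}
}

func newSerializer(ctx context.Context) *serializer {
	s := &serializer{ch: make(chan func(context.Context), 16), done: make(chan struct{})}
	go func() {
		// Worker goroutine: drains callbacks until the channel is closed.
		defer close(s.done)
		for cb := range s.ch {
			cb(ctx)
		}
	}()
	go func() {
		// When the context is canceled, mark the serializer closed and stop the worker.
		<-ctx.Done()
		s.mu.Lock()
		s.closed = true
		close(s.ch)
		s.mu.Unlock()
	}()
	return s
}

// ScheduleOr runs cb on the serializer goroutine, or onFailure if it is closed.
func (s *serializer) ScheduleOr(cb func(context.Context), onFailure func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		onFailure()
		return
	}
	s.ch <- cb
}

var errClosed = errors.New("balancer is closed")

// updateBlocking shows the shape of the new UpdateClientConnState: the caller
// blocks on an error channel until the serialized callback has run (or the
// schedule attempt fails because the balancer was closed).
func updateBlocking(s *serializer, cfg string) error {
	errCh := make(chan error, 1)
	s.ScheduleOr(func(context.Context) {
		fmt.Println("applying config:", cfg)
		errCh <- nil
	}, func() { errCh <- errClosed })
	return <-errCh
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := newSerializer(ctx)
	fmt.Println(updateBlocking(s, "round_robin"))
	cancel()
	<-s.done
	fmt.Println(updateBlocking(s, "too-late"))
}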
@@ -134,7 +134,7 @@ func (bb) ParseConfig(j json.RawMessage) (serviceconfig.LoadBalancingConfig, err
 		// This will never occur, valid configuration is emitted from the xDS
 		// Client. Validity is already checked in the xDS Client, however, this
 		// double validation is present because Unmarshalling and Validating are
-		// coupled into one json.Unmarshal operation). We will switch this in
+		// coupled into one json.Unmarshal operation. We will switch this in
 		// the future to two separate operations.
 		return nil, fmt.Errorf("error unmarshalling xDS LB Policy: %v", err)
 	}
@@ -216,7 +216,7 @@ func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) {
 	b.updateChildConfig()
 
 	if update.onDone != nil {
-		update.onDone.OnDone()
+		update.onDone()
 	}
 }
 
4 vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go generated vendored
@@ -37,7 +37,7 @@ import (
 
 const million = 1000000
 
-// priorityConfig is config for one priority. For example, if there an EDS and a
+// priorityConfig is config for one priority. For example, if there's an EDS and a
 // DNS, the priority list will be [priorityConfig{EDS}, priorityConfig{DNS}].
 //
 // Each priorityConfig corresponds to one discovery mechanism from the LBConfig
@@ -171,7 +171,7 @@ func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.Endpoint
 	}
 
 	// Localities of length 0 is triggered by an NACK or resource-not-found
-	// error before update, or a empty localities list in a update. In either
+	// error before update, or an empty localities list in an update. In either
 	// case want to create a priority, and send down empty address list, causing
 	// TF for that priority. "If any discovery mechanism instance experiences an
 	// error retrieving data, and it has not previously reported any results, it
20 vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go generated vendored
@@ -37,7 +37,7 @@ type resourceUpdate struct {
 	priorities []priorityConfig
 	// To be invoked once the update is completely processed, or is dropped in
 	// favor of a newer update.
-	onDone xdsresource.DoneNotifier
+	onDone xdsresource.OnDoneFunc
 }
 
 // topLevelResolver is used by concrete endpointsResolver implementations for
@@ -49,7 +49,7 @@ type topLevelResolver interface {
 	// endpointsResolver implementation. The onDone callback is to be invoked
 	// once the update is completely processed, or is dropped in favor of a
 	// newer update.
-	onUpdate(onDone xdsresource.DoneNotifier)
+	onUpdate(onDone xdsresource.OnDoneFunc)
 }
 
 // endpointsResolver wraps the functionality to resolve a given resource name to
@@ -77,7 +77,7 @@ type endpointsResolver interface {
 // discoveryMechanismKey is {type+resource_name}, it's used as the map key, so
 // that the same resource resolver can be reused (e.g. when there are two
 // mechanisms, both for the same EDS resource, but has different circuit
-// breaking config.
+// breaking config).
 type discoveryMechanismKey struct {
 	typ  DiscoveryMechanismType
 	name string
@@ -215,7 +215,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
 	}
 	// Regenerate even if there's no change in discovery mechanism, in case
 	// priority order changed.
-	rr.generateLocked(xdsresource.NopDoneNotifier{})
+	rr.generateLocked(func() {})
 }
 
 // resolveNow is typically called to trigger re-resolve of DNS. The EDS
@@ -264,7 +264,7 @@ func (rr *resourceResolver) stop(closing bool) {
 	select {
 	case ru := <-rr.updateChannel:
 		if ru.onDone != nil {
-			ru.onDone.OnDone()
+			ru.onDone()
 		}
 	default:
 	}
@@ -281,14 +281,14 @@ func (rr *resourceResolver) stop(closing bool) {
 // clusterresolver LB policy.
 //
 // Caller must hold rr.mu.
-func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) {
+func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
 	var ret []priorityConfig
 	for _, rDM := range rr.children {
 		u, ok := rDM.r.lastUpdate()
 		if !ok {
 			// Don't send updates to parent until all resolvers have update to
 			// send.
-			onDone.OnDone()
+			onDone()
 			return
 		}
 		switch uu := u.(type) {
@@ -304,18 +304,18 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) {
 	// receive path.
 	case ru := <-rr.updateChannel:
 		if ru.onDone != nil {
-			ru.onDone.OnDone()
+			ru.onDone()
 		}
 	default:
 	}
 	rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone}
 }
 
-func (rr *resourceResolver) onUpdate(onDone xdsresource.DoneNotifier) {
+func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
 	handleUpdate := func(context.Context) {
 		rr.mu.Lock()
 		rr.generateLocked(onDone)
 		rr.mu.Unlock()
 	}
-	rr.serializer.ScheduleOr(handleUpdate, func() { onDone.OnDone() })
+	rr.serializer.ScheduleOr(handleUpdate, func() { onDone() })
 }
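resource_resolver.go keeps at most one pending resourceUpdate and, as the hunks above show, still invokes onDone for an update that is dropped in favor of a newer one. A small, self-contained sketch of that drain-then-replace pattern follows; the string-slice payload is a simplification of the real priorityConfig slice.

package main

import "fmt"

// resourceUpdate mirrors the shape in resource_resolver.go: a payload plus an
// onDone callback to run once the update is processed or dropped.
type resourceUpdate struct {
	priorities []string // stand-in payload
	onDone     func()
}

// pushLatest keeps only the newest update in a 1-buffered channel. If an older
// update is still pending, it is drained and its onDone is invoked so the
// producer is never left waiting on a dropped update.
func pushLatest(ch chan *resourceUpdate, ru *resourceUpdate) {
	select {
	case old := <-ch:
		if old.onDone != nil {
			old.onDone() // dropped in favor of a newer update
		}
	default:
	}
	ch <- ru
}

func main() {
	ch := make(chan *resourceUpdate, 1)
	pushLatest(ch, &resourceUpdate{priorities: []string{"eds"}, onDone: func() { fmt.Println("first update dropped") }})
	pushLatest(ch, &resourceUpdate{priorities: []string{"eds", "dns"}, onDone: func() { fmt.Println("second update processed") }})
	got := <-ch
	fmt.Println("processing:", got.priorities)
	got.onDone()
}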
@@ -27,7 +27,6 @@ import (
 	"google.golang.org/grpc/internal/pretty"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/serviceconfig"
-	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 )
 
 var (
@@ -80,7 +79,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
 			ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
 		}
 		ret.updateReceived = true
-		ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
+		ret.topLevelResolver.onUpdate(func() {})
 		return ret
 	}
 
@@ -90,7 +89,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
 			ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
 		}
 		ret.updateReceived = true
-		ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
+		ret.topLevelResolver.onUpdate(func() {})
 		return ret
 	}
 	ret.dnsR = r
@@ -154,7 +153,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
 	dr.updateReceived = true
 	dr.mu.Unlock()
 
-	dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
+	dr.topLevelResolver.onUpdate(func() {})
 	return nil
 }
 
@@ -177,7 +176,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) {
 	dr.updateReceived = true
 	dr.mu.Unlock()
 
-	dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
+	dr.topLevelResolver.onUpdate(func() {})
 }
 
 func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {
@@ -76,9 +76,9 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
 }
 
 // OnUpdate is invoked to report an update for the resource being watched.
-func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
+func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
 	if er.stopped.HasFired() {
-		onDone.OnDone()
+		onDone()
 		return
 	}
 
@@ -89,9 +89,9 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD
 	er.topLevelResolver.onUpdate(onDone)
 }
 
-func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotifier) {
+func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
 	if er.stopped.HasFired() {
-		onDone.OnDone()
+		onDone()
 		return
 	}
 
@@ -104,7 +104,7 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotif
 		// Continue using a previously received good configuration if one
 		// exists.
 		er.mu.Unlock()
-		onDone.OnDone()
+		onDone()
 		return
 	}
 
@@ -120,9 +120,9 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotif
 	er.topLevelResolver.onUpdate(onDone)
 }
 
-func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
+func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
 	if er.stopped.HasFired() {
-		onDone.OnDone()
+		onDone()
 		return
 	}
 
16 vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go generated vendored
@@ -592,20 +592,6 @@ func (b *outlierDetectionBalancer) Target() string {
 	return b.cc.Target()
 }
 
-func max(x, y time.Duration) time.Duration {
-	if x < y {
-		return y
-	}
-	return x
-}
-
-func min(x, y time.Duration) time.Duration {
-	if x < y {
-		return x
-	}
-	return y
-}
-
 // handleSubConnUpdate stores the recent state and forward the update
 // if the SubConn is not ejected.
 func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) {
@@ -671,7 +657,7 @@ func (b *outlierDetectionBalancer) handleChildStateUpdate(u balancer.State) {
 func (b *outlierDetectionBalancer) handleLBConfigUpdate(u lbCfgUpdate) {
 	lbCfg := u.lbCfg
 	noopCfg := lbCfg.SuccessRateEjection == nil && lbCfg.FailurePercentageEjection == nil
-	// If the child has sent it's first update and this config flips the noop
+	// If the child has sent its first update and this config flips the noop
 	// bit compared to the most recent picker update sent upward, then a new
 	// picker with this updated bit needs to be forwarded upward. If a child
 	// update was received during the suppression of child updates within
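The removal of the local max/min helpers for time.Duration above is consistent with Go 1.21+, where min and max are predeclared generic builtins over ordered types, so the hand-rolled helpers are redundant. A quick check, assuming a Go 1.21 or newer toolchain:

package main

import (
	"fmt"
	"time"
)

func main() {
	a, b := 5*time.Second, 250*time.Millisecond
	// min and max are predeclared builtins as of Go 1.21 and accept any
	// ordered type, including time.Duration.
	fmt.Println(min(a, b)) // 250ms
	fmt.Println(max(a, b)) // 5s
}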
25 vendor/google.golang.org/grpc/xds/internal/balancer/ringhash/picker.go generated vendored
@@ -159,28 +159,3 @@ func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
 	// There's no qualifying next entry.
 	return nil
 }
-
-// nextSkippingDuplicatesSubConn finds the next subconn in the ring, that's
-// different from the given subconn.
-func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn {
-	var entry *ringEntry
-	for _, it := range ring.items {
-		if it.sc == sc {
-			entry = it
-			break
-		}
-	}
-	if entry == nil {
-		// If the given subconn is not in the ring (e.g. it was deleted), return
-		// the first one.
-		if len(ring.items) > 0 {
-			return ring.items[0].sc
-		}
-		return nil
-	}
-	ee := nextSkippingDuplicates(ring, entry)
-	if ee == nil {
-		return nil
-	}
-	return ee.sc
-}
45 vendor/google.golang.org/grpc/xds/internal/balancer/ringhash/ringhash.go generated vendored
@@ -44,12 +44,13 @@ func init() {
 type bb struct{}
 
-func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+func (bb) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
 	b := &ringhashBalancer{
-		cc:       cc,
-		subConns: resolver.NewAddressMap(),
-		scStates: make(map[balancer.SubConn]*subConn),
-		csEvltr:  &connectivityStateEvaluator{},
+		cc:              cc,
+		subConns:        resolver.NewAddressMap(),
+		scStates:        make(map[balancer.SubConn]*subConn),
+		csEvltr:         &connectivityStateEvaluator{},
+		orderedSubConns: make([]*subConn, 0),
 	}
 	b.logger = prefixLogger(b)
 	b.logger.Infof("Created")
@@ -197,6 +198,14 @@ type ringhashBalancer struct {
 
 	resolverErr error // the last error reported by the resolver; cleared on successful resolution
 	connErr     error // the last connection error; cleared upon leaving TransientFailure
+
+	// orderedSubConns contains the list of subconns in the order that addresses
+	// appear from the resolver. Together with lastInternallyTriggeredSCIndex,
+	// this allows triggering connection attempts to all SubConns independently
+	// of the order they appear on the ring. Always in sync with ring and
+	// subConns. The index is reset when addresses change.
+	orderedSubConns                []*subConn
+	lastInternallyTriggeredSCIndex int
 }
 
 // updateAddresses creates new SubConns and removes SubConns, based on the
@@ -214,6 +223,9 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 	var addrsUpdated bool
 	// addrsSet is the set converted from addrs, used for quick lookup.
 	addrsSet := resolver.NewAddressMap()
+
+	b.orderedSubConns = b.orderedSubConns[:0] // reuse the underlying array.
+
 	for _, addr := range addrs {
 		addrsSet.Set(addr, true)
 		newWeight := getWeightAttribute(addr)
@@ -234,6 +246,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 			b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
 			b.subConns.Set(addr, scs)
 			b.scStates[sc] = scs
+			b.orderedSubConns = append(b.orderedSubConns, scs)
 			addrsUpdated = true
 		} else {
 			// We have seen this address before and created a subConn for it. If the
@@ -244,6 +257,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 			// since *only* the weight attribute has changed, and that does not affect
 			// subConn uniqueness.
 			scInfo := val.(*subConn)
+			b.orderedSubConns = append(b.orderedSubConns, scInfo)
 			if oldWeight := scInfo.weight; oldWeight != newWeight {
 				scInfo.weight = newWeight
 				b.subConns.Set(addr, scInfo)
@@ -264,6 +278,9 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 			// The entry will be deleted in updateSubConnState.
 		}
 	}
+	if addrsUpdated {
+		b.lastInternallyTriggeredSCIndex = 0
+	}
 	return addrsUpdated
 }
 
@@ -399,19 +416,11 @@ func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balance
 				return
 			}
 		}
-		// Trigger a SubConn (this updated SubConn's next SubConn in the ring)
-		// to connect if nobody is attempting to connect.
-		sc := nextSkippingDuplicatesSubConn(b.ring, scs)
-		if sc != nil {
-			sc.queueConnect()
-			return
-		}
-		// This handles the edge case where we have a single subConn in the
-		// ring. nextSkippingDuplicatesSubCon() would have returned nil. We
-		// still need to ensure that some subConn is attempting to connect, in
-		// order to give the LB policy a chance to move out of
-		// TRANSIENT_FAILURE. Hence, we try connecting on the current subConn.
-		scs.queueConnect()
 
+		// Trigger a SubConn (the next in the order addresses appear in the
+		// resolver) to connect if nobody is attempting to connect.
+		b.lastInternallyTriggeredSCIndex = (b.lastInternallyTriggeredSCIndex + 1) % len(b.orderedSubConns)
+		b.orderedSubConns[b.lastInternallyTriggeredSCIndex].queueConnect()
 	}
 }
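The ringhash.go hunks above switch the connect-triggering logic from walking the ring via nextSkippingDuplicatesSubConn to round-robining over orderedSubConns, the resolver-ordered SubConn list, using lastInternallyTriggeredSCIndex. A self-contained sketch of that index rotation follows; the types below are simplified stand-ins for the vendored ones.

package main

import "fmt"

// subConn is a stand-in for the ringhash subConn wrapper.
type subConn struct{ addr string }

func (s *subConn) queueConnect() { fmt.Println("queueing connect to", s.addr) }

// balancerState holds just the two fields the new logic needs: the
// resolver-ordered SubConn list and the index of the last SubConn whose
// connection attempt was triggered internally.
type balancerState struct {
	orderedSubConns                []*subConn
	lastInternallyTriggeredSCIndex int
}

// triggerNextConnect advances round-robin through the resolver-ordered list,
// so repeated failures eventually try every address even when the ring would
// keep pointing at duplicates of the same SubConn.
func (b *balancerState) triggerNextConnect() {
	if len(b.orderedSubConns) == 0 {
		return
	}
	b.lastInternallyTriggeredSCIndex = (b.lastInternallyTriggeredSCIndex + 1) % len(b.orderedSubConns)
	b.orderedSubConns[b.lastInternallyTriggeredSCIndex].queueConnect()
}

func main() {
	b := &balancerState{orderedSubConns: []*subConn{{"10.0.0.1:443"}, {"10.0.0.2:443"}, {"10.0.0.3:443"}}}
	for i := 0; i < 4; i++ { // wraps around after the last address
		b.triggerNextConnect()
	}
}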