Merge pull request #107631 from lzhecheng/fix-retry-svc-to-update

Avoid updating Services with stale specs
Kubernetes Prow Robot authored 2022-04-05 08:23:37 -07:00, committed by GitHub
commit 68d26c868f
2 changed files with 38 additions and 40 deletions
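The change replaces the controller's saved []*v1.Service retry slice with a sets.String of "namespace/name" keys and resolves each key against the service cache before retrying, so a retry always operates on the most recently cached spec instead of a pointer captured in an earlier sync round. A minimal sketch of that idea, with a plain map standing in for the controller's serviceCache and a stub sync function in place of nodeSyncService (both are illustrative, not the controller's real types):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// toyCache stands in for the controller's serviceCache, keyed by "namespace/name".
type toyCache map[string]*v1.Service

// retryFailedServices re-reads every remembered key from the cache before
// syncing it again, and returns the keys that still fail.
func retryFailedServices(cache toyCache, failedKeys sets.String, sync func(*v1.Service) error) sets.String {
	stillFailing := sets.NewString()
	for key := range failedKeys {
		svc, ok := cache[key] // always use the latest cached object, never a saved pointer
		if !ok {
			continue // Service was deleted between retries; nothing left to sync
		}
		if err := sync(svc); err != nil {
			stillFailing.Insert(key)
		}
	}
	return stillFailing
}

func main() {
	cache := toyCache{"default/s0": {}}
	failed := sets.NewString("default/s0", "default/deleted")
	remaining := retryFailedServices(cache, failed, func(*v1.Service) error { return nil })
	fmt.Println(remaining.List()) // prints []
}

Keys that no longer resolve in the cache are simply dropped here; the real controller additionally logs an error, as the diff below shows.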

View File

@@ -75,7 +75,7 @@ type serviceCache struct {
type Controller struct {
cloud cloudprovider.Interface
knownHosts []*v1.Node
servicesToUpdate []*v1.Service
servicesToUpdate sets.String
kubeClient clientset.Interface
clusterName string
balancer cloudprovider.LoadBalancer
@@ -735,16 +735,28 @@ func (s *Controller) nodeSyncInternal(ctx context.Context, workers int) {
if !s.needFullSyncAndUnmark() {
// The set of nodes in the cluster hasn't changed, but we can retry
// updating any services that we failed to update last time around.
s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, s.servicesToUpdate, workers)
// It is required to call `s.cache.get()` on each Service in case there was
// an update event that occurred between retries.
var servicesToUpdate []*v1.Service
for key := range s.servicesToUpdate {
cachedService, exist := s.cache.get(key)
if !exist {
klog.Errorf("Service %q should be in the cache but not", key)
continue
}
servicesToUpdate = append(servicesToUpdate, cachedService.state)
}
s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
return
}
klog.V(2).Infof("Syncing backends for all LB services.")
// Try updating all services, and save the ones that fail to try again next
// Try updating all services, and save the failed ones to try again next
// round.
s.servicesToUpdate = s.cache.allServices()
numServices := len(s.servicesToUpdate)
s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, s.servicesToUpdate, workers)
servicesToUpdate := s.cache.allServices()
numServices := len(servicesToUpdate)
s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
numServices-len(s.servicesToUpdate), numServices)
}
@@ -772,10 +784,11 @@ func (s *Controller) nodeSyncService(svc *v1.Service) bool {
// updateLoadBalancerHosts updates all existing load balancers so that
// they will match the latest list of nodes with input number of workers.
// Returns the list of services that couldn't be updated.
func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry []*v1.Service) {
func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
// lock for servicesToRetry
servicesToRetry = sets.NewString()
lock := sync.Mutex{}
doWork := func(piece int) {
if shouldRetry := s.nodeSyncService(services[piece]); !shouldRetry {
@@ -783,7 +796,8 @@ func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
}
lock.Lock()
defer lock.Unlock()
servicesToRetry = append(servicesToRetry, services[piece])
key := fmt.Sprintf("%s/%s", services[piece].Namespace, services[piece].Name)
servicesToRetry.Insert(key)
}
workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
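updateLoadBalancerHosts fans the per-Service syncs out to a bounded pool of goroutines, so every insert into the shared retry set stays behind the mutex. Below is a standalone sketch of that pattern; workqueue.ParallelizeUntil and sets.String are the real APIs used by this file, while syncOne is a made-up stand-in for nodeSyncService:

package main

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	keys := []string{"default/s0", "default/s1", "default/s2", "default/s3"}

	// syncOne simulates a per-Service sync that fails for one of the keys.
	syncOne := func(key string) error {
		if key == "default/s2" {
			return fmt.Errorf("simulated cloud error")
		}
		return nil
	}

	retry := sets.NewString()
	var mu sync.Mutex // protects retry; workers run concurrently

	workqueue.ParallelizeUntil(context.TODO(), 3, len(keys), func(piece int) {
		if err := syncOne(keys[piece]); err != nil {
			mu.Lock()
			defer mu.Unlock()
			retry.Insert(keys[piece])
		}
	})

	fmt.Println(retry.List()) // [default/s2]
}

The "%s/%s" key written into the set is the usual namespace/name form, the same format client-go's cache.MetaNamespaceKeyFunc produces for namespaced objects.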

View File

@@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
@@ -44,6 +45,8 @@ import (
fakecloud "k8s.io/cloud-provider/fake"
servicehelper "k8s.io/cloud-provider/service/helpers"
utilpointer "k8s.io/utils/pointer"
"github.com/stretchr/testify/assert"
)
const region = "us-central"
@@ -548,7 +551,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
controller, cloud, _ := newController()
controller.nodeLister = newFakeNodeLister(nil, nodes...)
if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); servicesToRetry != nil {
if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
}
compareUpdateCalls(t, item.expectedUpdateCalls, cloud.UpdateCalls)
@@ -569,6 +572,11 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
newService("s4", "123", v1.ServiceTypeLoadBalancer),
}
serviceNames := sets.NewString()
for _, svc := range services {
serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName()))
}
controller, cloud, _ := newController()
for _, tc := range []struct {
desc string
@@ -576,7 +584,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
expectedUpdateCalls []fakecloud.UpdateBalancerCall
worker int
nodeListerErr error
expectedRetryServices []*v1.Service
expectedRetryServices sets.String
}{
{
desc: "only 1 node",
@@ -589,7 +597,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
},
worker: 3,
nodeListerErr: nil,
expectedRetryServices: []*v1.Service{},
expectedRetryServices: sets.NewString(),
},
{
desc: "2 nodes",
@@ -602,7 +610,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
},
worker: 1,
nodeListerErr: nil,
expectedRetryServices: []*v1.Service{},
expectedRetryServices: sets.NewString(),
},
{
desc: "4 nodes",
@@ -615,7 +623,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
},
worker: 3,
nodeListerErr: nil,
expectedRetryServices: []*v1.Service{},
expectedRetryServices: sets.NewString(),
},
{
desc: "error occur during sync",
@@ -623,7 +631,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
worker: 3,
nodeListerErr: fmt.Errorf("random error"),
expectedRetryServices: services,
expectedRetryServices: serviceNames,
},
{
desc: "error occur during sync with 1 workers",
@@ -631,7 +639,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
worker: 1,
nodeListerErr: fmt.Errorf("random error"),
expectedRetryServices: services,
expectedRetryServices: serviceNames,
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -639,37 +647,13 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
defer cancel()
controller.nodeLister = newFakeNodeLister(tc.nodeListerErr, tc.nodes...)
servicesToRetry := controller.updateLoadBalancerHosts(ctx, services, tc.worker)
compareServiceList(t, tc.expectedRetryServices, servicesToRetry)
assert.Truef(t, tc.expectedRetryServices.Equal(servicesToRetry), "Services to retry are not expected")
compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{}
})
}
}
// compareServiceList compares if both left and right inputs contains the same service list despite the order.
func compareServiceList(t *testing.T, left, right []*v1.Service) {
if len(left) != len(right) {
t.Errorf("expect len(left) == len(right), but got %v != %v", len(left), len(right))
}
mismatch := false
for _, l := range left {
found := false
for _, r := range right {
if reflect.DeepEqual(l, r) {
found = true
}
}
if !found {
mismatch = true
break
}
}
if mismatch {
t.Errorf("expected service list to match, expected %+v, got %+v", left, right)
}
}
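With the retry result expressed as a sets.String, the helper above is superseded by sets.String.Equal together with testify's assert.Truef, which gives the same order-insensitive comparison without the nested loops. A small self-contained sketch of that assertion style (the package name and keys below are illustrative):

package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"k8s.io/apimachinery/pkg/util/sets"
)

// TestRetrySetComparison shows the order-insensitive equality check that
// replaces the removed compareServiceList helper.
func TestRetrySetComparison(t *testing.T) {
	expected := sets.NewString("default/s0", "default/s1")
	got := sets.NewString("default/s1", "default/s0") // insertion order does not matter

	assert.Truef(t, expected.Equal(got), "Services to retry are not expected: %v", got.List())
}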
// compareUpdateCalls compares if the same update calls were made in both left and right inputs despite the order.
func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall) {
if len(left) != len(right) {