diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
index 7f27acf6ab6..c57a3d9a314 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
@@ -75,7 +75,7 @@ type serviceCache struct {
 type Controller struct {
 	cloud            cloudprovider.Interface
 	knownHosts       []*v1.Node
-	servicesToUpdate []*v1.Service
+	servicesToUpdate sets.String
 	kubeClient       clientset.Interface
 	clusterName      string
 	balancer         cloudprovider.LoadBalancer
@@ -735,16 +735,28 @@ func (s *Controller) nodeSyncInternal(ctx context.Context, workers int) {
 	if !s.needFullSyncAndUnmark() {
 		// The set of nodes in the cluster hasn't changed, but we can retry
 		// updating any services that we failed to update last time around.
-		s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, s.servicesToUpdate, workers)
+		// It is required to call `s.cache.get()` on each Service in case there was
+		// an update event that occurred between retries.
+		var servicesToUpdate []*v1.Service
+		for key := range s.servicesToUpdate {
+			cachedService, exist := s.cache.get(key)
+			if !exist {
+				klog.Errorf("Service %q should be in the cache but not", key)
+				continue
+			}
+			servicesToUpdate = append(servicesToUpdate, cachedService.state)
+		}
+
+		s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
 		return
 	}
 	klog.V(2).Infof("Syncing backends for all LB services.")
 
-	// Try updating all services, and save the ones that fail to try again next
+	// Try updating all services, and save the failed ones to try again next
 	// round.
-	s.servicesToUpdate = s.cache.allServices()
-	numServices := len(s.servicesToUpdate)
-	s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, s.servicesToUpdate, workers)
+	servicesToUpdate := s.cache.allServices()
+	numServices := len(servicesToUpdate)
+	s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
 	klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
 		numServices-len(s.servicesToUpdate), numServices)
 }
@@ -772,10 +784,11 @@ func (s *Controller) nodeSyncService(svc *v1.Service) bool {
 // updateLoadBalancerHosts updates all existing load balancers so that
 // they will match the latest list of nodes with input number of workers.
 // Returns the list of services that couldn't be updated.
-func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry []*v1.Service) {
+func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
 
 	// lock for servicesToRetry
+	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}
 	doWork := func(piece int) {
 		if shouldRetry := s.nodeSyncService(services[piece]); !shouldRetry {
@@ -783,7 +796,8 @@ func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 		}
 		lock.Lock()
 		defer lock.Unlock()
-		servicesToRetry = append(servicesToRetry, services[piece])
+		key := fmt.Sprintf("%s/%s", services[piece].Namespace, services[piece].Name)
+		servicesToRetry.Insert(key)
 	}
 
 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
index bce21cc051f..866183aeed6 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
@@ -34,6 +34,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -44,6 +45,8 @@ import (
 	fakecloud "k8s.io/cloud-provider/fake"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
 	utilpointer "k8s.io/utils/pointer"
+
+	"github.com/stretchr/testify/assert"
 )
 
 const region = "us-central"
@@ -548,7 +551,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 			controller, cloud, _ := newController()
 			controller.nodeLister = newFakeNodeLister(nil, nodes...)
 
-			if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); servicesToRetry != nil {
+			if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
 				t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
 			}
 			compareUpdateCalls(t, item.expectedUpdateCalls, cloud.UpdateCalls)
@@ -569,6 +572,11 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 		newService("s4", "123", v1.ServiceTypeLoadBalancer),
 	}
 
+	serviceNames := sets.NewString()
+	for _, svc := range services {
+		serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName()))
+	}
+
 	controller, cloud, _ := newController()
 	for _, tc := range []struct {
 		desc                  string
@@ -576,7 +584,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 		expectedUpdateCalls   []fakecloud.UpdateBalancerCall
 		worker                int
 		nodeListerErr         error
-		expectedRetryServices []*v1.Service
+		expectedRetryServices sets.String
 	}{
 		{
 			desc: "only 1 node",
@@ -589,7 +597,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 			},
 			worker:                3,
 			nodeListerErr:         nil,
-			expectedRetryServices: []*v1.Service{},
+			expectedRetryServices: sets.NewString(),
 		},
 		{
 			desc: "2 nodes",
@@ -602,7 +610,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 			},
 			worker:                1,
 			nodeListerErr:         nil,
-			expectedRetryServices: []*v1.Service{},
+			expectedRetryServices: sets.NewString(),
 		},
 		{
 			desc: "4 nodes",
@@ -615,7 +623,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 			},
 			worker:                3,
 			nodeListerErr:         nil,
-			expectedRetryServices: []*v1.Service{},
+			expectedRetryServices: sets.NewString(),
 		},
 		{
 			desc: "error occur during sync",
@@ -623,7 +631,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 			expectedUpdateCalls:   []fakecloud.UpdateBalancerCall{},
 			worker:                3,
 			nodeListerErr:         fmt.Errorf("random error"),
-			expectedRetryServices: services,
+			expectedRetryServices: serviceNames,
 		},
 		{
 			desc: "error occur during sync with 1 workers",
@@ -631,7 +639,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 			expectedUpdateCalls:   []fakecloud.UpdateBalancerCall{},
 			worker:                1,
 			nodeListerErr:         fmt.Errorf("random error"),
-			expectedRetryServices: services,
+			expectedRetryServices: serviceNames,
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
@@ -639,37 +647,13 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 			defer cancel()
 			controller.nodeLister = newFakeNodeLister(tc.nodeListerErr, tc.nodes...)
 			servicesToRetry := controller.updateLoadBalancerHosts(ctx, services, tc.worker)
-			compareServiceList(t, tc.expectedRetryServices, servicesToRetry)
+			assert.Truef(t, tc.expectedRetryServices.Equal(servicesToRetry), "Services to retry are not expected")
 			compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
 			cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{}
 		})
 	}
 }
 
-// compareServiceList compares if both left and right inputs contains the same service list despite the order.
-func compareServiceList(t *testing.T, left, right []*v1.Service) {
-	if len(left) != len(right) {
-		t.Errorf("expect len(left) == len(right), but got %v != %v", len(left), len(right))
-	}
-
-	mismatch := false
-	for _, l := range left {
-		found := false
-		for _, r := range right {
-			if reflect.DeepEqual(l, r) {
-				found = true
-			}
-		}
-		if !found {
-			mismatch = true
-			break
-		}
-	}
-	if mismatch {
-		t.Errorf("expected service list to match, expected %+v, got %+v", left, right)
-	}
-}
-
 // compareUpdateCalls compares if the same update calls were made in both left and right inputs despite the order.
 func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall) {
 	if len(left) != len(right) {
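The patch above replaces the retained slice of *v1.Service pointers with a sets.String of "namespace/name" keys that are re-resolved against the controller's cache before each retry, and the test switches from the hand-rolled compareServiceList helper to order-independent set equality. Below is a minimal, standalone Go sketch (not part of the patch) illustrating that pattern with k8s.io/apimachinery/pkg/util/sets; the svc type, the cache map, and the updateHosts predicate are hypothetical stand-ins for the controller's real types, and only the sets.String usage mirrors the change.

// Hypothetical stand-ins: svc, cache, and updateHosts are not the controller's
// real types; only the sets.String key/retry pattern mirrors the patch above.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

type svc struct{ Namespace, Name string }

// key builds the "namespace/name" identifier used as the set member.
func key(s svc) string { return fmt.Sprintf("%s/%s", s.Namespace, s.Name) }

// updateHosts pretends to sync one Service and reports whether it must be retried.
func updateHosts(s svc) (retry bool) { return s.Name == "flaky" }

func main() {
	// Stand-in for the controller's service cache, keyed by "namespace/name".
	cache := map[string]svc{}
	services := []svc{{"default", "ok"}, {"default", "flaky"}}
	for _, s := range services {
		cache[key(s)] = s
	}

	// First pass: try every Service and remember only the keys of the failures.
	toRetry := sets.NewString()
	for _, s := range services {
		if updateHosts(s) {
			toRetry.Insert(key(s))
		}
	}

	// Retry pass: resolve each remembered key against the cache first, so an
	// update that happened between passes is seen and a deleted Service is skipped.
	for _, k := range toRetry.List() {
		if cached, ok := cache[k]; ok {
			_ = updateHosts(cached)
		}
	}

	// Set equality is order-independent, which is what lets the test drop the
	// removed compareServiceList helper in favour of Equal.
	fmt.Println(toRetry.Equal(sets.NewString("default/flaky"))) // true
}

Keying the retry set by namespace/name rather than holding Service pointers avoids acting on a stale object when the Service is updated or deleted between retries, which is the behavioural point of the patch.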