From 4af1f4b30bbe024e64cf69998e96cf24f659f0f4 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 27 Jan 2021 13:57:27 -0800 Subject: [PATCH] Optimize and parallelize LoadBalancer Host update --- .../controllers/service/controller.go | 144 ++++++--- .../controllers/service/controller_test.go | 287 ++++++++++++++++-- .../src/k8s.io/cloud-provider/fake/fake.go | 2 + 3 files changed, 376 insertions(+), 57 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 8ecbaf7bc80..92107dab417 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -72,7 +72,6 @@ type serviceCache struct { type Controller struct { cloud cloudprovider.Interface knownHosts []*v1.Node - knownHostsLock sync.Mutex servicesToUpdate []*v1.Service kubeClient clientset.Interface clusterName string @@ -87,6 +86,14 @@ type Controller struct { nodeListerSynced cache.InformerSynced // services that need to be synced queue workqueue.RateLimitingInterface + + // nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time + // and protects internal states (needFullSync) of nodeSync + nodeSyncLock sync.Mutex + // nodeSyncCh triggers nodeSyncLoop to run + nodeSyncCh chan interface{} + // needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services. + needFullSync bool } // New returns a new service controller to keep cloud provider service resources @@ -122,6 +129,8 @@ func New( nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + // nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached. + nodeSyncCh: make(chan interface{}, 1), } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -152,7 +161,7 @@ func New( nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(cur interface{}) { - s.nodeSyncLoop() + s.triggerNodeSync() }, UpdateFunc: func(old, cur interface{}) { oldNode, ok := old.(*v1.Node) @@ -169,10 +178,10 @@ func New( return } - s.nodeSyncLoop() + s.triggerNodeSync() }, DeleteFunc: func(old interface{}) { - s.nodeSyncLoop() + s.triggerNodeSync() }, }, time.Duration(0), @@ -185,6 +194,15 @@ func New( return s, nil } +// needFullSyncAndUnmark returns the value and needFullSync and marks the field to false. +func (s *Controller) needFullSyncAndUnmark() bool { + s.nodeSyncLock.Lock() + defer s.nodeSyncLock.Unlock() + ret := s.needFullSync + s.needFullSync = false + return ret +} + // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. 
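The needFullSync field and needFullSyncAndUnmark above implement a small "mark under a lock, then read-and-clear under the same lock" handshake, so exactly one sync run observes a pending full-sync request. A minimal standalone sketch of that pattern (the syncState type and method names below are illustrative, not part of this patch):

```go
package main

import (
	"fmt"
	"sync"
)

type syncState struct {
	mu           sync.Mutex
	needFullSync bool
}

// markFullSync is what a detected node-set change would do.
func (s *syncState) markFullSync() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.needFullSync = true
}

// consumeFullSync returns the current value and resets it to false,
// so only one sync run sees the "full sync" request.
func (s *syncState) consumeFullSync() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ret := s.needFullSync
	s.needFullSync = false
	return ret
}

func main() {
	st := &syncState{}
	fmt.Println(st.consumeFullSync()) // false: nothing requested yet
	st.markFullSync()
	fmt.Println(st.consumeFullSync()) // true: request observed once...
	fmt.Println(st.consumeFullSync()) // ...and already cleared
}
```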
func (s *Controller) enqueueService(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) @@ -220,11 +238,42 @@ func (s *Controller) Run(stopCh <-chan struct{}, workers int) { go wait.Until(s.worker, time.Second, stopCh) } - go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh) + go s.nodeSyncLoop(workers) + go wait.Until(s.triggerNodeSync, nodeSyncPeriod, stopCh) <-stopCh } +// triggerNodeSync triggers a nodeSync asynchronously +func (s *Controller) triggerNodeSync() { + s.nodeSyncLock.Lock() + defer s.nodeSyncLock.Unlock() + newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) + // if node list cannot be retrieve, trigger full node sync to be safe. + s.needFullSync = true + } else if !nodeSlicesEqualForLB(newHosts, s.knownHosts) { + // Here the last known state is recorded as knownHosts. For each + // LB update, the latest node list is retrieved. This is to prevent + // a stale set of nodes were used to be update loadbalancers when + // there are many loadbalancers in the clusters. nodeSyncInternal + // would be triggered until all loadbalancers are updated to the new state. + klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services") + s.needFullSync = true + s.knownHosts = newHosts + } + + select { + case s.nodeSyncCh <- struct{}{}: + klog.V(4).Info("Triggering nodeSync") + return + default: + klog.V(4).Info("A pending nodeSync is already in queue") + return + } +} + // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (s *Controller) worker() { @@ -232,6 +281,16 @@ func (s *Controller) worker() { } } +// nodeSyncLoop takes nodeSync signal and triggers nodeSync +func (s *Controller) nodeSyncLoop(workers int) { + klog.V(4).Info("nodeSyncLoop Started") + for range s.nodeSyncCh { + klog.V(4).Info("nodeSync has been triggered") + s.nodeSyncInternal(workers) + } + klog.V(2).Info("s.nodeSyncCh is closed. Exiting nodeSyncLoop") +} + func (s *Controller) processNextWorkItem() bool { key, quit := s.queue.Get() if quit { @@ -652,68 +711,78 @@ func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus { return "" } -// nodeSyncLoop handles updating the hosts pointed to by all load +// nodeSyncInternal handles updating the hosts pointed to by all load // balancers whenever the set of nodes in the cluster changes. -func (s *Controller) nodeSyncLoop() { - s.knownHostsLock.Lock() - defer s.knownHostsLock.Unlock() +func (s *Controller) nodeSyncInternal(workers int) { startTime := time.Now() defer func() { latency := time.Now().Sub(startTime).Seconds() - klog.V(4).Infof("It took %v seconds to finish nodeSyncLoop", latency) + klog.V(4).Infof("It took %v seconds to finish nodeSyncInternal", latency) nodeSyncLatency.Observe(latency) }() - newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) - if err != nil { - runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) - return - } - if nodeSlicesEqualForLB(newHosts, s.knownHosts) { + if !s.needFullSyncAndUnmark() { // The set of nodes in the cluster hasn't changed, but we can retry // updating any services that we failed to update last time around. 
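triggerNodeSync and nodeSyncLoop form a signal-coalescing pair: the size-1 buffered channel plus the select/default send means at most one sync is running and at most one more is queued, no matter how many informer events arrive in a burst. A self-contained sketch of just that mechanism, with illustrative names and timing (not the controller code itself):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	syncCh := make(chan interface{}, 1) // size-1 buffer: at most one pending signal

	trigger := func() {
		select {
		case syncCh <- struct{}{}:
			fmt.Println("queued a sync")
		default:
			fmt.Println("a sync is already pending; coalescing")
		}
	}

	done := make(chan struct{})
	go func() { // consumer, analogous to nodeSyncLoop
		for range syncCh {
			fmt.Println("running one sync for possibly many triggers")
			time.Sleep(10 * time.Millisecond) // stand-in for real work
		}
		close(done)
	}()

	for i := 0; i < 5; i++ { // a burst of node events
		trigger()
	}

	time.Sleep(100 * time.Millisecond) // let the consumer drain
	close(syncCh)                      // closing the channel ends the range loop
	<-done
}
```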
- s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) + s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, workers) return } - - klog.V(2).Infof("Detected change in list of current cluster nodes. New node set: %v", - nodeNames(newHosts)) + klog.V(2).Infof("Syncing backends for all LB services.") // Try updating all services, and save the ones that fail to try again next // round. s.servicesToUpdate = s.cache.allServices() numServices := len(s.servicesToUpdate) - s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) + s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, workers) klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", numServices-len(s.servicesToUpdate), numServices) +} - s.knownHosts = newHosts +// nodeSyncService syncs the nodes for one load balancer type service +func (s *Controller) nodeSyncService(svc *v1.Service) bool { + if svc == nil || !wantsLoadBalancer(svc) { + return false + } + klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name) + hosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) + return true + } + + if err := s.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil { + runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err)) + return true + } + klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name) + return false } // updateLoadBalancerHosts updates all existing load balancers so that -// they will match the list of hosts provided. +// they will match the latest list of nodes with input number of workers. // Returns the list of services that couldn't be updated. -func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) { - for _, service := range services { - func() { - if service == nil { - return - } - if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil { - runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err)) - servicesToRetry = append(servicesToRetry, service) - } - }() +func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, workers int) (servicesToRetry []*v1.Service) { + klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers) + + // lock for servicesToRetry + lock := sync.Mutex{} + doWork := func(piece int) { + if shouldRetry := s.nodeSyncService(services[piece]); !shouldRetry { + return + } + lock.Lock() + defer lock.Unlock() + servicesToRetry = append(servicesToRetry, services[piece]) } + + workqueue.ParallelizeUntil(context.TODO(), workers, len(services), doWork) + klog.V(4).Infof("Finished updateLoadBalancerHosts") return servicesToRetry } // Updates the load balancer of a service, assuming we hold the mutex // associated with the service. 
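updateLoadBalancerHosts now fans the per-service work out with workqueue.ParallelizeUntil and collects failures into a mutex-guarded retry slice. A stripped-down sketch of that shape, using the real k8s.io/client-go/util/workqueue package (which the controller already imports); the syncOne closure is a hypothetical stand-in for nodeSyncService:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	services := []string{"default/s0", "default/s1", "default/s2", "default/s3"}
	workers := 2

	// syncOne pretends two of the services fail and must be retried.
	syncOne := func(name string) (retry bool) {
		return name == "default/s1" || name == "default/s3"
	}

	var (
		lock    sync.Mutex // protects toRetry across workers
		toRetry []string
	)
	doWork := func(piece int) {
		if !syncOne(services[piece]) {
			return
		}
		lock.Lock()
		defer lock.Unlock()
		toRetry = append(toRetry, services[piece])
	}

	// Process len(services) pieces with at most `workers` goroutines.
	workqueue.ParallelizeUntil(context.TODO(), workers, len(services), doWork)

	fmt.Printf("retry next round: %v\n", toRetry)
}
```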
func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { - if !wantsLoadBalancer(service) { - return nil - } startTime := time.Now() defer func() { latency := time.Now().Sub(startTime).Seconds() @@ -721,6 +790,7 @@ func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts [] updateLoadBalancerHostLatency.Observe(latency) }() + klog.V(2).Infof("Updating backends for load balancer %s/%s with node set: %v", service.Namespace, service.Name, nodeNames(hosts)) // This operation doesn't normally take very long (and happens pretty often), so we only record the final event err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts) if err == nil { diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index df2b1e09076..f1dff9e394d 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -72,11 +73,9 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { cloud.Region = region kubeClient := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) serviceInformer := informerFactory.Core().V1().Services() nodeInformer := informerFactory.Core().V1().Nodes() - broadcaster := record.NewBroadcaster() broadcaster.StartStructuredLogging(0) broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) @@ -90,9 +89,10 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, eventBroadcaster: broadcaster, eventRecorder: recorder, - nodeLister: nodeInformer.Lister(), + nodeLister: newFakeNodeLister(nil), nodeListerSynced: nodeInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + nodeSyncCh: make(chan interface{}, 1), } balancer, _ := cloud.LoadBalancer() @@ -416,38 +416,43 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) { // TODO: Finish converting and update comments func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { nodes := []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "node0"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "node73"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node0"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node73"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}, } table := []struct { + desc string services []*v1.Service expectedUpdateCalls []fakecloud.UpdateBalancerCall + workers int }{ { - // No services present: no calls should be made. 
+ desc: "No services present: no calls should be made.", services: []*v1.Service{}, expectedUpdateCalls: nil, + workers: 1, }, { - // Services do not have external load balancers: no calls should be made. + desc: "Services do not have external load balancers: no calls should be made.", services: []*v1.Service{ newService("s0", "111", v1.ServiceTypeClusterIP), newService("s1", "222", v1.ServiceTypeNodePort), }, expectedUpdateCalls: nil, + workers: 2, }, { - // Services does have an external load balancer: one call should be made. + desc: "Services does have an external load balancer: one call should be made.", services: []*v1.Service{ newService("s0", "333", v1.ServiceTypeLoadBalancer), }, expectedUpdateCalls: []fakecloud.UpdateBalancerCall{ {Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, + workers: 3, }, { - // Three services have an external load balancer: three calls. + desc: "Three services have an external load balancer: three calls.", services: []*v1.Service{ newService("s0", "444", v1.ServiceTypeLoadBalancer), newService("s1", "555", v1.ServiceTypeLoadBalancer), @@ -458,9 +463,10 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s1", "555", v1.ServiceTypeLoadBalancer), Hosts: nodes}, {Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, + workers: 4, }, { - // Two services have an external load balancer and two don't: two calls. + desc: "Two services have an external load balancer and two don't: two calls.", services: []*v1.Service{ newService("s0", "777", v1.ServiceTypeNodePort), newService("s1", "888", v1.ServiceTypeLoadBalancer), @@ -471,9 +477,10 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: nodes}, {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, + workers: 5, }, { - // One service has an external load balancer and one is nil: one call. + desc: "One service has an external load balancer and one is nil: one call.", services: []*v1.Service{ newService("s0", "234", v1.ServiceTypeLoadBalancer), nil, @@ -481,21 +488,172 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { expectedUpdateCalls: []fakecloud.UpdateBalancerCall{ {Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, + workers: 6, + }, + { + desc: "Four services have external load balancer with only 2 workers", + services: []*v1.Service{ + newService("s0", "777", v1.ServiceTypeLoadBalancer), + newService("s1", "888", v1.ServiceTypeLoadBalancer), + newService("s3", "999", v1.ServiceTypeLoadBalancer), + newService("s4", "123", v1.ServiceTypeLoadBalancer), + }, + expectedUpdateCalls: []fakecloud.UpdateBalancerCall{ + {Service: newService("s0", "777", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + {Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + {Service: newService("s4", "123", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + }, + workers: 2, }, } for _, item := range table { - controller, cloud, _ := newController() + t.Run(item.desc, func(t *testing.T) { + controller, cloud, _ := newController() + controller.nodeLister = newFakeNodeLister(nil, nodes...) - var services []*v1.Service - services = append(services, item.services...) 
+ if servicesToRetry := controller.updateLoadBalancerHosts(item.services, item.workers); servicesToRetry != nil { + t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry) + } + compareUpdateCalls(t, item.expectedUpdateCalls, cloud.UpdateCalls) + }) + } +} - if servicesToRetry := controller.updateLoadBalancerHosts(services, nodes); servicesToRetry != nil { - t.Errorf("unexpected servicesToRetry: %v", servicesToRetry) +func TestNodeChangesInExternalLoadBalancer(t *testing.T) { + node1 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}} + node2 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}} + node3 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node73"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}} + node4 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node4"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}} + + services := []*v1.Service{ + newService("s0", "777", v1.ServiceTypeLoadBalancer), + newService("s1", "888", v1.ServiceTypeLoadBalancer), + newService("s3", "999", v1.ServiceTypeLoadBalancer), + newService("s4", "123", v1.ServiceTypeLoadBalancer), + } + + controller, cloud, _ := newController() + for _, tc := range []struct { + desc string + nodes []*v1.Node + expectedUpdateCalls []fakecloud.UpdateBalancerCall + worker int + nodeListerErr error + expectedRetryServices []*v1.Service + }{ + { + desc: "only 1 node", + nodes: []*v1.Node{node1}, + expectedUpdateCalls: []fakecloud.UpdateBalancerCall{ + {Service: newService("s0", "777", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1}}, + {Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1}}, + {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1}}, + {Service: newService("s4", "123", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1}}, + }, + worker: 3, + nodeListerErr: nil, + expectedRetryServices: []*v1.Service{}, + }, + { + desc: "2 nodes", + nodes: []*v1.Node{node1, node2}, + expectedUpdateCalls: []fakecloud.UpdateBalancerCall{ + {Service: newService("s0", "777", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2}}, + {Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2}}, + {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2}}, + {Service: newService("s4", "123", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2}}, + }, + worker: 1, + nodeListerErr: nil, + expectedRetryServices: []*v1.Service{}, + }, + { + desc: "4 nodes", + nodes: []*v1.Node{node1, node2, node3, node4}, + expectedUpdateCalls: []fakecloud.UpdateBalancerCall{ + {Service: newService("s0", "777", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2, node3, node4}}, + {Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2, node3, node4}}, + {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2, node3, node4}}, + {Service: newService("s4", "123", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2, node3, node4}}, + }, + worker: 3, + nodeListerErr: nil, + expectedRetryServices: []*v1.Service{}, + }, + { + desc: "error occur during sync", + 
nodes: []*v1.Node{node1, node2, node3, node4}, + expectedUpdateCalls: []fakecloud.UpdateBalancerCall{}, + worker: 3, + nodeListerErr: fmt.Errorf("random error"), + expectedRetryServices: services, + }, + { + desc: "error occur during sync with 1 workers", + nodes: []*v1.Node{node1, node2, node3, node4}, + expectedUpdateCalls: []fakecloud.UpdateBalancerCall{}, + worker: 1, + nodeListerErr: fmt.Errorf("random error"), + expectedRetryServices: services, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + controller.nodeLister = newFakeNodeLister(tc.nodeListerErr, tc.nodes...) + servicesToRetry := controller.updateLoadBalancerHosts(services, tc.worker) + compareServiceList(t, tc.expectedRetryServices, servicesToRetry) + compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls) + cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{} + }) + } +} + +// compareServiceList compares if both left and right inputs contains the same service list despite the order. +func compareServiceList(t *testing.T, left, right []*v1.Service) { + if len(left) != len(right) { + t.Errorf("expect len(left) == len(right), but got %v != %v", len(left), len(right)) + } + + mismatch := false + for _, l := range left { + found := false + for _, r := range right { + if reflect.DeepEqual(l, r) { + found = true + } } - if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) { - t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls) + if !found { + mismatch = true + break } } + if mismatch { + t.Errorf("expected service list to match, expected %+v, got %+v", left, right) + } +} + +// compareUpdateCalls compares if the same update calls were made in both left and right inputs despite the order. +func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall) { + if len(left) != len(right) { + t.Errorf("expect len(left) == len(right), but got %v != %v", len(left), len(right)) + } + + mismatch := false + for _, l := range left { + found := false + for _, r := range right { + if reflect.DeepEqual(l, r) { + found = true + } + } + if !found { + mismatch = true + break + } + } + if mismatch { + t.Errorf("expected update calls to match, expected %+v, got %+v", left, right) + } } func TestProcessServiceCreateOrUpdate(t *testing.T) { @@ -1138,7 +1296,7 @@ func TestServiceCache(t *testing.T) { } } -//Test a utility functions as it's not easy to unit test nodeSyncLoop directly +//Test a utility functions as it's not easy to unit test nodeSyncInternal directly func TestNodeSlicesEqualForLB(t *testing.T) { numNodes := 10 nArray := make([]*v1.Node, numNodes) @@ -1577,3 +1735,92 @@ func Test_shouldSyncNode(t *testing.T) { }) } } + +func TestTriggerNodeSync(t *testing.T) { + controller, _, _ := newController() + + tryReadFromChannel(t, controller.nodeSyncCh, false) + controller.triggerNodeSync() + tryReadFromChannel(t, controller.nodeSyncCh, true) + tryReadFromChannel(t, controller.nodeSyncCh, false) + tryReadFromChannel(t, controller.nodeSyncCh, false) + tryReadFromChannel(t, controller.nodeSyncCh, false) + controller.triggerNodeSync() + controller.triggerNodeSync() + controller.triggerNodeSync() + tryReadFromChannel(t, controller.nodeSyncCh, true) + tryReadFromChannel(t, controller.nodeSyncCh, false) + tryReadFromChannel(t, controller.nodeSyncCh, false) + tryReadFromChannel(t, controller.nodeSyncCh, false) +} + +func TestMarkAndUnmarkFullSync(t *testing.T) { + controller, _, _ := newController() + if controller.needFullSync != false { + t.Errorf("expect 
controller.needFullSync to be false, but got true") + } + + ret := controller.needFullSyncAndUnmark() + if ret != false { + t.Errorf("expect ret == false, but got true") + } + + ret = controller.needFullSyncAndUnmark() + if ret != false { + t.Errorf("expect ret == false, but got true") + } + controller.needFullSync = true + ret = controller.needFullSyncAndUnmark() + if ret != true { + t.Errorf("expect ret == true, but got false") + } + ret = controller.needFullSyncAndUnmark() + if ret != false { + t.Errorf("expect ret == false, but got true") + } +} + +func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) { + select { + case _, ok := <-ch: + if !ok { + t.Errorf("The channel is closed") + } + if !expectValue { + t.Errorf("Does not expect value from the channel, but got a value") + } + default: + if expectValue { + t.Errorf("Expect value from the channel, but got none") + } + } +} + +type fakeNodeLister struct { + cache []*v1.Node + err error +} + +func newFakeNodeLister(err error, nodes ...*v1.Node) *fakeNodeLister { + ret := &fakeNodeLister{} + ret.cache = nodes + ret.err = err + return ret +} + +// List lists all Nodes in the indexer. +// Objects returned here must be treated as read-only. +func (l *fakeNodeLister) List(selector labels.Selector) (ret []*v1.Node, err error) { + return l.cache, l.err +} + +// Get retrieves the Node from the index for a given name. +// Objects returned here must be treated as read-only. +func (l *fakeNodeLister) Get(name string) (*v1.Node, error) { + for _, node := range l.cache { + if node.Name == name { + return node, nil + } + } + return nil, nil +} diff --git a/staging/src/k8s.io/cloud-provider/fake/fake.go b/staging/src/k8s.io/cloud-provider/fake/fake.go index 7748dbfd7e5..48f2684b7ca 100644 --- a/staging/src/k8s.io/cloud-provider/fake/fake.go +++ b/staging/src/k8s.io/cloud-provider/fake/fake.go @@ -224,6 +224,8 @@ func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, serv // It adds an entry "update" into the internal method call record. func (f *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error { f.addCall("update") + f.Lock.Lock() + defer f.Lock.Unlock() f.UpdateCalls = append(f.UpdateCalls, UpdateBalancerCall{service, nodes}) return f.Err }