diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 5a6a5079f82..f731e0c7a86 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -93,10 +93,10 @@ type Controller struct { serviceQueue workqueue.RateLimitingInterface nodeQueue workqueue.RateLimitingInterface // lastSyncedNodes is used when reconciling node state and keeps track of - // the last synced set of nodes. This field is concurrently safe because the - // nodeQueue is serviced by only one go-routine, so node events are not - // processed concurrently. - lastSyncedNodes []*v1.Node + // the last synced set of nodes per service key. This is accessed from the + // service and node controllers, hence it is protected by a lock. + lastSyncedNodes map[string][]*v1.Node + lastSyncedNodesLock sync.Mutex } // New returns a new service controller to keep cloud provider service resources @@ -124,7 +124,7 @@ func New( nodeListerSynced: nodeInformer.Informer().HasSynced, serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), - lastSyncedNodes: []*v1.Node{}, + lastSyncedNodes: make(map[string][]*v1.Node), } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -439,16 +439,32 @@ func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service if err != nil { return nil, err } - // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. if len(nodes) == 0 { c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") } - - // - Only one protocol supported per service + c.storeLastSyncedNodes(service, nodes) // - Not all cloud providers support all protocols and the next step is expected to return // an error for unsupported protocols - return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes) + status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes) + if err != nil { + return nil, err + } + return status, nil +} + +func (c *Controller) storeLastSyncedNodes(svc *v1.Service, nodes []*v1.Node) { + c.lastSyncedNodesLock.Lock() + defer c.lastSyncedNodesLock.Unlock() + key, _ := cache.MetaNamespaceKeyFunc(svc) + c.lastSyncedNodes[key] = nodes +} + +func (c *Controller) getLastSyncedNodes(svc *v1.Service) []*v1.Node { + c.lastSyncedNodesLock.Lock() + defer c.lastSyncedNodesLock.Unlock() + key, _ := cache.MetaNamespaceKeyFunc(svc) + return c.lastSyncedNodes[key] } // ListKeys implements the interface required by DeltaFIFO to list the keys we @@ -662,15 +678,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool { return true } -func serviceKeys(services []*v1.Service) sets.String { - ret := sets.NewString() - for _, service := range services { - key, _ := cache.MetaNamespaceKeyFunc(service) - ret.Insert(key) - } - return ret -} - func nodeNames(nodes []*v1.Node) sets.String { ret := sets.NewString() for _, node := range nodes { @@ -737,19 +744,29 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String { // load balancers and finished doing it successfully, or didn't try to at all because // there's no need. This function returns true if we tried to update load balancers and // failed, indicating to the caller that we should try again. -func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool { +func (c *Controller) nodeSyncService(svc *v1.Service) bool { const retSuccess = false const retNeedRetry = true if svc == nil || !wantsLoadBalancer(svc) { return retSuccess } + newNodes, err := listWithPredicates(c.nodeLister) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) + nodeSyncErrorCount.Inc() + return retNeedRetry + } newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...) - oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...) - + oldNodes := filterWithPredicates(c.getLastSyncedNodes(svc), getNodePredicatesForService(svc)...) + // Store last synced nodes without actually determining if we successfully + // synced them or not. Failed node syncs are passed off to retries in the + // service queue, so no need to wait. If we don't store it now, we risk + // re-syncing all LBs twice, one from another sync in the node sync and + // from the service sync + c.storeLastSyncedNodes(svc, newNodes) if nodesSufficientlyEqual(oldNodes, newNodes) { return retSuccess } - klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name) if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil { runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err)) @@ -794,20 +811,12 @@ func nodesSufficientlyEqual(oldNodes, newNodes []*v1.Node) bool { func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) { klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers) - // Include all nodes and let nodeSyncService filter and figure out if - // the update is relevant for the service in question. - nodes, err := listWithPredicates(c.nodeLister) - if err != nil { - runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) - return serviceKeys(services) - } - // lock for servicesToRetry servicesToRetry = sets.NewString() lock := sync.Mutex{} doWork := func(piece int) { - if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry { + if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry { return } lock.Lock() @@ -816,7 +825,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1 servicesToRetry.Insert(key) } workqueue.ParallelizeUntil(ctx, workers, len(services), doWork) - c.lastSyncedNodes = nodes klog.V(4).Infof("Finished updateLoadBalancerHosts") return servicesToRetry } diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index 2fa2a7fb35a..95167142f9b 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -23,6 +23,7 @@ import ( "reflect" "sort" "strings" + "sync" "testing" "time" @@ -146,13 +147,16 @@ func defaultExternalService() *v1.Service { return newService("external-balancer", v1.ServiceTypeLoadBalancer) } -func alwaysReady() bool { return true } - -func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { +// newController creates a new service controller. Callers have the option to +// specify `stopChan` for test cases which might require running the +// node/service informers and reacting to resource events. Callers can also +// specify `objects` which represent the initial state of objects, used to +// populate the client set / informer cache at start-up. +func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) { cloud := &fakecloud.Cloud{} cloud.Region = region - kubeClient := fake.NewSimpleClientset() + kubeClient := fake.NewSimpleClientset(objects...) informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) serviceInformer := informerFactory.Core().V1().Services() nodeInformer := informerFactory.Core().V1().Nodes() @@ -162,26 +166,36 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"}) controller := &Controller{ - cloud: cloud, - kubeClient: kubeClient, - clusterName: "test-cluster", - cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, - eventBroadcaster: broadcaster, - eventRecorder: recorder, - nodeLister: newFakeNodeLister(nil), - nodeListerSynced: nodeInformer.Informer().HasSynced, - serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), - nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), - lastSyncedNodes: []*v1.Node{}, + cloud: cloud, + kubeClient: kubeClient, + clusterName: "test-cluster", + eventBroadcaster: broadcaster, + eventRecorder: recorder, + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), + lastSyncedNodes: make(map[string][]*v1.Node), } + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + + serviceMap := make(map[string]*cachedService) + services, _ := serviceInformer.Lister().List(labels.Everything()) + for _, service := range services { + serviceMap[service.Name] = &cachedService{ + state: service, + } + } + + controller.cache = &serviceCache{serviceMap: serviceMap} + balancer, _ := cloud.LoadBalancer() controller.balancer = balancer - controller.serviceLister = serviceInformer.Lister() - - controller.nodeListerSynced = alwaysReady - controller.serviceListerSynced = alwaysReady controller.eventRecorder = record.NewFakeRecorder(100) cloud.Calls = nil // ignore any cloud calls made in init() @@ -265,7 +279,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - controller, cloud, client := newController() + controller, cloud, client := newController(nil) cloud.Exists = tc.lbExists key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name) if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil { @@ -439,7 +453,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { t.Run(item.desc, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) controller.nodeLister = newFakeNodeLister(nil, nodes...) if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 { t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry) @@ -590,11 +604,15 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) { expectedUpdateCalls: []fakecloud.UpdateBalancerCall{}, }} { t.Run(tc.desc, func(t *testing.T) { - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - controller.lastSyncedNodes = tc.initialState + for _, svc := range services { + key, _ := cache.MetaNamespaceKeyFunc(svc) + controller.lastSyncedNodes[key] = tc.initialState + } for _, state := range tc.stateChanges { setupState := func() { @@ -762,11 +780,15 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) { expectedUpdateCalls: []fakecloud.UpdateBalancerCall{}, }} { t.Run(tc.desc, func(t *testing.T) { - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - controller.lastSyncedNodes = tc.initialState + for _, svc := range services { + key, _ := cache.MetaNamespaceKeyFunc(svc) + controller.lastSyncedNodes[key] = tc.initialState + } for _, state := range tc.stateChanges { setupState := func() { @@ -806,7 +828,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) { serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName())) } - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) for _, tc := range []struct { desc string nodes []*v1.Node @@ -901,8 +923,28 @@ func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall } } +// compareHostSets compares if the nodes in left are in right, despite the order. +func compareHostSets(t *testing.T, left, right []*v1.Node) bool { + if len(left) != len(right) { + return false + } + for _, lHost := range left { + found := false + for _, rHost := range right { + if reflect.DeepEqual(lHost, rHost) { + found = true + break + } + } + if !found { + return false + } + } + return true +} + func TestNodesNotEqual(t *testing.T) { - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) services := []*v1.Service{ newService("s0", v1.ServiceTypeLoadBalancer), @@ -952,7 +994,12 @@ func TestNodesNotEqual(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...) - controller.lastSyncedNodes = tc.lastSyncNodes + + for _, svc := range services { + key, _ := cache.MetaNamespaceKeyFunc(svc) + controller.lastSyncedNodes[key] = tc.lastSyncNodes + } + controller.updateLoadBalancerHosts(ctx, services, 5) compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls) cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{} @@ -961,7 +1008,7 @@ func TestNodesNotEqual(t *testing.T) { } func TestProcessServiceCreateOrUpdate(t *testing.T) { - controller, _, client := newController() + controller, _, client := newController(nil) //A pair of old and new loadbalancer IP address oldLBIP := "192.168.1.1" @@ -1076,7 +1123,7 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) { svc := newService(svcName, v1.ServiceTypeLoadBalancer) // Preset finalizer so k8s error only happens when patching status. svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer} - controller, _, client := newController() + controller, _, client := newController(nil) client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) { return true, nil, tc.k8sErr }) @@ -1120,7 +1167,7 @@ func TestSyncService(t *testing.T) { testName: "if an invalid service name is synced", key: "invalid/key/string", updateFn: func() { - controller, _, _ = newController() + controller, _, _ = newController(nil) }, expectedFn: func(e error) error { //TODO: should find a way to test for dependent package errors in such a way that it won't break @@ -1152,7 +1199,7 @@ func TestSyncService(t *testing.T) { key: "external-balancer", updateFn: func() { testSvc := defaultExternalService() - controller, _, _ = newController() + controller, _, _ = newController(nil) controller.enqueueService(testSvc) svc := controller.cache.getOrCreate("external-balancer") svc.state = testSvc @@ -1258,7 +1305,7 @@ func TestProcessServiceDeletion(t *testing.T) { defer cancel() //Create a new controller. - controller, cloud, _ = newController() + controller, cloud, _ = newController(nil) tc.updateFn(controller) obtainedErr := controller.processServiceDeletion(ctx, svcKey) if err := tc.expectedFn(obtainedErr); err != nil { @@ -1333,8 +1380,96 @@ func TestNeedsCleanup(t *testing.T) { } -func TestNeedsUpdate(t *testing.T) { +// This tests a service update while a slow node sync is happening. If we have multiple +// services to process from a node sync: each service will experience a sync delta. +// If a new Node is added and a service is synced while this happens: we want to +// make sure that the slow node sync never removes the Node from LB set because it +// has stale data. +func TestSlowNodeSync(t *testing.T) { + stopCh, syncServiceDone, syncService := make(chan struct{}), make(chan string), make(chan string) + defer close(stopCh) + defer close(syncService) + node1 := makeNode(tweakName("node1")) + node2 := makeNode(tweakName("node2")) + node3 := makeNode(tweakName("node3")) + service1 := newService("service1", v1.ServiceTypeLoadBalancer) + service2 := newService("service2", v1.ServiceTypeLoadBalancer) + + sKey1, _ := cache.MetaNamespaceKeyFunc(service1) + sKey2, _ := cache.MetaNamespaceKeyFunc(service2) + serviceKeys := sets.New(sKey1, sKey2) + + controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2) + cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) { + key, _ := cache.MetaNamespaceKeyFunc(update.Service) + impactedService := serviceKeys.Difference(sets.New(key)).UnsortedList()[0] + syncService <- impactedService + <-syncServiceDone + + } + cloudProvider.EnsureCallCb = func(update fakecloud.UpdateBalancerCall) { + syncServiceDone <- update.Service.Name + } + // Two update calls are expected. This is because this test calls + // controller.syncNodes once with two existing services, but with one + // controller.syncService while that is happening. The end result is + // therefore two update calls - since the second controller.syncNodes won't + // trigger an update call because the syncService already did. Each update + // call takes cloudProvider.RequestDelay to process. The test asserts that + // the order of the Hosts defined by the update calls is respected, but + // doesn't necessarily assert the order of the Service. This is because the + // controller implementation doesn't use a deterministic order when syncing + // services. The test therefor works out which service is impacted by the + // slow node sync (which will be whatever service is not synced first) and + // then validates that the Hosts for each update call is respected. + expectedUpdateCalls := []fakecloud.UpdateBalancerCall{ + // First update call for first service from controller.syncNodes + {Service: service1, Hosts: []*v1.Node{node1, node2}}, + } + expectedEnsureCalls := []fakecloud.UpdateBalancerCall{ + // Second update call for impacted service from controller.syncService + {Service: service2, Hosts: []*v1.Node{node1, node2, node3}}, + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + controller.syncNodes(context.TODO(), 1) + }() + + key := <-syncService + if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil { + t.Fatalf("error creating node3, err: %v", err) + } + + // Sync the service + if err := controller.syncService(context.TODO(), key); err != nil { + t.Fatalf("unexpected service sync error, err: %v", err) + } + + wg.Wait() + + if len(expectedUpdateCalls) != len(cloudProvider.UpdateCalls) { + t.Fatalf("unexpected amount of update calls, expected: %v, got: %v", len(expectedUpdateCalls), len(cloudProvider.UpdateCalls)) + } + for idx, update := range cloudProvider.UpdateCalls { + if !compareHostSets(t, expectedUpdateCalls[idx].Hosts, update.Hosts) { + t.Fatalf("unexpected updated hosts for update: %v, expected: %v, got: %v", idx, expectedUpdateCalls[idx].Hosts, update.Hosts) + } + } + if len(expectedEnsureCalls) != len(cloudProvider.EnsureCalls) { + t.Fatalf("unexpected amount of ensure calls, expected: %v, got: %v", len(expectedEnsureCalls), len(cloudProvider.EnsureCalls)) + } + for idx, ensure := range cloudProvider.EnsureCalls { + if !compareHostSets(t, expectedEnsureCalls[idx].Hosts, ensure.Hosts) { + t.Fatalf("unexpected updated hosts for ensure: %v, expected: %v, got: %v", idx, expectedEnsureCalls[idx].Hosts, ensure.Hosts) + } + } +} + +func TestNeedsUpdate(t *testing.T) { testCases := []struct { testName string //Name of the test case updateFn func() (*v1.Service, *v1.Service) //Function to update the service object @@ -1494,7 +1629,7 @@ func TestNeedsUpdate(t *testing.T) { expectedNeedsUpdate: true, }} - controller, _, _ := newController() + controller, _, _ := newController(nil) for _, tc := range testCases { oldSvc, newSvc := tc.updateFn() obtainedResult := controller.needsUpdate(oldSvc, newSvc) @@ -2441,7 +2576,7 @@ func TestServiceQueueDelay(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - controller, cloud, client := newController() + controller, cloud, client := newController(nil) queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")} controller.serviceQueue = queue cloud.Err = tc.lbCloudErr diff --git a/staging/src/k8s.io/cloud-provider/fake/fake.go b/staging/src/k8s.io/cloud-provider/fake/fake.go index 7fcda129569..14a2c2f9b09 100644 --- a/staging/src/k8s.io/cloud-provider/fake/fake.go +++ b/staging/src/k8s.io/cloud-provider/fake/fake.go @@ -73,24 +73,29 @@ type Cloud struct { ErrShutdownByProviderID error MetadataErr error - Calls []string - Addresses []v1.NodeAddress - addressesMux sync.Mutex - ExtID map[types.NodeName]string - ExtIDErr map[types.NodeName]error - InstanceTypes map[types.NodeName]string - Machines []types.NodeName - NodeResources *v1.NodeResources - ClusterList []string - MasterName string - ExternalIP net.IP - Balancers map[string]Balancer - UpdateCalls []UpdateBalancerCall - RouteMap map[string]*Route - Lock sync.Mutex - Provider string - ProviderID map[types.NodeName]string - addCallLock sync.Mutex + Calls []string + Addresses []v1.NodeAddress + addressesMux sync.Mutex + ExtID map[types.NodeName]string + ExtIDErr map[types.NodeName]error + InstanceTypes map[types.NodeName]string + Machines []types.NodeName + NodeResources *v1.NodeResources + ClusterList []string + MasterName string + ExternalIP net.IP + Balancers map[string]Balancer + updateCallLock sync.Mutex + UpdateCalls []UpdateBalancerCall + ensureCallLock sync.Mutex + EnsureCalls []UpdateBalancerCall + EnsureCallCb func(UpdateBalancerCall) + UpdateCallCb func(UpdateBalancerCall) + RouteMap map[string]*Route + Lock sync.Mutex + Provider string + ProviderID map[types.NodeName]string + addCallLock sync.Mutex cloudprovider.Zone VolumeLabelMap map[string]map[string]string @@ -201,6 +206,7 @@ func (f *Cloud) GetLoadBalancerName(ctx context.Context, clusterName string, ser // It adds an entry "create" into the internal method call record. func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { f.addCall("create") + f.markEnsureCall(service, nodes) if f.Balancers == nil { f.Balancers = make(map[string]Balancer) } @@ -222,13 +228,31 @@ func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, serv return status, f.Err } +func (f *Cloud) markUpdateCall(service *v1.Service, nodes []*v1.Node) { + f.updateCallLock.Lock() + defer f.updateCallLock.Unlock() + update := UpdateBalancerCall{service, nodes} + f.UpdateCalls = append(f.UpdateCalls, update) + if f.UpdateCallCb != nil { + f.UpdateCallCb(update) + } +} + +func (f *Cloud) markEnsureCall(service *v1.Service, nodes []*v1.Node) { + f.ensureCallLock.Lock() + defer f.ensureCallLock.Unlock() + update := UpdateBalancerCall{service, nodes} + f.EnsureCalls = append(f.EnsureCalls, update) + if f.EnsureCallCb != nil { + f.EnsureCallCb(update) + } +} + // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer. // It adds an entry "update" into the internal method call record. func (f *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error { f.addCall("update") - f.Lock.Lock() - defer f.Lock.Unlock() - f.UpdateCalls = append(f.UpdateCalls, UpdateBalancerCall{service, nodes}) + f.markUpdateCall(service, nodes) return f.Err }