From a8673fa5b4af3ca79bf14b236c50ef9a6b64f386 Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Mon, 9 Oct 2023 17:29:22 +0200 Subject: [PATCH 1/4] KCCM: add test validating slow node sync issue --- .../controllers/service/controller_test.go | 211 +++++++++++++++--- .../src/k8s.io/cloud-provider/fake/fake.go | 28 ++- 2 files changed, 203 insertions(+), 36 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index 2fa2a7fb35a..bcc902f4930 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -23,6 +23,7 @@ import ( "reflect" "sort" "strings" + "sync" "testing" "time" @@ -146,13 +147,16 @@ func defaultExternalService() *v1.Service { return newService("external-balancer", v1.ServiceTypeLoadBalancer) } -func alwaysReady() bool { return true } - -func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { +// newController creates a new service controller. Callers have the option to +// specify `stopChan` for test cases which might require running the +// node/service informers and reacting to resource events. Callers can also +// specify `objects` which represent the initial state of objects, used to +// populate the client set / informer cache at start-up. +func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) { cloud := &fakecloud.Cloud{} cloud.Region = region - kubeClient := fake.NewSimpleClientset() + kubeClient := fake.NewSimpleClientset(objects...) informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) serviceInformer := informerFactory.Core().V1().Services() nodeInformer := informerFactory.Core().V1().Nodes() @@ -162,26 +166,36 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"}) controller := &Controller{ - cloud: cloud, - kubeClient: kubeClient, - clusterName: "test-cluster", - cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, - eventBroadcaster: broadcaster, - eventRecorder: recorder, - nodeLister: newFakeNodeLister(nil), - nodeListerSynced: nodeInformer.Informer().HasSynced, - serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), - nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), - lastSyncedNodes: []*v1.Node{}, + cloud: cloud, + kubeClient: kubeClient, + clusterName: "test-cluster", + eventBroadcaster: broadcaster, + eventRecorder: recorder, + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), + lastSyncedNodes: []*v1.Node{}, } + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + + serviceMap := make(map[string]*cachedService) + services, _ := serviceInformer.Lister().List(labels.Everything()) + for _, service 
:= range services { + serviceMap[service.Name] = &cachedService{ + state: service, + } + } + + controller.cache = &serviceCache{serviceMap: serviceMap} + balancer, _ := cloud.LoadBalancer() controller.balancer = balancer - controller.serviceLister = serviceInformer.Lister() - - controller.nodeListerSynced = alwaysReady - controller.serviceListerSynced = alwaysReady controller.eventRecorder = record.NewFakeRecorder(100) cloud.Calls = nil // ignore any cloud calls made in init() @@ -265,7 +279,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - controller, cloud, client := newController() + controller, cloud, client := newController(nil) cloud.Exists = tc.lbExists key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name) if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil { @@ -439,7 +453,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { t.Run(item.desc, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) controller.nodeLister = newFakeNodeLister(nil, nodes...) if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 { t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry) @@ -590,7 +604,8 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) { expectedUpdateCalls: []fakecloud.UpdateBalancerCall{}, }} { t.Run(tc.desc, func(t *testing.T) { - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -762,7 +777,8 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) { expectedUpdateCalls: []fakecloud.UpdateBalancerCall{}, }} { t.Run(tc.desc, func(t *testing.T) { - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -806,7 +822,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) { serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName())) } - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) for _, tc := range []struct { desc string nodes []*v1.Node @@ -901,8 +917,28 @@ func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall } } +// compareHostSets compares if the nodes in left are in right, despite the order. +func compareHostSets(t *testing.T, left, right []*v1.Node) bool { + if len(left) != len(right) { + return false + } + for _, lHost := range left { + found := false + for _, rHost := range right { + if reflect.DeepEqual(lHost, rHost) { + found = true + break + } + } + if !found { + return false + } + } + return true +} + func TestNodesNotEqual(t *testing.T) { - controller, cloud, _ := newController() + controller, cloud, _ := newController(nil) services := []*v1.Service{ newService("s0", v1.ServiceTypeLoadBalancer), @@ -952,7 +988,9 @@ func TestNodesNotEqual(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...) 
+ controller.lastSyncedNodes = tc.lastSyncNodes + controller.updateLoadBalancerHosts(ctx, services, 5) compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls) cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{} @@ -961,7 +999,7 @@ func TestNodesNotEqual(t *testing.T) { } func TestProcessServiceCreateOrUpdate(t *testing.T) { - controller, _, client := newController() + controller, _, client := newController(nil) //A pair of old and new loadbalancer IP address oldLBIP := "192.168.1.1" @@ -1076,7 +1114,7 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) { svc := newService(svcName, v1.ServiceTypeLoadBalancer) // Preset finalizer so k8s error only happens when patching status. svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer} - controller, _, client := newController() + controller, _, client := newController(nil) client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) { return true, nil, tc.k8sErr }) @@ -1120,7 +1158,7 @@ func TestSyncService(t *testing.T) { testName: "if an invalid service name is synced", key: "invalid/key/string", updateFn: func() { - controller, _, _ = newController() + controller, _, _ = newController(nil) }, expectedFn: func(e error) error { //TODO: should find a way to test for dependent package errors in such a way that it won't break @@ -1152,7 +1190,7 @@ func TestSyncService(t *testing.T) { key: "external-balancer", updateFn: func() { testSvc := defaultExternalService() - controller, _, _ = newController() + controller, _, _ = newController(nil) controller.enqueueService(testSvc) svc := controller.cache.getOrCreate("external-balancer") svc.state = testSvc @@ -1258,7 +1296,7 @@ func TestProcessServiceDeletion(t *testing.T) { defer cancel() //Create a new controller. - controller, cloud, _ = newController() + controller, cloud, _ = newController(nil) tc.updateFn(controller) obtainedErr := controller.processServiceDeletion(ctx, svcKey) if err := tc.expectedFn(obtainedErr); err != nil { @@ -1333,8 +1371,115 @@ func TestNeedsCleanup(t *testing.T) { } -func TestNeedsUpdate(t *testing.T) { +// This tests a service update while a slow node sync is happening. If we have multiple +// services to process from a node sync: each service will experience a sync delta. +// If a new Node is added and a service is synced while this happens: we want to +// make sure that the slow node sync never removes the Node from LB set because it +// has stale data. +func TestSlowNodeSync(t *testing.T) { + stopCh, updateCallCh := make(chan struct{}), make(chan fakecloud.UpdateBalancerCall) + defer close(stopCh) + defer close(updateCallCh) + duration := time.Millisecond + + syncService := make(chan string) + + node1 := makeNode(tweakName("node1")) + node2 := makeNode(tweakName("node2")) + node3 := makeNode(tweakName("node3")) + service1 := newService("service1", v1.ServiceTypeLoadBalancer) + service2 := newService("service2", v1.ServiceTypeLoadBalancer) + + sKey1, _ := cache.MetaNamespaceKeyFunc(service1) + sKey2, _ := cache.MetaNamespaceKeyFunc(service2) + serviceKeys := sets.New(sKey1, sKey2) + + controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2) + cloudProvider.RequestDelay = 4 * duration + cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) { + updateCallCh <- update + } + cloudProvider.EnsureCallCb = func(update fakecloud.UpdateBalancerCall) { + updateCallCh <- update + } + + // Three update calls are expected. 
This is because this test calls + // controller.syncNodes once with two existing services, so we will have an + // update call for each service, and controller.syncService once. The end + // result is therefore three update calls. Each update call takes + // cloudProvider.RequestDelay to process. The test asserts that the order of + // the Hosts defined by the update calls is respected, but doesn't + // necessarily assert the order of the Service. This is because the + // controller implementation doesn't use a deterministic order when syncing + // services. The test therefor works out which service is impacted by the + // slow node sync (which will be whatever service is not synced first) and + // then validates that the Hosts for each update call is respected. + expectedUpdateCalls := []fakecloud.UpdateBalancerCall{ + // First update call for first service from controller.syncNodes + {Service: service1, Hosts: []*v1.Node{node1, node2}}, + // Second update call for impacted service from controller.syncService + {Service: service2, Hosts: []*v1.Node{node1, node2, node3}}, + // Third update call for second service from controller.syncNodes. Here + // is the problem: this update call removes the previously added node3. + {Service: service2, Hosts: []*v1.Node{node1, node2}}, + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + controller.syncNodes(context.TODO(), 1) + }() + + wg.Add(1) + go func() { + defer wg.Done() + updateCallIdx := 0 + impactedService := "" + for update := range updateCallCh { + // Validate that the call hosts are what we expect + if !compareHostSets(t, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts) { + t.Errorf("unexpected updated hosts for update: %v, expected: %v, got: %v", updateCallIdx, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts) + return + } + key, _ := cache.MetaNamespaceKeyFunc(update.Service) + // For call 0: determine impacted service + if updateCallIdx == 0 { + impactedService = serviceKeys.Difference(sets.New(key)).UnsortedList()[0] + syncService <- impactedService + } + // For calls > 0: validate the impacted service + if updateCallIdx > 0 { + if key != impactedService { + t.Error("unexpected impacted service") + return + } + } + if updateCallIdx == len(expectedUpdateCalls)-1 { + return + } + updateCallIdx++ + } + }() + + key := <-syncService + if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil { + t.Fatalf("error creating node3, err: %v", err) + } + + // Give it some time to update the informer cache, needs to be lower than + // cloudProvider.RequestDelay + time.Sleep(duration) + // Sync the service + if err := controller.syncService(context.TODO(), key); err != nil { + t.Errorf("unexpected service sync error, err: %v", err) + } + + wg.Wait() +} + +func TestNeedsUpdate(t *testing.T) { testCases := []struct { testName string //Name of the test case updateFn func() (*v1.Service, *v1.Service) //Function to update the service object @@ -1494,7 +1639,7 @@ func TestNeedsUpdate(t *testing.T) { expectedNeedsUpdate: true, }} - controller, _, _ := newController() + controller, _, _ := newController(nil) for _, tc := range testCases { oldSvc, newSvc := tc.updateFn() obtainedResult := controller.needsUpdate(oldSvc, newSvc) @@ -2441,7 +2586,7 @@ func TestServiceQueueDelay(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - controller, cloud, client := newController() + controller, cloud, client := newController(nil) queue := 
&spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")} controller.serviceQueue = queue cloud.Err = tc.lbCloudErr diff --git a/staging/src/k8s.io/cloud-provider/fake/fake.go b/staging/src/k8s.io/cloud-provider/fake/fake.go index 7fcda129569..13acda485c5 100644 --- a/staging/src/k8s.io/cloud-provider/fake/fake.go +++ b/staging/src/k8s.io/cloud-provider/fake/fake.go @@ -86,6 +86,9 @@ type Cloud struct { ExternalIP net.IP Balancers map[string]Balancer UpdateCalls []UpdateBalancerCall + EnsureCalls []UpdateBalancerCall + EnsureCallCb func(UpdateBalancerCall) + UpdateCallCb func(UpdateBalancerCall) RouteMap map[string]*Route Lock sync.Mutex Provider string @@ -200,6 +203,7 @@ func (f *Cloud) GetLoadBalancerName(ctx context.Context, clusterName string, ser // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer. // It adds an entry "create" into the internal method call record. func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { + f.markEnsureCall(service, nodes) f.addCall("create") if f.Balancers == nil { f.Balancers = make(map[string]Balancer) @@ -222,13 +226,31 @@ func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, serv return status, f.Err } +func (f *Cloud) markUpdateCall(service *v1.Service, nodes []*v1.Node) { + f.Lock.Lock() + defer f.Lock.Unlock() + update := UpdateBalancerCall{service, nodes} + f.UpdateCalls = append(f.UpdateCalls, update) + if f.UpdateCallCb != nil { + f.UpdateCallCb(update) + } +} + +func (f *Cloud) markEnsureCall(service *v1.Service, nodes []*v1.Node) { + f.Lock.Lock() + defer f.Lock.Unlock() + update := UpdateBalancerCall{service, nodes} + f.EnsureCalls = append(f.EnsureCalls, update) + if f.EnsureCallCb != nil { + f.EnsureCallCb(update) + } +} + // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer. // It adds an entry "update" into the internal method call record. func (f *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error { + f.markUpdateCall(service, nodes) f.addCall("update") - f.Lock.Lock() - defer f.Lock.Unlock() - f.UpdateCalls = append(f.UpdateCalls, UpdateBalancerCall{service, nodes}) return f.Err } From 60338c79d77771dad7dedb0e284b6211531a6e93 Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Mon, 9 Oct 2023 17:29:36 +0200 Subject: [PATCH 2/4] KCCM: fix slow node sync + service update --- .../controllers/service/controller.go | 72 ++++++++++--------- .../controllers/service/controller_test.go | 22 ++++-- 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 5a6a5079f82..27cd0f051df 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -93,10 +93,10 @@ type Controller struct { serviceQueue workqueue.RateLimitingInterface nodeQueue workqueue.RateLimitingInterface // lastSyncedNodes is used when reconciling node state and keeps track of - // the last synced set of nodes. This field is concurrently safe because the - // nodeQueue is serviced by only one go-routine, so node events are not - // processed concurrently. 
- lastSyncedNodes []*v1.Node + // the last synced set of nodes per service key. This is accessed from the + // service and node controllers, hence it is protected by a lock. + lastSyncedNodes map[string][]*v1.Node + lastSyncedNodesLock sync.Mutex } // New returns a new service controller to keep cloud provider service resources @@ -124,7 +124,7 @@ func New( nodeListerSynced: nodeInformer.Informer().HasSynced, serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), - lastSyncedNodes: []*v1.Node{}, + lastSyncedNodes: make(map[string][]*v1.Node), } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -439,16 +439,32 @@ func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service if err != nil { return nil, err } - // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. if len(nodes) == 0 { c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") } - - // - Only one protocol supported per service // - Not all cloud providers support all protocols and the next step is expected to return // an error for unsupported protocols - return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes) + status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes) + if err != nil { + return nil, err + } + c.storeLastSyncedNodes(service, nodes) + return status, nil +} + +func (c *Controller) storeLastSyncedNodes(svc *v1.Service, nodes []*v1.Node) { + c.lastSyncedNodesLock.Lock() + defer c.lastSyncedNodesLock.Unlock() + key, _ := cache.MetaNamespaceKeyFunc(svc) + c.lastSyncedNodes[key] = nodes +} + +func (c *Controller) getLastSyncedNodes(svc *v1.Service) []*v1.Node { + c.lastSyncedNodesLock.Lock() + defer c.lastSyncedNodesLock.Unlock() + key, _ := cache.MetaNamespaceKeyFunc(svc) + return c.lastSyncedNodes[key] } // ListKeys implements the interface required by DeltaFIFO to list the keys we @@ -662,15 +678,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool { return true } -func serviceKeys(services []*v1.Service) sets.String { - ret := sets.NewString() - for _, service := range services { - key, _ := cache.MetaNamespaceKeyFunc(service) - ret.Insert(key) - } - return ret -} - func nodeNames(nodes []*v1.Node) sets.String { ret := sets.NewString() for _, node := range nodes { @@ -737,19 +744,29 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String { // load balancers and finished doing it successfully, or didn't try to at all because // there's no need. This function returns true if we tried to update load balancers and // failed, indicating to the caller that we should try again. -func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool { +func (c *Controller) nodeSyncService(svc *v1.Service) bool { const retSuccess = false const retNeedRetry = true if svc == nil || !wantsLoadBalancer(svc) { return retSuccess } + newNodes, err := listWithPredicates(c.nodeLister) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) + nodeSyncErrorCount.Inc() + return retNeedRetry + } newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...) - oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...) 
- + oldNodes := filterWithPredicates(c.getLastSyncedNodes(svc), getNodePredicatesForService(svc)...) + // Store last synced nodes without actually determining if we successfully + // synced them or not. Failed node syncs are passed off to retries in the + // service queue, so no need to wait. If we don't store it now, we risk + // re-syncing all LBs twice, one from another sync in the node sync and + // from the service sync + c.storeLastSyncedNodes(svc, newNodes) if nodesSufficientlyEqual(oldNodes, newNodes) { return retSuccess } - klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name) if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil { runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err)) @@ -794,20 +811,12 @@ func nodesSufficientlyEqual(oldNodes, newNodes []*v1.Node) bool { func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) { klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers) - // Include all nodes and let nodeSyncService filter and figure out if - // the update is relevant for the service in question. - nodes, err := listWithPredicates(c.nodeLister) - if err != nil { - runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) - return serviceKeys(services) - } - // lock for servicesToRetry servicesToRetry = sets.NewString() lock := sync.Mutex{} doWork := func(piece int) { - if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry { + if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry { return } lock.Lock() @@ -816,7 +825,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1 servicesToRetry.Insert(key) } workqueue.ParallelizeUntil(ctx, workers, len(services), doWork) - c.lastSyncedNodes = nodes klog.V(4).Infof("Finished updateLoadBalancerHosts") return servicesToRetry } diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index bcc902f4930..157a6676746 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -177,7 +177,7 @@ func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controll nodeListerSynced: nodeInformer.Informer().HasSynced, serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), - lastSyncedNodes: []*v1.Node{}, + lastSyncedNodes: make(map[string][]*v1.Node), } informerFactory.Start(stopCh) @@ -609,7 +609,10 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - controller.lastSyncedNodes = tc.initialState + for _, svc := range services { + key, _ := cache.MetaNamespaceKeyFunc(svc) + controller.lastSyncedNodes[key] = tc.initialState + } for _, state := range tc.stateChanges { setupState := func() { @@ -782,7 +785,10 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - controller.lastSyncedNodes = 
tc.initialState + for _, svc := range services { + key, _ := cache.MetaNamespaceKeyFunc(svc) + controller.lastSyncedNodes[key] = tc.initialState + } for _, state := range tc.stateChanges { setupState := func() { @@ -989,7 +995,10 @@ func TestNodesNotEqual(t *testing.T) { defer cancel() controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...) - controller.lastSyncedNodes = tc.lastSyncNodes + for _, svc := range services { + key, _ := cache.MetaNamespaceKeyFunc(svc) + controller.lastSyncedNodes[key] = tc.lastSyncNodes + } controller.updateLoadBalancerHosts(ctx, services, 5) compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls) @@ -1420,9 +1429,8 @@ func TestSlowNodeSync(t *testing.T) { {Service: service1, Hosts: []*v1.Node{node1, node2}}, // Second update call for impacted service from controller.syncService {Service: service2, Hosts: []*v1.Node{node1, node2, node3}}, - // Third update call for second service from controller.syncNodes. Here - // is the problem: this update call removes the previously added node3. - {Service: service2, Hosts: []*v1.Node{node1, node2}}, + // Third update call for second service from controller.syncNodes. + {Service: service2, Hosts: []*v1.Node{node1, node2, node3}}, } wg := sync.WaitGroup{} From 9ae1fc366b86e21481f3e68a87209fa07245c75f Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Wed, 18 Oct 2023 18:22:28 +0200 Subject: [PATCH 3/4] Store nodes before calling EnsureLoadBalancer I am having difficulties convincing myself if this is better or worse. I didn't implement this originally because I didn't want to store nodes that we weren't sure we've configured. However: if EnsureLoadBalancer fails we should retry the call from the service controller. Doing it like this might save us one update call from the node controller side for calls which have already started executing from the service controller's side...is this really that expensive at this point though? Is it really that dangerous to not do either, given that we retry failed calls? Ahhhhh!!! Opinions, please! Help, please! 
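For reference, a minimal sketch of the two orderings being weighed here. The helper and field names (storeLastSyncedNodes, balancer, clusterName) are the ones introduced earlier in this series; the split into two separate methods is purely illustrative, not the final code:

    // Option A (this commit): record the candidate node set before calling the
    // cloud provider. A failed EnsureLoadBalancer is retried through the service
    // queue anyway, and a node sync racing with a slow EnsureLoadBalancer already
    // sees the new set, so it can skip one redundant UpdateLoadBalancer call.
    func (c *Controller) ensureStoreFirst(ctx context.Context, svc *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
    	c.storeLastSyncedNodes(svc, nodes)
    	return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, svc, nodes)
    }

    // Option B (previous commit): record only nodes we know were configured.
    // The stored set never runs ahead of the cloud state, but a concurrent node
    // sync still works from the stale set while EnsureLoadBalancer is in flight.
    func (c *Controller) ensureStoreAfter(ctx context.Context, svc *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
    	status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, svc, nodes)
    	if err != nil {
    		return nil, err
    	}
    	c.storeLastSyncedNodes(svc, nodes)
    	return status, nil
    }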
--- .../controllers/service/controller.go | 2 +- .../controllers/service/controller_test.go | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 27cd0f051df..f731e0c7a86 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -443,13 +443,13 @@ func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service if len(nodes) == 0 { c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") } + c.storeLastSyncedNodes(service, nodes) // - Not all cloud providers support all protocols and the next step is expected to return // an error for unsupported protocols status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes) if err != nil { return nil, err } - c.storeLastSyncedNodes(service, nodes) return status, nil } diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index 157a6676746..3e05012c24b 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -1413,13 +1413,14 @@ func TestSlowNodeSync(t *testing.T) { updateCallCh <- update } - // Three update calls are expected. This is because this test calls - // controller.syncNodes once with two existing services, so we will have an - // update call for each service, and controller.syncService once. The end - // result is therefore three update calls. Each update call takes - // cloudProvider.RequestDelay to process. The test asserts that the order of - // the Hosts defined by the update calls is respected, but doesn't - // necessarily assert the order of the Service. This is because the + // Two update calls are expected. This is because this test calls + // controller.syncNodes once with two existing services, but with one + // controller.syncService while that is happening. The end result is + // therefore two update calls - since the second controller.syncNodes won't + // trigger an update call because the syncService already did. Each update + // call takes cloudProvider.RequestDelay to process. The test asserts that + // the order of the Hosts defined by the update calls is respected, but + // doesn't necessarily assert the order of the Service. This is because the // controller implementation doesn't use a deterministic order when syncing // services. The test therefor works out which service is impacted by the // slow node sync (which will be whatever service is not synced first) and @@ -1429,8 +1430,6 @@ func TestSlowNodeSync(t *testing.T) { {Service: service1, Hosts: []*v1.Node{node1, node2}}, // Second update call for impacted service from controller.syncService {Service: service2, Hosts: []*v1.Node{node1, node2, node3}}, - // Third update call for second service from controller.syncNodes. 
- {Service: service2, Hosts: []*v1.Node{node1, node2, node3}}, } wg := sync.WaitGroup{} From f9ab24bf485fc2da857630360def7d4221305fc8 Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Mon, 13 Nov 2023 17:03:51 +0100 Subject: [PATCH 4/4] Refine test case --- .../controllers/service/controller_test.go | 73 +++++++------------ .../src/k8s.io/cloud-provider/fake/fake.go | 56 +++++++------- 2 files changed, 57 insertions(+), 72 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index 3e05012c24b..95167142f9b 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -1386,13 +1386,9 @@ func TestNeedsCleanup(t *testing.T) { // make sure that the slow node sync never removes the Node from LB set because it // has stale data. func TestSlowNodeSync(t *testing.T) { - stopCh, updateCallCh := make(chan struct{}), make(chan fakecloud.UpdateBalancerCall) + stopCh, syncServiceDone, syncService := make(chan struct{}), make(chan string), make(chan string) defer close(stopCh) - defer close(updateCallCh) - - duration := time.Millisecond - - syncService := make(chan string) + defer close(syncService) node1 := makeNode(tweakName("node1")) node2 := makeNode(tweakName("node2")) @@ -1405,14 +1401,16 @@ func TestSlowNodeSync(t *testing.T) { serviceKeys := sets.New(sKey1, sKey2) controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2) - cloudProvider.RequestDelay = 4 * duration cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) { - updateCallCh <- update + key, _ := cache.MetaNamespaceKeyFunc(update.Service) + impactedService := serviceKeys.Difference(sets.New(key)).UnsortedList()[0] + syncService <- impactedService + <-syncServiceDone + } cloudProvider.EnsureCallCb = func(update fakecloud.UpdateBalancerCall) { - updateCallCh <- update + syncServiceDone <- update.Service.Name } - // Two update calls are expected. This is because this test calls // controller.syncNodes once with two existing services, but with one // controller.syncService while that is happening. 
The end result is @@ -1428,6 +1426,8 @@ func TestSlowNodeSync(t *testing.T) { expectedUpdateCalls := []fakecloud.UpdateBalancerCall{ // First update call for first service from controller.syncNodes {Service: service1, Hosts: []*v1.Node{node1, node2}}, + } + expectedEnsureCalls := []fakecloud.UpdateBalancerCall{ // Second update call for impacted service from controller.syncService {Service: service2, Hosts: []*v1.Node{node1, node2, node3}}, } @@ -1439,51 +1439,34 @@ func TestSlowNodeSync(t *testing.T) { controller.syncNodes(context.TODO(), 1) }() - wg.Add(1) - go func() { - defer wg.Done() - updateCallIdx := 0 - impactedService := "" - for update := range updateCallCh { - // Validate that the call hosts are what we expect - if !compareHostSets(t, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts) { - t.Errorf("unexpected updated hosts for update: %v, expected: %v, got: %v", updateCallIdx, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts) - return - } - key, _ := cache.MetaNamespaceKeyFunc(update.Service) - // For call 0: determine impacted service - if updateCallIdx == 0 { - impactedService = serviceKeys.Difference(sets.New(key)).UnsortedList()[0] - syncService <- impactedService - } - // For calls > 0: validate the impacted service - if updateCallIdx > 0 { - if key != impactedService { - t.Error("unexpected impacted service") - return - } - } - if updateCallIdx == len(expectedUpdateCalls)-1 { - return - } - updateCallIdx++ - } - }() - key := <-syncService if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil { t.Fatalf("error creating node3, err: %v", err) } - // Give it some time to update the informer cache, needs to be lower than - // cloudProvider.RequestDelay - time.Sleep(duration) // Sync the service if err := controller.syncService(context.TODO(), key); err != nil { - t.Errorf("unexpected service sync error, err: %v", err) + t.Fatalf("unexpected service sync error, err: %v", err) } wg.Wait() + + if len(expectedUpdateCalls) != len(cloudProvider.UpdateCalls) { + t.Fatalf("unexpected amount of update calls, expected: %v, got: %v", len(expectedUpdateCalls), len(cloudProvider.UpdateCalls)) + } + for idx, update := range cloudProvider.UpdateCalls { + if !compareHostSets(t, expectedUpdateCalls[idx].Hosts, update.Hosts) { + t.Fatalf("unexpected updated hosts for update: %v, expected: %v, got: %v", idx, expectedUpdateCalls[idx].Hosts, update.Hosts) + } + } + if len(expectedEnsureCalls) != len(cloudProvider.EnsureCalls) { + t.Fatalf("unexpected amount of ensure calls, expected: %v, got: %v", len(expectedEnsureCalls), len(cloudProvider.EnsureCalls)) + } + for idx, ensure := range cloudProvider.EnsureCalls { + if !compareHostSets(t, expectedEnsureCalls[idx].Hosts, ensure.Hosts) { + t.Fatalf("unexpected updated hosts for ensure: %v, expected: %v, got: %v", idx, expectedEnsureCalls[idx].Hosts, ensure.Hosts) + } + } } func TestNeedsUpdate(t *testing.T) { diff --git a/staging/src/k8s.io/cloud-provider/fake/fake.go b/staging/src/k8s.io/cloud-provider/fake/fake.go index 13acda485c5..14a2c2f9b09 100644 --- a/staging/src/k8s.io/cloud-provider/fake/fake.go +++ b/staging/src/k8s.io/cloud-provider/fake/fake.go @@ -73,27 +73,29 @@ type Cloud struct { ErrShutdownByProviderID error MetadataErr error - Calls []string - Addresses []v1.NodeAddress - addressesMux sync.Mutex - ExtID map[types.NodeName]string - ExtIDErr map[types.NodeName]error - InstanceTypes map[types.NodeName]string - Machines []types.NodeName - NodeResources *v1.NodeResources - 
ClusterList []string - MasterName string - ExternalIP net.IP - Balancers map[string]Balancer - UpdateCalls []UpdateBalancerCall - EnsureCalls []UpdateBalancerCall - EnsureCallCb func(UpdateBalancerCall) - UpdateCallCb func(UpdateBalancerCall) - RouteMap map[string]*Route - Lock sync.Mutex - Provider string - ProviderID map[types.NodeName]string - addCallLock sync.Mutex + Calls []string + Addresses []v1.NodeAddress + addressesMux sync.Mutex + ExtID map[types.NodeName]string + ExtIDErr map[types.NodeName]error + InstanceTypes map[types.NodeName]string + Machines []types.NodeName + NodeResources *v1.NodeResources + ClusterList []string + MasterName string + ExternalIP net.IP + Balancers map[string]Balancer + updateCallLock sync.Mutex + UpdateCalls []UpdateBalancerCall + ensureCallLock sync.Mutex + EnsureCalls []UpdateBalancerCall + EnsureCallCb func(UpdateBalancerCall) + UpdateCallCb func(UpdateBalancerCall) + RouteMap map[string]*Route + Lock sync.Mutex + Provider string + ProviderID map[types.NodeName]string + addCallLock sync.Mutex cloudprovider.Zone VolumeLabelMap map[string]map[string]string @@ -203,8 +205,8 @@ func (f *Cloud) GetLoadBalancerName(ctx context.Context, clusterName string, ser // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer. // It adds an entry "create" into the internal method call record. func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { - f.markEnsureCall(service, nodes) f.addCall("create") + f.markEnsureCall(service, nodes) if f.Balancers == nil { f.Balancers = make(map[string]Balancer) } @@ -227,8 +229,8 @@ func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, serv } func (f *Cloud) markUpdateCall(service *v1.Service, nodes []*v1.Node) { - f.Lock.Lock() - defer f.Lock.Unlock() + f.updateCallLock.Lock() + defer f.updateCallLock.Unlock() update := UpdateBalancerCall{service, nodes} f.UpdateCalls = append(f.UpdateCalls, update) if f.UpdateCallCb != nil { @@ -237,8 +239,8 @@ func (f *Cloud) markUpdateCall(service *v1.Service, nodes []*v1.Node) { } func (f *Cloud) markEnsureCall(service *v1.Service, nodes []*v1.Node) { - f.Lock.Lock() - defer f.Lock.Unlock() + f.ensureCallLock.Lock() + defer f.ensureCallLock.Unlock() update := UpdateBalancerCall{service, nodes} f.EnsureCalls = append(f.EnsureCalls, update) if f.EnsureCallCb != nil { @@ -249,8 +251,8 @@ func (f *Cloud) markEnsureCall(service *v1.Service, nodes []*v1.Node) { // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer. // It adds an entry "update" into the internal method call record. func (f *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error { - f.markUpdateCall(service, nodes) f.addCall("update") + f.markUpdateCall(service, nodes) return f.Err }
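As a usage note on the callback hooks added to the fake cloud in this series (UpdateCallCb / EnsureCallCb): they let a test synchronize on load-balancer calls instead of sleeping and polling UpdateCalls. A minimal, hypothetical sketch follows; the test name, package name and service are illustrative, while the fake cloud API is the one patched above:

    package servicetest

    import (
    	"context"
    	"testing"

    	v1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	fakecloud "k8s.io/cloud-provider/fake"
    )

    func TestUpdateCallCb(t *testing.T) {
    	cloud := &fakecloud.Cloud{}
    	updates := make(chan fakecloud.UpdateBalancerCall, 1)
    	cloud.UpdateCallCb = func(call fakecloud.UpdateBalancerCall) {
    		updates <- call
    	}
    	// In a real test the service controller would issue this call as part of
    	// a node sync; here it is invoked directly just to show the hook firing.
    	svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc", Namespace: "default"}}
    	if err := cloud.UpdateLoadBalancer(context.TODO(), "test-cluster", svc, nil); err != nil {
    		t.Fatalf("unexpected error from fake UpdateLoadBalancer: %v", err)
    	}
    	call := <-updates
    	if call.Service.Name != "svc" {
    		t.Fatalf("unexpected service recorded in update call: %q", call.Service.Name)
    	}
    }

Note that the hooks fire synchronously from inside the fake cloud call (TestSlowNodeSync relies on this to hold the call open), so an unbuffered channel here would block UpdateLoadBalancer until the test receives from it.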