From a8673fa5b4af3ca79bf14b236c50ef9a6b64f386 Mon Sep 17 00:00:00 2001
From: Alexander Constantinescu
Date: Mon, 9 Oct 2023 17:29:22 +0200
Subject: [PATCH] KCCM: add test validating slow node sync issue

---
 .../controllers/service/controller_test.go | 211 +++++++++++++++---
 .../src/k8s.io/cloud-provider/fake/fake.go  |  28 ++-
 2 files changed, 203 insertions(+), 36 deletions(-)

diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
index 2fa2a7fb35a..bcc902f4930 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
@@ -23,6 +23,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -146,13 +147,16 @@ func defaultExternalService() *v1.Service {
 	return newService("external-balancer", v1.ServiceTypeLoadBalancer)
 }
 
-func alwaysReady() bool { return true }
-
-func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
+// newController creates a new service controller. Callers have the option to
+// specify `stopCh` for test cases that need the node/service informers running
+// and reacting to resource events. Callers can also specify `objects`, which
+// represent the initial state of objects and are used to populate the client
+// set / informer cache at start-up.
+func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 	cloud := &fakecloud.Cloud{}
 	cloud.Region = region
 
-	kubeClient := fake.NewSimpleClientset()
+	kubeClient := fake.NewSimpleClientset(objects...)
 	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
 	serviceInformer := informerFactory.Core().V1().Services()
 	nodeInformer := informerFactory.Core().V1().Nodes()
@@ -162,26 +166,36 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
 
 	controller := &Controller{
-		cloud:            cloud,
-		kubeClient:       kubeClient,
-		clusterName:      "test-cluster",
-		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
-		eventBroadcaster: broadcaster,
-		eventRecorder:    recorder,
-		nodeLister:       newFakeNodeLister(nil),
-		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
-		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		cloud:               cloud,
+		kubeClient:          kubeClient,
+		clusterName:         "test-cluster",
+		eventBroadcaster:    broadcaster,
+		eventRecorder:       recorder,
+		serviceLister:       serviceInformer.Lister(),
+		serviceListerSynced: serviceInformer.Informer().HasSynced,
+		nodeLister:          nodeInformer.Lister(),
+		nodeListerSynced:    nodeInformer.Informer().HasSynced,
+		serviceQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		nodeQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
+		lastSyncedNodes:     []*v1.Node{},
 	}
 
+	informerFactory.Start(stopCh)
+	informerFactory.WaitForCacheSync(stopCh)
+
+	serviceMap := make(map[string]*cachedService)
+	services, _ := serviceInformer.Lister().List(labels.Everything())
+	for _, service := range services {
+		serviceMap[service.Name] = &cachedService{
+			state: service,
+		}
+	}
+
+	controller.cache = &serviceCache{serviceMap: serviceMap}
+
 	balancer, _ := cloud.LoadBalancer()
 	controller.balancer = balancer
 
-	controller.serviceLister = serviceInformer.Lister()
-
-	controller.nodeListerSynced = alwaysReady
-	controller.serviceListerSynced = alwaysReady
 	controller.eventRecorder = record.NewFakeRecorder(100)
 
 	cloud.Calls = nil // ignore any cloud calls made in init()
@@ -265,7 +279,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
 		t.Run(tc.desc, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			controller, cloud, client := newController()
+			controller, cloud, client := newController(nil)
 			cloud.Exists = tc.lbExists
 			key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name)
 			if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil {
@@ -439,7 +453,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 		t.Run(item.desc, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
 			controller.nodeLister = newFakeNodeLister(nil, nodes...)
 			if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
 				t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
@@ -590,7 +604,8 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
 		expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
 	}} {
 		t.Run(tc.desc, func(t *testing.T) {
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
+
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
@@ -762,7 +777,8 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
 		expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
 	}} {
 		t.Run(tc.desc, func(t *testing.T) {
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
+
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
@@ -806,7 +822,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 		serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName()))
 	}
 
-	controller, cloud, _ := newController()
+	controller, cloud, _ := newController(nil)
 	for _, tc := range []struct {
 		desc                  string
 		nodes                 []*v1.Node
@@ -901,8 +917,28 @@ func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall
 	}
 }
 
+// compareHostSets reports whether left and right contain the same nodes, regardless of order.
+func compareHostSets(t *testing.T, left, right []*v1.Node) bool {
+	if len(left) != len(right) {
+		return false
+	}
+	for _, lHost := range left {
+		found := false
+		for _, rHost := range right {
+			if reflect.DeepEqual(lHost, rHost) {
+				found = true
+				break
+			}
+		}
+		if !found {
+			return false
+		}
+	}
+	return true
+}
+
 func TestNodesNotEqual(t *testing.T) {
-	controller, cloud, _ := newController()
+	controller, cloud, _ := newController(nil)
 
 	services := []*v1.Service{
 		newService("s0", v1.ServiceTypeLoadBalancer),
@@ -952,7 +988,9 @@ func TestNodesNotEqual(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 			controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)
 
+			controller.lastSyncedNodes = tc.lastSyncNodes
+
 			controller.updateLoadBalancerHosts(ctx, services, 5)
 			compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
 			cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{}
@@ -961,7 +999,7 @@
 }
 
 func TestProcessServiceCreateOrUpdate(t *testing.T) {
-	controller, _, client := newController()
+	controller, _, client := newController(nil)
 
 	//A pair of old and new loadbalancer IP address
 	oldLBIP := "192.168.1.1"
@@ -1076,7 +1114,7 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) {
 			svc := newService(svcName, v1.ServiceTypeLoadBalancer)
 			// Preset finalizer so k8s error only happens when patching status.
 			svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer}
-			controller, _, client := newController()
+			controller, _, client := newController(nil)
 			client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) {
 				return true, nil, tc.k8sErr
 			})
@@ -1120,7 +1158,7 @@ func TestSyncService(t *testing.T) {
 			testName: "if an invalid service name is synced",
 			key:      "invalid/key/string",
 			updateFn: func() {
-				controller, _, _ = newController()
+				controller, _, _ = newController(nil)
 			},
 			expectedFn: func(e error) error {
 				//TODO: should find a way to test for dependent package errors in such a way that it won't break
@@ -1152,7 +1190,7 @@ func TestSyncService(t *testing.T) {
 			key: "external-balancer",
 			updateFn: func() {
 				testSvc := defaultExternalService()
-				controller, _, _ = newController()
+				controller, _, _ = newController(nil)
 				controller.enqueueService(testSvc)
 				svc := controller.cache.getOrCreate("external-balancer")
 				svc.state = testSvc
@@ -1258,7 +1296,7 @@ func TestProcessServiceDeletion(t *testing.T) {
 		defer cancel()
 
 		//Create a new controller.
-		controller, cloud, _ = newController()
+		controller, cloud, _ = newController(nil)
 		tc.updateFn(controller)
 		obtainedErr := controller.processServiceDeletion(ctx, svcKey)
 		if err := tc.expectedFn(obtainedErr); err != nil {
@@ -1333,8 +1371,115 @@ func TestNeedsCleanup(t *testing.T) {
 	}
 
 }
 
-func TestNeedsUpdate(t *testing.T) {
+// This tests a service update while a slow node sync is happening. If we have
+// multiple services to process from a node sync, each service will experience
+// a sync delta. If a new Node is added and a service is synced while this
+// happens, we want to make sure that the slow node sync never removes the Node
+// from the LB set, because it has stale data.
+func TestSlowNodeSync(t *testing.T) {
+	stopCh, updateCallCh := make(chan struct{}), make(chan fakecloud.UpdateBalancerCall)
+	defer close(stopCh)
+	defer close(updateCallCh)
+	duration := time.Millisecond
+
+	syncService := make(chan string)
+
+	node1 := makeNode(tweakName("node1"))
+	node2 := makeNode(tweakName("node2"))
+	node3 := makeNode(tweakName("node3"))
+	service1 := newService("service1", v1.ServiceTypeLoadBalancer)
+	service2 := newService("service2", v1.ServiceTypeLoadBalancer)
+
+	sKey1, _ := cache.MetaNamespaceKeyFunc(service1)
+	sKey2, _ := cache.MetaNamespaceKeyFunc(service2)
+	serviceKeys := sets.New(sKey1, sKey2)
+
+	controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2)
+	cloudProvider.RequestDelay = 4 * duration
+	cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) {
+		updateCallCh <- update
+	}
+	cloudProvider.EnsureCallCb = func(update fakecloud.UpdateBalancerCall) {
+		updateCallCh <- update
+	}
+
+	// Three update calls are expected. This is because this test calls
+	// controller.syncNodes once with two existing services, so we will have an
+	// update call for each service, and controller.syncService once. The end
+	// result is therefore three update calls. Each update call takes
+	// cloudProvider.RequestDelay to process. The test asserts the sequence of
+	// Hosts across the update calls, but doesn't necessarily assert which
+	// Service each call is for. This is because the controller implementation
+	// doesn't use a deterministic order when syncing services. The test
+	// therefore works out which service is impacted by the slow node sync
+	// (which will be whatever service is not synced first) and then validates
+	// that the Hosts for each update call are respected.
+	expectedUpdateCalls := []fakecloud.UpdateBalancerCall{
+		// First update call for first service from controller.syncNodes
+		{Service: service1, Hosts: []*v1.Node{node1, node2}},
+		// Second update call for impacted service from controller.syncService
+		{Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
+		// Third update call for second service from controller.syncNodes. Here
+		// is the problem: this update call removes the previously added node3.
+		{Service: service2, Hosts: []*v1.Node{node1, node2}},
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		controller.syncNodes(context.TODO(), 1)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		updateCallIdx := 0
+		impactedService := ""
+		for update := range updateCallCh {
+			// Validate that the call hosts are what we expect
+			if !compareHostSets(t, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts) {
+				t.Errorf("unexpected updated hosts for update: %v, expected: %v, got: %v", updateCallIdx, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts)
+				return
+			}
+			key, _ := cache.MetaNamespaceKeyFunc(update.Service)
+			// For call 0: determine impacted service
+			if updateCallIdx == 0 {
+				impactedService = serviceKeys.Difference(sets.New(key)).UnsortedList()[0]
+				syncService <- impactedService
+			}
+			// For calls > 0: validate the impacted service
+			if updateCallIdx > 0 {
+				if key != impactedService {
+					t.Error("unexpected impacted service")
+					return
+				}
+			}
+			if updateCallIdx == len(expectedUpdateCalls)-1 {
+				return
+			}
+			updateCallIdx++
+		}
+	}()
+
+	key := <-syncService
+	if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("error creating node3, err: %v", err)
+	}
+
+	// Give the informer cache some time to update; this needs to be lower than
+	// cloudProvider.RequestDelay.
+	time.Sleep(duration)
+	// Sync the service
+	if err := controller.syncService(context.TODO(), key); err != nil {
+		t.Errorf("unexpected service sync error, err: %v", err)
+	}
+
+	wg.Wait()
+}
+
+func TestNeedsUpdate(t *testing.T) {
 	testCases := []struct {
 		testName            string                            //Name of the test case
 		updateFn            func() (*v1.Service, *v1.Service) //Function to update the service object
@@ -1494,7 +1639,7 @@
 		expectedNeedsUpdate: true,
 	}}
 
-	controller, _, _ := newController()
+	controller, _, _ := newController(nil)
 	for _, tc := range testCases {
 		oldSvc, newSvc := tc.updateFn()
 		obtainedResult := controller.needsUpdate(oldSvc, newSvc)
@@ -2441,7 +2586,7 @@ func TestServiceQueueDelay(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			controller, cloud, client := newController()
+			controller, cloud, client := newController(nil)
 			queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")}
 			controller.serviceQueue = queue
 			cloud.Err = tc.lbCloudErr
diff --git a/staging/src/k8s.io/cloud-provider/fake/fake.go b/staging/src/k8s.io/cloud-provider/fake/fake.go
index 7fcda129569..13acda485c5 100644
--- a/staging/src/k8s.io/cloud-provider/fake/fake.go
+++ b/staging/src/k8s.io/cloud-provider/fake/fake.go
@@ -86,6 +86,9 @@ type Cloud struct {
 	ExternalIP   net.IP
 	Balancers    map[string]Balancer
 	UpdateCalls  []UpdateBalancerCall
+	EnsureCalls  []UpdateBalancerCall
+	EnsureCallCb func(UpdateBalancerCall)
+	UpdateCallCb func(UpdateBalancerCall)
 	RouteMap     map[string]*Route
 	Lock         sync.Mutex
 	Provider     string
@@ -200,6 +203,7 @@ func (f *Cloud) GetLoadBalancerName(ctx context.Context, clusterName string, ser
 // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
 // It adds an entry "create" into the internal method call record.
 func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
+	f.markEnsureCall(service, nodes)
 	f.addCall("create")
 	if f.Balancers == nil {
 		f.Balancers = make(map[string]Balancer)
@@ -222,13 +226,31 @@ func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, serv
 	return status, f.Err
 }
 
+func (f *Cloud) markUpdateCall(service *v1.Service, nodes []*v1.Node) {
+	f.Lock.Lock()
+	defer f.Lock.Unlock()
+	update := UpdateBalancerCall{service, nodes}
+	f.UpdateCalls = append(f.UpdateCalls, update)
+	if f.UpdateCallCb != nil {
+		f.UpdateCallCb(update)
+	}
+}
+
+func (f *Cloud) markEnsureCall(service *v1.Service, nodes []*v1.Node) {
+	f.Lock.Lock()
+	defer f.Lock.Unlock()
+	update := UpdateBalancerCall{service, nodes}
+	f.EnsureCalls = append(f.EnsureCalls, update)
+	if f.EnsureCallCb != nil {
+		f.EnsureCallCb(update)
+	}
+}
+
 // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
 // It adds an entry "update" into the internal method call record.
 func (f *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
+	f.markUpdateCall(service, nodes)
 	f.addCall("update")
-	f.Lock.Lock()
-	defer f.Lock.Unlock()
-	f.UpdateCalls = append(f.UpdateCalls, UpdateBalancerCall{service, nodes})
 	return f.Err
 }
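
Usage note: the snippet below is a minimal sketch, not part of the commit, showing how the UpdateCallCb hook added to fakecloud.Cloud above can be driven directly. It assumes the helpers and imports already present in controller_test.go (newService, makeNode, tweakName, fakecloud, v1, context, testing); the test name and channel wiring are illustrative only.

func TestFakeCloudUpdateCallCbSketch(t *testing.T) {
	// Buffered so the callback, which runs while the fake cloud holds its
	// internal lock, never blocks the caller.
	updates := make(chan fakecloud.UpdateBalancerCall, 1)

	cloud := &fakecloud.Cloud{}
	cloud.UpdateCallCb = func(call fakecloud.UpdateBalancerCall) {
		updates <- call
	}

	service := newService("service1", v1.ServiceTypeLoadBalancer)
	node := makeNode(tweakName("node1"))

	// UpdateLoadBalancer records the call and invokes UpdateCallCb before returning.
	if err := cloud.UpdateLoadBalancer(context.TODO(), "test-cluster", service, []*v1.Node{node}); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	call := <-updates
	if call.Service != service || len(call.Hosts) != 1 {
		t.Errorf("unexpected update call: %+v", call)
	}
}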