From bdd3e1d8c8937ee47672e03b48b606db55767963 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 4 Feb 2023 15:53:28 +0000 Subject: [PATCH 1/2] fake cloud provider: don't hold the lock while emulating delay Change-Id: Icf0cf5d67a4c1d53556f93bbda5f286faaa456b2 --- staging/src/k8s.io/cloud-provider/fake/fake.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/fake/fake.go b/staging/src/k8s.io/cloud-provider/fake/fake.go index 6074591e8b8..a0ec6399729 100644 --- a/staging/src/k8s.io/cloud-provider/fake/fake.go +++ b/staging/src/k8s.io/cloud-provider/fake/fake.go @@ -106,11 +106,10 @@ type Route struct { } func (f *Cloud) addCall(desc string) { - f.addCallLock.Lock() - defer f.addCallLock.Unlock() - time.Sleep(f.RequestDelay) + f.addCallLock.Lock() + defer f.addCallLock.Unlock() f.Calls = append(f.Calls, desc) } From 80d21e59290f958a6d20bd53cd25846c1a4e1cd8 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 2 Feb 2023 22:59:14 +0000 Subject: [PATCH 2/2] parallelize node-controller The node-controller has 2 reconciliation methods: - workqueue with workers, used during bootstrap to process nodes until the cloud provider taint is removed - periodic loop, that runs at a fixed period polling the cloud provider to get the instance metadata to update the node addresses, since nodes can update their addresses anytime during their lifecycle. This follows up on the parallelization of the node-controller, which previously increased the number of workers that handle the bootstrap. This parallelizes the periodic loop based on the input value of the number of workers, and also uses the informer lister instead of doing a new List to the apiserver. 
Added a unit test that can be used to evaluate the performance improvement with different worker values: === RUN TestUpdateNodeStatus/single_thread node_controller_test.go:2537: 1 workers: processed 100 nodes int 1.055595262s === RUN TestUpdateNodeStatus/5_workers node_controller_test.go:2537: 5 workers: processed 100 nodes int 216.990972ms === RUN TestUpdateNodeStatus/10_workers node_controller_test.go:2537: 10 workers: processed 100 nodes int 112.422435ms === RUN TestUpdateNodeStatus/30_workers node_controller_test.go:2537: 30 workers: processed 100 nodes int 46.243204ms Change-Id: I38870993431d38fc81a2dc6a713321cfa2e40d85 --- .../controllers/node/node_controller.go | 34 ++++-- .../controllers/node/node_controller_test.go | 108 ++++++++++++++++++ 2 files changed, 132 insertions(+), 10 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go index 7dfbbd88cb8..878ca1194a6 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -172,15 +173,16 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet return } - // The periodic loop for updateNodeStatus communicates with the APIServer with a worst case complexity - // of O(num_nodes) per cycle. These functions are justified here because these events fire - // very infrequently. DO NOT MODIFY this to perform frequent operations. + // The periodic loop for updateNodeStatus polls the Cloud Provider periodically + // to reconcile the node addresses and labels. 
go wait.Until(func() { if err := cnc.UpdateNodeStatus(context.TODO()); err != nil { klog.Errorf("failed to update node status: %v", err) } }, cnc.nodeStatusUpdateFrequency, stopCh) + // These workers initialize the nodes added to the cluster, + // those that are Tainted with TaintExternalCloudProvider. for i := int32(0); i < cnc.workerCount; i++ { go wait.Until(cnc.runWorker, time.Second, stopCh) } @@ -251,28 +253,40 @@ func (cnc *CloudNodeController) syncHandler(key string) error { // UpdateNodeStatus updates the node status, such as node addresses func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error { - nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"}) + start := time.Now() + nodes, err := cnc.nodesLister.List(labels.Everything()) if err != nil { klog.Errorf("Error monitoring node status: %v", err) return err } + defer func() { + klog.V(2).Infof("Update %d nodes status took %v.", len(nodes), time.Since(start)) + }() - for i := range nodes.Items { - instanceMetadata, err := cnc.getInstanceNodeAddresses(ctx, &nodes.Items[i]) + updateNodeFunc := func(piece int) { + node := nodes[piece].DeepCopy() + // Do not process nodes that are still tainted, those will be processed by syncNode() + cloudTaint := getCloudTaint(node.Spec.Taints) + if cloudTaint != nil { + klog.V(5).Infof("This node %s is still tainted. 
Will not process.", node.Name) + return + } + + instanceMetadata, err := cnc.getInstanceNodeAddresses(ctx, node) if err != nil { klog.Errorf("Error getting instance metadata for node addresses: %v", err) - continue + return } - cnc.updateNodeAddress(ctx, &nodes.Items[i], instanceMetadata) - } - for _, node := range nodes.Items { + cnc.updateNodeAddress(ctx, node, instanceMetadata) + err = cnc.reconcileNodeLabels(node.Name) if err != nil { klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err) } } + workqueue.ParallelizeUntil(ctx, int(cnc.workerCount), len(nodes), updateNodeFunc) return nil } diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go index bfb9952c0d8..723756242aa 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go @@ -19,6 +19,7 @@ package cloud import ( "context" "errors" + "fmt" "reflect" "testing" "time" @@ -29,8 +30,10 @@ import ( "k8s.io/klog/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" + clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" cloudprovider "k8s.io/cloud-provider" cloudproviderapi "k8s.io/cloud-provider/api" @@ -2434,3 +2437,108 @@ func TestGetProviderID(t *testing.T) { }) } } + +func TestUpdateNodeStatus(t *testing.T) { + // emulate the latency of the cloud API calls + const cloudLatency = 10 * time.Millisecond + + generateNodes := func(n int) []runtime.Object { + result := []runtime.Object{} + for i := 0; i < n; i++ { + result = append(result, &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("node0%d", i), + }, + }) + } + return result + } + + tests := []struct { + name string + workers int32 + nodes int + }{ + { + name: "single thread", + workers: 
1, + nodes: 100, + }, + { + name: "5 workers", + workers: 5, + nodes: 100, + }, + { + name: "10 workers", + workers: 10, + nodes: 100, + }, + { + name: "30 workers", + workers: 30, + nodes: 100, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeCloud := &fakecloud.Cloud{ + EnableInstancesV2: false, + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeHostName, + Address: "node0.cloud.internal", + }, + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + }, + RequestDelay: cloudLatency, + Err: nil, + } + + clientset := fake.NewSimpleClientset() + clientset.PrependReactor("patch", "nodes", func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, &v1.Node{}, nil + }) + + factory := informers.NewSharedInformerFactory(clientset, 0) + eventBroadcaster := record.NewBroadcaster() + nodeInformer := factory.Core().V1().Nodes() + nodeIndexer := nodeInformer.Informer().GetIndexer() + cloudNodeController := &CloudNodeController{ + kubeClient: clientset, + nodeInformer: nodeInformer, + nodesLister: nodeInformer.Lister(), + nodesSynced: func() bool { return true }, + cloud: fakeCloud, + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), + nodeStatusUpdateFrequency: 1 * time.Second, + workerCount: test.workers, + } + + for _, n := range generateNodes(test.nodes) { + err := nodeIndexer.Add(n) + if err != nil { + t.Fatal(err) + } + } + + w := eventBroadcaster.StartLogging(klog.Infof) + defer w.Stop() + + start := time.Now() + cloudNodeController.UpdateNodeStatus(context.TODO()) + t.Logf("%d workers: processed %d nodes int %v ", test.workers, test.nodes, time.Since(start)) + if len(fakeCloud.Calls) != test.nodes { + t.Errorf("expected %d cloud-provider calls, got %d", test.nodes, len(fakeCloud.Calls)) + } + + }) + } +}