diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go
index 7dfbbd88cb8..878ca1194a6 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go
@@ -26,6 +26,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -172,15 +173,16 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet
 		return
 	}
 
-	// The periodic loop for updateNodeStatus communicates with the APIServer with a worst case complexity
-	// of O(num_nodes) per cycle. These functions are justified here because these events fire
-	// very infrequently. DO NOT MODIFY this to perform frequent operations.
+	// The periodic loop for UpdateNodeStatus polls the cloud provider
+	// to reconcile the node addresses and labels.
 	go wait.Until(func() {
 		if err := cnc.UpdateNodeStatus(context.TODO()); err != nil {
 			klog.Errorf("failed to update node status: %v", err)
 		}
 	}, cnc.nodeStatusUpdateFrequency, stopCh)
 
+	// These workers initialize the nodes added to the cluster,
+	// i.e. those that are still tainted with TaintExternalCloudProvider.
 	for i := int32(0); i < cnc.workerCount; i++ {
 		go wait.Until(cnc.runWorker, time.Second, stopCh)
 	}
@@ -251,28 +253,40 @@ func (cnc *CloudNodeController) syncHandler(key string) error {
 
 // UpdateNodeStatus updates the node status, such as node addresses
 func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
-	nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
+	start := time.Now()
+	nodes, err := cnc.nodesLister.List(labels.Everything())
 	if err != nil {
 		klog.Errorf("Error monitoring node status: %v", err)
 		return err
 	}
+	defer func() {
+		klog.V(2).Infof("Updating status of %d nodes took %v.", len(nodes), time.Since(start))
+	}()
 
-	for i := range nodes.Items {
-		instanceMetadata, err := cnc.getInstanceNodeAddresses(ctx, &nodes.Items[i])
+	updateNodeFunc := func(piece int) {
+		node := nodes[piece].DeepCopy()
+		// Do not process nodes that are still tainted, those will be processed by syncNode()
+		cloudTaint := getCloudTaint(node.Spec.Taints)
+		if cloudTaint != nil {
+			klog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name)
+			return
+		}
+
+		instanceMetadata, err := cnc.getInstanceNodeAddresses(ctx, node)
 		if err != nil {
 			klog.Errorf("Error getting instance metadata for node addresses: %v", err)
-			continue
+			return
 		}
 
-		cnc.updateNodeAddress(ctx, &nodes.Items[i], instanceMetadata)
-	}
-	for _, node := range nodes.Items {
+		cnc.updateNodeAddress(ctx, node, instanceMetadata)
+
 		err = cnc.reconcileNodeLabels(node.Name)
 		if err != nil {
 			klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
 		}
 	}
 
+	workqueue.ParallelizeUntil(ctx, int(cnc.workerCount), len(nodes), updateNodeFunc)
 	return nil
 }
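For context (not part of the patch): the controller change above swaps the serial loop over nodes for workqueue.ParallelizeUntil from k8s.io/client-go/util/workqueue, which runs an indexed work function across a bounded pool of goroutines and returns once every piece has been processed. Below is a minimal, self-contained sketch of that primitive only; the node names, the worker count of 10 and the 10ms delay are made up for illustration.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// 100 stand-in node names, mirroring the scale used by the new test.
	nodes := make([]string, 100)
	for i := range nodes {
		nodes[i] = fmt.Sprintf("node0%d", i)
	}

	var processed int64
	start := time.Now()
	// 10 workers consume len(nodes) pieces; each piece sleeps 10ms as a
	// stand-in for a cloud-provider call, so the whole batch finishes in
	// roughly 100ms instead of the ~1s a serial loop would need.
	workqueue.ParallelizeUntil(context.TODO(), 10, len(nodes), func(piece int) {
		_ = nodes[piece] // the real controller deep-copies and reconciles the node here
		time.Sleep(10 * time.Millisecond)
		atomic.AddInt64(&processed, 1)
	})
	fmt.Printf("processed %d nodes in %v\n", processed, time.Since(start))
}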
diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go
index bfb9952c0d8..723756242aa 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go
@@ -19,6 +19,7 @@ package cloud
 import (
 	"context"
 	"errors"
+	"fmt"
 	"reflect"
 	"testing"
 	"time"
@@ -29,8 +30,10 @@ import (
 	"k8s.io/klog/v2"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/informers"
+	clienttesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/record"
 	cloudprovider "k8s.io/cloud-provider"
 	cloudproviderapi "k8s.io/cloud-provider/api"
@@ -2434,3 +2437,108 @@ func TestGetProviderID(t *testing.T) {
 		})
 	}
 }
+
+func TestUpdateNodeStatus(t *testing.T) {
+	// emulate the latency of the cloud API calls
+	const cloudLatency = 10 * time.Millisecond
+
+	generateNodes := func(n int) []runtime.Object {
+		result := []runtime.Object{}
+		for i := 0; i < n; i++ {
+			result = append(result, &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: fmt.Sprintf("node0%d", i),
+				},
+			})
+		}
+		return result
+	}
+
+	tests := []struct {
+		name    string
+		workers int32
+		nodes   int
+	}{
+		{
+			name:    "single thread",
+			workers: 1,
+			nodes:   100,
+		},
+		{
+			name:    "5 workers",
+			workers: 5,
+			nodes:   100,
+		},
+		{
+			name:    "10 workers",
+			workers: 10,
+			nodes:   100,
+		},
+		{
+			name:    "30 workers",
+			workers: 30,
+			nodes:   100,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			fakeCloud := &fakecloud.Cloud{
+				EnableInstancesV2: false,
+				Addresses: []v1.NodeAddress{
+					{
+						Type:    v1.NodeHostName,
+						Address: "node0.cloud.internal",
+					},
+					{
+						Type:    v1.NodeInternalIP,
+						Address: "10.0.0.1",
+					},
+					{
+						Type:    v1.NodeExternalIP,
+						Address: "132.143.154.163",
+					},
+				},
+				RequestDelay: cloudLatency,
+				Err:          nil,
+			}
+
+			clientset := fake.NewSimpleClientset()
+			clientset.PrependReactor("patch", "nodes", func(action clienttesting.Action) (bool, runtime.Object, error) {
+				return true, &v1.Node{}, nil
+			})
+
+			factory := informers.NewSharedInformerFactory(clientset, 0)
+			eventBroadcaster := record.NewBroadcaster()
+			nodeInformer := factory.Core().V1().Nodes()
+			nodeIndexer := nodeInformer.Informer().GetIndexer()
+			cloudNodeController := &CloudNodeController{
+				kubeClient:                clientset,
+				nodeInformer:              nodeInformer,
+				nodesLister:               nodeInformer.Lister(),
+				nodesSynced:               func() bool { return true },
+				cloud:                     fakeCloud,
+				recorder:                  eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
+				nodeStatusUpdateFrequency: 1 * time.Second,
+				workerCount:               test.workers,
+			}
+
+			for _, n := range generateNodes(test.nodes) {
+				err := nodeIndexer.Add(n)
+				if err != nil {
+					t.Fatal(err)
+				}
+			}
+
+			w := eventBroadcaster.StartLogging(klog.Infof)
+			defer w.Stop()
+
+			start := time.Now()
+			cloudNodeController.UpdateNodeStatus(context.TODO())
+			t.Logf("%d workers: processed %d nodes in %v", test.workers, test.nodes, time.Since(start))
+			if len(fakeCloud.Calls) != test.nodes {
+				t.Errorf("expected %d cloud-provider calls, got %d", test.nodes, len(fakeCloud.Calls))
+			}
+
+		})
+	}
+}
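A note on the test above: it only asserts that each node results in exactly one cloud-provider call, and surfaces the per-worker-count timing through t.Logf rather than asserting on it, presumably to avoid flaky wall-clock thresholds in CI. To eyeball the scaling locally, the test can be run on its own from the kubernetes repository root with something like:

go test -v -run TestUpdateNodeStatus ./staging/src/k8s.io/cloud-provider/controllers/node/...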
diff --git a/staging/src/k8s.io/cloud-provider/fake/fake.go b/staging/src/k8s.io/cloud-provider/fake/fake.go
index 6074591e8b8..a0ec6399729 100644
--- a/staging/src/k8s.io/cloud-provider/fake/fake.go
+++ b/staging/src/k8s.io/cloud-provider/fake/fake.go
@@ -106,11 +106,10 @@ type Route struct {
 }
 
 func (f *Cloud) addCall(desc string) {
-	f.addCallLock.Lock()
-	defer f.addCallLock.Unlock()
-
 	time.Sleep(f.RequestDelay)
+	f.addCallLock.Lock()
+	defer f.addCallLock.Unlock()
 
 	f.Calls = append(f.Calls, desc)
 }
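The fake-cloud tweak is what lets the test observe any speed-up at all: with time.Sleep inside the critical section, concurrent addCall invocations serialize on addCallLock and extra workers cannot shorten the run; sleeping before taking the lock lets the simulated latencies overlap while the lock still protects the append to f.Calls. A rough, standalone illustration of the difference (not from the patch; the 20 callers and the 10ms delay are arbitrary):

package main

import (
	"fmt"
	"sync"
	"time"
)

// run fires n concurrent "cloud calls" that each record themselves in a shared
// slice; delay is slept either inside or before the critical section.
func run(n int, delay time.Duration, sleepInsideLock bool) time.Duration {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		calls []string
	)
	start := time.Now()
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if !sleepInsideLock {
				time.Sleep(delay) // patched behaviour: latencies overlap
			}
			mu.Lock()
			defer mu.Unlock()
			if sleepInsideLock {
				time.Sleep(delay) // old behaviour: latencies serialize behind the lock
			}
			calls = append(calls, fmt.Sprintf("call-%d", i))
		}(i)
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	fmt.Println("sleep inside lock: ", run(20, 10*time.Millisecond, true))  // ~200ms
	fmt.Println("sleep outside lock:", run(20, 10*time.Millisecond, false)) // ~10ms
}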