Merge pull request #115521 from aojea/cloudprovidergcp

Improve performance on the cloud provider node-controller
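In short: UpdateNodeStatus now reads nodes from the shared informer's lister instead of issuing a direct List against the APIServer, skips nodes that still carry the cloud-provider taint (syncNode handles those), and fans the per-node cloud lookups out across workerCount goroutines with workqueue.ParallelizeUntil. A new test exercises the parallelism, and the fake cloud now simulates request latency outside its call-recording lock so that concurrent calls can overlap.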
Kubernetes Prow Robot 2023-02-06 03:53:00 -08:00 committed by GitHub
commit 06914bdaf5
3 changed files with 134 additions and 13 deletions


@@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -172,15 +173,16 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet
return
}
// The periodic loop for updateNodeStatus communicates with the APIServer with a worst case complexity
// of O(num_nodes) per cycle. These functions are justified here because these events fire
// very infrequently. DO NOT MODIFY this to perform frequent operations.
// The periodic loop for updateNodeStatus polls the Cloud Provider periodically
// to reconcile the nodes' addresses and labels.
go wait.Until(func() {
if err := cnc.UpdateNodeStatus(context.TODO()); err != nil {
klog.Errorf("failed to update node status: %v", err)
}
}, cnc.nodeStatusUpdateFrequency, stopCh)
// These workers initialize the nodes added to the cluster,
// i.e. those that are tainted with TaintExternalCloudProvider.
for i := int32(0); i < cnc.workerCount; i++ {
go wait.Until(cnc.runWorker, time.Second, stopCh)
}
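For context, a minimal standalone sketch of the two patterns the Run loop above combines: a wait.Until ticker for the periodic reconciliation and a fixed pool of worker goroutines. The interval, worker count, and function bodies below are placeholders, not the controller's real values.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// Periodic loop: runs immediately, then every 30s, until stopCh is closed.
	go wait.Until(func() {
		fmt.Println("reconciling node addresses and labels")
	}, 30*time.Second, stopCh)

	// Worker pool: each worker drains a queue, restarting one second after it returns.
	const workerCount = 4
	for i := 0; i < workerCount; i++ {
		go wait.Until(func() { fmt.Println("processing one queued node") }, time.Second, stopCh)
	}

	time.Sleep(2 * time.Second) // let the sketch run briefly
	close(stopCh)
}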
@@ -251,28 +253,40 @@ func (cnc *CloudNodeController) syncHandler(key string) error {
// UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
start := time.Now()
nodes, err := cnc.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("Error monitoring node status: %v", err)
return err
}
defer func() {
klog.V(2).Infof("Update %d nodes status took %v.", len(nodes), time.Since(start))
}()
for i := range nodes.Items {
instanceMetadata, err := cnc.getInstanceNodeAddresses(ctx, &nodes.Items[i])
updateNodeFunc := func(piece int) {
node := nodes[piece].DeepCopy()
// Do not process nodes that are still tainted, those will be processed by syncNode()
cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint != nil {
klog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name)
return
}
instanceMetadata, err := cnc.getInstanceNodeAddresses(ctx, node)
if err != nil {
klog.Errorf("Error getting instance metadata for node addresses: %v", err)
continue
}
cnc.updateNodeAddress(ctx, &nodes.Items[i], instanceMetadata)
return
}
for _, node := range nodes.Items {
cnc.updateNodeAddress(ctx, node, instanceMetadata)
err = cnc.reconcileNodeLabels(node.Name)
if err != nil {
klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
}
}
workqueue.ParallelizeUntil(ctx, int(cnc.workerCount), len(nodes), updateNodeFunc)
return nil
}
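The heart of the change is replacing the serial loop with workqueue.ParallelizeUntil, which splits the per-node pieces across cnc.workerCount goroutines. A self-contained sketch of that fan-out; the helper genuinely lives in k8s.io/client-go/util/workqueue, while the node list, lookup function, and latency here are invented for illustration.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d", "node-e", "node-f"}
	addrs := make([]string, len(nodes))

	// Stand-in for a cloud API call with ~10ms of latency.
	lookup := func(name string) string {
		time.Sleep(10 * time.Millisecond)
		return name + ": 10.0.0.1"
	}

	// Run the per-node work on 3 workers; each piece writes only its own slot,
	// so no extra locking is needed.
	workqueue.ParallelizeUntil(context.TODO(), 3, len(nodes), func(piece int) {
		addrs[piece] = lookup(nodes[piece])
	})

	// With 6 pieces and 3 workers, the batch finishes in roughly two lookups' latency.
	fmt.Println(addrs)
}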


@@ -19,6 +19,7 @@ package cloud
import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"time"
@@ -29,8 +30,10 @@ import (
"k8s.io/klog/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapi "k8s.io/cloud-provider/api"
@@ -2434,3 +2437,108 @@ func TestGetProviderID(t *testing.T) {
})
}
}
func TestUpdateNodeStatus(t *testing.T) {
// emulate the latency of the cloud API calls
const cloudLatency = 10 * time.Millisecond
generateNodes := func(n int) []runtime.Object {
result := []runtime.Object{}
for i := 0; i < n; i++ {
result = append(result, &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("node0%d", i),
},
})
}
return result
}
tests := []struct {
name string
workers int32
nodes int
}{
{
name: "single thread",
workers: 1,
nodes: 100,
},
{
name: "5 workers",
workers: 5,
nodes: 100,
},
{
name: "10 workers",
workers: 10,
nodes: 100,
},
{
name: "30 workers",
workers: 30,
nodes: 100,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeCloud := &fakecloud.Cloud{
EnableInstancesV2: false,
Addresses: []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
},
RequestDelay: cloudLatency,
Err: nil,
}
clientset := fake.NewSimpleClientset()
clientset.PrependReactor("patch", "nodes", func(action clienttesting.Action) (bool, runtime.Object, error) {
return true, &v1.Node{}, nil
})
factory := informers.NewSharedInformerFactory(clientset, 0)
eventBroadcaster := record.NewBroadcaster()
nodeInformer := factory.Core().V1().Nodes()
nodeIndexer := nodeInformer.Informer().GetIndexer()
cloudNodeController := &CloudNodeController{
kubeClient: clientset,
nodeInformer: nodeInformer,
nodesLister: nodeInformer.Lister(),
nodesSynced: func() bool { return true },
cloud: fakeCloud,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
workerCount: test.workers,
}
for _, n := range generateNodes(test.nodes) {
err := nodeIndexer.Add(n)
if err != nil {
t.Fatal(err)
}
}
w := eventBroadcaster.StartLogging(klog.Infof)
defer w.Stop()
start := time.Now()
cloudNodeController.UpdateNodeStatus(context.TODO())
t.Logf("%d workers: processed %d nodes in %v", test.workers, test.nodes, time.Since(start))
if len(fakeCloud.Calls) != test.nodes {
t.Errorf("expected %d cloud-provider calls, got %d", test.nodes, len(fakeCloud.Calls))
}
})
}
}
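A rough expectation for the timings this test logs (it only logs them; nothing is asserted on duration, since CI timing is noisy): each node triggers exactly one fake cloud call with a 10ms RequestDelay, so 100 nodes take about 100 × 10ms ≈ 1s with a single worker, ≈ 200ms with 5 workers, ≈ 100ms with 10, and ≈ 40ms with 30 (⌈100/30⌉ × 10ms), plus scheduling overhead. The hard assertion is only that the cloud provider is called exactly once per node, regardless of worker count.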


@@ -106,11 +106,10 @@
}
func (f *Cloud) addCall(desc string) {
f.addCallLock.Lock()
defer f.addCallLock.Unlock()
time.Sleep(f.RequestDelay)
f.addCallLock.Lock()
defer f.addCallLock.Unlock()
f.Calls = append(f.Calls, desc)
}
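Moving the simulated latency out of the critical section is what lets the test above actually overlap calls: while the sleep happens under addCallLock, concurrent workers serialize behind it and the worker count stops mattering. A self-contained sketch with invented names (not the fake cloud's real fields) showing the difference:

package main

import (
	"fmt"
	"sync"
	"time"
)

// timeCalls launches n concurrent "cloud calls" that each sleep 10ms and then
// record themselves under a mutex; sleepInsideLock controls whether the sleep
// happens inside or outside the critical section.
func timeCalls(n int, sleepInsideLock bool) time.Duration {
	var (
		mu    sync.Mutex
		calls []int
		wg    sync.WaitGroup
	)
	start := time.Now()
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if sleepInsideLock {
				mu.Lock()
				time.Sleep(10 * time.Millisecond) // the delay serializes every caller
				calls = append(calls, i)
				mu.Unlock()
			} else {
				time.Sleep(10 * time.Millisecond) // the delays overlap across callers
				mu.Lock()
				calls = append(calls, i)
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	fmt.Println("sleep inside the lock: ", timeCalls(20, true))  // roughly 20 * 10ms
	fmt.Println("sleep outside the lock:", timeCalls(20, false)) // roughly 10ms
}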