parallelize node-controller

The node-controller has 2 reconciliation methods (both sketched below):
- a workqueue with workers, used during bootstrap to process nodes
until the cloud-provider taint is removed
- a periodic loop that runs at a fixed interval, polling the cloud
provider for instance metadata to update the node addresses, since
nodes can update their addresses at any time during their lifecycle
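
A rough, self-contained sketch of that pattern (illustrative only, not
the controller code; the queue contents, worker count, and periods are
made up):

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        stopCh := make(chan struct{})

        // Mechanism 1: a workqueue drained by N workers (the bootstrap path).
        queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
        defer queue.ShutDown()
        runWorker := func() {
            for {
                key, quit := queue.Get()
                if quit {
                    return
                }
                fmt.Printf("initializing node %v\n", key)
                queue.Done(key)
            }
        }
        for i := 0; i < 2; i++ {
            go wait.Until(runWorker, time.Second, stopCh)
        }

        // Mechanism 2: a periodic loop that re-reconciles every node's addresses.
        go wait.Until(func() {
            fmt.Println("polling the cloud provider for node addresses")
        }, 10*time.Second, stopCh)

        queue.Add("node01")
        time.Sleep(100 * time.Millisecond)
        close(stopCh)
    }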

This follows up on the parallelization of the node-controller, which
previously increased the number of workers that handle the bootstrap.

This change parallelizes the periodic loop based on the configured
number of workers, and also uses the informer lister instead of issuing
a new List request to the apiserver.
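
For reference, here is a minimal, self-contained example (not part of
this change) of the workqueue.ParallelizeUntil helper that the loop now
relies on; the 10ms sleep stands in for the latency of a single
cloud-provider call:

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"
        "time"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        const nodes, workers = 100, 10
        var processed int64
        start := time.Now()

        // ParallelizeUntil fans the `nodes` pieces out over `workers` goroutines
        // and blocks until every piece has been processed (or ctx is cancelled).
        workqueue.ParallelizeUntil(context.TODO(), workers, nodes, func(piece int) {
            time.Sleep(10 * time.Millisecond) // simulated cloud API call
            atomic.AddInt64(&processed, 1)
        })

        fmt.Printf("processed %d nodes with %d workers in %v\n", processed, workers, time.Since(start))
    }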

Added a unit test that can be used to evaluate the performance
improvement with different worker counts:

=== RUN   TestUpdateNodeStatus/single_thread
    node_controller_test.go:2537: 1 workers: processed 100 nodes in 1.055595262s
=== RUN   TestUpdateNodeStatus/5_workers
    node_controller_test.go:2537: 5 workers: processed 100 nodes in 216.990972ms
=== RUN   TestUpdateNodeStatus/10_workers
    node_controller_test.go:2537: 10 workers: processed 100 nodes in 112.422435ms
=== RUN   TestUpdateNodeStatus/30_workers
    node_controller_test.go:2537: 30 workers: processed 100 nodes in 46.243204ms
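
To reproduce these numbers locally, something like the following should
work (the package path is an assumption based on where the cloud node
controller lives in the kubernetes/kubernetes staging tree):

    go test -v -run TestUpdateNodeStatus ./staging/src/k8s.io/cloud-provider/controllers/node/...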

Change-Id: I38870993431d38fc81a2dc6a713321cfa2e40d85

Author: Antonio Ojea
Date:   2023-02-02 22:59:14 +00:00
Commit: 80d21e5929 (parent: bdd3e1d8c8)
2 changed files with 132 additions and 10 deletions

node_controller.go

@@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -172,15 +173,16 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet
return
}
// The periodic loop for updateNodeStatus communicates with the APIServer with a worst case complexity
// of O(num_nodes) per cycle. These functions are justified here because these events fire
// very infrequently. DO NOT MODIFY this to perform frequent operations.
// The periodic loop for updateNodeStatus polls the Cloud Provider periodically
// to reconcile the nodes addresses and labels.
go wait.Until(func() {
if err := cnc.UpdateNodeStatus(context.TODO()); err != nil {
klog.Errorf("failed to update node status: %v", err)
}
}, cnc.nodeStatusUpdateFrequency, stopCh)
// These workers initialize the nodes added to the cluster,
// those that are Tainted with TaintExternalCloudProvider.
for i := int32(0); i < cnc.workerCount; i++ {
go wait.Until(cnc.runWorker, time.Second, stopCh)
}
@@ -251,28 +253,40 @@ func (cnc *CloudNodeController) syncHandler(key string) error {
// UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
start := time.Now()
nodes, err := cnc.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("Error monitoring node status: %v", err)
return err
}
defer func() {
klog.V(2).Infof("Update %d nodes status took %v.", len(nodes), time.Since(start))
}()
for i := range nodes.Items {
instanceMetadata, err := cnc.getInstanceNodeAddresses(ctx, &nodes.Items[i])
updateNodeFunc := func(piece int) {
node := nodes[piece].DeepCopy()
// Do not process nodes that are still tainted, those will be processed by syncNode()
cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint != nil {
klog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name)
return
}
instanceMetadata, err := cnc.getInstanceNodeAddresses(ctx, node)
if err != nil {
klog.Errorf("Error getting instance metadata for node addresses: %v", err)
continue
return
}
cnc.updateNodeAddress(ctx, &nodes.Items[i], instanceMetadata)
}
for _, node := range nodes.Items {
cnc.updateNodeAddress(ctx, node, instanceMetadata)
err = cnc.reconcileNodeLabels(node.Name)
if err != nil {
klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
}
}
workqueue.ParallelizeUntil(ctx, int(cnc.workerCount), len(nodes), updateNodeFunc)
return nil
}

node_controller_test.go

@@ -19,6 +19,7 @@ package cloud
import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"time"
@@ -29,8 +30,10 @@ import (
"k8s.io/klog/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapi "k8s.io/cloud-provider/api"
@@ -2434,3 +2437,108 @@ func TestGetProviderID(t *testing.T) {
})
}
}
func TestUpdateNodeStatus(t *testing.T) {
// emulate the latency of the cloud API calls
const cloudLatency = 10 * time.Millisecond
generateNodes := func(n int) []runtime.Object {
result := []runtime.Object{}
for i := 0; i < n; i++ {
result = append(result, &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("node0%d", i),
},
})
}
return result
}
tests := []struct {
name string
workers int32
nodes int
}{
{
name: "single thread",
workers: 1,
nodes: 100,
},
{
name: "5 workers",
workers: 5,
nodes: 100,
},
{
name: "10 workers",
workers: 10,
nodes: 100,
},
{
name: "30 workers",
workers: 30,
nodes: 100,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeCloud := &fakecloud.Cloud{
EnableInstancesV2: false,
Addresses: []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
},
RequestDelay: cloudLatency,
Err: nil,
}
clientset := fake.NewSimpleClientset()
clientset.PrependReactor("patch", "nodes", func(action clienttesting.Action) (bool, runtime.Object, error) {
return true, &v1.Node{}, nil
})
factory := informers.NewSharedInformerFactory(clientset, 0)
eventBroadcaster := record.NewBroadcaster()
nodeInformer := factory.Core().V1().Nodes()
nodeIndexer := nodeInformer.Informer().GetIndexer()
cloudNodeController := &CloudNodeController{
kubeClient: clientset,
nodeInformer: nodeInformer,
nodesLister: nodeInformer.Lister(),
nodesSynced: func() bool { return true },
cloud: fakeCloud,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
workerCount: test.workers,
}
for _, n := range generateNodes(test.nodes) {
err := nodeIndexer.Add(n)
if err != nil {
t.Fatal(err)
}
}
w := eventBroadcaster.StartLogging(klog.Infof)
defer w.Stop()
start := time.Now()
cloudNodeController.UpdateNodeStatus(context.TODO())
t.Logf("%d workers: processed %d nodes in %v", test.workers, test.nodes, time.Since(start))
if len(fakeCloud.Calls) != test.nodes {
t.Errorf("expected %d cloud-provider calls, got %d", test.nodes, len(fakeCloud.Calls))
}
})
}
}