From 3c88de52c843480437102f2d313a0237f619aa3b Mon Sep 17 00:00:00 2001 From: Christopher Broglie Date: Tue, 10 Jan 2023 20:54:39 -0800 Subject: [PATCH] controller/nodelifecycle: Make monitorNodeHealth process nodes concurrently Marking the pods not ready on a node requires looping over them and updating each pod's status one at a time. This is performed serially, and can take a while if we're processing each node serially as well. Since the time is spent waiting on io, there's an opportunity to go faster by processing multiple nodes concurrently. This change modifies the loop to process nodes in parallel, using the same number of workers as doNodeProcessingPassWorker. This change also introduces histogram metrics to better observe monitorNodeHealth. --- pkg/controller/nodelifecycle/metrics.go | 24 ++++ .../node_lifecycle_controller.go | 38 +++++- .../node_lifecycle_controller_test.go | 122 ++++++++++++++++++ 3 files changed, 179 insertions(+), 5 deletions(-) diff --git a/pkg/controller/nodelifecycle/metrics.go b/pkg/controller/nodelifecycle/metrics.go index ace0ca32226..fa0870d96f4 100644 --- a/pkg/controller/nodelifecycle/metrics.go +++ b/pkg/controller/nodelifecycle/metrics.go @@ -30,6 +30,9 @@ const ( zoneNoUnhealthyNodesKey = "unhealthy_nodes_in_zone" evictionsNumberKey = "evictions_number" evictionsTotalKey = "evictions_total" + + updateNodeHealthKey = "update_node_health_duration_seconds" + updateAllNodesHealthKey = "update_all_nodes_health_duration_seconds" ) var ( @@ -79,6 +82,25 @@ var ( }, []string{"zone"}, ) + + updateNodeHealthDuration = metrics.NewHistogram( + &metrics.HistogramOpts{ + Subsystem: nodeControllerSubsystem, + Name: updateNodeHealthKey, + Help: "Duration in seconds for NodeController to update the health of a single node.", + Buckets: metrics.ExponentialBuckets(0.001, 4, 8), // 1ms -> ~15s + StabilityLevel: metrics.ALPHA, + }, + ) + updateAllNodesHealthDuration = metrics.NewHistogram( + &metrics.HistogramOpts{ + Subsystem: nodeControllerSubsystem, + Name: updateAllNodesHealthKey, + Help: "Duration in seconds for NodeController to update the health of all nodes.", + Buckets: metrics.ExponentialBuckets(0.01, 4, 8), // 10ms -> ~3m + StabilityLevel: metrics.ALPHA, + }, + ) ) var registerMetrics sync.Once @@ -91,5 +113,7 @@ func Register() { legacyregistry.MustRegister(unhealthyNodes) legacyregistry.MustRegister(evictionsNumber) legacyregistry.MustRegister(evictionsTotal) + legacyregistry.MustRegister(updateNodeHealthDuration) + legacyregistry.MustRegister(updateAllNodesHealthDuration) }) } diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index 176947a7469..59b74f646d5 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -332,6 +332,10 @@ type Controller struct { // value takes longer for user to see up-to-date node health. nodeMonitorGracePeriod time.Duration + // Number of workers Controller uses to process node monitor health updates. + // Defaults to scheduler.UpdateWorkerSize. + nodeUpdateWorkerSize int + podEvictionTimeout time.Duration evictionLimiterQPS float32 secondaryEvictionLimiterQPS float32 @@ -383,6 +387,7 @@ func NewNodeLifecycleController( nodeMonitorPeriod: nodeMonitorPeriod, nodeStartupGracePeriod: nodeStartupGracePeriod, nodeMonitorGracePeriod: nodeMonitorGracePeriod, + nodeUpdateWorkerSize: scheduler.UpdateWorkerSize, zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue), zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue), nodesToRetry: sync.Map{}, @@ -794,6 +799,11 @@ func (nc *Controller) doEvictionPass(ctx context.Context) { // if not, post "NodeReady==ConditionUnknown". // This function will taint nodes who are not ready or not reachable for a long period of time. func (nc *Controller) monitorNodeHealth(ctx context.Context) error { + start := nc.now() + defer func() { + updateAllNodesHealthDuration.Observe(time.Since(start.Time).Seconds()) + }() + // We are listing nodes from local cache as we can tolerate some small delays // comparing to state from etcd and there is eventual consistency anyway. nodes, err := nc.nodeLister.List(labels.Everything()) @@ -824,13 +834,21 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error { delete(nc.knownNodeSet, deleted[i].Name) } + var zoneToNodeConditionsLock sync.Mutex zoneToNodeConditions := map[string][]*v1.NodeCondition{} - for i := range nodes { + updateNodeFunc := func(piece int) { + start := nc.now() + defer func() { + updateNodeHealthDuration.Observe(time.Since(start.Time).Seconds()) + }() + var gracePeriod time.Duration var observedReadyCondition v1.NodeCondition var currentReadyCondition *v1.NodeCondition - node := nodes[i].DeepCopy() + node := nodes[piece].DeepCopy() + if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) { + var err error gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(ctx, node) if err == nil { return true, nil @@ -845,12 +863,14 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error { }); err != nil { klog.Errorf("Update health of Node '%v' from Controller error: %v. "+ "Skipping - no pods will be evicted.", node.Name, err) - continue + return } // Some nodes may be excluded from disruption checking if !isNodeExcludedFromDisruptionChecks(node) { + zoneToNodeConditionsLock.Lock() zoneToNodeConditions[nodetopology.GetZoneKey(node)] = append(zoneToNodeConditions[nodetopology.GetZoneKey(node)], currentReadyCondition) + zoneToNodeConditionsLock.Unlock() } if currentReadyCondition != nil { @@ -863,7 +883,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error { // in the next iteration. nc.nodesToRetry.Store(node.Name, struct{}{}) } - continue + return } if nc.runTaintManager { nc.processTaintBaseEviction(ctx, node, &observedReadyCondition) @@ -883,12 +903,20 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error { if err = controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil { utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err)) nc.nodesToRetry.Store(node.Name, struct{}{}) - continue + return } } } nc.nodesToRetry.Delete(node.Name) } + + // Marking the pods not ready on a node requires looping over them and + // updating each pod's status one at a time. This is performed serially, and + // can take a while if we're processing each node serially as well. So we + // process them with bounded concurrency instead, since most of the time is + // spent waiting on io. + workqueue.ParallelizeUntil(ctx, nc.nodeUpdateWorkerSize, len(nodes), updateNodeFunc) + nc.handleDisruption(ctx, zoneToNodeConditions, nodes) return nil diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go index 0d87930049a..673f04acb5b 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go @@ -2451,6 +2451,128 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) { } } +// TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize tests the happy path of +// TestMonitorNodeHealthMarkPodsNotReady with a large number of nodes/pods and +// varying numbers of workers. +func TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize(t *testing.T) { + const numNodes = 50 + const podsPerNode = 100 + makeNodes := func() []*v1.Node { + nodes := make([]*v1.Node, numNodes) + // Node created long time ago, with status updated by kubelet exceeds grace period. + // Expect pods status updated and Unknown node status posted from node controller + for i := 0; i < numNodes; i++ { + nodes[i] = &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("node%d", i), + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + // Node status hasn't been updated for 1hr. + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + Capacity: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"), + }, + }, + } + } + return nodes + } + makePods := func() []v1.Pod { + pods := make([]v1.Pod, numNodes*podsPerNode) + for i := 0; i < numNodes*podsPerNode; i++ { + pods[i] = *testutil.NewPod(fmt.Sprintf("pod%d", i), fmt.Sprintf("node%d", i%numNodes)) + } + return pods + } + + table := []struct { + workers int + }{ + {workers: 0}, // will default to scheduler.UpdateWorkerSize + {workers: 1}, + } + + for i, item := range table { + fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) + + fakeNodeHandler := &testutil.FakeNodeHandler{ + Existing: makeNodes(), + Clientset: fake.NewSimpleClientset(&v1.PodList{Items: makePods()}), + } + + nodeController, _ := newNodeLifecycleControllerFromClient( + context.TODO(), + fakeNodeHandler, + 5*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) + nodeController.now = func() metav1.Time { return fakeNow } + nodeController.recorder = testutil.NewFakeRecorder() + nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(fakeNodeHandler.Clientset) + if item.workers != 0 { + nodeController.nodeUpdateWorkerSize = item.workers + } + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { + t.Errorf("unexpected error: %v", err) + } + if err := nodeController.monitorNodeHealth(context.TODO()); err != nil { + t.Errorf("Case[%d] unexpected error: %v", i, err) + } + + nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(1 * time.Minute)} } + for i := range fakeNodeHandler.Existing { + fakeNodeHandler.Existing[i].Status = v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + // Node status hasn't been updated for 1hr. + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + Capacity: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"), + }, + } + } + + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { + t.Errorf("unexpected error: %v", err) + } + if err := nodeController.monitorNodeHealth(context.TODO()); err != nil { + t.Errorf("Case[%d] unexpected error: %v", i, err) + } + + podStatusUpdates := 0 + for _, action := range fakeNodeHandler.Actions() { + if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" { + podStatusUpdates++ + } + } + const expectedPodStatusUpdates = numNodes * podsPerNode + if podStatusUpdates != expectedPodStatusUpdates { + t.Errorf("Case[%d] expect pod status updated to be %v, but got %v", i, expectedPodStatusUpdates, podStatusUpdates) + } + } +} + func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) { type nodeIteration struct { timeToPass time.Duration