diff --git a/pkg/controller/nodelifecycle/metrics.go b/pkg/controller/nodelifecycle/metrics.go
index ace0ca32226..fa0870d96f4 100644
--- a/pkg/controller/nodelifecycle/metrics.go
+++ b/pkg/controller/nodelifecycle/metrics.go
@@ -30,6 +30,9 @@ const (
 	zoneNoUnhealthyNodesKey = "unhealthy_nodes_in_zone"
 	evictionsNumberKey      = "evictions_number"
 	evictionsTotalKey       = "evictions_total"
+
+	updateNodeHealthKey     = "update_node_health_duration_seconds"
+	updateAllNodesHealthKey = "update_all_nodes_health_duration_seconds"
 )
 
 var (
@@ -79,6 +82,25 @@ var (
 		},
 		[]string{"zone"},
 	)
+
+	updateNodeHealthDuration = metrics.NewHistogram(
+		&metrics.HistogramOpts{
+			Subsystem:      nodeControllerSubsystem,
+			Name:           updateNodeHealthKey,
+			Help:           "Duration in seconds for NodeController to update the health of a single node.",
+			Buckets:        metrics.ExponentialBuckets(0.001, 4, 8), // 1ms -> ~15s
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+	updateAllNodesHealthDuration = metrics.NewHistogram(
+		&metrics.HistogramOpts{
+			Subsystem:      nodeControllerSubsystem,
+			Name:           updateAllNodesHealthKey,
+			Help:           "Duration in seconds for NodeController to update the health of all nodes.",
+			Buckets:        metrics.ExponentialBuckets(0.01, 4, 8), // 10ms -> ~3m
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
 )
 
 var registerMetrics sync.Once
@@ -91,5 +113,7 @@ func Register() {
 		legacyregistry.MustRegister(unhealthyNodes)
 		legacyregistry.MustRegister(evictionsNumber)
 		legacyregistry.MustRegister(evictionsTotal)
+		legacyregistry.MustRegister(updateNodeHealthDuration)
+		legacyregistry.MustRegister(updateAllNodesHealthDuration)
 	})
 }
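The bucket comments on the two histograms above ("1ms -> ~15s" and "10ms -> ~3m") can be sanity-checked by expanding the exponential series by hand. The sketch below is illustrative only and not part of the patch; expBuckets is a hypothetical helper that mirrors the arithmetic of ExponentialBuckets(start, factor, count), where bucket i has the upper bound start * factor^i.

package main

import "fmt"

// expBuckets is a hypothetical helper mirroring ExponentialBuckets(start, factor, count):
// it returns count bucket upper bounds, starting at start and multiplying by factor each step.
func expBuckets(start, factor float64, count int) []float64 {
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = start
		start *= factor
	}
	return buckets
}

func main() {
	// update_node_health_duration_seconds: roughly 0.001, 0.004, ..., 16.384 seconds.
	fmt.Println(expBuckets(0.001, 4, 8))
	// update_all_nodes_health_duration_seconds: roughly 0.01, 0.04, ..., 163.84 seconds.
	fmt.Println(expBuckets(0.01, 4, 8))
}

The last boundaries work out to about 16s for the per-node histogram and about 164s (~2.7 minutes) for the all-nodes histogram, which is what the inline comments approximate.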
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
index 176947a7469..59b74f646d5 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -332,6 +332,10 @@ type Controller struct {
 	// value takes longer for user to see up-to-date node health.
 	nodeMonitorGracePeriod time.Duration
 
+	// Number of workers Controller uses to process node monitor health updates.
+	// Defaults to scheduler.UpdateWorkerSize.
+	nodeUpdateWorkerSize int
+
 	podEvictionTimeout          time.Duration
 	evictionLimiterQPS          float32
 	secondaryEvictionLimiterQPS float32
@@ -383,6 +387,7 @@ func NewNodeLifecycleController(
 		nodeMonitorPeriod:           nodeMonitorPeriod,
 		nodeStartupGracePeriod:      nodeStartupGracePeriod,
 		nodeMonitorGracePeriod:      nodeMonitorGracePeriod,
+		nodeUpdateWorkerSize:        scheduler.UpdateWorkerSize,
 		zonePodEvictor:              make(map[string]*scheduler.RateLimitedTimedQueue),
 		zoneNoExecuteTainter:        make(map[string]*scheduler.RateLimitedTimedQueue),
 		nodesToRetry:                sync.Map{},
@@ -794,6 +799,11 @@ func (nc *Controller) doEvictionPass(ctx context.Context) {
 // if not, post "NodeReady==ConditionUnknown".
 // This function will taint nodes who are not ready or not reachable for a long period of time.
 func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
+	start := nc.now()
+	defer func() {
+		updateAllNodesHealthDuration.Observe(time.Since(start.Time).Seconds())
+	}()
+
 	// We are listing nodes from local cache as we can tolerate some small delays
 	// comparing to state from etcd and there is eventual consistency anyway.
 	nodes, err := nc.nodeLister.List(labels.Everything())
@@ -824,13 +834,21 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
 		delete(nc.knownNodeSet, deleted[i].Name)
 	}
 
+	var zoneToNodeConditionsLock sync.Mutex
 	zoneToNodeConditions := map[string][]*v1.NodeCondition{}
-	for i := range nodes {
+	updateNodeFunc := func(piece int) {
+		start := nc.now()
+		defer func() {
+			updateNodeHealthDuration.Observe(time.Since(start.Time).Seconds())
+		}()
+
 		var gracePeriod time.Duration
 		var observedReadyCondition v1.NodeCondition
 		var currentReadyCondition *v1.NodeCondition
-		node := nodes[i].DeepCopy()
+		node := nodes[piece].DeepCopy()
+
 		if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
+			var err error
 			gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(ctx, node)
 			if err == nil {
 				return true, nil
@@ -845,12 +863,14 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
 		}); err != nil {
 			klog.Errorf("Update health of Node '%v' from Controller error: %v. "+
 				"Skipping - no pods will be evicted.", node.Name, err)
-			continue
+			return
 		}
 
 		// Some nodes may be excluded from disruption checking
 		if !isNodeExcludedFromDisruptionChecks(node) {
+			zoneToNodeConditionsLock.Lock()
 			zoneToNodeConditions[nodetopology.GetZoneKey(node)] = append(zoneToNodeConditions[nodetopology.GetZoneKey(node)], currentReadyCondition)
+			zoneToNodeConditionsLock.Unlock()
 		}
 
 		if currentReadyCondition != nil {
@@ -863,7 +883,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
 					// in the next iteration.
 					nc.nodesToRetry.Store(node.Name, struct{}{})
 				}
-				continue
+				return
 			}
 			if nc.runTaintManager {
 				nc.processTaintBaseEviction(ctx, node, &observedReadyCondition)
@@ -883,12 +903,20 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
 				if err = controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil {
 					utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
 					nc.nodesToRetry.Store(node.Name, struct{}{})
-					continue
+					return
 				}
 			}
 		}
 		nc.nodesToRetry.Delete(node.Name)
 	}
+
+	// Marking the pods not ready on a node requires looping over them and
+	// updating each pod's status one at a time. This is performed serially, and
+	// can take a while if we're processing each node serially as well. So we
+	// process them with bounded concurrency instead, since most of the time is
+	// spent waiting on io.
+	workqueue.ParallelizeUntil(ctx, nc.nodeUpdateWorkerSize, len(nodes), updateNodeFunc)
+
 	nc.handleDisruption(ctx, zoneToNodeConditions, nodes)
 
 	return nil
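The bounded concurrency described in the comment above comes from client-go's workqueue.ParallelizeUntil, which runs at most workers goroutines over the index range [0, pieces) and blocks until every piece has been processed or the context is cancelled. The standalone sketch below uses hypothetical constants and is not part of the patch; it only shows the same shape as the controller's call, where each invocation receives an index just like updateNodeFunc(piece int).

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	const workers = 4 // bounded concurrency, in the spirit of nc.nodeUpdateWorkerSize
	const pieces = 50 // number of items, in the spirit of len(nodes)

	var processed int64
	// ParallelizeUntil blocks until every piece has been handed to a worker
	// (or the context is cancelled); each call receives an index in [0, pieces).
	workqueue.ParallelizeUntil(context.TODO(), workers, pieces, func(piece int) {
		// Per-item work goes here; in the controller this is the per-node
		// health update performed by updateNodeFunc.
		atomic.AddInt64(&processed, 1)
	})
	fmt.Printf("processed %d items with at most %d concurrent workers\n", processed, workers)
}

Because the work functions run concurrently, any shared state they touch needs synchronization, which is why the patch guards zoneToNodeConditions with zoneToNodeConditionsLock while the per-node work itself stays lock-free.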
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
index 0d87930049a..673f04acb5b 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
@@ -2451,6 +2451,128 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
 	}
 }
 
+// TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize tests the happy path of
+// TestMonitorNodeHealthMarkPodsNotReady with a large number of nodes/pods and
+// varying numbers of workers.
+func TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize(t *testing.T) {
+	const numNodes = 50
+	const podsPerNode = 100
+	makeNodes := func() []*v1.Node {
+		nodes := make([]*v1.Node, numNodes)
+		// Node created long time ago, with status updated by kubelet exceeds grace period.
+		// Expect pods status updated and Unknown node status posted from node controller
+		for i := 0; i < numNodes; i++ {
+			nodes[i] = &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              fmt.Sprintf("node%d", i),
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+				Status: v1.NodeStatus{
+					Conditions: []v1.NodeCondition{
+						{
+							Type:   v1.NodeReady,
+							Status: v1.ConditionTrue,
+							// Node status hasn't been updated for 1hr.
+							LastHeartbeatTime:  metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						},
+					},
+					Capacity: v1.ResourceList{
+						v1.ResourceName(v1.ResourceCPU):    resource.MustParse("10"),
+						v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
+					},
+				},
+			}
+		}
+		return nodes
+	}
+	makePods := func() []v1.Pod {
+		pods := make([]v1.Pod, numNodes*podsPerNode)
+		for i := 0; i < numNodes*podsPerNode; i++ {
+			pods[i] = *testutil.NewPod(fmt.Sprintf("pod%d", i), fmt.Sprintf("node%d", i%numNodes))
+		}
+		return pods
+	}
+
+	table := []struct {
+		workers int
+	}{
+		{workers: 0}, // will default to scheduler.UpdateWorkerSize
+		{workers: 1},
+	}
+
+	for i, item := range table {
+		fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
+
+		fakeNodeHandler := &testutil.FakeNodeHandler{
+			Existing:  makeNodes(),
+			Clientset: fake.NewSimpleClientset(&v1.PodList{Items: makePods()}),
+		}
+
+		nodeController, _ := newNodeLifecycleControllerFromClient(
+			context.TODO(),
+			fakeNodeHandler,
+			5*time.Minute,
+			testRateLimiterQPS,
+			testRateLimiterQPS,
+			testLargeClusterThreshold,
+			testUnhealthyThreshold,
+			testNodeMonitorGracePeriod,
+			testNodeStartupGracePeriod,
+			testNodeMonitorPeriod,
+			false)
+		nodeController.now = func() metav1.Time { return fakeNow }
+		nodeController.recorder = testutil.NewFakeRecorder()
+		nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(fakeNodeHandler.Clientset)
+		if item.workers != 0 {
+			nodeController.nodeUpdateWorkerSize = item.workers
+		}
+		if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+			t.Errorf("Case[%d] unexpected error: %v", i, err)
+		}
+
+		nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(1 * time.Minute)} }
+		for i := range fakeNodeHandler.Existing {
+			fakeNodeHandler.Existing[i].Status = v1.NodeStatus{
+				Conditions: []v1.NodeCondition{
+					{
+						Type:   v1.NodeReady,
+						Status: v1.ConditionTrue,
+						// Node status hasn't been updated for 1hr.
+						LastHeartbeatTime:  metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+					},
+				},
+				Capacity: v1.ResourceList{
+					v1.ResourceName(v1.ResourceCPU):    resource.MustParse("10"),
+					v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
+				},
+			}
+		}
+
+		if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+			t.Errorf("Case[%d] unexpected error: %v", i, err)
+		}
+
+		podStatusUpdates := 0
+		for _, action := range fakeNodeHandler.Actions() {
+			if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" {
+				podStatusUpdates++
+			}
+		}
+		const expectedPodStatusUpdates = numNodes * podsPerNode
+		if podStatusUpdates != expectedPodStatusUpdates {
+			t.Errorf("Case[%d] expect pod status updated to be %v, but got %v", i, expectedPodStatusUpdates, podStatusUpdates)
+		}
+	}
+}
+
 func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
 	type nodeIteration struct {
 		timeToPass time.Duration
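The assertion at the end of the new test counts pod status updates by replaying the actions recorded by the fake clientset. A minimal sketch of that counting technique, using a hypothetical pod and namespace rather than the test's fixtures, might look like this:

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod0", Namespace: "default"}}
	client := fake.NewSimpleClientset(pod)

	// Every write issued through the fake clientset is recorded as an action.
	pod.Status.Phase = v1.PodRunning
	if _, err := client.CoreV1().Pods("default").UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
		panic(err)
	}

	// Count update actions against the pods/status subresource, mirroring the
	// podStatusUpdates loop in the test above.
	podStatusUpdates := 0
	for _, action := range client.Actions() {
		if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" {
			podStatusUpdates++
		}
	}
	fmt.Println("pod status updates:", podStatusUpdates) // prints 1
}

Filtering on verb "update", resource "pods", and subresource "status" isolates exactly the writes that MarkPodsNotReady performs, which is why the test expects numNodes * podsPerNode of them regardless of the worker count.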