Merge pull request #114296 from cbroglie/concurrent-monitor-node-health

controller/nodelifecycle: Make monitorNodeHealth process nodes concurrently
Kubernetes Prow Robot 2023-01-12 12:42:54 -08:00 committed by GitHub
commit 1b8692ce46
3 changed files with 179 additions and 5 deletions

pkg/controller/nodelifecycle/metrics.go

@@ -30,6 +30,9 @@ const (
zoneNoUnhealthyNodesKey = "unhealthy_nodes_in_zone"
evictionsNumberKey = "evictions_number"
evictionsTotalKey = "evictions_total"
updateNodeHealthKey = "update_node_health_duration_seconds"
updateAllNodesHealthKey = "update_all_nodes_health_duration_seconds"
)
var (
@@ -79,6 +82,25 @@ var (
},
[]string{"zone"},
)
updateNodeHealthDuration = metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: nodeControllerSubsystem,
Name: updateNodeHealthKey,
Help: "Duration in seconds for NodeController to update the health of a single node.",
Buckets: metrics.ExponentialBuckets(0.001, 4, 8), // 1ms -> ~15s
StabilityLevel: metrics.ALPHA,
},
)
updateAllNodesHealthDuration = metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: nodeControllerSubsystem,
Name: updateAllNodesHealthKey,
Help: "Duration in seconds for NodeController to update the health of all nodes.",
Buckets: metrics.ExponentialBuckets(0.01, 4, 8), // 10ms -> ~3m
StabilityLevel: metrics.ALPHA,
},
)
)
var registerMetrics sync.Once
@@ -91,5 +113,7 @@ func Register() {
legacyregistry.MustRegister(unhealthyNodes)
legacyregistry.MustRegister(evictionsNumber)
legacyregistry.MustRegister(evictionsTotal)
legacyregistry.MustRegister(updateNodeHealthDuration)
legacyregistry.MustRegister(updateAllNodesHealthDuration)
})
}
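For context on the bucket comments above ("1ms -> ~15s", "10ms -> ~3m"): metrics.ExponentialBuckets(start, factor, count) follows the usual Prometheus semantics, producing count boundaries where each is the previous one multiplied by factor. A minimal sketch of the resulting boundaries, assuming those semantics (the exponentialBuckets helper below is illustrative, not the library function):

package main

import "fmt"

// exponentialBuckets mirrors the assumed semantics of metrics.ExponentialBuckets:
// count boundaries starting at start, each multiplied by factor.
func exponentialBuckets(start, factor float64, count int) []float64 {
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = start
		start *= factor
	}
	return buckets
}

func main() {
	// Per-node histogram: 0.001, 0.004, 0.016, 0.064, 0.256, 1.024, 4.096, 16.384 (~1ms -> ~16s).
	fmt.Println(exponentialBuckets(0.001, 4, 8))
	// All-nodes histogram: 0.01, 0.04, 0.16, 0.64, 2.56, 10.24, 40.96, 163.84 (~10ms -> ~2.7m).
	fmt.Println(exponentialBuckets(0.01, 4, 8))
}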

pkg/controller/nodelifecycle/node_lifecycle_controller.go

@@ -332,6 +332,10 @@ type Controller struct {
// value takes longer for user to see up-to-date node health.
nodeMonitorGracePeriod time.Duration
// Number of workers Controller uses to process node monitor health updates.
// Defaults to scheduler.UpdateWorkerSize.
nodeUpdateWorkerSize int
podEvictionTimeout time.Duration
evictionLimiterQPS float32
secondaryEvictionLimiterQPS float32
@@ -383,6 +387,7 @@ func NewNodeLifecycleController(
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeUpdateWorkerSize: scheduler.UpdateWorkerSize,
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue),
nodesToRetry: sync.Map{},
@@ -794,6 +799,11 @@ func (nc *Controller) doEvictionPass(ctx context.Context) {
// if not, post "NodeReady==ConditionUnknown".
// This function will taint nodes who are not ready or not reachable for a long period of time.
func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
start := nc.now()
defer func() {
updateAllNodesHealthDuration.Observe(time.Since(start.Time).Seconds())
}()
// We list nodes from the local cache since we can tolerate small delays
// compared to the state in etcd, and there is eventual consistency anyway.
nodes, err := nc.nodeLister.List(labels.Everything())
@@ -824,13 +834,21 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
delete(nc.knownNodeSet, deleted[i].Name)
}
var zoneToNodeConditionsLock sync.Mutex
zoneToNodeConditions := map[string][]*v1.NodeCondition{}
for i := range nodes {
updateNodeFunc := func(piece int) {
start := nc.now()
defer func() {
updateNodeHealthDuration.Observe(time.Since(start.Time).Seconds())
}()
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
var currentReadyCondition *v1.NodeCondition
node := nodes[i].DeepCopy()
node := nodes[piece].DeepCopy()
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
var err error
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(ctx, node)
if err == nil {
return true, nil
@@ -845,12 +863,14 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
}); err != nil {
klog.Errorf("Update health of Node '%v' from Controller error: %v. "+
"Skipping - no pods will be evicted.", node.Name, err)
continue
return
}
// Some nodes may be excluded from disruption checking
if !isNodeExcludedFromDisruptionChecks(node) {
zoneToNodeConditionsLock.Lock()
zoneToNodeConditions[nodetopology.GetZoneKey(node)] = append(zoneToNodeConditions[nodetopology.GetZoneKey(node)], currentReadyCondition)
zoneToNodeConditionsLock.Unlock()
}
if currentReadyCondition != nil {
@@ -863,7 +883,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
// in the next iteration.
nc.nodesToRetry.Store(node.Name, struct{}{})
}
continue
return
}
if nc.runTaintManager {
nc.processTaintBaseEviction(ctx, node, &observedReadyCondition)
@@ -883,12 +903,20 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
if err = controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
nc.nodesToRetry.Store(node.Name, struct{}{})
continue
return
}
}
}
nc.nodesToRetry.Delete(node.Name)
}
// Marking the pods not ready on a node requires looping over them and
// updating each pod's status one at a time. This is performed serially, and
// can take a while if we're processing each node serially as well. So we
// process them with bounded concurrency instead, since most of the time is
// spent waiting on io.
workqueue.ParallelizeUntil(ctx, nc.nodeUpdateWorkerSize, len(nodes), updateNodeFunc)
nc.handleDisruption(ctx, zoneToNodeConditions, nodes)
return nil
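The bounded concurrency in the hunk above comes from workqueue.ParallelizeUntil (k8s.io/client-go/util/workqueue), which runs a func(piece int) callback for every index in [0, pieces) across a fixed pool of workers and blocks until all pieces are processed or the context is cancelled. A stripped-down sketch of that pattern, with checkNode as a hypothetical stand-in for updateNodeFunc:

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := []string{"node0", "node1", "node2", "node3"}
	var processed int64

	// checkNode is a hypothetical stand-in for the per-node work done in updateNodeFunc.
	checkNode := func(piece int) {
		// Each worker receives an index into the shared slice; any shared maps
		// (like zoneToNodeConditions above) must be guarded by a lock.
		fmt.Println("checking", nodes[piece])
		atomic.AddInt64(&processed, 1)
	}

	// Two workers process the four pieces concurrently; the call blocks until
	// every piece has been handled or the context is cancelled.
	workqueue.ParallelizeUntil(context.Background(), 2, len(nodes), checkNode)
	fmt.Println("processed:", atomic.LoadInt64(&processed))
}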

pkg/controller/nodelifecycle/node_lifecycle_controller_test.go

@@ -2451,6 +2451,128 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
}
}
// TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize tests the happy path of
// TestMonitorNodeHealthMarkPodsNotReady with a large number of nodes/pods and
// varying numbers of workers.
func TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize(t *testing.T) {
const numNodes = 50
const podsPerNode = 100
makeNodes := func() []*v1.Node {
nodes := make([]*v1.Node, numNodes)
// Nodes created long ago, whose kubelet-reported status will exceed the grace period.
// Expect pod statuses to be updated and an Unknown node status posted from the node controller.
for i := 0; i < numNodes; i++ {
nodes[i] = &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("node%d", i),
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
},
},
}
}
return nodes
}
makePods := func() []v1.Pod {
pods := make([]v1.Pod, numNodes*podsPerNode)
for i := 0; i < numNodes*podsPerNode; i++ {
pods[i] = *testutil.NewPod(fmt.Sprintf("pod%d", i), fmt.Sprintf("node%d", i%numNodes))
}
return pods
}
table := []struct {
workers int
}{
{workers: 0}, // will default to scheduler.UpdateWorkerSize
{workers: 1},
}
for i, item := range table {
fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
fakeNodeHandler := &testutil.FakeNodeHandler{
Existing: makeNodes(),
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: makePods()}),
}
nodeController, _ := newNodeLifecycleControllerFromClient(
context.TODO(),
fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(fakeNodeHandler.Clientset)
if item.workers != 0 {
nodeController.nodeUpdateWorkerSize = item.workers
}
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
}
nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(1 * time.Minute)} }
for i := range fakeNodeHandler.Existing {
fakeNodeHandler.Existing[i].Status = v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
},
}
}
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
}
podStatusUpdates := 0
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" {
podStatusUpdates++
}
}
const expectedPodStatusUpdates = numNodes * podsPerNode
if podStatusUpdates != expectedPodStatusUpdates {
t.Errorf("Case[%d] expect pod status updated to be %v, but got %v", i, expectedPodStatusUpdates, podStatusUpdates)
}
}
}
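To exercise just this new test locally, something like `go test -run TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize -v` from the nodelifecycle controller package (pkg/controller/nodelifecycle in the main repository, assuming the standard layout) runs both the default-worker and single-worker cases.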
func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
type nodeIteration struct {
timeToPass time.Duration