diff --git a/pkg/controller/nodelifecycle/BUILD b/pkg/controller/nodelifecycle/BUILD
index e5fb2f89fe9..80d131594a6 100644
--- a/pkg/controller/nodelifecycle/BUILD
+++ b/pkg/controller/nodelifecycle/BUILD
@@ -70,6 +70,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
index 66ab5ece80d..7d340e2b511 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -236,6 +236,8 @@ type Controller struct {
 	// workers that are responsible for tainting nodes.
 	zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
 
+	nodesToRetry sync.Map
+
 	zoneStates map[string]ZoneState
 
 	daemonSetStore appsv1listers.DaemonSetLister
@@ -347,6 +349,7 @@ func NewNodeLifecycleController(
 		nodeMonitorGracePeriod:      nodeMonitorGracePeriod,
 		zonePodEvictor:              make(map[string]*scheduler.RateLimitedTimedQueue),
 		zoneNoExecuteTainter:        make(map[string]*scheduler.RateLimitedTimedQueue),
+		nodesToRetry:                sync.Map{},
 		zoneStates:                  make(map[string]ZoneState),
 		podEvictionTimeout:          podEvictionTimeout,
 		evictionLimiterQPS:          evictionLimiterQPS,
@@ -462,6 +465,10 @@ func NewNodeLifecycleController(
 			nc.nodeUpdateQueue.Add(newNode.Name)
 			return nil
 		}),
+		DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
+			nc.nodesToRetry.Delete(node.Name)
+			return nil
+		}),
 	})
 
 	nc.leaseLister = leaseInformer.Lister()
@@ -780,25 +787,38 @@ func (nc *Controller) monitorNodeHealth() error {
 				nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod)
 			}
 
-			// Report node event.
-			if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
+			_, needsRetry := nc.nodesToRetry.Load(node.Name)
+			switch {
+			case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
+				// Report node event only once when status changed.
 				nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
-				pods, err := listPodsFromNode(nc.kubeClient, node.Name)
-				if err != nil {
-					utilruntime.HandleError(fmt.Errorf("Unable to list pods from node %v: %v", node.Name, err))
+				fallthrough
+			case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
+				if err := nc.markPodsNotReady(node.Name); err != nil {
+					utilruntime.HandleError(err)
+					nc.nodesToRetry.Store(node.Name, struct{}{})
 					continue
 				}
-				if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, node.Name); err != nil {
-					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
-				}
 			}
 		}
+		nc.nodesToRetry.Delete(node.Name)
 	}
 	nc.handleDisruption(zoneToNodeConditions, nodes)
 
 	return nil
 }
 
+func (nc *Controller) markPodsNotReady(nodeName string) error {
+	pods, err := listPodsFromNode(nc.kubeClient, nodeName)
+	if err != nil {
+		return fmt.Errorf("unable to list pods from node %v: %v", nodeName, err)
+	}
+	if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil {
+		return fmt.Errorf("unable to mark all pods NotReady on node %v: %v", nodeName, err)
+	}
+	return nil
+}
+
 func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition) {
 	decisionTimestamp := nc.now()
 	// Check eviction timeout against decisionTimestamp
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
index 0022e57ae4d..dfca0518a6d 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
@@ -30,6 +30,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/diff"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
@@ -2389,6 +2390,149 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
 	}
 }
 
+func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
+	type nodeIteration struct {
+		timeToPass time.Duration
+		newNodes   []*v1.Node
+	}
+	timeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
+	timePlusTwoSeconds := metav1.Date(2015, 1, 1, 12, 0, 2, 0, time.UTC)
+	makeNodes := func(status v1.ConditionStatus, lastHeartbeatTime, lastTransitionTime metav1.Time) []*v1.Node {
+		return []*v1.Node{
+			{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: timeNow,
+				},
+				Status: v1.NodeStatus{
+					Conditions: []v1.NodeCondition{
+						{
+							Type:               v1.NodeReady,
+							Status:             status,
+							LastHeartbeatTime:  lastHeartbeatTime,
+							LastTransitionTime: lastTransitionTime,
+						},
+					},
+				},
+			},
+		}
+	}
+	table := []struct {
+		desc                     string
+		fakeNodeHandler          *testutil.FakeNodeHandler
+		updateReactor            func(action testcore.Action) (bool, runtime.Object, error)
+		nodeIterations           []nodeIteration
+		expectedPodStatusUpdates int
+	}{
+		// Node created long time ago, with status updated by kubelet exceeding grace period.
+		// First monitorNodeHealth check will update pod status to NotReady.
+		// Second monitorNodeHealth check will do no updates (no retry).
+		{
+			desc: "successful pod status update, no retry required",
+			fakeNodeHandler: &testutil.FakeNodeHandler{
+				Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
+			},
+			nodeIterations: []nodeIteration{
+				{
+					timeToPass: 0,
+					newNodes:   makeNodes(v1.ConditionTrue, timeNow, timeNow),
+				},
+				{
+					timeToPass: 1 * time.Minute,
+					newNodes:   makeNodes(v1.ConditionTrue, timeNow, timeNow),
+				},
+				{
+					timeToPass: 1 * time.Minute,
+					newNodes:   makeNodes(v1.ConditionFalse, timePlusTwoSeconds, timePlusTwoSeconds),
+				},
+			},
+			expectedPodStatusUpdates: 1,
+		},
+		// Node created long time ago, with status updated by kubelet exceeding grace period.
+		// First monitorNodeHealth check will fail to update pod status to NotReady.
+		// Second monitorNodeHealth check will update pod status to NotReady (retry).
+		{
+			desc: "unsuccessful pod status update, retry required",
+			fakeNodeHandler: &testutil.FakeNodeHandler{
+				Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
+			},
+			updateReactor: func() func(action testcore.Action) (bool, runtime.Object, error) {
+				i := 0
+				return func(action testcore.Action) (bool, runtime.Object, error) {
+					if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" {
+						i++
+						switch i {
+						case 1:
+							return true, nil, fmt.Errorf("fake error")
+						default:
+							return true, testutil.NewPod("pod0", "node0"), nil
+						}
+					}
+
+					return true, nil, fmt.Errorf("unsupported action")
+				}
+			}(),
+			nodeIterations: []nodeIteration{
+				{
+					timeToPass: 0,
+					newNodes:   makeNodes(v1.ConditionTrue, timeNow, timeNow),
+				},
+				{
+					timeToPass: 1 * time.Minute,
+					newNodes:   makeNodes(v1.ConditionTrue, timeNow, timeNow),
+				},
+				{
+					timeToPass: 1 * time.Minute,
+					newNodes:   makeNodes(v1.ConditionFalse, timePlusTwoSeconds, timePlusTwoSeconds),
+				},
+			},
+			expectedPodStatusUpdates: 2, // One failed and one retry.
+		},
+	}
+
+	for _, item := range table {
+		t.Run(item.desc, func(t *testing.T) {
+			nodeController, _ := newNodeLifecycleControllerFromClient(
+				item.fakeNodeHandler,
+				5*time.Minute,
+				testRateLimiterQPS,
+				testRateLimiterQPS,
+				testLargeClusterThreshold,
+				testUnhealthyThreshold,
+				testNodeMonitorGracePeriod,
+				testNodeStartupGracePeriod,
+				testNodeMonitorPeriod,
+				false)
+			if item.updateReactor != nil {
+				item.fakeNodeHandler.Clientset.PrependReactor("update", "pods", item.updateReactor)
+			}
+			nodeController.now = func() metav1.Time { return timeNow }
+			nodeController.recorder = testutil.NewFakeRecorder()
+			nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(item.fakeNodeHandler.Clientset)
+			for _, iteration := range item.nodeIterations {
+				nodeController.now = func() metav1.Time { return metav1.Time{Time: timeNow.Add(iteration.timeToPass)} }
+				item.fakeNodeHandler.Existing = iteration.newNodes
+				if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
+					t.Errorf("unexpected error: %v", err)
+				}
+				if err := nodeController.monitorNodeHealth(); err != nil {
+					t.Errorf("unexpected error: %v", err)
+				}
+			}
+
+			podStatusUpdates := 0
+			for _, action := range item.fakeNodeHandler.Actions() {
+				if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" {
+					podStatusUpdates++
+				}
+			}
+			if podStatusUpdates != item.expectedPodStatusUpdates {
+				t.Errorf("expected pod status to be updated %d times, but it was updated %d times", item.expectedPodStatusUpdates, podStatusUpdates)
+			}
+		})
+	}
+}
+
 // TestApplyNoExecuteTaints, ensures we just have a NoExecute taint applied to node.
 // NodeController is just responsible for enqueuing the node to tainting queue from which taint manager picks up
 // and evicts the pods on the node.
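Reviewer note: the core of this patch is the retry bookkeeping in monitorNodeHealth. A sync.Map records nodes whose pods could not be marked NotReady, and a switch whose first case records the NodeNotReady event exactly once before falling through to the update, while the second case re-attempts the update on later reconcile passes for nodes still flagged for retry. The snippet below is a minimal, self-contained sketch of that control flow outside Kubernetes; the reconciler type, the observe method, and the fail-once markPodsNotReady stub are illustrative stand-ins, not code from this patch.

package main

import (
	"fmt"
	"sync"
)

type reconciler struct {
	// nodesToRetry holds node names whose pod-status update failed and
	// must be re-attempted on the next reconcile pass.
	nodesToRetry sync.Map
}

// calls counts invocations of markPodsNotReady so the stub can fail once.
var calls int

// markPodsNotReady stands in for the controller helper of the same name;
// it fails on the first call to exercise the retry path.
func markPodsNotReady(nodeName string) error {
	calls++
	if calls == 1 {
		return fmt.Errorf("fake error updating pods on %s", nodeName)
	}
	return nil
}

// observe mirrors the shape of the monitorNodeHealth change: statusChanged
// plays the role of "currentReadyCondition flipped away from True while the
// observed condition was still True", and notReady the role of
// "observedReadyCondition.Status != v1.ConditionTrue".
func (r *reconciler) observe(nodeName string, statusChanged, notReady bool) {
	_, needsRetry := r.nodesToRetry.Load(nodeName)
	switch {
	case statusChanged:
		// Record the event exactly once, then fall through to the update.
		fmt.Printf("event: %s became NotReady\n", nodeName)
		fallthrough
	case needsRetry && notReady:
		if err := markPodsNotReady(nodeName); err != nil {
			// Remember the node so the next pass retries the update,
			// mirroring the controller's Store-and-continue.
			r.nodesToRetry.Store(nodeName, struct{}{})
			fmt.Println("will retry:", err)
			return
		}
	}
	// Success, or nothing to do: clear any pending retry for this node.
	r.nodesToRetry.Delete(nodeName)
}

func main() {
	r := &reconciler{}
	r.observe("node0", true, true)  // pass 1: event recorded, update fails, node queued for retry
	r.observe("node0", false, true) // pass 2: no new event, retry succeeds, queue entry cleared
}

Running the sketch prints the event once on the first pass and performs exactly two update attempts, which is the same behavior the second test case above asserts through expectedPodStatusUpdates.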