diff --git a/pkg/controller/nodelifecycle/BUILD b/pkg/controller/nodelifecycle/BUILD index 80d131594a6..9e2adb603d2 100644 --- a/pkg/controller/nodelifecycle/BUILD +++ b/pkg/controller/nodelifecycle/BUILD @@ -9,7 +9,6 @@ go_library( importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle", visibility = ["//visibility:public"], deps = [ - "//pkg/apis/core:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/nodelifecycle/scheduler:go_default_library", "//pkg/controller/util/node:go_default_library", @@ -23,7 +22,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index 7d340e2b511..6887c38037b 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -34,7 +34,6 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -53,7 +52,6 @@ import ( "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/workqueue" "k8s.io/component-base/metrics/prometheus/ratelimiter" - apicore "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler" nodeutil "k8s.io/kubernetes/pkg/controller/util/node" @@ -131,6 +129,9 @@ const ( // The amount of time the nodecontroller should sleep between retrying node health updates retrySleepTime = 20 * time.Millisecond nodeNameKeyIndex = "spec.nodeName" + // podUpdateWorkerSize assumes that in most cases a pod will be handled by the monitorNodeHealth pass. + // Pod update workers will only handle lagging cache pods. 4 workers should be enough. + podUpdateWorkerSize = 4 ) // labelReconcileInfo lists Node labels to reconcile, and how to reconcile them. @@ -208,10 +209,67 @@ func (n *nodeHealthMap) set(name string, data *nodeHealthData) { n.nodeHealths[name] = data } +type podUpdateItem struct { + namespace string + name string +} + +type evictionStatus int + +const ( + unmarked = iota + toBeEvicted + evicted +) + +// nodeEvictionMap stores evictionStatus data for each node.
+type nodeEvictionMap struct { + lock sync.Mutex + nodeEvictions map[string]evictionStatus +} + +func newNodeEvictionMap() *nodeEvictionMap { + return &nodeEvictionMap{ + nodeEvictions: make(map[string]evictionStatus), + } +} + +func (n *nodeEvictionMap) registerNode(nodeName string) { + n.lock.Lock() + defer n.lock.Unlock() + n.nodeEvictions[nodeName] = unmarked +} + +func (n *nodeEvictionMap) unregisterNode(nodeName string) { + n.lock.Lock() + defer n.lock.Unlock() + delete(n.nodeEvictions, nodeName) +} + +func (n *nodeEvictionMap) setStatus(nodeName string, status evictionStatus) bool { + n.lock.Lock() + defer n.lock.Unlock() + if _, exists := n.nodeEvictions[nodeName]; !exists { + return false + } + n.nodeEvictions[nodeName] = status + return true +} + +func (n *nodeEvictionMap) getStatus(nodeName string) (evictionStatus, bool) { + n.lock.Lock() + defer n.lock.Unlock() + if _, exists := n.nodeEvictions[nodeName]; !exists { + return unmarked, false + } + return n.nodeEvictions[nodeName], true +} + // Controller is the controller that manages node's life cycle. type Controller struct { taintManager *scheduler.NoExecuteTaintManager + podLister corelisters.PodLister podInformerSynced cache.InformerSynced kubeClient clientset.Interface @@ -227,12 +285,12 @@ type Controller struct { // per Node map storing last observed health together with a local time when it was observed. nodeHealthMap *nodeHealthMap - // Lock to access evictor workers - evictorLock sync.Mutex - + // evictorLock protects zonePodEvictor and zoneNoExecuteTainter. + // TODO(#83954): API calls shouldn't be executed under the lock. + evictorLock sync.Mutex + nodeEvictionMap *nodeEvictionMap // workers that evicts pods from unresponsive nodes. zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue - // workers that are responsible for tainting nodes. zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue @@ -299,6 +357,7 @@ type Controller struct { useTaintBasedEvictions bool nodeUpdateQueue workqueue.Interface + podUpdateQueue workqueue.RateLimitingInterface } // NewNodeLifecycleController returns a new taint controller. 
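Aside (illustration, not part of the patch): a minimal, runnable sketch of the eviction-status state machine that nodeEvictionMap implements, with the types condensed from the change above. The main function is hypothetical and only walks one node through the transitions the controller performs elsewhere in this diff: registerNode on node add, toBeEvicted when the node is queued, evicted once its pods are deleted, and unregisterNode on node delete.

package main

import (
	"fmt"
	"sync"
)

type evictionStatus int

const (
	unmarked evictionStatus = iota
	toBeEvicted
	evicted
)

// nodeEvictionMap guards a per-node status map with a mutex so informer
// event handlers and eviction workers can mutate it concurrently.
type nodeEvictionMap struct {
	lock          sync.Mutex
	nodeEvictions map[string]evictionStatus
}

func (n *nodeEvictionMap) registerNode(nodeName string) {
	n.lock.Lock()
	defer n.lock.Unlock()
	n.nodeEvictions[nodeName] = unmarked
}

func (n *nodeEvictionMap) unregisterNode(nodeName string) {
	n.lock.Lock()
	defer n.lock.Unlock()
	delete(n.nodeEvictions, nodeName)
}

// setStatus reports false when the node was unregistered in the meantime,
// which is exactly the case the klog.V(2) messages below warn about.
func (n *nodeEvictionMap) setStatus(nodeName string, status evictionStatus) bool {
	n.lock.Lock()
	defer n.lock.Unlock()
	if _, exists := n.nodeEvictions[nodeName]; !exists {
		return false
	}
	n.nodeEvictions[nodeName] = status
	return true
}

func main() {
	m := &nodeEvictionMap{nodeEvictions: make(map[string]evictionStatus)}
	m.registerNode("node0")                        // node add handler
	fmt.Println(m.setStatus("node0", toBeEvicted)) // true: node queued for eviction
	fmt.Println(m.setStatus("node0", evicted))     // true: pods deleted
	m.unregisterNode("node0")                      // node delete handler
	fmt.Println(m.setStatus("node0", unmarked))    // false: node is gone
}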
@@ -343,6 +402,7 @@ func NewNodeLifecycleController( now: metav1.Now, knownNodeSet: make(map[string]*v1.Node), nodeHealthMap: newNodeHealthMap(), + nodeEvictionMap: newNodeEvictionMap(), recorder: recorder, nodeMonitorPeriod: nodeMonitorPeriod, nodeStartupGracePeriod: nodeStartupGracePeriod, @@ -359,6 +419,7 @@ func NewNodeLifecycleController( runTaintManager: runTaintManager, useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager, nodeUpdateQueue: workqueue.NewNamed("node_lifecycle_controller"), + podUpdateQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"), } if useTaintBasedEvictions { klog.Infof("Controller is using taint based evictions.") @@ -371,6 +432,7 @@ func NewNodeLifecycleController( podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*v1.Pod) + nc.podUpdated(nil, pod) if nc.taintManager != nil { nc.taintManager.PodUpdated(nil, pod) } @@ -378,6 +440,7 @@ func NewNodeLifecycleController( UpdateFunc: func(prev, obj interface{}) { prevPod := prev.(*v1.Pod) newPod := obj.(*v1.Pod) + nc.podUpdated(prevPod, newPod) if nc.taintManager != nil { nc.taintManager.PodUpdated(prevPod, newPod) } @@ -397,6 +460,7 @@ func NewNodeLifecycleController( return } } + nc.podUpdated(pod, nil) if nc.taintManager != nil { nc.taintManager.PodUpdated(pod, nil) } @@ -432,10 +496,10 @@ func NewNodeLifecycleController( } return pods, nil } + nc.podLister = podInformer.Lister() if nc.runTaintManager { - podLister := podInformer.Lister() - podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) } + podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) } nodeLister := nodeInformer.Lister() nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) } nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode) @@ -459,6 +523,7 @@ func NewNodeLifecycleController( nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { nc.nodeUpdateQueue.Add(node.Name) + nc.nodeEvictionMap.registerNode(node.Name) return nil }), UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { @@ -467,6 +532,7 @@ func NewNodeLifecycleController( }), DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error { nc.nodesToRetry.Delete(node.Name) + nc.nodeEvictionMap.unregisterNode(node.Name) return nil }), }) @@ -505,6 +571,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) { // Close node update queue to cleanup go routine. defer nc.nodeUpdateQueue.ShutDown() + defer nc.podUpdateQueue.ShutDown() // Start workers to reconcile labels and/or update NoSchedule taint for nodes. for i := 0; i < scheduler.UpdateWorkerSize; i++ { @@ -515,6 +582,10 @@ func (nc *Controller) Run(stopCh <-chan struct{}) { go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh) } + for i := 0; i < podUpdateWorkerSize; i++ { + go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh) + } + if nc.useTaintBasedEvictions { // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. 
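Aside (illustration, not part of the patch): Run above starts podUpdateWorkerSize goroutines that drain podUpdateQueue. A self-contained sketch of that rate-limited workqueue worker pattern follows; the item type mirrors the patch, while the handler body and queue name are made up for the demo (the real handler is doPodProcessingWorker/processPod later in this file).

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

type podUpdateItem struct {
	namespace string
	name      string
}

func worker(q workqueue.RateLimitingInterface) {
	for {
		// Get blocks until an item is available and reports shutdown==true
		// once ShutDown has been called and the queue is drained.
		obj, shutdown := q.Get()
		if shutdown {
			return
		}
		item := obj.(podUpdateItem)
		// Real processing goes here; on a transient error the controller
		// requeues with backoff via q.AddRateLimited(item) before Done.
		fmt.Println("processed", item.namespace+"/"+item.name)
		q.Done(item)
	}
}

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo_pods")
	for i := 0; i < 4; i++ { // mirrors podUpdateWorkerSize
		go worker(q)
	}
	q.Add(podUpdateItem{namespace: "default", name: "pod0"})
	time.Sleep(100 * time.Millisecond) // crude wait; a real controller blocks on a stop channel
	q.ShutDown()
}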
@@ -671,21 +742,26 @@ func (nc *Controller) doEvictionPass() { klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err) } nodeUID, _ := value.UID.(string) - pods, err := listPodsFromNode(nc.kubeClient, value.Value) + pods, err := nc.getPodsAssignedToNode(value.Value) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err)) return false, 0 } remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore) if err != nil { + // We are not setting eviction status here. + // New pods will be handled by zonePodEvictor retry + // instead of immediate pod eviction. utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) return false, 0 } + if !nc.nodeEvictionMap.setStatus(value.Value, evicted) { + klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", value.Value) + } if remaining { klog.Infof("Pods awaiting deletion due to Controller eviction") } - //count the evictionsNumber if node != nil { zone := utilnode.GetZoneKey(node) evictionsNumber.WithLabelValues(zone).Inc() @@ -696,20 +772,6 @@ } } -func listPodsFromNode(kubeClient clientset.Interface, nodeName string) ([]*v1.Pod, error) { - selector := fields.OneTermEqualSelector(apicore.PodHostField, nodeName).String() - options := metav1.ListOptions{FieldSelector: selector} - pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(options) - if err != nil { - return nil, err - } - rPods := make([]*v1.Pod, len(pods.Items)) - for i := range pods.Items { - rPods[i] = &pods.Items[i] - } - return rPods, nil -} - // monitorNodeHealth verifies node health are constantly updated by kubelet, and // if not, post "NodeReady==ConditionUnknown". // For nodes who are not ready or not reachable for a long period of time. @@ -775,16 +837,24 @@ func (nc *Controller) monitorNodeHealth() error { zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition) } - nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name) - if nodeHealthData == nil { - klog.Errorf("Skipping %v node processing: health data doesn't exist.", node.Name) - continue - } if currentReadyCondition != nil { + pods, err := nc.getPodsAssignedToNode(node.Name) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to list pods of node %v: %v", node.Name, err)) + if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue { + // If an error happened during the node status transition (Ready -> NotReady), + // we need to mark the node for retry to force MarkPodsNotReady execution + // in the next iteration.
+ nc.nodesToRetry.Store(node.Name, struct{}{}) + } + continue + } if nc.useTaintBasedEvictions { nc.processTaintBaseEviction(node, &observedReadyCondition) } else { - nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod) + if err := nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod, pods); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to evict all pods from node %v: %v; queuing for retry", node.Name, err)) + } } _, needsRetry := nc.nodesToRetry.Load(node.Name) @@ -794,8 +864,8 @@ func (nc *Controller) monitorNodeHealth() error { nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady") fallthrough case needsRetry && observedReadyCondition.Status != v1.ConditionTrue: - if err := nc.markPodsNotReady(node.Name); err != nil { - utilruntime.HandleError(err) + if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, node.Name); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err)) nc.nodesToRetry.Store(node.Name, struct{}{}) continue } @@ -808,17 +878,6 @@ func (nc *Controller) monitorNodeHealth() error { return nil } -func (nc *Controller) markPodsNotReady(nodeName string) error { - pods, err := listPodsFromNode(nc.kubeClient, nodeName) - if err != nil { - return fmt.Errorf("unable to list pods from node %v: %v", nodeName, err) - } - if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil { - return fmt.Errorf("unable to mark all pods NotReady on node %v: %v", nodeName, err) - } - return nil -} - func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition) { decisionTimestamp := nc.now() // Check eviction timeout against decisionTimestamp @@ -860,18 +919,21 @@ func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondi } } -func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration) { +func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration, pods []*v1.Pod) error { decisionTimestamp := nc.now() nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name) if nodeHealthData == nil { - klog.Errorf("Skipping %v node processing: health data doesn't exist.", node.Name) - return + return fmt.Errorf("health data doesn't exist for node %q", node.Name) } // Check eviction timeout against decisionTimestamp switch observedReadyCondition.Status { case v1.ConditionFalse: if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) { - if nc.evictPods(node) { + enqueued, err := nc.evictPods(node, pods) + if err != nil { + return err + } + if enqueued { klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v", node.Name, decisionTimestamp, @@ -882,7 +944,11 @@ func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCon } case v1.ConditionUnknown: if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) { - if nc.evictPods(node) { + enqueued, err := nc.evictPods(node, pods) + if err != nil { + return err + } + if enqueued { klog.V(2).Infof("Node is unresponsive. 
Adding Pods on Node %s to eviction queues: %v is later than %v + %v", node.Name, decisionTimestamp, @@ -896,6 +962,7 @@ func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCon klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name) } } + return nil } // labelNodeDisruptionExclusion is a label on nodes that controls whether they are @@ -1206,6 +1273,90 @@ func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.Nod } } +func (nc *Controller) podUpdated(oldPod, newPod *v1.Pod) { + if newPod == nil { + return + } + if len(newPod.Spec.NodeName) != 0 && (oldPod == nil || newPod.Spec.NodeName != oldPod.Spec.NodeName) { + podItem := podUpdateItem{newPod.Namespace, newPod.Name} + nc.podUpdateQueue.Add(podItem) + } +} + +func (nc *Controller) doPodProcessingWorker() { + for { + obj, shutdown := nc.podUpdateQueue.Get() + // "podUpdateQueue" will be shut down when "stopCh" is closed; + // we do not need to re-check "stopCh" again. + if shutdown { + return + } + + podItem := obj.(podUpdateItem) + nc.processPod(podItem) + } +} + +// processPod processes events of assigning pods to nodes. In particular: +// 1. for NodeReady=true node, taint eviction for this pod will be cancelled +// 2. for NodeReady=false or unknown node, taint eviction of pod will happen and pod will be marked as not ready +// 3. if node doesn't exist in cache, it will be skipped and handled later by doEvictionPass +func (nc *Controller) processPod(podItem podUpdateItem) { + defer nc.podUpdateQueue.Done(podItem) + pod, err := nc.podLister.Pods(podItem.namespace).Get(podItem.name) + if err != nil { + if apierrors.IsNotFound(err) { + // If the pod was deleted, there is no need to requeue. + return + } + klog.Warningf("Failed to read pod %v/%v: %v.", podItem.namespace, podItem.name, err) + nc.podUpdateQueue.AddRateLimited(podItem) + return + } + + nodeName := pod.Spec.NodeName + + nodeHealth := nc.nodeHealthMap.getDeepCopy(nodeName) + if nodeHealth == nil { + // Node data is not gathered yet or node has been removed in the meantime. + // Pod will be handled by doEvictionPass method. + return + } + + node, err := nc.nodeLister.Get(nodeName) + if err != nil { + klog.Warningf("Failed to read node %v: %v.", nodeName, err) + nc.podUpdateQueue.AddRateLimited(podItem) + return + } + + _, currentReadyCondition := nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady) + if currentReadyCondition == nil { + // Lack of NodeReady condition may only happen after node addition (or if it will be maliciously deleted). + // In both cases, the pod will be handled correctly (evicted if needed) during processing + // of the next node update event. + return + } + + pods := []*v1.Pod{pod} + // In taint-based eviction mode, only node updates are processed by NodeLifecycleController. + // Pods are processed by TaintManager.
+ if !nc.useTaintBasedEvictions { + if err := nc.processNoTaintBaseEviction(node, currentReadyCondition, nc.nodeMonitorGracePeriod, pods); err != nil { + klog.Warningf("Unable to process pod %+v eviction from node %v: %v.", podItem, nodeName, err) + nc.podUpdateQueue.AddRateLimited(podItem) + return + } + } + + if currentReadyCondition.Status != v1.ConditionTrue { + if err := nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil { + klog.Warningf("Unable to mark pod %+v NotReady on node %v: %v.", podItem, nodeName, err) + nc.podUpdateQueue.AddRateLimited(podItem) + } + } +} + func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) { switch state { case stateNormal: @@ -1310,6 +1461,9 @@ func (nc *Controller) cancelPodEviction(node *v1.Node) bool { zone := utilnode.GetZoneKey(node) nc.evictorLock.Lock() defer nc.evictorLock.Unlock() + if !nc.nodeEvictionMap.setStatus(node.Name, unmarked) { + klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", node.Name) + } wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name) if wasDeleting { klog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name) @@ -1318,12 +1472,28 @@ func (nc *Controller) cancelPodEviction(node *v1.Node) bool { return false } -// evictPods queues an eviction for the provided node name, and returns false if the node is already -// queued for eviction. -func (nc *Controller) evictPods(node *v1.Node) bool { +// evictPods: +// - adds node to evictor queue if the node is not marked as evicted. +// Returns false if the node name was already enqueued. +// - deletes pods immediately if node is already marked as evicted. +// Returns false, because the node wasn't added to the queue. +func (nc *Controller) evictPods(node *v1.Node, pods []*v1.Pod) (bool, error) { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)) + status, ok := nc.nodeEvictionMap.getStatus(node.Name) + if ok && status == evicted { + // Node eviction already happened for this node. + // Handling immediate pod deletion. + _, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, node.Name, string(node.UID), nc.daemonSetStore) + if err != nil { + return false, fmt.Errorf("unable to delete pods from node %q: %v", node.Name, err) + } + return false, nil + } + if !nc.nodeEvictionMap.setStatus(node.Name, toBeEvicted) { + klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", node.Name) + } + return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)), nil } func (nc *Controller) markNodeForTainting(node *v1.Node) bool { diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go index dfca0518a6d..ef2b0507800 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go @@ -91,27 +91,25 @@ type nodeLifecycleController struct { // doEviction does the fake eviction and returns the status of eviction operation. 
func (nc *nodeLifecycleController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) bool { - var podEvicted bool + nc.evictorLock.Lock() + defer nc.evictorLock.Unlock() zones := testutil.GetZones(fakeNodeHandler) for _, zone := range zones { nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { uid, _ := value.UID.(string) - pods, err := listPodsFromNode(fakeNodeHandler, value.Value) - if err != nil { - return false, 0 - } + pods, _ := nc.getPodsAssignedToNode(value.Value) nodeutil.DeletePods(fakeNodeHandler, pods, nc.recorder, value.Value, uid, nc.daemonSetStore) + _ = nc.nodeEvictionMap.setStatus(value.Value, evicted) return true, 0 }) } for _, action := range fakeNodeHandler.Actions() { if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podEvicted = true - return podEvicted + return true } } - return podEvicted + return false } func createNodeLease(nodeName string, renewTime metav1.MicroTime) *coordv1.Lease { @@ -701,10 +699,11 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) { if _, ok := nodeController.zonePodEvictor[zone]; ok { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeUID, _ := value.UID.(string) - pods, err := listPodsFromNode(item.fakeNodeHandler, value.Value) + pods, err := nodeController.getPodsAssignedToNode(value.Value) if err != nil { t.Errorf("unexpected error: %v", err) } + t.Logf("listed %d pods for node %v", len(pods), value.Value) nodeutil.DeletePods(item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister()) return true, 0 }) @@ -858,7 +857,7 @@ func TestPodStatusChange(t *testing.T) { for _, zone := range zones { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeUID, _ := value.UID.(string) - pods, err := listPodsFromNode(item.fakeNodeHandler, value.Value) + pods, err := nodeController.getPodsAssignedToNode(value.Value) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -879,7 +878,7 @@ } if podReasonUpdate != item.expectedPodUpdate { - t.Errorf("expected pod update: %+v, got %+v for %+v", podReasonUpdate, item.expectedPodUpdate, item.description) + t.Errorf("expected pod update: %+v, got %+v for %+v", item.expectedPodUpdate, podReasonUpdate, item.description) } } } @@ -2418,11 +2417,12 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) { } } table := []struct { - desc string - fakeNodeHandler *testutil.FakeNodeHandler - updateReactor func(action testcore.Action) (bool, runtime.Object, error) - nodeIterations []nodeIteration - expectedPodStatusUpdates int + desc string + fakeNodeHandler *testutil.FakeNodeHandler + updateReactor func(action testcore.Action) (bool, runtime.Object, error) + fakeGetPodsAssignedToNode func(c *fake.Clientset) func(string) ([]*v1.Pod, error) + nodeIterations []nodeIteration + expectedPodStatusUpdates int }{ // Node created long time ago, with status updated by kubelet exceeds grace period. // First monitorNodeHealth check will update pod status to NotReady.
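Aside (illustration, not part of the patch): the tests above now go through nodeController.getPodsAssignedToNode instead of listing pods from the API server on every eviction. Judging by the nodeNameKeyIndex constant added earlier, that helper serves pods from an informer index keyed by spec.nodeName; the sketch below shows such an index-backed lookup, with wiring and names that are illustrative rather than the exact code of this PR.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

const nodeNameKeyIndex = "spec.nodeName"

// podsByNodeFunc registers a spec.nodeName index on a pod informer and
// returns a lookup that reads assigned pods from the local cache.
func podsByNodeFunc(client kubernetes.Interface) func(string) ([]*v1.Pod, error) {
	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddIndexers(cache.Indexers{
		nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok || len(pod.Spec.NodeName) == 0 {
				return []string{}, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	})
	indexer := podInformer.GetIndexer()
	return func(nodeName string) ([]*v1.Pod, error) {
		objs, err := indexer.ByIndex(nodeNameKeyIndex, nodeName)
		if err != nil {
			return nil, err
		}
		pods := make([]*v1.Pod, 0, len(objs))
		for _, obj := range objs {
			if pod, ok := obj.(*v1.Pod); ok {
				pods = append(pods, pod)
			}
		}
		return pods, nil
	}
}

func main() {
	getPods := podsByNodeFunc(fake.NewSimpleClientset())
	// The informer is never started here, so the cache is simply empty.
	pods, err := getPods("node0")
	fmt.Println(len(pods), err) // 0 <nil>
}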
@@ -2432,6 +2432,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) { fakeNodeHandler: &testutil.FakeNodeHandler{ Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), }, + fakeGetPodsAssignedToNode: fakeGetPodsAssignedToNode, nodeIterations: []nodeIteration{ { timeToPass: 0, @@ -2472,6 +2473,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) { return true, nil, fmt.Errorf("unsupported action") } }(), + fakeGetPodsAssignedToNode: fakeGetPodsAssignedToNode, nodeIterations: []nodeIteration{ { timeToPass: 0, @@ -2488,6 +2490,41 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) { }, expectedPodStatusUpdates: 2, // One failed and one retry. }, + // Node created long time ago, with status updated by kubelet exceeds grace period. + // First monitorNodeHealth check will fail to list pods. + // Second monitorNodeHealth check will update pod status to NotReady (retry). + { + desc: "unsuccessful pod list, retry required", + fakeNodeHandler: &testutil.FakeNodeHandler{ + Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), + }, + fakeGetPodsAssignedToNode: func(c *fake.Clientset) func(string) ([]*v1.Pod, error) { + i := 0 + f := fakeGetPodsAssignedToNode(c) + return func(nodeName string) ([]*v1.Pod, error) { + i++ + if i == 1 { + return nil, fmt.Errorf("fake error") + } + return f(nodeName) + } + }, + nodeIterations: []nodeIteration{ + { + timeToPass: 0, + newNodes: makeNodes(v1.ConditionTrue, timeNow, timeNow), + }, + { + timeToPass: 1 * time.Minute, + newNodes: makeNodes(v1.ConditionTrue, timeNow, timeNow), + }, + { + timeToPass: 1 * time.Minute, + newNodes: makeNodes(v1.ConditionFalse, timePlusTwoMinutes, timePlusTwoMinutes), + }, + }, + expectedPodStatusUpdates: 1, + }, } for _, item := range table { @@ -2508,7 +2545,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) { } nodeController.now = func() metav1.Time { return timeNow } nodeController.recorder = testutil.NewFakeRecorder() - nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(item.fakeNodeHandler.Clientset) + nodeController.getPodsAssignedToNode = item.fakeGetPodsAssignedToNode(item.fakeNodeHandler.Clientset) for _, itertion := range item.nodeIterations { nodeController.now = func() metav1.Time { return metav1.Time{Time: timeNow.Add(itertion.timeToPass)} } item.fakeNodeHandler.Existing = itertion.newNodes diff --git a/pkg/controller/util/node/controller_utils.go b/pkg/controller/util/node/controller_utils.go index 415867f798f..b55d0dd6bd6 100644 --- a/pkg/controller/util/node/controller_utils.go +++ b/pkg/controller/util/node/controller_utils.go @@ -80,6 +80,11 @@ func DeletePods(kubeClient clientset.Interface, pods []*v1.Pod, recorder record. klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name) recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { + if apierrors.IsNotFound(err) { + // NotFound error means that pod was already deleted. + // There is nothing left to do with this pod. 
+ continue + } return false, err } remaining = true @@ -133,6 +138,11 @@ func MarkPodsNotReady(kubeClient clientset.Interface, pods []*v1.Pod, nodeName s klog.V(2).Infof("Updating ready status of pod %v to false", pod.Name) _, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) if err != nil { + if apierrors.IsNotFound(err) { + // NotFound error means that pod was already deleted. + // There is nothing left to do with this pod. + continue + } klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) errMsg = append(errMsg, fmt.Sprintf("%v", err)) }
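Aside (illustration, not part of the patch): the apierrors.IsNotFound checks added above make per-pod cleanup idempotent, so a retried eviction pass does not fail on pods that vanished between listing and deletion. A distilled standalone sketch follows; the two-argument Delete matches the client-go vintage this diff is written against (newer client-go also takes a context), and the helper name is hypothetical.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// deletePodsIdempotent deletes the given pods and treats "already deleted"
// as success, so the whole pass can safely be retried.
func deletePodsIdempotent(client kubernetes.Interface, pods []*v1.Pod) error {
	for _, pod := range pods {
		err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
		if err != nil && !apierrors.IsNotFound(err) {
			return err
		}
		// On NotFound the pod is already gone: nothing left to do for it.
	}
	return nil
}

func main() {
	client := fake.NewSimpleClientset() // no pods exist in the fake
	pod := &v1.Pod{}
	pod.Namespace, pod.Name = "default", "pod0"
	err := deletePodsIdempotent(client, []*v1.Pod{pod})
	fmt.Println(err) // <nil>: the missing pod is not treated as an error
}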