adding pods lister

This commit is contained in:
Krzysztof Siedlecki 2019-08-01 18:59:22 +02:00
parent 08410cbf06
commit 9406e5bf2a
4 changed files with 286 additions and 71 deletions

View File

@ -9,7 +9,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/nodelifecycle/scheduler:go_default_library",
"//pkg/controller/util/node:go_default_library",
@ -23,7 +22,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -34,7 +34,6 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@ -53,7 +52,6 @@ import (
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
apicore "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
@ -131,6 +129,9 @@ const (
// The amount of time the nodecontroller should sleep between retrying node health updates
retrySleepTime = 20 * time.Millisecond
nodeNameKeyIndex = "spec.nodeName"
// podUpdateWorkerSizes assumes that in most cases pod will be handled by monitorNodeHealth pass.
// Pod update workes will only handle lagging cache pods. 4 workes should be enough.
podUpdateWorkerSize = 4
)
// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
@ -208,10 +209,67 @@ func (n *nodeHealthMap) set(name string, data *nodeHealthData) {
n.nodeHealths[name] = data
}
type podUpdateItem struct {
namespace string
name string
}
type evictionStatus int
const (
unmarked = iota
toBeEvicted
evicted
)
// nodeEvictionMap stores evictionStatus data for each node.
type nodeEvictionMap struct {
lock sync.Mutex
nodeEvictions map[string]evictionStatus
}
func newNodeEvictionMap() *nodeEvictionMap {
return &nodeEvictionMap{
nodeEvictions: make(map[string]evictionStatus),
}
}
func (n *nodeEvictionMap) registerNode(nodeName string) {
n.lock.Lock()
defer n.lock.Unlock()
n.nodeEvictions[nodeName] = unmarked
}
func (n *nodeEvictionMap) unregisterNode(nodeName string) {
n.lock.Lock()
defer n.lock.Unlock()
delete(n.nodeEvictions, nodeName)
}
func (n *nodeEvictionMap) setStatus(nodeName string, status evictionStatus) bool {
n.lock.Lock()
defer n.lock.Unlock()
if _, exists := n.nodeEvictions[nodeName]; !exists {
return false
}
n.nodeEvictions[nodeName] = status
return true
}
func (n *nodeEvictionMap) getStatus(nodeName string) (evictionStatus, bool) {
n.lock.Lock()
defer n.lock.Unlock()
if _, exists := n.nodeEvictions[nodeName]; !exists {
return unmarked, false
}
return n.nodeEvictions[nodeName], true
}
// Controller is the controller that manages node's life cycle.
type Controller struct {
taintManager *scheduler.NoExecuteTaintManager
podLister corelisters.PodLister
podInformerSynced cache.InformerSynced
kubeClient clientset.Interface
@ -227,12 +285,12 @@ type Controller struct {
// per Node map storing last observed health together with a local time when it was observed.
nodeHealthMap *nodeHealthMap
// Lock to access evictor workers
evictorLock sync.Mutex
// evictorLock protects zonePodEvictor and zoneNoExecuteTainter.
// TODO(#83954): API calls shouldn't be executed under the lock.
evictorLock sync.Mutex
nodeEvictionMap *nodeEvictionMap
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// workers that are responsible for tainting nodes.
zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
@ -299,6 +357,7 @@ type Controller struct {
useTaintBasedEvictions bool
nodeUpdateQueue workqueue.Interface
podUpdateQueue workqueue.RateLimitingInterface
}
// NewNodeLifecycleController returns a new taint controller.
@ -343,6 +402,7 @@ func NewNodeLifecycleController(
now: metav1.Now,
knownNodeSet: make(map[string]*v1.Node),
nodeHealthMap: newNodeHealthMap(),
nodeEvictionMap: newNodeEvictionMap(),
recorder: recorder,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
@ -359,6 +419,7 @@ func NewNodeLifecycleController(
runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
nodeUpdateQueue: workqueue.NewNamed("node_lifecycle_controller"),
podUpdateQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"),
}
if useTaintBasedEvictions {
klog.Infof("Controller is using taint based evictions.")
@ -371,6 +432,7 @@ func NewNodeLifecycleController(
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
nc.podUpdated(nil, pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(nil, pod)
}
@ -378,6 +440,7 @@ func NewNodeLifecycleController(
UpdateFunc: func(prev, obj interface{}) {
prevPod := prev.(*v1.Pod)
newPod := obj.(*v1.Pod)
nc.podUpdated(prevPod, newPod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(prevPod, newPod)
}
@ -397,6 +460,7 @@ func NewNodeLifecycleController(
return
}
}
nc.podUpdated(pod, nil)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(pod, nil)
}
@ -432,10 +496,10 @@ func NewNodeLifecycleController(
}
return pods, nil
}
nc.podLister = podInformer.Lister()
if nc.runTaintManager {
podLister := podInformer.Lister()
podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) }
podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) }
nodeLister := nodeInformer.Lister()
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
@ -459,6 +523,7 @@ func NewNodeLifecycleController(
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.nodeUpdateQueue.Add(node.Name)
nc.nodeEvictionMap.registerNode(node.Name)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
@ -467,6 +532,7 @@ func NewNodeLifecycleController(
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.nodesToRetry.Delete(node.Name)
nc.nodeEvictionMap.unregisterNode(node.Name)
return nil
}),
})
@ -505,6 +571,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
defer nc.podUpdateQueue.ShutDown()
// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
@ -515,6 +582,10 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
}
for i := 0; i < podUpdateWorkerSize; i++ {
go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh)
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
@ -671,21 +742,26 @@ func (nc *Controller) doEvictionPass() {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
}
nodeUID, _ := value.UID.(string)
pods, err := listPodsFromNode(nc.kubeClient, value.Value)
pods, err := nc.getPodsAssignedToNode(value.Value)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err))
return false, 0
}
remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
// We are not setting eviction status here.
// New pods will be handled by zonePodEvictor retry
// instead of immediate pod eviction.
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if !nc.nodeEvictionMap.setStatus(value.Value, evicted) {
klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", value.Value)
}
if remaining {
klog.Infof("Pods awaiting deletion due to Controller eviction")
}
//count the evictionsNumber
if node != nil {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
@ -696,20 +772,6 @@ func (nc *Controller) doEvictionPass() {
}
}
func listPodsFromNode(kubeClient clientset.Interface, nodeName string) ([]*v1.Pod, error) {
selector := fields.OneTermEqualSelector(apicore.PodHostField, nodeName).String()
options := metav1.ListOptions{FieldSelector: selector}
pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(options)
if err != nil {
return nil, err
}
rPods := make([]*v1.Pod, len(pods.Items))
for i := range pods.Items {
rPods[i] = &pods.Items[i]
}
return rPods, nil
}
// monitorNodeHealth verifies node health are constantly updated by kubelet, and
// if not, post "NodeReady==ConditionUnknown".
// For nodes who are not ready or not reachable for a long period of time.
@ -775,16 +837,24 @@ func (nc *Controller) monitorNodeHealth() error {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
}
nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
if nodeHealthData == nil {
klog.Errorf("Skipping %v node processing: health data doesn't exist.", node.Name)
continue
}
if currentReadyCondition != nil {
pods, err := nc.getPodsAssignedToNode(node.Name)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to list pods of node %v: %v", node.Name, err))
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
// If error happened during node status transition (Ready -> NotReady)
// we need to mark node for retry to force MarkPodsNotReady execution
// in the next iteration.
nc.nodesToRetry.Store(node.Name, struct{}{})
}
continue
}
if nc.useTaintBasedEvictions {
nc.processTaintBaseEviction(node, &observedReadyCondition)
} else {
nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod)
if err := nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod, pods); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict all pods from node %v: %v; queuing for retry", node.Name, err))
}
}
_, needsRetry := nc.nodesToRetry.Load(node.Name)
@ -794,8 +864,8 @@ func (nc *Controller) monitorNodeHealth() error {
nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
fallthrough
case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
if err := nc.markPodsNotReady(node.Name); err != nil {
utilruntime.HandleError(err)
if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, node.Name); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
nc.nodesToRetry.Store(node.Name, struct{}{})
continue
}
@ -808,17 +878,6 @@ func (nc *Controller) monitorNodeHealth() error {
return nil
}
func (nc *Controller) markPodsNotReady(nodeName string) error {
pods, err := listPodsFromNode(nc.kubeClient, nodeName)
if err != nil {
return fmt.Errorf("unable to list pods from node %v: %v", nodeName, err)
}
if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil {
return fmt.Errorf("unable to mark all pods NotReady on node %v: %v", nodeName, err)
}
return nil
}
func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition) {
decisionTimestamp := nc.now()
// Check eviction timeout against decisionTimestamp
@ -860,18 +919,21 @@ func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondi
}
}
func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration) {
func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration, pods []*v1.Pod) error {
decisionTimestamp := nc.now()
nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
if nodeHealthData == nil {
klog.Errorf("Skipping %v node processing: health data doesn't exist.", node.Name)
return
return fmt.Errorf("health data doesn't exist for node %q", node.Name)
}
// Check eviction timeout against decisionTimestamp
switch observedReadyCondition.Status {
case v1.ConditionFalse:
if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
enqueued, err := nc.evictPods(node, pods)
if err != nil {
return err
}
if enqueued {
klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
node.Name,
decisionTimestamp,
@ -882,7 +944,11 @@ func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCon
}
case v1.ConditionUnknown:
if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
enqueued, err := nc.evictPods(node, pods)
if err != nil {
return err
}
if enqueued {
klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
node.Name,
decisionTimestamp,
@ -896,6 +962,7 @@ func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCon
klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
return nil
}
// labelNodeDisruptionExclusion is a label on nodes that controls whether they are
@ -1206,6 +1273,90 @@ func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.Nod
}
}
func (nc *Controller) podUpdated(oldPod, newPod *v1.Pod) {
if newPod == nil {
return
}
if len(newPod.Spec.NodeName) != 0 && (oldPod == nil || newPod.Spec.NodeName != oldPod.Spec.NodeName) {
podItem := podUpdateItem{newPod.Namespace, newPod.Name}
nc.podUpdateQueue.Add(podItem)
}
}
func (nc *Controller) doPodProcessingWorker() {
for {
obj, shutdown := nc.podUpdateQueue.Get()
// "podUpdateQueue" will be shutdown when "stopCh" closed;
// we do not need to re-check "stopCh" again.
if shutdown {
return
}
podItem := obj.(podUpdateItem)
nc.processPod(podItem)
}
}
// processPod is processing events of assigning pods to nodes. In particular:
// 1. for NodeReady=true node, taint eviction for this pod will be cancelled
// 2. for NodeReady=false or unknown node, taint eviction of pod will happen and pod will be marked as not ready
// 3. if node doesn't exist in cache, it will be skipped and handled later by doEvictionPass
func (nc *Controller) processPod(podItem podUpdateItem) {
defer nc.podUpdateQueue.Done(podItem)
pod, err := nc.podLister.Pods(podItem.namespace).Get(podItem.name)
if err != nil {
if apierrors.IsNotFound(err) {
// If the pod was deleted, there is no need to requeue.
return
}
klog.Warningf("Failed to read pod %v/%v: %v.", podItem.namespace, podItem.name, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
nodeName := pod.Spec.NodeName
nodeHealth := nc.nodeHealthMap.getDeepCopy(nodeName)
if nodeHealth == nil {
// Node data is not gathered yet or node has beed removed in the meantime.
// Pod will be handled by doEvictionPass method.
return
}
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
klog.Warningf("Failed to read node %v: %v.", nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
_, currentReadyCondition := nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
if currentReadyCondition == nil {
// Lack of NodeReady condition may only happen after node addition (or if it will be maliciously deleted).
// In both cases, the pod will be handled correctly (evicted if needed) during processing
// of the next node update event.
return
}
pods := []*v1.Pod{pod}
// In taint-based eviction mode, only node updates are processed by NodeLifecycleController.
// Pods are processed by TaintManager.
if !nc.useTaintBasedEvictions {
if err := nc.processNoTaintBaseEviction(node, currentReadyCondition, nc.nodeMonitorGracePeriod, pods); err != nil {
klog.Warningf("Unable to process pod %+v eviction from node %v: %v.", podItem, nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
}
if currentReadyCondition.Status != v1.ConditionTrue {
if err := nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil {
klog.Warningf("Unable to mark pod %+v NotReady on node %v: %v.", podItem, nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
}
}
}
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
switch state {
case stateNormal:
@ -1310,6 +1461,9 @@ func (nc *Controller) cancelPodEviction(node *v1.Node) bool {
zone := utilnode.GetZoneKey(node)
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
if !nc.nodeEvictionMap.setStatus(node.Name, unmarked) {
klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", node.Name)
}
wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
if wasDeleting {
klog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
@ -1318,12 +1472,28 @@ func (nc *Controller) cancelPodEviction(node *v1.Node) bool {
return false
}
// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *Controller) evictPods(node *v1.Node) bool {
// evictPods:
// - adds node to evictor queue if the node is not marked as evicted.
// Returns false if the node name was already enqueued.
// - deletes pods immediately if node is already marked as evicted.
// Returns false, because the node wasn't added to the queue.
func (nc *Controller) evictPods(node *v1.Node, pods []*v1.Pod) (bool, error) {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
status, ok := nc.nodeEvictionMap.getStatus(node.Name)
if ok && status == evicted {
// Node eviction already happened for this node.
// Handling immediate pod deletion.
_, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, node.Name, string(node.UID), nc.daemonSetStore)
if err != nil {
return false, fmt.Errorf("unable to delete pods from node %q: %v", node.Name, err)
}
return false, nil
}
if !nc.nodeEvictionMap.setStatus(node.Name, toBeEvicted) {
klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", node.Name)
}
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)), nil
}
func (nc *Controller) markNodeForTainting(node *v1.Node) bool {

View File

@ -91,27 +91,25 @@ type nodeLifecycleController struct {
// doEviction does the fake eviction and returns the status of eviction operation.
func (nc *nodeLifecycleController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) bool {
var podEvicted bool
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
zones := testutil.GetZones(fakeNodeHandler)
for _, zone := range zones {
nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
pods, err := listPodsFromNode(fakeNodeHandler, value.Value)
if err != nil {
return false, 0
}
pods, _ := nc.getPodsAssignedToNode(value.Value)
nodeutil.DeletePods(fakeNodeHandler, pods, nc.recorder, value.Value, uid, nc.daemonSetStore)
_ = nc.nodeEvictionMap.setStatus(value.Value, evicted)
return true, 0
})
}
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podEvicted = true
return podEvicted
return true
}
}
return podEvicted
return false
}
func createNodeLease(nodeName string, renewTime metav1.MicroTime) *coordv1.Lease {
@ -701,10 +699,11 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
if _, ok := nodeController.zonePodEvictor[zone]; ok {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUID, _ := value.UID.(string)
pods, err := listPodsFromNode(item.fakeNodeHandler, value.Value)
pods, err := nodeController.getPodsAssignedToNode(value.Value)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
t.Logf("listed pods %d for node %v", len(pods), value.Value)
nodeutil.DeletePods(item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
return true, 0
})
@ -858,7 +857,7 @@ func TestPodStatusChange(t *testing.T) {
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUID, _ := value.UID.(string)
pods, err := listPodsFromNode(item.fakeNodeHandler, value.Value)
pods, err := nodeController.getPodsAssignedToNode(value.Value)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -879,7 +878,7 @@ func TestPodStatusChange(t *testing.T) {
}
if podReasonUpdate != item.expectedPodUpdate {
t.Errorf("expected pod update: %+v, got %+v for %+v", podReasonUpdate, item.expectedPodUpdate, item.description)
t.Errorf("expected pod update: %+v, got %+v for %+v", item.expectedPodUpdate, podReasonUpdate, item.description)
}
}
}
@ -2418,11 +2417,12 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
}
}
table := []struct {
desc string
fakeNodeHandler *testutil.FakeNodeHandler
updateReactor func(action testcore.Action) (bool, runtime.Object, error)
nodeIterations []nodeIteration
expectedPodStatusUpdates int
desc string
fakeNodeHandler *testutil.FakeNodeHandler
updateReactor func(action testcore.Action) (bool, runtime.Object, error)
fakeGetPodsAssignedToNode func(c *fake.Clientset) func(string) ([]*v1.Pod, error)
nodeIterations []nodeIteration
expectedPodStatusUpdates int
}{
// Node created long time ago, with status updated by kubelet exceeds grace period.
// First monitorNodeHealth check will update pod status to NotReady.
@ -2432,6 +2432,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
fakeNodeHandler: &testutil.FakeNodeHandler{
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
},
fakeGetPodsAssignedToNode: fakeGetPodsAssignedToNode,
nodeIterations: []nodeIteration{
{
timeToPass: 0,
@ -2472,6 +2473,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
return true, nil, fmt.Errorf("unsupported action")
}
}(),
fakeGetPodsAssignedToNode: fakeGetPodsAssignedToNode,
nodeIterations: []nodeIteration{
{
timeToPass: 0,
@ -2488,6 +2490,41 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
},
expectedPodStatusUpdates: 2, // One failed and one retry.
},
// Node created long time ago, with status updated by kubelet exceeds grace period.
// First monitorNodeHealth check will fail to list pods.
// Second monitorNodeHealth check will update pod status to NotReady (retry).
{
desc: "unsuccessful pod list, retry required",
fakeNodeHandler: &testutil.FakeNodeHandler{
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
},
fakeGetPodsAssignedToNode: func(c *fake.Clientset) func(string) ([]*v1.Pod, error) {
i := 0
f := fakeGetPodsAssignedToNode(c)
return func(nodeName string) ([]*v1.Pod, error) {
i++
if i == 1 {
return nil, fmt.Errorf("fake error")
}
return f(nodeName)
}
},
nodeIterations: []nodeIteration{
{
timeToPass: 0,
newNodes: makeNodes(v1.ConditionTrue, timeNow, timeNow),
},
{
timeToPass: 1 * time.Minute,
newNodes: makeNodes(v1.ConditionTrue, timeNow, timeNow),
},
{
timeToPass: 1 * time.Minute,
newNodes: makeNodes(v1.ConditionFalse, timePlusTwoMinutes, timePlusTwoMinutes),
},
},
expectedPodStatusUpdates: 1,
},
}
for _, item := range table {
@ -2508,7 +2545,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
}
nodeController.now = func() metav1.Time { return timeNow }
nodeController.recorder = testutil.NewFakeRecorder()
nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(item.fakeNodeHandler.Clientset)
nodeController.getPodsAssignedToNode = item.fakeGetPodsAssignedToNode(item.fakeNodeHandler.Clientset)
for _, itertion := range item.nodeIterations {
nodeController.now = func() metav1.Time { return metav1.Time{Time: timeNow.Add(itertion.timeToPass)} }
item.fakeNodeHandler.Existing = itertion.newNodes

View File

@ -80,6 +80,11 @@ func DeletePods(kubeClient clientset.Interface, pods []*v1.Pod, recorder record.
klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name)
recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
if apierrors.IsNotFound(err) {
// NotFound error means that pod was already deleted.
// There is nothing left to do with this pod.
continue
}
return false, err
}
remaining = true
@ -133,6 +138,11 @@ func MarkPodsNotReady(kubeClient clientset.Interface, pods []*v1.Pod, nodeName s
klog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
_, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
if apierrors.IsNotFound(err) {
// NotFound error means that pod was already deleted.
// There is nothing left to do with this pod.
continue
}
klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
errMsg = append(errMsg, fmt.Sprintf("%v", err))
}