Removing force deletion of pods from the node-controller

This commit is contained in:
Anirudh 2016-10-28 10:45:04 -07:00
parent f196c40366
commit 5ccd7a325e
3 changed files with 12 additions and 150 deletions

View File

@ -90,23 +90,9 @@ func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
return err return err
} }
// forcefullyDeleteNode immediately deletes all pods on the node, and then // forcefullyDeleteNode immediately the node. The pods on the node are cleaned
// deletes the node itself. // up by the podGC.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, forcefulDeletePodFunc func(*api.Pod) error) error { func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
}
for _, pod := range pods.Items {
if pod.Spec.NodeName != nodeName {
continue
}
if err := forcefulDeletePodFunc(&pod); err != nil {
return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
}
}
if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil { if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err) return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
} }
@ -265,59 +251,3 @@ func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_s
// and event is recorded or neither should happen, see issue #6055. // and event is recorded or neither should happen, see issue #6055.
recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status) recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
} }
// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
// long before we should check again (the next deadline for a pod to complete), or an error.
func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, nodeUID string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
// the time before we should try again
nextAttempt := time.Duration(0)
// have we deleted all pods
complete := true
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return false, nextAttempt, err
}
now := time.Now()
elapsed := now.Sub(since)
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// only clean terminated pods
if pod.DeletionGracePeriodSeconds == nil {
continue
}
// the user's requested grace period
grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if grace > maxGracePeriod {
grace = maxGracePeriod
}
// the time remaining before the pod should have been deleted
remaining := grace - elapsed
if remaining < 0 {
remaining = 0
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
recordNodeEvent(recorder, nodeName, nodeUID, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
complete = false
}
} else {
glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
complete = false
}
if nextAttempt < remaining {
nextAttempt = remaining
}
}
return complete, nextAttempt, nil
}

View File

@ -126,7 +126,6 @@ type NodeController struct {
evictorLock sync.Mutex evictorLock sync.Mutex
// workers that evicts pods from unresponsive nodes. // workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*RateLimitedTimedQueue zonePodEvictor map[string]*RateLimitedTimedQueue
zoneTerminationEvictor map[string]*RateLimitedTimedQueue
podEvictionTimeout time.Duration podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated. // The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration maximumGracePeriod time.Duration
@ -215,7 +214,6 @@ func NewNodeController(
podEvictionTimeout: podEvictionTimeout, podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute, maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*RateLimitedTimedQueue), zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData), nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod, nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod, nodeMonitorPeriod: nodeMonitorPeriod,
@ -372,17 +370,8 @@ func (nc *NodeController) Run() {
}, nc.nodeMonitorPeriod, wait.NeverStop) }, nc.nodeMonitorPeriod, wait.NeverStop)
// Managing eviction of nodes: // Managing eviction of nodes:
// 1. when we delete pods off a node, if the node was not empty at the time we then // When we delete pods off a node, if the node was not empty at the time we then
// queue a termination watcher // queue an eviction watcher. If we hit an error, retry deletion.
// a. If we hit an error, retry deletion
// 2. The terminator loop ensures that pods are eventually cleaned and we never
// terminate a pod in a time period less than nc.maximumGracePeriod. AddedAt
// is the time from which we measure "has this pod been terminating too long",
// after which we will delete the pod with grace period 0 (force delete).
// a. If we hit errors, retry instantly
// b. If there are no pods left terminating, exit
// c. If there are pods still terminating, wait for their estimated completion
// before retrying
go wait.Until(func() { go wait.Until(func() {
nc.evictorLock.Lock() nc.evictorLock.Lock()
defer nc.evictorLock.Unlock() defer nc.evictorLock.Unlock()
@ -407,42 +396,12 @@ func (nc *NodeController) Run() {
} }
if remaining { if remaining {
nc.zoneTerminationEvictor[k].Add(value.Value, value.UID) glog.Infof("Pods awaiting deletion due to NodeController eviction")
} }
return true, 0 return true, 0
}) })
} }
}, nodeEvictionPeriod, wait.NeverStop) }, nodeEvictionPeriod, wait.NeverStop)
// TODO: replace with a controller that ensures pods that are terminating complete
// in a particular time period
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneTerminationEvictor {
nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, value.AddedAt, nc.maximumGracePeriod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
}
if completed {
glog.V(2).Infof("All pods terminated on %s", value.Value)
recordNodeEvent(nc.recorder, value.Value, nodeUid, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}
glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
// clamp very short intervals
if remaining < nodeEvictionPeriod {
remaining = nodeEvictionPeriod
}
return false, remaining
})
}
}, nodeEvictionPeriod, wait.NeverStop)
}() }()
} }
@ -472,10 +431,6 @@ func (nc *NodeController) monitorNodeStatus() error {
glog.Infof("Initializing eviction metric for zone: %v", zone) glog.Infof("Initializing eviction metric for zone: %v", zone)
EvictionsNumber.WithLabelValues(zone).Add(0) EvictionsNumber.WithLabelValues(zone).Add(0)
} }
if _, found := nc.zoneTerminationEvictor[zone]; !found {
nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
nc.cancelPodEviction(added[i]) nc.cancelPodEviction(added[i])
} }
@ -559,7 +514,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// Kubelet is not reporting and Cloud Provider says node // Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace // is gone. Delete it without worrying about grace
// periods. // periods.
if err := forcefullyDeleteNode(nc.kubeClient, nodeName, nc.forcefullyDeletePod); err != nil { if err := forcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err) glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
} }
}(node.Name) }(node.Name)
@ -620,7 +575,6 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*ap
// We stop all evictions. // We stop all evictions.
for k := range nc.zonePodEvictor { for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].SwapLimiter(0) nc.zonePodEvictor[k].SwapLimiter(0)
nc.zoneTerminationEvictor[k].SwapLimiter(0)
} }
for k := range nc.zoneStates { for k := range nc.zoneStates {
nc.zoneStates[k] = stateFullDisruption nc.zoneStates[k] = stateFullDisruption
@ -664,17 +618,12 @@ func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zone
switch state { switch state {
case stateNormal: case stateNormal:
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS) nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
nc.zoneTerminationEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
case statePartialDisruption: case statePartialDisruption:
nc.zonePodEvictor[zone].SwapLimiter( nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize)) nc.enterPartialDisruptionFunc(zoneSize))
nc.zoneTerminationEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
case stateFullDisruption: case stateFullDisruption:
nc.zonePodEvictor[zone].SwapLimiter( nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize)) nc.enterFullDisruptionFunc(zoneSize))
nc.zoneTerminationEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
} }
} }
@ -873,8 +822,7 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
nc.evictorLock.Lock() nc.evictorLock.Lock()
defer nc.evictorLock.Unlock() defer nc.evictorLock.Unlock()
wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name) wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name) if wasDeleting {
if wasDeleting || wasTerminating {
glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name) glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
return true return true
} }

View File

@ -519,15 +519,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, zone := range zones { for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string) nodeUid, _ := value.UID.(string)
remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore) deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
if remaining {
nodeController.zoneTerminationEvictor[zone].Add(value.Value, nodeUid)
}
return true, 0
})
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, value.AddedAt, nodeController.maximumGracePeriod)
return true, 0 return true, 0
}) })
} }
@ -1054,15 +1046,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
for _, zone := range zones { for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string) uid, _ := value.UID.(string)
remaining, _ := deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore) deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore)
if remaining {
nodeController.zoneTerminationEvictor[zone].Add(value.Value, value.UID)
}
return true, 0
})
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
terminatePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, value.AddedAt, nodeController.maximumGracePeriod)
return true, 0 return true, 0
}) })
} }