Reduce tightness of coupling in NodeController

This commit is contained in:
gmarek 2016-07-11 13:23:53 +02:00
parent ed1361f684
commit 7524da877e
3 changed files with 361 additions and 335 deletions

View File

@ -0,0 +1,319 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/types"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/version"
"github.com/golang/glog"
)
// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
continue
}
if err := forcefulDeletePodFunc(pod); err != nil {
utilruntime.HandleError(err)
}
}
}
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return remaining, err
}
if len(pods.Items) > 0 {
recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
}
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// if the pod has already been deleted, ignore it
if pod.DeletionGracePeriodSeconds != nil {
continue
}
// if the pod is managed by a daemonset, ignore it
_, err := daemonStore.GetPodDaemonSets(&pod)
if err == nil { // No error means at least one daemonset was found
continue
}
glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err
}
remaining = true
}
return remaining, nil
}
func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
var zero int64
err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
if err == nil {
glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
}
return err
}
// forcefullyDeleteNode immediately deletes all pods on the node, and then
// deletes the node itself.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, forcefulDeletePodFunc func(*api.Pod) error) error {
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
}
for _, pod := range pods.Items {
if pod.Spec.NodeName != nodeName {
continue
}
if err := forcefulDeletePodFunc(&pod); err != nil {
return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
}
}
if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
return nil
}
// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
// all eviction timer to reset.
func forceUpdateAllProbeTimes(now unversioned.Time, statusData map[string]nodeStatusData) {
for k, v := range statusData {
v.probeTimestamp = now
v.readyTransitionTimestamp = now
statusData[k] = v
}
}
// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
pod, ok := obj.(*api.Pod)
if !ok {
return
}
// consider only terminating pods
if pod.DeletionTimestamp == nil {
return
}
// delete terminating pods that have not yet been scheduled
if len(pod.Spec.NodeName) == 0 {
utilruntime.HandleError(forcefulDeletePodFunc(pod))
return
}
nodeObj, found, err := nodeStore.GetByKey(pod.Spec.NodeName)
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
// don't bother requeuing the pod.
utilruntime.HandleError(err)
return
}
// delete terminating pods that have been scheduled on
// nonexistent nodes
if !found {
glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
utilruntime.HandleError(forcefulDeletePodFunc(pod))
return
}
// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
node := nodeObj.(*api.Node)
v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(forcefulDeletePodFunc(pod))
return
}
if gracefulDeletionVersion.GT(v) {
utilruntime.HandleError(forcefulDeletePodFunc(pod))
return
}
}
// update ready status of all pods running on given node from master
// return true if success
func markAllPodsNotReady(kubeClient clientset.Interface, nodeName string) error {
glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(opts)
if err != nil {
return err
}
errMsg := []string{}
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
for i, cond := range pod.Status.Conditions {
if cond.Type == api.PodReady {
pod.Status.Conditions[i].Status = api.ConditionFalse
glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
_, err := kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod)
if err != nil {
glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
errMsg = append(errMsg, fmt.Sprintf("%v", err))
}
break
}
}
}
if len(errMsg) == 0 {
return nil
}
return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}
func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
}
if _, err := instances.ExternalID(nodeName); err != nil {
if err == cloudprovider.InstanceNotFound {
return false, nil
}
return false, err
}
return true, nil
}
func recordNodeEvent(recorder record.EventRecorder, nodeName, eventtype, reason, event string) {
ref := &api.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
}
glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}
func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
ref := &api.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.Name),
Namespace: "",
}
glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
}
// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
// long before we should check again (the next deadline for a pod to complete), or an error.
func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
// the time before we should try again
nextAttempt := time.Duration(0)
// have we deleted all pods
complete := true
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return false, nextAttempt, err
}
now := time.Now()
elapsed := now.Sub(since)
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// only clean terminated pods
if pod.DeletionGracePeriodSeconds == nil {
continue
}
// the user's requested grace period
grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if grace > maxGracePeriod {
grace = maxGracePeriod
}
// the time remaining before the pod should have been deleted
remaining := grace - elapsed
if remaining < 0 {
remaining = 0
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
complete = false
}
} else {
glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
complete = false
}
if nextAttempt < remaining {
nextAttempt = remaining
}
}
return complete, nextAttempt, nil
}

View File

@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
@ -35,11 +34,8 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -51,7 +47,8 @@ import (
)
var (
ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
gracefulDeletionVersion = version.MustParse("v1.1.0")
)
const (
@ -208,8 +205,12 @@ func NewNodeController(
&api.Pod{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{
AddFunc: nc.maybeDeleteTerminatingPod,
UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
AddFunc: func(obj interface{}) {
nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod)
},
UpdateFunc: func(_, obj interface{}) {
nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod)
},
},
// We don't need to build a index for podStore here actually, but build one for consistency.
// It will ensure that if people start making use of the podStore in more specific ways,
@ -301,7 +302,7 @@ func (nc *NodeController) Run(period time.Duration) {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, err := nc.deletePods(value.Value)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
@ -320,7 +321,7 @@ func (nc *NodeController) Run(period time.Duration) {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
@ -328,7 +329,7 @@ func (nc *NodeController) Run(period time.Duration) {
if completed {
glog.V(2).Infof("All pods terminated on %s", value.Value)
nc.recordNodeEvent(value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}
@ -341,94 +342,14 @@ func (nc *NodeController) Run(period time.Duration) {
})
}, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
}
var gracefulDeletionVersion = version.MustParse("v1.1.0")
// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
return
}
// consider only terminating pods
if pod.DeletionTimestamp == nil {
return
}
// delete terminating pods that have not yet been scheduled
if len(pod.Spec.NodeName) == 0 {
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
nodeObj, found, err := nc.nodeStore.GetByKey(pod.Spec.NodeName)
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
// don't bother requeuing the pod.
utilruntime.HandleError(err)
return
}
// delete terminating pods that have been scheduled on
// nonexistent nodes
if !found {
glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
node := nodeObj.(*api.Node)
v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
if gracefulDeletionVersion.GT(v) {
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
}
// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func (nc *NodeController) cleanupOrphanedPods() {
pods, err := nc.podStore.List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
return
}
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if _, exists, _ := nc.nodeStore.Store.GetByKey(pod.Spec.NodeName); exists {
continue
}
if err := nc.forcefullyDeletePod(pod); err != nil {
go wait.Until(func() {
pods, err := nc.podStore.List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
return
}
}
}
func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
var zero int64
err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
if err == nil {
glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
}
return err
cleanupOrphanedPods(pods, nc.nodeStore.Store, nc.forcefullyDeletePod)
}, 30*time.Second, wait.NeverStop)
}
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
@ -442,7 +363,7 @@ func (nc *NodeController) monitorNodeStatus() error {
for _, node := range nodes.Items {
if !nc.knownNodeSet.Has(node.Name) {
glog.V(1).Infof("NodeController observed a new Node: %#v", node)
nc.recordNodeEvent(node.Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
nc.cancelPodEviction(node.Name)
nc.knownNodeSet.Insert(node.Name)
}
@ -457,7 +378,7 @@ func (nc *NodeController) monitorNodeStatus() error {
deleted := nc.knownNodeSet.Difference(observedSet)
for nodeName := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", nodeName)
nc.recordNodeEvent(nodeName, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
recordNodeEvent(nc.recorder, nodeName, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
nc.evictPods(nodeName)
nc.knownNodeSet.Delete(nodeName)
}
@ -516,7 +437,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// Report node event.
if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue {
recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = nc.markAllPodsNotReady(node.Name); err != nil {
if err = markAllPodsNotReady(nc.kubeClient, node.Name); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
@ -531,13 +452,13 @@ func (nc *NodeController) monitorNodeStatus() error {
}
if !exists {
glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) {
defer utilruntime.HandleCrash()
// Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace
// periods.
if err := nc.forcefullyDeleteNode(nodeName); err != nil {
if err := forcefullyDeleteNode(nc.kubeClient, nodeName, nc.forcefullyDeletePod); err != nil {
glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
}
}(node.Name)
@ -555,7 +476,7 @@ func (nc *NodeController) monitorNodeStatus() error {
glog.V(2).Info("NodeController is entering network segmentation mode.")
} else {
if nc.networkSegmentationMode {
nc.forceUpdateAllProbeTimes()
forceUpdateAllProbeTimes(nc.now(), nc.nodeStatusMap)
nc.networkSegmentationMode = false
glog.V(2).Info("NodeController exited network segmentation mode.")
}
@ -563,67 +484,6 @@ func (nc *NodeController) monitorNodeStatus() error {
return nil
}
func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
}
if _, err := instances.ExternalID(nodeName); err != nil {
if err == cloudprovider.InstanceNotFound {
return false, nil
}
return false, err
}
return true, nil
}
// forcefullyDeleteNode immediately deletes all pods on the node, and then
// deletes the node itself.
func (nc *NodeController) forcefullyDeleteNode(nodeName string) error {
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
}
for _, pod := range pods.Items {
if pod.Spec.NodeName != nodeName {
continue
}
if err := nc.forcefullyDeletePod(&pod); err != nil {
return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
}
}
if err := nc.kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
return nil
}
func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event string) {
ref := &api.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
}
glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
nc.recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}
func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
ref := &api.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.Name),
Namespace: "",
}
glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
}
// For a given node checks its conditions and tries to update it. Returns grace period to which given node
// is entitled, state of current and last observed Ready Condition, and an error if it occurred.
func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) {
@ -789,28 +649,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
// all eviction timer to reset.
func (nc *NodeController) forceUpdateAllProbeTimes() {
now := nc.now()
for k, v := range nc.nodeStatusMap {
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeStatusMap[k] = v
}
}
// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(nodeName string) bool {
if nc.networkSegmentationMode {
return false
}
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.podEvictor.Add(nodeName)
}
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *NodeController) cancelPodEviction(nodeName string) bool {
@ -825,6 +663,17 @@ func (nc *NodeController) cancelPodEviction(nodeName string) bool {
return false
}
// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(nodeName string) bool {
if nc.networkSegmentationMode {
return false
}
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.podEvictor.Add(nodeName)
}
// stopAllPodEvictions removes any queued evictions for all Nodes.
func (nc *NodeController) stopAllPodEvictions() {
nc.evictorLock.Lock()
@ -833,135 +682,3 @@ func (nc *NodeController) stopAllPodEvictions() {
nc.podEvictor.Clear()
nc.terminationEvictor.Clear()
}
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func (nc *NodeController) deletePods(nodeName string) (bool, error) {
remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return remaining, err
}
if len(pods.Items) > 0 {
nc.recordNodeEvent(nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
}
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// if the pod has already been deleted, ignore it
if pod.DeletionGracePeriodSeconds != nil {
continue
}
// if the pod is managed by a daemonset, ignore it
_, err := nc.daemonSetStore.GetPodDaemonSets(&pod)
if err == nil { // No error means at least one daemonset was found
continue
}
glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
nc.recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
if err := nc.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err
}
remaining = true
}
return remaining, nil
}
// update ready status of all pods running on given node from master
// return true if success
func (nc *NodeController) markAllPodsNotReady(nodeName string) error {
glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)}
pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(opts)
if err != nil {
return err
}
errMsg := []string{}
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
for i, cond := range pod.Status.Conditions {
if cond.Type == api.PodReady {
pod.Status.Conditions[i].Status = api.ConditionFalse
glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
_, err := nc.kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod)
if err != nil {
glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
errMsg = append(errMsg, fmt.Sprintf("%v", err))
}
break
}
}
}
if len(errMsg) == 0 {
return nil
}
return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}
// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
// long before we should check again (the next deadline for a pod to complete), or an error.
func (nc *NodeController) terminatePods(nodeName string, since time.Time) (bool, time.Duration, error) {
// the time before we should try again
nextAttempt := time.Duration(0)
// have we deleted all pods
complete := true
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return false, nextAttempt, err
}
now := time.Now()
elapsed := now.Sub(since)
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// only clean terminated pods
if pod.DeletionGracePeriodSeconds == nil {
continue
}
// the user's requested grace period
grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if grace > nc.maximumGracePeriod {
grace = nc.maximumGracePeriod
}
// the time remaining before the pod should have been deleted
remaining := grace - elapsed
if remaining < 0 {
remaining = 0
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
nc.recordNodeEvent(nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
if err := nc.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
complete = false
}
} else {
glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
complete = false
}
if nextAttempt < remaining {
nextAttempt = remaining
}
}
return complete, nextAttempt, nil
}

View File

@ -555,14 +555,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, _ := nodeController.deletePods(value.Value)
remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
if remaining {
nodeController.terminationEvictor.Add(value.Value)
}
return true, 0
})
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
nodeController.terminatePods(value.Value, value.AddedAt)
terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod)
return true, 0
})
podEvicted := false
@ -1082,7 +1082,7 @@ func TestNodeDeletion(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
nodeController.deletePods(value.Value)
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
return true, 0
})
podEvicted := false
@ -1220,12 +1220,12 @@ func TestCheckPod(t *testing.T) {
for i, tc := range tcs {
var deleteCalls int
nc.forcefullyDeletePod = func(_ *api.Pod) error {
forcefullyDeletePodsFunc := func(_ *api.Pod) error {
deleteCalls++
return nil
}
nc.maybeDeleteTerminatingPod(&tc.pod)
nc.maybeDeleteTerminatingPod(&tc.pod, nc.nodeStore.Store, forcefullyDeletePodsFunc)
if tc.prune && deleteCalls != 1 {
t.Errorf("[%v] expected number of delete calls to be 1 but got %v", i, deleteCalls)
@ -1237,17 +1237,7 @@ func TestCheckPod(t *testing.T) {
}
func TestCleanupOrphanedPods(t *testing.T) {
newPod := func(name, node string) api.Pod {
return api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.PodSpec{
NodeName: node,
},
}
}
pods := []api.Pod{
pods := []*api.Pod{
newPod("a", "foo"),
newPod("b", "bar"),
newPod("c", "gone"),
@ -1263,12 +1253,12 @@ func TestCleanupOrphanedPods(t *testing.T) {
var deleteCalls int
var deletedPodName string
nc.forcefullyDeletePod = func(p *api.Pod) error {
forcefullyDeletePodFunc := func(p *api.Pod) error {
deleteCalls++
deletedPodName = p.ObjectMeta.Name
return nil
}
nc.cleanupOrphanedPods()
cleanupOrphanedPods(pods, nc.nodeStore.Store, forcefullyDeletePodFunc)
if deleteCalls != 1 {
t.Fatalf("expected one delete, got: %v", deleteCalls)