diff --git a/pkg/controller/node/deletion_utils.go b/pkg/controller/node/deletion_utils.go
new file mode 100644
index 00000000000..aec2c9ace61
--- /dev/null
+++ b/pkg/controller/node/deletion_utils.go
@@ -0,0 +1,319 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package node
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/cache"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/fields"
+	"k8s.io/kubernetes/pkg/kubelet/util/format"
+	"k8s.io/kubernetes/pkg/types"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/version"
+
+	"github.com/golang/glog"
+)
+
+// cleanupOrphanedPods deletes pods that are bound to nodes that don't
+// exist.
+func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
+	for _, pod := range pods {
+		if pod.Spec.NodeName == "" {
+			continue
+		}
+		if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
+			continue
+		}
+		if err := forcefulDeletePodFunc(pod); err != nil {
+			utilruntime.HandleError(err)
+		}
+	}
+}
+
+// deletePods will delete all pods from master running on given node, and return true
+// if any pods were deleted.
+func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
+	remaining := false
+	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
+	options := api.ListOptions{FieldSelector: selector}
+	pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
+	if err != nil {
+		return remaining, err
+	}
+
+	if len(pods.Items) > 0 {
+		recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
+	}
+
+	for _, pod := range pods.Items {
+		// Defensive check, also needed for tests.
+		if pod.Spec.NodeName != nodeName {
+			continue
+		}
+		// if the pod has already been deleted, ignore it
+		if pod.DeletionGracePeriodSeconds != nil {
+			continue
+		}
+		// if the pod is managed by a daemonset, ignore it
+		_, err := daemonStore.GetPodDaemonSets(&pod)
+		if err == nil { // No error means at least one daemonset was found
+			continue
+		}
+
+		glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
+		recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
+		if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
+			return false, err
+		}
+		remaining = true
+	}
+	return remaining, nil
+}
+
+func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
+	var zero int64
+	err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
+	if err == nil {
+		glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
+	}
+	return err
+}
+
+// forcefullyDeleteNode immediately deletes all pods on the node, and then
+// deletes the node itself.
+func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, forcefulDeletePodFunc func(*api.Pod) error) error {
+	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
+	options := api.ListOptions{FieldSelector: selector}
+	pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
+	if err != nil {
+		return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
+	}
+	for _, pod := range pods.Items {
+		if pod.Spec.NodeName != nodeName {
+			continue
+		}
+		if err := forcefulDeletePodFunc(&pod); err != nil {
+			return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
+		}
+	}
+	if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
+		return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
+	}
+	return nil
+}
+
+// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now,
+// which resets all eviction timers.
+func forceUpdateAllProbeTimes(now unversioned.Time, statusData map[string]nodeStatusData) {
+	for k, v := range statusData {
+		v.probeTimestamp = now
+		v.readyTransitionTimestamp = now
+		statusData[k] = v
+	}
+}
+
+// maybeDeleteTerminatingPod non-gracefully deletes terminating pods that
+// should not be gracefully terminated.
+func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
+	pod, ok := obj.(*api.Pod)
+	if !ok {
+		return
+	}
+
+	// consider only terminating pods
+	if pod.DeletionTimestamp == nil {
+		return
+	}
+
+	// delete terminating pods that have not yet been scheduled
+	if len(pod.Spec.NodeName) == 0 {
+		utilruntime.HandleError(forcefulDeletePodFunc(pod))
+		return
+	}
+
+	nodeObj, found, err := nodeStore.GetByKey(pod.Spec.NodeName)
+	if err != nil {
+		// this can only happen if the Store.KeyFunc has a problem creating
+		// a key for the pod. If it happens once, it will happen again so
+		// don't bother requeuing the pod.
+		utilruntime.HandleError(err)
+		return
+	}
+
+	// delete terminating pods that have been scheduled on
+	// nonexistent nodes
+	if !found {
+		glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
+		utilruntime.HandleError(forcefulDeletePodFunc(pod))
+		return
+	}
+
+	// delete terminating pods that have been scheduled on
+	// nodes that do not support graceful termination
+	// TODO(mikedanese): this can be removed when we no longer
+	// guarantee backwards compatibility of master API to kubelets with
+	// versions less than 1.1.0
+	node := nodeObj.(*api.Node)
+	v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
+	if err != nil {
+		glog.V(0).Infof("couldn't parse version %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
+		utilruntime.HandleError(forcefulDeletePodFunc(pod))
+		return
+	}
+	if gracefulDeletionVersion.GT(v) {
+		utilruntime.HandleError(forcefulDeletePodFunc(pod))
+		return
+	}
+}
+
+// markAllPodsNotReady updates the ready status of all pods running on the given node
+// from the master, returning an aggregate error if any update fails.
+func markAllPodsNotReady(kubeClient clientset.Interface, nodeName string) error {
+	glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
+	opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)}
+	pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(opts)
+	if err != nil {
+		return err
+	}
+
+	errMsg := []string{}
+	for _, pod := range pods.Items {
+		// Defensive check, also needed for tests.
+		if pod.Spec.NodeName != nodeName {
+			continue
+		}
+
+		for i, cond := range pod.Status.Conditions {
+			if cond.Type == api.PodReady {
+				pod.Status.Conditions[i].Status = api.ConditionFalse
+				glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
+				_, err := kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod)
+				if err != nil {
+					glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
+					errMsg = append(errMsg, fmt.Sprintf("%v", err))
+				}
+				break
+			}
+		}
+	}
+	if len(errMsg) == 0 {
+		return nil
+	}
+	return fmt.Errorf("%v", strings.Join(errMsg, "; "))
+}
+
+func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
+	instances, ok := cloud.Instances()
+	if !ok {
+		return false, fmt.Errorf("%v", ErrCloudInstance)
+	}
+	if _, err := instances.ExternalID(nodeName); err != nil {
+		if err == cloudprovider.InstanceNotFound {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+func recordNodeEvent(recorder record.EventRecorder, nodeName, eventtype, reason, event string) {
+	ref := &api.ObjectReference{
+		Kind:      "Node",
+		Name:      nodeName,
+		UID:       types.UID(nodeName),
+		Namespace: "",
+	}
+	glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
+	recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
+}
+
+func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
+	ref := &api.ObjectReference{
+		Kind:      "Node",
+		Name:      node.Name,
+		UID:       types.UID(node.Name),
+		Namespace: "",
+	}
+	glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
+	// TODO: This requires a transaction, either both node status is updated
+	// and event is recorded or neither should happen, see issue #6055.
+ recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status) +} + +// terminatePods will ensure all pods on the given node that are in terminating state are eventually +// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how +// long before we should check again (the next deadline for a pod to complete), or an error. +func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) { + // the time before we should try again + nextAttempt := time.Duration(0) + // have we deleted all pods + complete := true + + selector := fields.OneTermEqualSelector(api.PodHostField, nodeName) + options := api.ListOptions{FieldSelector: selector} + pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options) + if err != nil { + return false, nextAttempt, err + } + + now := time.Now() + elapsed := now.Sub(since) + for _, pod := range pods.Items { + // Defensive check, also needed for tests. + if pod.Spec.NodeName != nodeName { + continue + } + // only clean terminated pods + if pod.DeletionGracePeriodSeconds == nil { + continue + } + + // the user's requested grace period + grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second + if grace > maxGracePeriod { + grace = maxGracePeriod + } + + // the time remaining before the pod should have been deleted + remaining := grace - elapsed + if remaining < 0 { + remaining = 0 + glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace) + recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName)) + if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil { + glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err) + complete = false + } + } else { + glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining) + complete = false + } + + if nextAttempt < remaining { + nextAttempt = remaining + } + } + return complete, nextAttempt, nil +} diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 7a79b6e98c5..2ac2d1e4c38 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "net" - "strings" "sync" "time" @@ -35,11 +34,8 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -51,7 +47,8 @@ import ( ) var ( - ErrCloudInstance = errors.New("cloud provider doesn't support instances.") + ErrCloudInstance = errors.New("cloud provider doesn't support instances.") + gracefulDeletionVersion = version.MustParse("v1.1.0") ) const ( @@ -208,8 +205,12 @@ func NewNodeController( &api.Pod{}, controller.NoResyncPeriodFunc(), framework.ResourceEventHandlerFuncs{ - AddFunc: nc.maybeDeleteTerminatingPod, - UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) }, + 
AddFunc: func(obj interface{}) { + nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod) + }, + UpdateFunc: func(_, obj interface{}) { + nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod) + }, }, // We don't need to build a index for podStore here actually, but build one for consistency. // It will ensure that if people start making use of the podStore in more specific ways, @@ -301,7 +302,7 @@ func (nc *NodeController) Run(period time.Duration) { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { - remaining, err := nc.deletePods(value.Value) + remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) return false, 0 @@ -320,7 +321,7 @@ func (nc *NodeController) Run(period time.Duration) { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) { - completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt) + completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err)) return false, 0 @@ -328,7 +329,7 @@ func (nc *NodeController) Run(period time.Duration) { if completed { glog.V(2).Infof("All pods terminated on %s", value.Value) - nc.recordNodeEvent(value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value)) + recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value)) return true, 0 } @@ -341,94 +342,14 @@ func (nc *NodeController) Run(period time.Duration) { }) }, nodeEvictionPeriod, wait.NeverStop) - go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop) -} - -var gracefulDeletionVersion = version.MustParse("v1.1.0") - -// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating -// that should not be gracefully terminated. -func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) { - pod, ok := obj.(*api.Pod) - if !ok { - return - } - - // consider only terminating pods - if pod.DeletionTimestamp == nil { - return - } - - // delete terminating pods that have not yet been scheduled - if len(pod.Spec.NodeName) == 0 { - utilruntime.HandleError(nc.forcefullyDeletePod(pod)) - return - } - - nodeObj, found, err := nc.nodeStore.GetByKey(pod.Spec.NodeName) - if err != nil { - // this can only happen if the Store.KeyFunc has a problem creating - // a key for the pod. If it happens once, it will happen again so - // don't bother requeuing the pod. 
- utilruntime.HandleError(err) - return - } - - // delete terminating pods that have been scheduled on - // nonexistent nodes - if !found { - glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName) - utilruntime.HandleError(nc.forcefullyDeletePod(pod)) - return - } - - // delete terminating pods that have been scheduled on - // nodes that do not support graceful termination - // TODO(mikedanese): this can be removed when we no longer - // guarantee backwards compatibility of master API to kubelets with - // versions less than 1.1.0 - node := nodeObj.(*api.Node) - v, err := version.Parse(node.Status.NodeInfo.KubeletVersion) - if err != nil { - glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err) - utilruntime.HandleError(nc.forcefullyDeletePod(pod)) - return - } - if gracefulDeletionVersion.GT(v) { - utilruntime.HandleError(nc.forcefullyDeletePod(pod)) - return - } -} - -// cleanupOrphanedPods deletes pods that are bound to nodes that don't -// exist. -func (nc *NodeController) cleanupOrphanedPods() { - pods, err := nc.podStore.List(labels.Everything()) - if err != nil { - utilruntime.HandleError(err) - return - } - - for _, pod := range pods { - if pod.Spec.NodeName == "" { - continue - } - if _, exists, _ := nc.nodeStore.Store.GetByKey(pod.Spec.NodeName); exists { - continue - } - if err := nc.forcefullyDeletePod(pod); err != nil { + go wait.Until(func() { + pods, err := nc.podStore.List(labels.Everything()) + if err != nil { utilruntime.HandleError(err) + return } - } -} - -func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error { - var zero int64 - err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero}) - if err == nil { - glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name) - } - return err + cleanupOrphanedPods(pods, nc.nodeStore.Store, nc.forcefullyDeletePod) + }, 30*time.Second, wait.NeverStop) } // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not, @@ -442,7 +363,7 @@ func (nc *NodeController) monitorNodeStatus() error { for _, node := range nodes.Items { if !nc.knownNodeSet.Has(node.Name) { glog.V(1).Infof("NodeController observed a new Node: %#v", node) - nc.recordNodeEvent(node.Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name)) + recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name)) nc.cancelPodEviction(node.Name) nc.knownNodeSet.Insert(node.Name) } @@ -457,7 +378,7 @@ func (nc *NodeController) monitorNodeStatus() error { deleted := nc.knownNodeSet.Difference(observedSet) for nodeName := range deleted { glog.V(1).Infof("NodeController observed a Node deletion: %v", nodeName) - nc.recordNodeEvent(nodeName, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName)) + recordNodeEvent(nc.recorder, nodeName, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName)) nc.evictPods(nodeName) nc.knownNodeSet.Delete(nodeName) } @@ -516,7 +437,7 @@ func (nc *NodeController) monitorNodeStatus() error { // Report node event. 
if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue { recordNodeStatusChange(nc.recorder, node, "NodeNotReady") - if err = nc.markAllPodsNotReady(node.Name); err != nil { + if err = markAllPodsNotReady(nc.kubeClient, node.Name); err != nil { utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err)) } } @@ -531,13 +452,13 @@ func (nc *NodeController) monitorNodeStatus() error { } if !exists { glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name) - nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) + recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) go func(nodeName string) { defer utilruntime.HandleCrash() // Kubelet is not reporting and Cloud Provider says node // is gone. Delete it without worrying about grace // periods. - if err := nc.forcefullyDeleteNode(nodeName); err != nil { + if err := forcefullyDeleteNode(nc.kubeClient, nodeName, nc.forcefullyDeletePod); err != nil { glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err) } }(node.Name) @@ -555,7 +476,7 @@ func (nc *NodeController) monitorNodeStatus() error { glog.V(2).Info("NodeController is entering network segmentation mode.") } else { if nc.networkSegmentationMode { - nc.forceUpdateAllProbeTimes() + forceUpdateAllProbeTimes(nc.now(), nc.nodeStatusMap) nc.networkSegmentationMode = false glog.V(2).Info("NodeController exited network segmentation mode.") } @@ -563,67 +484,6 @@ func (nc *NodeController) monitorNodeStatus() error { return nil } -func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) { - instances, ok := cloud.Instances() - if !ok { - return false, fmt.Errorf("%v", ErrCloudInstance) - } - if _, err := instances.ExternalID(nodeName); err != nil { - if err == cloudprovider.InstanceNotFound { - return false, nil - } - return false, err - } - return true, nil -} - -// forcefullyDeleteNode immediately deletes all pods on the node, and then -// deletes the node itself. 
-func (nc *NodeController) forcefullyDeleteNode(nodeName string) error { - selector := fields.OneTermEqualSelector(api.PodHostField, nodeName) - options := api.ListOptions{FieldSelector: selector} - pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options) - if err != nil { - return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err) - } - for _, pod := range pods.Items { - if pod.Spec.NodeName != nodeName { - continue - } - if err := nc.forcefullyDeletePod(&pod); err != nil { - return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err) - } - } - if err := nc.kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil { - return fmt.Errorf("unable to delete node %q: %v", nodeName, err) - } - return nil -} - -func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event string) { - ref := &api.ObjectReference{ - Kind: "Node", - Name: nodeName, - UID: types.UID(nodeName), - Namespace: "", - } - glog.V(2).Infof("Recording %s event message for node %s", event, nodeName) - nc.recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event) -} - -func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) { - ref := &api.ObjectReference{ - Kind: "Node", - Name: node.Name, - UID: types.UID(node.Name), - Namespace: "", - } - glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name) - // TODO: This requires a transaction, either both node status is updated - // and event is recorded or neither should happen, see issue #6055. - recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status) -} - // For a given node checks its conditions and tries to update it. Returns grace period to which given node // is entitled, state of current and last observed Ready Condition, and an error if it occurred. func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) { @@ -789,28 +649,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap return gracePeriod, observedReadyCondition, currentReadyCondition, err } -// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes -// all eviction timer to reset. -func (nc *NodeController) forceUpdateAllProbeTimes() { - now := nc.now() - for k, v := range nc.nodeStatusMap { - v.probeTimestamp = now - v.readyTransitionTimestamp = now - nc.nodeStatusMap[k] = v - } -} - -// evictPods queues an eviction for the provided node name, and returns false if the node is already -// queued for eviction. -func (nc *NodeController) evictPods(nodeName string) bool { - if nc.networkSegmentationMode { - return false - } - nc.evictorLock.Lock() - defer nc.evictorLock.Unlock() - return nc.podEvictor.Add(nodeName) -} - // cancelPodEviction removes any queued evictions, typically because the node is available again. It // returns true if an eviction was queued. func (nc *NodeController) cancelPodEviction(nodeName string) bool { @@ -825,6 +663,17 @@ func (nc *NodeController) cancelPodEviction(nodeName string) bool { return false } +// evictPods queues an eviction for the provided node name, and returns false if the node is already +// queued for eviction. 
+func (nc *NodeController) evictPods(nodeName string) bool { + if nc.networkSegmentationMode { + return false + } + nc.evictorLock.Lock() + defer nc.evictorLock.Unlock() + return nc.podEvictor.Add(nodeName) +} + // stopAllPodEvictions removes any queued evictions for all Nodes. func (nc *NodeController) stopAllPodEvictions() { nc.evictorLock.Lock() @@ -833,135 +682,3 @@ func (nc *NodeController) stopAllPodEvictions() { nc.podEvictor.Clear() nc.terminationEvictor.Clear() } - -// deletePods will delete all pods from master running on given node, and return true -// if any pods were deleted. -func (nc *NodeController) deletePods(nodeName string) (bool, error) { - remaining := false - selector := fields.OneTermEqualSelector(api.PodHostField, nodeName) - options := api.ListOptions{FieldSelector: selector} - pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options) - if err != nil { - return remaining, err - } - - if len(pods.Items) > 0 { - nc.recordNodeEvent(nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) - } - - for _, pod := range pods.Items { - // Defensive check, also needed for tests. - if pod.Spec.NodeName != nodeName { - continue - } - // if the pod has already been deleted, ignore it - if pod.DeletionGracePeriodSeconds != nil { - continue - } - // if the pod is managed by a daemonset, ignore it - _, err := nc.daemonSetStore.GetPodDaemonSets(&pod) - if err == nil { // No error means at least one daemonset was found - continue - } - - glog.V(2).Infof("Starting deletion of pod %v", pod.Name) - nc.recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) - if err := nc.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { - return false, err - } - remaining = true - } - return remaining, nil -} - -// update ready status of all pods running on given node from master -// return true if success -func (nc *NodeController) markAllPodsNotReady(nodeName string) error { - glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName) - opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)} - pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(opts) - if err != nil { - return err - } - - errMsg := []string{} - for _, pod := range pods.Items { - // Defensive check, also needed for tests. - if pod.Spec.NodeName != nodeName { - continue - } - - for i, cond := range pod.Status.Conditions { - if cond.Type == api.PodReady { - pod.Status.Conditions[i].Status = api.ConditionFalse - glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name) - _, err := nc.kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod) - if err != nil { - glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err) - errMsg = append(errMsg, fmt.Sprintf("%v", err)) - } - break - } - } - } - if len(errMsg) == 0 { - return nil - } - return fmt.Errorf("%v", strings.Join(errMsg, "; ")) -} - -// terminatePods will ensure all pods on the given node that are in terminating state are eventually -// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how -// long before we should check again (the next deadline for a pod to complete), or an error. 
-func (nc *NodeController) terminatePods(nodeName string, since time.Time) (bool, time.Duration, error) { - // the time before we should try again - nextAttempt := time.Duration(0) - // have we deleted all pods - complete := true - - selector := fields.OneTermEqualSelector(api.PodHostField, nodeName) - options := api.ListOptions{FieldSelector: selector} - pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options) - if err != nil { - return false, nextAttempt, err - } - - now := time.Now() - elapsed := now.Sub(since) - for _, pod := range pods.Items { - // Defensive check, also needed for tests. - if pod.Spec.NodeName != nodeName { - continue - } - // only clean terminated pods - if pod.DeletionGracePeriodSeconds == nil { - continue - } - - // the user's requested grace period - grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second - if grace > nc.maximumGracePeriod { - grace = nc.maximumGracePeriod - } - - // the time remaining before the pod should have been deleted - remaining := grace - elapsed - if remaining < 0 { - remaining = 0 - glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace) - nc.recordNodeEvent(nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName)) - if err := nc.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil { - glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err) - complete = false - } - } else { - glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining) - complete = false - } - - if nextAttempt < remaining { - nextAttempt = remaining - } - } - return complete, nextAttempt, nil -} diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 8a3d392099a..56cba09120b 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -555,14 +555,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { - remaining, _ := nodeController.deletePods(value.Value) + remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore) if remaining { nodeController.terminationEvictor.Add(value.Value) } return true, 0 }) nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { - nodeController.terminatePods(value.Value, value.AddedAt) + terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod) return true, 0 }) podEvicted := false @@ -1082,7 +1082,7 @@ func TestNodeDeletion(t *testing.T) { t.Errorf("unexpected error: %v", err) } nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { - nodeController.deletePods(value.Value) + deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore) return true, 0 }) podEvicted := false @@ -1220,12 +1220,12 @@ func TestCheckPod(t *testing.T) { for i, tc := range tcs { var deleteCalls int - nc.forcefullyDeletePod = func(_ *api.Pod) error { + forcefullyDeletePodsFunc := func(_ *api.Pod) error { deleteCalls++ return nil } - nc.maybeDeleteTerminatingPod(&tc.pod) + nc.maybeDeleteTerminatingPod(&tc.pod, nc.nodeStore.Store, forcefullyDeletePodsFunc) if tc.prune && deleteCalls != 1 { t.Errorf("[%v] 
expected number of delete calls to be 1 but got %v", i, deleteCalls) @@ -1237,17 +1237,7 @@ func TestCheckPod(t *testing.T) { } func TestCleanupOrphanedPods(t *testing.T) { - newPod := func(name, node string) api.Pod { - return api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: name, - }, - Spec: api.PodSpec{ - NodeName: node, - }, - } - } - pods := []api.Pod{ + pods := []*api.Pod{ newPod("a", "foo"), newPod("b", "bar"), newPod("c", "gone"), @@ -1263,12 +1253,12 @@ func TestCleanupOrphanedPods(t *testing.T) { var deleteCalls int var deletedPodName string - nc.forcefullyDeletePod = func(p *api.Pod) error { + forcefullyDeletePodFunc := func(p *api.Pod) error { deleteCalls++ deletedPodName = p.ObjectMeta.Name return nil } - nc.cleanupOrphanedPods() + cleanupOrphanedPods(pods, nc.nodeStore.Store, forcefullyDeletePodFunc) if deleteCalls != 1 { t.Fatalf("expected one delete, got: %v", deleteCalls)