mirror of https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00

Merge pull request #28770 from gmarek/coupling

Automatic merge from submit-queue

Reduce tightness of coupling in NodeController

Depends on #28604

This commit is contained in:
commit ea70eca37b
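The change follows one pattern throughout: NodeController methods that only needed a couple of the controller's collaborators become package-level helpers that receive those collaborators (client, event recorder, stores, delete callbacks) as explicit arguments, so they can be exercised without constructing a full controller. The following minimal, self-contained sketch illustrates that pattern; the Controller type and recorder callback are illustrative stand-ins, not code from this commit.

package main

import "fmt"

// Before: the behaviour is reachable only through the controller, so a test has
// to build a whole controller just to exercise it.
type Controller struct {
	recorder func(msg string)
}

func (c *Controller) recordEventOld(node, event string) {
	c.recorder(fmt.Sprintf("Node %s event: %s", node, event))
}

// After: a package-level function that receives its one dependency explicitly,
// mirroring how recordNodeEvent now takes the record.EventRecorder as an argument.
func recordEvent(recorder func(msg string), node, event string) {
	recorder(fmt.Sprintf("Node %s event: %s", node, event))
}

func main() {
	c := &Controller{recorder: func(m string) { fmt.Println("old:", m) }}
	c.recordEventOld("node-1", "RegisteredNode")

	// The free function can be called with any recorder; no controller is needed.
	recordEvent(func(m string) { fmt.Println("new:", m) }, "node-1", "RegisteredNode")
}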
pkg/controller/node/deletion_utils.go (new file, 319 lines)

@@ -0,0 +1,319 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
    "fmt"
    "strings"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/pkg/cloudprovider"
    "k8s.io/kubernetes/pkg/fields"
    "k8s.io/kubernetes/pkg/kubelet/util/format"
    "k8s.io/kubernetes/pkg/types"
    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
    "k8s.io/kubernetes/pkg/version"

    "github.com/golang/glog"
)

// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
    for _, pod := range pods {
        if pod.Spec.NodeName == "" {
            continue
        }
        if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
            continue
        }
        if err := forcefulDeletePodFunc(pod); err != nil {
            utilruntime.HandleError(err)
        }
    }
}

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
    remaining := false
    selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
    options := api.ListOptions{FieldSelector: selector}
    pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
    if err != nil {
        return remaining, err
    }

    if len(pods.Items) > 0 {
        recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
    }

    for _, pod := range pods.Items {
        // Defensive check, also needed for tests.
        if pod.Spec.NodeName != nodeName {
            continue
        }
        // if the pod has already been deleted, ignore it
        if pod.DeletionGracePeriodSeconds != nil {
            continue
        }
        // if the pod is managed by a daemonset, ignore it
        _, err := daemonStore.GetPodDaemonSets(&pod)
        if err == nil { // No error means at least one daemonset was found
            continue
        }

        glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
        recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
        if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
            return false, err
        }
        remaining = true
    }
    return remaining, nil
}

func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
    var zero int64
    err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
    if err == nil {
        glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
    }
    return err
}

// forcefullyDeleteNode immediately deletes all pods on the node, and then
// deletes the node itself.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, forcefulDeletePodFunc func(*api.Pod) error) error {
    selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
    options := api.ListOptions{FieldSelector: selector}
    pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
    if err != nil {
        return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
    }
    for _, pod := range pods.Items {
        if pod.Spec.NodeName != nodeName {
            continue
        }
        if err := forcefulDeletePodFunc(&pod); err != nil {
            return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
        }
    }
    if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
        return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
    }
    return nil
}

// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
// all eviction timer to reset.
func forceUpdateAllProbeTimes(now unversioned.Time, statusData map[string]nodeStatusData) {
    for k, v := range statusData {
        v.probeTimestamp = now
        v.readyTransitionTimestamp = now
        statusData[k] = v
    }
}

// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
    pod, ok := obj.(*api.Pod)
    if !ok {
        return
    }

    // consider only terminating pods
    if pod.DeletionTimestamp == nil {
        return
    }

    // delete terminating pods that have not yet been scheduled
    if len(pod.Spec.NodeName) == 0 {
        utilruntime.HandleError(forcefulDeletePodFunc(pod))
        return
    }

    nodeObj, found, err := nodeStore.GetByKey(pod.Spec.NodeName)
    if err != nil {
        // this can only happen if the Store.KeyFunc has a problem creating
        // a key for the pod. If it happens once, it will happen again so
        // don't bother requeuing the pod.
        utilruntime.HandleError(err)
        return
    }

    // delete terminating pods that have been scheduled on
    // nonexistent nodes
    if !found {
        glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
        utilruntime.HandleError(forcefulDeletePodFunc(pod))
        return
    }

    // delete terminating pods that have been scheduled on
    // nodes that do not support graceful termination
    // TODO(mikedanese): this can be removed when we no longer
    // guarantee backwards compatibility of master API to kubelets with
    // versions less than 1.1.0
    node := nodeObj.(*api.Node)
    v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
    if err != nil {
        glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
        utilruntime.HandleError(forcefulDeletePodFunc(pod))
        return
    }
    if gracefulDeletionVersion.GT(v) {
        utilruntime.HandleError(forcefulDeletePodFunc(pod))
        return
    }
}

// update ready status of all pods running on given node from master
// return true if success
func markAllPodsNotReady(kubeClient clientset.Interface, nodeName string) error {
    glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
    opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)}
    pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(opts)
    if err != nil {
        return err
    }

    errMsg := []string{}
    for _, pod := range pods.Items {
        // Defensive check, also needed for tests.
        if pod.Spec.NodeName != nodeName {
            continue
        }

        for i, cond := range pod.Status.Conditions {
            if cond.Type == api.PodReady {
                pod.Status.Conditions[i].Status = api.ConditionFalse
                glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
                _, err := kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod)
                if err != nil {
                    glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
                    errMsg = append(errMsg, fmt.Sprintf("%v", err))
                }
                break
            }
        }
    }
    if len(errMsg) == 0 {
        return nil
    }
    return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}

func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
    instances, ok := cloud.Instances()
    if !ok {
        return false, fmt.Errorf("%v", ErrCloudInstance)
    }
    if _, err := instances.ExternalID(nodeName); err != nil {
        if err == cloudprovider.InstanceNotFound {
            return false, nil
        }
        return false, err
    }
    return true, nil
}

func recordNodeEvent(recorder record.EventRecorder, nodeName, eventtype, reason, event string) {
    ref := &api.ObjectReference{
        Kind:      "Node",
        Name:      nodeName,
        UID:       types.UID(nodeName),
        Namespace: "",
    }
    glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
    recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}

func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
    ref := &api.ObjectReference{
        Kind:      "Node",
        Name:      node.Name,
        UID:       types.UID(node.Name),
        Namespace: "",
    }
    glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
    // TODO: This requires a transaction, either both node status is updated
    // and event is recorded or neither should happen, see issue #6055.
    recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
}

// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
// long before we should check again (the next deadline for a pod to complete), or an error.
func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
    // the time before we should try again
    nextAttempt := time.Duration(0)
    // have we deleted all pods
    complete := true

    selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
    options := api.ListOptions{FieldSelector: selector}
    pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
    if err != nil {
        return false, nextAttempt, err
    }

    now := time.Now()
    elapsed := now.Sub(since)
    for _, pod := range pods.Items {
        // Defensive check, also needed for tests.
        if pod.Spec.NodeName != nodeName {
            continue
        }
        // only clean terminated pods
        if pod.DeletionGracePeriodSeconds == nil {
            continue
        }

        // the user's requested grace period
        grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
        if grace > maxGracePeriod {
            grace = maxGracePeriod
        }

        // the time remaining before the pod should have been deleted
        remaining := grace - elapsed
        if remaining < 0 {
            remaining = 0
            glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
            recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
            if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
                glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
                complete = false
            }
        } else {
            glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
            complete = false
        }

        if nextAttempt < remaining {
            nextAttempt = remaining
        }
    }
    return complete, nextAttempt, nil
}
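Because the helpers above take their dependencies as parameters, they can be driven directly from a unit test with fakes, which is what the updated TestCleanupOrphanedPods further down in this diff does. As a rough, self-contained illustration of the same idea, assuming it sits in the same node package; the test name and fake names here are made up and are not part of the commit:

package node

import (
	"testing"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
)

func TestCleanupOrphanedPodsSketch(t *testing.T) {
	// One pod bound to a node that is not present in the (empty) node store.
	pods := []*api.Pod{
		{ObjectMeta: api.ObjectMeta{Name: "orphan"}, Spec: api.PodSpec{NodeName: "gone"}},
	}
	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)

	// Record deletions instead of talking to an API server.
	var deleted []string
	fakeDelete := func(p *api.Pod) error {
		deleted = append(deleted, p.Name)
		return nil
	}

	cleanupOrphanedPods(pods, nodeStore, fakeDelete)

	if len(deleted) != 1 || deleted[0] != "orphan" {
		t.Fatalf("expected exactly the orphaned pod to be deleted, got %v", deleted)
	}
}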
pkg/controller/node/nodecontroller.go

@@ -20,7 +20,6 @@ import (
 	"errors"
 	"fmt"
 	"net"
-	"strings"
 	"sync"
 	"time"
 
@@ -35,11 +34,8 @@ import (
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
-	"k8s.io/kubernetes/pkg/fields"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -52,6 +48,7 @@ import (
 
 var (
 	ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
+	gracefulDeletionVersion = version.MustParse("v1.1.0")
 )
 
 const (
@@ -208,8 +205,12 @@ func NewNodeController(
 		&api.Pod{},
 		controller.NoResyncPeriodFunc(),
 		framework.ResourceEventHandlerFuncs{
-			AddFunc:    nc.maybeDeleteTerminatingPod,
-			UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
+			AddFunc: func(obj interface{}) {
+				nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod)
+			},
+			UpdateFunc: func(_, obj interface{}) {
+				nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod)
+			},
 		},
 		// We don't need to build a index for podStore here actually, but build one for consistency.
 		// It will ensure that if people start making use of the podStore in more specific ways,
@@ -301,7 +302,7 @@ func (nc *NodeController) Run(period time.Duration) {
 			nc.evictorLock.Lock()
 			defer nc.evictorLock.Unlock()
 			nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-				remaining, err := nc.deletePods(value.Value)
+				remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
 				if err != nil {
 					utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
 					return false, 0
@@ -320,7 +321,7 @@ func (nc *NodeController) Run(period time.Duration) {
 			nc.evictorLock.Lock()
 			defer nc.evictorLock.Unlock()
 			nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-				completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
+				completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
 				if err != nil {
 					utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
 					return false, 0
@@ -328,7 +329,7 @@ func (nc *NodeController) Run(period time.Duration) {
 
 				if completed {
 					glog.V(2).Infof("All pods terminated on %s", value.Value)
-					nc.recordNodeEvent(value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
+					recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
 					return true, 0
 				}
 
@@ -341,94 +342,14 @@ func (nc *NodeController) Run(period time.Duration) {
 		})
 	}, nodeEvictionPeriod, wait.NeverStop)
 
-	go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
-}
-
-var gracefulDeletionVersion = version.MustParse("v1.1.0")
-
-// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
-// that should not be gracefully terminated.
-func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
-	pod, ok := obj.(*api.Pod)
-	if !ok {
-		return
-	}
-
-	// consider only terminating pods
-	if pod.DeletionTimestamp == nil {
-		return
-	}
-
-	// delete terminating pods that have not yet been scheduled
-	if len(pod.Spec.NodeName) == 0 {
-		utilruntime.HandleError(nc.forcefullyDeletePod(pod))
-		return
-	}
-
-	nodeObj, found, err := nc.nodeStore.GetByKey(pod.Spec.NodeName)
-	if err != nil {
-		// this can only happen if the Store.KeyFunc has a problem creating
-		// a key for the pod. If it happens once, it will happen again so
-		// don't bother requeuing the pod.
-		utilruntime.HandleError(err)
-		return
-	}
-
-	// delete terminating pods that have been scheduled on
-	// nonexistent nodes
-	if !found {
-		glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
-		utilruntime.HandleError(nc.forcefullyDeletePod(pod))
-		return
-	}
-
-	// delete terminating pods that have been scheduled on
-	// nodes that do not support graceful termination
-	// TODO(mikedanese): this can be removed when we no longer
-	// guarantee backwards compatibility of master API to kubelets with
-	// versions less than 1.1.0
-	node := nodeObj.(*api.Node)
-	v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
-	if err != nil {
-		glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
-		utilruntime.HandleError(nc.forcefullyDeletePod(pod))
-		return
-	}
-	if gracefulDeletionVersion.GT(v) {
-		utilruntime.HandleError(nc.forcefullyDeletePod(pod))
-		return
-	}
-}
-
-// cleanupOrphanedPods deletes pods that are bound to nodes that don't
-// exist.
-func (nc *NodeController) cleanupOrphanedPods() {
-	pods, err := nc.podStore.List(labels.Everything())
-	if err != nil {
-		utilruntime.HandleError(err)
-		return
-	}
-
-	for _, pod := range pods {
-		if pod.Spec.NodeName == "" {
-			continue
-		}
-		if _, exists, _ := nc.nodeStore.Store.GetByKey(pod.Spec.NodeName); exists {
-			continue
-		}
-		if err := nc.forcefullyDeletePod(pod); err != nil {
-			utilruntime.HandleError(err)
-		}
-	}
-}
-
-func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
-	var zero int64
-	err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
-	if err == nil {
-		glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
-	}
-	return err
+	go wait.Until(func() {
+		pods, err := nc.podStore.List(labels.Everything())
+		if err != nil {
+			utilruntime.HandleError(err)
+			return
+		}
+		cleanupOrphanedPods(pods, nc.nodeStore.Store, nc.forcefullyDeletePod)
+	}, 30*time.Second, wait.NeverStop)
 }
 
 // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
@@ -442,7 +363,7 @@ func (nc *NodeController) monitorNodeStatus() error {
 	for _, node := range nodes.Items {
 		if !nc.knownNodeSet.Has(node.Name) {
 			glog.V(1).Infof("NodeController observed a new Node: %#v", node)
-			nc.recordNodeEvent(node.Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
+			recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
 			nc.cancelPodEviction(node.Name)
 			nc.knownNodeSet.Insert(node.Name)
 		}
@@ -457,7 +378,7 @@ func (nc *NodeController) monitorNodeStatus() error {
 	deleted := nc.knownNodeSet.Difference(observedSet)
 	for nodeName := range deleted {
 		glog.V(1).Infof("NodeController observed a Node deletion: %v", nodeName)
-		nc.recordNodeEvent(nodeName, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
+		recordNodeEvent(nc.recorder, nodeName, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
 		nc.evictPods(nodeName)
 		nc.knownNodeSet.Delete(nodeName)
 	}
@@ -516,7 +437,7 @@ func (nc *NodeController) monitorNodeStatus() error {
 			// Report node event.
 			if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue {
 				recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
-				if err = nc.markAllPodsNotReady(node.Name); err != nil {
+				if err = markAllPodsNotReady(nc.kubeClient, node.Name); err != nil {
 					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
 				}
 			}
@@ -531,13 +452,13 @@ func (nc *NodeController) monitorNodeStatus() error {
 			}
 			if !exists {
 				glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
-				nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
+				recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
 				go func(nodeName string) {
 					defer utilruntime.HandleCrash()
 					// Kubelet is not reporting and Cloud Provider says node
 					// is gone. Delete it without worrying about grace
 					// periods.
-					if err := nc.forcefullyDeleteNode(nodeName); err != nil {
+					if err := forcefullyDeleteNode(nc.kubeClient, nodeName, nc.forcefullyDeletePod); err != nil {
 						glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
 					}
 				}(node.Name)
@@ -555,7 +476,7 @@ func (nc *NodeController) monitorNodeStatus() error {
 				glog.V(2).Info("NodeController is entering network segmentation mode.")
 			} else {
 				if nc.networkSegmentationMode {
-					nc.forceUpdateAllProbeTimes()
+					forceUpdateAllProbeTimes(nc.now(), nc.nodeStatusMap)
 					nc.networkSegmentationMode = false
 					glog.V(2).Info("NodeController exited network segmentation mode.")
 				}
@@ -563,67 +484,6 @@ func (nc *NodeController) monitorNodeStatus() error {
 	return nil
 }
 
-func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
-	instances, ok := cloud.Instances()
-	if !ok {
-		return false, fmt.Errorf("%v", ErrCloudInstance)
-	}
-	if _, err := instances.ExternalID(nodeName); err != nil {
-		if err == cloudprovider.InstanceNotFound {
-			return false, nil
-		}
-		return false, err
-	}
-	return true, nil
-}
-
-// forcefullyDeleteNode immediately deletes all pods on the node, and then
-// deletes the node itself.
-func (nc *NodeController) forcefullyDeleteNode(nodeName string) error {
-	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
-	options := api.ListOptions{FieldSelector: selector}
-	pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-	if err != nil {
-		return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
-	}
-	for _, pod := range pods.Items {
-		if pod.Spec.NodeName != nodeName {
-			continue
-		}
-		if err := nc.forcefullyDeletePod(&pod); err != nil {
-			return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
-		}
-	}
-	if err := nc.kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
-		return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
-	}
-	return nil
-}
-
-func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event string) {
-	ref := &api.ObjectReference{
-		Kind:      "Node",
-		Name:      nodeName,
-		UID:       types.UID(nodeName),
-		Namespace: "",
-	}
-	glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
-	nc.recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
-}
-
-func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
-	ref := &api.ObjectReference{
-		Kind:      "Node",
-		Name:      node.Name,
-		UID:       types.UID(node.Name),
-		Namespace: "",
-	}
-	glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
-	// TODO: This requires a transaction, either both node status is updated
-	// and event is recorded or neither should happen, see issue #6055.
-	recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
-}
-
 // For a given node checks its conditions and tries to update it. Returns grace period to which given node
 // is entitled, state of current and last observed Ready Condition, and an error if it occurred.
 func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) {
@@ -789,28 +649,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 	return gracePeriod, observedReadyCondition, currentReadyCondition, err
 }
 
-// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
-// all eviction timer to reset.
-func (nc *NodeController) forceUpdateAllProbeTimes() {
-	now := nc.now()
-	for k, v := range nc.nodeStatusMap {
-		v.probeTimestamp = now
-		v.readyTransitionTimestamp = now
-		nc.nodeStatusMap[k] = v
-	}
-}
-
-// evictPods queues an eviction for the provided node name, and returns false if the node is already
-// queued for eviction.
-func (nc *NodeController) evictPods(nodeName string) bool {
-	if nc.networkSegmentationMode {
-		return false
-	}
-	nc.evictorLock.Lock()
-	defer nc.evictorLock.Unlock()
-	return nc.podEvictor.Add(nodeName)
-}
-
 // cancelPodEviction removes any queued evictions, typically because the node is available again. It
 // returns true if an eviction was queued.
 func (nc *NodeController) cancelPodEviction(nodeName string) bool {
@@ -825,6 +663,17 @@ func (nc *NodeController) cancelPodEviction(nodeName string) bool {
 	return false
 }
 
+// evictPods queues an eviction for the provided node name, and returns false if the node is already
+// queued for eviction.
+func (nc *NodeController) evictPods(nodeName string) bool {
+	if nc.networkSegmentationMode {
+		return false
+	}
+	nc.evictorLock.Lock()
+	defer nc.evictorLock.Unlock()
+	return nc.podEvictor.Add(nodeName)
+}
+
 // stopAllPodEvictions removes any queued evictions for all Nodes.
 func (nc *NodeController) stopAllPodEvictions() {
 	nc.evictorLock.Lock()
@@ -833,135 +682,3 @@ func (nc *NodeController) stopAllPodEvictions() {
 	nc.podEvictor.Clear()
 	nc.terminationEvictor.Clear()
 }
-
-// deletePods will delete all pods from master running on given node, and return true
-// if any pods were deleted.
-func (nc *NodeController) deletePods(nodeName string) (bool, error) {
-	remaining := false
-	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
-	options := api.ListOptions{FieldSelector: selector}
-	pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-	if err != nil {
-		return remaining, err
-	}
-
-	if len(pods.Items) > 0 {
-		nc.recordNodeEvent(nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
-	}
-
-	for _, pod := range pods.Items {
-		// Defensive check, also needed for tests.
-		if pod.Spec.NodeName != nodeName {
-			continue
-		}
-		// if the pod has already been deleted, ignore it
-		if pod.DeletionGracePeriodSeconds != nil {
-			continue
-		}
-		// if the pod is managed by a daemonset, ignore it
-		_, err := nc.daemonSetStore.GetPodDaemonSets(&pod)
-		if err == nil { // No error means at least one daemonset was found
-			continue
-		}
-
-		glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
-		nc.recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
-		if err := nc.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
-			return false, err
-		}
-		remaining = true
-	}
-	return remaining, nil
-}
-
-// update ready status of all pods running on given node from master
-// return true if success
-func (nc *NodeController) markAllPodsNotReady(nodeName string) error {
-	glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
-	opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)}
-	pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(opts)
-	if err != nil {
-		return err
-	}
-
-	errMsg := []string{}
-	for _, pod := range pods.Items {
-		// Defensive check, also needed for tests.
-		if pod.Spec.NodeName != nodeName {
-			continue
-		}
-
-		for i, cond := range pod.Status.Conditions {
-			if cond.Type == api.PodReady {
-				pod.Status.Conditions[i].Status = api.ConditionFalse
-				glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
-				_, err := nc.kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod)
-				if err != nil {
-					glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
-					errMsg = append(errMsg, fmt.Sprintf("%v", err))
-				}
-				break
-			}
-		}
-	}
-	if len(errMsg) == 0 {
-		return nil
-	}
-	return fmt.Errorf("%v", strings.Join(errMsg, "; "))
-}
-
-// terminatePods will ensure all pods on the given node that are in terminating state are eventually
-// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
-// long before we should check again (the next deadline for a pod to complete), or an error.
-func (nc *NodeController) terminatePods(nodeName string, since time.Time) (bool, time.Duration, error) {
-	// the time before we should try again
-	nextAttempt := time.Duration(0)
-	// have we deleted all pods
-	complete := true
-
-	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
-	options := api.ListOptions{FieldSelector: selector}
-	pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-	if err != nil {
-		return false, nextAttempt, err
-	}
-
-	now := time.Now()
-	elapsed := now.Sub(since)
-	for _, pod := range pods.Items {
-		// Defensive check, also needed for tests.
-		if pod.Spec.NodeName != nodeName {
-			continue
-		}
-		// only clean terminated pods
-		if pod.DeletionGracePeriodSeconds == nil {
-			continue
-		}
-
-		// the user's requested grace period
-		grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
-		if grace > nc.maximumGracePeriod {
-			grace = nc.maximumGracePeriod
-		}
-
-		// the time remaining before the pod should have been deleted
-		remaining := grace - elapsed
-		if remaining < 0 {
-			remaining = 0
-			glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
-			nc.recordNodeEvent(nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
-			if err := nc.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
-				glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
-				complete = false
-			}
-		} else {
-			glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
-			complete = false
-		}
-
-		if nextAttempt < remaining {
-			nextAttempt = remaining
-		}
-	}
-	return complete, nextAttempt, nil
-}
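The reworked Run() above keeps the periodic orphan cleanup but lists pods inside the closure handed to wait.Until, so the package-level cleanupOrphanedPods helper never needs to know about the controller's podStore. A rough sketch of that wiring follows; the lister and cleanup stand-ins are illustrative only and not code from the commit.

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

func main() {
	// Stand-ins for nc.podStore.List and the extracted cleanupOrphanedPods helper.
	listPods := func() []string { return []string{"a", "b", "c"} }
	cleanup := func(pods []string) { fmt.Printf("checking %d pods for orphans\n", len(pods)) }

	stop := make(chan struct{})
	go wait.Until(func() {
		// List inside the closure and hand the result to the decoupled helper,
		// mirroring the new tail of NodeController.Run.
		cleanup(listPods())
	}, 30*time.Second, stop)

	time.Sleep(100 * time.Millisecond)
	close(stop)
}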
pkg/controller/node/nodecontroller_test.go

@@ -555,14 +555,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 		}
 
 		nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			remaining, _ := nodeController.deletePods(value.Value)
+			remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
 			if remaining {
 				nodeController.terminationEvictor.Add(value.Value)
 			}
 			return true, 0
 		})
 		nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			nodeController.terminatePods(value.Value, value.AddedAt)
+			terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod)
 			return true, 0
 		})
 		podEvicted := false
@@ -1082,7 +1082,7 @@ func TestNodeDeletion(t *testing.T) {
 		t.Errorf("unexpected error: %v", err)
 	}
 	nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-		nodeController.deletePods(value.Value)
+		deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
 		return true, 0
 	})
 	podEvicted := false
@@ -1220,12 +1220,12 @@ func TestCheckPod(t *testing.T) {
 
 	for i, tc := range tcs {
 		var deleteCalls int
-		nc.forcefullyDeletePod = func(_ *api.Pod) error {
+		forcefullyDeletePodsFunc := func(_ *api.Pod) error {
 			deleteCalls++
 			return nil
 		}
 
-		nc.maybeDeleteTerminatingPod(&tc.pod)
+		nc.maybeDeleteTerminatingPod(&tc.pod, nc.nodeStore.Store, forcefullyDeletePodsFunc)
 
 		if tc.prune && deleteCalls != 1 {
 			t.Errorf("[%v] expected number of delete calls to be 1 but got %v", i, deleteCalls)
@@ -1237,17 +1237,7 @@ func TestCheckPod(t *testing.T) {
 }
 
 func TestCleanupOrphanedPods(t *testing.T) {
-	newPod := func(name, node string) api.Pod {
-		return api.Pod{
-			ObjectMeta: api.ObjectMeta{
-				Name: name,
-			},
-			Spec: api.PodSpec{
-				NodeName: node,
-			},
-		}
-	}
-	pods := []api.Pod{
+	pods := []*api.Pod{
 		newPod("a", "foo"),
 		newPod("b", "bar"),
 		newPod("c", "gone"),
@@ -1263,12 +1253,12 @@ func TestCleanupOrphanedPods(t *testing.T) {
 
 	var deleteCalls int
 	var deletedPodName string
-	nc.forcefullyDeletePod = func(p *api.Pod) error {
+	forcefullyDeletePodFunc := func(p *api.Pod) error {
 		deleteCalls++
 		deletedPodName = p.ObjectMeta.Name
 		return nil
 	}
-	nc.cleanupOrphanedPods()
+	cleanupOrphanedPods(pods, nc.nodeStore.Store, forcefullyDeletePodFunc)
 
 	if deleteCalls != 1 {
 		t.Fatalf("expected one delete, got: %v", deleteCalls)