NodeController doesn't evict Pods if no Nodes are Ready

This commit is contained in:
gmarek 2016-05-16 11:20:23 +02:00
parent 1cba05574b
commit 6d27009db1
4 changed files with 373 additions and 58 deletions
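Editor's note: in outline, the change tracks whether any non-master Node was seen Ready during a monitoring pass; if none was, the controller assumes it is network-segmented from its Nodes and suspends pod evictions until a healthy Node reappears. Below is a minimal, self-contained sketch of that gate; the types and names are illustrative stand-ins, not the controller's real ones, which operate on api.Node and a rate-limited eviction queue.

```go
package main

import "fmt"

// nodeInfo is an illustrative stand-in for the controller's view of a Node.
type nodeInfo struct {
	name     string
	isMaster bool
	ready    bool
}

// segmentationGate mirrors the commit's check: if no non-master Node is
// Ready, assume the master is cut off from the cluster and suspend evictions.
func segmentationGate(nodes []nodeInfo) bool {
	seenReady := false
	for _, n := range nodes {
		// Master Nodes are not treated as part of the cluster for this check.
		if n.ready && !n.isMaster {
			seenReady = true
		}
	}
	return !seenReady // true => enter network segmentation mode
}

func main() {
	nodes := []nodeInfo{
		{name: "node0", ready: false},
		{name: "node-master", isMaster: true, ready: true},
	}
	// Only the master is Ready, so evictions would be suspended.
	fmt.Println("segmented:", segmentationGate(nodes))
}
```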

View File

@@ -44,6 +44,7 @@ import (
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/util/system"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/version"
 	"k8s.io/kubernetes/pkg/watch"
@@ -123,6 +124,11 @@ type NodeController struct {
 	forcefullyDeletePod       func(*api.Pod) error
 	nodeExistsInCloudProvider func(string) (bool, error)
+	// If in network segmentation mode NodeController won't evict Pods from unhealthy Nodes.
+	// It is enabled when all Nodes observed by the NodeController are NotReady and disabled
+	// when NC sees any healthy Node. This is a temporary fix for v1.3.
+	networkSegmentationMode bool
 }

 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -141,10 +147,10 @@ func NewNodeController(
 	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
 	eventBroadcaster.StartLogging(glog.Infof)
 	if kubeClient != nil {
-		glog.Infof("Sending events to api server.")
+		glog.V(0).Infof("Sending events to api server.")
 		eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
 	} else {
-		glog.Infof("No api server defined - no events will be sent to API server.")
+		glog.V(0).Infof("No api server defined - no events will be sent to API server.")
 	}
 	if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
@@ -282,7 +288,7 @@ func (nc *NodeController) Run(period time.Duration) {
 			}
 			if completed {
-				glog.Infof("All pods terminated on %s", value.Value)
+				glog.V(2).Infof("All pods terminated on %s", value.Value)
 				nc.recordNodeEvent(value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
 				return true, 0
 			}
@ -371,7 +377,7 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
node := nodeObj.(*api.Node) node := nodeObj.(*api.Node)
v, err := version.Parse(node.Status.NodeInfo.KubeletVersion) v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
if err != nil { if err != nil {
glog.Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err) glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(nc.forcefullyDeletePod(pod)) utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return return
} }
@@ -407,7 +413,7 @@ func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
 	var zero int64
 	err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
 	if err == nil {
-		glog.Infof("forceful deletion of %s succeeded", pod.Name)
+		glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
 	}
 	return err
 }
@@ -449,13 +455,14 @@ func (nc *NodeController) monitorNodeStatus() error {
 		// reduce lists/decouple this from monitoring status.
 		nc.reconcileNodeCIDRs(nodes)
 	}
+	seenReady := false
 	for i := range nodes.Items {
 		var gracePeriod time.Duration
-		var lastReadyCondition api.NodeCondition
-		var readyCondition *api.NodeCondition
+		var observedReadyCondition api.NodeCondition
+		var currentReadyCondition *api.NodeCondition
 		node := &nodes.Items[i]
 		for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
-			gracePeriod, lastReadyCondition, readyCondition, err = nc.tryUpdateNodeStatus(node)
+			gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
 			if err == nil {
 				break
 			}
@@ -474,28 +481,32 @@ func (nc *NodeController) monitorNodeStatus() error {
 		decisionTimestamp := nc.now()
-		if readyCondition != nil {
+		if currentReadyCondition != nil {
 			// Check eviction timeout against decisionTimestamp
-			if lastReadyCondition.Status == api.ConditionFalse &&
+			if observedReadyCondition.Status == api.ConditionFalse &&
 				decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
 				if nc.evictPods(node.Name) {
 					glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
 				}
 			}
-			if lastReadyCondition.Status == api.ConditionUnknown &&
+			if observedReadyCondition.Status == api.ConditionUnknown &&
 				decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
 				if nc.evictPods(node.Name) {
 					glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
 				}
 			}
-			if lastReadyCondition.Status == api.ConditionTrue {
+			if observedReadyCondition.Status == api.ConditionTrue {
+				// We do not treat a master node as a part of the cluster for network segmentation checking.
+				if !system.IsMasterNode(node) {
+					seenReady = true
+				}
 				if nc.cancelPodEviction(node.Name) {
 					glog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
 				}
 			}
 			// Report node event.
-			if readyCondition.Status != api.ConditionTrue && lastReadyCondition.Status == api.ConditionTrue {
+			if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue {
 				nc.recordNodeStatusChange(node, "NodeNotReady")
 				if err = nc.markAllPodsNotReady(node.Name); err != nil {
 					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
@ -504,14 +515,14 @@ func (nc *NodeController) monitorNodeStatus() error {
// Check with the cloud provider to see if the node still exists. If it // Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately. // doesn't, delete the node immediately.
if readyCondition.Status != api.ConditionTrue && nc.cloud != nil { if currentReadyCondition.Status != api.ConditionTrue && nc.cloud != nil {
exists, err := nc.nodeExistsInCloudProvider(node.Name) exists, err := nc.nodeExistsInCloudProvider(node.Name)
if err != nil { if err != nil {
glog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err) glog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err)
continue continue
} }
if !exists { if !exists {
glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name) glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) { go func(nodeName string) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
@@ -527,6 +538,18 @@ func (nc *NodeController) monitorNodeStatus() error {
 			}
 		}
 	}
+	// NC doesn't see any Ready Node. We assume that the network is segmented and Nodes cannot connect
+	// to the API server to update their statuses. NC enters network segmentation mode and cancels all
+	// evictions in progress.
+	if !seenReady {
+		nc.networkSegmentationMode = true
+		nc.stopAllPodEvictions()
+	} else {
+		if nc.networkSegmentationMode {
+			nc.forceUpdateAllProbeTimes()
+			nc.networkSegmentationMode = false
+		}
+	}
 	return nil
 }
@@ -632,13 +655,13 @@ func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status stri
 func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) {
 	var err error
 	var gracePeriod time.Duration
-	var lastReadyCondition api.NodeCondition
-	readyCondition := nc.getCondition(&node.Status, api.NodeReady)
-	if readyCondition == nil {
+	var observedReadyCondition api.NodeCondition
+	currentReadyCondition := nc.getCondition(&node.Status, api.NodeReady)
+	if currentReadyCondition == nil {
 		// If ready condition is nil, then kubelet (or nodecontroller) never posted node status.
 		// A fake ready condition is created, where LastProbeTime and LastTransitionTime is set
 		// to node.CreationTimestamp to avoid handle the corner case.
-		lastReadyCondition = api.NodeCondition{
+		observedReadyCondition = api.NodeCondition{
 			Type:              api.NodeReady,
 			Status:            api.ConditionUnknown,
 			LastHeartbeatTime: node.CreationTimestamp,
@@ -652,7 +675,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 		}
 	} else {
 		// If ready condition is not nil, make a copy of it, since we may modify it in place later.
-		lastReadyCondition = *readyCondition
+		observedReadyCondition = *currentReadyCondition
 		gracePeriod = nc.nodeMonitorGracePeriod
 	}
@@ -683,7 +706,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
 		}
-		nc.nodeStatusMap[node.Name] = savedNodeStatus
 	} else if savedCondition == nil && observedCondition != nil {
 		glog.V(1).Infof("Creating timestamp entry for newly observed Node %s", node.Name)
 		savedNodeStatus = nodeStatusData{
@@ -691,7 +713,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
 		}
-		nc.nodeStatusMap[node.Name] = savedNodeStatus
 	} else if savedCondition != nil && observedCondition == nil {
 		glog.Errorf("ReadyCondition was removed from Status of Node %s", node.Name)
 		// TODO: figure out what to do in this case. For now we do the same thing as above.
@@ -700,7 +721,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
 		}
-		nc.nodeStatusMap[node.Name] = savedNodeStatus
 	} else if savedCondition != nil && observedCondition != nil && savedCondition.LastHeartbeatTime != observedCondition.LastHeartbeatTime {
 		var transitionTime unversioned.Time
 		// If ReadyCondition changed since the last time we checked, we update the transition timestamp to "now",
@@ -713,7 +733,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			transitionTime = savedNodeStatus.readyTransitionTimestamp
 		}
 		if glog.V(5) {
-			glog.Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, savedNodeStatus.status, node.Status)
+			glog.V(5).Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, savedNodeStatus.status, node.Status)
 		} else {
 			glog.V(3).Infof("Node %s ReadyCondition updated. Updating timestamp.", node.Name)
 		}
@@ -722,13 +742,13 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: transitionTime,
 		}
-		nc.nodeStatusMap[node.Name] = savedNodeStatus
 	}
+	nc.nodeStatusMap[node.Name] = savedNodeStatus
 	if nc.now().After(savedNodeStatus.probeTimestamp.Add(gracePeriod)) {
 		// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
 		// (regardless of its current value) in the master.
-		if readyCondition == nil {
+		if currentReadyCondition == nil {
 			glog.V(2).Infof("node %v is never updated by kubelet", node.Name)
 			node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
 				Type:   api.NodeReady,
@@ -740,14 +760,14 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			})
 		} else {
 			glog.V(4).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v",
-				node.Name, nc.now().Time.Sub(savedNodeStatus.probeTimestamp.Time), lastReadyCondition)
-			if lastReadyCondition.Status != api.ConditionUnknown {
-				readyCondition.Status = api.ConditionUnknown
-				readyCondition.Reason = "NodeStatusUnknown"
-				readyCondition.Message = fmt.Sprintf("Kubelet stopped posting node status.")
+				node.Name, nc.now().Time.Sub(savedNodeStatus.probeTimestamp.Time), observedReadyCondition)
+			if observedReadyCondition.Status != api.ConditionUnknown {
+				currentReadyCondition.Status = api.ConditionUnknown
+				currentReadyCondition.Reason = "NodeStatusUnknown"
+				currentReadyCondition.Message = fmt.Sprintf("Kubelet stopped posting node status.")
 				// LastProbeTime is the last time we heard from kubelet.
-				readyCondition.LastHeartbeatTime = lastReadyCondition.LastHeartbeatTime
-				readyCondition.LastTransitionTime = nc.now()
+				currentReadyCondition.LastHeartbeatTime = observedReadyCondition.LastHeartbeatTime
+				currentReadyCondition.LastTransitionTime = nc.now()
 			}
 		}
@@ -776,27 +796,41 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			}
 		}
-		if !api.Semantic.DeepEqual(nc.getCondition(&node.Status, api.NodeReady), &lastReadyCondition) {
+		if !api.Semantic.DeepEqual(nc.getCondition(&node.Status, api.NodeReady), &observedReadyCondition) {
 			if _, err = nc.kubeClient.Core().Nodes().UpdateStatus(node); err != nil {
 				glog.Errorf("Error updating node %s: %v", node.Name, err)
-				return gracePeriod, lastReadyCondition, readyCondition, err
+				return gracePeriod, observedReadyCondition, currentReadyCondition, err
 			} else {
 				nc.nodeStatusMap[node.Name] = nodeStatusData{
 					status:                   node.Status,
 					probeTimestamp:           nc.nodeStatusMap[node.Name].probeTimestamp,
 					readyTransitionTimestamp: nc.now(),
 				}
-				return gracePeriod, lastReadyCondition, readyCondition, nil
+				return gracePeriod, observedReadyCondition, currentReadyCondition, nil
 			}
 		}
 	}
-	return gracePeriod, lastReadyCondition, readyCondition, err
+	return gracePeriod, observedReadyCondition, currentReadyCondition, err
+}
+
+// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
+// all eviction timers reset.
+func (nc *NodeController) forceUpdateAllProbeTimes() {
+	now := nc.now()
+	for k, v := range nc.nodeStatusMap {
+		v.probeTimestamp = now
+		v.readyTransitionTimestamp = now
+		nc.nodeStatusMap[k] = v
+	}
 }
 // evictPods queues an eviction for the provided node name, and returns false if the node is already
 // queued for eviction.
 func (nc *NodeController) evictPods(nodeName string) bool {
+	if nc.networkSegmentationMode {
+		return false
+	}
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
 	return nc.podEvictor.Add(nodeName)
@@ -816,6 +850,15 @@ func (nc *NodeController) cancelPodEviction(nodeName string) bool {
 	return false
 }
+
+// stopAllPodEvictions removes any queued evictions for all Nodes.
+func (nc *NodeController) stopAllPodEvictions() {
+	nc.evictorLock.Lock()
+	defer nc.evictorLock.Unlock()
+	glog.V(3).Infof("Cancelling all pod evictions.")
+	nc.podEvictor.Clear()
+	nc.terminationEvictor.Clear()
+}
+
 // deletePods will delete all pods from master running on given node, and return true
 // if any pods were deleted.
 func (nc *NodeController) deletePods(nodeName string) (bool, error) {
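Editor's note: the reason forceUpdateAllProbeTimes defers evictions when segmentation ends is that the eviction checks in monitorNodeStatus compare the current time against the saved probe/transition timestamps plus podEvictionTimeout, so bumping those timestamps to "now" restarts each Node's countdown from scratch. A small, self-contained sketch of that arithmetic, with an illustrative timeout value:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative value; the real timeout is the controller's podEvictionTimeout.
	podEvictionTimeout := 5 * time.Minute

	// A Node that stopped reporting an hour ago is normally past the deadline.
	probeTimestamp := time.Now().Add(-time.Hour)
	fmt.Println("would evict:", time.Now().After(probeTimestamp.Add(podEvictionTimeout))) // true

	// forceUpdateAllProbeTimes bumps the saved timestamp to "now", so the same
	// check stays false for a fresh podEvictionTimeout after recovery.
	probeTimestamp = time.Now()
	fmt.Println("would evict:", time.Now().After(probeTimestamp.Add(podEvictionTimeout))) // false
}
```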

View File

@@ -165,11 +165,27 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
 	evictionTimeout := 10 * time.Minute
+
+	// Because of the logic that prevents NC from evicting anything when all Nodes are NotReady,
+	// we need a second healthy Node in tests. Because of how the tests are written, we need to
+	// update the status of this Node.
+	healthyNodeNewStatus := api.NodeStatus{
+		Conditions: []api.NodeCondition{
+			{
+				Type:   api.NodeReady,
+				Status: api.ConditionTrue,
+				// Node status has just been updated, and is NotReady for 10min.
+				LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 9, 0, 0, time.UTC),
+				LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+			},
+		},
+	}
+
 	table := []struct {
 		fakeNodeHandler     *FakeNodeHandler
 		daemonSets          []extensions.DaemonSet
 		timeToPass          time.Duration
 		newNodeStatus       api.NodeStatus
+		secondNodeNewStatus api.NodeStatus
 		expectedEvictPods   bool
 		description         string
 	}{
@@ -183,12 +199,29 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 						CreationTimestamp: fakeNow,
 					},
 				},
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node1",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionTrue,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
 			},
 			Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
 		},
 		daemonSets:          nil,
 		timeToPass:          0,
 		newNodeStatus:       api.NodeStatus{},
+		secondNodeNewStatus: healthyNodeNewStatus,
 		expectedEvictPods:   false,
 		description:         "Node created recently, with no status.",
 	},
@@ -212,6 +245,22 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 						},
 					},
 				},
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node1",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionTrue,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
 			},
 			Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
 		},
@@ -228,6 +277,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 				},
 			},
 		},
+		secondNodeNewStatus: healthyNodeNewStatus,
 		expectedEvictPods:   false,
 		description:         "Node created long time ago, and kubelet posted NotReady for a short period of time.",
 	},
@@ -251,6 +301,22 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 						},
 					},
 				},
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node1",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionTrue,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
 			},
 			Clientset: fake.NewSimpleClientset(
 				&api.PodList{
@@ -294,6 +360,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 				},
 			},
 		},
+		secondNodeNewStatus: healthyNodeNewStatus,
 		expectedEvictPods:   false,
 		description:         "Pod is ds-managed, and kubelet posted NotReady for a long period of time.",
 	},
@@ -317,6 +384,22 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 						},
 					},
 				},
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node1",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionTrue,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
 			},
 			Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
 		},
@@ -333,6 +416,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 				},
 			},
 		},
+		secondNodeNewStatus: healthyNodeNewStatus,
 		expectedEvictPods:   true,
 		description:         "Node created long time ago, and kubelet posted NotReady for a long period of time.",
 	},
@@ -356,6 +440,22 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 						},
 					},
 				},
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node1",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionTrue,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
 			},
 			Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
 		},
@@ -372,6 +472,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 				},
 			},
 		},
+		secondNodeNewStatus: healthyNodeNewStatus,
 		expectedEvictPods:   false,
 		description:         "Node created long time ago, node controller posted Unknown for a short period of time.",
 	},
@@ -395,6 +496,78 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 						},
 					},
 				},
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node1",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionTrue,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
+			},
+			Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
+		},
+		daemonSets: nil,
+		timeToPass: 60 * time.Minute,
+		newNodeStatus: api.NodeStatus{
+			Conditions: []api.NodeCondition{
+				{
+					Type:   api.NodeReady,
+					Status: api.ConditionUnknown,
+					// Node status was updated by nodecontroller 1hr ago
+					LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+					LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+				},
+			},
+		},
+		secondNodeNewStatus: healthyNodeNewStatus,
+		expectedEvictPods:   true,
+		description:         "Node created long time ago, node controller posted Unknown for a long period of time.",
+	},
+	// NetworkSegmentation: Node created long time ago, node controller posted Unknown for a long period of time on both Nodes.
+	{
+		fakeNodeHandler: &FakeNodeHandler{
+			Existing: []*api.Node{
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node0",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionUnknown,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node1",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionUnknown,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
 			},
 			Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
 		},
@@ -411,8 +584,76 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 				},
 			},
 		},
-		expectedEvictPods: true,
-		description:       "Node created long time ago, node controller posted Unknown for a long period of time.",
+		secondNodeNewStatus: api.NodeStatus{
+			Conditions: []api.NodeCondition{
+				{
+					Type:   api.NodeReady,
+					Status: api.ConditionUnknown,
+					// Node status was updated by nodecontroller 1hr ago
+					LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+					LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+				},
+			},
+		},
+		expectedEvictPods: false,
+		description:       "Network Segmentation: Node created long time ago, node controller posted Unknown for a long period of time on both Nodes.",
+	},
+	// NetworkSegmentation: Node created long time ago, node controller posted Unknown for a long period
+	// of time on the first Node; eviction should stop even though the "-master" Node is healthy.
+	{
+		fakeNodeHandler: &FakeNodeHandler{
+			Existing: []*api.Node{
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node0",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionUnknown,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
+				{
+					ObjectMeta: api.ObjectMeta{
+						Name:              "node-master",
+						CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					},
+					Status: api.NodeStatus{
+						Conditions: []api.NodeCondition{
+							{
+								Type:               api.NodeReady,
+								Status:             api.ConditionTrue,
+								LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+								LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							},
+						},
+					},
+				},
+			},
+			Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
+		},
+		daemonSets: nil,
+		timeToPass: 60 * time.Minute,
+		newNodeStatus: api.NodeStatus{
+			Conditions: []api.NodeCondition{
+				{
+					Type:   api.NodeReady,
+					Status: api.ConditionUnknown,
+					// Node status was updated by nodecontroller 1hr ago
+					LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+					LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+				},
+			},
+		},
+		secondNodeNewStatus: healthyNodeNewStatus,
+		expectedEvictPods:   false,
+		description:         "NetworkSegmentation: Node created long time ago, node controller posted Unknown for a long period of time on first Node, eviction should stop even though -master Node is healthy",
 	},
 }
@@ -430,6 +671,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 		if item.timeToPass > 0 {
 			nodeController.now = func() unversioned.Time { return unversioned.Time{Time: fakeNow.Add(item.timeToPass)} }
 			item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
+			item.fakeNodeHandler.Existing[1].Status = item.secondNodeNewStatus
 		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)

View File

@@ -133,6 +133,18 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
 	return *result, true
 }
+
+// Clear removes all items from the queue and duplication preventing set.
+func (q *UniqueQueue) Clear() {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if q.queue.Len() > 0 {
+		q.queue = make(TimedQueue, 0)
+	}
+	if len(q.set) > 0 {
+		q.set = sets.NewString()
+	}
+}
+
 // RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
 // of execution. It is also rate limited.
 type RateLimitedTimedQueue struct {
@@ -199,3 +211,8 @@ func (q *RateLimitedTimedQueue) Add(value string) bool {
 func (q *RateLimitedTimedQueue) Remove(value string) bool {
 	return q.queue.Remove(value)
 }
+
+// Clear removes all items from the queue.
+func (q *RateLimitedTimedQueue) Clear() {
+	q.queue.Clear()
+}

View File

@@ -261,3 +261,16 @@ func TestTryRemovingWhileTry(t *testing.T) {
 		t.Fatalf("unexpected iterations: %d", count)
 	}
 }
+
+func TestClear(t *testing.T) {
+	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
+	evictor.Add("first")
+	evictor.Add("second")
+	evictor.Add("third")
+
+	evictor.Clear()
+
+	if len(evictor.queue.queue) != 0 {
+		t.Fatalf("Clear should remove all elements from the queue.")
+	}
+}
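Editor's note: TestClear asserts only that the inner queue is emptied; Clear also resets the duplication-preventing set (see UniqueQueue.Clear above), without which previously queued names could never be re-added. A hypothetical companion test, using the same fixture and package-internal access as TestClear, might check that as well:

```go
// Hypothetical variant asserting the duplication-preventing set is emptied too.
func TestClearAlsoEmptiesSet(t *testing.T) {
	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
	evictor.Add("first")
	evictor.Clear()
	// The set is what deduplicates Add calls; Clear must reset it as well.
	if evictor.queue.set.Len() != 0 {
		t.Fatalf("Clear should remove all elements from the set.")
	}
	// With the set emptied, a cleared value can be queued again.
	if !evictor.Add("first") {
		t.Fatalf("Add should succeed for a value that was cleared.")
	}
}
```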