Merge pull request #25571 from gmarek/nodecontroller

Automatic merge from submit-queue

NodeController doesn't evict Pods if no Nodes are Ready

Fix #13412 #24597

When the NodeController doesn't see any Ready Node, it goes into "network segmentation mode". In this mode it cancels all evictions in progress and doesn't evict any Pods.

It leaves network segmentation mode when it sees at least one Ready Node again. When leaving, it resets all timers, so each Node gets a full grace period to reconnect to the cluster.
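
The gist of the change, as a minimal sketch (the `segmentationGate` type and the callback names below are illustrative only, not the controller's actual API; the real logic lives in `monitorNodeStatus`, `forceUpdateAllProbeTimes`, `stopAllPodEvictions`, and `evictPods` in the diff below):

```go
package main

import "fmt"

// segmentationGate is a simplified stand-in for the NodeController's
// network-segmentation handling: evictions are suspended while no Node is
// Ready, and eviction timers are reset when the first Ready Node reappears.
type segmentationGate struct {
	segmented bool
}

// observe is called once per monitoring pass with the result of scanning all
// Nodes. resetTimers and cancelEvictions are callbacks standing in for the
// controller's forceUpdateAllProbeTimes and stopAllPodEvictions.
func (g *segmentationGate) observe(seenReady bool, resetTimers, cancelEvictions func()) {
	if !seenReady {
		// No Ready Node observed: assume the network is segmented and
		// cancel all evictions in progress.
		g.segmented = true
		cancelEvictions()
		return
	}
	if g.segmented {
		// Leaving segmentation mode: reset timers so every Node gets a
		// full grace period before eviction decisions resume.
		resetTimers()
		g.segmented = false
	}
}

// evictAllowed gates individual eviction decisions, like the check at the
// top of evictPods.
func (g *segmentationGate) evictAllowed() bool {
	return !g.segmented
}

func main() {
	reset := func() { fmt.Println("resetting all probe timestamps") }
	cancel := func() { fmt.Println("cancelling all pod evictions") }

	g := &segmentationGate{}
	g.observe(false, reset, cancel) // no Ready Nodes seen: enter segmentation mode
	fmt.Println("evictions allowed:", g.evictAllowed())
	g.observe(true, reset, cancel) // a Ready Node reappears: timers reset, mode left
	fmt.Println("evictions allowed:", g.evictAllowed())
}
```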

cc @lavalamp @davidopp @mml @wojtek-t @fgrzadkowski
This commit is contained in:
k8s-merge-robot
2016-05-20 05:31:34 -07:00
4 changed files with 373 additions and 58 deletions


@@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/system"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
@@ -123,6 +124,11 @@ type NodeController struct {
forcefullyDeletePod func(*api.Pod) error
nodeExistsInCloudProvider func(string) (bool, error)
// If in network segmentation mode, the NodeController won't evict Pods from unhealthy Nodes.
// It is enabled when all Nodes observed by the NodeController are NotReady, and disabled
// when the NC sees any healthy Node. This is a temporary fix for v1.3.
networkSegmentationMode bool
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -141,10 +147,10 @@ func NewNodeController(
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
eventBroadcaster.StartLogging(glog.Infof)
if kubeClient != nil {
glog.Infof("Sending events to api server.")
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
} else {
glog.Infof("No api server defined - no events will be sent to API server.")
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
}
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
@@ -282,7 +288,7 @@ func (nc *NodeController) Run(period time.Duration) {
}
if completed {
glog.Infof("All pods terminated on %s", value.Value)
glog.V(2).Infof("All pods terminated on %s", value.Value)
nc.recordNodeEvent(value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}
@@ -371,7 +377,7 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
node := nodeObj.(*api.Node)
v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
@@ -407,7 +413,7 @@ func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
var zero int64
err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
if err == nil {
glog.Infof("forceful deletion of %s succeeded", pod.Name)
glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
}
return err
}
@@ -449,13 +455,14 @@ func (nc *NodeController) monitorNodeStatus() error {
// reduce lists/decouple this from monitoring status.
nc.reconcileNodeCIDRs(nodes)
}
seenReady := false
for i := range nodes.Items {
var gracePeriod time.Duration
var lastReadyCondition api.NodeCondition
var readyCondition *api.NodeCondition
var observedReadyCondition api.NodeCondition
var currentReadyCondition *api.NodeCondition
node := &nodes.Items[i]
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
gracePeriod, lastReadyCondition, readyCondition, err = nc.tryUpdateNodeStatus(node)
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
if err == nil {
break
}
@@ -474,28 +481,32 @@ func (nc *NodeController) monitorNodeStatus() error {
decisionTimestamp := nc.now()
if readyCondition != nil {
if currentReadyCondition != nil {
// Check eviction timeout against decisionTimestamp
if lastReadyCondition.Status == api.ConditionFalse &&
if observedReadyCondition.Status == api.ConditionFalse &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node.Name) {
glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
}
}
if lastReadyCondition.Status == api.ConditionUnknown &&
if observedReadyCondition.Status == api.ConditionUnknown &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node.Name) {
glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
}
}
if lastReadyCondition.Status == api.ConditionTrue {
if observedReadyCondition.Status == api.ConditionTrue {
// We do not treat a master node as a part of the cluster for network segmentation checking.
if !system.IsMasterNode(node) {
seenReady = true
}
if nc.cancelPodEviction(node.Name) {
glog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
// Report node event.
if readyCondition.Status != api.ConditionTrue && lastReadyCondition.Status == api.ConditionTrue {
if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue {
nc.recordNodeStatusChange(node, "NodeNotReady")
if err = nc.markAllPodsNotReady(node.Name); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
@@ -504,14 +515,14 @@ func (nc *NodeController) monitorNodeStatus() error {
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
if currentReadyCondition.Status != api.ConditionTrue && nc.cloud != nil {
exists, err := nc.nodeExistsInCloudProvider(node.Name)
if err != nil {
glog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err)
continue
}
if !exists {
glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) {
defer utilruntime.HandleCrash()
@@ -527,6 +538,18 @@ func (nc *NodeController) monitorNodeStatus() error {
}
}
}
// The NC doesn't see any Ready Node. We assume that the network is segmented and Nodes cannot connect to the API server to
// update their statuses. The NC enters network segmentation mode and cancels all evictions in progress.
if !seenReady {
nc.networkSegmentationMode = true
nc.stopAllPodEvictions()
} else {
if nc.networkSegmentationMode {
nc.forceUpdateAllProbeTimes()
nc.networkSegmentationMode = false
}
}
return nil
}
@@ -632,13 +655,13 @@ func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status stri
func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) {
var err error
var gracePeriod time.Duration
var lastReadyCondition api.NodeCondition
readyCondition := nc.getCondition(&node.Status, api.NodeReady)
if readyCondition == nil {
var observedReadyCondition api.NodeCondition
currentReadyCondition := nc.getCondition(&node.Status, api.NodeReady)
if currentReadyCondition == nil {
// If ready condition is nil, then kubelet (or nodecontroller) never posted node status.
// A fake ready condition is created, where LastProbeTime and LastTransitionTime are set
// to node.CreationTimestamp to avoid handling the corner case.
lastReadyCondition = api.NodeCondition{
observedReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionUnknown,
LastHeartbeatTime: node.CreationTimestamp,
@@ -652,7 +675,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
}
} else {
// If ready condition is not nil, make a copy of it, since we may modify it in place later.
lastReadyCondition = *readyCondition
observedReadyCondition = *currentReadyCondition
gracePeriod = nc.nodeMonitorGracePeriod
}
@@ -683,7 +706,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
nc.nodeStatusMap[node.Name] = savedNodeStatus
} else if savedCondition == nil && observedCondition != nil {
glog.V(1).Infof("Creating timestamp entry for newly observed Node %s", node.Name)
savedNodeStatus = nodeStatusData{
@@ -691,7 +713,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
nc.nodeStatusMap[node.Name] = savedNodeStatus
} else if savedCondition != nil && observedCondition == nil {
glog.Errorf("ReadyCondition was removed from Status of Node %s", node.Name)
// TODO: figure out what to do in this case. For now we do the same thing as above.
@@ -700,7 +721,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
nc.nodeStatusMap[node.Name] = savedNodeStatus
} else if savedCondition != nil && observedCondition != nil && savedCondition.LastHeartbeatTime != observedCondition.LastHeartbeatTime {
var transitionTime unversioned.Time
// If ReadyCondition changed since the last time we checked, we update the transition timestamp to "now",
@@ -713,7 +733,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
transitionTime = savedNodeStatus.readyTransitionTimestamp
}
if glog.V(5) {
glog.Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, savedNodeStatus.status, node.Status)
glog.V(5).Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, savedNodeStatus.status, node.Status)
} else {
glog.V(3).Infof("Node %s ReadyCondition updated. Updating timestamp.", node.Name)
}
@@ -722,13 +742,13 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
probeTimestamp: nc.now(),
readyTransitionTimestamp: transitionTime,
}
nc.nodeStatusMap[node.Name] = savedNodeStatus
}
nc.nodeStatusMap[node.Name] = savedNodeStatus
if nc.now().After(savedNodeStatus.probeTimestamp.Add(gracePeriod)) {
// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
// (regardless of its current value) in the master.
if readyCondition == nil {
if currentReadyCondition == nil {
glog.V(2).Infof("node %v is never updated by kubelet", node.Name)
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
Type: api.NodeReady,
@@ -740,14 +760,14 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
})
} else {
glog.V(4).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v",
node.Name, nc.now().Time.Sub(savedNodeStatus.probeTimestamp.Time), lastReadyCondition)
if lastReadyCondition.Status != api.ConditionUnknown {
readyCondition.Status = api.ConditionUnknown
readyCondition.Reason = "NodeStatusUnknown"
readyCondition.Message = fmt.Sprintf("Kubelet stopped posting node status.")
node.Name, nc.now().Time.Sub(savedNodeStatus.probeTimestamp.Time), observedReadyCondition)
if observedReadyCondition.Status != api.ConditionUnknown {
currentReadyCondition.Status = api.ConditionUnknown
currentReadyCondition.Reason = "NodeStatusUnknown"
currentReadyCondition.Message = fmt.Sprintf("Kubelet stopped posting node status.")
// LastProbeTime is the last time we heard from kubelet.
readyCondition.LastHeartbeatTime = lastReadyCondition.LastHeartbeatTime
readyCondition.LastTransitionTime = nc.now()
currentReadyCondition.LastHeartbeatTime = observedReadyCondition.LastHeartbeatTime
currentReadyCondition.LastTransitionTime = nc.now()
}
}
@@ -776,27 +796,41 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
}
}
if !api.Semantic.DeepEqual(nc.getCondition(&node.Status, api.NodeReady), &lastReadyCondition) {
if !api.Semantic.DeepEqual(nc.getCondition(&node.Status, api.NodeReady), &observedReadyCondition) {
if _, err = nc.kubeClient.Core().Nodes().UpdateStatus(node); err != nil {
glog.Errorf("Error updating node %s: %v", node.Name, err)
return gracePeriod, lastReadyCondition, readyCondition, err
return gracePeriod, observedReadyCondition, currentReadyCondition, err
} else {
nc.nodeStatusMap[node.Name] = nodeStatusData{
status: node.Status,
probeTimestamp: nc.nodeStatusMap[node.Name].probeTimestamp,
readyTransitionTimestamp: nc.now(),
}
return gracePeriod, lastReadyCondition, readyCondition, nil
return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}
}
}
return gracePeriod, lastReadyCondition, readyCondition, err
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
// all eviction timers reset.
func (nc *NodeController) forceUpdateAllProbeTimes() {
now := nc.now()
for k, v := range nc.nodeStatusMap {
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeStatusMap[k] = v
}
}
// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(nodeName string) bool {
if nc.networkSegmentationMode {
return false
}
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.podEvictor.Add(nodeName)
@@ -816,6 +850,15 @@ func (nc *NodeController) cancelPodEviction(nodeName string) bool {
return false
}
// stopAllPodEvictions removes any queued evictions for all Nodes.
func (nc *NodeController) stopAllPodEvictions() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
glog.V(3).Infof("Cancelling all pod evictions.")
nc.podEvictor.Clear()
nc.terminationEvictor.Clear()
}
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func (nc *NodeController) deletePods(nodeName string) (bool, error) {


@@ -165,13 +165,29 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
evictionTimeout := 10 * time.Minute
// Because of the logic that prevents the NC from evicting anything when all Nodes are NotReady,
// we need a second healthy Node in these tests. Because of how the tests are written, we need to update
// the status of this Node.
healthyNodeNewStatus := api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
// Node status has just been updated.
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 9, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
}
table := []struct {
fakeNodeHandler *FakeNodeHandler
daemonSets []extensions.DaemonSet
timeToPass time.Duration
newNodeStatus api.NodeStatus
expectedEvictPods bool
description string
fakeNodeHandler *FakeNodeHandler
daemonSets []extensions.DaemonSet
timeToPass time.Duration
newNodeStatus api.NodeStatus
secondNodeNewStatus api.NodeStatus
expectedEvictPods bool
description string
}{
// Node created recently, with no status (happens only at cluster startup).
{
@@ -183,14 +199,31 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
CreationTimestamp: fakeNow,
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
},
daemonSets: nil,
timeToPass: 0,
newNodeStatus: api.NodeStatus{},
expectedEvictPods: false,
description: "Node created recently, with no status.",
daemonSets: nil,
timeToPass: 0,
newNodeStatus: api.NodeStatus{},
secondNodeNewStatus: healthyNodeNewStatus,
expectedEvictPods: false,
description: "Node created recently, with no status.",
},
// Node created long time ago, and kubelet posted NotReady for a short period of time.
{
@@ -212,6 +245,22 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
},
@@ -228,8 +277,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
expectedEvictPods: false,
description: "Node created long time ago, and kubelet posted NotReady for a short period of time.",
secondNodeNewStatus: healthyNodeNewStatus,
expectedEvictPods: false,
description: "Node created long time ago, and kubelet posted NotReady for a short period of time.",
},
// Pod is ds-managed, and kubelet posted NotReady for a long period of time.
{
@@ -251,6 +301,22 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(
&api.PodList{
@@ -294,8 +360,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
expectedEvictPods: false,
description: "Pod is ds-managed, and kubelet posted NotReady for a long period of time.",
secondNodeNewStatus: healthyNodeNewStatus,
expectedEvictPods: false,
description: "Pod is ds-managed, and kubelet posted NotReady for a long period of time.",
},
// Node created long time ago, and kubelet posted NotReady for a long period of time.
{
@@ -317,6 +384,22 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
},
@@ -333,8 +416,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
expectedEvictPods: true,
description: "Node created long time ago, and kubelet posted NotReady for a long period of time.",
secondNodeNewStatus: healthyNodeNewStatus,
expectedEvictPods: true,
description: "Node created long time ago, and kubelet posted NotReady for a long period of time.",
},
// Node created long time ago, node controller posted Unknown for a short period of time.
{
@@ -356,6 +440,22 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
},
@@ -372,8 +472,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
expectedEvictPods: false,
description: "Node created long time ago, node controller posted Unknown for a short period of time.",
secondNodeNewStatus: healthyNodeNewStatus,
expectedEvictPods: false,
description: "Node created long time ago, node controller posted Unknown for a short period of time.",
},
// Node created long time ago, node controller posted Unknown for a long period of time.
{
@@ -395,6 +496,78 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
},
daemonSets: nil,
timeToPass: 60 * time.Minute,
newNodeStatus: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
// Node status was updated by nodecontroller 1hr ago
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
secondNodeNewStatus: healthyNodeNewStatus,
expectedEvictPods: true,
description: "Node created long time ago, node controller posted Unknown for a long period of time.",
},
// NetworkSegmentation: Node created long time ago, node controller posted Unknown for a long period of time on both Nodes.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
},
@@ -411,8 +584,76 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
},
},
},
expectedEvictPods: true,
description: "Node created long time ago, node controller posted Unknown for a long period of time.",
secondNodeNewStatus: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
// Node status was updated by nodecontroller 1hr ago
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
expectedEvictPods: false,
description: "Network Segmentation: Node created long time ago, node controller posted Unknown for a long period of time on both Nodes.",
},
// NetworkSegmentation: Node created long time ago, node controller posted Unknown for a long period of time
// on the first Node; eviction should stop even though the "-master" Node is healthy.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node-master",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
},
daemonSets: nil,
timeToPass: 60 * time.Minute,
newNodeStatus: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
// Node status was updated by nodecontroller 1hr ago
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
secondNodeNewStatus: healthyNodeNewStatus,
expectedEvictPods: false,
description: "NetworkSegmentation: Node created long time ago, node controller posted Unknown for a long period of on first Node, eviction should stop even though -master Node is healthy",
},
}
@@ -430,6 +671,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
if item.timeToPass > 0 {
nodeController.now = func() unversioned.Time { return unversioned.Time{Time: fakeNow.Add(item.timeToPass)} }
item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
item.fakeNodeHandler.Existing[1].Status = item.secondNodeNewStatus
}
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)


@@ -133,6 +133,18 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
return *result, true
}
// Clear removes all items from the queue and the duplication-preventing set.
func (q *UniqueQueue) Clear() {
q.lock.Lock()
defer q.lock.Unlock()
if q.queue.Len() > 0 {
q.queue = make(TimedQueue, 0)
}
if len(q.set) > 0 {
q.set = sets.NewString()
}
}
// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
// of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
@@ -199,3 +211,8 @@ func (q *RateLimitedTimedQueue) Add(value string) bool {
func (q *RateLimitedTimedQueue) Remove(value string) bool {
return q.queue.Remove(value)
}
// Clear removes all items from the queue.
func (q *RateLimitedTimedQueue) Clear() {
q.queue.Clear()
}


@@ -261,3 +261,16 @@ func TestTryRemovingWhileTry(t *testing.T) {
t.Fatalf("unexpected iterations: %d", count)
}
}
func TestClear(t *testing.T) {
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Clear()
if len(evictor.queue.queue) != 0 {
t.Fatalf("Clear should remove all elements from the queue.")
}
}