diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0c963bc1372..834e0d9267b 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -72,7 +72,6 @@ import ( "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/crypto" - "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" @@ -239,8 +238,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) } nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), - s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)), - flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)), + s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst), s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod.Duration) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 07d09e6c50c..332453c7c94 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -59,7 +59,6 @@ import ( quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/util/crypto" - "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/contrib/mesos/pkg/profile" @@ -155,8 +154,7 @@ func (s *CMServer) Run(_ []string) error { _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR) _, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR) nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), - s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)), - flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)), + s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst), s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod.Duration) diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 5532e97e576..ecd11cd6e1c 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -73,13 +73,12 @@ type nodeStatusData struct { } type NodeController struct { - allocateNodeCIDRs bool - cloud cloudprovider.Interface - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet - deletingPodsRateLimiter flowcontrol.RateLimiter - knownNodeSet map[string]*api.Node - kubeClient clientset.Interface + allocateNodeCIDRs bool + cloud cloudprovider.Interface + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + knownNodeSet map[string]*api.Node + kubeClient clientset.Interface // Method for easy mocking in unittest. lookupIP func(host string) ([]net.IP, error) // Value used if sync_nodes_status=False. NodeController will not proactively @@ -112,9 +111,11 @@ type NodeController struct { // Lock to access evictor workers evictorLock *sync.Mutex // workers that evicts pods from unresponsive nodes. - podEvictor *RateLimitedTimedQueue - terminationEvictor *RateLimitedTimedQueue - podEvictionTimeout time.Duration + zonePodEvictor map[string]*RateLimitedTimedQueue + zoneTerminationEvictor map[string]*RateLimitedTimedQueue + evictionLimiterQPS float32 + evictionLimiterBurst int + podEvictionTimeout time.Duration // The maximum duration before a pod evicted from a node can be forcefully terminated. maximumGracePeriod time.Duration recorder record.EventRecorder @@ -142,8 +143,8 @@ func NewNodeController( cloud cloudprovider.Interface, kubeClient clientset.Interface, podEvictionTimeout time.Duration, - deletionEvictionLimiter flowcontrol.RateLimiter, - terminationEvictionLimiter flowcontrol.RateLimiter, + evictionLimiterQPS float32, + evictionLimiterBurst int, nodeMonitorGracePeriod time.Duration, nodeStartupGracePeriod time.Duration, nodeMonitorPeriod time.Duration, @@ -184,8 +185,8 @@ func NewNodeController( podEvictionTimeout: podEvictionTimeout, maximumGracePeriod: 5 * time.Minute, evictorLock: &evictorLock, - podEvictor: NewRateLimitedTimedQueue(deletionEvictionLimiter), - terminationEvictor: NewRateLimitedTimedQueue(terminationEvictionLimiter), + zonePodEvictor: make(map[string]*RateLimitedTimedQueue), + zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue), nodeStatusMap: make(map[string]nodeStatusData), nodeMonitorGracePeriod: nodeMonitorGracePeriod, nodeMonitorPeriod: nodeMonitorPeriod, @@ -198,6 +199,8 @@ func NewNodeController( forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) }, nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) }, computeZoneStateFunc: ComputeZoneState, + evictionLimiterQPS: evictionLimiterQPS, + evictionLimiterBurst: evictionLimiterBurst, zoneStates: make(map[string]zoneState), } @@ -309,18 +312,20 @@ func (nc *NodeController) Run(period time.Duration) { go wait.Until(func() { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { - remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) - return false, 0 - } + for k := range nc.zonePodEvictor { + nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) { + remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) + return false, 0 + } - if remaining { - nc.terminationEvictor.Add(value.Value) - } - return true, 0 - }) + if remaining { + nc.zoneTerminationEvictor[k].Add(value.Value) + } + return true, 0 + }) + } }, nodeEvictionPeriod, wait.NeverStop) // TODO: replace with a controller that ensures pods that are terminating complete @@ -328,26 +333,28 @@ func (nc *NodeController) Run(period time.Duration) { go wait.Until(func() { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) { - completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err)) - return false, 0 - } + for k := range nc.zoneTerminationEvictor { + nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) { + completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err)) + return false, 0 + } - if completed { - glog.V(2).Infof("All pods terminated on %s", value.Value) - recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value)) - return true, 0 - } + if completed { + glog.V(2).Infof("All pods terminated on %s", value.Value) + recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value)) + return true, 0 + } - glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining) - // clamp very short intervals - if remaining < nodeEvictionPeriod { - remaining = nodeEvictionPeriod - } - return false, remaining - }) + glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining) + // clamp very short intervals + if remaining < nodeEvictionPeriod { + remaining = nodeEvictionPeriod + } + return false, remaining + }) + } }, nodeEvictionPeriod, wait.NeverStop) go wait.Until(func() { @@ -372,8 +379,19 @@ func (nc *NodeController) monitorNodeStatus() error { for i := range added { glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name) recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name)) - nc.cancelPodEviction(added[i]) nc.knownNodeSet[added[i].Name] = added[i] + // When adding new Nodes we need to check if new zone appeared, and if so add new evictor. + zone := utilnode.GetZoneKey(added[i]) + if _, found := nc.zonePodEvictor[zone]; !found { + nc.zonePodEvictor[zone] = + NewRateLimitedTimedQueue( + flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst)) + } + if _, found := nc.zoneTerminationEvictor[zone]; !found { + nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue( + flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst)) + } + nc.cancelPodEviction(added[i]) } for i := range deleted { @@ -689,10 +707,11 @@ func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added, // cancelPodEviction removes any queued evictions, typically because the node is available again. It // returns true if an eviction was queued. func (nc *NodeController) cancelPodEviction(node *api.Node) bool { + zone := utilnode.GetZoneKey(node) nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - wasDeleting := nc.podEvictor.Remove(node.Name) - wasTerminating := nc.terminationEvictor.Remove(node.Name) + wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name) + wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name) if wasDeleting || wasTerminating { glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name) return true @@ -703,10 +722,18 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool { // evictPods queues an eviction for the provided node name, and returns false if the node is already // queued for eviction. func (nc *NodeController) evictPods(node *api.Node) bool { - if nc.zoneStates[utilnode.GetZoneKey(node)] == stateFullSegmentation { - return false - } nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - return nc.podEvictor.Add(node.Name) + foundHealty := false + for _, state := range nc.zoneStates { + if state != stateFullSegmentation { + foundHealty = true + break + } + } + if !foundHealty { + return false + } + zone := utilnode.GetZoneKey(node) + return nc.zonePodEvictor[zone].Add(node.Name) } diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 56cba09120b..31f88814044 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -28,7 +28,6 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/util/diff" - "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" ) @@ -36,6 +35,8 @@ const ( testNodeMonitorGracePeriod = 40 * time.Second testNodeStartupGracePeriod = 60 * time.Second testNodeMonitorPeriod = 5 * time.Second + testRateLimiterBurst = 10000 + testRateLimiterQPS = float32(10000) ) func TestMonitorNodeStatusEvictPods(t *testing.T) { @@ -74,12 +75,20 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node0", CreationTimestamp: fakeNow, + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, }, { ObjectMeta: api.ObjectMeta{ Name: "node1", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -110,6 +119,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node0", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -126,6 +139,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node1", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -166,6 +183,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node0", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -182,6 +203,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node1", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -249,6 +274,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node0", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -265,6 +294,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node1", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -305,6 +338,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node0", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -321,6 +358,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node1", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -361,6 +402,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node0", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -377,6 +422,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node1", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -417,6 +466,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node0", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -433,6 +486,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node1", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region2", + unversioned.LabelZoneFailureDomain: "zone2", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -484,6 +541,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node0", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -500,6 +561,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { ObjectMeta: api.ObjectMeta{ Name: "node-master", CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + unversioned.LabelZoneRegion: "region1", + unversioned.LabelZoneFailureDomain: "zone1", + }, }, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -536,7 +601,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, item.fakeNodeHandler, - evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, + evictionTimeout, testRateLimiterQPS, testRateLimiterBurst, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) nodeController.now = func() unversioned.Time { return fakeNow } for _, ds := range item.daemonSets { @@ -553,18 +618,21 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } + zones := getZones(item.fakeNodeHandler) + for _, zone := range zones { + nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { + remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore) + if remaining { + nodeController.zoneTerminationEvictor[zone].Add(value.Value) + } + return true, 0 + }) + nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { + terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod) + return true, 0 + }) + } - nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { - remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore) - if remaining { - nodeController.terminationEvictor.Add(value.Value) - } - return true, 0 - }) - nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { - terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod) - return true, 0 - }) podEvicted := false for _, action := range item.fakeNodeHandler.Actions() { if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { @@ -606,7 +674,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) { deleteWaitChan: make(chan struct{}), } nodeController := NewNodeController(nil, fnh, 10*time.Minute, - flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), + testRateLimiterQPS, testRateLimiterBurst, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) nodeController.cloud = &fakecloud.FakeCloud{} @@ -626,7 +694,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) { if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" { t.Errorf("Node was not deleted") } - if nodeOnQueue := nodeController.podEvictor.Remove("node0"); nodeOnQueue { + if nodeOnQueue := nodeController.zonePodEvictor[""].Remove("node0"); nodeOnQueue { t.Errorf("Node was queued for eviction. Should have been immediately deleted.") } } @@ -839,8 +907,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for i, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), - flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) + nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst, + testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -989,8 +1057,8 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { } for i, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), - flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) + nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst, + testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("Case[%d] unexpected error: %v", i, err) @@ -1071,8 +1139,9 @@ func TestNodeDeletion(t *testing.T) { Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), } - nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) + nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst, + testNodeMonitorGracePeriod, testNodeStartupGracePeriod, + testNodeMonitorPeriod, nil, nil, 0, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -1081,7 +1150,7 @@ func TestNodeDeletion(t *testing.T) { if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } - nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { + nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) { deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore) return true, 0 }) @@ -1097,7 +1166,6 @@ func TestNodeDeletion(t *testing.T) { } func TestCheckPod(t *testing.T) { - tcs := []struct { pod api.Pod prune bool @@ -1175,7 +1243,7 @@ func TestCheckPod(t *testing.T) { }, } - nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false) + nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false) nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc) nc.nodeStore.Store.Add(&api.Node{ ObjectMeta: api.ObjectMeta{ @@ -1242,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) { newPod("b", "bar"), newPod("c", "gone"), } - nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false) + nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false) nc.nodeStore.Store.Add(newNode("foo")) nc.nodeStore.Store.Add(newNode("bar")) diff --git a/pkg/controller/node/test_utils.go b/pkg/controller/node/test_utils.go index a7529b9868c..23072d77b90 100644 --- a/pkg/controller/node/test_utils.go +++ b/pkg/controller/node/test_utils.go @@ -25,6 +25,8 @@ import ( "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + utilnode "k8s.io/kubernetes/pkg/util/node" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" ) @@ -235,3 +237,13 @@ func contains(node *api.Node, nodes []*api.Node) bool { } return false } + +// Returns list of zones for all Nodes stored in FakeNodeHandler +func getZones(nodeHandler *FakeNodeHandler) []string { + nodes, _ := nodeHandler.List(api.ListOptions{}) + zones := sets.NewString() + for _, node := range nodes.Items { + zones.Insert(utilnode.GetZoneKey(&node)) + } + return zones.List() +}