From 1490543d5437ce0fa19008e1e6ed8ce11e95c271 Mon Sep 17 00:00:00 2001
From: gmarek
Date: Tue, 19 May 2015 13:23:59 +0200
Subject: [PATCH] Move evicting pods to separate thread to allow for correct rate limiting.

---
 cmd/integration/integration.go                |   2 +-
 .../app/controllermanager.go                  |   2 +-
 cmd/kubernetes/kubernetes.go                  |   2 +-
 .../nodecontroller/nodecontroller.go          |  68 ++++----
 .../nodecontroller/nodecontroller_test.go     |   8 +-
 .../nodecontroller/podevictor.go              | 129 ++++++++++++++++
 .../nodecontroller/podevictor_test.go         | 146 ++++++++++++++++++
 7 files changed, 321 insertions(+), 36 deletions(-)
 create mode 100644 pkg/cloudprovider/nodecontroller/podevictor.go
 create mode 100644 pkg/cloudprovider/nodecontroller/podevictor_test.go

diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 88a949ef835..9f9bc96cdef 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -188,7 +188,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
 	// TODO: Write an integration test for the replication controllers watch.
 	go controllerManager.Run(3, util.NeverStop)
-	nodeController := nodecontroller.NewNodeController(nil, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(),
+	nodeController := nodecontroller.NewNodeController(nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
 		40*time.Second, 60*time.Second, 5*time.Second, nil, false)
 	nodeController.Run(5 * time.Second)
 	cadvisorInterface := new(cadvisor.Fake)
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 9424c0bf2c9..79ca36491b3 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -208,7 +208,7 @@ func (s *CMServer) Run(_ []string) error {
 	}
 
 	nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.RegisterRetryCount,
-		s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
+		s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
 		s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod)
diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go
index 5231f4b7a88..05ad427ceb8 100644
--- a/cmd/kubernetes/kubernetes.go
+++ b/cmd/kubernetes/kubernetes.go
@@ -122,7 +122,7 @@ func runScheduler(cl *client.Client) {
 func runControllerManager(cl *client.Client) {
 	const nodeSyncPeriod = 10 * time.Second
 	nodeController := nodecontroller.NewNodeController(
-		nil, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst),
+		nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst)),
 		40*time.Second, 60*time.Second, 5*time.Second, nil, false)
 	nodeController.Run(nodeSyncPeriod)
diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go
index 1bf7f6a015a..2d213243fcf 100644
--- a/pkg/cloudprovider/nodecontroller/nodecontroller.go
+++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go
@@ -39,8 +39,12 @@ var (
 	ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
 )
 
-// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
-const nodeStatusUpdateRetry = 5
+const (
+	// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
+	nodeStatusUpdateRetry = 5
+	// controls how often NodeController will try to evict Pods from non-responsive Nodes.
+	nodeEvictionPeriod = 100 * time.Millisecond
+)
 
 type nodeStatusData struct {
 	probeTimestamp util.Time
@@ -55,6 +59,9 @@ type NodeController struct {
 	registerRetryCount      int
 	podEvictionTimeout      time.Duration
 	deletingPodsRateLimiter util.RateLimiter
+	// worker that evicts pods from unresponsive nodes.
+	podEvictor *PodEvictor
+
 	// per Node map storing last observed Status together with a local time when it was observed.
 	// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
 	// to aviod the problem with time skew across the cluster.
@@ -94,7 +101,7 @@ func NewNodeController(
 	kubeClient client.Interface,
 	registerRetryCount int,
 	podEvictionTimeout time.Duration,
-	deletingPodsRateLimiter util.RateLimiter,
+	podEvictor *PodEvictor,
 	nodeMonitorGracePeriod time.Duration,
 	nodeStartupGracePeriod time.Duration,
 	nodeMonitorPeriod time.Duration,
@@ -112,20 +119,20 @@ func NewNodeController(
 		glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
 	}
 	return &NodeController{
-		cloud:                   cloud,
-		kubeClient:              kubeClient,
-		recorder:                recorder,
-		registerRetryCount:      registerRetryCount,
-		podEvictionTimeout:      podEvictionTimeout,
-		deletingPodsRateLimiter: deletingPodsRateLimiter,
-		nodeStatusMap:           make(map[string]nodeStatusData),
-		nodeMonitorGracePeriod:  nodeMonitorGracePeriod,
-		nodeMonitorPeriod:       nodeMonitorPeriod,
-		nodeStartupGracePeriod:  nodeStartupGracePeriod,
-		lookupIP:                net.LookupIP,
-		now:                     util.Now,
-		clusterCIDR:             clusterCIDR,
-		allocateNodeCIDRs:       allocateNodeCIDRs,
+		cloud:                  cloud,
+		kubeClient:             kubeClient,
+		recorder:               recorder,
+		registerRetryCount:     registerRetryCount,
+		podEvictionTimeout:     podEvictionTimeout,
+		podEvictor:             podEvictor,
+		nodeStatusMap:          make(map[string]nodeStatusData),
+		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
+		nodeMonitorPeriod:      nodeMonitorPeriod,
+		nodeStartupGracePeriod: nodeStartupGracePeriod,
+		lookupIP:               net.LookupIP,
+		now:                    util.Now,
+		clusterCIDR:            clusterCIDR,
+		allocateNodeCIDRs:      allocateNodeCIDRs,
 	}
 }
 
@@ -179,6 +186,10 @@ func (nc *NodeController) Run(period time.Duration) {
 			glog.Errorf("Error monitoring node status: %v", err)
 		}
 	}, nc.nodeMonitorPeriod)
+
+	go util.Forever(func() {
+		nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
+	}, nodeEvictionPeriod)
 }
 
 func (nc *NodeController) recordNodeEvent(node *api.Node, event string) {
@@ -366,24 +377,19 @@ func (nc *NodeController) monitorNodeStatus() error {
 			// Check eviction timeout.
 			if lastReadyCondition.Status == api.ConditionFalse &&
 				nc.now().After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
-				// Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
-				// Makes sure we are not removing pods from too many nodes in the same time.
- glog.Infof("Evicting pods: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout) - if nc.deletingPodsRateLimiter.CanAccept() { - if err := nc.deletePods(node.Name); err != nil { - glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) - } + if nc.podEvictor.AddNodeToEvict(node.Name) { + glog.Infof("Adding pods to evict: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout) } } if lastReadyCondition.Status == api.ConditionUnknown && nc.now().After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) { - // Same as above. Note however, since condition unknown is posted by node controller, which means we - // need to substract monitoring grace period in order to get the real 'podEvictionTimeout'. - glog.Infof("Evicting pods2: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod) - if nc.deletingPodsRateLimiter.CanAccept() { - if err := nc.deletePods(node.Name); err != nil { - glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) - } + if nc.podEvictor.AddNodeToEvict(node.Name) { + glog.Infof("Adding pods to evict2: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod) + } + } + if lastReadyCondition.Status == api.ConditionTrue { + if nc.podEvictor.RemoveNodeToEvict(node.Name) { + glog.Infof("Pods on %v won't be evicted", node.Name) } } diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go index d6e03d33cb1..88aa2f390ab 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go @@ -324,8 +324,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { + podEvictor := NewPodEvictor(util.NewFakeRateLimiter()) nodeController := NewNodeController(nil, item.fakeNodeHandler, 10, - evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, + evictionTimeout, podEvictor, testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -338,12 +339,15 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } + + podEvictor.TryEvict(func(nodeName string) { nodeController.deletePods(nodeName) }) podEvicted := false for _, action := range item.fakeNodeHandler.Actions { if action.Action == "delete-pod" { podEvicted = true } } + if item.expectedEvictPods != podEvicted { t.Errorf("expected pod eviction: %+v, got %+v for %+v", item.expectedEvictPods, podEvicted, item.description) @@ -527,7 +531,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(), + nodeController := NewNodeController(nil, item.fakeNodeHandler, 10, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { diff --git 
new file mode 100644
index 00000000000..64bced4f499
--- /dev/null
+++ b/pkg/cloudprovider/nodecontroller/podevictor.go
@@ -0,0 +1,129 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodecontroller
+
+import (
+	"sync"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+
+	"github.com/golang/glog"
+)
+
+// A FIFO queue which additionally guarantees that any element can be added only once until
+// it is removed.
+type UniqueQueue struct {
+	lock  sync.Mutex
+	queue []string
+	set   util.StringSet
+}
+
+// Entity responsible for evicting Pods from inserted Nodes. It uses a RateLimiter to avoid
+// evicting everything at once. Note that we rate limit eviction of Nodes, not of individual Pods.
+type PodEvictor struct {
+	queue                   UniqueQueue
+	deletingPodsRateLimiter util.RateLimiter
+}
+
+// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
+// Remove call. Returns true if the value was added.
+func (q *UniqueQueue) Add(value string) bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if !q.set.Has(value) {
+		q.queue = append(q.queue, value)
+		q.set.Insert(value)
+		return true
+	} else {
+		return false
+	}
+}
+
+// Removes the value from the queue, so that Get() won't return it and the value can be added
+// again later. If the value is not present, it does nothing and returns false.
+func (q *UniqueQueue) Remove(value string) bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	q.set.Delete(value)
+	for i, val := range q.queue {
+		if val == value {
+			if i > 0 && i < len(q.queue)-1 {
+				q.queue = append(q.queue[0:i], q.queue[i+1:len(q.queue)]...)
+			} else if i > 0 {
+				q.queue = q.queue[0 : len(q.queue)-1]
+			} else {
+				q.queue = q.queue[1:len(q.queue)]
+			}
+			return true
+		}
+	}
+	return false
+}
+
+// Returns the oldest added value that wasn't returned yet.
+func (q *UniqueQueue) Get() (string, bool) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if len(q.queue) == 0 {
+		return "", false
+	}
+
+	result := q.queue[0]
+	q.queue = q.queue[1:len(q.queue)]
+	return result, true
+}
+
+// Creates a new PodEvictor which will use the given RateLimiter to oversee eviction.
+func NewPodEvictor(deletingPodsRateLimiter util.RateLimiter) *PodEvictor {
+	return &PodEvictor{
+		queue: UniqueQueue{
+			queue: make([]string, 0),
+			set:   util.NewStringSet(),
+		},
+		deletingPodsRateLimiter: deletingPodsRateLimiter,
+	}
+}
+
+// Tries to evict all Pods from previously inserted Nodes. Ends prematurely if the RateLimiter forbids any eviction.
+// Each Node is processed only once, as long as it's not Removed, i.e. calling AddNodeToEvict multiple times does not
+// result in multiple evictions as long as RemoveNodeToEvict is not called.
+func (pe *PodEvictor) TryEvict(delFunc func(string)) {
+	val, ok := pe.queue.Get()
+	for ok {
+		if pe.deletingPodsRateLimiter.CanAccept() {
+			glog.Infof("PodEvictor is evicting Pods on Node: %v", val)
+			delFunc(val)
+		} else {
+			glog.V(1).Info("PodEvictor is rate limited.")
+			break
+		}
+		val, ok = pe.queue.Get()
+	}
+}
+
+// Adds Node to the Evictor to be processed later. Won't add the same Node a second time if it was already
+// added and not removed.
+func (pe *PodEvictor) AddNodeToEvict(nodeName string) bool {
+	return pe.queue.Add(nodeName)
+}
+
+// Removes Node from the Evictor. The Node won't be processed until added again.
+func (pe *PodEvictor) RemoveNodeToEvict(nodeName string) bool {
+	return pe.queue.Remove(nodeName)
+}
diff --git a/pkg/cloudprovider/nodecontroller/podevictor_test.go b/pkg/cloudprovider/nodecontroller/podevictor_test.go
new file mode 100644
index 00000000000..cc03e5afac9
--- /dev/null
+++ b/pkg/cloudprovider/nodecontroller/podevictor_test.go
@@ -0,0 +1,146 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodecontroller
+
+import (
+	"testing"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+)
+
+func CheckQueueEq(lhs, rhs []string) bool {
+	for i := 0; i < len(lhs); i++ {
+		if rhs[i] != lhs[i] {
+			return false
+		}
+	}
+	return true
+}
+
+func CheckSetEq(lhs, rhs util.StringSet) bool {
+	return lhs.HasAll(rhs.List()...) && rhs.HasAll(lhs.List()...)
+}
+
+func TestAddNode(t *testing.T) {
+	evictor := NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+
+	queuePattern := []string{"first", "second", "third"}
+	if len(evictor.queue.queue) != len(queuePattern) {
+		t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
+	}
+	if !CheckQueueEq(queuePattern, evictor.queue.queue) {
+		t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
+	}
+
+	setPattern := util.NewStringSet("first", "second", "third")
+	if len(evictor.queue.set) != len(setPattern) {
+		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
+	}
+	if !CheckSetEq(setPattern, evictor.queue.set) {
+		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
+	}
+}
+
+func TestDelNode(t *testing.T) {
+	evictor := NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+	evictor.RemoveNodeToEvict("first")
+
+	queuePattern := []string{"second", "third"}
+	if len(evictor.queue.queue) != len(queuePattern) {
+		t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
+	}
+	if !CheckQueueEq(queuePattern, evictor.queue.queue) {
+		t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
+	}
+
+	setPattern := util.NewStringSet("second", "third")
+	if len(evictor.queue.set) != len(setPattern) {
+		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
+	}
+	if !CheckSetEq(setPattern, evictor.queue.set) {
+		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
+	}
+
+	evictor = NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+	evictor.RemoveNodeToEvict("second")
+
+	queuePattern = []string{"first", "third"}
+	if len(evictor.queue.queue) != len(queuePattern) {
+		t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
+	}
+	if !CheckQueueEq(queuePattern, evictor.queue.queue) {
+		t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
+	}
+
+	setPattern = util.NewStringSet("first", "third")
+	if len(evictor.queue.set) != len(setPattern) {
+		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
+	}
+	if !CheckSetEq(setPattern, evictor.queue.set) {
+		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
+	}
+
+	evictor = NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+	evictor.RemoveNodeToEvict("third")
+
+	queuePattern = []string{"first", "second"}
+	if len(evictor.queue.queue) != len(queuePattern) {
+		t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
+	}
+	if !CheckQueueEq(queuePattern, evictor.queue.queue) {
+		t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
+	}
+
+	setPattern = util.NewStringSet("first", "second")
+	if len(evictor.queue.set) != len(setPattern) {
+		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
+	}
+	if !CheckSetEq(setPattern, evictor.queue.set) {
+		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
+	}
+}
+
+func TestEvictNode(t *testing.T) {
+	evictor := NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+	evictor.RemoveNodeToEvict("second")
+
+	deletedMap := util.NewStringSet()
+	evictor.TryEvict(func(nodeName string) { deletedMap.Insert(nodeName) })
+
+	setPattern := util.NewStringSet("first", "third")
+	if len(deletedMap) != len(setPattern) {
+		t.Fatalf("Map %v should have length %d", deletedMap, len(setPattern))
+	}
+	if !CheckSetEq(setPattern, deletedMap) {
+		t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
+	}
+}
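
Illustrative usage sketch (not part of the commit above): the snippet below shows how the new PodEvictor is meant to be driven, mirroring what NodeController.Run does after this patch — the monitoring pass only queues nodes, and a separate loop drains the queue under the rate limiter. The import paths are the ones used in the diff; the rate-limiter settings, node names, and the print-only delFunc are made-up placeholders for the example rather than anything from the source.

package main

import (
	"fmt"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

func main() {
	// Hypothetical limits: at most 1 node eviction per second, with a burst of 2.
	evictor := nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(1.0, 2))

	// The monitoring side only queues nodes; adding the same node again is a
	// no-op until RemoveNodeToEvict is called for it.
	evictor.AddNodeToEvict("node-a")
	evictor.AddNodeToEvict("node-a") // returns false, already queued
	evictor.AddNodeToEvict("node-b")

	// A separate goroutine drains the queue at its own pace; the actual pod
	// deletion happens inside the callback (nc.deletePods in the controller).
	go util.Forever(func() {
		evictor.TryEvict(func(nodeName string) {
			fmt.Printf("evicting pods from %s\n", nodeName)
		})
	}, 100*time.Millisecond)

	time.Sleep(2 * time.Second)
}

Because TryEvict stops as soon as CanAccept() returns false, the rate limit caps how many nodes are processed per period without ever blocking the node-status monitoring loop, which is the point of moving eviction to its own thread.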