Separate deletion and termination evictors in NodeController, and fix rate_limited_queue.go

This commit is contained in:
gmarek 2015-09-29 11:29:51 +02:00
parent 5d07236be6
commit a3723e2045
7 changed files with 17 additions and 15 deletions

View File

@ -202,7 +202,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
// TODO: Write an integration test for the replication controllers watch. // TODO: Write an integration test for the replication controllers watch.
go controllerManager.Run(3, util.NeverStop) go controllerManager.Run(3, util.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false) 40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second) nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisor.Fake) cadvisorInterface := new(cadvisor.Fake)

View File

@ -259,6 +259,7 @@ func (s *CMServer) Run(_ []string) error {
nodeController := nodecontroller.NewNodeController(cloud, kubeClient, nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs) s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod) nodeController.Run(s.NodeSyncPeriod)

View File

@ -128,6 +128,7 @@ func (s *CMServer) Run(_ []string) error {
nodeController := nodecontroller.NewNodeController(cloud, kubeClient, nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod) nodeController.Run(s.NodeSyncPeriod)

View File

@ -105,7 +105,8 @@ func NewNodeController(
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
kubeClient client.Interface, kubeClient client.Interface,
podEvictionTimeout time.Duration, podEvictionTimeout time.Duration,
podEvictionLimiter util.RateLimiter, deletionEvictionLimiter util.RateLimiter,
terminationEvictionLimiter util.RateLimiter,
nodeMonitorGracePeriod time.Duration, nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration, nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration, nodeMonitorPeriod time.Duration,
@ -132,8 +133,8 @@ func NewNodeController(
podEvictionTimeout: podEvictionTimeout, podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute, maximumGracePeriod: 5 * time.Minute,
evictorLock: &evictorLock, evictorLock: &evictorLock,
podEvictor: NewRateLimitedTimedQueue(podEvictionLimiter), podEvictor: NewRateLimitedTimedQueue(deletionEvictionLimiter),
terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter), terminationEvictor: NewRateLimitedTimedQueue(terminationEvictionLimiter),
nodeStatusMap: make(map[string]nodeStatusData), nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod, nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod, nodeMonitorPeriod: nodeMonitorPeriod,
@ -601,8 +602,8 @@ func (nc *NodeController) deletePods(nodeName string) (bool, error) {
continue continue
} }
glog.V(2).Infof("Delete pod %v", pod.Name) glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeName) nc.recorder.Eventf(&pod, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err return false, err
} }

View File

@ -325,7 +325,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, evictionTimeout, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {
@ -543,7 +543,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -621,7 +621,7 @@ func TestNodeDeletion(t *testing.T) {
Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
} }
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {

View File

@ -21,6 +21,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -163,16 +164,14 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
for ok { for ok {
// rate limit the queue checking // rate limit the queue checking
if !q.limiter.CanAccept() { if !q.limiter.CanAccept() {
glog.V(10).Info("Try rate limitted...")
// Try again later // Try again later
break break
} }
now := now() now := now()
if now.Before(val.ProcessAt) { if now.Before(val.ProcessAt) {
q.queue.Replace(val) break
val, ok = q.queue.Head()
// we do not sleep here because other values may be added at the front of the queue
continue
} }
if ok, wait := fn(val); !ok { if ok, wait := fn(val); !ok {

View File

@ -175,10 +175,10 @@ func TestTryOrdering(t *testing.T) {
order = append(order, value.Value) order = append(order, value.Value)
return true, 0 return true, 0
}) })
if !reflect.DeepEqual(order, []string{"first", "third", "second"}) { if !reflect.DeepEqual(order, []string{"first", "third"}) {
t.Fatalf("order was wrong: %v", order) t.Fatalf("order was wrong: %v", order)
} }
if count != 4 { if count != 3 {
t.Fatalf("unexpected iterations: %d", count) t.Fatalf("unexpected iterations: %d", count)
} }
} }