From 4004a68ae983fec2e24741e10833d6f9f78f741b Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Mon, 18 May 2015 17:38:05 -0700
Subject: [PATCH] Limit the scheduler to a burst qps of 30 and rate limit pod binding

---
 plugin/cmd/kube-scheduler/app/server.go |  2 +-
 plugin/pkg/scheduler/factory/factory.go | 15 +++++-
 plugin/pkg/scheduler/scheduler.go       |  7 +++
 plugin/pkg/scheduler/scheduler_test.go  | 64 +++++++++++++++++++++++++
 4 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go
index e2cca460b57..c2693a1dc88 100644
--- a/plugin/cmd/kube-scheduler/app/server.go
+++ b/plugin/cmd/kube-scheduler/app/server.go
@@ -92,7 +92,7 @@ func (s *SchedulerServer) Run(_ []string) error {
 		return err
 	}
 	kubeconfig.QPS = 20.0
-	kubeconfig.Burst = 100
+	kubeconfig.Burst = 30
 
 	kubeClient, err := client.New(kubeconfig)
 	if err != nil {
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 87515b45cdb..ddc807b1e8f 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -38,6 +38,13 @@ import (
 	"github.com/golang/glog"
 )
 
+// Rate limitations for binding pods to hosts.
+// TODO: expose these as cmd line flags.
+const (
+	BindPodsQps   = 15
+	BindPodsBurst = 20
+)
+
 // ConfigFactory knows how to fill out a scheduler config with its support functions.
 type ConfigFactory struct {
 	Client *client.Client
@@ -54,6 +61,8 @@ type ConfigFactory struct {
 	// Close this to stop all reflectors
 	StopEverything chan struct{}
 
+	// Rate limiter for binding pods
+	BindPodsRateLimiter util.RateLimiter
 	scheduledPodPopulator *framework.Controller
 	modeler               scheduler.SystemModeler
 
@@ -73,6 +82,7 @@ func NewConfigFactory(client *client.Client) *ConfigFactory {
 	modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
 	c.modeler = modeler
 	c.PodLister = modeler.PodLister()
+	c.BindPodsRateLimiter = util.NewTokenBucketRateLimiter(BindPodsQps, BindPodsBurst)
 
 	// On add/delete to the scheduled pods, remove from the assumed pods.
 	// We construct this here instead of in CreateFromKeys because
@@ -204,8 +214,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
 			glog.V(2).Infof("About to try and schedule pod %v", pod.Name)
 			return pod
 		},
-		Error:          f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
-		StopEverything: f.StopEverything,
+		Error:               f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
+		BindPodsRateLimiter: f.BindPodsRateLimiter,
+		StopEverything:      f.StopEverything,
 	}, nil
 }
 
diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go
index 7d3830fcac2..e1e04128280 100644
--- a/plugin/pkg/scheduler/scheduler.go
+++ b/plugin/pkg/scheduler/scheduler.go
@@ -73,6 +73,9 @@ type Config struct {
 	Algorithm algorithm.ScheduleAlgorithm
 	Binder    Binder
 
+	// Rate at which we can create pods
+	BindPodsRateLimiter util.RateLimiter
+
 	// NextPod should be a function that blocks until the next pod
 	// is available. We don't use a channel for this, because scheduling
 	// a pod may take some amount of time and we don't want pods to get
@@ -106,6 +109,10 @@ func (s *Scheduler) Run() {
 
 func (s *Scheduler) scheduleOne() {
 	pod := s.config.NextPod()
+	if s.config.BindPodsRateLimiter != nil {
+		s.config.BindPodsRateLimiter.Accept()
+	}
+
 	glog.V(3).Infof("Attempting to schedule: %v", pod)
 	start := time.Now()
 	defer func() {
diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go
index 39093a4c2e8..f00872aa321 100644
--- a/plugin/pkg/scheduler/scheduler_test.go
+++ b/plugin/pkg/scheduler/scheduler_test.go
@@ -295,3 +295,67 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	<-called
 	events.Stop()
 }
+
+// Fake rate limiter that records the 'accept' tokens from the real rate limiter
+type FakeRateLimiter struct {
+	r            util.RateLimiter
+	acceptValues []bool
+}
+
+func (fr *FakeRateLimiter) CanAccept() bool {
+	return true
+}
+
+func (fr *FakeRateLimiter) Stop() {}
+
+func (fr *FakeRateLimiter) Accept() {
+	fr.acceptValues = append(fr.acceptValues, fr.r.CanAccept())
+}
+
+func TestSchedulerRateLimitsBinding(t *testing.T) {
+	scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
+	scheduledPodLister := &cache.StoreToPodLister{scheduledPodStore}
+	queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
+	queuedPodLister := &cache.StoreToPodLister{queuedPodStore}
+	modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
+
+	algo := NewGenericScheduler(
+		map[string]algorithm.FitPredicate{},
+		[]algorithm.PriorityConfig{},
+		modeler.PodLister(),
+		rand.New(rand.NewSource(time.Now().UnixNano())))
+
+	// Rate limit to 1 pod
+	fr := FakeRateLimiter{util.NewTokenBucketRateLimiter(0.02, 1), []bool{}}
+	c := &Config{
+		Modeler: modeler,
+		MinionLister: algorithm.FakeMinionLister(
+			api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
+		),
+		Algorithm: algo,
+		Binder: fakeBinder{func(b *api.Binding) error {
+			return nil
+		}},
+		NextPod: func() *api.Pod {
+			return queuedPodStore.Pop().(*api.Pod)
+		},
+		Error: func(p *api.Pod, err error) {
+			t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)
+		},
+		Recorder:            &record.FakeRecorder{},
+		BindPodsRateLimiter: &fr,
+	}
+
+	s := New(c)
+	firstPod := podWithID("foo", "")
+	secondPod := podWithID("boo", "")
+	queuedPodStore.Add(firstPod)
+	queuedPodStore.Add(secondPod)
+
+	for i, hitRateLimit := range []bool{true, false} {
+		s.scheduleOne()
+		if fr.acceptValues[i] != hitRateLimit {
+			t.Errorf("Unexpected rate limiting, expect rate limit to be: %v but found it was %v", hitRateLimit, fr.acceptValues[i])
+		}
+	}
+}
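
The patch throttles scheduleOne() by calling Accept() on a token-bucket rate limiter before each binding attempt. For readers unfamiliar with that pattern, the following is a minimal, self-contained sketch of a blocking token-bucket Accept(); the tokenBucket type, newTokenBucket constructor, and the main() driver are illustrative names invented for this sketch, not the util.NewTokenBucketRateLimiter implementation the patch actually uses.

// tokenbucket_sketch.go - illustrative only, not part of the patch above.
package main

import (
	"fmt"
	"time"
)

// tokenBucket holds up to `burst` tokens and refills at `qps` tokens per second.
// Accept blocks until a token is available, so callers are throttled to the rate.
type tokenBucket struct {
	tokens chan struct{}
}

func newTokenBucket(qps float64, burst int) *tokenBucket {
	tb := &tokenBucket{tokens: make(chan struct{}, burst)}
	// Start with a full bucket so short bursts are not delayed.
	for i := 0; i < burst; i++ {
		tb.tokens <- struct{}{}
	}
	go func() {
		ticker := time.NewTicker(time.Duration(float64(time.Second) / qps))
		defer ticker.Stop()
		for range ticker.C {
			select {
			case tb.tokens <- struct{}{}: // refill one token per tick
			default: // bucket already full; drop the token
			}
		}
	}()
	return tb
}

// Accept blocks until a token is available, mirroring the RateLimiter.Accept()
// call placed at the top of scheduleOne in the patch.
func (tb *tokenBucket) Accept() { <-tb.tokens }

func main() {
	// Values chosen to match the BindPodsQps/BindPodsBurst constants introduced above.
	limiter := newTokenBucket(15, 20)
	for i := 0; i < 5; i++ {
		limiter.Accept() // throttle each binding attempt
		fmt.Printf("bound pod %d at %s\n", i, time.Now().Format("15:04:05.000"))
	}
}

Because the bucket starts full, the first `burst` calls return immediately and later calls proceed at roughly one per 1/qps seconds, which is the behavior the test exercises with a 0.02 qps, burst-1 limiter.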