diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 61669ac6f00..f38bde3981c 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -176,7 +176,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st handler.delegate = m.Handler // Scheduler - schedulerConfigFactory := factory.NewConfigFactory(cl) + schedulerConfigFactory := factory.NewConfigFactory(cl, nil) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { glog.Fatalf("Couldn't create scheduler config: %v", err) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 7ddb3013e7e..92c8e28f2a2 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -65,6 +65,11 @@ var ( deletingPodsBurst = flag.Int("deleting-pods-burst", 10, "") ) +const ( + bindPodsQps = 15 + bindPodsBurst = 20 +) + type delegateHandler struct { delegate http.Handler } @@ -118,7 +123,7 @@ func runApiServer(etcdClient tools.EtcdClient, addr net.IP, port int, masterServ // RunScheduler starts up a scheduler in it's own goroutine func runScheduler(cl *client.Client) { // Scheduler - schedulerConfigFactory := factory.NewConfigFactory(cl) + schedulerConfigFactory := factory.NewConfigFactory(cl, util.NewTokenBucketRateLimiter(bindPodsQps, bindPodsBurst)) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { glog.Fatalf("Couldn't create scheduler config: %v", err) diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index a59c498dedc..921651cae50 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -54,6 +54,8 @@ type SchedulerServer struct { EnableProfiling bool Master string Kubeconfig string + BindPodsQPS float32 + BindPodsBurst int } // NewSchedulerServer creates a new SchedulerServer with default parameters @@ -75,6 +77,8 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.") + fs.Float32Var(&s.BindPodsQPS, "bind-pods-qps", 15.0, "Number of bindings per second scheduler is allowed to continuously make") + fs.IntVar(&s.BindPodsBurst, "bind-pods-burst", 20, "Number of bindings per second scheduler is allowed to make during bursts") } // Run runs the specified SchedulerServer. This should never exit. @@ -116,7 +120,7 @@ func (s *SchedulerServer) Run(_ []string) error { glog.Fatal(server.ListenAndServe()) }() - configFactory := factory.NewConfigFactory(kubeClient) + configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst)) config, err := s.createConfig(configFactory) if err != nil { glog.Fatalf("Failed to create scheduler configuration: %v", err) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index a1485d1c7e8..b496b3ed8d7 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -38,13 +38,6 @@ import ( "github.com/golang/glog" ) -// Rate limitations for binding pods to hosts. -// TODO: expose these as cmd line flags. -const ( - BindPodsQps = 15 - BindPodsBurst = 20 -) - // ConfigFactory knows how to fill out a scheduler config with its support functions. type ConfigFactory struct { Client *client.Client @@ -71,7 +64,7 @@ type ConfigFactory struct { } // Initializes the factory. -func NewConfigFactory(client *client.Client) *ConfigFactory { +func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter) *ConfigFactory { c := &ConfigFactory{ Client: client, PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), @@ -85,7 +78,7 @@ func NewConfigFactory(client *client.Client) *ConfigFactory { modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister) c.modeler = modeler c.PodLister = modeler.PodLister() - c.BindPodsRateLimiter = util.NewTokenBucketRateLimiter(BindPodsQps, BindPodsBurst) + c.BindPodsRateLimiter = rateLimiter // On add/delete to the scheduled pods, remove from the assumed pods. // We construct this here instead of in CreateFromKeys because diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 154c9bf854b..1b3819a91d6 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -44,7 +44,7 @@ func TestCreate(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) - factory := NewConfigFactory(client) + factory := NewConfigFactory(client, nil) factory.Create() } @@ -62,7 +62,7 @@ func TestCreateFromConfig(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) - factory := NewConfigFactory(client) + factory := NewConfigFactory(client, nil) // Pre-register some predicate and priority functions RegisterFitPredicate("PredicateOne", PredicateOne) @@ -104,7 +104,7 @@ func TestCreateFromEmptyConfig(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) - factory := NewConfigFactory(client) + factory := NewConfigFactory(client, nil) configData = []byte(`{}`) err := latestschedulerapi.Codec.DecodeInto(configData, &policy) @@ -150,7 +150,7 @@ func TestDefaultErrorFunc(t *testing.T) { mux.Handle(testapi.ResourcePath("pods", "bar", "foo"), &handler) server := httptest.NewServer(mux) defer server.Close() - factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})) + factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}), nil) queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) podBackoff := podBackoff{ perPodBackoff: map[string]*backoffEntry{}, diff --git a/test/integration/scheduler_test.go b/test/integration/scheduler_test.go index beb0c285af5..84416707e26 100644 --- a/test/integration/scheduler_test.go +++ b/test/integration/scheduler_test.go @@ -79,7 +79,7 @@ func TestUnschedulableNodes(t *testing.T) { restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()}) - schedulerConfigFactory := factory.NewConfigFactory(restClient) + schedulerConfigFactory := factory.NewConfigFactory(restClient, nil) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err)