From 080cb60dab7dcdddf22bd658b8f443b2fd2af1b9 Mon Sep 17 00:00:00 2001 From: harry Date: Mon, 28 Dec 2015 17:04:29 +0800 Subject: [PATCH] Fix duplicated rate limit in scheduler Remove BindingRateLimiterSaturation metrics Update generated doc --- cmd/integration/integration.go | 2 +- docs/admin/kube-scheduler.md | 6 +- .../cmd/kube-scheduler/app/options/options.go | 6 -- plugin/cmd/kube-scheduler/app/server.go | 4 +- .../defaults/compatibility_test.go | 2 +- plugin/pkg/scheduler/factory/factory.go | 10 +-- plugin/pkg/scheduler/factory/factory_test.go | 12 ++-- plugin/pkg/scheduler/metrics/metrics.go | 8 --- plugin/pkg/scheduler/scheduler.go | 13 ---- plugin/pkg/scheduler/scheduler_test.go | 69 ------------------- test/component/scheduler/perf/util.go | 3 +- test/integration/extender_test.go | 2 +- test/integration/scheduler_test.go | 6 +- 13 files changed, 20 insertions(+), 123 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index ca5804a6751..0e9db00ac50 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -171,7 +171,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string handler.delegate = m.Handler // Scheduler - schedulerConfigFactory := factory.NewConfigFactory(cl, nil, api.DefaultSchedulerName) + schedulerConfigFactory := factory.NewConfigFactory(cl, api.DefaultSchedulerName) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { glog.Fatalf("Couldn't create scheduler config: %v", err) diff --git a/docs/admin/kube-scheduler.md b/docs/admin/kube-scheduler.md index 4a4694d8943..941f6e10925 100644 --- a/docs/admin/kube-scheduler.md +++ b/docs/admin/kube-scheduler.md @@ -56,14 +56,12 @@ kube-scheduler ``` --address=0.0.0.0: The IP address to serve on (set to 0.0.0.0 for all interfaces) --algorithm-provider="DefaultProvider": The scheduling algorithm provider to use, one of: DefaultProvider - --bind-pods-burst=100: Number of bindings per second scheduler is allowed to make during bursts - --bind-pods-qps=50: Number of bindings per second scheduler is allowed to continuously make --google-json-key="": The Google Cloud Platform Service Account JSON Key to use for authentication. --kube-api-burst=100: Burst to use while talking with kubernetes apiserver --kube-api-qps=50: QPS to use while talking with kubernetes apiserver --kubeconfig="": Path to kubeconfig file with authorization and master location information. --leader-elect[=false]: Start a leader election client and gain leadership before executing scheduler loop. Enable this when running replicated schedulers. - --leader-elect-lease-duration=15s: The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled. + --leader-elect-lease-duration=15s: The duration that non-leader candidates will wait after observing a leadershiprenewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled. --leader-elect-renew-deadline=10s: The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled. --leader-elect-retry-period=2s: The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled. --log-flush-frequency=5s: Maximum number of seconds between log flushes @@ -74,7 +72,7 @@ kube-scheduler --scheduler-name="default-scheduler": Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name' ``` -###### Auto generated by spf13/cobra on 12-Jan-2016 +###### Auto generated by spf13/cobra on 13-Jan-2016 diff --git a/plugin/cmd/kube-scheduler/app/options/options.go b/plugin/cmd/kube-scheduler/app/options/options.go index 36ca7b589d5..d4a94e58276 100644 --- a/plugin/cmd/kube-scheduler/app/options/options.go +++ b/plugin/cmd/kube-scheduler/app/options/options.go @@ -37,8 +37,6 @@ type SchedulerServer struct { EnableProfiling bool Master string Kubeconfig string - BindPodsQPS float32 - BindPodsBurst int KubeAPIQPS float32 KubeAPIBurst int SchedulerName string @@ -51,8 +49,6 @@ func NewSchedulerServer() *SchedulerServer { Port: ports.SchedulerPort, Address: net.ParseIP("0.0.0.0"), AlgorithmProvider: factory.DefaultProvider, - BindPodsQPS: 50.0, - BindPodsBurst: 100, KubeAPIQPS: 50.0, KubeAPIBurst: 100, SchedulerName: api.DefaultSchedulerName, @@ -70,8 +66,6 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.") - fs.Float32Var(&s.BindPodsQPS, "bind-pods-qps", s.BindPodsQPS, "Number of bindings per second scheduler is allowed to continuously make") - fs.IntVar(&s.BindPodsBurst, "bind-pods-burst", s.BindPodsBurst, "Number of bindings per second scheduler is allowed to make during bursts") fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver") fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'") diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index 610098a5e50..c199f49a8b2 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -32,7 +32,6 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/healthz" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" "k8s.io/kubernetes/plugin/pkg/scheduler" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" @@ -99,8 +98,9 @@ func Run(s *options.SchedulerServer) error { glog.Fatal(server.ListenAndServe()) }() - configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst), s.SchedulerName) + configFactory := factory.NewConfigFactory(kubeClient, s.SchedulerName) config, err := createConfig(s, configFactory) + if err != nil { glog.Fatalf("Failed to create scheduler configuration: %v", err) } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index b2b1870ce3f..c6b39f29297 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -100,7 +100,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { if !reflect.DeepEqual(policy, tc.ExpectedPolicy) { t.Errorf("%s: Expected:\n\t%#v\nGot:\n\t%#v", v, tc.ExpectedPolicy, policy) } - _, err = factory.NewConfigFactory(nil, nil, "some-scheduler-name").CreateFromConfig(policy) + _, err = factory.NewConfigFactory(nil, "some-scheduler-name").CreateFromConfig(policy) if err != nil { t.Errorf("%s: Error constructing: %v", v, err) continue diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index a042ddeca21..328fedd11f7 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -69,8 +69,6 @@ type ConfigFactory struct { // Close this to stop all reflectors StopEverything chan struct{} - // Rate limiter for binding pods - BindPodsRateLimiter util.RateLimiter scheduledPodPopulator *framework.Controller modeler scheduler.SystemModeler @@ -82,7 +80,7 @@ type ConfigFactory struct { } // Initializes the factory. -func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter, schedulerName string) *ConfigFactory { +func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory { c := &ConfigFactory{ Client: client, PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), @@ -99,7 +97,6 @@ func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter, sched modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister) c.modeler = modeler c.PodLister = modeler.PodLister() - c.BindPodsRateLimiter = rateLimiter // On add/delete to the scheduled pods, remove from the assumed pods. // We construct this here instead of in CreateFromKeys because @@ -253,9 +250,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, NextPod: func() *api.Pod { return f.getNextPod() }, - Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), - BindPodsRateLimiter: f.BindPodsRateLimiter, - StopEverything: f.StopEverything, + Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + StopEverything: f.StopEverything, }, nil } diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 7950c7bc702..a02d1518cf7 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -46,7 +46,7 @@ func TestCreate(t *testing.T) { // TODO: Uncomment when fix #19254 // defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}) - factory := NewConfigFactory(client, nil, api.DefaultSchedulerName) + factory := NewConfigFactory(client, api.DefaultSchedulerName) factory.Create() } @@ -65,7 +65,7 @@ func TestCreateFromConfig(t *testing.T) { // TODO: Uncomment when fix #19254 // defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}) - factory := NewConfigFactory(client, nil, api.DefaultSchedulerName) + factory := NewConfigFactory(client, api.DefaultSchedulerName) // Pre-register some predicate and priority functions RegisterFitPredicate("PredicateOne", PredicateOne) @@ -108,7 +108,7 @@ func TestCreateFromEmptyConfig(t *testing.T) { // TODO: Uncomment when fix #19254 // defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}) - factory := NewConfigFactory(client, nil, api.DefaultSchedulerName) + factory := NewConfigFactory(client, api.DefaultSchedulerName) configData = []byte(`{}`) err := latestschedulerapi.Codec.DecodeInto(configData, &policy) @@ -152,7 +152,7 @@ func TestDefaultErrorFunc(t *testing.T) { server := httptest.NewServer(mux) // TODO: Uncomment when fix #19254 // defer server.Close() - factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}), nil, api.DefaultSchedulerName) + factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}), api.DefaultSchedulerName) queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) podBackoff := podBackoff{ perPodBackoff: map[types.NamespacedName]*backoffEntry{}, @@ -324,9 +324,9 @@ func TestResponsibleForPod(t *testing.T) { // defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}) // factory of "default-scheduler" - factoryDefaultScheduler := NewConfigFactory(client, nil, api.DefaultSchedulerName) + factoryDefaultScheduler := NewConfigFactory(client, api.DefaultSchedulerName) // factory of "foo-scheduler" - factoryFooScheduler := NewConfigFactory(client, nil, "foo-scheduler") + factoryFooScheduler := NewConfigFactory(client, "foo-scheduler") // scheduler annotaions to be tested schedulerAnnotationFitsDefault := map[string]string{"scheduler.alpha.kubernetes.io/name": "default-scheduler"} schedulerAnnotationFitsFoo := map[string]string{"scheduler.alpha.kubernetes.io/name": "foo-scheduler"} diff --git a/plugin/pkg/scheduler/metrics/metrics.go b/plugin/pkg/scheduler/metrics/metrics.go index f3c07ca9af3..8491be52d72 100644 --- a/plugin/pkg/scheduler/metrics/metrics.go +++ b/plugin/pkg/scheduler/metrics/metrics.go @@ -52,13 +52,6 @@ var ( Buckets: prometheus.ExponentialBuckets(1000, 2, 15), }, ) - BindingRateLimiterSaturation = prometheus.NewGauge( - prometheus.GaugeOpts{ - Subsystem: schedulerSubsystem, - Name: "binding_ratelimiter_saturation", - Help: "Binding rateLimiter's saturation rate in percentage", - }, - ) ) var registerMetrics sync.Once @@ -70,7 +63,6 @@ func Register() { prometheus.MustRegister(E2eSchedulingLatency) prometheus.MustRegister(SchedulingAlgorithmLatency) prometheus.MustRegister(BindingLatency) - prometheus.MustRegister(BindingRateLimiterSaturation) }) } diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 16f5afc103d..371a3240ed5 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -76,10 +76,6 @@ type Config struct { Algorithm algorithm.ScheduleAlgorithm Binder Binder - // Rate at which we can create pods - // If this field is nil, we don't have any rate limit. - BindPodsRateLimiter util.RateLimiter - // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling // a pod may take some amount of time and we don't want pods to get @@ -108,20 +104,11 @@ func New(c *Config) *Scheduler { // Run begins watching and scheduling. It starts a goroutine and returns immediately. func (s *Scheduler) Run() { - if s.config.BindPodsRateLimiter != nil { - go util.Forever(func() { - sat := s.config.BindPodsRateLimiter.Saturation() - metrics.BindingRateLimiterSaturation.Set(sat) - }, metrics.BindingSaturationReportInterval) - } go util.Until(s.scheduleOne, 0, s.config.StopEverything) } func (s *Scheduler) scheduleOne() { pod := s.config.NextPod() - if s.config.BindPodsRateLimiter != nil { - s.config.BindPodsRateLimiter.Accept() - } glog.V(3).Infof("Attempting to schedule: %+v", pod) start := time.Now() diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 715461a881b..fe568cedc0d 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -296,72 +296,3 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { <-called events.Stop() } - -// Fake rate limiter that records the 'accept' tokens from the real rate limiter -type FakeRateLimiter struct { - r util.RateLimiter - acceptValues []bool -} - -func (fr *FakeRateLimiter) TryAccept() bool { - return true -} - -func (fr *FakeRateLimiter) Saturation() float64 { - return 0 -} - -func (fr *FakeRateLimiter) Stop() {} - -func (fr *FakeRateLimiter) Accept() { - fr.acceptValues = append(fr.acceptValues, fr.r.TryAccept()) -} - -func TestSchedulerRateLimitsBinding(t *testing.T) { - scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - scheduledPodLister := &cache.StoreToPodLister{Store: scheduledPodStore} - queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - queuedPodLister := &cache.StoreToPodLister{Store: queuedPodStore} - modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister) - - algo := NewGenericScheduler( - map[string]algorithm.FitPredicate{}, - []algorithm.PriorityConfig{}, - []algorithm.SchedulerExtender{}, - modeler.PodLister(), - rand.New(rand.NewSource(time.Now().UnixNano()))) - - // Rate limit to 1 pod - fr := FakeRateLimiter{util.NewTokenBucketRateLimiter(0.02, 1), []bool{}} - c := &Config{ - Modeler: modeler, - NodeLister: algorithm.FakeNodeLister( - api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}}, - ), - Algorithm: algo, - Binder: fakeBinder{func(b *api.Binding) error { - return nil - }}, - NextPod: func() *api.Pod { - return queuedPodStore.Pop().(*api.Pod) - }, - Error: func(p *api.Pod, err error) { - t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err) - }, - Recorder: &record.FakeRecorder{}, - BindPodsRateLimiter: &fr, - } - - s := New(c) - firstPod := podWithID("foo", "") - secondPod := podWithID("boo", "") - queuedPodStore.Add(firstPod) - queuedPodStore.Add(secondPod) - - for i, hitRateLimit := range []bool{true, false} { - s.scheduleOne() - if fr.acceptValues[i] != hitRateLimit { - t.Errorf("Unexpected rate limiting, expect rate limit to be: %v but found it was %v", hitRateLimit, fr.acceptValues[i]) - } - } -} diff --git a/test/component/scheduler/perf/util.go b/test/component/scheduler/perf/util.go index 8d00dfb0681..5e74f94f300 100644 --- a/test/component/scheduler/perf/util.go +++ b/test/component/scheduler/perf/util.go @@ -39,7 +39,6 @@ import ( // It returns scheduler config factory and destroyFunc which should be used to // remove resources after finished. // Notes on rate limiter: -// - The BindPodsRateLimiter is nil, meaning no rate limits. // - client rate limit is set to 5000. func mustSetupScheduler() (schedulerConfigFactory *factory.ConfigFactory, destroyFunc func()) { framework.DeleteAllEtcdKeys() @@ -58,7 +57,7 @@ func mustSetupScheduler() (schedulerConfigFactory *factory.ConfigFactory, destro Burst: 5000, }) - schedulerConfigFactory = factory.NewConfigFactory(c, nil, api.DefaultSchedulerName) + schedulerConfigFactory = factory.NewConfigFactory(c, api.DefaultSchedulerName) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { panic("Couldn't create scheduler config") diff --git a/test/integration/extender_test.go b/test/integration/extender_test.go index 3860a577d1a..ddb32d91e08 100644 --- a/test/integration/extender_test.go +++ b/test/integration/extender_test.go @@ -241,7 +241,7 @@ func TestSchedulerExtender(t *testing.T) { } policy.APIVersion = testapi.Default.GroupVersion().String() - schedulerConfigFactory := factory.NewConfigFactory(restClient, nil, api.DefaultSchedulerName) + schedulerConfigFactory := factory.NewConfigFactory(restClient, api.DefaultSchedulerName) schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy) if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err) diff --git a/test/integration/scheduler_test.go b/test/integration/scheduler_test.go index dea6bf77fe8..a94b23b28d7 100644 --- a/test/integration/scheduler_test.go +++ b/test/integration/scheduler_test.go @@ -66,7 +66,7 @@ func TestUnschedulableNodes(t *testing.T) { restClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) - schedulerConfigFactory := factory.NewConfigFactory(restClient, nil, api.DefaultSchedulerName) + schedulerConfigFactory := factory.NewConfigFactory(restClient, api.DefaultSchedulerName) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err) @@ -309,7 +309,7 @@ func TestMultiScheduler(t *testing.T) { // 1. create and start default-scheduler restClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) - schedulerConfigFactory := factory.NewConfigFactory(restClient, nil, api.DefaultSchedulerName) + schedulerConfigFactory := factory.NewConfigFactory(restClient, api.DefaultSchedulerName) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err) @@ -380,7 +380,7 @@ func TestMultiScheduler(t *testing.T) { // 5. create and start a scheduler with name "foo-scheduler" restClient2 := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) - schedulerConfigFactory2 := factory.NewConfigFactory(restClient2, nil, "foo-scheduler") + schedulerConfigFactory2 := factory.NewConfigFactory(restClient2, "foo-scheduler") schedulerConfig2, err := schedulerConfigFactory2.Create() if err != nil { t.Errorf("Couldn't create scheduler config: %v", err)