mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Allow changing Schedulers RateLimitter setting during startup.
This commit is contained in:
parent
6129d3d4eb
commit
4cc0a2f117
@ -176,7 +176,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
|
|||||||
handler.delegate = m.Handler
|
handler.delegate = m.Handler
|
||||||
|
|
||||||
// Scheduler
|
// Scheduler
|
||||||
schedulerConfigFactory := factory.NewConfigFactory(cl)
|
schedulerConfigFactory := factory.NewConfigFactory(cl, nil)
|
||||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Couldn't create scheduler config: %v", err)
|
glog.Fatalf("Couldn't create scheduler config: %v", err)
|
||||||
|
@ -65,6 +65,11 @@ var (
|
|||||||
deletingPodsBurst = flag.Int("deleting-pods-burst", 10, "")
|
deletingPodsBurst = flag.Int("deleting-pods-burst", 10, "")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
bindPodsQps = 15
|
||||||
|
bindPodsBurst = 20
|
||||||
|
)
|
||||||
|
|
||||||
type delegateHandler struct {
|
type delegateHandler struct {
|
||||||
delegate http.Handler
|
delegate http.Handler
|
||||||
}
|
}
|
||||||
@ -118,7 +123,7 @@ func runApiServer(etcdClient tools.EtcdClient, addr net.IP, port int, masterServ
|
|||||||
// RunScheduler starts up a scheduler in it's own goroutine
|
// RunScheduler starts up a scheduler in it's own goroutine
|
||||||
func runScheduler(cl *client.Client) {
|
func runScheduler(cl *client.Client) {
|
||||||
// Scheduler
|
// Scheduler
|
||||||
schedulerConfigFactory := factory.NewConfigFactory(cl)
|
schedulerConfigFactory := factory.NewConfigFactory(cl, util.NewTokenBucketRateLimiter(bindPodsQps, bindPodsBurst))
|
||||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Couldn't create scheduler config: %v", err)
|
glog.Fatalf("Couldn't create scheduler config: %v", err)
|
||||||
|
@ -54,6 +54,8 @@ type SchedulerServer struct {
|
|||||||
EnableProfiling bool
|
EnableProfiling bool
|
||||||
Master string
|
Master string
|
||||||
Kubeconfig string
|
Kubeconfig string
|
||||||
|
BindPodsQPS float32
|
||||||
|
BindPodsBurst int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSchedulerServer creates a new SchedulerServer with default parameters
|
// NewSchedulerServer creates a new SchedulerServer with default parameters
|
||||||
@ -75,6 +77,8 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||||
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
||||||
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
||||||
|
fs.Float32Var(&s.BindPodsQPS, "bind-pods-qps", 15.0, "Number of bindings per second scheduler is allowed to continuously make")
|
||||||
|
fs.IntVar(&s.BindPodsBurst, "bind-pods-burst", 20, "Number of bindings per second scheduler is allowed to make during bursts")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run runs the specified SchedulerServer. This should never exit.
|
// Run runs the specified SchedulerServer. This should never exit.
|
||||||
@ -116,7 +120,7 @@ func (s *SchedulerServer) Run(_ []string) error {
|
|||||||
glog.Fatal(server.ListenAndServe())
|
glog.Fatal(server.ListenAndServe())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
configFactory := factory.NewConfigFactory(kubeClient)
|
configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst))
|
||||||
config, err := s.createConfig(configFactory)
|
config, err := s.createConfig(configFactory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Failed to create scheduler configuration: %v", err)
|
glog.Fatalf("Failed to create scheduler configuration: %v", err)
|
||||||
|
@ -38,13 +38,6 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Rate limitations for binding pods to hosts.
|
|
||||||
// TODO: expose these as cmd line flags.
|
|
||||||
const (
|
|
||||||
BindPodsQps = 15
|
|
||||||
BindPodsBurst = 20
|
|
||||||
)
|
|
||||||
|
|
||||||
// ConfigFactory knows how to fill out a scheduler config with its support functions.
|
// ConfigFactory knows how to fill out a scheduler config with its support functions.
|
||||||
type ConfigFactory struct {
|
type ConfigFactory struct {
|
||||||
Client *client.Client
|
Client *client.Client
|
||||||
@ -71,7 +64,7 @@ type ConfigFactory struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initializes the factory.
|
// Initializes the factory.
|
||||||
func NewConfigFactory(client *client.Client) *ConfigFactory {
|
func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter) *ConfigFactory {
|
||||||
c := &ConfigFactory{
|
c := &ConfigFactory{
|
||||||
Client: client,
|
Client: client,
|
||||||
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
|
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
|
||||||
@ -85,7 +78,7 @@ func NewConfigFactory(client *client.Client) *ConfigFactory {
|
|||||||
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
|
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
|
||||||
c.modeler = modeler
|
c.modeler = modeler
|
||||||
c.PodLister = modeler.PodLister()
|
c.PodLister = modeler.PodLister()
|
||||||
c.BindPodsRateLimiter = util.NewTokenBucketRateLimiter(BindPodsQps, BindPodsBurst)
|
c.BindPodsRateLimiter = rateLimiter
|
||||||
|
|
||||||
// On add/delete to the scheduled pods, remove from the assumed pods.
|
// On add/delete to the scheduled pods, remove from the assumed pods.
|
||||||
// We construct this here instead of in CreateFromKeys because
|
// We construct this here instead of in CreateFromKeys because
|
||||||
|
@ -44,7 +44,7 @@ func TestCreate(t *testing.T) {
|
|||||||
server := httptest.NewServer(&handler)
|
server := httptest.NewServer(&handler)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
|
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
|
||||||
factory := NewConfigFactory(client)
|
factory := NewConfigFactory(client, nil)
|
||||||
factory.Create()
|
factory.Create()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,7 +62,7 @@ func TestCreateFromConfig(t *testing.T) {
|
|||||||
server := httptest.NewServer(&handler)
|
server := httptest.NewServer(&handler)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
|
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
|
||||||
factory := NewConfigFactory(client)
|
factory := NewConfigFactory(client, nil)
|
||||||
|
|
||||||
// Pre-register some predicate and priority functions
|
// Pre-register some predicate and priority functions
|
||||||
RegisterFitPredicate("PredicateOne", PredicateOne)
|
RegisterFitPredicate("PredicateOne", PredicateOne)
|
||||||
@ -104,7 +104,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
|
|||||||
server := httptest.NewServer(&handler)
|
server := httptest.NewServer(&handler)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
|
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
|
||||||
factory := NewConfigFactory(client)
|
factory := NewConfigFactory(client, nil)
|
||||||
|
|
||||||
configData = []byte(`{}`)
|
configData = []byte(`{}`)
|
||||||
err := latestschedulerapi.Codec.DecodeInto(configData, &policy)
|
err := latestschedulerapi.Codec.DecodeInto(configData, &policy)
|
||||||
@ -150,7 +150,7 @@ func TestDefaultErrorFunc(t *testing.T) {
|
|||||||
mux.Handle(testapi.ResourcePath("pods", "bar", "foo"), &handler)
|
mux.Handle(testapi.ResourcePath("pods", "bar", "foo"), &handler)
|
||||||
server := httptest.NewServer(mux)
|
server := httptest.NewServer(mux)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}))
|
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}), nil)
|
||||||
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
|
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
|
||||||
podBackoff := podBackoff{
|
podBackoff := podBackoff{
|
||||||
perPodBackoff: map[string]*backoffEntry{},
|
perPodBackoff: map[string]*backoffEntry{},
|
||||||
|
@ -79,7 +79,7 @@ func TestUnschedulableNodes(t *testing.T) {
|
|||||||
|
|
||||||
restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
|
restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
|
||||||
|
|
||||||
schedulerConfigFactory := factory.NewConfigFactory(restClient)
|
schedulerConfigFactory := factory.NewConfigFactory(restClient, nil)
|
||||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Couldn't create scheduler config: %v", err)
|
t.Fatalf("Couldn't create scheduler config: %v", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user