diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 8237f55286a..867faf788ef 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -165,7 +165,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.ExternalHost, "external_hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)") - fs.IntVar(&s.MaxRequestsInFlight, "max_requests_inflight", 20, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.") + fs.IntVar(&s.MaxRequestsInFlight, "max_requests_inflight", 400, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.") fs.StringVar(&s.LongRunningRequestRE, "long_running_request_regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.") } diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0340461ed78..8d6f33d7d9a 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -99,6 +99,8 @@ func NewCMServer() *CMServer { func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on") fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") + s.ClientConfig.QPS = 20.0 + s.ClientConfig.Burst = 30 client.BindClientConfigFlags(fs, &s.ClientConfig) fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") diff --git a/contrib/for-tests/network-tester/webserver.go b/contrib/for-tests/network-tester/webserver.go index 693abec078f..205f598d28b 100644 --- a/contrib/for-tests/network-tester/webserver.go +++ b/contrib/for-tests/network-tester/webserver.go @@ -208,7 +208,7 @@ func contactOthers(state *State) { Host: os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), Path: "/api/v1beta1", } - client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true, 0)} + client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true, 5, 10)} // Do this repeatedly, in case there's some propagation delay with getting // newly started pods into the endpoints list. diff --git a/pkg/client/flags.go b/pkg/client/flags.go index 4ad32a6cffa..750b705847d 100644 --- a/pkg/client/flags.go +++ b/pkg/client/flags.go @@ -27,6 +27,8 @@ type FlagSet interface { BoolVar(p *bool, name string, value bool, usage string) UintVar(p *uint, name string, value uint, usage string) DurationVar(p *time.Duration, name string, value time.Duration, usage string) + Float32Var(p *float32, name string, value float32, usage string) + IntVar(p *int, name string, value int, usage string) } // BindClientConfigFlags registers a standard set of CLI flags for connecting to a Kubernetes API server. @@ -38,6 +40,8 @@ func BindClientConfigFlags(flags FlagSet, config *Config) { flags.StringVar(&config.CertFile, "client_certificate", config.CertFile, "Path to a client key file for TLS.") flags.StringVar(&config.KeyFile, "client_key", config.KeyFile, "Path to a client key file for TLS.") flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.") + flags.Float32Var(&config.QPS, "max_outgoing_qps", config.QPS, "Maximum number of queries per second that could be issued by this client.") + flags.IntVar(&config.Burst, "max_outgoing_burst", config.Burst, "Maximum throttled burst") } func BindKubeletClientConfigFlags(flags FlagSet, config *KubeletConfig) { diff --git a/pkg/client/flags_test.go b/pkg/client/flags_test.go index 37bebbd00f7..42b02d5b280 100644 --- a/pkg/client/flags_test.go +++ b/pkg/client/flags_test.go @@ -68,11 +68,31 @@ func (f *fakeFlagSet) DurationVar(p *time.Duration, name string, value time.Dura f.set.Insert(name) } +func (f *fakeFlagSet) Float32Var(p *float32, name string, value float32, usage string) { + if p == nil { + f.t.Errorf("unexpected nil pointer") + } + if usage == "" { + f.t.Errorf("unexpected empty usage") + } + f.set.Insert(name) +} + +func (f *fakeFlagSet) IntVar(p *int, name string, value int, usage string) { + if p == nil { + f.t.Errorf("unexpected nil pointer") + } + if usage == "" { + f.t.Errorf("unexpected empty usage") + } + f.set.Insert(name) +} + func TestBindClientConfigFlags(t *testing.T) { flags := &fakeFlagSet{t, util.StringSet{}} config := &Config{} BindClientConfigFlags(flags, config) - if len(flags.set) != 6 { + if len(flags.set) != 8 { t.Errorf("unexpected flag set: %#v", flags) } } diff --git a/pkg/client/helper.go b/pkg/client/helper.go index 601efed8b15..aa607dfa07e 100644 --- a/pkg/client/helper.go +++ b/pkg/client/helper.go @@ -80,6 +80,9 @@ type Config struct { // QPS indicates the maximum QPS to the master from this client. If zero, QPS is unlimited. QPS float32 + + // Maximum burst for throttle + Burst int } type KubeletConfig struct { @@ -181,6 +184,9 @@ func SetKubernetesDefaults(config *Config) error { if config.QPS == 0.0 { config.QPS = 5.0 } + if config.Burst == 0 { + config.Burst = 10 + } return nil } @@ -201,7 +207,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) { return nil, err } - client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior, config.QPS) + client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior, config.QPS, config.Burst) transport, err := TransportFor(config) if err != nil { diff --git a/pkg/client/helper_test.go b/pkg/client/helper_test.go index 5045dac4e22..2a9a334b870 100644 --- a/pkg/client/helper_test.go +++ b/pkg/client/helper_test.go @@ -274,6 +274,7 @@ func TestSetKubernetesDefaults(t *testing.T) { Codec: latest.Codec, LegacyBehavior: (latest.Version == "v1beta1" || latest.Version == "v1beta2"), QPS: 5, + Burst: 10, }, false, }, diff --git a/pkg/client/restclient.go b/pkg/client/restclient.go index 795913ff732..a2447e65a3b 100644 --- a/pkg/client/restclient.go +++ b/pkg/client/restclient.go @@ -61,7 +61,7 @@ type RESTClient struct { // such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and // decoding of responses from the server. If this client should use the older, legacy // API conventions from Kubernetes API v1beta1 and v1beta2, set legacyBehavior true. -func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool, maxQPS float32) *RESTClient { +func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool, maxQPS float32, maxBurst int) *RESTClient { base := *baseURL if !strings.HasSuffix(base.Path, "/") { base.Path += "/" @@ -71,7 +71,7 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB var throttle util.RateLimiter if maxQPS > 0 { - throttle = util.NewTokenBucketRateLimiter(maxQPS, 10) + throttle = util.NewTokenBucketRateLimiter(maxQPS, maxBurst) } return &RESTClient{ baseURL: &base, diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index d1857c334c5..6eccca2d0a7 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -48,7 +48,7 @@ func getFakeClient(t *testing.T, validURLs []string) (ClientPosterFunc, *httptes return func(mapping *meta.RESTMapping) (RESTClientPoster, error) { fakeCodec := runtime.CodecFor(api.Scheme, "v1beta1") fakeUri, _ := url.Parse(server.URL + "/api/v1beta1") - return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true, 0), nil + return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true, 5, 10), nil }, server } diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index 1cdc1f6cd7f..fe508fa7fde 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -67,6 +67,8 @@ func NewSchedulerServer() *SchedulerServer { func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.Port, "port", s.Port, "The port that the scheduler's http service runs on") fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") + s.ClientConfig.QPS = 20.0 + s.ClientConfig.Burst = 100 client.BindClientConfigFlags(fs, &s.ClientConfig) fs.StringVar(&s.AlgorithmProvider, "algorithm_provider", s.AlgorithmProvider, "The scheduling algorithm provider to use") fs.StringVar(&s.PolicyConfigFile, "policy_config_file", s.PolicyConfigFile, "File with scheduler policy configuration")