diff --git a/cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go b/cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go index 0ff91d82c2d..84ed6e997cf 100644 --- a/cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go +++ b/cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go @@ -61,8 +61,9 @@ func (g *genClientset) Imports(c *generator.Context) (imports []string) { typedClientPath := filepath.Join(g.typedClientPath, group, version) group = normalization.BeforeFirstDot(group) imports = append(imports, fmt.Sprintf("%s%s \"%s\"", version, group, typedClientPath)) - imports = append(imports, "github.com/golang/glog") } + imports = append(imports, "github.com/golang/glog") + imports = append(imports, "k8s.io/kubernetes/pkg/util/flowcontrol") return } @@ -143,14 +144,18 @@ func (c *Clientset) Discovery() $.DiscoveryInterface|raw$ { var newClientsetForConfigTemplate = ` // NewForConfig creates a new Clientset for the given config. func NewForConfig(c *$.Config|raw$) (*Clientset, error) { + configShallowCopy := *c + if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { + configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) + } var clientset Clientset var err error -$range .allGroups$ clientset.$.Group$Client, err =$.PackageName$.NewForConfig(c) +$range .allGroups$ clientset.$.Group$Client, err =$.PackageName$.NewForConfig(&configShallowCopy) if err!=nil { return &clientset, err } $end$ - clientset.DiscoveryClient, err = $.NewDiscoveryClientForConfig|raw$(c) + clientset.DiscoveryClient, err = $.NewDiscoveryClientForConfig|raw$(&configShallowCopy) if err!=nil { glog.Errorf("failed to create the DiscoveryClient: %v", err) } diff --git a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset.go b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset.go index 4a6c17d3130..003536335bc 100644 --- a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset.go +++ b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset.go @@ -21,6 +21,7 @@ import ( unversionedtestgroup "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned" restclient "k8s.io/kubernetes/pkg/client/restclient" discovery "k8s.io/kubernetes/pkg/client/typed/discovery" + "k8s.io/kubernetes/pkg/util/flowcontrol" ) type Interface interface { @@ -47,14 +48,18 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface { // NewForConfig creates a new Clientset for the given config. func NewForConfig(c *restclient.Config) (*Clientset, error) { + configShallowCopy := *c + if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { + configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) + } var clientset Clientset var err error - clientset.TestgroupClient, err = unversionedtestgroup.NewForConfig(c) + clientset.TestgroupClient, err = unversionedtestgroup.NewForConfig(&configShallowCopy) if err != nil { return &clientset, err } - clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(c) + clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy) if err != nil { glog.Errorf("failed to create the DiscoveryClient: %v", err) } diff --git a/pkg/client/clientset_generated/internalclientset/clientset.go b/pkg/client/clientset_generated/internalclientset/clientset.go index 8ba8558e9d7..1bcde159a61 100644 --- a/pkg/client/clientset_generated/internalclientset/clientset.go +++ b/pkg/client/clientset_generated/internalclientset/clientset.go @@ -22,6 +22,7 @@ import ( unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" restclient "k8s.io/kubernetes/pkg/client/restclient" discovery "k8s.io/kubernetes/pkg/client/typed/discovery" + "k8s.io/kubernetes/pkg/util/flowcontrol" ) type Interface interface { @@ -55,18 +56,22 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface { // NewForConfig creates a new Clientset for the given config. func NewForConfig(c *restclient.Config) (*Clientset, error) { + configShallowCopy := *c + if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { + configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) + } var clientset Clientset var err error - clientset.CoreClient, err = unversionedcore.NewForConfig(c) + clientset.CoreClient, err = unversionedcore.NewForConfig(&configShallowCopy) if err != nil { return &clientset, err } - clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(c) + clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(&configShallowCopy) if err != nil { return &clientset, err } - clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(c) + clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy) if err != nil { glog.Errorf("failed to create the DiscoveryClient: %v", err) } diff --git a/pkg/client/restclient/client.go b/pkg/client/restclient/client.go index 23842b628cc..0494c28a5af 100644 --- a/pkg/client/restclient/client.go +++ b/pkg/client/restclient/client.go @@ -63,7 +63,7 @@ type RESTClient struct { // NewRESTClient creates a new RESTClient. This client performs generic REST functions // such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and // decoding of responses from the server. -func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, client *http.Client) *RESTClient { +func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) *RESTClient { base := *baseURL if !strings.HasSuffix(base.Path, "/") { base.Path += "/" @@ -79,8 +79,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf } var throttle flowcontrol.RateLimiter - if maxQPS > 0 { + if maxQPS > 0 && rateLimiter == nil { throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst) + } else if rateLimiter != nil { + throttle = rateLimiter } return &RESTClient{ base: &base, diff --git a/pkg/client/restclient/config.go b/pkg/client/restclient/config.go index 6b613de96ae..c1928de0818 100644 --- a/pkg/client/restclient/config.go +++ b/pkg/client/restclient/config.go @@ -33,6 +33,7 @@ import ( clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/crypto" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/version" ) @@ -94,6 +95,9 @@ type Config struct { // Maximum burst for throttle Burst int + + // Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst + RateLimiter flowcontrol.RateLimiter } // TLSClientConfig contains settings to enable transport layer security @@ -159,7 +163,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) { httpClient = &http.Client{Transport: transport} } - client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, httpClient) + client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient) return client, nil } @@ -192,7 +196,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) { versionConfig.GroupVersion = &v } - client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, httpClient) + client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient) return client, nil } diff --git a/pkg/client/restclient/request_test.go b/pkg/client/restclient/request_test.go index ccf14cb7e12..06141692e70 100644 --- a/pkg/client/restclient/request_test.go +++ b/pkg/client/restclient/request_test.go @@ -1319,5 +1319,5 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient { } } versionedAPIPath := testapi.Default.ResourcePath("", "", "") - return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil) + return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil, nil) } diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index f8010035c26..034d423917a 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -212,7 +212,7 @@ func TestStream(t *testing.T) { server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols)) url, _ := url.ParseRequestURI(server.URL) - c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil) + c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil, nil) req := c.Post().Resource("testing") if exec {