Merge pull request #24166 from gmarek/client

Automatic merge from submit-queue

All clients under ClientSet share one RateLimiter.

Currently we create a rate limiter for each client in client set. It makes the reasoning about rate limiting behavior much harder. This PR changes this behavior and now all clients in the set share single rate limiter. Ref. #24157

cc @lavalamp @wojtek-t
This commit is contained in:
k8s-merge-robot 2016-04-21 22:31:23 -07:00
commit 495251b983
7 changed files with 35 additions and 14 deletions

View File

@ -61,8 +61,9 @@ func (g *genClientset) Imports(c *generator.Context) (imports []string) {
typedClientPath := filepath.Join(g.typedClientPath, group, version) typedClientPath := filepath.Join(g.typedClientPath, group, version)
group = normalization.BeforeFirstDot(group) group = normalization.BeforeFirstDot(group)
imports = append(imports, fmt.Sprintf("%s%s \"%s\"", version, group, typedClientPath)) imports = append(imports, fmt.Sprintf("%s%s \"%s\"", version, group, typedClientPath))
imports = append(imports, "github.com/golang/glog")
} }
imports = append(imports, "github.com/golang/glog")
imports = append(imports, "k8s.io/kubernetes/pkg/util/flowcontrol")
return return
} }
@ -143,14 +144,18 @@ func (c *Clientset) Discovery() $.DiscoveryInterface|raw$ {
var newClientsetForConfigTemplate = ` var newClientsetForConfigTemplate = `
// NewForConfig creates a new Clientset for the given config. // NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *$.Config|raw$) (*Clientset, error) { func NewForConfig(c *$.Config|raw$) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset var clientset Clientset
var err error var err error
$range .allGroups$ clientset.$.Group$Client, err =$.PackageName$.NewForConfig(c) $range .allGroups$ clientset.$.Group$Client, err =$.PackageName$.NewForConfig(&configShallowCopy)
if err!=nil { if err!=nil {
return &clientset, err return &clientset, err
} }
$end$ $end$
clientset.DiscoveryClient, err = $.NewDiscoveryClientForConfig|raw$(c) clientset.DiscoveryClient, err = $.NewDiscoveryClientForConfig|raw$(&configShallowCopy)
if err!=nil { if err!=nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err) glog.Errorf("failed to create the DiscoveryClient: %v", err)
} }

View File

@ -21,6 +21,7 @@ import (
unversionedtestgroup "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned" unversionedtestgroup "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient" restclient "k8s.io/kubernetes/pkg/client/restclient"
discovery "k8s.io/kubernetes/pkg/client/typed/discovery" discovery "k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/util/flowcontrol"
) )
type Interface interface { type Interface interface {
@ -47,14 +48,18 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {
// NewForConfig creates a new Clientset for the given config. // NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *restclient.Config) (*Clientset, error) { func NewForConfig(c *restclient.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset var clientset Clientset
var err error var err error
clientset.TestgroupClient, err = unversionedtestgroup.NewForConfig(c) clientset.TestgroupClient, err = unversionedtestgroup.NewForConfig(&configShallowCopy)
if err != nil { if err != nil {
return &clientset, err return &clientset, err
} }
clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(c) clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
if err != nil { if err != nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err) glog.Errorf("failed to create the DiscoveryClient: %v", err)
} }

View File

@ -22,6 +22,7 @@ import (
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient" restclient "k8s.io/kubernetes/pkg/client/restclient"
discovery "k8s.io/kubernetes/pkg/client/typed/discovery" discovery "k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/util/flowcontrol"
) )
type Interface interface { type Interface interface {
@ -55,18 +56,22 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {
// NewForConfig creates a new Clientset for the given config. // NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *restclient.Config) (*Clientset, error) { func NewForConfig(c *restclient.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset var clientset Clientset
var err error var err error
clientset.CoreClient, err = unversionedcore.NewForConfig(c) clientset.CoreClient, err = unversionedcore.NewForConfig(&configShallowCopy)
if err != nil { if err != nil {
return &clientset, err return &clientset, err
} }
clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(c) clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(&configShallowCopy)
if err != nil { if err != nil {
return &clientset, err return &clientset, err
} }
clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(c) clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
if err != nil { if err != nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err) glog.Errorf("failed to create the DiscoveryClient: %v", err)
} }

View File

@ -63,7 +63,7 @@ type RESTClient struct {
// NewRESTClient creates a new RESTClient. This client performs generic REST functions // NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and // such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server. // decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, client *http.Client) *RESTClient { func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) *RESTClient {
base := *baseURL base := *baseURL
if !strings.HasSuffix(base.Path, "/") { if !strings.HasSuffix(base.Path, "/") {
base.Path += "/" base.Path += "/"
@ -79,8 +79,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
} }
var throttle flowcontrol.RateLimiter var throttle flowcontrol.RateLimiter
if maxQPS > 0 { if maxQPS > 0 && rateLimiter == nil {
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst) throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} else if rateLimiter != nil {
throttle = rateLimiter
} }
return &RESTClient{ return &RESTClient{
base: &base, base: &base,

View File

@ -33,6 +33,7 @@ import (
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/crypto" "k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/version"
) )
@ -94,6 +95,9 @@ type Config struct {
// Maximum burst for throttle // Maximum burst for throttle
Burst int Burst int
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
} }
// TLSClientConfig contains settings to enable transport layer security // TLSClientConfig contains settings to enable transport layer security
@ -159,7 +163,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
httpClient = &http.Client{Transport: transport} httpClient = &http.Client{Transport: transport}
} }
client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, httpClient) client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
return client, nil return client, nil
} }
@ -192,7 +196,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
versionConfig.GroupVersion = &v versionConfig.GroupVersion = &v
} }
client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, httpClient) client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
return client, nil return client, nil
} }

View File

@ -1319,5 +1319,5 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
} }
} }
versionedAPIPath := testapi.Default.ResourcePath("", "", "") versionedAPIPath := testapi.Default.ResourcePath("", "", "")
return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil) return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil, nil)
} }

View File

@ -212,7 +212,7 @@ func TestStream(t *testing.T) {
server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols)) server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols))
url, _ := url.ParseRequestURI(server.URL) url, _ := url.ParseRequestURI(server.URL)
c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil) c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil, nil)
req := c.Post().Resource("testing") req := c.Post().Resource("testing")
if exec { if exec {