All clients under ClientSet share one RateLimiter.

This commit is contained in:
gmarek 2016-04-13 00:39:39 +02:00
parent 8a8177f99e
commit b76bed0cc9
7 changed files with 35 additions and 14 deletions

View File

@ -61,8 +61,9 @@ func (g *genClientset) Imports(c *generator.Context) (imports []string) {
typedClientPath := filepath.Join(g.typedClientPath, group, version) typedClientPath := filepath.Join(g.typedClientPath, group, version)
group = normalization.BeforeFirstDot(group) group = normalization.BeforeFirstDot(group)
imports = append(imports, fmt.Sprintf("%s%s \"%s\"", version, group, typedClientPath)) imports = append(imports, fmt.Sprintf("%s%s \"%s\"", version, group, typedClientPath))
imports = append(imports, "github.com/golang/glog")
} }
imports = append(imports, "github.com/golang/glog")
imports = append(imports, "k8s.io/kubernetes/pkg/util/flowcontrol")
return return
} }
@ -143,14 +144,18 @@ func (c *Clientset) Discovery() $.DiscoveryInterface|raw$ {
var newClientsetForConfigTemplate = ` var newClientsetForConfigTemplate = `
// NewForConfig creates a new Clientset for the given config. // NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *$.Config|raw$) (*Clientset, error) { func NewForConfig(c *$.Config|raw$) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset var clientset Clientset
var err error var err error
$range .allGroups$ clientset.$.Group$Client, err =$.PackageName$.NewForConfig(c) $range .allGroups$ clientset.$.Group$Client, err =$.PackageName$.NewForConfig(&configShallowCopy)
if err!=nil { if err!=nil {
return &clientset, err return &clientset, err
} }
$end$ $end$
clientset.DiscoveryClient, err = $.NewDiscoveryClientForConfig|raw$(c) clientset.DiscoveryClient, err = $.NewDiscoveryClientForConfig|raw$(&configShallowCopy)
if err!=nil { if err!=nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err) glog.Errorf("failed to create the DiscoveryClient: %v", err)
} }

View File

@ -21,6 +21,7 @@ import (
unversionedtestgroup "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned" unversionedtestgroup "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient" restclient "k8s.io/kubernetes/pkg/client/restclient"
discovery "k8s.io/kubernetes/pkg/client/typed/discovery" discovery "k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/util/flowcontrol"
) )
type Interface interface { type Interface interface {
@ -47,14 +48,18 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {
// NewForConfig creates a new Clientset for the given config. // NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *restclient.Config) (*Clientset, error) { func NewForConfig(c *restclient.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset var clientset Clientset
var err error var err error
clientset.TestgroupClient, err = unversionedtestgroup.NewForConfig(c) clientset.TestgroupClient, err = unversionedtestgroup.NewForConfig(&configShallowCopy)
if err != nil { if err != nil {
return &clientset, err return &clientset, err
} }
clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(c) clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
if err != nil { if err != nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err) glog.Errorf("failed to create the DiscoveryClient: %v", err)
} }

View File

@ -22,6 +22,7 @@ import (
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient" restclient "k8s.io/kubernetes/pkg/client/restclient"
discovery "k8s.io/kubernetes/pkg/client/typed/discovery" discovery "k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/util/flowcontrol"
) )
type Interface interface { type Interface interface {
@ -55,18 +56,22 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {
// NewForConfig creates a new Clientset for the given config. // NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *restclient.Config) (*Clientset, error) { func NewForConfig(c *restclient.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset var clientset Clientset
var err error var err error
clientset.CoreClient, err = unversionedcore.NewForConfig(c) clientset.CoreClient, err = unversionedcore.NewForConfig(&configShallowCopy)
if err != nil { if err != nil {
return &clientset, err return &clientset, err
} }
clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(c) clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(&configShallowCopy)
if err != nil { if err != nil {
return &clientset, err return &clientset, err
} }
clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(c) clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
if err != nil { if err != nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err) glog.Errorf("failed to create the DiscoveryClient: %v", err)
} }

View File

@ -63,7 +63,7 @@ type RESTClient struct {
// NewRESTClient creates a new RESTClient. This client performs generic REST functions // NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and // such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server. // decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, client *http.Client) *RESTClient { func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) *RESTClient {
base := *baseURL base := *baseURL
if !strings.HasSuffix(base.Path, "/") { if !strings.HasSuffix(base.Path, "/") {
base.Path += "/" base.Path += "/"
@ -79,8 +79,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
} }
var throttle flowcontrol.RateLimiter var throttle flowcontrol.RateLimiter
if maxQPS > 0 { if maxQPS > 0 && rateLimiter == nil {
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst) throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} else if rateLimiter != nil {
throttle = rateLimiter
} }
return &RESTClient{ return &RESTClient{
base: &base, base: &base,

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/crypto" "k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/version"
) )
@ -90,6 +91,9 @@ type Config struct {
// Maximum burst for throttle // Maximum burst for throttle
Burst int Burst int
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
} }
// TLSClientConfig contains settings to enable transport layer security // TLSClientConfig contains settings to enable transport layer security
@ -155,7 +159,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
httpClient = &http.Client{Transport: transport} httpClient = &http.Client{Transport: transport}
} }
client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, httpClient) client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
return client, nil return client, nil
} }
@ -188,7 +192,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
versionConfig.GroupVersion = &v versionConfig.GroupVersion = &v
} }
client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, httpClient) client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
return client, nil return client, nil
} }

View File

@ -1319,5 +1319,5 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
} }
} }
versionedAPIPath := testapi.Default.ResourcePath("", "", "") versionedAPIPath := testapi.Default.ResourcePath("", "", "")
return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil) return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil, nil)
} }

View File

@ -212,7 +212,7 @@ func TestStream(t *testing.T) {
server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols)) server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols))
url, _ := url.ParseRequestURI(server.URL) url, _ := url.ParseRequestURI(server.URL)
c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil) c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil, nil)
req := c.Post().Resource("testing") req := c.Post().Resource("testing")
if exec { if exec {