From 8396df57e6d07567be1a88ae1e9744bf5ffdfefa Mon Sep 17 00:00:00 2001 From: Kris Date: Thu, 11 Feb 2016 13:50:05 -0800 Subject: [PATCH] Never rate limit watches --- pkg/client/unversioned/fake/fake.go | 2 +- pkg/client/unversioned/request.go | 20 +++++++++++++++++++- pkg/client/unversioned/request_test.go | 10 +++++----- pkg/client/unversioned/restclient.go | 8 ++------ 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/pkg/client/unversioned/fake/fake.go b/pkg/client/unversioned/fake/fake.go index 2fad59cc1eb..3a314708fab 100644 --- a/pkg/client/unversioned/fake/fake.go +++ b/pkg/client/unversioned/fake/fake.go @@ -70,7 +70,7 @@ func (c *RESTClient) Delete() *unversioned.Request { } func (c *RESTClient) request(verb string) *unversioned.Request { - return unversioned.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", unversioned.ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: c.Codec}, nil) + return unversioned.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", unversioned.ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: c.Codec}, nil, nil) } func (c *RESTClient) Do(req *http.Request) (*http.Response, error) { diff --git a/pkg/client/unversioned/request.go b/pkg/client/unversioned/request.go index 3baa5f29ed8..c4696199951 100644 --- a/pkg/client/unversioned/request.go +++ b/pkg/client/unversioned/request.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" @@ -110,10 +111,11 @@ type Request struct { resp *http.Response backoffMgr BackoffManager + throttle util.RateLimiter } // NewRequest creates a new request helper object for accessing runtime.Objects on a server. -func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager) *Request { +func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle util.RateLimiter) *Request { if backoff == nil { glog.V(2).Infof("Not implementing request backoff strategy.") backoff = &NoBackoff{} @@ -130,6 +132,7 @@ func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPa pathPrefix: path.Join(pathPrefix, versionedAPIPath), content: content, backoffMgr: backoff, + throttle: throttle, } if len(content.ContentType) > 0 { r.SetHeader("Accept", content.ContentType+", */*") @@ -612,6 +615,8 @@ func (r Request) finalURLTemplate() string { // Watch attempts to begin watching the requested location. // Returns a watch.Interface, or an error. func (r *Request) Watch() (watch.Interface, error) { + // We specifically don't want to rate limit watches, so we + // don't use r.throttle here. if r.err != nil { return nil, r.err } @@ -677,6 +682,11 @@ func (r *Request) Stream() (io.ReadCloser, error) { if r.err != nil { return nil, r.err } + + if r.throttle != nil { + r.throttle.Accept() + } + url := r.URL().String() req, err := http.NewRequest(r.verb, url, nil) if err != nil { @@ -809,6 +819,10 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { // * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError // * http.Client.Do errors are returned directly. func (r *Request) Do() Result { + if r.throttle != nil { + r.throttle.Accept() + } + var result Result err := r.request(func(req *http.Request, resp *http.Response) { result = r.transformResponse(resp, req) @@ -821,6 +835,10 @@ func (r *Request) Do() Result { // DoRaw executes the request but does not process the response body. func (r *Request) DoRaw() ([]byte, error) { + if r.throttle != nil { + r.throttle.Accept() + } + var result Result err := r.request(func(req *http.Request, resp *http.Response) { result.body, result.err = ioutil.ReadAll(resp.Body) diff --git a/pkg/client/unversioned/request_test.go b/pkg/client/unversioned/request_test.go index 2b5bf6d2977..266be6b242f 100644 --- a/pkg/client/unversioned/request_test.go +++ b/pkg/client/unversioned/request_test.go @@ -46,11 +46,11 @@ import ( ) func TestNewRequestSetsAccept(t *testing.T) { - r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, nil) + r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, nil, nil) if r.headers.Get("Accept") != "" { t.Errorf("unexpected headers: %#v", r.headers) } - r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, nil) + r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, nil, nil) if r.headers.Get("Accept") != "application/other, */*" { t.Errorf("unexpected headers: %#v", r.headers) } @@ -276,7 +276,7 @@ func TestResultIntoWithErrReturnsErr(t *testing.T) { func TestURLTemplate(t *testing.T) { uri, _ := url.Parse("http://localhost") - r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil) + r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil) r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0") full := r.URL() if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" { @@ -337,7 +337,7 @@ func TestTransformResponse(t *testing.T) { {Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid}, } for i, test := range testCases { - r := NewRequest(nil, "", uri, "", ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, nil) + r := NewRequest(nil, "", uri, "", ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, nil, nil) if test.Response.Body == nil { test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{})) } @@ -1126,7 +1126,7 @@ func TestUintParam(t *testing.T) { for _, item := range table { u, _ := url.Parse("http://localhost") - r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil).AbsPath("").UintParam(item.name, item.testVal) + r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil).AbsPath("").UintParam(item.name, item.testVal) if e, a := item.expectStr, r.URL().String(); e != a { t.Errorf("expected %v, got %v", e, a) } diff --git a/pkg/client/unversioned/restclient.go b/pkg/client/unversioned/restclient.go index 914eee0c650..3946754f31e 100644 --- a/pkg/client/unversioned/restclient.go +++ b/pkg/client/unversioned/restclient.go @@ -121,16 +121,12 @@ func readExpBackoffConfig() BackoffManager { // list, ok := resp.(*api.PodList) // func (c *RESTClient) Verb(verb string) *Request { - if c.Throttle != nil { - c.Throttle.Accept() - } - backoff := readExpBackoffConfig() if c.Client == nil { - return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff) + return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle) } - return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff) + return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle) } // Post begins a POST request. Short for c.Verb("POST").