Never rate limit watches

This commit is contained in:
Kris 2016-02-11 13:50:05 -08:00
parent 1478cf345a
commit 8396df57e6
4 changed files with 27 additions and 13 deletions

View File

@ -70,7 +70,7 @@ func (c *RESTClient) Delete() *unversioned.Request {
} }
func (c *RESTClient) request(verb string) *unversioned.Request { func (c *RESTClient) request(verb string) *unversioned.Request {
return unversioned.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", unversioned.ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: c.Codec}, nil) return unversioned.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", unversioned.ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: c.Codec}, nil, nil)
} }
func (c *RESTClient) Do(req *http.Request) (*http.Response, error) { func (c *RESTClient) Do(req *http.Request) (*http.Response, error) {

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@ -110,10 +111,11 @@ type Request struct {
resp *http.Response resp *http.Response
backoffMgr BackoffManager backoffMgr BackoffManager
throttle util.RateLimiter
} }
// NewRequest creates a new request helper object for accessing runtime.Objects on a server. // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager) *Request { func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle util.RateLimiter) *Request {
if backoff == nil { if backoff == nil {
glog.V(2).Infof("Not implementing request backoff strategy.") glog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{} backoff = &NoBackoff{}
@ -130,6 +132,7 @@ func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPa
pathPrefix: path.Join(pathPrefix, versionedAPIPath), pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content, content: content,
backoffMgr: backoff, backoffMgr: backoff,
throttle: throttle,
} }
if len(content.ContentType) > 0 { if len(content.ContentType) > 0 {
r.SetHeader("Accept", content.ContentType+", */*") r.SetHeader("Accept", content.ContentType+", */*")
@ -612,6 +615,8 @@ func (r Request) finalURLTemplate() string {
// Watch attempts to begin watching the requested location. // Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error. // Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) { func (r *Request) Watch() (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.throttle here.
if r.err != nil { if r.err != nil {
return nil, r.err return nil, r.err
} }
@ -677,6 +682,11 @@ func (r *Request) Stream() (io.ReadCloser, error) {
if r.err != nil { if r.err != nil {
return nil, r.err return nil, r.err
} }
if r.throttle != nil {
r.throttle.Accept()
}
url := r.URL().String() url := r.URL().String()
req, err := http.NewRequest(r.verb, url, nil) req, err := http.NewRequest(r.verb, url, nil)
if err != nil { if err != nil {
@ -809,6 +819,10 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError // * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// * http.Client.Do errors are returned directly. // * http.Client.Do errors are returned directly.
func (r *Request) Do() Result { func (r *Request) Do() Result {
if r.throttle != nil {
r.throttle.Accept()
}
var result Result var result Result
err := r.request(func(req *http.Request, resp *http.Response) { err := r.request(func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req) result = r.transformResponse(resp, req)
@ -821,6 +835,10 @@ func (r *Request) Do() Result {
// DoRaw executes the request but does not process the response body. // DoRaw executes the request but does not process the response body.
func (r *Request) DoRaw() ([]byte, error) { func (r *Request) DoRaw() ([]byte, error) {
if r.throttle != nil {
r.throttle.Accept()
}
var result Result var result Result
err := r.request(func(req *http.Request, resp *http.Response) { err := r.request(func(req *http.Request, resp *http.Response) {
result.body, result.err = ioutil.ReadAll(resp.Body) result.body, result.err = ioutil.ReadAll(resp.Body)

View File

@ -46,11 +46,11 @@ import (
) )
func TestNewRequestSetsAccept(t *testing.T) { func TestNewRequestSetsAccept(t *testing.T) {
r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, nil) r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, nil, nil)
if r.headers.Get("Accept") != "" { if r.headers.Get("Accept") != "" {
t.Errorf("unexpected headers: %#v", r.headers) t.Errorf("unexpected headers: %#v", r.headers)
} }
r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, nil) r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, nil, nil)
if r.headers.Get("Accept") != "application/other, */*" { if r.headers.Get("Accept") != "application/other, */*" {
t.Errorf("unexpected headers: %#v", r.headers) t.Errorf("unexpected headers: %#v", r.headers)
} }
@ -276,7 +276,7 @@ func TestResultIntoWithErrReturnsErr(t *testing.T) {
func TestURLTemplate(t *testing.T) { func TestURLTemplate(t *testing.T) {
uri, _ := url.Parse("http://localhost") uri, _ := url.Parse("http://localhost")
r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil) r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil)
r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0") r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0")
full := r.URL() full := r.URL()
if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" { if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" {
@ -337,7 +337,7 @@ func TestTransformResponse(t *testing.T) {
{Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid}, {Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
} }
for i, test := range testCases { for i, test := range testCases {
r := NewRequest(nil, "", uri, "", ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, nil) r := NewRequest(nil, "", uri, "", ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, nil, nil)
if test.Response.Body == nil { if test.Response.Body == nil {
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{})) test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
} }
@ -1126,7 +1126,7 @@ func TestUintParam(t *testing.T) {
for _, item := range table { for _, item := range table {
u, _ := url.Parse("http://localhost") u, _ := url.Parse("http://localhost")
r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil).AbsPath("").UintParam(item.name, item.testVal) r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil).AbsPath("").UintParam(item.name, item.testVal)
if e, a := item.expectStr, r.URL().String(); e != a { if e, a := item.expectStr, r.URL().String(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }

View File

@ -121,16 +121,12 @@ func readExpBackoffConfig() BackoffManager {
// list, ok := resp.(*api.PodList) // list, ok := resp.(*api.PodList)
// //
func (c *RESTClient) Verb(verb string) *Request { func (c *RESTClient) Verb(verb string) *Request {
if c.Throttle != nil {
c.Throttle.Accept()
}
backoff := readExpBackoffConfig() backoff := readExpBackoffConfig()
if c.Client == nil { if c.Client == nil {
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff) return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle)
} }
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff) return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle)
} }
// Post begins a POST request. Short for c.Verb("POST"). // Post begins a POST request. Short for c.Verb("POST").