From 76e6281168c40521a45aba813c39702bcede3bdf Mon Sep 17 00:00:00 2001 From: Jay Vyas Date: Thu, 19 Nov 2015 14:23:11 -0500 Subject: [PATCH] Exponential backoff for request client, rebased. Updated license to 2015, cleaned more //[a-z] comments. Added in support for Environment variable guards over the backoff w/ default NoBackoff. Rebased. --- pkg/client/unversioned/fake/fake.go | 10 +-- pkg/client/unversioned/request.go | 33 +++++++- pkg/client/unversioned/request_test.go | 46 ++++++++++- pkg/client/unversioned/restclient.go | 40 +++++++++- pkg/client/unversioned/restclient_test.go | 39 +++++++++ pkg/client/unversioned/urlbackoff.go | 97 +++++++++++++++++++++++ pkg/client/unversioned/urlbackoff_test.go | 78 ++++++++++++++++++ 7 files changed, 332 insertions(+), 11 deletions(-) create mode 100644 pkg/client/unversioned/urlbackoff.go create mode 100644 pkg/client/unversioned/urlbackoff_test.go diff --git a/pkg/client/unversioned/fake/fake.go b/pkg/client/unversioned/fake/fake.go index eca37eedb69..f7aba822da0 100644 --- a/pkg/client/unversioned/fake/fake.go +++ b/pkg/client/unversioned/fake/fake.go @@ -50,23 +50,23 @@ type RESTClient struct { } func (c *RESTClient) Get() *unversioned.Request { - return unversioned.NewRequest(c, "GET", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) + return unversioned.NewRequest(c, "GET", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil) } func (c *RESTClient) Put() *unversioned.Request { - return unversioned.NewRequest(c, "PUT", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) + return unversioned.NewRequest(c, "PUT", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil) } func (c *RESTClient) Patch(_ api.PatchType) *unversioned.Request { - return unversioned.NewRequest(c, "PATCH", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) + return unversioned.NewRequest(c, "PATCH", &url.URL{Host: "localhost"}, 
*testapi.Default.GroupVersion(), c.Codec, nil) } func (c *RESTClient) Post() *unversioned.Request { - return unversioned.NewRequest(c, "POST", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) + return unversioned.NewRequest(c, "POST", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil) } func (c *RESTClient) Delete() *unversioned.Request { - return unversioned.NewRequest(c, "DELETE", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) + return unversioned.NewRequest(c, "DELETE", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil) } func (c *RESTClient) Do(req *http.Request) (*http.Response, error) { diff --git a/pkg/client/unversioned/request.go b/pkg/client/unversioned/request.go index d71a6909fda..fa8f8b3e131 100644 --- a/pkg/client/unversioned/request.go +++ b/pkg/client/unversioned/request.go @@ -109,10 +109,17 @@ type Request struct { // The constructed request and the response req *http.Request resp *http.Response + + backoffMgr BackoffManager } // NewRequest creates a new request helper object for accessing runtime.Objects on a server. 
-func NewRequest(client HTTPClient, verb string, baseURL *url.URL, groupVersion unversioned.GroupVersion, codec runtime.Codec) *Request { +func NewRequest(client HTTPClient, verb string, baseURL *url.URL, groupVersion unversioned.GroupVersion, codec runtime.Codec, backoff BackoffManager) *Request { + if backoff == nil { + glog.V(2).Infof("Not implementing request backoff strategy.") + backoff = &NoBackoff{} + } + metrics.Register() return &Request{ client: client, verb: verb, @@ -120,6 +127,7 @@ func NewRequest(client HTTPClient, verb string, baseURL *url.URL, groupVersion u path: baseURL.Path, groupVersion: groupVersion, codec: codec, + backoffMgr: backoff, } } @@ -610,8 +618,16 @@ func (r *Request) Watch() (watch.Interface, error) { if client == nil { client = http.DefaultClient } + time.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) resp, err := client.Do(req) updateURLMetrics(r, resp, err) + if r.baseURL != nil { + if err != nil { + r.backoffMgr.UpdateBackoff(r.baseURL, err, 0) + } else { + r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode) + } + } if err != nil { // The watch stream mechanism handles many common partial data errors, so closed // connections can be retried in many cases. 
@@ -663,8 +679,16 @@ func (r *Request) Stream() (io.ReadCloser, error) { if client == nil { client = http.DefaultClient } + time.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) resp, err := client.Do(req) updateURLMetrics(r, resp, err) + if r.baseURL != nil { + if err != nil { + r.backoffMgr.UpdateBackoff(r.URL(), err, 0) + } else { + r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode) + } + } if err != nil { return nil, err } @@ -708,6 +732,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { }() if r.err != nil { + glog.V(4).Infof("Error in request: %v", r.err) return r.err } @@ -736,8 +761,14 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { } req.Header = r.headers + time.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) resp, err := client.Do(req) updateURLMetrics(r, resp, err) + if err != nil { + r.backoffMgr.UpdateBackoff(r.URL(), err, 0) + } else { + r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode) + } if err != nil { return err } diff --git a/pkg/client/unversioned/request_test.go b/pkg/client/unversioned/request_test.go index 9b02b8e09e3..d40a15811e1 100644 --- a/pkg/client/unversioned/request_test.go +++ b/pkg/client/unversioned/request_test.go @@ -262,7 +262,7 @@ func TestResultIntoWithErrReturnsErr(t *testing.T) { func TestURLTemplate(t *testing.T) { uri, _ := url.Parse("http://localhost") - r := NewRequest(nil, "POST", uri, unversioned.GroupVersion{Group: "test"}, nil) + r := NewRequest(nil, "POST", uri, unversioned.GroupVersion{Group: "test"}, nil, nil) r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0") full := r.URL() if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" { @@ -323,7 +323,7 @@ func TestTransformResponse(t *testing.T) { {Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid}, } for i, test := range testCases { - r := NewRequest(nil, "", uri, *testapi.Default.GroupVersion(), 
testapi.Default.Codec()) + r := NewRequest(nil, "", uri, *testapi.Default.GroupVersion(), testapi.Default.Codec(), nil) if test.Response.Body == nil { test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{})) } @@ -542,6 +542,8 @@ func TestRequestWatch(t *testing.T) { }, } for i, testCase := range testCases { + t.Logf("testcase %v", testCase.Request) + testCase.Request.backoffMgr = &NoBackoff{} watch, err := testCase.Request.Watch() hasErr := err != nil if hasErr != testCase.Err { @@ -604,6 +606,7 @@ func TestRequestStream(t *testing.T) { }, } for i, testCase := range testCases { + testCase.Request.backoffMgr = &NoBackoff{} body, err := testCase.Request.Stream() hasErr := err != nil if hasErr != testCase.Err { @@ -673,6 +676,7 @@ func TestRequestDo(t *testing.T) { }, } for i, testCase := range testCases { + testCase.Request.backoffMgr = &NoBackoff{} body, err := testCase.Request.Do().Raw() hasErr := err != nil if hasErr != testCase.Err { @@ -720,6 +724,42 @@ func TestDoRequestNewWay(t *testing.T) { fakeHandler.ValidateRequest(t, requestURL, "POST", &reqBody) } +// This test assumes that the client implementation backs off exponentially, for an individual request. +func TestBackoffLifecycle(t *testing.T) { + count := 0 + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + count++ + t.Logf("Attempt %d", count) + if count == 5 || count == 9 { + w.WriteHeader(http.StatusOK) + return + } else { + w.WriteHeader(http.StatusGatewayTimeout) + return + } + })) + defer testServer.Close() + c := testRESTClient(t, testServer) + + // Test backoff recovery and increase. This correlates to the constants + // which are used in the server implementation returning StatusOK above. 
+ seconds := []int{0, 1, 2, 4, 8, 0, 1, 2, 4, 0} + request := c.Verb("POST").Prefix("backofftest").Suffix("abc") + request.backoffMgr = &URLBackoff{ + Backoff: util.NewBackOff( + time.Duration(1)*time.Second, + time.Duration(200)*time.Second)} + for _, sec := range seconds { + start := time.Now() + request.DoRaw() + finish := time.Since(start) + t.Logf("%v finished in %v", sec, finish) + if finish < time.Duration(sec)*time.Second || finish >= time.Duration(sec+5)*time.Second { + t.Fatalf("%v not in range %v", finish, sec) + } + } +} + func TestCheckRetryClosesBody(t *testing.T) { count := 0 ch := make(chan struct{}) @@ -1030,7 +1070,7 @@ func TestUintParam(t *testing.T) { for _, item := range table { u, _ := url.Parse("http://localhost") - r := NewRequest(nil, "GET", u, unversioned.GroupVersion{Group: "test"}, nil).AbsPath("").UintParam(item.name, item.testVal) + r := NewRequest(nil, "GET", u, unversioned.GroupVersion{Group: "test"}, nil, nil).AbsPath("").UintParam(item.name, item.testVal) if e, a := item.expectStr, r.URL().String(); e != a { t.Errorf("expected %v, got %v", e, a) } diff --git a/pkg/client/unversioned/restclient.go b/pkg/client/unversioned/restclient.go index 219e94cdec6..63cf01fa440 100644 --- a/pkg/client/unversioned/restclient.go +++ b/pkg/client/unversioned/restclient.go @@ -19,14 +19,25 @@ package unversioned import ( "net/http" "net/url" + "os" + "strconv" "strings" + "time" + "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" ) +const ( + // Environment variables: Note that the duration should be long enough that the backoff + // persists for some reasonable time (i.e. 120 seconds). The typical base might be "1". + envBackoffBase = "KUBE_CLIENT_BACKOFF_BASE" + envBackoffDuration = "KUBE_CLIENT_BACKOFF_DURATION" +) + // RESTClient imposes common Kubernetes API conventions on a set of resource paths. 
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent // of one or more resources. The server should return a decodable API resource @@ -74,6 +85,28 @@ func NewRESTClient(baseURL *url.URL, groupVersion unversioned.GroupVersion, c ru } } +// readExpBackoffConfig handles the internal logic of determining what the +// backoff policy is. By default if no information is available, NoBackoff. +// TODO Generalize this see #17727 . +func readExpBackoffConfig() BackoffManager { + backoffBase := os.Getenv(envBackoffBase) + backoffDuration := os.Getenv(envBackoffDuration) + + backoffBaseInt, errBase := strconv.ParseInt(backoffBase, 10, 64) + backoffDurationInt, errDuration := strconv.ParseInt(backoffDuration, 10, 64) + + if errBase != nil || errDuration != nil { + glog.V(2).Infof("Configuring no exponential backoff.") + return &NoBackoff{} + } else { + glog.V(2).Infof("Configuring exponential backoff as %v, %v", backoffBaseInt, backoffDurationInt) + return &URLBackoff{ + Backoff: util.NewBackOff( + time.Duration(backoffBaseInt)*time.Second, + time.Duration(backoffDurationInt)*time.Second)} + } +} + // Verb begins a request with a verb (GET, POST, PUT, DELETE). // // Example usage of RESTClient's request building interface: @@ -90,10 +123,13 @@ func (c *RESTClient) Verb(verb string) *Request { if c.Throttle != nil { c.Throttle.Accept() } + + backoff := readExpBackoffConfig() + if c.Client == nil { - return NewRequest(nil, verb, c.baseURL, c.groupVersion, c.Codec) + return NewRequest(nil, verb, c.baseURL, c.groupVersion, c.Codec, backoff) } - return NewRequest(c.Client, verb, c.baseURL, c.groupVersion, c.Codec) + return NewRequest(c.Client, verb, c.baseURL, c.groupVersion, c.Codec, backoff) } // Post begins a POST request. Short for c.Verb("POST"). 
diff --git a/pkg/client/unversioned/restclient_test.go b/pkg/client/unversioned/restclient_test.go index b0f66458f5f..77c9711c8c7 100644 --- a/pkg/client/unversioned/restclient_test.go +++ b/pkg/client/unversioned/restclient_test.go @@ -19,8 +19,11 @@ package unversioned import ( "net/http" "net/http/httptest" + "net/url" + "os" "reflect" "testing" + "time" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" @@ -139,3 +142,39 @@ func TestDoRequestCreated(t *testing.T) { } fakeHandler.ValidateRequest(t, "/"+testapi.Default.Version()+"/test", "GET", nil) } + +func TestCreateBackoffManager(t *testing.T) { + + theUrl, _ := url.Parse("http://localhost") + + // 1 second base backoff + duration of 2 seconds -> exponential backoff for requests. + os.Setenv(envBackoffBase, "1") + os.Setenv(envBackoffDuration, "2") + backoff := readExpBackoffConfig() + backoff.UpdateBackoff(theUrl, nil, 500) + backoff.UpdateBackoff(theUrl, nil, 500) + if backoff.CalculateBackoff(theUrl)/time.Second != 2 { + t.Errorf("Backoff env not working.") + } + + // 0 duration -> no backoff. + os.Setenv(envBackoffBase, "1") + os.Setenv(envBackoffDuration, "0") + backoff.UpdateBackoff(theUrl, nil, 500) + backoff.UpdateBackoff(theUrl, nil, 500) + backoff = readExpBackoffConfig() + if backoff.CalculateBackoff(theUrl)/time.Second != 0 { + t.Errorf("Zero backoff duration, but backoff still occuring.") + } + + // No env -> No backoff. 
+ os.Setenv(envBackoffBase, "") + os.Setenv(envBackoffDuration, "") + backoff = readExpBackoffConfig() + backoff.UpdateBackoff(theUrl, nil, 500) + backoff.UpdateBackoff(theUrl, nil, 500) + if backoff.CalculateBackoff(theUrl)/time.Second != 0 { + t.Errorf("Backoff should have been 0.") + } + +} diff --git a/pkg/client/unversioned/urlbackoff.go b/pkg/client/unversioned/urlbackoff.go new file mode 100644 index 00000000000..331079bd7e2 --- /dev/null +++ b/pkg/client/unversioned/urlbackoff.go @@ -0,0 +1,97 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package unversioned + +import ( + "net/url" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/sets" +) + +// Set of resp. Codes that we backoff for. +// In general these should be errors that indicate a server is overloaded. +// These shouldn't be configured by any user, we set them based on conventions +// described in +var serverIsOverloadedSet = sets.NewInt(429) +var maxResponseCode = 499 + +type BackoffManager interface { + UpdateBackoff(actualUrl *url.URL, err error, responseCode int) + CalculateBackoff(actualUrl *url.URL) time.Duration +} + +// URLBackoff struct implements the semantics on top of Backoff which +// we need for URL specific exponential backoff. +type URLBackoff struct { + // Uses backoff as underlying implementation. 
+ Backoff *util.Backoff +} + +// NoBackoff is a stub implementation, can be used for mocking or else as a default. +type NoBackoff struct { +} + +func (n *NoBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) { + // do nothing. +} +func (n *NoBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration { + return 0 * time.Second +} + +// Disable makes the backoff trivial, i.e., sets it to zero. This might be used +// by tests which want to run 1000s of mock requests without slowing down. +func (b *URLBackoff) Disable() { + glog.V(4).Infof("Disabling backoff strategy") + b.Backoff = util.NewBackOff(0*time.Second, 0*time.Second) +} + +// baseUrlKey returns the key which urls will be mapped to. +// For example, 127.0.0.1:8080/api/v2/abcde -> 127.0.0.1:8080. +func (b *URLBackoff) baseUrlKey(rawurl *url.URL) string { + // Simple implementation for now, just the host. + // We may backoff specific paths (i.e. "pods") differentially + // in the future. + host, err := url.Parse(rawurl.String()) + if err != nil { + glog.V(4).Infof("Error extracting url: %v", rawurl) + panic("bad url!") + } + return host.Host +} + +// UpdateBackoff updates backoff metadata +func (b *URLBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) { + // range for retry counts that we store is [0,13] + if responseCode > maxResponseCode || serverIsOverloadedSet.Has(responseCode) { + b.Backoff.Next(b.baseUrlKey(actualUrl), time.Now()) + return + } else if responseCode >= 300 || err != nil { + glog.V(4).Infof("Client is returning errors: code %v, error %v", responseCode, err) + } + + //If we got this far, there is no backoff required for this URL anymore. + b.Backoff.Reset(b.baseUrlKey(actualUrl)) +} + +// CalculateBackoff takes a url and back's off exponentially, +// based on its knowledge of existing failures. 
+func (b *URLBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration { + return b.Backoff.Get(b.baseUrlKey(actualUrl)) +} diff --git a/pkg/client/unversioned/urlbackoff_test.go b/pkg/client/unversioned/urlbackoff_test.go new file mode 100644 index 00000000000..8457c977b56 --- /dev/null +++ b/pkg/client/unversioned/urlbackoff_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package unversioned + +import ( + "k8s.io/kubernetes/pkg/util" + "net/url" + "testing" + "time" +) + +func parse(raw string) *url.URL { + theUrl, _ := url.Parse(raw) + return theUrl +} + +func TestURLBackoffFunctionalityCollisions(t *testing.T) { + myBackoff := &URLBackoff{ + Backoff: util.NewBackOff(1*time.Second, 60*time.Second), + } + + // Add some noise and make sure backoff for a clean URL is zero. + myBackoff.UpdateBackoff(parse("http://100.200.300.400:8080"), nil, 500) + + myBackoff.UpdateBackoff(parse("http://1.2.3.4:8080"), nil, 500) + + if myBackoff.CalculateBackoff(parse("http://1.2.3.4:100")) > 0 { + t.Errorf("URLs are colliding in the backoff map!") + } +} + +// TestURLBackoffFunctionality generally tests the URLBackoff wrapper. We avoid duplicating tests from backoff and request. +func TestURLBackoffFunctionality(t *testing.T) { + myBackoff := &URLBackoff{ + Backoff: util.NewBackOff(1*time.Second, 60*time.Second), + } + + // Now test that backoff increases, then recovers. 
+ // 200 and 300 should both result in clearing the backoff. + // all others like 429 should result in increased backoff. + seconds := []int{0, + 1, 2, 4, 8, 0, + 1, 2} + returnCodes := []int{ + 429, 500, 501, 502, 300, + 500, 501, 502, + } + + if len(seconds) != len(returnCodes) { + t.Fatalf("responseCode to backoff arrays should be the same length... sanity check failed.") + } + + for i, sec := range seconds { + backoffSec := myBackoff.CalculateBackoff(parse("http://1.2.3.4:100")) + if backoffSec < time.Duration(sec)*time.Second || backoffSec > time.Duration(sec+5)*time.Second { + t.Errorf("Backoff out of range %v: %v %v", i, sec, backoffSec) + } + myBackoff.UpdateBackoff(parse("http://1.2.3.4:100/responseCodeForFuncTest"), nil, returnCodes[i]) + } + + if myBackoff.CalculateBackoff(parse("http://1.2.3.4:100")) == 0 { + t.Errorf("The final return code %v should have resulted in a backoff ! ", returnCodes[7]) + } +}