diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/BUILD b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/BUILD index e3ecf81dfb1..e33400f9102 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/BUILD +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/BUILD @@ -2,18 +2,31 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["azure_error.go"], + srcs = [ + "azure_error.go", + "azure_retry.go", + "doc.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/legacy-cloud-providers/azure/retry", importpath = "k8s.io/legacy-cloud-providers/azure/retry", visibility = ["//visibility:public"], - deps = ["//vendor/k8s.io/klog:go_default_library"], + deps = [ + "//vendor/github.com/Azure/go-autorest/autorest:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], ) go_test( name = "go_default_test", - srcs = ["azure_error_test.go"], + srcs = [ + "azure_error_test.go", + "azure_retry_test.go", + ], embed = [":go_default_library"], - deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"], + deps = [ + "//vendor/github.com/Azure/go-autorest/autorest/mocks:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + ], ) filegroup( diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error.go b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error.go index 4bf262897dc..9e604e973de 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error.go @@ -19,7 +19,9 @@ limitations under the License. package retry import ( + "bytes" "fmt" + "io/ioutil" "net/http" "strconv" "strings" @@ -28,6 +30,11 @@ import ( "k8s.io/klog" ) +const ( + // RetryAfterHeaderKey is the retry-after header key in ARM responses. + RetryAfterHeaderKey = "Retry-After" +) + var ( // The function to get current time. 
now = time.Now @@ -57,6 +64,15 @@ func (err *Error) Error() error { err.Retriable, err.RetryAfter.String(), err.HTTPStatusCode, err.RawError) } +// IsThrottled returns true the if the request is being throttled. +func (err *Error) IsThrottled() bool { + if err == nil { + return false + } + + return err.HTTPStatusCode == http.StatusTooManyRequests || err.RetryAfter.After(now()) +} + // NewError creates a new Error. func NewError(retriable bool, err error) *Error { return &Error{ @@ -73,6 +89,20 @@ func GetRetriableError(err error) *Error { } } +// GetRateLimitError creates a new error for rate limiting. +func GetRateLimitError(isWrite bool, opName string) *Error { + opType := "read" + if isWrite { + opType = "write" + } + return GetRetriableError(fmt.Errorf("azure cloud provider rate limited(%s) for operation %q", opType, opName)) +} + +// GetThrottlingError creates a new error for throttling. +func GetThrottlingError(operation, reason string) *Error { + return GetRetriableError(fmt.Errorf("azure cloud provider throttled for operation %s with reason %q", operation, reason)) +} + // GetError gets a new Error based on resp and error. 
func GetError(resp *http.Response, err error) *Error { if err == nil && resp == nil { @@ -88,12 +118,8 @@ func GetError(resp *http.Response, err error) *Error { if retryAfterDuration := getRetryAfter(resp); retryAfterDuration != 0 { retryAfter = now().Add(retryAfterDuration) } - rawError := err - if err == nil && resp != nil { - rawError = fmt.Errorf("HTTP response: %v", resp.StatusCode) - } return &Error{ - RawError: rawError, + RawError: getRawError(resp, err), RetryAfter: retryAfter, Retriable: shouldRetryHTTPRequest(resp, err), HTTPStatusCode: getHTTPStatusCode(resp), @@ -114,6 +140,27 @@ func isSuccessHTTPResponse(resp *http.Response) bool { return false } +func getRawError(resp *http.Response, err error) error { + if err != nil { + return err + } + + if resp == nil || resp.Body == nil { + return fmt.Errorf("empty HTTP response") + } + + // return the http status if unabled to get response body. + defer resp.Body.Close() + respBody, _ := ioutil.ReadAll(resp.Body) + resp.Body = ioutil.NopCloser(bytes.NewReader(respBody)) + if len(respBody) == 0 { + return fmt.Errorf("HTTP status code (%d)", resp.StatusCode) + } + + // return the raw response body. + return fmt.Errorf("%s", string(respBody)) +} + func getHTTPStatusCode(resp *http.Response) int { if resp == nil { return -1 @@ -151,7 +198,7 @@ func getRetryAfter(resp *http.Response) time.Duration { return 0 } - ra := resp.Header.Get("Retry-After") + ra := resp.Header.Get(RetryAfterHeaderKey) if ra == "" { return 0 } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error_test.go index 52057ffc3b4..4eafe565b17 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error_test.go @@ -19,7 +19,9 @@ limitations under the License. 
package retry import ( + "bytes" "fmt" + "io/ioutil" "net/http" "testing" "time" @@ -44,11 +46,11 @@ func TestGetError(t *testing.T) { }, { code: http.StatusOK, - err: fmt.Errorf("some error"), + err: fmt.Errorf("unknown error"), expected: &Error{ Retriable: true, HTTPStatusCode: http.StatusOK, - RawError: fmt.Errorf("some error"), + RawError: fmt.Errorf("unknown error"), }, }, { @@ -56,7 +58,7 @@ func TestGetError(t *testing.T) { expected: &Error{ Retriable: false, HTTPStatusCode: http.StatusBadRequest, - RawError: fmt.Errorf("HTTP response: 400"), + RawError: fmt.Errorf("some error"), }, }, { @@ -64,7 +66,7 @@ func TestGetError(t *testing.T) { expected: &Error{ Retriable: true, HTTPStatusCode: http.StatusInternalServerError, - RawError: fmt.Errorf("HTTP response: 500"), + RawError: fmt.Errorf("some error"), }, }, { @@ -83,7 +85,7 @@ func TestGetError(t *testing.T) { Retriable: true, HTTPStatusCode: http.StatusTooManyRequests, RetryAfter: now().Add(100 * time.Second), - RawError: fmt.Errorf("HTTP response: 429"), + RawError: fmt.Errorf("some error"), }, }, } @@ -92,6 +94,7 @@ func TestGetError(t *testing.T) { resp := &http.Response{ StatusCode: test.code, Header: http.Header{}, + Body: ioutil.NopCloser(bytes.NewReader([]byte("some error"))), } if test.retryAfter != 0 { resp.Header.Add("Retry-After", fmt.Sprintf("%d", test.retryAfter)) @@ -138,7 +141,7 @@ func TestGetStatusNotFoundAndForbiddenIgnoredError(t *testing.T) { expected: &Error{ Retriable: false, HTTPStatusCode: http.StatusBadRequest, - RawError: fmt.Errorf("HTTP response: 400"), + RawError: fmt.Errorf("some error"), }, }, { @@ -146,7 +149,7 @@ func TestGetStatusNotFoundAndForbiddenIgnoredError(t *testing.T) { expected: &Error{ Retriable: true, HTTPStatusCode: http.StatusInternalServerError, - RawError: fmt.Errorf("HTTP response: 500"), + RawError: fmt.Errorf("some error"), }, }, { @@ -165,7 +168,7 @@ func TestGetStatusNotFoundAndForbiddenIgnoredError(t *testing.T) { Retriable: true, HTTPStatusCode: 
http.StatusTooManyRequests, RetryAfter: now().Add(100 * time.Second), - RawError: fmt.Errorf("HTTP response: 429"), + RawError: fmt.Errorf("some error"), }, }, } @@ -174,6 +177,7 @@ func TestGetStatusNotFoundAndForbiddenIgnoredError(t *testing.T) { resp := &http.Response{ StatusCode: test.code, Header: http.Header{}, + Body: ioutil.NopCloser(bytes.NewReader([]byte("some error"))), } if test.retryAfter != 0 { resp.Header.Add("Retry-After", fmt.Sprintf("%d", test.retryAfter)) @@ -251,3 +255,38 @@ func TestIsSuccessResponse(t *testing.T) { } } } + +func TestIsThrottled(t *testing.T) { + tests := []struct { + err *Error + expected bool + }{ + { + err: nil, + expected: false, + }, + { + err: &Error{ + HTTPStatusCode: http.StatusOK, + }, + expected: false, + }, + { + err: &Error{ + HTTPStatusCode: http.StatusTooManyRequests, + }, + expected: true, + }, + { + err: &Error{ + RetryAfter: time.Now().Add(time.Hour), + }, + expected: true, + }, + } + + for _, test := range tests { + real := test.err.IsThrottled() + assert.Equal(t, test.expected, real) + } +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry.go b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry.go new file mode 100644 index 00000000000..562b04418c5 --- /dev/null +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry.go @@ -0,0 +1,161 @@ +// +build !providerless + +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package retry + +import ( + "math/rand" + "net/http" + "time" + + "github.com/Azure/go-autorest/autorest" + "k8s.io/klog" +) + +// Backoff holds parameters applied to a Backoff function. +type Backoff struct { + // The initial duration. + Duration time.Duration + // Duration is multiplied by factor each iteration, if factor is not zero + // and the limits imposed by Steps and Cap have not been reached. + // Should not be negative. + // The jitter does not contribute to the updates to the duration parameter. + Factor float64 + // The sleep at each iteration is the duration plus an additional + // amount chosen uniformly at random from the interval between + // zero and `jitter*duration`. + Jitter float64 + // The remaining number of iterations in which the duration + // parameter may change (but progress can be stopped earlier by + // hitting the cap). If not positive, the duration is not + // changed. Used for exponential backoff in combination with + // Factor and Cap. + Steps int + // A limit on revised values of the duration parameter. If a + // multiplication by the factor parameter would make the duration + // exceed the cap then the duration is set to the cap and the + // steps parameter is set to zero. + Cap time.Duration +} + +// NewBackoff creates a new Backoff. +func NewBackoff(duration time.Duration, factor float64, jitter float64, steps int, cap time.Duration) *Backoff { + return &Backoff{ + Duration: duration, + Factor: factor, + Jitter: jitter, + Steps: steps, + Cap: cap, + } +} + +// Step (1) returns an amount of time to sleep determined by the +// original Duration and Jitter and (2) mutates the provided Backoff +// to update its Steps and Duration. 
+func (b *Backoff) Step() time.Duration { + if b.Steps < 1 { + if b.Jitter > 0 { + return jitter(b.Duration, b.Jitter) + } + return b.Duration + } + b.Steps-- + + duration := b.Duration + + // calculate the next step + if b.Factor != 0 { + b.Duration = time.Duration(float64(b.Duration) * b.Factor) + if b.Cap > 0 && b.Duration > b.Cap { + b.Duration = b.Cap + b.Steps = 0 + } + } + + if b.Jitter > 0 { + duration = jitter(duration, b.Jitter) + } + return duration +} + +// jitter returns a time.Duration between duration and duration + maxFactor * +// duration. +// +// This allows clients to avoid converging on periodic behavior. If maxFactor +// is 0.0, a suggested default value will be chosen. +func jitter(duration time.Duration, maxFactor float64) time.Duration { + if maxFactor <= 0.0 { + maxFactor = 1.0 + } + wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) + return wait +} + +// DoExponentialBackoffRetry represents an autorest.SendDecorator with backoff retry. +func DoExponentialBackoffRetry(backoff *Backoff) autorest.SendDecorator { + return func(s autorest.Sender) autorest.Sender { + return autorest.SenderFunc(func(r *http.Request) (*http.Response, error) { + return doBackoffRetry(s, r, backoff) + }) + } +} + +// doBackoffRetry does the backoff retries for the request. 
+func doBackoffRetry(s autorest.Sender, r *http.Request, backoff *Backoff) (resp *http.Response, err error) { + rr := autorest.NewRetriableRequest(r) + // Each iteration is one attempt; backoff.Steps bounds the total number of attempts, including the first call. + for backoff.Steps > 0 { + err = rr.Prepare() + if err != nil { + return + } + resp, err = s.Do(rr.Request()) + rerr := GetError(resp, err) + // Abort retries in the following scenarios: + // 1) request succeeded + // 2) request is not retriable + // 3) request has been throttled + // 4) request has completed all the retry steps + if rerr == nil || !rerr.Retriable || rerr.IsThrottled() || backoff.Steps == 1 { + return resp, rerr.Error() + } + + if !delayForBackOff(backoff, r.Context().Done()) { + if r.Context().Err() != nil { + return resp, r.Context().Err() + } + return resp, rerr.Error() + } + + klog.V(3).Infof("Backoff retrying %s %q with error %v", r.Method, r.URL.String(), rerr) + } + + return resp, err +} + +// delayForBackOff invokes time.After for the supplied backoff duration. +// The delay may be canceled by closing the passed channel. If terminated early, returns false. +func delayForBackOff(backoff *Backoff, cancel <-chan struct{}) bool { + d := backoff.Step() + select { + case <-time.After(d): + return true + case <-cancel: + return false + } +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry_test.go new file mode 100644 index 00000000000..8799792a96f --- /dev/null +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry_test.go @@ -0,0 +1,125 @@ +// +build !providerless + +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package retry + +import ( + "fmt" + "math/rand" + "net/http" + "net/url" + "testing" + "time" + + "github.com/Azure/go-autorest/autorest/mocks" + "github.com/stretchr/testify/assert" +) + +func TestStep(t *testing.T) { + tests := []struct { + initial *Backoff + want []time.Duration + }{ + {initial: &Backoff{Duration: time.Second, Steps: 0}, want: []time.Duration{time.Second, time.Second, time.Second}}, + {initial: &Backoff{Duration: time.Second, Steps: 1}, want: []time.Duration{time.Second, time.Second, time.Second}}, + {initial: &Backoff{Duration: time.Second, Factor: 1.0, Steps: 1}, want: []time.Duration{time.Second, time.Second, time.Second}}, + {initial: &Backoff{Duration: time.Second, Factor: 2, Steps: 3}, want: []time.Duration{1 * time.Second, 2 * time.Second, 4 * time.Second}}, + {initial: &Backoff{Duration: time.Second, Factor: 2, Steps: 3, Cap: 3 * time.Second}, want: []time.Duration{1 * time.Second, 2 * time.Second, 3 * time.Second}}, + {initial: &Backoff{Duration: time.Second, Factor: 2, Steps: 2, Cap: 3 * time.Second, Jitter: 0.5}, want: []time.Duration{2 * time.Second, 3 * time.Second, 3 * time.Second}}, + {initial: &Backoff{Duration: time.Second, Factor: 2, Steps: 6, Jitter: 4}, want: []time.Duration{1 * time.Second, 2 * time.Second, 4 * time.Second, 8 * time.Second, 16 * time.Second, 32 * time.Second}}, + } + for seed := int64(0); seed < 5; seed++ { + for _, tt := range tests { + initial := *tt.initial + t.Run(fmt.Sprintf("%#v seed=%d", initial, seed), func(t *testing.T) { + rand.Seed(seed) + for i := 0; i < len(tt.want); i++ { + got := 
initial.Step() + t.Logf("[%d]=%s", i, got) + if initial.Jitter > 0 { + if got == tt.want[i] { + // this is statistically unlikely to happen by chance + t.Errorf("Backoff.Step(%d) = %v, no jitter", i, got) + continue + } + diff := float64(tt.want[i]-got) / float64(tt.want[i]) + if diff > initial.Jitter { + t.Errorf("Backoff.Step(%d) = %v, want %v, outside range", i, got, tt.want) + continue + } + } else { + if got != tt.want[i] { + t.Errorf("Backoff.Step(%d) = %v, want %v", i, got, tt.want) + continue + } + } + } + }) + } + } +} + +func TestDoBackoffRetry(t *testing.T) { + backoff := &Backoff{Factor: 1.0, Steps: 3} + fakeRequest := &http.Request{ + URL: &url.URL{ + Host: "localhost", + Path: "/api", + }, + } + r := mocks.NewResponseWithStatus("500 InternelServerError", http.StatusInternalServerError) + client := mocks.NewSender() + client.AppendAndRepeatResponse(r, 3) + + // retries up to steps on errors + expectedErr := &Error{ + Retriable: true, + HTTPStatusCode: 500, + RawError: fmt.Errorf("HTTP status code (500)"), + } + resp, err := doBackoffRetry(client, fakeRequest, backoff) + assert.NotNil(t, resp) + assert.Equal(t, 500, resp.StatusCode) + assert.Equal(t, expectedErr.Error(), err) + assert.Equal(t, 3, client.Attempts()) + + // returns immediately on succeed + r = mocks.NewResponseWithStatus("200 OK", http.StatusOK) + client = mocks.NewSender() + client.AppendAndRepeatResponse(r, 1) + resp, err = doBackoffRetry(client, fakeRequest, backoff) + assert.Nil(t, err) + assert.Equal(t, 1, client.Attempts()) + assert.NotNil(t, resp) + assert.Equal(t, 200, resp.StatusCode) + + // returns immediately on throttling + r = mocks.NewResponseWithStatus("429 TooManyRequests", http.StatusTooManyRequests) + client = mocks.NewSender() + client.AppendAndRepeatResponse(r, 1) + expectedErr = &Error{ + Retriable: true, + HTTPStatusCode: 429, + RawError: fmt.Errorf("HTTP status code (429)"), + } + resp, err = doBackoffRetry(client, fakeRequest, backoff) + assert.Equal(t, 
expectedErr.Error(), err) + assert.Equal(t, 1, client.Attempts()) + assert.NotNil(t, resp) + assert.Equal(t, 429, resp.StatusCode) +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/doc.go b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/doc.go new file mode 100644 index 00000000000..a6c0fc930a6 --- /dev/null +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/doc.go @@ -0,0 +1,21 @@ +// +build !providerless + +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package retry defines a general library to handle errors and retries for various +// Azure clients. +package retry // import "k8s.io/legacy-cloud-providers/azure/retry"