Merge pull request #102107 from tkashem/client-go-retry

client-go: add retry logic for Watch and Stream

Kubernetes-commit: 9c2684150c4d4aed99c6f950f4bc4c0754720897
This commit is contained in:
Kubernetes Publisher 2021-06-01 05:50:26 -07:00
commit 56494c9682
7 changed files with 656 additions and 253 deletions

2
Godeps/Godeps.json generated
View File

@ -480,7 +480,7 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "b0d9a0e45e6f"
"Rev": "b8a1b1fa40e7"
},
{
"ImportPath": "k8s.io/apimachinery",

4
go.mod
View File

@ -28,7 +28,7 @@ require (
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/protobuf v1.25.0
k8s.io/api v0.0.0-20210526225344-b0d9a0e45e6f
k8s.io/api v0.0.0-20210527185339-b8a1b1fa40e7
k8s.io/apimachinery v0.0.0-20210526145310-44113beed5d3
k8s.io/klog/v2 v2.9.0
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
@ -37,6 +37,6 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20210526225344-b0d9a0e45e6f
k8s.io/api => k8s.io/api v0.0.0-20210527185339-b8a1b1fa40e7
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210526145310-44113beed5d3
)

4
go.sum
View File

@ -437,8 +437,8 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210526225344-b0d9a0e45e6f h1:RltThoVZBYapPUF5y7XS/5Nyf+l05lf0zLUQEm6YJBw=
k8s.io/api v0.0.0-20210526225344-b0d9a0e45e6f/go.mod h1:eWIkn4RdojmLDA8CIg8qxYIu2OMC0e1fQw5tIpdntMo=
k8s.io/api v0.0.0-20210527185339-b8a1b1fa40e7 h1:DDWVZEOA7PFlW0b5iT+HpD/2/hznlhj8eJ32qb5SUWY=
k8s.io/api v0.0.0-20210527185339-b8a1b1fa40e7/go.mod h1:eWIkn4RdojmLDA8CIg8qxYIu2OMC0e1fQw5tIpdntMo=
k8s.io/apimachinery v0.0.0-20210526145310-44113beed5d3 h1:EackAoZYmz9kh+g/lsA1j+zXT594K8sow0aZsaTGrpY=
k8s.io/apimachinery v0.0.0-20210526145310-44113beed5d3/go.mod h1:4Cv2ieSta05uBKxq3lhGnvQDmwXVxNYosMEyZpoDRoA=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=

View File

@ -675,18 +675,38 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
return nil, r.err
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
req.Header = r.headers
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
isErrRetryableFunc := func(request *http.Request, err error) bool {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) || net.IsTimeout(err) {
return true
}
return false
}
var retryAfter *RetryAfter
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return nil, err
}
retryAfter = nil
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
@ -696,22 +716,47 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err != nil {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) || net.IsTimeout(err) {
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
}
done, transformErr := func() (bool, error) {
defer readAndCloseResponseBody(resp)
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
return false, nil
}
klog.V(4).Infof("Could not retry request - %v", err)
}
if resp == nil {
// the server must have sent us an error in 'err'
return true, nil
}
if result := r.transformResponse(resp, req); result.err != nil {
return true, result.err
}
return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}()
if done {
if isErrRetryableFunc(req, err) {
return watch.NewEmptyWatch(), nil
}
if err == nil {
// if the server sent us an HTTP Response object,
// we need to return the error object from that.
err = transformErr
}
return nil, err
}
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
if result := r.transformResponse(resp, req); result.err != nil {
return nil, result.err
}
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}
}
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
@ -766,21 +811,33 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
return nil, err
}
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
var retryAfter *RetryAfter
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, nil)
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}
if r.body != nil {
req.Body = ioutil.NopCloser(r.body)
}
req = req.WithContext(ctx)
req.Header = r.headers
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return nil, err
}
retryAfter = nil
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
@ -791,6 +848,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
}
}
if err != nil {
// we only retry on an HTTP response with 'Retry-After' header
return nil, err
}
@ -800,15 +858,28 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
return resp.Body, nil
default:
// ensure we close the body before returning the error
done, transformErr := func() (bool, error) {
defer resp.Body.Close()
result := r.transformResponse(resp, req)
err := result.Error()
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, neverRetryError)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
return false, nil
}
klog.V(4).Infof("Could not retry request - %v", err)
}
result := r.transformResponse(resp, req)
if err := result.Error(); err != nil {
return true, err
}
return true, fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
}()
if done {
return nil, transformErr
}
}
return nil, err
}
}
@ -940,13 +1011,12 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
return false
})
if retry {
if err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body); err != nil {
klog.V(4).Infof("Could not retry request - %v", err)
f(req, resp)
return true
}
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
if err == nil {
return false
}
klog.V(4).Infof("Could not retry request - %v", err)
}
f(req, resp)
return true

View File

@ -924,53 +924,57 @@ func TestTransformUnstructuredError(t *testing.T) {
}
}
type errorReader struct {
err error
}
func (r errorReader) Read(data []byte) (int, error) { return 0, r.err }
func (r errorReader) Close() error { return nil }
func TestRequestWatch(t *testing.T) {
testCases := []struct {
name string
Request *Request
maxRetries int
serverReturns []responseErr
Expect []watch.Event
attemptsExpected int
Err bool
ErrFn func(error) bool
Empty bool
}{
{
name: "Request has error",
Request: &Request{err: errors.New("bail")},
attemptsExpected: 0,
Err: true,
},
{
name: "Client is nil, should use http.DefaultClient",
Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
Err: true,
},
{
name: "error is not retryable",
Request: &Request{
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("err")
}),
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: nil, err: errors.New("err")},
},
attemptsExpected: 1,
Err: true,
},
{
name: "server returns forbidden",
Request: &Request{
c: &RESTClient{
content: defaultContentConfig(),
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusForbidden,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, nil
}),
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: &http.Response{
StatusCode: http.StatusForbidden,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, err: nil},
},
attemptsExpected: 1,
Expect: []watch.Event{
{
Type: watch.Error,
@ -1000,101 +1004,205 @@ func TestRequestWatch(t *testing.T) {
},
},
{
name: "server returns forbidden",
Request: &Request{
c: &RESTClient{
content: defaultContentConfig(),
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusForbidden,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, nil
}),
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: &http.Response{
StatusCode: http.StatusForbidden,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, err: nil},
},
attemptsExpected: 1,
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsForbidden(err)
},
},
{
name: "server returns unauthorized",
Request: &Request{
c: &RESTClient{
content: defaultContentConfig(),
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnauthorized,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, nil
}),
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: &http.Response{
StatusCode: http.StatusUnauthorized,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, err: nil},
},
attemptsExpected: 1,
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsUnauthorized(err)
},
},
{
name: "server returns unauthorized",
Request: &Request{
c: &RESTClient{
content: defaultContentConfig(),
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: &http.Response{
StatusCode: http.StatusUnauthorized,
Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
Status: metav1.StatusFailure,
Reason: metav1.StatusReasonUnauthorized,
})))),
}, nil
}),
base: &url.URL{},
},
}, err: nil},
},
attemptsExpected: 1,
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsUnauthorized(err)
},
},
{
name: "server returns EOF error",
Request: &Request{
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, io.EOF
}),
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: nil, err: io.EOF},
},
attemptsExpected: 1,
Empty: true,
},
{
name: "server returns can't write HTTP request on broken connection error",
Request: &Request{
c: &RESTClient{
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: nil, err: errors.New("http: can't write HTTP request on broken connection")},
},
attemptsExpected: 1,
Empty: true,
},
{
name: "server returns connection reset by peer",
Request: &Request{
c: &RESTClient{
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: nil, err: errors.New("foo: connection reset by peer")},
},
attemptsExpected: 1,
Empty: true,
},
{
name: "max retries 2, server always returns EOF error",
Request: &Request{
c: &RESTClient{
base: &url.URL{},
},
},
maxRetries: 2,
attemptsExpected: 3,
serverReturns: []responseErr{
{response: nil, err: io.EOF},
{response: nil, err: io.EOF},
{response: nil, err: io.EOF},
},
Empty: true,
},
{
name: "max retries 1, server returns a retry-after response, request body seek error",
Request: &Request{
body: &readSeeker{err: io.EOF},
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("http: can't write HTTP request on broken connection")
}),
base: &url.URL{},
},
},
maxRetries: 1,
attemptsExpected: 1,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsInternalError(err)
},
},
{
name: "max retries 1, server returns a retryable error, request body seek error",
Request: &Request{
body: &readSeeker{err: io.EOF},
c: &RESTClient{
base: &url.URL{},
},
},
maxRetries: 1,
attemptsExpected: 1,
serverReturns: []responseErr{
{response: nil, err: io.EOF},
},
Empty: true,
},
{
name: "max retries 2, server always returns a response with Retry-After header",
Request: &Request{
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("foo: connection reset by peer")
}),
base: &url.URL{},
},
},
Empty: true,
maxRetries: 2,
attemptsExpected: 3,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
{response: retryAfterResponse(), err: nil},
{response: retryAfterResponse(), err: nil},
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsInternalError(err)
},
},
}
for _, testCase := range testCases {
t.Run("", func(t *testing.T) {
testCase.Request.backoff = &NoBackoff{}
testCase.Request.retry = &withRetry{}
t.Run(testCase.name, func(t *testing.T) {
var attemptsGot int
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
attemptsGot++
}()
if attemptsGot >= len(testCase.serverReturns) {
t.Fatalf("Wrong test setup, the server does not know what to return")
}
re := testCase.serverReturns[attemptsGot]
return re.response, re.err
})
if c := testCase.Request.c; c != nil && len(testCase.serverReturns) > 0 {
c.Client = client
}
testCase.Request.backoff = &noSleepBackOff{}
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
watch, err := testCase.Request.Watch(context.Background())
if watch == nil && err == nil {
t.Fatal("Both watch.Interface and err returned by Watch are nil")
}
if testCase.attemptsExpected != attemptsGot {
t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", testCase.attemptsExpected, attemptsGot)
}
hasErr := err != nil
if hasErr != testCase.Err {
t.Fatalf("expected %t, got %t: %v", testCase.Err, hasErr, err)
@ -1132,60 +1240,71 @@ func TestRequestWatch(t *testing.T) {
func TestRequestStream(t *testing.T) {
testCases := []struct {
name string
Request *Request
maxRetries int
serverReturns []responseErr
attemptsExpected int
Err bool
ErrFn func(error) bool
}{
{
name: "request has error",
Request: &Request{err: errors.New("bail")},
attemptsExpected: 0,
Err: true,
},
{
name: "Client is nil, should use http.DefaultClient",
Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
Err: true,
},
{
name: "server returns an error",
Request: &Request{
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("err")
}),
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: nil, err: errors.New("err")},
},
attemptsExpected: 1,
Err: true,
},
{
Request: &Request{
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
content: defaultContentConfig(),
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: &http.Response{
StatusCode: http.StatusUnauthorized,
Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
Status: metav1.StatusFailure,
Reason: metav1.StatusReasonUnauthorized,
})))),
}, nil
}),
content: defaultContentConfig(),
base: &url.URL{},
},
}, err: nil},
},
attemptsExpected: 1,
Err: true,
},
{
Request: &Request{
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusBadRequest,
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))),
}, nil
}),
content: defaultContentConfig(),
base: &url.URL{},
},
},
serverReturns: []responseErr{
{response: &http.Response{
StatusCode: http.StatusBadRequest,
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))),
}, err: nil},
},
attemptsExpected: 1,
Err: true,
ErrFn: func(err error) bool {
if err.Error() == "a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]" {
@ -1194,25 +1313,124 @@ func TestRequestStream(t *testing.T) {
return false
},
},
{
name: "max retries 1, server returns a retry-after response, request body seek error",
Request: &Request{
body: &readSeeker{err: io.EOF},
c: &RESTClient{
base: &url.URL{},
},
},
maxRetries: 1,
attemptsExpected: 1,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsInternalError(err)
},
},
{
name: "max retries 2, server always returns a response with Retry-After header",
Request: &Request{
c: &RESTClient{
base: &url.URL{},
},
},
maxRetries: 2,
attemptsExpected: 3,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
{response: retryAfterResponse(), err: nil},
{response: retryAfterResponse(), err: nil},
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsInternalError(err)
},
},
{
name: "server returns EOF after attempt 1, retry aborted",
Request: &Request{
c: &RESTClient{
base: &url.URL{},
},
},
maxRetries: 2,
attemptsExpected: 2,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
{response: nil, err: io.EOF},
},
Err: true,
ErrFn: func(err error) bool {
return unWrap(err) == io.EOF
},
},
{
name: "max retries 2, server returns success on the final attempt",
Request: &Request{
c: &RESTClient{
base: &url.URL{},
},
},
maxRetries: 2,
attemptsExpected: 3,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
{response: retryAfterResponse(), err: nil},
{response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, err: nil},
},
},
}
for i, testCase := range testCases {
testCase.Request.backoff = &NoBackoff{}
testCase.Request.retry = &withRetry{maxRetries: 0}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
var attemptsGot int
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
attemptsGot++
}()
if attemptsGot >= len(testCase.serverReturns) {
t.Fatalf("Wrong test setup, the server does not know what to return")
}
re := testCase.serverReturns[attemptsGot]
return re.response, re.err
})
if c := testCase.Request.c; c != nil && len(testCase.serverReturns) > 0 {
c.Client = client
}
testCase.Request.backoff = &noSleepBackOff{}
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
body, err := testCase.Request.Stream(context.Background())
if body == nil && err == nil {
t.Fatal("Both body and err returned by Stream are nil")
}
if testCase.attemptsExpected != attemptsGot {
t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", testCase.attemptsExpected, attemptsGot)
}
hasErr := err != nil
if hasErr != testCase.Err {
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
t.Errorf("expected %t, got %t: %v", testCase.Err, hasErr, err)
}
if hasErr && body != nil {
t.Errorf("%d: body should be nil when error is returned", i)
t.Error("body should be nil when error is returned")
}
if hasErr {
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
t.Errorf("unexpected error: %v", err)
t.Errorf("unexpected error: %#v", err)
}
}
})
}
}
@ -1840,6 +2058,22 @@ func TestBody(t *testing.T) {
}
func TestWatch(t *testing.T) {
tests := []struct {
name string
maxRetries int
}{
{
name: "no retry",
maxRetries: 0,
},
{
name: "with retries",
maxRetries: 3,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var table = []struct {
t watch.EventType
obj runtime.Object
@ -1849,12 +2083,23 @@ func TestWatch(t *testing.T) {
{watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
}
var attempts int
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
attempts++
}()
flusher, ok := w.(http.Flusher)
if !ok {
panic("need flusher!")
}
if attempts < test.maxRetries {
w.Header().Set("Retry-After", "1")
w.WriteHeader(http.StatusTooManyRequests)
return
}
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
@ -1870,7 +2115,8 @@ func TestWatch(t *testing.T) {
defer testServer.Close()
s := testRESTClient(t, testServer)
watching, err := s.Get().Prefix("path/to/watch/thing").Watch(context.Background())
watching, err := s.Get().Prefix("path/to/watch/thing").
MaxRetries(test.maxRetries).Watch(context.Background())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -1892,6 +2138,8 @@ func TestWatch(t *testing.T) {
if ok {
t.Fatal("Unexpected non-close")
}
})
}
}
func TestWatchNonDefaultContentType(t *testing.T) {
@ -2333,14 +2581,27 @@ type seek struct {
type count struct {
// keeps track of the number of Seek(offset, whence) calls.
seeks []seek
// how many times {Request|Response}.Body.Close() has been invoked
lock sync.Mutex
closes int
}
func (c *count) close() {
c.lock.Lock()
defer c.lock.Unlock()
c.closes++
}
func (c *count) getCloseCount() int {
c.lock.Lock()
defer c.lock.Unlock()
return c.closes
}
// used to track {Request|Response}.Body
type readTracker struct {
count *count
delegated io.Reader
count *count
}
func (r *readTracker) Seek(offset int64, whence int) (int64, error) {
@ -2357,7 +2618,7 @@ func (r *readTracker) Read(p []byte) (n int, err error) {
func (r *readTracker) Close() error {
if closer, ok := r.delegated.(io.Closer); ok {
r.count.closes++
r.count.close()
return closer.Close()
}
return nil
@ -2492,26 +2753,46 @@ func TestRequestWithRetry(t *testing.T) {
}
func TestRequestDoWithRetry(t *testing.T) {
testRequestWithRetry(t, func(ctx context.Context, r *Request) {
testRequestWithRetry(t, "Do", func(ctx context.Context, r *Request) {
r.Do(ctx)
})
}
func TestRequestDORawWithRetry(t *testing.T) {
testRequestWithRetry(t, func(ctx context.Context, r *Request) {
func TestRequestDoRawWithRetry(t *testing.T) {
// both request.Do and request.DoRaw have the same behavior and expectations
testRequestWithRetry(t, "Do", func(ctx context.Context, r *Request) {
r.DoRaw(ctx)
})
}
func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Request)) {
func TestRequestStreamWithRetry(t *testing.T) {
testRequestWithRetry(t, "Stream", func(ctx context.Context, r *Request) {
r.Stream(ctx)
})
}
func TestRequestWatchWithRetry(t *testing.T) {
testRequestWithRetry(t, "Watch", func(ctx context.Context, r *Request) {
r.Watch(ctx)
})
}
func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
type expected struct {
attempts int
reqCount *count
respCount *count
}
tests := []struct {
name string
verb string
body func() io.Reader
maxRetries int
serverReturns []responseErr
reqCountExpected *count
respCountExpected *count
// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
expectations map[string]expected
}{
{
name: "server always returns retry-after response",
@ -2523,8 +2804,23 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
{response: retryAfterResponse(), err: nil},
{response: retryAfterResponse(), err: nil},
},
reqCountExpected: &count{closes: 0, seeks: make([]seek, 2)},
respCountExpected: &count{closes: 3, seeks: []seek{}},
expectations: map[string]expected{
"Do": {
attempts: 3,
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
respCount: &count{closes: 3, seeks: []seek{}},
},
"Watch": {
attempts: 3,
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
respCount: &count{closes: 3, seeks: []seek{}},
},
"Stream": {
attempts: 3,
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
respCount: &count{closes: 3, seeks: []seek{}},
},
},
},
{
name: "server always returns retryable error",
@ -2536,8 +2832,24 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
{response: nil, err: io.EOF},
{response: nil, err: io.EOF},
},
reqCountExpected: &count{closes: 0, seeks: make([]seek, 2)},
respCountExpected: &count{closes: 0, seeks: []seek{}},
expectations: map[string]expected{
"Do": {
attempts: 3,
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
respCount: &count{closes: 0, seeks: []seek{}},
},
"Watch": {
attempts: 3,
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
respCount: &count{closes: 0, seeks: []seek{}},
},
// for Stream, we never retry on any error
"Stream": {
attempts: 1, // only the first attempt is expected
reqCount: &count{closes: 0, seeks: []seek{}},
respCount: &count{closes: 0, seeks: []seek{}},
},
},
},
{
name: "server returns success on the final retry",
@ -2549,8 +2861,24 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
{response: nil, err: io.EOF},
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
},
reqCountExpected: &count{closes: 0, seeks: make([]seek, 2)},
respCountExpected: &count{closes: 2, seeks: []seek{}},
expectations: map[string]expected{
"Do": {
attempts: 3,
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
respCount: &count{closes: 2, seeks: []seek{}},
},
"Watch": {
attempts: 3,
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
// we don't close the the Body of the final successful response
respCount: &count{closes: 1, seeks: []seek{}},
},
"Stream": {
attempts: 2,
reqCount: &count{closes: 0, seeks: make([]seek, 1)},
respCount: &count{closes: 1, seeks: []seek{}},
},
},
},
}
@ -2580,6 +2908,7 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
verb: test.verb,
body: reqRecorder,
c: &RESTClient{
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
@ -2588,15 +2917,19 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
doFunc(context.Background(), req)
attemptsExpected := test.maxRetries + 1
if attemptsExpected != attempts {
t.Errorf("Expected retries: %d, but got: %d", attemptsExpected, attempts)
expected, ok := test.expectations[key]
if !ok {
t.Fatalf("Wrong test setup - did not find expected for: %s", key)
}
if !reflect.DeepEqual(test.reqCountExpected.seeks, reqCountGot.seeks) {
t.Errorf("Expected request body to have seek invocation: %v, but got: %v", test.reqCountExpected.seeks, reqCountGot.seeks)
if expected.attempts != attempts {
t.Errorf("Expected retries: %d, but got: %d", expected.attempts, attempts)
}
if test.respCountExpected.closes != respCountGot.closes {
t.Errorf("Expected response body Close to be invoked %d times, but got: %d", test.respCountExpected.closes, respCountGot.closes)
if !reflect.DeepEqual(expected.reqCount.seeks, reqCountGot.seeks) {
t.Errorf("Expected request body to have seek invocation: %v, but got: %v", expected.reqCount.seeks, reqCountGot.seeks)
}
if expected.respCount.closes != respCountGot.getCloseCount() {
t.Errorf("Expected response body Close to be invoked %d times, but got: %d", expected.respCount.closes, respCountGot.getCloseCount())
}
})
}

View File

@ -43,6 +43,10 @@ func (r IsRetryableErrorFunc) IsErrorRetryable(request *http.Request, err error)
return r(request, err)
}
var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool {
return false
})
// WithRetry allows the client to retry a request up to a certain number of times
// Note that WithRetry is not safe for concurrent use by multiple
// goroutines without additional locking or coordination.

View File

@ -30,10 +30,6 @@ var alwaysRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool
return true
})
var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool {
return false
})
func TestNextRetry(t *testing.T) {
fakeError := errors.New("fake error")
tests := []struct {