diff --git a/rest/request_test.go b/rest/request_test.go
index f2130cbf..c69b04dc 100644
--- a/rest/request_test.go
+++ b/rest/request_test.go
@@ -36,8 +36,7 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/klog/v2"
-
+	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -52,8 +51,10 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes/scheme"
 	restclientwatch "k8s.io/client-go/rest/watch"
+	"k8s.io/client-go/tools/metrics"
 	"k8s.io/client-go/util/flowcontrol"
 	utiltesting "k8s.io/client-go/util/testing"
+	"k8s.io/klog/v2"
 	testingclock "k8s.io/utils/clock/testing"
 )
 
@@ -2555,6 +2556,34 @@ func TestRequestWatchWithRetry(t *testing.T) {
 	})
 }
 
+func TestRequestDoRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
+	// both request.Do and request.DoRaw have the same behavior and expectations
+	testRetryWithRateLimiterBackoffAndMetrics(t, "Do", func(ctx context.Context, r *Request) {
+		r.DoRaw(ctx)
+	})
+}
+
+func TestRequestStreamRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
+	testRetryWithRateLimiterBackoffAndMetrics(t, "Stream", func(ctx context.Context, r *Request) {
+		r.Stream(ctx)
+	})
+}
+
+func TestRequestWatchRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
+	testRetryWithRateLimiterBackoffAndMetrics(t, "Watch", func(ctx context.Context, r *Request) {
+		w, err := r.Watch(ctx)
+		if err == nil {
+			// in this test the response body returned by the server is always empty;
+			// this will cause StreamWatcher.receive() to:
+			//  - return an io.EOF to indicate that the watch closed normally and
+			//  - then close the io.Reader
+			// since we assert on the number of times 'Close' has been called on the
+			// body of the response object, we need to wait here to avoid a race condition.
+			<-w.ResultChan()
+		}
+	})
+}
+
 func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
 	type expected struct {
 		attempts int
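The hunk below builds its test double on Go's embedded-interface idiom: one struct embeds flowcontrol.RateLimiter, *NoBackoff, and metrics.ResultMetric, so a single value can be wired in as the rate limiter, the backoff manager, and the result metric at once, overriding only the methods whose calls it wants to record. A minimal, self-contained sketch of that idiom, assuming nothing from client-go (Limiter and recordingLimiter are illustrative names, not part of the library):

```go
package main

import "fmt"

// Limiter stands in for a dependency interface such as flowcontrol.RateLimiter.
type Limiter interface {
	Wait()
	TryAccept() bool
}

// recordingLimiter embeds the interface, so it satisfies Limiter wholesale;
// only the methods it shadows change behavior, and those record the call.
type recordingLimiter struct {
	Limiter
	calls []string
}

// Wait shadows the embedded implementation to record the invocation.
func (r *recordingLimiter) Wait() {
	r.calls = append(r.calls, "Wait")
}

func main() {
	// the embedded Limiter may stay nil as long as un-shadowed
	// methods (here, TryAccept) are never called
	r := &recordingLimiter{}
	r.Wait()
	fmt.Println(r.calls) // [Wait]
}
```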
@@ -2714,6 +2743,231 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
 	}
 }
 
+type retryTestKeyType int
+
+const retryTestKey retryTestKeyType = iota
+
+// fake flowcontrol.RateLimiter so we can tap into the Wait method of the rate limiter,
+// fake BackoffManager so we can tap into the backoff calls, and
+// fake metrics.ResultMetric so we can tap into the metric calls;
+// we use it to verify that RateLimiter, BackoffManager, and
+// metric calls are invoked appropriately and in the right order.
+type withRateLimiterBackoffManagerAndMetrics struct {
+	flowcontrol.RateLimiter
+	*NoBackoff
+	metrics.ResultMetric
+	backoffWaitSeconds int
+
+	invokeOrderGot []string
+	sleepsGot      []string
+	statusCodesGot []string
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) error {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "RateLimiter.Wait")
+	return nil
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")
+
+	// we simulate a sleep sequence of 0m, 2m, 4m, 6m, ...
+	waitFor := time.Duration(lb.backoffWaitSeconds) * time.Minute
+	lb.backoffWaitSeconds += 2
+	return waitFor
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.UpdateBackoff")
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) Sleep(d time.Duration) {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.Sleep")
+	lb.sleepsGot = append(lb.sleepsGot, d.String())
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) Increment(ctx context.Context, code, _, _ string) {
+	// we are interested only in the request context that is marked by this test
+	if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
+		lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestResult.Increment")
+		lb.statusCodesGot = append(lb.statusCodesGot, code)
+	}
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "Client.Do")
+}
+
+func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
+	type expected struct {
+		attempts int
+		order    []string
+	}
+
+	// we define the expected order in which the client invokes the
+	// rate limiter, backoff, and metrics methods.
+	// scenario:
+	//  - A: original request fails with a retryable response: (500, 'Retry-After: 1')
+	//  - B: retry 1: successful with a status code of 200
+	// so we have a total of 2 attempts
+	invokeOrderWant := []string{
+		// before we send the request to the server:
+		//  - we wait, as dictated by the client rate limiter
+		//  - we wait, as dictated by the backoff manager
+		"RateLimiter.Wait",
+		"BackoffManager.CalculateBackoff",
+		"BackoffManager.Sleep",
+
+		// A: first attempt, for which the server sends a retryable response
+		"Client.Do",
+
+		// we got a response object, status code: 500, Retry-After: 1
+		//  - call the metrics method with the appropriate status code
+		//  - update the backoff parameters with the status code returned
+		//  - sleep for N seconds from the 'Retry-After: N' response header
+		"RequestResult.Increment",
+		"BackoffManager.UpdateBackoff",
+		"BackoffManager.Sleep",
+		// sleep for the delay dictated by the backoff parameters
+		"BackoffManager.CalculateBackoff",
+		"BackoffManager.Sleep",
+		// wait, as dictated by the client rate limiter
+		"RateLimiter.Wait",
+
+		// B: 2nd attempt: retry, and this should return a status code of 200
+		"Client.Do",
+
+		// it's a success, so do the following:
+		//  - call metrics and update the backoff parameters
+		"RequestResult.Increment",
+		"BackoffManager.UpdateBackoff",
+	}
+	sleepWant := []string{
+		// initial backoff.Sleep before we send the request to the server for the first time
+		"0s",
+		// from the 'Retry-After: 1' response header (A)
+		(1 * time.Second).String(),
+		// backoff.Sleep before retry 1 (B)
+		(2 * time.Minute).String(),
+	}
+	statusCodesWant := []string{
+		"500",
+		"200",
+	}
+
+	tests := []struct {
+		name          string
+		maxRetries    int
+		serverReturns []responseErr
+		// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
+		expectations map[string]expected
+	}{
+		{
+			name:       "success after one retry",
+			maxRetries: 1,
+			serverReturns: []responseErr{
+				{response: retryAfterResponse(), err: nil},
+				{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
+			},
+			expectations: map[string]expected{
+				"Do": {
+					attempts: 2,
+					order:    invokeOrderWant,
+				},
+				"Watch": {
+					attempts: 2,
+					// Watch does not do 'RateLimiter.Wait' before initially sending
+					// the request to the server
+					order: invokeOrderWant[1:],
+				},
+				"Stream": {
+					attempts: 2,
+					order:    invokeOrderWant,
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			interceptor := &withRateLimiterBackoffManagerAndMetrics{
+				RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
+				NoBackoff:   &NoBackoff{},
+			}
+
+			// TODO: today this is the only site where a test overrides the
+			//  default metric interfaces; in the future, if other tests want
+			//  to override them as well, and we want tests to be able to run
+			//  in parallel, then we will need to provide a way for tests to
+			//  register/deregister their own metric interfaces.
+			old := metrics.RequestResult
+			metrics.RequestResult = interceptor
+			defer func() {
+				metrics.RequestResult = old
+			}()
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			// we are changing metrics.RequestResult (global state) in this
+			// test; to avoid interference from other tests running in
+			// parallel we need to associate a key with the context so we
+			// can identify the metric calls associated with this test.
+			ctx = context.WithValue(ctx, retryTestKey, true)
+
+			var attempts int
+			client := clientForFunc(func(req *http.Request) (*http.Response, error) {
+				defer func() {
+					attempts++
+				}()
+
+				interceptor.Do()
+				resp := test.serverReturns[attempts].response
+				if resp != nil {
+					resp.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
+				}
+				return resp, test.serverReturns[attempts].err
+			})
+
+			base, err := url.Parse("http://foo.bar")
+			if err != nil {
+				t.Fatalf("Wrong test setup - failed to parse the base URL: %v", err)
+			}
+			req := &Request{
+				verb: "GET",
+				body: bytes.NewReader([]byte{}),
+				c: &RESTClient{
+					base:        base,
+					content:     defaultContentConfig(),
+					Client:      client,
+					rateLimiter: interceptor,
+				},
+				pathPrefix:  "/api/v1",
+				rateLimiter: interceptor,
+				backoff:     interceptor,
+				retry:       &withRetry{maxRetries: test.maxRetries},
+			}
+
+			doFunc(ctx, req)
+
+			want, ok := test.expectations[key]
+			if !ok {
+				t.Fatalf("Wrong test setup - did not find expectations for: %s", key)
+			}
+			if want.attempts != attempts {
+				t.Errorf("%s: Expected retries: %d, but got: %d", key, want.attempts, attempts)
+			}
+			if !cmp.Equal(want.order, interceptor.invokeOrderGot) {
+				t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(want.order, interceptor.invokeOrderGot))
+			}
+			if !cmp.Equal(sleepWant, interceptor.sleepsGot) {
+				t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(sleepWant, interceptor.sleepsGot))
+			}
+			if !cmp.Equal(statusCodesWant, interceptor.statusCodesGot) {
+				t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(statusCodesWant, interceptor.statusCodesGot))
+			}
+		})
+	}
+}
+
 func TestReuseRequest(t *testing.T) {
 	var tests = []struct {
 		name string
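For reference, the retryAfterResponse() helper used in the serverReturns table above is an existing helper in request_test.go and is not part of this diff. Presumably it constructs the retryable (500, 'Retry-After: 1') response that the scenario comments describe, roughly along these lines (a sketch of the assumed shape, not code from the diff):

```go
// assumed shape of the existing helper: a 500 response carrying a
// 'Retry-After: 1' header, which the client-side retry logic honors.
func retryAfterResponse() *http.Response {
	return &http.Response{
		StatusCode: http.StatusInternalServerError,
		Header:     http.Header{"Retry-After": []string{"1"}},
	}
}
```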