Merge pull request #108262 from tkashem/retry-unit-test

client-go: add unit test to verify order of calls with retry

Kubernetes-commit: 08d32851fec349175f1e7bb84e5690cad39ec5e1
This commit is contained in:
Kubernetes Publisher 2022-02-23 11:46:08 -08:00
commit cc43a708a0

View File

@ -36,8 +36,7 @@ import (
"testing"
"time"
"k8s.io/klog/v2"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -52,8 +51,10 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
restclientwatch "k8s.io/client-go/rest/watch"
"k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/flowcontrol"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/klog/v2"
testingclock "k8s.io/utils/clock/testing"
)
@ -2555,6 +2556,34 @@ func TestRequestWatchWithRetry(t *testing.T) {
})
}
func TestRequestDoRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
// both request.Do and request.DoRaw have the same behavior and expectations
testRetryWithRateLimiterBackoffAndMetrics(t, "Do", func(ctx context.Context, r *Request) {
r.DoRaw(ctx)
})
}
func TestRequestStreamRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
testRetryWithRateLimiterBackoffAndMetrics(t, "Stream", func(ctx context.Context, r *Request) {
r.Stream(ctx)
})
}
func TestRequestWatchRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
testRetryWithRateLimiterBackoffAndMetrics(t, "Watch", func(ctx context.Context, r *Request) {
w, err := r.Watch(ctx)
if err == nil {
// in this test the the response body returned by the server is always empty,
// this will cause StreamWatcher.receive() to:
// - return an io.EOF to indicate that the watch closed normally and
// - then close the io.Reader
// since we assert on the number of times 'Close' has been called on the
// body of the response object, we need to wait here to avoid race condition.
<-w.ResultChan()
}
})
}
func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
type expected struct {
attempts int
@ -2714,6 +2743,231 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
}
}
type retryTestKeyType int
const retryTestKey retryTestKeyType = iota
// fake flowcontrol.RateLimiter so we can tap into the Wait method of the rate limiter.
// fake BackoffManager so we can tap into backoff calls
// fake metrics.ResultMetric to tap into the metric calls
// we use it to verify that RateLimiter, BackoffManager, and
// metric calls are invoked appropriately in right order.
type withRateLimiterBackoffManagerAndMetrics struct {
flowcontrol.RateLimiter
*NoBackoff
metrics.ResultMetric
backoffWaitSeconds int
invokeOrderGot []string
sleepsGot []string
statusCodesGot []string
}
func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) error {
lb.invokeOrderGot = append(lb.invokeOrderGot, "RateLimiter.Wait")
return nil
}
func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")
// we simulate a sleep sequence of 0m, 2m, 4m, 6m, ...
waitFor := time.Duration(lb.backoffWaitSeconds) * time.Minute
lb.backoffWaitSeconds += 2
return waitFor
}
func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.UpdateBackoff")
}
func (lb *withRateLimiterBackoffManagerAndMetrics) Sleep(d time.Duration) {
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.Sleep")
lb.sleepsGot = append(lb.sleepsGot, d.String())
}
func (lb *withRateLimiterBackoffManagerAndMetrics) Increment(ctx context.Context, code, _, _ string) {
// we are interested in the request context that is marked by this test
if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestResult.Increment")
lb.statusCodesGot = append(lb.statusCodesGot, code)
}
}
func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {
lb.invokeOrderGot = append(lb.invokeOrderGot, "Client.Do")
}
func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
type expected struct {
attempts int
order []string
}
// we define the expected order of how the client invokes the
// rate limiter, backoff, and metrics methods.
// scenario:
// - A: original request fails with a retryable response: (500, 'Retry-After: 1')
// - B: retry 1: successful with a status code 200
// so we have a total of 2 attempts
invokeOrderWant := []string{
// before we send the request to the server:
// - we wait as dictated by the client rate lmiter
// - we wait, as dictated by the backoff manager
"RateLimiter.Wait",
"BackoffManager.CalculateBackoff",
"BackoffManager.Sleep",
// A: first attempt for which the server sends a retryable response
"Client.Do",
// we got a response object, status code: 500, Retry-Afer: 1
// - call metrics method with appropriate status code
// - update backoff parameters with the status code returned
// - sleep for N seconds from 'Retry-After: N' response header
"RequestResult.Increment",
"BackoffManager.UpdateBackoff",
"BackoffManager.Sleep",
// sleep for delay dictated by backoff parameters
"BackoffManager.CalculateBackoff",
"BackoffManager.Sleep",
// wait as dictated by the client rate lmiter
"RateLimiter.Wait",
// B: 2nd attempt: retry, and this should return a status code=200
"Client.Do",
// it's a success, so do the following:
// - call metrics and update backoff parameters
"RequestResult.Increment",
"BackoffManager.UpdateBackoff",
}
sleepWant := []string{
// initial backoff.Sleep before we send the request to the server for the first time
"0s",
// from 'Retry-After: 1' response header (A)
(1 * time.Second).String(),
// backoff.Sleep before retry 1 (B)
(2 * time.Minute).String(),
}
statusCodesWant := []string{
"500",
"200",
}
tests := []struct {
name string
maxRetries int
serverReturns []responseErr
// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
expectations map[string]expected
}{
{
name: "success after one retry",
maxRetries: 1,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
},
expectations: map[string]expected{
"Do": {
attempts: 2,
order: invokeOrderWant,
},
"Watch": {
attempts: 2,
// Watch does not do 'RateLimiter.Wait' before initially sending the request to the server
order: invokeOrderWant[1:],
},
"Stream": {
attempts: 2,
order: invokeOrderWant,
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
interceptor := &withRateLimiterBackoffManagerAndMetrics{
RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
NoBackoff: &NoBackoff{},
}
// TODO: today this is the only site where a test overrides the
// default metric interfaces, in future if we other tests want
// to override as well, and we want tests to be able to run in
// parallel then we will need to provide a way for tests to
// register/deregister their own metric inerfaces.
old := metrics.RequestResult
metrics.RequestResult = interceptor
defer func() {
metrics.RequestResult = old
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// we are changing metrics.RequestResult (a global state) in
// this test, to avoid interference from other tests running in
// parallel we need to associate a key to the context so we
// can identify the metric calls associated with this test.
ctx = context.WithValue(ctx, retryTestKey, true)
var attempts int
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
attempts++
}()
interceptor.Do()
resp := test.serverReturns[attempts].response
if resp != nil {
resp.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
}
return resp, test.serverReturns[attempts].err
})
base, err := url.Parse("http://foo.bar")
if err != nil {
t.Fatalf("Wrong test setup - did not find expected for: %s", key)
}
req := &Request{
verb: "GET",
body: bytes.NewReader([]byte{}),
c: &RESTClient{
base: base,
content: defaultContentConfig(),
Client: client,
rateLimiter: interceptor,
},
pathPrefix: "/api/v1",
rateLimiter: interceptor,
backoff: interceptor,
retry: &withRetry{maxRetries: test.maxRetries},
}
doFunc(ctx, req)
want, ok := test.expectations[key]
if !ok {
t.Fatalf("Wrong test setup - did not find expected for: %s", key)
}
if want.attempts != attempts {
t.Errorf("%s: Expected retries: %d, but got: %d", key, want.attempts, attempts)
}
if !cmp.Equal(want.order, interceptor.invokeOrderGot) {
t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(want.order, interceptor.invokeOrderGot))
}
if !cmp.Equal(sleepWant, interceptor.sleepsGot) {
t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(sleepWant, interceptor.sleepsGot))
}
if !cmp.Equal(statusCodesWant, interceptor.statusCodesGot) {
t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(statusCodesWant, interceptor.statusCodesGot))
}
})
}
}
func TestReuseRequest(t *testing.T) {
var tests = []struct {
name string