diff --git a/staging/src/k8s.io/client-go/rest/request_test.go b/staging/src/k8s.io/client-go/rest/request_test.go index 05c93349dac..884409ed9ef 100644 --- a/staging/src/k8s.io/client-go/rest/request_test.go +++ b/staging/src/k8s.io/client-go/rest/request_test.go @@ -31,6 +31,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "syscall" "testing" "time" @@ -3987,3 +3988,50 @@ func TestRetryableConditions(t *testing.T) { } } } + +func TestRequestConcurrencyWithRetry(t *testing.T) { + var attempts int32 + client := clientForFunc(func(req *http.Request) (*http.Response, error) { + defer func() { + atomic.AddInt32(&attempts, 1) + }() + + // always send a retry-after response + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Header: http.Header{"Retry-After": []string{"1"}}, + }, nil + }) + + req := &Request{ + verb: "POST", + c: &RESTClient{ + content: defaultContentConfig(), + Client: client, + }, + backoff: &noSleepBackOff{}, + maxRetries: 9, // 10 attempts in total, including the first + retryFn: defaultRequestRetryFn, + } + + concurrency := 20 + wg := sync.WaitGroup{} + wg.Add(concurrency) + startCh := make(chan struct{}) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + <-startCh + req.Do(context.Background()) + }() + } + + close(startCh) + wg.Wait() + + // we expect (concurrency*req.maxRetries+1) attempts to be recorded + expected := concurrency * (req.maxRetries + 1) + if atomic.LoadInt32(&attempts) != int32(expected) { + t.Errorf("Expected attempts: %d, but got: %d", expected, attempts) + } +}