diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index ba5fc57288a..e02df2ab0f0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -673,6 +673,7 @@ func TestApfWithRequestDigest(t *testing.T) { func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { epmetrics.Register() fcmetrics.Register() + timeFmt := "15:04:05.999" t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) { const ( @@ -1001,16 +1002,32 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2) defer server.Close() - // we send two requests, each with a client timeout of requestTimeout*2 seconds - // this ensures the test does not block indefinitely if the server does not respond. - // - first request (expected to timeout as designed) sent from a new goroutine - // - second request (expected to be enqueued) is sent from a new goroutine + // This test involves two requests sent to the same priority level, which has 1 queue and + // a concurrency limit of 1. The handler chain include the timeout filter. + // Each request is sent from a separate goroutine, with a client-side timeout that is + // double the timeout filter's limit. + // The first request should get dispatched immediately; execution (a) starts with closing + // the channel that triggers the second client goroutine to send its request and then (b) + // waits for both client goroutines to have gotten a response (expected to be timeouts). + // The second request sits in the queue until the timeout filter does its thing, which + // it does concurrently to both requests. For the first request this should make the client + // get a timeout response without directly affecting execution. For the second request, the + // fact that the timeout filter closes the request's Context.Done() causes the request to be + // promptly ejected from its queue. The goroutine doing the APF handling writes an HTTP + // response message with status 429. + // The timeout handler invokes its inner handler in one goroutine while reacting to the + // passage of time in its original goroutine. That reaction to a time out consists of either + // (a) writing an HTTP response message with status 504 to indicate the timeout or (b) doing an + // HTTP/2 stream close; the latter is done if either the connection has been "hijacked" or some + // other goroutine (e.g., the one running the inner handler) has started to write a response. + // In the scenario tested here, there is thus a race between two goroutines to respond to + // the second request and any of their responses is allowed by the test. firstReqResultCh, secondReqResultCh := make(chan result, 1), make(chan result, 1) go func() { defer close(firstReqRoundTripDoneCh) - t.Logf("Sending request: %q", firstRequestTimesOutPath) + t.Logf("At %s, Sending request: %q", time.Now().Format(timeFmt), firstRequestTimesOutPath) resp, err := requestGetter(firstRequestTimesOutPath) - t.Logf("RoundTrip of request: %q has completed", firstRequestTimesOutPath) + t.Logf("At %s, RoundTrip of request: %q has completed", time.Now().Format(timeFmt), firstRequestTimesOutPath) firstReqResultCh <- result{err: err, response: resp} }() go func() { @@ -1019,43 +1036,45 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { defer close(secondReqRoundTripDoneCh) <-firstReqInProgressCh - t.Logf("Sending request: %q", secondRequestEnqueuedPath) + t.Logf("At %s, Sending request: %q", time.Now().Format(timeFmt), secondRequestEnqueuedPath) resp, err := requestGetter(secondRequestEnqueuedPath) - t.Logf("RoundTrip of request: %q has completed", secondRequestEnqueuedPath) + t.Logf("At %s, RoundTrip of request: %q has completed", time.Now().Format(timeFmt), secondRequestEnqueuedPath) secondReqResultCh <- result{err: err, response: resp} }() firstReqResult := <-firstReqResultCh if isClientTimeout(firstReqResult.err) { - t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestTimesOutPath, firstReqResult.err.Error()) + t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestTimesOutPath, fmtError(firstReqResult.err)) } t.Logf("Waiting for the inner handler of the request: %q to complete", firstRequestTimesOutPath) <-firstReqHandlerCompletedCh // first request is expected to time out. if firstRequestInnerHandlerWriteErr != http.ErrHandlerTimeout { - t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, firstRequestInnerHandlerWriteErr) + t.Fatalf("Expected error: %#v, but got: %s", http.ErrHandlerTimeout, fmtError(firstRequestInnerHandlerWriteErr)) } - if firstReqResult.err != nil { - t.Fatalf("Expected request: %q to get a response, but got error: %#v", firstRequestTimesOutPath, firstReqResult.err) - } - if firstReqResult.response.StatusCode != http.StatusGatewayTimeout { - t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, firstRequestTimesOutPath, firstReqResult.response) + if isStreamReset(firstReqResult.err) || firstReqResult.response.StatusCode != http.StatusGatewayTimeout { + // got what was expected + } else if firstReqResult.err != nil { + t.Fatalf("Expected request: %q to get a response or stream reset, but got error: %s", firstRequestTimesOutPath, fmtError(firstReqResult.err)) + } else if firstReqResult.response.StatusCode != http.StatusGatewayTimeout { + t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#+v", http.StatusGatewayTimeout, firstRequestTimesOutPath, firstReqResult.response) } // second request is expected to either be rejected (ideal behavior) or time out (current approximation of the ideal behavior) secondReqResult := <-secondReqResultCh if isClientTimeout(secondReqResult.err) { - t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", secondRequestEnqueuedPath, secondReqResult.err.Error()) + t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", secondRequestEnqueuedPath, fmtError(secondReqResult.err)) } if secondRequestExecuted { t.Errorf("Expected second request to be enqueued: %q", secondRequestEnqueuedPath) } - if secondReqResult.err != nil { - t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestEnqueuedPath, secondReqResult.err) - } - if !(secondReqResult.response.StatusCode == http.StatusTooManyRequests || secondReqResult.response.StatusCode == http.StatusGatewayTimeout) { - t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, secondReqResult.response) + if isStreamReset(secondReqResult.err) || secondReqResult.response.StatusCode == http.StatusTooManyRequests || secondReqResult.response.StatusCode == http.StatusGatewayTimeout { + // got what was expected + } else if secondReqResult.err != nil { + t.Fatalf("Expected request: %q to get a response or stream reset, but got error: %s", secondRequestEnqueuedPath, fmtError(secondReqResult.err)) + } else if !(secondReqResult.response.StatusCode == http.StatusTooManyRequests || secondReqResult.response.StatusCode == http.StatusGatewayTimeout) { + t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#+v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, secondReqResult.response) } close(stopCh) @@ -1063,11 +1082,15 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { controllerErr := <-controllerCompletedCh if controllerErr != nil { - t.Errorf("Expected no error from the controller, but got: %#v", controllerErr) + t.Errorf("Expected no error from the controller, but got: %s", fmtError(controllerErr)) } }) } +func fmtError(err error) string { + return fmt.Sprintf("%#+v=%q", err, err.Error()) +} + func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration []runtime.Object, serverConcurrency int, requestWaitLimit time.Duration, plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) { clientset := newClientset(t, apfConfiguration...) @@ -1142,14 +1165,10 @@ func expectResetStreamError(t *testing.T, err error) { if err == nil { t.Fatalf("expected the server to send an error, but got nil") } - - uerr, ok := err.(*url.Error) - if !ok { - t.Fatalf("expected the error to be of type *url.Error, but got: %T", err) - } - if !strings.Contains(uerr.Error(), "INTERNAL_ERROR") { - t.Fatalf("expected a stream reset error, but got: %s", uerr.Error()) + if isStreamReset(err) { + return } + t.Fatalf("expected a stream reset error, but got %#+v=%s", err, err.Error()) } func newClientset(t *testing.T, objects ...runtime.Object) clientset.Interface { @@ -1331,3 +1350,15 @@ func isClientTimeout(err error) bool { } return false } + +func isStreamReset(err error) bool { + if err == nil { + return false + } + if urlErr, ok := err.(*url.Error); ok { + // Sadly, the client does not receive a more specific indication + // of stream reset. + return strings.Contains(urlErr.Err.Error(), "INTERNAL_ERROR") + } + return false +}