From 52c58d970e54bf10b78512c68602f70b0a970f31 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Fri, 22 Sep 2023 14:42:43 -0400 Subject: [PATCH] fix data race in apf unit test --- .../filters/priority-and-fairness_test.go | 218 ++++++++---------- 1 file changed, 102 insertions(+), 116 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 04772a7de1b..31ca992f595 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -678,7 +678,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) { const ( - requestTimeout = 1 * time.Minute userName = "alice" fsName = "test-fs" plName = "test-pl" @@ -690,50 +689,55 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} - var executed bool // we will raise a panic for the first request. - firstRequestPathPanic := "/request/panic-as-designed" + firstRequestPathPanic, secondRequestPathShouldWork := "/request/panic-as-designed", "/request/should-succeed-as-expected" + firstHandlerDoneCh, secondHandlerDoneCh := make(chan struct{}), make(chan struct{}) requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - executed = true - headerMatcher.inspect(w, fsName, plName) - - if r.URL.Path == firstRequestPathPanic { + headerMatcher.inspect(t, w, fsName, plName) + switch { + case r.URL.Path == firstRequestPathPanic: + close(firstHandlerDoneCh) panic(fmt.Errorf("request handler panic'd as designed - %#v", r.RequestURI)) + case r.URL.Path == secondRequestPathShouldWork: + close(secondHandlerDoneCh) + } }) - handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) - server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2) + // NOTE: the server will enforce a 1m timeout on every incoming + // request, and the client enforces a timeout of 2m. + handler := newHandlerChain(t, requestHandler, controller, userName, time.Minute) + server, requestGetter := newHTTP2ServerWithClient(handler, 2*time.Minute) defer server.Close() // we send two requests synchronously, one at a time // - first request is expected to panic as designed - // - second request is expected to success + // - second request is expected to succeed _, err := requestGetter(firstRequestPathPanic) - if !executed { - t.Errorf("Expected inner handler to be executed for request: %q", firstRequestPathPanic) + + // did the server handler panic, as expected? + select { + case <-firstHandlerDoneCh: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected the server handler to panic for request: %q", firstRequestPathPanic) } if isClientTimeout(err) { t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestPathPanic, err.Error()) } expectResetStreamError(t, err) - executed = false // the second request should be served successfully. - secondRequestPathShouldWork := "/request/should-succeed-as-expected" response, err := requestGetter(secondRequestPathShouldWork) - if !executed { - t.Errorf("Expected inner handler to be executed for request: %s", secondRequestPathShouldWork) - } if err != nil { t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestPathShouldWork, err) } if response.StatusCode != http.StatusOK { t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response) } - - for _, err := range headerMatcher.errors() { - t.Errorf("Expected APF headers to match, but got: %v", err) + select { + case <-secondHandlerDoneCh: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected the server handler to have completed: %q", secondRequestPathShouldWork) } close(stopCh) @@ -748,7 +752,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { t.Run("priority level concurrency is set to 1, request times out and inner handler hasn't written to the response yet", func(t *testing.T) { t.Parallel() const ( - requestTimeout = 5 * time.Second userName = "alice" fsName = "test-fs" plName = "test-pl" @@ -760,12 +763,10 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} - var executed bool rquestTimesOutPath := "/request/time-out-as-designed" reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{}) requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - executed = true - headerMatcher.inspect(w, fsName, plName) + headerMatcher.inspect(t, w, fsName, plName) if r.URL.Path == rquestTimesOutPath { defer close(reqHandlerCompletedCh) @@ -774,13 +775,16 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { <-callerRoundTripDoneCh } }) - handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) - server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2) + // NOTE: the server will enforce a 5s timeout on every + // incoming request, and the client enforces a timeout of 1m. + handler := newHandlerChain(t, requestHandler, controller, userName, 5*time.Second) + server, requestGetter := newHTTP2ServerWithClient(handler, time.Minute) defer server.Close() - // send a request synchronously with a client timeout of requestTimeout*2 seconds - // this ensures the test does not block indefinitely if the server does not respond. + // send a request synchronously with a client timeout of 1m, this minimizes the + // chance of a flake in ci, the cient waits long enough for the server to send a + // timeout response to the client. var ( response *http.Response err error @@ -795,11 +799,12 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { } }() - if !executed { - t.Errorf("Expected inner handler to be executed for request: %q", rquestTimesOutPath) - } t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath) - <-reqHandlerCompletedCh + select { + case <-reqHandlerCompletedCh: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected the server handler to have completed: %q", rquestTimesOutPath) + } if err != nil { t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err) @@ -808,10 +813,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response) } - for _, err := range headerMatcher.errors() { - t.Errorf("Expected APF headers to match, but got: %v", err) - } - close(stopCh) t.Log("Waiting for the controller to shutdown") @@ -824,7 +825,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { t.Run("priority level concurrency is set to 1, inner handler panics after the request times out", func(t *testing.T) { t.Parallel() const ( - requestTimeout = 5 * time.Second userName = "alice" fsName = "test-fs" plName = "test-pl" @@ -836,30 +836,32 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} - var innerHandlerWriteErr error - reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{}) + reqHandlerErrCh, callerRoundTripDoneCh := make(chan error, 1), make(chan struct{}) rquestTimesOutPath := "/request/time-out-as-designed" requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - headerMatcher.inspect(w, fsName, plName) + headerMatcher.inspect(t, w, fsName, plName) if r.URL.Path == rquestTimesOutPath { - defer close(reqHandlerCompletedCh) <-callerRoundTripDoneCh // we expect the timeout handler to have timed out this request by now and any attempt // to write to the response should return a http.ErrHandlerTimeout error. - _, innerHandlerWriteErr = w.Write([]byte("foo")) + _, innerHandlerWriteErr := w.Write([]byte("foo")) + reqHandlerErrCh <- innerHandlerWriteErr panic(http.ErrAbortHandler) } }) - handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) - server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2) + // NOTE: the server will enforce a 5s timeout on every + // incoming request, and the client enforces a timeout of 1m. + handler := newHandlerChain(t, requestHandler, controller, userName, 5*time.Second) + server, requestGetter := newHTTP2ServerWithClient(handler, time.Minute) defer server.Close() - // send a request synchronously with a client timeout of requestTimeout*2 seconds - // this ensures the test does not block indefinitely if the server does not respond. + // send a request synchronously with a client timeout of 1m, this minimizes the + // chance of a flake in ci, the cient waits long enough for the server to send a + // timeout response to the client. var ( response *http.Response err error @@ -874,11 +876,15 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { }() t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath) - <-reqHandlerCompletedCh - - if innerHandlerWriteErr != http.ErrHandlerTimeout { - t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err) + select { + case innerHandlerWriteErr := <-reqHandlerErrCh: + if innerHandlerWriteErr != http.ErrHandlerTimeout { + t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected the server handler to have completed: %q", rquestTimesOutPath) } + if err != nil { t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err) } @@ -886,10 +892,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response) } - for _, err := range headerMatcher.errors() { - t.Errorf("Expected APF headers to match, but got: %v", err) - } - close(stopCh) t.Log("Waiting for the controller to shutdown") @@ -902,7 +904,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { t.Run("priority level concurrency is set to 1, inner handler writes to the response before request times out", func(t *testing.T) { t.Parallel() const ( - requestTimeout = 5 * time.Second userName = "alice" fsName = "test-fs" plName = "test-pl" @@ -914,14 +915,12 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} - var innerHandlerWriteErr error rquestTimesOutPath := "/request/time-out-as-designed" - reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{}) + reqHandlerErrCh, callerRoundTripDoneCh := make(chan error, 1), make(chan struct{}) requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - headerMatcher.inspect(w, fsName, plName) + headerMatcher.inspect(t, w, fsName, plName) if r.URL.Path == rquestTimesOutPath { - defer close(reqHandlerCompletedCh) // inner handler writes header and then let the request time out. w.WriteHeader(http.StatusBadRequest) @@ -929,14 +928,20 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { // we expect the timeout handler to have timed out this request by now and any attempt // to write to the response should return a http.ErrHandlerTimeout error. - _, innerHandlerWriteErr = w.Write([]byte("foo")) + _, innerHandlerWriteErr := w.Write([]byte("foo")) + reqHandlerErrCh <- innerHandlerWriteErr } }) - handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) - server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2) + // NOTE: the server will enforce a 5s timeout on every + // incoming request, and the client enforces a timeout of 1m. + handler := newHandlerChain(t, requestHandler, controller, userName, 5*time.Second) + server, requestGetter := newHTTP2ServerWithClient(handler, time.Minute) defer server.Close() + // send a request synchronously with a client timeout of 1m, this minimizes the + // chance of a flake in ci, the cient waits long enough for the server to send a + // timeout response to the client. var err error func() { defer close(callerRoundTripDoneCh) @@ -948,17 +953,17 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { }() t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath) - <-reqHandlerCompletedCh - - if innerHandlerWriteErr != http.ErrHandlerTimeout { - t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err) + select { + case innerHandlerWriteErr := <-reqHandlerErrCh: + if innerHandlerWriteErr != http.ErrHandlerTimeout { + t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected the server handler to have completed: %q", rquestTimesOutPath) } + expectResetStreamError(t, err) - for _, err := range headerMatcher.errors() { - t.Errorf("Expected APF headers to match, but got: %v", err) - } - close(stopCh) t.Log("Waiting for the controller to shutdown") @@ -977,7 +982,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { } const ( - requestTimeout = 5 * time.Second userName = "alice" fsName = "test-fs" plName = "test-pl" @@ -989,18 +993,13 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} - var firstRequestInnerHandlerWriteErr error - var secondRequestExecuted bool - firstRequestTimesOutPath := "/request/first/time-out-as-designed" - secondRequestEnqueuedPath := "/request/second/enqueued-as-designed" - firstReqHandlerCompletedCh, firstReqInProgressCh := make(chan struct{}), make(chan struct{}) + firstRequestTimesOutPath, secondRequestEnqueuedPath := "/request/first/time-out-as-designed", "/request/second/enqueued-as-designed" + firstReqHandlerErrCh, firstReqInProgressCh := make(chan error, 1), make(chan struct{}) firstReqRoundTripDoneCh, secondReqRoundTripDoneCh := make(chan struct{}), make(chan struct{}) requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - headerMatcher.inspect(w, fsName, plName) - - if r.URL.Path == firstRequestTimesOutPath { - defer close(firstReqHandlerCompletedCh) - + headerMatcher.inspect(t, w, fsName, plName) + switch { + case r.URL.Path == firstRequestTimesOutPath: close(firstReqInProgressCh) <-firstReqRoundTripDoneCh @@ -1010,24 +1009,25 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { // we expect the timeout handler to have timed out this request by now and any attempt // to write to the response should return a http.ErrHandlerTimeout error. - _, firstRequestInnerHandlerWriteErr = w.Write([]byte("foo")) - return - } + _, firstRequestInnerHandlerWriteErr := w.Write([]byte("foo")) + firstReqHandlerErrCh <- firstRequestInnerHandlerWriteErr - if r.URL.Path == secondRequestEnqueuedPath { + case r.URL.Path == secondRequestEnqueuedPath: // we expect the concurrency to be set to 1 and so this request should never be executed. - secondRequestExecuted = true + t.Errorf("Expected second request to be enqueued: %q", secondRequestEnqueuedPath) } }) - handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) - server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2) + // NOTE: the server will enforce a 5s timeout on every + // incoming request, and the client enforces a timeout of 1m. + handler := newHandlerChain(t, requestHandler, controller, userName, 5*time.Second) + server, requestGetter := newHTTP2ServerWithClient(handler, time.Minute) defer server.Close() // This test involves two requests sent to the same priority level, which has 1 queue and // a concurrency limit of 1. The handler chain include the timeout filter. - // Each request is sent from a separate goroutine, with a client-side timeout that is - // double the timeout filter's limit. + // Each request is sent from a separate goroutine, with a client-side timeout of 1m, on + // the other hand, the server enforces a timeout of 5s (via the timeout filter). // The first request should get dispatched immediately; execution (a) starts with closing // the channel that triggers the second client goroutine to send its request and then (b) // waits for both client goroutines to have gotten a response (expected to be timeouts). @@ -1069,12 +1069,16 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestTimesOutPath, fmtError(firstReqResult.err)) } t.Logf("Waiting for the inner handler of the request: %q to complete", firstRequestTimesOutPath) - <-firstReqHandlerCompletedCh + select { + case firstRequestInnerHandlerWriteErr := <-firstReqHandlerErrCh: + if firstRequestInnerHandlerWriteErr != http.ErrHandlerTimeout { + t.Fatalf("Expected error: %#v, but got: %s", http.ErrHandlerTimeout, fmtError(firstRequestInnerHandlerWriteErr)) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected the server handler to have completed: %q", firstRequestTimesOutPath) + } // first request is expected to time out. - if firstRequestInnerHandlerWriteErr != http.ErrHandlerTimeout { - t.Fatalf("Expected error: %#v, but got: %s", http.ErrHandlerTimeout, fmtError(firstRequestInnerHandlerWriteErr)) - } if isStreamReset(firstReqResult.err) || firstReqResult.response.StatusCode != http.StatusGatewayTimeout { // got what was expected } else if firstReqResult.err != nil { @@ -1088,9 +1092,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { if isClientTimeout(secondReqResult.err) { t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", secondRequestEnqueuedPath, fmtError(secondReqResult.err)) } - if secondRequestExecuted { - t.Errorf("Expected second request to be enqueued: %q", secondRequestEnqueuedPath) - } if isStreamReset(secondReqResult.err) || secondReqResult.response.StatusCode == http.StatusTooManyRequests || secondReqResult.response.StatusCode == http.StatusGatewayTimeout { // got what was expected } else if secondReqResult.err != nil { @@ -1099,10 +1100,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#+v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, secondReqResult.response) } - for _, err := range headerMatcher.errors() { - t.Errorf("Expected APF headers to match, but got: %v", err) - } - close(stopCh) t.Log("Waiting for the controller to shutdown") @@ -1169,13 +1166,11 @@ func newHTTP2ServerWithClient(handler http.Handler, clientTimeout time.Duration) } } -type headerMatcher struct { - lock sync.Mutex - errsGot []error -} +type headerMatcher struct{} // verifies that the expected flow schema and priority level UIDs are attached to the header. -func (m *headerMatcher) inspect(w http.ResponseWriter, expectedFS, expectedPL string) { +func (m *headerMatcher) inspect(t *testing.T, w http.ResponseWriter, expectedFS, expectedPL string) { + t.Helper() err := func() error { if w == nil { return fmt.Errorf("expected a non nil HTTP response") @@ -1195,16 +1190,7 @@ func (m *headerMatcher) inspect(w http.ResponseWriter, expectedFS, expectedPL st if err == nil { return } - - m.lock.Lock() - defer m.lock.Unlock() - m.errsGot = append(m.errsGot, err) -} - -func (m *headerMatcher) errors() []error { - m.lock.Lock() - defer m.lock.Unlock() - return m.errsGot[:] + t.Errorf("Expected APF headers to match, but got: %v", err) } // when a request panics, http2 resets the stream with an INTERNAL_ERROR message