mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Merge pull request #102511 from tkashem/issue-102016
apf: fix data race in test
This commit is contained in:
commit
ff6f9e009a
@ -594,8 +594,10 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
var err error
|
// we send two requests synchronously, one at a time
|
||||||
_, err = requestGetter(firstRequestPathPanic)
|
// - first request is expected to panic as designed
|
||||||
|
// - second request is expected to success
|
||||||
|
_, err := requestGetter(firstRequestPathPanic)
|
||||||
if !executed {
|
if !executed {
|
||||||
t.Errorf("Expected inner handler to be executed for request: %q", firstRequestPathPanic)
|
t.Errorf("Expected inner handler to be executed for request: %q", firstRequestPathPanic)
|
||||||
}
|
}
|
||||||
@ -660,11 +662,8 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
go hardStop(t, stopCh, requestTimeout*3, func() {
|
// send a request synchronously with a client timeout of requestTimeout*2 seconds
|
||||||
// the client timed out unexpectedly, let's clean up and abort.
|
// this ensures the test does not block indefinitely if the server does not respond.
|
||||||
close(reqHandlerCompletedCh)
|
|
||||||
})
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
response *http.Response
|
response *http.Response
|
||||||
err error
|
err error
|
||||||
@ -715,6 +714,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
||||||
|
|
||||||
|
var innerHandlerWriteErr error
|
||||||
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
||||||
rquestTimesOutPath := "/request/time-out-as-designed"
|
rquestTimesOutPath := "/request/time-out-as-designed"
|
||||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -726,9 +726,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
|
|
||||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||||
if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout {
|
_, innerHandlerWriteErr = w.Write([]byte("foo"))
|
||||||
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
panic(http.ErrAbortHandler)
|
panic(http.ErrAbortHandler)
|
||||||
}
|
}
|
||||||
@ -738,11 +736,8 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
go hardStop(t, stopCh, requestTimeout*3, func() {
|
// send a request synchronously with a client timeout of requestTimeout*2 seconds
|
||||||
// the client timed out unexpectedly, let's clean up and abort.
|
// this ensures the test does not block indefinitely if the server does not respond.
|
||||||
close(reqHandlerCompletedCh)
|
|
||||||
})
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
response *http.Response
|
response *http.Response
|
||||||
err error
|
err error
|
||||||
@ -759,6 +754,9 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
||||||
<-reqHandlerCompletedCh
|
<-reqHandlerCompletedCh
|
||||||
|
|
||||||
|
if innerHandlerWriteErr != http.ErrHandlerTimeout {
|
||||||
|
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err)
|
t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err)
|
||||||
}
|
}
|
||||||
@ -789,6 +787,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
||||||
|
|
||||||
|
var innerHandlerWriteErr error
|
||||||
rquestTimesOutPath := "/request/time-out-as-designed"
|
rquestTimesOutPath := "/request/time-out-as-designed"
|
||||||
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
||||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -803,9 +802,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
|
|
||||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||||
if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout {
|
_, innerHandlerWriteErr = w.Write([]byte("foo"))
|
||||||
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
||||||
@ -813,11 +810,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
go hardStop(t, stopCh, requestTimeout*3, func() {
|
|
||||||
// the client timed out unexpectedly, let's clean up and abort.
|
|
||||||
close(reqHandlerCompletedCh)
|
|
||||||
})
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
func() {
|
func() {
|
||||||
defer close(callerRoundTripDoneCh)
|
defer close(callerRoundTripDoneCh)
|
||||||
@ -831,6 +823,9 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
||||||
<-reqHandlerCompletedCh
|
<-reqHandlerCompletedCh
|
||||||
|
|
||||||
|
if innerHandlerWriteErr != http.ErrHandlerTimeout {
|
||||||
|
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
||||||
|
}
|
||||||
expectResetStreamError(t, err)
|
expectResetStreamError(t, err)
|
||||||
|
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
@ -844,6 +839,12 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("priority level concurrency is set to 1, queue length is 1, first request should time out and second (enqueued) request should time out as well", func(t *testing.T) {
|
t.Run("priority level concurrency is set to 1, queue length is 1, first request should time out and second (enqueued) request should time out as well", func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
type result struct {
|
||||||
|
err error
|
||||||
|
response *http.Response
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
requestTimeout = 5 * time.Second
|
requestTimeout = 5 * time.Second
|
||||||
userName = "alice"
|
userName = "alice"
|
||||||
@ -856,6 +857,8 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
|
||||||
|
|
||||||
|
var firstRequestInnerHandlerWriteErr error
|
||||||
|
var secondRequestExecuted bool
|
||||||
firstRequestTimesOutPath := "/request/first/time-out-as-designed"
|
firstRequestTimesOutPath := "/request/first/time-out-as-designed"
|
||||||
secondRequestEnqueuedPath := "/request/second/enqueued-as-designed"
|
secondRequestEnqueuedPath := "/request/second/enqueued-as-designed"
|
||||||
firstReqHandlerCompletedCh, firstReqInProgressCh := make(chan struct{}), make(chan struct{})
|
firstReqHandlerCompletedCh, firstReqInProgressCh := make(chan struct{}), make(chan struct{})
|
||||||
@ -875,15 +878,13 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
|
|
||||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||||
if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout {
|
_, firstRequestInnerHandlerWriteErr = w.Write([]byte("foo"))
|
||||||
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.URL.Path == secondRequestEnqueuedPath {
|
if r.URL.Path == secondRequestEnqueuedPath {
|
||||||
// we expect the concurrency to be set to 1 and so this request should never be executed.
|
// we expect the concurrency to be set to 1 and so this request should never be executed.
|
||||||
t.Fatalf("Expected request to be enqueued: %q", secondRequestEnqueuedPath)
|
secondRequestExecuted = true
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
||||||
@ -891,52 +892,61 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
go hardStop(t, stopCh, requestTimeout*3, func() {
|
// we send two requests, each with a client timeout of requestTimeout*2 seconds
|
||||||
// the client timed out unexpectedly, let's clean up and abort.
|
// this ensures the test does not block indefinitely if the server does not respond.
|
||||||
close(firstReqInProgressCh)
|
// - first request (expected to timeout as designed) sent from a new goroutine
|
||||||
close(firstReqHandlerCompletedCh)
|
// - second request (expected to be enqueued) is sent from a new goroutine
|
||||||
})
|
firstReqResultCh, secondReqResultCh := make(chan result, 1), make(chan result, 1)
|
||||||
|
|
||||||
var firstReqErr, secondReqErr error
|
|
||||||
var resp1, resp2 *http.Response
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(firstReqRoundTripDoneCh)
|
defer close(firstReqRoundTripDoneCh)
|
||||||
t.Logf("Waiting for the request: %q to time out", firstRequestTimesOutPath)
|
t.Logf("Sending request: %q", firstRequestTimesOutPath)
|
||||||
resp1, firstReqErr = requestGetter(firstRequestTimesOutPath)
|
resp, err := requestGetter(firstRequestTimesOutPath)
|
||||||
if isClientTimeout(firstReqErr) {
|
t.Logf("RoundTrip of request: %q has completed", firstRequestTimesOutPath)
|
||||||
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestTimesOutPath, firstReqErr.Error())
|
firstReqResultCh <- result{err: err, response: resp}
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
func() {
|
go func() {
|
||||||
|
// we must wait for the "first" request to start executing before
|
||||||
|
// we can initiate the "second".
|
||||||
defer close(secondReqRoundTripDoneCh)
|
defer close(secondReqRoundTripDoneCh)
|
||||||
|
|
||||||
// we must wait for the "first" request to start executing first
|
|
||||||
<-firstReqInProgressCh
|
<-firstReqInProgressCh
|
||||||
resp2, secondReqErr = requestGetter(secondRequestEnqueuedPath)
|
t.Logf("Sending request: %q", secondRequestEnqueuedPath)
|
||||||
if isClientTimeout(secondReqErr) {
|
resp, err := requestGetter(secondRequestEnqueuedPath)
|
||||||
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", secondRequestEnqueuedPath, secondReqErr.Error())
|
t.Logf("RoundTrip of request: %q has completed", secondRequestEnqueuedPath)
|
||||||
}
|
secondReqResultCh <- result{err: err, response: resp}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
<-firstReqRoundTripDoneCh
|
firstReqResult := <-firstReqResultCh
|
||||||
|
if isClientTimeout(firstReqResult.err) {
|
||||||
|
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestTimesOutPath, firstReqResult.err.Error())
|
||||||
|
}
|
||||||
t.Logf("Waiting for the inner handler of the request: %q to complete", firstRequestTimesOutPath)
|
t.Logf("Waiting for the inner handler of the request: %q to complete", firstRequestTimesOutPath)
|
||||||
<-firstReqHandlerCompletedCh
|
<-firstReqHandlerCompletedCh
|
||||||
|
|
||||||
// first request is expected to time out.
|
// first request is expected to time out.
|
||||||
if firstReqErr != nil {
|
if firstRequestInnerHandlerWriteErr != http.ErrHandlerTimeout {
|
||||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", firstRequestTimesOutPath, firstReqErr)
|
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, firstRequestInnerHandlerWriteErr)
|
||||||
}
|
}
|
||||||
if resp1.StatusCode != http.StatusGatewayTimeout {
|
if firstReqResult.err != nil {
|
||||||
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, firstRequestTimesOutPath, resp1)
|
t.Fatalf("Expected request: %q to get a response, but got error: %#v", firstRequestTimesOutPath, firstReqResult.err)
|
||||||
|
}
|
||||||
|
if firstReqResult.response.StatusCode != http.StatusGatewayTimeout {
|
||||||
|
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, firstRequestTimesOutPath, firstReqResult.response)
|
||||||
}
|
}
|
||||||
|
|
||||||
// second request is expected to either be rejected (ideal behavior) or time out (current approximation of the ideal behavior)
|
// second request is expected to either be rejected (ideal behavior) or time out (current approximation of the ideal behavior)
|
||||||
if secondReqErr != nil {
|
secondReqResult := <-secondReqResultCh
|
||||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestEnqueuedPath, secondReqErr)
|
if isClientTimeout(secondReqResult.err) {
|
||||||
|
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", secondRequestEnqueuedPath, secondReqResult.err.Error())
|
||||||
}
|
}
|
||||||
if !(resp2.StatusCode == http.StatusTooManyRequests || resp2.StatusCode == http.StatusGatewayTimeout) {
|
if secondRequestExecuted {
|
||||||
t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, resp2)
|
t.Errorf("Expected second request to be enqueued: %q", secondRequestEnqueuedPath)
|
||||||
|
}
|
||||||
|
if secondReqResult.err != nil {
|
||||||
|
t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestEnqueuedPath, secondReqResult.err)
|
||||||
|
}
|
||||||
|
if !(secondReqResult.response.StatusCode == http.StatusTooManyRequests || secondReqResult.response.StatusCode == http.StatusGatewayTimeout) {
|
||||||
|
t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, secondReqResult.response)
|
||||||
}
|
}
|
||||||
|
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
@ -1212,13 +1222,3 @@ func isClientTimeout(err error) bool {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func hardStop(t *testing.T, stopCh <-chan struct{}, testTimeout time.Duration, cleanup func()) {
|
|
||||||
select {
|
|
||||||
case <-stopCh:
|
|
||||||
// The test has completed normally.
|
|
||||||
case <-time.After(testTimeout):
|
|
||||||
cleanup()
|
|
||||||
t.Fatalf("the test did not finish within %s", testTimeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user