mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 20:42:26 +00:00
fix data race in apf unit test
This commit is contained in:
parent
6f5fa2eb2f
commit
52c58d970e
@ -678,7 +678,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) {
|
t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
requestTimeout = 1 * time.Minute
|
|
||||||
userName = "alice"
|
userName = "alice"
|
||||||
fsName = "test-fs"
|
fsName = "test-fs"
|
||||||
plName = "test-pl"
|
plName = "test-pl"
|
||||||
@ -690,50 +689,55 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
||||||
|
|
||||||
headerMatcher := headerMatcher{}
|
headerMatcher := headerMatcher{}
|
||||||
var executed bool
|
|
||||||
// we will raise a panic for the first request.
|
// we will raise a panic for the first request.
|
||||||
firstRequestPathPanic := "/request/panic-as-designed"
|
firstRequestPathPanic, secondRequestPathShouldWork := "/request/panic-as-designed", "/request/should-succeed-as-expected"
|
||||||
|
firstHandlerDoneCh, secondHandlerDoneCh := make(chan struct{}), make(chan struct{})
|
||||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
executed = true
|
headerMatcher.inspect(t, w, fsName, plName)
|
||||||
headerMatcher.inspect(w, fsName, plName)
|
switch {
|
||||||
|
case r.URL.Path == firstRequestPathPanic:
|
||||||
if r.URL.Path == firstRequestPathPanic {
|
close(firstHandlerDoneCh)
|
||||||
panic(fmt.Errorf("request handler panic'd as designed - %#v", r.RequestURI))
|
panic(fmt.Errorf("request handler panic'd as designed - %#v", r.RequestURI))
|
||||||
|
case r.URL.Path == secondRequestPathShouldWork:
|
||||||
|
close(secondHandlerDoneCh)
|
||||||
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
|
||||||
|
|
||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
// NOTE: the server will enforce a 1m timeout on every incoming
|
||||||
|
// request, and the client enforces a timeout of 2m.
|
||||||
|
handler := newHandlerChain(t, requestHandler, controller, userName, time.Minute)
|
||||||
|
server, requestGetter := newHTTP2ServerWithClient(handler, 2*time.Minute)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
// we send two requests synchronously, one at a time
|
// we send two requests synchronously, one at a time
|
||||||
// - first request is expected to panic as designed
|
// - first request is expected to panic as designed
|
||||||
// - second request is expected to success
|
// - second request is expected to succeed
|
||||||
_, err := requestGetter(firstRequestPathPanic)
|
_, err := requestGetter(firstRequestPathPanic)
|
||||||
if !executed {
|
|
||||||
t.Errorf("Expected inner handler to be executed for request: %q", firstRequestPathPanic)
|
// did the server handler panic, as expected?
|
||||||
|
select {
|
||||||
|
case <-firstHandlerDoneCh:
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("Expected the server handler to panic for request: %q", firstRequestPathPanic)
|
||||||
}
|
}
|
||||||
if isClientTimeout(err) {
|
if isClientTimeout(err) {
|
||||||
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestPathPanic, err.Error())
|
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestPathPanic, err.Error())
|
||||||
}
|
}
|
||||||
expectResetStreamError(t, err)
|
expectResetStreamError(t, err)
|
||||||
|
|
||||||
executed = false
|
|
||||||
// the second request should be served successfully.
|
// the second request should be served successfully.
|
||||||
secondRequestPathShouldWork := "/request/should-succeed-as-expected"
|
|
||||||
response, err := requestGetter(secondRequestPathShouldWork)
|
response, err := requestGetter(secondRequestPathShouldWork)
|
||||||
if !executed {
|
|
||||||
t.Errorf("Expected inner handler to be executed for request: %s", secondRequestPathShouldWork)
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestPathShouldWork, err)
|
t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestPathShouldWork, err)
|
||||||
}
|
}
|
||||||
if response.StatusCode != http.StatusOK {
|
if response.StatusCode != http.StatusOK {
|
||||||
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response)
|
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response)
|
||||||
}
|
}
|
||||||
|
select {
|
||||||
for _, err := range headerMatcher.errors() {
|
case <-secondHandlerDoneCh:
|
||||||
t.Errorf("Expected APF headers to match, but got: %v", err)
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("Expected the server handler to have completed: %q", secondRequestPathShouldWork)
|
||||||
}
|
}
|
||||||
|
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
@ -748,7 +752,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Run("priority level concurrency is set to 1, request times out and inner handler hasn't written to the response yet", func(t *testing.T) {
|
t.Run("priority level concurrency is set to 1, request times out and inner handler hasn't written to the response yet", func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
const (
|
const (
|
||||||
requestTimeout = 5 * time.Second
|
|
||||||
userName = "alice"
|
userName = "alice"
|
||||||
fsName = "test-fs"
|
fsName = "test-fs"
|
||||||
plName = "test-pl"
|
plName = "test-pl"
|
||||||
@ -760,12 +763,10 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
||||||
|
|
||||||
headerMatcher := headerMatcher{}
|
headerMatcher := headerMatcher{}
|
||||||
var executed bool
|
|
||||||
rquestTimesOutPath := "/request/time-out-as-designed"
|
rquestTimesOutPath := "/request/time-out-as-designed"
|
||||||
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
||||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
executed = true
|
headerMatcher.inspect(t, w, fsName, plName)
|
||||||
headerMatcher.inspect(w, fsName, plName)
|
|
||||||
|
|
||||||
if r.URL.Path == rquestTimesOutPath {
|
if r.URL.Path == rquestTimesOutPath {
|
||||||
defer close(reqHandlerCompletedCh)
|
defer close(reqHandlerCompletedCh)
|
||||||
@ -774,13 +775,16 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
<-callerRoundTripDoneCh
|
<-callerRoundTripDoneCh
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
|
||||||
|
|
||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
// NOTE: the server will enforce a 5s timeout on every
|
||||||
|
// incoming request, and the client enforces a timeout of 1m.
|
||||||
|
handler := newHandlerChain(t, requestHandler, controller, userName, 5*time.Second)
|
||||||
|
server, requestGetter := newHTTP2ServerWithClient(handler, time.Minute)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
// send a request synchronously with a client timeout of requestTimeout*2 seconds
|
// send a request synchronously with a client timeout of 1m, this minimizes the
|
||||||
// this ensures the test does not block indefinitely if the server does not respond.
|
// chance of a flake in ci, the cient waits long enough for the server to send a
|
||||||
|
// timeout response to the client.
|
||||||
var (
|
var (
|
||||||
response *http.Response
|
response *http.Response
|
||||||
err error
|
err error
|
||||||
@ -795,11 +799,12 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if !executed {
|
|
||||||
t.Errorf("Expected inner handler to be executed for request: %q", rquestTimesOutPath)
|
|
||||||
}
|
|
||||||
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
||||||
<-reqHandlerCompletedCh
|
select {
|
||||||
|
case <-reqHandlerCompletedCh:
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("Expected the server handler to have completed: %q", rquestTimesOutPath)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err)
|
t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err)
|
||||||
@ -808,10 +813,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response)
|
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, err := range headerMatcher.errors() {
|
|
||||||
t.Errorf("Expected APF headers to match, but got: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
t.Log("Waiting for the controller to shutdown")
|
t.Log("Waiting for the controller to shutdown")
|
||||||
|
|
||||||
@ -824,7 +825,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Run("priority level concurrency is set to 1, inner handler panics after the request times out", func(t *testing.T) {
|
t.Run("priority level concurrency is set to 1, inner handler panics after the request times out", func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
const (
|
const (
|
||||||
requestTimeout = 5 * time.Second
|
|
||||||
userName = "alice"
|
userName = "alice"
|
||||||
fsName = "test-fs"
|
fsName = "test-fs"
|
||||||
plName = "test-pl"
|
plName = "test-pl"
|
||||||
@ -836,30 +836,32 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
||||||
|
|
||||||
headerMatcher := headerMatcher{}
|
headerMatcher := headerMatcher{}
|
||||||
var innerHandlerWriteErr error
|
reqHandlerErrCh, callerRoundTripDoneCh := make(chan error, 1), make(chan struct{})
|
||||||
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
|
||||||
rquestTimesOutPath := "/request/time-out-as-designed"
|
rquestTimesOutPath := "/request/time-out-as-designed"
|
||||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
headerMatcher.inspect(w, fsName, plName)
|
headerMatcher.inspect(t, w, fsName, plName)
|
||||||
|
|
||||||
if r.URL.Path == rquestTimesOutPath {
|
if r.URL.Path == rquestTimesOutPath {
|
||||||
defer close(reqHandlerCompletedCh)
|
|
||||||
<-callerRoundTripDoneCh
|
<-callerRoundTripDoneCh
|
||||||
|
|
||||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||||
_, innerHandlerWriteErr = w.Write([]byte("foo"))
|
_, innerHandlerWriteErr := w.Write([]byte("foo"))
|
||||||
|
reqHandlerErrCh <- innerHandlerWriteErr
|
||||||
|
|
||||||
panic(http.ErrAbortHandler)
|
panic(http.ErrAbortHandler)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
|
||||||
|
|
||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
// NOTE: the server will enforce a 5s timeout on every
|
||||||
|
// incoming request, and the client enforces a timeout of 1m.
|
||||||
|
handler := newHandlerChain(t, requestHandler, controller, userName, 5*time.Second)
|
||||||
|
server, requestGetter := newHTTP2ServerWithClient(handler, time.Minute)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
// send a request synchronously with a client timeout of requestTimeout*2 seconds
|
// send a request synchronously with a client timeout of 1m, this minimizes the
|
||||||
// this ensures the test does not block indefinitely if the server does not respond.
|
// chance of a flake in ci, the cient waits long enough for the server to send a
|
||||||
|
// timeout response to the client.
|
||||||
var (
|
var (
|
||||||
response *http.Response
|
response *http.Response
|
||||||
err error
|
err error
|
||||||
@ -874,11 +876,15 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
||||||
<-reqHandlerCompletedCh
|
select {
|
||||||
|
case innerHandlerWriteErr := <-reqHandlerErrCh:
|
||||||
if innerHandlerWriteErr != http.ErrHandlerTimeout {
|
if innerHandlerWriteErr != http.ErrHandlerTimeout {
|
||||||
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
||||||
}
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("Expected the server handler to have completed: %q", rquestTimesOutPath)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err)
|
t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err)
|
||||||
}
|
}
|
||||||
@ -886,10 +892,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response)
|
t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, err := range headerMatcher.errors() {
|
|
||||||
t.Errorf("Expected APF headers to match, but got: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
t.Log("Waiting for the controller to shutdown")
|
t.Log("Waiting for the controller to shutdown")
|
||||||
|
|
||||||
@ -902,7 +904,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Run("priority level concurrency is set to 1, inner handler writes to the response before request times out", func(t *testing.T) {
|
t.Run("priority level concurrency is set to 1, inner handler writes to the response before request times out", func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
const (
|
const (
|
||||||
requestTimeout = 5 * time.Second
|
|
||||||
userName = "alice"
|
userName = "alice"
|
||||||
fsName = "test-fs"
|
fsName = "test-fs"
|
||||||
plName = "test-pl"
|
plName = "test-pl"
|
||||||
@ -914,14 +915,12 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
||||||
|
|
||||||
headerMatcher := headerMatcher{}
|
headerMatcher := headerMatcher{}
|
||||||
var innerHandlerWriteErr error
|
|
||||||
rquestTimesOutPath := "/request/time-out-as-designed"
|
rquestTimesOutPath := "/request/time-out-as-designed"
|
||||||
reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
reqHandlerErrCh, callerRoundTripDoneCh := make(chan error, 1), make(chan struct{})
|
||||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
headerMatcher.inspect(w, fsName, plName)
|
headerMatcher.inspect(t, w, fsName, plName)
|
||||||
|
|
||||||
if r.URL.Path == rquestTimesOutPath {
|
if r.URL.Path == rquestTimesOutPath {
|
||||||
defer close(reqHandlerCompletedCh)
|
|
||||||
|
|
||||||
// inner handler writes header and then let the request time out.
|
// inner handler writes header and then let the request time out.
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
@ -929,14 +928,20 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
|
|
||||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||||
_, innerHandlerWriteErr = w.Write([]byte("foo"))
|
_, innerHandlerWriteErr := w.Write([]byte("foo"))
|
||||||
|
reqHandlerErrCh <- innerHandlerWriteErr
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
|
||||||
|
|
||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
// NOTE: the server will enforce a 5s timeout on every
|
||||||
|
// incoming request, and the client enforces a timeout of 1m.
|
||||||
|
handler := newHandlerChain(t, requestHandler, controller, userName, 5*time.Second)
|
||||||
|
server, requestGetter := newHTTP2ServerWithClient(handler, time.Minute)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
|
// send a request synchronously with a client timeout of 1m, this minimizes the
|
||||||
|
// chance of a flake in ci, the cient waits long enough for the server to send a
|
||||||
|
// timeout response to the client.
|
||||||
var err error
|
var err error
|
||||||
func() {
|
func() {
|
||||||
defer close(callerRoundTripDoneCh)
|
defer close(callerRoundTripDoneCh)
|
||||||
@ -948,17 +953,17 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath)
|
||||||
<-reqHandlerCompletedCh
|
select {
|
||||||
|
case innerHandlerWriteErr := <-reqHandlerErrCh:
|
||||||
if innerHandlerWriteErr != http.ErrHandlerTimeout {
|
if innerHandlerWriteErr != http.ErrHandlerTimeout {
|
||||||
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err)
|
||||||
}
|
}
|
||||||
expectResetStreamError(t, err)
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("Expected the server handler to have completed: %q", rquestTimesOutPath)
|
||||||
for _, err := range headerMatcher.errors() {
|
|
||||||
t.Errorf("Expected APF headers to match, but got: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expectResetStreamError(t, err)
|
||||||
|
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
t.Log("Waiting for the controller to shutdown")
|
t.Log("Waiting for the controller to shutdown")
|
||||||
|
|
||||||
@ -977,7 +982,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
requestTimeout = 5 * time.Second
|
|
||||||
userName = "alice"
|
userName = "alice"
|
||||||
fsName = "test-fs"
|
fsName = "test-fs"
|
||||||
plName = "test-pl"
|
plName = "test-pl"
|
||||||
@ -989,18 +993,13 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
|
||||||
|
|
||||||
headerMatcher := headerMatcher{}
|
headerMatcher := headerMatcher{}
|
||||||
var firstRequestInnerHandlerWriteErr error
|
firstRequestTimesOutPath, secondRequestEnqueuedPath := "/request/first/time-out-as-designed", "/request/second/enqueued-as-designed"
|
||||||
var secondRequestExecuted bool
|
firstReqHandlerErrCh, firstReqInProgressCh := make(chan error, 1), make(chan struct{})
|
||||||
firstRequestTimesOutPath := "/request/first/time-out-as-designed"
|
|
||||||
secondRequestEnqueuedPath := "/request/second/enqueued-as-designed"
|
|
||||||
firstReqHandlerCompletedCh, firstReqInProgressCh := make(chan struct{}), make(chan struct{})
|
|
||||||
firstReqRoundTripDoneCh, secondReqRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
firstReqRoundTripDoneCh, secondReqRoundTripDoneCh := make(chan struct{}), make(chan struct{})
|
||||||
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
headerMatcher.inspect(w, fsName, plName)
|
headerMatcher.inspect(t, w, fsName, plName)
|
||||||
|
switch {
|
||||||
if r.URL.Path == firstRequestTimesOutPath {
|
case r.URL.Path == firstRequestTimesOutPath:
|
||||||
defer close(firstReqHandlerCompletedCh)
|
|
||||||
|
|
||||||
close(firstReqInProgressCh)
|
close(firstReqInProgressCh)
|
||||||
<-firstReqRoundTripDoneCh
|
<-firstReqRoundTripDoneCh
|
||||||
|
|
||||||
@ -1010,24 +1009,25 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
|
|
||||||
// we expect the timeout handler to have timed out this request by now and any attempt
|
// we expect the timeout handler to have timed out this request by now and any attempt
|
||||||
// to write to the response should return a http.ErrHandlerTimeout error.
|
// to write to the response should return a http.ErrHandlerTimeout error.
|
||||||
_, firstRequestInnerHandlerWriteErr = w.Write([]byte("foo"))
|
_, firstRequestInnerHandlerWriteErr := w.Write([]byte("foo"))
|
||||||
return
|
firstReqHandlerErrCh <- firstRequestInnerHandlerWriteErr
|
||||||
}
|
|
||||||
|
|
||||||
if r.URL.Path == secondRequestEnqueuedPath {
|
case r.URL.Path == secondRequestEnqueuedPath:
|
||||||
// we expect the concurrency to be set to 1 and so this request should never be executed.
|
// we expect the concurrency to be set to 1 and so this request should never be executed.
|
||||||
secondRequestExecuted = true
|
t.Errorf("Expected second request to be enqueued: %q", secondRequestEnqueuedPath)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
|
|
||||||
|
|
||||||
server, requestGetter := newHTTP2ServerWithClient(handler, requestTimeout*2)
|
// NOTE: the server will enforce a 5s timeout on every
|
||||||
|
// incoming request, and the client enforces a timeout of 1m.
|
||||||
|
handler := newHandlerChain(t, requestHandler, controller, userName, 5*time.Second)
|
||||||
|
server, requestGetter := newHTTP2ServerWithClient(handler, time.Minute)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
// This test involves two requests sent to the same priority level, which has 1 queue and
|
// This test involves two requests sent to the same priority level, which has 1 queue and
|
||||||
// a concurrency limit of 1. The handler chain include the timeout filter.
|
// a concurrency limit of 1. The handler chain include the timeout filter.
|
||||||
// Each request is sent from a separate goroutine, with a client-side timeout that is
|
// Each request is sent from a separate goroutine, with a client-side timeout of 1m, on
|
||||||
// double the timeout filter's limit.
|
// the other hand, the server enforces a timeout of 5s (via the timeout filter).
|
||||||
// The first request should get dispatched immediately; execution (a) starts with closing
|
// The first request should get dispatched immediately; execution (a) starts with closing
|
||||||
// the channel that triggers the second client goroutine to send its request and then (b)
|
// the channel that triggers the second client goroutine to send its request and then (b)
|
||||||
// waits for both client goroutines to have gotten a response (expected to be timeouts).
|
// waits for both client goroutines to have gotten a response (expected to be timeouts).
|
||||||
@ -1069,12 +1069,16 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestTimesOutPath, fmtError(firstReqResult.err))
|
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", firstRequestTimesOutPath, fmtError(firstReqResult.err))
|
||||||
}
|
}
|
||||||
t.Logf("Waiting for the inner handler of the request: %q to complete", firstRequestTimesOutPath)
|
t.Logf("Waiting for the inner handler of the request: %q to complete", firstRequestTimesOutPath)
|
||||||
<-firstReqHandlerCompletedCh
|
select {
|
||||||
|
case firstRequestInnerHandlerWriteErr := <-firstReqHandlerErrCh:
|
||||||
// first request is expected to time out.
|
|
||||||
if firstRequestInnerHandlerWriteErr != http.ErrHandlerTimeout {
|
if firstRequestInnerHandlerWriteErr != http.ErrHandlerTimeout {
|
||||||
t.Fatalf("Expected error: %#v, but got: %s", http.ErrHandlerTimeout, fmtError(firstRequestInnerHandlerWriteErr))
|
t.Fatalf("Expected error: %#v, but got: %s", http.ErrHandlerTimeout, fmtError(firstRequestInnerHandlerWriteErr))
|
||||||
}
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("Expected the server handler to have completed: %q", firstRequestTimesOutPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// first request is expected to time out.
|
||||||
if isStreamReset(firstReqResult.err) || firstReqResult.response.StatusCode != http.StatusGatewayTimeout {
|
if isStreamReset(firstReqResult.err) || firstReqResult.response.StatusCode != http.StatusGatewayTimeout {
|
||||||
// got what was expected
|
// got what was expected
|
||||||
} else if firstReqResult.err != nil {
|
} else if firstReqResult.err != nil {
|
||||||
@ -1088,9 +1092,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
if isClientTimeout(secondReqResult.err) {
|
if isClientTimeout(secondReqResult.err) {
|
||||||
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", secondRequestEnqueuedPath, fmtError(secondReqResult.err))
|
t.Fatalf("the client has unexpectedly timed out - request: %q error: %s", secondRequestEnqueuedPath, fmtError(secondReqResult.err))
|
||||||
}
|
}
|
||||||
if secondRequestExecuted {
|
|
||||||
t.Errorf("Expected second request to be enqueued: %q", secondRequestEnqueuedPath)
|
|
||||||
}
|
|
||||||
if isStreamReset(secondReqResult.err) || secondReqResult.response.StatusCode == http.StatusTooManyRequests || secondReqResult.response.StatusCode == http.StatusGatewayTimeout {
|
if isStreamReset(secondReqResult.err) || secondReqResult.response.StatusCode == http.StatusTooManyRequests || secondReqResult.response.StatusCode == http.StatusGatewayTimeout {
|
||||||
// got what was expected
|
// got what was expected
|
||||||
} else if secondReqResult.err != nil {
|
} else if secondReqResult.err != nil {
|
||||||
@ -1099,10 +1100,6 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
|
|||||||
t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#+v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, secondReqResult.response)
|
t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#+v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, secondReqResult.response)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, err := range headerMatcher.errors() {
|
|
||||||
t.Errorf("Expected APF headers to match, but got: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
t.Log("Waiting for the controller to shutdown")
|
t.Log("Waiting for the controller to shutdown")
|
||||||
|
|
||||||
@ -1169,13 +1166,11 @@ func newHTTP2ServerWithClient(handler http.Handler, clientTimeout time.Duration)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type headerMatcher struct {
|
type headerMatcher struct{}
|
||||||
lock sync.Mutex
|
|
||||||
errsGot []error
|
|
||||||
}
|
|
||||||
|
|
||||||
// verifies that the expected flow schema and priority level UIDs are attached to the header.
|
// verifies that the expected flow schema and priority level UIDs are attached to the header.
|
||||||
func (m *headerMatcher) inspect(w http.ResponseWriter, expectedFS, expectedPL string) {
|
func (m *headerMatcher) inspect(t *testing.T, w http.ResponseWriter, expectedFS, expectedPL string) {
|
||||||
|
t.Helper()
|
||||||
err := func() error {
|
err := func() error {
|
||||||
if w == nil {
|
if w == nil {
|
||||||
return fmt.Errorf("expected a non nil HTTP response")
|
return fmt.Errorf("expected a non nil HTTP response")
|
||||||
@ -1195,16 +1190,7 @@ func (m *headerMatcher) inspect(w http.ResponseWriter, expectedFS, expectedPL st
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
t.Errorf("Expected APF headers to match, but got: %v", err)
|
||||||
m.lock.Lock()
|
|
||||||
defer m.lock.Unlock()
|
|
||||||
m.errsGot = append(m.errsGot, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *headerMatcher) errors() []error {
|
|
||||||
m.lock.Lock()
|
|
||||||
defer m.lock.Unlock()
|
|
||||||
return m.errsGot[:]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// when a request panics, http2 resets the stream with an INTERNAL_ERROR message
|
// when a request panics, http2 resets the stream with an INTERNAL_ERROR message
|
||||||
|
Loading…
Reference in New Issue
Block a user