diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index b1a2720fd01..67b67f9d0e5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -18,6 +18,7 @@ package filters import ( "context" + "errors" "fmt" "net/http" "net/http/httptest" @@ -178,11 +179,19 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{ Groups: []string{user.AllUnauthenticated}, })) - apfHandler.ServeHTTP(w, r) - postExecute() - if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyExecuting); want != got { - t.Errorf("Wanted %d requests executing, got %d", want, got) - } + func() { + // the defer ensures that the following assertion is + // executed, even if the APF handler panics + // TODO: all test(s) using this filter must run serially to each other + defer func() { + t.Logf("the APF handler has finished, checking atomicReadOnlyExecuting") + if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyExecuting); want != got { + t.Errorf("Wanted %d requests executing, got %d", want, got) + } + }() + apfHandler.ServeHTTP(w, r) + postExecute() + }() }), requestInfoFactory) return handler @@ -346,19 +355,21 @@ func TestApfCancelWaitRequest(t *testing.T) { } type fakeWatchApfFilter struct { + t *testing.T lock sync.Mutex inflight int capacity int - postExecutePanic bool - preExecutePanic bool + postExecutePanic error + preExecutePanic error utilflowcontrol.WatchTracker utilflowcontrol.MaxSeatsTracker } -func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter { +func newFakeWatchApfFilter(t *testing.T, capacity int) *fakeWatchApfFilter { return &fakeWatchApfFilter{ + t: t, capacity: capacity, WatchTracker: utilflowcontrol.NewWatchTracker(), MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(), @@ -386,17 +397,23 @@ func (f *fakeWatchApfFilter) Handle(ctx context.Context, return } - if f.preExecutePanic { - panic("pre-exec-panic") - } - execFn() - if f.postExecutePanic { - panic("post-exec-panic") - } + func() { + defer func() { + f.lock.Lock() + defer f.lock.Unlock() + f.inflight-- + }() - f.lock.Lock() - defer f.lock.Unlock() - f.inflight-- + if f.preExecutePanic != nil { + f.t.Logf("going to panic (pre-exec) as expected with error: %v, fakeWatchApfFilter: %#v", f.preExecutePanic, f) + panic(f.preExecutePanic) + } + execFn() + if f.postExecutePanic != nil { + f.t.Logf("going to panic (post-exec) as expected with error: %v, fakeWatchApfFilter: %#v", f.postExecutePanic, f) + panic(f.postExecutePanic) + } + }() } func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error { @@ -448,7 +465,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) { allRunning := sync.WaitGroup{} allRunning.Add(2 * concurrentRequests) - fakeFilter := newFakeWatchApfFilter(concurrentRequests) + fakeFilter := newFakeWatchApfFilter(t, concurrentRequests) onExecuteFunc := func() { firstRunning.Done() @@ -494,7 +511,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) { } func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) { - fakeFilter := newFakeWatchApfFilter(0) + fakeFilter := newFakeWatchApfFilter(t, 0) onExecuteFunc := func() { t.Errorf("Request unexepectedly executing") @@ -513,7 +530,7 @@ func TestApfWatchPanic(t *testing.T) { epmetrics.Register() fcmetrics.Register() - fakeFilter := newFakeWatchApfFilter(1) + fakeFilter := newFakeWatchApfFilter(t, 1) onExecuteFunc := func() { panic("test panic") @@ -540,11 +557,11 @@ func TestApfWatchPanic(t *testing.T) { func TestApfWatchHandlePanic(t *testing.T) { epmetrics.Register() fcmetrics.Register() - preExecutePanicingFilter := newFakeWatchApfFilter(1) - preExecutePanicingFilter.preExecutePanic = true + preExecutePanicingFilter := newFakeWatchApfFilter(t, 1) + preExecutePanicingFilter.preExecutePanic = http.ErrAbortHandler - postExecutePanicingFilter := newFakeWatchApfFilter(1) - postExecutePanicingFilter.postExecutePanic = true + postExecutePanicingFilter := newFakeWatchApfFilter(t, 1) + postExecutePanicingFilter.postExecutePanic = http.ErrAbortHandler testCases := []struct { name string @@ -560,18 +577,31 @@ func TestApfWatchHandlePanic(t *testing.T) { }, } - onExecuteFunc := func() { - time.Sleep(5 * time.Second) - } - postExecuteFunc := func() {} - for _, test := range testCases { t.Run(test.name, func(t *testing.T) { + onExecuteFunc := func() { + time.Sleep(5 * time.Second) + + // this function should not be executed if + // pre-execute panic is set + if test.filter.preExecutePanic != nil { + t.Errorf("did not expect the execute function to be executed") + } + t.Logf("on-execute function invoked") + } + + // we either panic before the execute function, or after, + // so the following function should never be executed. + postExecuteFunc := func() { + t.Errorf("did not expect the post-execute function to be invoked") + } + apfHandler := newApfHandlerWithFilter(t, test.filter, time.Minute/4, onExecuteFunc, postExecuteFunc) handler := func(w http.ResponseWriter, r *http.Request) { defer func() { - if err := recover(); err == nil { - t.Errorf("expected panic, got %v", err) + recovered := recover() + if err, ok := recovered.(error); !ok || !errors.Is(err, http.ErrAbortHandler) { + t.Errorf("expected panic with error: %v, but got: %v", http.ErrAbortHandler, err) } }() apfHandler.ServeHTTP(w, r)