Merge pull request #126574 from tkashem/apf-data-race

Fix data race in APF tests
Authored by Kubernetes Prow Robot on 2024-08-29 16:49:38 +01:00; committed by GitHub
commit dc8c42752a


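The pattern fixed throughout the diff below: the test hooks already update shared counters such as atomicReadOnlyExecuting and atomicReadOnlyWaiting with atomic adds (hence the atomic- prefix in their names), but the assertions read them with plain loads, which the race detector reports once hooks and assertions run on different goroutines. A minimal standalone sketch of the racy and race-free reads (illustrative only; the variable and channel names here are invented, not taken from the test file):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// executing plays the role of counters like atomicReadOnlyExecuting.
	var executing int32
	started := make(chan struct{})
	finish := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		atomic.AddInt32(&executing, 1) // the writer side already used atomics
		close(started)
		<-finish
		atomic.AddInt32(&executing, -1)
	}()
	<-started

	// Racy assertion (the old test code): a plain read of executing while other
	// goroutines update it with atomic.AddInt32; go test -race reports this.
	//   if executing != 1 { ... }

	// Race-free assertion (what the diff switches to): pair the atomic writes
	// with atomic.LoadInt32 and compare want against got.
	if want, got := int32(1), atomic.LoadInt32(&executing); want != got {
		fmt.Printf("Wanted %d requests executing, got %d\n", want, got)
	}

	close(finish)
	<-done
	fmt.Println("final count:", atomic.LoadInt32(&executing)) // 0
}

The diff applies that change mechanically at every assertion site.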
@@ -18,6 +18,7 @@ package filters
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -26,6 +27,7 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -128,20 +130,20 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes
 			t.Errorf("execute should not be invoked")
 		}
 		// atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time.
-		if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 {
-			t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting)
+		if want, got := int32(1), atomic.LoadInt32(&atomicReadOnlyExecuting); decision != decisionSkipFilter && want != got {
+			t.Errorf("Wanted %d requests executing, got %d", want, got)
 		}
 	}
 	postExecuteFunc := func() {}
 	// atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time.
 	postEnqueueFunc := func() {
-		if atomicReadOnlyWaiting != 1 {
-			t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
+		if want, got := int32(1), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
+			t.Errorf("Wanted %d requests in queue, got %d", want, got)
 		}
 	}
 	postDequeueFunc := func() {
-		if atomicReadOnlyWaiting != 0 {
-			t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
+		if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
+			t.Errorf("Wanted %d requests in queue, got %d", want, got)
 		}
 	}
 	return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
@@ -177,11 +179,19 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int
 		r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
 			Groups: []string{user.AllUnauthenticated},
 		}))
-		apfHandler.ServeHTTP(w, r)
-		postExecute()
-		if atomicReadOnlyExecuting != 0 {
-			t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
-		}
+		func() {
+			// the defer ensures that the following assertion is
+			// executed, even if the APF handler panics
+			// TODO: all test(s) using this filter must run serially to each other
+			defer func() {
+				t.Logf("the APF handler has finished, checking atomicReadOnlyExecuting")
+				if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyExecuting); want != got {
+					t.Errorf("Wanted %d requests executing, got %d", want, got)
+				}
+			}()
+			apfHandler.ServeHTTP(w, r)
+			postExecute()
+		}()
 	}), requestInfoFactory)

 	return handler
@@ -270,8 +280,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
 	onExecuteFunc := func() {
 		preStartExecute.Done()
 		preStartExecute.Wait()
-		if int(atomicReadOnlyExecuting) != concurrentRequests {
-			t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting)
+		if want, got := int32(concurrentRequests), atomic.LoadInt32(&atomicReadOnlyExecuting); want != got {
+			t.Errorf("Wanted %d requests executing, got %d", want, got)
 		}
 		postStartExecute.Done()
 		postStartExecute.Wait()
@@ -280,8 +290,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
 	postEnqueueFunc := func() {
 		preEnqueue.Done()
 		preEnqueue.Wait()
-		if int(atomicReadOnlyWaiting) != concurrentRequests {
-			t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
+		if want, got := int32(concurrentRequests), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
+			t.Errorf("Wanted %d requests in queue, got %d", want, got)
 		}

 		postEnqueue.Done()
@@ -291,8 +301,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
 	postDequeueFunc := func() {
 		preDequeue.Done()
 		preDequeue.Wait()
-		if atomicReadOnlyWaiting != 0 {
-			t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
+		if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
+			t.Errorf("Wanted %d requests in queue, got %d", want, got)
 		}
 		postDequeue.Done()
 		postDequeue.Wait()
@@ -345,19 +355,21 @@ func TestApfCancelWaitRequest(t *testing.T) {
 }

 type fakeWatchApfFilter struct {
+	t        *testing.T
 	lock     sync.Mutex
 	inflight int
 	capacity int

-	postExecutePanic bool
-	preExecutePanic  bool
+	postExecutePanic error
+	preExecutePanic  error

 	utilflowcontrol.WatchTracker
 	utilflowcontrol.MaxSeatsTracker
 }

-func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
+func newFakeWatchApfFilter(t *testing.T, capacity int) *fakeWatchApfFilter {
 	return &fakeWatchApfFilter{
+		t:               t,
 		capacity:        capacity,
 		WatchTracker:    utilflowcontrol.NewWatchTracker(),
 		MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
@@ -385,17 +397,23 @@ func (f *fakeWatchApfFilter) Handle(ctx context.Context,
 		return
 	}

-	if f.preExecutePanic {
-		panic("pre-exec-panic")
-	}
-	execFn()
-	if f.postExecutePanic {
-		panic("post-exec-panic")
-	}
-
-	f.lock.Lock()
-	defer f.lock.Unlock()
-	f.inflight--
+	func() {
+		defer func() {
+			f.lock.Lock()
+			defer f.lock.Unlock()
+			f.inflight--
+		}()
+
+		if f.preExecutePanic != nil {
+			f.t.Logf("going to panic (pre-exec) as expected with error: %v, fakeWatchApfFilter: %#v", f.preExecutePanic, f)
+			panic(f.preExecutePanic)
+		}
+		execFn()
+		if f.postExecutePanic != nil {
+			f.t.Logf("going to panic (post-exec) as expected with error: %v, fakeWatchApfFilter: %#v", f.postExecutePanic, f)
+			panic(f.postExecutePanic)
+		}
+	}()
 }

 func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error {
@@ -447,7 +465,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
 	allRunning := sync.WaitGroup{}
 	allRunning.Add(2 * concurrentRequests)

-	fakeFilter := newFakeWatchApfFilter(concurrentRequests)
+	fakeFilter := newFakeWatchApfFilter(t, concurrentRequests)

 	onExecuteFunc := func() {
 		firstRunning.Done()
@@ -493,7 +511,7 @@
 }

 func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
-	fakeFilter := newFakeWatchApfFilter(0)
+	fakeFilter := newFakeWatchApfFilter(t, 0)

 	onExecuteFunc := func() {
 		t.Errorf("Request unexepectedly executing")
@@ -512,7 +530,7 @@ func TestApfWatchPanic(t *testing.T) {
 	epmetrics.Register()
 	fcmetrics.Register()

-	fakeFilter := newFakeWatchApfFilter(1)
+	fakeFilter := newFakeWatchApfFilter(t, 1)

 	onExecuteFunc := func() {
 		panic("test panic")
@@ -539,11 +557,11 @@
 func TestApfWatchHandlePanic(t *testing.T) {
 	epmetrics.Register()
 	fcmetrics.Register()
-	preExecutePanicingFilter := newFakeWatchApfFilter(1)
-	preExecutePanicingFilter.preExecutePanic = true
+	preExecutePanicingFilter := newFakeWatchApfFilter(t, 1)
+	preExecutePanicingFilter.preExecutePanic = http.ErrAbortHandler

-	postExecutePanicingFilter := newFakeWatchApfFilter(1)
-	postExecutePanicingFilter.postExecutePanic = true
+	postExecutePanicingFilter := newFakeWatchApfFilter(t, 1)
+	postExecutePanicingFilter.postExecutePanic = http.ErrAbortHandler

 	testCases := []struct {
 		name   string
@@ -559,18 +577,31 @@
 		},
 	}

-	onExecuteFunc := func() {
-		time.Sleep(5 * time.Second)
-	}
-	postExecuteFunc := func() {}
-
 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
+			onExecuteFunc := func() {
+				time.Sleep(5 * time.Second)
+				// this function should not be executed if
+				// pre-execute panic is set
+				if test.filter.preExecutePanic != nil {
+					t.Errorf("did not expect the execute function to be executed")
+				}
+				t.Logf("on-execute function invoked")
+			}
+
+			// we either panic before the execute function, or after,
+			// so the following function should never be executed.
+			postExecuteFunc := func() {
+				t.Errorf("did not expect the post-execute function to be invoked")
+			}
+
 			apfHandler := newApfHandlerWithFilter(t, test.filter, time.Minute/4, onExecuteFunc, postExecuteFunc)
 			handler := func(w http.ResponseWriter, r *http.Request) {
 				defer func() {
-					if err := recover(); err == nil {
-						t.Errorf("expected panic, got %v", err)
+					recovered := recover()
+					if err, ok := recovered.(error); !ok || !errors.Is(err, http.ErrAbortHandler) {
+						t.Errorf("expected panic with error: %v, but got: %v", http.ErrAbortHandler, err)
 					}
 				}()
 				apfHandler.ServeHTTP(w, r)
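
Beyond the atomic loads, the last hunks also change what the fake filter panics with: http.ErrAbortHandler (the net/http sentinel that aborts a handler without logging a stack trace) instead of a bare string, so the test handler can assert the exact recovered value with errors.Is rather than only checking that recover() returned something. A small self-contained sketch of that recovery check, with an invented helper name:

package main

import (
	"errors"
	"fmt"
	"net/http"
)

// expectAbortPanic runs fn and reports whether it panicked with a value that
// errors.Is-matches http.ErrAbortHandler. The helper name is invented; it only
// illustrates the recover() pattern used in the updated test handler.
func expectAbortPanic(fn func()) (ok bool) {
	defer func() {
		// recover() yields an interface{} value, so assert to error first,
		// then let errors.Is do the comparison, exactly as in the diff.
		recovered := recover()
		err, isErr := recovered.(error)
		ok = isErr && errors.Is(err, http.ErrAbortHandler)
	}()
	fn()
	return false
}

func main() {
	fmt.Println(expectAbortPanic(func() { panic(http.ErrAbortHandler) })) // true
	fmt.Println(expectAbortPanic(func() { panic("test panic") }))         // false
}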