flake: fix data race for TestApfWatchHandlePanic unit test

Signed-off-by: googs1025 <googs1025@gmail.com>
This commit is contained in:
googs1025 2025-03-12 10:44:49 +08:00
parent 34349e735c
commit 1a660d3d0c
2 changed files with 29 additions and 21 deletions

View File

@ -52,8 +52,8 @@ var waitingMark = &requestWatermark{
phase: epmetrics.WaitingPhase, phase: epmetrics.WaitingPhase,
} }
var atomicMutatingExecuting, atomicReadOnlyExecuting int32 var atomicMutatingExecuting, atomicReadOnlyExecuting atomic.Int32
var atomicMutatingWaiting, atomicReadOnlyWaiting int32 var atomicMutatingWaiting, atomicReadOnlyWaiting atomic.Int32
// newInitializationSignal is defined for testing purposes. // newInitializationSignal is defined for testing purposes.
var newInitializationSignal = utilflowcontrol.NewInitializationSignal var newInitializationSignal = utilflowcontrol.NewInitializationSignal
@ -143,16 +143,16 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb) isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
noteExecutingDelta := func(delta int32) { noteExecutingDelta := func(delta int32) {
if isMutatingRequest { if isMutatingRequest {
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta))) watermark.recordMutating(int(atomicMutatingExecuting.Add(delta)))
} else { } else {
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta))) watermark.recordReadOnly(int(atomicReadOnlyExecuting.Add(delta)))
} }
} }
noteWaitingDelta := func(delta int32) { noteWaitingDelta := func(delta int32) {
if isMutatingRequest { if isMutatingRequest {
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta))) waitingMark.recordMutating(int(atomicMutatingWaiting.Add(delta)))
} else { } else {
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta))) waitingMark.recordReadOnly(int(atomicReadOnlyWaiting.Add(delta)))
} }
} }
queueNote := func(inQueue bool) { queueNote := func(inQueue bool) {

View File

@ -128,20 +128,25 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes
t.Errorf("execute should not be invoked") t.Errorf("execute should not be invoked")
} }
// atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time. // atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time.
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 { currentValue := atomicReadOnlyExecuting.Load()
t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting) if decision != decisionSkipFilter && currentValue != 1 {
t.Errorf("Wanted %d requests executing, got %d", 1, currentValue)
} }
} }
postExecuteFunc := func() {} postExecuteFunc := func() {}
// atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time. // atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time.
postEnqueueFunc := func() { postEnqueueFunc := func() {
if atomicReadOnlyWaiting != 1 { currentValue := atomicReadOnlyWaiting.Load()
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) if currentValue != 1 {
t.Errorf("Wanted %d requests in queue, got %d", 1, currentValue)
} }
} }
postDequeueFunc := func() { postDequeueFunc := func() {
if atomicReadOnlyWaiting != 0 { currentValue := atomicReadOnlyWaiting.Load()
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) if currentValue != 0 {
t.Errorf("Wanted %d requests in queue, got %d", 0, currentValue)
} }
} }
return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc) return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
@ -185,8 +190,9 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int
// TODO: all test(s) using this filter must run // TODO: all test(s) using this filter must run
// serially to each other // serially to each other
defer func() { defer func() {
if atomicReadOnlyExecuting != 0 { currentValue := atomicReadOnlyExecuting.Load()
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting) if currentValue != 0 {
t.Errorf("Wanted %d requests executing, got %d", 0, currentValue)
} }
}() }()
apfHandler.ServeHTTP(w, r) apfHandler.ServeHTTP(w, r)
@ -280,8 +286,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
onExecuteFunc := func() { onExecuteFunc := func() {
preStartExecute.Done() preStartExecute.Done()
preStartExecute.Wait() preStartExecute.Wait()
if int(atomicReadOnlyExecuting) != concurrentRequests { currentValue := atomicReadOnlyExecuting.Load()
t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting) if int(currentValue) != concurrentRequests {
t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, currentValue)
} }
postStartExecute.Done() postStartExecute.Done()
postStartExecute.Wait() postStartExecute.Wait()
@ -290,9 +297,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
postEnqueueFunc := func() { postEnqueueFunc := func() {
preEnqueue.Done() preEnqueue.Done()
preEnqueue.Wait() preEnqueue.Wait()
if int(atomicReadOnlyWaiting) != concurrentRequests { currentValue := atomicReadOnlyWaiting.Load()
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) if int(currentValue) != concurrentRequests {
t.Errorf("Wanted %d requests in queue, got %d", concurrentRequests, currentValue)
} }
postEnqueue.Done() postEnqueue.Done()
postEnqueue.Wait() postEnqueue.Wait()
@ -301,8 +308,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
postDequeueFunc := func() { postDequeueFunc := func() {
preDequeue.Done() preDequeue.Done()
preDequeue.Wait() preDequeue.Wait()
if atomicReadOnlyWaiting != 0 { currentValue := atomicReadOnlyWaiting.Load()
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) if currentValue != 0 {
t.Errorf("Wanted %d requests in queue, got %d", 0, currentValue)
} }
postDequeue.Done() postDequeue.Done()
postDequeue.Wait() postDequeue.Wait()