diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index decd9d6ca82..5986993d4bb 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -52,8 +52,8 @@ var waitingMark = &requestWatermark{ phase: epmetrics.WaitingPhase, } -var atomicMutatingExecuting, atomicReadOnlyExecuting int32 -var atomicMutatingWaiting, atomicReadOnlyWaiting int32 +var atomicMutatingExecuting, atomicReadOnlyExecuting atomic.Int32 +var atomicMutatingWaiting, atomicReadOnlyWaiting atomic.Int32 // newInitializationSignal is defined for testing purposes. var newInitializationSignal = utilflowcontrol.NewInitializationSignal @@ -143,16 +143,16 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb) noteExecutingDelta := func(delta int32) { if isMutatingRequest { - watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta))) + watermark.recordMutating(int(atomicMutatingExecuting.Add(delta))) } else { - watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta))) + watermark.recordReadOnly(int(atomicReadOnlyExecuting.Add(delta))) } } noteWaitingDelta := func(delta int32) { if isMutatingRequest { - waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta))) + waitingMark.recordMutating(int(atomicMutatingWaiting.Add(delta))) } else { - waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta))) + waitingMark.recordReadOnly(int(atomicReadOnlyWaiting.Add(delta))) } } queueNote := func(inQueue bool) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 55ef87fdf36..18d99292ccd 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -128,20 +128,25 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes t.Errorf("execute should not be invoked") } // atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time. - if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 { - t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting) + currentValue := atomicReadOnlyExecuting.Load() + if decision != decisionSkipFilter && currentValue != 1 { + t.Errorf("Wanted %d requests executing, got %d", 1, currentValue) } } + postExecuteFunc := func() {} // atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time. postEnqueueFunc := func() { - if atomicReadOnlyWaiting != 1 { - t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) + currentValue := atomicReadOnlyWaiting.Load() + if currentValue != 1 { + t.Errorf("Wanted %d requests in queue, got %d", 1, currentValue) } } + postDequeueFunc := func() { - if atomicReadOnlyWaiting != 0 { - t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) + currentValue := atomicReadOnlyWaiting.Load() + if currentValue != 0 { + t.Errorf("Wanted %d requests in queue, got %d", 0, currentValue) } } return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc) @@ -185,8 +190,9 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int // TODO: all test(s) using this filter must run // serially to each other defer func() { - if atomicReadOnlyExecuting != 0 { - t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting) + currentValue := atomicReadOnlyExecuting.Load() + if currentValue != 0 { + t.Errorf("Wanted %d requests executing, got %d", 0, currentValue) } }() apfHandler.ServeHTTP(w, r) @@ -280,8 +286,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) { onExecuteFunc := func() { preStartExecute.Done() preStartExecute.Wait() - if int(atomicReadOnlyExecuting) != concurrentRequests { - t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting) + currentValue := atomicReadOnlyExecuting.Load() + if int(currentValue) != concurrentRequests { + t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, currentValue) } postStartExecute.Done() postStartExecute.Wait() @@ -290,9 +297,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) { postEnqueueFunc := func() { preEnqueue.Done() preEnqueue.Wait() - if int(atomicReadOnlyWaiting) != concurrentRequests { - t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) - + currentValue := atomicReadOnlyWaiting.Load() + if int(currentValue) != concurrentRequests { + t.Errorf("Wanted %d requests in queue, got %d", concurrentRequests, currentValue) } postEnqueue.Done() postEnqueue.Wait() @@ -301,8 +308,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) { postDequeueFunc := func() { preDequeue.Done() preDequeue.Wait() - if atomicReadOnlyWaiting != 0 { - t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) + currentValue := atomicReadOnlyWaiting.Load() + if currentValue != 0 { + t.Errorf("Wanted %d requests in queue, got %d", 0, currentValue) } postDequeue.Done() postDequeue.Wait()