diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 6862b4ecd77..8be7dceb188 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -130,7 +130,7 @@ func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { qs.config = config qs.dealer = dealer - qs.dequeueWithChannelAsMuchAsPossible() + qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() return nil } @@ -206,7 +206,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, exe // technique to pick a queue, dequeue the request at the head of that // queue, increment the count of the number executing, and send true to // the request's channel. - qs.dequeueWithChannelAsMuchAsPossible() + qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() return false, false, false, func() {} }() if shouldReturn { @@ -390,10 +390,10 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64) int { return bestQueueIdx } -// updateQueueVirtualStartTime updates the virtual start time for a queue +// updateQueueVirtualStartTimeLocked updates the virtual start time for a queue // this is done when a new request is enqueued. For more info see: // https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching -func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Queue) { +func (qs *queueSet) updateQueueVirtualStartTimeLocked(request *fq.Request, queue *fq.Queue) { // When a request arrives to an empty queue with no requests executing: // len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0) if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 { @@ -403,10 +403,10 @@ func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Q } // enqueues a request into an queueSet -func (qs *queueSet) enqueue(request *fq.Request) { +func (qs *queueSet) enqueueLocked(request *fq.Request) { queue := request.Queue queue.Enqueue(request) - qs.updateQueueVirtualStartTime(request, queue) + qs.updateQueueVirtualStartTimeLocked(request, queue) qs.numRequestsEnqueued++ metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) @@ -423,13 +423,13 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool { return false } - qs.enqueue(request) + qs.enqueueLocked(request) return true } -// selectQueue selects the minimum virtualFinish time from the set of queues +// selectQueueLocked selects the minimum virtualFinish time from the set of queues // the starting queue is selected via roundrobin -func (qs *queueSet) selectQueue() *fq.Queue { +func (qs *queueSet) selectQueueLocked() *fq.Queue { minVirtualFinish := math.Inf(1) var minQueue *fq.Queue var minIndex int @@ -454,7 +454,7 @@ func (qs *queueSet) selectQueue() *fq.Queue { // dequeue dequeues a request from the queueSet func (qs *queueSet) dequeueLocked() (*fq.Request, bool) { - queue := qs.selectQueue() + queue := qs.selectQueueLocked() if queue == nil { return nil, false } @@ -472,25 +472,25 @@ func (qs *queueSet) dequeueLocked() (*fq.Request, bool) { return request, ok } -// dequeueWithChannelAsMuchAsPossible runs a loop, as long as there +// dequeueWithChannelLockedAsMuchAsPossibleLocked runs a loop, as long as there // are non-empty queues and the number currently executing is less than the // assured concurrency value. The body of the loop uses the fair queuing // technique to pick a queue, dequeue the request at the head of that // queue, increment the count of the number executing, and send true // to the request's channel. -func (qs *queueSet) dequeueWithChannelAsMuchAsPossible() { +func (qs *queueSet) dequeueWithChannelLockedAsMuchAsPossibleLocked() { for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { - _, ok := qs.dequeueWithChannel() + _, ok := qs.dequeueWithChannelLocked() if !ok { break } } } -// dequeueWithChannel is a convenience method for dequeueing requests that +// dequeueWithChannelLocked is a convenience method for dequeueing requests that // require a message to be sent through the requests channel // this is a required pattern for the QueueSet the queueSet supports -func (qs *queueSet) dequeueWithChannel() (*fq.Request, bool) { +func (qs *queueSet) dequeueWithChannelLocked() (*fq.Request, bool) { req, ok := qs.dequeueLocked() if !ok { return nil, false @@ -565,5 +565,5 @@ func (qs *queueSet) finishRequestAndDequeueWithChannelAsMuchAsPossible(req *fq.R defer qs.lock.Unlock() qs.finishRequestLocked(req) - qs.dequeueWithChannelAsMuchAsPossible() + qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() }