review changes - *Locked updates

This commit is contained in:
Aaron Prindle
2019-11-12 09:24:56 -08:00
parent 396e2d4aa3
commit 6619df1798

View File

@@ -130,7 +130,7 @@ func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
qs.config = config qs.config = config
qs.dealer = dealer qs.dealer = dealer
qs.dequeueWithChannelAsMuchAsPossible() qs.dequeueWithChannelLockedAsMuchAsPossibleLocked()
return nil return nil
} }
@@ -206,7 +206,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, exe
// technique to pick a queue, dequeue the request at the head of that // technique to pick a queue, dequeue the request at the head of that
// queue, increment the count of the number executing, and send true to // queue, increment the count of the number executing, and send true to
// the request's channel. // the request's channel.
qs.dequeueWithChannelAsMuchAsPossible() qs.dequeueWithChannelLockedAsMuchAsPossibleLocked()
return false, false, false, func() {} return false, false, false, func() {}
}() }()
if shouldReturn { if shouldReturn {
@@ -390,10 +390,10 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64) int {
return bestQueueIdx return bestQueueIdx
} }
// updateQueueVirtualStartTime updates the virtual start time for a queue // updateQueueVirtualStartTimeLocked updates the virtual start time for a queue
// this is done when a new request is enqueued. For more info see: // this is done when a new request is enqueued. For more info see:
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching // https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching
func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Queue) { func (qs *queueSet) updateQueueVirtualStartTimeLocked(request *fq.Request, queue *fq.Queue) {
// When a request arrives to an empty queue with no requests executing: // When a request arrives to an empty queue with no requests executing:
// len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0) // len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0)
if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 { if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 {
@@ -403,10 +403,10 @@ func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Q
} }
// enqueues a request into an queueSet // enqueues a request into an queueSet
func (qs *queueSet) enqueue(request *fq.Request) { func (qs *queueSet) enqueueLocked(request *fq.Request) {
queue := request.Queue queue := request.Queue
queue.Enqueue(request) queue.Enqueue(request)
qs.updateQueueVirtualStartTime(request, queue) qs.updateQueueVirtualStartTimeLocked(request, queue)
qs.numRequestsEnqueued++ qs.numRequestsEnqueued++
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued)
@@ -423,13 +423,13 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool {
return false return false
} }
qs.enqueue(request) qs.enqueueLocked(request)
return true return true
} }
// selectQueue selects the minimum virtualFinish time from the set of queues // selectQueueLocked selects the minimum virtualFinish time from the set of queues
// the starting queue is selected via roundrobin // the starting queue is selected via roundrobin
func (qs *queueSet) selectQueue() *fq.Queue { func (qs *queueSet) selectQueueLocked() *fq.Queue {
minVirtualFinish := math.Inf(1) minVirtualFinish := math.Inf(1)
var minQueue *fq.Queue var minQueue *fq.Queue
var minIndex int var minIndex int
@@ -454,7 +454,7 @@ func (qs *queueSet) selectQueue() *fq.Queue {
// dequeue dequeues a request from the queueSet // dequeue dequeues a request from the queueSet
func (qs *queueSet) dequeueLocked() (*fq.Request, bool) { func (qs *queueSet) dequeueLocked() (*fq.Request, bool) {
queue := qs.selectQueue() queue := qs.selectQueueLocked()
if queue == nil { if queue == nil {
return nil, false return nil, false
} }
@@ -472,25 +472,25 @@ func (qs *queueSet) dequeueLocked() (*fq.Request, bool) {
return request, ok return request, ok
} }
// dequeueWithChannelAsMuchAsPossible runs a loop, as long as there // dequeueWithChannelLockedAsMuchAsPossibleLocked runs a loop, as long as there
// are non-empty queues and the number currently executing is less than the // are non-empty queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the fair queuing // assured concurrency value. The body of the loop uses the fair queuing
// technique to pick a queue, dequeue the request at the head of that // technique to pick a queue, dequeue the request at the head of that
// queue, increment the count of the number executing, and send true // queue, increment the count of the number executing, and send true
// to the request's channel. // to the request's channel.
func (qs *queueSet) dequeueWithChannelAsMuchAsPossible() { func (qs *queueSet) dequeueWithChannelLockedAsMuchAsPossibleLocked() {
for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit {
_, ok := qs.dequeueWithChannel() _, ok := qs.dequeueWithChannelLocked()
if !ok { if !ok {
break break
} }
} }
} }
// dequeueWithChannel is a convenience method for dequeueing requests that // dequeueWithChannelLocked is a convenience method for dequeueing requests that
// require a message to be sent through the requests channel // require a message to be sent through the requests channel
// this is a required pattern for the QueueSet the queueSet supports // this is a required pattern for the QueueSet the queueSet supports
func (qs *queueSet) dequeueWithChannel() (*fq.Request, bool) { func (qs *queueSet) dequeueWithChannelLocked() (*fq.Request, bool) {
req, ok := qs.dequeueLocked() req, ok := qs.dequeueLocked()
if !ok { if !ok {
return nil, false return nil, false
@@ -565,5 +565,5 @@ func (qs *queueSet) finishRequestAndDequeueWithChannelAsMuchAsPossible(req *fq.R
defer qs.lock.Unlock() defer qs.lock.Unlock()
qs.finishRequestLocked(req) qs.finishRequestLocked(req)
qs.dequeueWithChannelAsMuchAsPossible() qs.dequeueWithChannelLockedAsMuchAsPossibleLocked()
} }