diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go index b27156c4571..5c7e7acf5da 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go @@ -48,9 +48,9 @@ type fifo interface { // Length returns the number of requests in the list. Length() int - // Width returns the total width (number of seats) of requests + // SeatsSum returns the total number of seats of all requests // in this list. - Width() int + SeatsSum() int // Walk iterates through the list in order of oldest -> newest // and executes the specified walkFunc for each request in that order. @@ -65,7 +65,7 @@ type fifo interface { type requestFIFO struct { *list.List - width int + seatsSum int } func newRequestFIFO() fifo { @@ -78,19 +78,19 @@ func (l *requestFIFO) Length() int { return l.Len() } -func (l *requestFIFO) Width() int { - return l.width +func (l *requestFIFO) SeatsSum() int { + return l.seatsSum } func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc { e := l.PushBack(req) - l.width += req.Seats() + l.seatsSum += req.Seats() return func() *request { if e.Value != nil { l.Remove(e) e.Value = nil - l.width -= req.Seats() + l.seatsSum -= req.Seats() } return req } @@ -109,7 +109,7 @@ func (l *requestFIFO) Dequeue() (*request, bool) { request, ok := e.Value.(*request) if ok { - l.width -= request.Seats() + l.seatsSum -= request.Seats() } return request, ok } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go index b4f83e47ca6..4abedd1d789 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go @@ -150,7 +150,7 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) { verifyOrder(t, orderExpected, remainingRequests) } -func TestFIFOWidth(t *testing.T) { +func TestFIFOSeatsSum(t *testing.T) { list := newRequestFIFO() newRequest := func(width uint) *request { @@ -159,49 +159,49 @@ func TestFIFOWidth(t *testing.T) { arrival := []*request{newRequest(1), newRequest(2), newRequest(3)} removeFn := make([]removeFromFIFOFunc, 0) - width := 0 + seatsSum := 0 for i := range arrival { removeFn = append(removeFn, list.Enqueue(arrival[i])) - width += i + 1 - if list.Width() != width { - t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + seatsSum += i + 1 + if list.SeatsSum() != seatsSum { + t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) } } for i := range removeFn { removeFn[i]() - width -= i + 1 - if list.Width() != width { - t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + seatsSum -= i + 1 + if list.SeatsSum() != seatsSum { + t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) } // check idempotency removeFn[i]() - if list.Width() != width { - t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + if list.SeatsSum() != seatsSum { + t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) } } // Check second type of idempotency: Dequeue + removeFn. for i := range arrival { removeFn[i] = list.Enqueue(arrival[i]) - width += i + 1 + seatsSum += i + 1 } for i := range arrival { if _, ok := list.Dequeue(); !ok { t.Errorf("Unexpected failed dequeue: %d", i) } - width -= i + 1 - if list.Width() != width { - t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + seatsSum -= i + 1 + if list.SeatsSum() != seatsSum { + t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) } removeFn[i]() - if list.Width() != width { - t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + if list.SeatsSum() != seatsSum { + t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) } } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index e8bc4691f55..68816c503e1 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -472,18 +472,20 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte // using the given hashValue and the shuffle sharding parameters of the queueSet. func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int { bestQueueIdx := -1 - bestQueueWidth := int(math.MaxInt32) + bestQueueSeatsSum := int(math.MaxInt32) // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. qs.dealer.Deal(hashValue, func(queueIdx int) { // TODO: Consider taking into account `additional latency` of requests // in addition to their widths. - thisWidth := qs.queues[queueIdx].requests.Width() - klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of width %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisWidth) - if thisWidth < bestQueueWidth { - bestQueueIdx, bestQueueWidth = queueIdx, thisWidth + // Ideally, this should be based on projected completion time in the + // virtual world of the youngest request in the queue. + thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum() + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum) + if thisSeatsSum < bestQueueSeatsSum { + bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum } }) - klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueWidth, qs.queues[bestQueueIdx].requestsExecuting) + klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting) return bestQueueIdx } @@ -533,7 +535,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s // Otherwise enqueues and returns true. func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { queue := request.queue - curQueueLength := queue.requests.Width() + curQueueLength := queue.requests.Length() // rejects the newly arrived request if resource criteria not met if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit && curQueueLength >= qs.qCfg.QueueLengthLimit {