diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go index 78fb777f842..b27156c4571 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go @@ -48,6 +48,10 @@ type fifo interface { // Length returns the number of requests in the list. Length() int + // Width returns the total width (number of seats) of requests + // in this list. + Width() int + // Walk iterates through the list in order of oldest -> newest // and executes the specified walkFunc for each request in that order. // @@ -60,6 +64,8 @@ type fifo interface { // goroutines without additional locking or coordination. type requestFIFO struct { *list.List + + width int } func newRequestFIFO() fifo { @@ -72,10 +78,20 @@ func (l *requestFIFO) Length() int { return l.Len() } +func (l *requestFIFO) Width() int { + return l.width +} + func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc { e := l.PushBack(req) + l.width += req.Seats() + return func() *request { - l.Remove(e) + if e.Value != nil { + l.Remove(e) + e.Value = nil + l.width -= req.Seats() + } return req } } @@ -85,9 +101,16 @@ func (l *requestFIFO) Dequeue() (*request, bool) { if e == nil { return nil, false } - defer l.Remove(e) + + defer func() { + l.Remove(e) + e.Value = nil + }() request, ok := e.Value.(*request) + if ok { + l.width -= request.Seats() + } return request, ok } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go index 079f0d770ba..b4f83e47ca6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go @@ -20,6 +20,8 @@ import ( "math/rand" "testing" "time" + + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" ) func TestFIFOWithEnqueueDequeueSingleRequest(t *testing.T) { @@ -148,6 +150,62 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) { verifyOrder(t, orderExpected, remainingRequests) } +func TestFIFOWidth(t *testing.T) { + list := newRequestFIFO() + + newRequest := func(width uint) *request { + return &request{width: fcrequest.Width{Seats: width}} + } + arrival := []*request{newRequest(1), newRequest(2), newRequest(3)} + removeFn := make([]removeFromFIFOFunc, 0) + + width := 0 + for i := range arrival { + removeFn = append(removeFn, list.Enqueue(arrival[i])) + + width += i + 1 + if list.Width() != width { + t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + } + } + + for i := range removeFn { + removeFn[i]() + + width -= i + 1 + if list.Width() != width { + t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + } + + // check idempotency + removeFn[i]() + if list.Width() != width { + t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + } + } + + // Check second type of idempotency: Dequeue + removeFn. + for i := range arrival { + removeFn[i] = list.Enqueue(arrival[i]) + width += i + 1 + } + + for i := range arrival { + if _, ok := list.Dequeue(); !ok { + t.Errorf("Unexpected failed dequeue: %d", i) + } + width -= i + 1 + if list.Width() != width { + t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + } + + removeFn[i]() + if list.Width() != width { + t.Errorf("Expected width: %d, but got: %d", width, list.Width()) + } + } +} + func TestFIFOWithWalk(t *testing.T) { list := newRequestFIFO() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index c2275c0a393..e8bc4691f55 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -438,7 +438,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { - // Start with the shuffle sharding, to pick a queue. + // Start with the shuffle sharding, to pick a queue. queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] // The next step is the logic to reject requests that have been waiting too long @@ -472,16 +472,18 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte // using the given hashValue and the shuffle sharding parameters of the queueSet. func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int { bestQueueIdx := -1 - bestQueueLen := int(math.MaxInt32) + bestQueueWidth := int(math.MaxInt32) // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. qs.dealer.Deal(hashValue, func(queueIdx int) { - thisLen := qs.queues[queueIdx].requests.Length() - klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen) - if thisLen < bestQueueLen { - bestQueueIdx, bestQueueLen = queueIdx, thisLen + // TODO: Consider taking into account `additional latency` of requests + // in addition to their widths. + thisWidth := qs.queues[queueIdx].requests.Width() + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of width %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisWidth) + if thisWidth < bestQueueWidth { + bestQueueIdx, bestQueueWidth = queueIdx, thisWidth } }) - klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) + klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueWidth, qs.queues[bestQueueIdx].requestsExecuting) return bestQueueIdx } @@ -531,7 +533,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s // Otherwise enqueues and returns true. func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { queue := request.queue - curQueueLength := queue.requests.Length() + curQueueLength := queue.requests.Width() // rejects the newly arrived request if resource criteria not met if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit && curQueueLength >= qs.qCfg.QueueLengthLimit {