diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go index 3b22a12e1a1..eb56e1e9467 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go @@ -20,15 +20,18 @@ import ( "container/list" ) -// removeFromFIFOFunc removes a designated element from the list. -// The complexity of the runtime cost is O(1) -// It returns the request that has been removed from the list, -// it returns nil if the request has already been removed. +// removeFromFIFOFunc removes a designated element from the list +// if that element is in the list. +// The complexity of the runtime cost is O(1). +// The returned value is the element removed, if indeed one was removed, +// otherwise `nil`. type removeFromFIFOFunc func() *request // walkFunc is called for each request in the list in the // oldest -> newest order. // ok: if walkFunc returns false then the iteration stops immediately. +// walkFunc may remove the given request from the fifo, +// but may not mutate the fifo in any othe way. type walkFunc func(*request) (ok bool) // Internal interface to abstract out the implementation details @@ -129,7 +132,9 @@ func (l *requestFIFO) getFirst(remove bool) (*request, bool) { } func (l *requestFIFO) Walk(f walkFunc) { - for current := l.Front(); current != nil; current = current.Next() { + var next *list.Element + for current := l.Front(); current != nil; current = next { + next = current.Next() // f is allowed to remove current if r, ok := current.Value.(*request); ok { if !f(r) { return diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go index 22cb2ca7068..9c77e3bd0cb 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go @@ -88,16 +88,22 @@ func TestFIFOWithRemoveMultipleRequestsInArrivalOrder(t *testing.T) { removeFn = append(removeFn, list.Enqueue(arrival[i])) } - dequeued := make([]*request, 0) - for _, f := range removeFn { - dequeued = append(dequeued, f()) + expected := append([]*request{}, arrival...) + for idx, f := range removeFn { + if a := f(); a != arrival[idx] { + t.Errorf("Removal %d returned %v instead of expected pointer", idx, a) + } + if a := f(); a != nil { + t.Errorf("Redundant removal %d returned %v instead of expected nil", idx, a) + } + expected = expected[1:] + actual := walkAll(list) + verifyOrder(t, expected, actual) } if list.Length() != 0 { t.Errorf("Expected length: %d, but got: %d)", 0, list.Length()) } - - verifyOrder(t, arrival, dequeued) } func TestFIFORemoveFromFIFOFunc(t *testing.T) { @@ -124,19 +130,25 @@ func TestFIFOWithRemoveMultipleRequestsInRandomOrder(t *testing.T) { removeFn = append(removeFn, list.Enqueue(arrival[i])) } - dequeued := make([]*request, 0) + expected := append([]*request{}, arrival...) r := rand.New(rand.NewSource(time.Now().UnixNano())) - randomIndices := r.Perm(len(removeFn)) - t.Logf("Random remove order: %v", randomIndices) - for i := range randomIndices { - dequeued = append(dequeued, removeFn[i]()) + for range arrival { + idx := r.Intn(len(expected)) + t.Logf("Removing random index %d", idx) + if e, a := expected[idx], removeFn[idx](); e != a { + t.Errorf("Removal of %d returned %v instead of expected pointer %v", idx, a, e) + } + if e, a := (*request)(nil), removeFn[idx](); e != a { + t.Errorf("Redundant removal of %d returned %v instead of expected nil pointer", idx, a) + } + expected = append(expected[:idx], expected[idx+1:]...) + actual := walkAll(list) + verifyOrder(t, expected, actual) + removeFn = append(removeFn[:idx], removeFn[idx+1:]...) } - if list.Length() != 0 { t.Errorf("Expected length: %d, but got: %d)", 0, list.Length()) } - - verifyOrder(t, arrival, dequeued) } func TestFIFOWithRemoveIsIdempotent(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 1dfd7d5757f..6f74492e3ca 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -582,31 +582,24 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s // can short circuit loop (break) if oldest requests are not timing out // as newer requests also will not have timed out - // now - requestWaitLimit = waitLimit - waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) + // now - requestWaitLimit = arrivalLimit + arrivalLimit := now.Add(-qs.qCfg.RequestWaitLimit) reqs.Walk(func(req *request) bool { - if waitLimit.After(req.arrivalTime) { - req.decision.Set(decisionReject) - timeoutCount++ - metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) - req.NoteQueued(false) - + if arrivalLimit.After(req.arrivalTime) { + if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil { + timeoutCount++ + req.NoteQueued(false) + metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) + } // we need to check if the next request has timed out. return true } - // since reqs are sorted oldest -> newest, we are done here. return false }) // remove timed out requests from queue if timeoutCount > 0 { - // The number of requests we have timed out is timeoutCount, - // so, let's dequeue the exact number of requests for this queue. - for i := 0; i < timeoutCount; i++ { - queue.requests.Dequeue() - } - // decrement the # of requestsEnqueued qs.totRequestsWaiting -= timeoutCount qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount)) } @@ -647,18 +640,9 @@ func (qs *queueSet) enqueueLocked(request *request) { qs.obsPair.RequestsWaiting.Add(1) } -// dispatchAsMuchAsPossibleLocked runs a loop, as long as there -// are non-empty queues and the number currently executing is less than the -// assured concurrency value. The body of the loop uses the fair queuing -// technique to pick a queue, dequeue the request at the head of that -// queue, increment the count of the number executing, and send true -// to the request's channel. +// dispatchAsMuchAsPossibleLocked does as many dispatches as possible now. func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { - for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit { - ok := qs.dispatchLocked() - if !ok { - break - } + for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit && qs.dispatchLocked() { } } @@ -691,8 +675,9 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f // dispatchLocked uses the Fair Queuing for Server Requests method to // select a queue and dispatch the oldest request in that queue. The -// return value indicates whether a request was dispatched; this will -// be false when there are no requests waiting in any queue. +// return value indicates whether a request was dequeued; this will +// be false when either all queues are empty or the request at the head +// of the next queue cannot be dispatched. func (qs *queueSet) dispatchLocked() bool { queue, request := qs.findDispatchQueueLocked() if queue == nil { @@ -701,22 +686,26 @@ func (qs *queueSet) dispatchLocked() bool { if request == nil { // This should never happen. But if it does... return false } + qs.totRequestsWaiting-- + metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) + request.NoteQueued(false) + qs.obsPair.RequestsWaiting.Add(-1) + defer qs.boundNextDispatchLocked(queue) + if !request.decision.Set(decisionExecute) { + return true + } request.startTime = qs.clock.Now() // At this moment the request leaves its queue and starts // executing. We do not recognize any interim state between // "queued" and "executing". While that means "executing" // includes a little overhead from this package, this is not a // problem because other overhead is also included. - qs.totRequestsWaiting-- qs.totRequestsExecuting++ qs.totSeatsInUse += request.MaxSeats() queue.requestsExecuting++ queue.seatsInUse += request.MaxSeats() - metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) - request.NoteQueued(false) metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats()) - qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", @@ -725,8 +714,6 @@ func (qs *queueSet) dispatchLocked() bool { } // When a request is dequeued for service -> qs.virtualStart += G * width queue.nextDispatchR += request.totalWork() - qs.boundNextDispatchLocked(queue) - request.decision.Set(decisionExecute) return true }