diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index faa670bf173..684e76da64e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -72,11 +72,19 @@ type queueSetCompleter struct { // described in this package's doc, and a pointer to one implements // the QueueSet interface. The fields listed before the lock // should not be changed; the fields listed after the -// lock must be accessed only while holding the lock. The methods of -// this type follow the naming convention that the suffix "Locked" -// means the caller must hold the lock; for a method whose name does -// not end in "Locked" either acquires the lock or does not care about -// locking. +// lock must be accessed only while holding the lock. +// +// The methods of this type follow the naming convention that the +// suffix "Locked" means the caller must hold the lock; for a method +// whose name does not end in "Locked" either acquires the lock or +// does not care about locking. +// +// The methods of this type also follow the convention that the suffix +// "ToBoundLocked" means that the caller may have to follow up with a +// call to `boundNextDispatchLocked`. This is so for a method that +// changes what request is oldest in a queue, because that change means +// that the anti-windup hack in boundNextDispatchLocked needs to be +// applied wrt the revised oldest request in the queue. type queueSet struct { clock eventclock.Interface estimatedServiceDuration time.Duration @@ -396,7 +404,9 @@ func (req *request) wait() (bool, bool) { // TODO(aaron-prindle) add metrics for this case klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) // remove the request from the queue as it has timed out + queue := req.queue if req.removeFromQueueLocked() != nil { + defer qs.boundNextDispatchLocked(queue) qs.totRequestsWaiting-- metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) @@ -521,7 +531,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] // The next step is the logic to reject requests that have been waiting too long - qs.removeTimedOutRequestsFromQueueLocked(queue, fsName) + qs.removeTimedOutRequestsFromQueueToBoundLocked(queue, fsName) // NOTE: currently timeout is only checked for each new request. This means that there can be // requests that are in the queue longer than the timeout if there are no new requests // We prefer the simplicity over the promptness, at least for now. @@ -543,7 +553,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte queueNoteFn: queueNoteFn, workEstimate: qs.completeWorkEstimate(workEstimate), } - if ok := qs.rejectOrEnqueueLocked(req); !ok { + if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok { return nil } metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length()) @@ -583,9 +593,9 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac return bestQueueIdx } -// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued +// removeTimedOutRequestsFromQueueToBoundLocked rejects old requests that have been enqueued // past the requestWaitLimit -func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) { +func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) { timeoutCount := 0 now := qs.clock.Now() reqs := queue.requests @@ -616,11 +626,11 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s } } -// rejectOrEnqueueLocked rejects or enqueues the newly arrived +// rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived // request, which has been assigned to a queue. If up against the // queue length limit and the concurrency limit then returns false. // Otherwise enqueues and returns true. -func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { +func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool { queue := request.queue curQueueLength := queue.requests.Length() // rejects the newly arrived request if resource criteria not met @@ -629,12 +639,12 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { return false } - qs.enqueueLocked(request) + qs.enqueueToBoundLocked(request) return true } // enqueues a request into its queue. -func (qs *queueSet) enqueueLocked(request *request) { +func (qs *queueSet) enqueueToBoundLocked(request *request) { queue := request.queue now := qs.clock.Now() if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { @@ -693,7 +703,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f // be false when either all queues are empty or the request at the head // of the next queue cannot be dispatched. func (qs *queueSet) dispatchLocked() bool { - queue, request := qs.findDispatchQueueLocked() + queue, request := qs.findDispatchQueueToBoundLocked() if queue == nil { return false } @@ -729,6 +739,11 @@ func (qs *queueSet) dispatchLocked() bool { request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) } // When a request is dequeued for service -> qs.virtualStart += G * width + if request.totalWork() > rDecrement/100 { // A single increment should never be so big + klog.Errorf("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v with implausibly high work %v from queue %d with start R %v", + qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2, + request.workEstimate, queue.index, queue.nextDispatchR) + } queue.nextDispatchR += request.totalWork() return true } @@ -756,11 +771,12 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { return true } -// findDispatchQueueLocked examines the queues in round robin order and +// findDispatchQueueToBoundLocked examines the queues in round robin order and // returns the first one of those for which the virtual finish time of // the oldest waiting request is minimal, and also returns that request. -// Returns nils if the head of the selected queue can not be dispatched now. -func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { +// Returns nils if the head of the selected queue can not be dispatched now, +// in which case the caller does not need to follow up with`qs.boundNextDispatchLocked`. +func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) { minVirtualFinish := fqrequest.MaxSeatSeconds sMin := fqrequest.MaxSeatSeconds dsMin := fqrequest.MaxSeatSeconds diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index d7727221c5f..26ba9fb7f38 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1308,7 +1308,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { minQueueExpected = test.queues[queueIdx] } - minQueueGot, reqGot := qs.findDispatchQueueLocked() + minQueueGot, reqGot := qs.findDispatchQueueToBoundLocked() if minQueueExpected != minQueueGot { t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot) }