mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Call queueSet::boundNextDispatchLocked enough
Fix the one path where boundNextDispatchLocked was not being called after modifying a queue. Also check for negative work in a request. These are motivated by https://github.com/kubernetes/kubernetes/issues/112169 but I do not have a way to reproduce it and so can not check that these changes actually remove that symptom. But these changes are good anyway.
This commit is contained in:
parent
d0e413e86d
commit
6ee93e2cee
@ -72,11 +72,19 @@ type queueSetCompleter struct {
|
|||||||
// described in this package's doc, and a pointer to one implements
|
// described in this package's doc, and a pointer to one implements
|
||||||
// the QueueSet interface. The fields listed before the lock
|
// the QueueSet interface. The fields listed before the lock
|
||||||
// should not be changed; the fields listed after the
|
// should not be changed; the fields listed after the
|
||||||
// lock must be accessed only while holding the lock. The methods of
|
// lock must be accessed only while holding the lock.
|
||||||
// this type follow the naming convention that the suffix "Locked"
|
//
|
||||||
// means the caller must hold the lock; for a method whose name does
|
// The methods of this type follow the naming convention that the
|
||||||
// not end in "Locked" either acquires the lock or does not care about
|
// suffix "Locked" means the caller must hold the lock; for a method
|
||||||
// locking.
|
// whose name does not end in "Locked" either acquires the lock or
|
||||||
|
// does not care about locking.
|
||||||
|
//
|
||||||
|
// The methods of this type also follow the convention that the suffix
|
||||||
|
// "ToBoundLocked" means that the caller may have to follow up with a
|
||||||
|
// call to `boundNextDispatchLocked`. This is so for a method that
|
||||||
|
// changes what request is oldest in a queue, because that change means
|
||||||
|
// that the anti-windup hack in boundNextDispatchLocked needs to be
|
||||||
|
// applied wrt the revised oldest request in the queue.
|
||||||
type queueSet struct {
|
type queueSet struct {
|
||||||
clock eventclock.Interface
|
clock eventclock.Interface
|
||||||
estimatedServiceDuration time.Duration
|
estimatedServiceDuration time.Duration
|
||||||
@ -396,7 +404,9 @@ func (req *request) wait() (bool, bool) {
|
|||||||
// TODO(aaron-prindle) add metrics for this case
|
// TODO(aaron-prindle) add metrics for this case
|
||||||
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
|
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
|
||||||
// remove the request from the queue as it has timed out
|
// remove the request from the queue as it has timed out
|
||||||
|
queue := req.queue
|
||||||
if req.removeFromQueueLocked() != nil {
|
if req.removeFromQueueLocked() != nil {
|
||||||
|
defer qs.boundNextDispatchLocked(queue)
|
||||||
qs.totRequestsWaiting--
|
qs.totRequestsWaiting--
|
||||||
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
|
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
|
||||||
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
||||||
@ -521,7 +531,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
|
queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
|
||||||
queue := qs.queues[queueIdx]
|
queue := qs.queues[queueIdx]
|
||||||
// The next step is the logic to reject requests that have been waiting too long
|
// The next step is the logic to reject requests that have been waiting too long
|
||||||
qs.removeTimedOutRequestsFromQueueLocked(queue, fsName)
|
qs.removeTimedOutRequestsFromQueueToBoundLocked(queue, fsName)
|
||||||
// NOTE: currently timeout is only checked for each new request. This means that there can be
|
// NOTE: currently timeout is only checked for each new request. This means that there can be
|
||||||
// requests that are in the queue longer than the timeout if there are no new requests
|
// requests that are in the queue longer than the timeout if there are no new requests
|
||||||
// We prefer the simplicity over the promptness, at least for now.
|
// We prefer the simplicity over the promptness, at least for now.
|
||||||
@ -543,7 +553,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
queueNoteFn: queueNoteFn,
|
queueNoteFn: queueNoteFn,
|
||||||
workEstimate: qs.completeWorkEstimate(workEstimate),
|
workEstimate: qs.completeWorkEstimate(workEstimate),
|
||||||
}
|
}
|
||||||
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
|
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
|
||||||
@ -583,9 +593,9 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
|
|||||||
return bestQueueIdx
|
return bestQueueIdx
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
|
// removeTimedOutRequestsFromQueueToBoundLocked rejects old requests that have been enqueued
|
||||||
// past the requestWaitLimit
|
// past the requestWaitLimit
|
||||||
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) {
|
func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) {
|
||||||
timeoutCount := 0
|
timeoutCount := 0
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
reqs := queue.requests
|
reqs := queue.requests
|
||||||
@ -616,11 +626,11 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// rejectOrEnqueueLocked rejects or enqueues the newly arrived
|
// rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived
|
||||||
// request, which has been assigned to a queue. If up against the
|
// request, which has been assigned to a queue. If up against the
|
||||||
// queue length limit and the concurrency limit then returns false.
|
// queue length limit and the concurrency limit then returns false.
|
||||||
// Otherwise enqueues and returns true.
|
// Otherwise enqueues and returns true.
|
||||||
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
|
||||||
queue := request.queue
|
queue := request.queue
|
||||||
curQueueLength := queue.requests.Length()
|
curQueueLength := queue.requests.Length()
|
||||||
// rejects the newly arrived request if resource criteria not met
|
// rejects the newly arrived request if resource criteria not met
|
||||||
@ -629,12 +639,12 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
qs.enqueueLocked(request)
|
qs.enqueueToBoundLocked(request)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueues a request into its queue.
|
// enqueues a request into its queue.
|
||||||
func (qs *queueSet) enqueueLocked(request *request) {
|
func (qs *queueSet) enqueueToBoundLocked(request *request) {
|
||||||
queue := request.queue
|
queue := request.queue
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
|
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
|
||||||
@ -693,7 +703,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
|
|||||||
// be false when either all queues are empty or the request at the head
|
// be false when either all queues are empty or the request at the head
|
||||||
// of the next queue cannot be dispatched.
|
// of the next queue cannot be dispatched.
|
||||||
func (qs *queueSet) dispatchLocked() bool {
|
func (qs *queueSet) dispatchLocked() bool {
|
||||||
queue, request := qs.findDispatchQueueLocked()
|
queue, request := qs.findDispatchQueueToBoundLocked()
|
||||||
if queue == nil {
|
if queue == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -729,6 +739,11 @@ func (qs *queueSet) dispatchLocked() bool {
|
|||||||
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
|
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
|
||||||
}
|
}
|
||||||
// When a request is dequeued for service -> qs.virtualStart += G * width
|
// When a request is dequeued for service -> qs.virtualStart += G * width
|
||||||
|
if request.totalWork() > rDecrement/100 { // A single increment should never be so big
|
||||||
|
klog.Errorf("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v with implausibly high work %v from queue %d with start R %v",
|
||||||
|
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
|
||||||
|
request.workEstimate, queue.index, queue.nextDispatchR)
|
||||||
|
}
|
||||||
queue.nextDispatchR += request.totalWork()
|
queue.nextDispatchR += request.totalWork()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@ -756,11 +771,12 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// findDispatchQueueLocked examines the queues in round robin order and
|
// findDispatchQueueToBoundLocked examines the queues in round robin order and
|
||||||
// returns the first one of those for which the virtual finish time of
|
// returns the first one of those for which the virtual finish time of
|
||||||
// the oldest waiting request is minimal, and also returns that request.
|
// the oldest waiting request is minimal, and also returns that request.
|
||||||
// Returns nils if the head of the selected queue can not be dispatched now.
|
// Returns nils if the head of the selected queue can not be dispatched now,
|
||||||
func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
|
// in which case the caller does not need to follow up with`qs.boundNextDispatchLocked`.
|
||||||
|
func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
|
||||||
minVirtualFinish := fqrequest.MaxSeatSeconds
|
minVirtualFinish := fqrequest.MaxSeatSeconds
|
||||||
sMin := fqrequest.MaxSeatSeconds
|
sMin := fqrequest.MaxSeatSeconds
|
||||||
dsMin := fqrequest.MaxSeatSeconds
|
dsMin := fqrequest.MaxSeatSeconds
|
||||||
|
@ -1308,7 +1308,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
|
|||||||
minQueueExpected = test.queues[queueIdx]
|
minQueueExpected = test.queues[queueIdx]
|
||||||
}
|
}
|
||||||
|
|
||||||
minQueueGot, reqGot := qs.findDispatchQueueLocked()
|
minQueueGot, reqGot := qs.findDispatchQueueToBoundLocked()
|
||||||
if minQueueExpected != minQueueGot {
|
if minQueueExpected != minQueueGot {
|
||||||
t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
|
t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user