mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Merge pull request #105729 from MikeSpreitzer/do-not-assume-decision
Remove presumptions about what decision has been made
This commit is contained in:
commit
c269494ebc
@ -20,15 +20,18 @@ import (
|
|||||||
"container/list"
|
"container/list"
|
||||||
)
|
)
|
||||||
|
|
||||||
// removeFromFIFOFunc removes a designated element from the list.
|
// removeFromFIFOFunc removes a designated element from the list
|
||||||
// The complexity of the runtime cost is O(1)
|
// if that element is in the list.
|
||||||
// It returns the request that has been removed from the list,
|
// The complexity of the runtime cost is O(1).
|
||||||
// it returns nil if the request has already been removed.
|
// The returned value is the element removed, if indeed one was removed,
|
||||||
|
// otherwise `nil`.
|
||||||
type removeFromFIFOFunc func() *request
|
type removeFromFIFOFunc func() *request
|
||||||
|
|
||||||
// walkFunc is called for each request in the list in the
|
// walkFunc is called for each request in the list in the
|
||||||
// oldest -> newest order.
|
// oldest -> newest order.
|
||||||
// ok: if walkFunc returns false then the iteration stops immediately.
|
// ok: if walkFunc returns false then the iteration stops immediately.
|
||||||
|
// walkFunc may remove the given request from the fifo,
|
||||||
|
// but may not mutate the fifo in any othe way.
|
||||||
type walkFunc func(*request) (ok bool)
|
type walkFunc func(*request) (ok bool)
|
||||||
|
|
||||||
// Internal interface to abstract out the implementation details
|
// Internal interface to abstract out the implementation details
|
||||||
@ -129,7 +132,9 @@ func (l *requestFIFO) getFirst(remove bool) (*request, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *requestFIFO) Walk(f walkFunc) {
|
func (l *requestFIFO) Walk(f walkFunc) {
|
||||||
for current := l.Front(); current != nil; current = current.Next() {
|
var next *list.Element
|
||||||
|
for current := l.Front(); current != nil; current = next {
|
||||||
|
next = current.Next() // f is allowed to remove current
|
||||||
if r, ok := current.Value.(*request); ok {
|
if r, ok := current.Value.(*request); ok {
|
||||||
if !f(r) {
|
if !f(r) {
|
||||||
return
|
return
|
||||||
|
@ -88,16 +88,22 @@ func TestFIFOWithRemoveMultipleRequestsInArrivalOrder(t *testing.T) {
|
|||||||
removeFn = append(removeFn, list.Enqueue(arrival[i]))
|
removeFn = append(removeFn, list.Enqueue(arrival[i]))
|
||||||
}
|
}
|
||||||
|
|
||||||
dequeued := make([]*request, 0)
|
expected := append([]*request{}, arrival...)
|
||||||
for _, f := range removeFn {
|
for idx, f := range removeFn {
|
||||||
dequeued = append(dequeued, f())
|
if a := f(); a != arrival[idx] {
|
||||||
|
t.Errorf("Removal %d returned %v instead of expected pointer", idx, a)
|
||||||
|
}
|
||||||
|
if a := f(); a != nil {
|
||||||
|
t.Errorf("Redundant removal %d returned %v instead of expected nil", idx, a)
|
||||||
|
}
|
||||||
|
expected = expected[1:]
|
||||||
|
actual := walkAll(list)
|
||||||
|
verifyOrder(t, expected, actual)
|
||||||
}
|
}
|
||||||
|
|
||||||
if list.Length() != 0 {
|
if list.Length() != 0 {
|
||||||
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
|
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyOrder(t, arrival, dequeued)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFIFORemoveFromFIFOFunc(t *testing.T) {
|
func TestFIFORemoveFromFIFOFunc(t *testing.T) {
|
||||||
@ -124,19 +130,25 @@ func TestFIFOWithRemoveMultipleRequestsInRandomOrder(t *testing.T) {
|
|||||||
removeFn = append(removeFn, list.Enqueue(arrival[i]))
|
removeFn = append(removeFn, list.Enqueue(arrival[i]))
|
||||||
}
|
}
|
||||||
|
|
||||||
dequeued := make([]*request, 0)
|
expected := append([]*request{}, arrival...)
|
||||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
randomIndices := r.Perm(len(removeFn))
|
for range arrival {
|
||||||
t.Logf("Random remove order: %v", randomIndices)
|
idx := r.Intn(len(expected))
|
||||||
for i := range randomIndices {
|
t.Logf("Removing random index %d", idx)
|
||||||
dequeued = append(dequeued, removeFn[i]())
|
if e, a := expected[idx], removeFn[idx](); e != a {
|
||||||
|
t.Errorf("Removal of %d returned %v instead of expected pointer %v", idx, a, e)
|
||||||
|
}
|
||||||
|
if e, a := (*request)(nil), removeFn[idx](); e != a {
|
||||||
|
t.Errorf("Redundant removal of %d returned %v instead of expected nil pointer", idx, a)
|
||||||
|
}
|
||||||
|
expected = append(expected[:idx], expected[idx+1:]...)
|
||||||
|
actual := walkAll(list)
|
||||||
|
verifyOrder(t, expected, actual)
|
||||||
|
removeFn = append(removeFn[:idx], removeFn[idx+1:]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
if list.Length() != 0 {
|
if list.Length() != 0 {
|
||||||
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
|
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyOrder(t, arrival, dequeued)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
|
func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
|
||||||
|
@ -582,31 +582,24 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
|
|||||||
// can short circuit loop (break) if oldest requests are not timing out
|
// can short circuit loop (break) if oldest requests are not timing out
|
||||||
// as newer requests also will not have timed out
|
// as newer requests also will not have timed out
|
||||||
|
|
||||||
// now - requestWaitLimit = waitLimit
|
// now - requestWaitLimit = arrivalLimit
|
||||||
waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
|
arrivalLimit := now.Add(-qs.qCfg.RequestWaitLimit)
|
||||||
reqs.Walk(func(req *request) bool {
|
reqs.Walk(func(req *request) bool {
|
||||||
if waitLimit.After(req.arrivalTime) {
|
if arrivalLimit.After(req.arrivalTime) {
|
||||||
req.decision.Set(decisionReject)
|
if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil {
|
||||||
timeoutCount++
|
timeoutCount++
|
||||||
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
|
||||||
req.NoteQueued(false)
|
req.NoteQueued(false)
|
||||||
|
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
||||||
|
}
|
||||||
// we need to check if the next request has timed out.
|
// we need to check if the next request has timed out.
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// since reqs are sorted oldest -> newest, we are done here.
|
// since reqs are sorted oldest -> newest, we are done here.
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|
||||||
// remove timed out requests from queue
|
// remove timed out requests from queue
|
||||||
if timeoutCount > 0 {
|
if timeoutCount > 0 {
|
||||||
// The number of requests we have timed out is timeoutCount,
|
|
||||||
// so, let's dequeue the exact number of requests for this queue.
|
|
||||||
for i := 0; i < timeoutCount; i++ {
|
|
||||||
queue.requests.Dequeue()
|
|
||||||
}
|
|
||||||
// decrement the # of requestsEnqueued
|
|
||||||
qs.totRequestsWaiting -= timeoutCount
|
qs.totRequestsWaiting -= timeoutCount
|
||||||
qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount))
|
qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount))
|
||||||
}
|
}
|
||||||
@ -647,18 +640,9 @@ func (qs *queueSet) enqueueLocked(request *request) {
|
|||||||
qs.obsPair.RequestsWaiting.Add(1)
|
qs.obsPair.RequestsWaiting.Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
|
// dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
|
||||||
// are non-empty queues and the number currently executing is less than the
|
|
||||||
// assured concurrency value. The body of the loop uses the fair queuing
|
|
||||||
// technique to pick a queue, dequeue the request at the head of that
|
|
||||||
// queue, increment the count of the number executing, and send true
|
|
||||||
// to the request's channel.
|
|
||||||
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
||||||
for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit {
|
for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit && qs.dispatchLocked() {
|
||||||
ok := qs.dispatchLocked()
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -691,8 +675,9 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
|
|||||||
|
|
||||||
// dispatchLocked uses the Fair Queuing for Server Requests method to
|
// dispatchLocked uses the Fair Queuing for Server Requests method to
|
||||||
// select a queue and dispatch the oldest request in that queue. The
|
// select a queue and dispatch the oldest request in that queue. The
|
||||||
// return value indicates whether a request was dispatched; this will
|
// return value indicates whether a request was dequeued; this will
|
||||||
// be false when there are no requests waiting in any queue.
|
// be false when either all queues are empty or the request at the head
|
||||||
|
// of the next queue cannot be dispatched.
|
||||||
func (qs *queueSet) dispatchLocked() bool {
|
func (qs *queueSet) dispatchLocked() bool {
|
||||||
queue, request := qs.findDispatchQueueLocked()
|
queue, request := qs.findDispatchQueueLocked()
|
||||||
if queue == nil {
|
if queue == nil {
|
||||||
@ -701,22 +686,26 @@ func (qs *queueSet) dispatchLocked() bool {
|
|||||||
if request == nil { // This should never happen. But if it does...
|
if request == nil { // This should never happen. But if it does...
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
qs.totRequestsWaiting--
|
||||||
|
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
|
||||||
|
request.NoteQueued(false)
|
||||||
|
qs.obsPair.RequestsWaiting.Add(-1)
|
||||||
|
defer qs.boundNextDispatchLocked(queue)
|
||||||
|
if !request.decision.Set(decisionExecute) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
request.startTime = qs.clock.Now()
|
request.startTime = qs.clock.Now()
|
||||||
// At this moment the request leaves its queue and starts
|
// At this moment the request leaves its queue and starts
|
||||||
// executing. We do not recognize any interim state between
|
// executing. We do not recognize any interim state between
|
||||||
// "queued" and "executing". While that means "executing"
|
// "queued" and "executing". While that means "executing"
|
||||||
// includes a little overhead from this package, this is not a
|
// includes a little overhead from this package, this is not a
|
||||||
// problem because other overhead is also included.
|
// problem because other overhead is also included.
|
||||||
qs.totRequestsWaiting--
|
|
||||||
qs.totRequestsExecuting++
|
qs.totRequestsExecuting++
|
||||||
qs.totSeatsInUse += request.MaxSeats()
|
qs.totSeatsInUse += request.MaxSeats()
|
||||||
queue.requestsExecuting++
|
queue.requestsExecuting++
|
||||||
queue.seatsInUse += request.MaxSeats()
|
queue.seatsInUse += request.MaxSeats()
|
||||||
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
|
|
||||||
request.NoteQueued(false)
|
|
||||||
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
|
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
|
||||||
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
|
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
|
||||||
qs.obsPair.RequestsWaiting.Add(-1)
|
|
||||||
qs.obsPair.RequestsExecuting.Add(1)
|
qs.obsPair.RequestsExecuting.Add(1)
|
||||||
if klog.V(6).Enabled() {
|
if klog.V(6).Enabled() {
|
||||||
klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
|
klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
|
||||||
@ -725,8 +714,6 @@ func (qs *queueSet) dispatchLocked() bool {
|
|||||||
}
|
}
|
||||||
// When a request is dequeued for service -> qs.virtualStart += G * width
|
// When a request is dequeued for service -> qs.virtualStart += G * width
|
||||||
queue.nextDispatchR += request.totalWork()
|
queue.nextDispatchR += request.totalWork()
|
||||||
qs.boundNextDispatchLocked(queue)
|
|
||||||
request.decision.Set(decisionExecute)
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user