From 3d6cc118fee15313419bf7aa0082a2a608ec62f6 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Fri, 24 Sep 2021 15:18:27 -0400 Subject: [PATCH] introduce final seats for work estimate --- .../filters/priority-and-fairness_test.go | 4 +- .../fairqueuing/queueset/fifo_list.go | 30 +++++-- .../fairqueuing/queueset/fifo_list_test.go | 83 +++++++++++------ .../fairqueuing/queueset/queueset.go | 90 +++++++++++-------- .../fairqueuing/queueset/queueset_test.go | 49 ++++++++++ .../flowcontrol/fairqueuing/queueset/types.go | 18 ++++ .../pkg/util/flowcontrol/request/width.go | 17 +++- 7 files changed, 218 insertions(+), 73 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 7853054d496..aeccf5afed2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -653,7 +653,9 @@ func TestApfWithRequestDigest(t *testing.T) { RequestInfo: &apirequest.RequestInfo{Verb: "get"}, User: &user.DefaultInfo{Name: "foo"}, WorkEstimate: fcrequest.WorkEstimate{ - InitialSeats: 5, + InitialSeats: 5, + FinalSeats: 7, + AdditionalLatency: 3 * time.Second, }, } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go index 9f531b96442..0c66939b8b1 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go @@ -51,9 +51,9 @@ type fifo interface { // Length returns the number of requests in the list. Length() int - // SeatsSum returns the total number of seats of all requests - // in this list. 
- SeatsSum() int + // QueueSum returns the sum of initial seats, final seats, and + // additional latency aggregated from all requests in this queue. + QueueSum() queueSum // Walk iterates through the list in order of oldest -> newest // and executes the specified walkFunc for each request in that order. @@ -68,7 +68,7 @@ type fifo interface { type requestFIFO struct { *list.List - seatsSum int + sum queueSum } func newRequestFIFO() fifo { @@ -81,19 +81,19 @@ func (l *requestFIFO) Length() int { return l.Len() } -func (l *requestFIFO) SeatsSum() int { - return l.seatsSum +func (l *requestFIFO) QueueSum() queueSum { + return l.sum } func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc { e := l.PushBack(req) - l.seatsSum += req.Seats() + addToQueueSum(&l.sum, req) return func() *request { if e.Value != nil { l.Remove(e) e.Value = nil - l.seatsSum -= req.Seats() + deductFromQueueSum(&l.sum, req) } return req } @@ -122,7 +122,7 @@ func (l *requestFIFO) getFirst(remove bool) (*request, bool) { request, ok := e.Value.(*request) if remove && ok { - l.seatsSum -= request.Seats() + deductFromQueueSum(&l.sum, request) } return request, ok } @@ -136,3 +136,15 @@ func (l *requestFIFO) Walk(f walkFunc) { } } } + +func addToQueueSum(sum *queueSum, req *request) { + sum.InitialSeatsSum += req.InitialSeats() + sum.MaxSeatsSum += req.MaxSeats() + sum.AdditionalSeatSecondsSum += req.AdditionalSeatSeconds() +} + +func deductFromQueueSum(sum *queueSum, req *request) { + sum.InitialSeatsSum -= req.InitialSeats() + sum.MaxSeatsSum -= req.MaxSeats() + sum.AdditionalSeatSecondsSum -= req.AdditionalSeatSeconds() +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go index 1d490642ae8..5f6da64584b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go +++ 
b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go @@ -18,9 +18,11 @@ package queueset import ( "math/rand" + "reflect" "testing" "time" + "github.com/google/go-cmp/cmp" fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" ) @@ -150,59 +152,88 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) { verifyOrder(t, orderExpected, remainingRequests) } -func TestFIFOSeatsSum(t *testing.T) { +func TestFIFOQueueWorkEstimate(t *testing.T) { list := newRequestFIFO() - newRequest := func(width uint) *request { - return &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: width}} + update := func(we *queueSum, req *request, multiplier int) { + we.InitialSeatsSum += multiplier * req.InitialSeats() + we.MaxSeatsSum += multiplier * req.MaxSeats() + we.AdditionalSeatSecondsSum += SeatSeconds(multiplier) * req.AdditionalSeatSeconds() + } + + assert := func(t *testing.T, want, got *queueSum) { + if !reflect.DeepEqual(want, got) { + t.Errorf("Expected queue work estimate to match, diff: %s", cmp.Diff(want, got)) + } + } + + newRequest := func(initialSeats, finalSeats uint, additionalLatency time.Duration) *request { + return &request{workEstimate: fcrequest.WorkEstimate{ + InitialSeats: initialSeats, + FinalSeats: finalSeats, + AdditionalLatency: additionalLatency, + }} + } + arrival := []*request{ + newRequest(1, 3, time.Second), + newRequest(2, 2, 2*time.Second), + newRequest(3, 1, 3*time.Second), } - arrival := []*request{newRequest(1), newRequest(2), newRequest(3)} removeFn := make([]removeFromFIFOFunc, 0) - seatsSum := 0 + queueSumExpected := queueSum{} for i := range arrival { - removeFn = append(removeFn, list.Enqueue(arrival[i])) + req := arrival[i] + removeFn = append(removeFn, list.Enqueue(req)) + update(&queueSumExpected, req, 1) - seatsSum += i + 1 - if list.SeatsSum() != seatsSum { - t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) - } + workEstimateGot := list.QueueSum() + assert(t, 
&queueSumExpected, &workEstimateGot) } + // NOTE: the test expects the request and the remove func to be at the same index for i := range removeFn { + req := arrival[i] removeFn[i]() - seatsSum -= i + 1 - if list.SeatsSum() != seatsSum { - t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) - } + update(&queueSumExpected, req, -1) + + workEstimateGot := list.QueueSum() + assert(t, &queueSumExpected, &workEstimateGot) // check idempotency removeFn[i]() - if list.SeatsSum() != seatsSum { - t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) - } + + workEstimateGot = list.QueueSum() + assert(t, &queueSumExpected, &workEstimateGot) } // Check second type of idempotency: Dequeue + removeFn. for i := range arrival { - removeFn[i] = list.Enqueue(arrival[i]) - seatsSum += i + 1 + req := arrival[i] + removeFn[i] = list.Enqueue(req) + + update(&queueSumExpected, req, 1) } for i := range arrival { + // we expect Dequeue to pop the oldest request that should + // have the lowest index as well. 
+ req := arrival[i] + if _, ok := list.Dequeue(); !ok { t.Errorf("Unexpected failed dequeue: %d", i) } - seatsSum -= i + 1 - if list.SeatsSum() != seatsSum { - t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) - } + + update(&queueSumExpected, req, -1) + + queueSumGot := list.QueueSum() + assert(t, &queueSumExpected, &queueSumGot) removeFn[i]() - if list.SeatsSum() != seatsSum { - t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) - } + + queueSumGot = list.QueueSum() + assert(t, &queueSumExpected, &queueSumGot) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 1a41a833e6e..f66b7619abc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -268,7 +268,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo // Step 0: // Apply only concurrency limit, if zero queues desired if qs.qCfg.DesiredNumQueues < 1 { - if !qs.canAccommodateSeatsLocked(int(workEstimate.InitialSeats)) { + if !qs.canAccommodateSeatsLocked(workEstimate.MaxSeats()) { klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") @@ -315,11 +315,28 @@ func ordinaryPromiseFactoryFactory(qs *queueSet) promiseFactory { return promise.NewWriteOnce } -// Seats returns the number of seats this request requires. 
-func (req *request) Seats() int { +// MaxSeats returns the maximum number of seats this request requires, it is +// the maximum of the two - WorkEstimate.InitialSeats, WorkEstimate.FinalSeats. +func (req *request) MaxSeats() int { + return req.workEstimate.MaxSeats() +} + +func (req *request) InitialSeats() int { return int(req.workEstimate.InitialSeats) } +// AdditionalSeatSeconds returns the amount of work in SeatSeconds produced by +// the final seats and the additional latency associated with a request. +func (req *request) AdditionalSeatSeconds() SeatSeconds { + return SeatsTimesDuration(float64(req.workEstimate.FinalSeats), req.workEstimate.AdditionalLatency) +} + +// InitialSeatSeconds returns the amount of work in SeatSeconds projected +// by the initial seats for a given estimated service duration. +func (req *request) InitialSeatSeconds(estimatedServiceDuration time.Duration) SeatSeconds { + return SeatsTimesDuration(float64(req.workEstimate.InitialSeats), estimatedServiceDuration) +} + func (req *request) NoteQueued(inQueue bool) { if req.queueNoteFn != nil { req.queueNoteFn(inQueue) @@ -423,7 +440,9 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { activeQueues := 0 seatsRequested := 0 for _, queue := range qs.queues { - seatsRequested += (queue.seatsInUse + queue.requests.SeatsSum()) + // here we want the sum of the maximum width of the requests in this queue since our + // goal is to find the maximum rate at which the queue could work. 
+ seatsRequested += (queue.seatsInUse + queue.requests.QueueSum().MaxSeatsSum) if queue.requests.Length() > 0 || queue.requestsExecuting > 0 { activeQueues++ } @@ -489,25 +508,24 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac offset := qs.enqueues % handSize qs.enqueues++ bestQueueIdx := -1 - bestQueueSeatsSum := int(math.MaxInt32) + minQueueSeatSeconds := MaxSeatSeconds for i := 0; i < handSize; i++ { queueIdx := hand[(offset+i)%handSize] queue := qs.queues[queueIdx] - waitingSeats := queue.requests.SeatsSum() - // TODO: Consider taking into account `additional latency` of requests - // in addition to their seats. - // Ideally, this should be based on projected completion time in the - // virtual world of the youngest request in the queue. - thisSeatsSum := waitingSeats + queue.seatsInUse - klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.nextDispatchR) - if thisSeatsSum < bestQueueSeatsSum { - bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum - } + queueSum := queue.requests.QueueSum() + // this is the total amount of work in seat-seconds for requests + // waiting in this queue, we will select the queue with the minimum. 
+ thisQueueSeatSeconds := SeatsTimesDuration(float64(queueSum.InitialSeatsSum), qs.estimatedServiceDuration) + queueSum.AdditionalSeatSecondsSum + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d with sum: %#v and %d seats in use, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, queueSum, queue.seatsInUse, queue.nextDispatchR) + if thisQueueSeatSeconds < minQueueSeatSeconds { + minQueueSeatSeconds = thisQueueSeatSeconds + bestQueueIdx = queueIdx + } } if klog.V(6).Enabled() { chosenQueue := qs.queues[bestQueueIdx] - klog.V(6).Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.nextDispatchR) + klog.V(6).Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, with sum: %#v & %d seats in use & nextDispatchR=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.QueueSum(), chosenQueue.seatsInUse, chosenQueue.nextDispatchR) } return bestQueueIdx } @@ -619,9 +637,9 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f workEstimate: *workEstimate, } qs.totRequestsExecuting++ - qs.totSeatsInUse += req.Seats() + qs.totSeatsInUse += req.MaxSeats() metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) - metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) + metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats()) qs.obsPair.RequestsExecuting.Add(1) if klog.V(5).Enabled() { klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) @@ -650,13 +668,13 @@ func (qs *queueSet) dispatchLocked() bool { // problem because other overhead 
is also included. qs.totRequestsWaiting-- qs.totRequestsExecuting++ - qs.totSeatsInUse += request.Seats() + qs.totSeatsInUse += request.MaxSeats() queue.requestsExecuting++ - queue.seatsInUse += request.Seats() + queue.seatsInUse += request.MaxSeats() metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) request.NoteQueued(false) metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) - metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.Seats()) + metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats()) qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { @@ -665,7 +683,7 @@ func (qs *queueSet) dispatchLocked() bool { request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) } // When a request is dequeued for service -> qs.virtualStart += G * width - queue.nextDispatchR += SeatsTimesDuration(float64(request.Seats()), qs.estimatedServiceDuration) + queue.nextDispatchR += request.InitialSeatSeconds(qs.estimatedServiceDuration) + request.AdditionalSeatSeconds() qs.boundNextDispatch(queue) request.decision.Set(decisionExecute) return ok @@ -716,7 +734,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) - currentVirtualFinish := queue.nextDispatchR + SeatsTimesDuration(float64(oldestWaiting.Seats()), qs.estimatedServiceDuration) + currentVirtualFinish := queue.nextDispatchR + oldestWaiting.InitialSeatSeconds(qs.estimatedServiceDuration) + oldestWaiting.AdditionalSeatSeconds() klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) if 
currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish @@ -732,12 +750,12 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name) return nil } - if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { + if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.MaxSeats()) { // since we have not picked the queue with the minimum virtual finish // time, we are not going to advance the round robin index here. if klog.V(4).Enabled() { klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d", - qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.Seats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit) + qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.MaxSeats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit) } return nil } @@ -799,10 +817,10 @@ func (qs *queueSet) finishRequestLocked(r *request) { releaseSeatsLocked := func() { defer qs.removeQueueIfEmptyLocked(r) - qs.totSeatsInUse -= r.Seats() - metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) + qs.totSeatsInUse -= r.MaxSeats() + metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats()) if r.queue != nil { - r.queue.seatsInUse -= r.Seats() + r.queue.seatsInUse -= r.MaxSeats() } } @@ -812,9 +830,9 @@ func (qs *queueSet) finishRequestLocked(r *request) { releaseSeatsLocked() if !klog.V(6).Enabled() { } else if r.queue != nil { - klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", - qs.qCfg.Name, 
now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, - r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue sum: %#v, %d requests waiting & %d requests occupying %d seats", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.MaxSeats(), r.queue.index, + r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.QueueSum(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.seatsInUse) } else { klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) } @@ -824,9 +842,9 @@ func (qs *queueSet) finishRequestLocked(r *request) { additionalLatency := r.workEstimate.AdditionalLatency if !klog.V(6).Enabled() { } else if r.queue != nil { - klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests waiting & %d executing, queue sum: %#v & %d seats in uses", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index, - r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requestsExecuting, 
r.queue.requests.SeatsSum(), r.queue.seatsInUse) + r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.QueueSum(), r.queue.seatsInUse) } else { klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) } @@ -841,9 +859,9 @@ func (qs *queueSet) finishRequestLocked(r *request) { releaseSeatsLocked() if !klog.V(6).Enabled() { } else if r.queue != nil { - klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, queue sum: %#v, & %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, - r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) + r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse) } else { klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) } @@ -857,7 +875,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { // When a request finishes being served, and the actual service time was S, // the queue’s start R is decremented by (G - S)*width. 
- r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.Seats()), qs.estimatedServiceDuration-actualServiceDuration) + r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration) qs.boundNextDispatch(r.queue) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 7a514a1fa8b..1c4cb04529b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1354,6 +1354,55 @@ func TestFinishRequestLocked(t *testing.T) { } } +func TestRequestSeats(t *testing.T) { + tests := []struct { + name string + request *request + expected int + }{ + { + name: "", + request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 3}}, + expected: 3, + }, + { + name: "", + request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1, FinalSeats: 3}}, + expected: 3, + }, + { + name: "", + request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 1}}, + expected: 3, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + seatsGot := test.request.MaxSeats() + if test.expected != seatsGot { + t.Errorf("Expected seats: %d, got %d", test.expected, seatsGot) + } + }) + } +} + +func TestRequestAdditionalSeatSeconds(t *testing.T) { + request := &request{ + workEstimate: fcrequest.WorkEstimate{ + InitialSeats: 3, + FinalSeats: 5, + AdditionalLatency: 3 * time.Second, + }, + } + + got := request.AdditionalSeatSeconds() + want := SeatsTimesDuration(5, 3*time.Second) + if want != got { + t.Errorf("Expected AdditionalSeatSeconds: %v, but got: %v", want, got) + } +} + func newFIFO(requests ...*request) fifo { l := newRequestFIFO() for i := range requests { diff --git 
a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 2a36040566e..cbd2af6889d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -97,6 +97,22 @@ type queue struct { seatsInUse int } +// queueSum tracks the sum of initial seats, max seats, and +// additional seat-seconds aggregated from all requests in a given queue +type queueSum struct { + // InitialSeatsSum is the sum of InitialSeats + // associated with all requests in a given queue. + InitialSeatsSum int + + // MaxSeatsSum is the sum of MaxSeats + // associated with all requests in a given queue. + MaxSeatsSum int + + // AdditionalSeatSecondsSum is the sum of AdditionalSeatSeconds + // associated with all requests in a given queue. + AdditionalSeatSecondsSum SeatSeconds +} + // Enqueue enqueues a request into the queue and // sets the removeFromQueueFn of the request appropriately. func (q *queue) Enqueue(request *request) { @@ -129,6 +145,8 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump { i++ return true }) + + // TODO: change QueueDump to include queueSum stats return debug.QueueDump{ VirtualStart: q.nextDispatchR.ToFloat(), // TODO: change QueueDump to use SeatSeconds Requests: digest, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index 10333d60a83..52eb3fc04e1 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -34,9 +34,14 @@ const ( ) type WorkEstimate struct { - // InitialSeats represents the number of initial seats associated with this request + // InitialSeats is the number of seats occupied while the server is + // executing this request. 
InitialSeats uint + // FinalSeats is the number of seats occupied at the end, + // during the AdditionalLatency. + FinalSeats uint + // AdditionalLatency specifies the additional duration the seats allocated // to this request must be reserved after the given request had finished. // AdditionalLatency should not have any impact on the user experience, the @@ -44,6 +49,16 @@ type WorkEstimate struct { AdditionalLatency time.Duration } +// MaxSeats returns the number of seats this request requires, it is the maximum +// of the two, WorkEstimate.InitialSeats and WorkEstimate.FinalSeats. +func (we *WorkEstimate) MaxSeats() int { + if we.InitialSeats >= we.FinalSeats { + return int(we.InitialSeats) + } + + return int(we.FinalSeats) +} + // objectCountGetterFunc represents a function that gets the total // number of objects for a given resource. type objectCountGetterFunc func(string) (int64, error)