introduce final seats for work estimate

This commit is contained in:
Abu Kashem 2021-09-24 15:18:27 -04:00
parent 907d62eac8
commit 3d6cc118fe
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
7 changed files with 218 additions and 73 deletions

View File

@ -653,7 +653,9 @@ func TestApfWithRequestDigest(t *testing.T) {
RequestInfo: &apirequest.RequestInfo{Verb: "get"}, RequestInfo: &apirequest.RequestInfo{Verb: "get"},
User: &user.DefaultInfo{Name: "foo"}, User: &user.DefaultInfo{Name: "foo"},
WorkEstimate: fcrequest.WorkEstimate{ WorkEstimate: fcrequest.WorkEstimate{
InitialSeats: 5, InitialSeats: 5,
FinalSeats: 7,
AdditionalLatency: 3 * time.Second,
}, },
} }

View File

@ -51,9 +51,9 @@ type fifo interface {
// Length returns the number of requests in the list. // Length returns the number of requests in the list.
Length() int Length() int
// SeatsSum returns the total number of seats of all requests // QueueSum returns the sum of initial seats, final seats, and
// in this list. // additional latency aggregated from all requests in this queue.
SeatsSum() int QueueSum() queueSum
// Walk iterates through the list in order of oldest -> newest // Walk iterates through the list in order of oldest -> newest
// and executes the specified walkFunc for each request in that order. // and executes the specified walkFunc for each request in that order.
@ -68,7 +68,7 @@ type fifo interface {
type requestFIFO struct { type requestFIFO struct {
*list.List *list.List
seatsSum int sum queueSum
} }
func newRequestFIFO() fifo { func newRequestFIFO() fifo {
@ -81,19 +81,19 @@ func (l *requestFIFO) Length() int {
return l.Len() return l.Len()
} }
func (l *requestFIFO) SeatsSum() int { func (l *requestFIFO) QueueSum() queueSum {
return l.seatsSum return l.sum
} }
func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc { func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
e := l.PushBack(req) e := l.PushBack(req)
l.seatsSum += req.Seats() addToQueueSum(&l.sum, req)
return func() *request { return func() *request {
if e.Value != nil { if e.Value != nil {
l.Remove(e) l.Remove(e)
e.Value = nil e.Value = nil
l.seatsSum -= req.Seats() deductFromQueueSum(&l.sum, req)
} }
return req return req
} }
@ -122,7 +122,7 @@ func (l *requestFIFO) getFirst(remove bool) (*request, bool) {
request, ok := e.Value.(*request) request, ok := e.Value.(*request)
if remove && ok { if remove && ok {
l.seatsSum -= request.Seats() deductFromQueueSum(&l.sum, request)
} }
return request, ok return request, ok
} }
@ -136,3 +136,15 @@ func (l *requestFIFO) Walk(f walkFunc) {
} }
} }
} }
func addToQueueSum(sum *queueSum, req *request) {
sum.InitialSeatsSum += req.InitialSeats()
sum.MaxSeatsSum += req.MaxSeats()
sum.AdditionalSeatSecondsSum += req.AdditionalSeatSeconds()
}
func deductFromQueueSum(sum *queueSum, req *request) {
sum.InitialSeatsSum -= req.InitialSeats()
sum.MaxSeatsSum -= req.MaxSeats()
sum.AdditionalSeatSecondsSum -= req.AdditionalSeatSeconds()
}

View File

@ -18,9 +18,11 @@ package queueset
import ( import (
"math/rand" "math/rand"
"reflect"
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
) )
@ -150,59 +152,88 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
verifyOrder(t, orderExpected, remainingRequests) verifyOrder(t, orderExpected, remainingRequests)
} }
func TestFIFOSeatsSum(t *testing.T) { func TestFIFOQueueWorkEstimate(t *testing.T) {
list := newRequestFIFO() list := newRequestFIFO()
newRequest := func(width uint) *request { update := func(we *queueSum, req *request, multiplier int) {
return &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: width}} we.InitialSeatsSum += multiplier * req.InitialSeats()
we.MaxSeatsSum += multiplier * req.MaxSeats()
we.AdditionalSeatSecondsSum += SeatSeconds(multiplier) * req.AdditionalSeatSeconds()
}
assert := func(t *testing.T, want, got *queueSum) {
if !reflect.DeepEqual(want, got) {
t.Errorf("Expected queue work estimate to match, diff: %s", cmp.Diff(want, got))
}
}
newRequest := func(initialSeats, finalSeats uint, additionalLatency time.Duration) *request {
return &request{workEstimate: fcrequest.WorkEstimate{
InitialSeats: initialSeats,
FinalSeats: finalSeats,
AdditionalLatency: additionalLatency,
}}
}
arrival := []*request{
newRequest(1, 3, time.Second),
newRequest(2, 2, 2*time.Second),
newRequest(3, 1, 3*time.Second),
} }
arrival := []*request{newRequest(1), newRequest(2), newRequest(3)}
removeFn := make([]removeFromFIFOFunc, 0) removeFn := make([]removeFromFIFOFunc, 0)
seatsSum := 0 queueSumExpected := queueSum{}
for i := range arrival { for i := range arrival {
removeFn = append(removeFn, list.Enqueue(arrival[i])) req := arrival[i]
removeFn = append(removeFn, list.Enqueue(req))
update(&queueSumExpected, req, 1)
seatsSum += i + 1 workEstimateGot := list.QueueSum()
if list.SeatsSum() != seatsSum { assert(t, &queueSumExpected, &workEstimateGot)
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
}
} }
// NOTE: the test expects the request and the remove func to be at the same index
for i := range removeFn { for i := range removeFn {
req := arrival[i]
removeFn[i]() removeFn[i]()
seatsSum -= i + 1 update(&queueSumExpected, req, -1)
if list.SeatsSum() != seatsSum {
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) workEstimateGot := list.QueueSum()
} assert(t, &queueSumExpected, &workEstimateGot)
// check idempotency // check idempotency
removeFn[i]() removeFn[i]()
if list.SeatsSum() != seatsSum {
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) workEstimateGot = list.QueueSum()
} assert(t, &queueSumExpected, &workEstimateGot)
} }
// Check second type of idempotency: Dequeue + removeFn. // Check second type of idempotency: Dequeue + removeFn.
for i := range arrival { for i := range arrival {
removeFn[i] = list.Enqueue(arrival[i]) req := arrival[i]
seatsSum += i + 1 removeFn[i] = list.Enqueue(req)
update(&queueSumExpected, req, 1)
} }
for i := range arrival { for i := range arrival {
// we expect Dequeue to pop the oldest request that should
// have the lowest index as well.
req := arrival[i]
if _, ok := list.Dequeue(); !ok { if _, ok := list.Dequeue(); !ok {
t.Errorf("Unexpected failed dequeue: %d", i) t.Errorf("Unexpected failed dequeue: %d", i)
} }
seatsSum -= i + 1
if list.SeatsSum() != seatsSum { update(&queueSumExpected, req, -1)
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
} queueSumGot := list.QueueSum()
assert(t, &queueSumExpected, &queueSumGot)
removeFn[i]() removeFn[i]()
if list.SeatsSum() != seatsSum {
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum()) queueSumGot = list.QueueSum()
} assert(t, &queueSumExpected, &queueSumGot)
} }
} }

View File

@ -268,7 +268,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
// Step 0: // Step 0:
// Apply only concurrency limit, if zero queues desired // Apply only concurrency limit, if zero queues desired
if qs.qCfg.DesiredNumQueues < 1 { if qs.qCfg.DesiredNumQueues < 1 {
if !qs.canAccommodateSeatsLocked(int(workEstimate.InitialSeats)) { if !qs.canAccommodateSeatsLocked(workEstimate.MaxSeats()) {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d", klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d",
qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
@ -315,11 +315,28 @@ func ordinaryPromiseFactoryFactory(qs *queueSet) promiseFactory {
return promise.NewWriteOnce return promise.NewWriteOnce
} }
// Seats returns the number of seats this request requires. // MaxSeats returns the maximum number of seats this request requires, it is
func (req *request) Seats() int { // the maxumum of the two - WorkEstimate.InitialSeats, WorkEstimate.FinalSeats.
func (req *request) MaxSeats() int {
return req.workEstimate.MaxSeats()
}
func (req *request) InitialSeats() int {
return int(req.workEstimate.InitialSeats) return int(req.workEstimate.InitialSeats)
} }
// AdditionalSeatSeconds returns the amount of work in SeatSeconds produced by
// the final seats and the additional latency associated with a request.
func (req *request) AdditionalSeatSeconds() SeatSeconds {
return SeatsTimesDuration(float64(req.workEstimate.FinalSeats), req.workEstimate.AdditionalLatency)
}
// InitialSeatSeconds returns the amount of work in SeatSeconds projected
// by the initial seats for a given estimated service duration.
func (req *request) InitialSeatSeconds(estimatedServiceDuration time.Duration) SeatSeconds {
return SeatsTimesDuration(float64(req.workEstimate.InitialSeats), estimatedServiceDuration)
}
func (req *request) NoteQueued(inQueue bool) { func (req *request) NoteQueued(inQueue bool) {
if req.queueNoteFn != nil { if req.queueNoteFn != nil {
req.queueNoteFn(inQueue) req.queueNoteFn(inQueue)
@ -423,7 +440,9 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
activeQueues := 0 activeQueues := 0
seatsRequested := 0 seatsRequested := 0
for _, queue := range qs.queues { for _, queue := range qs.queues {
seatsRequested += (queue.seatsInUse + queue.requests.SeatsSum()) // here we want the sum of the maximum width of the requests in this queue since our
// goal is to find the maximum rate at which the queue could work.
seatsRequested += (queue.seatsInUse + queue.requests.QueueSum().MaxSeatsSum)
if queue.requests.Length() > 0 || queue.requestsExecuting > 0 { if queue.requests.Length() > 0 || queue.requestsExecuting > 0 {
activeQueues++ activeQueues++
} }
@ -489,25 +508,24 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
offset := qs.enqueues % handSize offset := qs.enqueues % handSize
qs.enqueues++ qs.enqueues++
bestQueueIdx := -1 bestQueueIdx := -1
bestQueueSeatsSum := int(math.MaxInt32) minQueueSeatSeconds := MaxSeatSeconds
for i := 0; i < handSize; i++ { for i := 0; i < handSize; i++ {
queueIdx := hand[(offset+i)%handSize] queueIdx := hand[(offset+i)%handSize]
queue := qs.queues[queueIdx] queue := qs.queues[queueIdx]
waitingSeats := queue.requests.SeatsSum() queueSum := queue.requests.QueueSum()
// TODO: Consider taking into account `additional latency` of requests
// in addition to their seats.
// Ideally, this should be based on projected completion time in the
// virtual world of the youngest request in the queue.
thisSeatsSum := waitingSeats + queue.seatsInUse
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.nextDispatchR)
if thisSeatsSum < bestQueueSeatsSum {
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
}
// this is the total amount of work in seat-seconds for requests
// waiting in this queue, we will select the queue with the minimum.
thisQueueSeatSeconds := SeatsTimesDuration(float64(queueSum.InitialSeatsSum), qs.estimatedServiceDuration) + queueSum.AdditionalSeatSecondsSum
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d with sum: %#v and %d seats in use, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, queueSum, queue.seatsInUse, queue.nextDispatchR)
if thisQueueSeatSeconds < minQueueSeatSeconds {
minQueueSeatSeconds = thisQueueSeatSeconds
bestQueueIdx = queueIdx
}
} }
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
chosenQueue := qs.queues[bestQueueIdx] chosenQueue := qs.queues[bestQueueIdx]
klog.V(6).Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.nextDispatchR) klog.V(6).Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, with sum: %#v & %d seats in use & nextDispatchR=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.QueueSum(), chosenQueue.seatsInUse, chosenQueue.nextDispatchR)
} }
return bestQueueIdx return bestQueueIdx
} }
@ -619,9 +637,9 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
workEstimate: *workEstimate, workEstimate: *workEstimate,
} }
qs.totRequestsExecuting++ qs.totRequestsExecuting++
qs.totSeatsInUse += req.Seats() qs.totSeatsInUse += req.MaxSeats()
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(5).Enabled() { if klog.V(5).Enabled() {
klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
@ -650,13 +668,13 @@ func (qs *queueSet) dispatchLocked() bool {
// problem because other overhead is also included. // problem because other overhead is also included.
qs.totRequestsWaiting-- qs.totRequestsWaiting--
qs.totRequestsExecuting++ qs.totRequestsExecuting++
qs.totSeatsInUse += request.Seats() qs.totSeatsInUse += request.MaxSeats()
queue.requestsExecuting++ queue.requestsExecuting++
queue.seatsInUse += request.Seats() queue.seatsInUse += request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false) request.NoteQueued(false)
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.Seats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsWaiting.Add(-1)
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
@ -665,7 +683,7 @@ func (qs *queueSet) dispatchLocked() bool {
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
} }
// When a request is dequeued for service -> qs.virtualStart += G * width // When a request is dequeued for service -> qs.virtualStart += G * width
queue.nextDispatchR += SeatsTimesDuration(float64(request.Seats()), qs.estimatedServiceDuration) queue.nextDispatchR += request.InitialSeatSeconds(qs.estimatedServiceDuration) + request.AdditionalSeatSeconds()
qs.boundNextDispatch(queue) qs.boundNextDispatch(queue)
request.decision.Set(decisionExecute) request.decision.Set(decisionExecute)
return ok return ok
@ -716,7 +734,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
currentVirtualFinish := queue.nextDispatchR + SeatsTimesDuration(float64(oldestWaiting.Seats()), qs.estimatedServiceDuration) currentVirtualFinish := queue.nextDispatchR + oldestWaiting.InitialSeatSeconds(qs.estimatedServiceDuration) + oldestWaiting.AdditionalSeatSeconds()
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
if currentVirtualFinish < minVirtualFinish { if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish minVirtualFinish = currentVirtualFinish
@ -732,12 +750,12 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name) klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
return nil return nil
} }
if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.MaxSeats()) {
// since we have not picked the queue with the minimum virtual finish // since we have not picked the queue with the minimum virtual finish
// time, we are not going to advance the round robin index here. // time, we are not going to advance the round robin index here.
if klog.V(4).Enabled() { if klog.V(4).Enabled() {
klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d", klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.Seats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit) qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.MaxSeats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
} }
return nil return nil
} }
@ -799,10 +817,10 @@ func (qs *queueSet) finishRequestLocked(r *request) {
releaseSeatsLocked := func() { releaseSeatsLocked := func() {
defer qs.removeQueueIfEmptyLocked(r) defer qs.removeQueueIfEmptyLocked(r)
qs.totSeatsInUse -= r.Seats() qs.totSeatsInUse -= r.MaxSeats()
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
if r.queue != nil { if r.queue != nil {
r.queue.seatsInUse -= r.Seats() r.queue.seatsInUse -= r.MaxSeats()
} }
} }
@ -812,9 +830,9 @@ func (qs *queueSet) finishRequestLocked(r *request) {
releaseSeatsLocked() releaseSeatsLocked()
if !klog.V(6).Enabled() { if !klog.V(6).Enabled() {
} else if r.queue != nil { } else if r.queue != nil {
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue sum: %#v, %d requests waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.MaxSeats(), r.queue.index,
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.QueueSum(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.seatsInUse)
} else { } else {
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
} }
@ -824,9 +842,9 @@ func (qs *queueSet) finishRequestLocked(r *request) {
additionalLatency := r.workEstimate.AdditionalLatency additionalLatency := r.workEstimate.AdditionalLatency
if !klog.V(6).Enabled() { if !klog.V(6).Enabled() {
} else if r.queue != nil { } else if r.queue != nil {
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests waiting & %d executing, queue sum: %#v & %d seats in uses",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index, qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index,
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse) r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.QueueSum(), r.queue.seatsInUse)
} else { } else {
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
} }
@ -841,9 +859,9 @@ func (qs *queueSet) finishRequestLocked(r *request) {
releaseSeatsLocked() releaseSeatsLocked()
if !klog.V(6).Enabled() { if !klog.V(6).Enabled() {
} else if r.queue != nil { } else if r.queue != nil {
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, queue sum: %#v, & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index,
r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
} else { } else {
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
} }
@ -857,7 +875,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// When a request finishes being served, and the actual service time was S, // When a request finishes being served, and the actual service time was S,
// the queues start R is decremented by (G - S)*width. // the queues start R is decremented by (G - S)*width.
r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.Seats()), qs.estimatedServiceDuration-actualServiceDuration) r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
qs.boundNextDispatch(r.queue) qs.boundNextDispatch(r.queue)
} }
} }

View File

@ -1354,6 +1354,55 @@ func TestFinishRequestLocked(t *testing.T) {
} }
} }
func TestRequestSeats(t *testing.T) {
tests := []struct {
name string
request *request
expected int
}{
{
name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 3}},
expected: 3,
},
{
name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1, FinalSeats: 3}},
expected: 3,
},
{
name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 1}},
expected: 3,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
seatsGot := test.request.MaxSeats()
if test.expected != seatsGot {
t.Errorf("Expected seats: %d, got %d", test.expected, seatsGot)
}
})
}
}
func TestRequestAdditionalSeatSeconds(t *testing.T) {
request := &request{
workEstimate: fcrequest.WorkEstimate{
InitialSeats: 3,
FinalSeats: 5,
AdditionalLatency: 3 * time.Second,
},
}
got := request.AdditionalSeatSeconds()
want := SeatsTimesDuration(5, 3*time.Second)
if want != got {
t.Errorf("Expected AdditionalSeatSeconds: %v, but got: %v", want, got)
}
}
func newFIFO(requests ...*request) fifo { func newFIFO(requests ...*request) fifo {
l := newRequestFIFO() l := newRequestFIFO()
for i := range requests { for i := range requests {

View File

@ -97,6 +97,22 @@ type queue struct {
seatsInUse int seatsInUse int
} }
// queueSum tracks the sum of initial seats, final seats, and
// additional latency aggregated from all requests in a given queue
type queueSum struct {
// InitialSeatsSum is the sum of InitialSeats
// associated with all requests in a given queue.
InitialSeatsSum int
// MaxSeatsSum is the sum of MaxSeats
// associated with all requests in a given queue.
MaxSeatsSum int
// AdditionalSeatSecondsSum is sum of AdditionalSeatsSeconds
// associated with all requests in a given queue.
AdditionalSeatSecondsSum SeatSeconds
}
// Enqueue enqueues a request into the queue and // Enqueue enqueues a request into the queue and
// sets the removeFromQueueFn of the request appropriately. // sets the removeFromQueueFn of the request appropriately.
func (q *queue) Enqueue(request *request) { func (q *queue) Enqueue(request *request) {
@ -129,6 +145,8 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump {
i++ i++
return true return true
}) })
// TODO: change QueueDump to include queueSum stats
return debug.QueueDump{ return debug.QueueDump{
VirtualStart: q.nextDispatchR.ToFloat(), // TODO: change QueueDump to use SeatSeconds VirtualStart: q.nextDispatchR.ToFloat(), // TODO: change QueueDump to use SeatSeconds
Requests: digest, Requests: digest,

View File

@ -34,9 +34,14 @@ const (
) )
type WorkEstimate struct { type WorkEstimate struct {
// InitialSeats represents the number of initial seats associated with this request // InitialSeats is the number of seats occupied while the server is
// executing this request.
InitialSeats uint InitialSeats uint
// FinalSeats is the number of seats occupied at the end,
// during the AdditionalLatency.
FinalSeats uint
// AdditionalLatency specifies the additional duration the seats allocated // AdditionalLatency specifies the additional duration the seats allocated
// to this request must be reserved after the given request had finished. // to this request must be reserved after the given request had finished.
// AdditionalLatency should not have any impact on the user experience, the // AdditionalLatency should not have any impact on the user experience, the
@ -44,6 +49,16 @@ type WorkEstimate struct {
AdditionalLatency time.Duration AdditionalLatency time.Duration
} }
// MaxSeats returns the number of seats this request requires, it is the maximum
// of the two, WorkEstimate.InitialSeats and WorkEstimate.FinalSeats.
func (we *WorkEstimate) MaxSeats() int {
if we.InitialSeats >= we.FinalSeats {
return int(we.InitialSeats)
}
return int(we.FinalSeats)
}
// objectCountGetterFunc represents a function that gets the total // objectCountGetterFunc represents a function that gets the total
// number of objects for a given resource. // number of objects for a given resource.
type objectCountGetterFunc func(string) (int64, error) type objectCountGetterFunc func(string) (int64, error)