Adjust final seats if they don't fit the limit

This commit is contained in:
wojtekt 2021-10-08 11:14:11 +02:00
parent 223f9be597
commit c5a77d8a76
3 changed files with 45 additions and 3 deletions

View File

@ -748,6 +748,16 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
return nil
}
// If the requested final seats exceed capacity of that queue,
// we reduce them to current capacity and adjust additional latency
// to preserve the total amount of work.
if oldestReqFromMinQueue.workEstimate.FinalSeats > uint(qs.dCfg.ConcurrencyLimit) {
finalSeats := uint(qs.dCfg.ConcurrencyLimit)
additionalLatency := oldestReqFromMinQueue.workEstimate.finalWork.DurationPerSeat(float64(finalSeats))
oldestReqFromMinQueue.workEstimate.FinalSeats = finalSeats
oldestReqFromMinQueue.workEstimate.AdditionalLatency = additionalLatency
}
// we set the round robin indexing to start at the chose queue
// for the next round. This way the non-selected queues
// win in the case that the virtual finish times are the same

View File

@ -43,3 +43,22 @@ func TestSeatSecondsString(t *testing.T) {
}
}
}
func TestSeatSecondsPerSeat(t *testing.T) {
testCases := []struct {
ss SeatSeconds
seats float64
expect time.Duration
}{
{ss: SeatsTimesDuration(10, time.Second), seats: 1, expect: 10 * time.Second},
{ss: SeatsTimesDuration(1, time.Second), seats: 10, expect: 100 * time.Millisecond},
{ss: SeatsTimesDuration(13, 5*time.Millisecond), seats: 5, expect: 13 * time.Millisecond},
{ss: SeatsTimesDuration(12, 0), seats: 10, expect: 0},
}
for _, testCase := range testCases {
actualDuration := testCase.ss.DurationPerSeat(testCase.seats)
if actualDuration != testCase.expect {
t.Errorf("DurationPerSeats returned %v rather than expected %q", actualDuration, testCase.expect)
}
}
}

View File

@ -81,6 +81,7 @@ type request struct {
type completedWorkEstimate struct {
fcrequest.WorkEstimate
totalWork SeatSeconds // initial plus final work
finalWork SeatSeconds // only final work
}
// queue is an array of requests with additional metadata required for
@ -122,14 +123,20 @@ func (req *request) totalWork() SeatSeconds {
}
func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWorkEstimate {
finalWork := qs.computeFinalWork(we)
return completedWorkEstimate{
WorkEstimate: *we,
totalWork: qs.computeTotalWork(we),
totalWork: qs.computeInitialWork(we) + finalWork,
finalWork: finalWork,
}
}
func (qs *queueSet) computeTotalWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) + SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration)
}
func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
}
// Enqueue enqueues a request into the queue and
@ -199,6 +206,12 @@ func (ss SeatSeconds) ToFloat() float64 {
return float64(ss) / ssScale
}
// DurationPerSeat returns duration per seat.
// This division may lose precision.
func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration {
return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale))
}
// String converts to a string.
// This is suitable for large as well as small values.
func (ss SeatSeconds) String() string {