From c5a77d8a761b0651b53864f9e396d6f23efd01d2 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Fri, 8 Oct 2021 11:14:11 +0200 Subject: [PATCH] Adjust final seats if they don't fit the limit --- .../fairqueuing/queueset/queueset.go | 10 ++++++++++ .../fairqueuing/queueset/seatsecs_test.go | 19 +++++++++++++++++++ .../flowcontrol/fairqueuing/queueset/types.go | 19 ++++++++++++++++--- 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 08a4ab385fd..115ad73187d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -748,6 +748,16 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { return nil } + // If the requested final seats exceed capacity of that queue, + // we reduce them to current capacity and adjust additional latency + // to preserve the total amount of work. + if oldestReqFromMinQueue.workEstimate.FinalSeats > uint(qs.dCfg.ConcurrencyLimit) { + finalSeats := uint(qs.dCfg.ConcurrencyLimit) + additionalLatency := oldestReqFromMinQueue.workEstimate.finalWork.DurationPerSeat(float64(finalSeats)) + oldestReqFromMinQueue.workEstimate.FinalSeats = finalSeats + oldestReqFromMinQueue.workEstimate.AdditionalLatency = additionalLatency + } + // we set the round robin indexing to start at the chose queue // for the next round. This way the non-selected queues // win in the case that the virtual finish times are the same diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go index b0a98b2570b..781f1424edc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go @@ -43,3 +43,22 @@ func TestSeatSecondsString(t *testing.T) { } } } + +func TestSeatSecondsPerSeat(t *testing.T) { + testCases := []struct { + ss SeatSeconds + seats float64 + expect time.Duration + }{ + {ss: SeatsTimesDuration(10, time.Second), seats: 1, expect: 10 * time.Second}, + {ss: SeatsTimesDuration(1, time.Second), seats: 10, expect: 100 * time.Millisecond}, + {ss: SeatsTimesDuration(13, 5*time.Millisecond), seats: 5, expect: 13 * time.Millisecond}, + {ss: SeatsTimesDuration(12, 0), seats: 10, expect: 0}, + } + for _, testCase := range testCases { + actualDuration := testCase.ss.DurationPerSeat(testCase.seats) + if actualDuration != testCase.expect { + t.Errorf("DurationPerSeats returned %v rather than expected %q", actualDuration, testCase.expect) + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 7e9a3f3bdf1..08a6f6b3dfc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -81,6 +81,7 @@ type request struct { type completedWorkEstimate struct { fcrequest.WorkEstimate totalWork SeatSeconds // initial plus final work + finalWork SeatSeconds // only final work } // queue is an array of requests with additional metadata required for @@ -122,14 +123,20 @@ func (req *request) totalWork() SeatSeconds { } func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWorkEstimate { + finalWork := qs.computeFinalWork(we) return completedWorkEstimate{ WorkEstimate: *we, - totalWork: qs.computeTotalWork(we), + totalWork: qs.computeInitialWork(we) + finalWork, + finalWork: finalWork, } } -func (qs *queueSet) computeTotalWork(we *fcrequest.WorkEstimate) SeatSeconds { - return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) + SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) +func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) SeatSeconds { + return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) +} + +func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds { + return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) } // Enqueue enqueues a request into the queue and @@ -199,6 +206,12 @@ func (ss SeatSeconds) ToFloat() float64 { return float64(ss) / ssScale } +// DurationPerSeat returns duration per seat. +// This division may lose precision. +func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration { + return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale)) +} + // String converts to a string. // This is suitable for large as well as small values. func (ss SeatSeconds) String() string {