From e262db7a4daf5218520e49b423789ea55a94af75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 27 Oct 2021 10:30:25 +0200 Subject: [PATCH] P&F: move seat-seconds to a better location --- .../fairqueuing/queueset/fifo_list_test.go | 2 +- .../fairqueuing/queueset/queueset.go | 28 ++++---- .../fairqueuing/queueset/queueset_test.go | 20 +++--- .../flowcontrol/fairqueuing/queueset/types.go | 64 +++--------------- .../util/flowcontrol/request/seat_seconds.go | 65 +++++++++++++++++++ .../seat_seconds_test.go} | 2 +- 6 files changed, 101 insertions(+), 80 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds.go rename staging/src/k8s.io/apiserver/pkg/util/flowcontrol/{fairqueuing/queueset/seatsecs_test.go => request/seat_seconds_test.go} (99%) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go index 9c77e3bd0cb..22757cff1e6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go @@ -186,7 +186,7 @@ func TestFIFOQueueWorkEstimate(t *testing.T) { update := func(we *queueSum, req *request, multiplier int) { we.InitialSeatsSum += multiplier * req.InitialSeats() we.MaxSeatsSum += multiplier * req.MaxSeats() - we.TotalWorkSum += SeatSeconds(multiplier) * req.totalWork() + we.TotalWorkSum += fcrequest.SeatSeconds(multiplier) * req.totalWork() } assert := func(t *testing.T, want, got *queueSum) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 6f74492e3ca..b6e254c0a8e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -105,7 +105,7 @@ type queueSet struct { // currentR is the amount of seat-seconds allocated per queue since process startup. // This is our generalization of the progress meter named R in the original fair queuing work. - currentR SeatSeconds + currentR fqrequest.SeatSeconds // lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated lastRealTime time.Time @@ -422,7 +422,7 @@ func (qs *queueSet) syncTimeLocked(ctx context.Context) { timeSinceLast := realNow.Sub(qs.lastRealTime) qs.lastRealTime = realNow prevR := qs.currentR - incrR := SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast) + incrR := fqrequest.SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast) qs.currentR = prevR + incrR switch { case prevR > qs.currentR: @@ -435,7 +435,7 @@ func (qs *queueSet) syncTimeLocked(ctx context.Context) { // rDecrement is the amount by which the progress meter R is wound backwards // when needed to avoid overflow. -const rDecrement = MaxSeatSeconds / 2 +const rDecrement = fqrequest.MaxSeatSeconds / 2 // highR is the threshold that triggers advance of the epoch. // That is, decrementing the global progress meter R by rDecrement. @@ -444,7 +444,7 @@ const highR = rDecrement + rDecrement/2 // advanceEpoch subtracts rDecrement from the global progress meter R // and all the readings that have been taked from that meter. // The now and incrR parameters are only used to add info to the log messages. -func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR SeatSeconds) { +func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqrequest.SeatSeconds) { oldR := qs.currentR qs.currentR -= rDecrement klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR) @@ -550,7 +550,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac offset := qs.enqueues % handSize qs.enqueues++ bestQueueIdx := -1 - minQueueSeatSeconds := MaxSeatSeconds + minQueueSeatSeconds := fqrequest.MaxSeatSeconds for i := 0; i < handSize; i++ { queueIdx := hand[(offset+i)%handSize] queue := qs.queues[queueIdx] @@ -745,11 +745,11 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { // the oldest waiting request is minimal, and also returns that request. // Returns nils if the head of the selected queue can not be dispatched now. func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { - minVirtualFinish := MaxSeatSeconds - sMin := MaxSeatSeconds - dsMin := MaxSeatSeconds - sMax := MinSeatSeconds - dsMax := MinSeatSeconds + minVirtualFinish := fqrequest.MaxSeatSeconds + sMin := fqrequest.MaxSeatSeconds + dsMin := fqrequest.MaxSeatSeconds + sMax := fqrequest.MinSeatSeconds + dsMax := fqrequest.MinSeatSeconds var minQueue *queue var minIndex int nq := len(qs.queues) @@ -760,7 +760,7 @@ func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { if oldestWaiting != nil { sMin = ssMin(sMin, queue.nextDispatchR) sMax = ssMax(sMax, queue.nextDispatchR) - estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) + estimatedWorkInProgress := fqrequest.SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork() @@ -812,14 +812,14 @@ func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { return minQueue, oldestReqFromMinQueue } -func ssMin(a, b SeatSeconds) SeatSeconds { +func ssMin(a, b fqrequest.SeatSeconds) fqrequest.SeatSeconds { if a > b { return b } return a } -func ssMax(a, b SeatSeconds) SeatSeconds { +func ssMax(a, b fqrequest.SeatSeconds) fqrequest.SeatSeconds { if a < b { return b } @@ -915,7 +915,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { // When a request finishes being served, and the actual service time was S, // the queue’s start R is decremented by (G - S)*width. - r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration) + r.queue.nextDispatchR -= fqrequest.SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration) qs.boundNextDispatchLocked(r.queue) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index ab1343cd3ae..9da21f1e34a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1173,13 +1173,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - nextDispatchR: SeatsTimesDuration(1, 200*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, ), }, { - nextDispatchR: SeatsTimesDuration(1, 100*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, ), @@ -1196,7 +1196,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - nextDispatchR: SeatsTimesDuration(1, 200*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, ), @@ -1213,13 +1213,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - nextDispatchR: SeatsTimesDuration(1, 200*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})}, ), }, { - nextDispatchR: SeatsTimesDuration(1, 100*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, ), @@ -1236,13 +1236,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - nextDispatchR: SeatsTimesDuration(1, 200*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})}, ), }, { - nextDispatchR: SeatsTimesDuration(1, 100*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, ), @@ -1259,13 +1259,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - nextDispatchR: SeatsTimesDuration(1, 200*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})}, ), }, { - nextDispatchR: SeatsTimesDuration(1, 100*time.Second), + nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, ), @@ -1446,7 +1446,7 @@ func TestRequestWork(t *testing.T) { } got := request.totalWork() - want := SeatsTimesDuration(3, 2*time.Second) + SeatsTimesDuration(50, 70*time.Second) + want := fcrequest.SeatsTimesDuration(3, 2*time.Second) + fcrequest.SeatsTimesDuration(50, 70*time.Second) if want != got { t.Errorf("Expected totalWork: %v, but got: %v", want, got) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index d4f7d5d1d52..f1073b96b28 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -18,8 +18,6 @@ package queueset import ( "context" - "fmt" - "math" "time" genericrequest "k8s.io/apiserver/pkg/endpoints/request" @@ -74,7 +72,7 @@ type request struct { // arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time". // This field is meaningful only while the request is waiting in the virtual world. - arrivalR SeatSeconds + arrivalR fcrequest.SeatSeconds // startTime is the real time when the request began executing startTime time.Time @@ -85,8 +83,8 @@ type request struct { type completedWorkEstimate struct { fcrequest.WorkEstimate - totalWork SeatSeconds // initial plus final work - finalWork SeatSeconds // only final work + totalWork fcrequest.SeatSeconds // initial plus final work + finalWork fcrequest.SeatSeconds // only final work } // queue is a sequence of requests that have arrived but not yet finished @@ -97,7 +95,7 @@ type queue struct { // nextDispatchR is the R progress meter reading at // which the next request will be dispatched in the virtual world. - nextDispatchR SeatSeconds + nextDispatchR fcrequest.SeatSeconds // requestsExecuting is the count in the real world. requestsExecuting int @@ -122,10 +120,10 @@ type queueSum struct { MaxSeatsSum int // TotalWorkSum is the sum of totalWork of the waiting requests - TotalWorkSum SeatSeconds + TotalWorkSum fcrequest.SeatSeconds } -func (req *request) totalWork() SeatSeconds { +func (req *request) totalWork() fcrequest.SeatSeconds { return req.workEstimate.totalWork } @@ -138,12 +136,12 @@ func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWo } } -func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) SeatSeconds { - return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) +func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) fcrequest.SeatSeconds { + return fcrequest.SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) } -func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds { - return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) +func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) fcrequest.SeatSeconds { + return fcrequest.SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) } func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump { @@ -183,45 +181,3 @@ func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump { QueueSum: queueSum, } } - -// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation. -// `SeatSeconds(n)` represents `n/ssScale` seat-seconds. -// The constants `ssScale` and `ssScaleDigits` are private to the implementation here, -// no other code should use them. -type SeatSeconds uint64 - -// MaxSeatsSeconds is the maximum representable value of SeatSeconds -const MaxSeatSeconds = SeatSeconds(math.MaxUint64) - -// MinSeatSeconds is the lowest representable value of SeatSeconds -const MinSeatSeconds = SeatSeconds(0) - -// SeatsTimeDuration produces the SeatSeconds value for the given factors. -// This is intended only to produce small values, increments in work -// rather than amount of work done since process start. -func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds { - return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale))) -} - -// ToFloat converts to a floating-point representation. -// This conversion may lose precision. -func (ss SeatSeconds) ToFloat() float64 { - return float64(ss) / ssScale -} - -// DurationPerSeat returns duration per seat. -// This division may lose precision. -func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration { - return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale)) -} - -// String converts to a string. -// This is suitable for large as well as small values. -func (ss SeatSeconds) String() string { - const div = SeatSeconds(ssScale) - quo := ss / div - rem := ss - quo*div - return fmt.Sprintf("%d.%08dss", quo, rem) -} - -const ssScale = 1e8 diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds.go new file mode 100644 index 00000000000..e3a40174524 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds.go @@ -0,0 +1,65 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "fmt" + "math" + "time" +) + +// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation. +// `SeatSeconds(n)` represents `n/ssScale` seat-seconds. +// The `ssScale` constant is private to the implementation here, +// no other code should use it. +type SeatSeconds uint64 + +// MaxSeatsSeconds is the maximum representable value of SeatSeconds +const MaxSeatSeconds = SeatSeconds(math.MaxUint64) + +// MinSeatSeconds is the lowest representable value of SeatSeconds +const MinSeatSeconds = SeatSeconds(0) + +// SeatsTimeDuration produces the SeatSeconds value for the given factors. +// This is intended only to produce small values, increments in work +// rather than amount of work done since process start. +func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds { + return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale))) +} + +// ToFloat converts to a floating-point representation. +// This conversion may lose precision. +func (ss SeatSeconds) ToFloat() float64 { + return float64(ss) / ssScale +} + +// DurationPerSeat returns duration per seat. +// This division may lose precision. +func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration { + return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale)) +} + +// String converts to a string. +// This is suitable for large as well as small values. +func (ss SeatSeconds) String() string { + const div = SeatSeconds(ssScale) + quo := ss / div + rem := ss - quo*div + return fmt.Sprintf("%d.%08dss", quo, rem) +} + +const ssScale = 1e8 diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds_test.go similarity index 99% rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds_test.go index 663f0ccab49..8e0200a7b2f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package queueset +package request import ( "math"