P&F: move seat-seconds to a better location

This commit is contained in:
Wojciech Tyczyński 2021-10-27 10:30:25 +02:00
parent 943bc38c0e
commit e262db7a4d
6 changed files with 101 additions and 80 deletions

View File

@ -186,7 +186,7 @@ func TestFIFOQueueWorkEstimate(t *testing.T) {
update := func(we *queueSum, req *request, multiplier int) { update := func(we *queueSum, req *request, multiplier int) {
we.InitialSeatsSum += multiplier * req.InitialSeats() we.InitialSeatsSum += multiplier * req.InitialSeats()
we.MaxSeatsSum += multiplier * req.MaxSeats() we.MaxSeatsSum += multiplier * req.MaxSeats()
we.TotalWorkSum += SeatSeconds(multiplier) * req.totalWork() we.TotalWorkSum += fcrequest.SeatSeconds(multiplier) * req.totalWork()
} }
assert := func(t *testing.T, want, got *queueSum) { assert := func(t *testing.T, want, got *queueSum) {

View File

@ -105,7 +105,7 @@ type queueSet struct {
// currentR is the amount of seat-seconds allocated per queue since process startup. // currentR is the amount of seat-seconds allocated per queue since process startup.
// This is our generalization of the progress meter named R in the original fair queuing work. // This is our generalization of the progress meter named R in the original fair queuing work.
currentR SeatSeconds currentR fqrequest.SeatSeconds
// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated // lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
lastRealTime time.Time lastRealTime time.Time
@ -422,7 +422,7 @@ func (qs *queueSet) syncTimeLocked(ctx context.Context) {
timeSinceLast := realNow.Sub(qs.lastRealTime) timeSinceLast := realNow.Sub(qs.lastRealTime)
qs.lastRealTime = realNow qs.lastRealTime = realNow
prevR := qs.currentR prevR := qs.currentR
incrR := SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast) incrR := fqrequest.SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
qs.currentR = prevR + incrR qs.currentR = prevR + incrR
switch { switch {
case prevR > qs.currentR: case prevR > qs.currentR:
@ -435,7 +435,7 @@ func (qs *queueSet) syncTimeLocked(ctx context.Context) {
// rDecrement is the amount by which the progress meter R is wound backwards // rDecrement is the amount by which the progress meter R is wound backwards
// when needed to avoid overflow. // when needed to avoid overflow.
const rDecrement = MaxSeatSeconds / 2 const rDecrement = fqrequest.MaxSeatSeconds / 2
// highR is the threshold that triggers advance of the epoch. // highR is the threshold that triggers advance of the epoch.
// That is, decrementing the global progress meter R by rDecrement. // That is, decrementing the global progress meter R by rDecrement.
@ -444,7 +444,7 @@ const highR = rDecrement + rDecrement/2
// advanceEpoch subtracts rDecrement from the global progress meter R // advanceEpoch subtracts rDecrement from the global progress meter R
// and all the readings that have been taked from that meter. // and all the readings that have been taked from that meter.
// The now and incrR parameters are only used to add info to the log messages. // The now and incrR parameters are only used to add info to the log messages.
func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR SeatSeconds) { func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqrequest.SeatSeconds) {
oldR := qs.currentR oldR := qs.currentR
qs.currentR -= rDecrement qs.currentR -= rDecrement
klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR) klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR)
@ -550,7 +550,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
offset := qs.enqueues % handSize offset := qs.enqueues % handSize
qs.enqueues++ qs.enqueues++
bestQueueIdx := -1 bestQueueIdx := -1
minQueueSeatSeconds := MaxSeatSeconds minQueueSeatSeconds := fqrequest.MaxSeatSeconds
for i := 0; i < handSize; i++ { for i := 0; i < handSize; i++ {
queueIdx := hand[(offset+i)%handSize] queueIdx := hand[(offset+i)%handSize]
queue := qs.queues[queueIdx] queue := qs.queues[queueIdx]
@ -745,11 +745,11 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
// the oldest waiting request is minimal, and also returns that request. // the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue can not be dispatched now. // Returns nils if the head of the selected queue can not be dispatched now.
func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
minVirtualFinish := MaxSeatSeconds minVirtualFinish := fqrequest.MaxSeatSeconds
sMin := MaxSeatSeconds sMin := fqrequest.MaxSeatSeconds
dsMin := MaxSeatSeconds dsMin := fqrequest.MaxSeatSeconds
sMax := MinSeatSeconds sMax := fqrequest.MinSeatSeconds
dsMax := MinSeatSeconds dsMax := fqrequest.MinSeatSeconds
var minQueue *queue var minQueue *queue
var minIndex int var minIndex int
nq := len(qs.queues) nq := len(qs.queues)
@ -760,7 +760,7 @@ func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
if oldestWaiting != nil { if oldestWaiting != nil {
sMin = ssMin(sMin, queue.nextDispatchR) sMin = ssMin(sMin, queue.nextDispatchR)
sMax = ssMax(sMax, queue.nextDispatchR) sMax = ssMax(sMax, queue.nextDispatchR)
estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) estimatedWorkInProgress := fqrequest.SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork() currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork()
@ -812,14 +812,14 @@ func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
return minQueue, oldestReqFromMinQueue return minQueue, oldestReqFromMinQueue
} }
func ssMin(a, b SeatSeconds) SeatSeconds { func ssMin(a, b fqrequest.SeatSeconds) fqrequest.SeatSeconds {
if a > b { if a > b {
return b return b
} }
return a return a
} }
func ssMax(a, b SeatSeconds) SeatSeconds { func ssMax(a, b fqrequest.SeatSeconds) fqrequest.SeatSeconds {
if a < b { if a < b {
return b return b
} }
@ -915,7 +915,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// When a request finishes being served, and the actual service time was S, // When a request finishes being served, and the actual service time was S,
// the queues start R is decremented by (G - S)*width. // the queues start R is decremented by (G - S)*width.
r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration) r.queue.nextDispatchR -= fqrequest.SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
qs.boundNextDispatchLocked(r.queue) qs.boundNextDispatchLocked(r.queue)
} }
} }

View File

@ -1173,13 +1173,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
), ),
}, },
{ {
nextDispatchR: SeatsTimesDuration(1, 100*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
), ),
@ -1196,7 +1196,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
), ),
@ -1213,13 +1213,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})},
), ),
}, },
{ {
nextDispatchR: SeatsTimesDuration(1, 100*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
), ),
@ -1236,13 +1236,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
), ),
}, },
{ {
nextDispatchR: SeatsTimesDuration(1, 100*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
), ),
@ -1259,13 +1259,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
), ),
}, },
{ {
nextDispatchR: SeatsTimesDuration(1, 100*time.Second), nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
), ),
@ -1446,7 +1446,7 @@ func TestRequestWork(t *testing.T) {
} }
got := request.totalWork() got := request.totalWork()
want := SeatsTimesDuration(3, 2*time.Second) + SeatsTimesDuration(50, 70*time.Second) want := fcrequest.SeatsTimesDuration(3, 2*time.Second) + fcrequest.SeatsTimesDuration(50, 70*time.Second)
if want != got { if want != got {
t.Errorf("Expected totalWork: %v, but got: %v", want, got) t.Errorf("Expected totalWork: %v, but got: %v", want, got)
} }

View File

@ -18,8 +18,6 @@ package queueset
import ( import (
"context" "context"
"fmt"
"math"
"time" "time"
genericrequest "k8s.io/apiserver/pkg/endpoints/request" genericrequest "k8s.io/apiserver/pkg/endpoints/request"
@ -74,7 +72,7 @@ type request struct {
// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time". // arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
// This field is meaningful only while the request is waiting in the virtual world. // This field is meaningful only while the request is waiting in the virtual world.
arrivalR SeatSeconds arrivalR fcrequest.SeatSeconds
// startTime is the real time when the request began executing // startTime is the real time when the request began executing
startTime time.Time startTime time.Time
@ -85,8 +83,8 @@ type request struct {
type completedWorkEstimate struct { type completedWorkEstimate struct {
fcrequest.WorkEstimate fcrequest.WorkEstimate
totalWork SeatSeconds // initial plus final work totalWork fcrequest.SeatSeconds // initial plus final work
finalWork SeatSeconds // only final work finalWork fcrequest.SeatSeconds // only final work
} }
// queue is a sequence of requests that have arrived but not yet finished // queue is a sequence of requests that have arrived but not yet finished
@ -97,7 +95,7 @@ type queue struct {
// nextDispatchR is the R progress meter reading at // nextDispatchR is the R progress meter reading at
// which the next request will be dispatched in the virtual world. // which the next request will be dispatched in the virtual world.
nextDispatchR SeatSeconds nextDispatchR fcrequest.SeatSeconds
// requestsExecuting is the count in the real world. // requestsExecuting is the count in the real world.
requestsExecuting int requestsExecuting int
@ -122,10 +120,10 @@ type queueSum struct {
MaxSeatsSum int MaxSeatsSum int
// TotalWorkSum is the sum of totalWork of the waiting requests // TotalWorkSum is the sum of totalWork of the waiting requests
TotalWorkSum SeatSeconds TotalWorkSum fcrequest.SeatSeconds
} }
func (req *request) totalWork() SeatSeconds { func (req *request) totalWork() fcrequest.SeatSeconds {
return req.workEstimate.totalWork return req.workEstimate.totalWork
} }
@ -138,12 +136,12 @@ func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWo
} }
} }
func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) SeatSeconds { func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) fcrequest.SeatSeconds {
return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) return fcrequest.SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration)
} }
func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds { func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) fcrequest.SeatSeconds {
return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) return fcrequest.SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
} }
func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump { func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
@ -183,45 +181,3 @@ func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
QueueSum: queueSum, QueueSum: queueSum,
} }
} }
// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation.
// `SeatSeconds(n)` represents `n/ssScale` seat-seconds.
// The constants `ssScale` and `ssScaleDigits` are private to the implementation here,
// no other code should use them.
type SeatSeconds uint64
// MaxSeatsSeconds is the maximum representable value of SeatSeconds
const MaxSeatSeconds = SeatSeconds(math.MaxUint64)
// MinSeatSeconds is the lowest representable value of SeatSeconds
const MinSeatSeconds = SeatSeconds(0)
// SeatsTimeDuration produces the SeatSeconds value for the given factors.
// This is intended only to produce small values, increments in work
// rather than amount of work done since process start.
func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds {
return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale)))
}
// ToFloat converts to a floating-point representation.
// This conversion may lose precision.
func (ss SeatSeconds) ToFloat() float64 {
return float64(ss) / ssScale
}
// DurationPerSeat returns duration per seat.
// This division may lose precision.
func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration {
return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale))
}
// String converts to a string.
// This is suitable for large as well as small values.
func (ss SeatSeconds) String() string {
const div = SeatSeconds(ssScale)
quo := ss / div
rem := ss - quo*div
return fmt.Sprintf("%d.%08dss", quo, rem)
}
const ssScale = 1e8

View File

@ -0,0 +1,65 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"fmt"
"math"
"time"
)
// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation.
// `SeatSeconds(n)` represents `n/ssScale` seat-seconds.
// The `ssScale` constant is private to the implementation here,
// no other code should use it.
type SeatSeconds uint64
// MaxSeatsSeconds is the maximum representable value of SeatSeconds
const MaxSeatSeconds = SeatSeconds(math.MaxUint64)
// MinSeatSeconds is the lowest representable value of SeatSeconds
const MinSeatSeconds = SeatSeconds(0)
// SeatsTimeDuration produces the SeatSeconds value for the given factors.
// This is intended only to produce small values, increments in work
// rather than amount of work done since process start.
func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds {
return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale)))
}
// ToFloat converts to a floating-point representation.
// This conversion may lose precision.
func (ss SeatSeconds) ToFloat() float64 {
return float64(ss) / ssScale
}
// DurationPerSeat returns duration per seat.
// This division may lose precision.
func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration {
return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale))
}
// String converts to a string.
// This is suitable for large as well as small values.
func (ss SeatSeconds) String() string {
const div = SeatSeconds(ssScale)
quo := ss / div
rem := ss - quo*div
return fmt.Sprintf("%d.%08dss", quo, rem)
}
const ssScale = 1e8

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package queueset package request
import ( import (
"math" "math"