Merge pull request #105930 from wojtek-t/pf_watch_support_7

P&F: Update and cleanup mutating work estimator
Authored by Kubernetes Prow Robot on 2021-11-01 07:33:18 -07:00, committed by GitHub
8 changed files with 287 additions and 221 deletions

View File

@@ -186,7 +186,7 @@ func TestFIFOQueueWorkEstimate(t *testing.T) {
 	update := func(we *queueSum, req *request, multiplier int) {
 		we.InitialSeatsSum += multiplier * req.InitialSeats()
 		we.MaxSeatsSum += multiplier * req.MaxSeats()
-		we.TotalWorkSum += SeatSeconds(multiplier) * req.totalWork()
+		we.TotalWorkSum += fcrequest.SeatSeconds(multiplier) * req.totalWork()
 	}
 	assert := func(t *testing.T, want, got *queueSum) {

View File

@@ -105,7 +105,7 @@ type queueSet struct {
 	// currentR is the amount of seat-seconds allocated per queue since process startup.
 	// This is our generalization of the progress meter named R in the original fair queuing work.
-	currentR SeatSeconds
+	currentR fqrequest.SeatSeconds

 	// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
 	lastRealTime time.Time
@@ -423,7 +423,7 @@ func (qs *queueSet) syncTimeLocked(ctx context.Context) {
 	timeSinceLast := realNow.Sub(qs.lastRealTime)
 	qs.lastRealTime = realNow
 	prevR := qs.currentR
-	incrR := SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
+	incrR := fqrequest.SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
 	qs.currentR = prevR + incrR
 	switch {
 	case prevR > qs.currentR:
@@ -436,7 +436,7 @@ func (qs *queueSet) syncTimeLocked(ctx context.Context) {
 // rDecrement is the amount by which the progress meter R is wound backwards
 // when needed to avoid overflow.
-const rDecrement = MaxSeatSeconds / 2
+const rDecrement = fqrequest.MaxSeatSeconds / 2

 // highR is the threshold that triggers advance of the epoch.
 // That is, decrementing the global progress meter R by rDecrement.
@@ -445,7 +445,7 @@ const highR = rDecrement + rDecrement/2
 // advanceEpoch subtracts rDecrement from the global progress meter R
 // and all the readings that have been taken from that meter.
 // The now and incrR parameters are only used to add info to the log messages.
-func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR SeatSeconds) {
+func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqrequest.SeatSeconds) {
 	oldR := qs.currentR
 	qs.currentR -= rDecrement
 	klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR)
@@ -551,7 +551,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
 	offset := qs.enqueues % handSize
 	qs.enqueues++
 	bestQueueIdx := -1
-	minQueueSeatSeconds := MaxSeatSeconds
+	minQueueSeatSeconds := fqrequest.MaxSeatSeconds
 	for i := 0; i < handSize; i++ {
 		queueIdx := hand[(offset+i)%handSize]
 		queue := qs.queues[queueIdx]
@@ -746,11 +746,11 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
 // the oldest waiting request is minimal, and also returns that request.
 // Returns nils if the head of the selected queue can not be dispatched now.
 func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
-	minVirtualFinish := MaxSeatSeconds
-	sMin := MaxSeatSeconds
-	dsMin := MaxSeatSeconds
-	sMax := MinSeatSeconds
-	dsMax := MinSeatSeconds
+	minVirtualFinish := fqrequest.MaxSeatSeconds
+	sMin := fqrequest.MaxSeatSeconds
+	dsMin := fqrequest.MaxSeatSeconds
+	sMax := fqrequest.MinSeatSeconds
+	dsMax := fqrequest.MinSeatSeconds
 	var minQueue *queue
 	var minIndex int
 	nq := len(qs.queues)
@@ -761,7 +761,7 @@ func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
 		if oldestWaiting != nil {
 			sMin = ssMin(sMin, queue.nextDispatchR)
 			sMax = ssMax(sMax, queue.nextDispatchR)
-			estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
+			estimatedWorkInProgress := fqrequest.SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
 			dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
 			dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
 			currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork()
@@ -813,14 +813,14 @@ func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
 	return minQueue, oldestReqFromMinQueue
 }

-func ssMin(a, b SeatSeconds) SeatSeconds {
+func ssMin(a, b fqrequest.SeatSeconds) fqrequest.SeatSeconds {
 	if a > b {
 		return b
 	}
 	return a
 }

-func ssMax(a, b SeatSeconds) SeatSeconds {
+func ssMax(a, b fqrequest.SeatSeconds) fqrequest.SeatSeconds {
 	if a < b {
 		return b
 	}
@@ -916,7 +916,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 		// When a request finishes being served, and the actual service time was S,
 		// the queue's start R is decremented by (G - S)*width.
-		r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
+		r.queue.nextDispatchR -= fqrequest.SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
 		qs.boundNextDispatchLocked(r.queue)
 	}
 }
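The (G - S)*width refund above can be checked by hand. A self-contained sketch with hypothetical numbers (SeatsTimesDuration re-derived locally): for an estimated service duration G of 1s, an actual duration S of 200ms, and a width of 2 initial seats, nextDispatchR is wound back by 2 × 0.8s = 1.6 seat-seconds.

package main

import (
	"fmt"
	"math"
	"time"
)

const ssScale = 1e8 // fixed-point resolution, as in SeatSeconds

type SeatSeconds uint64

func SeatsTimesDuration(seats float64, d time.Duration) SeatSeconds {
	return SeatSeconds(math.Round(seats * float64(d/time.Nanosecond) / (1e9 / ssScale)))
}

func (ss SeatSeconds) String() string {
	const div = SeatSeconds(ssScale)
	return fmt.Sprintf("%d.%08dss", ss/div, ss%div)
}

func main() {
	G := time.Second            // hypothetical estimatedServiceDuration
	S := 200 * time.Millisecond // hypothetical actual service duration
	width := 2.0                // hypothetical InitialSeats
	fmt.Println(SeatsTimesDuration(width, G-S)) // 1.60000000ss refunded from nextDispatchR
}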

View File

@@ -1173,13 +1173,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
 				},
 				{
-					nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
@@ -1196,7 +1196,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
@@ -1213,13 +1213,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})},
 					),
 				},
 				{
-					nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
@@ -1236,13 +1236,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
 					),
 				},
 				{
-					nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
@@ -1259,13 +1259,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
 					),
 				},
 				{
-					nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
@@ -1446,7 +1446,7 @@ func TestRequestWork(t *testing.T) {
 	}

 	got := request.totalWork()
-	want := SeatsTimesDuration(3, 2*time.Second) + SeatsTimesDuration(50, 70*time.Second)
+	want := fcrequest.SeatsTimesDuration(3, 2*time.Second) + fcrequest.SeatsTimesDuration(50, 70*time.Second)
 	if want != got {
 		t.Errorf("Expected totalWork: %v, but got: %v", want, got)
 	}

View File

@@ -18,8 +18,6 @@ package queueset
 import (
 	"context"
-	"fmt"
-	"math"
 	"time"

 	genericrequest "k8s.io/apiserver/pkg/endpoints/request"
@@ -74,7 +72,7 @@ type request struct {
 	// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
 	// This field is meaningful only while the request is waiting in the virtual world.
-	arrivalR SeatSeconds
+	arrivalR fcrequest.SeatSeconds

 	// startTime is the real time when the request began executing
 	startTime time.Time
@@ -85,8 +83,8 @@ type request struct {
 type completedWorkEstimate struct {
 	fcrequest.WorkEstimate
-	totalWork SeatSeconds // initial plus final work
-	finalWork SeatSeconds // only final work
+	totalWork fcrequest.SeatSeconds // initial plus final work
+	finalWork fcrequest.SeatSeconds // only final work
 }

 // queue is a sequence of requests that have arrived but not yet finished
@@ -97,7 +95,7 @@ type queue struct {
 	// nextDispatchR is the R progress meter reading at
 	// which the next request will be dispatched in the virtual world.
-	nextDispatchR SeatSeconds
+	nextDispatchR fcrequest.SeatSeconds

 	// requestsExecuting is the count in the real world.
 	requestsExecuting int
@@ -122,10 +120,10 @@ type queueSum struct {
 	MaxSeatsSum int

 	// TotalWorkSum is the sum of totalWork of the waiting requests
-	TotalWorkSum SeatSeconds
+	TotalWorkSum fcrequest.SeatSeconds
 }

-func (req *request) totalWork() SeatSeconds {
+func (req *request) totalWork() fcrequest.SeatSeconds {
 	return req.workEstimate.totalWork
 }
@@ -138,12 +136,12 @@ func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWo
 	}
 }

-func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) SeatSeconds {
-	return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration)
+func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) fcrequest.SeatSeconds {
+	return fcrequest.SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration)
 }

-func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds {
-	return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
+func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) fcrequest.SeatSeconds {
+	return fcrequest.SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
 }

 func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
@@ -183,45 +181,3 @@ func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
 		QueueSum: queueSum,
 	}
 }
-
-// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation.
-// `SeatSeconds(n)` represents `n/ssScale` seat-seconds.
-// The constants `ssScale` and `ssScaleDigits` are private to the implementation here,
-// no other code should use them.
-type SeatSeconds uint64
-
-// MaxSeatSeconds is the maximum representable value of SeatSeconds
-const MaxSeatSeconds = SeatSeconds(math.MaxUint64)
-
-// MinSeatSeconds is the lowest representable value of SeatSeconds
-const MinSeatSeconds = SeatSeconds(0)
-
-// SeatsTimesDuration produces the SeatSeconds value for the given factors.
-// This is intended only to produce small values, increments in work
-// rather than amount of work done since process start.
-func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds {
-	return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale)))
-}
-
-// ToFloat converts to a floating-point representation.
-// This conversion may lose precision.
-func (ss SeatSeconds) ToFloat() float64 {
-	return float64(ss) / ssScale
-}
-
-// DurationPerSeat returns duration per seat.
-// This division may lose precision.
-func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration {
-	return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale))
-}
-
-// String converts to a string.
-// This is suitable for large as well as small values.
-func (ss SeatSeconds) String() string {
-	const div = SeatSeconds(ssScale)
-	quo := ss / div
-	rem := ss - quo*div
-	return fmt.Sprintf("%d.%08dss", quo, rem)
-}
-
-const ssScale = 1e8

View File

@@ -24,28 +24,37 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
) )
const (
watchesPerSeat = 10.0
eventAdditionalDuration = 5 * time.Millisecond
// TODO(wojtekt): Remove it once we tune the algorithm to not fail
// scalability tests.
enableMutatingWorkEstimator = false
)
func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc { func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc {
return newTestMutatingWorkEstimator(countFn, enableMutatingWorkEstimator)
}
func newTestMutatingWorkEstimator(countFn watchCountGetterFunc, enabled bool) WorkEstimatorFunc {
estimator := &mutatingWorkEstimator{ estimator := &mutatingWorkEstimator{
countFn: countFn, countFn: countFn,
enabled: enabled,
} }
return estimator.estimate return estimator.estimate
} }
type mutatingWorkEstimator struct { type mutatingWorkEstimator struct {
countFn watchCountGetterFunc countFn watchCountGetterFunc
enabled bool
} }
const (
watchesPerSeat = 10.0
eventAdditionalDuration = 5 * time.Millisecond
)
func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
// TODO(wojtekt): Remove once we tune the algorithm to not fail if !e.enabled {
// scalability tests.
return WorkEstimate{ return WorkEstimate{
InitialSeats: 1, InitialSeats: 1,
} }
}
requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok { if !ok {
@@ -66,16 +75,13 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
 	// - cost of processing an event object for each watcher (e.g. filtering,
 	//   sending data over network)
 	// We're starting simple to get some operational experience with it and
-	// we will work on tuning the algorithm later. As a starting point we
-	// we simply assume that processing 1 event takes 1/Nth of a seat for
-	// M milliseconds and processing different events is infinitely parallelizable.
-	// We simply record the appropriate values here and rely on potential
-	// reshaping of the request if the concurrency limit for a given priority
-	// level will not allow to run request with that many seats.
-	//
-	// TODO: As described in the KEP, we should take into account that not all
-	// events are equal and try to estimate the cost of a single event based on
-	// some historical data about size of events.
+	// we will work on tuning the algorithm later. Given that the actual work
+	// associated with processing watch events is happening in multiple
+	// goroutines (proportional to the number of watchers) that are all
+	// resumed at once, as a starting point we assume that each such goroutine
+	// is taking 1/Nth of a seat for M milliseconds.
+	// We allow the accounting of that work in P&F to be reshaped into another
+	// rectangle of equal area for practical reasons.
 	var finalSeats uint
 	var additionalLatency time.Duration
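Concretely, with N = watchesPerSeat = 10 and M = eventAdditionalDuration = 5 ms, a mutation observed by W watchers is charged ceil(W/10) final seats for 5 ms each; W = 199 watchers, for example, works out to 20 seats × 5 ms = 0.1 seat-seconds of final work before any capping.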
@@ -85,8 +91,44 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
 	// However, until we tune the estimation we want to stay on the safe side
 	// and avoid introducing additional latency for almost every single request.
 	if watchCount >= watchesPerSeat {
+		// TODO: As described in the KEP, we should take into account that not all
+		// events are equal and try to estimate the cost of a single event based on
+		// some historical data about size of events.
 		finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat))
-		additionalLatency = eventAdditionalDuration
+		finalWork := SeatsTimesDuration(float64(finalSeats), eventAdditionalDuration)
+
+		// While processing individual events is highly parallel,
+		// the design/implementation of P&F has a couple limitations that
+		// make using this assumption in the P&F implementation very
+		// inefficient because:
+		// - we reserve max(initialSeats, finalSeats) for time of executing
+		//   both phases of the request
+		// - even more importantly, when a given `wide` request is the one to
+		//   be dispatched, we are not dispatching any other request until
+		//   we accumulate enough seats to dispatch the nominated one, even
+		//   if currently unoccupied seats would allow for dispatching some
+		//   other requests in the meantime
+		// As a consequence of these, the wider the request, the more capacity
+		// will effectively be blocked and unused during dispatching and
+		// executing this request.
+		//
+		// To mitigate the impact of it, we're capping the maximum number of
+		// seats that can be assigned to a given request. Thanks to it:
+		// 1) we reduce the amount of seat-seconds that are "wasted" during
+		//    dispatching and executing initial phase of the request
+		// 2) we are not changing the finalWork estimate - just potentially
+		//    reshaping it to be narrower and longer. As long as the maximum
+		//    seats setting will prevent dispatching too many requests at once
+		//    to prevent overloading kube-apiserver (and/or etcd or the VM or
+		//    a physical machine it is running on), we believe the relaxed
+		//    version should be good enough to achieve the P&F goals.
+		//
+		// TODO: Confirm that the current cap of maximumSeats allows us to
+		// achieve the above.
+		if finalSeats > maximumSeats {
+			finalSeats = maximumSeats
+		}
+		additionalLatency = finalWork.DurationPerSeat(float64(finalSeats))
 	}

 	return WorkEstimate{
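The cap-and-reshape step preserves the area of the final-work rectangle: capping the seats stretches the additional latency so that seats × duration stays constant. A self-contained sketch (SeatSeconds helpers re-derived locally; the cap of 10 is an assumption consistent with the updated test expectations below) reproducing the 199-watcher test case:

package main

import (
	"fmt"
	"math"
	"time"
)

const (
	watchesPerSeat          = 10.0
	eventAdditionalDuration = 5 * time.Millisecond
	maximumSeats            = 10 // assumed cap, matching the test expectations
	ssScale                 = 1e8
)

type SeatSeconds uint64

func SeatsTimesDuration(seats float64, d time.Duration) SeatSeconds {
	return SeatSeconds(math.Round(seats * float64(d/time.Nanosecond) / (1e9 / ssScale)))
}

func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration {
	return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale))
}

func main() {
	watchCount := 199 // as in the "maximum is capped" test case
	finalSeats := uint(math.Ceil(float64(watchCount) / watchesPerSeat)) // 20 seats
	// Final work is a fixed area: 20 seats * 5ms = 0.1 seat-seconds.
	finalWork := SeatsTimesDuration(float64(finalSeats), eventAdditionalDuration)
	if finalSeats > maximumSeats {
		finalSeats = maximumSeats // narrower: capped to 10 seats
	}
	// Longer: the same 0.1 seat-seconds spread over 10 seats is 10ms per seat.
	additionalLatency := finalWork.DurationPerSeat(float64(finalSeats))
	fmt.Println(finalSeats, additionalLatency) // 10 10ms
}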

View File

@@ -0,0 +1,65 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package request
+
+import (
+	"fmt"
+	"math"
+	"time"
+)
+
+// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation.
+// `SeatSeconds(n)` represents `n/ssScale` seat-seconds.
+// The `ssScale` constant is private to the implementation here,
+// no other code should use it.
+type SeatSeconds uint64
+
+// MaxSeatSeconds is the maximum representable value of SeatSeconds
+const MaxSeatSeconds = SeatSeconds(math.MaxUint64)
+
+// MinSeatSeconds is the lowest representable value of SeatSeconds
+const MinSeatSeconds = SeatSeconds(0)
+
+// SeatsTimesDuration produces the SeatSeconds value for the given factors.
+// This is intended only to produce small values, increments in work
+// rather than amount of work done since process start.
+func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds {
+	return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale)))
+}
+
+// ToFloat converts to a floating-point representation.
+// This conversion may lose precision.
+func (ss SeatSeconds) ToFloat() float64 {
+	return float64(ss) / ssScale
+}
+
+// DurationPerSeat returns duration per seat.
+// This division may lose precision.
+func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration {
+	return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale))
+}
+
+// String converts to a string.
+// This is suitable for large as well as small values.
+func (ss SeatSeconds) String() string {
+	const div = SeatSeconds(ssScale)
+	quo := ss / div
+	rem := ss - quo*div
+	return fmt.Sprintf("%d.%08dss", quo, rem)
+}
+
+const ssScale = 1e8
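For a sense of the fixed-point behavior, a short usage sketch (assuming the k8s.io/apiserver/pkg/util/flowcontrol/request import path that this package appears to live under):

package main

import (
	"fmt"
	"time"

	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

func main() {
	// 3 seats held for 1.5s = 4.5 seat-seconds, stored at 1e-8 resolution.
	w := fcrequest.SeatsTimesDuration(3, 1500*time.Millisecond)
	fmt.Println(w)                    // 4.50000000ss
	fmt.Println(w.ToFloat())          // 4.5
	fmt.Println(w.DurationPerSeat(3)) // 1.5s
}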

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package queueset
+package request

 import (
 	"math"

View File

@@ -252,9 +252,6 @@ func TestWorkEstimator(t *testing.T) {
 			countErr:             errors.New("unknown error"),
 			initialSeatsExpected: maximumSeats,
 		},
-		// TODO(wojtekt): Reenable these tests after tuning algorithm to
-		// not fail scalability tests.
-		/*
 		{
 			name:       "request verb is create, no watches",
 			requestURI: "http://server/apis/foo.bar/v1/foos",
@@ -294,7 +291,7 @@ func TestWorkEstimator(t *testing.T) {
 			additionalLatencyExpected: 0,
 		},
 		{
-			name:       "request verb is create, watches registered, maximum is exceeded",
+			name:       "request verb is create, watches registered, maximum is capped",
 			requestURI: "http://server/apis/foo.bar/v1/foos",
 			requestInfo: &apirequest.RequestInfo{
 				Verb: "create",
@@ -303,8 +300,8 @@ func TestWorkEstimator(t *testing.T) {
 			},
 			watchCount:           199,
 			initialSeatsExpected: 1,
-			finalSeatsExpected:        20,
-			additionalLatencyExpected: 5 * time.Millisecond,
+			finalSeatsExpected:        10,
+			additionalLatencyExpected: 10 * time.Millisecond,
 		},
 		{
 			name: "request verb is update, no watches",
@@ -381,7 +378,6 @@ func TestWorkEstimator(t *testing.T) {
 			finalSeatsExpected:        3,
 			additionalLatencyExpected: 5 * time.Millisecond,
 		},
-		*/
 	}
 	for _, test := range tests {
@@ -396,7 +392,14 @@ func TestWorkEstimator(t *testing.T) {
 			watchCountsFn := func(_ *apirequest.RequestInfo) int {
 				return test.watchCount
 			}
-			estimator := NewWorkEstimator(countsFn, watchCountsFn)
+			// TODO(wojtek-t): Simplify it once we enable mutating work estimator
+			// by default.
+			testEstimator := &workEstimator{
+				listWorkEstimator:     newListWorkEstimator(countsFn),
+				mutatingWorkEstimator: newTestMutatingWorkEstimator(watchCountsFn, true),
+			}
+			estimator := WorkEstimatorFunc(testEstimator.estimate)

 			req, err := http.NewRequest("GET", test.requestURI, nil)
 			if err != nil {