diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go
index 9c77e3bd0cb..22757cff1e6 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go
@@ -186,7 +186,7 @@ func TestFIFOQueueWorkEstimate(t *testing.T) {
 	update := func(we *queueSum, req *request, multiplier int) {
 		we.InitialSeatsSum += multiplier * req.InitialSeats()
 		we.MaxSeatsSum += multiplier * req.MaxSeats()
-		we.TotalWorkSum += SeatSeconds(multiplier) * req.totalWork()
+		we.TotalWorkSum += fcrequest.SeatSeconds(multiplier) * req.totalWork()
 	}
 
 	assert := func(t *testing.T, want, got *queueSum) {
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
index 8bff2032f56..f1bc635b060 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
@@ -105,7 +105,7 @@ type queueSet struct {
 	// currentR is the amount of seat-seconds allocated per queue since process startup.
 	// This is our generalization of the progress meter named R in the original fair queuing work.
-	currentR SeatSeconds
+	currentR fqrequest.SeatSeconds
 
 	// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
 	lastRealTime time.Time
@@ -423,7 +423,7 @@ func (qs *queueSet) syncTimeLocked(ctx context.Context) {
 	timeSinceLast := realNow.Sub(qs.lastRealTime)
 	qs.lastRealTime = realNow
 	prevR := qs.currentR
-	incrR := SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
+	incrR := fqrequest.SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
 	qs.currentR = prevR + incrR
 	switch {
 	case prevR > qs.currentR:
@@ -436,7 +436,7 @@ func (qs *queueSet) syncTimeLocked(ctx context.Context) {
 
 // rDecrement is the amount by which the progress meter R is wound backwards
 // when needed to avoid overflow.
-const rDecrement = MaxSeatSeconds / 2
+const rDecrement = fqrequest.MaxSeatSeconds / 2
 
 // highR is the threshold that triggers advance of the epoch.
 // That is, decrementing the global progress meter R by rDecrement.
@@ -445,7 +445,7 @@ const highR = rDecrement + rDecrement/2
 
 // advanceEpoch subtracts rDecrement from the global progress meter R
 // and all the readings that have been taken from that meter.
 // The now and incrR parameters are only used to add info to the log messages.
-func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR SeatSeconds) { +func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqrequest.SeatSeconds) { oldR := qs.currentR qs.currentR -= rDecrement klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR) @@ -551,7 +551,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac offset := qs.enqueues % handSize qs.enqueues++ bestQueueIdx := -1 - minQueueSeatSeconds := MaxSeatSeconds + minQueueSeatSeconds := fqrequest.MaxSeatSeconds for i := 0; i < handSize; i++ { queueIdx := hand[(offset+i)%handSize] queue := qs.queues[queueIdx] @@ -746,11 +746,11 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { // the oldest waiting request is minimal, and also returns that request. // Returns nils if the head of the selected queue can not be dispatched now. func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { - minVirtualFinish := MaxSeatSeconds - sMin := MaxSeatSeconds - dsMin := MaxSeatSeconds - sMax := MinSeatSeconds - dsMax := MinSeatSeconds + minVirtualFinish := fqrequest.MaxSeatSeconds + sMin := fqrequest.MaxSeatSeconds + dsMin := fqrequest.MaxSeatSeconds + sMax := fqrequest.MinSeatSeconds + dsMax := fqrequest.MinSeatSeconds var minQueue *queue var minIndex int nq := len(qs.queues) @@ -761,7 +761,7 @@ func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { if oldestWaiting != nil { sMin = ssMin(sMin, queue.nextDispatchR) sMax = ssMax(sMax, queue.nextDispatchR) - estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) + estimatedWorkInProgress := fqrequest.SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork() @@ -813,14 +813,14 @@ func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { return minQueue, oldestReqFromMinQueue } -func ssMin(a, b SeatSeconds) SeatSeconds { +func ssMin(a, b fqrequest.SeatSeconds) fqrequest.SeatSeconds { if a > b { return b } return a } -func ssMax(a, b SeatSeconds) SeatSeconds { +func ssMax(a, b fqrequest.SeatSeconds) fqrequest.SeatSeconds { if a < b { return b } @@ -916,7 +916,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { // When a request finishes being served, and the actual service time was S, // the queue’s start R is decremented by (G - S)*width. 
-		r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
+		r.queue.nextDispatchR -= fqrequest.SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
 		qs.boundNextDispatchLocked(r.queue)
 	}
 }
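The epoch-advance arithmetic above is easiest to check in isolation. Below is a minimal standalone sketch (the type and constants are re-declared locally for illustration, and the sample readings are made up) showing that winding the uint64 meter and every stored reading back by the same rDecrement preserves the differences the dispatcher compares:

```go
package main

import "fmt"

// Local stand-ins for the real declarations; after this PR, SeatSeconds and
// MaxSeatSeconds live in k8s.io/apiserver/pkg/util/flowcontrol/request.
type SeatSeconds uint64

const (
	MaxSeatSeconds = SeatSeconds(^uint64(0))
	rDecrement     = MaxSeatSeconds / 2        // how far the meter is wound back
	highR          = rDecrement + rDecrement/2 // threshold that triggers the advance
)

func main() {
	// The global meter has crossed highR; two queues hold readings near it.
	currentR := highR + 7
	nextDispatchR := []SeatSeconds{highR + 3, highR + 12}

	// Advance the epoch: shift the meter and every stored reading together.
	currentR -= rDecrement
	for i := range nextDispatchR {
		nextDispatchR[i] -= rDecrement
	}

	// Differences are preserved (uint64 arithmetic is mod 2^64), so the
	// dispatcher's orderings and subtractions come out the same.
	fmt.Println(nextDispatchR[1] - nextDispatchR[0]) // 9
	fmt.Println(currentR - nextDispatchR[0])         // 4
}
```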
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
index ab1343cd3ae..9da21f1e34a 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
@@ -1173,13 +1173,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
 				},
 				{
-					nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
@@ -1196,7 +1196,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
@@ -1213,13 +1213,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})},
 					),
 				},
 				{
-					nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
@@ -1236,13 +1236,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
 					),
 				},
 				{
-					nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
@@ -1259,13 +1259,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			robinIndex: -1,
 			queues: []*queue{
 				{
-					nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
 					),
 				},
 				{
-					nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
+					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
 					requests: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
@@ -1446,7 +1446,7 @@ func TestRequestWork(t *testing.T) {
 	}
 
 	got := request.totalWork()
-	want := SeatsTimesDuration(3, 2*time.Second) + SeatsTimesDuration(50, 70*time.Second)
+	want := fcrequest.SeatsTimesDuration(3, 2*time.Second) + fcrequest.SeatsTimesDuration(50, 70*time.Second)
 	if want != got {
 		t.Errorf("Expected totalWork: %v, but got: %v", want, got)
 	}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
index d4f7d5d1d52..f1073b96b28 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
@@ -18,8 +18,6 @@ package queueset
 
 import (
 	"context"
-	"fmt"
-	"math"
 	"time"
 
 	genericrequest "k8s.io/apiserver/pkg/endpoints/request"
@@ -74,7 +72,7 @@ type request struct {
 
 	// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
 	// This field is meaningful only while the request is waiting in the virtual world.
-	arrivalR SeatSeconds
+	arrivalR fcrequest.SeatSeconds
 
 	// startTime is the real time when the request began executing
 	startTime time.Time
@@ -85,8 +83,8 @@ type request struct {
 
 type completedWorkEstimate struct {
 	fcrequest.WorkEstimate
-	totalWork SeatSeconds // initial plus final work
-	finalWork SeatSeconds // only final work
+	totalWork fcrequest.SeatSeconds // initial plus final work
+	finalWork fcrequest.SeatSeconds // only final work
 }
 
 // queue is a sequence of requests that have arrived but not yet finished
@@ -97,7 +95,7 @@ type queue struct {
 
 	// nextDispatchR is the R progress meter reading at
 	// which the next request will be dispatched in the virtual world.
-	nextDispatchR SeatSeconds
+	nextDispatchR fcrequest.SeatSeconds
 
 	// requestsExecuting is the count in the real world.
 	requestsExecuting int
@@ -122,10 +120,10 @@ type queueSum struct {
 	MaxSeatsSum int
 
 	// TotalWorkSum is the sum of totalWork of the waiting requests
-	TotalWorkSum SeatSeconds
+	TotalWorkSum fcrequest.SeatSeconds
 }
 
-func (req *request) totalWork() SeatSeconds {
+func (req *request) totalWork() fcrequest.SeatSeconds {
 	return req.workEstimate.totalWork
 }
 
@@ -138,12 +136,12 @@ func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWo
 	}
 }
 
-func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) SeatSeconds {
-	return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration)
+func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) fcrequest.SeatSeconds {
+	return fcrequest.SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration)
 }
 
-func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds {
-	return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
+func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) fcrequest.SeatSeconds {
+	return fcrequest.SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
 }
 
 func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
@@ -183,45 +181,3 @@ func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
 		QueueSum: queueSum,
 	}
 }
-
-// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation.
-// `SeatSeconds(n)` represents `n/ssScale` seat-seconds.
-// The constants `ssScale` and `ssScaleDigits` are private to the implementation here,
-// no other code should use them.
-type SeatSeconds uint64 - -// MaxSeatsSeconds is the maximum representable value of SeatSeconds -const MaxSeatSeconds = SeatSeconds(math.MaxUint64) - -// MinSeatSeconds is the lowest representable value of SeatSeconds -const MinSeatSeconds = SeatSeconds(0) - -// SeatsTimeDuration produces the SeatSeconds value for the given factors. -// This is intended only to produce small values, increments in work -// rather than amount of work done since process start. -func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds { - return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale))) -} - -// ToFloat converts to a floating-point representation. -// This conversion may lose precision. -func (ss SeatSeconds) ToFloat() float64 { - return float64(ss) / ssScale -} - -// DurationPerSeat returns duration per seat. -// This division may lose precision. -func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration { - return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale)) -} - -// String converts to a string. -// This is suitable for large as well as small values. -func (ss SeatSeconds) String() string { - const div = SeatSeconds(ssScale) - quo := ss / div - rem := ss - quo*div - return fmt.Sprintf("%d.%08dss", quo, rem) -} - -const ssScale = 1e8 diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go index 2765bbc47f7..ff0dd357149 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go @@ -24,27 +24,36 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" ) +const ( + watchesPerSeat = 10.0 + eventAdditionalDuration = 5 * time.Millisecond + // TODO(wojtekt): Remove it once we tune the algorithm to not fail + // scalability tests. + enableMutatingWorkEstimator = false +) + func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc { + return newTestMutatingWorkEstimator(countFn, enableMutatingWorkEstimator) +} + +func newTestMutatingWorkEstimator(countFn watchCountGetterFunc, enabled bool) WorkEstimatorFunc { estimator := &mutatingWorkEstimator{ countFn: countFn, + enabled: enabled, } return estimator.estimate } type mutatingWorkEstimator struct { countFn watchCountGetterFunc + enabled bool } -const ( - watchesPerSeat = 10.0 - eventAdditionalDuration = 5 * time.Millisecond -) - func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { - // TODO(wojtekt): Remove once we tune the algorithm to not fail - // scalability tests. - return WorkEstimate{ - InitialSeats: 1, + if !e.enabled { + return WorkEstimate{ + InitialSeats: 1, + } } requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) @@ -66,16 +75,13 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { // - cost of processing an event object for each watcher (e.g. filtering, // sending data over network) // We're starting simple to get some operational experience with it and - // we will work on tuning the algorithm later. As a starting point we - // we simply assume that processing 1 event takes 1/Nth of a seat for - // M milliseconds and processing different events is infinitely parallelizable. 
-	// We simply record the appropriate values here and rely on potential
-	// reshaping of the request if the concurrency limit for a given priority
-	// level will not allow to run request with that many seats.
-	//
-	// TODO: As described in the KEP, we should take into account that not all
-	// events are equal and try to estimate the cost of a single event based on
-	// some historical data about size of events.
+	// we will work on tuning the algorithm later. Given that the actual work
+	// associated with processing watch events is happening in multiple
+	// goroutines (proportional to the number of watchers) that are all
+	// resumed at once, as a starting point we assume that each such goroutine
+	// is taking 1/Nth of a seat for M milliseconds.
+	// We allow the accounting of that work in P&F to be reshaped into another
+	// rectangle of equal area for practical reasons.
 	var finalSeats uint
 	var additionalLatency time.Duration
@@ -85,8 +91,44 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
 	// However, until we tune the estimation we want to stay on the safe side
 	// and avoid introducing additional latency for almost every single request.
 	if watchCount >= watchesPerSeat {
+		// TODO: As described in the KEP, we should take into account that not all
+		// events are equal and try to estimate the cost of a single event based on
+		// some historical data about size of events.
 		finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat))
-		additionalLatency = eventAdditionalDuration
+		finalWork := SeatsTimesDuration(float64(finalSeats), eventAdditionalDuration)
+
+		// While processing individual events is highly parallel,
+		// the design/implementation of P&F has a couple of limitations that
+		// make using this assumption in the P&F implementation very
+		// inefficient because:
+		// - we reserve max(initialSeats, finalSeats) for time of executing
+		//   both phases of the request
+		// - even more importantly, when a given `wide` request is the one to
+		//   be dispatched, we are not dispatching any other request until
+		//   we accumulate enough seats to dispatch the nominated one, even
+		//   if currently unoccupied seats would allow for dispatching some
+		//   other requests in the meantime
+		// As a consequence of these, the wider the request, the more capacity
+		// will effectively be blocked and unused during dispatching and
+		// executing this request.
+		//
+		// To mitigate the impact of it, we're capping the maximum number of
+		// seats that can be assigned to a given request. Thanks to it:
+		// 1) we reduce the amount of seat-seconds that are "wasted" during
+		//    dispatching and executing initial phase of the request
+		// 2) we are not changing the finalWork estimate - just potentially
+		//    reshaping it to be narrower and longer. As long as the maximum
+		//    seats setting prevents dispatching too many requests at once,
+		//    protecting kube-apiserver (and/or etcd, or the VM or physical
+		//    machine it is running on) from overload, we believe the relaxed
+		//    version should be good enough to achieve the P&F goals.
+		//
+		// TODO: Confirm that the current cap of maximumSeats allows us to
+		// achieve the above.
+		if finalSeats > maximumSeats {
+			finalSeats = maximumSeats
+		}
+		additionalLatency = finalWork.DurationPerSeat(float64(finalSeats))
 	}
 
 	return WorkEstimate{
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds.go
new file mode 100644
index 00000000000..e3a40174524
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds.go
@@ -0,0 +1,65 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package request
+
+import (
+	"fmt"
+	"math"
+	"time"
+)
+
+// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation.
+// `SeatSeconds(n)` represents `n/ssScale` seat-seconds.
+// The `ssScale` constant is private to the implementation here,
+// no other code should use it.
+type SeatSeconds uint64
+
+// MaxSeatSeconds is the maximum representable value of SeatSeconds
+const MaxSeatSeconds = SeatSeconds(math.MaxUint64)
+
+// MinSeatSeconds is the lowest representable value of SeatSeconds
+const MinSeatSeconds = SeatSeconds(0)
+
+// SeatsTimesDuration produces the SeatSeconds value for the given factors.
+// This is intended only to produce small values, increments in work
+// rather than amount of work done since process start.
+func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds {
+	return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale)))
+}
+
+// ToFloat converts to a floating-point representation.
+// This conversion may lose precision.
+func (ss SeatSeconds) ToFloat() float64 {
+	return float64(ss) / ssScale
+}
+
+// DurationPerSeat returns duration per seat.
+// This division may lose precision.
+func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration {
+	return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale))
+}
+
+// String converts to a string.
+// This is suitable for large as well as small values.
+func (ss SeatSeconds) String() string {
+	const div = SeatSeconds(ssScale)
+	quo := ss / div
+	rem := ss - quo*div
+	return fmt.Sprintf("%d.%08dss", quo, rem)
+}
+
+const ssScale = 1e8
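Since seat_seconds.go is new at this path, a short usage sketch of the exported API may help reviewers. It assumes the staging module is importable as k8s.io/apiserver and reuses the figures from TestRequestWork above:

```go
package main

import (
	"fmt"
	"time"

	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

func main() {
	// 3 seats held for 2s plus 50 seats held for 70s, as in TestRequestWork.
	w := fcrequest.SeatsTimesDuration(3, 2*time.Second) +
		fcrequest.SeatsTimesDuration(50, 70*time.Second)

	fmt.Println(w)           // "3506.00000000ss": 6 + 3500 seat-seconds
	fmt.Println(w.ToFloat()) // 3506

	// Spreading that much work evenly over 10 seats takes 350.6s per seat.
	fmt.Println(w.DurationPerSeat(10)) // 5m50.6s
}
```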
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds_test.go
similarity index 99%
rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go
rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds_test.go
index 663f0ccab49..8e0200a7b2f 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/seat_seconds_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package queueset
+package request
 
 import (
 	"math"
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go
index 8f244669175..caf2e53b739 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go
@@ -252,136 +252,132 @@ func TestWorkEstimator(t *testing.T) {
 			countErr:             errors.New("unknown error"),
 			initialSeatsExpected: maximumSeats,
 		},
-		// TODO(wojtekt): Reenable these tests after tuning algorithm to
-		// not fail scalability tests.
-		/*
-			{
-				name:       "request verb is create, no watches",
-				requestURI: "http://server/apis/foo.bar/v1/foos",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "create",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        0,
-				additionalLatencyExpected: 0,
+		{
+			name:       "request verb is create, no watches",
+			requestURI: "http://server/apis/foo.bar/v1/foos",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "create",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is create, watches registered",
-				requestURI: "http://server/apis/foo.bar/v1/foos",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "create",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				watchCount:                29,
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        3,
-				additionalLatencyExpected: 5 * time.Millisecond,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        0,
+			additionalLatencyExpected: 0,
+		},
+		{
+			name:       "request verb is create, watches registered",
+			requestURI: "http://server/apis/foo.bar/v1/foos",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "create",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is create, watches registered, no additional latency",
-				requestURI: "http://server/apis/foo.bar/v1/foos",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "create",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				watchCount:                5,
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        0,
-				additionalLatencyExpected: 0,
+			watchCount:                29,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        3,
+			additionalLatencyExpected: 5 * time.Millisecond,
+		},
+		{
+			name:       "request verb is create, watches registered, no additional latency",
+			requestURI: "http://server/apis/foo.bar/v1/foos",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "create",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is create, watches registered, maximum is exceeded",
-				requestURI: "http://server/apis/foo.bar/v1/foos",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "create",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				watchCount:                199,
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        20,
-				additionalLatencyExpected: 5 * time.Millisecond,
+			watchCount:                5,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        0,
+			additionalLatencyExpected: 0,
+		},
+		{
+			name:       "request verb is create, watches registered, maximum is capped",
+			requestURI: "http://server/apis/foo.bar/v1/foos",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "create",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is update, no watches",
-				requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "update",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        0,
-				additionalLatencyExpected: 0,
+			watchCount:                199,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        10,
+			additionalLatencyExpected: 10 * time.Millisecond,
+		},
+		{
+			name:       "request verb is update, no watches",
+			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "update",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is update, watches registered",
-				requestURI: "http://server/apis/foor.bar/v1/foos/myfoo",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "update",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				watchCount:                29,
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        3,
-				additionalLatencyExpected: 5 * time.Millisecond,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        0,
+			additionalLatencyExpected: 0,
+		},
+		{
+			name:       "request verb is update, watches registered",
+			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "update",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is patch, no watches",
-				requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "patch",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        0,
-				additionalLatencyExpected: 0,
+			watchCount:                29,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        3,
+			additionalLatencyExpected: 5 * time.Millisecond,
+		},
+		{
+			name:       "request verb is patch, no watches",
+			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "patch",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is patch, watches registered",
-				requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "patch",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				watchCount:                29,
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        3,
-				additionalLatencyExpected: 5 * time.Millisecond,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        0,
+			additionalLatencyExpected: 0,
+		},
+		{
+			name:       "request verb is patch, watches registered",
+			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "patch",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is delete, no watches",
-				requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "delete",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        0,
-				additionalLatencyExpected: 0,
+			watchCount:                29,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        3,
+			additionalLatencyExpected: 5 * time.Millisecond,
+		},
+		{
+			name:       "request verb is delete, no watches",
+			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "delete",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-			{
-				name:       "request verb is delete, watches registered",
-				requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
-				requestInfo: &apirequest.RequestInfo{
-					Verb:     "delete",
-					APIGroup: "foo.bar",
-					Resource: "foos",
-				},
-				watchCount:                29,
-				initialSeatsExpected:      1,
-				finalSeatsExpected:        3,
-				additionalLatencyExpected: 5 * time.Millisecond,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        0,
+			additionalLatencyExpected: 0,
+		},
+		{
+			name:       "request verb is delete, watches registered",
+			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
+			requestInfo: &apirequest.RequestInfo{
+				Verb:     "delete",
+				APIGroup: "foo.bar",
+				Resource: "foos",
 			},
-		*/
+			watchCount:                29,
+			initialSeatsExpected:      1,
+			finalSeatsExpected:        3,
+			additionalLatencyExpected: 5 * time.Millisecond,
+		},
 	}
 
 	for _, test := range tests {
@@ -396,7 +392,14 @@ func TestWorkEstimator(t *testing.T) {
 			watchCountsFn := func(_ *apirequest.RequestInfo) int {
 				return test.watchCount
 			}
-			estimator := NewWorkEstimator(countsFn, watchCountsFn)
+
+			// TODO(wojtek-t): Simplify it once we enable mutating work estimator
+			// by default.
+			testEstimator := &workEstimator{
+				listWorkEstimator:     newListWorkEstimator(countsFn),
+				mutatingWorkEstimator: newTestMutatingWorkEstimator(watchCountsFn, true),
+			}
+			estimator := WorkEstimatorFunc(testEstimator.estimate)
 
 			req, err := http.NewRequest("GET", test.requestURI, nil)
 			if err != nil {
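The new "maximum is capped" expectations above (watchCount 199 yielding finalSeats 10 and additionalLatency 10ms) follow directly from the reshaping. Here is a standalone walk-through; watchesPerSeat and eventAdditionalDuration mirror the constants in this diff, while maximumSeats is defined elsewhere in the request package and the value 10 used here is inferred from the test expectations:

```go
package main

import (
	"fmt"
	"math"
	"time"

	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

const (
	watchesPerSeat          = 10.0
	eventAdditionalDuration = 5 * time.Millisecond
	maximumSeats            = 10 // assumed; inferred from the expectations above
)

func main() {
	watchCount := 199

	// One seat per 10 watchers: ceil(199/10) = 20 seats of final work.
	finalSeats := uint(math.Ceil(float64(watchCount) / watchesPerSeat))
	finalWork := fcrequest.SeatsTimesDuration(float64(finalSeats), eventAdditionalDuration)

	// Cap the width, then stretch the duration so the seat-seconds area
	// (seats x latency) is unchanged: 20 seats x 5ms == 10 seats x 10ms.
	if finalSeats > maximumSeats {
		finalSeats = maximumSeats
	}
	additionalLatency := finalWork.DurationPerSeat(float64(finalSeats))

	fmt.Println(finalSeats, additionalLatency) // 10 10ms
}
```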