Merge pull request #118601 from andrewsykim/apf-tune-max-seats

priority & fairness: support dynamic max seats
This commit is contained in:
Kubernetes Prow Robot 2023-07-18 01:11:20 -07:00 committed by GitHub
commit f6bcef0fd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 376 additions and 56 deletions

View File

@ -904,7 +904,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
if c.FlowControl != nil { if c.FlowControl != nil {
workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig() workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator( requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg) c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
handler = filterlatency.TrackCompleted(handler) handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness") handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")

View File

@ -80,6 +80,7 @@ type fakeApfFilter struct {
postDequeue func() postDequeue func()
utilflowcontrol.WatchTracker utilflowcontrol.WatchTracker
utilflowcontrol.MaxSeatsTracker
} }
func (t fakeApfFilter) Handle(ctx context.Context, func (t fakeApfFilter) Handle(ctx context.Context,
@ -146,10 +147,11 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes
func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func()) *httptest.Server { func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func()) *httptest.Server {
fakeFilter := fakeApfFilter{ fakeFilter := fakeApfFilter{
mockDecision: decision, mockDecision: decision,
postEnqueue: postEnqueue, postEnqueue: postEnqueue,
postDequeue: postDequeue, postDequeue: postDequeue,
WatchTracker: utilflowcontrol.NewWatchTracker(), WatchTracker: utilflowcontrol.NewWatchTracker(),
MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
} }
return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute) return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute)
} }
@ -349,12 +351,14 @@ type fakeWatchApfFilter struct {
preExecutePanic bool preExecutePanic bool
utilflowcontrol.WatchTracker utilflowcontrol.WatchTracker
utilflowcontrol.MaxSeatsTracker
} }
func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter { func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
return &fakeWatchApfFilter{ return &fakeWatchApfFilter{
capacity: capacity, capacity: capacity,
WatchTracker: utilflowcontrol.NewWatchTracker(), WatchTracker: utilflowcontrol.NewWatchTracker(),
MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
} }
} }

View File

@ -58,6 +58,11 @@ import (
const timeFmt = "2006-01-02T15:04:05.999" const timeFmt = "2006-01-02T15:04:05.999"
const (
// priorityLevelMaxSeatsPercent is the share of a priority level's nominal
// concurrency limit (nominalCL) used as max seats allocatable from work estimator.
// Despite the "Percent" in its name, the value is stored as a fraction
// (0.15 means 15%), so it is meant to be multiplied directly with nominalCL.
priorityLevelMaxSeatsPercent = float64(0.15)
)
// This file contains a simple local (to the apiserver) controller // This file contains a simple local (to the apiserver) controller
// that digests API Priority and Fairness config objects (FlowSchema // that digests API Priority and Fairness config objects (FlowSchema
// and PriorityLevelConfiguration) into the data structure that the // and PriorityLevelConfiguration) into the data structure that the
@ -151,6 +156,12 @@ type configController struct {
// watchTracker implements the necessary WatchTracker interface. // watchTracker implements the necessary WatchTracker interface.
WatchTracker WatchTracker
// MaxSeatsTracker tracks the maximum seats that should be allocatable from the
// work estimator for a given priority level. This controller does not enforce
// any limits on max seats stored in this tracker, it is up to the work estimator
// to set lower/upper limits on max seats (currently min=1, max=10).
MaxSeatsTracker
// the most recent update attempts, ordered by increasing age. // the most recent update attempts, ordered by increasing age.
// Consumer trims to keep only the last minute's worth of entries. // Consumer trims to keep only the last minute's worth of entries.
// The controller uses this to limit itself to at most six updates // The controller uses this to limit itself to at most six updates
@ -274,6 +285,7 @@ func newTestableController(config TestableConfig) *configController {
flowcontrolClient: config.FlowcontrolClient, flowcontrolClient: config.FlowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState), priorityLevelStates: make(map[string]*priorityLevelState),
WatchTracker: NewWatchTracker(), WatchTracker: NewWatchTracker(),
MaxSeatsTracker: NewMaxSeatsTracker(),
} }
klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager) klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
// Start with longish delay because conflicts will be between // Start with longish delay because conflicts will be between
@ -770,6 +782,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
// draining and no use is coming from another // draining and no use is coming from another
// goroutine // goroutine
klog.V(3).Infof("Removing undesired priority level %q, Type=%v", plName, plState.pl.Spec.Type) klog.V(3).Infof("Removing undesired priority level %q, Type=%v", plName, plState.pl.Spec.Type)
meal.cfgCtlr.MaxSeatsTracker.ForgetPriorityLevel(plName)
continue continue
} }
if !plState.quiescing { if !plState.quiescing {
@ -828,6 +841,17 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
if limited := plState.pl.Spec.Limited; limited != nil { if limited := plState.pl.Spec.Limited; limited != nil {
if qCfg := limited.LimitResponse.Queuing; qCfg != nil { if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
meal.maxWaitingRequests += int(qCfg.Queues * qCfg.QueueLengthLimit) meal.maxWaitingRequests += int(qCfg.Queues * qCfg.QueueLengthLimit)
// Max seats allocatable from work estimator is calculated as MAX(1, MIN(0.15 * nominalCL, nominalCL/handSize)).
// This is to keep max seats relative to total available concurrency with a minimum value of 1.
// 15% of nominal concurrency was chosen since it preserved the previous max seats of 10 for default priority levels
// when using apiserver's default total server concurrency of 600 (--max-requests-inflight=400, --max-mutating-requests-inflight=200).
// This ensures that clusters with relatively high inflight requests will continue to use a max seats of 10
// while clusters with lower inflight requests will use max seats no greater than nominalCL/handSize.
// Calculated max seats can return arbitrarily high values but work estimator currently limits max seats at 10.
handSize := plState.pl.Spec.Limited.LimitResponse.Queuing.HandSize
maxSeats := uint64(math.Max(1, math.Min(math.Ceil(float64(concurrencyLimit)*priorityLevelMaxSeatsPercent), float64(int32(concurrencyLimit)/handSize))))
meal.cfgCtlr.MaxSeatsTracker.SetMaxSeats(plName, maxSeats)
} }
} }
if plState.queues == nil { if plState.queues == nil {

View File

@ -77,6 +77,10 @@ type Interface interface {
// WatchTracker provides the WatchTracker interface. // WatchTracker provides the WatchTracker interface.
WatchTracker WatchTracker
// MaxSeatsTracker is invoked from the work estimator to track max seats
// that can be occupied by a request for a priority level.
MaxSeatsTracker
} }
// This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md // This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md

View File

@ -0,0 +1,66 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
)
// MaxSeatsTracker is used to track max seats allocatable per priority level from the work estimator.
type MaxSeatsTracker interface {
	// GetMaxSeats returns the maximum seats a request should occupy for a given priority level.
	// It returns 0 when no value is currently tracked for the priority level.
	GetMaxSeats(priorityLevelName string) uint64

	// SetMaxSeats configures max seats for a priority level, replacing any
	// previously configured value.
	SetMaxSeats(priorityLevelName string, maxSeats uint64)

	// ForgetPriorityLevel removes max seats tracking for a priority level.
	// Forgetting a priority level that is not tracked is a no-op.
	ForgetPriorityLevel(priorityLevelName string)
}

// maxSeatsTracker is a map-backed MaxSeatsTracker whose state is guarded by
// a read/write lock. The zero value is ready to use.
type maxSeatsTracker struct {
	// lock guards maxSeats. It is a named field rather than an embedded
	// mutex so that Lock/Unlock are not promoted into the type's method set.
	lock sync.RWMutex
	// maxSeats maps a priority level name to its configured max seats.
	maxSeats map[string]uint64
}

// Compile-time check that maxSeatsTracker satisfies MaxSeatsTracker.
var _ MaxSeatsTracker = (*maxSeatsTracker)(nil)

// NewMaxSeatsTracker returns an empty MaxSeatsTracker.
func NewMaxSeatsTracker() MaxSeatsTracker {
	return &maxSeatsTracker{
		maxSeats: make(map[string]uint64),
	}
}

// GetMaxSeats returns the max seats stored for plName, or 0 if none is stored.
func (m *maxSeatsTracker) GetMaxSeats(plName string) uint64 {
	m.lock.RLock()
	defer m.lock.RUnlock()

	// Reading a missing key (or a nil map) yields the zero value, 0.
	return m.maxSeats[plName]
}

// SetMaxSeats stores maxSeats for plName, overwriting any earlier value.
func (m *maxSeatsTracker) SetMaxSeats(plName string, maxSeats uint64) {
	m.lock.Lock()
	defer m.lock.Unlock()

	// Writing to a nil map panics; allocate lazily so that a zero-value
	// tracker behaves like one built by NewMaxSeatsTracker.
	if m.maxSeats == nil {
		m.maxSeats = make(map[string]uint64)
	}
	m.maxSeats[plName] = maxSeats
}

// ForgetPriorityLevel drops the entry for plName, if there is one.
func (m *maxSeatsTracker) ForgetPriorityLevel(plName string) {
	m.lock.Lock()
	defer m.lock.Unlock()

	// delete is a no-op for a missing key and for a nil map.
	delete(m.maxSeats, plName)
}

View File

@ -0,0 +1,140 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"testing"
"time"
"k8s.io/api/flowcontrol/v1beta3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
)
// Test_GetMaxSeats checks the max seats value exposed through the controller's
// MaxSeatsTracker after it digests a single limited priority level, for a
// range of nominal concurrency limits and hand sizes.
func Test_GetMaxSeats(t *testing.T) {
	type maxSeatsCase struct {
		name             string
		nominalCL        int
		handSize         int32
		expectedMaxSeats uint64
	}

	cases := []maxSeatsCase{
		{name: "nominalCL=5, handSize=6", nominalCL: 5, handSize: 6, expectedMaxSeats: 1},
		{name: "nominalCL=10, handSize=6", nominalCL: 10, handSize: 6, expectedMaxSeats: 1},
		{name: "nominalCL=15, handSize=6", nominalCL: 15, handSize: 6, expectedMaxSeats: 2},
		{name: "nominalCL=20, handSize=6", nominalCL: 20, handSize: 6, expectedMaxSeats: 3},
		{name: "nominalCL=35, handSize=6", nominalCL: 35, handSize: 6, expectedMaxSeats: 5},
		{name: "nominalCL=100, handSize=6", nominalCL: 100, handSize: 6, expectedMaxSeats: 15},
		{name: "nominalCL=200, handSize=6", nominalCL: 200, handSize: 6, expectedMaxSeats: 30},
		{name: "nominalCL=10, handSize=1", nominalCL: 10, handSize: 1, expectedMaxSeats: 2},
		{name: "nominalCL=100, handSize=20", nominalCL: 100, handSize: 20, expectedMaxSeats: 5},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			fakeClient := clientsetfake.NewSimpleClientset()
			factory := informers.NewSharedInformerFactory(fakeClient, time.Second)
			clk, _ := eventclock.NewFake(time.Now(), 0, nil)

			ctlr := newTestableController(TestableConfig{
				Name:              "Controller",
				Clock:             clk,
				InformerFactory:   factory,
				FlowcontrolClient: fakeClient.FlowcontrolV1beta3(),
				// The single PL below holds a very large share count, so the
				// mandatory PLs are negligible and serverCL ~= nominalCL.
				ServerConcurrencyLimit: tc.nominalCL,
				RequestWaitLimit:       time.Minute,
				ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
				ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
				QueueSetFactory:        fqs.NewQueueSetFactory(clk),
			})

			// Build the only priority level handed to the controller.
			queuing := &v1beta3.QueuingConfiguration{HandSize: tc.handSize}
			limited := &v1beta3.LimitedPriorityLevelConfiguration{
				NominalConcurrencyShares: 10000,
				LimitResponse:            v1beta3.LimitResponse{Queuing: queuing},
			}
			pl := &v1beta3.PriorityLevelConfiguration{
				ObjectMeta: metav1.ObjectMeta{Name: "test-pl"},
				Spec: v1beta3.PriorityLevelConfigurationSpec{
					Type:    v1beta3.PriorityLevelEnablementLimited,
					Limited: limited,
				},
			}

			ctlr.digestConfigObjects([]*v1beta3.PriorityLevelConfiguration{pl}, nil)

			if got, want := ctlr.GetMaxSeats("test-pl"), tc.expectedMaxSeats; got != want {
				t.Errorf("unexpected max seats, got=%d, want=%d", got, want)
			}
		})
	}
}

View File

@ -24,7 +24,7 @@ import (
const ( const (
minimumSeats = 1 minimumSeats = 1
maximumSeats = 10 maximumSeatsLimit = 10
objectsPerSeat = 100.0 objectsPerSeat = 100.0
watchesPerSeat = 10.0 watchesPerSeat = 10.0
enableMutatingWorkEstimator = true enableMutatingWorkEstimator = true
@ -39,12 +39,13 @@ type WorkEstimatorConfig struct {
// MinimumSeats is the minimum number of seats a request must occupy. // MinimumSeats is the minimum number of seats a request must occupy.
MinimumSeats uint64 `json:"minimumSeats,omitempty"` MinimumSeats uint64 `json:"minimumSeats,omitempty"`
// MaximumSeats is the maximum number of seats a request can occupy
// MaximumSeatsLimit is an upper limit on the max seats a request can occupy.
// //
// NOTE: work_estimate_seats_samples metric uses the value of maximumSeats // NOTE: work_estimate_seats_samples metric uses the value of maximumSeats
// as the upper bound, so when we change maximumSeats we should also // as the upper bound, so when we change maximumSeats we should also
// update the buckets of the metric. // update the buckets of the metric.
MaximumSeats uint64 `json:"maximumSeats,omitempty"` MaximumSeatsLimit uint64 `json:"maximumSeatsLimit,omitempty"`
} }
// ListWorkEstimatorConfig holds work estimator parameters related to list requests. // ListWorkEstimatorConfig holds work estimator parameters related to list requests.
@ -66,7 +67,7 @@ type MutatingWorkEstimatorConfig struct {
func DefaultWorkEstimatorConfig() *WorkEstimatorConfig { func DefaultWorkEstimatorConfig() *WorkEstimatorConfig {
return &WorkEstimatorConfig{ return &WorkEstimatorConfig{
MinimumSeats: minimumSeats, MinimumSeats: minimumSeats,
MaximumSeats: maximumSeats, MaximumSeatsLimit: maximumSeatsLimit,
ListWorkEstimatorConfig: defaultListWorkEstimatorConfig(), ListWorkEstimatorConfig: defaultListWorkEstimatorConfig(),
MutatingWorkEstimatorConfig: defaultMutatingWorkEstimatorConfig(), MutatingWorkEstimatorConfig: defaultMutatingWorkEstimatorConfig(),
} }

View File

@ -29,10 +29,11 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &listWorkEstimator{ estimator := &listWorkEstimator{
config: config, config: config,
countGetterFn: countFn, countGetterFn: countFn,
maxSeatsFn: maxSeatsFn,
} }
return estimator.estimate return estimator.estimate
} }
@ -40,14 +41,21 @@ func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorCo
type listWorkEstimator struct { type listWorkEstimator struct {
config *WorkEstimatorConfig config *WorkEstimatorConfig
countGetterFn objectCountGetterFunc countGetterFn objectCountGetterFunc
maxSeatsFn maxSeatsFunc
} }
func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
minSeats := e.config.MinimumSeats
maxSeats := e.maxSeatsFn(priorityLevelName)
if maxSeats == 0 || maxSeats > e.config.MaximumSeatsLimit {
maxSeats = e.config.MaximumSeatsLimit
}
requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok { if !ok {
// no RequestInfo should never happen, but to be on the safe side // no RequestInfo should never happen, but to be on the safe side
// let's return maximumSeats // let's return maximumSeats
return WorkEstimate{InitialSeats: e.config.MaximumSeats} return WorkEstimate{InitialSeats: maxSeats}
} }
if requestInfo.Name != "" { if requestInfo.Name != "" {
@ -56,7 +64,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// Example of such list requests: // Example of such list requests:
// /apis/certificates.k8s.io/v1/certificatesigningrequests?fieldSelector=metadata.name%3Dcsr-xxs4m // /apis/certificates.k8s.io/v1/certificatesigningrequests?fieldSelector=metadata.name%3Dcsr-xxs4m
// /api/v1/namespaces/test/configmaps?fieldSelector=metadata.name%3Dbig-deployment-1&limit=500&resourceVersion=0 // /api/v1/namespaces/test/configmaps?fieldSelector=metadata.name%3Dbig-deployment-1&limit=500&resourceVersion=0
return WorkEstimate{InitialSeats: e.config.MinimumSeats} return WorkEstimate{InitialSeats: minSeats}
} }
query := r.URL.Query() query := r.URL.Query()
@ -66,7 +74,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// This request is destined to fail in the validation layer, // This request is destined to fail in the validation layer,
// return maximumSeats for this request to be consistent. // return maximumSeats for this request to be consistent.
return WorkEstimate{InitialSeats: e.config.MaximumSeats} return WorkEstimate{InitialSeats: maxSeats}
} }
// For watch requests, we want to adjust the cost only if they explicitly request // For watch requests, we want to adjust the cost only if they explicitly request
@ -86,7 +94,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// be conservative here and allocate maximum seats to this list request. // be conservative here and allocate maximum seats to this list request.
// NOTE: if a CRD is removed, its count will go stale first and then the // NOTE: if a CRD is removed, its count will go stale first and then the
// pruner will eventually remove the CRD from the cache. // pruner will eventually remove the CRD from the cache.
return WorkEstimate{InitialSeats: e.config.MaximumSeats} return WorkEstimate{InitialSeats: maxSeats}
case err == ObjectCountNotFoundErr: case err == ObjectCountNotFoundErr:
// there are multiple scenarios in which we can see this error: // there are multiple scenarios in which we can see this error:
// a. the type is truly unknown, a typo on the caller's part. // a. the type is truly unknown, a typo on the caller's part.
@ -100,12 +108,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// when aggregated API calls are overestimated, we allocate the minimum // when aggregated API calls are overestimated, we allocate the minimum
// possible seats (see #109106 as an example when being more conservative // possible seats (see #109106 as an example when being more conservative
// led to problems). // led to problems).
return WorkEstimate{InitialSeats: e.config.MinimumSeats} return WorkEstimate{InitialSeats: minSeats}
case err != nil: case err != nil:
// we should never be here since Get returns either ObjectCountStaleErr or // we should never be here since Get returns either ObjectCountStaleErr or
// ObjectCountNotFoundErr, return maximumSeats to be on the safe side. // ObjectCountNotFoundErr, return maximumSeats to be on the safe side.
klog.ErrorS(err, "Unexpected error from object count tracker") klog.ErrorS(err, "Unexpected error from object count tracker")
return WorkEstimate{InitialSeats: e.config.MaximumSeats} return WorkEstimate{InitialSeats: maxSeats}
} }
limit := numStored limit := numStored
@ -134,11 +142,11 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
seats := uint64(math.Ceil(float64(estimatedObjectsToBeProcessed) / e.config.ObjectsPerSeat)) seats := uint64(math.Ceil(float64(estimatedObjectsToBeProcessed) / e.config.ObjectsPerSeat))
// make sure we never return a seat of zero // make sure we never return a seat of zero
if seats < e.config.MinimumSeats { if seats < minSeats {
seats = e.config.MinimumSeats seats = minSeats
} }
if seats > e.config.MaximumSeats { if seats > maxSeats {
seats = e.config.MaximumSeats seats = maxSeats
} }
return WorkEstimate{InitialSeats: seats} return WorkEstimate{InitialSeats: seats}
} }

View File

@ -25,25 +25,33 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
) )
func newMutatingWorkEstimator(countFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { func newMutatingWorkEstimator(countFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &mutatingWorkEstimator{ estimator := &mutatingWorkEstimator{
config: config, config: config,
countFn: countFn, countFn: countFn,
maxSeatsFn: maxSeatsFn,
} }
return estimator.estimate return estimator.estimate
} }
type mutatingWorkEstimator struct { type mutatingWorkEstimator struct {
config *WorkEstimatorConfig config *WorkEstimatorConfig
countFn watchCountGetterFunc countFn watchCountGetterFunc
maxSeatsFn maxSeatsFunc
} }
func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
minSeats := e.config.MinimumSeats
maxSeats := e.maxSeatsFn(priorityLevelName)
if maxSeats == 0 || maxSeats > e.config.MaximumSeatsLimit {
maxSeats = e.config.MaximumSeatsLimit
}
// TODO(wojtekt): Remove once we tune the algorithm to not fail // TODO(wojtekt): Remove once we tune the algorithm to not fail
// scalability tests. // scalability tests.
if !e.config.Enabled { if !e.config.Enabled {
return WorkEstimate{ return WorkEstimate{
InitialSeats: 1, InitialSeats: minSeats,
} }
} }
@ -52,15 +60,15 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori
// no RequestInfo should never happen, but to be on the safe side // no RequestInfo should never happen, but to be on the safe side
// let's return a large value. // let's return a large value.
return WorkEstimate{ return WorkEstimate{
InitialSeats: 1, InitialSeats: minSeats,
FinalSeats: e.config.MaximumSeats, FinalSeats: maxSeats,
AdditionalLatency: e.config.eventAdditionalDuration(), AdditionalLatency: e.config.eventAdditionalDuration(),
} }
} }
if isRequestExemptFromWatchEvents(requestInfo) { if isRequestExemptFromWatchEvents(requestInfo) {
return WorkEstimate{ return WorkEstimate{
InitialSeats: e.config.MinimumSeats, InitialSeats: minSeats,
FinalSeats: 0, FinalSeats: 0,
AdditionalLatency: time.Duration(0), AdditionalLatency: time.Duration(0),
} }
@ -126,8 +134,8 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori
// //
// TODO: Confirm that the current cap of maximumSeats allow us to // TODO: Confirm that the current cap of maximumSeats allow us to
// achieve the above. // achieve the above.
if finalSeats > e.config.MaximumSeats { if finalSeats > maxSeats {
finalSeats = e.config.MaximumSeats finalSeats = maxSeats
} }
additionalLatency = finalWork.DurationPerSeat(float64(finalSeats)) additionalLatency = finalWork.DurationPerSeat(float64(finalSeats))
} }

View File

@ -64,15 +64,19 @@ type objectCountGetterFunc func(string) (int64, error)
// number of watchers potentially interested in a given request. // number of watchers potentially interested in a given request.
type watchCountGetterFunc func(*apirequest.RequestInfo) int type watchCountGetterFunc func(*apirequest.RequestInfo) int
// maxSeatsFunc represents a function that returns the maximum seats
// allowed for the work estimator for a given priority level.
// The list and mutating work estimators replace a returned value of 0, or
// any value above WorkEstimatorConfig.MaximumSeatsLimit, with
// MaximumSeatsLimit.
type maxSeatsFunc func(priorityLevelName string) uint64
// NewWorkEstimator estimates the work that will be done by a given request, // NewWorkEstimator estimates the work that will be done by a given request,
// if no WorkEstimatorFunc matches the given request then the default // if no WorkEstimatorFunc matches the given request then the default
// work estimate of 1 seat is allocated to the request. // work estimate of 1 seat is allocated to the request.
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &workEstimator{ estimator := &workEstimator{
minimumSeats: config.MinimumSeats, minimumSeats: config.MinimumSeats,
maximumSeats: config.MaximumSeats, maximumSeatsLimit: config.MaximumSeatsLimit,
listWorkEstimator: newListWorkEstimator(objectCountFn, config), listWorkEstimator: newListWorkEstimator(objectCountFn, config, maxSeatsFn),
mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn, config), mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn, config, maxSeatsFn),
} }
return estimator.estimate return estimator.estimate
} }
@ -89,8 +93,8 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorit
type workEstimator struct { type workEstimator struct {
// the minimum number of seats a request must occupy // the minimum number of seats a request must occupy
minimumSeats uint64 minimumSeats uint64
// the maximum number of seats a request can occupy // the default maximum number of seats a request can occupy
maximumSeats uint64 maximumSeatsLimit uint64
// listWorkEstimator estimates work for list request(s) // listWorkEstimator estimates work for list request(s)
listWorkEstimator WorkEstimatorFunc listWorkEstimator WorkEstimatorFunc
// mutatingWorkEstimator calculates the width of mutating request(s) // mutatingWorkEstimator calculates the width of mutating request(s)
@ -102,7 +106,7 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN
if !ok { if !ok {
klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI) klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI)
// no RequestInfo should never happen, but to be on the safe side let's return maximumSeats // no RequestInfo should never happen, but to be on the safe side let's return maximumSeats
return WorkEstimate{InitialSeats: e.maximumSeats} return WorkEstimate{InitialSeats: e.maximumSeatsLimit}
} }
switch requestInfo.Verb { switch requestInfo.Verb {

View File

@ -32,8 +32,6 @@ func TestWorkEstimator(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
defaultCfg := DefaultWorkEstimatorConfig() defaultCfg := DefaultWorkEstimatorConfig()
minimumSeats := defaultCfg.MinimumSeats
maximumSeats := defaultCfg.MaximumSeats
tests := []struct { tests := []struct {
name string name string
@ -42,6 +40,7 @@ func TestWorkEstimator(t *testing.T) {
counts map[string]int64 counts map[string]int64
countErr error countErr error
watchCount int watchCount int
maxSeats uint64
initialSeatsExpected uint64 initialSeatsExpected uint64
finalSeatsExpected uint64 finalSeatsExpected uint64
additionalLatencyExpected time.Duration additionalLatencyExpected time.Duration
@ -50,7 +49,8 @@ func TestWorkEstimator(t *testing.T) {
name: "request has no RequestInfo", name: "request has no RequestInfo",
requestURI: "http://server/apis/", requestURI: "http://server/apis/",
requestInfo: nil, requestInfo: nil,
initialSeatsExpected: maximumSeats, maxSeats: 10,
initialSeatsExpected: 10,
}, },
{ {
name: "request verb is not list", name: "request verb is not list",
@ -58,7 +58,8 @@ func TestWorkEstimator(t *testing.T) {
requestInfo: &apirequest.RequestInfo{ requestInfo: &apirequest.RequestInfo{
Verb: "get", Verb: "get",
}, },
initialSeatsExpected: minimumSeats, maxSeats: 10,
initialSeatsExpected: 1,
}, },
{ {
name: "request verb is list, conversion to ListOptions returns error", name: "request verb is list, conversion to ListOptions returns error",
@ -71,7 +72,8 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 799, "events.foo.bar": 799,
}, },
initialSeatsExpected: maximumSeats, maxSeats: 10,
initialSeatsExpected: 10,
}, },
{ {
name: "request verb is list, has limit and resource version is 1", name: "request verb is list, has limit and resource version is 1",
@ -84,6 +86,7 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 699, "events.foo.bar": 699,
}, },
maxSeats: 10,
initialSeatsExpected: 8, initialSeatsExpected: 8,
}, },
{ {
@ -97,6 +100,7 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 699, "events.foo.bar": 699,
}, },
maxSeats: 10,
initialSeatsExpected: 7, initialSeatsExpected: 7,
}, },
{ {
@ -110,6 +114,7 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 699, "events.foo.bar": 699,
}, },
maxSeats: 10,
initialSeatsExpected: 8, initialSeatsExpected: 8,
}, },
{ {
@ -123,6 +128,7 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 399, "events.foo.bar": 399,
}, },
maxSeats: 10,
initialSeatsExpected: 8, initialSeatsExpected: 8,
}, },
{ {
@ -134,7 +140,8 @@ func TestWorkEstimator(t *testing.T) {
Resource: "events", Resource: "events",
}, },
countErr: ObjectCountNotFoundErr, countErr: ObjectCountNotFoundErr,
initialSeatsExpected: minimumSeats, maxSeats: 10,
initialSeatsExpected: 1,
}, },
{ {
name: "request verb is list, continuation is set", name: "request verb is list, continuation is set",
@ -147,6 +154,7 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 699, "events.foo.bar": 699,
}, },
maxSeats: 10,
initialSeatsExpected: 8, initialSeatsExpected: 8,
}, },
{ {
@ -160,6 +168,7 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 399, "events.foo.bar": 399,
}, },
maxSeats: 10,
initialSeatsExpected: 4, initialSeatsExpected: 4,
}, },
{ {
@ -186,6 +195,7 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 699, "events.foo.bar": 699,
}, },
maxSeats: 10,
initialSeatsExpected: 8, initialSeatsExpected: 8,
}, },
{ {
@ -199,6 +209,7 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 799, "events.foo.bar": 799,
}, },
maxSeats: 10,
initialSeatsExpected: 8, initialSeatsExpected: 8,
}, },
{ {
@ -212,7 +223,22 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 1999, "events.foo.bar": 1999,
}, },
initialSeatsExpected: maximumSeats, maxSeats: 10,
initialSeatsExpected: 10,
},
{
name: "request verb is list, maximum is capped, lower max seats",
requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=foo",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 1999,
},
maxSeats: 5,
initialSeatsExpected: 5,
}, },
{ {
name: "request verb is list, list from cache, count not known", name: "request verb is list, list from cache, count not known",
@ -223,7 +249,8 @@ func TestWorkEstimator(t *testing.T) {
Resource: "events", Resource: "events",
}, },
countErr: ObjectCountNotFoundErr, countErr: ObjectCountNotFoundErr,
initialSeatsExpected: minimumSeats, maxSeats: 10,
initialSeatsExpected: 1,
}, },
{ {
name: "request verb is list, object count is stale", name: "request verb is list, object count is stale",
@ -237,7 +264,8 @@ func TestWorkEstimator(t *testing.T) {
"events.foo.bar": 799, "events.foo.bar": 799,
}, },
countErr: ObjectCountStaleErr, countErr: ObjectCountStaleErr,
initialSeatsExpected: maximumSeats, maxSeats: 10,
initialSeatsExpected: 10,
}, },
{ {
name: "request verb is list, object count is not found", name: "request verb is list, object count is not found",
@ -248,7 +276,8 @@ func TestWorkEstimator(t *testing.T) {
Resource: "events", Resource: "events",
}, },
countErr: ObjectCountNotFoundErr, countErr: ObjectCountNotFoundErr,
initialSeatsExpected: minimumSeats, maxSeats: 10,
initialSeatsExpected: 1,
}, },
{ {
name: "request verb is list, count getter throws unknown error", name: "request verb is list, count getter throws unknown error",
@ -259,7 +288,8 @@ func TestWorkEstimator(t *testing.T) {
Resource: "events", Resource: "events",
}, },
countErr: errors.New("unknown error"), countErr: errors.New("unknown error"),
initialSeatsExpected: maximumSeats, maxSeats: 10,
initialSeatsExpected: 10,
}, },
{ {
name: "request verb is list, metadata.name specified", name: "request verb is list, metadata.name specified",
@ -273,7 +303,8 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 799, "events.foo.bar": 799,
}, },
initialSeatsExpected: minimumSeats, maxSeats: 10,
initialSeatsExpected: 1,
}, },
{ {
name: "request verb is list, metadata.name, resourceVersion and limit specified", name: "request verb is list, metadata.name, resourceVersion and limit specified",
@ -287,7 +318,8 @@ func TestWorkEstimator(t *testing.T) {
counts: map[string]int64{ counts: map[string]int64{
"events.foo.bar": 799, "events.foo.bar": 799,
}, },
initialSeatsExpected: minimumSeats, maxSeats: 10,
initialSeatsExpected: 1,
}, },
{ {
name: "request verb is watch, sendInitialEvents is nil", name: "request verb is watch, sendInitialEvents is nil",
@ -336,6 +368,7 @@ func TestWorkEstimator(t *testing.T) {
APIGroup: "foo.bar", APIGroup: "foo.bar",
Resource: "foos", Resource: "foos",
}, },
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 0, finalSeatsExpected: 0,
additionalLatencyExpected: 0, additionalLatencyExpected: 0,
@ -349,6 +382,7 @@ func TestWorkEstimator(t *testing.T) {
Resource: "foos", Resource: "foos",
}, },
watchCount: 29, watchCount: 29,
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 3, finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond, additionalLatencyExpected: 5 * time.Millisecond,
@ -362,6 +396,7 @@ func TestWorkEstimator(t *testing.T) {
Resource: "foos", Resource: "foos",
}, },
watchCount: 5, watchCount: 5,
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 0, finalSeatsExpected: 0,
additionalLatencyExpected: 0, additionalLatencyExpected: 0,
@ -375,6 +410,7 @@ func TestWorkEstimator(t *testing.T) {
Resource: "foos", Resource: "foos",
}, },
watchCount: 199, watchCount: 199,
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 10, finalSeatsExpected: 10,
additionalLatencyExpected: 10 * time.Millisecond, additionalLatencyExpected: 10 * time.Millisecond,
@ -387,6 +423,7 @@ func TestWorkEstimator(t *testing.T) {
APIGroup: "foo.bar", APIGroup: "foo.bar",
Resource: "foos", Resource: "foos",
}, },
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 0, finalSeatsExpected: 0,
additionalLatencyExpected: 0, additionalLatencyExpected: 0,
@ -400,6 +437,7 @@ func TestWorkEstimator(t *testing.T) {
Resource: "foos", Resource: "foos",
}, },
watchCount: 29, watchCount: 29,
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 3, finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond, additionalLatencyExpected: 5 * time.Millisecond,
@ -412,6 +450,7 @@ func TestWorkEstimator(t *testing.T) {
APIGroup: "foo.bar", APIGroup: "foo.bar",
Resource: "foos", Resource: "foos",
}, },
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 0, finalSeatsExpected: 0,
additionalLatencyExpected: 0, additionalLatencyExpected: 0,
@ -425,10 +464,25 @@ func TestWorkEstimator(t *testing.T) {
Resource: "foos", Resource: "foos",
}, },
watchCount: 29, watchCount: 29,
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 3, finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond, additionalLatencyExpected: 5 * time.Millisecond,
}, },
{
name: "request verb is patch, watches registered, lower max seats",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "patch",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 100,
maxSeats: 5,
initialSeatsExpected: 1,
finalSeatsExpected: 5,
additionalLatencyExpected: 10 * time.Millisecond,
},
{ {
name: "request verb is delete, no watches", name: "request verb is delete, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
@ -437,6 +491,7 @@ func TestWorkEstimator(t *testing.T) {
APIGroup: "foo.bar", APIGroup: "foo.bar",
Resource: "foos", Resource: "foos",
}, },
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 0, finalSeatsExpected: 0,
additionalLatencyExpected: 0, additionalLatencyExpected: 0,
@ -450,6 +505,7 @@ func TestWorkEstimator(t *testing.T) {
Resource: "foos", Resource: "foos",
}, },
watchCount: 29, watchCount: 29,
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: 3, finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond, additionalLatencyExpected: 5 * time.Millisecond,
@ -464,7 +520,8 @@ func TestWorkEstimator(t *testing.T) {
Subresource: "token", Subresource: "token",
}, },
watchCount: 5777, watchCount: 5777,
initialSeatsExpected: minimumSeats, maxSeats: 10,
initialSeatsExpected: 1,
finalSeatsExpected: 0, finalSeatsExpected: 0,
additionalLatencyExpected: 0, additionalLatencyExpected: 0,
}, },
@ -477,8 +534,9 @@ func TestWorkEstimator(t *testing.T) {
Resource: "serviceaccounts", Resource: "serviceaccounts",
}, },
watchCount: 1000, watchCount: 1000,
maxSeats: 10,
initialSeatsExpected: 1, initialSeatsExpected: 1,
finalSeatsExpected: maximumSeats, finalSeatsExpected: 10,
additionalLatencyExpected: 50 * time.Millisecond, additionalLatencyExpected: 50 * time.Millisecond,
}, },
} }
@ -495,8 +553,11 @@ func TestWorkEstimator(t *testing.T) {
watchCountsFn := func(_ *apirequest.RequestInfo) int { watchCountsFn := func(_ *apirequest.RequestInfo) int {
return test.watchCount return test.watchCount
} }
maxSeatsFn := func(_ string) uint64 {
return test.maxSeats
}
estimator := NewWorkEstimator(countsFn, watchCountsFn, defaultCfg) estimator := NewWorkEstimator(countsFn, watchCountsFn, defaultCfg, maxSeatsFn)
req, err := http.NewRequest("GET", test.requestURI, nil) req, err := http.NewRequest("GET", test.requestURI, nil)
if err != nil { if err != nil {