diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index bb11f22ec67..c9d449572c3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -904,7 +904,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { if c.FlowControl != nil { workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig() requestWorkEstimator := flowcontrolrequest.NewWorkEstimator( - c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg) + c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats) handler = filterlatency.TrackCompleted(handler) handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness") diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 062584177ee..81a17a5b18e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -80,6 +80,7 @@ type fakeApfFilter struct { postDequeue func() utilflowcontrol.WatchTracker + utilflowcontrol.MaxSeatsTracker } func (t fakeApfFilter) Handle(ctx context.Context, @@ -146,10 +147,11 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func()) *httptest.Server { fakeFilter := fakeApfFilter{ - mockDecision: decision, - postEnqueue: postEnqueue, - postDequeue: postDequeue, - WatchTracker: utilflowcontrol.NewWatchTracker(), + mockDecision: decision, + postEnqueue: postEnqueue, + postDequeue: postDequeue, + WatchTracker: utilflowcontrol.NewWatchTracker(), + MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(), } return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute) } @@ -349,12 +351,14 @@ type fakeWatchApfFilter struct { preExecutePanic bool utilflowcontrol.WatchTracker + utilflowcontrol.MaxSeatsTracker } func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter { return &fakeWatchApfFilter{ - capacity: capacity, - WatchTracker: utilflowcontrol.NewWatchTracker(), + capacity: capacity, + WatchTracker: utilflowcontrol.NewWatchTracker(), + MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(), } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 9bfce4a6fd5..708bf2cdef0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -58,6 +58,11 @@ import ( const timeFmt = "2006-01-02T15:04:05.999" +const ( + // priorityLevelMaxSeatsPercent is the percentage of the nominalCL used as max seats allocatable from work estimator + priorityLevelMaxSeatsPercent = float64(0.15) +) + // This file contains a simple local (to the apiserver) controller // that digests API Priority and Fairness config objects (FlowSchema // and PriorityLevelConfiguration) into the data structure that the @@ -151,6 +156,12 @@ type configController struct { // watchTracker implements the necessary WatchTracker interface. 
 	WatchTracker
+	// MaxSeatsTracker tracks the maximum seats that should be allocatable from the
+	// work estimator for a given priority level. This controller does not enforce
+	// any limits on max seats stored in this tracker; it is up to the work estimator
+	// to set lower/upper limits on max seats (currently min=1, max=10).
+	MaxSeatsTracker
+
 	// the most recent update attempts, ordered by increasing age.
 	// Consumer trims to keep only the last minute's worth of entries.
 	// The controller uses this to limit itself to at most six updates
@@ -274,6 +285,7 @@ func newTestableController(config TestableConfig) *configController {
 		flowcontrolClient:   config.FlowcontrolClient,
 		priorityLevelStates: make(map[string]*priorityLevelState),
 		WatchTracker:        NewWatchTracker(),
+		MaxSeatsTracker:     NewMaxSeatsTracker(),
 	}
 	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
 	// Start with longish delay because conflicts will be between
@@ -770,6 +782,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
 			// draining and no use is coming from another
 			// goroutine
 			klog.V(3).Infof("Removing undesired priority level %q, Type=%v", plName, plState.pl.Spec.Type)
+			meal.cfgCtlr.MaxSeatsTracker.ForgetPriorityLevel(plName)
 			continue
 		}
 		if !plState.quiescing {
@@ -828,6 +841,17 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 		if limited := plState.pl.Spec.Limited; limited != nil {
 			if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
 				meal.maxWaitingRequests += int(qCfg.Queues * qCfg.QueueLengthLimit)
+
+				// The max seats allocatable from the work estimator is calculated as MAX(1, MIN(0.15 * nominalCL, nominalCL/handSize)).
+				// This keeps max seats proportional to the total available concurrency, with a minimum value of 1.
+				// 15% of nominal concurrency was chosen because it preserves the previous max seats of 10 for default priority levels
+				// when using the apiserver's default total server concurrency of 600 (--max-requests-inflight=400, --max-mutating-requests-inflight=200).
+				// This ensures that clusters with relatively high inflight requests continue to use a max seats of 10,
+				// while clusters with lower inflight requests use max seats no greater than nominalCL/handSize.
+				// The calculation can produce arbitrarily high values, but the work estimator currently caps max seats at 10.
+				handSize := plState.pl.Spec.Limited.LimitResponse.Queuing.HandSize
+				maxSeats := uint64(math.Max(1, math.Min(math.Ceil(float64(concurrencyLimit)*priorityLevelMaxSeatsPercent), float64(int32(concurrencyLimit)/handSize))))
+				meal.cfgCtlr.MaxSeatsTracker.SetMaxSeats(plName, maxSeats)
 			}
 		}
 		if plState.queues == nil {
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
index 2929048ecc7..76782623a84 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
@@ -77,6 +77,10 @@ type Interface interface {
 	// WatchTracker provides the WatchTracker interface.
 	WatchTracker
+
+	// MaxSeatsTracker is consulted by the work estimator to determine the maximum
+	// number of seats a request can occupy for a given priority level.
+ MaxSeatsTracker } // This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats.go new file mode 100644 index 00000000000..18f88ab3b20 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats.go @@ -0,0 +1,66 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "sync" +) + +// MaxSeatsTracker is used to track max seats allocatable per priority level from the work estimator +type MaxSeatsTracker interface { + // GetMaxSeats returns the maximum seats a request should occupy for a given priority level. + GetMaxSeats(priorityLevelName string) uint64 + + // SetMaxSeats configures max seats for a priority level. + SetMaxSeats(priorityLevelName string, maxSeats uint64) + + // ForgetPriorityLevel removes max seats tracking for a priority level. + ForgetPriorityLevel(priorityLevelName string) +} + +type maxSeatsTracker struct { + sync.RWMutex + + maxSeats map[string]uint64 +} + +func NewMaxSeatsTracker() MaxSeatsTracker { + return &maxSeatsTracker{ + maxSeats: make(map[string]uint64), + } +} + +func (m *maxSeatsTracker) GetMaxSeats(plName string) uint64 { + m.RLock() + defer m.RUnlock() + + return m.maxSeats[plName] +} + +func (m *maxSeatsTracker) SetMaxSeats(plName string, maxSeats uint64) { + m.Lock() + defer m.Unlock() + + m.maxSeats[plName] = maxSeats +} + +func (m *maxSeatsTracker) ForgetPriorityLevel(plName string) { + m.Lock() + defer m.Unlock() + + delete(m.maxSeats, plName) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats_test.go new file mode 100644 index 00000000000..23697acfaa1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats_test.go @@ -0,0 +1,140 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package flowcontrol + +import ( + "testing" + "time" + + "k8s.io/api/flowcontrol/v1beta3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + "k8s.io/client-go/informers" + clientsetfake "k8s.io/client-go/kubernetes/fake" +) + +// Test_GetMaxSeats tests max seats retrieved from MaxSeatsTracker +func Test_GetMaxSeats(t *testing.T) { + testcases := []struct { + name string + nominalCL int + handSize int32 + expectedMaxSeats uint64 + }{ + { + name: "nominalCL=5, handSize=6", + nominalCL: 5, + handSize: 6, + expectedMaxSeats: 1, + }, + { + name: "nominalCL=10, handSize=6", + nominalCL: 10, + handSize: 6, + expectedMaxSeats: 1, + }, + { + name: "nominalCL=15, handSize=6", + nominalCL: 15, + handSize: 6, + expectedMaxSeats: 2, + }, + { + name: "nominalCL=20, handSize=6", + nominalCL: 20, + handSize: 6, + expectedMaxSeats: 3, + }, + { + name: "nominalCL=35, handSize=6", + nominalCL: 35, + handSize: 6, + expectedMaxSeats: 5, + }, + { + name: "nominalCL=100, handSize=6", + nominalCL: 100, + handSize: 6, + expectedMaxSeats: 15, + }, + { + name: "nominalCL=200, handSize=6", + nominalCL: 200, + handSize: 6, + expectedMaxSeats: 30, + }, + { + name: "nominalCL=10, handSize=1", + nominalCL: 10, + handSize: 1, + expectedMaxSeats: 2, + }, + { + name: "nominalCL=100, handSize=20", + nominalCL: 100, + handSize: 20, + expectedMaxSeats: 5, + }, + } + + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + clientset := clientsetfake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(clientset, time.Second) + flowcontrolClient := clientset.FlowcontrolV1beta3() + startTime := time.Now() + clk, _ := eventclock.NewFake(startTime, 0, nil) + c := newTestableController(TestableConfig{ + Name: "Controller", + Clock: clk, + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + // for the purposes of this test, serverCL ~= nominalCL since there is + // only 1 PL with large concurrency shares, making mandatory PLs negligible. 
+ ServerConcurrencyLimit: testcase.nominalCL, + RequestWaitLimit: time.Minute, + ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, + QueueSetFactory: fqs.NewQueueSetFactory(clk), + }) + + testPriorityLevel := &v1beta3.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pl", + }, + Spec: v1beta3.PriorityLevelConfigurationSpec{ + Type: v1beta3.PriorityLevelEnablementLimited, + Limited: &v1beta3.LimitedPriorityLevelConfiguration{ + NominalConcurrencyShares: 10000, + LimitResponse: v1beta3.LimitResponse{ + Queuing: &v1beta3.QueuingConfiguration{ + HandSize: testcase.handSize, + }, + }, + }, + }, + } + c.digestConfigObjects([]*v1beta3.PriorityLevelConfiguration{testPriorityLevel}, nil) + maxSeats := c.GetMaxSeats("test-pl") + if maxSeats != testcase.expectedMaxSeats { + t.Errorf("unexpected max seats, got=%d, want=%d", maxSeats, testcase.expectedMaxSeats) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/config.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/config.go index b6db19209b5..c51435b1598 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/config.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/config.go @@ -24,7 +24,7 @@ import ( const ( minimumSeats = 1 - maximumSeats = 10 + maximumSeatsLimit = 10 objectsPerSeat = 100.0 watchesPerSeat = 10.0 enableMutatingWorkEstimator = true @@ -39,12 +39,13 @@ type WorkEstimatorConfig struct { // MinimumSeats is the minimum number of seats a request must occupy. MinimumSeats uint64 `json:"minimumSeats,omitempty"` - // MaximumSeats is the maximum number of seats a request can occupy + + // MaximumSeatsLimit is an upper limit on the max seats a request can occupy. // // NOTE: work_estimate_seats_samples metric uses the value of maximumSeats // as the upper bound, so when we change maximumSeats we should also // update the buckets of the metric. - MaximumSeats uint64 `json:"maximumSeats,omitempty"` + MaximumSeatsLimit uint64 `json:"maximumSeatsLimit,omitempty"` } // ListWorkEstimatorConfig holds work estimator parameters related to list requests. 
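Note on the `WorkEstimatorConfig` change above: `MaximumSeatsLimit` is now a hard ceiling, while the per-priority-level value comes from the flow controller's MaxSeatsTracker via `GetMaxSeats`. The hunks that follow resolve the two values with the same guard in both estimators; the sketch below mirrors that guard, but the helper name and standalone form are illustrative, not code from this patch.

```go
package main

import "fmt"

// effectiveMaxSeats mirrors the guard added to the list and mutating estimators:
// a zero value (priority level not tracked yet) or a value above MaximumSeatsLimit
// falls back to MaximumSeatsLimit. The function name is illustrative.
func effectiveMaxSeats(plMaxSeats, maximumSeatsLimit uint64) uint64 {
	if plMaxSeats == 0 || plMaxSeats > maximumSeatsLimit {
		return maximumSeatsLimit
	}
	return plMaxSeats
}

func main() {
	fmt.Println(effectiveMaxSeats(0, 10))  // 10: untracked priority level keeps the old behavior
	fmt.Println(effectiveMaxSeats(5, 10))  // 5: a smaller per-priority-level budget wins
	fmt.Println(effectiveMaxSeats(30, 10)) // 10: MaximumSeatsLimit remains the hard ceiling
}
```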
@@ -66,7 +67,7 @@ type MutatingWorkEstimatorConfig struct { func DefaultWorkEstimatorConfig() *WorkEstimatorConfig { return &WorkEstimatorConfig{ MinimumSeats: minimumSeats, - MaximumSeats: maximumSeats, + MaximumSeatsLimit: maximumSeatsLimit, ListWorkEstimatorConfig: defaultListWorkEstimatorConfig(), MutatingWorkEstimatorConfig: defaultMutatingWorkEstimatorConfig(), } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 54f85922d07..8d20867d6dd 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -29,10 +29,11 @@ import ( "k8s.io/klog/v2" ) -func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { +func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc { estimator := &listWorkEstimator{ config: config, countGetterFn: countFn, + maxSeatsFn: maxSeatsFn, } return estimator.estimate } @@ -40,14 +41,21 @@ func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorCo type listWorkEstimator struct { config *WorkEstimatorConfig countGetterFn objectCountGetterFunc + maxSeatsFn maxSeatsFunc } func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { + minSeats := e.config.MinimumSeats + maxSeats := e.maxSeatsFn(priorityLevelName) + if maxSeats == 0 || maxSeats > e.config.MaximumSeatsLimit { + maxSeats = e.config.MaximumSeatsLimit + } + requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) if !ok { // no RequestInfo should never happen, but to be on the safe side // let's return maximumSeats - return WorkEstimate{InitialSeats: e.config.MaximumSeats} + return WorkEstimate{InitialSeats: maxSeats} } if requestInfo.Name != "" { @@ -56,7 +64,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // Example of such list requests: // /apis/certificates.k8s.io/v1/certificatesigningrequests?fieldSelector=metadata.name%3Dcsr-xxs4m // /api/v1/namespaces/test/configmaps?fieldSelector=metadata.name%3Dbig-deployment-1&limit=500&resourceVersion=0 - return WorkEstimate{InitialSeats: e.config.MinimumSeats} + return WorkEstimate{InitialSeats: minSeats} } query := r.URL.Query() @@ -66,7 +74,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // This request is destined to fail in the validation layer, // return maximumSeats for this request to be consistent. - return WorkEstimate{InitialSeats: e.config.MaximumSeats} + return WorkEstimate{InitialSeats: maxSeats} } // For watch requests, we want to adjust the cost only if they explicitly request @@ -86,7 +94,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // be conservative here and allocate maximum seats to this list request. // NOTE: if a CRD is removed, its count will go stale first and then the // pruner will eventually remove the CRD from the cache. - return WorkEstimate{InitialSeats: e.config.MaximumSeats} + return WorkEstimate{InitialSeats: maxSeats} case err == ObjectCountNotFoundErr: // there are multiple scenarios in which we can see this error: // a. the type is truly unknown, a typo on the caller's part. 
@@ -100,12 +108,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // when aggregated API calls are overestimated, we allocate the minimum // possible seats (see #109106 as an example when being more conservative // led to problems). - return WorkEstimate{InitialSeats: e.config.MinimumSeats} + return WorkEstimate{InitialSeats: minSeats} case err != nil: // we should never be here since Get returns either ObjectCountStaleErr or // ObjectCountNotFoundErr, return maximumSeats to be on the safe side. klog.ErrorS(err, "Unexpected error from object count tracker") - return WorkEstimate{InitialSeats: e.config.MaximumSeats} + return WorkEstimate{InitialSeats: maxSeats} } limit := numStored @@ -134,11 +142,11 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe seats := uint64(math.Ceil(float64(estimatedObjectsToBeProcessed) / e.config.ObjectsPerSeat)) // make sure we never return a seat of zero - if seats < e.config.MinimumSeats { - seats = e.config.MinimumSeats + if seats < minSeats { + seats = minSeats } - if seats > e.config.MaximumSeats { - seats = e.config.MaximumSeats + if seats > maxSeats { + seats = maxSeats } return WorkEstimate{InitialSeats: seats} } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go index 305f8e1ebb5..9b983f0033b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go @@ -25,25 +25,33 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/metrics" ) -func newMutatingWorkEstimator(countFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { +func newMutatingWorkEstimator(countFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc { estimator := &mutatingWorkEstimator{ - config: config, - countFn: countFn, + config: config, + countFn: countFn, + maxSeatsFn: maxSeatsFn, } return estimator.estimate } type mutatingWorkEstimator struct { - config *WorkEstimatorConfig - countFn watchCountGetterFunc + config *WorkEstimatorConfig + countFn watchCountGetterFunc + maxSeatsFn maxSeatsFunc } func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { + minSeats := e.config.MinimumSeats + maxSeats := e.maxSeatsFn(priorityLevelName) + if maxSeats == 0 || maxSeats > e.config.MaximumSeatsLimit { + maxSeats = e.config.MaximumSeatsLimit + } + // TODO(wojtekt): Remove once we tune the algorithm to not fail // scalability tests. if !e.config.Enabled { return WorkEstimate{ - InitialSeats: 1, + InitialSeats: minSeats, } } @@ -52,15 +60,15 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori // no RequestInfo should never happen, but to be on the safe side // let's return a large value. 
return WorkEstimate{ - InitialSeats: 1, - FinalSeats: e.config.MaximumSeats, + InitialSeats: minSeats, + FinalSeats: maxSeats, AdditionalLatency: e.config.eventAdditionalDuration(), } } if isRequestExemptFromWatchEvents(requestInfo) { return WorkEstimate{ - InitialSeats: e.config.MinimumSeats, + InitialSeats: minSeats, FinalSeats: 0, AdditionalLatency: time.Duration(0), } @@ -126,8 +134,8 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori // // TODO: Confirm that the current cap of maximumSeats allow us to // achieve the above. - if finalSeats > e.config.MaximumSeats { - finalSeats = e.config.MaximumSeats + if finalSeats > maxSeats { + finalSeats = maxSeats } additionalLatency = finalWork.DurationPerSeat(float64(finalSeats)) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index be119e5840d..71837edba6b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -64,15 +64,19 @@ type objectCountGetterFunc func(string) (int64, error) // number of watchers potentially interested in a given request. type watchCountGetterFunc func(*apirequest.RequestInfo) int +// MaxSeatsFunc represents a function that returns the maximum seats +// allowed for the work estimator for a given priority level. +type maxSeatsFunc func(priorityLevelName string) uint64 + // NewWorkEstimator estimates the work that will be done by a given request, // if no WorkEstimatorFunc matches the given request then the default // work estimate of 1 seat is allocated to the request. -func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { +func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc { estimator := &workEstimator{ minimumSeats: config.MinimumSeats, - maximumSeats: config.MaximumSeats, - listWorkEstimator: newListWorkEstimator(objectCountFn, config), - mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn, config), + maximumSeatsLimit: config.MaximumSeatsLimit, + listWorkEstimator: newListWorkEstimator(objectCountFn, config, maxSeatsFn), + mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn, config, maxSeatsFn), } return estimator.estimate } @@ -89,8 +93,8 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorit type workEstimator struct { // the minimum number of seats a request must occupy minimumSeats uint64 - // the maximum number of seats a request can occupy - maximumSeats uint64 + // the default maximum number of seats a request can occupy + maximumSeatsLimit uint64 // listWorkEstimator estimates work for list request(s) listWorkEstimator WorkEstimatorFunc // mutatingWorkEstimator calculates the width of mutating request(s) @@ -102,7 +106,7 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN if !ok { klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI) // no RequestInfo should never happen, but to be on the safe side let's return maximumSeats - return WorkEstimate{InitialSeats: e.maximumSeats} + return WorkEstimate{InitialSeats: e.maximumSeatsLimit} } switch requestInfo.Verb { diff --git 
a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go index 441a05565f8..91009f715ca 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -32,8 +32,6 @@ func TestWorkEstimator(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() defaultCfg := DefaultWorkEstimatorConfig() - minimumSeats := defaultCfg.MinimumSeats - maximumSeats := defaultCfg.MaximumSeats tests := []struct { name string @@ -42,6 +40,7 @@ func TestWorkEstimator(t *testing.T) { counts map[string]int64 countErr error watchCount int + maxSeats uint64 initialSeatsExpected uint64 finalSeatsExpected uint64 additionalLatencyExpected time.Duration @@ -50,7 +49,8 @@ func TestWorkEstimator(t *testing.T) { name: "request has no RequestInfo", requestURI: "http://server/apis/", requestInfo: nil, - initialSeatsExpected: maximumSeats, + maxSeats: 10, + initialSeatsExpected: 10, }, { name: "request verb is not list", @@ -58,7 +58,8 @@ func TestWorkEstimator(t *testing.T) { requestInfo: &apirequest.RequestInfo{ Verb: "get", }, - initialSeatsExpected: minimumSeats, + maxSeats: 10, + initialSeatsExpected: 1, }, { name: "request verb is list, conversion to ListOptions returns error", @@ -71,7 +72,8 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 799, }, - initialSeatsExpected: maximumSeats, + maxSeats: 10, + initialSeatsExpected: 10, }, { name: "request verb is list, has limit and resource version is 1", @@ -84,6 +86,7 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 699, }, + maxSeats: 10, initialSeatsExpected: 8, }, { @@ -97,6 +100,7 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 699, }, + maxSeats: 10, initialSeatsExpected: 7, }, { @@ -110,6 +114,7 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 699, }, + maxSeats: 10, initialSeatsExpected: 8, }, { @@ -123,6 +128,7 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 399, }, + maxSeats: 10, initialSeatsExpected: 8, }, { @@ -134,7 +140,8 @@ func TestWorkEstimator(t *testing.T) { Resource: "events", }, countErr: ObjectCountNotFoundErr, - initialSeatsExpected: minimumSeats, + maxSeats: 10, + initialSeatsExpected: 1, }, { name: "request verb is list, continuation is set", @@ -147,6 +154,7 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 699, }, + maxSeats: 10, initialSeatsExpected: 8, }, { @@ -160,6 +168,7 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 399, }, + maxSeats: 10, initialSeatsExpected: 4, }, { @@ -186,6 +195,7 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 699, }, + maxSeats: 10, initialSeatsExpected: 8, }, { @@ -199,6 +209,7 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 799, }, + maxSeats: 10, initialSeatsExpected: 8, }, { @@ -212,7 +223,22 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 1999, }, - initialSeatsExpected: maximumSeats, + maxSeats: 10, + initialSeatsExpected: 10, + }, + { + name: "request verb is list, maximum is capped, lower max seats", + requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=foo", + 
requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 1999, + }, + maxSeats: 5, + initialSeatsExpected: 5, }, { name: "request verb is list, list from cache, count not known", @@ -223,7 +249,8 @@ func TestWorkEstimator(t *testing.T) { Resource: "events", }, countErr: ObjectCountNotFoundErr, - initialSeatsExpected: minimumSeats, + maxSeats: 10, + initialSeatsExpected: 1, }, { name: "request verb is list, object count is stale", @@ -237,7 +264,8 @@ func TestWorkEstimator(t *testing.T) { "events.foo.bar": 799, }, countErr: ObjectCountStaleErr, - initialSeatsExpected: maximumSeats, + maxSeats: 10, + initialSeatsExpected: 10, }, { name: "request verb is list, object count is not found", @@ -248,7 +276,8 @@ func TestWorkEstimator(t *testing.T) { Resource: "events", }, countErr: ObjectCountNotFoundErr, - initialSeatsExpected: minimumSeats, + maxSeats: 10, + initialSeatsExpected: 1, }, { name: "request verb is list, count getter throws unknown error", @@ -259,7 +288,8 @@ func TestWorkEstimator(t *testing.T) { Resource: "events", }, countErr: errors.New("unknown error"), - initialSeatsExpected: maximumSeats, + maxSeats: 10, + initialSeatsExpected: 10, }, { name: "request verb is list, metadata.name specified", @@ -273,7 +303,8 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 799, }, - initialSeatsExpected: minimumSeats, + maxSeats: 10, + initialSeatsExpected: 1, }, { name: "request verb is list, metadata.name, resourceVersion and limit specified", @@ -287,7 +318,8 @@ func TestWorkEstimator(t *testing.T) { counts: map[string]int64{ "events.foo.bar": 799, }, - initialSeatsExpected: minimumSeats, + maxSeats: 10, + initialSeatsExpected: 1, }, { name: "request verb is watch, sendInitialEvents is nil", @@ -336,6 +368,7 @@ func TestWorkEstimator(t *testing.T) { APIGroup: "foo.bar", Resource: "foos", }, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 0, additionalLatencyExpected: 0, @@ -349,6 +382,7 @@ func TestWorkEstimator(t *testing.T) { Resource: "foos", }, watchCount: 29, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 3, additionalLatencyExpected: 5 * time.Millisecond, @@ -362,6 +396,7 @@ func TestWorkEstimator(t *testing.T) { Resource: "foos", }, watchCount: 5, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 0, additionalLatencyExpected: 0, @@ -375,6 +410,7 @@ func TestWorkEstimator(t *testing.T) { Resource: "foos", }, watchCount: 199, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 10, additionalLatencyExpected: 10 * time.Millisecond, @@ -387,6 +423,7 @@ func TestWorkEstimator(t *testing.T) { APIGroup: "foo.bar", Resource: "foos", }, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 0, additionalLatencyExpected: 0, @@ -400,6 +437,7 @@ func TestWorkEstimator(t *testing.T) { Resource: "foos", }, watchCount: 29, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 3, additionalLatencyExpected: 5 * time.Millisecond, @@ -412,6 +450,7 @@ func TestWorkEstimator(t *testing.T) { APIGroup: "foo.bar", Resource: "foos", }, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 0, additionalLatencyExpected: 0, @@ -425,10 +464,25 @@ func TestWorkEstimator(t *testing.T) { Resource: "foos", }, watchCount: 29, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 3, additionalLatencyExpected: 5 * time.Millisecond, }, + { + name: "request verb is patch, watches registered, lower max seats", + 
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "patch", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 100, + maxSeats: 5, + initialSeatsExpected: 1, + finalSeatsExpected: 5, + additionalLatencyExpected: 10 * time.Millisecond, + }, { name: "request verb is delete, no watches", requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", @@ -437,6 +491,7 @@ func TestWorkEstimator(t *testing.T) { APIGroup: "foo.bar", Resource: "foos", }, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 0, additionalLatencyExpected: 0, @@ -450,6 +505,7 @@ func TestWorkEstimator(t *testing.T) { Resource: "foos", }, watchCount: 29, + maxSeats: 10, initialSeatsExpected: 1, finalSeatsExpected: 3, additionalLatencyExpected: 5 * time.Millisecond, @@ -464,7 +520,8 @@ func TestWorkEstimator(t *testing.T) { Subresource: "token", }, watchCount: 5777, - initialSeatsExpected: minimumSeats, + maxSeats: 10, + initialSeatsExpected: 1, finalSeatsExpected: 0, additionalLatencyExpected: 0, }, @@ -477,8 +534,9 @@ func TestWorkEstimator(t *testing.T) { Resource: "serviceaccounts", }, watchCount: 1000, + maxSeats: 10, initialSeatsExpected: 1, - finalSeatsExpected: maximumSeats, + finalSeatsExpected: 10, additionalLatencyExpected: 50 * time.Millisecond, }, } @@ -495,8 +553,11 @@ func TestWorkEstimator(t *testing.T) { watchCountsFn := func(_ *apirequest.RequestInfo) int { return test.watchCount } + maxSeatsFn := func(_ string) uint64 { + return test.maxSeats + } - estimator := NewWorkEstimator(countsFn, watchCountsFn, defaultCfg) + estimator := NewWorkEstimator(countsFn, watchCountsFn, defaultCfg, maxSeatsFn) req, err := http.NewRequest("GET", test.requestURI, nil) if err != nil {
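Taken together with the controller change in apf_controller.go, the per-priority-level max seats stored in the tracker follows MAX(1, MIN(ceil(0.15 * nominalCL), nominalCL/handSize)), where nominalCL/handSize uses integer division. The standalone sketch below reproduces the expectations of Test_GetMaxSeats above; the constant matches the patch, while the function name and the example harness are illustrative.

```go
package main

import (
	"fmt"
	"math"
)

// priorityLevelMaxSeatsPercent mirrors the constant added in apf_controller.go.
const priorityLevelMaxSeatsPercent = float64(0.15)

// maxSeatsForPL reproduces the value the config controller stores in the
// MaxSeatsTracker for a limited priority level with queuing configured.
func maxSeatsForPL(nominalCL int, handSize int32) uint64 {
	return uint64(math.Max(1, math.Min(
		math.Ceil(float64(nominalCL)*priorityLevelMaxSeatsPercent),
		float64(int32(nominalCL)/handSize), // integer division, as in the patch
	)))
}

func main() {
	// These match the expectations in Test_GetMaxSeats.
	fmt.Println(maxSeatsForPL(5, 6))    // 1
	fmt.Println(maxSeatsForPL(15, 6))   // 2
	fmt.Println(maxSeatsForPL(100, 6))  // 15
	fmt.Println(maxSeatsForPL(200, 6))  // 30 (the work estimator still caps this at MaximumSeatsLimit=10)
	fmt.Println(maxSeatsForPL(100, 20)) // 5
}
```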