diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 60be44a7a21..a0660b4d49e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -809,7 +809,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = filterlatency.TrackStarted(handler, "authorization") if c.FlowControl != nil { - requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount) + workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig() + requestWorkEstimator := flowcontrolrequest.NewWorkEstimator( + c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg) handler = filterlatency.TrackCompleted(handler) handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) handler = filterlatency.TrackStarted(handler, "priorityandfairness") diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/config.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/config.go new file mode 100644 index 00000000000..11477532739 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/config.go @@ -0,0 +1,92 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + minimumSeats = 1 + maximumSeats = 10 + objectsPerSeat = 100.0 + watchesPerSeat = 10.0 + enableMutatingWorkEstimator = true +) + +var eventAdditionalDuration = 5 * time.Millisecond + +// WorkEstimatorConfig holds work estimator parameters. +type WorkEstimatorConfig struct { + *ListWorkEstimatorConfig `json:"listWorkEstimatorConfig,omitempty"` + *MutatingWorkEstimatorConfig `json:"mutatingWorkEstimatorConfig,omitempty"` + + // MinimumSeats is the minimum number of seats a request must occupy. + MinimumSeats uint `json:"minimumSeats,omitempty"` + // MaximumSeats is the maximum number of seats a request can occupy + // + // NOTE: work_estimate_seats_samples metric uses the value of maximumSeats + // as the upper bound, so when we change maximumSeats we should also + // update the buckets of the metric. + MaximumSeats uint `json:"maximumSeats,omitempty"` +} + +// ListWorkEstimatorConfig holds work estimator parameters related to list requests. +type ListWorkEstimatorConfig struct { + ObjectsPerSeat float64 `json:"objectsPerSeat,omitempty"` +} + +// MutatingWorkEstimatorConfig holds work estimator +// parameters related to watches of mutating objects. +type MutatingWorkEstimatorConfig struct { + // TODO(wojtekt): Remove it once we tune the algorithm to not fail + // scalability tests. + Enabled bool `json:"enable,omitempty"` + EventAdditionalDuration metav1.Duration `json:"eventAdditionalDurationMs,omitempty"` + WatchesPerSeat float64 `json:"watchesPerSeat,omitempty"` +} + +// DefaultWorkEstimatorConfig creates a new WorkEstimatorConfig with default values. +func DefaultWorkEstimatorConfig() *WorkEstimatorConfig { + return &WorkEstimatorConfig{ + MinimumSeats: minimumSeats, + MaximumSeats: maximumSeats, + ListWorkEstimatorConfig: defaultListWorkEstimatorConfig(), + MutatingWorkEstimatorConfig: defaultMutatingWorkEstimatorConfig(), + } +} + +// defaultListWorkEstimatorConfig creates a new ListWorkEstimatorConfig with default values. +func defaultListWorkEstimatorConfig() *ListWorkEstimatorConfig { + return &ListWorkEstimatorConfig{ObjectsPerSeat: objectsPerSeat} +} + +// defaultMutatingWorkEstimatorConfig creates a new MutatingWorkEstimatorConfig with default values. +func defaultMutatingWorkEstimatorConfig() *MutatingWorkEstimatorConfig { + return &MutatingWorkEstimatorConfig{ + Enabled: enableMutatingWorkEstimator, + EventAdditionalDuration: metav1.Duration{Duration: eventAdditionalDuration}, + WatchesPerSeat: watchesPerSeat, + } +} + +// eventAdditionalDuration converts eventAdditionalDurationMs to a time.Duration type. +func (c *MutatingWorkEstimatorConfig) eventAdditionalDuration() time.Duration { + return c.EventAdditionalDuration.Duration +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 50125715c8a..e0f00f7e4f8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -29,14 +29,16 @@ import ( "k8s.io/klog/v2" ) -func newListWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc { +func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { estimator := &listWorkEstimator{ + config: config, countGetterFn: countFn, } return estimator.estimate } type listWorkEstimator struct { + config *WorkEstimatorConfig countGetterFn objectCountGetterFunc } @@ -45,7 +47,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe if !ok { // no RequestInfo should never happen, but to be on the safe side // let's return maximumSeats - return WorkEstimate{InitialSeats: maximumSeats} + return WorkEstimate{InitialSeats: e.config.MaximumSeats} } query := r.URL.Query() @@ -55,7 +57,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // This request is destined to fail in the validation layer, // return maximumSeats for this request to be consistent. - return WorkEstimate{InitialSeats: maximumSeats} + return WorkEstimate{InitialSeats: e.config.MaximumSeats} } isListFromCache := !shouldListFromStorage(query, &listOptions) @@ -66,7 +68,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // be conservative here and allocate maximum seats to this list request. // NOTE: if a CRD is removed, its count will go stale first and then the // pruner will eventually remove the CRD from the cache. - return WorkEstimate{InitialSeats: maximumSeats} + return WorkEstimate{InitialSeats: e.config.MaximumSeats} case err == ObjectCountNotFoundErr: // there are multiple scenarios in which we can see this error: // a. the type is truly unknown, a typo on the caller's part. @@ -80,12 +82,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // when aggregated API calls are overestimated, we allocate the minimum // possible seats (see #109106 as an example when being more conservative // led to problems). - return WorkEstimate{InitialSeats: minimumSeats} + return WorkEstimate{InitialSeats: e.config.MinimumSeats} case err != nil: // we should never be here since Get returns either ObjectCountStaleErr or // ObjectCountNotFoundErr, return maximumSeats to be on the safe side. klog.ErrorS(err, "Unexpected error from object count tracker") - return WorkEstimate{InitialSeats: maximumSeats} + return WorkEstimate{InitialSeats: e.config.MaximumSeats} } limit := numStored @@ -111,14 +113,14 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // will be processed by the list request. // we will come up with a different formula for the transformation function and/or // fine tune this number in future iteratons. - seats := uint(math.Ceil(float64(estimatedObjectsToBeProcessed) / float64(100))) + seats := uint(math.Ceil(float64(estimatedObjectsToBeProcessed) / e.config.ObjectsPerSeat)) // make sure we never return a seat of zero - if seats < minimumSeats { - seats = minimumSeats + if seats < e.config.MinimumSeats { + seats = e.config.MinimumSeats } - if seats > maximumSeats { - seats = maximumSeats + if seats > e.config.MaximumSeats { + seats = e.config.MaximumSeats } return WorkEstimate{InitialSeats: seats} } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go index 45f47686a60..7a7ca194f12 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go @@ -25,35 +25,23 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/metrics" ) -const ( - watchesPerSeat = 10.0 - eventAdditionalDuration = 5 * time.Millisecond - // TODO(wojtekt): Remove it once we tune the algorithm to not fail - // scalability tests. - enableMutatingWorkEstimator = true -) - -func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc { - return newTestMutatingWorkEstimator(countFn, enableMutatingWorkEstimator) -} - -func newTestMutatingWorkEstimator(countFn watchCountGetterFunc, enabled bool) WorkEstimatorFunc { +func newMutatingWorkEstimator(countFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { estimator := &mutatingWorkEstimator{ + config: config, countFn: countFn, - enabled: enabled, } return estimator.estimate } type mutatingWorkEstimator struct { + config *WorkEstimatorConfig countFn watchCountGetterFunc - enabled bool } func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { // TODO(wojtekt): Remove once we tune the algorithm to not fail // scalability tests. - if !e.enabled { + if !e.config.Enabled { return WorkEstimate{ InitialSeats: 1, } @@ -65,8 +53,8 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori // let's return a large value. return WorkEstimate{ InitialSeats: 1, - FinalSeats: maximumSeats, - AdditionalLatency: eventAdditionalDuration, + FinalSeats: e.config.MaximumSeats, + AdditionalLatency: e.config.eventAdditionalDuration(), } } watchCount := e.countFn(requestInfo) @@ -94,12 +82,12 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori // the request finishes even if there is a small number of watches. // However, until we tune the estimation we want to stay on the safe side // an avoid introducing additional latency for almost every single request. - if watchCount >= watchesPerSeat { + if watchCount >= int(e.config.WatchesPerSeat) { // TODO: As described in the KEP, we should take into account that not all // events are equal and try to estimate the cost of a single event based on // some historical data about size of events. - finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat)) - finalWork := SeatsTimesDuration(float64(finalSeats), eventAdditionalDuration) + finalSeats = uint(math.Ceil(float64(watchCount) / e.config.WatchesPerSeat)) + finalWork := SeatsTimesDuration(float64(finalSeats), e.config.eventAdditionalDuration()) // While processing individual events is highly parallel, // the design/implementation of P&F has a couple limitations that @@ -129,8 +117,8 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori // // TODO: Confirm that the current cap of maximumSeats allow us to // achieve the above. - if finalSeats > maximumSeats { - finalSeats = maximumSeats + if finalSeats > e.config.MaximumSeats { + finalSeats = e.config.MaximumSeats } additionalLatency = finalWork.DurationPerSeat(float64(finalSeats)) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index c9f1d155c4a..aa0c4542d58 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -25,18 +25,6 @@ import ( "k8s.io/klog/v2" ) -const ( - // the minimum number of seats a request must occupy - minimumSeats = 1 - - // the maximum number of seats a request can occupy - // - // NOTE: work_estimate_seats_samples metric uses the value of maximumSeats - // as the upper bound, so when we change maximumSeats we should also - // update the buckets of the metric. - maximumSeats = 10 -) - // WorkEstimate carries three of the four parameters that determine the work in a request. // The fourth parameter is the duration of the initial phase of execution. type WorkEstimate struct { @@ -76,10 +64,12 @@ type watchCountGetterFunc func(*apirequest.RequestInfo) int // NewWorkEstimator estimates the work that will be done by a given request, // if no WorkEstimatorFunc matches the given request then the default // work estimate of 1 seat is allocated to the request. -func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc) WorkEstimatorFunc { +func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc { estimator := &workEstimator{ - listWorkEstimator: newListWorkEstimator(objectCountFn), - mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn), + minimumSeats: config.MinimumSeats, + maximumSeats: config.MaximumSeats, + listWorkEstimator: newListWorkEstimator(objectCountFn, config), + mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn, config), } return estimator.estimate } @@ -94,6 +84,10 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorit } type workEstimator struct { + // the minimum number of seats a request must occupy + minimumSeats uint + // the maximum number of seats a request can occupy + maximumSeats uint // listWorkEstimator estimates work for list request(s) listWorkEstimator WorkEstimatorFunc // mutatingWorkEstimator calculates the width of mutating request(s) @@ -105,7 +99,7 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN if !ok { klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI) // no RequestInfo should never happen, but to be on the safe side let's return maximumSeats - return WorkEstimate{InitialSeats: maximumSeats} + return WorkEstimate{InitialSeats: e.maximumSeats} } switch requestInfo.Verb { @@ -115,5 +109,5 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) } - return WorkEstimate{InitialSeats: minimumSeats} + return WorkEstimate{InitialSeats: e.minimumSeats} } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go index cebe8cb9a66..e3867285bfc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -26,6 +26,10 @@ import ( ) func TestWorkEstimator(t *testing.T) { + defaultCfg := DefaultWorkEstimatorConfig() + minimumSeats := defaultCfg.MinimumSeats + maximumSeats := defaultCfg.MaximumSeats + tests := []struct { name string requestURI string @@ -393,13 +397,7 @@ func TestWorkEstimator(t *testing.T) { return test.watchCount } - // TODO(wojtek-t): Simplify it once we enable mutating work estimator - // by default. - testEstimator := &workEstimator{ - listWorkEstimator: newListWorkEstimator(countsFn), - mutatingWorkEstimator: newTestMutatingWorkEstimator(watchCountsFn, true), - } - estimator := WorkEstimatorFunc(testEstimator.estimate) + estimator := NewWorkEstimator(countsFn, watchCountsFn, defaultCfg) req, err := http.NewRequest("GET", test.requestURI, nil) if err != nil {