Introduce config for API Priority and Fairness

Linked all the default values with a single config structure.
This commit is contained in:
jupblb 2022-07-20 11:33:45 +02:00
parent 09e8339ae4
commit 1c594e7e01
No known key found for this signature in database
GPG Key ID: 7FF80D1CBB5F2AC4
6 changed files with 135 additions and 59 deletions

View File

@ -809,7 +809,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = filterlatency.TrackStarted(handler, "authorization") handler = filterlatency.TrackStarted(handler, "authorization")
if c.FlowControl != nil { if c.FlowControl != nil {
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount) workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
handler = filterlatency.TrackCompleted(handler) handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness") handler = filterlatency.TrackStarted(handler, "priorityandfairness")

View File

@ -0,0 +1,92 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
minimumSeats = 1
maximumSeats = 10
objectsPerSeat = 100.0
watchesPerSeat = 10.0
enableMutatingWorkEstimator = true
)
var eventAdditionalDuration = 5 * time.Millisecond
// WorkEstimatorConfig holds work estimator parameters.
type WorkEstimatorConfig struct {
*ListWorkEstimatorConfig `json:"listWorkEstimatorConfig,omitempty"`
*MutatingWorkEstimatorConfig `json:"mutatingWorkEstimatorConfig,omitempty"`
// MinimumSeats is the minimum number of seats a request must occupy.
MinimumSeats uint `json:"minimumSeats,omitempty"`
// MaximumSeats is the maximum number of seats a request can occupy
//
// NOTE: work_estimate_seats_samples metric uses the value of maximumSeats
// as the upper bound, so when we change maximumSeats we should also
// update the buckets of the metric.
MaximumSeats uint `json:"maximumSeats,omitempty"`
}
// ListWorkEstimatorConfig holds work estimator parameters related to list requests.
type ListWorkEstimatorConfig struct {
ObjectsPerSeat float64 `json:"objectsPerSeat,omitempty"`
}
// MutatingWorkEstimatorConfig holds work estimator
// parameters related to watches of mutating objects.
type MutatingWorkEstimatorConfig struct {
// TODO(wojtekt): Remove it once we tune the algorithm to not fail
// scalability tests.
Enabled bool `json:"enable,omitempty"`
EventAdditionalDuration metav1.Duration `json:"eventAdditionalDurationMs,omitempty"`
WatchesPerSeat float64 `json:"watchesPerSeat,omitempty"`
}
// DefaultWorkEstimatorConfig creates a new WorkEstimatorConfig with default values.
func DefaultWorkEstimatorConfig() *WorkEstimatorConfig {
return &WorkEstimatorConfig{
MinimumSeats: minimumSeats,
MaximumSeats: maximumSeats,
ListWorkEstimatorConfig: defaultListWorkEstimatorConfig(),
MutatingWorkEstimatorConfig: defaultMutatingWorkEstimatorConfig(),
}
}
// defaultListWorkEstimatorConfig creates a new ListWorkEstimatorConfig with default values.
func defaultListWorkEstimatorConfig() *ListWorkEstimatorConfig {
return &ListWorkEstimatorConfig{ObjectsPerSeat: objectsPerSeat}
}
// defaultMutatingWorkEstimatorConfig creates a new MutatingWorkEstimatorConfig with default values.
func defaultMutatingWorkEstimatorConfig() *MutatingWorkEstimatorConfig {
return &MutatingWorkEstimatorConfig{
Enabled: enableMutatingWorkEstimator,
EventAdditionalDuration: metav1.Duration{Duration: eventAdditionalDuration},
WatchesPerSeat: watchesPerSeat,
}
}
// eventAdditionalDuration converts eventAdditionalDurationMs to a time.Duration type.
func (c *MutatingWorkEstimatorConfig) eventAdditionalDuration() time.Duration {
return c.EventAdditionalDuration.Duration
}

View File

@ -29,14 +29,16 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
func newListWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc { func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc {
estimator := &listWorkEstimator{ estimator := &listWorkEstimator{
config: config,
countGetterFn: countFn, countGetterFn: countFn,
} }
return estimator.estimate return estimator.estimate
} }
type listWorkEstimator struct { type listWorkEstimator struct {
config *WorkEstimatorConfig
countGetterFn objectCountGetterFunc countGetterFn objectCountGetterFunc
} }
@ -45,7 +47,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
if !ok { if !ok {
// no RequestInfo should never happen, but to be on the safe side // no RequestInfo should never happen, but to be on the safe side
// let's return maximumSeats // let's return maximumSeats
return WorkEstimate{InitialSeats: maximumSeats} return WorkEstimate{InitialSeats: e.config.MaximumSeats}
} }
query := r.URL.Query() query := r.URL.Query()
@ -55,7 +57,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// This request is destined to fail in the validation layer, // This request is destined to fail in the validation layer,
// return maximumSeats for this request to be consistent. // return maximumSeats for this request to be consistent.
return WorkEstimate{InitialSeats: maximumSeats} return WorkEstimate{InitialSeats: e.config.MaximumSeats}
} }
isListFromCache := !shouldListFromStorage(query, &listOptions) isListFromCache := !shouldListFromStorage(query, &listOptions)
@ -66,7 +68,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// be conservative here and allocate maximum seats to this list request. // be conservative here and allocate maximum seats to this list request.
// NOTE: if a CRD is removed, its count will go stale first and then the // NOTE: if a CRD is removed, its count will go stale first and then the
// pruner will eventually remove the CRD from the cache. // pruner will eventually remove the CRD from the cache.
return WorkEstimate{InitialSeats: maximumSeats} return WorkEstimate{InitialSeats: e.config.MaximumSeats}
case err == ObjectCountNotFoundErr: case err == ObjectCountNotFoundErr:
// there are multiple scenarios in which we can see this error: // there are multiple scenarios in which we can see this error:
// a. the type is truly unknown, a typo on the caller's part. // a. the type is truly unknown, a typo on the caller's part.
@ -80,12 +82,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// when aggregated API calls are overestimated, we allocate the minimum // when aggregated API calls are overestimated, we allocate the minimum
// possible seats (see #109106 as an example when being more conservative // possible seats (see #109106 as an example when being more conservative
// led to problems). // led to problems).
return WorkEstimate{InitialSeats: minimumSeats} return WorkEstimate{InitialSeats: e.config.MinimumSeats}
case err != nil: case err != nil:
// we should never be here since Get returns either ObjectCountStaleErr or // we should never be here since Get returns either ObjectCountStaleErr or
// ObjectCountNotFoundErr, return maximumSeats to be on the safe side. // ObjectCountNotFoundErr, return maximumSeats to be on the safe side.
klog.ErrorS(err, "Unexpected error from object count tracker") klog.ErrorS(err, "Unexpected error from object count tracker")
return WorkEstimate{InitialSeats: maximumSeats} return WorkEstimate{InitialSeats: e.config.MaximumSeats}
} }
limit := numStored limit := numStored
@ -111,14 +113,14 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// will be processed by the list request. // will be processed by the list request.
// we will come up with a different formula for the transformation function and/or // we will come up with a different formula for the transformation function and/or
// fine tune this number in future iteratons. // fine tune this number in future iteratons.
seats := uint(math.Ceil(float64(estimatedObjectsToBeProcessed) / float64(100))) seats := uint(math.Ceil(float64(estimatedObjectsToBeProcessed) / e.config.ObjectsPerSeat))
// make sure we never return a seat of zero // make sure we never return a seat of zero
if seats < minimumSeats { if seats < e.config.MinimumSeats {
seats = minimumSeats seats = e.config.MinimumSeats
} }
if seats > maximumSeats { if seats > e.config.MaximumSeats {
seats = maximumSeats seats = e.config.MaximumSeats
} }
return WorkEstimate{InitialSeats: seats} return WorkEstimate{InitialSeats: seats}
} }

View File

@ -25,35 +25,23 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
) )
const ( func newMutatingWorkEstimator(countFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc {
watchesPerSeat = 10.0
eventAdditionalDuration = 5 * time.Millisecond
// TODO(wojtekt): Remove it once we tune the algorithm to not fail
// scalability tests.
enableMutatingWorkEstimator = true
)
func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc {
return newTestMutatingWorkEstimator(countFn, enableMutatingWorkEstimator)
}
func newTestMutatingWorkEstimator(countFn watchCountGetterFunc, enabled bool) WorkEstimatorFunc {
estimator := &mutatingWorkEstimator{ estimator := &mutatingWorkEstimator{
config: config,
countFn: countFn, countFn: countFn,
enabled: enabled,
} }
return estimator.estimate return estimator.estimate
} }
type mutatingWorkEstimator struct { type mutatingWorkEstimator struct {
config *WorkEstimatorConfig
countFn watchCountGetterFunc countFn watchCountGetterFunc
enabled bool
} }
func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
// TODO(wojtekt): Remove once we tune the algorithm to not fail // TODO(wojtekt): Remove once we tune the algorithm to not fail
// scalability tests. // scalability tests.
if !e.enabled { if !e.config.Enabled {
return WorkEstimate{ return WorkEstimate{
InitialSeats: 1, InitialSeats: 1,
} }
@ -65,8 +53,8 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori
// let's return a large value. // let's return a large value.
return WorkEstimate{ return WorkEstimate{
InitialSeats: 1, InitialSeats: 1,
FinalSeats: maximumSeats, FinalSeats: e.config.MaximumSeats,
AdditionalLatency: eventAdditionalDuration, AdditionalLatency: e.config.eventAdditionalDuration(),
} }
} }
watchCount := e.countFn(requestInfo) watchCount := e.countFn(requestInfo)
@ -94,12 +82,12 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori
// the request finishes even if there is a small number of watches. // the request finishes even if there is a small number of watches.
// However, until we tune the estimation we want to stay on the safe side // However, until we tune the estimation we want to stay on the safe side
// an avoid introducing additional latency for almost every single request. // an avoid introducing additional latency for almost every single request.
if watchCount >= watchesPerSeat { if watchCount >= int(e.config.WatchesPerSeat) {
// TODO: As described in the KEP, we should take into account that not all // TODO: As described in the KEP, we should take into account that not all
// events are equal and try to estimate the cost of a single event based on // events are equal and try to estimate the cost of a single event based on
// some historical data about size of events. // some historical data about size of events.
finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat)) finalSeats = uint(math.Ceil(float64(watchCount) / e.config.WatchesPerSeat))
finalWork := SeatsTimesDuration(float64(finalSeats), eventAdditionalDuration) finalWork := SeatsTimesDuration(float64(finalSeats), e.config.eventAdditionalDuration())
// While processing individual events is highly parallel, // While processing individual events is highly parallel,
// the design/implementation of P&F has a couple limitations that // the design/implementation of P&F has a couple limitations that
@ -129,8 +117,8 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori
// //
// TODO: Confirm that the current cap of maximumSeats allow us to // TODO: Confirm that the current cap of maximumSeats allow us to
// achieve the above. // achieve the above.
if finalSeats > maximumSeats { if finalSeats > e.config.MaximumSeats {
finalSeats = maximumSeats finalSeats = e.config.MaximumSeats
} }
additionalLatency = finalWork.DurationPerSeat(float64(finalSeats)) additionalLatency = finalWork.DurationPerSeat(float64(finalSeats))
} }

View File

@ -25,18 +25,6 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
const (
// the minimum number of seats a request must occupy
minimumSeats = 1
// the maximum number of seats a request can occupy
//
// NOTE: work_estimate_seats_samples metric uses the value of maximumSeats
// as the upper bound, so when we change maximumSeats we should also
// update the buckets of the metric.
maximumSeats = 10
)
// WorkEstimate carries three of the four parameters that determine the work in a request. // WorkEstimate carries three of the four parameters that determine the work in a request.
// The fourth parameter is the duration of the initial phase of execution. // The fourth parameter is the duration of the initial phase of execution.
type WorkEstimate struct { type WorkEstimate struct {
@ -76,10 +64,12 @@ type watchCountGetterFunc func(*apirequest.RequestInfo) int
// NewWorkEstimator estimates the work that will be done by a given request, // NewWorkEstimator estimates the work that will be done by a given request,
// if no WorkEstimatorFunc matches the given request then the default // if no WorkEstimatorFunc matches the given request then the default
// work estimate of 1 seat is allocated to the request. // work estimate of 1 seat is allocated to the request.
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc) WorkEstimatorFunc { func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc {
estimator := &workEstimator{ estimator := &workEstimator{
listWorkEstimator: newListWorkEstimator(objectCountFn), minimumSeats: config.MinimumSeats,
mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn), maximumSeats: config.MaximumSeats,
listWorkEstimator: newListWorkEstimator(objectCountFn, config),
mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn, config),
} }
return estimator.estimate return estimator.estimate
} }
@ -94,6 +84,10 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorit
} }
type workEstimator struct { type workEstimator struct {
// the minimum number of seats a request must occupy
minimumSeats uint
// the maximum number of seats a request can occupy
maximumSeats uint
// listWorkEstimator estimates work for list request(s) // listWorkEstimator estimates work for list request(s)
listWorkEstimator WorkEstimatorFunc listWorkEstimator WorkEstimatorFunc
// mutatingWorkEstimator calculates the width of mutating request(s) // mutatingWorkEstimator calculates the width of mutating request(s)
@ -105,7 +99,7 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN
if !ok { if !ok {
klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI) klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI)
// no RequestInfo should never happen, but to be on the safe side let's return maximumSeats // no RequestInfo should never happen, but to be on the safe side let's return maximumSeats
return WorkEstimate{InitialSeats: maximumSeats} return WorkEstimate{InitialSeats: e.maximumSeats}
} }
switch requestInfo.Verb { switch requestInfo.Verb {
@ -115,5 +109,5 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN
return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
} }
return WorkEstimate{InitialSeats: minimumSeats} return WorkEstimate{InitialSeats: e.minimumSeats}
} }

View File

@ -26,6 +26,10 @@ import (
) )
func TestWorkEstimator(t *testing.T) { func TestWorkEstimator(t *testing.T) {
defaultCfg := DefaultWorkEstimatorConfig()
minimumSeats := defaultCfg.MinimumSeats
maximumSeats := defaultCfg.MaximumSeats
tests := []struct { tests := []struct {
name string name string
requestURI string requestURI string
@ -393,13 +397,7 @@ func TestWorkEstimator(t *testing.T) {
return test.watchCount return test.watchCount
} }
// TODO(wojtek-t): Simplify it once we enable mutating work estimator estimator := NewWorkEstimator(countsFn, watchCountsFn, defaultCfg)
// by default.
testEstimator := &workEstimator{
listWorkEstimator: newListWorkEstimator(countsFn),
mutatingWorkEstimator: newTestMutatingWorkEstimator(watchCountsFn, true),
}
estimator := WorkEstimatorFunc(testEstimator.estimate)
req, err := http.NewRequest("GET", test.requestURI, nil) req, err := http.NewRequest("GET", test.requestURI, nil)
if err != nil { if err != nil {