diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 95c96f95aec..ef68dd23c5f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -749,8 +749,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = filterlatency.TrackStarted(handler, "authorization") if c.FlowControl != nil { + requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get) handler = filterlatency.TrackCompleted(handler) - handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, flowcontrolrequest.DefaultWorkEstimator) + handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) handler = filterlatency.TrackStarted(handler, "priorityandfairness") } else { handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 3eeba7103ec..c4f9865c284 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -593,6 +593,8 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o return nil } +// NOTICE: Keep in sync with shouldListFromStorage function in +// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go func shouldDelegateList(opts storage.ListOptions) bool { resourceVersion := opts.ResourceVersion pred := opts.Predicate diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go new file mode 100644 index 00000000000..f5be414f59b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -0,0 +1,136 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "math" + "net/http" + "net/url" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/klog/v2" +) + +func newListWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc { + estimator := &listWorkEstimator{ + countGetterFn: countFn, + } + return estimator.estimate +} + +type listWorkEstimator struct { + countGetterFn objectCountGetterFunc +} + +func (e *listWorkEstimator) estimate(r *http.Request) WorkEstimate { + requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) + if !ok { + // no RequestInfo should never happen, but to be on the safe side + // let's return maximumSeats + return WorkEstimate{Seats: maximumSeats} + } + + query := r.URL.Query() + listOptions := metav1.ListOptions{} + if err := metav1.Convert_url_Values_To_v1_ListOptions(&query, &listOptions, nil); err != nil { + klog.ErrorS(err, "Failed to convert options while estimating work for the list request") + + // This request is destined to fail in the validation layer, + // return maximumSeats for this request to be consistent. + return WorkEstimate{Seats: maximumSeats} + } + isListFromCache := !shouldListFromStorage(query, &listOptions) + + count, err := e.countGetterFn(key(requestInfo)) + switch { + case err == ObjectCountStaleErr: + // object count going stale is indicative of degradation, so we should + // be conservative here and allocate maximum seats to this list request. + // NOTE: if a CRD is removed, its count will go stale first and then the + // pruner will eventually remove the CRD from the cache. + return WorkEstimate{Seats: maximumSeats} + case err == ObjectCountNotFoundErr: + // there are two scenarios in which we can see this error: + // a. the type is truly unknown, a typo on the caller's part. + // b. the count has gone stale for too long and the pruner + // has removed the type from the cache. + // we don't have a way to distinguish between a and b. b seems to indicate + // to a more severe case of degradation, although b can naturally trigger + // when a CRD is removed. let's be conservative and allocate maximum seats. + return WorkEstimate{Seats: maximumSeats} + case err != nil: + // we should never be here since Get returns either ObjectCountStaleErr or + // ObjectCountNotFoundErr, return maximumSeats to be on the safe side. + return WorkEstimate{Seats: maximumSeats} + } + + // TODO: For resources that implement indexes at the watchcache level, + // we need to adjust the cost accordingly + var estimatedObjectsToBeProcessed int64 + switch { + case isListFromCache: + // if we are here, count is known + estimatedObjectsToBeProcessed = count + default: + // Even if a selector is specified and we may need to list and go over more objects from etcd + // to produce the result of size , each individual chunk will be of size at most . + // As a result. the work estimate of the request should be computed based on and the actual + // cost of processing more elements will be hidden in the request processing latency. + estimatedObjectsToBeProcessed = listOptions.Limit + if estimatedObjectsToBeProcessed == 0 { + // limit has not been specified, fall back to count + estimatedObjectsToBeProcessed = count + } + } + + // for now, our rough estimate is to allocate one seat to each 100 obejcts that + // will be processed by the list request. + // we will come up with a different formula for the transformation function and/or + // fine tune this number in future iteratons. + seats := uint(math.Ceil(float64(estimatedObjectsToBeProcessed) / float64(100))) + + // make sure we never return a seat of zero + if seats < minimumSeats { + seats = minimumSeats + } + if seats > maximumSeats { + seats = maximumSeats + } + return WorkEstimate{Seats: seats} +} + +func key(requestInfo *apirequest.RequestInfo) string { + groupResource := &schema.GroupResource{ + Group: requestInfo.APIGroup, + Resource: requestInfo.Resource, + } + return groupResource.String() +} + +// NOTICE: Keep in sync with shouldDelegateList function in +// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool { + resourceVersion := opts.ResourceVersion + pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) + hasContinuation := pagingEnabled && len(opts.Continue) > 0 + hasLimit := pagingEnabled && opts.Limit > 0 && resourceVersion != "0" + return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index f2f4fdab94d..07f458c0b0b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -17,7 +17,19 @@ limitations under the License. package request import ( + "fmt" "net/http" + + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog/v2" +) + +const ( + // the minimum number of seats a request must occupy + minimumSeats = 1 + + // the maximum number of seats a request can occupy + maximumSeats = 10 ) type WorkEstimate struct { @@ -25,16 +37,18 @@ type WorkEstimate struct { Seats uint } -// DefaultWorkEstimator returns estimation with default number of seats -// of 1. -// -// TODO: when we plumb in actual work estimate handling for different -// type of request(s) this function will iterate through a chain -// of workEstimator instance(s). -func DefaultWorkEstimator(_ *http.Request) WorkEstimate { - return WorkEstimate{ - Seats: 1, +// objectCountGetterFunc represents a function that gets the total +// number of objects for a given resource. +type objectCountGetterFunc func(string) (int64, error) + +// NewWorkEstimator estimates the work that will be done by a given request, +// if no WorkEstimatorFunc matches the given request then the default +// work estimate of 1 seat is allocated to the request. +func NewWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc { + estimator := &workEstimator{ + listWorkEstimator: newListWorkEstimator(countFn), } + return estimator.estimate } // WorkEstimatorFunc returns the estimated work of a given request. @@ -45,3 +59,24 @@ type WorkEstimatorFunc func(*http.Request) WorkEstimate func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate { return e(r) } + +type workEstimator struct { + // listWorkEstimator estimates work for list request(s) + listWorkEstimator WorkEstimatorFunc +} + +func (e *workEstimator) estimate(r *http.Request) WorkEstimate { + requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) + if !ok { + klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI) + // no RequestInfo should never happen, but to be on the safe side let's return maximumSeats + return WorkEstimate{Seats: maximumSeats} + } + + switch requestInfo.Verb { + case "list": + return e.listWorkEstimator.EstimateWork(r) + } + + return WorkEstimate{Seats: minimumSeats} +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go new file mode 100644 index 00000000000..61ab78995d0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -0,0 +1,253 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "errors" + "net/http" + "testing" + + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +func TestWorkEstimator(t *testing.T) { + tests := []struct { + name string + requestURI string + requestInfo *apirequest.RequestInfo + counts map[string]int64 + countErr error + seatsExpected uint + }{ + { + name: "request has no RequestInfo", + requestURI: "http://server/apis/", + requestInfo: nil, + seatsExpected: maximumSeats, + }, + { + name: "request verb is not list", + requestURI: "http://server/apis/", + requestInfo: &apirequest.RequestInfo{ + Verb: "get", + }, + seatsExpected: minimumSeats, + }, + { + name: "request verb is list, conversion to ListOptions returns error", + requestURI: "http://server/apis/foo.bar/v1/events?limit=invalid", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + seatsExpected: maximumSeats, + }, + { + name: "request verb is list, resource version not set", + requestURI: "http://server/apis/foo.bar/v1/events?limit=499", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + seatsExpected: 5, + }, + { + name: "request verb is list, continuation is set", + requestURI: "http://server/apis/foo.bar/v1/events?continue=token&limit=499&resourceVersion=1", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + seatsExpected: 5, + }, + { + name: "request verb is list, has limit", + requestURI: "http://server/apis/foo.bar/v1/events?limit=499&resourceVersion=1", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + seatsExpected: 5, + }, + { + name: "request verb is list, resource version is zero", + requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=0", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + seatsExpected: 8, + }, + { + name: "request verb is list, no query parameters, count known", + requestURI: "http://server/apis/foo.bar/v1/events", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + seatsExpected: 8, + }, + { + name: "request verb is list, no query parameters, count not known", + requestURI: "http://server/apis/foo.bar/v1/events", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + countErr: ObjectCountNotFoundErr, + seatsExpected: maximumSeats, + }, + { + name: "request verb is list, resource version match is Exact", + requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=foo&resourceVersionMatch=Exact&limit=499", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + seatsExpected: 5, + }, + { + name: "request verb is list, resource version match is NotOlderThan, limit not specified", + requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=foo&resourceVersionMatch=NotOlderThan", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + seatsExpected: 8, + }, + { + name: "request verb is list, maximum is capped", + requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=foo", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 1999, + }, + seatsExpected: maximumSeats, + }, + { + name: "request verb is list, list from cache, count not known", + requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=0&limit=799", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + countErr: ObjectCountNotFoundErr, + seatsExpected: maximumSeats, + }, + { + name: "request verb is list, object count is stale", + requestURI: "http://server/apis/foo.bar/v1/events?limit=499", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + countErr: ObjectCountStaleErr, + seatsExpected: maximumSeats, + }, + { + name: "request verb is list, object count is not found", + requestURI: "http://server/apis/foo.bar/v1/events?limit=499", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + countErr: ObjectCountNotFoundErr, + seatsExpected: maximumSeats, + }, + { + name: "request verb is list, count getter throws unknown error", + requestURI: "http://server/apis/foo.bar/v1/events?limit=499", + requestInfo: &apirequest.RequestInfo{ + Verb: "list", + APIGroup: "foo.bar", + Resource: "events", + }, + countErr: errors.New("unknown error"), + seatsExpected: maximumSeats, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + counts := test.counts + if len(counts) == 0 { + counts = map[string]int64{} + } + countsFn := func(key string) (int64, error) { + return counts[key], test.countErr + } + estimator := NewWorkEstimator(countsFn) + + req, err := http.NewRequest("GET", test.requestURI, nil) + if err != nil { + t.Fatalf("Failed to create new HTTP request - %v", err) + } + + if test.requestInfo != nil { + req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo)) + } + + workestimateGot := estimator.EstimateWork(req) + if test.seatsExpected != workestimateGot.Seats { + t.Errorf("Expected work estimate to match: %d seats, but got: %d seats", test.seatsExpected, workestimateGot.Seats) + } + }) + } +}