apf: estimate list width

This commit is contained in:
Abu Kashem 2021-06-22 19:38:00 -04:00
parent 2c1c1b7546
commit 296c18ec32
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
5 changed files with 437 additions and 10 deletions

View File

@ -749,8 +749,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = filterlatency.TrackStarted(handler, "authorization")
if c.FlowControl != nil {
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, flowcontrolrequest.DefaultWorkEstimator)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)

View File

@ -593,6 +593,8 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return nil
}
// NOTICE: Keep in sync with shouldListFromStorage function in
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
func shouldDelegateList(opts storage.ListOptions) bool {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate

View File

@ -0,0 +1,136 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"math"
"net/http"
"net/url"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
)
func newListWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc {
estimator := &listWorkEstimator{
countGetterFn: countFn,
}
return estimator.estimate
}
type listWorkEstimator struct {
countGetterFn objectCountGetterFunc
}
func (e *listWorkEstimator) estimate(r *http.Request) WorkEstimate {
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok {
// no RequestInfo should never happen, but to be on the safe side
// let's return maximumSeats
return WorkEstimate{Seats: maximumSeats}
}
query := r.URL.Query()
listOptions := metav1.ListOptions{}
if err := metav1.Convert_url_Values_To_v1_ListOptions(&query, &listOptions, nil); err != nil {
klog.ErrorS(err, "Failed to convert options while estimating work for the list request")
// This request is destined to fail in the validation layer,
// return maximumSeats for this request to be consistent.
return WorkEstimate{Seats: maximumSeats}
}
isListFromCache := !shouldListFromStorage(query, &listOptions)
count, err := e.countGetterFn(key(requestInfo))
switch {
case err == ObjectCountStaleErr:
// object count going stale is indicative of degradation, so we should
// be conservative here and allocate maximum seats to this list request.
// NOTE: if a CRD is removed, its count will go stale first and then the
// pruner will eventually remove the CRD from the cache.
return WorkEstimate{Seats: maximumSeats}
case err == ObjectCountNotFoundErr:
// there are two scenarios in which we can see this error:
// a. the type is truly unknown, a typo on the caller's part.
// b. the count has gone stale for too long and the pruner
// has removed the type from the cache.
// we don't have a way to distinguish between a and b. b seems to indicate
// to a more severe case of degradation, although b can naturally trigger
// when a CRD is removed. let's be conservative and allocate maximum seats.
return WorkEstimate{Seats: maximumSeats}
case err != nil:
// we should never be here since Get returns either ObjectCountStaleErr or
// ObjectCountNotFoundErr, return maximumSeats to be on the safe side.
return WorkEstimate{Seats: maximumSeats}
}
// TODO: For resources that implement indexes at the watchcache level,
// we need to adjust the cost accordingly
var estimatedObjectsToBeProcessed int64
switch {
case isListFromCache:
// if we are here, count is known
estimatedObjectsToBeProcessed = count
default:
// Even if a selector is specified and we may need to list and go over more objects from etcd
// to produce the result of size <limit>, each individual chunk will be of size at most <limit>.
// As a result. the work estimate of the request should be computed based on <limit> and the actual
// cost of processing more elements will be hidden in the request processing latency.
estimatedObjectsToBeProcessed = listOptions.Limit
if estimatedObjectsToBeProcessed == 0 {
// limit has not been specified, fall back to count
estimatedObjectsToBeProcessed = count
}
}
// for now, our rough estimate is to allocate one seat to each 100 obejcts that
// will be processed by the list request.
// we will come up with a different formula for the transformation function and/or
// fine tune this number in future iteratons.
seats := uint(math.Ceil(float64(estimatedObjectsToBeProcessed) / float64(100)))
// make sure we never return a seat of zero
if seats < minimumSeats {
seats = minimumSeats
}
if seats > maximumSeats {
seats = maximumSeats
}
return WorkEstimate{Seats: seats}
}
func key(requestInfo *apirequest.RequestInfo) string {
groupResource := &schema.GroupResource{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
}
return groupResource.String()
}
// NOTICE: Keep in sync with shouldDelegateList function in
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
resourceVersion := opts.ResourceVersion
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(opts.Continue) > 0
hasLimit := pagingEnabled && opts.Limit > 0 && resourceVersion != "0"
return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
}

View File

@ -17,7 +17,19 @@ limitations under the License.
package request
import (
"fmt"
"net/http"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
)
const (
// the minimum number of seats a request must occupy
minimumSeats = 1
// the maximum number of seats a request can occupy
maximumSeats = 10
)
type WorkEstimate struct {
@ -25,16 +37,18 @@ type WorkEstimate struct {
Seats uint
}
// DefaultWorkEstimator returns estimation with default number of seats
// of 1.
//
// TODO: when we plumb in actual work estimate handling for different
// type of request(s) this function will iterate through a chain
// of workEstimator instance(s).
func DefaultWorkEstimator(_ *http.Request) WorkEstimate {
return WorkEstimate{
Seats: 1,
// objectCountGetterFunc represents a function that gets the total
// number of objects for a given resource.
type objectCountGetterFunc func(string) (int64, error)
// NewWorkEstimator estimates the work that will be done by a given request,
// if no WorkEstimatorFunc matches the given request then the default
// work estimate of 1 seat is allocated to the request.
func NewWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc {
estimator := &workEstimator{
listWorkEstimator: newListWorkEstimator(countFn),
}
return estimator.estimate
}
// WorkEstimatorFunc returns the estimated work of a given request.
@ -45,3 +59,24 @@ type WorkEstimatorFunc func(*http.Request) WorkEstimate
func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate {
return e(r)
}
type workEstimator struct {
// listWorkEstimator estimates work for list request(s)
listWorkEstimator WorkEstimatorFunc
}
func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok {
klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI)
// no RequestInfo should never happen, but to be on the safe side let's return maximumSeats
return WorkEstimate{Seats: maximumSeats}
}
switch requestInfo.Verb {
case "list":
return e.listWorkEstimator.EstimateWork(r)
}
return WorkEstimate{Seats: minimumSeats}
}

View File

@ -0,0 +1,253 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"errors"
"net/http"
"testing"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
func TestWorkEstimator(t *testing.T) {
tests := []struct {
name string
requestURI string
requestInfo *apirequest.RequestInfo
counts map[string]int64
countErr error
seatsExpected uint
}{
{
name: "request has no RequestInfo",
requestURI: "http://server/apis/",
requestInfo: nil,
seatsExpected: maximumSeats,
},
{
name: "request verb is not list",
requestURI: "http://server/apis/",
requestInfo: &apirequest.RequestInfo{
Verb: "get",
},
seatsExpected: minimumSeats,
},
{
name: "request verb is list, conversion to ListOptions returns error",
requestURI: "http://server/apis/foo.bar/v1/events?limit=invalid",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
seatsExpected: maximumSeats,
},
{
name: "request verb is list, resource version not set",
requestURI: "http://server/apis/foo.bar/v1/events?limit=499",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
seatsExpected: 5,
},
{
name: "request verb is list, continuation is set",
requestURI: "http://server/apis/foo.bar/v1/events?continue=token&limit=499&resourceVersion=1",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
seatsExpected: 5,
},
{
name: "request verb is list, has limit",
requestURI: "http://server/apis/foo.bar/v1/events?limit=499&resourceVersion=1",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
seatsExpected: 5,
},
{
name: "request verb is list, resource version is zero",
requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=0",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
seatsExpected: 8,
},
{
name: "request verb is list, no query parameters, count known",
requestURI: "http://server/apis/foo.bar/v1/events",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
seatsExpected: 8,
},
{
name: "request verb is list, no query parameters, count not known",
requestURI: "http://server/apis/foo.bar/v1/events",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
countErr: ObjectCountNotFoundErr,
seatsExpected: maximumSeats,
},
{
name: "request verb is list, resource version match is Exact",
requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=foo&resourceVersionMatch=Exact&limit=499",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
seatsExpected: 5,
},
{
name: "request verb is list, resource version match is NotOlderThan, limit not specified",
requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=foo&resourceVersionMatch=NotOlderThan",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
seatsExpected: 8,
},
{
name: "request verb is list, maximum is capped",
requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=foo",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 1999,
},
seatsExpected: maximumSeats,
},
{
name: "request verb is list, list from cache, count not known",
requestURI: "http://server/apis/foo.bar/v1/events?resourceVersion=0&limit=799",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
countErr: ObjectCountNotFoundErr,
seatsExpected: maximumSeats,
},
{
name: "request verb is list, object count is stale",
requestURI: "http://server/apis/foo.bar/v1/events?limit=499",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
countErr: ObjectCountStaleErr,
seatsExpected: maximumSeats,
},
{
name: "request verb is list, object count is not found",
requestURI: "http://server/apis/foo.bar/v1/events?limit=499",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
countErr: ObjectCountNotFoundErr,
seatsExpected: maximumSeats,
},
{
name: "request verb is list, count getter throws unknown error",
requestURI: "http://server/apis/foo.bar/v1/events?limit=499",
requestInfo: &apirequest.RequestInfo{
Verb: "list",
APIGroup: "foo.bar",
Resource: "events",
},
countErr: errors.New("unknown error"),
seatsExpected: maximumSeats,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
counts := test.counts
if len(counts) == 0 {
counts = map[string]int64{}
}
countsFn := func(key string) (int64, error) {
return counts[key], test.countErr
}
estimator := NewWorkEstimator(countsFn)
req, err := http.NewRequest("GET", test.requestURI, nil)
if err != nil {
t.Fatalf("Failed to create new HTTP request - %v", err)
}
if test.requestInfo != nil {
req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo))
}
workestimateGot := estimator.EstimateWork(req)
if test.seatsExpected != workestimateGot.Seats {
t.Errorf("Expected work estimate to match: %d seats, but got: %d seats", test.seatsExpected, workestimateGot.Seats)
}
})
}
}