diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 32825f379c7..b30cd66eece 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -64,6 +64,7 @@ import ( "k8s.io/apiserver/pkg/storageversion" "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/client-go/informers" restclient "k8s.io/client-go/rest" "k8s.io/component-base/logs" @@ -215,6 +216,9 @@ type Config struct { // If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig. MergedResourceConfig *serverstore.ResourceConfig + // RequestWidthEstimator is used to estimate the "width" of the incoming request(s). + RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc + //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -338,6 +342,8 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Default to treating watch as a long-running operation // Generic API servers have no inherent long-running subresources LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), + RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator, + APIServerID: id, StorageVersionManager: storageversion.NewDefaultManager(), } @@ -728,7 +734,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { if c.FlowControl != nil { handler = filterlatency.TrackCompleted(handler) - handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl) + handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, c.RequestWidthEstimator) handler = filterlatency.TrackStarted(handler, "priorityandfairness") } else { handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 23ea5b7287a..6b14b42e203 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -28,6 +28,7 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/klog/v2" ) @@ -59,6 +60,7 @@ func WithPriorityAndFairness( handler http.Handler, longRunningRequestCheck apirequest.LongRunningRequestCheck, fcIfc utilflowcontrol.Interface, + widthEstimator flowcontrolrequest.WidthEstimatorFunc, ) http.Handler { if fcIfc == nil { klog.Warningf("priority and fairness support not found, skipping") @@ -159,7 +161,11 @@ func WithPriorityAndFairness( handler.ServeHTTP(w, innerReq) } } - digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user} + + // find the estimated "width" of the request + width := widthEstimator.EstimateWidth(r) + digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width} + fcIfc.Handle(ctx, digest, note, func(inQueue bool) { if inQueue { noteWaitingDelta(1) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index f620c8ff197..d94b18d40df 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -50,6 +50,8 @@ import ( "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" + + "github.com/google/go-cmp/cmp" ) func TestMain(m *testing.M) { @@ -67,6 +69,8 @@ const ( decisionSkipFilter ) +var defaultRequestWidthEstimator = func(*http.Request) uint { return 1 } + type fakeApfFilter struct { mockDecision mockDecision postEnqueue func() @@ -157,7 +161,7 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { onExecute() - }), longRunningRequestCheck, flowControlFilter) + }), longRunningRequestCheck, flowControlFilter, defaultRequestWidthEstimator) handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{ @@ -562,6 +566,50 @@ func TestApfCancelWaitRequest(t *testing.T) { }) } +type fakeFilterRequestDigest struct { + *fakeApfFilter + requestDigestGot *utilflowcontrol.RequestDigest +} + +func (f *fakeFilterRequestDigest) Handle(ctx context.Context, + requestDigest utilflowcontrol.RequestDigest, + _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration), + _ fq.QueueNoteFn, _ func(), +) { + f.requestDigestGot = &requestDigest +} + +func TestApfWithRequestDigest(t *testing.T) { + longRunningFunc := func(_ *http.Request, _ *apirequest.RequestInfo) bool { return false } + fakeFilter := &fakeFilterRequestDigest{} + + reqDigestExpected := &utilflowcontrol.RequestDigest{ + RequestInfo: &apirequest.RequestInfo{Verb: "get"}, + User: &user.DefaultInfo{Name: "foo"}, + Width: 5, + } + + handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}), + longRunningFunc, + fakeFilter, + func(_ *http.Request) uint { return reqDigestExpected.Width }, + ) + + w := httptest.NewRecorder() + req, err := http.NewRequest(http.MethodGet, "/bar", nil) + if err != nil { + t.Fatalf("Failed to create new http request - %v", err) + } + req = req.WithContext(apirequest.WithRequestInfo(req.Context(), reqDigestExpected.RequestInfo)) + req = req.WithContext(apirequest.WithUser(req.Context(), reqDigestExpected.User)) + + handler.ServeHTTP(w, req) + + if !reflect.DeepEqual(reqDigestExpected, fakeFilter.requestDigestGot) { + t.Errorf("Expected RequestDigest to match, diff: %s", cmp.Diff(reqDigestExpected, fakeFilter.requestDigestGot)) + } +} + func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { fcmetrics.Register() @@ -1058,7 +1106,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol. requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) - apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter) + apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWidthEstimator) // add the handler in the chain that adds the specified user to the request context handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 3ee3867456d..fe120eaaef6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -82,6 +82,7 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af type RequestDigest struct { RequestInfo *request.RequestInfo User user.Info + Width uint } // `*configController` maintains eventual consistency with the API @@ -804,7 +805,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig } startWaitingTime = time.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) - req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) + req, idle := plState.queues.StartRequest(ctx, rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) if idle { cfgCtlr.maybeReapLocked(plName, plState) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index 92f820916ca..cf0280d3c02 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -139,7 +139,7 @@ func (cqs *ctlrTestQueueSet) IsIdle() bool { return cqs.countActive == 0 } -func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) { +func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) { cqs.cts.lock.Lock() defer cqs.cts.lock.Unlock() cqs.countActive++ diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index a91656c5629..1865daeef12 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -80,7 +80,7 @@ type QueueSet interface { // was idle at the moment of the return. Otherwise idle==false // and the client must call the Finish method of the Request // exactly once. - StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) + StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) // UpdateObservations makes sure any time-based statistics have // caught up with the current clock reading diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 34f2bc370af..d55bc40f549 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -236,10 +236,7 @@ const ( // executing at each point where there is a change in that quantity, // because the metrics --- and only the metrics --- track that // quantity per FlowSchema. -func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { - // all request(s) have a width of 1, in keeping with the current behavior - width := 1.0 - +func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { qs.lockAndSyncTime() defer qs.lock.Unlock() var req *request @@ -320,7 +317,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist // Seats returns the number of seats this request requires. func (req *request) Seats() int { - return int(math.Ceil(req.width)) + return int(req.width) } func (req *request) NoteQueued(inQueue bool) { @@ -440,7 +437,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { // returns the enqueud request on a successful enqueue // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached -func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width float64, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { // Start with the shuffle sharding, to pick a queue. queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] @@ -578,7 +575,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { } } -func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width float64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { +func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { now := qs.clock.Now() req := &request{ qs: qs, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 406e820b4bf..0779a4ffaca 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -226,7 +226,7 @@ func (ust *uniformScenarioThread) callK(k int) { if k >= ust.nCalls { return } - req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) + req, idle := ust.uss.qs.StartRequest(context.Background(), 1, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) if req == nil { atomic.AddUint64(&ust.uss.failedCount, 1) @@ -658,7 +658,7 @@ func TestContextCancel(t *testing.T) { ctx1 := context.Background() b2i := map[bool]int{false: 0, true: 1} var qnc [2][2]int32 - req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) + req1, _ := qs.StartRequest(ctx1, 1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) if req1 == nil { t.Error("Request rejected") return @@ -686,7 +686,7 @@ func TestContextCancel(t *testing.T) { counter.Add(1) cancel2() }() - req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) + req2, idle2a := qs.StartRequest(ctx2, 1, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) if idle2a { t.Error("2nd StartRequest returned idle") } @@ -745,7 +745,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { } ctx := context.Background() - req, _ := qs.StartRequest(ctx, 1, "", "fs", "test", "one", func(inQueue bool) {}) + req, _ := qs.StartRequest(ctx, 1, 1, "", "fs", "test", "one", func(inQueue bool) {}) if req == nil { t.Fatal("expected a Request object from StartRequest, but got nil") } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 7309d3bf04e..d074b0bfdc7 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -44,7 +44,7 @@ type request struct { startTime time.Time // width of the request - width float64 + width uint // decision gets set to a `requestDecision` indicating what to do // with this request. It gets set exactly once, when the request diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 0cc08b18215..f711126c4df 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -55,7 +55,7 @@ func (noRestraint) IsIdle() bool { return false } -func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { +func (noRestraint) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { return noRestraintRequest{}, false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go index 66551f25c9d..f82085f6087 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go @@ -89,7 +89,7 @@ func TestLiterals(t *testing.T) { ui := &user.DefaultInfo{Name: "goodu", UID: "1", Groups: []string{"goodg1", "goodg2"}} reqRN := RequestDigest{ - &request.RequestInfo{ + RequestInfo: &request.RequestInfo{ IsResourceRequest: true, Path: "/apis/goodapig/v1/namespaces/goodns/goodrscs", Verb: "goodverb", @@ -99,10 +99,13 @@ func TestLiterals(t *testing.T) { Namespace: "goodns", Resource: "goodrscs", Name: "eman", - Parts: []string{"goodrscs", "eman"}}, - ui} + Parts: []string{"goodrscs", "eman"}, + }, + User: ui, + Width: 1, + } reqRU := RequestDigest{ - &request.RequestInfo{ + RequestInfo: &request.RequestInfo{ IsResourceRequest: true, Path: "/apis/goodapig/v1/goodrscs", Verb: "goodverb", @@ -112,14 +115,20 @@ func TestLiterals(t *testing.T) { Namespace: "", Resource: "goodrscs", Name: "eman", - Parts: []string{"goodrscs", "eman"}}, - ui} + Parts: []string{"goodrscs", "eman"}, + }, + User: ui, + Width: 1, + } reqN := RequestDigest{ - &request.RequestInfo{ + RequestInfo: &request.RequestInfo{ IsResourceRequest: false, Path: "/openapi/v2", - Verb: "goodverb"}, - ui} + Verb: "goodverb", + }, + User: ui, + Width: 1, + } checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{ Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser, User: &flowcontrol.UserSubject{"goodu"}}}, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go new file mode 100644 index 00000000000..6fc65b5d5fc --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -0,0 +1,40 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "net/http" +) + +// DefaultWidthEstimator returns returns '1' as the "width" +// of the given request. +// +// TODO: when we plumb in actual "width" handling for different +// type of request(s) this function will iterate through a chain +// of widthEstimator instance(s). +func DefaultWidthEstimator(_ *http.Request) uint { + return 1 +} + +// WidthEstimatorFunc returns the estimated "width" of a given request. +// This function will be used by the Priority & Fairness filter to +// estimate the "width" of incoming requests. +type WidthEstimatorFunc func(*http.Request) uint + +func (e WidthEstimatorFunc) EstimateWidth(r *http.Request) uint { + return e(r) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 11e550992df..3e1fd126d02 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1494,6 +1494,7 @@ k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing k8s.io/apiserver/pkg/util/flowcontrol/format k8s.io/apiserver/pkg/util/flowcontrol/metrics +k8s.io/apiserver/pkg/util/flowcontrol/request k8s.io/apiserver/pkg/util/flushwriter k8s.io/apiserver/pkg/util/openapi k8s.io/apiserver/pkg/util/proxy