diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index fd5a3ab9d1a..80232fd4fd3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -101,7 +101,7 @@ func WithPriorityAndFairness( } var classification *PriorityAndFairnessClassification - note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) { + estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate { classification = &PriorityAndFairnessClassification{ FlowSchemaName: fs.Name, FlowSchemaUID: fs.UID, @@ -111,6 +111,7 @@ func WithPriorityAndFairness( httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name)) httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name)) httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher)) + return workEstimator(r, fs.Name, pl.Name) } var served bool @@ -137,13 +138,9 @@ func WithPriorityAndFairness( } } - // find the estimated amount of work of the request - // TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter. - workEstimate := workEstimator.EstimateWork(r) digest := utilflowcontrol.RequestDigest{ - RequestInfo: requestInfo, - User: user, - WorkEstimate: workEstimate, + RequestInfo: requestInfo, + User: user, } if isWatchRequest { @@ -179,7 +176,7 @@ func WithPriorityAndFairness( execute := func() { startedAt := time.Now() defer func() { - httplog.AddKeyValue(ctx, "apf_init_latency", time.Now().Sub(startedAt)) + httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt)) }() noteExecutingDelta(1) defer noteExecutingDelta(-1) @@ -238,7 +235,7 @@ func WithPriorityAndFairness( // Note that Handle will return irrespective of whether the request // executes or is rejected. In the latter case, the function will return // without calling the passed `execute` function. - fcIfc.Handle(handleCtx, digest, note, queueNote, execute) + fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute) }() select { @@ -269,7 +266,7 @@ func WithPriorityAndFairness( handler.ServeHTTP(w, r) } - fcIfc.Handle(ctx, digest, note, queueNote, execute) + fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute) } if !served { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index aeccf5afed2..d4619023bba 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -70,7 +70,7 @@ const ( decisionSkipFilter ) -var defaultRequestWorkEstimator = func(*http.Request) fcrequest.WorkEstimate { +var defaultRequestWorkEstimator = func(req *http.Request, fsName, plName string) fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} } @@ -87,14 +87,14 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) { func (t fakeApfFilter) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn, execFn func(), ) { if t.mockDecision == decisionSkipFilter { panic("Handle should not be invoked") } - noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName()) + workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName()) switch t.mockDecision { case decisionNoQueuingExecute: execFn() @@ -390,7 +390,7 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter { func (f *fakeWatchApfFilter) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, _ fq.QueueNoteFn, execFn func(), ) { @@ -635,14 +635,16 @@ func TestContextClosesOnRequestProcessed(t *testing.T) { type fakeFilterRequestDigest struct { *fakeApfFilter requestDigestGot *utilflowcontrol.RequestDigest + workEstimateGot fcrequest.WorkEstimate } func (f *fakeFilterRequestDigest) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, _ fq.QueueNoteFn, _ func(), ) { f.requestDigestGot = &requestDigest + f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "") } func TestApfWithRequestDigest(t *testing.T) { @@ -652,17 +654,17 @@ func TestApfWithRequestDigest(t *testing.T) { reqDigestExpected := &utilflowcontrol.RequestDigest{ RequestInfo: &apirequest.RequestInfo{Verb: "get"}, User: &user.DefaultInfo{Name: "foo"}, - WorkEstimate: fcrequest.WorkEstimate{ - InitialSeats: 5, - FinalSeats: 7, - AdditionalLatency: 3 * time.Second, - }, + } + workExpected := fcrequest.WorkEstimate{ + InitialSeats: 5, + FinalSeats: 7, + AdditionalLatency: 3 * time.Second, } handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}), longRunningFunc, fakeFilter, - func(_ *http.Request) fcrequest.WorkEstimate { return reqDigestExpected.WorkEstimate }, + func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected }, ) w := httptest.NewRecorder() @@ -678,6 +680,9 @@ func TestApfWithRequestDigest(t *testing.T) { if !reflect.DeepEqual(reqDigestExpected, fakeFilter.requestDigestGot) { t.Errorf("Expected RequestDigest to match, diff: %s", cmp.Diff(reqDigestExpected, fakeFilter.requestDigestGot)) } + if !reflect.DeepEqual(workExpected, fakeFilter.workEstimateGot) { + t.Errorf("Expected WorkEstimate to match, diff: %s", cmp.Diff(workExpected, fakeFilter.workEstimateGot)) + } } func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 8039ddb63ab..71961f007f7 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -89,9 +89,8 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af // RequestDigest holds necessary info from request for flow-control type RequestDigest struct { - RequestInfo *request.RequestInfo - User user.Info - WorkEstimate fcrequest.WorkEstimate + RequestInfo *request.RequestInfo + User user.Info } // `*configController` maintains eventual consistency with the API @@ -800,7 +799,7 @@ func (immediateRequest) Finish(execute func()) bool { // The returned bool indicates whether the request is exempt from // limitation. The startWaitingTime is when the request started // waiting in its queue, or `Time{}` if this did not happen. -func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) { +func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { klog.V(7).Infof("startRequest(%#+v)", rd) cfgCtlr.lock.RLock() defer cfgCtlr.lock.RUnlock() @@ -832,24 +831,26 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig plState := cfgCtlr.priorityLevelStates[plName] if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName) - return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}, "" + return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{} } var numQueues int32 if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue { numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues } + var flowDistinguisher string var hashValue uint64 if numQueues > 1 { flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod) hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher) } + workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher) startWaitingTime = time.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) - req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) + req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) if idle { cfgCtlr.maybeReapReadLocked(plName, plState) } - return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher + return selectedFlowSchema, plState.pl, false, req, startWaitingTime } // maybeReap will remove the last internal traces of the named diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 123329ef104..b16d0ae463c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -26,6 +26,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock" fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" kubeinformers "k8s.io/client-go/informers" "k8s.io/klog/v2" "k8s.io/utils/clock" @@ -41,8 +42,9 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer- // Interface defines how the API Priority and Fairness filter interacts with the underlying system. type Interface interface { // Handle takes care of queuing and dispatching a request - // characterized by the given digest. The given `noteFn` will be - // invoked with the results of request classification. If the + // characterized by the given digest. The given `workEstimator` will be + // invoked with the results of request classification and must return the + // work parameters for the request. If the // request is queued then `queueNoteFn` will be called twice, // first with `true` and then with `false`; otherwise // `queueNoteFn` will not be called at all. If Handle decides @@ -53,7 +55,7 @@ type Interface interface { // ctx is cancelled or times out. Handle(ctx context.Context, requestDigest RequestDigest, - noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn, execFn func(), ) @@ -148,12 +150,11 @@ func NewTestable(config TestableConfig) Interface { } func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest, - noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn, execFn func()) { - fs, pl, isExempt, req, startWaitingTime, flowDistinguisher := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn) + fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn) queued := startWaitingTime != time.Time{} - noteFn(fs, pl, flowDistinguisher) if req == nil { if queued { metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index 7f7f9042de4..c14b74d70ee 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -462,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes startWG.Add(1) go func(matches, isResource bool, rdu RequestDigest) { expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name) - ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) { + ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate { matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt if testDebugLogs { t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt) @@ -475,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name) } } + return fcrequest.WorkEstimate{InitialSeats: 1} }, func(inQueue bool) { }, func() { startWG.Done() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go index d9a720a4bd6..d02e65f79a7 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go @@ -27,7 +27,6 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" - fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" ) func TestMatching(t *testing.T) { @@ -102,8 +101,7 @@ func TestLiterals(t *testing.T) { Name: "eman", Parts: []string{"goodrscs", "eman"}, }, - User: ui, - WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1}, + User: ui, } reqRU := RequestDigest{ RequestInfo: &request.RequestInfo{ @@ -118,8 +116,7 @@ func TestLiterals(t *testing.T) { Name: "eman", Parts: []string{"goodrscs", "eman"}, }, - User: ui, - WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1}, + User: ui, } reqN := RequestDigest{ RequestInfo: &request.RequestInfo{ @@ -127,8 +124,7 @@ func TestLiterals(t *testing.T) { Path: "/openapi/v2", Verb: "goodverb", }, - User: ui, - WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1}, + User: ui, } checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{ Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index 5783f24b1d0..88f812490ba 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -289,6 +289,17 @@ var ( }, []string{priorityLevel, flowSchema}, ) + watchCountSamples = compbasemetrics.NewHistogramVec( + &compbasemetrics.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "watch_count_samples", + Help: "count of watchers for mutating requests in API Priority and Fairness", + Buckets: []float64{0, 1, 10, 100, 1000, 10000}, + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel, flowSchema}, + ) apiserverEpochAdvances = compbasemetrics.NewCounterVec( &compbasemetrics.CounterOpts{ Namespace: namespace, @@ -315,6 +326,7 @@ var ( apiserverCurrentExecutingRequests, apiserverRequestWaitingSeconds, apiserverRequestExecutionSeconds, + watchCountSamples, apiserverEpochAdvances, }. Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...). @@ -383,6 +395,12 @@ func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema str apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) } +// ObserveWatchCount notes a sampling of a watch count +func ObserveWatchCount(ctx context.Context, priorityLevel, flowSchema string, count int) { + watchCountSamples.WithLabelValues(priorityLevel, flowSchema).Observe(float64(count)) +} + +// AddEpochAdvance notes an advance of the progress meter baseline for a given priority level func AddEpochAdvance(ctx context.Context, priorityLevel string, success bool) { apiserverEpochAdvances.WithContext(ctx).WithLabelValues(priorityLevel, strconv.FormatBool(success)).Inc() } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 6cdced290f0..7fcc0903e83 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -40,7 +40,7 @@ type listWorkEstimator struct { countGetterFn objectCountGetterFunc } -func (e *listWorkEstimator) estimate(r *http.Request) WorkEstimate { +func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) if !ok { // no RequestInfo should never happen, but to be on the safe side diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go index ff0dd357149..1c6c441e27c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go @@ -22,6 +22,7 @@ import ( "time" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" ) const ( @@ -49,7 +50,9 @@ type mutatingWorkEstimator struct { enabled bool } -func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { +func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { + // TODO(wojtekt): Remove once we tune the algorithm to not fail + // scalability tests. if !e.enabled { return WorkEstimate{ InitialSeats: 1, @@ -67,6 +70,7 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { } } watchCount := e.countFn(requestInfo) + metrics.ObserveWatchCount(r.Context(), priorityLevelName, flowSchemaName, watchCount) // The cost of the request associated with the watchers of that event // consists of three parts: diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index b2c6d860a7b..675433c2c31 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -83,10 +83,10 @@ func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCou // WorkEstimatorFunc returns the estimated work of a given request. // This function will be used by the Priority & Fairness filter to // estimate the work of of incoming requests. -type WorkEstimatorFunc func(*http.Request) WorkEstimate +type WorkEstimatorFunc func(request *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate -func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate { - return e(r) +func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { + return e(r, flowSchemaName, priorityLevelName) } type workEstimator struct { @@ -96,7 +96,7 @@ type workEstimator struct { mutatingWorkEstimator WorkEstimatorFunc } -func (e *workEstimator) estimate(r *http.Request) WorkEstimate { +func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) if !ok { klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI) @@ -106,9 +106,9 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate { switch requestInfo.Verb { case "list": - return e.listWorkEstimator.EstimateWork(r) + return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) case "create", "update", "patch", "delete": - return e.mutatingWorkEstimator.EstimateWork(r) + return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) } return WorkEstimate{InitialSeats: minimumSeats} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go index caf2e53b739..ee27fff42fc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -410,7 +410,7 @@ func TestWorkEstimator(t *testing.T) { req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo)) } - workestimateGot := estimator.EstimateWork(req) + workestimateGot := estimator.EstimateWork(req, "testFS", "testPL") if test.initialSeatsExpected != workestimateGot.InitialSeats { t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats) } diff --git a/test/integration/apiserver/flowcontrol/fight_test.go b/test/integration/apiserver/flowcontrol/fight_test.go index 509f471e0c2..08341d70dcd 100644 --- a/test/integration/apiserver/flowcontrol/fight_test.go +++ b/test/integration/apiserver/flowcontrol/fight_test.go @@ -139,7 +139,8 @@ func (ft *fightTest) createController(invert bool, i int) { FlowcontrolClient: fcIfc, ServerConcurrencyLimit: 200, // server concurrency limit RequestWaitLimit: time.Minute / 4, // request wait limit - ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, QueueSetFactory: fqtesting.NewNoRestraintFactory(), }) ft.ctlrs[invert][i] = ctlr