diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
index 8c6d2d12c6d..8b95021c747 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
@@ -101,7 +101,7 @@ func WithPriorityAndFairness(
 	}
 
 	var classification *PriorityAndFairnessClassification
-	note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
+	estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate {
 		classification = &PriorityAndFairnessClassification{
 			FlowSchemaName:    fs.Name,
 			FlowSchemaUID:     fs.UID,
@@ -111,6 +111,7 @@ func WithPriorityAndFairness(
 		httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
 		httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
 		httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher))
+		return workEstimator(r, fs.Name, pl.Name)
 	}
 
 	var served bool
@@ -137,13 +138,9 @@ func WithPriorityAndFairness(
 		}
 	}
 
-	// find the estimated amount of work of the request
-	// TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter.
-	workEstimate := workEstimator.EstimateWork(r)
 	digest := utilflowcontrol.RequestDigest{
-		RequestInfo:  requestInfo,
-		User:         user,
-		WorkEstimate: workEstimate,
+		RequestInfo: requestInfo,
+		User:        user,
 	}
 
 	if isWatchRequest {
@@ -179,7 +176,7 @@ func WithPriorityAndFairness(
 		execute := func() {
 			startedAt := time.Now()
 			defer func() {
-				httplog.AddKeyValue(ctx, "apf_init_latency", time.Now().Sub(startedAt))
+				httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
 			}()
 			noteExecutingDelta(1)
 			defer noteExecutingDelta(-1)
@@ -238,7 +235,7 @@ func WithPriorityAndFairness(
 			// Note that Handle will return irrespective of whether the request
 			// executes or is rejected. In the latter case, the function will return
 			// without calling the passed `execute` function.
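The filter change above inverts the old flow: instead of computing a WorkEstimate up front and stuffing it into the RequestDigest, estimation now happens inside the classification callback, where the resolved FlowSchema and PriorityLevelConfiguration names are available. A minimal, self-contained sketch of that callback shape; all types here are simplified stand-ins, not the real APF types.

package main

import "fmt"

type WorkEstimate struct{ InitialSeats int }

// handle mirrors Interface.Handle: it classifies the request and only
// then asks the caller-supplied estimator for the request's cost.
func handle(classify func() (fsName, plName string),
	estimateWork func(fsName, plName string) WorkEstimate,
	execute func(WorkEstimate)) {
	fs, pl := classify()             // done under the controller's lock in the real code
	estimate := estimateWork(fs, pl) // late: sees the classification results
	execute(estimate)
}

func main() {
	handle(
		func() (string, string) { return "global-default", "global-default" },
		func(fs, pl string) WorkEstimate {
			fmt.Printf("estimating for fs=%s pl=%s\n", fs, pl)
			return WorkEstimate{InitialSeats: 1}
		},
		func(we WorkEstimate) { fmt.Printf("executing with %d seat(s)\n", we.InitialSeats) },
	)
}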
-			fcIfc.Handle(handleCtx, digest, note, queueNote, execute)
+			fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute)
 		}()
 
 		select {
@@ -269,7 +266,7 @@ func WithPriorityAndFairness(
 			handler.ServeHTTP(w, r)
 		}
 
-		fcIfc.Handle(ctx, digest, note, queueNote, execute)
+		fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute)
 	}
 
 	if !served {
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index aeccf5afed2..d4619023bba 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -70,7 +70,7 @@ const (
 	decisionSkipFilter
 )
 
-var defaultRequestWorkEstimator = func(*http.Request) fcrequest.WorkEstimate {
+var defaultRequestWorkEstimator = func(req *http.Request, fsName, plName string) fcrequest.WorkEstimate {
 	return fcrequest.WorkEstimate{InitialSeats: 1}
 }
 
@@ -87,14 +87,14 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {
 func (t fakeApfFilter) Handle(ctx context.Context,
 	requestDigest utilflowcontrol.RequestDigest,
-	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 	queueNoteFn fq.QueueNoteFn,
 	execFn func(),
 ) {
 	if t.mockDecision == decisionSkipFilter {
 		panic("Handle should not be invoked")
 	}
-	noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
+	workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
 	switch t.mockDecision {
 	case decisionNoQueuingExecute:
 		execFn()
@@ -390,7 +390,7 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
 
 func (f *fakeWatchApfFilter) Handle(ctx context.Context,
 	requestDigest utilflowcontrol.RequestDigest,
-	_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 	_ fq.QueueNoteFn,
 	execFn func(),
 ) {
@@ -635,14 +635,16 @@ func TestContextClosesOnRequestProcessed(t *testing.T) {
 type fakeFilterRequestDigest struct {
 	*fakeApfFilter
 	requestDigestGot *utilflowcontrol.RequestDigest
+	workEstimateGot  fcrequest.WorkEstimate
 }
 
 func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
 	requestDigest utilflowcontrol.RequestDigest,
-	_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 	_ fq.QueueNoteFn,
 	_ func(),
 ) {
 	f.requestDigestGot = &requestDigest
+	f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
 }
 
 func TestApfWithRequestDigest(t *testing.T) {
@@ -652,17 +654,17 @@ func TestApfWithRequestDigest(t *testing.T) {
 	reqDigestExpected := &utilflowcontrol.RequestDigest{
 		RequestInfo: &apirequest.RequestInfo{Verb: "get"},
 		User:        &user.DefaultInfo{Name: "foo"},
-		WorkEstimate: fcrequest.WorkEstimate{
-			InitialSeats:      5,
-			FinalSeats:        7,
-			AdditionalLatency: 3 * time.Second,
-		},
+	}
+	workExpected := fcrequest.WorkEstimate{
+		InitialSeats:      5,
+		FinalSeats:        7,
+		AdditionalLatency: 3 * time.Second,
 	}
 
 	handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}),
 		longRunningFunc,
 		fakeFilter,
-		func(_ *http.Request) fcrequest.WorkEstimate { return reqDigestExpected.WorkEstimate },
+		func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected },
 	)
 
 	w := httptest.NewRecorder()
@@ -678,6 +680,9 @@ func TestApfWithRequestDigest(t *testing.T) {
 	if !reflect.DeepEqual(reqDigestExpected, fakeFilter.requestDigestGot) {
 		t.Errorf("Expected RequestDigest to match, diff: %s", cmp.Diff(reqDigestExpected, fakeFilter.requestDigestGot))
 	}
+	if !reflect.DeepEqual(workExpected, fakeFilter.workEstimateGot) {
+		t.Errorf("Expected WorkEstimate to match, diff: %s", cmp.Diff(workExpected, fakeFilter.workEstimateGot))
+	}
 }
 
 func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
index 860636e16fd..71961f007f7 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
@@ -89,9 +89,8 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af
 
 // RequestDigest holds necessary info from request for flow-control
 type RequestDigest struct {
-	RequestInfo  *request.RequestInfo
-	User         user.Info
-	WorkEstimate fcrequest.WorkEstimate
+	RequestInfo *request.RequestInfo
+	User        user.Info
 }
 
 // `*configController` maintains eventual consistency with the API
@@ -100,10 +99,11 @@ type RequestDigest struct {
 // this type and cfgMeal follow the convention that the suffix
 // "Locked" means that the caller must hold the configController lock.
 type configController struct {
-	name             string // varies in tests of fighting controllers
-	clock            clock.PassiveClock
-	queueSetFactory  fq.QueueSetFactory
-	obsPairGenerator metrics.TimedObserverPairGenerator
+	name                  string // varies in tests of fighting controllers
+	clock                 clock.PassiveClock
+	queueSetFactory       fq.QueueSetFactory
+	reqsObsPairGenerator  metrics.TimedObserverPairGenerator
+	execSeatsObsGenerator metrics.TimedObserverGenerator
 
 	// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
 	asFieldManager string
@@ -192,8 +192,11 @@ type priorityLevelState struct {
 	// returned StartFunction
 	numPending int
 
-	// Observers tracking number waiting, executing
-	obsPair metrics.TimedObserverPair
+	// Observers tracking number of requests waiting, executing
+	reqsObsPair metrics.TimedObserverPair
+
+	// Observer of number of seats occupied throughout execution
+	execSeatsObs metrics.TimedObserver
 }
 
 // NewTestableController is extra flexible to facilitate testing
@@ -202,7 +205,8 @@ func newTestableController(config TestableConfig) *configController {
 		name:                   config.Name,
 		clock:                  config.Clock,
 		queueSetFactory:        config.QueueSetFactory,
-		obsPairGenerator:       config.ObsPairGenerator,
+		reqsObsPairGenerator:   config.ReqsObsPairGenerator,
+		execSeatsObsGenerator:  config.ExecSeatsObsGenerator,
 		asFieldManager:         config.AsFieldManager,
 		foundToDangling:        config.FoundToDangling,
 		serverConcurrencyLimit: config.ServerConcurrencyLimit,
@@ -534,9 +538,10 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
 	for _, pl := range newPLs {
 		state := meal.cfgCtlr.priorityLevelStates[pl.Name]
 		if state == nil {
-			state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})}
+			labelValues := []string{pl.Name}
+			state = &priorityLevelState{reqsObsPair: meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsObsGenerator.Generate(1, 1, labelValues)}
 		}
-		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair)
+		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsObsPair, state.execSeatsObs)
 		if err != nil {
 			klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
 			continue
@@ -639,7 +644,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
 			}
 		}
 		var err error
-		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair)
+		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsObsPair, plState.execSeatsObs)
 		if err != nil {
 			// This can not happen because queueSetCompleterForPL already approved this config
 			panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
@@ -688,7 +693,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 // given priority level configuration. Returns nil if that config
 // does not call for limiting. Returns nil and an error if the given
 // object is malformed in a way that is a problem for this package.
-func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
 	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
 		return nil, errors.New("broken union structure at the top")
 	}
@@ -717,7 +722,7 @@
 	if queues != nil {
 		qsc, err = queues.BeginConfigChange(qcQS)
 	} else {
-		qsc, err = qsf.BeginConstruction(qcQS, intPair)
+		qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs)
 	}
 	if err != nil {
 		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
@@ -762,17 +767,20 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl
 // that does not actually exist (right now) as a real API object.
 func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
 	klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
-	obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name})
-	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair)
+	labelValues := []string{proto.Name}
+	reqsObsPair := meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues)
+	execSeatsObs := meal.cfgCtlr.execSeatsObsGenerator.Generate(1, 1, labelValues)
+	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsObsPair, execSeatsObs)
 	if err != nil {
 		// This can not happen because proto is one of the mandatory
 		// objects and these are not erroneous
 		panic(err)
 	}
 	meal.newPLStates[proto.Name] = &priorityLevelState{
-		pl:          proto,
-		qsCompleter: qsCompleter,
-		obsPair:     obsPair,
+		pl:           proto,
+		qsCompleter:  qsCompleter,
+		reqsObsPair:  reqsObsPair,
+		execSeatsObs: execSeatsObs,
 	}
 	if proto.Spec.Limited != nil {
 		meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)
@@ -791,7 +799,7 @@ func (immediateRequest) Finish(execute func()) bool {
 // The returned bool indicates whether the request is exempt from
 // limitation. The startWaitingTime is when the request started
 // waiting in its queue, or `Time{}` if this did not happen.
-func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) {
+func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
 	klog.V(7).Infof("startRequest(%#+v)", rd)
 	cfgCtlr.lock.RLock()
 	defer cfgCtlr.lock.RUnlock()
@@ -823,24 +831,26 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
 	plState := cfgCtlr.priorityLevelStates[plName]
 	if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
 		klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
-		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}, ""
+		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
 	}
 	var numQueues int32
 	if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
 		numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
 	}
+	var flowDistinguisher string
 	var hashValue uint64
 	if numQueues > 1 {
 		flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
 		hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
 	}
+	workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher)
 	startWaitingTime = time.Now()
 	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
-	req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
+	req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
 	if idle {
 		cfgCtlr.maybeReapReadLocked(plName, plState)
 	}
-	return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher
+	return selectedFlowSchema, plState.pl, false, req, startWaitingTime
 }
 
 // maybeReap will remove the last internal traces of the named
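On the controller side, digestNewPLsLocked and imaginePL now generate two observers per priority level from one shared label-value slice, so the request gauges and the new seat gauge for a level always carry matching labels. A toy stand-in of that pattern; the generator and observer types below are invented for illustration, not the real metrics types.

package main

import "fmt"

type timedObserver struct{ labels []string }

type timedObserverGenerator struct{ metric string }

func (g timedObserverGenerator) Generate(x, x1 float64, labelValues []string) timedObserver {
	fmt.Printf("%s observer labeled %v (initial ratio %v/%v)\n", g.metric, labelValues, x, x1)
	return timedObserver{labels: labelValues}
}

func main() {
	reqsGen := timedObserverGenerator{metric: "requests waiting/executing"}
	seatsGen := timedObserverGenerator{metric: "seats occupied"}

	for _, plName := range []string{"system", "workload-low", "catch-all"} {
		labelValues := []string{plName} // shared so both metrics line up per level
		_ = reqsGen.Generate(1, 1, labelValues)  // existing request metrics
		_ = seatsGen.Generate(1, 1, labelValues) // new seat metrics, same labels
	}
}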
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
index ae26253d905..b16d0ae463c 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
@@ -26,6 +26,7 @@ import (
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
 	fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
 	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
+	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/clock"
@@ -41,8 +42,9 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-
 // Interface defines how the API Priority and Fairness filter interacts with the underlying system.
 type Interface interface {
 	// Handle takes care of queuing and dispatching a request
-	// characterized by the given digest. The given `noteFn` will be
-	// invoked with the results of request classification. If the
+	// characterized by the given digest. The given `workEstimator` will be
+	// invoked with the results of request classification and must return the
+	// work parameters for the request. If the
 	// request is queued then `queueNoteFn` will be called twice,
 	// first with `true` and then with `false`; otherwise
 	// `queueNoteFn` will not be called at all. If Handle decides
@@ -53,7 +55,7 @@ type Interface interface {
 	// ctx is cancelled or times out.
 	Handle(ctx context.Context,
 		requestDigest RequestDigest,
-		noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+		workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 		queueNoteFn fq.QueueNoteFn,
 		execFn func(),
 	)
@@ -92,7 +94,8 @@ func New(
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: serverConcurrencyLimit,
 		RequestWaitLimit:       requestWaitLimit,
-		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
 		QueueSetFactory:        fqs.NewQueueSetFactory(clk),
 	})
 }
@@ -131,8 +134,11 @@ type TestableConfig struct {
 	// RequestWaitLimit configured on the server
 	RequestWaitLimit time.Duration
 
-	// ObsPairGenerator for metrics
-	ObsPairGenerator metrics.TimedObserverPairGenerator
+	// ReqsObsPairGenerator for metrics about requests
+	ReqsObsPairGenerator metrics.TimedObserverPairGenerator
+
+	// ExecSeatsObsGenerator for metrics about seats occupied by all phases of execution
+	ExecSeatsObsGenerator metrics.TimedObserverGenerator
 
 	// QueueSetFactory for the queuing implementation
 	QueueSetFactory fq.QueueSetFactory
@@ -144,12 +150,11 @@ func NewTestable(config TestableConfig) Interface {
 }
 
 func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
-	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 	queueNoteFn fq.QueueNoteFn,
 	execFn func()) {
-	fs, pl, isExempt, req, startWaitingTime, flowDistinguisher := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn)
+	fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn)
 	queued := startWaitingTime != time.Time{}
-	noteFn(fs, pl, flowDistinguisher)
 	if req == nil {
 		if queued {
 			metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
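With the Interface change above, callers pass the estimator as a callback instead of pre-computing a WorkEstimate into the digest. A hedged sketch of a call site; the flowcontrol group version and the digest values are assumptions, and only the Handle signature comes from this patch.

package apfdemo

import (
	"context"

	flowcontrol "k8s.io/api/flowcontrol/v1beta1" // group version is an assumption; use the one this tree imports
	"k8s.io/apiserver/pkg/authentication/user"
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

// serveOne shows the shape of a Handle call after this change: the work
// estimator is a callback that sees the classification results.
func serveOne(ctx context.Context, apf utilflowcontrol.Interface) {
	digest := utilflowcontrol.RequestDigest{
		RequestInfo: &apirequest.RequestInfo{Verb: "get"},
		User:        &user.DefaultInfo{Name: "demo-user"},
	}
	apf.Handle(ctx, digest,
		func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate {
			// Classification has happened; fs.Name and pl.Name are usable here.
			return fcrequest.WorkEstimate{InitialSeats: 1}
		},
		func(inQueue bool) {}, // called with true then false iff the request gets queued
		func() { /* serve the request here */ },
	)
}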
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go
index 2550666a051..c14b74d70ee 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go
@@ -105,7 +105,7 @@ type ctlrTestRequest struct {
 	descr1, descr2 interface{}
 }
 
-func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, ip metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.TimedObserverPair, eso metrics.TimedObserver) (fq.QueueSetCompleter, error) {
 	return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
 }
 
@@ -261,7 +261,8 @@ func TestConfigConsumer(t *testing.T) {
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 100,         // server concurrency limit
 		RequestWaitLimit:       time.Minute, // request wait limit
-		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
 		QueueSetFactory:        cts,
 	})
 	cts.cfgCtlr = ctlr
@@ -392,7 +393,8 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 100,
 		RequestWaitLimit:       time.Minute,
-		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
 		QueueSetFactory:        cts,
 	})
 
@@ -460,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
 			startWG.Add(1)
 			go func(matches, isResource bool, rdu RequestDigest) {
 				expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
-				ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) {
+				ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate {
 					matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
 					if testDebugLogs {
 						t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
@@ -473,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
 							t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
 						}
 					}
+					return fcrequest.WorkEstimate{InitialSeats: 1}
 				}, func(inQueue bool) {
 				}, func() {
 					startWG.Done()
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
index add5d4cd185..23215d084a3 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
@@ -31,8 +31,11 @@ import (
 // are separated so that errors from the first phase can be found
 // before committing to a concurrency allotment for the second.
 type QueueSetFactory interface {
-	// BeginConstruction does the first phase of creating a QueueSet
-	BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error)
+	// BeginConstruction does the first phase of creating a QueueSet.
+	// The TimedObserverPair observes the number of requests waiting and
+	// executing, with execution covering just the regular phase.
+	// The TimedObserver observes the number of seats occupied through
+	// all phases of execution.
+	BeginConstruction(QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (QueueSetCompleter, error)
 }
 
 // QueueSetCompleter finishes the two-step process of creating or
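The two-phase construction this interface describes, spelled out with the new observer arguments. A sketch under stated assumptions: it assumes eventclock.Real satisfies eventclock.Interface as elsewhere in this tree, and the QueuingConfig values are arbitrary; validation errors surface from BeginConstruction before any concurrency is committed in Complete.

package main

import (
	"time"

	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
	fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)

func main() {
	qsf := fqs.NewQueueSetFactory(eventclock.Real{})
	labelValues := []string{"demo"} // hypothetical priority-level name
	reqsPair := metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelValues)
	execSeats := metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, labelValues)

	qsc, err := qsf.BeginConstruction(fq.QueuingConfig{
		Name:             "demo",
		DesiredNumQueues: 8,
		QueueLengthLimit: 10,
		HandSize:         4,
		RequestWaitLimit: time.Minute,
	}, reqsPair, execSeats)
	if err != nil {
		panic(err) // a malformed QueuingConfig is caught in phase one
	}
	// Only after validation succeeds is the concurrency allotment committed.
	_ = qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 10})
}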
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
index f1bc635b060..0f3a8138b88 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
@@ -60,11 +60,12 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
 // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
 // the fields `factory` and `theSet` is non-nil.
 type queueSetCompleter struct {
-	factory *queueSetFactory
-	obsPair metrics.TimedObserverPair
-	theSet  *queueSet
-	qCfg    fq.QueuingConfig
-	dealer  *shufflesharding.Dealer
+	factory      *queueSetFactory
+	reqsObsPair  metrics.TimedObserverPair
+	execSeatsObs metrics.TimedObserver
+	theSet       *queueSet
+	qCfg         fq.QueuingConfig
+	dealer       *shufflesharding.Dealer
 }
 
 // queueSet implements the Fair Queuing for Server Requests technique
@@ -79,7 +80,10 @@ type queueSetCompleter struct {
 type queueSet struct {
 	clock                    eventclock.Interface
 	estimatedServiceDuration time.Duration
-	obsPair                  metrics.TimedObserverPair
+
+	reqsObsPair metrics.TimedObserverPair // .RequestsExecuting covers regular phase only
+
+	execSeatsObs metrics.TimedObserver // for all phases of execution
 
 	promiseFactory promiseFactory
 
@@ -144,16 +148,17 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
 	}
 }
 
-func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
 	dealer, err := checkConfig(qCfg)
 	if err != nil {
 		return nil, err
 	}
 	return &queueSetCompleter{
-		factory: qsf,
-		obsPair: obsPair,
-		qCfg:    qCfg,
-		dealer:  dealer}, nil
+		factory:      qsf,
+		reqsObsPair:  reqsObsPair,
+		execSeatsObs: execSeatsObs,
+		qCfg:         qCfg,
+		dealer:       dealer}, nil
 }
 
 // checkConfig returns a non-nil Dealer if the config is valid and
@@ -176,7 +181,8 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
 		qs = &queueSet{
 			clock:                    qsc.factory.clock,
 			estimatedServiceDuration: 3 * time.Millisecond,
-			obsPair:                  qsc.obsPair,
+			reqsObsPair:              qsc.reqsObsPair,
+			execSeatsObs:             qsc.execSeatsObs,
 			qCfg:                     qsc.qCfg,
 			currentR:                 0,
 			lastRealTime:             qsc.factory.clock.Now(),
@@ -237,8 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 	if qll < 1 {
 		qll = 1
 	}
-	qs.obsPair.RequestsWaiting.SetX1(float64(qll))
-	qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
+	qs.reqsObsPair.RequestsWaiting.SetX1(float64(qll))
+	qs.reqsObsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
+	qs.execSeatsObs.SetX1(float64(dCfg.ConcurrencyLimit))
 
 	qs.dispatchAsMuchAsPossibleLocked()
 }
@@ -391,7 +398,7 @@ func (req *request) wait() (bool, bool) {
 		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
 		metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
 		req.NoteQueued(false)
-		qs.obsPair.RequestsWaiting.Add(-1)
+		qs.reqsObsPair.RequestsWaiting.Add(-1)
 	}
 	return false, qs.isIdleLocked()
 }
@@ -602,7 +609,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
 	// remove timed out requests from queue
 	if timeoutCount > 0 {
 		qs.totRequestsWaiting -= timeoutCount
-		qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount))
+		qs.reqsObsPair.RequestsWaiting.Add(float64(-timeoutCount))
 	}
 }
 
@@ -638,7 +645,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
 	qs.totRequestsWaiting++
 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
 	request.NoteQueued(true)
-	qs.obsPair.RequestsWaiting.Add(1)
+	qs.reqsObsPair.RequestsWaiting.Add(1)
 }
 
 // dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
@@ -667,7 +674,8 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
 	qs.totSeatsInUse += req.MaxSeats()
 	metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
 	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
-	qs.obsPair.RequestsExecuting.Add(1)
+	qs.reqsObsPair.RequestsExecuting.Add(1)
+	qs.execSeatsObs.Add(float64(req.MaxSeats()))
 	if klog.V(5).Enabled() {
 		klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
 	}
@@ -690,7 +698,7 @@ func (qs *queueSet) dispatchLocked() bool {
 	qs.totRequestsWaiting--
 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
 	request.NoteQueued(false)
-	qs.obsPair.RequestsWaiting.Add(-1)
+	qs.reqsObsPair.RequestsWaiting.Add(-1)
 	defer qs.boundNextDispatchLocked(queue)
 	if !request.decision.Set(decisionExecute) {
 		return true
@@ -707,7 +715,8 @@ func (qs *queueSet) dispatchLocked() bool {
 	queue.seatsInUse += request.MaxSeats()
 	metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
 	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
-	qs.obsPair.RequestsExecuting.Add(1)
+	qs.reqsObsPair.RequestsExecuting.Add(1)
+	qs.execSeatsObs.Add(float64(request.MaxSeats()))
 	if klog.V(6).Enabled() {
 		klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
 			qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
@@ -848,7 +857,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 	now := qs.clock.Now()
 	qs.totRequestsExecuting--
 	metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
-	qs.obsPair.RequestsExecuting.Add(-1)
+	qs.reqsObsPair.RequestsExecuting.Add(-1)
 
 	actualServiceDuration := now.Sub(r.startTime)
 
@@ -860,6 +869,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 		qs.totSeatsInUse -= r.MaxSeats()
 		metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
+		qs.execSeatsObs.Add(-float64(r.MaxSeats()))
 		if r.queue != nil {
 			r.queue.seatsInUse -= r.MaxSeats()
 		}
@@ -973,8 +983,9 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
 }
 
 func (qs *queueSet) UpdateObservations() {
-	qs.obsPair.RequestsWaiting.Add(0)
-	qs.obsPair.RequestsExecuting.Add(0)
+	qs.reqsObsPair.RequestsWaiting.Add(0)
+	qs.reqsObsPair.RequestsExecuting.Add(0)
+	qs.execSeatsObs.Add(0)
 }
 
 func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
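The queueset changes above keep two distinct gauges: the request gauge moves by one at the regular-phase boundaries, while the seat observer moves by the request's MaxSeats and stays raised through any additional latency of the final phase. A toy simulation of that invariant; the counter types are stand-ins, not the real metrics types.

package main

import "fmt"

type gauge struct{ v float64 }

func (g *gauge) Add(d float64) { g.v += d }

func main() {
	var requestsExecuting, seatsOccupied gauge

	type req struct{ initialSeats, finalSeats int }
	maxSeats := func(r req) int {
		if r.finalSeats > r.initialSeats {
			return r.finalSeats
		}
		return r.initialSeats
	}

	r := req{initialSeats: 1, finalSeats: 7}

	// dispatch: one more request executing, MaxSeats more seats occupied
	requestsExecuting.Add(1)
	seatsOccupied.Add(float64(maxSeats(r)))

	// regular phase ends: the request is done, but its seats remain
	// held for the additional latency of the final phase
	requestsExecuting.Add(-1)
	fmt.Println(requestsExecuting.v, seatsOccupied.v) // 0 7

	// final phase ends: the seats are released
	seatsOccupied.Add(-float64(maxSeats(r)))
	fmt.Println(requestsExecuting.v, seatsOccupied.v) // 0 0
}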
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
index 9da21f1e34a..8679915f1f6 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
@@ -445,7 +445,7 @@ func TestNoRestraint(t *testing.T) {
 		t.Run(testCase.name, func(t *testing.T) {
 			now := time.Now()
 			clk, counter := testeventclock.NewFake(now, 0, nil)
-			nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
+			nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk), newExecSeatsObserver(clk))
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -481,7 +481,7 @@ func TestBaseline(t *testing.T) {
 		HandSize:         3,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -550,7 +550,7 @@ func TestSeparations(t *testing.T) {
 				HandSize:         3,
 				RequestWaitLimit: 10 * time.Minute,
 			}
-			qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+			qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -589,7 +589,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -626,7 +626,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
 		HandSize:         3,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -662,7 +662,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -702,7 +702,7 @@ func TestSeatSecondsRollover(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 40 * Quarter,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -740,7 +740,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -777,7 +777,7 @@ func TestDifferentWidths(t *testing.T) {
 		HandSize:         7,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -813,7 +813,7 @@ func TestTooWide(t *testing.T) {
 		HandSize:         7,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -874,7 +874,7 @@ func TestWindup(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -909,7 +909,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
 		Name:             "TestDifferentFlowsWithoutQueuing",
 		DesiredNumQueues: 0,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -945,7 +945,7 @@ func TestTimeout(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 0,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -996,7 +996,7 @@ func TestContextCancel(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 15 * time.Second,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1102,7 +1102,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
 		DesiredNumQueues: 0,
 		RequestWaitLimit: 15 * time.Second,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1357,7 +1357,8 @@ func TestFinishRequestLocked(t *testing.T) {
 			qs := &queueSet{
 				clock:                    clk,
 				estimatedServiceDuration: time.Second,
-				obsPair:                  newObserverPair(clk),
+				reqsObsPair:              newObserverPair(clk),
+				execSeatsObs:             newExecSeatsObserver(clk),
 			}
 			queue := &queue{
 				requests: newRequestFIFO(),
@@ -1463,3 +1464,7 @@ func newFIFO(requests ...*request) fifo {
 func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
 	return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
 }
+
+func newExecSeatsObserver(clk clock.PassiveClock) metrics.TimedObserver {
+	return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"test"})
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go
index 09bb89d1838..1dab3681146 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go
@@ -40,7 +40,7 @@ type noRestraint struct{}
 
 type noRestraintRequest struct{}
 
-func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (fq.QueueSetCompleter, error) {
 	return noRestraintCompleter{}, nil
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go
index 710044f06ba..709b19fb9a0 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go
@@ -56,7 +56,8 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
 			HandSize:         hs,
 			QueueLengthLimit: 5}
 	}
-	_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}))
+	labelVals := []string{"test"}
+	_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, labelVals))
 	if err != nil {
 		panic(err)
 	}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go
index d9a720a4bd6..d02e65f79a7 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go
@@ -27,7 +27,6 @@ import (
 	"k8s.io/apiserver/pkg/authentication/user"
 	"k8s.io/apiserver/pkg/endpoints/request"
 	fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
-	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
 )
 
 func TestMatching(t *testing.T) {
@@ -102,8 +101,7 @@ func TestLiterals(t *testing.T) {
 			Name:              "eman",
 			Parts:             []string{"goodrscs", "eman"},
 		},
-		User:         ui,
-		WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
+		User: ui,
 	}
 	reqRU := RequestDigest{
 		RequestInfo: &request.RequestInfo{
@@ -118,8 +116,7 @@ func TestLiterals(t *testing.T) {
 			Name:              "eman",
 			Parts:             []string{"goodrscs", "eman"},
 		},
-		User:         ui,
-		WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
+		User: ui,
 	}
 	reqN := RequestDigest{
 		RequestInfo: &request.RequestInfo{
@@ -127,8 +124,7 @@ func TestLiterals(t *testing.T) {
 			IsResourceRequest: false,
 			Path:              "/openapi/v2",
 			Verb:              "goodverb",
 		},
-		User:         ui,
-		WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
+		User: ui,
 	}
 	checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{
 		Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser,
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go
index 3ad52c22e3c..88f812490ba 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go
@@ -104,6 +104,28 @@ var (
 		},
 		[]string{priorityLevel, flowSchema},
 	)
+	// PriorityLevelExecutionSeatsObserverGenerator creates observers of seats occupied throughout execution for priority levels
+	PriorityLevelExecutionSeatsObserverGenerator = NewSampleAndWaterMarkHistogramsGenerator(clock.RealClock{}, time.Millisecond,
+		&compbasemetrics.HistogramOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "priority_level_seat_count_samples",
+			Help:           "Periodic observations of the number of occupied seats",
+			Buckets:        []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
+			ConstLabels:    map[string]string{phase: "executing"},
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		&compbasemetrics.HistogramOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "priority_level_seat_count_watermarks",
+			Help:           "Watermarks of the number of occupied seats",
+			Buckets:        []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
+			ConstLabels:    map[string]string{phase: "executing"},
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{priorityLevel},
+	)
 	// PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels
 	PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
 		&compbasemetrics.HistogramOpts{
@@ -267,6 +289,17 @@ var (
 		},
 		[]string{priorityLevel, flowSchema},
 	)
+	watchCountSamples = compbasemetrics.NewHistogramVec(
+		&compbasemetrics.HistogramOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "watch_count_samples",
+			Help:           "Count of watchers for mutating requests in API Priority and Fairness",
+			Buckets:        []float64{0, 1, 10, 100, 1000, 10000},
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{priorityLevel, flowSchema},
+	)
 	apiserverEpochAdvances = compbasemetrics.NewCounterVec(
 		&compbasemetrics.CounterOpts{
 			Namespace: namespace,
@@ -293,8 +326,10 @@ var (
 		apiserverCurrentExecutingRequests,
 		apiserverRequestWaitingSeconds,
 		apiserverRequestExecutionSeconds,
+		watchCountSamples,
 		apiserverEpochAdvances,
 	}.
+		Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...).
		Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...).
 		Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...)
 )
@@ -360,6 +395,12 @@ func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema str
 	apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
 }
 
+// ObserveWatchCount notes a sampling of a watch count
+func ObserveWatchCount(ctx context.Context, priorityLevel, flowSchema string, count int) {
+	watchCountSamples.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(float64(count))
+}
+
+// AddEpochAdvance notes an advance of the progress meter baseline for a given priority level
 func AddEpochAdvance(ctx context.Context, priorityLevel string, success bool) {
 	apiserverEpochAdvances.WithContext(ctx).WithLabelValues(priorityLevel, strconv.FormatBool(success)).Inc()
 }
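How the new seat-count observer is driven, using only calls that appear in this patch (Generate, SetX1, Add). The label value is hypothetical, and SetX1's role as the scaling denominator is inferred from setConfiguration above, which sets it to the concurrency limit.

package main

import "k8s.io/apiserver/pkg/util/flowcontrol/metrics"

func main() {
	// Generate takes an initial numerator/denominator plus label values
	// in this tree; "my-priority-level" is a hypothetical label value.
	execSeatsObs := metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"my-priority-level"})

	execSeatsObs.SetX1(10) // denominator: the priority level's concurrency limit
	execSeatsObs.Add(5)    // a request occupying 5 seats starts executing
	execSeatsObs.Add(-5)   // seats released after the final phase of execution
	execSeatsObs.Add(0)    // periodic no-op write keeps the samples fresh
}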
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go
index addf2fd4cd1..dc12cb71472 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go
@@ -79,7 +79,7 @@ type sampleAndWaterMarkObserverGenerator struct {
 	waterMarks *compbasemetrics.HistogramVec
 }
 
-var _ TimedObserverGenerator = (*sampleAndWaterMarkObserverGenerator)(nil)
+var _ TimedObserverGenerator = SampleAndWaterMarkObserverGenerator{}
 
 // NewSampleAndWaterMarkHistogramsGenerator makes a new one
 func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator {
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
index 6cdced290f0..7fcc0903e83 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
@@ -40,7 +40,7 @@ type listWorkEstimator struct {
 	countGetterFn objectCountGetterFunc
 }
 
-func (e *listWorkEstimator) estimate(r *http.Request) WorkEstimate {
+func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
 	requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
 	if !ok {
 		// no RequestInfo should never happen, but to be on the safe side
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go
index ff0dd357149..1c6c441e27c 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	apirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 )
 
 const (
@@ -49,7 +50,9 @@ type mutatingWorkEstimator struct {
 	enabled bool
 }
 
-func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
+func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
+	// TODO(wojtekt): Remove once we tune the algorithm to not fail
+	// scalability tests.
 	if !e.enabled {
 		return WorkEstimate{
 			InitialSeats: 1,
@@ -67,6 +70,7 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
 		}
 	}
 	watchCount := e.countFn(requestInfo)
+	metrics.ObserveWatchCount(r.Context(), priorityLevelName, flowSchemaName, watchCount)
 
 	// The cost of the request associated with the watchers of that event
 	// consists of three parts:
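A hedged miniature of the idea behind mutatingWorkEstimator: sample how many watchers a mutating request fans out to, then let that count feed the cost. The scaling below is invented for illustration; only the estimate(r, flowSchemaName, priorityLevelName) shape comes from this patch.

package main

import "fmt"

// workEstimate is a local stand-in; the real WorkEstimate also carries
// AdditionalLatency for how long seats stay occupied after the response.
type workEstimate struct {
	InitialSeats int
	FinalSeats   int
}

// estimateMutating records the watch fan-out as a sample, then derives
// seat counts from it. The divisor is illustrative, not the real formula.
func estimateMutating(watchCount int, flowSchemaName, priorityLevelName string) workEstimate {
	fmt.Printf("watch_count_samples{priority_level=%q,flow_schema=%q} <- %d\n",
		priorityLevelName, flowSchemaName, watchCount)
	return workEstimate{
		InitialSeats: 1,
		FinalSeats:   1 + watchCount/100, // invented scaling
	}
}

func main() {
	fmt.Println(estimateMutating(250, "service-accounts", "workload-high"))
}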
"k8s.io/apiserver/pkg/util/flowcontrol/metrics" ) const ( @@ -49,7 +50,9 @@ type mutatingWorkEstimator struct { enabled bool } -func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { +func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { + // TODO(wojtekt): Remove once we tune the algorithm to not fail + // scalability tests. if !e.enabled { return WorkEstimate{ InitialSeats: 1, @@ -67,6 +70,7 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { } } watchCount := e.countFn(requestInfo) + metrics.ObserveWatchCount(r.Context(), priorityLevelName, flowSchemaName, watchCount) // The cost of the request associated with the watchers of that event // consists of three parts: diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index b2c6d860a7b..675433c2c31 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -83,10 +83,10 @@ func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCou // WorkEstimatorFunc returns the estimated work of a given request. // This function will be used by the Priority & Fairness filter to // estimate the work of of incoming requests. -type WorkEstimatorFunc func(*http.Request) WorkEstimate +type WorkEstimatorFunc func(request *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate -func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate { - return e(r) +func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { + return e(r, flowSchemaName, priorityLevelName) } type workEstimator struct { @@ -96,7 +96,7 @@ type workEstimator struct { mutatingWorkEstimator WorkEstimatorFunc } -func (e *workEstimator) estimate(r *http.Request) WorkEstimate { +func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate { requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) if !ok { klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI) @@ -106,9 +106,9 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate { switch requestInfo.Verb { case "list": - return e.listWorkEstimator.EstimateWork(r) + return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) case "create", "update", "patch", "delete": - return e.mutatingWorkEstimator.EstimateWork(r) + return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) } return WorkEstimate{InitialSeats: minimumSeats} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go index caf2e53b739..ee27fff42fc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -410,7 +410,7 @@ func TestWorkEstimator(t *testing.T) { req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo)) } - workestimateGot := estimator.EstimateWork(req) + workestimateGot := estimator.EstimateWork(req, "testFS", "testPL") if test.initialSeatsExpected != workestimateGot.InitialSeats { t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, 
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go
index caf2e53b739..ee27fff42fc 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go
@@ -410,7 +410,7 @@ func TestWorkEstimator(t *testing.T) {
 				req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo))
 			}
 
-			workestimateGot := estimator.EstimateWork(req)
+			workestimateGot := estimator.EstimateWork(req, "testFS", "testPL")
 			if test.initialSeatsExpected != workestimateGot.InitialSeats {
 				t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats)
 			}
diff --git a/test/integration/apiserver/flowcontrol/fight_test.go b/test/integration/apiserver/flowcontrol/fight_test.go
index 509f471e0c2..08341d70dcd 100644
--- a/test/integration/apiserver/flowcontrol/fight_test.go
+++ b/test/integration/apiserver/flowcontrol/fight_test.go
@@ -139,7 +139,8 @@ func (ft *fightTest) createController(invert bool, i int) {
 		FlowcontrolClient:      fcIfc,
 		ServerConcurrencyLimit: 200,             // server concurrency limit
 		RequestWaitLimit:       time.Minute / 4, // request wait limit
-		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
 		QueueSetFactory:        fqtesting.NewNoRestraintFactory(),
 	})
 	ft.ctlrs[invert][i] = ctlr