diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 860636e16fd..8039ddb63ab 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -100,10 +100,11 @@ type RequestDigest struct { // this type and cfgMeal follow the convention that the suffix // "Locked" means that the caller must hold the configController lock. type configController struct { - name string // varies in tests of fighting controllers - clock clock.PassiveClock - queueSetFactory fq.QueueSetFactory - obsPairGenerator metrics.TimedObserverPairGenerator + name string // varies in tests of fighting controllers + clock clock.PassiveClock + queueSetFactory fq.QueueSetFactory + reqsObsPairGenerator metrics.TimedObserverPairGenerator + execSeatsObsGenerator metrics.TimedObserverGenerator // How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager asFieldManager string @@ -192,8 +193,11 @@ type priorityLevelState struct { // returned StartFunction numPending int - // Observers tracking number waiting, executing - obsPair metrics.TimedObserverPair + // Observers tracking number of requests waiting, executing + reqsObsPair metrics.TimedObserverPair + + // Observer of number of seats occupied throughout execution + execSeatsObs metrics.TimedObserver } // NewTestableController is extra flexible to facilitate testing @@ -202,7 +206,8 @@ func newTestableController(config TestableConfig) *configController { name: config.Name, clock: config.Clock, queueSetFactory: config.QueueSetFactory, - obsPairGenerator: config.ObsPairGenerator, + reqsObsPairGenerator: config.ReqsObsPairGenerator, + execSeatsObsGenerator: config.ExecSeatsObsGenerator, asFieldManager: config.AsFieldManager, foundToDangling: config.FoundToDangling, serverConcurrencyLimit: config.ServerConcurrencyLimit, @@ -534,9 +539,10 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi for _, pl := range newPLs { state := meal.cfgCtlr.priorityLevelStates[pl.Name] if state == nil { - state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})} + labelValues := []string{pl.Name} + state = &priorityLevelState{reqsObsPair: meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsObsGenerator.Generate(1, 1, labelValues)} } - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsObsPair, state.execSeatsObs) if err != nil { klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) continue @@ -639,7 +645,7 @@ func (meal *cfgMeal) processOldPLsLocked() { } } var err error - plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair) + plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsObsPair, plState.execSeatsObs) if err != nil { // This can not happen because queueSetCompleterForPL already approved this config panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) @@ -688,7 +694,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { // given priority level configuration. Returns nil if that config // does not call for limiting. Returns nil and an error if the given // object is malformed in a way that is a problem for this package. -func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { +func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) { if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { return nil, errors.New("broken union structure at the top") } @@ -717,7 +723,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow if queues != nil { qsc, err = queues.BeginConfigChange(qcQS) } else { - qsc, err = qsf.BeginConstruction(qcQS, intPair) + qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs) } if err != nil { err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err) @@ -762,17 +768,20 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl // that does not actually exist (right now) as a real API object. func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) { klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name) - obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name}) - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair) + labelValues := []string{proto.Name} + reqsObsPair := meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues) + execSeatsObs := meal.cfgCtlr.execSeatsObsGenerator.Generate(1, 1, labelValues) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsObsPair, execSeatsObs) if err != nil { // This can not happen because proto is one of the mandatory // objects and these are not erroneous panic(err) } meal.newPLStates[proto.Name] = &priorityLevelState{ - pl: proto, - qsCompleter: qsCompleter, - obsPair: obsPair, + pl: proto, + qsCompleter: qsCompleter, + reqsObsPair: reqsObsPair, + execSeatsObs: execSeatsObs, } if proto.Spec.Limited != nil { meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index ae26253d905..123329ef104 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -92,7 +92,8 @@ func New( FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: serverConcurrencyLimit, RequestWaitLimit: requestWaitLimit, - ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, QueueSetFactory: fqs.NewQueueSetFactory(clk), }) } @@ -131,8 +132,11 @@ type TestableConfig struct { // RequestWaitLimit configured on the server RequestWaitLimit time.Duration - // ObsPairGenerator for metrics - ObsPairGenerator metrics.TimedObserverPairGenerator + // ObsPairGenerator for metrics about requests + ReqsObsPairGenerator metrics.TimedObserverPairGenerator + + // TimedObserverPairGenerator for metrics about seats occupied by all phases of execution + ExecSeatsObsGenerator metrics.TimedObserverGenerator // QueueSetFactory for the queuing implementation QueueSetFactory fq.QueueSetFactory diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index 2550666a051..7f7f9042de4 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -105,7 +105,7 @@ type ctlrTestRequest struct { descr1, descr2 interface{} } -func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, ip metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { +func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.TimedObserverPair, eso metrics.TimedObserver) (fq.QueueSetCompleter, error) { return ctlrTestQueueSetCompleter{cts, nil, qc}, nil } @@ -261,7 +261,8 @@ func TestConfigConsumer(t *testing.T) { FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 100, // server concurrency limit RequestWaitLimit: time.Minute, // request wait limit - ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, QueueSetFactory: cts, }) cts.cfgCtlr = ctlr @@ -392,7 +393,8 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) { FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 100, RequestWaitLimit: time.Minute, - ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, QueueSetFactory: cts, }) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index add5d4cd185..23215d084a3 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -31,8 +31,11 @@ import ( // are separated so that errors from the first phase can be found // before committing to a concurrency allotment for the second. type QueueSetFactory interface { - // BeginConstruction does the first phase of creating a QueueSet - BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error) + // BeginConstruction does the first phase of creating a QueueSet. + // The TimedObserverPair observes number of requests, + // execution covering just the regular phase. + // The TimedObserver observes number of seats occupied through all phases of execution. + BeginConstruction(QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (QueueSetCompleter, error) } // QueueSetCompleter finishes the two-step process of creating or diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index f1bc635b060..0f3a8138b88 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -60,11 +60,12 @@ type promiseFactoryFactory func(*queueSet) promiseFactory // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of // the fields `factory` and `theSet` is non-nil. type queueSetCompleter struct { - factory *queueSetFactory - obsPair metrics.TimedObserverPair - theSet *queueSet - qCfg fq.QueuingConfig - dealer *shufflesharding.Dealer + factory *queueSetFactory + reqsObsPair metrics.TimedObserverPair + execSeatsObs metrics.TimedObserver + theSet *queueSet + qCfg fq.QueuingConfig + dealer *shufflesharding.Dealer } // queueSet implements the Fair Queuing for Server Requests technique @@ -79,7 +80,10 @@ type queueSetCompleter struct { type queueSet struct { clock eventclock.Interface estimatedServiceDuration time.Duration - obsPair metrics.TimedObserverPair + + reqsObsPair metrics.TimedObserverPair // .RequestsExecuting covers regular phase only + + execSeatsObs metrics.TimedObserver // for all phases of execution promiseFactory promiseFactory @@ -144,16 +148,17 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr } } -func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { +func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) { dealer, err := checkConfig(qCfg) if err != nil { return nil, err } return &queueSetCompleter{ - factory: qsf, - obsPair: obsPair, - qCfg: qCfg, - dealer: dealer}, nil + factory: qsf, + reqsObsPair: reqsObsPair, + execSeatsObs: execSeatsObs, + qCfg: qCfg, + dealer: dealer}, nil } // checkConfig returns a non-nil Dealer if the config is valid and @@ -176,7 +181,8 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { qs = &queueSet{ clock: qsc.factory.clock, estimatedServiceDuration: 3 * time.Millisecond, - obsPair: qsc.obsPair, + reqsObsPair: qsc.reqsObsPair, + execSeatsObs: qsc.execSeatsObs, qCfg: qsc.qCfg, currentR: 0, lastRealTime: qsc.factory.clock.Now(), @@ -237,8 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, if qll < 1 { qll = 1 } - qs.obsPair.RequestsWaiting.SetX1(float64(qll)) - qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit)) + qs.reqsObsPair.RequestsWaiting.SetX1(float64(qll)) + qs.reqsObsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit)) + qs.execSeatsObs.SetX1(float64(dCfg.ConcurrencyLimit)) qs.dispatchAsMuchAsPossibleLocked() } @@ -391,7 +398,7 @@ func (req *request) wait() (bool, bool) { metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) - qs.obsPair.RequestsWaiting.Add(-1) + qs.reqsObsPair.RequestsWaiting.Add(-1) } return false, qs.isIdleLocked() } @@ -602,7 +609,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s // remove timed out requests from queue if timeoutCount > 0 { qs.totRequestsWaiting -= timeoutCount - qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount)) + qs.reqsObsPair.RequestsWaiting.Add(float64(-timeoutCount)) } } @@ -638,7 +645,7 @@ func (qs *queueSet) enqueueLocked(request *request) { qs.totRequestsWaiting++ metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) request.NoteQueued(true) - qs.obsPair.RequestsWaiting.Add(1) + qs.reqsObsPair.RequestsWaiting.Add(1) } // dispatchAsMuchAsPossibleLocked does as many dispatches as possible now. @@ -667,7 +674,8 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f qs.totSeatsInUse += req.MaxSeats() metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats()) - qs.obsPair.RequestsExecuting.Add(1) + qs.reqsObsPair.RequestsExecuting.Add(1) + qs.execSeatsObs.Add(float64(req.MaxSeats())) if klog.V(5).Enabled() { klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) } @@ -690,7 +698,7 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsWaiting-- metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) request.NoteQueued(false) - qs.obsPair.RequestsWaiting.Add(-1) + qs.reqsObsPair.RequestsWaiting.Add(-1) defer qs.boundNextDispatchLocked(queue) if !request.decision.Set(decisionExecute) { return true @@ -707,7 +715,8 @@ func (qs *queueSet) dispatchLocked() bool { queue.seatsInUse += request.MaxSeats() metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats()) - qs.obsPair.RequestsExecuting.Add(1) + qs.reqsObsPair.RequestsExecuting.Add(1) + qs.execSeatsObs.Add(float64(request.MaxSeats())) if klog.V(6).Enabled() { klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2, @@ -848,7 +857,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { now := qs.clock.Now() qs.totRequestsExecuting-- metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) - qs.obsPair.RequestsExecuting.Add(-1) + qs.reqsObsPair.RequestsExecuting.Add(-1) actualServiceDuration := now.Sub(r.startTime) @@ -860,6 +869,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { qs.totSeatsInUse -= r.MaxSeats() metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats()) + qs.execSeatsObs.Add(-float64(r.MaxSeats())) if r.queue != nil { r.queue.seatsInUse -= r.MaxSeats() } @@ -973,8 +983,9 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { } func (qs *queueSet) UpdateObservations() { - qs.obsPair.RequestsWaiting.Add(0) - qs.obsPair.RequestsExecuting.Add(0) + qs.reqsObsPair.RequestsWaiting.Add(0) + qs.reqsObsPair.RequestsExecuting.Add(0) + qs.execSeatsObs.Add(0) } func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 9da21f1e34a..8679915f1f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -445,7 +445,7 @@ func TestNoRestraint(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk)) + nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -481,7 +481,7 @@ func TestBaseline(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -550,7 +550,7 @@ func TestSeparations(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -589,7 +589,7 @@ func TestUniformFlowsHandSize1(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -626,7 +626,7 @@ func TestUniformFlowsHandSize3(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -662,7 +662,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -702,7 +702,7 @@ func TestSeatSecondsRollover(t *testing.T) { HandSize: 1, RequestWaitLimit: 40 * Quarter, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -740,7 +740,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -777,7 +777,7 @@ func TestDifferentWidths(t *testing.T) { HandSize: 7, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -813,7 +813,7 @@ func TestTooWide(t *testing.T) { HandSize: 7, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -874,7 +874,7 @@ func TestWindup(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -909,7 +909,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { Name: "TestDifferentFlowsWithoutQueuing", DesiredNumQueues: 0, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -945,7 +945,7 @@ func TestTimeout(t *testing.T) { HandSize: 1, RequestWaitLimit: 0, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -996,7 +996,7 @@ func TestContextCancel(t *testing.T) { HandSize: 1, RequestWaitLimit: 15 * time.Second, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -1102,7 +1102,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { DesiredNumQueues: 0, RequestWaitLimit: 15 * time.Second, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) if err != nil { t.Fatal(err) } @@ -1357,7 +1357,8 @@ func TestFinishRequestLocked(t *testing.T) { qs := &queueSet{ clock: clk, estimatedServiceDuration: time.Second, - obsPair: newObserverPair(clk), + reqsObsPair: newObserverPair(clk), + execSeatsObs: newExecSeatsObserver(clk), } queue := &queue{ requests: newRequestFIFO(), @@ -1463,3 +1464,7 @@ func newFIFO(requests ...*request) fifo { func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair { return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}) } + +func newExecSeatsObserver(clk clock.PassiveClock) metrics.TimedObserver { + return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"test"}) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 09bb89d1838..1dab3681146 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -40,7 +40,7 @@ type noRestraint struct{} type noRestraintRequest struct{} -func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { +func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (fq.QueueSetCompleter, error) { return noRestraintCompleter{}, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go index 710044f06ba..709b19fb9a0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go @@ -56,7 +56,8 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration HandSize: hs, QueueLengthLimit: 5} } - _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})) + labelVals := []string{"test"} + _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, labelVals)) if err != nil { panic(err) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index 3ad52c22e3c..5783f24b1d0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -104,6 +104,28 @@ var ( }, []string{priorityLevel, flowSchema}, ) + // PriorityLevelExecutionSeatsObserverGenerator creates observers of seats occupied throughout execution for priority levels + PriorityLevelExecutionSeatsObserverGenerator = NewSampleAndWaterMarkHistogramsGenerator(clock.RealClock{}, time.Millisecond, + &compbasemetrics.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "priority_level_seat_count_samples", + Help: "Periodic observations of the number of requests", + Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1}, + ConstLabels: map[string]string{phase: "executing"}, + StabilityLevel: compbasemetrics.ALPHA, + }, + &compbasemetrics.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "priority_level_seat_count_watermarks", + Help: "Watermarks of the number of requests", + Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1}, + ConstLabels: map[string]string{phase: "executing"}, + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel}, + ) // PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, &compbasemetrics.HistogramOpts{ @@ -295,6 +317,7 @@ var ( apiserverRequestExecutionSeconds, apiserverEpochAdvances, }. + Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...). Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...). Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...) ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go index addf2fd4cd1..dc12cb71472 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go @@ -79,7 +79,7 @@ type sampleAndWaterMarkObserverGenerator struct { waterMarks *compbasemetrics.HistogramVec } -var _ TimedObserverGenerator = (*sampleAndWaterMarkObserverGenerator)(nil) +var _ TimedObserverGenerator = SampleAndWaterMarkObserverGenerator{} // NewSampleAndWaterMarkHistogramsGenerator makes a new one func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator {