diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index 6f03b09f60e..d84bc4b8a74 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -61,13 +61,13 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) { // requestWatermark is used to track maximal numbers of requests in a particular phase of handling type requestWatermark struct { phase string - readOnlyObserver, mutatingObserver fcmetrics.RatioedChangeObserver + readOnlyObserver, mutatingObserver fcmetrics.RatioedGauge lock sync.Mutex readOnlyWatermark, mutatingWatermark int } func (w *requestWatermark) recordMutating(mutatingVal int) { - w.mutatingObserver.Observe(float64(mutatingVal)) + w.mutatingObserver.Set(float64(mutatingVal)) w.lock.Lock() defer w.lock.Unlock() @@ -78,7 +78,7 @@ func (w *requestWatermark) recordMutating(mutatingVal int) { } func (w *requestWatermark) recordReadOnly(readOnlyVal int) { - w.readOnlyObserver.Observe(float64(readOnlyVal)) + w.readOnlyObserver.Set(float64(readOnlyVal)) w.lock.Lock() defer w.lock.Unlock() @@ -91,8 +91,8 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) { // watermark tracks requests being executed (not waiting in a queue) var watermark = &requestWatermark{ phase: metrics.ExecutingPhase, - readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting, - mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting, + readOnlyObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting, + mutatingObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{metrics.MutatingKind}).RequestsExecuting, } // startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark. diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 66e4007d30f..3dcd827890f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -47,8 +47,8 @@ type PriorityAndFairnessClassification struct { // waitingMark tracks requests waiting rather than being executed var waitingMark = &requestWatermark{ phase: epmetrics.WaitingPhase, - readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting, - mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting, + readOnlyObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting, + mutatingObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting, } var atomicMutatingExecuting, atomicReadOnlyExecuting int32 diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 63a4d7962bc..2a4b8987bd7 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -99,11 +99,11 @@ type RequestDigest struct { // this type and cfgMeal follow the convention that the suffix // "Locked" means that the caller must hold the configController lock. type configController struct { - name string // varies in tests of fighting controllers - clock clock.PassiveClock - queueSetFactory fq.QueueSetFactory - reqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator - execSeatsObsGenerator metrics.RatioedChangeObserverGenerator + name string // varies in tests of fighting controllers + clock clock.PassiveClock + queueSetFactory fq.QueueSetFactory + reqsGaugePairVec metrics.RatioedGaugePairVec + execSeatsGaugeVec metrics.RatioedGaugeVec // How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager asFieldManager string @@ -193,10 +193,10 @@ type priorityLevelState struct { numPending int // Observers tracking number of requests waiting, executing - reqsObsPair metrics.RatioedChangeObserverPair + reqsGaugePair metrics.RatioedGaugePair // Observer of number of seats occupied throughout execution - execSeatsObs metrics.RatioedChangeObserver + execSeatsObs metrics.RatioedGauge } // NewTestableController is extra flexible to facilitate testing @@ -205,8 +205,8 @@ func newTestableController(config TestableConfig) *configController { name: config.Name, clock: config.Clock, queueSetFactory: config.QueueSetFactory, - reqsObsPairGenerator: config.ReqsObsPairGenerator, - execSeatsObsGenerator: config.ExecSeatsObsGenerator, + reqsGaugePairVec: config.ReqsGaugePairVec, + execSeatsGaugeVec: config.ExecSeatsGaugeVec, asFieldManager: config.AsFieldManager, foundToDangling: config.FoundToDangling, serverConcurrencyLimit: config.ServerConcurrencyLimit, @@ -292,7 +292,7 @@ func newTestableController(config TestableConfig) *configController { } // MaintainObservations keeps the observers from -// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling +// metrics.PriorityLevelConcurrencyPairVec from falling // too far behind func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) { wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh) @@ -539,9 +539,9 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi state := meal.cfgCtlr.priorityLevelStates[pl.Name] if state == nil { labelValues := []string{pl.Name} - state = &priorityLevelState{reqsObsPair: meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsObsGenerator.Generate(0, 1, labelValues)} + state = &priorityLevelState{reqsGaugePair: meal.cfgCtlr.reqsGaugePairVec.NewForLabelValuesSafe(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)} } - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsObsPair, state.execSeatsObs) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs) if err != nil { klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) continue @@ -645,7 +645,7 @@ func (meal *cfgMeal) processOldPLsLocked() { } } var err error - plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsObsPair, plState.execSeatsObs) + plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs) if err != nil { // This can not happen because queueSetCompleterForPL already approved this config panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) @@ -694,7 +694,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { // given priority level configuration. Returns nil if that config // does not call for limiting. Returns nil and an error if the given // object is malformed in a way that is a problem for this package. -func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { +func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge) (fq.QueueSetCompleter, error) { if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { return nil, errors.New("broken union structure at the top") } @@ -769,19 +769,19 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) { klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name) labelValues := []string{proto.Name} - reqsObsPair := meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues) - execSeatsObs := meal.cfgCtlr.execSeatsObsGenerator.Generate(0, 1, labelValues) - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsObsPair, execSeatsObs) + reqsGaugePair := meal.cfgCtlr.reqsGaugePairVec.NewForLabelValuesSafe(1, 1, labelValues) + execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs) if err != nil { // This can not happen because proto is one of the mandatory // objects and these are not erroneous panic(err) } meal.newPLStates[proto.Name] = &priorityLevelState{ - pl: proto, - qsCompleter: qsCompleter, - reqsObsPair: reqsObsPair, - execSeatsObs: execSeatsObs, + pl: proto, + qsCompleter: qsCompleter, + reqsGaugePair: reqsGaugePair, + execSeatsObs: execSeatsObs, } if proto.Spec.Limited != nil { meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index d8b17d27924..54add96ed0a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -100,8 +100,8 @@ func New( FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: serverConcurrencyLimit, RequestWaitLimit: requestWaitLimit, - ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, - ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, + ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk), }) } @@ -140,11 +140,11 @@ type TestableConfig struct { // RequestWaitLimit configured on the server RequestWaitLimit time.Duration - // ObsPairGenerator for metrics about requests - ReqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator + // GaugePairVec for metrics about requests + ReqsGaugePairVec metrics.RatioedGaugePairVec - // RatioedChangeObserverPairGenerator for metrics about seats occupied by all phases of execution - ExecSeatsObsGenerator metrics.RatioedChangeObserverGenerator + // RatioedGaugePairVec for metrics about seats occupied by all phases of execution + ExecSeatsGaugeVec metrics.RatioedGaugeVec // QueueSetFactory for the queuing implementation QueueSetFactory fq.QueueSetFactory diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index f3eedeb962e..aeece8cb4ea 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -105,7 +105,7 @@ type ctlrTestRequest struct { descr1, descr2 interface{} } -func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedChangeObserverPair, eso metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { +func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge) (fq.QueueSetCompleter, error) { return ctlrTestQueueSetCompleter{cts, nil, qc}, nil } @@ -261,8 +261,8 @@ func TestConfigConsumer(t *testing.T) { FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 100, // server concurrency limit RequestWaitLimit: time.Minute, // request wait limit - ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, - ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, + ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: cts, }) cts.cfgCtlr = ctlr @@ -393,8 +393,8 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) { FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 100, RequestWaitLimit: time.Minute, - ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, - ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, + ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: cts, }) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go index 800fa765fb6..07d6f3d376a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/utils/clock" ) @@ -30,7 +29,8 @@ import ( // Integrator is created, and ends at the latest operation on the // Integrator. type Integrator interface { - metrics.ChangeObserver + Set(float64) + Add(float64) GetResults() IntegratorResults @@ -69,7 +69,7 @@ func NewIntegrator(clock clock.PassiveClock) Integrator { } } -func (igr *integrator) Observe(x float64) { +func (igr *integrator) Set(x float64) { igr.Lock() igr.setLocked(x) igr.Unlock() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go index 698f9abe3d8..e377ea2da52 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go @@ -38,7 +38,7 @@ func TestIntegrator(t *testing.T) { if !results.Equal(&rToo) { t.Errorf("expected %#+v, got %#+v", results, rToo) } - igr.Observe(2) + igr.Set(2) results = igr.GetResults() if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) { t.Errorf("expected %#+v, got %#+v", e, results) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index cc772d8b7dc..165bfb9f385 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -32,10 +32,10 @@ import ( // before committing to a concurrency allotment for the second. type QueueSetFactory interface { // BeginConstruction does the first phase of creating a QueueSet. - // The RatioedChangeObserverPair observes number of requests, + // The RatioedGaugePair observes number of requests, // execution covering just the regular phase. - // The RatioedChangeObserver observes number of seats occupied through all phases of execution. - BeginConstruction(QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (QueueSetCompleter, error) + // The RatioedGauge observes number of seats occupied through all phases of execution. + BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (QueueSetCompleter, error) } // QueueSetCompleter finishes the two-step process of creating or diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index d3864d44bb3..171382a7c4a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -60,12 +60,12 @@ type promiseFactoryFactory func(*queueSet) promiseFactory // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of // the fields `factory` and `theSet` is non-nil. type queueSetCompleter struct { - factory *queueSetFactory - reqsObsPair metrics.RatioedChangeObserverPair - execSeatsObs metrics.RatioedChangeObserver - theSet *queueSet - qCfg fq.QueuingConfig - dealer *shufflesharding.Dealer + factory *queueSetFactory + reqsGaugePair metrics.RatioedGaugePair + execSeatsGauge metrics.RatioedGauge + theSet *queueSet + qCfg fq.QueuingConfig + dealer *shufflesharding.Dealer } // queueSet implements the Fair Queuing for Server Requests technique @@ -81,9 +81,9 @@ type queueSet struct { clock eventclock.Interface estimatedServiceDuration time.Duration - reqsObsPair metrics.RatioedChangeObserverPair // .RequestsExecuting covers regular phase only + reqsGaugePair metrics.RatioedGaugePair // .RequestsExecuting covers regular phase only - execSeatsObs metrics.RatioedChangeObserver // for all phases of execution + execSeatsGauge metrics.RatioedGauge // for all phases of execution promiseFactory promiseFactory @@ -148,17 +148,17 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr } } -func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { +func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge) (fq.QueueSetCompleter, error) { dealer, err := checkConfig(qCfg) if err != nil { return nil, err } return &queueSetCompleter{ - factory: qsf, - reqsObsPair: reqsObsPair, - execSeatsObs: execSeatsObs, - qCfg: qCfg, - dealer: dealer}, nil + factory: qsf, + reqsGaugePair: reqsGaugePair, + execSeatsGauge: execSeatsGauge, + qCfg: qCfg, + dealer: dealer}, nil } // checkConfig returns a non-nil Dealer if the config is valid and @@ -181,8 +181,8 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { qs = &queueSet{ clock: qsc.factory.clock, estimatedServiceDuration: 3 * time.Millisecond, - reqsObsPair: qsc.reqsObsPair, - execSeatsObs: qsc.execSeatsObs, + reqsGaugePair: qsc.reqsGaugePair, + execSeatsGauge: qsc.execSeatsGauge, qCfg: qsc.qCfg, currentR: 0, lastRealTime: qsc.factory.clock.Now(), @@ -243,9 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, if qll < 1 { qll = 1 } - qs.reqsObsPair.RequestsWaiting.SetDenominator(float64(qll)) - qs.reqsObsPair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit)) - qs.execSeatsObs.SetDenominator(float64(dCfg.ConcurrencyLimit)) + qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll)) + qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit)) + qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit)) qs.dispatchAsMuchAsPossibleLocked() } @@ -398,7 +398,7 @@ func (req *request) wait() (bool, bool) { metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) - qs.reqsObsPair.RequestsWaiting.Add(-1) + qs.reqsGaugePair.RequestsWaiting.Add(-1) } return false, qs.isIdleLocked() } @@ -609,7 +609,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s // remove timed out requests from queue if timeoutCount > 0 { qs.totRequestsWaiting -= timeoutCount - qs.reqsObsPair.RequestsWaiting.Add(float64(-timeoutCount)) + qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount)) } } @@ -646,7 +646,7 @@ func (qs *queueSet) enqueueLocked(request *request) { qs.totRequestsWaiting++ metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) request.NoteQueued(true) - qs.reqsObsPair.RequestsWaiting.Add(1) + qs.reqsGaugePair.RequestsWaiting.Add(1) } // dispatchAsMuchAsPossibleLocked does as many dispatches as possible now. @@ -675,8 +675,8 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f qs.totSeatsInUse += req.MaxSeats() metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats()) - qs.reqsObsPair.RequestsExecuting.Add(1) - qs.execSeatsObs.Add(float64(req.MaxSeats())) + qs.reqsGaugePair.RequestsExecuting.Add(1) + qs.execSeatsGauge.Add(float64(req.MaxSeats())) klogV := klog.V(5) if klogV.Enabled() { klogV.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) @@ -700,7 +700,7 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsWaiting-- metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) request.NoteQueued(false) - qs.reqsObsPair.RequestsWaiting.Add(-1) + qs.reqsGaugePair.RequestsWaiting.Add(-1) defer qs.boundNextDispatchLocked(queue) if !request.decision.Set(decisionExecute) { return true @@ -717,8 +717,8 @@ func (qs *queueSet) dispatchLocked() bool { queue.seatsInUse += request.MaxSeats() metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats()) - qs.reqsObsPair.RequestsExecuting.Add(1) - qs.execSeatsObs.Add(float64(request.MaxSeats())) + qs.reqsGaugePair.RequestsExecuting.Add(1) + qs.execSeatsGauge.Add(float64(request.MaxSeats())) klogV := klog.V(6) if klogV.Enabled() { klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", @@ -862,7 +862,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { now := qs.clock.Now() qs.totRequestsExecuting-- metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) - qs.reqsObsPair.RequestsExecuting.Add(-1) + qs.reqsGaugePair.RequestsExecuting.Add(-1) actualServiceDuration := now.Sub(r.startTime) @@ -874,7 +874,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { qs.totSeatsInUse -= r.MaxSeats() metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats()) - qs.execSeatsObs.Add(-float64(r.MaxSeats())) + qs.execSeatsGauge.Add(-float64(r.MaxSeats())) if r.queue != nil { r.queue.seatsInUse -= r.MaxSeats() } @@ -989,9 +989,9 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { } func (qs *queueSet) UpdateObservations() { - qs.reqsObsPair.RequestsWaiting.Add(0) - qs.reqsObsPair.RequestsExecuting.Add(0) - qs.execSeatsObs.Add(0) + qs.reqsGaugePair.RequestsWaiting.Add(0) + qs.reqsGaugePair.RequestsExecuting.Add(0) + qs.execSeatsGauge.Add(0) } func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 2de0b12cc63..5bbbdf3600b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -445,7 +445,7 @@ func TestNoRestraint(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk), newExecSeatsObserver(clk)) + nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -481,7 +481,7 @@ func TestBaseline(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -550,7 +550,7 @@ func TestSeparations(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -589,7 +589,7 @@ func TestUniformFlowsHandSize1(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -626,7 +626,7 @@ func TestUniformFlowsHandSize3(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -662,7 +662,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -702,7 +702,7 @@ func TestSeatSecondsRollover(t *testing.T) { HandSize: 1, RequestWaitLimit: 40 * Quarter, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -740,7 +740,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -777,7 +777,7 @@ func TestDifferentWidths(t *testing.T) { HandSize: 7, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -813,7 +813,7 @@ func TestTooWide(t *testing.T) { HandSize: 7, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -874,7 +874,7 @@ func TestWindup(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -909,7 +909,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { Name: "TestDifferentFlowsWithoutQueuing", DesiredNumQueues: 0, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -945,7 +945,7 @@ func TestTimeout(t *testing.T) { HandSize: 1, RequestWaitLimit: 0, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -996,7 +996,7 @@ func TestContextCancel(t *testing.T) { HandSize: 1, RequestWaitLimit: 15 * time.Second, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -1102,7 +1102,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { DesiredNumQueues: 0, RequestWaitLimit: 15 * time.Second, } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) if err != nil { t.Fatal(err) } @@ -1357,8 +1357,8 @@ func TestFinishRequestLocked(t *testing.T) { qs := &queueSet{ clock: clk, estimatedServiceDuration: time.Second, - reqsObsPair: newObserverPair(clk), - execSeatsObs: newExecSeatsObserver(clk), + reqsGaugePair: newGaugePair(clk), + execSeatsGauge: newExecSeatsGauge(clk), } queue := &queue{ requests: newRequestFIFO(), @@ -1461,10 +1461,10 @@ func newFIFO(requests ...*request) fifo { return l } -func newObserverPair(clk clock.PassiveClock) metrics.RatioedChangeObserverPair { - return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}) +func newGaugePair(clk clock.PassiveClock) metrics.RatioedGaugePair { + return metrics.PriorityLevelConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{"test"}) } -func newExecSeatsObserver(clk clock.PassiveClock) metrics.RatioedChangeObserver { - return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(0, 1, []string{"test"}) +func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge { + return metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, []string{"test"}) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 86e507ecc76..b002141fdb6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -40,7 +40,7 @@ type noRestraint struct{} type noRestraintRequest struct{} -func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { +func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (fq.QueueSetCompleter, error) { return noRestraintCompleter{}, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go index fe0f8aa97a2..ee440336693 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go @@ -57,7 +57,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration QueueLengthLimit: 5} } labelVals := []string{"test"} - _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(0, 1, labelVals)) + _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyPairVec.NewForLabelValuesSafe(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals)) if err != nil { panic(err) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/interface.go new file mode 100644 index 00000000000..aba79b5d77b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/interface.go @@ -0,0 +1,78 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +// Gauge is the methods of a gauge that are used by instrumented code. +type Gauge interface { + Set(float64) + Inc() + Dec() + Add(float64) + SetToCurrentTime() +} + +// RatioedGauge tracks ratios. +// The numerator is set/changed through the Gauge methods, +// and the denominator can be updated through the SetDenominator method. +// A ratio is tracked whenever the numerator or denominator is set/changed. +type RatioedGauge interface { + Gauge + + // SetDenominator sets the denominator to use until it is changed again + SetDenominator(float64) +} + +// RatioedGaugeVec creates related observers that are +// differentiated by a series of label values +type RatioedGaugeVec interface { + // NewForLabelValuesSafe makes a new vector member for the given tuple of label values, + // initialized with the given numerator and denominator. + // Unlike the usual Vec WithLabelValues method, this is intended to be called only + // once per vector member (at the start of its lifecycle). + // The "Safe" part is saying that the returned object will function properly after metric registration + // even if this method is called before registration. + NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge +} + +//////////////////////////////// Pairs //////////////////////////////// +// +// API Priority and Fairness tends to use RatioedGaugeVec members in pairs, +// one for requests waiting in a queue and one for requests being executed. +// The following definitions are a convenience layer that adds support for that +// particular pattern of usage. + +// RatioedGaugePair is a corresponding pair of gauges, one for the +// number of requests waiting in queue(s) and one for the number of +// requests being executed. +type RatioedGaugePair struct { + // RequestsWaiting is given observations of the number of currently queued requests + RequestsWaiting RatioedGauge + + // RequestsExecuting is given observations of the number of requests currently executing + RequestsExecuting RatioedGauge +} + +// RatioedGaugePairVec generates pairs +type RatioedGaugePairVec interface { + // NewForLabelValuesSafe makes a new vector member for the given tuple of label values, + // initialized with the given denominators and zeros for numerators. + // Unlike the usual Vec WithLabelValues method, this is intended to be called only + // once per vector member (at the start of its lifecycle). + // The "Safe" part is saying that the returned object will function properly after metric registration + // even if this method is called before registration. + NewForLabelValuesSafe(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedGaugePair +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index f52b569b2bb..319eea3dcde 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -105,8 +105,8 @@ var ( }, []string{priorityLevel, flowSchema}, ) - // PriorityLevelExecutionSeatsObserverGenerator creates observers of seats occupied throughout execution for priority levels - PriorityLevelExecutionSeatsObserverGenerator = NewSampleAndWaterMarkHistogramsGenerator(clock.RealClock{}, time.Millisecond, + // PriorityLevelExecutionSeatsGaugeVec creates observers of seats occupied throughout execution for priority levels + PriorityLevelExecutionSeatsGaugeVec = NewSampleAndWaterMarkHistogramsVec(clock.RealClock{}, time.Millisecond, &compbasemetrics.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -127,8 +127,8 @@ var ( }, []string{priorityLevel}, ) - // PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels - PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, + // PriorityLevelConcurrencyPairVec creates pairs that observe concurrency for priority levels + PriorityLevelConcurrencyPairVec = NewSampleAndWaterMarkHistogramsPairVec(clock.RealClock{}, time.Millisecond, &compbasemetrics.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -147,8 +147,8 @@ var ( }, []string{priorityLevel}, ) - // ReadWriteConcurrencyObserverPairGenerator creates pairs that observe concurrency broken down by mutating vs readonly - ReadWriteConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, + // ReadWriteConcurrencyPairVec creates pairs that observe concurrency broken down by mutating vs readonly + ReadWriteConcurrencyPairVec = NewSampleAndWaterMarkHistogramsPairVec(clock.RealClock{}, time.Millisecond, &compbasemetrics.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -355,9 +355,9 @@ var ( apiserverWorkEstimatedSeats, apiserverDispatchWithNoAccommodation, }. - Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...). - Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...). - Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...) + Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...). + Append(PriorityLevelConcurrencyPairVec.metrics()...). + Append(ReadWriteConcurrencyPairVec.metrics()...) ) // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/observer.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/observer.go deleted file mode 100644 index 1e55a0e1e77..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/observer.go +++ /dev/null @@ -1,65 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package metrics - -// Observer is something that can be given numeric observations. -type Observer interface { - // Observe takes an observation - Observe(float64) -} - -// ChangeObserver extends Observer with the ability to take -// an observation that is relative to the previous observation. -type ChangeObserver interface { - Observer - - // Observe a new value that differs by the given amount from the previous observation. - Add(float64) -} - -// RatioedChangeObserver tracks ratios. -// The numerator is set/changed through the ChangeObserver methods, -// and the denominator can be updated through the SetDenominator method. -// A ratio is tracked whenever the numerator is set/changed. -type RatioedChangeObserver interface { - ChangeObserver - - // SetDenominator sets the denominator to use until it is changed again - SetDenominator(float64) -} - -// RatioedChangeObserverGenerator creates related observers that are -// differentiated by a series of label values -type RatioedChangeObserverGenerator interface { - Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver -} - -// RatioedChangeObserverPair is a corresponding pair of observers, one for the -// number of requests waiting in queue(s) and one for the number of -// requests being executed -type RatioedChangeObserverPair struct { - // RequestsWaiting is given observations of the number of currently queued requests - RequestsWaiting RatioedChangeObserver - - // RequestsExecuting is given observations of the number of requests currently executing - RequestsExecuting RatioedChangeObserver -} - -// RatioedChangeObserverPairGenerator generates pairs -type RatioedChangeObserverPairGenerator interface { - Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go index 29366b53635..1b43bfb2e2a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go @@ -34,44 +34,44 @@ const ( labelValueExecuting = "executing" ) -// SampleAndWaterMarkPairGenerator makes pairs of RatioedChangeObservers that +// SampleAndWaterMarkPairVec makes pairs of RatioedGauges that // track samples and watermarks. -type SampleAndWaterMarkPairGenerator struct { - urGenerator SampleAndWaterMarkObserverGenerator +type SampleAndWaterMarkPairVec struct { + urVec SampleAndWaterMarkObserverVec } -var _ RatioedChangeObserverPairGenerator = SampleAndWaterMarkPairGenerator{} +var _ RatioedGaugePairVec = SampleAndWaterMarkPairVec{} -// NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator -func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator { - return SampleAndWaterMarkPairGenerator{ - urGenerator: NewSampleAndWaterMarkHistogramsGenerator(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)), +// NewSampleAndWaterMarkHistogramsPairVec makes a new pair generator +func NewSampleAndWaterMarkHistogramsPairVec(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairVec { + return SampleAndWaterMarkPairVec{ + urVec: NewSampleAndWaterMarkHistogramsVec(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)), } } -// Generate makes a new pair -func (spg SampleAndWaterMarkPairGenerator) Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair { - return RatioedChangeObserverPair{ - RequestsWaiting: spg.urGenerator.Generate(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)), - RequestsExecuting: spg.urGenerator.Generate(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)), +// NewForLabelValuesSafe makes a new pair +func (spg SampleAndWaterMarkPairVec) NewForLabelValuesSafe(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedGaugePair { + return RatioedGaugePair{ + RequestsWaiting: spg.urVec.NewForLabelValuesSafe(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)), + RequestsExecuting: spg.urVec.NewForLabelValuesSafe(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)), } } -func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables { - return spg.urGenerator.metrics() +func (spg SampleAndWaterMarkPairVec) metrics() Registerables { + return spg.urVec.metrics() } -// SampleAndWaterMarkObserverGenerator creates RatioedChangeObservers that +// SampleAndWaterMarkObserverVec creates RatioedGauges that // populate histograms of samples and low- and high-water-marks. The // generator has a samplePeriod, and the histograms get an observation // every samplePeriod. The sampling windows are quantized based on // the monotonic rather than wall-clock times. The `t0` field is // there so to provide a baseline for monotonic clock differences. -type SampleAndWaterMarkObserverGenerator struct { - *sampleAndWaterMarkObserverGenerator +type SampleAndWaterMarkObserverVec struct { + *sampleAndWaterMarkObserverVec } -type sampleAndWaterMarkObserverGenerator struct { +type sampleAndWaterMarkObserverVec struct { clock clock.PassiveClock t0 time.Time samplePeriod time.Duration @@ -79,12 +79,12 @@ type sampleAndWaterMarkObserverGenerator struct { waterMarks *compbasemetrics.HistogramVec } -var _ RatioedChangeObserverGenerator = SampleAndWaterMarkObserverGenerator{} +var _ RatioedGaugeVec = SampleAndWaterMarkObserverVec{} -// NewSampleAndWaterMarkHistogramsGenerator makes a new one -func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator { - return SampleAndWaterMarkObserverGenerator{ - &sampleAndWaterMarkObserverGenerator{ +// NewSampleAndWaterMarkHistogramsVec makes a new one +func NewSampleAndWaterMarkHistogramsVec(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverVec { + return SampleAndWaterMarkObserverVec{ + &sampleAndWaterMarkObserverVec{ clock: clock, t0: clock.Now(), samplePeriod: samplePeriod, @@ -93,20 +93,20 @@ func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePe }} } -func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 { +func (swg *sampleAndWaterMarkObserverVec) quantize(when time.Time) int64 { return int64(when.Sub(swg.t0) / swg.samplePeriod) } -// Generate makes a new RatioedChangeObserver -func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver { +// NewForLabelValuesSafe makes a new RatioedGauge +func (swg *sampleAndWaterMarkObserverVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge { ratio := initialNumerator / initialDenominator when := swg.clock.Now() return &sampleAndWaterMarkHistograms{ - sampleAndWaterMarkObserverGenerator: swg, - labelValues: labelValues, - loLabelValues: append([]string{labelValueLo}, labelValues...), - hiLabelValues: append([]string{labelValueHi}, labelValues...), - denominator: initialDenominator, + sampleAndWaterMarkObserverVec: swg, + labelValues: labelValues, + loLabelValues: append([]string{labelValueLo}, labelValues...), + hiLabelValues: append([]string{labelValueHi}, labelValues...), + denominator: initialDenominator, sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{ lastSet: when, lastSetInt: swg.quantize(when), @@ -117,12 +117,12 @@ func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initi }} } -func (swg *sampleAndWaterMarkObserverGenerator) metrics() Registerables { +func (swg *sampleAndWaterMarkObserverVec) metrics() Registerables { return Registerables{swg.samples, swg.waterMarks} } type sampleAndWaterMarkHistograms struct { - *sampleAndWaterMarkObserverGenerator + *sampleAndWaterMarkObserverVec labelValues []string loLabelValues, hiLabelValues []string @@ -139,17 +139,39 @@ type sampleAndWaterMarkAccumulator struct { loRatio, hiRatio float64 } -var _ RatioedChangeObserver = (*sampleAndWaterMarkHistograms)(nil) +var _ RatioedGauge = (*sampleAndWaterMarkHistograms)(nil) + +func (saw *sampleAndWaterMarkHistograms) Set(numerator float64) { + saw.innerSet(func() { + saw.numerator = numerator + }) +} func (saw *sampleAndWaterMarkHistograms) Add(deltaNumerator float64) { saw.innerSet(func() { saw.numerator += deltaNumerator }) } - -func (saw *sampleAndWaterMarkHistograms) Observe(numerator float64) { +func (saw *sampleAndWaterMarkHistograms) Sub(deltaNumerator float64) { saw.innerSet(func() { - saw.numerator = numerator + saw.numerator -= deltaNumerator + }) +} + +func (saw *sampleAndWaterMarkHistograms) Inc() { + saw.innerSet(func() { + saw.numerator += 1 + }) +} +func (saw *sampleAndWaterMarkHistograms) Dec() { + saw.innerSet(func() { + saw.numerator -= 1 + }) +} + +func (saw *sampleAndWaterMarkHistograms) SetToCurrentTime() { + saw.innerSet(func() { + saw.numerator = float64(saw.clock.Now().Sub(time.Unix(0, 0))) }) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark_test.go index 345c3c817e2..b26ada1d028 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark_test.go @@ -56,11 +56,11 @@ func TestSampler(t *testing.T) { t0 := time.Now() clk := testclock.NewFakePassiveClock(t0) buckets := []float64{0, 1} - gen := NewSampleAndWaterMarkHistogramsGenerator(clk, samplingPeriod, + gen := NewSampleAndWaterMarkHistogramsVec(clk, samplingPeriod, &compbasemetrics.HistogramOpts{Name: samplesHistName, Buckets: buckets}, &compbasemetrics.HistogramOpts{Name: "marks", Buckets: buckets}, []string{}) - saw := gen.Generate(0, 1, []string{}) + saw := gen.NewForLabelValuesSafe(0, 1, []string{}) toRegister := gen.metrics() registry := compbasemetrics.NewKubeRegistry() for _, reg := range toRegister { @@ -84,7 +84,7 @@ func TestSampler(t *testing.T) { dt = diff } clk.SetTime(t1) - saw.Observe(1) + saw.Set(1) expectedCount := int64(dt / samplingPeriod) actualCount, err := getHistogramCount(registry, samplesHistName) if err != nil && !(err == errMetricNotFound && expectedCount == 0) { diff --git a/test/integration/apiserver/flowcontrol/fight_test.go b/test/integration/apiserver/flowcontrol/fight_test.go index 08341d70dcd..4c40ed281ae 100644 --- a/test/integration/apiserver/flowcontrol/fight_test.go +++ b/test/integration/apiserver/flowcontrol/fight_test.go @@ -139,8 +139,8 @@ func (ft *fightTest) createController(invert bool, i int) { FlowcontrolClient: fcIfc, ServerConcurrencyLimit: 200, // server concurrency limit RequestWaitLimit: time.Minute / 4, // request wait limit - ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, - ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, + ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqtesting.NewNoRestraintFactory(), }) ft.ctlrs[invert][i] = ctlr