Give apf metrics abstractions more familiar names

The logic is similar to Prometheus gauges and vectors,
adopt that terminology.
This commit is contained in:
Mike Spreitzer 2022-05-17 23:27:47 -04:00
parent eebfd7b574
commit 7d64a93a14
17 changed files with 219 additions and 181 deletions

View File

@ -61,13 +61,13 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
// requestWatermark is used to track maximal numbers of requests in a particular phase of handling // requestWatermark is used to track maximal numbers of requests in a particular phase of handling
type requestWatermark struct { type requestWatermark struct {
phase string phase string
readOnlyObserver, mutatingObserver fcmetrics.RatioedChangeObserver readOnlyObserver, mutatingObserver fcmetrics.RatioedGauge
lock sync.Mutex lock sync.Mutex
readOnlyWatermark, mutatingWatermark int readOnlyWatermark, mutatingWatermark int
} }
func (w *requestWatermark) recordMutating(mutatingVal int) { func (w *requestWatermark) recordMutating(mutatingVal int) {
w.mutatingObserver.Observe(float64(mutatingVal)) w.mutatingObserver.Set(float64(mutatingVal))
w.lock.Lock() w.lock.Lock()
defer w.lock.Unlock() defer w.lock.Unlock()
@ -78,7 +78,7 @@ func (w *requestWatermark) recordMutating(mutatingVal int) {
} }
func (w *requestWatermark) recordReadOnly(readOnlyVal int) { func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
w.readOnlyObserver.Observe(float64(readOnlyVal)) w.readOnlyObserver.Set(float64(readOnlyVal))
w.lock.Lock() w.lock.Lock()
defer w.lock.Unlock() defer w.lock.Unlock()
@ -91,8 +91,8 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
// watermark tracks requests being executed (not waiting in a queue) // watermark tracks requests being executed (not waiting in a queue)
var watermark = &requestWatermark{ var watermark = &requestWatermark{
phase: metrics.ExecutingPhase, phase: metrics.ExecutingPhase,
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting, readOnlyObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting,
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting, mutatingObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
} }
// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark. // startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.

View File

@ -47,8 +47,8 @@ type PriorityAndFairnessClassification struct {
// waitingMark tracks requests waiting rather than being executed // waitingMark tracks requests waiting rather than being executed
var waitingMark = &requestWatermark{ var waitingMark = &requestWatermark{
phase: epmetrics.WaitingPhase, phase: epmetrics.WaitingPhase,
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting, readOnlyObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting,
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting, mutatingObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
} }
var atomicMutatingExecuting, atomicReadOnlyExecuting int32 var atomicMutatingExecuting, atomicReadOnlyExecuting int32

View File

@ -99,11 +99,11 @@ type RequestDigest struct {
// this type and cfgMeal follow the convention that the suffix // this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock. // "Locked" means that the caller must hold the configController lock.
type configController struct { type configController struct {
name string // varies in tests of fighting controllers name string // varies in tests of fighting controllers
clock clock.PassiveClock clock clock.PassiveClock
queueSetFactory fq.QueueSetFactory queueSetFactory fq.QueueSetFactory
reqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator reqsGaugePairVec metrics.RatioedGaugePairVec
execSeatsObsGenerator metrics.RatioedChangeObserverGenerator execSeatsGaugeVec metrics.RatioedGaugeVec
// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager // How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
asFieldManager string asFieldManager string
@ -193,10 +193,10 @@ type priorityLevelState struct {
numPending int numPending int
// Observers tracking number of requests waiting, executing // Observers tracking number of requests waiting, executing
reqsObsPair metrics.RatioedChangeObserverPair reqsGaugePair metrics.RatioedGaugePair
// Observer of number of seats occupied throughout execution // Observer of number of seats occupied throughout execution
execSeatsObs metrics.RatioedChangeObserver execSeatsObs metrics.RatioedGauge
} }
// NewTestableController is extra flexible to facilitate testing // NewTestableController is extra flexible to facilitate testing
@ -205,8 +205,8 @@ func newTestableController(config TestableConfig) *configController {
name: config.Name, name: config.Name,
clock: config.Clock, clock: config.Clock,
queueSetFactory: config.QueueSetFactory, queueSetFactory: config.QueueSetFactory,
reqsObsPairGenerator: config.ReqsObsPairGenerator, reqsGaugePairVec: config.ReqsGaugePairVec,
execSeatsObsGenerator: config.ExecSeatsObsGenerator, execSeatsGaugeVec: config.ExecSeatsGaugeVec,
asFieldManager: config.AsFieldManager, asFieldManager: config.AsFieldManager,
foundToDangling: config.FoundToDangling, foundToDangling: config.FoundToDangling,
serverConcurrencyLimit: config.ServerConcurrencyLimit, serverConcurrencyLimit: config.ServerConcurrencyLimit,
@ -292,7 +292,7 @@ func newTestableController(config TestableConfig) *configController {
} }
// MaintainObservations keeps the observers from // MaintainObservations keeps the observers from
// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling // metrics.PriorityLevelConcurrencyPairVec from falling
// too far behind // too far behind
func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) { func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh) wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh)
@ -539,9 +539,9 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
state := meal.cfgCtlr.priorityLevelStates[pl.Name] state := meal.cfgCtlr.priorityLevelStates[pl.Name]
if state == nil { if state == nil {
labelValues := []string{pl.Name} labelValues := []string{pl.Name}
state = &priorityLevelState{reqsObsPair: meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsObsGenerator.Generate(0, 1, labelValues)} state = &priorityLevelState{reqsGaugePair: meal.cfgCtlr.reqsGaugePairVec.NewForLabelValuesSafe(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)}
} }
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsObsPair, state.execSeatsObs) qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs)
if err != nil { if err != nil {
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
continue continue
@ -645,7 +645,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
} }
} }
var err error var err error
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsObsPair, plState.execSeatsObs) plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs)
if err != nil { if err != nil {
// This can not happen because queueSetCompleterForPL already approved this config // This can not happen because queueSetCompleterForPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
@ -694,7 +694,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
// given priority level configuration. Returns nil if that config // given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given // does not call for limiting. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package. // object is malformed in a way that is a problem for this package.
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top") return nil, errors.New("broken union structure at the top")
} }
@ -769,19 +769,19 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl
func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) { func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name) klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
labelValues := []string{proto.Name} labelValues := []string{proto.Name}
reqsObsPair := meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues) reqsGaugePair := meal.cfgCtlr.reqsGaugePairVec.NewForLabelValuesSafe(1, 1, labelValues)
execSeatsObs := meal.cfgCtlr.execSeatsObsGenerator.Generate(0, 1, labelValues) execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsObsPair, execSeatsObs) qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs)
if err != nil { if err != nil {
// This can not happen because proto is one of the mandatory // This can not happen because proto is one of the mandatory
// objects and these are not erroneous // objects and these are not erroneous
panic(err) panic(err)
} }
meal.newPLStates[proto.Name] = &priorityLevelState{ meal.newPLStates[proto.Name] = &priorityLevelState{
pl: proto, pl: proto,
qsCompleter: qsCompleter, qsCompleter: qsCompleter,
reqsObsPair: reqsObsPair, reqsGaugePair: reqsGaugePair,
execSeatsObs: execSeatsObs, execSeatsObs: execSeatsObs,
} }
if proto.Spec.Limited != nil { if proto.Spec.Limited != nil {
meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares) meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)

View File

@ -100,8 +100,8 @@ func New(
FlowcontrolClient: flowcontrolClient, FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: serverConcurrencyLimit, ServerConcurrencyLimit: serverConcurrencyLimit,
RequestWaitLimit: requestWaitLimit, RequestWaitLimit: requestWaitLimit,
ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqs.NewQueueSetFactory(clk), QueueSetFactory: fqs.NewQueueSetFactory(clk),
}) })
} }
@ -140,11 +140,11 @@ type TestableConfig struct {
// RequestWaitLimit configured on the server // RequestWaitLimit configured on the server
RequestWaitLimit time.Duration RequestWaitLimit time.Duration
// ObsPairGenerator for metrics about requests // GaugePairVec for metrics about requests
ReqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator ReqsGaugePairVec metrics.RatioedGaugePairVec
// RatioedChangeObserverPairGenerator for metrics about seats occupied by all phases of execution // RatioedGaugePairVec for metrics about seats occupied by all phases of execution
ExecSeatsObsGenerator metrics.RatioedChangeObserverGenerator ExecSeatsGaugeVec metrics.RatioedGaugeVec
// QueueSetFactory for the queuing implementation // QueueSetFactory for the queuing implementation
QueueSetFactory fq.QueueSetFactory QueueSetFactory fq.QueueSetFactory

View File

@ -105,7 +105,7 @@ type ctlrTestRequest struct {
descr1, descr2 interface{} descr1, descr2 interface{}
} }
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedChangeObserverPair, eso metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
} }
@ -261,8 +261,8 @@ func TestConfigConsumer(t *testing.T) {
FlowcontrolClient: flowcontrolClient, FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 100, // server concurrency limit ServerConcurrencyLimit: 100, // server concurrency limit
RequestWaitLimit: time.Minute, // request wait limit RequestWaitLimit: time.Minute, // request wait limit
ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: cts, QueueSetFactory: cts,
}) })
cts.cfgCtlr = ctlr cts.cfgCtlr = ctlr
@ -393,8 +393,8 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
FlowcontrolClient: flowcontrolClient, FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 100, ServerConcurrencyLimit: 100,
RequestWaitLimit: time.Minute, RequestWaitLimit: time.Minute,
ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: cts, QueueSetFactory: cts,
}) })

View File

@ -21,7 +21,6 @@ import (
"sync" "sync"
"time" "time"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/utils/clock" "k8s.io/utils/clock"
) )
@ -30,7 +29,8 @@ import (
// Integrator is created, and ends at the latest operation on the // Integrator is created, and ends at the latest operation on the
// Integrator. // Integrator.
type Integrator interface { type Integrator interface {
metrics.ChangeObserver Set(float64)
Add(float64)
GetResults() IntegratorResults GetResults() IntegratorResults
@ -69,7 +69,7 @@ func NewIntegrator(clock clock.PassiveClock) Integrator {
} }
} }
func (igr *integrator) Observe(x float64) { func (igr *integrator) Set(x float64) {
igr.Lock() igr.Lock()
igr.setLocked(x) igr.setLocked(x)
igr.Unlock() igr.Unlock()

View File

@ -38,7 +38,7 @@ func TestIntegrator(t *testing.T) {
if !results.Equal(&rToo) { if !results.Equal(&rToo) {
t.Errorf("expected %#+v, got %#+v", results, rToo) t.Errorf("expected %#+v, got %#+v", results, rToo)
} }
igr.Observe(2) igr.Set(2)
results = igr.GetResults() results = igr.GetResults()
if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) { if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) {
t.Errorf("expected %#+v, got %#+v", e, results) t.Errorf("expected %#+v, got %#+v", e, results)

View File

@ -32,10 +32,10 @@ import (
// before committing to a concurrency allotment for the second. // before committing to a concurrency allotment for the second.
type QueueSetFactory interface { type QueueSetFactory interface {
// BeginConstruction does the first phase of creating a QueueSet. // BeginConstruction does the first phase of creating a QueueSet.
// The RatioedChangeObserverPair observes number of requests, // The RatioedGaugePair observes number of requests,
// execution covering just the regular phase. // execution covering just the regular phase.
// The RatioedChangeObserver observes number of seats occupied through all phases of execution. // The RatioedGauge observes number of seats occupied through all phases of execution.
BeginConstruction(QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (QueueSetCompleter, error) BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (QueueSetCompleter, error)
} }
// QueueSetCompleter finishes the two-step process of creating or // QueueSetCompleter finishes the two-step process of creating or

View File

@ -60,12 +60,12 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
// the fields `factory` and `theSet` is non-nil. // the fields `factory` and `theSet` is non-nil.
type queueSetCompleter struct { type queueSetCompleter struct {
factory *queueSetFactory factory *queueSetFactory
reqsObsPair metrics.RatioedChangeObserverPair reqsGaugePair metrics.RatioedGaugePair
execSeatsObs metrics.RatioedChangeObserver execSeatsGauge metrics.RatioedGauge
theSet *queueSet theSet *queueSet
qCfg fq.QueuingConfig qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer dealer *shufflesharding.Dealer
} }
// queueSet implements the Fair Queuing for Server Requests technique // queueSet implements the Fair Queuing for Server Requests technique
@ -81,9 +81,9 @@ type queueSet struct {
clock eventclock.Interface clock eventclock.Interface
estimatedServiceDuration time.Duration estimatedServiceDuration time.Duration
reqsObsPair metrics.RatioedChangeObserverPair // .RequestsExecuting covers regular phase only reqsGaugePair metrics.RatioedGaugePair // .RequestsExecuting covers regular phase only
execSeatsObs metrics.RatioedChangeObserver // for all phases of execution execSeatsGauge metrics.RatioedGauge // for all phases of execution
promiseFactory promiseFactory promiseFactory promiseFactory
@ -148,17 +148,17 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
} }
} }
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
dealer, err := checkConfig(qCfg) dealer, err := checkConfig(qCfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &queueSetCompleter{ return &queueSetCompleter{
factory: qsf, factory: qsf,
reqsObsPair: reqsObsPair, reqsGaugePair: reqsGaugePair,
execSeatsObs: execSeatsObs, execSeatsGauge: execSeatsGauge,
qCfg: qCfg, qCfg: qCfg,
dealer: dealer}, nil dealer: dealer}, nil
} }
// checkConfig returns a non-nil Dealer if the config is valid and // checkConfig returns a non-nil Dealer if the config is valid and
@ -181,8 +181,8 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
qs = &queueSet{ qs = &queueSet{
clock: qsc.factory.clock, clock: qsc.factory.clock,
estimatedServiceDuration: 3 * time.Millisecond, estimatedServiceDuration: 3 * time.Millisecond,
reqsObsPair: qsc.reqsObsPair, reqsGaugePair: qsc.reqsGaugePair,
execSeatsObs: qsc.execSeatsObs, execSeatsGauge: qsc.execSeatsGauge,
qCfg: qsc.qCfg, qCfg: qsc.qCfg,
currentR: 0, currentR: 0,
lastRealTime: qsc.factory.clock.Now(), lastRealTime: qsc.factory.clock.Now(),
@ -243,9 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
if qll < 1 { if qll < 1 {
qll = 1 qll = 1
} }
qs.reqsObsPair.RequestsWaiting.SetDenominator(float64(qll)) qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
qs.reqsObsPair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit)) qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.execSeatsObs.SetDenominator(float64(dCfg.ConcurrencyLimit)) qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.dispatchAsMuchAsPossibleLocked() qs.dispatchAsMuchAsPossibleLocked()
} }
@ -398,7 +398,7 @@ func (req *request) wait() (bool, bool) {
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false) req.NoteQueued(false)
qs.reqsObsPair.RequestsWaiting.Add(-1) qs.reqsGaugePair.RequestsWaiting.Add(-1)
} }
return false, qs.isIdleLocked() return false, qs.isIdleLocked()
} }
@ -609,7 +609,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
// remove timed out requests from queue // remove timed out requests from queue
if timeoutCount > 0 { if timeoutCount > 0 {
qs.totRequestsWaiting -= timeoutCount qs.totRequestsWaiting -= timeoutCount
qs.reqsObsPair.RequestsWaiting.Add(float64(-timeoutCount)) qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount))
} }
} }
@ -646,7 +646,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
qs.totRequestsWaiting++ qs.totRequestsWaiting++
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
request.NoteQueued(true) request.NoteQueued(true)
qs.reqsObsPair.RequestsWaiting.Add(1) qs.reqsGaugePair.RequestsWaiting.Add(1)
} }
// dispatchAsMuchAsPossibleLocked does as many dispatches as possible now. // dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
@ -675,8 +675,8 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
qs.totSeatsInUse += req.MaxSeats() qs.totSeatsInUse += req.MaxSeats()
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
qs.reqsObsPair.RequestsExecuting.Add(1) qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsObs.Add(float64(req.MaxSeats())) qs.execSeatsGauge.Add(float64(req.MaxSeats()))
klogV := klog.V(5) klogV := klog.V(5)
if klogV.Enabled() { if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) klogV.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
@ -700,7 +700,7 @@ func (qs *queueSet) dispatchLocked() bool {
qs.totRequestsWaiting-- qs.totRequestsWaiting--
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false) request.NoteQueued(false)
qs.reqsObsPair.RequestsWaiting.Add(-1) qs.reqsGaugePair.RequestsWaiting.Add(-1)
defer qs.boundNextDispatchLocked(queue) defer qs.boundNextDispatchLocked(queue)
if !request.decision.Set(decisionExecute) { if !request.decision.Set(decisionExecute) {
return true return true
@ -717,8 +717,8 @@ func (qs *queueSet) dispatchLocked() bool {
queue.seatsInUse += request.MaxSeats() queue.seatsInUse += request.MaxSeats()
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
qs.reqsObsPair.RequestsExecuting.Add(1) qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsObs.Add(float64(request.MaxSeats())) qs.execSeatsGauge.Add(float64(request.MaxSeats()))
klogV := klog.V(6) klogV := klog.V(6)
if klogV.Enabled() { if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
@ -862,7 +862,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
now := qs.clock.Now() now := qs.clock.Now()
qs.totRequestsExecuting-- qs.totRequestsExecuting--
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
qs.reqsObsPair.RequestsExecuting.Add(-1) qs.reqsGaugePair.RequestsExecuting.Add(-1)
actualServiceDuration := now.Sub(r.startTime) actualServiceDuration := now.Sub(r.startTime)
@ -874,7 +874,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
qs.totSeatsInUse -= r.MaxSeats() qs.totSeatsInUse -= r.MaxSeats()
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
qs.execSeatsObs.Add(-float64(r.MaxSeats())) qs.execSeatsGauge.Add(-float64(r.MaxSeats()))
if r.queue != nil { if r.queue != nil {
r.queue.seatsInUse -= r.MaxSeats() r.queue.seatsInUse -= r.MaxSeats()
} }
@ -989,9 +989,9 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
} }
func (qs *queueSet) UpdateObservations() { func (qs *queueSet) UpdateObservations() {
qs.reqsObsPair.RequestsWaiting.Add(0) qs.reqsGaugePair.RequestsWaiting.Add(0)
qs.reqsObsPair.RequestsExecuting.Add(0) qs.reqsGaugePair.RequestsExecuting.Add(0)
qs.execSeatsObs.Add(0) qs.execSeatsGauge.Add(0)
} }
func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {

View File

@ -445,7 +445,7 @@ func TestNoRestraint(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
now := time.Now() now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil) clk, counter := testeventclock.NewFake(now, 0, nil)
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk), newExecSeatsObserver(clk)) nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -481,7 +481,7 @@ func TestBaseline(t *testing.T) {
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -550,7 +550,7 @@ func TestSeparations(t *testing.T) {
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -589,7 +589,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -626,7 +626,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -662,7 +662,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -702,7 +702,7 @@ func TestSeatSecondsRollover(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 40 * Quarter, RequestWaitLimit: 40 * Quarter,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -740,7 +740,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -777,7 +777,7 @@ func TestDifferentWidths(t *testing.T) {
HandSize: 7, HandSize: 7,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -813,7 +813,7 @@ func TestTooWide(t *testing.T) {
HandSize: 7, HandSize: 7,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -874,7 +874,7 @@ func TestWindup(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -909,7 +909,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
Name: "TestDifferentFlowsWithoutQueuing", Name: "TestDifferentFlowsWithoutQueuing",
DesiredNumQueues: 0, DesiredNumQueues: 0,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -945,7 +945,7 @@ func TestTimeout(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 0, RequestWaitLimit: 0,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -996,7 +996,7 @@ func TestContextCancel(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 15 * time.Second, RequestWaitLimit: 15 * time.Second,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1102,7 +1102,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
DesiredNumQueues: 0, DesiredNumQueues: 0,
RequestWaitLimit: 15 * time.Second, RequestWaitLimit: 15 * time.Second,
} }
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk)) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1357,8 +1357,8 @@ func TestFinishRequestLocked(t *testing.T) {
qs := &queueSet{ qs := &queueSet{
clock: clk, clock: clk,
estimatedServiceDuration: time.Second, estimatedServiceDuration: time.Second,
reqsObsPair: newObserverPair(clk), reqsGaugePair: newGaugePair(clk),
execSeatsObs: newExecSeatsObserver(clk), execSeatsGauge: newExecSeatsGauge(clk),
} }
queue := &queue{ queue := &queue{
requests: newRequestFIFO(), requests: newRequestFIFO(),
@ -1461,10 +1461,10 @@ func newFIFO(requests ...*request) fifo {
return l return l
} }
func newObserverPair(clk clock.PassiveClock) metrics.RatioedChangeObserverPair { func newGaugePair(clk clock.PassiveClock) metrics.RatioedGaugePair {
return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}) return metrics.PriorityLevelConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{"test"})
} }
func newExecSeatsObserver(clk clock.PassiveClock) metrics.RatioedChangeObserver { func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge {
return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(0, 1, []string{"test"}) return metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, []string{"test"})
} }

View File

@ -40,7 +40,7 @@ type noRestraint struct{}
type noRestraintRequest struct{} type noRestraintRequest struct{}
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
return noRestraintCompleter{}, nil return noRestraintCompleter{}, nil
} }

View File

@ -57,7 +57,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
QueueLengthLimit: 5} QueueLengthLimit: 5}
} }
labelVals := []string{"test"} labelVals := []string{"test"}
_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(0, 1, labelVals)) _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyPairVec.NewForLabelValuesSafe(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals))
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -105,8 +105,8 @@ var (
}, },
[]string{priorityLevel, flowSchema}, []string{priorityLevel, flowSchema},
) )
// PriorityLevelExecutionSeatsObserverGenerator creates observers of seats occupied throughout execution for priority levels // PriorityLevelExecutionSeatsGaugeVec creates observers of seats occupied throughout execution for priority levels
PriorityLevelExecutionSeatsObserverGenerator = NewSampleAndWaterMarkHistogramsGenerator(clock.RealClock{}, time.Millisecond, PriorityLevelExecutionSeatsGaugeVec = NewSampleAndWaterMarkHistogramsVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{ &compbasemetrics.HistogramOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -127,8 +127,8 @@ var (
}, },
[]string{priorityLevel}, []string{priorityLevel},
) )
// PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels // PriorityLevelConcurrencyPairVec creates pairs that observe concurrency for priority levels
PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, PriorityLevelConcurrencyPairVec = NewSampleAndWaterMarkHistogramsPairVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{ &compbasemetrics.HistogramOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -147,8 +147,8 @@ var (
}, },
[]string{priorityLevel}, []string{priorityLevel},
) )
// ReadWriteConcurrencyObserverPairGenerator creates pairs that observe concurrency broken down by mutating vs readonly // ReadWriteConcurrencyPairVec creates pairs that observe concurrency broken down by mutating vs readonly
ReadWriteConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, ReadWriteConcurrencyPairVec = NewSampleAndWaterMarkHistogramsPairVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{ &compbasemetrics.HistogramOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -355,9 +355,9 @@ var (
apiserverWorkEstimatedSeats, apiserverWorkEstimatedSeats,
apiserverDispatchWithNoAccommodation, apiserverDispatchWithNoAccommodation,
}. }.
Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...). Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...). Append(PriorityLevelConcurrencyPairVec.metrics()...).
Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...) Append(ReadWriteConcurrencyPairVec.metrics()...)
) )
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel

View File

@ -16,50 +16,66 @@ limitations under the License.
package metrics package metrics
// Observer is something that can be given numeric observations. // Gauge is the methods of a gauge that are used by instrumented code.
type Observer interface { type Gauge interface {
// Observe takes an observation Set(float64)
Observe(float64) Inc()
} Dec()
// ChangeObserver extends Observer with the ability to take
// an observation that is relative to the previous observation.
type ChangeObserver interface {
Observer
// Observe a new value that differs by the given amount from the previous observation.
Add(float64) Add(float64)
SetToCurrentTime()
} }
// RatioedChangeObserver tracks ratios. // RatioedGauge tracks ratios.
// The numerator is set/changed through the ChangeObserver methods, // The numerator is set/changed through the Gauge methods,
// and the denominator can be updated through the SetDenominator method. // and the denominator can be updated through the SetDenominator method.
// A ratio is tracked whenever the numerator is set/changed. // A ratio is tracked whenever the numerator or denominator is set/changed.
type RatioedChangeObserver interface { type RatioedGauge interface {
ChangeObserver Gauge
// SetDenominator sets the denominator to use until it is changed again // SetDenominator sets the denominator to use until it is changed again
SetDenominator(float64) SetDenominator(float64)
} }
// RatioedChangeObserverGenerator creates related observers that are // RatioedGaugeVec creates related observers that are
// differentiated by a series of label values // differentiated by a series of label values
type RatioedChangeObserverGenerator interface { type RatioedGaugeVec interface {
Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver
// NewForLabelValuesSafe makes a new vector member for the given tuple of label values,
// initialized with the given numerator and denominator.
// Unlike the usual Vec WithLabelValues method, this is intended to be called only
// once per vector member (at the start of its lifecycle).
// The "Safe" part is saying that the returned object will function properly after metric registration
// even if this method is called before registration.
NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge
} }
// RatioedChangeObserverPair is a corresponding pair of observers, one for the //////////////////////////////// Pairs ////////////////////////////////
/* API Priority and Fairness tends to use RatioedGaugeVec members in pairs,
* one for requests waiting in a queue and one for requests being executed.
* The following definitions are a convenience layer that adds support for that
* particular pattern of usage.
*/
// RatioedGaugePair is a corresponding pair of gauges, one for the
// number of requests waiting in queue(s) and one for the number of // number of requests waiting in queue(s) and one for the number of
// requests being executed // requests being executed.
type RatioedChangeObserverPair struct { type RatioedGaugePair struct {
// RequestsWaiting is given observations of the number of currently queued requests // RequestsWaiting is given observations of the number of currently queued requests
RequestsWaiting RatioedChangeObserver RequestsWaiting RatioedGauge
// RequestsExecuting is given observations of the number of requests currently executing // RequestsExecuting is given observations of the number of requests currently executing
RequestsExecuting RatioedChangeObserver RequestsExecuting RatioedGauge
} }
// RatioedChangeObserverPairGenerator generates pairs // RatioedGaugePairVec generates pairs
type RatioedChangeObserverPairGenerator interface { type RatioedGaugePairVec interface {
Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair
// NewForLabelValuesSafe makes a new vector member for the given tuple of label values,
// initialized with the given denominators and zeros for numerators.
// Unlike the usual Vec WithLabelValues method, this is intended to be called only
// once per vector member (at the start of its lifecycle).
// The "Safe" part is saying that the returned object will function properly after metric registration
// even if this method is called before registration.
NewForLabelValuesSafe(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedGaugePair
} }

View File

@ -34,44 +34,44 @@ const (
labelValueExecuting = "executing" labelValueExecuting = "executing"
) )
// SampleAndWaterMarkPairGenerator makes pairs of RatioedChangeObservers that // SampleAndWaterMarkPairVec makes pairs of RatioedGauges that
// track samples and watermarks. // track samples and watermarks.
type SampleAndWaterMarkPairGenerator struct { type SampleAndWaterMarkPairVec struct {
urGenerator SampleAndWaterMarkObserverGenerator urVec SampleAndWaterMarkObserverVec
} }
var _ RatioedChangeObserverPairGenerator = SampleAndWaterMarkPairGenerator{} var _ RatioedGaugePairVec = SampleAndWaterMarkPairVec{}
// NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator // NewSampleAndWaterMarkHistogramsPairVec makes a new pair generator
func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator { func NewSampleAndWaterMarkHistogramsPairVec(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairVec {
return SampleAndWaterMarkPairGenerator{ return SampleAndWaterMarkPairVec{
urGenerator: NewSampleAndWaterMarkHistogramsGenerator(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)), urVec: NewSampleAndWaterMarkHistogramsVec(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)),
} }
} }
// Generate makes a new pair // NewForLabelValuesSafe makes a new pair
func (spg SampleAndWaterMarkPairGenerator) Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair { func (spg SampleAndWaterMarkPairVec) NewForLabelValuesSafe(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedGaugePair {
return RatioedChangeObserverPair{ return RatioedGaugePair{
RequestsWaiting: spg.urGenerator.Generate(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)), RequestsWaiting: spg.urVec.NewForLabelValuesSafe(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)),
RequestsExecuting: spg.urGenerator.Generate(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)), RequestsExecuting: spg.urVec.NewForLabelValuesSafe(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)),
} }
} }
func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables { func (spg SampleAndWaterMarkPairVec) metrics() Registerables {
return spg.urGenerator.metrics() return spg.urVec.metrics()
} }
// SampleAndWaterMarkObserverGenerator creates RatioedChangeObservers that // SampleAndWaterMarkObserverVec creates RatioedGauges that
// populate histograms of samples and low- and high-water-marks. The // populate histograms of samples and low- and high-water-marks. The
// generator has a samplePeriod, and the histograms get an observation // generator has a samplePeriod, and the histograms get an observation
// every samplePeriod. The sampling windows are quantized based on // every samplePeriod. The sampling windows are quantized based on
// the monotonic rather than wall-clock times. The `t0` field is // the monotonic rather than wall-clock times. The `t0` field is
// there so to provide a baseline for monotonic clock differences. // there so to provide a baseline for monotonic clock differences.
type SampleAndWaterMarkObserverGenerator struct { type SampleAndWaterMarkObserverVec struct {
*sampleAndWaterMarkObserverGenerator *sampleAndWaterMarkObserverVec
} }
type sampleAndWaterMarkObserverGenerator struct { type sampleAndWaterMarkObserverVec struct {
clock clock.PassiveClock clock clock.PassiveClock
t0 time.Time t0 time.Time
samplePeriod time.Duration samplePeriod time.Duration
@ -79,12 +79,12 @@ type sampleAndWaterMarkObserverGenerator struct {
waterMarks *compbasemetrics.HistogramVec waterMarks *compbasemetrics.HistogramVec
} }
var _ RatioedChangeObserverGenerator = SampleAndWaterMarkObserverGenerator{} var _ RatioedGaugeVec = SampleAndWaterMarkObserverVec{}
// NewSampleAndWaterMarkHistogramsGenerator makes a new one // NewSampleAndWaterMarkHistogramsVec makes a new one
func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator { func NewSampleAndWaterMarkHistogramsVec(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverVec {
return SampleAndWaterMarkObserverGenerator{ return SampleAndWaterMarkObserverVec{
&sampleAndWaterMarkObserverGenerator{ &sampleAndWaterMarkObserverVec{
clock: clock, clock: clock,
t0: clock.Now(), t0: clock.Now(),
samplePeriod: samplePeriod, samplePeriod: samplePeriod,
@ -93,20 +93,20 @@ func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePe
}} }}
} }
func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 { func (swg *sampleAndWaterMarkObserverVec) quantize(when time.Time) int64 {
return int64(when.Sub(swg.t0) / swg.samplePeriod) return int64(when.Sub(swg.t0) / swg.samplePeriod)
} }
// Generate makes a new RatioedChangeObserver // NewForLabelValuesSafe makes a new RatioedGauge
func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver { func (swg *sampleAndWaterMarkObserverVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
ratio := initialNumerator / initialDenominator ratio := initialNumerator / initialDenominator
when := swg.clock.Now() when := swg.clock.Now()
return &sampleAndWaterMarkHistograms{ return &sampleAndWaterMarkHistograms{
sampleAndWaterMarkObserverGenerator: swg, sampleAndWaterMarkObserverVec: swg,
labelValues: labelValues, labelValues: labelValues,
loLabelValues: append([]string{labelValueLo}, labelValues...), loLabelValues: append([]string{labelValueLo}, labelValues...),
hiLabelValues: append([]string{labelValueHi}, labelValues...), hiLabelValues: append([]string{labelValueHi}, labelValues...),
denominator: initialDenominator, denominator: initialDenominator,
sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{ sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{
lastSet: when, lastSet: when,
lastSetInt: swg.quantize(when), lastSetInt: swg.quantize(when),
@ -117,12 +117,12 @@ func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initi
}} }}
} }
func (swg *sampleAndWaterMarkObserverGenerator) metrics() Registerables { func (swg *sampleAndWaterMarkObserverVec) metrics() Registerables {
return Registerables{swg.samples, swg.waterMarks} return Registerables{swg.samples, swg.waterMarks}
} }
type sampleAndWaterMarkHistograms struct { type sampleAndWaterMarkHistograms struct {
*sampleAndWaterMarkObserverGenerator *sampleAndWaterMarkObserverVec
labelValues []string labelValues []string
loLabelValues, hiLabelValues []string loLabelValues, hiLabelValues []string
@ -139,17 +139,39 @@ type sampleAndWaterMarkAccumulator struct {
loRatio, hiRatio float64 loRatio, hiRatio float64
} }
var _ RatioedChangeObserver = (*sampleAndWaterMarkHistograms)(nil) var _ RatioedGauge = (*sampleAndWaterMarkHistograms)(nil)
func (saw *sampleAndWaterMarkHistograms) Set(numerator float64) {
saw.innerSet(func() {
saw.numerator = numerator
})
}
func (saw *sampleAndWaterMarkHistograms) Add(deltaNumerator float64) { func (saw *sampleAndWaterMarkHistograms) Add(deltaNumerator float64) {
saw.innerSet(func() { saw.innerSet(func() {
saw.numerator += deltaNumerator saw.numerator += deltaNumerator
}) })
} }
func (saw *sampleAndWaterMarkHistograms) Sub(deltaNumerator float64) {
func (saw *sampleAndWaterMarkHistograms) Observe(numerator float64) {
saw.innerSet(func() { saw.innerSet(func() {
saw.numerator = numerator saw.numerator -= deltaNumerator
})
}
func (saw *sampleAndWaterMarkHistograms) Inc() {
saw.innerSet(func() {
saw.numerator += 1
})
}
func (saw *sampleAndWaterMarkHistograms) Dec() {
saw.innerSet(func() {
saw.numerator -= 1
})
}
func (saw *sampleAndWaterMarkHistograms) SetToCurrentTime() {
saw.innerSet(func() {
saw.numerator = float64(saw.clock.Now().Sub(time.Unix(0, 0)))
}) })
} }

View File

@ -56,11 +56,11 @@ func TestSampler(t *testing.T) {
t0 := time.Now() t0 := time.Now()
clk := testclock.NewFakePassiveClock(t0) clk := testclock.NewFakePassiveClock(t0)
buckets := []float64{0, 1} buckets := []float64{0, 1}
gen := NewSampleAndWaterMarkHistogramsGenerator(clk, samplingPeriod, gen := NewSampleAndWaterMarkHistogramsVec(clk, samplingPeriod,
&compbasemetrics.HistogramOpts{Name: samplesHistName, Buckets: buckets}, &compbasemetrics.HistogramOpts{Name: samplesHistName, Buckets: buckets},
&compbasemetrics.HistogramOpts{Name: "marks", Buckets: buckets}, &compbasemetrics.HistogramOpts{Name: "marks", Buckets: buckets},
[]string{}) []string{})
saw := gen.Generate(0, 1, []string{}) saw := gen.NewForLabelValuesSafe(0, 1, []string{})
toRegister := gen.metrics() toRegister := gen.metrics()
registry := compbasemetrics.NewKubeRegistry() registry := compbasemetrics.NewKubeRegistry()
for _, reg := range toRegister { for _, reg := range toRegister {
@ -84,7 +84,7 @@ func TestSampler(t *testing.T) {
dt = diff dt = diff
} }
clk.SetTime(t1) clk.SetTime(t1)
saw.Observe(1) saw.Set(1)
expectedCount := int64(dt / samplingPeriod) expectedCount := int64(dt / samplingPeriod)
actualCount, err := getHistogramCount(registry, samplesHistName) actualCount, err := getHistogramCount(registry, samplesHistName)
if err != nil && !(err == errMetricNotFound && expectedCount == 0) { if err != nil && !(err == errMetricNotFound && expectedCount == 0) {

View File

@ -139,8 +139,8 @@ func (ft *fightTest) createController(invert bool, i int) {
FlowcontrolClient: fcIfc, FlowcontrolClient: fcIfc,
ServerConcurrencyLimit: 200, // server concurrency limit ServerConcurrencyLimit: 200, // server concurrency limit
RequestWaitLimit: time.Minute / 4, // request wait limit RequestWaitLimit: time.Minute / 4, // request wait limit
ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqtesting.NewNoRestraintFactory(), QueueSetFactory: fqtesting.NewNoRestraintFactory(),
}) })
ft.ctlrs[invert][i] = ctlr ft.ctlrs[invert][i] = ctlr