Merge pull request #110101 from MikeSpreitzer/rename-observers

Give apf metrics abstractions more familiar names
This commit is contained in:
Kubernetes Prow Robot 2022-05-24 07:26:06 -07:00 committed by GitHub
commit c3d550d4e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 253 additions and 218 deletions

View File

@ -61,13 +61,13 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
// requestWatermark is used to track maximal numbers of requests in a particular phase of handling
type requestWatermark struct {
phase string
readOnlyObserver, mutatingObserver fcmetrics.RatioedChangeObserver
readOnlyObserver, mutatingObserver fcmetrics.RatioedGauge
lock sync.Mutex
readOnlyWatermark, mutatingWatermark int
}
func (w *requestWatermark) recordMutating(mutatingVal int) {
w.mutatingObserver.Observe(float64(mutatingVal))
w.mutatingObserver.Set(float64(mutatingVal))
w.lock.Lock()
defer w.lock.Unlock()
@ -78,7 +78,7 @@ func (w *requestWatermark) recordMutating(mutatingVal int) {
}
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
w.readOnlyObserver.Observe(float64(readOnlyVal))
w.readOnlyObserver.Set(float64(readOnlyVal))
w.lock.Lock()
defer w.lock.Unlock()
@ -91,8 +91,8 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
// watermark tracks requests being executed (not waiting in a queue)
var watermark = &requestWatermark{
phase: metrics.ExecutingPhase,
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting,
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
readOnlyObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting,
mutatingObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
}
// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.

View File

@ -47,8 +47,8 @@ type PriorityAndFairnessClassification struct {
// waitingMark tracks requests waiting rather than being executed
var waitingMark = &requestWatermark{
phase: epmetrics.WaitingPhase,
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting,
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
readOnlyObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting,
mutatingObserver: fcmetrics.ReadWriteConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
}
var atomicMutatingExecuting, atomicReadOnlyExecuting int32

View File

@ -99,11 +99,11 @@ type RequestDigest struct {
// this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock.
type configController struct {
name string // varies in tests of fighting controllers
clock clock.PassiveClock
queueSetFactory fq.QueueSetFactory
reqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator
execSeatsObsGenerator metrics.RatioedChangeObserverGenerator
name string // varies in tests of fighting controllers
clock clock.PassiveClock
queueSetFactory fq.QueueSetFactory
reqsGaugePairVec metrics.RatioedGaugePairVec
execSeatsGaugeVec metrics.RatioedGaugeVec
// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
asFieldManager string
@ -193,10 +193,10 @@ type priorityLevelState struct {
numPending int
// Observers tracking number of requests waiting, executing
reqsObsPair metrics.RatioedChangeObserverPair
reqsGaugePair metrics.RatioedGaugePair
// Observer of number of seats occupied throughout execution
execSeatsObs metrics.RatioedChangeObserver
execSeatsObs metrics.RatioedGauge
}
// NewTestableController is extra flexible to facilitate testing
@ -205,8 +205,8 @@ func newTestableController(config TestableConfig) *configController {
name: config.Name,
clock: config.Clock,
queueSetFactory: config.QueueSetFactory,
reqsObsPairGenerator: config.ReqsObsPairGenerator,
execSeatsObsGenerator: config.ExecSeatsObsGenerator,
reqsGaugePairVec: config.ReqsGaugePairVec,
execSeatsGaugeVec: config.ExecSeatsGaugeVec,
asFieldManager: config.AsFieldManager,
foundToDangling: config.FoundToDangling,
serverConcurrencyLimit: config.ServerConcurrencyLimit,
@ -292,7 +292,7 @@ func newTestableController(config TestableConfig) *configController {
}
// MaintainObservations keeps the observers from
// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling
// metrics.PriorityLevelConcurrencyPairVec from falling
// too far behind
func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh)
@ -539,9 +539,9 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
state := meal.cfgCtlr.priorityLevelStates[pl.Name]
if state == nil {
labelValues := []string{pl.Name}
state = &priorityLevelState{reqsObsPair: meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsObsGenerator.Generate(0, 1, labelValues)}
state = &priorityLevelState{reqsGaugePair: meal.cfgCtlr.reqsGaugePairVec.NewForLabelValuesSafe(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)}
}
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsObsPair, state.execSeatsObs)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs)
if err != nil {
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
continue
@ -645,7 +645,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
}
}
var err error
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsObsPair, plState.execSeatsObs)
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs)
if err != nil {
// This can not happen because queueSetCompleterForPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
@ -694,7 +694,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
// given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package.
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top")
}
@ -769,19 +769,19 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl
func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
labelValues := []string{proto.Name}
reqsObsPair := meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues)
execSeatsObs := meal.cfgCtlr.execSeatsObsGenerator.Generate(0, 1, labelValues)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsObsPair, execSeatsObs)
reqsGaugePair := meal.cfgCtlr.reqsGaugePairVec.NewForLabelValuesSafe(1, 1, labelValues)
execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs)
if err != nil {
// This can not happen because proto is one of the mandatory
// objects and these are not erroneous
panic(err)
}
meal.newPLStates[proto.Name] = &priorityLevelState{
pl: proto,
qsCompleter: qsCompleter,
reqsObsPair: reqsObsPair,
execSeatsObs: execSeatsObs,
pl: proto,
qsCompleter: qsCompleter,
reqsGaugePair: reqsGaugePair,
execSeatsObs: execSeatsObs,
}
if proto.Spec.Limited != nil {
meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)

View File

@ -100,8 +100,8 @@ func New(
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: serverConcurrencyLimit,
RequestWaitLimit: requestWaitLimit,
ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator,
ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqs.NewQueueSetFactory(clk),
})
}
@ -140,11 +140,11 @@ type TestableConfig struct {
// RequestWaitLimit configured on the server
RequestWaitLimit time.Duration
// ObsPairGenerator for metrics about requests
ReqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator
// GaugePairVec for metrics about requests
ReqsGaugePairVec metrics.RatioedGaugePairVec
// RatioedChangeObserverPairGenerator for metrics about seats occupied by all phases of execution
ExecSeatsObsGenerator metrics.RatioedChangeObserverGenerator
// RatioedGaugePairVec for metrics about seats occupied by all phases of execution
ExecSeatsGaugeVec metrics.RatioedGaugeVec
// QueueSetFactory for the queuing implementation
QueueSetFactory fq.QueueSetFactory

View File

@ -105,7 +105,7 @@ type ctlrTestRequest struct {
descr1, descr2 interface{}
}
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedChangeObserverPair, eso metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
}
@ -261,8 +261,8 @@ func TestConfigConsumer(t *testing.T) {
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 100, // server concurrency limit
RequestWaitLimit: time.Minute, // request wait limit
ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator,
ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: cts,
})
cts.cfgCtlr = ctlr
@ -393,8 +393,8 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 100,
RequestWaitLimit: time.Minute,
ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator,
ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: cts,
})

View File

@ -21,7 +21,6 @@ import (
"sync"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/utils/clock"
)
@ -30,7 +29,8 @@ import (
// Integrator is created, and ends at the latest operation on the
// Integrator.
type Integrator interface {
metrics.ChangeObserver
Set(float64)
Add(float64)
GetResults() IntegratorResults
@ -69,7 +69,7 @@ func NewIntegrator(clock clock.PassiveClock) Integrator {
}
}
func (igr *integrator) Observe(x float64) {
func (igr *integrator) Set(x float64) {
igr.Lock()
igr.setLocked(x)
igr.Unlock()

View File

@ -38,7 +38,7 @@ func TestIntegrator(t *testing.T) {
if !results.Equal(&rToo) {
t.Errorf("expected %#+v, got %#+v", results, rToo)
}
igr.Observe(2)
igr.Set(2)
results = igr.GetResults()
if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) {
t.Errorf("expected %#+v, got %#+v", e, results)

View File

@ -32,10 +32,10 @@ import (
// before committing to a concurrency allotment for the second.
type QueueSetFactory interface {
// BeginConstruction does the first phase of creating a QueueSet.
// The RatioedChangeObserverPair observes number of requests,
// The RatioedGaugePair observes number of requests,
// execution covering just the regular phase.
// The RatioedChangeObserver observes number of seats occupied through all phases of execution.
BeginConstruction(QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (QueueSetCompleter, error)
// The RatioedGauge observes number of seats occupied through all phases of execution.
BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (QueueSetCompleter, error)
}
// QueueSetCompleter finishes the two-step process of creating or

View File

@ -60,12 +60,12 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
// the fields `factory` and `theSet` is non-nil.
type queueSetCompleter struct {
factory *queueSetFactory
reqsObsPair metrics.RatioedChangeObserverPair
execSeatsObs metrics.RatioedChangeObserver
theSet *queueSet
qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer
factory *queueSetFactory
reqsGaugePair metrics.RatioedGaugePair
execSeatsGauge metrics.RatioedGauge
theSet *queueSet
qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer
}
// queueSet implements the Fair Queuing for Server Requests technique
@ -81,9 +81,9 @@ type queueSet struct {
clock eventclock.Interface
estimatedServiceDuration time.Duration
reqsObsPair metrics.RatioedChangeObserverPair // .RequestsExecuting covers regular phase only
reqsGaugePair metrics.RatioedGaugePair // .RequestsExecuting covers regular phase only
execSeatsObs metrics.RatioedChangeObserver // for all phases of execution
execSeatsGauge metrics.RatioedGauge // for all phases of execution
promiseFactory promiseFactory
@ -148,17 +148,17 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
}
}
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
dealer, err := checkConfig(qCfg)
if err != nil {
return nil, err
}
return &queueSetCompleter{
factory: qsf,
reqsObsPair: reqsObsPair,
execSeatsObs: execSeatsObs,
qCfg: qCfg,
dealer: dealer}, nil
factory: qsf,
reqsGaugePair: reqsGaugePair,
execSeatsGauge: execSeatsGauge,
qCfg: qCfg,
dealer: dealer}, nil
}
// checkConfig returns a non-nil Dealer if the config is valid and
@ -181,8 +181,8 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
qs = &queueSet{
clock: qsc.factory.clock,
estimatedServiceDuration: 3 * time.Millisecond,
reqsObsPair: qsc.reqsObsPair,
execSeatsObs: qsc.execSeatsObs,
reqsGaugePair: qsc.reqsGaugePair,
execSeatsGauge: qsc.execSeatsGauge,
qCfg: qsc.qCfg,
currentR: 0,
lastRealTime: qsc.factory.clock.Now(),
@ -243,9 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
if qll < 1 {
qll = 1
}
qs.reqsObsPair.RequestsWaiting.SetDenominator(float64(qll))
qs.reqsObsPair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.execSeatsObs.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.dispatchAsMuchAsPossibleLocked()
}
@ -398,7 +398,7 @@ func (req *request) wait() (bool, bool) {
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
qs.reqsObsPair.RequestsWaiting.Add(-1)
qs.reqsGaugePair.RequestsWaiting.Add(-1)
}
return false, qs.isIdleLocked()
}
@ -609,7 +609,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
// remove timed out requests from queue
if timeoutCount > 0 {
qs.totRequestsWaiting -= timeoutCount
qs.reqsObsPair.RequestsWaiting.Add(float64(-timeoutCount))
qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount))
}
}
@ -646,7 +646,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
qs.totRequestsWaiting++
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
request.NoteQueued(true)
qs.reqsObsPair.RequestsWaiting.Add(1)
qs.reqsGaugePair.RequestsWaiting.Add(1)
}
// dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
@ -675,8 +675,8 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
qs.totSeatsInUse += req.MaxSeats()
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
qs.reqsObsPair.RequestsExecuting.Add(1)
qs.execSeatsObs.Add(float64(req.MaxSeats()))
qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsGauge.Add(float64(req.MaxSeats()))
klogV := klog.V(5)
if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
@ -700,7 +700,7 @@ func (qs *queueSet) dispatchLocked() bool {
qs.totRequestsWaiting--
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false)
qs.reqsObsPair.RequestsWaiting.Add(-1)
qs.reqsGaugePair.RequestsWaiting.Add(-1)
defer qs.boundNextDispatchLocked(queue)
if !request.decision.Set(decisionExecute) {
return true
@ -717,8 +717,8 @@ func (qs *queueSet) dispatchLocked() bool {
queue.seatsInUse += request.MaxSeats()
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
qs.reqsObsPair.RequestsExecuting.Add(1)
qs.execSeatsObs.Add(float64(request.MaxSeats()))
qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsGauge.Add(float64(request.MaxSeats()))
klogV := klog.V(6)
if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
@ -862,7 +862,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
now := qs.clock.Now()
qs.totRequestsExecuting--
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
qs.reqsObsPair.RequestsExecuting.Add(-1)
qs.reqsGaugePair.RequestsExecuting.Add(-1)
actualServiceDuration := now.Sub(r.startTime)
@ -874,7 +874,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
qs.totSeatsInUse -= r.MaxSeats()
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
qs.execSeatsObs.Add(-float64(r.MaxSeats()))
qs.execSeatsGauge.Add(-float64(r.MaxSeats()))
if r.queue != nil {
r.queue.seatsInUse -= r.MaxSeats()
}
@ -989,9 +989,9 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
}
func (qs *queueSet) UpdateObservations() {
qs.reqsObsPair.RequestsWaiting.Add(0)
qs.reqsObsPair.RequestsExecuting.Add(0)
qs.execSeatsObs.Add(0)
qs.reqsGaugePair.RequestsWaiting.Add(0)
qs.reqsGaugePair.RequestsExecuting.Add(0)
qs.execSeatsGauge.Add(0)
}
func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {

View File

@ -445,7 +445,7 @@ func TestNoRestraint(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk), newExecSeatsObserver(clk))
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -481,7 +481,7 @@ func TestBaseline(t *testing.T) {
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -550,7 +550,7 @@ func TestSeparations(t *testing.T) {
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -589,7 +589,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -626,7 +626,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -662,7 +662,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -702,7 +702,7 @@ func TestSeatSecondsRollover(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 40 * Quarter,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -740,7 +740,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -777,7 +777,7 @@ func TestDifferentWidths(t *testing.T) {
HandSize: 7,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -813,7 +813,7 @@ func TestTooWide(t *testing.T) {
HandSize: 7,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -874,7 +874,7 @@ func TestWindup(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -909,7 +909,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
Name: "TestDifferentFlowsWithoutQueuing",
DesiredNumQueues: 0,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -945,7 +945,7 @@ func TestTimeout(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 0,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -996,7 +996,7 @@ func TestContextCancel(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 15 * time.Second,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -1102,7 +1102,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
DesiredNumQueues: 0,
RequestWaitLimit: 15 * time.Second,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
if err != nil {
t.Fatal(err)
}
@ -1357,8 +1357,8 @@ func TestFinishRequestLocked(t *testing.T) {
qs := &queueSet{
clock: clk,
estimatedServiceDuration: time.Second,
reqsObsPair: newObserverPair(clk),
execSeatsObs: newExecSeatsObserver(clk),
reqsGaugePair: newGaugePair(clk),
execSeatsGauge: newExecSeatsGauge(clk),
}
queue := &queue{
requests: newRequestFIFO(),
@ -1461,10 +1461,10 @@ func newFIFO(requests ...*request) fifo {
return l
}
func newObserverPair(clk clock.PassiveClock) metrics.RatioedChangeObserverPair {
return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
func newGaugePair(clk clock.PassiveClock) metrics.RatioedGaugePair {
return metrics.PriorityLevelConcurrencyPairVec.NewForLabelValuesSafe(1, 1, []string{"test"})
}
func newExecSeatsObserver(clk clock.PassiveClock) metrics.RatioedChangeObserver {
return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(0, 1, []string{"test"})
func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge {
return metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, []string{"test"})
}

View File

@ -40,7 +40,7 @@ type noRestraint struct{}
type noRestraintRequest struct{}
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
return noRestraintCompleter{}, nil
}

View File

@ -57,7 +57,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
QueueLengthLimit: 5}
}
labelVals := []string{"test"}
_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(0, 1, labelVals))
_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyPairVec.NewForLabelValuesSafe(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals))
if err != nil {
panic(err)
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
// Gauge is the methods of a gauge that are used by instrumented code.
type Gauge interface {
Set(float64)
Inc()
Dec()
Add(float64)
SetToCurrentTime()
}
// RatioedGauge tracks ratios.
// The numerator is set/changed through the Gauge methods,
// and the denominator can be updated through the SetDenominator method.
// A ratio is tracked whenever the numerator or denominator is set/changed.
type RatioedGauge interface {
Gauge
// SetDenominator sets the denominator to use until it is changed again
SetDenominator(float64)
}
// RatioedGaugeVec creates related observers that are
// differentiated by a series of label values
type RatioedGaugeVec interface {
// NewForLabelValuesSafe makes a new vector member for the given tuple of label values,
// initialized with the given numerator and denominator.
// Unlike the usual Vec WithLabelValues method, this is intended to be called only
// once per vector member (at the start of its lifecycle).
// The "Safe" part is saying that the returned object will function properly after metric registration
// even if this method is called before registration.
NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge
}
//////////////////////////////// Pairs ////////////////////////////////
//
// API Priority and Fairness tends to use RatioedGaugeVec members in pairs,
// one for requests waiting in a queue and one for requests being executed.
// The following definitions are a convenience layer that adds support for that
// particular pattern of usage.
// RatioedGaugePair is a corresponding pair of gauges, one for the
// number of requests waiting in queue(s) and one for the number of
// requests being executed.
type RatioedGaugePair struct {
// RequestsWaiting is given observations of the number of currently queued requests
RequestsWaiting RatioedGauge
// RequestsExecuting is given observations of the number of requests currently executing
RequestsExecuting RatioedGauge
}
// RatioedGaugePairVec generates pairs
type RatioedGaugePairVec interface {
// NewForLabelValuesSafe makes a new vector member for the given tuple of label values,
// initialized with the given denominators and zeros for numerators.
// Unlike the usual Vec WithLabelValues method, this is intended to be called only
// once per vector member (at the start of its lifecycle).
// The "Safe" part is saying that the returned object will function properly after metric registration
// even if this method is called before registration.
NewForLabelValuesSafe(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedGaugePair
}

View File

@ -105,8 +105,8 @@ var (
},
[]string{priorityLevel, flowSchema},
)
// PriorityLevelExecutionSeatsObserverGenerator creates observers of seats occupied throughout execution for priority levels
PriorityLevelExecutionSeatsObserverGenerator = NewSampleAndWaterMarkHistogramsGenerator(clock.RealClock{}, time.Millisecond,
// PriorityLevelExecutionSeatsGaugeVec creates observers of seats occupied throughout execution for priority levels
PriorityLevelExecutionSeatsGaugeVec = NewSampleAndWaterMarkHistogramsVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -127,8 +127,8 @@ var (
},
[]string{priorityLevel},
)
// PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels
PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
// PriorityLevelConcurrencyPairVec creates pairs that observe concurrency for priority levels
PriorityLevelConcurrencyPairVec = NewSampleAndWaterMarkHistogramsPairVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -147,8 +147,8 @@ var (
},
[]string{priorityLevel},
)
// ReadWriteConcurrencyObserverPairGenerator creates pairs that observe concurrency broken down by mutating vs readonly
ReadWriteConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
// ReadWriteConcurrencyPairVec creates pairs that observe concurrency broken down by mutating vs readonly
ReadWriteConcurrencyPairVec = NewSampleAndWaterMarkHistogramsPairVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -355,9 +355,9 @@ var (
apiserverWorkEstimatedSeats,
apiserverDispatchWithNoAccommodation,
}.
Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...).
Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...).
Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...)
Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
Append(PriorityLevelConcurrencyPairVec.metrics()...).
Append(ReadWriteConcurrencyPairVec.metrics()...)
)
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel

View File

@ -1,65 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
// Observer is something that can be given numeric observations.
type Observer interface {
// Observe takes an observation
Observe(float64)
}
// ChangeObserver extends Observer with the ability to take
// an observation that is relative to the previous observation.
type ChangeObserver interface {
Observer
// Observe a new value that differs by the given amount from the previous observation.
Add(float64)
}
// RatioedChangeObserver tracks ratios.
// The numerator is set/changed through the ChangeObserver methods,
// and the denominator can be updated through the SetDenominator method.
// A ratio is tracked whenever the numerator is set/changed.
type RatioedChangeObserver interface {
ChangeObserver
// SetDenominator sets the denominator to use until it is changed again
SetDenominator(float64)
}
// RatioedChangeObserverGenerator creates related observers that are
// differentiated by a series of label values
type RatioedChangeObserverGenerator interface {
Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver
}
// RatioedChangeObserverPair is a corresponding pair of observers, one for the
// number of requests waiting in queue(s) and one for the number of
// requests being executed
type RatioedChangeObserverPair struct {
// RequestsWaiting is given observations of the number of currently queued requests
RequestsWaiting RatioedChangeObserver
// RequestsExecuting is given observations of the number of requests currently executing
RequestsExecuting RatioedChangeObserver
}
// RatioedChangeObserverPairGenerator generates pairs
type RatioedChangeObserverPairGenerator interface {
Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair
}

View File

@ -34,44 +34,44 @@ const (
labelValueExecuting = "executing"
)
// SampleAndWaterMarkPairGenerator makes pairs of RatioedChangeObservers that
// SampleAndWaterMarkPairVec makes pairs of RatioedGauges that
// track samples and watermarks.
type SampleAndWaterMarkPairGenerator struct {
urGenerator SampleAndWaterMarkObserverGenerator
type SampleAndWaterMarkPairVec struct {
urVec SampleAndWaterMarkObserverVec
}
var _ RatioedChangeObserverPairGenerator = SampleAndWaterMarkPairGenerator{}
var _ RatioedGaugePairVec = SampleAndWaterMarkPairVec{}
// NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator
func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator {
return SampleAndWaterMarkPairGenerator{
urGenerator: NewSampleAndWaterMarkHistogramsGenerator(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)),
// NewSampleAndWaterMarkHistogramsPairVec makes a new pair generator
func NewSampleAndWaterMarkHistogramsPairVec(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairVec {
return SampleAndWaterMarkPairVec{
urVec: NewSampleAndWaterMarkHistogramsVec(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)),
}
}
// Generate makes a new pair
func (spg SampleAndWaterMarkPairGenerator) Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair {
return RatioedChangeObserverPair{
RequestsWaiting: spg.urGenerator.Generate(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)),
RequestsExecuting: spg.urGenerator.Generate(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)),
// NewForLabelValuesSafe makes a new pair
func (spg SampleAndWaterMarkPairVec) NewForLabelValuesSafe(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedGaugePair {
return RatioedGaugePair{
RequestsWaiting: spg.urVec.NewForLabelValuesSafe(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)),
RequestsExecuting: spg.urVec.NewForLabelValuesSafe(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)),
}
}
func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables {
return spg.urGenerator.metrics()
func (spg SampleAndWaterMarkPairVec) metrics() Registerables {
return spg.urVec.metrics()
}
// SampleAndWaterMarkObserverGenerator creates RatioedChangeObservers that
// SampleAndWaterMarkObserverVec creates RatioedGauges that
// populate histograms of samples and low- and high-water-marks. The
// generator has a samplePeriod, and the histograms get an observation
// every samplePeriod. The sampling windows are quantized based on
// the monotonic rather than wall-clock times. The `t0` field is
// there so to provide a baseline for monotonic clock differences.
type SampleAndWaterMarkObserverGenerator struct {
*sampleAndWaterMarkObserverGenerator
type SampleAndWaterMarkObserverVec struct {
*sampleAndWaterMarkObserverVec
}
type sampleAndWaterMarkObserverGenerator struct {
type sampleAndWaterMarkObserverVec struct {
clock clock.PassiveClock
t0 time.Time
samplePeriod time.Duration
@ -79,12 +79,12 @@ type sampleAndWaterMarkObserverGenerator struct {
waterMarks *compbasemetrics.HistogramVec
}
var _ RatioedChangeObserverGenerator = SampleAndWaterMarkObserverGenerator{}
var _ RatioedGaugeVec = SampleAndWaterMarkObserverVec{}
// NewSampleAndWaterMarkHistogramsGenerator makes a new one
func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator {
return SampleAndWaterMarkObserverGenerator{
&sampleAndWaterMarkObserverGenerator{
// NewSampleAndWaterMarkHistogramsVec makes a new one
func NewSampleAndWaterMarkHistogramsVec(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverVec {
return SampleAndWaterMarkObserverVec{
&sampleAndWaterMarkObserverVec{
clock: clock,
t0: clock.Now(),
samplePeriod: samplePeriod,
@ -93,20 +93,20 @@ func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePe
}}
}
func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 {
func (swg *sampleAndWaterMarkObserverVec) quantize(when time.Time) int64 {
return int64(when.Sub(swg.t0) / swg.samplePeriod)
}
// Generate makes a new RatioedChangeObserver
func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver {
// NewForLabelValuesSafe makes a new RatioedGauge
func (swg *sampleAndWaterMarkObserverVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
ratio := initialNumerator / initialDenominator
when := swg.clock.Now()
return &sampleAndWaterMarkHistograms{
sampleAndWaterMarkObserverGenerator: swg,
labelValues: labelValues,
loLabelValues: append([]string{labelValueLo}, labelValues...),
hiLabelValues: append([]string{labelValueHi}, labelValues...),
denominator: initialDenominator,
sampleAndWaterMarkObserverVec: swg,
labelValues: labelValues,
loLabelValues: append([]string{labelValueLo}, labelValues...),
hiLabelValues: append([]string{labelValueHi}, labelValues...),
denominator: initialDenominator,
sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{
lastSet: when,
lastSetInt: swg.quantize(when),
@ -117,12 +117,12 @@ func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initi
}}
}
func (swg *sampleAndWaterMarkObserverGenerator) metrics() Registerables {
func (swg *sampleAndWaterMarkObserverVec) metrics() Registerables {
return Registerables{swg.samples, swg.waterMarks}
}
type sampleAndWaterMarkHistograms struct {
*sampleAndWaterMarkObserverGenerator
*sampleAndWaterMarkObserverVec
labelValues []string
loLabelValues, hiLabelValues []string
@ -139,17 +139,39 @@ type sampleAndWaterMarkAccumulator struct {
loRatio, hiRatio float64
}
var _ RatioedChangeObserver = (*sampleAndWaterMarkHistograms)(nil)
var _ RatioedGauge = (*sampleAndWaterMarkHistograms)(nil)
func (saw *sampleAndWaterMarkHistograms) Set(numerator float64) {
saw.innerSet(func() {
saw.numerator = numerator
})
}
func (saw *sampleAndWaterMarkHistograms) Add(deltaNumerator float64) {
saw.innerSet(func() {
saw.numerator += deltaNumerator
})
}
func (saw *sampleAndWaterMarkHistograms) Observe(numerator float64) {
func (saw *sampleAndWaterMarkHistograms) Sub(deltaNumerator float64) {
saw.innerSet(func() {
saw.numerator = numerator
saw.numerator -= deltaNumerator
})
}
func (saw *sampleAndWaterMarkHistograms) Inc() {
saw.innerSet(func() {
saw.numerator += 1
})
}
func (saw *sampleAndWaterMarkHistograms) Dec() {
saw.innerSet(func() {
saw.numerator -= 1
})
}
func (saw *sampleAndWaterMarkHistograms) SetToCurrentTime() {
saw.innerSet(func() {
saw.numerator = float64(saw.clock.Now().Sub(time.Unix(0, 0)))
})
}

View File

@ -56,11 +56,11 @@ func TestSampler(t *testing.T) {
t0 := time.Now()
clk := testclock.NewFakePassiveClock(t0)
buckets := []float64{0, 1}
gen := NewSampleAndWaterMarkHistogramsGenerator(clk, samplingPeriod,
gen := NewSampleAndWaterMarkHistogramsVec(clk, samplingPeriod,
&compbasemetrics.HistogramOpts{Name: samplesHistName, Buckets: buckets},
&compbasemetrics.HistogramOpts{Name: "marks", Buckets: buckets},
[]string{})
saw := gen.Generate(0, 1, []string{})
saw := gen.NewForLabelValuesSafe(0, 1, []string{})
toRegister := gen.metrics()
registry := compbasemetrics.NewKubeRegistry()
for _, reg := range toRegister {
@ -84,7 +84,7 @@ func TestSampler(t *testing.T) {
dt = diff
}
clk.SetTime(t1)
saw.Observe(1)
saw.Set(1)
expectedCount := int64(dt / samplingPeriod)
actualCount, err := getHistogramCount(registry, samplesHistName)
if err != nil && !(err == errMetricNotFound && expectedCount == 0) {

View File

@ -139,8 +139,8 @@ func (ft *fightTest) createController(invert bool, i int) {
FlowcontrolClient: fcIfc,
ServerConcurrencyLimit: 200, // server concurrency limit
RequestWaitLimit: time.Minute / 4, // request wait limit
ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator,
ReqsGaugePairVec: metrics.PriorityLevelConcurrencyPairVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqtesting.NewNoRestraintFactory(),
})
ft.ctlrs[invert][i] = ctlr