From ab64e852023965fd8873abcd50ff09cf79814d11 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Mon, 15 Nov 2021 14:59:30 -0500 Subject: [PATCH] Factored TimedObserver into less surprising pieces --- .../pkg/server/filters/maxinflight.go | 10 +-- .../pkg/util/flowcontrol/apf_controller.go | 10 +-- .../pkg/util/flowcontrol/apf_filter.go | 6 +- .../pkg/util/flowcontrol/controller_test.go | 2 +- .../flowcontrol/fairqueuing/integrator.go | 10 +-- .../fairqueuing/integrator_test.go | 2 +- .../util/flowcontrol/fairqueuing/interface.go | 6 +- .../fairqueuing/queueset/queueset.go | 16 ++-- .../fairqueuing/queueset/queueset_test.go | 4 +- .../fairqueuing/testing/no-restraint.go | 2 +- .../pkg/util/flowcontrol/metrics/observer.go | 65 +++++++++++++++ .../metrics/sample_and_watermark.go | 82 +++++++++---------- .../metrics/sample_and_watermark_test.go | 2 +- .../flowcontrol/metrics/timed_observer.go | 52 ------------ 14 files changed, 139 insertions(+), 130 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/observer.go delete mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timed_observer.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index 70c8d8b855c..71d8a534bbc 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -61,13 +61,13 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) { // requestWatermark is used to track maximal numbers of requests in a particular phase of handling type requestWatermark struct { phase string - readOnlyObserver, mutatingObserver fcmetrics.TimedObserver + readOnlyObserver, mutatingObserver fcmetrics.RatioedChangeObserver lock sync.Mutex readOnlyWatermark, mutatingWatermark int } func (w *requestWatermark) recordMutating(mutatingVal int) { - w.mutatingObserver.Set(float64(mutatingVal)) + w.mutatingObserver.Observe(float64(mutatingVal)) w.lock.Lock() defer w.lock.Unlock() @@ -78,7 +78,7 @@ func (w *requestWatermark) recordMutating(mutatingVal int) { } func (w *requestWatermark) recordReadOnly(readOnlyVal int) { - w.readOnlyObserver.Set(float64(readOnlyVal)) + w.readOnlyObserver.Observe(float64(readOnlyVal)) w.lock.Lock() defer w.lock.Unlock() @@ -132,11 +132,11 @@ func WithMaxInFlightLimit( var mutatingChan chan bool if nonMutatingLimit != 0 { nonMutatingChan = make(chan bool, nonMutatingLimit) - watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit)) + watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit)) } if mutatingLimit != 0 { mutatingChan = make(chan bool, mutatingLimit) - watermark.mutatingObserver.SetX1(float64(mutatingLimit)) + watermark.mutatingObserver.SetDenominator(float64(mutatingLimit)) } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 71961f007f7..3b2b4a1738e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -102,8 +102,8 @@ type configController struct { name string // varies in tests of fighting controllers clock clock.PassiveClock queueSetFactory fq.QueueSetFactory - reqsObsPairGenerator metrics.TimedObserverPairGenerator - execSeatsObsGenerator metrics.TimedObserverGenerator + reqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator + execSeatsObsGenerator metrics.RatioedChangeObserverGenerator // How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager asFieldManager string @@ -193,10 +193,10 @@ type priorityLevelState struct { numPending int // Observers tracking number of requests waiting, executing - reqsObsPair metrics.TimedObserverPair + reqsObsPair metrics.RatioedChangeObserverPair // Observer of number of seats occupied throughout execution - execSeatsObs metrics.TimedObserver + execSeatsObs metrics.RatioedChangeObserver } // NewTestableController is extra flexible to facilitate testing @@ -693,7 +693,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { // given priority level configuration. Returns nil if that config // does not call for limiting. Returns nil and an error if the given // object is malformed in a way that is a problem for this package. -func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) { +func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { return nil, errors.New("broken union structure at the top") } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index b16d0ae463c..67149fd836f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -135,10 +135,10 @@ type TestableConfig struct { RequestWaitLimit time.Duration // ObsPairGenerator for metrics about requests - ReqsObsPairGenerator metrics.TimedObserverPairGenerator + ReqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator - // TimedObserverPairGenerator for metrics about seats occupied by all phases of execution - ExecSeatsObsGenerator metrics.TimedObserverGenerator + // RatioedChangeObserverPairGenerator for metrics about seats occupied by all phases of execution + ExecSeatsObsGenerator metrics.RatioedChangeObserverGenerator // QueueSetFactory for the queuing implementation QueueSetFactory fq.QueueSetFactory diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index c14b74d70ee..9a40fb71929 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -105,7 +105,7 @@ type ctlrTestRequest struct { descr1, descr2 interface{} } -func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.TimedObserverPair, eso metrics.TimedObserver) (fq.QueueSetCompleter, error) { +func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedChangeObserverPair, eso metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { return ctlrTestQueueSetCompleter{cts, nil, qc}, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go index 8499f0a7430..800fa765fb6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go @@ -28,10 +28,9 @@ import ( // Integrator computes the moments of some variable X over time as // read from a particular clock. The integrals start when the // Integrator is created, and ends at the latest operation on the -// Integrator. As a `metrics.TimedObserver` this fixes X1=1 and -// ignores attempts to change X1. +// Integrator. type Integrator interface { - metrics.TimedObserver + metrics.ChangeObserver GetResults() IntegratorResults @@ -70,10 +69,7 @@ func NewIntegrator(clock clock.PassiveClock) Integrator { } } -func (igr *integrator) SetX1(x1 float64) { -} - -func (igr *integrator) Set(x float64) { +func (igr *integrator) Observe(x float64) { igr.Lock() igr.setLocked(x) igr.Unlock() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go index e377ea2da52..698f9abe3d8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go @@ -38,7 +38,7 @@ func TestIntegrator(t *testing.T) { if !results.Equal(&rToo) { t.Errorf("expected %#+v, got %#+v", results, rToo) } - igr.Set(2) + igr.Observe(2) results = igr.GetResults() if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) { t.Errorf("expected %#+v, got %#+v", e, results) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 23215d084a3..cc772d8b7dc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -32,10 +32,10 @@ import ( // before committing to a concurrency allotment for the second. type QueueSetFactory interface { // BeginConstruction does the first phase of creating a QueueSet. - // The TimedObserverPair observes number of requests, + // The RatioedChangeObserverPair observes number of requests, // execution covering just the regular phase. - // The TimedObserver observes number of seats occupied through all phases of execution. - BeginConstruction(QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (QueueSetCompleter, error) + // The RatioedChangeObserver observes number of seats occupied through all phases of execution. + BeginConstruction(QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (QueueSetCompleter, error) } // QueueSetCompleter finishes the two-step process of creating or diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 0f3a8138b88..440975faf3b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -61,8 +61,8 @@ type promiseFactoryFactory func(*queueSet) promiseFactory // the fields `factory` and `theSet` is non-nil. type queueSetCompleter struct { factory *queueSetFactory - reqsObsPair metrics.TimedObserverPair - execSeatsObs metrics.TimedObserver + reqsObsPair metrics.RatioedChangeObserverPair + execSeatsObs metrics.RatioedChangeObserver theSet *queueSet qCfg fq.QueuingConfig dealer *shufflesharding.Dealer @@ -81,9 +81,9 @@ type queueSet struct { clock eventclock.Interface estimatedServiceDuration time.Duration - reqsObsPair metrics.TimedObserverPair // .RequestsExecuting covers regular phase only + reqsObsPair metrics.RatioedChangeObserverPair // .RequestsExecuting covers regular phase only - execSeatsObs metrics.TimedObserver // for all phases of execution + execSeatsObs metrics.RatioedChangeObserver // for all phases of execution promiseFactory promiseFactory @@ -148,7 +148,7 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr } } -func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) { +func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { dealer, err := checkConfig(qCfg) if err != nil { return nil, err @@ -243,9 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, if qll < 1 { qll = 1 } - qs.reqsObsPair.RequestsWaiting.SetX1(float64(qll)) - qs.reqsObsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit)) - qs.execSeatsObs.SetX1(float64(dCfg.ConcurrencyLimit)) + qs.reqsObsPair.RequestsWaiting.SetDenominator(float64(qll)) + qs.reqsObsPair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit)) + qs.execSeatsObs.SetDenominator(float64(dCfg.ConcurrencyLimit)) qs.dispatchAsMuchAsPossibleLocked() } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 8679915f1f6..e14e57bf285 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1461,10 +1461,10 @@ func newFIFO(requests ...*request) fifo { return l } -func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair { +func newObserverPair(clk clock.PassiveClock) metrics.RatioedChangeObserverPair { return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}) } -func newExecSeatsObserver(clk clock.PassiveClock) metrics.TimedObserver { +func newExecSeatsObserver(clk clock.PassiveClock) metrics.RatioedChangeObserver { return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"test"}) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 1dab3681146..86e507ecc76 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -40,7 +40,7 @@ type noRestraint struct{} type noRestraintRequest struct{} -func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (fq.QueueSetCompleter, error) { +func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { return noRestraintCompleter{}, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/observer.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/observer.go new file mode 100644 index 00000000000..1e55a0e1e77 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/observer.go @@ -0,0 +1,65 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +// Observer is something that can be given numeric observations. +type Observer interface { + // Observe takes an observation + Observe(float64) +} + +// ChangeObserver extends Observer with the ability to take +// an observation that is relative to the previous observation. +type ChangeObserver interface { + Observer + + // Observe a new value that differs by the given amount from the previous observation. + Add(float64) +} + +// RatioedChangeObserver tracks ratios. +// The numerator is set/changed through the ChangeObserver methods, +// and the denominator can be updated through the SetDenominator method. +// A ratio is tracked whenever the numerator is set/changed. +type RatioedChangeObserver interface { + ChangeObserver + + // SetDenominator sets the denominator to use until it is changed again + SetDenominator(float64) +} + +// RatioedChangeObserverGenerator creates related observers that are +// differentiated by a series of label values +type RatioedChangeObserverGenerator interface { + Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver +} + +// RatioedChangeObserverPair is a corresponding pair of observers, one for the +// number of requests waiting in queue(s) and one for the number of +// requests being executed +type RatioedChangeObserverPair struct { + // RequestsWaiting is given observations of the number of currently queued requests + RequestsWaiting RatioedChangeObserver + + // RequestsExecuting is given observations of the number of requests currently executing + RequestsExecuting RatioedChangeObserver +} + +// RatioedChangeObserverPairGenerator generates pairs +type RatioedChangeObserverPairGenerator interface { + Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go index dc12cb71472..29366b53635 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go @@ -34,13 +34,13 @@ const ( labelValueExecuting = "executing" ) -// SampleAndWaterMarkPairGenerator makes pairs of TimedObservers that +// SampleAndWaterMarkPairGenerator makes pairs of RatioedChangeObservers that // track samples and watermarks. type SampleAndWaterMarkPairGenerator struct { urGenerator SampleAndWaterMarkObserverGenerator } -var _ TimedObserverPairGenerator = SampleAndWaterMarkPairGenerator{} +var _ RatioedChangeObserverPairGenerator = SampleAndWaterMarkPairGenerator{} // NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator { @@ -50,10 +50,10 @@ func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samp } // Generate makes a new pair -func (spg SampleAndWaterMarkPairGenerator) Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair { - return TimedObserverPair{ - RequestsWaiting: spg.urGenerator.Generate(0, waiting1, append([]string{labelValueWaiting}, labelValues...)), - RequestsExecuting: spg.urGenerator.Generate(0, executing1, append([]string{labelValueExecuting}, labelValues...)), +func (spg SampleAndWaterMarkPairGenerator) Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair { + return RatioedChangeObserverPair{ + RequestsWaiting: spg.urGenerator.Generate(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)), + RequestsExecuting: spg.urGenerator.Generate(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)), } } @@ -61,7 +61,7 @@ func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables { return spg.urGenerator.metrics() } -// SampleAndWaterMarkObserverGenerator creates TimedObservers that +// SampleAndWaterMarkObserverGenerator creates RatioedChangeObservers that // populate histograms of samples and low- and high-water-marks. The // generator has a samplePeriod, and the histograms get an observation // every samplePeriod. The sampling windows are quantized based on @@ -79,7 +79,7 @@ type sampleAndWaterMarkObserverGenerator struct { waterMarks *compbasemetrics.HistogramVec } -var _ TimedObserverGenerator = SampleAndWaterMarkObserverGenerator{} +var _ RatioedChangeObserverGenerator = SampleAndWaterMarkObserverGenerator{} // NewSampleAndWaterMarkHistogramsGenerator makes a new one func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator { @@ -97,23 +97,23 @@ func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 { return int64(when.Sub(swg.t0) / swg.samplePeriod) } -// Generate makes a new TimedObserver -func (swg *sampleAndWaterMarkObserverGenerator) Generate(x, x1 float64, labelValues []string) TimedObserver { - relX := x / x1 +// Generate makes a new RatioedChangeObserver +func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver { + ratio := initialNumerator / initialDenominator when := swg.clock.Now() return &sampleAndWaterMarkHistograms{ sampleAndWaterMarkObserverGenerator: swg, labelValues: labelValues, loLabelValues: append([]string{labelValueLo}, labelValues...), hiLabelValues: append([]string{labelValueHi}, labelValues...), - x1: x1, + denominator: initialDenominator, sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{ lastSet: when, lastSetInt: swg.quantize(when), - x: x, - relX: relX, - loRelX: relX, - hiRelX: relX, + numerator: initialNumerator, + ratio: ratio, + loRatio: ratio, + hiRatio: ratio, }} } @@ -127,39 +127,39 @@ type sampleAndWaterMarkHistograms struct { loLabelValues, hiLabelValues []string sync.Mutex - x1 float64 + denominator float64 sampleAndWaterMarkAccumulator } type sampleAndWaterMarkAccumulator struct { - lastSet time.Time - lastSetInt int64 // lastSet / samplePeriod - x float64 - relX float64 // x / x1 - loRelX, hiRelX float64 + lastSet time.Time + lastSetInt int64 // lastSet / samplePeriod + numerator float64 + ratio float64 // numerator/denominator + loRatio, hiRatio float64 } -var _ TimedObserver = (*sampleAndWaterMarkHistograms)(nil) +var _ RatioedChangeObserver = (*sampleAndWaterMarkHistograms)(nil) -func (saw *sampleAndWaterMarkHistograms) Add(deltaX float64) { +func (saw *sampleAndWaterMarkHistograms) Add(deltaNumerator float64) { saw.innerSet(func() { - saw.x += deltaX + saw.numerator += deltaNumerator }) } -func (saw *sampleAndWaterMarkHistograms) Set(x float64) { +func (saw *sampleAndWaterMarkHistograms) Observe(numerator float64) { saw.innerSet(func() { - saw.x = x + saw.numerator = numerator }) } -func (saw *sampleAndWaterMarkHistograms) SetX1(x1 float64) { +func (saw *sampleAndWaterMarkHistograms) SetDenominator(denominator float64) { saw.innerSet(func() { - saw.x1 = x1 + saw.denominator = denominator }) } -func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { +func (saw *sampleAndWaterMarkHistograms) innerSet(updateNumeratorOrDenominator func()) { when, whenInt, acc, wellOrdered := func() (time.Time, int64, sampleAndWaterMarkAccumulator, bool) { saw.Lock() defer saw.Unlock() @@ -168,11 +168,11 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { whenInt := saw.quantize(when) acc := saw.sampleAndWaterMarkAccumulator wellOrdered := !when.Before(acc.lastSet) - updateXOrX1() - saw.relX = saw.x / saw.x1 + updateNumeratorOrDenominator() + saw.ratio = saw.numerator / saw.denominator if wellOrdered { if acc.lastSetInt < whenInt { - saw.loRelX, saw.hiRelX = acc.relX, acc.relX + saw.loRatio, saw.hiRatio = acc.ratio, acc.ratio saw.lastSetInt = whenInt } saw.lastSet = when @@ -187,10 +187,10 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { // would be wrong to update `saw.lastSet` in this case because // that plants a time bomb for future updates to // `saw.lastSetInt`. - if saw.relX < saw.loRelX { - saw.loRelX = saw.relX - } else if saw.relX > saw.hiRelX { - saw.hiRelX = saw.relX + if saw.ratio < saw.loRatio { + saw.loRatio = saw.ratio + } else if saw.ratio > saw.hiRatio { + saw.hiRatio = saw.ratio } return when, whenInt, acc, wellOrdered }() @@ -200,10 +200,10 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { klog.Errorf("Time went backwards from %s to %s for labelValues=%#+v", lastSetS, whenS, saw.labelValues) } for acc.lastSetInt < whenInt { - saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.relX) - saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRelX) - saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRelX) + saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.ratio) + saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRatio) + saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRatio) acc.lastSetInt++ - acc.loRelX, acc.hiRelX = acc.relX, acc.relX + acc.loRatio, acc.hiRatio = acc.ratio, acc.ratio } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark_test.go index cdbbfba6d25..57a8c003f3b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark_test.go @@ -81,7 +81,7 @@ func TestSampler(t *testing.T) { dt = diff } clk.SetTime(t1) - saw.Set(1) + saw.Observe(1) expectedCount := int64(dt / samplingPeriod) actualCount, err := getHistogramCount(regs, samplesHistName) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timed_observer.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timed_observer.go deleted file mode 100644 index 25f41493c3e..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timed_observer.go +++ /dev/null @@ -1,52 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package metrics - -// TimedObserver gets informed about the values assigned to a variable -// `X float64` over time, and reports on the ratio `X/X1`. -type TimedObserver interface { - // Add notes a change to the variable - Add(deltaX float64) - - // Set notes a setting of the variable - Set(x float64) - - // SetX1 changes the value to use for X1 - SetX1(x1 float64) -} - -// TimedObserverGenerator creates related observers that are -// differentiated by a series of label values -type TimedObserverGenerator interface { - Generate(x, x1 float64, labelValues []string) TimedObserver -} - -// TimedObserverPair is a corresponding pair of observers, one for the -// number of requests waiting in queue(s) and one for the number of -// requests being executed -type TimedObserverPair struct { - // RequestsWaiting is given observations of the number of currently queued requests - RequestsWaiting TimedObserver - - // RequestsExecuting is given observations of the number of requests currently executing - RequestsExecuting TimedObserver -} - -// TimedObserverPairGenerator generates pairs -type TimedObserverPairGenerator interface { - Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair -}