Merge pull request #106432 from MikeSpreitzer/regularize-observers

Factored TimedObserver into less surprising pieces
This commit is contained in:
Kubernetes Prow Robot 2021-11-16 00:35:26 -08:00 committed by GitHub
commit 2b3ff415ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 139 additions and 130 deletions

View File

@ -61,13 +61,13 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
// requestWatermark is used to track maximal numbers of requests in a particular phase of handling
type requestWatermark struct {
phase string
readOnlyObserver, mutatingObserver fcmetrics.TimedObserver
readOnlyObserver, mutatingObserver fcmetrics.RatioedChangeObserver
lock sync.Mutex
readOnlyWatermark, mutatingWatermark int
}
func (w *requestWatermark) recordMutating(mutatingVal int) {
w.mutatingObserver.Set(float64(mutatingVal))
w.mutatingObserver.Observe(float64(mutatingVal))
w.lock.Lock()
defer w.lock.Unlock()
@ -78,7 +78,7 @@ func (w *requestWatermark) recordMutating(mutatingVal int) {
}
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
w.readOnlyObserver.Set(float64(readOnlyVal))
w.readOnlyObserver.Observe(float64(readOnlyVal))
w.lock.Lock()
defer w.lock.Unlock()
@ -132,11 +132,11 @@ func WithMaxInFlightLimit(
var mutatingChan chan bool
if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit)
watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit))
watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
}
if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit)
watermark.mutatingObserver.SetX1(float64(mutatingLimit))
watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View File

@ -102,8 +102,8 @@ type configController struct {
name string // varies in tests of fighting controllers
clock clock.PassiveClock
queueSetFactory fq.QueueSetFactory
reqsObsPairGenerator metrics.TimedObserverPairGenerator
execSeatsObsGenerator metrics.TimedObserverGenerator
reqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator
execSeatsObsGenerator metrics.RatioedChangeObserverGenerator
// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
asFieldManager string
@ -193,10 +193,10 @@ type priorityLevelState struct {
numPending int
// Observers tracking number of requests waiting, executing
reqsObsPair metrics.TimedObserverPair
reqsObsPair metrics.RatioedChangeObserverPair
// Observer of number of seats occupied throughout execution
execSeatsObs metrics.TimedObserver
execSeatsObs metrics.RatioedChangeObserver
}
// NewTestableController is extra flexible to facilitate testing
@ -693,7 +693,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
// given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package.
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top")
}

View File

@ -135,10 +135,10 @@ type TestableConfig struct {
RequestWaitLimit time.Duration
// ObsPairGenerator for metrics about requests
ReqsObsPairGenerator metrics.TimedObserverPairGenerator
ReqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator
// TimedObserverPairGenerator for metrics about seats occupied by all phases of execution
ExecSeatsObsGenerator metrics.TimedObserverGenerator
// RatioedChangeObserverPairGenerator for metrics about seats occupied by all phases of execution
ExecSeatsObsGenerator metrics.RatioedChangeObserverGenerator
// QueueSetFactory for the queuing implementation
QueueSetFactory fq.QueueSetFactory

View File

@ -105,7 +105,7 @@ type ctlrTestRequest struct {
descr1, descr2 interface{}
}
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.TimedObserverPair, eso metrics.TimedObserver) (fq.QueueSetCompleter, error) {
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedChangeObserverPair, eso metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
}

View File

@ -28,10 +28,9 @@ import (
// Integrator computes the moments of some variable X over time as
// read from a particular clock. The integrals start when the
// Integrator is created, and ends at the latest operation on the
// Integrator. As a `metrics.TimedObserver` this fixes X1=1 and
// ignores attempts to change X1.
// Integrator.
type Integrator interface {
metrics.TimedObserver
metrics.ChangeObserver
GetResults() IntegratorResults
@ -70,10 +69,7 @@ func NewIntegrator(clock clock.PassiveClock) Integrator {
}
}
func (igr *integrator) SetX1(x1 float64) {
}
func (igr *integrator) Set(x float64) {
func (igr *integrator) Observe(x float64) {
igr.Lock()
igr.setLocked(x)
igr.Unlock()

View File

@ -38,7 +38,7 @@ func TestIntegrator(t *testing.T) {
if !results.Equal(&rToo) {
t.Errorf("expected %#+v, got %#+v", results, rToo)
}
igr.Set(2)
igr.Observe(2)
results = igr.GetResults()
if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) {
t.Errorf("expected %#+v, got %#+v", e, results)

View File

@ -32,10 +32,10 @@ import (
// before committing to a concurrency allotment for the second.
type QueueSetFactory interface {
// BeginConstruction does the first phase of creating a QueueSet.
// The TimedObserverPair observes number of requests,
// The RatioedChangeObserverPair observes number of requests,
// execution covering just the regular phase.
// The TimedObserver observes number of seats occupied through all phases of execution.
BeginConstruction(QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (QueueSetCompleter, error)
// The RatioedChangeObserver observes number of seats occupied through all phases of execution.
BeginConstruction(QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (QueueSetCompleter, error)
}
// QueueSetCompleter finishes the two-step process of creating or

View File

@ -61,8 +61,8 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
// the fields `factory` and `theSet` is non-nil.
type queueSetCompleter struct {
factory *queueSetFactory
reqsObsPair metrics.TimedObserverPair
execSeatsObs metrics.TimedObserver
reqsObsPair metrics.RatioedChangeObserverPair
execSeatsObs metrics.RatioedChangeObserver
theSet *queueSet
qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer
@ -81,9 +81,9 @@ type queueSet struct {
clock eventclock.Interface
estimatedServiceDuration time.Duration
reqsObsPair metrics.TimedObserverPair // .RequestsExecuting covers regular phase only
reqsObsPair metrics.RatioedChangeObserverPair // .RequestsExecuting covers regular phase only
execSeatsObs metrics.TimedObserver // for all phases of execution
execSeatsObs metrics.RatioedChangeObserver // for all phases of execution
promiseFactory promiseFactory
@ -148,7 +148,7 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
}
}
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
dealer, err := checkConfig(qCfg)
if err != nil {
return nil, err
@ -243,9 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
if qll < 1 {
qll = 1
}
qs.reqsObsPair.RequestsWaiting.SetX1(float64(qll))
qs.reqsObsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
qs.execSeatsObs.SetX1(float64(dCfg.ConcurrencyLimit))
qs.reqsObsPair.RequestsWaiting.SetDenominator(float64(qll))
qs.reqsObsPair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.execSeatsObs.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.dispatchAsMuchAsPossibleLocked()
}

View File

@ -1461,10 +1461,10 @@ func newFIFO(requests ...*request) fifo {
return l
}
func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
func newObserverPair(clk clock.PassiveClock) metrics.RatioedChangeObserverPair {
return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
}
func newExecSeatsObserver(clk clock.PassiveClock) metrics.TimedObserver {
func newExecSeatsObserver(clk clock.PassiveClock) metrics.RatioedChangeObserver {
return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"test"})
}

View File

@ -40,7 +40,7 @@ type noRestraint struct{}
type noRestraintRequest struct{}
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (fq.QueueSetCompleter, error) {
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
return noRestraintCompleter{}, nil
}

View File

@ -0,0 +1,65 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
// Observer is something that can be given numeric observations.
type Observer interface {
// Observe takes an observation
Observe(float64)
}
// ChangeObserver extends Observer with the ability to take
// an observation that is relative to the previous observation.
type ChangeObserver interface {
Observer
// Observe a new value that differs by the given amount from the previous observation.
Add(float64)
}
// RatioedChangeObserver tracks ratios.
// The numerator is set/changed through the ChangeObserver methods,
// and the denominator can be updated through the SetDenominator method.
// A ratio is tracked whenever the numerator is set/changed.
type RatioedChangeObserver interface {
ChangeObserver
// SetDenominator sets the denominator to use until it is changed again
SetDenominator(float64)
}
// RatioedChangeObserverGenerator creates related observers that are
// differentiated by a series of label values
type RatioedChangeObserverGenerator interface {
Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver
}
// RatioedChangeObserverPair is a corresponding pair of observers, one for the
// number of requests waiting in queue(s) and one for the number of
// requests being executed
type RatioedChangeObserverPair struct {
// RequestsWaiting is given observations of the number of currently queued requests
RequestsWaiting RatioedChangeObserver
// RequestsExecuting is given observations of the number of requests currently executing
RequestsExecuting RatioedChangeObserver
}
// RatioedChangeObserverPairGenerator generates pairs
type RatioedChangeObserverPairGenerator interface {
Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair
}

View File

@ -34,13 +34,13 @@ const (
labelValueExecuting = "executing"
)
// SampleAndWaterMarkPairGenerator makes pairs of TimedObservers that
// SampleAndWaterMarkPairGenerator makes pairs of RatioedChangeObservers that
// track samples and watermarks.
type SampleAndWaterMarkPairGenerator struct {
urGenerator SampleAndWaterMarkObserverGenerator
}
var _ TimedObserverPairGenerator = SampleAndWaterMarkPairGenerator{}
var _ RatioedChangeObserverPairGenerator = SampleAndWaterMarkPairGenerator{}
// NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator
func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator {
@ -50,10 +50,10 @@ func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samp
}
// Generate makes a new pair
func (spg SampleAndWaterMarkPairGenerator) Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair {
return TimedObserverPair{
RequestsWaiting: spg.urGenerator.Generate(0, waiting1, append([]string{labelValueWaiting}, labelValues...)),
RequestsExecuting: spg.urGenerator.Generate(0, executing1, append([]string{labelValueExecuting}, labelValues...)),
func (spg SampleAndWaterMarkPairGenerator) Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair {
return RatioedChangeObserverPair{
RequestsWaiting: spg.urGenerator.Generate(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)),
RequestsExecuting: spg.urGenerator.Generate(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)),
}
}
@ -61,7 +61,7 @@ func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables {
return spg.urGenerator.metrics()
}
// SampleAndWaterMarkObserverGenerator creates TimedObservers that
// SampleAndWaterMarkObserverGenerator creates RatioedChangeObservers that
// populate histograms of samples and low- and high-water-marks. The
// generator has a samplePeriod, and the histograms get an observation
// every samplePeriod. The sampling windows are quantized based on
@ -79,7 +79,7 @@ type sampleAndWaterMarkObserverGenerator struct {
waterMarks *compbasemetrics.HistogramVec
}
var _ TimedObserverGenerator = SampleAndWaterMarkObserverGenerator{}
var _ RatioedChangeObserverGenerator = SampleAndWaterMarkObserverGenerator{}
// NewSampleAndWaterMarkHistogramsGenerator makes a new one
func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator {
@ -97,23 +97,23 @@ func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 {
return int64(when.Sub(swg.t0) / swg.samplePeriod)
}
// Generate makes a new TimedObserver
func (swg *sampleAndWaterMarkObserverGenerator) Generate(x, x1 float64, labelValues []string) TimedObserver {
relX := x / x1
// Generate makes a new RatioedChangeObserver
func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver {
ratio := initialNumerator / initialDenominator
when := swg.clock.Now()
return &sampleAndWaterMarkHistograms{
sampleAndWaterMarkObserverGenerator: swg,
labelValues: labelValues,
loLabelValues: append([]string{labelValueLo}, labelValues...),
hiLabelValues: append([]string{labelValueHi}, labelValues...),
x1: x1,
denominator: initialDenominator,
sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{
lastSet: when,
lastSetInt: swg.quantize(when),
x: x,
relX: relX,
loRelX: relX,
hiRelX: relX,
numerator: initialNumerator,
ratio: ratio,
loRatio: ratio,
hiRatio: ratio,
}}
}
@ -127,39 +127,39 @@ type sampleAndWaterMarkHistograms struct {
loLabelValues, hiLabelValues []string
sync.Mutex
x1 float64
denominator float64
sampleAndWaterMarkAccumulator
}
type sampleAndWaterMarkAccumulator struct {
lastSet time.Time
lastSetInt int64 // lastSet / samplePeriod
x float64
relX float64 // x / x1
loRelX, hiRelX float64
lastSet time.Time
lastSetInt int64 // lastSet / samplePeriod
numerator float64
ratio float64 // numerator/denominator
loRatio, hiRatio float64
}
var _ TimedObserver = (*sampleAndWaterMarkHistograms)(nil)
var _ RatioedChangeObserver = (*sampleAndWaterMarkHistograms)(nil)
func (saw *sampleAndWaterMarkHistograms) Add(deltaX float64) {
func (saw *sampleAndWaterMarkHistograms) Add(deltaNumerator float64) {
saw.innerSet(func() {
saw.x += deltaX
saw.numerator += deltaNumerator
})
}
func (saw *sampleAndWaterMarkHistograms) Set(x float64) {
func (saw *sampleAndWaterMarkHistograms) Observe(numerator float64) {
saw.innerSet(func() {
saw.x = x
saw.numerator = numerator
})
}
func (saw *sampleAndWaterMarkHistograms) SetX1(x1 float64) {
func (saw *sampleAndWaterMarkHistograms) SetDenominator(denominator float64) {
saw.innerSet(func() {
saw.x1 = x1
saw.denominator = denominator
})
}
func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) {
func (saw *sampleAndWaterMarkHistograms) innerSet(updateNumeratorOrDenominator func()) {
when, whenInt, acc, wellOrdered := func() (time.Time, int64, sampleAndWaterMarkAccumulator, bool) {
saw.Lock()
defer saw.Unlock()
@ -168,11 +168,11 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) {
whenInt := saw.quantize(when)
acc := saw.sampleAndWaterMarkAccumulator
wellOrdered := !when.Before(acc.lastSet)
updateXOrX1()
saw.relX = saw.x / saw.x1
updateNumeratorOrDenominator()
saw.ratio = saw.numerator / saw.denominator
if wellOrdered {
if acc.lastSetInt < whenInt {
saw.loRelX, saw.hiRelX = acc.relX, acc.relX
saw.loRatio, saw.hiRatio = acc.ratio, acc.ratio
saw.lastSetInt = whenInt
}
saw.lastSet = when
@ -187,10 +187,10 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) {
// would be wrong to update `saw.lastSet` in this case because
// that plants a time bomb for future updates to
// `saw.lastSetInt`.
if saw.relX < saw.loRelX {
saw.loRelX = saw.relX
} else if saw.relX > saw.hiRelX {
saw.hiRelX = saw.relX
if saw.ratio < saw.loRatio {
saw.loRatio = saw.ratio
} else if saw.ratio > saw.hiRatio {
saw.hiRatio = saw.ratio
}
return when, whenInt, acc, wellOrdered
}()
@ -200,10 +200,10 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) {
klog.Errorf("Time went backwards from %s to %s for labelValues=%#+v", lastSetS, whenS, saw.labelValues)
}
for acc.lastSetInt < whenInt {
saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.relX)
saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRelX)
saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRelX)
saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.ratio)
saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRatio)
saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRatio)
acc.lastSetInt++
acc.loRelX, acc.hiRelX = acc.relX, acc.relX
acc.loRatio, acc.hiRatio = acc.ratio, acc.ratio
}
}

View File

@ -81,7 +81,7 @@ func TestSampler(t *testing.T) {
dt = diff
}
clk.SetTime(t1)
saw.Set(1)
saw.Observe(1)
expectedCount := int64(dt / samplingPeriod)
actualCount, err := getHistogramCount(regs, samplesHistName)
if err != nil {

View File

@ -1,52 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
// TimedObserver gets informed about the values assigned to a variable
// `X float64` over time, and reports on the ratio `X/X1`.
type TimedObserver interface {
// Add notes a change to the variable
Add(deltaX float64)
// Set notes a setting of the variable
Set(x float64)
// SetX1 changes the value to use for X1
SetX1(x1 float64)
}
// TimedObserverGenerator creates related observers that are
// differentiated by a series of label values
type TimedObserverGenerator interface {
Generate(x, x1 float64, labelValues []string) TimedObserver
}
// TimedObserverPair is a corresponding pair of observers, one for the
// number of requests waiting in queue(s) and one for the number of
// requests being executed
type TimedObserverPair struct {
// RequestsWaiting is given observations of the number of currently queued requests
RequestsWaiting TimedObserver
// RequestsExecuting is given observations of the number of requests currently executing
RequestsExecuting TimedObserver
}
// TimedObserverPairGenerator generates pairs
type TimedObserverPairGenerator interface {
Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair
}