Merge pull request #110104 from MikeSpreitzer/add-timing-ratio-histograms

Add timing ratio histograms
This commit is contained in:
Kubernetes Prow Robot 2022-07-13 18:02:56 -07:00 committed by GitHub
commit 22d018cf76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 538 additions and 409 deletions

View File

@ -220,13 +220,6 @@ func TestApfRejectRequest(t *testing.T) {
}
func TestApfExemptRequest(t *testing.T) {
epmetrics.Register()
fcmetrics.Register()
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
// so that an observation will cause some data to go into the Prometheus metrics.
time.Sleep(time.Millisecond * 50)
server := newApfServerWithSingleRequest(t, decisionNoQueuingExecute)
defer server.Close()
@ -240,19 +233,11 @@ func TestApfExemptRequest(t *testing.T) {
checkForExpectedMetrics(t, []string{
"apiserver_current_inflight_requests",
"apiserver_flowcontrol_read_vs_write_request_count_watermarks",
"apiserver_flowcontrol_read_vs_write_request_count_samples",
"apiserver_flowcontrol_read_vs_write_current_requests",
})
}
func TestApfExecuteRequest(t *testing.T) {
epmetrics.Register()
fcmetrics.Register()
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
// so that an observation will cause some data to go into the Prometheus metrics.
time.Sleep(time.Millisecond * 50)
server := newApfServerWithSingleRequest(t, decisionQueuingExecute)
defer server.Close()
@ -267,19 +252,11 @@ func TestApfExecuteRequest(t *testing.T) {
checkForExpectedMetrics(t, []string{
"apiserver_current_inflight_requests",
"apiserver_current_inqueue_requests",
"apiserver_flowcontrol_read_vs_write_request_count_watermarks",
"apiserver_flowcontrol_read_vs_write_request_count_samples",
"apiserver_flowcontrol_read_vs_write_current_requests",
})
}
func TestApfExecuteMultipleRequests(t *testing.T) {
epmetrics.Register()
fcmetrics.Register()
// Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator,
// so that an observation will cause some data to go into the Prometheus metrics.
time.Sleep(time.Millisecond * 50)
concurrentRequests := 5
preStartExecute, postStartExecute := &sync.WaitGroup{}, &sync.WaitGroup{}
preEnqueue, postEnqueue := &sync.WaitGroup{}, &sync.WaitGroup{}
@ -347,8 +324,7 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
checkForExpectedMetrics(t, []string{
"apiserver_current_inflight_requests",
"apiserver_current_inqueue_requests",
"apiserver_flowcontrol_read_vs_write_request_count_watermarks",
"apiserver_flowcontrol_read_vs_write_request_count_samples",
"apiserver_flowcontrol_read_vs_write_current_requests",
})
}

View File

@ -27,7 +27,6 @@ import (
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
basemetricstestutil "k8s.io/component-base/metrics/testutil"
"k8s.io/utils/clock"
)
const (
@ -36,11 +35,13 @@ const (
)
const (
requestKind = "request_kind"
priorityLevel = "priority_level"
flowSchema = "flow_schema"
phase = "phase"
mark = "mark"
requestKind = "request_kind"
priorityLevel = "priority_level"
flowSchema = "flow_schema"
phase = "phase"
LabelNamePhase = "phase"
LabelValueWaiting = "waiting"
LabelValueExecuting = "executing"
)
var (
@ -106,66 +107,45 @@ var (
[]string{priorityLevel, flowSchema},
)
// PriorityLevelExecutionSeatsGaugeVec creates observers of seats occupied throughout execution for priority levels
PriorityLevelExecutionSeatsGaugeVec = NewSampleAndWaterMarkHistogramsVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "priority_level_seat_count_samples",
Help: "Periodic observations of number of seats occupied for any stage of execution (but only initial stage for WATCHes)",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
PriorityLevelExecutionSeatsGaugeVec = NewTimingRatioHistogramVec(
&compbasemetrics.TimingHistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "priority_level_seat_utilization",
Help: "Observations, at the end of every nanosecond, of utilization of seats for any stage of execution (but only initial stage for WATCHes)",
// Buckets for both 0.99 and 1.0 mean PromQL's histogram_quantile will reveal saturation
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1},
ConstLabels: map[string]string{phase: "executing"},
StabilityLevel: compbasemetrics.ALPHA,
},
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "priority_level_seat_count_watermarks",
Help: "Watermarks of the number of seats occupied for any stage of execution (but only initial stage for WATCHes)",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
ConstLabels: map[string]string{phase: "executing"},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{priorityLevel},
priorityLevel,
)
// PriorityLevelConcurrencyGaugeVec creates gauges of concurrency broken down by phase, priority level
PriorityLevelConcurrencyGaugeVec = NewSampleAndWaterMarkHistogramsVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "priority_level_request_count_samples",
Help: "Periodic observations of the number of requests waiting or in any stage of execution (but only initial stage for WATCHes)",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
PriorityLevelConcurrencyGaugeVec = NewTimingRatioHistogramVec(
&compbasemetrics.TimingHistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "priority_level_request_utilization",
Help: "Observations, at the end of every nanosecond, of number of requests (as a fraction of the relevant limit) waiting or in any stage of execution (but only initial stage for WATCHes)",
// For executing: the denominator will be seats, so this metric will skew low.
// For waiting: the denominator is the individual queue length limit, so this metric can go over 1. Issue #110160
Buckets: []float64{0, 0.001, 0.0025, 0.005, 0.1, 0.25, 0.5, 0.75, 1, 10, 100},
StabilityLevel: compbasemetrics.ALPHA,
},
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "priority_level_request_count_watermarks",
Help: "Watermarks of the number of requests waiting or in any stage of execution (but only initial stage for WATCHes)",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{LabelNamePhase, priorityLevel},
LabelNamePhase, priorityLevel,
)
// ReadWriteConcurrencyGaugeVec creates gauges of number of requests broken down by phase and mutating vs readonly
ReadWriteConcurrencyGaugeVec = NewSampleAndWaterMarkHistogramsVec(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_vs_write_request_count_samples",
Help: "Periodic observations of the number of requests waiting or in regular stage of execution",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
// ReadWriteConcurrencyGaugeVec creates gauges of number of requests broken down by phase and mutating vs readonly
ReadWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
&compbasemetrics.TimingHistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_vs_write_current_requests",
Help: "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit, if max-in-flight filter is being used) waiting or in regular stage of execution",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1, 3, 10, 30, 100, 300, 1000, 3000},
// TODO: something about the utilization vs count irregularity. Issue #109846
StabilityLevel: compbasemetrics.ALPHA,
},
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_vs_write_request_count_watermarks",
Help: "Watermarks of the number of requests waiting or in regular stage of execution",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{LabelNamePhase, requestKind},
LabelNamePhase, requestKind,
)
apiserverCurrentR = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{

View File

@ -1,204 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"time"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
// Label names and values attached to the histograms defined in this file.
const (
// labelNameMark names the extra label carried only by the water-mark histograms.
labelNameMark = "mark"
// labelValueLo is the labelNameMark value used for low-water-mark observations.
labelValueLo = "low"
// labelValueHi is the labelNameMark value used for high-water-mark observations.
labelValueHi = "high"
// LabelNamePhase is the exported name of the phase label; the two
// LabelValue constants below are the exported values for it.
LabelNamePhase = "phase"
LabelValueWaiting = "waiting"
LabelValueExecuting = "executing"
)
// SampleAndWaterMarkObserverVec creates RatioedGauges that
// populate histograms of samples and low- and high-water-marks. The
// generator has a samplePeriod, and the histograms get an observation
// every samplePeriod. The sampling windows are quantized based on
// the monotonic rather than wall-clock times. The `t0` field is
// there to provide a baseline for monotonic clock differences.
// The exported type is a thin value wrapper around a pointer to the
// unexported implementation, whose methods it promotes.
type SampleAndWaterMarkObserverVec struct {
*sampleAndWaterMarkObserverVec
}
// sampleAndWaterMarkObserverVec holds the state shared by every gauge made
// from one SampleAndWaterMarkObserverVec.
type sampleAndWaterMarkObserverVec struct {
// clock is the source of the current time for all gauges of this vector.
clock clock.PassiveClock
// t0 is the clock reading taken at construction; sampling periods are counted from it.
t0 time.Time
// samplePeriod is the length of one sampling window.
samplePeriod time.Duration
// samples receives the ratio observations.
samples *compbasemetrics.HistogramVec
// waterMarks receives the low and high water-mark observations,
// told apart by the labelNameMark label.
waterMarks *compbasemetrics.HistogramVec
}
// Compile-time check that the exported wrapper satisfies RatioedGaugeVec.
var _ RatioedGaugeVec = SampleAndWaterMarkObserverVec{}
// NewSampleAndWaterMarkHistogramsVec builds a SampleAndWaterMarkObserverVec.
// The given clock is read once to establish the baseline for sampling
// periods of length samplePeriod. sampleOpts describes the histogram of
// samples and waterMarkOpts the histogram of water marks; the latter gets
// the "mark" label in front of labelNames.
func NewSampleAndWaterMarkHistogramsVec(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverVec {
	baseline := clock.Now()
	sampleHistograms := compbasemetrics.NewHistogramVec(sampleOpts, labelNames)
	// Built on a fresh slice so the caller's labelNames is never modified.
	markedLabelNames := append([]string{labelNameMark}, labelNames...)
	waterMarkHistograms := compbasemetrics.NewHistogramVec(waterMarkOpts, markedLabelNames)
	inner := &sampleAndWaterMarkObserverVec{
		clock:        clock,
		t0:           baseline,
		samplePeriod: samplePeriod,
		samples:      sampleHistograms,
		waterMarks:   waterMarkHistograms,
	}
	return SampleAndWaterMarkObserverVec{inner}
}
// quantize returns the number of whole sample periods between t0 and when,
// truncated toward zero.
func (swg *sampleAndWaterMarkObserverVec) quantize(when time.Time) int64 {
	elapsed := when.Sub(swg.t0)
	periods := elapsed / swg.samplePeriod
	return int64(periods)
}
// NewForLabelValuesSafe makes a new RatioedGauge for the given label values.
// The gauge starts with the given numerator and denominator, and its low and
// high water marks both start at their ratio.
func (swg *sampleAndWaterMarkObserverVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
	initialRatio := initialNumerator / initialDenominator
	now := swg.clock.Now()
	gauge := &sampleAndWaterMarkHistograms{
		sampleAndWaterMarkObserverVec: swg,
		labelValues:                   labelValues,
		denominator:                   initialDenominator,
	}
	// The water-mark histograms take the mark label value first, then the caller's values.
	gauge.loLabelValues = append([]string{labelValueLo}, labelValues...)
	gauge.hiLabelValues = append([]string{labelValueHi}, labelValues...)
	gauge.sampleAndWaterMarkAccumulator = sampleAndWaterMarkAccumulator{
		lastSet:    now,
		lastSetInt: swg.quantize(now),
		numerator:  initialNumerator,
		ratio:      initialRatio,
		loRatio:    initialRatio,
		hiRatio:    initialRatio,
	}
	return gauge
}
// metrics returns the two histogram vectors (samples, then water marks)
// that must be registered for this generator's observations to be exposed.
func (swg *sampleAndWaterMarkObserverVec) metrics() Registerables {
return Registerables{swg.samples, swg.waterMarks}
}
// sampleAndWaterMarkHistograms is the RatioedGauge made by a
// sampleAndWaterMarkObserverVec for one combination of label values.
type sampleAndWaterMarkHistograms struct {
// The generator that made this gauge; supplies the clock and the histograms.
*sampleAndWaterMarkObserverVec
// labelValues selects the member of the samples histogram vector.
labelValues []string
// loLabelValues and hiLabelValues select the low and high members of the
// water-marks histogram vector: labelValues preceded by the mark value.
loLabelValues, hiLabelValues []string
// The mutex is held by innerSet while it reads and updates the fields below.
sync.Mutex
denominator float64
sampleAndWaterMarkAccumulator
}
// sampleAndWaterMarkAccumulator is the part of a gauge's state that innerSet
// snapshots under the lock and then reports from after unlocking.
type sampleAndWaterMarkAccumulator struct {
// lastSet is the clock reading of the latest update accepted as well ordered.
lastSet time.Time
lastSetInt int64 // lastSet / samplePeriod
numerator float64
ratio float64 // numerator/denominator
// loRatio and hiRatio are the extremes of ratio seen in the current sampling period.
loRatio, hiRatio float64
}
// Compile-time check that the gauge satisfies RatioedGauge.
var _ RatioedGauge = (*sampleAndWaterMarkHistograms)(nil)
// Set replaces the numerator with the given value, via innerSet.
func (saw *sampleAndWaterMarkHistograms) Set(numerator float64) {
saw.innerSet(func() {
saw.numerator = numerator
})
}
// Add increases the numerator by deltaNumerator, via innerSet.
func (saw *sampleAndWaterMarkHistograms) Add(deltaNumerator float64) {
saw.innerSet(func() {
saw.numerator += deltaNumerator
})
}
// Sub decreases the numerator by deltaNumerator, via innerSet.
func (saw *sampleAndWaterMarkHistograms) Sub(deltaNumerator float64) {
saw.innerSet(func() {
saw.numerator -= deltaNumerator
})
}
// Inc increases the numerator by one, via innerSet.
func (saw *sampleAndWaterMarkHistograms) Inc() {
saw.innerSet(func() {
saw.numerator += 1
})
}
// Dec decreases the numerator by one, via innerSet.
func (saw *sampleAndWaterMarkHistograms) Dec() {
saw.innerSet(func() {
saw.numerator -= 1
})
}
// SetToCurrentTime sets the numerator to the generator clock's current
// reading, expressed as the time.Duration since the Unix epoch converted to float64.
func (saw *sampleAndWaterMarkHistograms) SetToCurrentTime() {
saw.innerSet(func() {
saw.numerator = float64(saw.clock.Now().Sub(time.Unix(0, 0)))
})
}
// SetDenominator replaces the denominator with the given value, via innerSet,
// which recomputes the ratio.
func (saw *sampleAndWaterMarkHistograms) SetDenominator(denominator float64) {
saw.innerSet(func() {
saw.denominator = denominator
})
}
// innerSet is the common implementation of every update method. While
// holding the lock it runs updateNumeratorOrDenominator, recomputes the
// ratio and adjusts the low and high water marks. After releasing the lock
// it makes one sample observation and one pair of water-mark observations
// for each sampling-period boundary crossed since the previous update.
func (saw *sampleAndWaterMarkHistograms) innerSet(updateNumeratorOrDenominator func()) {
when, whenInt, acc, wellOrdered := func() (time.Time, int64, sampleAndWaterMarkAccumulator, bool) {
saw.Lock()
defer saw.Unlock()
// Moved these variables here to tiptoe around https://github.com/golang/go/issues/43570 for #97685
when := saw.clock.Now()
whenInt := saw.quantize(when)
// acc is a copy of the accumulator as it stood before this update;
// it is what gets reported after the lock is released.
acc := saw.sampleAndWaterMarkAccumulator
wellOrdered := !when.Before(acc.lastSet)
updateNumeratorOrDenominator()
saw.ratio = saw.numerator / saw.denominator
if wellOrdered {
if acc.lastSetInt < whenInt {
// A new sampling period has begun: restart both water marks
// from the ratio that was in effect before this update.
saw.loRatio, saw.hiRatio = acc.ratio, acc.ratio
saw.lastSetInt = whenInt
}
saw.lastSet = when
}
// `wellOrdered` should always be true because we are using
// monotonic clock readings and they never go backwards. Yet
// very small backwards steps (under 1 microsecond) have been
// observed
// (https://github.com/kubernetes/kubernetes/issues/96459).
// In the backwards case, treat the current reading as if it
// had occurred at time `saw.lastSet` and log an error. It
// would be wrong to update `saw.lastSet` in this case because
// that plants a time bomb for future updates to
// `saw.lastSetInt`.
if saw.ratio < saw.loRatio {
saw.loRatio = saw.ratio
} else if saw.ratio > saw.hiRatio {
saw.hiRatio = saw.ratio
}
return when, whenInt, acc, wellOrdered
}()
// Logging and histogram observations happen outside the lock.
if !wellOrdered {
lastSetS := acc.lastSet.String()
whenS := when.String()
klog.Errorf("Time went backwards from %s to %s for labelValues=%#+v", lastSetS, whenS, saw.labelValues)
}
// One iteration per completed sampling period. Only the first iteration can
// report water marks that differ from the ratio; the rest report the
// pre-update ratio for all three observations.
for acc.lastSetInt < whenInt {
saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.ratio)
saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRatio)
saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRatio)
acc.lastSetInt++
acc.loRatio, acc.hiRatio = acc.ratio, acc.ratio
}
}

View File

@ -1,122 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"errors"
"fmt"
"math/rand"
"testing"
"time"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/klog/v2"
testclock "k8s.io/utils/clock/testing"
)
// Parameters of TestSampler.
const (
// samplesHistName is the name of the samples histogram whose count the test checks.
samplesHistName = "sawtestsamples"
// samplingPeriod is the sampling period given to the generator under test.
samplingPeriod = time.Millisecond
// Each fake time step is a uniform draw from [0, ddtRangeCentiPeriods) minus
// ddtOffsetCentiPeriods, in hundredths of a sampling period.
ddtRangeCentiPeriods = 300
ddtOffsetCentiPeriods = 50
// numIterations is the number of fake time steps taken.
numIterations = 100
)
// errMetricNotFound is returned by getHistogramCount when no gathered metric family has the requested name.
var errMetricNotFound = errors.New("not found")
// TestSampler does a rough behavioral test of the sampling in a
// SampleAndWatermarkHistograms. The test creates one and exercises
// it, checking that the count in the sampling histogram is correct at
// each step. The sampling histogram is expected to get one
// observation at the end of each sampling period. A fake clock is
// used, and the exercise consists of repeatedly changing that fake
// clock by an amount of time chosen uniformly at random from a range
// that goes from a little negative to somewhat more than two sampling
// periods. The negative changes are included because small negative
// changes have been observed in real monotonic clock readings (see
// issue #96459) and we want to test that they are properly tolerated.
// The designed toleration is to pretend that the clock did not
// change, until it resumes net forward progress.
func TestSampler(t *testing.T) {
t0 := time.Now()
clk := testclock.NewFakePassiveClock(t0)
buckets := []float64{0, 1}
gen := NewSampleAndWaterMarkHistogramsVec(clk, samplingPeriod,
&compbasemetrics.HistogramOpts{Name: samplesHistName, Buckets: buckets},
&compbasemetrics.HistogramOpts{Name: "marks", Buckets: buckets},
[]string{})
saw := gen.NewForLabelValuesSafe(0, 1, []string{})
toRegister := gen.metrics()
registry := compbasemetrics.NewKubeRegistry()
for _, reg := range toRegister {
registry.MustRegister(reg)
}
// `dt` is the admitted cumulative difference in fake time
// since the start of the test. "admitted" means this is
// never allowed to decrease, which matches the designed
// toleration for negative monotonic clock changes.
var dt time.Duration
// `t1` is the current fake time
t1 := t0.Add(dt)
klog.Infof("Expect about %v warnings about time going backwards; this is fake time deliberately misbehaving.", (numIterations*ddtOffsetCentiPeriods)/ddtRangeCentiPeriods)
t.Logf("t0=%s", t0)
for i := 0; i < numIterations; i++ {
// `ddt` is the next step to take in fake time
ddt := time.Duration(rand.Intn(ddtRangeCentiPeriods)-ddtOffsetCentiPeriods) * samplingPeriod / 100
t1 = t1.Add(ddt)
diff := t1.Sub(t0)
if diff > dt {
dt = diff
}
clk.SetTime(t1)
saw.Set(1)
// One sample is expected per whole sampling period of admitted elapsed time.
expectedCount := int64(dt / samplingPeriod)
actualCount, err := getHistogramCount(registry, samplesHistName)
// A missing metric is acceptable only while no sample is expected yet.
if err != nil && !(err == errMetricNotFound && expectedCount == 0) {
t.Fatalf("For t0=%s, t1=%s, failed to getHistogramCount: %#+v", t0, t1, err)
}
t.Logf("For i=%d, ddt=%s, t1=%s, diff=%s, dt=%s, count=%d", i, ddt, t1, diff, dt, actualCount)
if expectedCount != actualCount {
t.Errorf("For i=%d, t0=%s, ddt=%s, t1=%s, expectedCount=%d, actualCount=%d", i, t0, ddt, t1, expectedCount, actualCount)
}
}
}
// getHistogramCount gathers from the registry and returns the sample count of
// the first metric in the family named metricName. It returns
// errMetricNotFound when no gathered family has that name.
func getHistogramCount(registry compbasemetrics.KubeRegistry, metricName string) (int64, error) {
	families, err := registry.Gather()
	if err != nil {
		return 0, fmt.Errorf("failed to gather metrics: %w", err)
	}
	for _, family := range families {
		if family.GetName() == metricName {
			histogram := family.GetMetric()[0].GetHistogram()
			switch {
			case histogram == nil:
				return 0, errors.New("dto.Metric has nil Histogram")
			case histogram.SampleCount == nil:
				return 0, errors.New("dto.Histogram has nil SampleCount")
			}
			return int64(*histogram.SampleCount), nil
		}
	}
	return 0, errMetricNotFound
}

View File

@ -0,0 +1,223 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"sync"
"time"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/klog/v2"
)
// TimingRatioHistogram is essentially a gauge for a ratio where the client
// independently controls the numerator and denominator.
// When scraped it produces a histogram of samples of the ratio
// taken at the end of every nanosecond.
// `*TimingRatioHistogram` implements both Registerable and RatioedGauge.
type TimingRatioHistogram struct {
// The implementation is layered on TimingHistogram,
// adding the division by an occasionally adjusted denominator.
// Registerable is the registerable aspect of the underlying TimingHistogram.
compbasemetrics.Registerable
// timingRatioHistogramInner implements the RatioedGauge aspect.
timingRatioHistogramInner
}
// TimingRatioHistogramOpts is the constructor parameters of a TimingRatioHistogram.
// The `TimingHistogramOpts.InitialValue` is the initial numerator.
type TimingRatioHistogramOpts struct {
compbasemetrics.TimingHistogramOpts
// InitialDenominator is the denominator in effect until SetDenominator is called.
InitialDenominator float64
}
// timingRatioHistogramInner implements the instrumentation aspect:
// it tracks the numerator and denominator and pushes their ratio
// into an underlying gauge on every change.
type timingRatioHistogramInner struct {
// nowFunc supplies the current time for SetToCurrentTime.
// NOTE(review): in this file only NewTestableTimingRatioHistogram sets it;
// the gauges built by TimingRatioHistogramVec leave it nil.
nowFunc func() time.Time
// getGaugeOfRatio returns the gauge that receives the ratio; it is
// called on every update rather than once at construction.
getGaugeOfRatio func() Gauge
sync.Mutex
// access only with mutex locked
numerator, denominator float64
}
// Compile-time checks of the interfaces implemented in this file.
var _ RatioedGauge = &timingRatioHistogramInner{}
var _ RatioedGauge = &TimingRatioHistogram{}
var _ compbasemetrics.Registerable = &TimingRatioHistogram{}
// NewTimingRatioHistogram returns an object which is TimingHistogram-like. However, nothing
// will be measured until the histogram is registered in at least one registry.
// It is NewTestableTimingRatioHistogram with time.Now as the clock.
func NewTimingRatioHistogram(opts *TimingRatioHistogramOpts) *TimingRatioHistogram {
return NewTestableTimingRatioHistogram(time.Now, opts)
}
// NewTestableTimingRatioHistogram is NewTimingRatioHistogram with injection of
// the clock. The underlying TimingHistogram is created from a copy of the
// embedded options whose InitialValue is the initial ratio, that is
// opts.InitialValue divided by opts.InitialDenominator.
func NewTestableTimingRatioHistogram(nowFunc func() time.Time, opts *TimingRatioHistogramOpts) *TimingRatioHistogram {
	// Work on a copy so the caller's options keep the initial numerator.
	underlyingOpts := opts.TimingHistogramOpts
	underlyingOpts.InitialValue = underlyingOpts.InitialValue / opts.InitialDenominator
	underlying := compbasemetrics.NewTestableTimingHistogram(nowFunc, &underlyingOpts)

	result := &TimingRatioHistogram{Registerable: underlying}
	result.nowFunc = nowFunc
	result.getGaugeOfRatio = func() Gauge { return underlying }
	result.numerator = opts.InitialValue
	result.denominator = opts.InitialDenominator
	return result
}
// Set replaces the numerator and pushes the resulting ratio into the
// underlying gauge, all while holding the mutex.
func (trh *timingRatioHistogramInner) Set(numerator float64) {
	trh.Lock()
	defer trh.Unlock()

	trh.numerator = numerator
	trh.getGaugeOfRatio().Set(trh.numerator / trh.denominator)
}
// Add increases the numerator by deltaNumerator and pushes the resulting
// ratio into the underlying gauge, all while holding the mutex.
func (trh *timingRatioHistogramInner) Add(deltaNumerator float64) {
	trh.Lock()
	defer trh.Unlock()

	trh.numerator += deltaNumerator
	trh.getGaugeOfRatio().Set(trh.numerator / trh.denominator)
}
// Sub decreases the numerator by deltaNumerator; it is Add of the negation.
func (trh *timingRatioHistogramInner) Sub(deltaNumerator float64) {
trh.Add(-deltaNumerator)
}
// Inc increases the numerator by one.
func (trh *timingRatioHistogramInner) Inc() {
trh.Add(1)
}
// Dec decreases the numerator by one.
func (trh *timingRatioHistogramInner) Dec() {
trh.Add(-1)
}
// SetToCurrentTime sets the numerator to the current time, expressed as the
// number of nanoseconds since the Unix epoch, and updates the ratio.
// Gauges extracted from a TimingRatioHistogramVec are constructed without a
// clock, so a nil nowFunc falls back to time.Now instead of panicking on a
// call through a nil function value.
func (trh *timingRatioHistogramInner) SetToCurrentTime() {
	now := trh.nowFunc
	if now == nil {
		now = time.Now
	}
	trh.Set(float64(now().Sub(time.Unix(0, 0))))
}
// SetDenominator replaces the denominator and pushes the resulting ratio
// into the underlying gauge, all while holding the mutex.
func (trh *timingRatioHistogramInner) SetDenominator(denominator float64) {
	trh.Lock()
	defer trh.Unlock()

	trh.denominator = denominator
	trh.getGaugeOfRatio().Set(trh.numerator / trh.denominator)
}
// WithContext allows the normal TimingHistogram metric to pass in context.
// The context is no-op at the current level of development:
// ctx is ignored and the receiver itself is returned.
func (trh *timingRatioHistogramInner) WithContext(ctx context.Context) RatioedGauge {
return trh
}
// TimingRatioHistogramVec is a collection of TimingRatioHistograms that differ
// only in label values.
// `*TimingRatioHistogramVec` implements both Registerable and RatioedGaugeVec.
type TimingRatioHistogramVec struct {
// promote only the Registerable methods
compbasemetrics.Registerable
// delegate is TimingHistograms of the ratio
delegate compbasemetrics.GaugeVecMetric
}
// Compile-time checks of the interfaces the vector implements.
var _ RatioedGaugeVec = &TimingRatioHistogramVec{}
var _ compbasemetrics.Registerable = &TimingRatioHistogramVec{}
// NewTimingRatioHistogramVec constructs a new vector.
// `opts.InitialValue` is the initial ratio, but this applies
// only for the tiny period of time until NewForLabelValuesSafe sets
// the ratio based on the given initial numerator and denominator.
// Thus there is a tiny splinter of time during member construction when
// the member's underlying TimingHistogram holds `opts.InitialValue` rather
// than that ratio (which is obviously a non-issue when both are zero).
// Note the difficulties associated with extracting a member
// before registering the vector.
// It is NewTestableTimingRatioHistogramVec with time.Now as the clock.
func NewTimingRatioHistogramVec(opts *compbasemetrics.TimingHistogramOpts, labelNames ...string) *TimingRatioHistogramVec {
return NewTestableTimingRatioHistogramVec(time.Now, opts, labelNames...)
}
// NewTestableTimingRatioHistogramVec is NewTimingRatioHistogramVec with
// injection of the clock. A single underlying timing histogram vector serves
// both as the registerable aspect and as the source of members.
func NewTestableTimingRatioHistogramVec(nowFunc func() time.Time, opts *compbasemetrics.TimingHistogramOpts, labelNames ...string) *TimingRatioHistogramVec {
	underlying := compbasemetrics.NewTestableTimingHistogramVec(nowFunc, opts, labelNames)
	vec := new(TimingRatioHistogramVec)
	vec.Registerable = underlying
	vec.delegate = underlying
	return vec
}
// metrics returns the vector itself as the only thing that needs registering.
func (v *TimingRatioHistogramVec) metrics() Registerables {
return Registerables{v}
}
// NewForLabelValuesChecked will return an error if this vec is not hidden and not yet registered
// or there is a syntactic problem with the labelValues.
// On error the returned gauge is a noop. On success the member's value is
// set to the initial ratio and the returned gauge is bound to that member.
func (v *TimingRatioHistogramVec) NewForLabelValuesChecked(initialNumerator, initialDenominator float64, labelValues []string) (RatioedGauge, error) {
	member, err := v.delegate.WithLabelValuesChecked(labelValues...)
	if err != nil {
		return noopRatioed{}, err
	}
	member.Set(initialNumerator / initialDenominator)

	gauge := &timingRatioHistogramInner{
		numerator:   initialNumerator,
		denominator: initialDenominator,
	}
	// The member is looked up once, above, and returned on every later update.
	gauge.getGaugeOfRatio = func() Gauge { return member }
	return gauge, nil
}
// NewForLabelValuesSafe is the same as NewForLabelValuesChecked in cases where that does not
// return an error. When the unsafe version returns an error due to the vector not being
// registered yet, the safe version returns an object that implements its methods
// by looking up the relevant vector member in each call (thus getting a non-noop after registration).
// In the other error cases the error is logged and the object returned here is a noop.
func (v *TimingRatioHistogramVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
	gauge, err := v.NewForLabelValuesChecked(initialNumerator, initialDenominator, labelValues)
	switch {
	case err == nil:
		return gauge
	case compbasemetrics.ErrIsNotRegistered(err):
		// NewForLabelValuesChecked gave us a permanent noop, which is exactly
		// what we want to avoid. Build a gauge that fetches the vector member
		// anew on every update instead.
		lazy := &timingRatioHistogramInner{
			numerator:   initialNumerator,
			denominator: initialDenominator,
		}
		lazy.getGaugeOfRatio = func() Gauge { return v.delegate.WithLabelValues(labelValues...) }
		return lazy
	default:
		klog.ErrorS(err, "Failed to extract TimingRatioHistogramVec member, using noop instead", "vectorname", v.FQName(), "labelValues", labelValues)
		return gauge
	}
}
// noopRatioed has every RatioedGauge method implemented as an empty body.
// NewForLabelValuesChecked returns it alongside an error.
type noopRatioed struct{}
func (noopRatioed) Set(float64) {}
func (noopRatioed) Add(float64) {}
func (noopRatioed) Sub(float64) {}
func (noopRatioed) Inc() {}
func (noopRatioed) Dec() {}
func (noopRatioed) SetToCurrentTime() {}
func (noopRatioed) SetDenominator(float64) {}
// Reset forwards to the Reset method of the underlying vector of timing histograms.
func (v *TimingRatioHistogramVec) Reset() {
v.delegate.Reset()
}

View File

@ -0,0 +1,276 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"errors"
"fmt"
"math/rand"
"testing"
"time"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/klog/v2"
testclock "k8s.io/utils/clock/testing"
)
// Parameters of the random walk through fake time taken by exerciseTimingRatioHistogram.
const (
// Each fake time step is a uniform draw from [0, ddtRangeCentiPeriods) minus
// ddtOffsetCentiPeriods, in hundredths of the stepping period.
ddtRangeCentiPeriods = 300
ddtOffsetCentiPeriods = 50
// numIterations is the number of fake time steps taken per exercise.
numIterations = 100
)
// errMetricNotFound is returned by getHistogramCount when no gathered metric family has the requested name.
var errMetricNotFound = errors.New("not found")
// TestTimingRatioHistogramVecElementSimple exercises a gauge obtained with
// NewForLabelValuesChecked after the vector has been registered.
func TestTimingRatioHistogramVecElementSimple(t *testing.T) {
	testHistogramName := "vec_element_simple"
	t0 := time.Now()
	clk := testclock.NewFakePassiveClock(t0)
	buckets := []float64{0, 1}
	vec := NewTestableTimingRatioHistogramVec(clk.Now,
		&compbasemetrics.TimingHistogramOpts{Name: testHistogramName, Buckets: buckets},
		"alabel",
	)
	toRegister := vec.metrics()
	registry := compbasemetrics.NewKubeRegistry()
	for _, reg := range toRegister {
		registry.MustRegister(reg)
	}
	tro, err := vec.NewForLabelValuesChecked(0, 1, []string{"avalue"})
	if err != nil {
		// On error tro is a noop; exercising it could only add a stream of
		// misleading count mismatches on top of the real failure.
		t.Fatal(err)
	}
	exerciseTimingRatioHistogram(t, testHistogramName, t0, clk, registry, tro)
}
// TestTimingRatioHistogramVecElementSafeEarly exercises a gauge obtained with
// NewForLabelValuesSafe before the vector is registered.
func TestTimingRatioHistogramVecElementSafeEarly(t *testing.T) {
	const histogramName = "vec_element_safe_early"
	start := time.Now()
	fakeClock := testclock.NewFakePassiveClock(start)
	opts := &compbasemetrics.TimingHistogramOpts{Name: histogramName, Buckets: []float64{0, 1}}
	vec := NewTestableTimingRatioHistogramVec(fakeClock.Now, opts, "alabel")

	// Extract the member first, then register.
	gauge := vec.NewForLabelValuesSafe(0, 1, []string{"avalue"})
	registry := compbasemetrics.NewKubeRegistry()
	registry.MustRegister(vec.metrics()...)

	exerciseTimingRatioHistogram(t, histogramName, start, fakeClock, registry, gauge)
}
// TestTimingRatioHistogramVecElementSafeLate exercises a gauge obtained with
// NewForLabelValuesSafe after the vector has been registered.
func TestTimingRatioHistogramVecElementSafeLate(t *testing.T) {
	const histogramName = "vec_element_safe_late"
	start := time.Now()
	fakeClock := testclock.NewFakePassiveClock(start)
	opts := &compbasemetrics.TimingHistogramOpts{Name: histogramName, Buckets: []float64{0, 1}}
	vec := NewTestableTimingRatioHistogramVec(fakeClock.Now, opts, "alabel")

	// Register first, then extract the member.
	registry := compbasemetrics.NewKubeRegistry()
	registry.MustRegister(vec.metrics()...)
	gauge := vec.NewForLabelValuesSafe(0, 1, []string{"avalue"})

	exerciseTimingRatioHistogram(t, histogramName, start, fakeClock, registry, gauge)
}
// exerciseTimingRatioHistogram does a rough behavioral test of a
// RatioedGauge. A fake clock is used, and the exercise consists
// of repeatedly changing that fake clock by an amount of time chosen
// uniformly at random from a range that goes from a little negative
// to somewhat more than two milliseconds. The negative changes are
// included because small negative changes have been observed in real
// monotonic clock readings (see issue #96459) and we want to test
// that they are properly tolerated. The designed toleration is to
// pretend that the clock did not change, until it resumes net forward
// progress. The exercise checks that the count in the histogram named
// histogramName is correct at each step. The histogram is expected to get
// one observation at the end of each nanosecond.
func exerciseTimingRatioHistogram(t *testing.T, histogramName string, t0 time.Time, clk *testclock.FakePassiveClock, registry compbasemetrics.KubeRegistry, tro RatioedGauge) {
	// Attribute failures to the calling test rather than to this shared helper.
	t.Helper()
	samplingPeriod := time.Nanosecond
	steppingPeriod := time.Millisecond
	tro.Set(1)
	// `dt` is the admitted cumulative difference in fake time
	// since the start of the test. "admitted" means this is
	// never allowed to decrease, which matches the designed
	// toleration for negative monotonic clock changes.
	var dt time.Duration
	// `t1` is the current fake time
	t1 := t0.Add(dt)
	klog.Infof("Expect about %v warnings about time going backwards; this is fake time deliberately misbehaving.", (numIterations*ddtOffsetCentiPeriods)/ddtRangeCentiPeriods)
	t.Logf("t0=%s", t0)
	for i := 0; i < numIterations; i++ {
		// `ddt` is the next step to take in fake time
		ddt := time.Duration(rand.Intn(ddtRangeCentiPeriods)-ddtOffsetCentiPeriods) * steppingPeriod / 100
		t1 = t1.Add(ddt)
		diff := t1.Sub(t0)
		if diff > dt {
			dt = diff
		}
		clk.SetTime(t1)
		tro.Set(1)
		expectedCount := int64(dt / samplingPeriod)
		actualCount, err := getHistogramCount(registry, histogramName)
		// A missing metric is acceptable only while no observation is expected yet.
		if err != nil && !(errors.Is(err, errMetricNotFound) && expectedCount == 0) {
			t.Fatalf("For t0=%s, t1=%s, failed to getHistogramCount: %#+v", t0, t1, err)
		}
		t.Logf("For i=%d, ddt=%s, t1=%s, diff=%s, dt=%s, count=%d", i, ddt, t1, diff, dt, actualCount)
		if expectedCount != actualCount {
			t.Errorf("For i=%d, t0=%s, ddt=%s, t1=%s, expectedCount=%d, actualCount=%d", i, t0, ddt, t1, expectedCount, actualCount)
		}
	}
}
// getHistogramCount gathers from the registry and returns the sample count of
// the first metric in the family named metricName. It returns
// errMetricNotFound when no gathered family has that name, or when the
// family has no metrics, and a descriptive error when the metric is not a
// populated histogram.
func getHistogramCount(registry compbasemetrics.KubeRegistry, metricName string) (int64, error) {
	mfs, err := registry.Gather()
	if err != nil {
		return 0, fmt.Errorf("failed to gather metrics: %w", err)
	}
	for _, mf := range mfs {
		if mf.GetName() != metricName {
			continue
		}
		metrics := mf.GetMetric()
		// Guard the index below: a family without members has nothing to count.
		if len(metrics) == 0 {
			return 0, errMetricNotFound
		}
		hist := metrics[0].GetHistogram()
		if hist == nil {
			return 0, errors.New("dto.Metric has nil Histogram")
		}
		if hist.SampleCount == nil {
			return 0, errors.New("dto.Histogram has nil SampleCount")
		}
		return int64(*hist.SampleCount), nil
	}
	return 0, errMetricNotFound
}
// BenchmarkTimingRatioHistogram measures Set on a standalone, registered
// TimingRatioHistogram while a fake clock advances 1 to 6 milliseconds per iteration.
func BenchmarkTimingRatioHistogram(b *testing.B) {
	b.StopTimer()
	fakeNow := time.Now()
	fakeClock := testclock.NewFakePassiveClock(fakeNow)
	opts := &TimingRatioHistogramOpts{
		TimingHistogramOpts: compbasemetrics.TimingHistogramOpts{
			Namespace: "testns",
			Subsystem: "testsubsys",
			Name:      "testhist",
			Help:      "Me",
			Buckets:   []float64{1, 2, 4, 8, 16, 32},
		},
		InitialDenominator: 1,
	}
	hist := NewTestableTimingRatioHistogram(fakeClock.Now, opts)
	registry := compbasemetrics.NewKubeRegistry()
	registry.MustRegister(hist)
	numerator := 0
	b.StartTimer()
	for iter := 0; iter < b.N; iter++ {
		step := time.Duration(iter%6+1) * time.Millisecond
		fakeNow = fakeNow.Add(step)
		fakeClock.SetTime(fakeNow)
		hist.Set(float64(numerator))
		numerator = (numerator + iter) % 60
	}
}
// BenchmarkTimingRatioHistogramVecElementSimple measures Set on a gauge
// obtained with NewForLabelValuesChecked from a registered vector, while a
// fake clock advances 1 to 6 milliseconds per iteration.
func BenchmarkTimingRatioHistogramVecElementSimple(b *testing.B) {
	b.StopTimer()
	now := time.Now()
	clk := testclock.NewFakePassiveClock(now)
	thv := NewTestableTimingRatioHistogramVec(clk.Now,
		&compbasemetrics.TimingHistogramOpts{
			Namespace: "testns",
			Subsystem: "testsubsys",
			Name:      "testhist",
			Help:      "Me",
			Buckets:   []float64{1, 2, 4, 8, 16, 32},
		},
		"labelname")
	registry := compbasemetrics.NewKubeRegistry()
	registry.MustRegister(thv.metrics()...)
	th, err := thv.NewForLabelValuesChecked(0, 3, []string{"labelvalue"})
	if err != nil {
		// On error th is a noop; timing it would report numbers for the wrong code.
		b.Fatal(err)
	}
	var x int
	b.StartTimer()
	for i := 0; i < b.N; i++ {
		delta := (i % 6) + 1
		now = now.Add(time.Duration(delta) * time.Millisecond)
		clk.SetTime(now)
		th.Set(float64(x))
		x = (x + i) % 60
	}
}
// BenchmarkTimingRatioHistogramVecElementSafeEarly measures Set on a gauge
// obtained with NewForLabelValuesSafe before the vector is registered, while
// a fake clock advances 1 to 6 milliseconds per iteration.
func BenchmarkTimingRatioHistogramVecElementSafeEarly(b *testing.B) {
	b.StopTimer()
	fakeNow := time.Now()
	fakeClock := testclock.NewFakePassiveClock(fakeNow)
	opts := &compbasemetrics.TimingHistogramOpts{
		Namespace: "testns",
		Subsystem: "testsubsys",
		Name:      "testhist",
		Help:      "Me",
		Buckets:   []float64{1, 2, 4, 8, 16, 32},
	}
	vec := NewTestableTimingRatioHistogramVec(fakeClock.Now, opts, "labelname")
	// Extract the member first, then register.
	gauge := vec.NewForLabelValuesSafe(0, 3, []string{"labelvalue"})
	registry := compbasemetrics.NewKubeRegistry()
	registry.MustRegister(vec.metrics()...)
	numerator := 0
	b.StartTimer()
	for iter := 0; iter < b.N; iter++ {
		step := time.Duration(iter%6+1) * time.Millisecond
		fakeNow = fakeNow.Add(step)
		fakeClock.SetTime(fakeNow)
		gauge.Set(float64(numerator))
		numerator = (numerator + iter) % 60
	}
}
// BenchmarkTimingRatioHistogramVecElementSafeLate measures Set on a gauge
// obtained with NewForLabelValuesSafe after the vector has been registered,
// while a fake clock advances 1 to 6 milliseconds per iteration.
func BenchmarkTimingRatioHistogramVecElementSafeLate(b *testing.B) {
	b.StopTimer()
	fakeNow := time.Now()
	fakeClock := testclock.NewFakePassiveClock(fakeNow)
	opts := &compbasemetrics.TimingHistogramOpts{
		Namespace: "testns",
		Subsystem: "testsubsys",
		Name:      "testhist",
		Help:      "Me",
		Buckets:   []float64{1, 2, 4, 8, 16, 32},
	}
	vec := NewTestableTimingRatioHistogramVec(fakeClock.Now, opts, "labelname")
	// Register first, then extract the member.
	registry := compbasemetrics.NewKubeRegistry()
	registry.MustRegister(vec.metrics()...)
	gauge := vec.NewForLabelValuesSafe(0, 3, []string{"labelvalue"})
	numerator := 0
	b.StartTimer()
	for iter := 0; iter < b.N; iter++ {
		step := time.Duration(iter%6+1) * time.Millisecond
		fakeNow = fakeNow.Add(step)
		fakeClock.SetTime(fakeNow)
		gauge.Set(float64(numerator))
		numerator = (numerator + iter) % 60
	}
}