diff --git a/hack/verify-prometheus-imports.sh b/hack/verify-prometheus-imports.sh index 8b92336c63f..d7c689fe72f 100755 --- a/hack/verify-prometheus-imports.sh +++ b/hack/verify-prometheus-imports.sh @@ -38,6 +38,12 @@ source "${KUBE_ROOT}/hack/lib/util.sh" allowed_prometheus_importers=( ./cluster/images/etcd-version-monitor/etcd-version-monitor.go ./pkg/volume/util/operationexecutor/operation_generator_test.go + ./staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram.go + ./staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_test.go + ./staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_vec.go + ./staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram.go + ./staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram_test.go + ./staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram_vec.go ./staging/src/k8s.io/component-base/metrics/collector.go ./staging/src/k8s.io/component-base/metrics/collector_test.go ./staging/src/k8s.io/component-base/metrics/counter.go diff --git a/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram.go b/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram.go new file mode 100644 index 00000000000..be07977e281 --- /dev/null +++ b/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram.go @@ -0,0 +1,189 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prometheusextension + +import ( + "errors" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +// GaugeOps is the part of `prometheus.Gauge` that is relevant to +// instrumented code. +// This factoring should be in prometheus, analogous to the way +// it already factors out the Observer interface for histograms and summaries. +type GaugeOps interface { + // Set is the same as Gauge.Set + Set(float64) + // Inc is the same as Gauge.inc + Inc() + // Dec is the same as Gauge.Dec + Dec() + // Add is the same as Gauge.Add + Add(float64) + // Sub is the same as Gauge.Sub + Sub(float64) + + // SetToCurrentTime the same as Gauge.SetToCurrentTime + SetToCurrentTime() +} + +// A TimingHistogram tracks how long a `float64` variable spends in +// ranges defined by buckets. Time is counted in nanoseconds. The +// histogram's sum is the integral over time (in nanoseconds, from +// creation of the histogram) of the variable's value. +type TimingHistogram interface { + prometheus.Metric + prometheus.Collector + GaugeOps +} + +// TimingHistogramOpts is the parameters of the TimingHistogram constructor +type TimingHistogramOpts struct { + Namespace string + Subsystem string + Name string + Help string + ConstLabels prometheus.Labels + + // Buckets defines the buckets into which observations are + // accumulated. Each element in the slice is the upper + // inclusive bound of a bucket. The values must be sorted in + // strictly increasing order. There is no need to add a + // highest bucket with +Inf bound. The default value is + // prometheus.DefBuckets. + Buckets []float64 + + // The initial value of the variable. + InitialValue float64 +} + +// NewTimingHistogram creates a new TimingHistogram +func NewTimingHistogram(opts TimingHistogramOpts) (TimingHistogram, error) { + return NewTestableTimingHistogram(time.Now, opts) +} + +// NewTestableTimingHistogram creates a TimingHistogram that uses a mockable clock +func NewTestableTimingHistogram(nowFunc func() time.Time, opts TimingHistogramOpts) (TimingHistogram, error) { + desc := prometheus.NewDesc( + prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + wrapTimingHelp(opts.Help), + nil, + opts.ConstLabels, + ) + return newTimingHistogram(nowFunc, desc, opts) +} + +func wrapTimingHelp(given string) string { + return "EXPERIMENTAL: " + given +} + +func newTimingHistogram(nowFunc func() time.Time, desc *prometheus.Desc, opts TimingHistogramOpts, variableLabelValues ...string) (TimingHistogram, error) { + allLabelsM := prometheus.Labels{} + allLabelsS := prometheus.MakeLabelPairs(desc, variableLabelValues) + for _, pair := range allLabelsS { + if pair == nil || pair.Name == nil || pair.Value == nil { + return nil, errors.New("prometheus.MakeLabelPairs returned a nil") + } + allLabelsM[*pair.Name] = *pair.Value + } + weighted, err := newWeightedHistogram(desc, WeightedHistogramOpts{ + Namespace: opts.Namespace, + Subsystem: opts.Subsystem, + Name: opts.Name, + Help: opts.Help, + ConstLabels: allLabelsM, + Buckets: opts.Buckets, + }, variableLabelValues...) + if err != nil { + return nil, err + } + return &timingHistogram{ + nowFunc: nowFunc, + weighted: weighted, + lastSetTime: nowFunc(), + value: opts.InitialValue, + }, nil +} + +type timingHistogram struct { + nowFunc func() time.Time + weighted *weightedHistogram + + // The following fields must only be accessed with weighted's lock held + + lastSetTime time.Time // identifies when value was last set + value float64 +} + +var _ TimingHistogram = &timingHistogram{} + +func (th *timingHistogram) Set(newValue float64) { + th.update(func(float64) float64 { return newValue }) +} + +func (th *timingHistogram) Inc() { + th.update(func(oldValue float64) float64 { return oldValue + 1 }) +} + +func (th *timingHistogram) Dec() { + th.update(func(oldValue float64) float64 { return oldValue - 1 }) +} + +func (th *timingHistogram) Add(delta float64) { + th.update(func(oldValue float64) float64 { return oldValue + delta }) +} + +func (th *timingHistogram) Sub(delta float64) { + th.update(func(oldValue float64) float64 { return oldValue - delta }) +} + +func (th *timingHistogram) SetToCurrentTime() { + th.update(func(oldValue float64) float64 { return th.nowFunc().Sub(time.Unix(0, 0)).Seconds() }) +} + +func (th *timingHistogram) update(updateFn func(float64) float64) { + th.weighted.lock.Lock() + defer th.weighted.lock.Unlock() + now := th.nowFunc() + delta := now.Sub(th.lastSetTime) + value := th.value + if delta > 0 { + th.weighted.observeWithWeightLocked(value, uint64(delta)) + th.lastSetTime = now + } + th.value = updateFn(value) +} + +func (th *timingHistogram) Desc() *prometheus.Desc { + return th.weighted.Desc() +} + +func (th *timingHistogram) Write(dest *dto.Metric) error { + th.Add(0) // account for time since last update + return th.weighted.Write(dest) +} + +func (th *timingHistogram) Describe(ch chan<- *prometheus.Desc) { + ch <- th.weighted.Desc() +} + +func (th *timingHistogram) Collect(ch chan<- prometheus.Metric) { + ch <- th +} diff --git a/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_test.go b/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_test.go new file mode 100644 index 00000000000..2fe900d3bc3 --- /dev/null +++ b/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_test.go @@ -0,0 +1,223 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prometheusextension + +import ( + "math" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +func TestTimingHistogramNonMonotonicBuckets(t *testing.T) { + testCases := map[string][]float64{ + "not strictly monotonic": {1, 2, 2, 3}, + "not monotonic at all": {1, 2, 4, 3, 5}, + "have +Inf in the middle": {1, 2, math.Inf(+1), 3}, + } + for name, buckets := range testCases { + _, err := NewTimingHistogram(TimingHistogramOpts{ + Name: "test_histogram", + Help: "helpless", + Buckets: buckets, + }) + if err == nil { + t.Errorf("Buckets %v are %s but NewHTimingistogram did not complain.", buckets, name) + } + } +} + +func exerciseTimingHistogramAndCollector(th GaugeOps, t0 time.Time, clk *unsyncFakeClock, collect func(chan<- prometheus.Metric), expectCollection ...GaugeOps) func(t *testing.T) { + return func(t *testing.T) { + exerciseTimingHistogramData(t, th, t0, clk) + exerciseTimingHistogramCollector(t, collect, expectCollection) + } +} + +func exerciseTimingHistogramCollector(t *testing.T, collect func(chan<- prometheus.Metric), expectCollection []GaugeOps) { + remainingCollection := expectCollection + metch := make(chan prometheus.Metric) + go func() { + collect(metch) + close(metch) + }() + for collected := range metch { + collectedGO := collected.(GaugeOps) + newRem, found := findAndRemove(remainingCollection, collectedGO) + if !found { + t.Errorf("Collected unexpected value %#+v", collected) + } + remainingCollection = newRem + } + if len(remainingCollection) > 0 { + t.Errorf("Collection omitted %#+v", remainingCollection) + } +} + +var thTestBuckets = []float64{0, 0.5, 1} +var thTestV0 float64 = 0.25 + +// exerciseTimingHistogramData takes the given histogram through the following points in (time,value) space. +// t0 is the clock time of the histogram's construction +// value=v0 for t0 <= t <= t1 where v0 = 0.25 and t1 = t0 + 1 ns +// value=v1 for t1 <= t <= t2 where v1 = 0.75 and t2 = t1 + 1 microsecond +// value=v2 for t2 <= t <= t3 where v2 = 1.25 and t3 = t2 + 1 millisecond +// value=v3 for t3 <= t <= t4 where v3 = 0.65 and t4 = t3 + 1 second +func exerciseTimingHistogramData(t *testing.T, th GaugeOps, t0 time.Time, clk *unsyncFakeClock) { + t1 := t0.Add(time.Nanosecond) + v0 := thTestV0 + var v1 float64 = 0.75 + clk.SetTime(t1) + th.Set(v1) + t2 := t1.Add(time.Microsecond) + var d2 float64 = 0.5 + v2 := v1 + d2 + clk.SetTime(t2) + th.Add(d2) + t3 := t2 + for i := 0; i < 1000000; i++ { + t3 = t3.Add(time.Nanosecond) + clk.SetTime(t3) + th.Set(v2) + } + var d3 float64 = -0.6 + v3 := v2 + d3 + th.Add(d3) + t4 := t3.Add(time.Second) + clk.SetTime(t4) + + metric := &dto.Metric{} + writer := th.(prometheus.Metric) + err := writer.Write(metric) + if err != nil { + t.Error(err) + } + wroteHist := metric.Histogram + if want, got := uint64(t4.Sub(t0)), wroteHist.GetSampleCount(); want != got { + t.Errorf("Wanted %v but got %v", want, got) + } + if want, got := tDiff(t1, t0)*v0+tDiff(t2, t1)*v1+tDiff(t3, t2)*v2+tDiff(t4, t3)*v3, wroteHist.GetSampleSum(); want != got { + t.Errorf("Wanted %v but got %v", want, got) + } + wroteBuckets := wroteHist.GetBucket() + if len(wroteBuckets) != len(thTestBuckets) { + t.Errorf("Got buckets %#+v", wroteBuckets) + } + expectedCounts := []time.Duration{0, t1.Sub(t0), t2.Sub(t0) + t4.Sub(t3)} + for idx, ub := range thTestBuckets { + if want, got := uint64(expectedCounts[idx]), wroteBuckets[idx].GetCumulativeCount(); want != got { + t.Errorf("In bucket %d, wanted %v but got %v", idx, want, got) + } + if want, got := ub, wroteBuckets[idx].GetUpperBound(); want != got { + t.Errorf("In bucket %d, wanted %v but got %v", idx, want, got) + } + } +} + +// tDiff returns a time difference as float +func tDiff(hi, lo time.Time) float64 { return float64(hi.Sub(lo)) } + +func findAndRemove(metrics []GaugeOps, seek GaugeOps) ([]GaugeOps, bool) { + for idx, metric := range metrics { + if metric == seek { + return append(append([]GaugeOps{}, metrics[:idx]...), metrics[idx+1:]...), true + } + } + return metrics, false +} + +func TestTimeIntegrationDirect(t *testing.T) { + t0 := time.Now() + clk := &unsyncFakeClock{t0} + th, err := NewTestableTimingHistogram(clk.Now, TimingHistogramOpts{ + Name: "TestTimeIntegration", + Help: "helpless", + Buckets: thTestBuckets, + InitialValue: thTestV0, + }) + if err != nil { + t.Error(err) + return + } + t.Run("non-vec", exerciseTimingHistogramAndCollector(th, t0, clk, th.Collect, th)) +} + +func TestTimingHistogramVec(t *testing.T) { + t0 := time.Now() + clk := &unsyncFakeClock{t0} + vec := NewTestableTimingHistogramVec(clk.Now, TimingHistogramOpts{ + Name: "TestTimeIntegration", + Help: "helpless", + Buckets: thTestBuckets, + InitialValue: thTestV0, + }, "k1", "k2") + th1 := vec.With(prometheus.Labels{"k1": "a", "k2": "x"}) + th1b := vec.WithLabelValues("a", "x") + if th1 != th1b { + t.Errorf("Vector not functional") + } + t.Run("th1", exerciseTimingHistogramAndCollector(th1, t0, clk, vec.Collect, th1)) + t0 = clk.Now() + th2 := vec.WithLabelValues("a", "y") + if th1 == th2 { + t.Errorf("Vector does not distinguish label values") + } + t.Run("th2", exerciseTimingHistogramAndCollector(th2, t0, clk, vec.Collect, th1, th2)) + t0 = clk.Now() + th3 := vec.WithLabelValues("b", "y") + if th1 == th3 || th2 == th3 { + t.Errorf("Vector does not distinguish label values") + } + t.Run("th2", exerciseTimingHistogramAndCollector(th3, t0, clk, vec.Collect, th1, th2, th3)) +} + +type unsyncFakeClock struct { + now time.Time +} + +func (ufc *unsyncFakeClock) Now() time.Time { + return ufc.now +} + +func (ufc *unsyncFakeClock) SetTime(now time.Time) { + ufc.now = now +} + +func BenchmarkTimingHistogramDirect(b *testing.B) { + b.StopTimer() + now := time.Now() + clk := &unsyncFakeClock{now: now} + hist, err := NewTestableTimingHistogram(clk.Now, TimingHistogramOpts{ + Namespace: "testns", + Subsystem: "testsubsys", + Name: "testhist", + Help: "Me", + Buckets: []float64{1, 2, 4, 8, 16}, + }) + if err != nil { + b.Error(err) + } + var x int + b.StartTimer() + for i := 0; i < b.N; i++ { + clk.now = clk.now.Add(time.Duration(31-x) * time.Microsecond) + hist.Set(float64(x)) + x = (x + i) % 23 + } +} diff --git a/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_vec.go b/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_vec.go new file mode 100644 index 00000000000..7af1a45860a --- /dev/null +++ b/staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_vec.go @@ -0,0 +1,111 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prometheusextension + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// GaugeVecOps is a bunch of Gauge that have the same +// Desc and are distinguished by the values for their variable labels. +type GaugeVecOps interface { + GetMetricWith(prometheus.Labels) (GaugeOps, error) + GetMetricWithLabelValues(lvs ...string) (GaugeOps, error) + With(prometheus.Labels) GaugeOps + WithLabelValues(...string) GaugeOps + CurryWith(prometheus.Labels) (GaugeVecOps, error) + MustCurryWith(prometheus.Labels) GaugeVecOps +} + +type TimingHistogramVec struct { + *prometheus.MetricVec +} + +var _ GaugeVecOps = &TimingHistogramVec{} +var _ prometheus.Collector = &TimingHistogramVec{} + +func NewTimingHistogramVec(opts TimingHistogramOpts, labelNames ...string) *TimingHistogramVec { + return NewTestableTimingHistogramVec(time.Now, opts, labelNames...) +} + +func NewTestableTimingHistogramVec(nowFunc func() time.Time, opts TimingHistogramOpts, labelNames ...string) *TimingHistogramVec { + desc := prometheus.NewDesc( + prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + wrapTimingHelp(opts.Help), + labelNames, + opts.ConstLabels, + ) + return &TimingHistogramVec{ + MetricVec: prometheus.NewMetricVec(desc, func(lvs ...string) prometheus.Metric { + metric, err := newTimingHistogram(nowFunc, desc, opts, lvs...) + if err != nil { + panic(err) // like in prometheus.newHistogram + } + return metric + }), + } +} + +func (hv *TimingHistogramVec) GetMetricWith(labels prometheus.Labels) (GaugeOps, error) { + metric, err := hv.MetricVec.GetMetricWith(labels) + if metric != nil { + return metric.(GaugeOps), err + } + return nil, err +} + +func (hv *TimingHistogramVec) GetMetricWithLabelValues(lvs ...string) (GaugeOps, error) { + metric, err := hv.MetricVec.GetMetricWithLabelValues(lvs...) + if metric != nil { + return metric.(GaugeOps), err + } + return nil, err +} + +func (hv *TimingHistogramVec) With(labels prometheus.Labels) GaugeOps { + h, err := hv.GetMetricWith(labels) + if err != nil { + panic(err) + } + return h +} + +func (hv *TimingHistogramVec) WithLabelValues(lvs ...string) GaugeOps { + h, err := hv.GetMetricWithLabelValues(lvs...) + if err != nil { + panic(err) + } + return h +} + +func (hv *TimingHistogramVec) CurryWith(labels prometheus.Labels) (GaugeVecOps, error) { + vec, err := hv.MetricVec.CurryWith(labels) + if vec != nil { + return &TimingHistogramVec{MetricVec: vec}, err + } + return nil, err +} + +func (hv *TimingHistogramVec) MustCurryWith(labels prometheus.Labels) GaugeVecOps { + vec, err := hv.CurryWith(labels) + if err != nil { + panic(err) + } + return vec +} diff --git a/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram.go b/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram.go new file mode 100644 index 00000000000..a060019b254 --- /dev/null +++ b/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram.go @@ -0,0 +1,203 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prometheusextension + +import ( + "fmt" + "math" + "sort" + "sync" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +// WeightedHistogram generalizes Histogram: each observation has +// an associated _weight_. For a given `x` and `N`, +// `1` call on `ObserveWithWeight(x, N)` has the same meaning as +// `N` calls on `ObserveWithWeight(x, 1)`. +// The weighted sum might differ slightly due to the use of +// floating point, although the implementation takes some steps +// to mitigate that. +// If every weight were 1, +// this would be the same as the existing Histogram abstraction. +type WeightedHistogram interface { + prometheus.Metric + prometheus.Collector + WeightedObserver +} + +// WeightedObserver generalizes the Observer interface. +type WeightedObserver interface { + // Set the variable to the given value with the given weight. + ObserveWithWeight(value float64, weight uint64) +} + +// WeightedHistogramOpts is the same as for an ordinary Histogram +type WeightedHistogramOpts = prometheus.HistogramOpts + +// NewWeightedHistogram creates a new WeightedHistogram +func NewWeightedHistogram(opts WeightedHistogramOpts) (WeightedHistogram, error) { + desc := prometheus.NewDesc( + prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + wrapWeightedHelp(opts.Help), + nil, + opts.ConstLabels, + ) + return newWeightedHistogram(desc, opts) +} + +func wrapWeightedHelp(given string) string { + return "EXPERIMENTAL: " + given +} + +func newWeightedHistogram(desc *prometheus.Desc, opts WeightedHistogramOpts, variableLabelValues ...string) (*weightedHistogram, error) { + if len(opts.Buckets) == 0 { + opts.Buckets = prometheus.DefBuckets + } + + for i, upperBound := range opts.Buckets { + if i < len(opts.Buckets)-1 { + if upperBound >= opts.Buckets[i+1] { + return nil, fmt.Errorf( + "histogram buckets must be in increasing order: %f >= %f", + upperBound, opts.Buckets[i+1], + ) + } + } else { + if math.IsInf(upperBound, +1) { + // The +Inf bucket is implicit. Remove it here. + opts.Buckets = opts.Buckets[:i] + } + } + } + upperBounds := make([]float64, len(opts.Buckets)) + copy(upperBounds, opts.Buckets) + + return &weightedHistogram{ + desc: desc, + variableLabelValues: variableLabelValues, + upperBounds: upperBounds, + buckets: make([]uint64, len(upperBounds)+1), + hotCount: initialHotCount, + }, nil +} + +type weightedHistogram struct { + desc *prometheus.Desc + variableLabelValues []string + upperBounds []float64 // exclusive of +Inf + + lock sync.Mutex // applies to all the following + + // buckets is longer by one than upperBounds. + // For 0 <= idx < len(upperBounds), buckets[idx] holds the + // accumulated time.Duration that value has been <= + // upperBounds[idx] but not <= upperBounds[idx-1]. + // buckets[len(upperBounds)] holds the accumulated + // time.Duration when value fit in no other bucket. + buckets []uint64 + + // sumHot + sumCold is the weighted sum of value. + // Rather than risk loss of precision in one + // float64, we do this sum hierarchically. Many successive + // increments are added into sumHot; once in a while + // the magnitude of sumHot is compared to the magnitude + // of sumCold and, if the ratio is high enough, + // sumHot is transferred into sumCold. + sumHot float64 + sumCold float64 + + transferThreshold float64 // = math.Abs(sumCold) / 2^26 (that's about half of the bits of precision in a float64) + + // hotCount is used to decide when to consider dumping sumHot into sumCold. + // hotCount counts upward from initialHotCount to zero. + hotCount int +} + +// initialHotCount is the negative of the number of terms +// that are summed into sumHot before considering whether +// to transfer to sumCold. This only has to be big enough +// to make the extra floating point operations occur in a +// distinct minority of cases. +const initialHotCount = -15 + +var _ WeightedHistogram = &weightedHistogram{} +var _ prometheus.Metric = &weightedHistogram{} +var _ prometheus.Collector = &weightedHistogram{} + +func (sh *weightedHistogram) ObserveWithWeight(value float64, weight uint64) { + idx := sort.SearchFloat64s(sh.upperBounds, value) + sh.lock.Lock() + defer sh.lock.Unlock() + sh.updateLocked(idx, value, weight) +} + +func (sh *weightedHistogram) observeWithWeightLocked(value float64, weight uint64) { + idx := sort.SearchFloat64s(sh.upperBounds, value) + sh.updateLocked(idx, value, weight) +} + +func (sh *weightedHistogram) updateLocked(idx int, value float64, weight uint64) { + sh.buckets[idx] += weight + newSumHot := sh.sumHot + float64(weight)*value + sh.hotCount++ + if sh.hotCount >= 0 { + sh.hotCount = initialHotCount + if math.Abs(newSumHot) > sh.transferThreshold { + newSumCold := sh.sumCold + newSumHot + sh.sumCold = newSumCold + sh.transferThreshold = math.Abs(newSumCold / 67108864) + sh.sumHot = 0 + return + } + } + sh.sumHot = newSumHot +} + +func (sh *weightedHistogram) Desc() *prometheus.Desc { + return sh.desc +} + +func (sh *weightedHistogram) Write(dest *dto.Metric) error { + count, sum, buckets := func() (uint64, float64, map[float64]uint64) { + sh.lock.Lock() + defer sh.lock.Unlock() + nBounds := len(sh.upperBounds) + buckets := make(map[float64]uint64, nBounds) + var count uint64 + for idx, upperBound := range sh.upperBounds { + count += sh.buckets[idx] + buckets[upperBound] = count + } + count += sh.buckets[nBounds] + return count, sh.sumHot + sh.sumCold, buckets + }() + metric, err := prometheus.NewConstHistogram(sh.desc, count, sum, buckets, sh.variableLabelValues...) + if err != nil { + return err + } + return metric.Write(dest) +} + +func (sh *weightedHistogram) Describe(ch chan<- *prometheus.Desc) { + ch <- sh.desc +} + +func (sh *weightedHistogram) Collect(ch chan<- prometheus.Metric) { + ch <- sh +} diff --git a/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram_test.go b/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram_test.go new file mode 100644 index 00000000000..d9b5fbbd0dc --- /dev/null +++ b/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram_test.go @@ -0,0 +1,302 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prometheusextension + +import ( + "fmt" + "math" + "math/rand" + "sort" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +// Float64Slice is a slice of float64 that sorts by magnitude +type Float64Slice []float64 + +func (fs Float64Slice) Len() int { return len(fs) } + +func (fs Float64Slice) Less(i, j int) bool { return math.Abs(fs[i]) < math.Abs(fs[j]) } + +func (fs Float64Slice) Swap(i, j int) { fs[i], fs[j] = fs[j], fs[i] } + +// weightedHistogramSpecFunc returns a WeightedHistogram and the upper bounds +// to expect it to have. +// Every invocation of the same function returns the same histogram. +type weightedHistogramSpecFunc func() (wh WeightedObserver, upperBounds []float64) + +// exerciseWeightedHistograms exercises a given collection of WeightedHistograms. +// Each histogram is given by a function that returns it, so that we can test +// that the Vec functions return the same result for the same input. +// For each histogram, with N upper bounds, the exercise provides two 2N+1 values: +// the upper bounds and values halfway between them (extended below the bottom and above +// the top). For the Jth value, there are J*m1 calls to ObserveWithWeight with m1 +// chosen so that m1 * sum[1 <= J <= 2N+1] J is large enough to trigger several +// considerations of spilling from sumHot to sumCold. +// The ObserveWithWeight calls to the various histograms are interleaved to check +// that there is no interference between them. +func exerciseWeightedHistograms(t *testing.T, whSpecs ...weightedHistogramSpecFunc) { + var whos []weightedHistogramObs + expectations := []whExerciseExpectation{} + // Create expectations and specs of calls ot ObserveWithWeight + for whIdx, whSpec := range whSpecs { + wh, upperBounds := whSpec() + numUBs := len(upperBounds) + numWhos := numUBs*2 + 1 + multSum := (numWhos * (numWhos + 1)) / 2 + m1 := (-10 * initialHotCount) / multSum + terms := Float64Slice{} + ee := whExerciseExpectation{wh: wh, + upperBounds: upperBounds, + buckets: make([]uint64, numUBs), + } + addWHOs := func(val float64, weight uint64, mult, idx int) { + multipliedWeight := weight * uint64(mult) + terms = append(terms, val*float64(multipliedWeight)) + t.Logf("For WH %d, adding obs val=%v, weight=%v, mult=%d, idx=%d", whIdx, val, weight, mult, idx) + for i := 0; i < mult; i++ { + whos = append(whos, weightedHistogramObs{whSpec, val, weight}) + } + for j := idx; j < numUBs; j++ { + ee.buckets[j] += multipliedWeight + } + ee.count += multipliedWeight + } + for idx, ub := range upperBounds { + var val float64 + if idx > 0 { + val = (upperBounds[idx-1] + ub) / 2 + } else if numUBs > 1 { + val = (3*ub - upperBounds[1]) / 2 + } else { + val = ub - 1 + } + addWHOs(val, (1 << rand.Intn(40)), (2*idx+1)*m1, idx) + addWHOs(ub, (1 << rand.Intn(40)), (2*idx+2)*m1, idx) + } + val := upperBounds[numUBs-1] + 1 + if numUBs > 1 { + val = (3*upperBounds[numUBs-1] - upperBounds[numUBs-2]) / 2 + } + addWHOs(val, 1+uint64(rand.Intn(1000000)), (2*numUBs+1)*m1, numUBs) + sort.Sort(terms) + for _, term := range terms { + ee.sum += term + } + t.Logf("Adding expectation %#+v", ee) + expectations = append(expectations, ee) + } + // Do the planned calls on ObserveWithWeight, in randomized order + for len(whos) > 0 { + var wi weightedHistogramObs + whos, wi = whosPick(whos) + wh, _ := wi.whSpec() + wh.ObserveWithWeight(wi.val, wi.weight) + // t.Logf("ObserveWithWeight(%v, %v) => %#+v", wi.val, wi.weight, wh) + } + // Check expectations + for idx, ee := range expectations { + wh := ee.wh + whAsMetric := wh.(prometheus.Metric) + var metric dto.Metric + whAsMetric.Write(&metric) + actualHist := metric.GetHistogram() + if actualHist == nil { + t.Errorf("At idx=%d, Write produced nil Histogram", idx) + } + actualCount := actualHist.GetSampleCount() + if actualCount != ee.count { + t.Errorf("At idx=%d, expected count %v but got %v", idx, ee.count, actualCount) + + } + actualBuckets := actualHist.GetBucket() + if len(ee.buckets) != len(actualBuckets) { + t.Errorf("At idx=%d, expected %v buckets but got %v", idx, len(ee.buckets), len(actualBuckets)) + } + for j := 0; j < len(ee.buckets) && j < len(actualBuckets); j++ { + actualUB := actualBuckets[j].GetUpperBound() + actualCount := actualBuckets[j].GetCumulativeCount() + if ee.upperBounds[j] != actualUB { + t.Errorf("At idx=%d, bucket %d, expected upper bound %v but got %v, err=%v", idx, j, ee.upperBounds[j], actualUB, actualUB-ee.upperBounds[j]) + } + if ee.buckets[j] != actualCount { + t.Errorf("At idx=%d, bucket %d expected count %d but got %d", idx, j, ee.buckets[j], actualCount) + } + } + actualSum := actualHist.GetSampleSum() + num := math.Abs(actualSum - ee.sum) + den := math.Max(math.Abs(actualSum), math.Abs(ee.sum)) + if num > den/1e14 { + t.Errorf("At idx=%d, expected sum %v but got %v, err=%v", idx, ee.sum, actualSum, actualSum-ee.sum) + } + } +} + +// weightedHistogramObs prescribes a call on WeightedHistogram::ObserveWithWeight +type weightedHistogramObs struct { + whSpec weightedHistogramSpecFunc + val float64 + weight uint64 +} + +// whExerciseExpectation is the expected result from exercising a WeightedHistogram +type whExerciseExpectation struct { + wh WeightedObserver + upperBounds []float64 + buckets []uint64 + sum float64 + count uint64 +} + +func whosPick(whos []weightedHistogramObs) ([]weightedHistogramObs, weightedHistogramObs) { + n := len(whos) + if n < 2 { + return whos[:0], whos[0] + } + idx := rand.Intn(n) + ans := whos[idx] + whos[idx] = whos[n-1] + return whos[:n-1], ans +} + +func TestOneWeightedHistogram(t *testing.T) { + // First, some literal test cases + for _, testCase := range []struct { + name string + upperBounds []float64 + }{ + {"one bucket", []float64{0.07}}, + {"two buckets", []float64{0.07, 0.13}}, + {"three buckets", []float64{0.07, 0.13, 1e6}}, + } { + t.Run(testCase.name, func(t *testing.T) { + wh, err := NewWeightedHistogram(WeightedHistogramOpts{ + Namespace: "testns", + Subsystem: "testsubsys", + Name: "testhist", + Help: "Me", + Buckets: testCase.upperBounds, + }) + if err != nil { + t.Error(err) + } + exerciseWeightedHistograms(t, func() (WeightedObserver, []float64) { return wh, testCase.upperBounds }) + }) + } + // Now, some randomized test cases + for i := 0; i < 10; i++ { + name := fmt.Sprintf("random_case_%d", i) + t.Run(name, func(t *testing.T) { + nBounds := rand.Intn(10) + 1 + ubs := []float64{} + var bound float64 + for j := 0; j < nBounds; j++ { + bound += rand.Float64() + ubs = append(ubs, bound) + } + wh, err := NewWeightedHistogram(WeightedHistogramOpts{ + Namespace: "testns", + Subsystem: "testsubsys", + Name: name, + Help: "Me", + Buckets: ubs, + ConstLabels: prometheus.Labels{"k0": "v0"}, + }) + if err != nil { + t.Error(err) + } + exerciseWeightedHistograms(t, func() (WeightedObserver, []float64) { return wh, ubs }) + }) + } +} + +func TestWeightedHistogramVec(t *testing.T) { + ubs1 := []float64{0.07, 1.3, 1e6} + vec1 := NewWeightedHistogramVec(WeightedHistogramOpts{ + Namespace: "testns", + Subsystem: "testsubsys", + Name: "vec1", + Help: "Me", + Buckets: ubs1, + ConstLabels: prometheus.Labels{"k0": "v0"}, + }, "k1", "k2") + gen1 := func(lvs ...string) func() (WeightedObserver, []float64) { + return func() (WeightedObserver, []float64) { return vec1.WithLabelValues(lvs...), ubs1 } + } + ubs2 := []float64{-0.03, 0.71, 1e9} + vec2 := NewWeightedHistogramVec(WeightedHistogramOpts{ + Namespace: "testns", + Subsystem: "testsubsys", + Name: "vec2", + Help: "Me", + Buckets: ubs2, + ConstLabels: prometheus.Labels{"j0": "u0"}, + }, "j1", "j2") + gen2 := func(lvs ...string) func() (WeightedObserver, []float64) { + varLabels := prometheus.Labels{} + varLabels["j1"] = lvs[0] + varLabels["j2"] = lvs[1] + return func() (WeightedObserver, []float64) { return vec2.With(varLabels), ubs2 } + } + exerciseWeightedHistograms(t, + gen1("v11", "v21"), + gen1("v12", "v21"), + gen1("v12", "v22"), + gen2("a", "b"), + gen2("a", "c"), + gen2("b", "c"), + ) +} + +func BenchmarkWeightedHistogram(b *testing.B) { + b.StopTimer() + wh, err := NewWeightedHistogram(WeightedHistogramOpts{ + Namespace: "testns", + Subsystem: "testsubsys", + Name: "testhist", + Help: "Me", + Buckets: []float64{1, 2, 4, 8, 16}, + }) + if err != nil { + b.Error(err) + } + var x int + b.StartTimer() + for i := 0; i < b.N; i++ { + wh.ObserveWithWeight(float64(x), uint64(i)%32+1) + x = (x + i) % 20 + } +} + +func BenchmarkHistogram(b *testing.B) { + b.StopTimer() + hist := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "testns", + Subsystem: "testsubsys", + Name: "testhist", + Help: "Me", + Buckets: []float64{1, 2, 4, 8, 16}, + }) + var x int + b.StartTimer() + for i := 0; i < b.N; i++ { + hist.Observe(float64(x)) + x = (x + i) % 20 + } +} diff --git a/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram_vec.go b/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram_vec.go new file mode 100644 index 00000000000..2ca95f0a7ff --- /dev/null +++ b/staging/src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram_vec.go @@ -0,0 +1,106 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prometheusextension + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// WeightedObserverVec is a bunch of WeightedObservers that have the same +// Desc and are distinguished by the values for their variable labels. +type WeightedObserverVec interface { + GetMetricWith(prometheus.Labels) (WeightedObserver, error) + GetMetricWithLabelValues(lvs ...string) (WeightedObserver, error) + With(prometheus.Labels) WeightedObserver + WithLabelValues(...string) WeightedObserver + CurryWith(prometheus.Labels) (WeightedObserverVec, error) + MustCurryWith(prometheus.Labels) WeightedObserverVec +} + +// WeightedHistogramVec implements WeightedObserverVec +type WeightedHistogramVec struct { + *prometheus.MetricVec +} + +var _ WeightedObserverVec = &WeightedHistogramVec{} +var _ prometheus.Collector = &WeightedHistogramVec{} + +func NewWeightedHistogramVec(opts WeightedHistogramOpts, labelNames ...string) *WeightedHistogramVec { + desc := prometheus.NewDesc( + prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + wrapWeightedHelp(opts.Help), + labelNames, + opts.ConstLabels, + ) + return &WeightedHistogramVec{ + MetricVec: prometheus.NewMetricVec(desc, func(lvs ...string) prometheus.Metric { + metric, err := newWeightedHistogram(desc, opts, lvs...) + if err != nil { + panic(err) // like in prometheus.newHistogram + } + return metric + }), + } +} + +func (hv *WeightedHistogramVec) GetMetricWith(labels prometheus.Labels) (WeightedObserver, error) { + metric, err := hv.MetricVec.GetMetricWith(labels) + if metric != nil { + return metric.(WeightedObserver), err + } + return nil, err +} + +func (hv *WeightedHistogramVec) GetMetricWithLabelValues(lvs ...string) (WeightedObserver, error) { + metric, err := hv.MetricVec.GetMetricWithLabelValues(lvs...) + if metric != nil { + return metric.(WeightedObserver), err + } + return nil, err +} + +func (hv *WeightedHistogramVec) With(labels prometheus.Labels) WeightedObserver { + h, err := hv.GetMetricWith(labels) + if err != nil { + panic(err) + } + return h +} + +func (hv *WeightedHistogramVec) WithLabelValues(lvs ...string) WeightedObserver { + h, err := hv.GetMetricWithLabelValues(lvs...) + if err != nil { + panic(err) + } + return h +} + +func (hv *WeightedHistogramVec) CurryWith(labels prometheus.Labels) (WeightedObserverVec, error) { + vec, err := hv.MetricVec.CurryWith(labels) + if vec != nil { + return &WeightedHistogramVec{MetricVec: vec}, err + } + return nil, err +} + +func (hv *WeightedHistogramVec) MustCurryWith(labels prometheus.Labels) WeightedObserverVec { + vec, err := hv.CurryWith(labels) + if err != nil { + panic(err) + } + return vec +}