From 0e291d1f1b412a4a090d4f36e643db097322faae Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 7 May 2019 17:35:43 +0200 Subject: [PATCH 1/2] pkg/util/workqueue/prometheus: fix double registration Currently, if workqueue metrics are registered twice, these metrics will be ignored. This fixes it. --- pkg/util/workqueue/prometheus/prometheus.go | 232 ++++++++++-------- .../workqueue/prometheus/prometheus_test.go | 232 ++++++++++++++++++ 2 files changed, 360 insertions(+), 104 deletions(-) create mode 100644 pkg/util/workqueue/prometheus/prometheus_test.go diff --git a/pkg/util/workqueue/prometheus/prometheus.go b/pkg/util/workqueue/prometheus/prometheus.go index 75c460ed0ef..efa3475ddf7 100644 --- a/pkg/util/workqueue/prometheus/prometheus.go +++ b/pkg/util/workqueue/prometheus/prometheus.go @@ -38,120 +38,156 @@ const ( RetriesKey = "retries_total" ) +var ( + depth = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: WorkQueueSubsystem, + Name: DepthKey, + Help: "Current depth of workqueue", + }, + []string{"name"}, + ) + + adds = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: WorkQueueSubsystem, + Name: AddsKey, + Help: "Total number of adds handled by workqueue", + }, + []string{"name"}, + ) + + latency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: WorkQueueSubsystem, + Name: QueueLatencyKey, + Help: "How long in seconds an item stays in workqueue before being requested.", + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10), + }, + []string{"name"}, + ) + + workDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: WorkQueueSubsystem, + Name: WorkDurationKey, + Help: "How long in seconds processing an item from workqueue takes.", + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10), + }, + []string{"name"}, + ) + + unfinished = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: WorkQueueSubsystem, + Name: UnfinishedWorkKey, + Help: "How many seconds of work has done that " + + "is in progress and hasn't been observed by work_duration. Large " + + "values indicate stuck threads. One can deduce the number of stuck " + + "threads by observing the rate at which this increases.", + }, + []string{"name"}, + ) + + longestRunningProcessor = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: WorkQueueSubsystem, + Name: LongestRunningProcessorKey, + Help: "How many seconds has the longest running " + + "processor for workqueue been running.", + }, + []string{"name"}, + ) + + retries = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: WorkQueueSubsystem, + Name: RetriesKey, + Help: "Total number of retries handled by workqueue", + }, + []string{"name"}, + ) +) + +func registerMetrics() { + prometheus.MustRegister( + depth, + adds, + latency, + workDuration, + unfinished, + longestRunningProcessor, + retries, + ) +} + func init() { + registerMetrics() workqueue.SetProvider(prometheusMetricsProvider{}) } type prometheusMetricsProvider struct{} func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { - depth := prometheus.NewGauge(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: DepthKey, - Help: "Current depth of workqueue", - ConstLabels: prometheus.Labels{"name": name}, - }) - if err := prometheus.Register(depth); err != nil { - klog.Errorf("failed to register depth metric %v: %v", name, err) - } - return depth + return depth.WithLabelValues(name) } func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { - adds := prometheus.NewCounter(prometheus.CounterOpts{ - Subsystem: WorkQueueSubsystem, - Name: AddsKey, - Help: "Total number of adds handled by workqueue", - ConstLabels: prometheus.Labels{"name": name}, - }) - if err := prometheus.Register(adds); err != nil { - klog.Errorf("failed to register adds metric %v: %v", name, err) - } - return adds + return adds.WithLabelValues(name) } func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { - latency := prometheus.NewHistogram(prometheus.HistogramOpts{ - Subsystem: WorkQueueSubsystem, - Name: QueueLatencyKey, - Help: "How long in seconds an item stays in workqueue before being requested.", - ConstLabels: prometheus.Labels{"name": name}, - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10), - }) - if err := prometheus.Register(latency); err != nil { - klog.Errorf("failed to register latency metric %v: %v", name, err) - } - return latency + return latency.WithLabelValues(name) } func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { - workDuration := prometheus.NewHistogram(prometheus.HistogramOpts{ - Subsystem: WorkQueueSubsystem, - Name: WorkDurationKey, - Help: "How long in seconds processing an item from workqueue takes.", - ConstLabels: prometheus.Labels{"name": name}, - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10), - }) - if err := prometheus.Register(workDuration); err != nil { - klog.Errorf("failed to register workDuration metric %v: %v", name, err) - } - return workDuration + return workDuration.WithLabelValues(name) } func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { - unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: UnfinishedWorkKey, - Help: "How many seconds of work has done that " + - "is in progress and hasn't been observed by work_duration. Large " + - "values indicate stuck threads. One can deduce the number of stuck " + - "threads by observing the rate at which this increases.", - ConstLabels: prometheus.Labels{"name": name}, - }) - if err := prometheus.Register(unfinished); err != nil { - klog.Errorf("failed to register unfinished metric %v: %v", name, err) - } - return unfinished + return unfinished.WithLabelValues(name) } func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { - longestRunningProcessor := prometheus.NewGauge(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: LongestRunningProcessorKey, - Help: "How many seconds has the longest running " + - "processor for workqueue been running.", - ConstLabels: prometheus.Labels{"name": name}, - }) - if err := prometheus.Register(longestRunningProcessor); err != nil { - klog.Errorf("failed to register unfinished metric %v: %v", name, err) - } - return longestRunningProcessor + return longestRunningProcessor.WithLabelValues(name) } func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { - retries := prometheus.NewCounter(prometheus.CounterOpts{ - Subsystem: WorkQueueSubsystem, - Name: RetriesKey, - Help: "Total number of retries handled by workqueue", - ConstLabels: prometheus.Labels{"name": name}, - }) - if err := prometheus.Register(retries); err != nil { - klog.Errorf("failed to register retries metric %v: %v", name, err) - } - return retries + return retries.WithLabelValues(name) } // TODO(danielqsj): Remove the following metrics, they are deprecated + +// mustRegister attempts to register the given collector with the given metric, and name +// and returns the registered collector. The caller must use the returned collector +// as it might point to a different instance of an already registered collector. +func mustRegister(metric, name string, c prometheus.Collector) prometheus.Collector { + err := prometheus.Register(c) + if err == nil { + return c + } + + if aerr, ok := err.(prometheus.AlreadyRegisteredError); ok { + klog.V(4).Infof("reusing already registered metric %v name %v", metric, name) + return aerr.ExistingCollector + } + + // this should fail hard as this indicates a programmatic error, i.e. + // an invalid or duplicate metric descriptor, + // a previously registered descriptor with the same fqdn but different labels, + // or inconsistent label names or help strings for the same fqdn. + klog.Fatalf("failed to register metric %v name %v: %v", metric, name, err) + return nil +} + func (prometheusMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric { depth := prometheus.NewGauge(prometheus.GaugeOpts{ Subsystem: name, Name: "depth", Help: "(Deprecated) Current depth of workqueue: " + name, }) - if err := prometheus.Register(depth); err != nil { - klog.Errorf("failed to register depth metric %v: %v", name, err) - } - return depth + + return mustRegister("depth", name, depth).(prometheus.Gauge) } func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric { @@ -160,10 +196,8 @@ func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue. Name: "adds", Help: "(Deprecated) Total number of adds handled by workqueue: " + name, }) - if err := prometheus.Register(adds); err != nil { - klog.Errorf("failed to register adds metric %v: %v", name, err) - } - return adds + + return mustRegister("adds", name, adds).(prometheus.Counter) } func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric { @@ -172,10 +206,8 @@ func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workque Name: "queue_latency", Help: "(Deprecated) How long an item stays in workqueue" + name + " before being requested.", }) - if err := prometheus.Register(latency); err != nil { - klog.Errorf("failed to register latency metric %v: %v", name, err) - } - return latency + + return mustRegister("queue_latency", name, latency).(prometheus.Summary) } func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric { @@ -184,10 +216,8 @@ func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) wo Name: "work_duration", Help: "(Deprecated) How long processing an item from workqueue" + name + " takes.", }) - if err := prometheus.Register(workDuration); err != nil { - klog.Errorf("failed to register work_duration metric %v: %v", name, err) - } - return workDuration + + return mustRegister("work_duration", name, workDuration).(prometheus.Summary) } func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { @@ -199,10 +229,8 @@ func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name s "values indicate stuck threads. One can deduce the number of stuck " + "threads by observing the rate at which this increases.", }) - if err := prometheus.Register(unfinished); err != nil { - klog.Errorf("failed to register unfinished_work_seconds metric %v: %v", name, err) - } - return unfinished + + return mustRegister("unfinished_work_seconds", name, unfinished).(prometheus.Gauge) } func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { @@ -212,10 +240,8 @@ func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecond Help: "(Deprecated) How many microseconds has the longest running " + "processor for " + name + " been running.", }) - if err := prometheus.Register(unfinished); err != nil { - klog.Errorf("failed to register longest_running_processor_microseconds metric %v: %v", name, err) - } - return unfinished + + return mustRegister("longest_running_processor_microseconds", name, unfinished).(prometheus.Gauge) } func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric { @@ -224,8 +250,6 @@ func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workque Name: "retries", Help: "(Deprecated) Total number of retries handled by workqueue: " + name, }) - if err := prometheus.Register(retries); err != nil { - klog.Errorf("failed to register retries metric %v: %v", name, err) - } - return retries + + return mustRegister("retries", name, retries).(prometheus.Counter) } diff --git a/pkg/util/workqueue/prometheus/prometheus_test.go b/pkg/util/workqueue/prometheus/prometheus_test.go new file mode 100644 index 00000000000..ff6d125e8bf --- /dev/null +++ b/pkg/util/workqueue/prometheus/prometheus_test.go @@ -0,0 +1,232 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prometheus + +import ( + "fmt" + "testing" + + client "github.com/prometheus/client_model/go" + + "github.com/prometheus/client_golang/prometheus" +) + +func TestRegistration(t *testing.T) { + type checkFunc func([]*client.MetricFamily) error + + gaugeHasValue := func(name string, expected float64) checkFunc { + return func(mfs []*client.MetricFamily) error { + for _, mf := range mfs { + if mf.GetName() != name { + continue + } + + if got := mf.GetMetric()[0].Gauge.GetValue(); got != expected { + return fmt.Errorf("expected %q gauge value %v, got %v", name, expected, got) + } + + return nil + } + + return fmt.Errorf("want metric %q, got none", name) + } + } + + counterHasValue := func(name string, expected float64) checkFunc { + return func(mfs []*client.MetricFamily) error { + for _, mf := range mfs { + if mf.GetName() != name { + continue + } + + if got := mf.GetMetric()[0].Counter.GetValue(); got != expected { + return fmt.Errorf("expected %q counter value %v, got %v", name, expected, got) + } + + return nil + } + + return fmt.Errorf("want metric %q, got none", name) + } + } + + histogramHasSum := func(name string, expected float64) checkFunc { + return func(mfs []*client.MetricFamily) error { + for _, mf := range mfs { + if mf.GetName() != name { + continue + } + + if got := mf.GetMetric()[0].Histogram.GetSampleSum(); got != expected { + return fmt.Errorf("expected %q histogram sample sum %v, got %v", name, expected, got) + } + + return nil + } + + return fmt.Errorf("want metric %q, got none", name) + } + } + + tests := []struct { + name string + register func(*prometheusMetricsProvider) + checks []checkFunc + }{ + { + name: "depth", + + register: func(p *prometheusMetricsProvider) { + d := p.NewDepthMetric("foo") + d.Inc() + }, + + checks: []checkFunc{ + gaugeHasValue("workqueue_depth", 1.0), + }, + }, + { + name: "adds", + + register: func(p *prometheusMetricsProvider) { + d := p.NewAddsMetric("foo") + d.Inc() + }, + + checks: []checkFunc{ + counterHasValue("workqueue_adds_total", 1.0), + }, + }, + { + name: "latency", + + register: func(p *prometheusMetricsProvider) { + d := p.NewLatencyMetric("foo") + d.Observe(10.0) + }, + + checks: []checkFunc{ + histogramHasSum("workqueue_queue_duration_seconds", 10.0), + }, + }, + { + name: "duration", + + register: func(p *prometheusMetricsProvider) { + d := p.NewWorkDurationMetric("foo") + d.Observe(10.0) + }, + + checks: []checkFunc{ + histogramHasSum("workqueue_work_duration_seconds", 10.0), + }, + }, + { + name: "unfinished work", + + register: func(p *prometheusMetricsProvider) { + d := p.NewUnfinishedWorkSecondsMetric("foo") + d.Set(3.0) + }, + + checks: []checkFunc{ + gaugeHasValue("workqueue_unfinished_work_seconds", 3.0), + }, + }, + { + name: "unfinished work", + + register: func(p *prometheusMetricsProvider) { + d := p.NewUnfinishedWorkSecondsMetric("foo") + d.Set(3.0) + }, + + checks: []checkFunc{ + gaugeHasValue("workqueue_unfinished_work_seconds", 3.0), + }, + }, + { + name: "longest running processor", + + register: func(p *prometheusMetricsProvider) { + d := p.NewLongestRunningProcessorSecondsMetric("foo") + d.Set(3.0) + }, + + checks: []checkFunc{ + gaugeHasValue("workqueue_longest_running_processor_seconds", 3.0), + }, + }, + { + name: "retries", + + register: func(p *prometheusMetricsProvider) { + d := p.NewRetriesMetric("foo") + d.Inc() + }, + + checks: []checkFunc{ + counterHasValue("workqueue_retries_total", 1.0), + }, + }, + + { + name: "double registration", + + register: func(p *prometheusMetricsProvider) { + d1 := p.NewDepthMetric("bar") + d1.Inc() + d2 := p.NewDepthMetric("bar") + d2.Inc() + + d3 := p.NewDeprecatedDepthMetric("bar") + d3.Inc() + d4 := p.NewDeprecatedDepthMetric("bar") + d4.Inc() + }, + + checks: []checkFunc{ + gaugeHasValue("workqueue_depth", 2.0), + gaugeHasValue("bar_depth", 2.0), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // reset prometheus registry for each test + reg := prometheus.NewRegistry() + prometheus.DefaultRegisterer = reg + prometheus.DefaultGatherer = reg + registerMetrics() + + var p prometheusMetricsProvider + + tc.register(&p) + mfs, err := prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal(err) + } + + for _, check := range tc.checks { + if err := check(mfs); err != nil { + t.Error(err) + } + } + }) + } +} From 4532cfd85c00da6f64e03fcf05f5636adc1151c7 Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Mon, 13 May 2019 13:22:08 +0200 Subject: [PATCH 2/2] pkg/util/workqueue: delete deprecated metrics This deletes deprecated metrics and simplifies registration. --- pkg/util/workqueue/prometheus/BUILD | 1 - pkg/util/workqueue/prometheus/prometheus.go | 99 -------- .../workqueue/prometheus/prometheus_test.go | 232 ------------------ .../util/workqueue/delaying_queue.go | 17 +- .../client-go/util/workqueue/metrics.go | 84 +------ .../client-go/util/workqueue/metrics_test.go | 92 ------- .../workqueue/rate_limiting_queue_test.go | 13 +- 7 files changed, 22 insertions(+), 516 deletions(-) delete mode 100644 pkg/util/workqueue/prometheus/prometheus_test.go diff --git a/pkg/util/workqueue/prometheus/BUILD b/pkg/util/workqueue/prometheus/BUILD index d9708f7eb4f..62a98289a50 100644 --- a/pkg/util/workqueue/prometheus/BUILD +++ b/pkg/util/workqueue/prometheus/BUILD @@ -12,7 +12,6 @@ go_library( deps = [ "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", - "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/util/workqueue/prometheus/prometheus.go b/pkg/util/workqueue/prometheus/prometheus.go index efa3475ddf7..8b26d35fffe 100644 --- a/pkg/util/workqueue/prometheus/prometheus.go +++ b/pkg/util/workqueue/prometheus/prometheus.go @@ -18,7 +18,6 @@ package prometheus import ( "k8s.io/client-go/util/workqueue" - "k8s.io/klog" "github.com/prometheus/client_golang/prometheus" ) @@ -155,101 +154,3 @@ func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name st func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { return retries.WithLabelValues(name) } - -// TODO(danielqsj): Remove the following metrics, they are deprecated - -// mustRegister attempts to register the given collector with the given metric, and name -// and returns the registered collector. The caller must use the returned collector -// as it might point to a different instance of an already registered collector. -func mustRegister(metric, name string, c prometheus.Collector) prometheus.Collector { - err := prometheus.Register(c) - if err == nil { - return c - } - - if aerr, ok := err.(prometheus.AlreadyRegisteredError); ok { - klog.V(4).Infof("reusing already registered metric %v name %v", metric, name) - return aerr.ExistingCollector - } - - // this should fail hard as this indicates a programmatic error, i.e. - // an invalid or duplicate metric descriptor, - // a previously registered descriptor with the same fqdn but different labels, - // or inconsistent label names or help strings for the same fqdn. - klog.Fatalf("failed to register metric %v name %v: %v", metric, name, err) - return nil -} - -func (prometheusMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric { - depth := prometheus.NewGauge(prometheus.GaugeOpts{ - Subsystem: name, - Name: "depth", - Help: "(Deprecated) Current depth of workqueue: " + name, - }) - - return mustRegister("depth", name, depth).(prometheus.Gauge) -} - -func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric { - adds := prometheus.NewCounter(prometheus.CounterOpts{ - Subsystem: name, - Name: "adds", - Help: "(Deprecated) Total number of adds handled by workqueue: " + name, - }) - - return mustRegister("adds", name, adds).(prometheus.Counter) -} - -func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric { - latency := prometheus.NewSummary(prometheus.SummaryOpts{ - Subsystem: name, - Name: "queue_latency", - Help: "(Deprecated) How long an item stays in workqueue" + name + " before being requested.", - }) - - return mustRegister("queue_latency", name, latency).(prometheus.Summary) -} - -func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric { - workDuration := prometheus.NewSummary(prometheus.SummaryOpts{ - Subsystem: name, - Name: "work_duration", - Help: "(Deprecated) How long processing an item from workqueue" + name + " takes.", - }) - - return mustRegister("work_duration", name, workDuration).(prometheus.Summary) -} - -func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { - unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ - Subsystem: name, - Name: "unfinished_work_seconds", - Help: "(Deprecated) How many seconds of work " + name + " has done that " + - "is in progress and hasn't been observed by work_duration. Large " + - "values indicate stuck threads. One can deduce the number of stuck " + - "threads by observing the rate at which this increases.", - }) - - return mustRegister("unfinished_work_seconds", name, unfinished).(prometheus.Gauge) -} - -func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { - unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ - Subsystem: name, - Name: "longest_running_processor_microseconds", - Help: "(Deprecated) How many microseconds has the longest running " + - "processor for " + name + " been running.", - }) - - return mustRegister("longest_running_processor_microseconds", name, unfinished).(prometheus.Gauge) -} - -func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric { - retries := prometheus.NewCounter(prometheus.CounterOpts{ - Subsystem: name, - Name: "retries", - Help: "(Deprecated) Total number of retries handled by workqueue: " + name, - }) - - return mustRegister("retries", name, retries).(prometheus.Counter) -} diff --git a/pkg/util/workqueue/prometheus/prometheus_test.go b/pkg/util/workqueue/prometheus/prometheus_test.go deleted file mode 100644 index ff6d125e8bf..00000000000 --- a/pkg/util/workqueue/prometheus/prometheus_test.go +++ /dev/null @@ -1,232 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package prometheus - -import ( - "fmt" - "testing" - - client "github.com/prometheus/client_model/go" - - "github.com/prometheus/client_golang/prometheus" -) - -func TestRegistration(t *testing.T) { - type checkFunc func([]*client.MetricFamily) error - - gaugeHasValue := func(name string, expected float64) checkFunc { - return func(mfs []*client.MetricFamily) error { - for _, mf := range mfs { - if mf.GetName() != name { - continue - } - - if got := mf.GetMetric()[0].Gauge.GetValue(); got != expected { - return fmt.Errorf("expected %q gauge value %v, got %v", name, expected, got) - } - - return nil - } - - return fmt.Errorf("want metric %q, got none", name) - } - } - - counterHasValue := func(name string, expected float64) checkFunc { - return func(mfs []*client.MetricFamily) error { - for _, mf := range mfs { - if mf.GetName() != name { - continue - } - - if got := mf.GetMetric()[0].Counter.GetValue(); got != expected { - return fmt.Errorf("expected %q counter value %v, got %v", name, expected, got) - } - - return nil - } - - return fmt.Errorf("want metric %q, got none", name) - } - } - - histogramHasSum := func(name string, expected float64) checkFunc { - return func(mfs []*client.MetricFamily) error { - for _, mf := range mfs { - if mf.GetName() != name { - continue - } - - if got := mf.GetMetric()[0].Histogram.GetSampleSum(); got != expected { - return fmt.Errorf("expected %q histogram sample sum %v, got %v", name, expected, got) - } - - return nil - } - - return fmt.Errorf("want metric %q, got none", name) - } - } - - tests := []struct { - name string - register func(*prometheusMetricsProvider) - checks []checkFunc - }{ - { - name: "depth", - - register: func(p *prometheusMetricsProvider) { - d := p.NewDepthMetric("foo") - d.Inc() - }, - - checks: []checkFunc{ - gaugeHasValue("workqueue_depth", 1.0), - }, - }, - { - name: "adds", - - register: func(p *prometheusMetricsProvider) { - d := p.NewAddsMetric("foo") - d.Inc() - }, - - checks: []checkFunc{ - counterHasValue("workqueue_adds_total", 1.0), - }, - }, - { - name: "latency", - - register: func(p *prometheusMetricsProvider) { - d := p.NewLatencyMetric("foo") - d.Observe(10.0) - }, - - checks: []checkFunc{ - histogramHasSum("workqueue_queue_duration_seconds", 10.0), - }, - }, - { - name: "duration", - - register: func(p *prometheusMetricsProvider) { - d := p.NewWorkDurationMetric("foo") - d.Observe(10.0) - }, - - checks: []checkFunc{ - histogramHasSum("workqueue_work_duration_seconds", 10.0), - }, - }, - { - name: "unfinished work", - - register: func(p *prometheusMetricsProvider) { - d := p.NewUnfinishedWorkSecondsMetric("foo") - d.Set(3.0) - }, - - checks: []checkFunc{ - gaugeHasValue("workqueue_unfinished_work_seconds", 3.0), - }, - }, - { - name: "unfinished work", - - register: func(p *prometheusMetricsProvider) { - d := p.NewUnfinishedWorkSecondsMetric("foo") - d.Set(3.0) - }, - - checks: []checkFunc{ - gaugeHasValue("workqueue_unfinished_work_seconds", 3.0), - }, - }, - { - name: "longest running processor", - - register: func(p *prometheusMetricsProvider) { - d := p.NewLongestRunningProcessorSecondsMetric("foo") - d.Set(3.0) - }, - - checks: []checkFunc{ - gaugeHasValue("workqueue_longest_running_processor_seconds", 3.0), - }, - }, - { - name: "retries", - - register: func(p *prometheusMetricsProvider) { - d := p.NewRetriesMetric("foo") - d.Inc() - }, - - checks: []checkFunc{ - counterHasValue("workqueue_retries_total", 1.0), - }, - }, - - { - name: "double registration", - - register: func(p *prometheusMetricsProvider) { - d1 := p.NewDepthMetric("bar") - d1.Inc() - d2 := p.NewDepthMetric("bar") - d2.Inc() - - d3 := p.NewDeprecatedDepthMetric("bar") - d3.Inc() - d4 := p.NewDeprecatedDepthMetric("bar") - d4.Inc() - }, - - checks: []checkFunc{ - gaugeHasValue("workqueue_depth", 2.0), - gaugeHasValue("bar_depth", 2.0), - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - // reset prometheus registry for each test - reg := prometheus.NewRegistry() - prometheus.DefaultRegisterer = reg - prometheus.DefaultGatherer = reg - registerMetrics() - - var p prometheusMetricsProvider - - tc.register(&p) - mfs, err := prometheus.DefaultGatherer.Gather() - if err != nil { - t.Fatal(err) - } - - for _, check := range tc.checks { - if err := check(mfs); err != nil { - t.Error(err) - } - } - }) - } -} diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go index bd654bf3112..a37177425d7 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go @@ -43,13 +43,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface { func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { ret := &delayingType{ - Interface: NewNamed(name), - clock: clock, - heartbeat: clock.NewTicker(maxWait), - stopCh: make(chan struct{}), - waitingForAddCh: make(chan *waitFor, 1000), - metrics: newRetryMetrics(name), - deprecatedMetrics: newDeprecatedRetryMetrics(name), + Interface: NewNamed(name), + clock: clock, + heartbeat: clock.NewTicker(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan *waitFor, 1000), + metrics: newRetryMetrics(name), } go ret.waitingLoop() @@ -74,8 +73,7 @@ type delayingType struct { waitingForAddCh chan *waitFor // metrics counts the number of retries - metrics retryMetrics - deprecatedMetrics retryMetrics + metrics retryMetrics } // waitFor holds the data to add and the time it should be added @@ -148,7 +146,6 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { } q.metrics.retry() - q.deprecatedMetrics.retry() // immediately add things with no delay if duration <= 0 { diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index be23ddd05f7..a3911bf2d63 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -87,14 +87,6 @@ type defaultQueueMetrics struct { // how long have current threads been working? unfinishedWorkSeconds SettableGaugeMetric longestRunningProcessor SettableGaugeMetric - - // TODO(danielqsj): Remove the following metrics, they are deprecated - deprecatedDepth GaugeMetric - deprecatedAdds CounterMetric - deprecatedLatency SummaryMetric - deprecatedWorkDuration SummaryMetric - deprecatedUnfinishedWorkSeconds SettableGaugeMetric - deprecatedLongestRunningProcessor SettableGaugeMetric } func (m *defaultQueueMetrics) add(item t) { @@ -103,9 +95,7 @@ func (m *defaultQueueMetrics) add(item t) { } m.adds.Inc() - m.deprecatedAdds.Inc() m.depth.Inc() - m.deprecatedDepth.Inc() if _, exists := m.addTimes[item]; !exists { m.addTimes[item] = m.clock.Now() } @@ -117,11 +107,9 @@ func (m *defaultQueueMetrics) get(item t) { } m.depth.Dec() - m.deprecatedDepth.Dec() m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { m.latency.Observe(m.sinceInSeconds(startTime)) - m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime)) delete(m.addTimes, item) } } @@ -133,7 +121,6 @@ func (m *defaultQueueMetrics) done(item t) { if startTime, exists := m.processingStartTimes[item]; exists { m.workDuration.Observe(m.sinceInSeconds(startTime)) - m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime)) delete(m.processingStartTimes, item) } } @@ -153,9 +140,7 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() { // Convert to seconds; microseconds is unhelpfully granular for this. total /= 1000000 m.unfinishedWorkSeconds.Set(total) - m.deprecatedUnfinishedWorkSeconds.Set(total) m.longestRunningProcessor.Set(oldest / 1000000) - m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds. } type noMetrics struct{} @@ -200,13 +185,6 @@ type MetricsProvider interface { NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric - NewDeprecatedDepthMetric(name string) GaugeMetric - NewDeprecatedAddsMetric(name string) CounterMetric - NewDeprecatedLatencyMetric(name string) SummaryMetric - NewDeprecatedWorkDurationMetric(name string) SummaryMetric - NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric - NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric - NewDeprecatedRetriesMetric(name string) CounterMetric } type noopMetricsProvider struct{} @@ -239,34 +217,6 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } -func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric { - return noopMetric{} -} - var globalMetricsFactory = queueMetricsFactory{ metricsProvider: noopMetricsProvider{}, } @@ -289,21 +239,15 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu return noMetrics{} } return &defaultQueueMetrics{ - clock: clock, - depth: mp.NewDepthMetric(name), - adds: mp.NewAddsMetric(name), - latency: mp.NewLatencyMetric(name), - workDuration: mp.NewWorkDurationMetric(name), - unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), - longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), - deprecatedDepth: mp.NewDeprecatedDepthMetric(name), - deprecatedAdds: mp.NewDeprecatedAddsMetric(name), - deprecatedLatency: mp.NewDeprecatedLatencyMetric(name), - deprecatedWorkDuration: mp.NewDeprecatedWorkDurationMetric(name), - deprecatedUnfinishedWorkSeconds: mp.NewDeprecatedUnfinishedWorkSecondsMetric(name), - deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, } } @@ -317,16 +261,6 @@ func newRetryMetrics(name string) retryMetrics { } } -func newDeprecatedRetryMetrics(name string) retryMetrics { - var ret *defaultRetryMetrics - if len(name) == 0 { - return ret - } - return &defaultRetryMetrics{ - retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name), - } -} - // SetProvider sets the metrics provider for all subsequently created work // queues. Only the first call has an effect. func SetProvider(metricsProvider MetricsProvider) { diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index ceb8f4c9ae3..d1178285fd5 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -137,14 +137,6 @@ type testMetricsProvider struct { unfinished testMetric longest testMetric retries testMetric - // deprecated metrics - deprecatedDepth testMetric - deprecatedAdds testMetric - deprecatedLatency testMetric - deprecatedDuration testMetric - deprecatedUnfinished testMetric - deprecatedLongest testMetric - deprecatedRetries testMetric } func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric { @@ -175,34 +167,6 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { return &m.retries } -func (m *testMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric { - return &m.deprecatedDepth -} - -func (m *testMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric { - return &m.deprecatedAdds -} - -func (m *testMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric { - return &m.deprecatedLatency -} - -func (m *testMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric { - return &m.deprecatedDuration -} - -func (m *testMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { - return &m.deprecatedUnfinished -} - -func (m *testMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { - return &m.deprecatedLongest -} - -func (m *testMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric { - return &m.deprecatedRetries -} - func TestSinceInMicroseconds(t *testing.T) { mp := testMetricsProvider{} c := clock.NewFakeClock(time.Now()) @@ -237,18 +201,10 @@ func TestMetrics(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1.0, mp.deprecatedAdds.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 1.0, mp.depth.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - c.Step(50 * time.Microsecond) // Start processing @@ -263,18 +219,6 @@ func TestMetrics(t *testing.T) { if e, a := 1, mp.latency.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 50.0, mp.deprecatedLatency.observationValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 1, mp.deprecatedLatency.observationCount(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 0.0, mp.depth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 0.0, mp.deprecatedDepth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // Add it back while processing; multiple adds of the same item are // de-duped. @@ -286,16 +230,10 @@ func TestMetrics(t *testing.T) { if e, a := 2.0, mp.adds.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 2.0, mp.deprecatedAdds.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // One thing remains in the queue if e, a := 1.0, mp.depth.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } c.Step(25 * time.Microsecond) @@ -308,20 +246,11 @@ func TestMetrics(t *testing.T) { if e, a := 1, mp.duration.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 25.0, mp.deprecatedDuration.observationValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 1, mp.deprecatedDuration.observationCount(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // One thing remains in the queue if e, a := 1.0, mp.depth.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // It should be back on the queue i, _ = q.Get() @@ -335,35 +264,20 @@ func TestMetrics(t *testing.T) { if e, a := 2, mp.latency.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 25.0, mp.deprecatedLatency.observationValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 2, mp.deprecatedLatency.observationCount(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // use a channel to ensure we don't look at the metric before it's // been set. ch := make(chan struct{}, 1) mp.unfinished.notifyCh = ch - mp.deprecatedUnfinished.notifyCh = ch c.Step(time.Millisecond) <-ch - <-ch mp.unfinished.notifyCh = nil - mp.deprecatedUnfinished.notifyCh = nil if e, a := .001, mp.unfinished.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := .001, mp.deprecatedUnfinished.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } if e, a := .001, mp.longest.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1000.0, mp.deprecatedLongest.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // Finish that one up q.Done(i) @@ -373,10 +287,4 @@ func TestMetrics(t *testing.T) { if e, a := 2, mp.duration.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1000.0, mp.deprecatedDuration.observationValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 2, mp.deprecatedDuration.observationCount(); e != a { - t.Errorf("expected %v, got %v", e, a) - } } diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go index daa0d86041b..3fbe07d0d8b 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go @@ -28,13 +28,12 @@ func TestRateLimitingQueue(t *testing.T) { queue := NewRateLimitingQueue(limiter).(*rateLimitingType) fakeClock := clock.NewFakeClock(time.Now()) delayingQueue := &delayingType{ - Interface: New(), - clock: fakeClock, - heartbeat: fakeClock.NewTicker(maxWait), - stopCh: make(chan struct{}), - waitingForAddCh: make(chan *waitFor, 1000), - metrics: newRetryMetrics(""), - deprecatedMetrics: newDeprecatedRetryMetrics(""), + Interface: New(), + clock: fakeClock, + heartbeat: fakeClock.NewTicker(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan *waitFor, 1000), + metrics: newRetryMetrics(""), } queue.DelayingInterface = delayingQueue