pkg/util/workqueue/prometheus: fix double registration

Currently, if workqueue metrics are registered twice, the second registration fails and the metrics it returns are silently ignored. This commit fixes that.
Sergiusz Urbaniak, 2019-05-07 17:35:43 +02:00
commit 0e291d1f1b (parent 3148eb750d)
2 changed files with 360 additions and 104 deletions
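
For illustration only (not part of the commit): with the old per-call registration, a second registration for the same queue name failed, and the provider handed back a fresh, unregistered collector whose updates never reached the registry. The fix relies on the standard client_golang idiom of catching prometheus.AlreadyRegisteredError and reusing the collector that won the first registration. A minimal, self-contained sketch of that idiom, with a hypothetical metric name:

package main

import "github.com/prometheus/client_golang/prometheus"

// gaugeFor returns a registered gauge for the given queue name. On a second
// call with the same name it reuses the collector from the first registration
// instead of returning one whose updates would be silently dropped.
func gaugeFor(name string) prometheus.Gauge {
	g := prometheus.NewGauge(prometheus.GaugeOpts{
		Name:        "workqueue_depth_example", // hypothetical metric name
		ConstLabels: prometheus.Labels{"name": name},
	})
	if err := prometheus.Register(g); err != nil {
		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
			return are.ExistingCollector.(prometheus.Gauge)
		}
		panic(err) // inconsistent re-registration is a programming error
	}
	return g
}

func main() {
	gaugeFor("foo").Inc()
	gaugeFor("foo").Inc() // second call reuses the registered gauge; the series reads 2
}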

pkg/util/workqueue/prometheus/prometheus.go

@@ -38,120 +38,156 @@ const (
 	RetriesKey = "retries_total"
 )
 
+var (
+	depth = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      DepthKey,
+			Help:      "Current depth of workqueue",
+		},
+		[]string{"name"},
+	)
+
+	adds = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      AddsKey,
+			Help:      "Total number of adds handled by workqueue",
+		},
+		[]string{"name"},
+	)
+
+	latency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      QueueLatencyKey,
+			Help:      "How long in seconds an item stays in workqueue before being requested.",
+			Buckets:   prometheus.ExponentialBuckets(10e-9, 10, 10),
+		},
+		[]string{"name"},
+	)
+
+	workDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      WorkDurationKey,
+			Help:      "How long in seconds processing an item from workqueue takes.",
+			Buckets:   prometheus.ExponentialBuckets(10e-9, 10, 10),
+		},
+		[]string{"name"},
+	)
+
+	unfinished = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      UnfinishedWorkKey,
+			Help: "How many seconds of work has done that " +
+				"is in progress and hasn't been observed by work_duration. Large " +
+				"values indicate stuck threads. One can deduce the number of stuck " +
+				"threads by observing the rate at which this increases.",
+		},
+		[]string{"name"},
+	)
+
+	longestRunningProcessor = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      LongestRunningProcessorKey,
+			Help: "How many seconds has the longest running " +
+				"processor for workqueue been running.",
+		},
+		[]string{"name"},
+	)
+
+	retries = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      RetriesKey,
+			Help:      "Total number of retries handled by workqueue",
+		},
+		[]string{"name"},
+	)
+)
+
+func registerMetrics() {
+	prometheus.MustRegister(
+		depth,
+		adds,
+		latency,
+		workDuration,
+		unfinished,
+		longestRunningProcessor,
+		retries,
+	)
+}
+
 func init() {
+	registerMetrics()
 	workqueue.SetProvider(prometheusMetricsProvider{})
 }
 
 type prometheusMetricsProvider struct{}
 
 func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
-	depth := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        DepthKey,
-		Help:        "Current depth of workqueue",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(depth); err != nil {
-		klog.Errorf("failed to register depth metric %v: %v", name, err)
-	}
-	return depth
+	return depth.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
-	adds := prometheus.NewCounter(prometheus.CounterOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        AddsKey,
-		Help:        "Total number of adds handled by workqueue",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(adds); err != nil {
-		klog.Errorf("failed to register adds metric %v: %v", name, err)
-	}
-	return adds
+	return adds.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
-	latency := prometheus.NewHistogram(prometheus.HistogramOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        QueueLatencyKey,
-		Help:        "How long in seconds an item stays in workqueue before being requested.",
-		ConstLabels: prometheus.Labels{"name": name},
-		Buckets:     prometheus.ExponentialBuckets(10e-9, 10, 10),
-	})
-	if err := prometheus.Register(latency); err != nil {
-		klog.Errorf("failed to register latency metric %v: %v", name, err)
-	}
-	return latency
+	return latency.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
-	workDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        WorkDurationKey,
-		Help:        "How long in seconds processing an item from workqueue takes.",
-		ConstLabels: prometheus.Labels{"name": name},
-		Buckets:     prometheus.ExponentialBuckets(10e-9, 10, 10),
-	})
-	if err := prometheus.Register(workDuration); err != nil {
-		klog.Errorf("failed to register workDuration metric %v: %v", name, err)
-	}
-	return workDuration
+	return workDuration.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
-	unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem: WorkQueueSubsystem,
-		Name:      UnfinishedWorkKey,
-		Help: "How many seconds of work has done that " +
-			"is in progress and hasn't been observed by work_duration. Large " +
-			"values indicate stuck threads. One can deduce the number of stuck " +
-			"threads by observing the rate at which this increases.",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(unfinished); err != nil {
-		klog.Errorf("failed to register unfinished metric %v: %v", name, err)
-	}
-	return unfinished
+	return unfinished.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
-	longestRunningProcessor := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem: WorkQueueSubsystem,
-		Name:      LongestRunningProcessorKey,
-		Help: "How many seconds has the longest running " +
-			"processor for workqueue been running.",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(longestRunningProcessor); err != nil {
-		klog.Errorf("failed to register unfinished metric %v: %v", name, err)
-	}
-	return longestRunningProcessor
+	return longestRunningProcessor.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
-	retries := prometheus.NewCounter(prometheus.CounterOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        RetriesKey,
-		Help:        "Total number of retries handled by workqueue",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(retries); err != nil {
-		klog.Errorf("failed to register retries metric %v: %v", name, err)
-	}
-	return retries
+	return retries.WithLabelValues(name)
 }
 
 // TODO(danielqsj): Remove the following metrics, they are deprecated
+
+// mustRegister attempts to register the given collector with the given metric
+// and name, and returns the registered collector. The caller must use the
+// returned collector, as it might point to a different instance of an already
+// registered collector.
+func mustRegister(metric, name string, c prometheus.Collector) prometheus.Collector {
+	err := prometheus.Register(c)
+	if err == nil {
+		return c
+	}
+
+	if aerr, ok := err.(prometheus.AlreadyRegisteredError); ok {
+		klog.V(4).Infof("reusing already registered metric %v name %v", metric, name)
+		return aerr.ExistingCollector
+	}
+
+	// this should fail hard as this indicates a programmatic error, i.e.
+	// an invalid or duplicate metric descriptor,
+	// a previously registered descriptor with the same fqdn but different labels,
+	// or inconsistent label names or help strings for the same fqdn.
+	klog.Fatalf("failed to register metric %v name %v: %v", metric, name, err)
+	return nil
+}
 
 func (prometheusMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric {
 	depth := prometheus.NewGauge(prometheus.GaugeOpts{
 		Subsystem: name,
 		Name:      "depth",
 		Help:      "(Deprecated) Current depth of workqueue: " + name,
 	})
-	if err := prometheus.Register(depth); err != nil {
-		klog.Errorf("failed to register depth metric %v: %v", name, err)
-	}
-	return depth
+	return mustRegister("depth", name, depth).(prometheus.Gauge)
 }
 
 func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric {

@@ -160,10 +196,8 @@ func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric {
 		Name: "adds",
 		Help: "(Deprecated) Total number of adds handled by workqueue: " + name,
 	})
-	if err := prometheus.Register(adds); err != nil {
-		klog.Errorf("failed to register adds metric %v: %v", name, err)
-	}
-	return adds
+	return mustRegister("adds", name, adds).(prometheus.Counter)
 }
 
 func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric {

@@ -172,10 +206,8 @@ func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric {
 		Name: "queue_latency",
 		Help: "(Deprecated) How long an item stays in workqueue" + name + " before being requested.",
 	})
-	if err := prometheus.Register(latency); err != nil {
-		klog.Errorf("failed to register latency metric %v: %v", name, err)
-	}
-	return latency
+	return mustRegister("queue_latency", name, latency).(prometheus.Summary)
 }
 
 func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric {

@@ -184,10 +216,8 @@ func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric {
 		Name: "work_duration",
 		Help: "(Deprecated) How long processing an item from workqueue" + name + " takes.",
 	})
-	if err := prometheus.Register(workDuration); err != nil {
-		klog.Errorf("failed to register work_duration metric %v: %v", name, err)
-	}
-	return workDuration
+	return mustRegister("work_duration", name, workDuration).(prometheus.Summary)
 }
 
 func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {

@@ -199,10 +229,8 @@ func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
 			"values indicate stuck threads. One can deduce the number of stuck " +
 			"threads by observing the rate at which this increases.",
 	})
-	if err := prometheus.Register(unfinished); err != nil {
-		klog.Errorf("failed to register unfinished_work_seconds metric %v: %v", name, err)
-	}
-	return unfinished
+	return mustRegister("unfinished_work_seconds", name, unfinished).(prometheus.Gauge)
 }
 
 func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {

@@ -212,10 +240,8 @@ func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
 		Help: "(Deprecated) How many microseconds has the longest running " +
 			"processor for " + name + " been running.",
 	})
-	if err := prometheus.Register(unfinished); err != nil {
-		klog.Errorf("failed to register longest_running_processor_microseconds metric %v: %v", name, err)
-	}
-	return unfinished
+	return mustRegister("longest_running_processor_microseconds", name, unfinished).(prometheus.Gauge)
 }
 
 func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric {

@@ -224,8 +250,6 @@ func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric {
 		Name: "retries",
 		Help: "(Deprecated) Total number of retries handled by workqueue: " + name,
 	})
-	if err := prometheus.Register(retries); err != nil {
-		klog.Errorf("failed to register retries metric %v: %v", name, err)
-	}
-	return retries
+	return mustRegister("retries", name, retries).(prometheus.Counter)
 }
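
Usage note (not part of the diff): for the current, non-deprecated metrics the provider no longer registers anything per call; it resolves a child of the package-level Vec, which is registered exactly once in init(). Calling a constructor twice with the same queue name therefore yields the same underlying series. A short sketch against this package's provider:

// Both calls resolve to the same child series, workqueue_depth{name="foo"},
// of the package-level depth GaugeVec.
var p prometheusMetricsProvider
d1 := p.NewDepthMetric("foo")
d2 := p.NewDepthMetric("foo")
d1.Inc()
d2.Inc() // the single series now reads 2: no registration error, no lost updates

For the deprecated per-queue metrics, mustRegister provides the same guarantee by returning the already registered collector on repeated calls, as exercised by the "double registration" test case below.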

pkg/util/workqueue/prometheus/prometheus_test.go (new file)

@@ -0,0 +1,232 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package prometheus
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	client "github.com/prometheus/client_model/go"
+)
+
+func TestRegistration(t *testing.T) {
+	type checkFunc func([]*client.MetricFamily) error
+
+	gaugeHasValue := func(name string, expected float64) checkFunc {
+		return func(mfs []*client.MetricFamily) error {
+			for _, mf := range mfs {
+				if mf.GetName() != name {
+					continue
+				}
+				if got := mf.GetMetric()[0].Gauge.GetValue(); got != expected {
+					return fmt.Errorf("expected %q gauge value %v, got %v", name, expected, got)
+				}
+				return nil
+			}
+			return fmt.Errorf("want metric %q, got none", name)
+		}
+	}
+
+	counterHasValue := func(name string, expected float64) checkFunc {
+		return func(mfs []*client.MetricFamily) error {
+			for _, mf := range mfs {
+				if mf.GetName() != name {
+					continue
+				}
+				if got := mf.GetMetric()[0].Counter.GetValue(); got != expected {
+					return fmt.Errorf("expected %q counter value %v, got %v", name, expected, got)
+				}
+				return nil
+			}
+			return fmt.Errorf("want metric %q, got none", name)
+		}
+	}
+
+	histogramHasSum := func(name string, expected float64) checkFunc {
+		return func(mfs []*client.MetricFamily) error {
+			for _, mf := range mfs {
+				if mf.GetName() != name {
+					continue
+				}
+				if got := mf.GetMetric()[0].Histogram.GetSampleSum(); got != expected {
+					return fmt.Errorf("expected %q histogram sample sum %v, got %v", name, expected, got)
+				}
+				return nil
+			}
+			return fmt.Errorf("want metric %q, got none", name)
+		}
+	}
+
+	tests := []struct {
+		name     string
+		register func(*prometheusMetricsProvider)
+		checks   []checkFunc
+	}{
+		{
+			name: "depth",
+			register: func(p *prometheusMetricsProvider) {
+				d := p.NewDepthMetric("foo")
+				d.Inc()
+			},
+			checks: []checkFunc{
+				gaugeHasValue("workqueue_depth", 1.0),
+			},
+		},
+		{
+			name: "adds",
+			register: func(p *prometheusMetricsProvider) {
+				d := p.NewAddsMetric("foo")
+				d.Inc()
+			},
+			checks: []checkFunc{
+				counterHasValue("workqueue_adds_total", 1.0),
+			},
+		},
+		{
+			name: "latency",
+			register: func(p *prometheusMetricsProvider) {
+				d := p.NewLatencyMetric("foo")
+				d.Observe(10.0)
+			},
+			checks: []checkFunc{
+				histogramHasSum("workqueue_queue_duration_seconds", 10.0),
+			},
+		},
+		{
+			name: "duration",
+			register: func(p *prometheusMetricsProvider) {
+				d := p.NewWorkDurationMetric("foo")
+				d.Observe(10.0)
+			},
+			checks: []checkFunc{
+				histogramHasSum("workqueue_work_duration_seconds", 10.0),
+			},
+		},
+		{
+			name: "unfinished work",
+			register: func(p *prometheusMetricsProvider) {
+				d := p.NewUnfinishedWorkSecondsMetric("foo")
+				d.Set(3.0)
+			},
+			checks: []checkFunc{
+				gaugeHasValue("workqueue_unfinished_work_seconds", 3.0),
+			},
+		},
+		{
+			name: "longest running processor",
+			register: func(p *prometheusMetricsProvider) {
+				d := p.NewLongestRunningProcessorSecondsMetric("foo")
+				d.Set(3.0)
+			},
+			checks: []checkFunc{
+				gaugeHasValue("workqueue_longest_running_processor_seconds", 3.0),
+			},
+		},
+		{
+			name: "retries",
+			register: func(p *prometheusMetricsProvider) {
+				d := p.NewRetriesMetric("foo")
+				d.Inc()
+			},
+			checks: []checkFunc{
+				counterHasValue("workqueue_retries_total", 1.0),
+			},
+		},
+		{
+			name: "double registration",
+			register: func(p *prometheusMetricsProvider) {
+				d1 := p.NewDepthMetric("bar")
+				d1.Inc()
+				d2 := p.NewDepthMetric("bar")
+				d2.Inc()
+				d3 := p.NewDeprecatedDepthMetric("bar")
+				d3.Inc()
+				d4 := p.NewDeprecatedDepthMetric("bar")
+				d4.Inc()
+			},
+			checks: []checkFunc{
+				gaugeHasValue("workqueue_depth", 2.0),
+				gaugeHasValue("bar_depth", 2.0),
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			// reset the default prometheus registry for each test
+			reg := prometheus.NewRegistry()
+			prometheus.DefaultRegisterer = reg
+			prometheus.DefaultGatherer = reg
+			registerMetrics()
+
+			var p prometheusMetricsProvider
+			tc.register(&p)
+
+			mfs, err := prometheus.DefaultGatherer.Gather()
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			for _, check := range tc.checks {
+				if err := check(mfs); err != nil {
+					t.Error(err)
+				}
+			}
+		})
+	}
+}
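
For completeness, a hedged consumer sketch: importing this package for its side effects runs init(), which registers the collectors once and installs the metrics provider, so workqueues created afterwards report Prometheus metrics labeled with their queue name. The blank-import path below is an assumption based on this repository's layout:

package main

import (
	"k8s.io/client-go/util/workqueue"

	// Blank import for side effects: init() calls registerMetrics() and
	// workqueue.SetProvider(prometheusMetricsProvider{}).
	_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // assumed import path
)

func main() {
	q := workqueue.NewNamed("example") // metric series carry the label name="example"
	defer q.ShutDown()
	q.Add("item")
}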