mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
pkg/util/workqueue: delete deprecated metrics
This deletes deprecated metrics and simplifies registration.
This commit is contained in:
parent
0e291d1f1b
commit
4532cfd85c
@ -12,7 +12,6 @@ go_library(
|
|||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
|
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
|
||||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -18,7 +18,6 @@ package prometheus
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/klog"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
@ -155,101 +154,3 @@ func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name st
|
|||||||
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
|
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
|
||||||
return retries.WithLabelValues(name)
|
return retries.WithLabelValues(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(danielqsj): Remove the following metrics, they are deprecated
|
|
||||||
|
|
||||||
// mustRegister attempts to register the given collector with the given metric, and name
|
|
||||||
// and returns the registered collector. The caller must use the returned collector
|
|
||||||
// as it might point to a different instance of an already registered collector.
|
|
||||||
func mustRegister(metric, name string, c prometheus.Collector) prometheus.Collector {
|
|
||||||
err := prometheus.Register(c)
|
|
||||||
if err == nil {
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
if aerr, ok := err.(prometheus.AlreadyRegisteredError); ok {
|
|
||||||
klog.V(4).Infof("reusing already registered metric %v name %v", metric, name)
|
|
||||||
return aerr.ExistingCollector
|
|
||||||
}
|
|
||||||
|
|
||||||
// this should fail hard as this indicates a programmatic error, i.e.
|
|
||||||
// an invalid or duplicate metric descriptor,
|
|
||||||
// a previously registered descriptor with the same fqdn but different labels,
|
|
||||||
// or inconsistent label names or help strings for the same fqdn.
|
|
||||||
klog.Fatalf("failed to register metric %v name %v: %v", metric, name, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (prometheusMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric {
|
|
||||||
depth := prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "depth",
|
|
||||||
Help: "(Deprecated) Current depth of workqueue: " + name,
|
|
||||||
})
|
|
||||||
|
|
||||||
return mustRegister("depth", name, depth).(prometheus.Gauge)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric {
|
|
||||||
adds := prometheus.NewCounter(prometheus.CounterOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "adds",
|
|
||||||
Help: "(Deprecated) Total number of adds handled by workqueue: " + name,
|
|
||||||
})
|
|
||||||
|
|
||||||
return mustRegister("adds", name, adds).(prometheus.Counter)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric {
|
|
||||||
latency := prometheus.NewSummary(prometheus.SummaryOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "queue_latency",
|
|
||||||
Help: "(Deprecated) How long an item stays in workqueue" + name + " before being requested.",
|
|
||||||
})
|
|
||||||
|
|
||||||
return mustRegister("queue_latency", name, latency).(prometheus.Summary)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric {
|
|
||||||
workDuration := prometheus.NewSummary(prometheus.SummaryOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "work_duration",
|
|
||||||
Help: "(Deprecated) How long processing an item from workqueue" + name + " takes.",
|
|
||||||
})
|
|
||||||
|
|
||||||
return mustRegister("work_duration", name, workDuration).(prometheus.Summary)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
|
|
||||||
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "unfinished_work_seconds",
|
|
||||||
Help: "(Deprecated) How many seconds of work " + name + " has done that " +
|
|
||||||
"is in progress and hasn't been observed by work_duration. Large " +
|
|
||||||
"values indicate stuck threads. One can deduce the number of stuck " +
|
|
||||||
"threads by observing the rate at which this increases.",
|
|
||||||
})
|
|
||||||
|
|
||||||
return mustRegister("unfinished_work_seconds", name, unfinished).(prometheus.Gauge)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
|
|
||||||
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "longest_running_processor_microseconds",
|
|
||||||
Help: "(Deprecated) How many microseconds has the longest running " +
|
|
||||||
"processor for " + name + " been running.",
|
|
||||||
})
|
|
||||||
|
|
||||||
return mustRegister("longest_running_processor_microseconds", name, unfinished).(prometheus.Gauge)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric {
|
|
||||||
retries := prometheus.NewCounter(prometheus.CounterOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "retries",
|
|
||||||
Help: "(Deprecated) Total number of retries handled by workqueue: " + name,
|
|
||||||
})
|
|
||||||
|
|
||||||
return mustRegister("retries", name, retries).(prometheus.Counter)
|
|
||||||
}
|
|
||||||
|
@ -1,232 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2019 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package prometheus
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
client "github.com/prometheus/client_model/go"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestRegistration(t *testing.T) {
|
|
||||||
type checkFunc func([]*client.MetricFamily) error
|
|
||||||
|
|
||||||
gaugeHasValue := func(name string, expected float64) checkFunc {
|
|
||||||
return func(mfs []*client.MetricFamily) error {
|
|
||||||
for _, mf := range mfs {
|
|
||||||
if mf.GetName() != name {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if got := mf.GetMetric()[0].Gauge.GetValue(); got != expected {
|
|
||||||
return fmt.Errorf("expected %q gauge value %v, got %v", name, expected, got)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Errorf("want metric %q, got none", name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
counterHasValue := func(name string, expected float64) checkFunc {
|
|
||||||
return func(mfs []*client.MetricFamily) error {
|
|
||||||
for _, mf := range mfs {
|
|
||||||
if mf.GetName() != name {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if got := mf.GetMetric()[0].Counter.GetValue(); got != expected {
|
|
||||||
return fmt.Errorf("expected %q counter value %v, got %v", name, expected, got)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Errorf("want metric %q, got none", name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
histogramHasSum := func(name string, expected float64) checkFunc {
|
|
||||||
return func(mfs []*client.MetricFamily) error {
|
|
||||||
for _, mf := range mfs {
|
|
||||||
if mf.GetName() != name {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if got := mf.GetMetric()[0].Histogram.GetSampleSum(); got != expected {
|
|
||||||
return fmt.Errorf("expected %q histogram sample sum %v, got %v", name, expected, got)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Errorf("want metric %q, got none", name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
register func(*prometheusMetricsProvider)
|
|
||||||
checks []checkFunc
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "depth",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d := p.NewDepthMetric("foo")
|
|
||||||
d.Inc()
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
gaugeHasValue("workqueue_depth", 1.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "adds",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d := p.NewAddsMetric("foo")
|
|
||||||
d.Inc()
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
counterHasValue("workqueue_adds_total", 1.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "latency",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d := p.NewLatencyMetric("foo")
|
|
||||||
d.Observe(10.0)
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
histogramHasSum("workqueue_queue_duration_seconds", 10.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "duration",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d := p.NewWorkDurationMetric("foo")
|
|
||||||
d.Observe(10.0)
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
histogramHasSum("workqueue_work_duration_seconds", 10.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "unfinished work",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d := p.NewUnfinishedWorkSecondsMetric("foo")
|
|
||||||
d.Set(3.0)
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
gaugeHasValue("workqueue_unfinished_work_seconds", 3.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "unfinished work",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d := p.NewUnfinishedWorkSecondsMetric("foo")
|
|
||||||
d.Set(3.0)
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
gaugeHasValue("workqueue_unfinished_work_seconds", 3.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "longest running processor",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d := p.NewLongestRunningProcessorSecondsMetric("foo")
|
|
||||||
d.Set(3.0)
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
gaugeHasValue("workqueue_longest_running_processor_seconds", 3.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "retries",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d := p.NewRetriesMetric("foo")
|
|
||||||
d.Inc()
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
counterHasValue("workqueue_retries_total", 1.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
name: "double registration",
|
|
||||||
|
|
||||||
register: func(p *prometheusMetricsProvider) {
|
|
||||||
d1 := p.NewDepthMetric("bar")
|
|
||||||
d1.Inc()
|
|
||||||
d2 := p.NewDepthMetric("bar")
|
|
||||||
d2.Inc()
|
|
||||||
|
|
||||||
d3 := p.NewDeprecatedDepthMetric("bar")
|
|
||||||
d3.Inc()
|
|
||||||
d4 := p.NewDeprecatedDepthMetric("bar")
|
|
||||||
d4.Inc()
|
|
||||||
},
|
|
||||||
|
|
||||||
checks: []checkFunc{
|
|
||||||
gaugeHasValue("workqueue_depth", 2.0),
|
|
||||||
gaugeHasValue("bar_depth", 2.0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range tests {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
// reset prometheus registry for each test
|
|
||||||
reg := prometheus.NewRegistry()
|
|
||||||
prometheus.DefaultRegisterer = reg
|
|
||||||
prometheus.DefaultGatherer = reg
|
|
||||||
registerMetrics()
|
|
||||||
|
|
||||||
var p prometheusMetricsProvider
|
|
||||||
|
|
||||||
tc.register(&p)
|
|
||||||
mfs, err := prometheus.DefaultGatherer.Gather()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, check := range tc.checks {
|
|
||||||
if err := check(mfs); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -43,13 +43,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
|
|||||||
|
|
||||||
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
|
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
|
||||||
ret := &delayingType{
|
ret := &delayingType{
|
||||||
Interface: NewNamed(name),
|
Interface: NewNamed(name),
|
||||||
clock: clock,
|
clock: clock,
|
||||||
heartbeat: clock.NewTicker(maxWait),
|
heartbeat: clock.NewTicker(maxWait),
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
waitingForAddCh: make(chan *waitFor, 1000),
|
waitingForAddCh: make(chan *waitFor, 1000),
|
||||||
metrics: newRetryMetrics(name),
|
metrics: newRetryMetrics(name),
|
||||||
deprecatedMetrics: newDeprecatedRetryMetrics(name),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go ret.waitingLoop()
|
go ret.waitingLoop()
|
||||||
@ -74,8 +73,7 @@ type delayingType struct {
|
|||||||
waitingForAddCh chan *waitFor
|
waitingForAddCh chan *waitFor
|
||||||
|
|
||||||
// metrics counts the number of retries
|
// metrics counts the number of retries
|
||||||
metrics retryMetrics
|
metrics retryMetrics
|
||||||
deprecatedMetrics retryMetrics
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitFor holds the data to add and the time it should be added
|
// waitFor holds the data to add and the time it should be added
|
||||||
@ -148,7 +146,6 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
q.metrics.retry()
|
q.metrics.retry()
|
||||||
q.deprecatedMetrics.retry()
|
|
||||||
|
|
||||||
// immediately add things with no delay
|
// immediately add things with no delay
|
||||||
if duration <= 0 {
|
if duration <= 0 {
|
||||||
|
@ -87,14 +87,6 @@ type defaultQueueMetrics struct {
|
|||||||
// how long have current threads been working?
|
// how long have current threads been working?
|
||||||
unfinishedWorkSeconds SettableGaugeMetric
|
unfinishedWorkSeconds SettableGaugeMetric
|
||||||
longestRunningProcessor SettableGaugeMetric
|
longestRunningProcessor SettableGaugeMetric
|
||||||
|
|
||||||
// TODO(danielqsj): Remove the following metrics, they are deprecated
|
|
||||||
deprecatedDepth GaugeMetric
|
|
||||||
deprecatedAdds CounterMetric
|
|
||||||
deprecatedLatency SummaryMetric
|
|
||||||
deprecatedWorkDuration SummaryMetric
|
|
||||||
deprecatedUnfinishedWorkSeconds SettableGaugeMetric
|
|
||||||
deprecatedLongestRunningProcessor SettableGaugeMetric
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *defaultQueueMetrics) add(item t) {
|
func (m *defaultQueueMetrics) add(item t) {
|
||||||
@ -103,9 +95,7 @@ func (m *defaultQueueMetrics) add(item t) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.adds.Inc()
|
m.adds.Inc()
|
||||||
m.deprecatedAdds.Inc()
|
|
||||||
m.depth.Inc()
|
m.depth.Inc()
|
||||||
m.deprecatedDepth.Inc()
|
|
||||||
if _, exists := m.addTimes[item]; !exists {
|
if _, exists := m.addTimes[item]; !exists {
|
||||||
m.addTimes[item] = m.clock.Now()
|
m.addTimes[item] = m.clock.Now()
|
||||||
}
|
}
|
||||||
@ -117,11 +107,9 @@ func (m *defaultQueueMetrics) get(item t) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.depth.Dec()
|
m.depth.Dec()
|
||||||
m.deprecatedDepth.Dec()
|
|
||||||
m.processingStartTimes[item] = m.clock.Now()
|
m.processingStartTimes[item] = m.clock.Now()
|
||||||
if startTime, exists := m.addTimes[item]; exists {
|
if startTime, exists := m.addTimes[item]; exists {
|
||||||
m.latency.Observe(m.sinceInSeconds(startTime))
|
m.latency.Observe(m.sinceInSeconds(startTime))
|
||||||
m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime))
|
|
||||||
delete(m.addTimes, item)
|
delete(m.addTimes, item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -133,7 +121,6 @@ func (m *defaultQueueMetrics) done(item t) {
|
|||||||
|
|
||||||
if startTime, exists := m.processingStartTimes[item]; exists {
|
if startTime, exists := m.processingStartTimes[item]; exists {
|
||||||
m.workDuration.Observe(m.sinceInSeconds(startTime))
|
m.workDuration.Observe(m.sinceInSeconds(startTime))
|
||||||
m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime))
|
|
||||||
delete(m.processingStartTimes, item)
|
delete(m.processingStartTimes, item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -153,9 +140,7 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
|
|||||||
// Convert to seconds; microseconds is unhelpfully granular for this.
|
// Convert to seconds; microseconds is unhelpfully granular for this.
|
||||||
total /= 1000000
|
total /= 1000000
|
||||||
m.unfinishedWorkSeconds.Set(total)
|
m.unfinishedWorkSeconds.Set(total)
|
||||||
m.deprecatedUnfinishedWorkSeconds.Set(total)
|
|
||||||
m.longestRunningProcessor.Set(oldest / 1000000)
|
m.longestRunningProcessor.Set(oldest / 1000000)
|
||||||
m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type noMetrics struct{}
|
type noMetrics struct{}
|
||||||
@ -200,13 +185,6 @@ type MetricsProvider interface {
|
|||||||
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
|
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
|
||||||
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
|
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
|
||||||
NewRetriesMetric(name string) CounterMetric
|
NewRetriesMetric(name string) CounterMetric
|
||||||
NewDeprecatedDepthMetric(name string) GaugeMetric
|
|
||||||
NewDeprecatedAddsMetric(name string) CounterMetric
|
|
||||||
NewDeprecatedLatencyMetric(name string) SummaryMetric
|
|
||||||
NewDeprecatedWorkDurationMetric(name string) SummaryMetric
|
|
||||||
NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
|
|
||||||
NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
|
|
||||||
NewDeprecatedRetriesMetric(name string) CounterMetric
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type noopMetricsProvider struct{}
|
type noopMetricsProvider struct{}
|
||||||
@ -239,34 +217,6 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
|
|||||||
return noopMetric{}
|
return noopMetric{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
|
|
||||||
return noopMetric{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
|
|
||||||
return noopMetric{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
|
|
||||||
return noopMetric{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
|
|
||||||
return noopMetric{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
|
|
||||||
return noopMetric{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
|
|
||||||
return noopMetric{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
|
|
||||||
return noopMetric{}
|
|
||||||
}
|
|
||||||
|
|
||||||
var globalMetricsFactory = queueMetricsFactory{
|
var globalMetricsFactory = queueMetricsFactory{
|
||||||
metricsProvider: noopMetricsProvider{},
|
metricsProvider: noopMetricsProvider{},
|
||||||
}
|
}
|
||||||
@ -289,21 +239,15 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
|
|||||||
return noMetrics{}
|
return noMetrics{}
|
||||||
}
|
}
|
||||||
return &defaultQueueMetrics{
|
return &defaultQueueMetrics{
|
||||||
clock: clock,
|
clock: clock,
|
||||||
depth: mp.NewDepthMetric(name),
|
depth: mp.NewDepthMetric(name),
|
||||||
adds: mp.NewAddsMetric(name),
|
adds: mp.NewAddsMetric(name),
|
||||||
latency: mp.NewLatencyMetric(name),
|
latency: mp.NewLatencyMetric(name),
|
||||||
workDuration: mp.NewWorkDurationMetric(name),
|
workDuration: mp.NewWorkDurationMetric(name),
|
||||||
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
|
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
|
||||||
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
|
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
|
||||||
deprecatedDepth: mp.NewDeprecatedDepthMetric(name),
|
addTimes: map[t]time.Time{},
|
||||||
deprecatedAdds: mp.NewDeprecatedAddsMetric(name),
|
processingStartTimes: map[t]time.Time{},
|
||||||
deprecatedLatency: mp.NewDeprecatedLatencyMetric(name),
|
|
||||||
deprecatedWorkDuration: mp.NewDeprecatedWorkDurationMetric(name),
|
|
||||||
deprecatedUnfinishedWorkSeconds: mp.NewDeprecatedUnfinishedWorkSecondsMetric(name),
|
|
||||||
deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name),
|
|
||||||
addTimes: map[t]time.Time{},
|
|
||||||
processingStartTimes: map[t]time.Time{},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -317,16 +261,6 @@ func newRetryMetrics(name string) retryMetrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDeprecatedRetryMetrics(name string) retryMetrics {
|
|
||||||
var ret *defaultRetryMetrics
|
|
||||||
if len(name) == 0 {
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
return &defaultRetryMetrics{
|
|
||||||
retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetProvider sets the metrics provider for all subsequently created work
|
// SetProvider sets the metrics provider for all subsequently created work
|
||||||
// queues. Only the first call has an effect.
|
// queues. Only the first call has an effect.
|
||||||
func SetProvider(metricsProvider MetricsProvider) {
|
func SetProvider(metricsProvider MetricsProvider) {
|
||||||
|
@ -137,14 +137,6 @@ type testMetricsProvider struct {
|
|||||||
unfinished testMetric
|
unfinished testMetric
|
||||||
longest testMetric
|
longest testMetric
|
||||||
retries testMetric
|
retries testMetric
|
||||||
// deprecated metrics
|
|
||||||
deprecatedDepth testMetric
|
|
||||||
deprecatedAdds testMetric
|
|
||||||
deprecatedLatency testMetric
|
|
||||||
deprecatedDuration testMetric
|
|
||||||
deprecatedUnfinished testMetric
|
|
||||||
deprecatedLongest testMetric
|
|
||||||
deprecatedRetries testMetric
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
|
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
|
||||||
@ -175,34 +167,6 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
|
|||||||
return &m.retries
|
return &m.retries
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *testMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
|
|
||||||
return &m.deprecatedDepth
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *testMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
|
|
||||||
return &m.deprecatedAdds
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *testMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
|
|
||||||
return &m.deprecatedLatency
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *testMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
|
|
||||||
return &m.deprecatedDuration
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *testMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
|
|
||||||
return &m.deprecatedUnfinished
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *testMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
|
|
||||||
return &m.deprecatedLongest
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *testMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
|
|
||||||
return &m.deprecatedRetries
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSinceInMicroseconds(t *testing.T) {
|
func TestSinceInMicroseconds(t *testing.T) {
|
||||||
mp := testMetricsProvider{}
|
mp := testMetricsProvider{}
|
||||||
c := clock.NewFakeClock(time.Now())
|
c := clock.NewFakeClock(time.Now())
|
||||||
@ -237,18 +201,10 @@ func TestMetrics(t *testing.T) {
|
|||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
if e, a := 1.0, mp.deprecatedAdds.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Step(50 * time.Microsecond)
|
c.Step(50 * time.Microsecond)
|
||||||
|
|
||||||
// Start processing
|
// Start processing
|
||||||
@ -263,18 +219,6 @@ func TestMetrics(t *testing.T) {
|
|||||||
if e, a := 1, mp.latency.observationCount(); e != a {
|
if e, a := 1, mp.latency.observationCount(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := 50.0, mp.deprecatedLatency.observationValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
if e, a := 1, mp.deprecatedLatency.observationCount(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
if e, a := 0.0, mp.depth.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
if e, a := 0.0, mp.deprecatedDepth.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add it back while processing; multiple adds of the same item are
|
// Add it back while processing; multiple adds of the same item are
|
||||||
// de-duped.
|
// de-duped.
|
||||||
@ -286,16 +230,10 @@ func TestMetrics(t *testing.T) {
|
|||||||
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
|
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := 2.0, mp.deprecatedAdds.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
// One thing remains in the queue
|
// One thing remains in the queue
|
||||||
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Step(25 * time.Microsecond)
|
c.Step(25 * time.Microsecond)
|
||||||
|
|
||||||
@ -308,20 +246,11 @@ func TestMetrics(t *testing.T) {
|
|||||||
if e, a := 1, mp.duration.observationCount(); e != a {
|
if e, a := 1, mp.duration.observationCount(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := 25.0, mp.deprecatedDuration.observationValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
if e, a := 1, mp.deprecatedDuration.observationCount(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
// One thing remains in the queue
|
// One thing remains in the queue
|
||||||
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
// It should be back on the queue
|
// It should be back on the queue
|
||||||
i, _ = q.Get()
|
i, _ = q.Get()
|
||||||
@ -335,35 +264,20 @@ func TestMetrics(t *testing.T) {
|
|||||||
if e, a := 2, mp.latency.observationCount(); e != a {
|
if e, a := 2, mp.latency.observationCount(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := 25.0, mp.deprecatedLatency.observationValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
if e, a := 2, mp.deprecatedLatency.observationCount(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
// use a channel to ensure we don't look at the metric before it's
|
// use a channel to ensure we don't look at the metric before it's
|
||||||
// been set.
|
// been set.
|
||||||
ch := make(chan struct{}, 1)
|
ch := make(chan struct{}, 1)
|
||||||
mp.unfinished.notifyCh = ch
|
mp.unfinished.notifyCh = ch
|
||||||
mp.deprecatedUnfinished.notifyCh = ch
|
|
||||||
c.Step(time.Millisecond)
|
c.Step(time.Millisecond)
|
||||||
<-ch
|
<-ch
|
||||||
<-ch
|
|
||||||
mp.unfinished.notifyCh = nil
|
mp.unfinished.notifyCh = nil
|
||||||
mp.deprecatedUnfinished.notifyCh = nil
|
|
||||||
if e, a := .001, mp.unfinished.gaugeValue(); e != a {
|
if e, a := .001, mp.unfinished.gaugeValue(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := .001, mp.deprecatedUnfinished.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
if e, a := .001, mp.longest.gaugeValue(); e != a {
|
if e, a := .001, mp.longest.gaugeValue(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := 1000.0, mp.deprecatedLongest.gaugeValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Finish that one up
|
// Finish that one up
|
||||||
q.Done(i)
|
q.Done(i)
|
||||||
@ -373,10 +287,4 @@ func TestMetrics(t *testing.T) {
|
|||||||
if e, a := 2, mp.duration.observationCount(); e != a {
|
if e, a := 2, mp.duration.observationCount(); e != a {
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
if e, a := 1000.0, mp.deprecatedDuration.observationValue(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
if e, a := 2, mp.deprecatedDuration.observationCount(); e != a {
|
|
||||||
t.Errorf("expected %v, got %v", e, a)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -28,13 +28,12 @@ func TestRateLimitingQueue(t *testing.T) {
|
|||||||
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
|
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
|
||||||
fakeClock := clock.NewFakeClock(time.Now())
|
fakeClock := clock.NewFakeClock(time.Now())
|
||||||
delayingQueue := &delayingType{
|
delayingQueue := &delayingType{
|
||||||
Interface: New(),
|
Interface: New(),
|
||||||
clock: fakeClock,
|
clock: fakeClock,
|
||||||
heartbeat: fakeClock.NewTicker(maxWait),
|
heartbeat: fakeClock.NewTicker(maxWait),
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
waitingForAddCh: make(chan *waitFor, 1000),
|
waitingForAddCh: make(chan *waitFor, 1000),
|
||||||
metrics: newRetryMetrics(""),
|
metrics: newRetryMetrics(""),
|
||||||
deprecatedMetrics: newDeprecatedRetryMetrics(""),
|
|
||||||
}
|
}
|
||||||
queue.DelayingInterface = delayingQueue
|
queue.DelayingInterface = delayingQueue
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user