Merge pull request #77553 from s-urbaniak/fix-76956

pkg/util/workqueue/prometheus: fix double registration
Kubernetes Prow Robot authored 2019-07-22 19:10:53 -07:00; committed by GitHub (commit f101466d2e)
6 changed files with 113 additions and 351 deletions
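Before this change, every call to one of the provider's New*Metric methods built a fresh collector, distinguished only by ConstLabels{"name": name}, and registered it against the default registry, logging a klog error when a queue with that name had already registered one (the double registration of issue 76956 referenced by the branch name). The diff below replaces those per-queue collectors with package-level *Vec metrics registered exactly once in init(); each queue then receives a child series via WithLabelValues(name). The following stand-alone sketch is not part of the diff — it uses a throwaway registry, and "newDepth"/"depth_vec" are illustrative names — but it reproduces the failure mode and the vector-based fix using only stock client_golang API:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// newDepth mimics the old pattern: a fresh collector per queue name.
func newDepth(name string) prometheus.Gauge {
	return prometheus.NewGauge(prometheus.GaugeOpts{
		Subsystem:   "workqueue",
		Name:        "depth",
		ConstLabels: prometheus.Labels{"name": name},
	})
}

func main() {
	reg := prometheus.NewRegistry()

	// Old pattern: the second collector has an identical descriptor, so
	// registration fails with prometheus.AlreadyRegisteredError.
	_ = reg.Register(newDepth("q"))
	if err := reg.Register(newDepth("q")); err != nil {
		_, already := err.(prometheus.AlreadyRegisteredError)
		fmt.Printf("double registration: %v (AlreadyRegisteredError=%t)\n", err, already)
	}

	// New pattern: one vector registered once; label children are idempotent.
	depth := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "workqueue",
		Name:      "depth_vec", // renamed only so it can coexist with the gauge above
	}, []string{"name"})
	reg.MustRegister(depth)
	depth.WithLabelValues("q").Inc()
	depth.WithLabelValues("q").Inc() // same child series, no error
}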

pkg/util/workqueue/prometheus/BUILD

@@ -12,7 +12,6 @@ go_library(
deps = [
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

pkg/util/workqueue/prometheus/prometheus.go

@@ -18,7 +18,6 @@ package prometheus
import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"github.com/prometheus/client_golang/prometheus"
)
@@ -38,194 +37,120 @@ const (
RetriesKey = "retries_total"
)
+ var (
+ depth = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Subsystem: WorkQueueSubsystem,
+ Name:      DepthKey,
+ Help:      "Current depth of workqueue",
+ },
+ []string{"name"},
+ )
+ adds = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Subsystem: WorkQueueSubsystem,
+ Name:      AddsKey,
+ Help:      "Total number of adds handled by workqueue",
+ },
+ []string{"name"},
+ )
+ latency = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Subsystem: WorkQueueSubsystem,
+ Name:      QueueLatencyKey,
+ Help:      "How long in seconds an item stays in workqueue before being requested.",
+ Buckets:   prometheus.ExponentialBuckets(10e-9, 10, 10),
+ },
+ []string{"name"},
+ )
+ workDuration = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Subsystem: WorkQueueSubsystem,
+ Name:      WorkDurationKey,
+ Help:      "How long in seconds processing an item from workqueue takes.",
+ Buckets:   prometheus.ExponentialBuckets(10e-9, 10, 10),
+ },
+ []string{"name"},
+ )
+ unfinished = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Subsystem: WorkQueueSubsystem,
+ Name:      UnfinishedWorkKey,
+ Help: "How many seconds of work has been done that " +
+ "is in progress and hasn't been observed by work_duration. Large " +
+ "values indicate stuck threads. One can deduce the number of stuck " +
+ "threads by observing the rate at which this increases.",
+ },
+ []string{"name"},
+ )
+ longestRunningProcessor = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Subsystem: WorkQueueSubsystem,
+ Name:      LongestRunningProcessorKey,
+ Help: "How many seconds has the longest running " +
+ "processor for workqueue been running.",
+ },
+ []string{"name"},
+ )
+ retries = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Subsystem: WorkQueueSubsystem,
+ Name:      RetriesKey,
+ Help:      "Total number of retries handled by workqueue",
+ },
+ []string{"name"},
+ )
+ )
+ func registerMetrics() {
+ prometheus.MustRegister(
+ depth,
+ adds,
+ latency,
+ workDuration,
+ unfinished,
+ longestRunningProcessor,
+ retries,
+ )
+ }
func init() {
+ registerMetrics()
workqueue.SetProvider(prometheusMetricsProvider{})
}
type prometheusMetricsProvider struct{}
func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
- depth := prometheus.NewGauge(prometheus.GaugeOpts{
- Subsystem:   WorkQueueSubsystem,
- Name:        DepthKey,
- Help:        "Current depth of workqueue",
- ConstLabels: prometheus.Labels{"name": name},
- })
- if err := prometheus.Register(depth); err != nil {
- klog.Errorf("failed to register depth metric %v: %v", name, err)
- }
- return depth
+ return depth.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
- adds := prometheus.NewCounter(prometheus.CounterOpts{
- Subsystem:   WorkQueueSubsystem,
- Name:        AddsKey,
- Help:        "Total number of adds handled by workqueue",
- ConstLabels: prometheus.Labels{"name": name},
- })
- if err := prometheus.Register(adds); err != nil {
- klog.Errorf("failed to register adds metric %v: %v", name, err)
- }
- return adds
+ return adds.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
- latency := prometheus.NewHistogram(prometheus.HistogramOpts{
- Subsystem:   WorkQueueSubsystem,
- Name:        QueueLatencyKey,
- Help:        "How long in seconds an item stays in workqueue before being requested.",
- ConstLabels: prometheus.Labels{"name": name},
- Buckets:     prometheus.ExponentialBuckets(10e-9, 10, 10),
- })
- if err := prometheus.Register(latency); err != nil {
- klog.Errorf("failed to register latency metric %v: %v", name, err)
- }
- return latency
+ return latency.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
- workDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
- Subsystem:   WorkQueueSubsystem,
- Name:        WorkDurationKey,
- Help:        "How long in seconds processing an item from workqueue takes.",
- ConstLabels: prometheus.Labels{"name": name},
- Buckets:     prometheus.ExponentialBuckets(10e-9, 10, 10),
- })
- if err := prometheus.Register(workDuration); err != nil {
- klog.Errorf("failed to register workDuration metric %v: %v", name, err)
- }
- return workDuration
+ return workDuration.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
- unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
- Subsystem: WorkQueueSubsystem,
- Name:      UnfinishedWorkKey,
- Help: "How many seconds of work has done that " +
- "is in progress and hasn't been observed by work_duration. Large " +
- "values indicate stuck threads. One can deduce the number of stuck " +
- "threads by observing the rate at which this increases.",
- ConstLabels: prometheus.Labels{"name": name},
- })
- if err := prometheus.Register(unfinished); err != nil {
- klog.Errorf("failed to register unfinished metric %v: %v", name, err)
- }
- return unfinished
+ return unfinished.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
- longestRunningProcessor := prometheus.NewGauge(prometheus.GaugeOpts{
- Subsystem: WorkQueueSubsystem,
- Name:      LongestRunningProcessorKey,
- Help: "How many seconds has the longest running " +
- "processor for workqueue been running.",
- ConstLabels: prometheus.Labels{"name": name},
- })
- if err := prometheus.Register(longestRunningProcessor); err != nil {
- klog.Errorf("failed to register unfinished metric %v: %v", name, err)
- }
- return longestRunningProcessor
+ return longestRunningProcessor.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
- retries := prometheus.NewCounter(prometheus.CounterOpts{
- Subsystem:   WorkQueueSubsystem,
- Name:        RetriesKey,
- Help:        "Total number of retries handled by workqueue",
- ConstLabels: prometheus.Labels{"name": name},
- })
- if err := prometheus.Register(retries); err != nil {
- klog.Errorf("failed to register retries metric %v: %v", name, err)
- }
- return retries
- }
- // TODO(danielqsj): Remove the following metrics, they are deprecated
- func (prometheusMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric {
- depth := prometheus.NewGauge(prometheus.GaugeOpts{
- Subsystem: name,
- Name:      "depth",
- Help:      "(Deprecated) Current depth of workqueue: " + name,
- })
- if err := prometheus.Register(depth); err != nil {
- klog.Errorf("failed to register depth metric %v: %v", name, err)
- }
- return depth
- }
- func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric {
- adds := prometheus.NewCounter(prometheus.CounterOpts{
- Subsystem: name,
- Name:      "adds",
- Help:      "(Deprecated) Total number of adds handled by workqueue: " + name,
- })
- if err := prometheus.Register(adds); err != nil {
- klog.Errorf("failed to register adds metric %v: %v", name, err)
- }
- return adds
- }
- func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric {
- latency := prometheus.NewSummary(prometheus.SummaryOpts{
- Subsystem: name,
- Name:      "queue_latency",
- Help:      "(Deprecated) How long an item stays in workqueue" + name + " before being requested.",
- })
- if err := prometheus.Register(latency); err != nil {
- klog.Errorf("failed to register latency metric %v: %v", name, err)
- }
- return latency
- }
- func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric {
- workDuration := prometheus.NewSummary(prometheus.SummaryOpts{
- Subsystem: name,
- Name:      "work_duration",
- Help:      "(Deprecated) How long processing an item from workqueue" + name + " takes.",
- })
- if err := prometheus.Register(workDuration); err != nil {
- klog.Errorf("failed to register work_duration metric %v: %v", name, err)
- }
- return workDuration
- }
- func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
- unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
- Subsystem: name,
- Name:      "unfinished_work_seconds",
- Help: "(Deprecated) How many seconds of work " + name + " has done that " +
- "is in progress and hasn't been observed by work_duration. Large " +
- "values indicate stuck threads. One can deduce the number of stuck " +
- "threads by observing the rate at which this increases.",
- })
- if err := prometheus.Register(unfinished); err != nil {
- klog.Errorf("failed to register unfinished_work_seconds metric %v: %v", name, err)
- }
- return unfinished
- }
- func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
- unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
- Subsystem: name,
- Name:      "longest_running_processor_microseconds",
- Help: "(Deprecated) How many microseconds has the longest running " +
- "processor for " + name + " been running.",
- })
- if err := prometheus.Register(unfinished); err != nil {
- klog.Errorf("failed to register longest_running_processor_microseconds metric %v: %v", name, err)
- }
- return unfinished
- }
- func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric {
- retries := prometheus.NewCounter(prometheus.CounterOpts{
- Subsystem: name,
- Name:      "retries",
- Help:      "(Deprecated) Total number of retries handled by workqueue: " + name,
- })
- if err := prometheus.Register(retries); err != nil {
- klog.Errorf("failed to register retries metric %v: %v", name, err)
- }
- return retries
+ return retries.WithLabelValues(name)
}
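With the vectors registered up front, the provider methods reduce to WithLabelValues lookups, which are idempotent: asking twice for the same queue name yields the same child series instead of a registration error. A hedged usage sketch follows — the blank import path is assumed from the commit's package name, and "controller" is an illustrative queue name:

package main

import (
	"k8s.io/client-go/util/workqueue"

	// Importing for side effects runs init(): the metric vectors are
	// registered once with the default registry and the provider installed.
	_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // assumed import path
)

func main() {
	// Two queues with the same name now share label children instead of
	// racing to register the same collector.
	a := workqueue.NewNamed("controller")
	b := workqueue.NewNamed("controller")
	a.Add("x")
	b.Add("y") // workqueue_adds_total{name="controller"} counts both
	a.ShutDown()
	b.ShutDown()
}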

staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go

@@ -44,13 +44,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
- Interface:         NewNamed(name),
- clock:             clock,
- heartbeat:         clock.NewTicker(maxWait),
- stopCh:            make(chan struct{}),
- waitingForAddCh:   make(chan *waitFor, 1000),
- metrics:           newRetryMetrics(name),
- deprecatedMetrics: newDeprecatedRetryMetrics(name),
+ Interface:       NewNamed(name),
+ clock:           clock,
+ heartbeat:       clock.NewTicker(maxWait),
+ stopCh:          make(chan struct{}),
+ waitingForAddCh: make(chan *waitFor, 1000),
+ metrics:         newRetryMetrics(name),
}
go ret.waitingLoop()
@@ -77,8 +76,7 @@ type delayingType struct {
waitingForAddCh chan *waitFor
// metrics counts the number of retries
- metrics           retryMetrics
- deprecatedMetrics retryMetrics
+ metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
@@ -154,7 +152,6 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
}
q.metrics.retry()
- q.deprecatedMetrics.retry()
// immediately add things with no delay
if duration <= 0 {
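The delaying queue loses its parallel deprecatedMetrics handle, so AddAfter now records each retry exactly once. A minimal sketch of the call site, assuming a metrics provider (such as the prometheus one above) has been installed; "demo" is an illustrative queue name:

package main

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewNamedDelayingQueue("demo")
	defer q.ShutDown()

	// One retry recorded per AddAfter now that the duplicate
	// q.deprecatedMetrics.retry() call is gone:
	// workqueue_retries_total{name="demo"} increases by exactly one.
	q.AddAfter("item", 10*time.Millisecond)
}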

staging/src/k8s.io/client-go/util/workqueue/metrics.go

@@ -87,14 +87,6 @@ type defaultQueueMetrics struct {
// how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric
- // TODO(danielqsj): Remove the following metrics, they are deprecated
- deprecatedDepth                   GaugeMetric
- deprecatedAdds                    CounterMetric
- deprecatedLatency                 SummaryMetric
- deprecatedWorkDuration            SummaryMetric
- deprecatedUnfinishedWorkSeconds   SettableGaugeMetric
- deprecatedLongestRunningProcessor SettableGaugeMetric
}
func (m *defaultQueueMetrics) add(item t) {
@@ -103,9 +95,7 @@ func (m *defaultQueueMetrics) add(item t) {
}
m.adds.Inc()
- m.deprecatedAdds.Inc()
m.depth.Inc()
- m.deprecatedDepth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = m.clock.Now()
}
@@ -117,11 +107,9 @@ func (m *defaultQueueMetrics) get(item t) {
}
m.depth.Dec()
- m.deprecatedDepth.Dec()
m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(m.sinceInSeconds(startTime))
- m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
@@ -133,7 +121,6 @@ func (m *defaultQueueMetrics) done(item t) {
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(m.sinceInSeconds(startTime))
- m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}
@@ -153,9 +140,7 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
// Convert to seconds; microseconds is unhelpfully granular for this.
total /= 1000000
m.unfinishedWorkSeconds.Set(total)
- m.deprecatedUnfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest / 1000000)
- m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds.
}
type noMetrics struct{}
@@ -200,13 +185,6 @@ type MetricsProvider interface {
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric
- NewDeprecatedDepthMetric(name string) GaugeMetric
- NewDeprecatedAddsMetric(name string) CounterMetric
- NewDeprecatedLatencyMetric(name string) SummaryMetric
- NewDeprecatedWorkDurationMetric(name string) SummaryMetric
- NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
- NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
- NewDeprecatedRetriesMetric(name string) CounterMetric
}
type noopMetricsProvider struct{}
@@ -239,34 +217,6 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
- func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
- return noopMetric{}
- }
- func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
- return noopMetric{}
- }
- func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
- return noopMetric{}
- }
- func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
- return noopMetric{}
- }
- func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
- return noopMetric{}
- }
- func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
- return noopMetric{}
- }
- func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
- return noopMetric{}
- }
var globalMetricsFactory = queueMetricsFactory{
metricsProvider: noopMetricsProvider{},
}
@@ -289,21 +239,15 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
return noMetrics{}
}
return &defaultQueueMetrics{
- clock:                             clock,
- depth:                             mp.NewDepthMetric(name),
- adds:                              mp.NewAddsMetric(name),
- latency:                           mp.NewLatencyMetric(name),
- workDuration:                      mp.NewWorkDurationMetric(name),
- unfinishedWorkSeconds:             mp.NewUnfinishedWorkSecondsMetric(name),
- longestRunningProcessor:           mp.NewLongestRunningProcessorSecondsMetric(name),
- deprecatedDepth:                   mp.NewDeprecatedDepthMetric(name),
- deprecatedAdds:                    mp.NewDeprecatedAddsMetric(name),
- deprecatedLatency:                 mp.NewDeprecatedLatencyMetric(name),
- deprecatedWorkDuration:            mp.NewDeprecatedWorkDurationMetric(name),
- deprecatedUnfinishedWorkSeconds:   mp.NewDeprecatedUnfinishedWorkSecondsMetric(name),
- deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name),
- addTimes:                          map[t]time.Time{},
- processingStartTimes:              map[t]time.Time{},
+ clock:                   clock,
+ depth:                   mp.NewDepthMetric(name),
+ adds:                    mp.NewAddsMetric(name),
+ latency:                 mp.NewLatencyMetric(name),
+ workDuration:            mp.NewWorkDurationMetric(name),
+ unfinishedWorkSeconds:   mp.NewUnfinishedWorkSecondsMetric(name),
+ longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
+ addTimes:                map[t]time.Time{},
+ processingStartTimes:    map[t]time.Time{},
}
}
@@ -317,16 +261,6 @@ func newRetryMetrics(name string) retryMetrics {
}
}
- func newDeprecatedRetryMetrics(name string) retryMetrics {
- var ret *defaultRetryMetrics
- if len(name) == 0 {
- return ret
- }
- return &defaultRetryMetrics{
- retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name),
- }
- }
// SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) {
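Shrinking MetricsProvider is a breaking change for out-of-tree implementations: the seven Deprecated methods disappear from the interface, so custom providers must drop them too. A minimal conforming provider, sketched with hypothetical names ("nop", "provider") against the post-change method set:

package main

import "k8s.io/client-go/util/workqueue"

// nop satisfies GaugeMetric (Inc/Dec), SettableGaugeMetric (Set),
// CounterMetric (Inc) and HistogramMetric (Observe).
type nop struct{}

func (nop) Inc()            {}
func (nop) Dec()            {}
func (nop) Set(float64)     {}
func (nop) Observe(float64) {}

// provider implements the slimmed-down MetricsProvider surface: seven
// New*Metric methods, no Deprecated variants left to implement.
type provider struct{}

func (provider) NewDepthMetric(name string) workqueue.GaugeMetric            { return nop{} }
func (provider) NewAddsMetric(name string) workqueue.CounterMetric           { return nop{} }
func (provider) NewLatencyMetric(name string) workqueue.HistogramMetric      { return nop{} }
func (provider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { return nop{} }
func (provider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
	return nop{}
}
func (provider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
	return nop{}
}
func (provider) NewRetriesMetric(name string) workqueue.CounterMetric { return nop{} }

func main() {
	workqueue.SetProvider(provider{})
	workqueue.NewNamed("demo").ShutDown()
}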

staging/src/k8s.io/client-go/util/workqueue/metrics_test.go

@@ -137,14 +137,6 @@ type testMetricsProvider struct {
unfinished testMetric
longest testMetric
retries testMetric
- // deprecated metrics
- deprecatedDepth      testMetric
- deprecatedAdds       testMetric
- deprecatedLatency    testMetric
- deprecatedDuration   testMetric
- deprecatedUnfinished testMetric
- deprecatedLongest    testMetric
- deprecatedRetries    testMetric
}
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
@@ -175,34 +167,6 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return &m.retries
}
- func (m *testMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
- return &m.deprecatedDepth
- }
- func (m *testMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
- return &m.deprecatedAdds
- }
- func (m *testMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
- return &m.deprecatedLatency
- }
- func (m *testMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
- return &m.deprecatedDuration
- }
- func (m *testMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
- return &m.deprecatedUnfinished
- }
- func (m *testMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
- return &m.deprecatedLongest
- }
- func (m *testMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
- return &m.deprecatedRetries
- }
func TestSinceInMicroseconds(t *testing.T) {
mp := testMetricsProvider{}
c := clock.NewFakeClock(time.Now())
@@ -237,18 +201,10 @@ func TestMetrics(t *testing.T) {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 1.0, mp.deprecatedAdds.gaugeValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
c.Step(50 * time.Microsecond)
// Start processing
@@ -263,18 +219,6 @@ func TestMetrics(t *testing.T) {
if e, a := 1, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 50.0, mp.deprecatedLatency.observationValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
- if e, a := 1, mp.deprecatedLatency.observationCount(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
if e, a := 0.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 0.0, mp.deprecatedDepth.gaugeValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
// Add it back while processing; multiple adds of the same item are
// de-duped.
@@ -286,16 +230,10 @@ func TestMetrics(t *testing.T) {
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 2.0, mp.deprecatedAdds.gaugeValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
c.Step(25 * time.Microsecond)
@@ -308,20 +246,11 @@ func TestMetrics(t *testing.T) {
if e, a := 1, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 25.0, mp.deprecatedDuration.observationValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
- if e, a := 1, mp.deprecatedDuration.observationCount(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
// It should be back on the queue
i, _ = q.Get()
@@ -335,35 +264,20 @@ func TestMetrics(t *testing.T) {
if e, a := 2, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 25.0, mp.deprecatedLatency.observationValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
- if e, a := 2, mp.deprecatedLatency.observationCount(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
// use a channel to ensure we don't look at the metric before it's
// been set.
ch := make(chan struct{}, 1)
mp.unfinished.notifyCh = ch
- mp.deprecatedUnfinished.notifyCh = ch
c.Step(time.Millisecond)
<-ch
- <-ch
mp.unfinished.notifyCh = nil
- mp.deprecatedUnfinished.notifyCh = nil
if e, a := .001, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := .001, mp.deprecatedUnfinished.gaugeValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
if e, a := .001, mp.longest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 1000.0, mp.deprecatedLongest.gaugeValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
// Finish that one up
q.Done(i)
@@ -373,10 +287,4 @@ func TestMetrics(t *testing.T) {
if e, a := 2, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
- if e, a := 1000.0, mp.deprecatedDuration.observationValue(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
- if e, a := 2, mp.deprecatedDuration.observationCount(); e != a {
- t.Errorf("expected %v, got %v", e, a)
- }
}
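The test previously had to drain two notifications, one for each generation of the unfinished-work gauge; with the deprecated twin gone a single receive suffices. The notifyCh trick itself, sketched below, is a simplified stand-in for the real test helper (which also guards value with a mutex): Set signals a channel so the test can wait for the background updater instead of sleeping.

package main

import "fmt"

// testMetric records the value and, when notifyCh is armed, signals that
// the background updateUnfinishedWork loop has run.
type testMetric struct {
	value    float64
	notifyCh chan<- struct{}
}

func (m *testMetric) Set(v float64) {
	m.value = v
	if m.notifyCh != nil {
		m.notifyCh <- struct{}{}
	}
}

func main() {
	ch := make(chan struct{}, 1)
	m := &testMetric{notifyCh: ch}
	go m.Set(.001) // stands in for the queue's metrics updater
	<-ch           // one receive is enough now
	m.notifyCh = nil
	fmt.Println(m.value) // .001
}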

staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go

@@ -28,13 +28,12 @@ func TestRateLimitingQueue(t *testing.T) {
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
fakeClock := clock.NewFakeClock(time.Now())
delayingQueue := &delayingType{
- Interface:         New(),
- clock:             fakeClock,
- heartbeat:         fakeClock.NewTicker(maxWait),
- stopCh:            make(chan struct{}),
- waitingForAddCh:   make(chan *waitFor, 1000),
- metrics:           newRetryMetrics(""),
- deprecatedMetrics: newDeprecatedRetryMetrics(""),
+ Interface:       New(),
+ clock:           fakeClock,
+ heartbeat:       fakeClock.NewTicker(maxWait),
+ stopCh:          make(chan struct{}),
+ waitingForAddCh: make(chan *waitFor, 1000),
+ metrics:         newRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue