diff --git a/util/workqueue/delaying_queue.go b/util/workqueue/delaying_queue.go index 6c9e9447..07327988 100644 --- a/util/workqueue/delaying_queue.go +++ b/util/workqueue/delaying_queue.go @@ -44,13 +44,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface { func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { ret := &delayingType{ - Interface: NewNamed(name), - clock: clock, - heartbeat: clock.NewTicker(maxWait), - stopCh: make(chan struct{}), - waitingForAddCh: make(chan *waitFor, 1000), - metrics: newRetryMetrics(name), - deprecatedMetrics: newDeprecatedRetryMetrics(name), + Interface: NewNamed(name), + clock: clock, + heartbeat: clock.NewTicker(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan *waitFor, 1000), + metrics: newRetryMetrics(name), } go ret.waitingLoop() @@ -77,8 +76,7 @@ type delayingType struct { waitingForAddCh chan *waitFor // metrics counts the number of retries - metrics retryMetrics - deprecatedMetrics retryMetrics + metrics retryMetrics } // waitFor holds the data to add and the time it should be added @@ -154,7 +152,6 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { } q.metrics.retry() - q.deprecatedMetrics.retry() // immediately add things with no delay if duration <= 0 { diff --git a/util/workqueue/metrics.go b/util/workqueue/metrics.go index be23ddd0..a3911bf2 100644 --- a/util/workqueue/metrics.go +++ b/util/workqueue/metrics.go @@ -87,14 +87,6 @@ type defaultQueueMetrics struct { // how long have current threads been working? unfinishedWorkSeconds SettableGaugeMetric longestRunningProcessor SettableGaugeMetric - - // TODO(danielqsj): Remove the following metrics, they are deprecated - deprecatedDepth GaugeMetric - deprecatedAdds CounterMetric - deprecatedLatency SummaryMetric - deprecatedWorkDuration SummaryMetric - deprecatedUnfinishedWorkSeconds SettableGaugeMetric - deprecatedLongestRunningProcessor SettableGaugeMetric } func (m *defaultQueueMetrics) add(item t) { @@ -103,9 +95,7 @@ func (m *defaultQueueMetrics) add(item t) { } m.adds.Inc() - m.deprecatedAdds.Inc() m.depth.Inc() - m.deprecatedDepth.Inc() if _, exists := m.addTimes[item]; !exists { m.addTimes[item] = m.clock.Now() } @@ -117,11 +107,9 @@ func (m *defaultQueueMetrics) get(item t) { } m.depth.Dec() - m.deprecatedDepth.Dec() m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { m.latency.Observe(m.sinceInSeconds(startTime)) - m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime)) delete(m.addTimes, item) } } @@ -133,7 +121,6 @@ func (m *defaultQueueMetrics) done(item t) { if startTime, exists := m.processingStartTimes[item]; exists { m.workDuration.Observe(m.sinceInSeconds(startTime)) - m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime)) delete(m.processingStartTimes, item) } } @@ -153,9 +140,7 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() { // Convert to seconds; microseconds is unhelpfully granular for this. total /= 1000000 m.unfinishedWorkSeconds.Set(total) - m.deprecatedUnfinishedWorkSeconds.Set(total) m.longestRunningProcessor.Set(oldest / 1000000) - m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds. } type noMetrics struct{} @@ -200,13 +185,6 @@ type MetricsProvider interface { NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric - NewDeprecatedDepthMetric(name string) GaugeMetric - NewDeprecatedAddsMetric(name string) CounterMetric - NewDeprecatedLatencyMetric(name string) SummaryMetric - NewDeprecatedWorkDurationMetric(name string) SummaryMetric - NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric - NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric - NewDeprecatedRetriesMetric(name string) CounterMetric } type noopMetricsProvider struct{} @@ -239,34 +217,6 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } -func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { - return noopMetric{} -} - -func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric { - return noopMetric{} -} - var globalMetricsFactory = queueMetricsFactory{ metricsProvider: noopMetricsProvider{}, } @@ -289,21 +239,15 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu return noMetrics{} } return &defaultQueueMetrics{ - clock: clock, - depth: mp.NewDepthMetric(name), - adds: mp.NewAddsMetric(name), - latency: mp.NewLatencyMetric(name), - workDuration: mp.NewWorkDurationMetric(name), - unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), - longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), - deprecatedDepth: mp.NewDeprecatedDepthMetric(name), - deprecatedAdds: mp.NewDeprecatedAddsMetric(name), - deprecatedLatency: mp.NewDeprecatedLatencyMetric(name), - deprecatedWorkDuration: mp.NewDeprecatedWorkDurationMetric(name), - deprecatedUnfinishedWorkSeconds: mp.NewDeprecatedUnfinishedWorkSecondsMetric(name), - deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, } } @@ -317,16 +261,6 @@ func newRetryMetrics(name string) retryMetrics { } } -func newDeprecatedRetryMetrics(name string) retryMetrics { - var ret *defaultRetryMetrics - if len(name) == 0 { - return ret - } - return &defaultRetryMetrics{ - retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name), - } -} - // SetProvider sets the metrics provider for all subsequently created work // queues. Only the first call has an effect. func SetProvider(metricsProvider MetricsProvider) { diff --git a/util/workqueue/metrics_test.go b/util/workqueue/metrics_test.go index ceb8f4c9..d1178285 100644 --- a/util/workqueue/metrics_test.go +++ b/util/workqueue/metrics_test.go @@ -137,14 +137,6 @@ type testMetricsProvider struct { unfinished testMetric longest testMetric retries testMetric - // deprecated metrics - deprecatedDepth testMetric - deprecatedAdds testMetric - deprecatedLatency testMetric - deprecatedDuration testMetric - deprecatedUnfinished testMetric - deprecatedLongest testMetric - deprecatedRetries testMetric } func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric { @@ -175,34 +167,6 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { return &m.retries } -func (m *testMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric { - return &m.deprecatedDepth -} - -func (m *testMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric { - return &m.deprecatedAdds -} - -func (m *testMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric { - return &m.deprecatedLatency -} - -func (m *testMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric { - return &m.deprecatedDuration -} - -func (m *testMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { - return &m.deprecatedUnfinished -} - -func (m *testMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { - return &m.deprecatedLongest -} - -func (m *testMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric { - return &m.deprecatedRetries -} - func TestSinceInMicroseconds(t *testing.T) { mp := testMetricsProvider{} c := clock.NewFakeClock(time.Now()) @@ -237,18 +201,10 @@ func TestMetrics(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1.0, mp.deprecatedAdds.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 1.0, mp.depth.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - c.Step(50 * time.Microsecond) // Start processing @@ -263,18 +219,6 @@ func TestMetrics(t *testing.T) { if e, a := 1, mp.latency.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 50.0, mp.deprecatedLatency.observationValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 1, mp.deprecatedLatency.observationCount(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 0.0, mp.depth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 0.0, mp.deprecatedDepth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // Add it back while processing; multiple adds of the same item are // de-duped. @@ -286,16 +230,10 @@ func TestMetrics(t *testing.T) { if e, a := 2.0, mp.adds.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 2.0, mp.deprecatedAdds.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // One thing remains in the queue if e, a := 1.0, mp.depth.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } c.Step(25 * time.Microsecond) @@ -308,20 +246,11 @@ func TestMetrics(t *testing.T) { if e, a := 1, mp.duration.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 25.0, mp.deprecatedDuration.observationValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 1, mp.deprecatedDuration.observationCount(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // One thing remains in the queue if e, a := 1.0, mp.depth.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // It should be back on the queue i, _ = q.Get() @@ -335,35 +264,20 @@ func TestMetrics(t *testing.T) { if e, a := 2, mp.latency.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 25.0, mp.deprecatedLatency.observationValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 2, mp.deprecatedLatency.observationCount(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // use a channel to ensure we don't look at the metric before it's // been set. ch := make(chan struct{}, 1) mp.unfinished.notifyCh = ch - mp.deprecatedUnfinished.notifyCh = ch c.Step(time.Millisecond) <-ch - <-ch mp.unfinished.notifyCh = nil - mp.deprecatedUnfinished.notifyCh = nil if e, a := .001, mp.unfinished.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := .001, mp.deprecatedUnfinished.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } if e, a := .001, mp.longest.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1000.0, mp.deprecatedLongest.gaugeValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } // Finish that one up q.Done(i) @@ -373,10 +287,4 @@ func TestMetrics(t *testing.T) { if e, a := 2, mp.duration.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1000.0, mp.deprecatedDuration.observationValue(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - if e, a := 2, mp.deprecatedDuration.observationCount(); e != a { - t.Errorf("expected %v, got %v", e, a) - } } diff --git a/util/workqueue/rate_limiting_queue_test.go b/util/workqueue/rate_limiting_queue_test.go index daa0d860..3fbe07d0 100644 --- a/util/workqueue/rate_limiting_queue_test.go +++ b/util/workqueue/rate_limiting_queue_test.go @@ -28,13 +28,12 @@ func TestRateLimitingQueue(t *testing.T) { queue := NewRateLimitingQueue(limiter).(*rateLimitingType) fakeClock := clock.NewFakeClock(time.Now()) delayingQueue := &delayingType{ - Interface: New(), - clock: fakeClock, - heartbeat: fakeClock.NewTicker(maxWait), - stopCh: make(chan struct{}), - waitingForAddCh: make(chan *waitFor, 1000), - metrics: newRetryMetrics(""), - deprecatedMetrics: newDeprecatedRetryMetrics(""), + Interface: New(), + clock: fakeClock, + heartbeat: fakeClock.NewTicker(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan *waitFor, 1000), + metrics: newRetryMetrics(""), } queue.DelayingInterface = delayingQueue