pkg/util/workqueue: delete deprecated metrics

This deletes deprecated metrics and simplifies registration.

Kubernetes-commit: 4532cfd85c00da6f64e03fcf05f5636adc1151c7
This commit is contained in:
Sergiusz Urbaniak 2019-05-13 13:22:08 +02:00 committed by Kubernetes Publisher
parent 8720706276
commit ee078c72e2
4 changed files with 22 additions and 184 deletions

View File

@ -50,7 +50,6 @@ func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name), metrics: newRetryMetrics(name),
deprecatedMetrics: newDeprecatedRetryMetrics(name),
} }
go ret.waitingLoop() go ret.waitingLoop()
@ -78,7 +77,6 @@ type delayingType struct {
// metrics counts the number of retries // metrics counts the number of retries
metrics retryMetrics metrics retryMetrics
deprecatedMetrics retryMetrics
} }
// waitFor holds the data to add and the time it should be added // waitFor holds the data to add and the time it should be added
@ -154,7 +152,6 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
} }
q.metrics.retry() q.metrics.retry()
q.deprecatedMetrics.retry()
// immediately add things with no delay // immediately add things with no delay
if duration <= 0 { if duration <= 0 {

View File

@ -87,14 +87,6 @@ type defaultQueueMetrics struct {
// how long have current threads been working? // how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric longestRunningProcessor SettableGaugeMetric
// TODO(danielqsj): Remove the following metrics, they are deprecated
deprecatedDepth GaugeMetric
deprecatedAdds CounterMetric
deprecatedLatency SummaryMetric
deprecatedWorkDuration SummaryMetric
deprecatedUnfinishedWorkSeconds SettableGaugeMetric
deprecatedLongestRunningProcessor SettableGaugeMetric
} }
func (m *defaultQueueMetrics) add(item t) { func (m *defaultQueueMetrics) add(item t) {
@ -103,9 +95,7 @@ func (m *defaultQueueMetrics) add(item t) {
} }
m.adds.Inc() m.adds.Inc()
m.deprecatedAdds.Inc()
m.depth.Inc() m.depth.Inc()
m.deprecatedDepth.Inc()
if _, exists := m.addTimes[item]; !exists { if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = m.clock.Now() m.addTimes[item] = m.clock.Now()
} }
@ -117,11 +107,9 @@ func (m *defaultQueueMetrics) get(item t) {
} }
m.depth.Dec() m.depth.Dec()
m.deprecatedDepth.Dec()
m.processingStartTimes[item] = m.clock.Now() m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists { if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(m.sinceInSeconds(startTime)) m.latency.Observe(m.sinceInSeconds(startTime))
m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item) delete(m.addTimes, item)
} }
} }
@ -133,7 +121,6 @@ func (m *defaultQueueMetrics) done(item t) {
if startTime, exists := m.processingStartTimes[item]; exists { if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(m.sinceInSeconds(startTime)) m.workDuration.Observe(m.sinceInSeconds(startTime))
m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item) delete(m.processingStartTimes, item)
} }
} }
@ -153,9 +140,7 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
// Convert to seconds; microseconds is unhelpfully granular for this. // Convert to seconds; microseconds is unhelpfully granular for this.
total /= 1000000 total /= 1000000
m.unfinishedWorkSeconds.Set(total) m.unfinishedWorkSeconds.Set(total)
m.deprecatedUnfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest / 1000000) m.longestRunningProcessor.Set(oldest / 1000000)
m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds.
} }
type noMetrics struct{} type noMetrics struct{}
@ -200,13 +185,6 @@ type MetricsProvider interface {
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric NewRetriesMetric(name string) CounterMetric
NewDeprecatedDepthMetric(name string) GaugeMetric
NewDeprecatedAddsMetric(name string) CounterMetric
NewDeprecatedLatencyMetric(name string) SummaryMetric
NewDeprecatedWorkDurationMetric(name string) SummaryMetric
NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewDeprecatedRetriesMetric(name string) CounterMetric
} }
type noopMetricsProvider struct{} type noopMetricsProvider struct{}
@ -239,34 +217,6 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{} return noopMetric{}
} }
func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
var globalMetricsFactory = queueMetricsFactory{ var globalMetricsFactory = queueMetricsFactory{
metricsProvider: noopMetricsProvider{}, metricsProvider: noopMetricsProvider{},
} }
@ -296,12 +246,6 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
workDuration: mp.NewWorkDurationMetric(name), workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
deprecatedDepth: mp.NewDeprecatedDepthMetric(name),
deprecatedAdds: mp.NewDeprecatedAddsMetric(name),
deprecatedLatency: mp.NewDeprecatedLatencyMetric(name),
deprecatedWorkDuration: mp.NewDeprecatedWorkDurationMetric(name),
deprecatedUnfinishedWorkSeconds: mp.NewDeprecatedUnfinishedWorkSecondsMetric(name),
deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{}, addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{}, processingStartTimes: map[t]time.Time{},
} }
@ -317,16 +261,6 @@ func newRetryMetrics(name string) retryMetrics {
} }
} }
func newDeprecatedRetryMetrics(name string) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
return &defaultRetryMetrics{
retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name),
}
}
// SetProvider sets the metrics provider for all subsequently created work // SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect. // queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) { func SetProvider(metricsProvider MetricsProvider) {

View File

@ -137,14 +137,6 @@ type testMetricsProvider struct {
unfinished testMetric unfinished testMetric
longest testMetric longest testMetric
retries testMetric retries testMetric
// deprecated metrics
deprecatedDepth testMetric
deprecatedAdds testMetric
deprecatedLatency testMetric
deprecatedDuration testMetric
deprecatedUnfinished testMetric
deprecatedLongest testMetric
deprecatedRetries testMetric
} }
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric { func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
@ -175,34 +167,6 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return &m.retries return &m.retries
} }
func (m *testMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
return &m.deprecatedDepth
}
func (m *testMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
return &m.deprecatedAdds
}
func (m *testMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
return &m.deprecatedLatency
}
func (m *testMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
return &m.deprecatedDuration
}
func (m *testMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return &m.deprecatedUnfinished
}
func (m *testMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return &m.deprecatedLongest
}
func (m *testMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
return &m.deprecatedRetries
}
func TestSinceInMicroseconds(t *testing.T) { func TestSinceInMicroseconds(t *testing.T) {
mp := testMetricsProvider{} mp := testMetricsProvider{}
c := clock.NewFakeClock(time.Now()) c := clock.NewFakeClock(time.Now())
@ -237,18 +201,10 @@ func TestMetrics(t *testing.T) {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1.0, mp.deprecatedAdds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.depth.gaugeValue(); e != a { if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(50 * time.Microsecond) c.Step(50 * time.Microsecond)
// Start processing // Start processing
@ -263,18 +219,6 @@ func TestMetrics(t *testing.T) {
if e, a := 1, mp.latency.observationCount(); e != a { if e, a := 1, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 50.0, mp.deprecatedLatency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.deprecatedLatency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 0.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 0.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Add it back while processing; multiple adds of the same item are // Add it back while processing; multiple adds of the same item are
// de-duped. // de-duped.
@ -286,16 +230,10 @@ func TestMetrics(t *testing.T) {
if e, a := 2.0, mp.adds.gaugeValue(); e != a { if e, a := 2.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 2.0, mp.deprecatedAdds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue // One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a { if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(25 * time.Microsecond) c.Step(25 * time.Microsecond)
@ -308,20 +246,11 @@ func TestMetrics(t *testing.T) {
if e, a := 1, mp.duration.observationCount(); e != a { if e, a := 1, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 25.0, mp.deprecatedDuration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.deprecatedDuration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue // One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a { if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// It should be back on the queue // It should be back on the queue
i, _ = q.Get() i, _ = q.Get()
@ -335,35 +264,20 @@ func TestMetrics(t *testing.T) {
if e, a := 2, mp.latency.observationCount(); e != a { if e, a := 2, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 25.0, mp.deprecatedLatency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.deprecatedLatency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// use a channel to ensure we don't look at the metric before it's // use a channel to ensure we don't look at the metric before it's
// been set. // been set.
ch := make(chan struct{}, 1) ch := make(chan struct{}, 1)
mp.unfinished.notifyCh = ch mp.unfinished.notifyCh = ch
mp.deprecatedUnfinished.notifyCh = ch
c.Step(time.Millisecond) c.Step(time.Millisecond)
<-ch <-ch
<-ch
mp.unfinished.notifyCh = nil mp.unfinished.notifyCh = nil
mp.deprecatedUnfinished.notifyCh = nil
if e, a := .001, mp.unfinished.gaugeValue(); e != a { if e, a := .001, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := .001, mp.deprecatedUnfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := .001, mp.longest.gaugeValue(); e != a { if e, a := .001, mp.longest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1000.0, mp.deprecatedLongest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Finish that one up // Finish that one up
q.Done(i) q.Done(i)
@ -373,10 +287,4 @@ func TestMetrics(t *testing.T) {
if e, a := 2, mp.duration.observationCount(); e != a { if e, a := 2, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1000.0, mp.deprecatedDuration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.deprecatedDuration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
} }

View File

@ -34,7 +34,6 @@ func TestRateLimitingQueue(t *testing.T) {
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""), metrics: newRetryMetrics(""),
deprecatedMetrics: newDeprecatedRetryMetrics(""),
} }
queue.DelayingInterface = delayingQueue queue.DelayingInterface = delayingQueue