add longest_running_processor_microseconds metric

This commit is contained in:
Daniel Smith 2018-11-12 10:52:18 -08:00
parent 578962d934
commit fd77aa5a41
3 changed files with 44 additions and 10 deletions

View File

@ -84,6 +84,17 @@ func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) wor
return unfinished return unfinished
} }
func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "longest_running_procesor_microseconds",
Help: "How many microseconds has the longest running " +
"processor for " + name + " been running.",
})
prometheus.Register(unfinished)
return unfinished
}
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
retries := prometheus.NewCounter(prometheus.CounterOpts{ retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name, Subsystem: name,

View File

@ -81,6 +81,7 @@ type defaultQueueMetrics struct {
// how long have current threads been working? // how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric
} }
func (m *defaultQueueMetrics) add(item t) { func (m *defaultQueueMetrics) add(item t) {
@ -120,13 +121,21 @@ func (m *defaultQueueMetrics) done(item t) {
} }
func (m *defaultQueueMetrics) updateUnfinishedWork() { func (m *defaultQueueMetrics) updateUnfinishedWork() {
// Note that a summary metric would be better for this, but prometheus
// doesn't seem to have non-hacky ways to reset the summary metrics.
var total float64 var total float64
var oldest float64
for _, t := range m.processingStartTimes { for _, t := range m.processingStartTimes {
total += m.sinceInMicroseconds(t) age := m.sinceInMicroseconds(t)
total += age
if age > oldest {
oldest = age
}
} }
// Convert to seconds; microseconds is unhelpfully granular for this. // Convert to seconds; microseconds is unhelpfully granular for this.
total /= 1000000 total /= 1000000
m.unfinishedWorkSeconds.Set(total) m.unfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest) // in microseconds.
} }
type noMetrics struct{} type noMetrics struct{}
@ -164,6 +173,7 @@ type MetricsProvider interface {
NewLatencyMetric(name string) SummaryMetric NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric NewWorkDurationMetric(name string) SummaryMetric
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric NewRetriesMetric(name string) CounterMetric
} }
@ -189,6 +199,10 @@ func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settabl
return noopMetric{} return noopMetric{}
} }
func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{} return noopMetric{}
} }
@ -221,6 +235,7 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
latency: mp.NewLatencyMetric(name), latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name), workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{}, addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{}, processingStartTimes: map[t]time.Time{},
} }

View File

@ -94,6 +94,7 @@ type testMetricsProvider struct {
latency testMetric latency testMetric
duration testMetric duration testMetric
unfinished testMetric unfinished testMetric
longest testMetric
retries testMetric retries testMetric
} }
@ -117,6 +118,10 @@ func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settab
return &m.unfinished return &m.unfinished
} }
func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return &m.longest
}
func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return &m.retries return &m.retries
} }
@ -232,6 +237,9 @@ func TestMetrics(t *testing.T) {
if e, a := .001, mp.unfinished.gaugeValue(); e != a { if e, a := .001, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1000.0, mp.longest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Finish that one up // Finish that one up
q.Done(i) q.Done(i)