From fd77aa5a41bbce7490dd4538c0d5743cb59b2be4 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 12 Nov 2018 10:52:18 -0800 Subject: [PATCH] add longest_running_processor_microseconds metric --- pkg/util/workqueue/prometheus/prometheus.go | 11 ++++++ .../client-go/util/workqueue/metrics.go | 35 +++++++++++++------ .../client-go/util/workqueue/metrics_test.go | 8 +++++ 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/pkg/util/workqueue/prometheus/prometheus.go b/pkg/util/workqueue/prometheus/prometheus.go index 7c1f270f6f1..c2f2cfce522 100644 --- a/pkg/util/workqueue/prometheus/prometheus.go +++ b/pkg/util/workqueue/prometheus/prometheus.go @@ -84,6 +84,17 @@ func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) wor return unfinished } +func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { + unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ + Subsystem: name, + Name: "longest_running_procesor_microseconds", + Help: "How many microseconds has the longest running " + + "processor for " + name + " been running.", + }) + prometheus.Register(unfinished) + return unfinished +} + func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { retries := prometheus.NewCounter(prometheus.CounterOpts{ Subsystem: name, diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index 51b5f2426d1..d4c03d8378f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -80,7 +80,8 @@ type defaultQueueMetrics struct { processingStartTimes map[t]time.Time // how long have current threads been working? - unfinishedWorkSeconds SettableGaugeMetric + unfinishedWorkSeconds SettableGaugeMetric + longestRunningProcessor SettableGaugeMetric } func (m *defaultQueueMetrics) add(item t) { @@ -120,13 +121,21 @@ func (m *defaultQueueMetrics) done(item t) { } func (m *defaultQueueMetrics) updateUnfinishedWork() { + // Note that a summary metric would be better for this, but prometheus + // doesn't seem to have non-hacky ways to reset the summary metrics. var total float64 + var oldest float64 for _, t := range m.processingStartTimes { - total += m.sinceInMicroseconds(t) + age := m.sinceInMicroseconds(t) + total += age + if age > oldest { + oldest = age + } } // Convert to seconds; microseconds is unhelpfully granular for this. total /= 1000000 m.unfinishedWorkSeconds.Set(total) + m.longestRunningProcessor.Set(oldest) // in microseconds. } type noMetrics struct{} @@ -164,6 +173,7 @@ type MetricsProvider interface { NewLatencyMetric(name string) SummaryMetric NewWorkDurationMetric(name string) SummaryMetric NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric + NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric } @@ -189,6 +199,10 @@ func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settabl return noopMetric{} } +func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { + return noopMetric{} +} + func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } @@ -215,14 +229,15 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu return noMetrics{} } return &defaultQueueMetrics{ - clock: clock, - depth: mp.NewDepthMetric(name), - adds: mp.NewAddsMetric(name), - latency: mp.NewLatencyMetric(name), - workDuration: mp.NewWorkDurationMetric(name), - unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, } } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index 90b2cf25e77..ceacabf55fd 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -94,6 +94,7 @@ type testMetricsProvider struct { latency testMetric duration testMetric unfinished testMetric + longest testMetric retries testMetric } @@ -117,6 +118,10 @@ func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settab return &m.unfinished } +func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { + return &m.longest +} + func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { return &m.retries } @@ -232,6 +237,9 @@ func TestMetrics(t *testing.T) { if e, a := .001, mp.unfinished.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } + if e, a := 1000.0, mp.longest.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } // Finish that one up q.Done(i)