From c270b352df14c6441431e4c008b12265f40118a9 Mon Sep 17 00:00:00 2001
From: danielqsj
Date: Wed, 21 Nov 2018 11:43:47 +0800
Subject: [PATCH 1/2] Use prometheus conventions for workqueue metrics

Kubernetes-commit: b828bc1a50dfa29ab85248e5af6da5acb27189c1
---
 util/workqueue/delaying_queue.go           |  17 ++--
 util/workqueue/metrics.go                  |  99 +++++++++++++++++---
 util/workqueue/metrics_test.go             | 101 +++++++++++++++++++--
 util/workqueue/rate_limiting_queue_test.go |  13 +--
 4 files changed, 197 insertions(+), 33 deletions(-)

diff --git a/util/workqueue/delaying_queue.go b/util/workqueue/delaying_queue.go
index a3717742..bd654bf3 100644
--- a/util/workqueue/delaying_queue.go
+++ b/util/workqueue/delaying_queue.go
@@ -43,12 +43,13 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
 
 func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
 	ret := &delayingType{
-		Interface:       NewNamed(name),
-		clock:           clock,
-		heartbeat:       clock.NewTicker(maxWait),
-		stopCh:          make(chan struct{}),
-		waitingForAddCh: make(chan *waitFor, 1000),
-		metrics:         newRetryMetrics(name),
+		Interface:         NewNamed(name),
+		clock:             clock,
+		heartbeat:         clock.NewTicker(maxWait),
+		stopCh:            make(chan struct{}),
+		waitingForAddCh:   make(chan *waitFor, 1000),
+		metrics:           newRetryMetrics(name),
+		deprecatedMetrics: newDeprecatedRetryMetrics(name),
 	}
 
 	go ret.waitingLoop()
@@ -73,7 +74,8 @@ type delayingType struct {
 	waitingForAddCh chan *waitFor
 
 	// metrics counts the number of retries
-	metrics retryMetrics
+	metrics           retryMetrics
+	deprecatedMetrics retryMetrics
 }
 
 // waitFor holds the data to add and the time it should be added
@@ -146,6 +148,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
 	}
 
 	q.metrics.retry()
+	q.deprecatedMetrics.retry()
 
 	// immediately add things with no delay
 	if duration <= 0 {
diff --git a/util/workqueue/metrics.go b/util/workqueue/metrics.go
index d4c03d83..f13f106e 100644
--- a/util/workqueue/metrics.go
+++ b/util/workqueue/metrics.go
@@ -82,6 +82,14 @@ type defaultQueueMetrics struct {
 	// how long have current threads been working?
 	unfinishedWorkSeconds   SettableGaugeMetric
 	longestRunningProcessor SettableGaugeMetric
+
+	// TODO(danielqsj): Remove the following metrics, they are deprecated
+	deprecatedDepth                   GaugeMetric
+	deprecatedAdds                    CounterMetric
+	deprecatedLatency                 SummaryMetric
+	deprecatedWorkDuration            SummaryMetric
+	deprecatedUnfinishedWorkSeconds   SettableGaugeMetric
+	deprecatedLongestRunningProcessor SettableGaugeMetric
 }
 
 func (m *defaultQueueMetrics) add(item t) {
@@ -90,7 +98,9 @@ func (m *defaultQueueMetrics) add(item t) {
 	}
 
 	m.adds.Inc()
+	m.deprecatedAdds.Inc()
 	m.depth.Inc()
+	m.deprecatedDepth.Inc()
 	if _, exists := m.addTimes[item]; !exists {
 		m.addTimes[item] = m.clock.Now()
 	}
@@ -102,9 +112,11 @@ func (m *defaultQueueMetrics) get(item t) {
 	}
 
 	m.depth.Dec()
+	m.deprecatedDepth.Dec()
 	m.processingStartTimes[item] = m.clock.Now()
 	if startTime, exists := m.addTimes[item]; exists {
-		m.latency.Observe(m.sinceInMicroseconds(startTime))
+		m.latency.Observe(m.sinceInSeconds(startTime))
+		m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime))
 		delete(m.addTimes, item)
 	}
 }
@@ -115,7 +127,8 @@ func (m *defaultQueueMetrics) done(item t) {
 	}
 
 	if startTime, exists := m.processingStartTimes[item]; exists {
-		m.workDuration.Observe(m.sinceInMicroseconds(startTime))
+		m.workDuration.Observe(m.sinceInSeconds(startTime))
+		m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime))
 		delete(m.processingStartTimes, item)
 	}
 }
@@ -135,7 +148,9 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
 	// Convert to seconds; microseconds is unhelpfully granular for this.
 	total /= 1000000
 	m.unfinishedWorkSeconds.Set(total)
-	m.longestRunningProcessor.Set(oldest) // in microseconds.
+	m.deprecatedUnfinishedWorkSeconds.Set(total)
+	m.longestRunningProcessor.Set(oldest / 1000000)
+	m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds.
 }
 
 type noMetrics struct{}
@@ -150,6 +165,11 @@ func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
 	return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
 }
 
+// Gets the time since the specified start in seconds.
+func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
+	return m.clock.Since(start).Seconds()
+}
+
 type retryMetrics interface {
 	retry()
 }
@@ -173,8 +193,15 @@ type MetricsProvider interface {
 	NewLatencyMetric(name string) SummaryMetric
 	NewWorkDurationMetric(name string) SummaryMetric
 	NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
-	NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
+	NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
 	NewRetriesMetric(name string) CounterMetric
+	NewDeprecatedDepthMetric(name string) GaugeMetric
+	NewDeprecatedAddsMetric(name string) CounterMetric
+	NewDeprecatedLatencyMetric(name string) SummaryMetric
+	NewDeprecatedWorkDurationMetric(name string) SummaryMetric
+	NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
+	NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
+	NewDeprecatedRetriesMetric(name string) CounterMetric
 }
 
 type noopMetricsProvider struct{}
@@ -199,7 +226,7 @@ func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settabl
 	return noopMetric{}
 }
 
-func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
+func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
 	return noopMetric{}
 }
 
@@ -207,6 +234,34 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
 	return noopMetric{}
 }
 
+func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
+	return noopMetric{}
+}
+
+func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
+	return noopMetric{}
+}
+
+func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
+	return noopMetric{}
+}
+
+func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
+	return noopMetric{}
+}
+
+func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
+	return noopMetric{}
+}
+
+func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
+	return noopMetric{}
+}
+
+func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
+	return noopMetric{}
+}
+
 var globalMetricsFactory = queueMetricsFactory{
 	metricsProvider: noopMetricsProvider{},
 }
@@ -229,15 +284,21 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
 		return noMetrics{}
 	}
 	return &defaultQueueMetrics{
-		clock:                   clock,
-		depth:                   mp.NewDepthMetric(name),
-		adds:                    mp.NewAddsMetric(name),
-		latency:                 mp.NewLatencyMetric(name),
-		workDuration:            mp.NewWorkDurationMetric(name),
-		unfinishedWorkSeconds:   mp.NewUnfinishedWorkSecondsMetric(name),
-		longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name),
-		addTimes:                map[t]time.Time{},
-		processingStartTimes:    map[t]time.Time{},
+		clock:                             clock,
+		depth:                             mp.NewDepthMetric(name),
+		adds:                              mp.NewAddsMetric(name),
+		latency:                           mp.NewLatencyMetric(name),
+		workDuration:                      mp.NewWorkDurationMetric(name),
+		unfinishedWorkSeconds:             mp.NewUnfinishedWorkSecondsMetric(name),
+		longestRunningProcessor:           mp.NewLongestRunningProcessorSecondsMetric(name),
+		deprecatedDepth:                   mp.NewDeprecatedDepthMetric(name),
+		deprecatedAdds:                    mp.NewDeprecatedAddsMetric(name),
+		deprecatedLatency:                 mp.NewDeprecatedLatencyMetric(name),
+		deprecatedWorkDuration:            mp.NewDeprecatedWorkDurationMetric(name),
+		deprecatedUnfinishedWorkSeconds:   mp.NewDeprecatedUnfinishedWorkSecondsMetric(name),
+		deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name),
+		addTimes:                          map[t]time.Time{},
+		processingStartTimes:              map[t]time.Time{},
 	}
 }
 
@@ -251,6 +312,16 @@ func newRetryMetrics(name string) retryMetrics {
 	}
 }
 
+func newDeprecatedRetryMetrics(name string) retryMetrics {
+	var ret *defaultRetryMetrics
+	if len(name) == 0 {
+		return ret
+	}
+	return &defaultRetryMetrics{
+		retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name),
+	}
+}
+
 // SetProvider sets the metrics provider for all subsequently created work
 // queues. Only the first call has an effect.
 func SetProvider(metricsProvider MetricsProvider) {
diff --git a/util/workqueue/metrics_test.go b/util/workqueue/metrics_test.go
index 117f9080..fa299009 100644
--- a/util/workqueue/metrics_test.go
+++ b/util/workqueue/metrics_test.go
@@ -137,6 +137,14 @@ type testMetricsProvider struct {
 	unfinished testMetric
 	longest    testMetric
 	retries    testMetric
+	// deprecated metrics
+	deprecatedDepth      testMetric
+	deprecatedAdds       testMetric
+	deprecatedLatency    testMetric
+	deprecatedDuration   testMetric
+	deprecatedUnfinished testMetric
+	deprecatedLongest    testMetric
+	deprecatedRetries    testMetric
 }
 
 func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
@@ -159,7 +167,7 @@ func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settab
 	return &m.unfinished
 }
 
-func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
+func (m *testMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
 	return &m.longest
 }
 
@@ -167,6 +175,34 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
 	return &m.retries
 }
 
+func (m *testMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
+	return &m.deprecatedDepth
+}
+
+func (m *testMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
+	return &m.deprecatedAdds
+}
+
+func (m *testMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
+	return &m.deprecatedLatency
+}
+
+func (m *testMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
+	return &m.deprecatedDuration
+}
+
+func (m *testMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
+	return &m.deprecatedUnfinished
+}
+
+func (m *testMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
+	return &m.deprecatedLongest
+}
+
+func (m *testMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
+	return &m.deprecatedRetries
+}
+
 func TestSinceInMicroseconds(t *testing.T) {
 	mp := testMetricsProvider{}
 	c := clock.NewFakeClock(time.Now())
@@ -201,10 +237,18 @@ func TestMetrics(t *testing.T) {
 		t.Errorf("expected %v, got %v", e, a)
 	}
 
+	if e, a := 1.0, mp.deprecatedAdds.gaugeValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
+
 	if e, a := 1.0, mp.depth.gaugeValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
 
+	if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
+
 	c.Step(50 * time.Microsecond)
 
 	// Start processing
@@ -213,15 +257,24 @@ func TestMetrics(t *testing.T) {
 		t.Errorf("Expected %v, got %v", "foo", i)
 	}
 
-	if e, a := 50.0, mp.latency.observationValue(); e != a {
+	if e, a := 5e-05, mp.latency.observationValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
 	if e, a := 1, mp.latency.observationCount(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
+	if e, a := 50.0, mp.deprecatedLatency.observationValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
+	if e, a := 1, mp.deprecatedLatency.observationCount(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
 	if e, a := 0.0, mp.depth.gaugeValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
+	if e, a := 0.0, mp.deprecatedDepth.gaugeValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
 
 	// Add it back while processing; multiple adds of the same item are
 	// de-duped.
@@ -233,27 +286,42 @@ func TestMetrics(t *testing.T) {
 	if e, a := 2.0, mp.adds.gaugeValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
+	if e, a := 2.0, mp.deprecatedAdds.gaugeValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
 	// One thing remains in the queue
 	if e, a := 1.0, mp.depth.gaugeValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
+	if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
 
 	c.Step(25 * time.Microsecond)
 
 	// Finish it up
 	q.Done(i)
 
-	if e, a := 25.0, mp.duration.observationValue(); e != a {
+	if e, a := 2.5e-05, mp.duration.observationValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
 	if e, a := 1, mp.duration.observationCount(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
+	if e, a := 25.0, mp.deprecatedDuration.observationValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
+	if e, a := 1, mp.deprecatedDuration.observationCount(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
 
 	// One thing remains in the queue
 	if e, a := 1.0, mp.depth.gaugeValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
+	if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
 
 	// It should be back on the queue
 	i, _ = q.Get()
@@ -261,33 +329,54 @@ func TestMetrics(t *testing.T) {
 		t.Errorf("Expected %v, got %v", "foo", i)
 	}
 
-	if e, a := 25.0, mp.latency.observationValue(); e != a {
+	if e, a := 2.5e-05, mp.latency.observationValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
	if e, a := 2, mp.latency.observationCount(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
+	if e, a := 25.0, mp.deprecatedLatency.observationValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
+	if e, a := 2, mp.deprecatedLatency.observationCount(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
 
 	// use a channel to ensure we don't look at the metric before it's
 	// been set.
 	ch := make(chan struct{}, 1)
 	mp.unfinished.notifyCh = ch
+	mp.deprecatedUnfinished.notifyCh = ch
 	c.Step(time.Millisecond)
 	<-ch
+	<-ch
 	mp.unfinished.notifyCh = nil
+	mp.deprecatedUnfinished.notifyCh = nil
 	if e, a := .001, mp.unfinished.gaugeValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
-	if e, a := 1000.0, mp.longest.gaugeValue(); e != a {
+	if e, a := .001, mp.deprecatedUnfinished.gaugeValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
+	if e, a := .001, mp.longest.gaugeValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
+	if e, a := 1000.0, mp.deprecatedLongest.gaugeValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
 
 	// Finish that one up
 	q.Done(i)
-	if e, a := 1000.0, mp.duration.observationValue(); e != a {
+	if e, a := .001, mp.duration.observationValue(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
 	if e, a := 2, mp.duration.observationCount(); e != a {
 		t.Errorf("expected %v, got %v", e, a)
 	}
+	if e, a := 1000.0, mp.deprecatedDuration.observationValue(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
+	if e, a := 2, mp.deprecatedDuration.observationCount(); e != a {
+		t.Errorf("expected %v, got %v", e, a)
+	}
 }
diff --git a/util/workqueue/rate_limiting_queue_test.go b/util/workqueue/rate_limiting_queue_test.go
index 3fbe07d0..daa0d860 100644
--- a/util/workqueue/rate_limiting_queue_test.go
+++ b/util/workqueue/rate_limiting_queue_test.go
@@ -28,12 +28,13 @@ func TestRateLimitingQueue(t *testing.T) {
 	queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
 	fakeClock := clock.NewFakeClock(time.Now())
 	delayingQueue := &delayingType{
-		Interface:       New(),
-		clock:           fakeClock,
-		heartbeat:       fakeClock.NewTicker(maxWait),
-		stopCh:          make(chan struct{}),
-		waitingForAddCh: make(chan *waitFor, 1000),
-		metrics:         newRetryMetrics(""),
+		Interface:         New(),
+		clock:             fakeClock,
+		heartbeat:         fakeClock.NewTicker(maxWait),
+		stopCh:            make(chan struct{}),
+		waitingForAddCh:   make(chan *waitFor, 1000),
+		metrics:           newRetryMetrics(""),
+		deprecatedMetrics: newDeprecatedRetryMetrics(""),
 	}
 	queue.DelayingInterface = delayingQueue
 

From 6658e1f4a301b5c5ebcfc7e5160415448f8746ce Mon Sep 17 00:00:00 2001
From: danielqsj
Date: Wed, 12 Dec 2018 16:50:32 +0800
Subject: [PATCH 2/2] Using histogram metrics instead of summary

Kubernetes-commit: 42214c5ac423289a6c87e0f3c1014f652d58bf62
---
 util/workqueue/metrics.go      | 17 +++++++++++------
 util/workqueue/metrics_test.go |  4 ++--
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/util/workqueue/metrics.go b/util/workqueue/metrics.go
index f13f106e..be23ddd0 100644
--- a/util/workqueue/metrics.go
+++ b/util/workqueue/metrics.go
@@ -57,6 +57,11 @@ type SummaryMetric interface {
 	Observe(float64)
 }
 
+// HistogramMetric counts individual observations.
+type HistogramMetric interface {
+	Observe(float64)
+}
+
 type noopMetric struct{}
 
 func (noopMetric) Inc() {}
@@ -73,9 +78,9 @@ type defaultQueueMetrics struct {
 	// total number of adds handled by a workqueue
 	adds CounterMetric
 	// how long an item stays in a workqueue
-	latency SummaryMetric
+	latency HistogramMetric
 	// how long processing an item from a workqueue takes
-	workDuration SummaryMetric
+	workDuration HistogramMetric
 
 	addTimes             map[t]time.Time
 	processingStartTimes map[t]time.Time
@@ -190,8 +195,8 @@ func (m *defaultRetryMetrics) retry() {
 type MetricsProvider interface {
 	NewDepthMetric(name string) GaugeMetric
 	NewAddsMetric(name string) CounterMetric
-	NewLatencyMetric(name string) SummaryMetric
-	NewWorkDurationMetric(name string) SummaryMetric
+	NewLatencyMetric(name string) HistogramMetric
+	NewWorkDurationMetric(name string) HistogramMetric
 	NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
 	NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
 	NewRetriesMetric(name string) CounterMetric
@@ -214,11 +219,11 @@ func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
 	return noopMetric{}
 }
 
-func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
+func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
 	return noopMetric{}
 }
 
-func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
+func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
 	return noopMetric{}
 }
 
diff --git a/util/workqueue/metrics_test.go b/util/workqueue/metrics_test.go
index fa299009..ceb8f4c9 100644
--- a/util/workqueue/metrics_test.go
+++ b/util/workqueue/metrics_test.go
@@ -155,11 +155,11 @@ func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
 	return &m.adds
 }
 
-func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
+func (m *testMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
 	return &m.latency
 }
 
-func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
+func (m *testMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
 	return &m.duration
 }
 
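For reference, the sketch below (not part of either patch) shows how an external metrics provider might satisfy the reworked MetricsProvider interface with prometheus/client_golang, now that latency and work duration are histograms observed in seconds and the longest-running-processor gauge also reports seconds. The package name, metric names, help strings, and bucket layout are illustrative assumptions, not what any Kubernetes component registers; only four representative constructors are shown, and a complete provider would also have to implement the remaining constructors, including the Deprecated* ones, in the same pattern before being handed to workqueue.SetProvider (which, per the code above, only honors the first call).

// Package example sketches a Prometheus-backed workqueue MetricsProvider.
// Illustrative only: metric names, help text, and buckets are assumptions.
package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"k8s.io/client-go/util/workqueue"
)

type prometheusMetricsProvider struct{}

// NewDepthMetric returns a gauge tracking the current depth of the queue.
func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
	depth := prometheus.NewGauge(prometheus.GaugeOpts{
		Subsystem: name,
		Name:      "depth",
		Help:      "Current depth of workqueue: " + name,
	})
	prometheus.MustRegister(depth)
	return depth
}

// NewLatencyMetric returns a histogram observed in seconds, matching the
// convention introduced by these patches (previously a summary in microseconds).
func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
	latency := prometheus.NewHistogram(prometheus.HistogramOpts{
		Subsystem: name,
		Name:      "queue_duration_seconds",
		Help:      "How long an item stays in workqueue " + name + " before being requested, in seconds.",
		Buckets:   prometheus.ExponentialBuckets(1e-6, 10, 10),
	})
	prometheus.MustRegister(latency)
	return latency
}

// NewLongestRunningProcessorSecondsMetric returns a settable gauge in seconds
// (the constructor it replaces reported microseconds).
func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
	longest := prometheus.NewGauge(prometheus.GaugeOpts{
		Subsystem: name,
		Name:      "longest_running_processor_seconds",
		Help:      "Longest running processor for workqueue " + name + ", in seconds.",
	})
	prometheus.MustRegister(longest)
	return longest
}

// NewRetriesMetric returns a counter incremented on every AddAfter retry.
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
	retries := prometheus.NewCounter(prometheus.CounterOpts{
		Subsystem: name,
		Name:      "retries_total",
		Help:      "Total number of retries handled by workqueue: " + name,
	})
	prometheus.MustRegister(retries)
	return retries
}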