From 75d4dad9228f1c110d5eb541d5f7257178d47371 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 9 Nov 2018 10:43:44 -0800 Subject: [PATCH] add a metric that can be used to notice stuck worker threads Kubernetes-commit: 6195d1005d81eaa5dd49da744f5beab178340f5a --- util/workqueue/metrics.go | 52 ++++++++++++++++++++++++++++------ util/workqueue/metrics_test.go | 49 ++++++++++++++++++++++++++++++++ util/workqueue/queue.go | 47 ++++++++++++++++++++++++++---- 3 files changed, 133 insertions(+), 15 deletions(-) create mode 100644 util/workqueue/metrics_test.go diff --git a/util/workqueue/metrics.go b/util/workqueue/metrics.go index a481bdfb..a55ff5c2 100644 --- a/util/workqueue/metrics.go +++ b/util/workqueue/metrics.go @@ -28,6 +28,7 @@ type queueMetrics interface { add(item t) get(item t) done(item t) + updateUnfinishedWork() } // GaugeMetric represents a single numerical value that can arbitrarily go up @@ -37,6 +38,12 @@ type GaugeMetric interface { Dec() } +// SettableGaugeMetric represents a single numerical value that can arbitrarily go up +// and down. (Separate from GaugeMetric to preserve backwards compatibility.) +type SettableGaugeMetric interface { + Set(float64) +} + // CounterMetric represents a single numerical value that only ever // goes up. type CounterMetric interface { @@ -52,6 +59,7 @@ type noopMetric struct{} func (noopMetric) Inc() {} func (noopMetric) Dec() {} +func (noopMetric) Set(float64) {} func (noopMetric) Observe(float64) {} type defaultQueueMetrics struct { @@ -65,6 +73,9 @@ type defaultQueueMetrics struct { workDuration SummaryMetric addTimes map[t]time.Time processingStartTimes map[t]time.Time + + // how long have current threads been working? + unfinishedWorkMicroseconds SettableGaugeMetric } func (m *defaultQueueMetrics) add(item t) { @@ -103,6 +114,23 @@ func (m *defaultQueueMetrics) done(item t) { } } +func (m *defaultQueueMetrics) updateUnfinishedWork() { + var total float64 + if m.processingStartTimes != nil { + for _, t := range m.processingStartTimes { + total += sinceInMicroseconds(t) + } + } + m.unfinishedWorkMicroseconds.Set(total) +} + +type noMetrics struct{} + +func (noMetrics) add(item t) {} +func (noMetrics) get(item t) {} +func (noMetrics) done(item t) {} +func (noMetrics) updateUnfinishedWork() {} + // Gets the time since the specified start in microseconds. func sinceInMicroseconds(start time.Time) float64 { return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) @@ -130,6 +158,7 @@ type MetricsProvider interface { NewAddsMetric(name string) CounterMetric NewLatencyMetric(name string) SummaryMetric NewWorkDurationMetric(name string) SummaryMetric + NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric } @@ -151,6 +180,10 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { return noopMetric{} } +func (_ noopMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric { + return noopMetric{} +} + func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } @@ -163,17 +196,18 @@ var metricsFactory = struct { } func newQueueMetrics(name string) queueMetrics { - var ret *defaultQueueMetrics - if len(name) == 0 { - return ret + mp := metricsFactory.metricsProvider + if len(name) == 0 || mp == (noopMetricsProvider{}) { + return noMetrics{} } return &defaultQueueMetrics{ - depth: metricsFactory.metricsProvider.NewDepthMetric(name), - adds: metricsFactory.metricsProvider.NewAddsMetric(name), - latency: metricsFactory.metricsProvider.NewLatencyMetric(name), - workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkMicroseconds: mp.NewUnfinishedWorkMicrosecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, } } diff --git a/util/workqueue/metrics_test.go b/util/workqueue/metrics_test.go new file mode 100644 index 00000000..e8576cf9 --- /dev/null +++ b/util/workqueue/metrics_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" +) + +type testMetrics struct { + added, gotten, finished int64 + + updateCalled chan<- struct{} +} + +func (m *testMetrics) add(item t) { m.added++ } +func (m *testMetrics) get(item t) { m.gotten++ } +func (m *testMetrics) done(item t) { m.finished++ } +func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } + +func TestMetrics(t *testing.T) { + ch := make(chan struct{}) + m := &testMetrics{ + updateCalled: ch, + } + q := newQueue("test", m, time.Millisecond) + <-ch + q.ShutDown() + select { + case <-time.After(time.Second): + return + case <-ch: + t.Errorf("Unexpected update after shutdown was called.") + } +} diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index dc9a7cc7..66118cd0 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -18,6 +18,7 @@ package workqueue import ( "sync" + "time" ) type Interface interface { @@ -35,14 +36,27 @@ func New() *Type { } func NewNamed(name string) *Type { - return &Type{ - dirty: set{}, - processing: set{}, - cond: sync.NewCond(&sync.Mutex{}), - metrics: newQueueMetrics(name), - } + return newQueue( + name, + newQueueMetrics(name), + defaultUnfinishedWorkUpdatePeriod, + ) } +func newQueue(name string, metrics queueMetrics, updatePeriod time.Duration) *Type { + t := &Type{ + dirty: set{}, + processing: set{}, + cond: sync.NewCond(&sync.Mutex{}), + metrics: metrics, + unfinishedWorkUpdatePeriod: updatePeriod, + } + go t.updateUnfinishedWorkLook() + return t +} + +const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond + // Type is a work queue (see the package comment). type Type struct { // queue defines the order in which we will work on items. Every @@ -64,6 +78,8 @@ type Type struct { shuttingDown bool metrics queueMetrics + + unfinishedWorkUpdatePeriod time.Duration } type empty struct{} @@ -170,3 +186,22 @@ func (q *Type) ShuttingDown() bool { return q.shuttingDown } + +func (q *Type) updateUnfinishedWorkLook() { + t := time.NewTicker(q.unfinishedWorkUpdatePeriod) + defer t.Stop() + for range t.C { + if !func() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if !q.shuttingDown { + q.metrics.updateUnfinishedWork() + return true + } + return false + + }() { + return + } + } +}