diff --git a/util/workqueue/metrics.go b/util/workqueue/metrics.go index a55ff5c2..af224238 100644 --- a/util/workqueue/metrics.go +++ b/util/workqueue/metrics.go @@ -19,6 +19,8 @@ package workqueue import ( "sync" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) // This file provides abstractions for setting the provider (e.g., prometheus) @@ -63,6 +65,8 @@ func (noopMetric) Set(float64) {} func (noopMetric) Observe(float64) {} type defaultQueueMetrics struct { + clock clock.Clock + // current depth of a workqueue depth GaugeMetric // total number of adds handled by a workqueue @@ -86,7 +90,7 @@ func (m *defaultQueueMetrics) add(item t) { m.adds.Inc() m.depth.Inc() if _, exists := m.addTimes[item]; !exists { - m.addTimes[item] = time.Now() + m.addTimes[item] = m.clock.Now() } } @@ -96,9 +100,9 @@ func (m *defaultQueueMetrics) get(item t) { } m.depth.Dec() - m.processingStartTimes[item] = time.Now() + m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { - m.latency.Observe(sinceInMicroseconds(startTime)) + m.latency.Observe(m.sinceInMicroseconds(startTime)) delete(m.addTimes, item) } } @@ -109,17 +113,15 @@ func (m *defaultQueueMetrics) done(item t) { } if startTime, exists := m.processingStartTimes[item]; exists { - m.workDuration.Observe(sinceInMicroseconds(startTime)) + m.workDuration.Observe(m.sinceInMicroseconds(startTime)) delete(m.processingStartTimes, item) } } func (m *defaultQueueMetrics) updateUnfinishedWork() { var total float64 - if m.processingStartTimes != nil { - for _, t := range m.processingStartTimes { - total += sinceInMicroseconds(t) - } + for _, t := range m.processingStartTimes { + total += m.sinceInMicroseconds(t) } m.unfinishedWorkMicroseconds.Set(total) } @@ -132,8 +134,8 @@ func (noMetrics) done(item t) {} func (noMetrics) updateUnfinishedWork() {} // Gets the time since the specified start in microseconds. -func sinceInMicroseconds(start time.Time) float64 { - return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) +func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 { + return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) } type retryMetrics interface { @@ -188,19 +190,28 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } -var metricsFactory = struct { - metricsProvider MetricsProvider - setProviders sync.Once -}{ +var globalMetricsFactory = metricsFactory{ metricsProvider: noopMetricsProvider{}, } -func newQueueMetrics(name string) queueMetrics { - mp := metricsFactory.metricsProvider +type metricsFactory struct { + metricsProvider MetricsProvider + setProviders sync.Once +} + +func (f *metricsFactory) set(mp MetricsProvider) { + f.setProviders.Do(func() { + f.metricsProvider = mp + }) +} + +func (f *metricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics { + mp := f.metricsProvider if len(name) == 0 || mp == (noopMetricsProvider{}) { return noMetrics{} } return &defaultQueueMetrics{ + clock: clock, depth: mp.NewDepthMetric(name), adds: mp.NewAddsMetric(name), latency: mp.NewLatencyMetric(name), @@ -217,13 +228,12 @@ func newRetryMetrics(name string) retryMetrics { return ret } return &defaultRetryMetrics{ - retries: metricsFactory.metricsProvider.NewRetriesMetric(name), + retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name), } } -// SetProvider sets the metrics provider of the metricsFactory. +// SetProvider sets the metrics provider for all subsequently created work +// queues. Only the first call has an effect. func SetProvider(metricsProvider MetricsProvider) { - metricsFactory.setProviders.Do(func() { - metricsFactory.metricsProvider = metricsProvider - }) + globalMetricsFactory.set(metricsProvider) } diff --git a/util/workqueue/metrics_test.go b/util/workqueue/metrics_test.go index e8576cf9..64305acd 100644 --- a/util/workqueue/metrics_test.go +++ b/util/workqueue/metrics_test.go @@ -19,6 +19,8 @@ package workqueue import ( "testing" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) type testMetrics struct { @@ -32,18 +34,211 @@ func (m *testMetrics) get(item t) { m.gotten++ } func (m *testMetrics) done(item t) { m.finished++ } func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } -func TestMetrics(t *testing.T) { +func TestMetricShutdown(t *testing.T) { ch := make(chan struct{}) m := &testMetrics{ updateCalled: ch, } - q := newQueue("test", m, time.Millisecond) + c := clock.NewFakeClock(time.Now()) + q := newQueue(c, m, time.Millisecond) + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + c.Step(time.Millisecond) <-ch q.ShutDown() + + c.Step(time.Hour) select { - case <-time.After(time.Second): + default: return case <-ch: t.Errorf("Unexpected update after shutdown was called.") } } + +type testMetric struct { + inc int64 + dec int64 + set float64 + + observedValue float64 + observedCount int + + notifyCh chan<- struct{} +} + +func (m *testMetric) Inc() { m.inc++; m.notify() } +func (m *testMetric) Dec() { m.dec++; m.notify() } +func (m *testMetric) Set(f float64) { m.set = f; m.notify() } +func (m *testMetric) Observe(f float64) { m.observedValue = f; m.observedCount++; m.notify() } + +func (m *testMetric) gaugeValue() float64 { + if m.set != 0 { + return m.set + } + return float64(m.inc - m.dec) +} + +func (m *testMetric) notify() { + if m.notifyCh != nil { + m.notifyCh <- struct{}{} + } +} + +type testMetricsProvider struct { + depth testMetric + adds testMetric + latency testMetric + duration testMetric + unfinished testMetric + retries testMetric +} + +func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric { + return &m.depth +} + +func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric { + return &m.adds +} + +func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric { + return &m.latency +} + +func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { + return &m.duration +} + +func (m *testMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric { + return &m.unfinished +} + +func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { + return &m.retries +} + +func TestSinceInMicroseconds(t *testing.T) { + mp := testMetricsProvider{} + c := clock.NewFakeClock(time.Now()) + mf := metricsFactory{metricsProvider: &mp} + m := mf.newQueueMetrics("test", c) + dqm := m.(*defaultQueueMetrics) + + for _, i := range []int{1, 50, 100, 500, 1000, 10000, 100000, 1000000} { + n := c.Now() + c.Step(time.Duration(i) * time.Microsecond) + if e, a := float64(i), dqm.sinceInMicroseconds(n); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} + +func TestMetrics(t *testing.T) { + mp := testMetricsProvider{} + t0 := time.Unix(0, 0) + c := clock.NewFakeClock(t0) + mf := metricsFactory{metricsProvider: &mp} + m := mf.newQueueMetrics("test", c) + q := newQueue(c, m, time.Millisecond) + defer q.ShutDown() + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + q.Add("foo") + if e, a := 1.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(50 * time.Microsecond) + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 50.0, mp.latency.observedValue; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.latency.observedCount; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 0.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Add it back while processing; multiple adds of the same item are + // de-duped. + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + if e, a := 2.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(25 * time.Microsecond) + + // Finish it up + q.Done(i) + + if e, a := 25.0, mp.duration.observedValue; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.duration.observedCount; e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // It should be back on the queue + i, _ = q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 25.0, mp.latency.observedValue; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.latency.observedCount; e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // use a channel to ensure we don't look at the metric before it's + // been set. + ch := make(chan struct{}, 1) + mp.unfinished.notifyCh = ch + c.Step(time.Millisecond) + <-ch + mp.unfinished.notifyCh = nil + if e, a := 1000.0, mp.unfinished.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Finish that one up + q.Done(i) + if e, a := 1000.0, mp.duration.observedValue; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.duration.observedCount; e != a { + t.Errorf("expected %v, got %v", e, a) + } +} diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index 66118cd0..39009b8e 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -19,6 +19,8 @@ package workqueue import ( "sync" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) type Interface interface { @@ -36,22 +38,24 @@ func New() *Type { } func NewNamed(name string) *Type { + rc := clock.RealClock{} return newQueue( - name, - newQueueMetrics(name), + rc, + globalMetricsFactory.newQueueMetrics(name, rc), defaultUnfinishedWorkUpdatePeriod, ) } -func newQueue(name string, metrics queueMetrics, updatePeriod time.Duration) *Type { +func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type { t := &Type{ + clock: c, dirty: set{}, processing: set{}, cond: sync.NewCond(&sync.Mutex{}), metrics: metrics, unfinishedWorkUpdatePeriod: updatePeriod, } - go t.updateUnfinishedWorkLook() + go t.updateUnfinishedWorkLoop() return t } @@ -80,6 +84,7 @@ type Type struct { metrics queueMetrics unfinishedWorkUpdatePeriod time.Duration + clock clock.Clock } type empty struct{} @@ -187,10 +192,10 @@ func (q *Type) ShuttingDown() bool { return q.shuttingDown } -func (q *Type) updateUnfinishedWorkLook() { - t := time.NewTicker(q.unfinishedWorkUpdatePeriod) +func (q *Type) updateUnfinishedWorkLoop() { + t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) defer t.Stop() - for range t.C { + for range t.C() { if !func() bool { q.cond.L.Lock() defer q.cond.L.Unlock()