Test workqueue metrics

This commit is contained in:
Daniel Smith 2018-11-09 16:12:11 -08:00
parent 6195d1005d
commit 5a8444ceec
3 changed files with 241 additions and 31 deletions

View File

@ -19,6 +19,8 @@ package workqueue
import ( import (
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
) )
// This file provides abstractions for setting the provider (e.g., prometheus) // This file provides abstractions for setting the provider (e.g., prometheus)
@ -63,6 +65,8 @@ func (noopMetric) Set(float64) {}
func (noopMetric) Observe(float64) {} func (noopMetric) Observe(float64) {}
type defaultQueueMetrics struct { type defaultQueueMetrics struct {
clock clock.Clock
// current depth of a workqueue // current depth of a workqueue
depth GaugeMetric depth GaugeMetric
// total number of adds handled by a workqueue // total number of adds handled by a workqueue
@ -86,7 +90,7 @@ func (m *defaultQueueMetrics) add(item t) {
m.adds.Inc() m.adds.Inc()
m.depth.Inc() m.depth.Inc()
if _, exists := m.addTimes[item]; !exists { if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = time.Now() m.addTimes[item] = m.clock.Now()
} }
} }
@ -96,9 +100,9 @@ func (m *defaultQueueMetrics) get(item t) {
} }
m.depth.Dec() m.depth.Dec()
m.processingStartTimes[item] = time.Now() m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists { if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(sinceInMicroseconds(startTime)) m.latency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item) delete(m.addTimes, item)
} }
} }
@ -109,17 +113,15 @@ func (m *defaultQueueMetrics) done(item t) {
} }
if startTime, exists := m.processingStartTimes[item]; exists { if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(sinceInMicroseconds(startTime)) m.workDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item) delete(m.processingStartTimes, item)
} }
} }
func (m *defaultQueueMetrics) updateUnfinishedWork() { func (m *defaultQueueMetrics) updateUnfinishedWork() {
var total float64 var total float64
if m.processingStartTimes != nil { for _, t := range m.processingStartTimes {
for _, t := range m.processingStartTimes { total += m.sinceInMicroseconds(t)
total += sinceInMicroseconds(t)
}
} }
m.unfinishedWorkMicroseconds.Set(total) m.unfinishedWorkMicroseconds.Set(total)
} }
@ -132,8 +134,8 @@ func (noMetrics) done(item t) {}
func (noMetrics) updateUnfinishedWork() {} func (noMetrics) updateUnfinishedWork() {}
// Gets the time since the specified start in microseconds. // Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 { func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
} }
type retryMetrics interface { type retryMetrics interface {
@ -188,19 +190,28 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{} return noopMetric{}
} }
var metricsFactory = struct { var globalMetricsFactory = metricsFactory{
metricsProvider MetricsProvider
setProviders sync.Once
}{
metricsProvider: noopMetricsProvider{}, metricsProvider: noopMetricsProvider{},
} }
func newQueueMetrics(name string) queueMetrics { type metricsFactory struct {
mp := metricsFactory.metricsProvider metricsProvider MetricsProvider
setProviders sync.Once
}
func (f *metricsFactory) set(mp MetricsProvider) {
f.setProviders.Do(func() {
f.metricsProvider = mp
})
}
func (f *metricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
mp := f.metricsProvider
if len(name) == 0 || mp == (noopMetricsProvider{}) { if len(name) == 0 || mp == (noopMetricsProvider{}) {
return noMetrics{} return noMetrics{}
} }
return &defaultQueueMetrics{ return &defaultQueueMetrics{
clock: clock,
depth: mp.NewDepthMetric(name), depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name), adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name), latency: mp.NewLatencyMetric(name),
@ -217,13 +228,12 @@ func newRetryMetrics(name string) retryMetrics {
return ret return ret
} }
return &defaultRetryMetrics{ return &defaultRetryMetrics{
retries: metricsFactory.metricsProvider.NewRetriesMetric(name), retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
} }
} }
// SetProvider sets the metrics provider of the metricsFactory. // SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) { func SetProvider(metricsProvider MetricsProvider) {
metricsFactory.setProviders.Do(func() { globalMetricsFactory.set(metricsProvider)
metricsFactory.metricsProvider = metricsProvider
})
} }

View File

@ -19,6 +19,8 @@ package workqueue
import ( import (
"testing" "testing"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
) )
type testMetrics struct { type testMetrics struct {
@ -32,18 +34,211 @@ func (m *testMetrics) get(item t) { m.gotten++ }
func (m *testMetrics) done(item t) { m.finished++ } func (m *testMetrics) done(item t) { m.finished++ }
func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
func TestMetrics(t *testing.T) { func TestMetricShutdown(t *testing.T) {
ch := make(chan struct{}) ch := make(chan struct{})
m := &testMetrics{ m := &testMetrics{
updateCalled: ch, updateCalled: ch,
} }
q := newQueue("test", m, time.Millisecond) c := clock.NewFakeClock(time.Now())
q := newQueue(c, m, time.Millisecond)
for !c.HasWaiters() {
// Wait for the go routine to call NewTicker()
time.Sleep(time.Millisecond)
}
c.Step(time.Millisecond)
<-ch <-ch
q.ShutDown() q.ShutDown()
c.Step(time.Hour)
select { select {
case <-time.After(time.Second): default:
return return
case <-ch: case <-ch:
t.Errorf("Unexpected update after shutdown was called.") t.Errorf("Unexpected update after shutdown was called.")
} }
} }
type testMetric struct {
inc int64
dec int64
set float64
observedValue float64
observedCount int
notifyCh chan<- struct{}
}
func (m *testMetric) Inc() { m.inc++; m.notify() }
func (m *testMetric) Dec() { m.dec++; m.notify() }
func (m *testMetric) Set(f float64) { m.set = f; m.notify() }
func (m *testMetric) Observe(f float64) { m.observedValue = f; m.observedCount++; m.notify() }
func (m *testMetric) gaugeValue() float64 {
if m.set != 0 {
return m.set
}
return float64(m.inc - m.dec)
}
func (m *testMetric) notify() {
if m.notifyCh != nil {
m.notifyCh <- struct{}{}
}
}
type testMetricsProvider struct {
depth testMetric
adds testMetric
latency testMetric
duration testMetric
unfinished testMetric
retries testMetric
}
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
return &m.depth
}
func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
return &m.adds
}
func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
return &m.latency
}
func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return &m.duration
}
func (m *testMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric {
return &m.unfinished
}
func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return &m.retries
}
func TestSinceInMicroseconds(t *testing.T) {
mp := testMetricsProvider{}
c := clock.NewFakeClock(time.Now())
mf := metricsFactory{metricsProvider: &mp}
m := mf.newQueueMetrics("test", c)
dqm := m.(*defaultQueueMetrics)
for _, i := range []int{1, 50, 100, 500, 1000, 10000, 100000, 1000000} {
n := c.Now()
c.Step(time.Duration(i) * time.Microsecond)
if e, a := float64(i), dqm.sinceInMicroseconds(n); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
}
func TestMetrics(t *testing.T) {
mp := testMetricsProvider{}
t0 := time.Unix(0, 0)
c := clock.NewFakeClock(t0)
mf := metricsFactory{metricsProvider: &mp}
m := mf.newQueueMetrics("test", c)
q := newQueue(c, m, time.Millisecond)
defer q.ShutDown()
for !c.HasWaiters() {
// Wait for the go routine to call NewTicker()
time.Sleep(time.Millisecond)
}
q.Add("foo")
if e, a := 1.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(50 * time.Microsecond)
// Start processing
i, _ := q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 50.0, mp.latency.observedValue; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.latency.observedCount; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 0.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Add it back while processing; multiple adds of the same item are
// de-duped.
q.Add(i)
q.Add(i)
q.Add(i)
q.Add(i)
q.Add(i)
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(25 * time.Microsecond)
// Finish it up
q.Done(i)
if e, a := 25.0, mp.duration.observedValue; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.duration.observedCount; e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// It should be back on the queue
i, _ = q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 25.0, mp.latency.observedValue; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.latency.observedCount; e != a {
t.Errorf("expected %v, got %v", e, a)
}
// use a channel to ensure we don't look at the metric before it's
// been set.
ch := make(chan struct{}, 1)
mp.unfinished.notifyCh = ch
c.Step(time.Millisecond)
<-ch
mp.unfinished.notifyCh = nil
if e, a := 1000.0, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Finish that one up
q.Done(i)
if e, a := 1000.0, mp.duration.observedValue; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.duration.observedCount; e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@ -19,6 +19,8 @@ package workqueue
import ( import (
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
) )
type Interface interface { type Interface interface {
@ -36,22 +38,24 @@ func New() *Type {
} }
func NewNamed(name string) *Type { func NewNamed(name string) *Type {
rc := clock.RealClock{}
return newQueue( return newQueue(
name, rc,
newQueueMetrics(name), globalMetricsFactory.newQueueMetrics(name, rc),
defaultUnfinishedWorkUpdatePeriod, defaultUnfinishedWorkUpdatePeriod,
) )
} }
func newQueue(name string, metrics queueMetrics, updatePeriod time.Duration) *Type { func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{ t := &Type{
clock: c,
dirty: set{}, dirty: set{},
processing: set{}, processing: set{},
cond: sync.NewCond(&sync.Mutex{}), cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics, metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod, unfinishedWorkUpdatePeriod: updatePeriod,
} }
go t.updateUnfinishedWorkLook() go t.updateUnfinishedWorkLoop()
return t return t
} }
@ -80,6 +84,7 @@ type Type struct {
metrics queueMetrics metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
} }
type empty struct{} type empty struct{}
@ -187,10 +192,10 @@ func (q *Type) ShuttingDown() bool {
return q.shuttingDown return q.shuttingDown
} }
func (q *Type) updateUnfinishedWorkLook() { func (q *Type) updateUnfinishedWorkLoop() {
t := time.NewTicker(q.unfinishedWorkUpdatePeriod) t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop() defer t.Stop()
for range t.C { for range t.C() {
if !func() bool { if !func() bool {
q.cond.L.Lock() q.cond.L.Lock()
defer q.cond.L.Unlock() defer q.cond.L.Unlock()