Test workqueue metrics

Kubernetes-commit: 5a8444ceec9e28e8a7dbf36bfd7cb55554c5b865
Daniel Smith 2018-11-09 16:12:11 -08:00 committed by Kubernetes Publisher
parent 75d4dad922
commit 26f9385b8e
3 changed files with 241 additions and 31 deletions

util/workqueue/metrics.go

@@ -19,6 +19,8 @@ package workqueue
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
// This file provides abstractions for setting the provider (e.g., prometheus)
@@ -63,6 +65,8 @@ func (noopMetric) Set(float64) {}
func (noopMetric) Observe(float64) {}
type defaultQueueMetrics struct {
clock clock.Clock
// current depth of a workqueue
depth GaugeMetric
// total number of adds handled by a workqueue
@@ -86,7 +90,7 @@ func (m *defaultQueueMetrics) add(item t) {
m.adds.Inc()
m.depth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = time.Now()
m.addTimes[item] = m.clock.Now()
}
}
@@ -96,9 +100,9 @@ func (m *defaultQueueMetrics) get(item t) {
}
m.depth.Dec()
m.processingStartTimes[item] = time.Now()
m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(sinceInMicroseconds(startTime))
m.latency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
@@ -109,17 +113,15 @@ func (m *defaultQueueMetrics) done(item t) {
}
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(sinceInMicroseconds(startTime))
m.workDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}
func (m *defaultQueueMetrics) updateUnfinishedWork() {
var total float64
if m.processingStartTimes != nil {
for _, t := range m.processingStartTimes {
total += sinceInMicroseconds(t)
}
for _, t := range m.processingStartTimes {
total += m.sinceInMicroseconds(t)
}
m.unfinishedWorkMicroseconds.Set(total)
}
@@ -132,8 +134,8 @@ func (noMetrics) done(item t) {}
func (noMetrics) updateUnfinishedWork() {}
// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
type retryMetrics interface {
@@ -188,19 +190,28 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
var metricsFactory = struct {
metricsProvider MetricsProvider
setProviders sync.Once
}{
var globalMetricsFactory = metricsFactory{
metricsProvider: noopMetricsProvider{},
}
func newQueueMetrics(name string) queueMetrics {
mp := metricsFactory.metricsProvider
type metricsFactory struct {
metricsProvider MetricsProvider
setProviders sync.Once
}
func (f *metricsFactory) set(mp MetricsProvider) {
f.setProviders.Do(func() {
f.metricsProvider = mp
})
}
func (f *metricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
mp := f.metricsProvider
if len(name) == 0 || mp == (noopMetricsProvider{}) {
return noMetrics{}
}
return &defaultQueueMetrics{
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
@@ -217,13 +228,12 @@ func newRetryMetrics(name string) retryMetrics {
return ret
}
return &defaultRetryMetrics{
retries: metricsFactory.metricsProvider.NewRetriesMetric(name),
retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
}
}
// SetProvider sets the metrics provider of the metricsFactory.
// SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) {
metricsFactory.setProviders.Do(func() {
metricsFactory.metricsProvider = metricsProvider
})
globalMetricsFactory.set(metricsProvider)
}
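
The refactor keeps the existing contract: only the first SetProvider call takes effect (the sync.Once inside metricsFactory), and only queues created afterwards pick up the provider. A minimal sketch of wiring in a custom provider follows; exampleMetric, exampleProvider, and main are hypothetical illustrations, while MetricsProvider, GaugeMetric, SettableGaugeMetric, CounterMetric, SummaryMetric, SetProvider, and NewNamed are the workqueue APIs shown in this diff.

package main

import "k8s.io/client-go/util/workqueue"

// exampleMetric is a toy in-memory metric satisfying GaugeMetric (Inc/Dec),
// SettableGaugeMetric (Set), CounterMetric (Inc), and SummaryMetric (Observe).
// A real provider (e.g. a prometheus-backed one) must also be safe for
// concurrent use; this one is not.
type exampleMetric struct {
	value float64
	count int
}

func (m *exampleMetric) Inc()              { m.value++ }
func (m *exampleMetric) Dec()              { m.value-- }
func (m *exampleMetric) Set(v float64)     { m.value = v }
func (m *exampleMetric) Observe(v float64) { m.value = v; m.count++ }

// exampleProvider hands out one exampleMetric per workqueue metric.
type exampleProvider struct {
	depth, adds, latency, duration, unfinished, retries exampleMetric
}

func (p *exampleProvider) NewDepthMetric(name string) workqueue.GaugeMetric     { return &p.depth }
func (p *exampleProvider) NewAddsMetric(name string) workqueue.CounterMetric    { return &p.adds }
func (p *exampleProvider) NewLatencyMetric(name string) workqueue.SummaryMetric { return &p.latency }
func (p *exampleProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric {
	return &p.duration
}
func (p *exampleProvider) NewUnfinishedWorkMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
	return &p.unfinished
}
func (p *exampleProvider) NewRetriesMetric(name string) workqueue.CounterMetric { return &p.retries }

func main() {
	// Register the provider before constructing any named queues;
	// later calls to SetProvider are ignored.
	workqueue.SetProvider(&exampleProvider{})
	q := workqueue.NewNamed("example")
	defer q.ShutDown()
	q.Add("item")
}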

util/workqueue/metrics_test.go

@@ -19,6 +19,8 @@ package workqueue
import (
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
type testMetrics struct {
@@ -32,18 +34,211 @@ func (m *testMetrics) get(item t) { m.gotten++ }
func (m *testMetrics) done(item t) { m.finished++ }
func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
func TestMetrics(t *testing.T) {
func TestMetricShutdown(t *testing.T) {
ch := make(chan struct{})
m := &testMetrics{
updateCalled: ch,
}
q := newQueue("test", m, time.Millisecond)
c := clock.NewFakeClock(time.Now())
q := newQueue(c, m, time.Millisecond)
for !c.HasWaiters() {
// Wait for the goroutine to call NewTicker()
time.Sleep(time.Millisecond)
}
c.Step(time.Millisecond)
<-ch
q.ShutDown()
c.Step(time.Hour)
select {
case <-time.After(time.Second):
default:
return
case <-ch:
t.Errorf("Unexpected update after shutdown was called.")
}
}
type testMetric struct {
inc int64
dec int64
set float64
observedValue float64
observedCount int
notifyCh chan<- struct{}
}
func (m *testMetric) Inc() { m.inc++; m.notify() }
func (m *testMetric) Dec() { m.dec++; m.notify() }
func (m *testMetric) Set(f float64) { m.set = f; m.notify() }
func (m *testMetric) Observe(f float64) { m.observedValue = f; m.observedCount++; m.notify() }
func (m *testMetric) gaugeValue() float64 {
if m.set != 0 {
return m.set
}
return float64(m.inc - m.dec)
}
func (m *testMetric) notify() {
if m.notifyCh != nil {
m.notifyCh <- struct{}{}
}
}
type testMetricsProvider struct {
depth testMetric
adds testMetric
latency testMetric
duration testMetric
unfinished testMetric
retries testMetric
}
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
return &m.depth
}
func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
return &m.adds
}
func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
return &m.latency
}
func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return &m.duration
}
func (m *testMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric {
return &m.unfinished
}
func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return &m.retries
}
func TestSinceInMicroseconds(t *testing.T) {
mp := testMetricsProvider{}
c := clock.NewFakeClock(time.Now())
mf := metricsFactory{metricsProvider: &mp}
m := mf.newQueueMetrics("test", c)
dqm := m.(*defaultQueueMetrics)
for _, i := range []int{1, 50, 100, 500, 1000, 10000, 100000, 1000000} {
n := c.Now()
c.Step(time.Duration(i) * time.Microsecond)
if e, a := float64(i), dqm.sinceInMicroseconds(n); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
}
func TestMetrics(t *testing.T) {
mp := testMetricsProvider{}
t0 := time.Unix(0, 0)
c := clock.NewFakeClock(t0)
mf := metricsFactory{metricsProvider: &mp}
m := mf.newQueueMetrics("test", c)
q := newQueue(c, m, time.Millisecond)
defer q.ShutDown()
for !c.HasWaiters() {
// Wait for the goroutine to call NewTicker()
time.Sleep(time.Millisecond)
}
q.Add("foo")
if e, a := 1.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(50 * time.Microsecond)
// Start processing
i, _ := q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 50.0, mp.latency.observedValue; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.latency.observedCount; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 0.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Add it back while processing; multiple adds of the same item are
// de-duped.
q.Add(i)
q.Add(i)
q.Add(i)
q.Add(i)
q.Add(i)
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(25 * time.Microsecond)
// Finish it up
q.Done(i)
if e, a := 25.0, mp.duration.observedValue; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.duration.observedCount; e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// It should be back on the queue
i, _ = q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 25.0, mp.latency.observedValue; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.latency.observedCount; e != a {
t.Errorf("expected %v, got %v", e, a)
}
// use a channel to ensure we don't look at the metric before it's
// been set.
ch := make(chan struct{}, 1)
mp.unfinished.notifyCh = ch
c.Step(time.Millisecond)
<-ch
mp.unfinished.notifyCh = nil
if e, a := 1000.0, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Finish that one up
q.Done(i)
if e, a := 1000.0, mp.duration.observedValue; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.duration.observedCount; e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

util/workqueue/queue.go

@@ -19,6 +19,8 @@ package workqueue
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
type Interface interface {
@@ -36,22 +38,24 @@ func New() *Type {
}
func NewNamed(name string) *Type {
rc := clock.RealClock{}
return newQueue(
name,
newQueueMetrics(name),
rc,
globalMetricsFactory.newQueueMetrics(name, rc),
defaultUnfinishedWorkUpdatePeriod,
)
}
func newQueue(name string, metrics queueMetrics, updatePeriod time.Duration) *Type {
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{
clock: c,
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod,
}
go t.updateUnfinishedWorkLook()
go t.updateUnfinishedWorkLoop()
return t
}
@@ -80,6 +84,7 @@ type Type struct {
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
type empty struct{}
@@ -187,10 +192,10 @@ func (q *Type) ShuttingDown() bool {
return q.shuttingDown
}
func (q *Type) updateUnfinishedWorkLook() {
t := time.NewTicker(q.unfinishedWorkUpdatePeriod)
func (q *Type) updateUnfinishedWorkLoop() {
t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop()
for range t.C {
for range t.C() {
if !func() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
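
The queue now creates its ticker through the injected clock (q.clock.NewTicker instead of time.NewTicker), which is what lets the tests above drive updateUnfinishedWorkLoop without real sleeps. A standalone sketch of that pattern, using only fake-clock calls that appear in this diff (NewFakeClock, NewTicker, Step); the surrounding main function is illustrative only:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

func main() {
	c := clock.NewFakeClock(time.Now())

	// Create the ticker from the fake clock, as updateUnfinishedWorkLoop now
	// does via q.clock.NewTicker. (The queue tests poll c.HasWaiters() first
	// because there the ticker is created inside a separate goroutine.)
	ticker := c.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	done := make(chan struct{})
	go func() {
		<-ticker.C()
		close(done)
	}()

	// Advancing fake time past the period fires the tick with no real waiting.
	c.Step(10 * time.Millisecond)
	<-done
	fmt.Println("tick observed")
}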