Merge pull request #71300 from danielqsj/71165

Use prometheus conventions for workqueue metrics

Kubernetes-commit: 7284660483d3db5b6647dcda19aa6365cae3b268
This commit is contained in:
Kubernetes Publisher 2018-12-31 21:18:45 -08:00
commit 1a4f03865c
5 changed files with 262 additions and 93 deletions

104
Godeps/Godeps.json generated
View File

@@ -1,7 +1,7 @@
{
"ImportPath": "k8s.io/client-go",
"GoVersion": "go1.11",
"GodepVersion": "v80-k8s-r1",
"GodepVersion": "v80",
"Packages": [
"./..."
],
@@ -408,207 +408,207 @@
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/apitesting",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/apitesting/fuzzer",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/apitesting/roundtrip",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/equality",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/errors",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/meta",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/resource",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/fuzzer",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/internalversion",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/conversion",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/conversion/queryparams",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/fields",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/labels",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/schema",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/json",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/protobuf",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/recognizer",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/streaming",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/versioning",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/selection",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/types",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/cache",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/clock",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/diff",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/errors",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/framer",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/httpstream",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/httpstream/spdy",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/intstr",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/json",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/mergepatch",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/naming",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/net",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/remotecommand",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/runtime",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/sets",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/strategicpatch",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/validation",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/validation/field",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/wait",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/yaml",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/version",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/watch",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/json",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/netutil",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/reflect",
"Rev": "173ce66c1e39d1d0f56e0b3347ff2988068aecd0"
"Rev": "9c4c366543346abeca2a5cd2c40cf1a30d19a2ec"
},
{
"ImportPath": "k8s.io/klog",

View File

@@ -43,12 +43,13 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
deprecatedMetrics: newDeprecatedRetryMetrics(name),
}
go ret.waitingLoop()
@@ -73,7 +74,8 @@ type delayingType struct {
waitingForAddCh chan *waitFor
// metrics counts the number of retries
metrics retryMetrics
metrics retryMetrics
deprecatedMetrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
@@ -146,6 +148,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
}
q.metrics.retry()
q.deprecatedMetrics.retry()
// immediately add things with no delay
if duration <= 0 {

View File

@@ -57,6 +57,11 @@ type SummaryMetric interface {
Observe(float64)
}
// HistogramMetric counts individual observations.
type HistogramMetric interface {
Observe(float64)
}
type noopMetric struct{}
func (noopMetric) Inc() {}
@@ -73,15 +78,23 @@ type defaultQueueMetrics struct {
// total number of adds handled by a workqueue
adds CounterMetric
// how long an item stays in a workqueue
latency SummaryMetric
latency HistogramMetric
// how long processing an item from a workqueue takes
workDuration SummaryMetric
workDuration HistogramMetric
addTimes map[t]time.Time
processingStartTimes map[t]time.Time
// how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric
// TODO(danielqsj): Remove the following metrics, they are deprecated
deprecatedDepth GaugeMetric
deprecatedAdds CounterMetric
deprecatedLatency SummaryMetric
deprecatedWorkDuration SummaryMetric
deprecatedUnfinishedWorkSeconds SettableGaugeMetric
deprecatedLongestRunningProcessor SettableGaugeMetric
}
func (m *defaultQueueMetrics) add(item t) {
@@ -90,7 +103,9 @@ func (m *defaultQueueMetrics) add(item t) {
}
m.adds.Inc()
m.deprecatedAdds.Inc()
m.depth.Inc()
m.deprecatedDepth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = m.clock.Now()
}
@@ -102,9 +117,11 @@ func (m *defaultQueueMetrics) get(item t) {
}
m.depth.Dec()
m.deprecatedDepth.Dec()
m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(m.sinceInMicroseconds(startTime))
m.latency.Observe(m.sinceInSeconds(startTime))
m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
@@ -115,7 +132,8 @@ func (m *defaultQueueMetrics) done(item t) {
}
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(m.sinceInMicroseconds(startTime))
m.workDuration.Observe(m.sinceInSeconds(startTime))
m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}
@@ -135,7 +153,9 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
// Convert to seconds; microseconds is unhelpfully granular for this.
total /= 1000000
m.unfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest) // in microseconds.
m.deprecatedUnfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest / 1000000)
m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds.
}
type noMetrics struct{}
@@ -150,6 +170,11 @@ func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
// Gets the time since the specified start in seconds.
func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
return m.clock.Since(start).Seconds()
}
type retryMetrics interface {
retry()
}
@@ -170,11 +195,18 @@ func (m *defaultRetryMetrics) retry() {
type MetricsProvider interface {
NewDepthMetric(name string) GaugeMetric
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric
NewLatencyMetric(name string) HistogramMetric
NewWorkDurationMetric(name string) HistogramMetric
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric
NewDeprecatedDepthMetric(name string) GaugeMetric
NewDeprecatedAddsMetric(name string) CounterMetric
NewDeprecatedLatencyMetric(name string) SummaryMetric
NewDeprecatedWorkDurationMetric(name string) SummaryMetric
NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewDeprecatedRetriesMetric(name string) CounterMetric
}
type noopMetricsProvider struct{}
@@ -187,11 +219,11 @@ func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
return noopMetric{}
}
@@ -199,7 +231,7 @@ func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settabl
return noopMetric{}
}
func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
@@ -207,6 +239,34 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
var globalMetricsFactory = queueMetricsFactory{
metricsProvider: noopMetricsProvider{},
}
@@ -229,15 +289,21 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
return noMetrics{}
}
return &defaultQueueMetrics{
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
deprecatedDepth: mp.NewDeprecatedDepthMetric(name),
deprecatedAdds: mp.NewDeprecatedAddsMetric(name),
deprecatedLatency: mp.NewDeprecatedLatencyMetric(name),
deprecatedWorkDuration: mp.NewDeprecatedWorkDurationMetric(name),
deprecatedUnfinishedWorkSeconds: mp.NewDeprecatedUnfinishedWorkSecondsMetric(name),
deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
}
@@ -251,6 +317,16 @@ func newRetryMetrics(name string) retryMetrics {
}
}
func newDeprecatedRetryMetrics(name string) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
return &defaultRetryMetrics{
retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name),
}
}
// SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) {

View File

@@ -137,6 +137,14 @@ type testMetricsProvider struct {
unfinished testMetric
longest testMetric
retries testMetric
// deprecated metrics
deprecatedDepth testMetric
deprecatedAdds testMetric
deprecatedLatency testMetric
deprecatedDuration testMetric
deprecatedUnfinished testMetric
deprecatedLongest testMetric
deprecatedRetries testMetric
}
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
@@ -147,11 +155,11 @@ func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
return &m.adds
}
func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
func (m *testMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
return &m.latency
}
func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
func (m *testMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
return &m.duration
}
@@ -159,7 +167,7 @@ func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settab
return &m.unfinished
}
func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
func (m *testMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
return &m.longest
}
@@ -167,6 +175,34 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return &m.retries
}
func (m *testMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
return &m.deprecatedDepth
}
func (m *testMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
return &m.deprecatedAdds
}
func (m *testMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
return &m.deprecatedLatency
}
func (m *testMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
return &m.deprecatedDuration
}
func (m *testMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return &m.deprecatedUnfinished
}
func (m *testMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return &m.deprecatedLongest
}
func (m *testMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
return &m.deprecatedRetries
}
func TestSinceInMicroseconds(t *testing.T) {
mp := testMetricsProvider{}
c := clock.NewFakeClock(time.Now())
@@ -201,10 +237,18 @@ func TestMetrics(t *testing.T) {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.deprecatedAdds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(50 * time.Microsecond)
// Start processing
@@ -213,15 +257,24 @@
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 50.0, mp.latency.observationValue(); e != a {
if e, a := 5e-05, mp.latency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 50.0, mp.deprecatedLatency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.deprecatedLatency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 0.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 0.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Add it back while processing; multiple adds of the same item are
// de-duped.
@@ -233,27 +286,42 @@
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2.0, mp.deprecatedAdds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(25 * time.Microsecond)
// Finish it up
q.Done(i)
if e, a := 25.0, mp.duration.observationValue(); e != a {
if e, a := 2.5e-05, mp.duration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 25.0, mp.deprecatedDuration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.deprecatedDuration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// It should be back on the queue
i, _ = q.Get()
@@ -261,33 +329,54 @@
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 25.0, mp.latency.observationValue(); e != a {
if e, a := 2.5e-05, mp.latency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 25.0, mp.deprecatedLatency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.deprecatedLatency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// use a channel to ensure we don't look at the metric before it's
// been set.
ch := make(chan struct{}, 1)
mp.unfinished.notifyCh = ch
mp.deprecatedUnfinished.notifyCh = ch
c.Step(time.Millisecond)
<-ch
<-ch
mp.unfinished.notifyCh = nil
mp.deprecatedUnfinished.notifyCh = nil
if e, a := .001, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1000.0, mp.longest.gaugeValue(); e != a {
if e, a := .001, mp.deprecatedUnfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := .001, mp.longest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1000.0, mp.deprecatedLongest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Finish that one up
q.Done(i)
if e, a := 1000.0, mp.duration.observationValue(); e != a {
if e, a := .001, mp.duration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1000.0, mp.deprecatedDuration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.deprecatedDuration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@@ -28,12 +28,13 @@ func TestRateLimitingQueue(t *testing.T) {
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
fakeClock := clock.NewFakeClock(time.Now())
delayingQueue := &delayingType{
Interface: New(),
clock: fakeClock,
heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""),
Interface: New(),
clock: fakeClock,
heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""),
deprecatedMetrics: newDeprecatedRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue