add a metric that can be used to notice stuck worker threads

Kubernetes-commit: 6195d1005d81eaa5dd49da744f5beab178340f5a
This commit is contained in:
Daniel Smith 2018-11-09 10:43:44 -08:00 committed by Kubernetes Publisher
parent b5f2e1aa4f
commit 75d4dad922
3 changed files with 133 additions and 15 deletions

View File

@ -28,6 +28,7 @@ type queueMetrics interface {
add(item t)
get(item t)
done(item t)
updateUnfinishedWork()
}
// GaugeMetric represents a single numerical value that can arbitrarily go up
@ -37,6 +38,12 @@ type GaugeMetric interface {
Dec()
}
// SettableGaugeMetric represents a single numerical value that can arbitrarily go up
// and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SettableGaugeMetric interface {
Set(float64)
}
// CounterMetric represents a single numerical value that only ever
// goes up.
type CounterMetric interface {
@ -52,6 +59,7 @@ type noopMetric struct{}
func (noopMetric) Inc() {}
func (noopMetric) Dec() {}
func (noopMetric) Set(float64) {}
func (noopMetric) Observe(float64) {}
type defaultQueueMetrics struct {
@ -65,6 +73,9 @@ type defaultQueueMetrics struct {
workDuration SummaryMetric
addTimes map[t]time.Time
processingStartTimes map[t]time.Time
// how long have current threads been working?
unfinishedWorkMicroseconds SettableGaugeMetric
}
func (m *defaultQueueMetrics) add(item t) {
@ -103,6 +114,23 @@ func (m *defaultQueueMetrics) done(item t) {
}
}
func (m *defaultQueueMetrics) updateUnfinishedWork() {
var total float64
if m.processingStartTimes != nil {
for _, t := range m.processingStartTimes {
total += sinceInMicroseconds(t)
}
}
m.unfinishedWorkMicroseconds.Set(total)
}
type noMetrics struct{}
func (noMetrics) add(item t) {}
func (noMetrics) get(item t) {}
func (noMetrics) done(item t) {}
func (noMetrics) updateUnfinishedWork() {}
// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
@ -130,6 +158,7 @@ type MetricsProvider interface {
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric
NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric
}
@ -151,6 +180,10 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
@ -163,17 +196,18 @@ var metricsFactory = struct {
}
func newQueueMetrics(name string) queueMetrics {
var ret *defaultQueueMetrics
if len(name) == 0 {
return ret
mp := metricsFactory.metricsProvider
if len(name) == 0 || mp == (noopMetricsProvider{}) {
return noMetrics{}
}
return &defaultQueueMetrics{
depth: metricsFactory.metricsProvider.NewDepthMetric(name),
adds: metricsFactory.metricsProvider.NewAddsMetric(name),
latency: metricsFactory.metricsProvider.NewLatencyMetric(name),
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkMicroseconds: mp.NewUnfinishedWorkMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"testing"
"time"
)
type testMetrics struct {
added, gotten, finished int64
updateCalled chan<- struct{}
}
func (m *testMetrics) add(item t) { m.added++ }
func (m *testMetrics) get(item t) { m.gotten++ }
func (m *testMetrics) done(item t) { m.finished++ }
func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
func TestMetrics(t *testing.T) {
ch := make(chan struct{})
m := &testMetrics{
updateCalled: ch,
}
q := newQueue("test", m, time.Millisecond)
<-ch
q.ShutDown()
select {
case <-time.After(time.Second):
return
case <-ch:
t.Errorf("Unexpected update after shutdown was called.")
}
}

View File

@ -18,6 +18,7 @@ package workqueue
import (
"sync"
"time"
)
type Interface interface {
@ -35,14 +36,27 @@ func New() *Type {
}
func NewNamed(name string) *Type {
return &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: newQueueMetrics(name),
}
return newQueue(
name,
newQueueMetrics(name),
defaultUnfinishedWorkUpdatePeriod,
)
}
func newQueue(name string, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod,
}
go t.updateUnfinishedWorkLook()
return t
}
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
@ -64,6 +78,8 @@ type Type struct {
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
}
type empty struct{}
@ -170,3 +186,22 @@ func (q *Type) ShuttingDown() bool {
return q.shuttingDown
}
func (q *Type) updateUnfinishedWorkLook() {
t := time.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop()
for range t.C {
if !func() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.shuttingDown {
q.metrics.updateUnfinishedWork()
return true
}
return false
}() {
return
}
}
}