Merge pull request #30296 from deads2k/wq-metrics

Automatic merge from submit-queue

add metrics for workqueues

Adds prometheus metrics to work queues and enables them for the resourcequota controller.  It would be easy to add this to all other workqueue based controllers and gather basic responsiveness metrics.

@kubernetes/rh-cluster-infra helps debug quota controller responsiveness problems.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.kubernetes.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.kubernetes.io/reviews/kubernetes/kubernetes/30296)
<!-- Reviewable:end -->
This commit is contained in:
Kubernetes Submit Queue 2016-08-10 11:13:13 -07:00 committed by GitHub
commit a40d2cd92e
7 changed files with 199 additions and 9 deletions

View File

@ -79,8 +79,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
// build the resource quota controller
rq := &ResourceQuotaController{
kubeClient: options.KubeClient,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
missingUsageQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller_resourcequota_primary"),
missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller_resourcequota_priority"),
resyncPeriod: options.ResyncPeriod,
registry: options.Registry,
replenishmentControllers: []framework.ControllerInterface{},

View File

@ -34,17 +34,22 @@ type DelayingInterface interface {
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
func NewDelayingQueue() DelayingInterface {
return newDelayingQueue(clock.RealClock{})
return newDelayingQueue(clock.RealClock{}, "")
}
func newDelayingQueue(clock clock.Clock) DelayingInterface {
func NewNamedDelayingQueue(name string) DelayingInterface {
return newDelayingQueue(clock.RealClock{}, name)
}
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: New(),
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingTimeByEntry: map[t]time.Time{},
waitingForAddCh: make(chan waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop()
@ -71,6 +76,9 @@ type delayingType struct {
waitingTimeByEntry map[t]time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor
// metrics counts the number of retries
metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
@ -92,6 +100,8 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
return
}
q.metrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)

View File

@ -28,7 +28,7 @@ import (
func TestSimpleQueue(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
q := newDelayingQueue(fakeClock, "")
first := "foo"
@ -70,7 +70,7 @@ func TestSimpleQueue(t *testing.T) {
func TestDeduping(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
q := newDelayingQueue(fakeClock, "")
first := "foo"
@ -129,7 +129,7 @@ func TestDeduping(t *testing.T) {
func TestAddTwoFireEarly(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
q := newDelayingQueue(fakeClock, "")
first := "foo"
second := "bar"
@ -179,7 +179,7 @@ func TestAddTwoFireEarly(t *testing.T) {
func TestCopyShifting(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
q := newDelayingQueue(fakeClock, "")
first := "foo"
second := "bar"

View File

@ -0,0 +1,153 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
type queueMetrics interface {
add(item t)
get(item t)
done(item t)
}
type defaultQueueMetrics struct {
depth prometheus.Gauge
adds prometheus.Counter
latency prometheus.Summary
workDuration prometheus.Summary
addTimes map[t]time.Time
processingStartTimes map[t]time.Time
}
func newQueueMetrics(name string) queueMetrics {
var ret *defaultQueueMetrics
if len(name) == 0 {
return ret
}
ret = &defaultQueueMetrics{
depth: prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "depth",
Help: "Current depth of workqueue: " + name,
}),
adds: prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name,
Name: "adds",
Help: "Total number of adds handled by workqueue: " + name,
}),
latency: prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: name,
Name: "queue_latency",
Help: "How long an item stays in workqueue" + name + " before being requested.",
}),
workDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: name,
Name: "work_duration",
Help: "How long processing an item from workqueue" + name + " takes.",
}),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
prometheus.Register(ret.depth)
prometheus.Register(ret.adds)
prometheus.Register(ret.latency)
prometheus.Register(ret.workDuration)
return ret
}
func (m *defaultQueueMetrics) add(item t) {
if m == nil {
return
}
m.adds.Inc()
m.depth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = time.Now()
}
}
func (m *defaultQueueMetrics) get(item t) {
if m == nil {
return
}
m.depth.Dec()
m.processingStartTimes[item] = time.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
func (m *defaultQueueMetrics) done(item t) {
if m == nil {
return
}
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}
// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
type retryMetrics interface {
retry()
}
type defaultRetryMetrics struct {
retries prometheus.Counter
}
func newRetryMetrics(name string) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
ret = &defaultRetryMetrics{
retries: prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name,
Name: "retries",
Help: "Total number of retries handled by workqueue: " + name,
}),
}
prometheus.Register(ret.retries)
return ret
}
func (m *defaultRetryMetrics) retry() {
if m == nil {
return
}
m.retries.Inc()
}

View File

@ -31,10 +31,15 @@ type Interface interface {
// New constructs a new workqueue (see the package comment).
func New() *Type {
return NewNamed("")
}
func NewNamed(name string) *Type {
return &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: newQueueMetrics(name),
}
}
@ -57,6 +62,8 @@ type Type struct {
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
}
type empty struct{}
@ -86,10 +93,14 @@ func (q *Type) Add(item interface{}) {
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
@ -116,9 +127,14 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
@ -128,6 +144,9 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)

View File

@ -40,6 +40,13 @@ func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
}
}
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewNamedDelayingQueue(name),
rateLimiter: rateLimiter,
}
}
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
DelayingInterface

View File

@ -33,6 +33,7 @@ func TestRateLimitingQueue(t *testing.T) {
heartbeat: fakeClock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000),
metrics: newRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue