diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 68d03ad2d15..e958e922d29 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -79,8 +79,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour // build the resource quota controller rq := &ResourceQuotaController{ kubeClient: options.KubeClient, - queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - missingUsageQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller_resourcequota_primary"), + missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller_resourcequota_priority"), resyncPeriod: options.ResyncPeriod, registry: options.Registry, replenishmentControllers: []framework.ControllerInterface{}, diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go index a9aae96aa59..5a71b8181a1 100644 --- a/pkg/util/workqueue/delaying_queue.go +++ b/pkg/util/workqueue/delaying_queue.go @@ -34,17 +34,22 @@ type DelayingInterface interface { // NewDelayingQueue constructs a new workqueue with delayed queuing ability func NewDelayingQueue() DelayingInterface { - return newDelayingQueue(clock.RealClock{}) + return newDelayingQueue(clock.RealClock{}, "") } -func newDelayingQueue(clock clock.Clock) DelayingInterface { +func NewNamedDelayingQueue(name string) DelayingInterface { + return newDelayingQueue(clock.RealClock{}, name) +} + +func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { ret := &delayingType{ - Interface: New(), + Interface: NewNamed(name), clock: clock, heartbeat: clock.Tick(maxWait), stopCh: make(chan struct{}), waitingTimeByEntry: map[t]time.Time{}, 
waitingForAddCh: make(chan waitFor, 1000), + metrics: newRetryMetrics(name), } go ret.waitingLoop() @@ -71,6 +76,9 @@ type delayingType struct { waitingTimeByEntry map[t]time.Time // waitingForAddCh is a buffered channel that feeds waitingForAdd waitingForAddCh chan waitFor + + // metrics counts the number of retries + metrics retryMetrics } // waitFor holds the data to add and the time it should be added @@ -92,6 +100,8 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { return } + q.metrics.retry() + // immediately add things with no delay if duration <= 0 { q.Add(item) diff --git a/pkg/util/workqueue/delaying_queue_test.go b/pkg/util/workqueue/delaying_queue_test.go index aa1cc336f9e..15cb93fc5e7 100644 --- a/pkg/util/workqueue/delaying_queue_test.go +++ b/pkg/util/workqueue/delaying_queue_test.go @@ -28,7 +28,7 @@ import ( func TestSimpleQueue(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock) + q := newDelayingQueue(fakeClock, "") first := "foo" @@ -70,7 +70,7 @@ func TestSimpleQueue(t *testing.T) { func TestDeduping(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock) + q := newDelayingQueue(fakeClock, "") first := "foo" @@ -129,7 +129,7 @@ func TestDeduping(t *testing.T) { func TestAddTwoFireEarly(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock) + q := newDelayingQueue(fakeClock, "") first := "foo" second := "bar" @@ -179,7 +179,7 @@ func TestAddTwoFireEarly(t *testing.T) { func TestCopyShifting(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock) + q := newDelayingQueue(fakeClock, "") first := "foo" second := "bar" diff --git a/pkg/util/workqueue/metrics.go b/pkg/util/workqueue/metrics.go new file mode 100644 index 00000000000..8a37d2e701d --- /dev/null +++ b/pkg/util/workqueue/metrics.go @@ -0,0 +1,153 @@ +/* +Copyright 2016 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type queueMetrics interface { + add(item t) + get(item t) + done(item t) +} + +type defaultQueueMetrics struct { + depth prometheus.Gauge + adds prometheus.Counter + latency prometheus.Summary + workDuration prometheus.Summary + addTimes map[t]time.Time + processingStartTimes map[t]time.Time +} + +func newQueueMetrics(name string) queueMetrics { + var ret *defaultQueueMetrics + if len(name) == 0 { + return ret + } + + ret = &defaultQueueMetrics{ + depth: prometheus.NewGauge(prometheus.GaugeOpts{ + Subsystem: name, + Name: "depth", + Help: "Current depth of workqueue: " + name, + }), + adds: prometheus.NewCounter(prometheus.CounterOpts{ + Subsystem: name, + Name: "adds", + Help: "Total number of adds handled by workqueue: " + name, + }), + latency: prometheus.NewSummary(prometheus.SummaryOpts{ + Subsystem: name, + Name: "queue_latency", + Help: "How long an item stays in workqueue " + name + " before being requested.", + }), + workDuration: prometheus.NewSummary(prometheus.SummaryOpts{ + Subsystem: name, + Name: "work_duration", + Help: "How long processing an item from workqueue " + name + " takes.", + }), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, + } + + prometheus.Register(ret.depth) + prometheus.Register(ret.adds) + prometheus.Register(ret.latency) + prometheus.Register(ret.workDuration) + + return ret +}
+ +func (m *defaultQueueMetrics) add(item t) { + if m == nil { + return + } + + m.adds.Inc() + m.depth.Inc() + if _, exists := m.addTimes[item]; !exists { + m.addTimes[item] = time.Now() + } +} + +func (m *defaultQueueMetrics) get(item t) { + if m == nil { + return + } + + m.depth.Dec() + m.processingStartTimes[item] = time.Now() + if startTime, exists := m.addTimes[item]; exists { + m.latency.Observe(sinceInMicroseconds(startTime)) + delete(m.addTimes, item) + } +} + +func (m *defaultQueueMetrics) done(item t) { + if m == nil { + return + } + + if startTime, exists := m.processingStartTimes[item]; exists { + m.workDuration.Observe(sinceInMicroseconds(startTime)) + delete(m.processingStartTimes, item) + } +} + +// sinceInMicroseconds returns the time elapsed since start, truncated to whole microseconds, as a float64. +func sinceInMicroseconds(start time.Time) float64 { + return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) +} + +type retryMetrics interface { + retry() +} + +type defaultRetryMetrics struct { + retries prometheus.Counter +} + +func newRetryMetrics(name string) retryMetrics { + var ret *defaultRetryMetrics + if len(name) == 0 { + return ret + } + + ret = &defaultRetryMetrics{ + retries: prometheus.NewCounter(prometheus.CounterOpts{ + Subsystem: name, + Name: "retries", + Help: "Total number of retries handled by workqueue: " + name, + }), + } + + prometheus.Register(ret.retries) + + return ret +} + +func (m *defaultRetryMetrics) retry() { + if m == nil { + return + } + + m.retries.Inc() +} diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go index 63fdd81910c..9a2ecad382d 100644 --- a/pkg/util/workqueue/queue.go +++ b/pkg/util/workqueue/queue.go @@ -31,10 +31,15 @@ type Interface interface { // New constructs a new workqueue (see the package comment).
func New() *Type { + return NewNamed("") +} + +func NewNamed(name string) *Type { return &Type{ dirty: set{}, processing: set{}, cond: sync.NewCond(&sync.Mutex{}), + metrics: newQueueMetrics(name), } } @@ -57,6 +62,8 @@ type Type struct { cond *sync.Cond shuttingDown bool + + metrics queueMetrics } type empty struct{} @@ -86,10 +93,14 @@ func (q *Type) Add(item interface{}) { if q.dirty.has(item) { return } + + q.metrics.add(item) + q.dirty.insert(item) if q.processing.has(item) { return } + q.queue = append(q.queue, item) q.cond.Signal() } @@ -116,9 +127,14 @@ func (q *Type) Get() (item interface{}, shutdown bool) { // We must be shutting down. return nil, true } + item, q.queue = q.queue[0], q.queue[1:] + + q.metrics.get(item) + q.processing.insert(item) q.dirty.delete(item) + return item, false } @@ -128,6 +144,9 @@ func (q *Type) Get() (item interface{}, shutdown bool) { func (q *Type) Done(item interface{}) { q.cond.L.Lock() defer q.cond.L.Unlock() + + q.metrics.done(item) + q.processing.delete(item) if q.dirty.has(item) { q.queue = append(q.queue, item) diff --git a/pkg/util/workqueue/rate_limitting_queue.go b/pkg/util/workqueue/rate_limitting_queue.go index a4f86eb10ae..9a2bfbb56a7 100644 --- a/pkg/util/workqueue/rate_limitting_queue.go +++ b/pkg/util/workqueue/rate_limitting_queue.go @@ -40,6 +40,13 @@ func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { } } +func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface { + return &rateLimitingType{ + DelayingInterface: NewNamedDelayingQueue(name), + rateLimiter: rateLimiter, + } +} + // rateLimitingType wraps an Interface and provides rateLimited re-enquing type rateLimitingType struct { DelayingInterface diff --git a/pkg/util/workqueue/rate_limitting_queue_test.go b/pkg/util/workqueue/rate_limitting_queue_test.go index 0be1dc2e9ad..5e3db0d162e 100644 --- a/pkg/util/workqueue/rate_limitting_queue_test.go +++ 
b/pkg/util/workqueue/rate_limitting_queue_test.go @@ -33,6 +33,7 @@ func TestRateLimitingQueue(t *testing.T) { heartbeat: fakeClock.Tick(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan waitFor, 1000), + metrics: newRetryMetrics(""), } queue.DelayingInterface = delayingQueue