mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
add metrics for workqueues
This commit is contained in:
parent
301be4eeb5
commit
b981ea1a70
@ -79,8 +79,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
|
|||||||
// build the resource quota controller
|
// build the resource quota controller
|
||||||
rq := &ResourceQuotaController{
|
rq := &ResourceQuotaController{
|
||||||
kubeClient: options.KubeClient,
|
kubeClient: options.KubeClient,
|
||||||
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller_resourcequota_primary"),
|
||||||
missingUsageQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller_resourcequota_priority"),
|
||||||
resyncPeriod: options.ResyncPeriod,
|
resyncPeriod: options.ResyncPeriod,
|
||||||
registry: options.Registry,
|
registry: options.Registry,
|
||||||
replenishmentControllers: []framework.ControllerInterface{},
|
replenishmentControllers: []framework.ControllerInterface{},
|
||||||
|
@ -34,17 +34,22 @@ type DelayingInterface interface {
|
|||||||
|
|
||||||
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
|
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
|
||||||
func NewDelayingQueue() DelayingInterface {
|
func NewDelayingQueue() DelayingInterface {
|
||||||
return newDelayingQueue(clock.RealClock{})
|
return newDelayingQueue(clock.RealClock{}, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDelayingQueue(clock clock.Clock) DelayingInterface {
|
func NewNamedDelayingQueue(name string) DelayingInterface {
|
||||||
|
return newDelayingQueue(clock.RealClock{}, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
|
||||||
ret := &delayingType{
|
ret := &delayingType{
|
||||||
Interface: New(),
|
Interface: NewNamed(name),
|
||||||
clock: clock,
|
clock: clock,
|
||||||
heartbeat: clock.Tick(maxWait),
|
heartbeat: clock.Tick(maxWait),
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
waitingTimeByEntry: map[t]time.Time{},
|
waitingTimeByEntry: map[t]time.Time{},
|
||||||
waitingForAddCh: make(chan waitFor, 1000),
|
waitingForAddCh: make(chan waitFor, 1000),
|
||||||
|
metrics: newRetryMetrics(name),
|
||||||
}
|
}
|
||||||
|
|
||||||
go ret.waitingLoop()
|
go ret.waitingLoop()
|
||||||
@ -71,6 +76,9 @@ type delayingType struct {
|
|||||||
waitingTimeByEntry map[t]time.Time
|
waitingTimeByEntry map[t]time.Time
|
||||||
// waitingForAddCh is a buffered channel that feeds waitingForAdd
|
// waitingForAddCh is a buffered channel that feeds waitingForAdd
|
||||||
waitingForAddCh chan waitFor
|
waitingForAddCh chan waitFor
|
||||||
|
|
||||||
|
// metrics counts the number of retries
|
||||||
|
metrics retryMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitFor holds the data to add and the time it should be added
|
// waitFor holds the data to add and the time it should be added
|
||||||
@ -92,6 +100,8 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
q.metrics.retry()
|
||||||
|
|
||||||
// immediately add things with no delay
|
// immediately add things with no delay
|
||||||
if duration <= 0 {
|
if duration <= 0 {
|
||||||
q.Add(item)
|
q.Add(item)
|
||||||
|
@ -28,7 +28,7 @@ import (
|
|||||||
|
|
||||||
func TestSimpleQueue(t *testing.T) {
|
func TestSimpleQueue(t *testing.T) {
|
||||||
fakeClock := clock.NewFakeClock(time.Now())
|
fakeClock := clock.NewFakeClock(time.Now())
|
||||||
q := newDelayingQueue(fakeClock)
|
q := newDelayingQueue(fakeClock, "")
|
||||||
|
|
||||||
first := "foo"
|
first := "foo"
|
||||||
|
|
||||||
@ -70,7 +70,7 @@ func TestSimpleQueue(t *testing.T) {
|
|||||||
|
|
||||||
func TestDeduping(t *testing.T) {
|
func TestDeduping(t *testing.T) {
|
||||||
fakeClock := clock.NewFakeClock(time.Now())
|
fakeClock := clock.NewFakeClock(time.Now())
|
||||||
q := newDelayingQueue(fakeClock)
|
q := newDelayingQueue(fakeClock, "")
|
||||||
|
|
||||||
first := "foo"
|
first := "foo"
|
||||||
|
|
||||||
@ -129,7 +129,7 @@ func TestDeduping(t *testing.T) {
|
|||||||
|
|
||||||
func TestAddTwoFireEarly(t *testing.T) {
|
func TestAddTwoFireEarly(t *testing.T) {
|
||||||
fakeClock := clock.NewFakeClock(time.Now())
|
fakeClock := clock.NewFakeClock(time.Now())
|
||||||
q := newDelayingQueue(fakeClock)
|
q := newDelayingQueue(fakeClock, "")
|
||||||
|
|
||||||
first := "foo"
|
first := "foo"
|
||||||
second := "bar"
|
second := "bar"
|
||||||
@ -179,7 +179,7 @@ func TestAddTwoFireEarly(t *testing.T) {
|
|||||||
|
|
||||||
func TestCopyShifting(t *testing.T) {
|
func TestCopyShifting(t *testing.T) {
|
||||||
fakeClock := clock.NewFakeClock(time.Now())
|
fakeClock := clock.NewFakeClock(time.Now())
|
||||||
q := newDelayingQueue(fakeClock)
|
q := newDelayingQueue(fakeClock, "")
|
||||||
|
|
||||||
first := "foo"
|
first := "foo"
|
||||||
second := "bar"
|
second := "bar"
|
||||||
|
153
pkg/util/workqueue/metrics.go
Normal file
153
pkg/util/workqueue/metrics.go
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package workqueue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type queueMetrics interface {
|
||||||
|
add(item t)
|
||||||
|
get(item t)
|
||||||
|
done(item t)
|
||||||
|
}
|
||||||
|
|
||||||
|
type defaultQueueMetrics struct {
|
||||||
|
depth prometheus.Gauge
|
||||||
|
adds prometheus.Counter
|
||||||
|
latency prometheus.Summary
|
||||||
|
workDuration prometheus.Summary
|
||||||
|
addTimes map[t]time.Time
|
||||||
|
processingStartTimes map[t]time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func newQueueMetrics(name string) queueMetrics {
|
||||||
|
var ret *defaultQueueMetrics
|
||||||
|
if len(name) == 0 {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = &defaultQueueMetrics{
|
||||||
|
depth: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "depth",
|
||||||
|
Help: "Current depth of workqueue: " + name,
|
||||||
|
}),
|
||||||
|
adds: prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "adds",
|
||||||
|
Help: "Total number of adds handled by workqueue: " + name,
|
||||||
|
}),
|
||||||
|
latency: prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "queue_latency",
|
||||||
|
Help: "How long an item stays in workqueue" + name + " before being requested.",
|
||||||
|
}),
|
||||||
|
workDuration: prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "work_duration",
|
||||||
|
Help: "How long processing an item from workqueue" + name + " takes.",
|
||||||
|
}),
|
||||||
|
addTimes: map[t]time.Time{},
|
||||||
|
processingStartTimes: map[t]time.Time{},
|
||||||
|
}
|
||||||
|
|
||||||
|
prometheus.Register(ret.depth)
|
||||||
|
prometheus.Register(ret.adds)
|
||||||
|
prometheus.Register(ret.latency)
|
||||||
|
prometheus.Register(ret.workDuration)
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *defaultQueueMetrics) add(item t) {
|
||||||
|
if m == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.adds.Inc()
|
||||||
|
m.depth.Inc()
|
||||||
|
if _, exists := m.addTimes[item]; !exists {
|
||||||
|
m.addTimes[item] = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *defaultQueueMetrics) get(item t) {
|
||||||
|
if m == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.depth.Dec()
|
||||||
|
m.processingStartTimes[item] = time.Now()
|
||||||
|
if startTime, exists := m.addTimes[item]; exists {
|
||||||
|
m.latency.Observe(sinceInMicroseconds(startTime))
|
||||||
|
delete(m.addTimes, item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *defaultQueueMetrics) done(item t) {
|
||||||
|
if m == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if startTime, exists := m.processingStartTimes[item]; exists {
|
||||||
|
m.workDuration.Observe(sinceInMicroseconds(startTime))
|
||||||
|
delete(m.processingStartTimes, item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gets the time since the specified start in microseconds.
|
||||||
|
func sinceInMicroseconds(start time.Time) float64 {
|
||||||
|
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
type retryMetrics interface {
|
||||||
|
retry()
|
||||||
|
}
|
||||||
|
|
||||||
|
type defaultRetryMetrics struct {
|
||||||
|
retries prometheus.Counter
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRetryMetrics(name string) retryMetrics {
|
||||||
|
var ret *defaultRetryMetrics
|
||||||
|
if len(name) == 0 {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = &defaultRetryMetrics{
|
||||||
|
retries: prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "retries",
|
||||||
|
Help: "Total number of retries handled by workqueue: " + name,
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
prometheus.Register(ret.retries)
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *defaultRetryMetrics) retry() {
|
||||||
|
if m == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.retries.Inc()
|
||||||
|
}
|
@ -31,10 +31,15 @@ type Interface interface {
|
|||||||
|
|
||||||
// New constructs a new workqueue (see the package comment).
|
// New constructs a new workqueue (see the package comment).
|
||||||
func New() *Type {
|
func New() *Type {
|
||||||
|
return NewNamed("")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNamed(name string) *Type {
|
||||||
return &Type{
|
return &Type{
|
||||||
dirty: set{},
|
dirty: set{},
|
||||||
processing: set{},
|
processing: set{},
|
||||||
cond: sync.NewCond(&sync.Mutex{}),
|
cond: sync.NewCond(&sync.Mutex{}),
|
||||||
|
metrics: newQueueMetrics(name),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,6 +62,8 @@ type Type struct {
|
|||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
|
|
||||||
shuttingDown bool
|
shuttingDown bool
|
||||||
|
|
||||||
|
metrics queueMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
type empty struct{}
|
type empty struct{}
|
||||||
@ -86,10 +93,14 @@ func (q *Type) Add(item interface{}) {
|
|||||||
if q.dirty.has(item) {
|
if q.dirty.has(item) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
q.metrics.add(item)
|
||||||
|
|
||||||
q.dirty.insert(item)
|
q.dirty.insert(item)
|
||||||
if q.processing.has(item) {
|
if q.processing.has(item) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
q.queue = append(q.queue, item)
|
q.queue = append(q.queue, item)
|
||||||
q.cond.Signal()
|
q.cond.Signal()
|
||||||
}
|
}
|
||||||
@ -116,9 +127,14 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
|
|||||||
// We must be shutting down.
|
// We must be shutting down.
|
||||||
return nil, true
|
return nil, true
|
||||||
}
|
}
|
||||||
|
|
||||||
item, q.queue = q.queue[0], q.queue[1:]
|
item, q.queue = q.queue[0], q.queue[1:]
|
||||||
|
|
||||||
|
q.metrics.get(item)
|
||||||
|
|
||||||
q.processing.insert(item)
|
q.processing.insert(item)
|
||||||
q.dirty.delete(item)
|
q.dirty.delete(item)
|
||||||
|
|
||||||
return item, false
|
return item, false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,6 +144,9 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
|
|||||||
func (q *Type) Done(item interface{}) {
|
func (q *Type) Done(item interface{}) {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
|
|
||||||
|
q.metrics.done(item)
|
||||||
|
|
||||||
q.processing.delete(item)
|
q.processing.delete(item)
|
||||||
if q.dirty.has(item) {
|
if q.dirty.has(item) {
|
||||||
q.queue = append(q.queue, item)
|
q.queue = append(q.queue, item)
|
||||||
|
@ -40,6 +40,13 @@ func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
|
||||||
|
return &rateLimitingType{
|
||||||
|
DelayingInterface: NewNamedDelayingQueue(name),
|
||||||
|
rateLimiter: rateLimiter,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
||||||
type rateLimitingType struct {
|
type rateLimitingType struct {
|
||||||
DelayingInterface
|
DelayingInterface
|
||||||
|
@ -33,6 +33,7 @@ func TestRateLimitingQueue(t *testing.T) {
|
|||||||
heartbeat: fakeClock.Tick(maxWait),
|
heartbeat: fakeClock.Tick(maxWait),
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
waitingForAddCh: make(chan waitFor, 1000),
|
waitingForAddCh: make(chan waitFor, 1000),
|
||||||
|
metrics: newRetryMetrics(""),
|
||||||
}
|
}
|
||||||
queue.DelayingInterface = delayingQueue
|
queue.DelayingInterface = delayingQueue
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user