mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
decouple workqueue metrics from prometheus
This commit is contained in:
parent
a1b1a1a728
commit
feb0d1daa8
@ -30,6 +30,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/healthz"
|
"k8s.io/kubernetes/pkg/healthz"
|
||||||
"k8s.io/kubernetes/pkg/util/flag"
|
"k8s.io/kubernetes/pkg/util/flag"
|
||||||
"k8s.io/kubernetes/pkg/util/logs"
|
"k8s.io/kubernetes/pkg/util/logs"
|
||||||
|
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
|
||||||
_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
|
_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
|
||||||
"k8s.io/kubernetes/pkg/version/verflag"
|
"k8s.io/kubernetes/pkg/version/verflag"
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/healthz"
|
"k8s.io/kubernetes/pkg/healthz"
|
||||||
"k8s.io/kubernetes/pkg/util/flag"
|
"k8s.io/kubernetes/pkg/util/flag"
|
||||||
"k8s.io/kubernetes/pkg/util/logs"
|
"k8s.io/kubernetes/pkg/util/logs"
|
||||||
|
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
|
||||||
"k8s.io/kubernetes/pkg/version/verflag"
|
"k8s.io/kubernetes/pkg/version/verflag"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,65 +17,56 @@ limitations under the License.
|
|||||||
package workqueue
|
package workqueue
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// This file provides abstractions for setting the provider (e.g., prometheus)
|
||||||
|
// of metrics.
|
||||||
|
|
||||||
type queueMetrics interface {
|
type queueMetrics interface {
|
||||||
add(item t)
|
add(item t)
|
||||||
get(item t)
|
get(item t)
|
||||||
done(item t)
|
done(item t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GaugeMetric represents a single numerical value that can arbitrarily go up
|
||||||
|
// and down.
|
||||||
|
type GaugeMetric interface {
|
||||||
|
Inc()
|
||||||
|
Dec()
|
||||||
|
}
|
||||||
|
|
||||||
|
// CounterMetric represents a single numerical value that only ever
|
||||||
|
// goes up.
|
||||||
|
type CounterMetric interface {
|
||||||
|
Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SummaryMetric captures individual observations.
|
||||||
|
type SummaryMetric interface {
|
||||||
|
Observe(float64)
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopMetric struct{}
|
||||||
|
|
||||||
|
func (noopMetric) Inc() {}
|
||||||
|
func (noopMetric) Dec() {}
|
||||||
|
func (noopMetric) Observe(float64) {}
|
||||||
|
|
||||||
type defaultQueueMetrics struct {
|
type defaultQueueMetrics struct {
|
||||||
depth prometheus.Gauge
|
// current depth of a workqueue
|
||||||
adds prometheus.Counter
|
depth GaugeMetric
|
||||||
latency prometheus.Summary
|
// total number of adds handled by a workqueue
|
||||||
workDuration prometheus.Summary
|
adds CounterMetric
|
||||||
|
// how long an item stays in a workqueue
|
||||||
|
latency SummaryMetric
|
||||||
|
// how long processing an item from a workqueue takes
|
||||||
|
workDuration SummaryMetric
|
||||||
addTimes map[t]time.Time
|
addTimes map[t]time.Time
|
||||||
processingStartTimes map[t]time.Time
|
processingStartTimes map[t]time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func newQueueMetrics(name string) queueMetrics {
|
|
||||||
var ret *defaultQueueMetrics
|
|
||||||
if len(name) == 0 {
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = &defaultQueueMetrics{
|
|
||||||
depth: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "depth",
|
|
||||||
Help: "Current depth of workqueue: " + name,
|
|
||||||
}),
|
|
||||||
adds: prometheus.NewCounter(prometheus.CounterOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "adds",
|
|
||||||
Help: "Total number of adds handled by workqueue: " + name,
|
|
||||||
}),
|
|
||||||
latency: prometheus.NewSummary(prometheus.SummaryOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "queue_latency",
|
|
||||||
Help: "How long an item stays in workqueue" + name + " before being requested.",
|
|
||||||
}),
|
|
||||||
workDuration: prometheus.NewSummary(prometheus.SummaryOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "work_duration",
|
|
||||||
Help: "How long processing an item from workqueue" + name + " takes.",
|
|
||||||
}),
|
|
||||||
addTimes: map[t]time.Time{},
|
|
||||||
processingStartTimes: map[t]time.Time{},
|
|
||||||
}
|
|
||||||
|
|
||||||
prometheus.Register(ret.depth)
|
|
||||||
prometheus.Register(ret.adds)
|
|
||||||
prometheus.Register(ret.latency)
|
|
||||||
prometheus.Register(ret.workDuration)
|
|
||||||
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *defaultQueueMetrics) add(item t) {
|
func (m *defaultQueueMetrics) add(item t) {
|
||||||
if m == nil {
|
if m == nil {
|
||||||
return
|
return
|
||||||
@ -122,26 +113,7 @@ type retryMetrics interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type defaultRetryMetrics struct {
|
type defaultRetryMetrics struct {
|
||||||
retries prometheus.Counter
|
retries CounterMetric
|
||||||
}
|
|
||||||
|
|
||||||
func newRetryMetrics(name string) retryMetrics {
|
|
||||||
var ret *defaultRetryMetrics
|
|
||||||
if len(name) == 0 {
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = &defaultRetryMetrics{
|
|
||||||
retries: prometheus.NewCounter(prometheus.CounterOpts{
|
|
||||||
Subsystem: name,
|
|
||||||
Name: "retries",
|
|
||||||
Help: "Total number of retries handled by workqueue: " + name,
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
|
|
||||||
prometheus.Register(ret.retries)
|
|
||||||
|
|
||||||
return ret
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *defaultRetryMetrics) retry() {
|
func (m *defaultRetryMetrics) retry() {
|
||||||
@ -151,3 +123,73 @@ func (m *defaultRetryMetrics) retry() {
|
|||||||
|
|
||||||
m.retries.Inc()
|
m.retries.Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MetricsProvider generates various metrics used by the queue.
|
||||||
|
type MetricsProvider interface {
|
||||||
|
NewDepthMetric(name string) GaugeMetric
|
||||||
|
NewAddsMetric(name string) CounterMetric
|
||||||
|
NewLatencyMetric(name string) SummaryMetric
|
||||||
|
NewWorkDurationMetric(name string) SummaryMetric
|
||||||
|
NewRetriesMetric(name string) CounterMetric
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopMetricsProvider struct{}
|
||||||
|
|
||||||
|
func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
|
||||||
|
return noopMetric{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
|
||||||
|
return noopMetric{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
|
||||||
|
return noopMetric{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
|
||||||
|
return noopMetric{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
|
||||||
|
return noopMetric{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var metricsFactory = struct {
|
||||||
|
metricsProvider MetricsProvider
|
||||||
|
setProviders sync.Once
|
||||||
|
}{
|
||||||
|
metricsProvider: noopMetricsProvider{},
|
||||||
|
}
|
||||||
|
|
||||||
|
func newQueueMetrics(name string) queueMetrics {
|
||||||
|
var ret *defaultQueueMetrics
|
||||||
|
if len(name) == 0 {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
return &defaultQueueMetrics{
|
||||||
|
depth: metricsFactory.metricsProvider.NewDepthMetric(name),
|
||||||
|
adds: metricsFactory.metricsProvider.NewAddsMetric(name),
|
||||||
|
latency: metricsFactory.metricsProvider.NewLatencyMetric(name),
|
||||||
|
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name),
|
||||||
|
addTimes: map[t]time.Time{},
|
||||||
|
processingStartTimes: map[t]time.Time{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRetryMetrics(name string) retryMetrics {
|
||||||
|
var ret *defaultRetryMetrics
|
||||||
|
if len(name) == 0 {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
return &defaultRetryMetrics{
|
||||||
|
retries: metricsFactory.metricsProvider.NewRetriesMetric(name),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetProvider sets the metrics provider of the metricsFactory.
|
||||||
|
func SetProvider(metricsProvider MetricsProvider) {
|
||||||
|
metricsFactory.setProviders.Do(func() {
|
||||||
|
metricsFactory.metricsProvider = metricsProvider
|
||||||
|
})
|
||||||
|
}
|
||||||
|
82
pkg/util/workqueue/prometheus/prometheus.go
Normal file
82
pkg/util/workqueue/prometheus/prometheus.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package prometheus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Package prometheus sets the workqueue DefaultMetricsFactory to produce
|
||||||
|
// prometheus metrics. To use this package, you just have to import it.
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
workqueue.SetProvider(prometheusMetricsProvider{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type prometheusMetricsProvider struct{}
|
||||||
|
|
||||||
|
func (_ prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
|
||||||
|
depth := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "depth",
|
||||||
|
Help: "Current depth of workqueue: " + name,
|
||||||
|
})
|
||||||
|
prometheus.Register(depth)
|
||||||
|
return depth
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
|
||||||
|
adds := prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "adds",
|
||||||
|
Help: "Total number of adds handled by workqueue: " + name,
|
||||||
|
})
|
||||||
|
prometheus.Register(adds)
|
||||||
|
return adds
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.SummaryMetric {
|
||||||
|
latency := prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "queue_latency",
|
||||||
|
Help: "How long an item stays in workqueue" + name + " before being requested.",
|
||||||
|
})
|
||||||
|
prometheus.Register(latency)
|
||||||
|
return latency
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric {
|
||||||
|
workDuration := prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "work_duration",
|
||||||
|
Help: "How long processing an item from workqueue" + name + " takes.",
|
||||||
|
})
|
||||||
|
prometheus.Register(workDuration)
|
||||||
|
return workDuration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
|
||||||
|
retries := prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Subsystem: name,
|
||||||
|
Name: "retries",
|
||||||
|
Help: "Total number of retries handled by workqueue: " + name,
|
||||||
|
})
|
||||||
|
prometheus.Register(retries)
|
||||||
|
return retries
|
||||||
|
}
|
@ -33,6 +33,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||||
|
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
|
||||||
)
|
)
|
||||||
|
|
||||||
// Evaluator is used to see if quota constraints are satisfied.
|
// Evaluator is used to see if quota constraints are satisfied.
|
||||||
|
Loading…
Reference in New Issue
Block a user