mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
158 lines
4.4 KiB
Go
158 lines
4.4 KiB
Go
/*
|
|
Copyright 2019 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package metrics
|
|
|
|
import (
|
|
"time"
|
|
|
|
"k8s.io/component-base/metrics"
|
|
)
|
|
|
|
// MetricRecorder represents a metric recorder which takes action when the
|
|
// metric Inc(), Dec() and Clear()
|
|
type MetricRecorder interface {
|
|
Inc()
|
|
Dec()
|
|
Clear()
|
|
}
|
|
|
|
var _ MetricRecorder = &PendingPodsRecorder{}
|
|
|
|
// PendingPodsRecorder is an implementation of MetricRecorder
|
|
type PendingPodsRecorder struct {
|
|
recorder metrics.GaugeMetric
|
|
}
|
|
|
|
// NewActivePodsRecorder returns ActivePods in a Prometheus metric fashion
|
|
func NewActivePodsRecorder() *PendingPodsRecorder {
|
|
return &PendingPodsRecorder{
|
|
recorder: ActivePods(),
|
|
}
|
|
}
|
|
|
|
// NewUnschedulablePodsRecorder returns UnschedulablePods in a Prometheus metric fashion
|
|
func NewUnschedulablePodsRecorder() *PendingPodsRecorder {
|
|
return &PendingPodsRecorder{
|
|
recorder: UnschedulablePods(),
|
|
}
|
|
}
|
|
|
|
// NewBackoffPodsRecorder returns BackoffPods in a Prometheus metric fashion
|
|
func NewBackoffPodsRecorder() *PendingPodsRecorder {
|
|
return &PendingPodsRecorder{
|
|
recorder: BackoffPods(),
|
|
}
|
|
}
|
|
|
|
// NewGatedPodsRecorder returns GatedPods in a Prometheus metric fashion
|
|
func NewGatedPodsRecorder() *PendingPodsRecorder {
|
|
return &PendingPodsRecorder{
|
|
recorder: GatedPods(),
|
|
}
|
|
}
|
|
|
|
// Inc increases a metric counter by 1, in an atomic way
|
|
func (r *PendingPodsRecorder) Inc() {
|
|
r.recorder.Inc()
|
|
}
|
|
|
|
// Dec decreases a metric counter by 1, in an atomic way
|
|
func (r *PendingPodsRecorder) Dec() {
|
|
r.recorder.Dec()
|
|
}
|
|
|
|
// Clear set a metric counter to 0, in an atomic way
|
|
func (r *PendingPodsRecorder) Clear() {
|
|
r.recorder.Set(float64(0))
|
|
}
|
|
|
|
// metric is the data structure passed in the buffer channel between the main framework thread
|
|
// and the metricsRecorder goroutine.
|
|
type metric struct {
|
|
metric *metrics.HistogramVec
|
|
labelValues []string
|
|
value float64
|
|
}
|
|
|
|
// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
|
|
type MetricAsyncRecorder struct {
|
|
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
|
|
bufferCh chan *metric
|
|
// if bufferSize is reached, incoming metrics will be discarded.
|
|
bufferSize int
|
|
// how often the recorder runs to flush the metrics.
|
|
interval time.Duration
|
|
|
|
// stopCh is used to stop the goroutine which periodically flushes metrics.
|
|
stopCh <-chan struct{}
|
|
// IsStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
|
|
// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
|
|
IsStoppedCh chan struct{}
|
|
}
|
|
|
|
func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder {
|
|
recorder := &MetricAsyncRecorder{
|
|
bufferCh: make(chan *metric, bufferSize),
|
|
bufferSize: bufferSize,
|
|
interval: interval,
|
|
stopCh: stopCh,
|
|
IsStoppedCh: make(chan struct{}),
|
|
}
|
|
go recorder.run()
|
|
return recorder
|
|
}
|
|
|
|
// ObservePluginDurationAsync observes the plugin_execution_duration_seconds metric.
|
|
// The metric will be flushed to Prometheus asynchronously.
|
|
func (r *MetricAsyncRecorder) ObservePluginDurationAsync(extensionPoint, pluginName, status string, value float64) {
|
|
newMetric := &metric{
|
|
metric: PluginExecutionDuration,
|
|
labelValues: []string{pluginName, extensionPoint, status},
|
|
value: value,
|
|
}
|
|
select {
|
|
case r.bufferCh <- newMetric:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// run flushes buffered metrics into Prometheus every second.
|
|
func (r *MetricAsyncRecorder) run() {
|
|
for {
|
|
select {
|
|
case <-r.stopCh:
|
|
close(r.IsStoppedCh)
|
|
return
|
|
default:
|
|
}
|
|
r.FlushMetrics()
|
|
time.Sleep(r.interval)
|
|
}
|
|
}
|
|
|
|
// FlushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics.
|
|
func (r *MetricAsyncRecorder) FlushMetrics() {
|
|
for i := 0; i < r.bufferSize; i++ {
|
|
select {
|
|
case m := <-r.bufferCh:
|
|
m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|