mirror of https://github.com/k3s-io/kubernetes.git

cleanup(scheduler): move metricRecorder to metrics package

commit aa7b1766e5
parent eabb70833a
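In short: the unexported metricsRecorder that previously lived alongside the framework runtime becomes the exported metrics.MetricAsyncRecorder in the scheduler's metrics package. newMetricsRecorder becomes metrics.NewMetricsAsyncRecorder, observePluginDurationAsync becomes ObservePluginDurationAsync and now takes the status as a plain string instead of a *framework.Status, and the test-only isStoppedCh and flushMetrics are exported as IsStoppedCh and FlushMetrics. A minimal caller-side sketch of the relocated API, assuming the k8s.io/kubernetes/pkg/scheduler/metrics import path; the extension point, plugin name, and status label below are illustrative, not taken from the diff:

	package main

	import (
		"time"

		"k8s.io/kubernetes/pkg/scheduler/metrics"
	)

	func main() {
		stopCh := make(chan struct{})
		defer close(stopCh)

		// Buffer up to 1000 observations and flush them once per second,
		// matching defaultFrameworkOptions in the diff below.
		recorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh)

		start := time.Now()
		// ... run a plugin for some extension point here ...

		// The status is now passed as its code string, not as a *framework.Status.
		recorder.ObservePluginDurationAsync("PreFilter", "ExamplePlugin", "Success", metrics.SinceInSeconds(start))
	}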
@@ -93,7 +93,7 @@ type frameworkImpl struct {
 	eventRecorder events.EventRecorder
 	informerFactory informers.SharedInformerFactory
 
-	metricsRecorder *metricsRecorder
+	metricsRecorder *metrics.MetricAsyncRecorder
 	profileName string
 	percentageOfNodesToScore *int32
 
@@ -143,7 +143,7 @@ type frameworkOptions struct {
 	eventRecorder events.EventRecorder
 	informerFactory informers.SharedInformerFactory
 	snapshotSharedLister framework.SharedLister
-	metricsRecorder *metricsRecorder
+	metricsRecorder *metrics.MetricAsyncRecorder
 	podNominator framework.PodNominator
 	extenders []framework.Extender
 	captureProfile CaptureProfile
@@ -232,7 +232,7 @@ func WithCaptureProfile(c CaptureProfile) Option {
 
 func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
 	return frameworkOptions{
-		metricsRecorder: newMetricsRecorder(1000, time.Second, stopCh),
+		metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
 		clusterEventMap: make(map[framework.ClusterEvent]sets.String),
 		parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism),
 	}
@@ -646,7 +646,7 @@ func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.Pre
 	}
 	startTime := time.Now()
 	result, status := pl.PreFilter(ctx, state, pod)
-	f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(preFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return result, status
 }
 
@@ -681,7 +681,7 @@ func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl fram
 	}
 	startTime := time.Now()
 	status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
-	f.metricsRecorder.observePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status
 }
 
@@ -716,7 +716,7 @@ func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl f
 	}
 	startTime := time.Now()
 	status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
-	f.metricsRecorder.observePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status
 }
 
@@ -754,7 +754,7 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter
 	}
 	startTime := time.Now()
 	status := pl.Filter(ctx, state, pod, nodeInfo)
-	f.metricsRecorder.observePluginDurationAsync(Filter, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status
 }
 
@@ -800,7 +800,7 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po
 	}
 	startTime := time.Now()
 	r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
-	f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), s, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(postFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime))
 	return r, s
 }
 
@@ -914,7 +914,7 @@ func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreS
 	}
 	startTime := time.Now()
 	status := pl.PreScore(ctx, state, pod, nodes)
-	f.metricsRecorder.observePluginDurationAsync(preScore, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(preScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status
 }
 
@@ -1014,7 +1014,7 @@ func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePl
 	}
 	startTime := time.Now()
 	s, status := pl.Score(ctx, state, pod, nodeName)
-	f.metricsRecorder.observePluginDurationAsync(score, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return s, status
 }
 
@@ -1024,7 +1024,7 @@ func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.Scor
 	}
 	startTime := time.Now()
 	status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
-	f.metricsRecorder.observePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status
 }
 
@@ -1058,7 +1058,7 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi
 	}
 	startTime := time.Now()
 	status := pl.PreBind(ctx, state, pod, nodeName)
-	f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(preBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status
 }
 
@@ -1097,7 +1097,7 @@ func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlug
 	}
 	startTime := time.Now()
 	status := bp.Bind(ctx, state, pod, nodeName)
-	f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status
 }
 
@@ -1119,7 +1119,7 @@ func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.Post
 	}
 	startTime := time.Now()
 	pl.PostBind(ctx, state, pod, nodeName)
-	f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(postBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
 }
 
 // RunReservePluginsReserve runs the Reserve method in the set of configured
@@ -1149,7 +1149,7 @@ func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framewor
 	}
 	startTime := time.Now()
 	status := pl.Reserve(ctx, state, pod, nodeName)
-	f.metricsRecorder.observePluginDurationAsync(reserve, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status
 }
 
@@ -1174,7 +1174,7 @@ func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framew
 	}
 	startTime := time.Now()
 	pl.Unreserve(ctx, state, pod, nodeName)
-	f.metricsRecorder.observePluginDurationAsync(unreserve, pl.Name(), nil, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
 }
 
 // RunPermitPlugins runs the set of configured permit plugins. If any of these
@@ -1228,7 +1228,7 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit
 	}
 	startTime := time.Now()
 	status, timeout := pl.Permit(ctx, state, pod, nodeName)
-	f.metricsRecorder.observePluginDurationAsync(permit, pl.Name(), status, metrics.SinceInSeconds(startTime))
+	f.metricsRecorder.ObservePluginDurationAsync(permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
 	return status, timeout
 }
 
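Each of the run*Plugin helpers above follows the same pattern, and the only change per hunk is the recorder call: the caller now derives the status label itself, presumably so the relocated recorder does not need a *framework.Status. A small hedged sketch of that conversion; the status code and reason are illustrative:

	package main

	import (
		"fmt"

		"k8s.io/kubernetes/pkg/scheduler/framework"
	)

	func main() {
		// For extension points that return a *framework.Status, the label is the status code string.
		status := framework.NewStatus(framework.Unschedulable, "example reason")
		fmt.Println(status.Code().String()) // "Unschedulable"

		// For extension points that return nothing (PostBind, Unreserve), the diff records framework.Success.String().
		fmt.Println(framework.Success.String()) // "Success"
	}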
@@ -2637,7 +2637,7 @@ func TestPermitPlugins(t *testing.T) {
 }
 
 // withMetricsRecorder set metricsRecorder for the scheduling frameworkImpl.
-func withMetricsRecorder(recorder *metricsRecorder) Option {
+func withMetricsRecorder(recorder *metrics.MetricAsyncRecorder) Option {
 	return func(o *frameworkOptions) {
 		o.metricsRecorder = recorder
 	}
@@ -2793,7 +2793,7 @@ func TestRecordingMetrics(t *testing.T) {
			}

			stopCh := make(chan struct{})
-			recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
+			recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh)
			profile := config.KubeSchedulerProfile{
				PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore),
				SchedulerName: testProfileName,
@@ -2809,9 +2809,9 @@ func TestRecordingMetrics(t *testing.T) {

			// Stop the goroutine which records metrics and ensure it's stopped.
			close(stopCh)
-			<-recorder.isStoppedCh
+			<-recorder.IsStoppedCh
			// Try to clean up the metrics buffer again in case it's not empty.
-			recorder.flushMetrics()
+			recorder.FlushMetrics()

			collectAndCompareFrameworkMetrics(t, tt.wantExtensionPoint, tt.wantStatus)
			collectAndComparePluginMetrics(t, tt.wantExtensionPoint, testPlugin, tt.wantStatus)
@@ -2905,7 +2905,7 @@ func TestRunBindPlugins(t *testing.T) {
			}
			plugins := &config.Plugins{Bind: pluginSet}
			stopCh := make(chan struct{})
-			recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
+			recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh)
			profile := config.KubeSchedulerProfile{
				SchedulerName: testProfileName,
				PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore),
@@ -2924,9 +2924,9 @@ func TestRunBindPlugins(t *testing.T) {

			// Stop the goroutine which records metrics and ensure it's stopped.
			close(stopCh)
-			<-recorder.isStoppedCh
+			<-recorder.IsStoppedCh
			// Try to clean up the metrics buffer again in case it's not empty.
-			recorder.flushMetrics()
+			recorder.FlushMetrics()
			collectAndCompareFrameworkMetrics(t, "Bind", tt.wantStatus)
		})
	}
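The test changes above lean on the newly exported pieces of the recorder to drain it deterministically before asserting on metrics. A rough sketch of that stop-and-flush pattern, assuming the same k8s.io/kubernetes/pkg/scheduler/metrics import path; the extension point, plugin name, and duration value are illustrative:

	package main

	import (
		"time"

		"k8s.io/kubernetes/pkg/scheduler/metrics"
	)

	func main() {
		stopCh := make(chan struct{})
		// A tiny interval, as in the tests, so the background goroutine flushes frequently.
		recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh)

		recorder.ObservePluginDurationAsync("Bind", "ExamplePlugin", "Success", 0.001)

		// Stop the flushing goroutine and wait until it has exited ...
		close(stopCh)
		<-recorder.IsStoppedCh
		// ... then flush whatever is still buffered before reading the metric.
		recorder.FlushMetrics()
	}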
@@ -1,101 +0,0 @@
-/*
-Copyright 2019 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package runtime
-
-import (
-	"time"
-
-	k8smetrics "k8s.io/component-base/metrics"
-	"k8s.io/kubernetes/pkg/scheduler/framework"
-	"k8s.io/kubernetes/pkg/scheduler/metrics"
-)
-
-// frameworkMetric is the data structure passed in the buffer channel between the main framework thread
-// and the metricsRecorder goroutine.
-type frameworkMetric struct {
-	metric *k8smetrics.HistogramVec
-	labelValues []string
-	value float64
-}
-
-// metricRecorder records framework metrics in a separate goroutine to avoid overhead in the critical path.
-type metricsRecorder struct {
-	// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
-	bufferCh chan *frameworkMetric
-	// if bufferSize is reached, incoming metrics will be discarded.
-	bufferSize int
-	// how often the recorder runs to flush the metrics.
-	interval time.Duration
-
-	// stopCh is used to stop the goroutine which periodically flushes metrics.
-	stopCh <-chan struct{}
-	// isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
-	// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
-	isStoppedCh chan struct{}
-}
-
-func newMetricsRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *metricsRecorder {
-	recorder := &metricsRecorder{
-		bufferCh: make(chan *frameworkMetric, bufferSize),
-		bufferSize: bufferSize,
-		interval: interval,
-		stopCh: stopCh,
-		isStoppedCh: make(chan struct{}),
-	}
-	go recorder.run()
-	return recorder
-}
-
-// observePluginDurationAsync observes the plugin_execution_duration_seconds metric.
-// The metric will be flushed to Prometheus asynchronously.
-func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *framework.Status, value float64) {
-	newMetric := &frameworkMetric{
-		metric: metrics.PluginExecutionDuration,
-		labelValues: []string{pluginName, extensionPoint, status.Code().String()},
-		value: value,
-	}
-	select {
-	case r.bufferCh <- newMetric:
-	default:
-	}
-}
-
-// run flushes buffered metrics into Prometheus every second.
-func (r *metricsRecorder) run() {
-	for {
-		select {
-		case <-r.stopCh:
-			close(r.isStoppedCh)
-			return
-		default:
-		}
-		r.flushMetrics()
-		time.Sleep(r.interval)
-	}
-}
-
-// flushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics.
-func (r *metricsRecorder) flushMetrics() {
-	for i := 0; i < r.bufferSize; i++ {
-		select {
-		case m := <-r.bufferCh:
-			m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
-		default:
-			return
-		}
-	}
-}
@@ -17,6 +17,8 @@ limitations under the License.
 package metrics
 
 import (
+	"time"
+
 	"k8s.io/component-base/metrics"
 )
 
@@ -77,3 +79,79 @@ func (r *PendingPodsRecorder) Dec() {
 func (r *PendingPodsRecorder) Clear() {
 	r.recorder.Set(float64(0))
 }
+
+// metric is the data structure passed in the buffer channel between the main framework thread
+// and the metricsRecorder goroutine.
+type metric struct {
+	metric *metrics.HistogramVec
+	labelValues []string
+	value float64
+}
+
+// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
+type MetricAsyncRecorder struct {
+	// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
+	bufferCh chan *metric
+	// if bufferSize is reached, incoming metrics will be discarded.
+	bufferSize int
+	// how often the recorder runs to flush the metrics.
+	interval time.Duration
+
+	// stopCh is used to stop the goroutine which periodically flushes metrics.
+	stopCh <-chan struct{}
+	// IsStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
+	// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
+	IsStoppedCh chan struct{}
+}
+
+func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder {
+	recorder := &MetricAsyncRecorder{
+		bufferCh: make(chan *metric, bufferSize),
+		bufferSize: bufferSize,
+		interval: interval,
+		stopCh: stopCh,
+		IsStoppedCh: make(chan struct{}),
+	}
+	go recorder.run()
+	return recorder
+}
+
+// ObservePluginDurationAsync observes the plugin_execution_duration_seconds metric.
+// The metric will be flushed to Prometheus asynchronously.
+func (r *MetricAsyncRecorder) ObservePluginDurationAsync(extensionPoint, pluginName, status string, value float64) {
+	newMetric := &metric{
+		metric: PluginExecutionDuration,
+		labelValues: []string{pluginName, extensionPoint, status},
+		value: value,
+	}
+	select {
+	case r.bufferCh <- newMetric:
+	default:
+	}
+}
+
+// run flushes buffered metrics into Prometheus every second.
+func (r *MetricAsyncRecorder) run() {
+	for {
+		select {
+		case <-r.stopCh:
+			close(r.IsStoppedCh)
+			return
+		default:
+		}
+		r.FlushMetrics()
+		time.Sleep(r.interval)
+	}
+}
+
+// FlushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics.
+func (r *MetricAsyncRecorder) FlushMetrics() {
+	for i := 0; i < r.bufferSize; i++ {
+		select {
+		case m := <-r.bufferCh:
+			m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
+		default:
+			return
+		}
+	}
+}