Merge pull request #115519 from sanposhiho/move-metric-recorder

cleanup(scheduler): move metricRecorder to metrics package
This commit is contained in:
Kubernetes Prow Robot 2023-02-16 03:57:38 -08:00 committed by GitHub
commit 77af0be42f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 102 additions and 125 deletions

View File

@ -93,7 +93,7 @@ type frameworkImpl struct {
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
metricsRecorder *metricsRecorder
metricsRecorder *metrics.MetricAsyncRecorder
profileName string
percentageOfNodesToScore *int32
@ -143,7 +143,7 @@ type frameworkOptions struct {
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
snapshotSharedLister framework.SharedLister
metricsRecorder *metricsRecorder
metricsRecorder *metrics.MetricAsyncRecorder
podNominator framework.PodNominator
extenders []framework.Extender
captureProfile CaptureProfile
@ -232,7 +232,7 @@ func WithCaptureProfile(c CaptureProfile) Option {
func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
return frameworkOptions{
metricsRecorder: newMetricsRecorder(1000, time.Second, stopCh),
metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
clusterEventMap: make(map[framework.ClusterEvent]sets.String),
parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism),
}
@ -646,7 +646,7 @@ func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.Pre
}
startTime := time.Now()
result, status := pl.PreFilter(ctx, state, pod)
f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(preFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return result, status
}
@ -681,7 +681,7 @@ func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl fram
}
startTime := time.Now()
status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
f.metricsRecorder.observePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -716,7 +716,7 @@ func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl f
}
startTime := time.Now()
status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
f.metricsRecorder.observePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -754,7 +754,7 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter
}
startTime := time.Now()
status := pl.Filter(ctx, state, pod, nodeInfo)
f.metricsRecorder.observePluginDurationAsync(Filter, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -800,7 +800,7 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po
}
startTime := time.Now()
r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), s, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(postFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime))
return r, s
}
@ -921,7 +921,7 @@ func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreS
}
startTime := time.Now()
status := pl.PreScore(ctx, state, pod, nodes)
f.metricsRecorder.observePluginDurationAsync(preScore, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(preScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1029,7 +1029,7 @@ func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePl
}
startTime := time.Now()
s, status := pl.Score(ctx, state, pod, nodeName)
f.metricsRecorder.observePluginDurationAsync(score, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return s, status
}
@ -1039,7 +1039,7 @@ func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.Scor
}
startTime := time.Now()
status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
f.metricsRecorder.observePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1073,7 +1073,7 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi
}
startTime := time.Now()
status := pl.PreBind(ctx, state, pod, nodeName)
f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(preBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1112,7 +1112,7 @@ func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlug
}
startTime := time.Now()
status := bp.Bind(ctx, state, pod, nodeName)
f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1134,7 +1134,7 @@ func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.Post
}
startTime := time.Now()
pl.PostBind(ctx, state, pod, nodeName)
f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(postBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
}
// RunReservePluginsReserve runs the Reserve method in the set of configured
@ -1164,7 +1164,7 @@ func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framewor
}
startTime := time.Now()
status := pl.Reserve(ctx, state, pod, nodeName)
f.metricsRecorder.observePluginDurationAsync(reserve, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1189,7 +1189,7 @@ func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framew
}
startTime := time.Now()
pl.Unreserve(ctx, state, pod, nodeName)
f.metricsRecorder.observePluginDurationAsync(unreserve, pl.Name(), nil, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
}
// RunPermitPlugins runs the set of configured permit plugins. If any of these
@ -1243,7 +1243,7 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit
}
startTime := time.Now()
status, timeout := pl.Permit(ctx, state, pod, nodeName)
f.metricsRecorder.observePluginDurationAsync(permit, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status, timeout
}

View File

@ -2829,7 +2829,7 @@ func TestPermitPlugins(t *testing.T) {
}
// withMetricsRecorder set metricsRecorder for the scheduling frameworkImpl.
func withMetricsRecorder(recorder *metricsRecorder) Option {
func withMetricsRecorder(recorder *metrics.MetricAsyncRecorder) Option {
return func(o *frameworkOptions) {
o.metricsRecorder = recorder
}
@ -2985,7 +2985,7 @@ func TestRecordingMetrics(t *testing.T) {
}
stopCh := make(chan struct{})
recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh)
profile := config.KubeSchedulerProfile{
PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore),
SchedulerName: testProfileName,
@ -3001,9 +3001,9 @@ func TestRecordingMetrics(t *testing.T) {
// Stop the goroutine which records metrics and ensure it's stopped.
close(stopCh)
<-recorder.isStoppedCh
<-recorder.IsStoppedCh
// Try to clean up the metrics buffer again in case it's not empty.
recorder.flushMetrics()
recorder.FlushMetrics()
collectAndCompareFrameworkMetrics(t, tt.wantExtensionPoint, tt.wantStatus)
collectAndComparePluginMetrics(t, tt.wantExtensionPoint, testPlugin, tt.wantStatus)
@ -3097,7 +3097,7 @@ func TestRunBindPlugins(t *testing.T) {
}
plugins := &config.Plugins{Bind: pluginSet}
stopCh := make(chan struct{})
recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh)
profile := config.KubeSchedulerProfile{
SchedulerName: testProfileName,
PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore),
@ -3116,9 +3116,9 @@ func TestRunBindPlugins(t *testing.T) {
// Stop the goroutine which records metrics and ensure it's stopped.
close(stopCh)
<-recorder.isStoppedCh
<-recorder.IsStoppedCh
// Try to clean up the metrics buffer again in case it's not empty.
recorder.flushMetrics()
recorder.FlushMetrics()
collectAndCompareFrameworkMetrics(t, "Bind", tt.wantStatus)
})
}

View File

@ -1,101 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"time"
k8smetrics "k8s.io/component-base/metrics"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// frameworkMetric is the data structure passed in the buffer channel between the main framework thread
// and the metricsRecorder goroutine.
type frameworkMetric struct {
metric *k8smetrics.HistogramVec
labelValues []string
value float64
}
// metricRecorder records framework metrics in a separate goroutine to avoid overhead in the critical path.
type metricsRecorder struct {
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
bufferCh chan *frameworkMetric
// if bufferSize is reached, incoming metrics will be discarded.
bufferSize int
// how often the recorder runs to flush the metrics.
interval time.Duration
// stopCh is used to stop the goroutine which periodically flushes metrics.
stopCh <-chan struct{}
// isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
isStoppedCh chan struct{}
}
func newMetricsRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *metricsRecorder {
recorder := &metricsRecorder{
bufferCh: make(chan *frameworkMetric, bufferSize),
bufferSize: bufferSize,
interval: interval,
stopCh: stopCh,
isStoppedCh: make(chan struct{}),
}
go recorder.run()
return recorder
}
// observePluginDurationAsync observes the plugin_execution_duration_seconds metric.
// The metric will be flushed to Prometheus asynchronously.
func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *framework.Status, value float64) {
newMetric := &frameworkMetric{
metric: metrics.PluginExecutionDuration,
labelValues: []string{pluginName, extensionPoint, status.Code().String()},
value: value,
}
select {
case r.bufferCh <- newMetric:
default:
}
}
// run flushes buffered metrics into Prometheus every second.
func (r *metricsRecorder) run() {
for {
select {
case <-r.stopCh:
close(r.isStoppedCh)
return
default:
}
r.flushMetrics()
time.Sleep(r.interval)
}
}
// flushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics.
func (r *metricsRecorder) flushMetrics() {
for i := 0; i < r.bufferSize; i++ {
select {
case m := <-r.bufferCh:
m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
default:
return
}
}
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package metrics
import (
"time"
"k8s.io/component-base/metrics"
)
@ -77,3 +79,79 @@ func (r *PendingPodsRecorder) Dec() {
func (r *PendingPodsRecorder) Clear() {
r.recorder.Set(float64(0))
}
// metric is the data structure passed in the buffer channel between the main framework thread
// and the metricsRecorder goroutine.
type metric struct {
metric *metrics.HistogramVec
labelValues []string
value float64
}
// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
type MetricAsyncRecorder struct {
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
bufferCh chan *metric
// if bufferSize is reached, incoming metrics will be discarded.
bufferSize int
// how often the recorder runs to flush the metrics.
interval time.Duration
// stopCh is used to stop the goroutine which periodically flushes metrics.
stopCh <-chan struct{}
// IsStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
IsStoppedCh chan struct{}
}
func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder {
recorder := &MetricAsyncRecorder{
bufferCh: make(chan *metric, bufferSize),
bufferSize: bufferSize,
interval: interval,
stopCh: stopCh,
IsStoppedCh: make(chan struct{}),
}
go recorder.run()
return recorder
}
// ObservePluginDurationAsync observes the plugin_execution_duration_seconds metric.
// The metric will be flushed to Prometheus asynchronously.
func (r *MetricAsyncRecorder) ObservePluginDurationAsync(extensionPoint, pluginName, status string, value float64) {
newMetric := &metric{
metric: PluginExecutionDuration,
labelValues: []string{pluginName, extensionPoint, status},
value: value,
}
select {
case r.bufferCh <- newMetric:
default:
}
}
// run flushes buffered metrics into Prometheus every second.
func (r *MetricAsyncRecorder) run() {
for {
select {
case <-r.stopCh:
close(r.IsStoppedCh)
return
default:
}
r.FlushMetrics()
time.Sleep(r.interval)
}
}
// FlushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics.
func (r *MetricAsyncRecorder) FlushMetrics() {
for i := 0; i < r.bufferSize; i++ {
select {
case m := <-r.bufferCh:
m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
default:
return
}
}
}