diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index da2db2b6023..1bfd12a9cb7 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -93,7 +93,7 @@ type frameworkImpl struct { eventRecorder events.EventRecorder informerFactory informers.SharedInformerFactory - metricsRecorder *metricsRecorder + metricsRecorder *metrics.MetricAsyncRecorder profileName string percentageOfNodesToScore *int32 @@ -143,7 +143,7 @@ type frameworkOptions struct { eventRecorder events.EventRecorder informerFactory informers.SharedInformerFactory snapshotSharedLister framework.SharedLister - metricsRecorder *metricsRecorder + metricsRecorder *metrics.MetricAsyncRecorder podNominator framework.PodNominator extenders []framework.Extender captureProfile CaptureProfile @@ -232,7 +232,7 @@ func WithCaptureProfile(c CaptureProfile) Option { func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions { return frameworkOptions{ - metricsRecorder: newMetricsRecorder(1000, time.Second, stopCh), + metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh), clusterEventMap: make(map[framework.ClusterEvent]sets.String), parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism), } @@ -646,7 +646,7 @@ func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.Pre } startTime := time.Now() result, status := pl.PreFilter(ctx, state, pod) - f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(preFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return result, status } @@ -681,7 +681,7 @@ func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl fram } startTime := time.Now() status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo) - f.metricsRecorder.observePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -716,7 +716,7 @@ func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl f } startTime := time.Now() status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo) - f.metricsRecorder.observePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -754,7 +754,7 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter } startTime := time.Now() status := pl.Filter(ctx, state, pod, nodeInfo) - f.metricsRecorder.observePluginDurationAsync(Filter, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -800,7 +800,7 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po } startTime := time.Now() r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap) - f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), s, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(postFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime)) return r, s } @@ -921,7 +921,7 @@ func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreS } startTime := time.Now() status := pl.PreScore(ctx, state, pod, nodes) - f.metricsRecorder.observePluginDurationAsync(preScore, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(preScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1029,7 +1029,7 @@ func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePl } startTime := time.Now() s, status := pl.Score(ctx, state, pod, nodeName) - f.metricsRecorder.observePluginDurationAsync(score, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return s, status } @@ -1039,7 +1039,7 @@ func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.Scor } startTime := time.Now() status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList) - f.metricsRecorder.observePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1073,7 +1073,7 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi } startTime := time.Now() status := pl.PreBind(ctx, state, pod, nodeName) - f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(preBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1112,7 +1112,7 @@ func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlug } startTime := time.Now() status := bp.Bind(ctx, state, pod, nodeName) - f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1134,7 +1134,7 @@ func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.Post } startTime := time.Now() pl.PostBind(ctx, state, pod, nodeName) - f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(postBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime)) } // RunReservePluginsReserve runs the Reserve method in the set of configured @@ -1164,7 +1164,7 @@ func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framewor } startTime := time.Now() status := pl.Reserve(ctx, state, pod, nodeName) - f.metricsRecorder.observePluginDurationAsync(reserve, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1189,7 +1189,7 @@ func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framew } startTime := time.Now() pl.Unreserve(ctx, state, pod, nodeName) - f.metricsRecorder.observePluginDurationAsync(unreserve, pl.Name(), nil, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime)) } // RunPermitPlugins runs the set of configured permit plugins. If any of these @@ -1243,7 +1243,7 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit } startTime := time.Now() status, timeout := pl.Permit(ctx, state, pod, nodeName) - f.metricsRecorder.observePluginDurationAsync(permit, pl.Name(), status, metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status, timeout } diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index a07af5259b4..ff19b8c7af2 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -2829,7 +2829,7 @@ func TestPermitPlugins(t *testing.T) { } // withMetricsRecorder set metricsRecorder for the scheduling frameworkImpl. -func withMetricsRecorder(recorder *metricsRecorder) Option { +func withMetricsRecorder(recorder *metrics.MetricAsyncRecorder) Option { return func(o *frameworkOptions) { o.metricsRecorder = recorder } @@ -2985,7 +2985,7 @@ func TestRecordingMetrics(t *testing.T) { } stopCh := make(chan struct{}) - recorder := newMetricsRecorder(100, time.Nanosecond, stopCh) + recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh) profile := config.KubeSchedulerProfile{ PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore), SchedulerName: testProfileName, @@ -3001,9 +3001,9 @@ func TestRecordingMetrics(t *testing.T) { // Stop the goroutine which records metrics and ensure it's stopped. close(stopCh) - <-recorder.isStoppedCh + <-recorder.IsStoppedCh // Try to clean up the metrics buffer again in case it's not empty. - recorder.flushMetrics() + recorder.FlushMetrics() collectAndCompareFrameworkMetrics(t, tt.wantExtensionPoint, tt.wantStatus) collectAndComparePluginMetrics(t, tt.wantExtensionPoint, testPlugin, tt.wantStatus) @@ -3097,7 +3097,7 @@ func TestRunBindPlugins(t *testing.T) { } plugins := &config.Plugins{Bind: pluginSet} stopCh := make(chan struct{}) - recorder := newMetricsRecorder(100, time.Nanosecond, stopCh) + recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh) profile := config.KubeSchedulerProfile{ SchedulerName: testProfileName, PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore), @@ -3116,9 +3116,9 @@ func TestRunBindPlugins(t *testing.T) { // Stop the goroutine which records metrics and ensure it's stopped. close(stopCh) - <-recorder.isStoppedCh + <-recorder.IsStoppedCh // Try to clean up the metrics buffer again in case it's not empty. - recorder.flushMetrics() + recorder.FlushMetrics() collectAndCompareFrameworkMetrics(t, "Bind", tt.wantStatus) }) } diff --git a/pkg/scheduler/framework/runtime/metrics_recorder.go b/pkg/scheduler/framework/runtime/metrics_recorder.go deleted file mode 100644 index 23b25fa3b0a..00000000000 --- a/pkg/scheduler/framework/runtime/metrics_recorder.go +++ /dev/null @@ -1,101 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package runtime - -import ( - "time" - - k8smetrics "k8s.io/component-base/metrics" - "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/metrics" -) - -// frameworkMetric is the data structure passed in the buffer channel between the main framework thread -// and the metricsRecorder goroutine. -type frameworkMetric struct { - metric *k8smetrics.HistogramVec - labelValues []string - value float64 -} - -// metricRecorder records framework metrics in a separate goroutine to avoid overhead in the critical path. -type metricsRecorder struct { - // bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it. - bufferCh chan *frameworkMetric - // if bufferSize is reached, incoming metrics will be discarded. - bufferSize int - // how often the recorder runs to flush the metrics. - interval time.Duration - - // stopCh is used to stop the goroutine which periodically flushes metrics. - stopCh <-chan struct{} - // isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure - // the metric flushing goroutine is stopped so that tests can collect metrics for verification. - isStoppedCh chan struct{} -} - -func newMetricsRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *metricsRecorder { - recorder := &metricsRecorder{ - bufferCh: make(chan *frameworkMetric, bufferSize), - bufferSize: bufferSize, - interval: interval, - stopCh: stopCh, - isStoppedCh: make(chan struct{}), - } - go recorder.run() - return recorder -} - -// observePluginDurationAsync observes the plugin_execution_duration_seconds metric. -// The metric will be flushed to Prometheus asynchronously. -func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *framework.Status, value float64) { - newMetric := &frameworkMetric{ - metric: metrics.PluginExecutionDuration, - labelValues: []string{pluginName, extensionPoint, status.Code().String()}, - value: value, - } - select { - case r.bufferCh <- newMetric: - default: - } -} - -// run flushes buffered metrics into Prometheus every second. -func (r *metricsRecorder) run() { - for { - select { - case <-r.stopCh: - close(r.isStoppedCh) - return - default: - } - r.flushMetrics() - time.Sleep(r.interval) - } -} - -// flushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics. -func (r *metricsRecorder) flushMetrics() { - for i := 0; i < r.bufferSize; i++ { - select { - case m := <-r.bufferCh: - m.metric.WithLabelValues(m.labelValues...).Observe(m.value) - default: - return - } - } -} diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go index b52b6a5455b..1146f81cd9e 100644 --- a/pkg/scheduler/metrics/metric_recorder.go +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -17,6 +17,8 @@ limitations under the License. package metrics import ( + "time" + "k8s.io/component-base/metrics" ) @@ -77,3 +79,79 @@ func (r *PendingPodsRecorder) Dec() { func (r *PendingPodsRecorder) Clear() { r.recorder.Set(float64(0)) } + +// metric is the data structure passed in the buffer channel between the main framework thread +// and the metricsRecorder goroutine. +type metric struct { + metric *metrics.HistogramVec + labelValues []string + value float64 +} + +// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path. +type MetricAsyncRecorder struct { + // bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it. + bufferCh chan *metric + // if bufferSize is reached, incoming metrics will be discarded. + bufferSize int + // how often the recorder runs to flush the metrics. + interval time.Duration + + // stopCh is used to stop the goroutine which periodically flushes metrics. + stopCh <-chan struct{} + // IsStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure + // the metric flushing goroutine is stopped so that tests can collect metrics for verification. + IsStoppedCh chan struct{} +} + +func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder { + recorder := &MetricAsyncRecorder{ + bufferCh: make(chan *metric, bufferSize), + bufferSize: bufferSize, + interval: interval, + stopCh: stopCh, + IsStoppedCh: make(chan struct{}), + } + go recorder.run() + return recorder +} + +// ObservePluginDurationAsync observes the plugin_execution_duration_seconds metric. +// The metric will be flushed to Prometheus asynchronously. +func (r *MetricAsyncRecorder) ObservePluginDurationAsync(extensionPoint, pluginName, status string, value float64) { + newMetric := &metric{ + metric: PluginExecutionDuration, + labelValues: []string{pluginName, extensionPoint, status}, + value: value, + } + select { + case r.bufferCh <- newMetric: + default: + } +} + +// run flushes buffered metrics into Prometheus every second. +func (r *MetricAsyncRecorder) run() { + for { + select { + case <-r.stopCh: + close(r.IsStoppedCh) + return + default: + } + r.FlushMetrics() + time.Sleep(r.interval) + } +} + +// FlushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics. +func (r *MetricAsyncRecorder) FlushMetrics() { + for i := 0; i < r.bufferSize; i++ { + select { + case m := <-r.bufferCh: + m.metric.WithLabelValues(m.labelValues...).Observe(m.value) + default: + return + } + } +}