From 639007b28e4ae3d7ee510667b1e545555c4e28a7 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Mon, 6 Mar 2023 21:54:01 +0000 Subject: [PATCH] cleanup(scheduler): move metric labels to metrics package --- pkg/scheduler/framework/runtime/framework.go | 75 ++++++++------------ pkg/scheduler/metrics/metrics.go | 28 ++++++-- pkg/scheduler/schedule_one.go | 5 +- 3 files changed, 56 insertions(+), 52 deletions(-) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 0303a72db5f..a9129d5f951 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -40,23 +40,8 @@ import ( ) const ( - // Filter is the name of the filter extension point. - Filter = "Filter" // Specifies the maximum timeout a permit plugin can return. - maxTimeout = 15 * time.Minute - preFilter = "PreFilter" - preFilterExtensionAddPod = "PreFilterExtensionAddPod" - preFilterExtensionRemovePod = "PreFilterExtensionRemovePod" - postFilter = "PostFilter" - preScore = "PreScore" - score = "Score" - scoreExtensionNormalize = "ScoreExtensionNormalize" - preBind = "PreBind" - bind = "Bind" - postBind = "PostBind" - reserve = "Reserve" - unreserve = "Unreserve" - permit = "Permit" + maxTimeout = 15 * time.Minute ) var allClusterEvents = []framework.ClusterEvent{ @@ -606,7 +591,7 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc { func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() var result *framework.PreFilterResult var pluginsWithNodes []string @@ -617,7 +602,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor skipPlugins.Insert(pl.Name()) continue } - metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), preFilter, f.profileName).Inc() + metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.PreFilter, f.profileName).Inc() if !s.IsSuccess() { s.SetFailedPlugin(pl.Name()) if s.IsUnschedulable() { @@ -647,7 +632,7 @@ func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.Pre } startTime := time.Now() result, status := pl.PreFilter(ctx, state, pod) - f.metricsRecorder.ObservePluginDurationAsync(preFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return result, status } @@ -682,7 +667,7 @@ func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl fram } startTime := time.Now() status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo) - f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -717,7 +702,7 @@ func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl f } startTime := time.Now() status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo) - f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -735,7 +720,7 @@ func (f *frameworkImpl) RunFilterPlugins( if state.SkipFilterPlugins.Has(pl.Name()) { continue } - metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), Filter, f.profileName).Inc() + metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.Filter, f.profileName).Inc() if status := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo); !status.IsSuccess() { if !status.IsUnschedulable() { // Filter plugins are not supposed to return any status other than @@ -756,7 +741,7 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter } startTime := time.Now() status := pl.Filter(ctx, state, pod, nodeInfo) - f.metricsRecorder.ObservePluginDurationAsync(Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -765,7 +750,7 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(postFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() // `result` records the last meaningful(non-noop) PostFilterResult. @@ -802,7 +787,7 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po } startTime := time.Now() r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap) - f.metricsRecorder.ObservePluginDurationAsync(postFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.PostFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime)) return r, s } @@ -900,7 +885,7 @@ func (f *frameworkImpl) RunPreScorePlugins( ) (status *framework.Status) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(preScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() skipPlugins := sets.New[string]() for _, pl := range f.preScorePlugins { @@ -923,7 +908,7 @@ func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreS } startTime := time.Now() status := pl.PreScore(ctx, state, pod, nodes) - f.metricsRecorder.ObservePluginDurationAsync(preScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.PreScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -934,7 +919,7 @@ func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreS func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ns []framework.NodePluginScores, status *framework.Status) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() allNodePluginScores := make([]framework.NodePluginScores, len(nodes)) numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len() @@ -967,7 +952,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy Score: s, } } - }, score) + }, metrics.Score) if err := errCh.ReceiveError(); err != nil { return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err)) } @@ -986,7 +971,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy errCh.SendErrorWithCancel(err, cancel) return } - }, score) + }, metrics.Score) if err := errCh.ReceiveError(); err != nil { return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err)) } @@ -1017,7 +1002,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy nodePluginScores.TotalScore += weightedScore } allNodePluginScores[index] = nodePluginScores - }, score) + }, metrics.Score) if err := errCh.ReceiveError(); err != nil { return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err)) } @@ -1031,7 +1016,7 @@ func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePl } startTime := time.Now() s, status := pl.Score(ctx, state, pod, nodeName) - f.metricsRecorder.ObservePluginDurationAsync(score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.Score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return s, status } @@ -1041,7 +1026,7 @@ func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.Scor } startTime := time.Now() status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList) - f.metricsRecorder.ObservePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.ScoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1051,7 +1036,7 @@ func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.Scor func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(preBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() for _, pl := range f.preBindPlugins { status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName) @@ -1075,7 +1060,7 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi } startTime := time.Now() status := pl.PreBind(ctx, state, pod, nodeName) - f.metricsRecorder.ObservePluginDurationAsync(preBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.PreBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1083,7 +1068,7 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(bind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Bind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() if len(f.bindPlugins) == 0 { return framework.NewStatus(framework.Skip, "") @@ -1114,7 +1099,7 @@ func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlug } startTime := time.Now() status := bp.Bind(ctx, state, pod, nodeName) - f.metricsRecorder.ObservePluginDurationAsync(bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.Bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1122,7 +1107,7 @@ func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlug func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(postBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() for _, pl := range f.postBindPlugins { f.runPostBindPlugin(ctx, pl, state, pod, nodeName) @@ -1136,7 +1121,7 @@ func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.Post } startTime := time.Now() pl.PostBind(ctx, state, pod, nodeName) - f.metricsRecorder.ObservePluginDurationAsync(postBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.PostBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime)) } // RunReservePluginsReserve runs the Reserve method in the set of configured @@ -1147,7 +1132,7 @@ func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.Post func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() for _, pl := range f.reservePlugins { status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName) @@ -1166,7 +1151,7 @@ func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framewor } startTime := time.Now() status := pl.Reserve(ctx, state, pod, nodeName) - f.metricsRecorder.ObservePluginDurationAsync(reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.Reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status } @@ -1175,7 +1160,7 @@ func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framewor func (f *frameworkImpl) RunReservePluginsUnreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Unreserve, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() // Execute the Unreserve operation of each reserve plugin in the // *reverse* order in which the Reserve operation was executed. @@ -1191,7 +1176,7 @@ func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framew } startTime := time.Now() pl.Unreserve(ctx, state, pod, nodeName) - f.metricsRecorder.ObservePluginDurationAsync(unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.Unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime)) } // RunPermitPlugins runs the set of configured permit plugins. If any of these @@ -1203,7 +1188,7 @@ func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framew func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) { startTime := time.Now() defer func() { - metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() pluginsWaitTime := make(map[string]time.Duration) statusCode := framework.Success @@ -1245,7 +1230,7 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit } startTime := time.Now() status, timeout := pl.Permit(ctx, state, pod, nodeName) - f.metricsRecorder.ObservePluginDurationAsync(permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + f.metricsRecorder.ObservePluginDurationAsync(metrics.Permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) return status, timeout } diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 414d0d4d779..f2632bf80b9 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -26,16 +26,36 @@ import ( ) const ( - // SchedulerSubsystem - subsystem name used by scheduler + // SchedulerSubsystem - subsystem name used by scheduler. SchedulerSubsystem = "scheduler" - // Below are possible values for the work label. +) - // PrioritizingExtender - prioritizing extender work label value +// Below are possible values for the work and operation label. +const ( + // PrioritizingExtender - prioritizing extender work/operation label value. PrioritizingExtender = "prioritizing_extender" - // Binding - binding work label value + // Binding - binding work/operation label value. Binding = "binding" ) +// Below are possible values for the extension_point label. +const ( + PreFilter = "PreFilter" + Filter = "Filter" + PreFilterExtensionAddPod = "PreFilterExtensionAddPod" + PreFilterExtensionRemovePod = "PreFilterExtensionRemovePod" + PostFilter = "PostFilter" + PreScore = "PreScore" + Score = "Score" + ScoreExtensionNormalize = "ScoreExtensionNormalize" + PreBind = "PreBind" + Bind = "Bind" + PostBind = "PostBind" + Reserve = "Reserve" + Unreserve = "Unreserve" + Permit = "Permit" +) + // All the histogram based metrics have 1ms as size for the smallest bucket. var ( scheduleAttempts = metrics.NewCounterVec( diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index cc0fbc73725..90e2d30e9c9 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -37,7 +37,6 @@ import ( "k8s.io/kubernetes/pkg/apis/core/validation" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" - frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" @@ -530,12 +529,12 @@ func (sched *Scheduler) findNodesThatPassFilters( // We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins // function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle. // Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod. - metrics.FrameworkExtensionPointDuration.WithLabelValues(frameworkruntime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode)) + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode)) }() // Stops searching for more nodes once the configured number of feasible nodes // are found. - fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, frameworkruntime.Filter) + fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter) feasibleNodes = feasibleNodes[:feasibleNodesLen] if err := errCh.ReceiveError(); err != nil { statusCode = framework.Error