diff --git a/pkg/scheduler/framework/v1alpha1/cycle_state.go b/pkg/scheduler/framework/v1alpha1/cycle_state.go index be62a81e4a9..34f8dd510ad 100644 --- a/pkg/scheduler/framework/v1alpha1/cycle_state.go +++ b/pkg/scheduler/framework/v1alpha1/cycle_state.go @@ -44,8 +44,8 @@ type StateKey string type CycleState struct { mx sync.RWMutex storage map[StateKey]StateData - // if recordFrameworkMetrics is true, framework metrics will be recorded for this cycle. - recordFrameworkMetrics bool + // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. + recordPluginMetrics bool } // NewCycleState initializes a new CycleState and returns its pointer. @@ -55,20 +55,20 @@ func NewCycleState() *CycleState { } } -// ShouldRecordFrameworkMetrics returns whether framework metrics should be recorded. -func (c *CycleState) ShouldRecordFrameworkMetrics() bool { +// ShouldRecordPluginMetrics returns whether PluginExecutionDuration metrics should be recorded. +func (c *CycleState) ShouldRecordPluginMetrics() bool { if c == nil { return false } - return c.recordFrameworkMetrics + return c.recordPluginMetrics } -// SetRecordFrameworkMetrics sets recordFrameworkMetrics to the given value. -func (c *CycleState) SetRecordFrameworkMetrics(flag bool) { +// SetRecordPluginMetrics sets recordPluginMetrics to the given value. +func (c *CycleState) SetRecordPluginMetrics(flag bool) { if c == nil { return } - c.recordFrameworkMetrics = flag + c.recordPluginMetrics = flag } // Clone creates a copy of CycleState and returns its pointer. Clone returns diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 47572645bb5..2da48880c37 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -300,12 +300,10 @@ func (f *framework) QueueSortFunc() LessFunc { // anything but Success. If a non-success status is returned, then the scheduling // cycle is aborted. func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(preFilter, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.preFilterPlugins { status = f.runPreFilterPlugin(ctx, pl, state, pod) if !status.IsSuccess() { @@ -324,7 +322,7 @@ func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, } func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin, state *CycleState, pod *v1.Pod) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PreFilter(ctx, state, pod) } startTime := time.Now() @@ -343,12 +341,10 @@ func (f *framework) RunPreFilterExtensionAddPod( podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionAddPod, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilterExtensionAddPod, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue @@ -366,7 +362,7 @@ func (f *framework) RunPreFilterExtensionAddPod( } func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo) } startTime := time.Now() @@ -385,12 +381,10 @@ func (f *framework) RunPreFilterExtensionRemovePod( podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionRemovePod, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilterExtensionRemovePod, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue @@ -408,7 +402,7 @@ func (f *framework) RunPreFilterExtensionRemovePod( } func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo) } startTime := time.Now() @@ -428,12 +422,10 @@ func (f *framework) RunFilterPlugins( nodeInfo *schedulernodeinfo.NodeInfo, ) PluginToStatus { var firstFailedStatus *Status - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(filter, firstFailedStatus, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, firstFailedStatus.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() statuses := make(PluginToStatus) for _, pl := range f.filterPlugins { pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) @@ -459,7 +451,7 @@ func (f *framework) RunFilterPlugins( } func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.Filter(ctx, state, pod, nodeInfo) } startTime := time.Now() @@ -478,12 +470,10 @@ func (f *framework) RunPostFilterPlugins( nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap, ) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(postFilter, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(postFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.postFilterPlugins { status = f.runPostFilterPlugin(ctx, pl, state, pod, nodes, filteredNodesStatuses) if !status.IsSuccess() { @@ -497,7 +487,7 @@ func (f *framework) RunPostFilterPlugins( } func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses) } startTime := time.Now() @@ -511,12 +501,10 @@ func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin // It also returns *Status, which is set to non-success if any of the plugins returns // a non-success status. func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(score, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins)) for _, pl := range f.scorePlugins { pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes)) @@ -592,7 +580,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod } func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.Score(ctx, state, pod, nodeName) } startTime := time.Now() @@ -602,7 +590,7 @@ func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *C } func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeScoreList NodeScoreList) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList) } startTime := time.Now() @@ -615,12 +603,10 @@ func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state // failure (bool) if any of the plugins returns an error. It also returns an // error containing the rejection message or the error occurred in the plugin. func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(preBind, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preBind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.preBindPlugins { status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { @@ -633,7 +619,7 @@ func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, po } func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PreBind(ctx, state, pod, nodeName) } startTime := time.Now() @@ -644,12 +630,10 @@ func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, stat // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status. func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(bind, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(bind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() if len(f.bindPlugins) == 0 { return NewStatus(Skip, "") } @@ -669,7 +653,7 @@ func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod * } func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return bp.Bind(ctx, state, pod, nodeName) } startTime := time.Now() @@ -680,19 +664,17 @@ func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *Cyc // RunPostBindPlugins runs the set of configured postbind plugins. func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(postBind, nil, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(postBind, Success.String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.postBindPlugins { f.runPostBindPlugin(ctx, pl, state, pod, nodeName) } } func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { pl.PostBind(ctx, state, pod, nodeName) return } @@ -705,12 +687,10 @@ func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, st // plugins returns an error, it does not continue running the remaining ones and // returns the error. In such case, pod will not be scheduled. func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(reserve, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.reservePlugins { status = f.runReservePlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { @@ -723,7 +703,7 @@ func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, po } func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.Reserve(ctx, state, pod, nodeName) } startTime := time.Now() @@ -734,19 +714,17 @@ func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, stat // RunUnreservePlugins runs the set of configured unreserve plugins. func (f *framework) RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(unreserve, nil, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, Success.String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.unreservePlugins { f.runUnreservePlugin(ctx, pl, state, pod, nodeName) } } func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, state *CycleState, pod *v1.Pod, nodeName string) { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { pl.Unreserve(ctx, state, pod, nodeName) return } @@ -763,12 +741,10 @@ func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, // Note that if multiple plugins asked to wait, then we wait for the minimum // timeout duration. func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(permit, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() pluginsWaitTime := make(map[string]time.Duration) statusCode := Success for _, pl := range f.permitPlugins { @@ -820,7 +796,7 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod } func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state *CycleState, pod *v1.Pod, nodeName string) (*Status, time.Duration) { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.Permit(ctx, state, pod, nodeName) } startTime := time.Now() diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index ac85daa0f7c..429394c913d 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -593,7 +593,6 @@ func TestPreFilterPlugins(t *testing.T) { t.Errorf("AddPod called %v, expected: 1", preFilter2.RemoveCalled) } }) - } func TestFilterPlugins(t *testing.T) { @@ -817,7 +816,9 @@ func TestFilterPlugins(t *testing.T) { } func TestRecordingMetrics(t *testing.T) { - state := &CycleState{recordFrameworkMetrics: true} + state := &CycleState{ + recordPluginMetrics: true, + } tests := []struct { name string action func(f Framework) diff --git a/pkg/scheduler/framework/v1alpha1/metrics_recorder.go b/pkg/scheduler/framework/v1alpha1/metrics_recorder.go index f6ae15f97ff..7751b0def37 100644 --- a/pkg/scheduler/framework/v1alpha1/metrics_recorder.go +++ b/pkg/scheduler/framework/v1alpha1/metrics_recorder.go @@ -60,21 +60,7 @@ func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder return recorder } -// observeExtensionPointDurationAsync observes the framework_extension_point_duration_seconds metric. -// The metric will be flushed to Prometheus asynchronously. -func (r *metricsRecorder) observeExtensionPointDurationAsync(extensionPoint string, status *Status, value float64) { - newMetric := &frameworkMetric{ - metric: metrics.FrameworkExtensionPointDuration, - labelValues: []string{extensionPoint, status.Code().String()}, - value: value, - } - select { - case r.bufferCh <- newMetric: - default: - } -} - -// observeExtensionPointDurationAsync observes the plugin_execution_duration_seconds metric. +// observePluginDurationAsync observes the plugin_execution_duration_seconds metric. // The metric will be flushed to Prometheus asynchronously. func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *Status, value float64) { newMetric := &frameworkMetric{ diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6b3d833ab6a..a34ff4c967a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -54,8 +54,8 @@ const ( BindTimeoutSeconds = 100 // SchedulerError is the reason recorded for events when an error occurs during scheduling a pod. SchedulerError = "SchedulerError" - // Percentage of framework metrics to be sampled. - frameworkMetricsSamplePercent = 10 + // Percentage of plugin metrics to be sampled. + pluginMetricsSamplePercent = 10 ) // podConditionUpdater updates the condition of a pod based on the passed @@ -566,7 +566,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // Synchronously attempt to find a fit for the pod. start := time.Now() state := framework.NewCycleState() - state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent) + state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)