From 6c58cc0ad007179a54f3079083dfdd1bf8bcf393 Mon Sep 17 00:00:00 2001 From: sunxiaofei03 Date: Thu, 9 Jan 2020 12:23:05 +0800 Subject: [PATCH] change framework_extension_point_duration_seconds from sampling to always record --- .../framework/v1alpha1/cycle_state.go | 16 +- pkg/scheduler/framework/v1alpha1/framework.go | 146 ++++++++---------- .../framework/v1alpha1/framework_test.go | 5 +- .../framework/v1alpha1/metrics_recorder.go | 16 +- pkg/scheduler/scheduler.go | 6 +- 5 files changed, 76 insertions(+), 113 deletions(-) diff --git a/pkg/scheduler/framework/v1alpha1/cycle_state.go b/pkg/scheduler/framework/v1alpha1/cycle_state.go index be62a81e4a9..34f8dd510ad 100644 --- a/pkg/scheduler/framework/v1alpha1/cycle_state.go +++ b/pkg/scheduler/framework/v1alpha1/cycle_state.go @@ -44,8 +44,8 @@ type StateKey string type CycleState struct { mx sync.RWMutex storage map[StateKey]StateData - // if recordFrameworkMetrics is true, framework metrics will be recorded for this cycle. - recordFrameworkMetrics bool + // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. + recordPluginMetrics bool } // NewCycleState initializes a new CycleState and returns its pointer. @@ -55,20 +55,20 @@ func NewCycleState() *CycleState { } } -// ShouldRecordFrameworkMetrics returns whether framework metrics should be recorded. -func (c *CycleState) ShouldRecordFrameworkMetrics() bool { +// ShouldRecordPluginMetrics returns whether PluginExecutionDuration metrics should be recorded. +func (c *CycleState) ShouldRecordPluginMetrics() bool { if c == nil { return false } - return c.recordFrameworkMetrics + return c.recordPluginMetrics } -// SetRecordFrameworkMetrics sets recordFrameworkMetrics to the given value. -func (c *CycleState) SetRecordFrameworkMetrics(flag bool) { +// SetRecordPluginMetrics sets recordPluginMetrics to the given value. +func (c *CycleState) SetRecordPluginMetrics(flag bool) { if c == nil { return } - c.recordFrameworkMetrics = flag + c.recordPluginMetrics = flag } // Clone creates a copy of CycleState and returns its pointer. Clone returns diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 4d480fa5ac4..2fef66983c9 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -289,12 +289,10 @@ func (f *framework) QueueSortFunc() LessFunc { // anything but Success. If a non-success status is returned, then the scheduling // cycle is aborted. func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(preFilter, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.preFilterPlugins { status = f.runPreFilterPlugin(ctx, pl, state, pod) if !status.IsSuccess() { @@ -313,7 +311,7 @@ func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, } func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin, state *CycleState, pod *v1.Pod) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PreFilter(ctx, state, pod) } startTime := time.Now() @@ -332,12 +330,10 @@ func (f *framework) RunPreFilterExtensionAddPod( podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionAddPod, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilterExtensionAddPod, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue @@ -355,7 +351,7 @@ func (f *framework) RunPreFilterExtensionAddPod( } func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo) } startTime := time.Now() @@ -374,12 +370,10 @@ func (f *framework) RunPreFilterExtensionRemovePod( podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionRemovePod, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilterExtensionRemovePod, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue @@ -397,7 +391,7 @@ func (f *framework) RunPreFilterExtensionRemovePod( } func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo) } startTime := time.Now() @@ -417,12 +411,10 @@ func (f *framework) RunFilterPlugins( nodeInfo *schedulernodeinfo.NodeInfo, ) PluginToStatus { var firstFailedStatus *Status - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(filter, firstFailedStatus, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, firstFailedStatus.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() statuses := make(PluginToStatus) for _, pl := range f.filterPlugins { pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) @@ -448,7 +440,7 @@ func (f *framework) RunFilterPlugins( } func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.Filter(ctx, state, pod, nodeInfo) } startTime := time.Now() @@ -467,12 +459,10 @@ func (f *framework) RunPostFilterPlugins( nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap, ) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(postFilter, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(postFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.postFilterPlugins { status = f.runPostFilterPlugin(ctx, pl, state, pod, nodes, filteredNodesStatuses) if !status.IsSuccess() { @@ -486,7 +476,7 @@ func (f *framework) RunPostFilterPlugins( } func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses) } startTime := time.Now() @@ -500,12 +490,10 @@ func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin // It also returns *Status, which is set to non-success if any of the plugins returns // a non-success status. func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(score, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins)) for _, pl := range f.scorePlugins { pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes)) @@ -581,7 +569,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod } func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.Score(ctx, state, pod, nodeName) } startTime := time.Now() @@ -591,7 +579,7 @@ func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *C } func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeScoreList NodeScoreList) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList) } startTime := time.Now() @@ -604,12 +592,10 @@ func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state // failure (bool) if any of the plugins returns an error. It also returns an // error containing the rejection message or the error occurred in the plugin. func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(preBind, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preBind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.preBindPlugins { status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { @@ -622,7 +608,7 @@ func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, po } func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.PreBind(ctx, state, pod, nodeName) } startTime := time.Now() @@ -633,12 +619,10 @@ func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, stat // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status. func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(bind, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(bind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() if len(f.bindPlugins) == 0 { return NewStatus(Skip, "") } @@ -658,7 +642,7 @@ func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod * } func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return bp.Bind(ctx, state, pod, nodeName) } startTime := time.Now() @@ -669,19 +653,17 @@ func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *Cyc // RunPostBindPlugins runs the set of configured postbind plugins. func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(postBind, nil, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(postBind, Success.String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.postBindPlugins { f.runPostBindPlugin(ctx, pl, state, pod, nodeName) } } func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { pl.PostBind(ctx, state, pod, nodeName) return } @@ -694,12 +676,10 @@ func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, st // plugins returns an error, it does not continue running the remaining ones and // returns the error. In such case, pod will not be scheduled. func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(reserve, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.reservePlugins { status = f.runReservePlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { @@ -712,7 +692,7 @@ func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, po } func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.Reserve(ctx, state, pod, nodeName) } startTime := time.Now() @@ -723,19 +703,17 @@ func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, stat // RunUnreservePlugins runs the set of configured unreserve plugins. func (f *framework) RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(unreserve, nil, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, Success.String()).Observe(metrics.SinceInSeconds(startTime)) + }() for _, pl := range f.unreservePlugins { f.runUnreservePlugin(ctx, pl, state, pod, nodeName) } } func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, state *CycleState, pod *v1.Pod, nodeName string) { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { pl.Unreserve(ctx, state, pod, nodeName) return } @@ -752,12 +730,10 @@ func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, // Note that if multiple plugins asked to wait, then we wait for the minimum // timeout duration. func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - if state.ShouldRecordFrameworkMetrics() { - startTime := time.Now() - defer func() { - f.metricsRecorder.observeExtensionPointDurationAsync(permit, status, metrics.SinceInSeconds(startTime)) - }() - } + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + }() pluginsWaitTime := make(map[string]time.Duration) statusCode := Success for _, pl := range f.permitPlugins { @@ -809,7 +785,7 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod } func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state *CycleState, pod *v1.Pod, nodeName string) (*Status, time.Duration) { - if !state.ShouldRecordFrameworkMetrics() { + if !state.ShouldRecordPluginMetrics() { return pl.Permit(ctx, state, pod, nodeName) } startTime := time.Now() diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index ac85daa0f7c..429394c913d 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -593,7 +593,6 @@ func TestPreFilterPlugins(t *testing.T) { t.Errorf("AddPod called %v, expected: 1", preFilter2.RemoveCalled) } }) - } func TestFilterPlugins(t *testing.T) { @@ -817,7 +816,9 @@ func TestFilterPlugins(t *testing.T) { } func TestRecordingMetrics(t *testing.T) { - state := &CycleState{recordFrameworkMetrics: true} + state := &CycleState{ + recordPluginMetrics: true, + } tests := []struct { name string action func(f Framework) diff --git a/pkg/scheduler/framework/v1alpha1/metrics_recorder.go b/pkg/scheduler/framework/v1alpha1/metrics_recorder.go index f6ae15f97ff..7751b0def37 100644 --- a/pkg/scheduler/framework/v1alpha1/metrics_recorder.go +++ b/pkg/scheduler/framework/v1alpha1/metrics_recorder.go @@ -60,21 +60,7 @@ func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder return recorder } -// observeExtensionPointDurationAsync observes the framework_extension_point_duration_seconds metric. -// The metric will be flushed to Prometheus asynchronously. -func (r *metricsRecorder) observeExtensionPointDurationAsync(extensionPoint string, status *Status, value float64) { - newMetric := &frameworkMetric{ - metric: metrics.FrameworkExtensionPointDuration, - labelValues: []string{extensionPoint, status.Code().String()}, - value: value, - } - select { - case r.bufferCh <- newMetric: - default: - } -} - -// observeExtensionPointDurationAsync observes the plugin_execution_duration_seconds metric. +// observePluginDurationAsync observes the plugin_execution_duration_seconds metric. // The metric will be flushed to Prometheus asynchronously. func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *Status, value float64) { newMetric := &frameworkMetric{ diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6b3d833ab6a..a34ff4c967a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -54,8 +54,8 @@ const ( BindTimeoutSeconds = 100 // SchedulerError is the reason recorded for events when an error occurs during scheduling a pod. SchedulerError = "SchedulerError" - // Percentage of framework metrics to be sampled. - frameworkMetricsSamplePercent = 10 + // Percentage of plugin metrics to be sampled. + pluginMetricsSamplePercent = 10 ) // podConditionUpdater updates the condition of a pod based on the passed @@ -566,7 +566,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // Synchronously attempt to find a fit for the pod. start := time.Now() state := framework.NewCycleState() - state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent) + state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)