diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 73759609cad..761b0df22ba 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -6,6 +6,7 @@ go_library( "cycle_state.go", "framework.go", "interface.go", + "metrics_recorder.go", "registry.go", "waiting_pods_map.go", ], @@ -25,6 +26,7 @@ go_library( "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//staging/src/k8s.io/component-base/metrics:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], diff --git a/pkg/scheduler/framework/v1alpha1/cycle_state.go b/pkg/scheduler/framework/v1alpha1/cycle_state.go index f9eb99ad598..be62a81e4a9 100644 --- a/pkg/scheduler/framework/v1alpha1/cycle_state.go +++ b/pkg/scheduler/framework/v1alpha1/cycle_state.go @@ -44,6 +44,8 @@ type StateKey string type CycleState struct { mx sync.RWMutex storage map[StateKey]StateData + // if recordFrameworkMetrics is true, framework metrics will be recorded for this cycle. + recordFrameworkMetrics bool } // NewCycleState initializes a new CycleState and returns its pointer. @@ -53,6 +55,22 @@ func NewCycleState() *CycleState { } } +// ShouldRecordFrameworkMetrics returns whether framework metrics should be recorded. +func (c *CycleState) ShouldRecordFrameworkMetrics() bool { + if c == nil { + return false + } + return c.recordFrameworkMetrics +} + +// SetRecordFrameworkMetrics sets recordFrameworkMetrics to the given value. +func (c *CycleState) SetRecordFrameworkMetrics(flag bool) { + if c == nil { + return + } + c.recordFrameworkMetrics = flag +} + // Clone creates a copy of CycleState and returns its pointer. Clone returns // nil if the context being cloned is nil. func (c *CycleState) Clone() *CycleState { diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index fd63a131af6..7fd263befb0 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -46,6 +46,7 @@ const ( filter = "Filter" postFilter = "PostFilter" score = "Score" + scoreExtensionNormalize = "ScoreExtensionNormalize" preBind = "PreBind" bind = "Bind" postBind = "PostBind" @@ -75,6 +76,8 @@ type framework struct { clientSet clientset.Interface informerFactory informers.SharedInformerFactory + + metricsRecorder *metricsRecorder } // extensionPoint encapsulates desired and applied set of plugins at a specific extension @@ -108,6 +111,7 @@ type frameworkOptions struct { clientSet clientset.Interface informerFactory informers.SharedInformerFactory snapshotSharedLister schedulerlisters.SharedLister + metricsRecorder *metricsRecorder } // Option for the framework. @@ -134,7 +138,16 @@ func WithSnapshotSharedLister(snapshotSharedLister schedulerlisters.SharedLister } } -var defaultFrameworkOptions = frameworkOptions{} +// withMetricsRecorder is only used in tests. +func withMetricsRecorder(recorder *metricsRecorder) Option { + return func(o *frameworkOptions) { + o.metricsRecorder = recorder + } +} + +var defaultFrameworkOptions = frameworkOptions{ + metricsRecorder: newMetricsRecorder(1000, time.Second), +} var _ Framework = &framework{} @@ -152,6 +165,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi waitingPods: newWaitingPodsMap(), clientSet: options.clientSet, informerFactory: options.informerFactory, + metricsRecorder: options.metricsRecorder, } if plugins == nil { return f, nil @@ -254,12 +268,15 @@ func (f *framework) QueueSortFunc() LessFunc { // *Status and its code is set to non-success if any of the plugins returns // anything but Success. If a non-success status is returned, then the scheduling // cycle is aborted. -func (f *framework) RunPreFilterPlugins( - ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, preFilter, status) }() +func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) { + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(preFilter, status, metrics.SinceInSeconds(startTime)) + }() + } for _, pl := range f.preFilterPlugins { - status := pl.PreFilter(ctx, state, pod) + status = f.runPreFilterPlugin(ctx, pl, state, pod) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message()) @@ -275,6 +292,16 @@ func (f *framework) RunPreFilterPlugins( return nil } +func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin, state *CycleState, pod *v1.Pod) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return pl.PreFilter(ctx, state, pod) + } + startTime := time.Now() + status := pl.PreFilter(ctx, state, pod) + f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. @@ -285,13 +312,18 @@ func (f *framework) RunPreFilterExtensionAddPod( podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, preFilterExtensionAddPod, status) }() + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionAddPod, status, metrics.SinceInSeconds(startTime)) + }() + } for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue } - if status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() { + status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podToAdd, nodeInfo) + if !status.IsSuccess() { msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v", pl.Name(), podToSchedule.Name, status.Message()) klog.Error(msg) @@ -302,6 +334,16 @@ func (f *framework) RunPreFilterExtensionAddPod( return nil } +func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo) + } + startTime := time.Now() + status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo) + f.metricsRecorder.observePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + // RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. @@ -312,13 +354,18 @@ func (f *framework) RunPreFilterExtensionRemovePod( podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, preFilterExtensionRemovePod, status) }() + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionRemovePod, status, metrics.SinceInSeconds(startTime)) + }() + } for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue } - if status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() { + status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podToRemove, nodeInfo) + if !status.IsSuccess() { msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v", pl.Name(), podToSchedule.Name, status.Message()) klog.Error(msg) @@ -329,6 +376,16 @@ func (f *framework) RunPreFilterExtensionRemovePod( return nil } +func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo) + } + startTime := time.Now() + status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo) + f.metricsRecorder.observePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + // RunFilterPlugins runs the set of configured Filter plugins for pod on // the given node. If any of these plugins doesn't return "Success", the // given node is not suitable for running pod. @@ -339,10 +396,14 @@ func (f *framework) RunFilterPlugins( pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, filter, status) }() + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(filter, status, metrics.SinceInSeconds(startTime)) + }() + } for _, pl := range f.filterPlugins { - status := pl.Filter(ctx, state, pod, nodeInfo) + status = f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) if !status.IsSuccess() { if !status.IsUnschedulable() { errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v", @@ -357,6 +418,16 @@ func (f *framework) RunFilterPlugins( return nil } +func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return pl.Filter(ctx, state, pod, nodeInfo) + } + startTime := time.Now() + status := pl.Filter(ctx, state, pod, nodeInfo) + f.metricsRecorder.observePluginDurationAsync(filter, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + // RunPostFilterPlugins runs the set of configured post-filter plugins. If any // of these plugins returns any status other than "Success", the given node is // rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses. @@ -367,10 +438,14 @@ func (f *framework) RunPostFilterPlugins( nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap, ) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, postFilter, status) }() + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(postFilter, status, metrics.SinceInSeconds(startTime)) + }() + } for _, pl := range f.postFilterPlugins { - status := pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses) + status = f.runPostFilterPlugin(ctx, pl, state, pod, nodes, filteredNodesStatuses) if !status.IsSuccess() { msg := fmt.Sprintf("error while running %q postfilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) @@ -381,13 +456,27 @@ func (f *framework) RunPostFilterPlugins( return nil } +func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses) + } + startTime := time.Now() + status := pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses) + f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + // RunScorePlugins runs the set of configured scoring plugins. It returns a list that // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns // a non-success status. func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, score, status) }() + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(score, status, metrics.SinceInSeconds(startTime)) + }() + } pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins)) for _, pl := range f.scorePlugins { pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes)) @@ -399,14 +488,14 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) { for _, pl := range f.scorePlugins { nodeName := nodes[index].Name - score, status := pl.Score(ctx, state, pod, nodeName) + s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel) return } pluginToNodeScores[pl.Name()][index] = NodeScore{ Name: nodeName, - Score: int64(score), + Score: int64(s), } } }) @@ -423,7 +512,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod if pl.ScoreExtensions() == nil { return } - status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList) + status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList) if !status.IsSuccess() { err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message()) errCh.SendErrorWithCancel(err, cancel) @@ -462,15 +551,38 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod return pluginToNodeScores, nil } +func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) { + if !state.ShouldRecordFrameworkMetrics() { + return pl.Score(ctx, state, pod, nodeName) + } + startTime := time.Now() + s, status := pl.Score(ctx, state, pod, nodeName) + f.metricsRecorder.observePluginDurationAsync(score, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return s, status +} + +func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeScoreList NodeScoreList) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList) + } + startTime := time.Now() + status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList) + f.metricsRecorder.observePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + // RunPreBindPlugins runs the set of configured prebind plugins. It returns a // failure (bool) if any of the plugins returns an error. It also returns an // error containing the rejection message or the error occurred in the plugin. -func (f *framework) RunPreBindPlugins( - ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, preBind, status) }() +func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(preBind, status, metrics.SinceInSeconds(startTime)) + }() + } for _, pl := range f.preBindPlugins { - status := pl.PreBind(ctx, state, pod, nodeName) + status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { msg := fmt.Sprintf("error while running %q prebind plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) @@ -480,15 +592,29 @@ func (f *framework) RunPreBindPlugins( return nil } +func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return pl.PreBind(ctx, state, pod, nodeName) + } + startTime := time.Now() + status := pl.PreBind(ctx, state, pod, nodeName) + f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status. func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, bind, status) }() + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(bind, status, metrics.SinceInSeconds(startTime)) + }() + } if len(f.bindPlugins) == 0 { return NewStatus(Skip, "") } for _, bp := range f.bindPlugins { - status = bp.Bind(ctx, state, pod, nodeName) + status = f.runBindPlugin(ctx, bp, state, pod, nodeName) if status != nil && status.Code() == Skip { continue } @@ -502,25 +628,51 @@ func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod * return status } -// RunPostBindPlugins runs the set of configured postbind plugins. -func (f *framework) RunPostBindPlugins( - ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { - startTime := time.Now() - defer recordExtensionPointDuration(startTime, postBind, nil) - for _, pl := range f.postBindPlugins { - pl.PostBind(ctx, state, pod, nodeName) +func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return bp.Bind(ctx, state, pod, nodeName) } + startTime := time.Now() + status := bp.Bind(ctx, state, pod, nodeName) + f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + +// RunPostBindPlugins runs the set of configured postbind plugins. +func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(postBind, nil, metrics.SinceInSeconds(startTime)) + }() + } + for _, pl := range f.postBindPlugins { + f.runPostBindPlugin(ctx, pl, state, pod, nodeName) + } +} + +func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) { + if !state.ShouldRecordFrameworkMetrics() { + pl.PostBind(ctx, state, pod, nodeName) + return + } + startTime := time.Now() + pl.PostBind(ctx, state, pod, nodeName) + f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(startTime)) } // RunReservePlugins runs the set of configured reserve plugins. If any of these // plugins returns an error, it does not continue running the remaining ones and // returns the error. In such case, pod will not be scheduled. -func (f *framework) RunReservePlugins( - ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, reserve, status) }() +func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(reserve, status, metrics.SinceInSeconds(startTime)) + }() + } for _, pl := range f.reservePlugins { - status := pl.Reserve(ctx, state, pod, nodeName) + status = f.runReservePlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { msg := fmt.Sprintf("error while running %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) @@ -530,14 +682,37 @@ func (f *framework) RunReservePlugins( return nil } -// RunUnreservePlugins runs the set of configured unreserve plugins. -func (f *framework) RunUnreservePlugins( - ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { - startTime := time.Now() - defer recordExtensionPointDuration(startTime, unreserve, nil) - for _, pl := range f.unreservePlugins { - pl.Unreserve(ctx, state, pod, nodeName) +func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status { + if !state.ShouldRecordFrameworkMetrics() { + return pl.Reserve(ctx, state, pod, nodeName) } + startTime := time.Now() + status := pl.Reserve(ctx, state, pod, nodeName) + f.metricsRecorder.observePluginDurationAsync(reserve, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status +} + +// RunUnreservePlugins runs the set of configured unreserve plugins. +func (f *framework) RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(unreserve, nil, metrics.SinceInSeconds(startTime)) + }() + } + for _, pl := range f.unreservePlugins { + f.runUnreservePlugin(ctx, pl, state, pod, nodeName) + } +} + +func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, state *CycleState, pod *v1.Pod, nodeName string) { + if !state.ShouldRecordFrameworkMetrics() { + pl.Unreserve(ctx, state, pod, nodeName) + return + } + startTime := time.Now() + pl.Unreserve(ctx, state, pod, nodeName) + f.metricsRecorder.observePluginDurationAsync(unreserve, pl.Name(), nil, metrics.SinceInSeconds(startTime)) } // RunPermitPlugins runs the set of configured permit plugins. If any of these @@ -547,14 +722,17 @@ func (f *framework) RunUnreservePlugins( // returned by the plugin, if the time expires, then it will return an error. // Note that if multiple plugins asked to wait, then we wait for the minimum // timeout duration. -func (f *framework) RunPermitPlugins( - ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { - startTime := time.Now() - defer func() { recordExtensionPointDuration(startTime, permit, status) }() +func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + if state.ShouldRecordFrameworkMetrics() { + startTime := time.Now() + defer func() { + f.metricsRecorder.observeExtensionPointDurationAsync(permit, status, metrics.SinceInSeconds(startTime)) + }() + } pluginsWaitTime := make(map[string]time.Duration) statusCode := Success for _, pl := range f.permitPlugins { - status, timeout := pl.Permit(ctx, state, pod, nodeName) + status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message()) @@ -601,6 +779,16 @@ func (f *framework) RunPermitPlugins( return nil } +func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state *CycleState, pod *v1.Pod, nodeName string) (*Status, time.Duration) { + if !state.ShouldRecordFrameworkMetrics() { + return pl.Permit(ctx, state, pod, nodeName) + } + startTime := time.Now() + status, timeout := pl.Permit(ctx, state, pod, nodeName) + f.metricsRecorder.observePluginDurationAsync(permit, pl.Name(), status, metrics.SinceInSeconds(startTime)) + return status, timeout +} + // SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo // snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains // unchanged until a pod finishes "Reserve". There is no guarantee that the information @@ -695,11 +883,3 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu } return pgMap } - -func recordExtensionPointDuration(start time.Time, extensionPoint string, status *Status) { - statusCode := Success.String() - if status != nil { - statusCode = status.Code().String() - } - metrics.FrameworkExtensionPointDuration.WithLabelValues(extensionPoint, statusCode).Observe(metrics.SinceInSeconds(start)) -} diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index 7cf824f00b8..177c20daf55 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -133,6 +133,17 @@ type TestPlugin struct { inj injectedResult } +type TestPluginPreFilterExtension struct { + inj injectedResult +} + +func (e *TestPluginPreFilterExtension) AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + return NewStatus(Code(e.inj.PreFilterAddPodStatus), "injected status") +} +func (e *TestPluginPreFilterExtension) RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + return NewStatus(Code(e.inj.PreFilterRemovePodStatus), "injected status") +} + func (pl *TestPlugin) Name() string { return pl.name } @@ -150,7 +161,7 @@ func (pl *TestPlugin) PreFilter(ctx context.Context, state *CycleState, p *v1.Po } func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions { - return nil + return &TestPluginPreFilterExtension{inj: pl.inj} } func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { @@ -586,6 +597,7 @@ func TestPreFilterPlugins(t *testing.T) { } func TestRecordingMetrics(t *testing.T) { + state := &CycleState{recordFrameworkMetrics: true} tests := []struct { name string action func(f Framework) @@ -595,117 +607,149 @@ func TestRecordingMetrics(t *testing.T) { }{ { name: "PreFilter - Success", - action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), nil, pod) }, + action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) }, + wantExtensionPoint: "PreFilter", + wantStatus: Success, + }, + { + name: "PreFilterAddPod - Success", + action: func(f Framework) { f.RunPreFilterExtensionAddPod(context.Background(), state, pod, nil, nil) }, + wantExtensionPoint: "PreFilterExtensionAddPod", + wantStatus: Success, + }, + { + name: "PreFilterRemovePod - Success", + action: func(f Framework) { f.RunPreFilterExtensionRemovePod(context.Background(), state, pod, nil, nil) }, + wantExtensionPoint: "PreFilterExtensionRemovePod", + wantStatus: Success, + }, + { + name: "PreFilterRemovePod - Success", + action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) }, wantExtensionPoint: "PreFilter", wantStatus: Success, }, { name: "Filter - Success", - action: func(f Framework) { f.RunFilterPlugins(context.Background(), nil, pod, nil) }, + action: func(f Framework) { f.RunFilterPlugins(context.Background(), state, pod, nil) }, wantExtensionPoint: "Filter", wantStatus: Success, }, { name: "PostFilter - Success", - action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), nil, pod, nil, nil) }, + action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), state, pod, nil, nil) }, wantExtensionPoint: "PostFilter", wantStatus: Success, }, { name: "Score - Success", - action: func(f Framework) { f.RunScorePlugins(context.Background(), nil, pod, nodes) }, + action: func(f Framework) { f.RunScorePlugins(context.Background(), state, pod, nodes) }, wantExtensionPoint: "Score", wantStatus: Success, }, { name: "Reserve - Success", - action: func(f Framework) { f.RunReservePlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunReservePlugins(context.Background(), state, pod, "") }, wantExtensionPoint: "Reserve", wantStatus: Success, }, { name: "Unreserve - Success", - action: func(f Framework) { f.RunUnreservePlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunUnreservePlugins(context.Background(), state, pod, "") }, wantExtensionPoint: "Unreserve", wantStatus: Success, }, { name: "PreBind - Success", - action: func(f Framework) { f.RunPreBindPlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunPreBindPlugins(context.Background(), state, pod, "") }, wantExtensionPoint: "PreBind", wantStatus: Success, }, { name: "Bind - Success", - action: func(f Framework) { f.RunBindPlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunBindPlugins(context.Background(), state, pod, "") }, wantExtensionPoint: "Bind", wantStatus: Success, }, { name: "PostBind - Success", - action: func(f Framework) { f.RunPostBindPlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunPostBindPlugins(context.Background(), state, pod, "") }, wantExtensionPoint: "PostBind", wantStatus: Success, }, { name: "Permit - Success", - action: func(f Framework) { f.RunPermitPlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") }, wantExtensionPoint: "Permit", wantStatus: Success, }, { name: "PreFilter - Error", - action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), nil, pod) }, + action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) }, inject: injectedResult{PreFilterStatus: int(Error)}, wantExtensionPoint: "PreFilter", wantStatus: Error, }, + { + name: "PreFilterAddPod - Error", + action: func(f Framework) { f.RunPreFilterExtensionAddPod(context.Background(), state, pod, nil, nil) }, + inject: injectedResult{PreFilterAddPodStatus: int(Error)}, + wantExtensionPoint: "PreFilterExtensionAddPod", + wantStatus: Error, + }, + { + name: "PreFilterRemovePod - Error", + action: func(f Framework) { f.RunPreFilterExtensionRemovePod(context.Background(), state, pod, nil, nil) }, + inject: injectedResult{PreFilterRemovePodStatus: int(Error)}, + wantExtensionPoint: "PreFilterExtensionRemovePod", + wantStatus: Error, + }, { name: "Filter - Error", - action: func(f Framework) { f.RunFilterPlugins(context.Background(), nil, pod, nil) }, + action: func(f Framework) { f.RunFilterPlugins(context.Background(), state, pod, nil) }, inject: injectedResult{FilterStatus: int(Error)}, wantExtensionPoint: "Filter", wantStatus: Error, }, { name: "PostFilter - Error", - action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), nil, pod, nil, nil) }, + action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), state, pod, nil, nil) }, inject: injectedResult{PostFilterStatus: int(Error)}, wantExtensionPoint: "PostFilter", wantStatus: Error, }, { name: "Score - Error", - action: func(f Framework) { f.RunScorePlugins(context.Background(), nil, pod, nodes) }, + action: func(f Framework) { f.RunScorePlugins(context.Background(), state, pod, nodes) }, inject: injectedResult{ScoreStatus: int(Error)}, wantExtensionPoint: "Score", wantStatus: Error, }, { name: "Reserve - Error", - action: func(f Framework) { f.RunReservePlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunReservePlugins(context.Background(), state, pod, "") }, inject: injectedResult{ReserveStatus: int(Error)}, wantExtensionPoint: "Reserve", wantStatus: Error, }, { name: "PreBind - Error", - action: func(f Framework) { f.RunPreBindPlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunPreBindPlugins(context.Background(), state, pod, "") }, inject: injectedResult{PreBindStatus: int(Error)}, wantExtensionPoint: "PreBind", wantStatus: Error, }, { name: "Bind - Error", - action: func(f Framework) { f.RunBindPlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunBindPlugins(context.Background(), state, pod, "") }, inject: injectedResult{BindStatus: int(Error)}, wantExtensionPoint: "Bind", wantStatus: Error, }, { name: "Permit - Error", - action: func(f Framework) { f.RunPermitPlugins(context.Background(), nil, pod, "") }, + action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") }, inject: injectedResult{PermitStatus: int(Error)}, wantExtensionPoint: "Permit", wantStatus: Error, @@ -714,6 +758,10 @@ func TestRecordingMetrics(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + metrics.Register() + metrics.FrameworkExtensionPointDuration.Reset() + metrics.PluginExecutionDuration.Reset() + plugin := &TestPlugin{name: testPlugin, inj: tt.inject} r := make(Registry) r.Register(testPlugin, @@ -733,16 +781,22 @@ func TestRecordingMetrics(t *testing.T) { PostBind: pluginSet, Unreserve: pluginSet, } - f, err := NewFramework(r, plugins, emptyArgs) + recorder := newMetricsRecorder(100, time.Nanosecond) + f, err := NewFramework(r, plugins, emptyArgs, withMetricsRecorder(recorder)) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - metrics.Register() - metrics.FrameworkExtensionPointDuration.Reset() tt.action(f) + // Stop the goroutine which records metrics and ensure it's stopped. + close(recorder.stopCh) + <-recorder.isStoppedCh + // Try to clean up the metrics buffer again in case it's not empty. + recorder.flushMetrics() + collectAndCompareFrameworkMetrics(t, tt.wantExtensionPoint, tt.wantStatus) + collectAndComparePluginMetrics(t, tt.wantExtensionPoint, testPlugin, tt.wantStatus) }) } } @@ -765,6 +819,9 @@ func TestPermitWaitingMetric(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + metrics.Register() + metrics.PermitWaitDuration.Reset() + plugin := &TestPlugin{name: testPlugin, inj: tt.inject} r := make(Registry) r.Register(testPlugin, @@ -778,8 +835,6 @@ func TestPermitWaitingMetric(t *testing.T) { if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - metrics.Register() - metrics.PermitWaitDuration.Reset() f.RunPermitPlugins(context.TODO(), nil, pod, "") @@ -839,17 +894,19 @@ func buildScoreConfigWithWeights(weights map[string]int32, ps ...string) *config } type injectedResult struct { - ScoreRes int64 `json:"scoreRes,omitempty"` - NormalizeRes int64 `json:"normalizeRes,omitempty"` - ScoreStatus int `json:"scoreStatus,omitempty"` - NormalizeStatus int `json:"normalizeStatus,omitempty"` - PreFilterStatus int `json:"preFilterStatus,omitempty"` - FilterStatus int `json:"filterStatus,omitempty"` - PostFilterStatus int `json:"postFilterStatus,omitempty"` - ReserveStatus int `json:"reserveStatus,omitempty"` - PreBindStatus int `json:"preBindStatus,omitempty"` - BindStatus int `json:"bindStatus,omitempty"` - PermitStatus int `json:"permitStatus,omitempty"` + ScoreRes int64 `json:"scoreRes,omitempty"` + NormalizeRes int64 `json:"normalizeRes,omitempty"` + ScoreStatus int `json:"scoreStatus,omitempty"` + NormalizeStatus int `json:"normalizeStatus,omitempty"` + PreFilterStatus int `json:"preFilterStatus,omitempty"` + PreFilterAddPodStatus int `json:"preFilterAddPodStatus,omitempty"` + PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"` + FilterStatus int `json:"filterStatus,omitempty"` + PostFilterStatus int `json:"postFilterStatus,omitempty"` + ReserveStatus int `json:"reserveStatus,omitempty"` + PreBindStatus int `json:"preBindStatus,omitempty"` + BindStatus int `json:"bindStatus,omitempty"` + PermitStatus int `json:"permitStatus,omitempty"` } func setScoreRes(inj injectedResult) (int64, *Status) { @@ -869,6 +926,33 @@ func injectNormalizeRes(inj injectedResult, scores NodeScoreList) *Status { return nil } +func collectAndComparePluginMetrics(t *testing.T, wantExtensionPoint, wantPlugin string, wantStatus Code) { + m := collectHistogramMetric(metrics.PluginExecutionDuration) + if len(m.Label) != 3 { + t.Fatalf("Unexpected number of label pairs, got: %v, want: 2", len(m.Label)) + } + + if *m.Label[0].Value != wantExtensionPoint { + t.Errorf("Unexpected extension point label, got: %q, want %q", *m.Label[0].Value, wantExtensionPoint) + } + + if *m.Label[1].Value != wantPlugin { + t.Errorf("Unexpected plugin label, got: %q, want %q", *m.Label[1].Value, wantPlugin) + } + + if *m.Label[2].Value != wantStatus.String() { + t.Errorf("Unexpected status code label, got: %q, want %q", *m.Label[2].Value, wantStatus) + } + + if *m.Histogram.SampleCount == 0 { + t.Error("Expect at least 1 sample") + } + + if *m.Histogram.SampleSum <= 0 { + t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum) + } +} + func collectAndCompareFrameworkMetrics(t *testing.T, wantExtensionPoint string, wantStatus Code) { m := collectHistogramMetric(metrics.FrameworkExtensionPointDuration) @@ -885,11 +969,11 @@ func collectAndCompareFrameworkMetrics(t *testing.T, wantExtensionPoint string, } if *m.Histogram.SampleCount != 1 { - t.Errorf("Expect 1 sample, got: %v", m.Histogram.SampleCount) + t.Errorf("Expect 1 sample, got: %v", *m.Histogram.SampleCount) } if *m.Histogram.SampleSum <= 0 { - t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum) + t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum) } } @@ -911,17 +995,17 @@ func collectAndComparePermitWaitDuration(t *testing.T, wantRes string) { } if *m.Histogram.SampleCount != 1 { - t.Errorf("Expect 1 sample, got: %v", m.Histogram.SampleCount) + t.Errorf("Expect 1 sample, got: %v", *m.Histogram.SampleCount) } if *m.Histogram.SampleSum <= 0 { - t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum) + t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum) } } } func collectHistogramMetric(metric prometheus.Collector) *dto.Metric { - ch := make(chan prometheus.Metric, 1) + ch := make(chan prometheus.Metric, 100) metric.Collect(ch) select { case got := <-ch: diff --git a/pkg/scheduler/framework/v1alpha1/interface_test.go b/pkg/scheduler/framework/v1alpha1/interface_test.go index 466f2ceb068..0a9ed22571d 100644 --- a/pkg/scheduler/framework/v1alpha1/interface_test.go +++ b/pkg/scheduler/framework/v1alpha1/interface_test.go @@ -74,3 +74,19 @@ func TestStatus(t *testing.T) { } } } + +// The String() method relies on the value and order of the status codes to function properly. +func TestStatusCodes(t *testing.T) { + assertStatusCode(t, Success, 0) + assertStatusCode(t, Error, 1) + assertStatusCode(t, Unschedulable, 2) + assertStatusCode(t, UnschedulableAndUnresolvable, 3) + assertStatusCode(t, Wait, 4) + assertStatusCode(t, Skip, 5) +} + +func assertStatusCode(t *testing.T, code Code, value int) { + if int(code) != value { + t.Errorf("Status code %q should have a value of %v but got %v", code.String(), value, int(code)) + } +} diff --git a/pkg/scheduler/framework/v1alpha1/metrics_recorder.go b/pkg/scheduler/framework/v1alpha1/metrics_recorder.go new file mode 100644 index 00000000000..f6ae15f97ff --- /dev/null +++ b/pkg/scheduler/framework/v1alpha1/metrics_recorder.go @@ -0,0 +1,115 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "time" + + k8smetrics "k8s.io/component-base/metrics" + "k8s.io/kubernetes/pkg/scheduler/metrics" +) + +// frameworkMetric is the data structure passed in the buffer channel between the main framework thread +// and the metricsRecorder goroutine. +type frameworkMetric struct { + metric *k8smetrics.HistogramVec + labelValues []string + value float64 +} + +// metricRecorder records framework metrics in a separate goroutine to avoid overhead in the critical path. +type metricsRecorder struct { + // bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it. + bufferCh chan *frameworkMetric + // if bufferSize is reached, incoming metrics will be discarded. + bufferSize int + // how often the recorder runs to flush the metrics. + interval time.Duration + + // stopCh is used to stop the goroutine which periodically flushes metrics. It's currently only + // used in tests. + stopCh chan struct{} + // isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure + // the metric flushing goroutine is stopped so that tests can collect metrics for verification. + isStoppedCh chan struct{} +} + +func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder { + recorder := &metricsRecorder{ + bufferCh: make(chan *frameworkMetric, bufferSize), + bufferSize: bufferSize, + interval: interval, + stopCh: make(chan struct{}), + isStoppedCh: make(chan struct{}), + } + go recorder.run() + return recorder +} + +// observeExtensionPointDurationAsync observes the framework_extension_point_duration_seconds metric. +// The metric will be flushed to Prometheus asynchronously. +func (r *metricsRecorder) observeExtensionPointDurationAsync(extensionPoint string, status *Status, value float64) { + newMetric := &frameworkMetric{ + metric: metrics.FrameworkExtensionPointDuration, + labelValues: []string{extensionPoint, status.Code().String()}, + value: value, + } + select { + case r.bufferCh <- newMetric: + default: + } +} + +// observeExtensionPointDurationAsync observes the plugin_execution_duration_seconds metric. +// The metric will be flushed to Prometheus asynchronously. +func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *Status, value float64) { + newMetric := &frameworkMetric{ + metric: metrics.PluginExecutionDuration, + labelValues: []string{pluginName, extensionPoint, status.Code().String()}, + value: value, + } + select { + case r.bufferCh <- newMetric: + default: + } +} + +// run flushes buffered metrics into Prometheus every second. +func (r *metricsRecorder) run() { + for { + select { + case <-r.stopCh: + close(r.isStoppedCh) + return + default: + } + r.flushMetrics() + time.Sleep(r.interval) + } +} + +// flushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics. +func (r *metricsRecorder) flushMetrics() { + for i := 0; i < r.bufferSize; i++ { + select { + case m := <-r.bufferCh: + m.metric.WithLabelValues(m.labelValues...).Observe(m.value) + default: + return + } + } +} diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index d37de61909f..8f3d88e0179 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -254,6 +254,16 @@ var ( }, []string{"extension_point", "status"}) + PluginExecutionDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "plugin_execution_duration_seconds", + Help: "Duration for running a plugin at a specific extension point.", + Buckets: nil, + StabilityLevel: metrics.ALPHA, + }, + []string{"plugin", "extension_point", "status"}) + SchedulerQueueIncomingPods = metrics.NewCounterVec( &metrics.CounterOpts{ Subsystem: SchedulerSubsystem, @@ -302,6 +312,7 @@ var ( PodSchedulingDuration, PodSchedulingAttempts, FrameworkExtensionPointDuration, + PluginExecutionDuration, SchedulerQueueIncomingPods, SchedulerGoroutines, PermitWaitDuration, diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index ebc2833bcf5..73784718d13 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io/ioutil" + "math/rand" "os" "time" @@ -55,6 +56,8 @@ const ( BindTimeoutSeconds = 100 // SchedulerError is the reason recorded for events when an error occurs during scheduling a pod. SchedulerError = "SchedulerError" + // Percentage of framework metrics to be sampled. + frameworkMetricsSamplePercent = 10 ) // podConditionUpdater updates the condition of a pod based on the passed @@ -597,6 +600,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // Synchronously attempt to find a fit for the pod. start := time.Now() state := framework.NewCycleState() + state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent) schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)