Merge pull request #116312 from sanposhiho/move-label

cleanup(scheduler): move metric labels to metrics package
This commit is contained in:
Kubernetes Prow Robot 2023-03-12 10:48:40 -07:00 committed by GitHub
commit e413e6a59c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 52 deletions

View File

@ -40,23 +40,8 @@ import (
)
const (
// Filter is the name of the filter extension point.
Filter = "Filter"
// Specifies the maximum timeout a permit plugin can return.
maxTimeout = 15 * time.Minute
preFilter = "PreFilter"
preFilterExtensionAddPod = "PreFilterExtensionAddPod"
preFilterExtensionRemovePod = "PreFilterExtensionRemovePod"
postFilter = "PostFilter"
preScore = "PreScore"
score = "Score"
scoreExtensionNormalize = "ScoreExtensionNormalize"
preBind = "PreBind"
bind = "Bind"
postBind = "PostBind"
reserve = "Reserve"
unreserve = "Unreserve"
permit = "Permit"
maxTimeout = 15 * time.Minute
)
var allClusterEvents = []framework.ClusterEvent{
@ -610,7 +595,7 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
var result *framework.PreFilterResult
var pluginsWithNodes []string
@ -621,7 +606,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
skipPlugins.Insert(pl.Name())
continue
}
metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), preFilter, f.profileName).Inc()
metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.PreFilter, f.profileName).Inc()
if !s.IsSuccess() {
s.SetFailedPlugin(pl.Name())
if s.IsUnschedulable() {
@ -651,7 +636,7 @@ func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.Pre
}
startTime := time.Now()
result, status := pl.PreFilter(ctx, state, pod)
f.metricsRecorder.ObservePluginDurationAsync(preFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return result, status
}
@ -686,7 +671,7 @@ func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl fram
}
startTime := time.Now()
status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -721,7 +706,7 @@ func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl f
}
startTime := time.Now()
status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
f.metricsRecorder.ObservePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -739,7 +724,7 @@ func (f *frameworkImpl) RunFilterPlugins(
if state.SkipFilterPlugins.Has(pl.Name()) {
continue
}
metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), Filter, f.profileName).Inc()
metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.Filter, f.profileName).Inc()
if status := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo); !status.IsSuccess() {
if !status.IsUnschedulable() {
// Filter plugins are not supposed to return any status other than
@ -760,7 +745,7 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter
}
startTime := time.Now()
status := pl.Filter(ctx, state, pod, nodeInfo)
f.metricsRecorder.ObservePluginDurationAsync(Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -769,7 +754,7 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter
func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(postFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
// `result` records the last meaningful(non-noop) PostFilterResult.
@ -806,7 +791,7 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po
}
startTime := time.Now()
r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
f.metricsRecorder.ObservePluginDurationAsync(postFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.PostFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime))
return r, s
}
@ -904,7 +889,7 @@ func (f *frameworkImpl) RunPreScorePlugins(
) (status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
skipPlugins := sets.New[string]()
for _, pl := range f.preScorePlugins {
@ -927,7 +912,7 @@ func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreS
}
startTime := time.Now()
status := pl.PreScore(ctx, state, pod, nodes)
f.metricsRecorder.ObservePluginDurationAsync(preScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.PreScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -938,7 +923,7 @@ func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreS
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ns []framework.NodePluginScores, status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
allNodePluginScores := make([]framework.NodePluginScores, len(nodes))
numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len()
@ -971,7 +956,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
Score: s,
}
}
}, score)
}, metrics.Score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
}
@ -990,7 +975,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
errCh.SendErrorWithCancel(err, cancel)
return
}
}, score)
}, metrics.Score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
}
@ -1021,7 +1006,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
nodePluginScores.TotalScore += weightedScore
}
allNodePluginScores[index] = nodePluginScores
}, score)
}, metrics.Score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
}
@ -1035,7 +1020,7 @@ func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePl
}
startTime := time.Now()
s, status := pl.Score(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.Score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return s, status
}
@ -1045,7 +1030,7 @@ func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.Scor
}
startTime := time.Now()
status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
f.metricsRecorder.ObservePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.ScoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1055,7 +1040,7 @@ func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.Scor
func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.preBindPlugins {
status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
@ -1079,7 +1064,7 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi
}
startTime := time.Now()
status := pl.PreBind(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(preBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.PreBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1087,7 +1072,7 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi
func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(bind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Bind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
if len(f.bindPlugins) == 0 {
return framework.NewStatus(framework.Skip, "")
@ -1118,7 +1103,7 @@ func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlug
}
startTime := time.Now()
status := bp.Bind(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.Bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1126,7 +1111,7 @@ func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlug
func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(postBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.postBindPlugins {
f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
@ -1140,7 +1125,7 @@ func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.Post
}
startTime := time.Now()
pl.PostBind(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(postBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.PostBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
}
// RunReservePluginsReserve runs the Reserve method in the set of configured
@ -1151,7 +1136,7 @@ func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.Post
func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.reservePlugins {
status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
@ -1170,7 +1155,7 @@ func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framewor
}
startTime := time.Now()
status := pl.Reserve(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.Reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
@ -1179,7 +1164,7 @@ func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framewor
func (f *frameworkImpl) RunReservePluginsUnreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Unreserve, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
// Execute the Unreserve operation of each reserve plugin in the
// *reverse* order in which the Reserve operation was executed.
@ -1195,7 +1180,7 @@ func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framew
}
startTime := time.Now()
pl.Unreserve(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.Unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
}
// RunPermitPlugins runs the set of configured permit plugins. If any of these
@ -1207,7 +1192,7 @@ func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framew
func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
pluginsWaitTime := make(map[string]time.Duration)
statusCode := framework.Success
@ -1249,7 +1234,7 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit
}
startTime := time.Now()
status, timeout := pl.Permit(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
f.metricsRecorder.ObservePluginDurationAsync(metrics.Permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status, timeout
}

View File

@ -26,16 +26,36 @@ import (
)
const (
// SchedulerSubsystem - subsystem name used by scheduler
// SchedulerSubsystem - subsystem name used by scheduler.
SchedulerSubsystem = "scheduler"
// Below are possible values for the work label.
)
// PrioritizingExtender - prioritizing extender work label value
// Below are possible values for the work and operation label.
const (
// PrioritizingExtender - prioritizing extender work/operation label value.
PrioritizingExtender = "prioritizing_extender"
// Binding - binding work label value
// Binding - binding work/operation label value.
Binding = "binding"
)
// Below are possible values for the extension_point label.
const (
PreFilter = "PreFilter"
Filter = "Filter"
PreFilterExtensionAddPod = "PreFilterExtensionAddPod"
PreFilterExtensionRemovePod = "PreFilterExtensionRemovePod"
PostFilter = "PostFilter"
PreScore = "PreScore"
Score = "Score"
ScoreExtensionNormalize = "ScoreExtensionNormalize"
PreBind = "PreBind"
Bind = "Bind"
PostBind = "PostBind"
Reserve = "Reserve"
Unreserve = "Unreserve"
Permit = "Permit"
)
// All the histogram based metrics have 1ms as size for the smallest bucket.
var (
scheduleAttempts = metrics.NewCounterVec(

View File

@ -37,7 +37,6 @@ import (
"k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
@ -530,12 +529,12 @@ func (sched *Scheduler) findNodesThatPassFilters(
// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
metrics.FrameworkExtensionPointDuration.WithLabelValues(frameworkruntime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
}()
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, frameworkruntime.Filter)
fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter)
feasibleNodes = feasibleNodes[:feasibleNodesLen]
if err := errCh.ReceiveError(); err != nil {
statusCode = framework.Error