Merge pull request #112003 from sanposhiho/metrics-goroutine

feature(scheduler): add "goroutines" metric and deprecate the "scheduler_goroutines" metric
Kubernetes Prow Robot 2022-09-12 12:01:16 -07:00 committed by GitHub
commit 3ac752e4a0
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 40 additions and 18 deletions

View File

@@ -21,6 +21,7 @@ import (
 	"math"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/scheduler/metrics"
 )
 // DefaultParallelism is the default parallelism used in scheduler.
@@ -51,6 +52,13 @@ func chunkSizeFor(n, parallelism int) int {
 }
 // Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
-func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) {
-	workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
+// A given operation will be a label that is recorded in the goroutine metric.
+func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) {
+	withMetrics := func(piece int) {
+		metrics.Goroutines.WithLabelValues(operation).Inc()
+		defer metrics.Goroutines.WithLabelValues(operation).Dec()
+		doWorkPiece(piece)
+	}
+	workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, withMetrics, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
 }
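For callers, the only visible change is the extra operation argument, which becomes the label value recorded in the new "goroutines" gauge while each work piece runs. The plugins touched below pass their plugin name, and benchmarks pass an empty string. A minimal caller-side sketch follows; the names are illustrative and not taken from this PR.

// Caller-side sketch (illustrative only, not part of this PR): the extra argument
// is the operation label recorded in the "goroutines" gauge while each piece runs.
package example

import (
	"context"

	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
)

func processAll(ctx context.Context, items []string) {
	p := parallelize.NewParallelizer(parallelize.DefaultParallelism)
	process := func(i int) {
		_ = items[i] // per-item work goes here
	}
	// Plugins in this PR pass their plugin name (pl.Name()); benchmarks pass "".
	p.Until(ctx, len(items), process, "InterPodAffinity")
}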

View File

@@ -170,7 +170,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, p
 			topoMaps[atomic.AddInt32(&index, 1)] = topoMap
 		}
 	}
-	pl.parallelizer.Until(ctx, len(nodes), processNode)
+	pl.parallelizer.Until(ctx, len(nodes), processNode, pl.Name())
 	result := make(topologyToMatchedTermCount)
 	for i := 0; i <= int(index); i++ {
@@ -216,7 +216,7 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co
 			antiAffinityCountsList[k] = antiAffinity
 		}
 	}
-	pl.parallelizer.Until(ctx, len(allNodes), processNode)
+	pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name())
 	for i := 0; i <= int(index); i++ {
 		affinityCounts.append(affinityCountsList[i])

View File

@@ -203,7 +203,7 @@ func (pl *InterPodAffinity) PreScore(
 			topoScores[atomic.AddInt32(&index, 1)] = topoScore
 		}
 	}
-	pl.parallelizer.Until(pCtx, len(allNodes), processNode)
+	pl.parallelizer.Until(pCtx, len(allNodes), processNode, pl.Name())
 	for i := 0; i <= int(index); i++ {
 		state.topologyScore.append(topoScores[i])

View File

@@ -304,7 +304,7 @@ func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod)
 		}
 		tpCountsByNode[i] = tpCounts
 	}
-	pl.parallelizer.Until(ctx, len(allNodes), processNode)
+	pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name())
 	for _, tpCounts := range tpCountsByNode {
 		for tp, count := range tpCounts {

View File

@@ -2013,7 +2013,7 @@ func BenchmarkFilter(b *testing.B) {
 				n, _ := p.sharedLister.NodeInfos().Get(allNodes[i].Name)
 				p.Filter(ctx, state, tt.pod, n)
 			}
-			p.parallelizer.Until(ctx, len(allNodes), filterNode)
+			p.parallelizer.Until(ctx, len(allNodes), filterNode, "")
 		}
 	})
 	b.Run(tt.name+"/Clone", func(b *testing.B) {

View File

@@ -183,7 +183,7 @@ func (pl *PodTopologySpread) PreScore(
 			atomic.AddInt64(tpCount, int64(count))
 		}
 	}
-	pl.parallelizer.Until(ctx, len(allNodes), processAllNode)
+	pl.parallelizer.Until(ctx, len(allNodes), processAllNode, pl.Name())
 	cycleState.Write(preScoreStateKey, state)
 	return nil

View File

@@ -1408,7 +1408,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
 			score, _ := p.Score(ctx, state, pod, n.Name)
 			gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
 		}
-		p.parallelizer.Until(ctx, len(filteredNodes), scoreNode)
+		p.parallelizer.Until(ctx, len(filteredNodes), scoreNode, "")
 		status = p.NormalizeScore(ctx, state, pod, gotList)
 		if !status.IsSuccess() {
 			b.Fatal(status)

View File

@@ -90,7 +90,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
 			gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
 		}
 		parallelizer := parallelize.NewParallelizer(parallelize.DefaultParallelism)
-		parallelizer.Until(ctx, len(filteredNodes), scoreNode)
+		parallelizer.Until(ctx, len(filteredNodes), scoreNode, "")
 		status = plugin.NormalizeScore(ctx, state, pod, gotList)
 		if !status.IsSuccess() {
 			b.Fatal(status)

View File

@@ -611,6 +611,6 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia
 			nodeStatuses[nodeInfoCopy.Node().Name] = status
 			statusesLock.Unlock()
 		}
-		fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode)
+		fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode, ev.PluginName)
 	return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
 }

View File

@@ -925,7 +925,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
 				Score: s,
 			}
 		}
-	})
+	}, score)
 	if err := errCh.ReceiveError(); err != nil {
 		return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
 	}
@@ -943,7 +943,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
 				errCh.SendErrorWithCancel(err, cancel)
 				return
 			}
-	})
+	}, score)
 	if err := errCh.ReceiveError(); err != nil {
 		return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
 	}
@@ -964,7 +964,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
 			}
 			nodeScoreList[i].Score = nodeScore.Score * int64(weight)
 		}
-	})
+	}, score)
 	if err := errCh.ReceiveError(); err != nil {
 		return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
 	}

View File

@@ -95,14 +95,23 @@ var (
 			Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.",
 			StabilityLevel: metrics.STABLE,
 		}, []string{"queue"})
+	// SchedulerGoroutines isn't incremented in some places where goroutines start;
+	// the Goroutines metric replaces it and tracks all scheduler goroutines.
 	SchedulerGoroutines = metrics.NewGaugeVec(
+		&metrics.GaugeOpts{
+			Subsystem:         SchedulerSubsystem,
+			DeprecatedVersion: "1.26.0",
+			Name:              "scheduler_goroutines",
+			Help:              "Number of running goroutines split by the work they do such as binding. This metric is replaced by the \"goroutines\" metric.",
+			StabilityLevel:    metrics.ALPHA,
+		}, []string{"work"})
+	Goroutines = metrics.NewGaugeVec(
 		&metrics.GaugeOpts{
 			Subsystem: SchedulerSubsystem,
-			Name: "scheduler_goroutines",
+			Name: "goroutines",
 			Help: "Number of running goroutines split by the work they do such as binding.",
 			StabilityLevel: metrics.ALPHA,
-		}, []string{"work"})
+		}, []string{"operation"})
 	PodSchedulingDuration = metrics.NewHistogramVec(
 		&metrics.HistogramOpts{
 			Subsystem: SchedulerSubsystem,
@@ -194,6 +203,7 @@ var (
 		PluginExecutionDuration,
 		SchedulerQueueIncomingPods,
 		SchedulerGoroutines,
+		Goroutines,
 		PermitWaitDuration,
 		CacheSize,
 		unschedulableReasons,
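The deprecation itself is handled by the component-base metrics machinery: a gauge whose GaugeOpts carry a DeprecatedVersion keeps being registered and served, but is reported as deprecated from that version on. Below is a standalone sketch of that pattern; it registers through the generic legacyregistry rather than the scheduler's own registration helpers, so treat the wiring (and the abbreviated Help text on the old gauge) as assumptions for illustration.

package example

import (
	"k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

var (
	// Deprecated gauge: still exported, but flagged as deprecated from 1.26.0 on.
	oldGoroutines = metrics.NewGaugeVec(&metrics.GaugeOpts{
		Subsystem:         "scheduler",
		Name:              "scheduler_goroutines",
		Help:              "Deprecated; replaced by the \"goroutines\" metric.",
		DeprecatedVersion: "1.26.0",
		StabilityLevel:    metrics.ALPHA,
	}, []string{"work"})

	// Replacement gauge with the new "operation" label.
	newGoroutines = metrics.NewGaugeVec(&metrics.GaugeOpts{
		Subsystem:      "scheduler",
		Name:           "goroutines",
		Help:           "Number of running goroutines split by the work they do such as binding.",
		StabilityLevel: metrics.ALPHA,
	}, []string{"operation"})
)

func init() {
	legacyregistry.MustRegister(oldGoroutines, newGoroutines)
}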

View File

@@ -103,6 +103,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
 		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
+		metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
+		defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()
 		sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start)
 	}()
@@ -563,7 +565,7 @@ func (sched *Scheduler) findNodesThatPassFilters(
 	// Stops searching for more nodes once the configured number of feasible nodes
 	// are found.
-	fwk.Parallelizer().Until(ctx, numAllNodes, checkNode)
+	fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, frameworkruntime.Filter)
 	feasibleNodes = feasibleNodes[:feasibleNodesLen]
 	if err := errCh.ReceiveError(); err != nil {
 		statusCode = framework.Error
@@ -718,8 +720,10 @@ func prioritizeNodes(
 		wg.Add(1)
 		go func(extIndex int) {
 			metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
+			metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
 			defer func() {
 				metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
+				metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
 				wg.Done()
 			}()
 			prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
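At the ad-hoc goroutine launch sites above, both gauges are updated side by side so existing dashboards keep working through the deprecation window. A hypothetical helper (not part of this PR; the name trackGoroutine is invented for illustration) could factor out that dual-tracking pattern:

package example

import (
	"k8s.io/kubernetes/pkg/scheduler/metrics"
)

// trackGoroutine is a hypothetical helper (not in this PR) that records a running
// goroutine in both the deprecated scheduler_goroutines gauge and its replacement,
// then runs fn.
func trackGoroutine(operation string, fn func()) {
	metrics.SchedulerGoroutines.WithLabelValues(operation).Inc()
	metrics.Goroutines.WithLabelValues(operation).Inc()
	defer func() {
		metrics.SchedulerGoroutines.WithLabelValues(operation).Dec()
		metrics.Goroutines.WithLabelValues(operation).Dec()
	}()
	fn()
}

// A launch site like the binding goroutine above could then read:
//	go trackGoroutine(metrics.Binding, func() { sched.bindingCycle(...) })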