From 08bd123b9523e3e6752925c1163def30a3dbf157 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Wed, 24 Aug 2022 10:10:32 +0000 Subject: [PATCH] feature(scheduler): add "goroutines" metric and deprecate the "scheduler_goroutines" metric --- .../framework/parallelize/parallelism.go | 12 ++++++++++-- .../plugins/interpodaffinity/filtering.go | 4 ++-- .../plugins/interpodaffinity/scoring.go | 2 +- .../plugins/podtopologyspread/filtering.go | 2 +- .../plugins/podtopologyspread/filtering_test.go | 2 +- .../plugins/podtopologyspread/scoring.go | 2 +- .../plugins/podtopologyspread/scoring_test.go | 2 +- .../selectorspread/selector_spread_perf_test.go | 2 +- pkg/scheduler/framework/preemption/preemption.go | 2 +- pkg/scheduler/framework/runtime/framework.go | 6 +++--- pkg/scheduler/metrics/metrics.go | 16 +++++++++++++--- pkg/scheduler/schedule_one.go | 6 +++++- 12 files changed, 40 insertions(+), 18 deletions(-) diff --git a/pkg/scheduler/framework/parallelize/parallelism.go b/pkg/scheduler/framework/parallelize/parallelism.go index 55f54a50bbd..6b137d31e9a 100644 --- a/pkg/scheduler/framework/parallelize/parallelism.go +++ b/pkg/scheduler/framework/parallelize/parallelism.go @@ -21,6 +21,7 @@ import ( "math" "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/scheduler/metrics" ) // DefaultParallelism is the default parallelism used in scheduler. @@ -51,6 +52,13 @@ func chunkSizeFor(n, parallelism int) int { } // Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms. -func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) { - workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism))) +// A given operation will be a label that is recorded in the goroutine metric. 
+func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) { + withMetrics := func(piece int) { + metrics.Goroutines.WithLabelValues(operation).Inc() + defer metrics.Goroutines.WithLabelValues(operation).Dec() + doWorkPiece(piece) + } + + workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, withMetrics, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism))) } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index 11e05e9753e..a9e7848c219 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -170,7 +170,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, p topoMaps[atomic.AddInt32(&index, 1)] = topoMap } } - pl.parallelizer.Until(ctx, len(nodes), processNode) + pl.parallelizer.Until(ctx, len(nodes), processNode, pl.Name()) result := make(topologyToMatchedTermCount) for i := 0; i <= int(index); i++ { @@ -216,7 +216,7 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co antiAffinityCountsList[k] = antiAffinity } } - pl.parallelizer.Until(ctx, len(allNodes), processNode) + pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name()) for i := 0; i <= int(index); i++ { affinityCounts.append(affinityCountsList[i]) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go index 238691bd46f..65ed4d4874b 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go @@ -203,7 +203,7 @@ func (pl *InterPodAffinity) PreScore( topoScores[atomic.AddInt32(&index, 1)] = topoScore } } - pl.parallelizer.Until(pCtx, len(allNodes), processNode) + pl.parallelizer.Until(pCtx, len(allNodes), processNode, pl.Name()) for i := 0; i <= int(index); i++ { state.topologyScore.append(topoScores[i]) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go index 800f2b4d755..061ea16162a 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go @@ -304,7 +304,7 @@ func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod) } tpCountsByNode[i] = tpCounts } - pl.parallelizer.Until(ctx, len(allNodes), processNode) + pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name()) for _, tpCounts := range tpCountsByNode { for tp, count := range tpCounts { diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go index cd7ea098e3f..e0e01ef3c2d 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go @@ -2013,7 +2013,7 @@ func BenchmarkFilter(b *testing.B) { n, _ := p.sharedLister.NodeInfos().Get(allNodes[i].Name) p.Filter(ctx, state, tt.pod, n) } - p.parallelizer.Until(ctx, len(allNodes), filterNode) + p.parallelizer.Until(ctx, len(allNodes), filterNode, "") } }) b.Run(tt.name+"/Clone", func(b *testing.B) { diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go index 2bf96ccf277..7a3c0589b1b 100644 --- 
a/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go @@ -183,7 +183,7 @@ func (pl *PodTopologySpread) PreScore( atomic.AddInt64(tpCount, int64(count)) } } - pl.parallelizer.Until(ctx, len(allNodes), processAllNode) + pl.parallelizer.Until(ctx, len(allNodes), processAllNode, pl.Name()) cycleState.Write(preScoreStateKey, state) return nil diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go index 712cc14baa9..4378f8a1f5b 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go @@ -1408,7 +1408,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) { score, _ := p.Score(ctx, state, pod, n.Name) gotList[i] = framework.NodeScore{Name: n.Name, Score: score} } - p.parallelizer.Until(ctx, len(filteredNodes), scoreNode) + p.parallelizer.Until(ctx, len(filteredNodes), scoreNode, "") status = p.NormalizeScore(ctx, state, pod, gotList) if !status.IsSuccess() { b.Fatal(status) diff --git a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go index c69b272017b..6ff35783895 100644 --- a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go +++ b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go @@ -90,7 +90,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) { gotList[i] = framework.NodeScore{Name: n.Name, Score: score} } parallelizer := parallelize.NewParallelizer(parallelize.DefaultParallelism) - parallelizer.Until(ctx, len(filteredNodes), scoreNode) + parallelizer.Until(ctx, len(filteredNodes), scoreNode, "") status = plugin.NormalizeScore(ctx, state, pod, gotList) if !status.IsSuccess() { b.Fatal(status) diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 19db5ee2c85..0fad6f30b0e 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -611,6 +611,6 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia nodeStatuses[nodeInfoCopy.Node().Name] = status statusesLock.Unlock() } - fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode) + fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode, ev.PluginName) return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs) } diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index c8db649b898..30d948c9fe6 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -925,7 +925,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy Score: s, } } - }) + }, score) if err := errCh.ReceiveError(); err != nil { return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err)) } @@ -943,7 +943,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy errCh.SendErrorWithCancel(err, cancel) return } - }) + }, score) if err := errCh.ReceiveError(); err != nil { return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err)) } @@ -964,7 +964,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy 
} nodeScoreList[i].Score = nodeScore.Score * int64(weight) } - }) + }, score) if err := errCh.ReceiveError(); err != nil { return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err)) } diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 8a1f5912b3a..b1da51ef89e 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -96,14 +96,23 @@ var ( Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.", StabilityLevel: metrics.STABLE, }, []string{"queue"}) + // SchedulerGoroutines isn't called in some parts where goroutines start. + // Goroutines metric replaces SchedulerGoroutines metric. Goroutine metric tracks all goroutines. SchedulerGoroutines = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: SchedulerSubsystem, + DeprecatedVersion: "1.26.0", + Name: "scheduler_goroutines", + Help: "Number of running goroutines split by the work they do such as binding. This metric is replaced by the \"goroutines\" metric.", + StabilityLevel: metrics.ALPHA, + }, []string{"work"}) + Goroutines = metrics.NewGaugeVec( &metrics.GaugeOpts{ Subsystem: SchedulerSubsystem, - Name: "scheduler_goroutines", + Name: "goroutines", Help: "Number of running goroutines split by the work they do such as binding.", StabilityLevel: metrics.ALPHA, - }, []string{"work"}) - + }, []string{"operation"}) PodSchedulingDuration = metrics.NewHistogramVec( &metrics.HistogramOpts{ Subsystem: SchedulerSubsystem, @@ -195,6 +204,7 @@ var ( PluginExecutionDuration, SchedulerQueueIncomingPods, SchedulerGoroutines, + Goroutines, PermitWaitDuration, CacheSize, unschedulableReasons, diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 233ea311a1b..f8855699094 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -105,6 +105,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc() defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec() + metrics.Goroutines.WithLabelValues(metrics.Binding).Inc() + defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec() sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start) }() @@ -533,7 +535,7 @@ func (sched *Scheduler) findNodesThatPassFilters( // Stops searching for more nodes once the configured number of feasible nodes // are found. - fwk.Parallelizer().Until(ctx, numAllNodes, checkNode) + fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, frameworkruntime.Filter) feasibleNodes = feasibleNodes[:feasibleNodesLen] if err := errCh.ReceiveError(); err != nil { statusCode = framework.Error @@ -688,8 +690,10 @@ func prioritizeNodes( wg.Add(1) go func(extIndex int) { metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc() + metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc() defer func() { metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec() + metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec() wg.Done() }() prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
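
Note for reviewers: the new Until signature threads an operation label through to the goroutines gauge, so every caller now passes a label value — plugins pass pl.Name(), the scheduler passes frameworkruntime.Filter or the score label, and the benchmarks pass "". Below is a minimal, self-contained sketch of a call site under that contract; HypotheticalPlugin, countPods, and countForNode are illustrative names and not part of this patch.

package example

import (
	"context"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
)

// HypotheticalPlugin is an illustrative plugin type, not part of this patch.
type HypotheticalPlugin struct {
	parallelizer parallelize.Parallelizer
}

// Name doubles as the "operation" label recorded for work run through Until.
func (pl *HypotheticalPlugin) Name() string { return "HypotheticalPlugin" }

func (pl *HypotheticalPlugin) countPods(ctx context.Context, nodes []*framework.NodeInfo) []int {
	counts := make([]int, len(nodes))
	countForNode := func(i int) {
		counts[i] = len(nodes[i].Pods) // per-piece work, run on a worker goroutine
	}
	// Each piece increments goroutines{operation="HypotheticalPlugin"} when it
	// starts and decrements it when it finishes, via the withMetrics wrapper
	// added in parallelism.go above.
	pl.parallelizer.Until(ctx, len(nodes), countForNode, pl.Name())
	return counts
}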
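
The schedule_one.go hunks keep both gauges in step: the deprecated scheduler_goroutines (ALPHA, DeprecatedVersion 1.26.0) and the new goroutines gauge are incremented before the binding or extender goroutine does its work and decremented on the way out, so the in-flight goroutine stays visible under both names while dashboards migrate. A sketch of that dual-emission pattern, assuming the scheduler's internal metrics package; startBinding, the WaitGroup, and the bind callback are illustrative, while metrics.Binding and the two gauges come from this patch.

package example

import (
	"context"
	"sync"

	"k8s.io/kubernetes/pkg/scheduler/metrics"
)

// startBinding is an illustrative helper mirroring the scheduleOne change.
func startBinding(ctx context.Context, wg *sync.WaitGroup, bind func(context.Context)) {
	wg.Add(1)
	go func() {
		// The deprecated gauge and its replacement are updated together during
		// the deprecation window, so a scrape sees the binding goroutine under
		// both metric names.
		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
		metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
		defer func() {
			metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
			metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()
			wg.Done()
		}()
		bind(ctx)
	}()
}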