From c2ceb21a3ef625744284ff6c1fd658c393581e9f Mon Sep 17 00:00:00 2001 From: drfish Date: Mon, 1 Mar 2021 22:20:18 +0800 Subject: [PATCH] Make parallelism as part of schedulerOptions --- pkg/scheduler/core/generic_scheduler.go | 2 +- pkg/scheduler/factory.go | 2 + pkg/scheduler/framework/interface.go | 4 ++ .../defaultpreemption/default_preemption.go | 3 +- .../default_preemption_test.go | 19 +++--- .../plugins/interpodaffinity/filtering.go | 13 ++-- .../interpodaffinity/filtering_test.go | 23 ++++--- .../plugins/interpodaffinity/plugin.go | 3 + .../plugins/interpodaffinity/scoring.go | 3 +- .../plugins/interpodaffinity/scoring_test.go | 6 +- .../plugins/podtopologyspread/filtering.go | 3 +- .../podtopologyspread/filtering_test.go | 67 +++++++++---------- .../plugins/podtopologyspread/plugin.go | 3 + .../plugins/podtopologyspread/scoring.go | 3 +- .../plugins/podtopologyspread/scoring_test.go | 12 ++-- .../selector_spread_perf_test.go | 3 +- .../framework/plugins/testing/testing.go | 30 +++++++-- pkg/scheduler/framework/runtime/framework.go | 23 ++++++- .../internal/parallelize/parallelism.go | 24 +++---- .../internal/parallelize/parallelism_test.go | 4 +- pkg/scheduler/scheduler.go | 6 +- 21 files changed, 149 insertions(+), 107 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index ec22ca28e24..c9cdd774d50 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -333,7 +333,7 @@ func (g *genericScheduler) findNodesThatPassFilters( // Stops searching for more nodes once the configured number of feasible nodes // are found. - parallelize.Until(ctx, len(nodes), checkNode) + fwk.Parallelizer().Until(ctx, len(nodes), checkNode) processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap) g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 05acc274ae1..0288872afb2 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -83,6 +83,7 @@ type Configurator struct { nodeInfoSnapshot *internalcache.Snapshot extenders []schedulerapi.Extender frameworkCapturer FrameworkCapturer + parallellism int32 } // create a scheduler from a set of registered plugins. @@ -142,6 +143,7 @@ func (c *Configurator) create() (*Scheduler, error) { frameworkruntime.WithPodNominator(nominator), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)), frameworkruntime.WithClusterEventMap(clusterEventMap), + frameworkruntime.WithParallelism(int(c.parallellism)), ) if err != nil { return nil, fmt.Errorf("initializing profiles: %v", err) diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 8ae1c50fa02..87f5d8b8438 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -33,6 +33,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/events" "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" ) // NodeScoreList declares a list of nodes and their scores. @@ -597,6 +598,9 @@ type Handle interface { // Extenders returns registered scheduler extenders. Extenders() []Extender + + // Parallelizer returns a parallelizer holding parallelism for scheduler. + Parallelizer() parallelize.Parallelizer } // PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase. 
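The interface.go hunk above is the pivot of the patch: plugins stop calling the package-level parallelize.Until and instead reach the configured parallelism through the framework Handle. Below is a minimal sketch of that pattern for a hypothetical in-tree plugin; Handle.Parallelizer(), SnapshotSharedLister(), and Parallelizer.Until() come from this patch, while the Example plugin and its package are illustrative. Because parallelize is an internal package, only code under pkg/scheduler can hold the returned Parallelizer by name, which is exactly what the InterPodAffinity and PodTopologySpread hunks later in the patch do.

```go
// Hypothetical in-tree plugin sketch (not part of the patch): it captures the
// framework's Parallelizer at construction time, mirroring the constructor
// changes below, instead of calling the removed package-level parallelize.Until.
package exampleplugin

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)

// Example fans per-node work out through the handle-scoped parallelizer.
type Example struct {
	parallelizer parallelize.Parallelizer
	sharedLister framework.SharedLister
}

var _ framework.PreScorePlugin = &Example{}

func (e *Example) Name() string { return "Example" }

// New follows the constructor-injection shape used by the plugins in this patch.
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
	return &Example{
		parallelizer: h.Parallelizer(),
		sharedLister: h.SnapshotSharedLister(),
	}, nil
}

// PreScore processes all candidate nodes with the configured parallelism.
func (e *Example) PreScore(ctx context.Context, _ *framework.CycleState, _ *v1.Pod, nodes []*v1.Node) *framework.Status {
	processNode := func(i int) {
		_ = nodes[i] // per-node work goes here
	}
	e.parallelizer.Until(ctx, len(nodes), processNode)
	return nil
}
```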
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index 6437e09883c..d992ba77dff 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -44,7 +44,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -360,7 +359,7 @@ func dryRunPreemption(ctx context.Context, fh framework.Handle, statusesLock.Unlock() } } - parallelize.Until(parallelCtx, len(potentialNodes), checkNode) + fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode) return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses } diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 9952c315fb5..7bbd61eed68 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -996,11 +996,19 @@ func TestDryRunPreemption(t *testing.T) { registeredPlugins = append(registeredPlugins, tt.registerPlugins...) objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}} informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0) + parallelism := parallelize.DefaultParallelism + if tt.disableParallelism { + // We need disableParallelism because of the non-deterministic nature + // of the results of tests that set custom minCandidateNodesPercentage + // or minCandidateNodesAbsolute. This is only done in a handful of tests. + parallelism = 1 + } fwk, err := st.NewFramework( registeredPlugins, "", frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithParallelism(parallelism), ) if err != nil { t.Fatal(err) @@ -1019,17 +1027,6 @@ func TestDryRunPreemption(t *testing.T) { return nodeInfos[i].Node().Name < nodeInfos[j].Node().Name }) - if tt.disableParallelism { - // We need disableParallelism because of the non-deterministic nature - // of the results of tests that set custom minCandidateNodesPercentage - // or minCandidateNodesAbsolute. This is only done in a handful of tests. 
- oldParallelism := parallelize.GetParallelism() - parallelize.SetParallelism(1) - t.Cleanup(func() { - parallelize.SetParallelism(oldParallelism) - }) - } - if tt.args == nil { tt.args = getDefaultDefaultPreemptionArgs() } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index 2b0d5d71c4e..d3e46ac2ea8 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -25,7 +25,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" ) const ( @@ -157,7 +156,7 @@ func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, ena // calculates the following for each existing pod on each node: // (1) Whether it has PodAntiAffinity // (2) Whether any AffinityTerm matches the incoming pod -func getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo, enableNamespaceSelector bool) topologyToMatchedTermCount { +func (pl *InterPodAffinity) getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo, enableNamespaceSelector bool) topologyToMatchedTermCount { topoMaps := make([]topologyToMatchedTermCount, len(nodes)) index := int32(-1) processNode := func(i int) { @@ -175,7 +174,7 @@ func getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*fr topoMaps[atomic.AddInt32(&index, 1)] = topoMap } } - parallelize.Until(context.Background(), len(nodes), processNode) + pl.parallelizer.Until(context.Background(), len(nodes), processNode) result := make(topologyToMatchedTermCount) for i := 0; i <= int(index); i++ { @@ -189,7 +188,7 @@ func getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*fr // It returns a topologyToMatchedTermCount that are checked later by the affinity // predicate. With this topologyToMatchedTermCount available, the affinity predicate does not // need to check all the pods in the cluster. 
-func getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo, enableNamespaceSelector bool) (topologyToMatchedTermCount, topologyToMatchedTermCount) { +func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo, enableNamespaceSelector bool) (topologyToMatchedTermCount, topologyToMatchedTermCount) { affinityCounts := make(topologyToMatchedTermCount) antiAffinityCounts := make(topologyToMatchedTermCount) if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 { @@ -221,7 +220,7 @@ func getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes antiAffinityCountsList[k] = antiAffinity } } - parallelize.Until(context.Background(), len(allNodes), processNode) + pl.parallelizer.Until(context.Background(), len(allNodes), processNode) for i := 0; i <= int(index); i++ { affinityCounts.append(affinityCountsList[i]) @@ -266,8 +265,8 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister) } - s.existingAntiAffinityCounts = getExistingAntiAffinityCounts(pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods, pl.enableNamespaceSelector) - s.affinityCounts, s.antiAffinityCounts = getIncomingAffinityAntiAffinityCounts(s.podInfo, allNodes, pl.enableNamespaceSelector) + s.existingAntiAffinityCounts = pl.getExistingAntiAffinityCounts(pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods, pl.enableNamespaceSelector) + s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(s.podInfo, allNodes, pl.enableNamespaceSelector) cycleState.Write(preFilterStateKey, s) return nil diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go index 6cf104c291e..8460aef658b 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go @@ -1016,7 +1016,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { EnablePodAffinityNamespaceSelector: !test.disableNSSelector, }) } - p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, namespaces) + p := plugintesting.SetupPluginWithInformers(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, namespaces) state := framework.NewCycleState() preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod) if !preFilterStatus.IsSuccess() { @@ -1908,7 +1908,7 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) { n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { return New(plArgs, fh, feature.Features{}) } - p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, + p := plugintesting.SetupPluginWithInformers(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, []runtime.Object{ &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "NS1"}}, }) @@ -1933,9 +1933,12 @@ func TestPreFilterDisabled(t *testing.T) { nodeInfo := framework.NewNodeInfo() node := v1.Node{} nodeInfo.SetNode(&node) - p := &InterPodAffinity{} + n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return New(plArgs, fh, feature.Features{}) + } + p := plugintesting.SetupPlugin(t, n, &config.InterPodAffinityArgs{}, cache.NewEmptySnapshot()) cycleState := framework.NewCycleState() - gotStatus := 
p.Filter(context.Background(), cycleState, pod, nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, pod, nodeInfo) wantStatus := framework.AsStatus(fmt.Errorf(`error reading "PreFilterInterPodAffinity" from cycleState: %w`, framework.ErrNotFound)) if !reflect.DeepEqual(gotStatus, wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus) @@ -2204,7 +2207,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { return New(plArgs, fh, feature.Features{}) } - p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, nil) + p := plugintesting.SetupPlugin(t, n, &config.InterPodAffinityArgs{}, snapshot) cycleState := framework.NewCycleState() preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod) if !preFilterStatus.IsSuccess() { @@ -2485,9 +2488,13 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := cache.NewSnapshot(tt.existingPods, tt.nodes) - l, _ := s.NodeInfos().List() - gotAffinityPodsMap, gotAntiAffinityPodsMap := getIncomingAffinityAntiAffinityCounts(framework.NewPodInfo(tt.pod), l, true) + snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) + l, _ := snapshot.NodeInfos().List() + n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return New(plArgs, fh, feature.Features{}) + } + p := plugintesting.SetupPlugin(t, n, &config.InterPodAffinityArgs{}, snapshot) + gotAffinityPodsMap, gotAntiAffinityPodsMap := p.(*InterPodAffinity).getIncomingAffinityAntiAffinityCounts(framework.NewPodInfo(tt.pod), l, true) if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) { t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap) } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index 9928b260235..bb02616939e 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" ) const ( @@ -41,6 +42,7 @@ var _ framework.ScorePlugin = &InterPodAffinity{} // InterPodAffinity is a plugin that checks inter pod affinity type InterPodAffinity struct { + parallelizer parallelize.Parallelizer args config.InterPodAffinityArgs sharedLister framework.SharedLister nsLister listersv1.NamespaceLister @@ -65,6 +67,7 @@ func New(plArgs runtime.Object, h framework.Handle, fts feature.Features) (frame return nil, err } pl := &InterPodAffinity{ + parallelizer: h.Parallelizer(), args: args, sharedLister: h.SnapshotSharedLister(), enableNamespaceSelector: fts.EnablePodAffinityNamespaceSelector, diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go index 716a67fa972..357c6ad15d1 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go @@ -25,7 +25,6 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" 
"k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" ) // preScoreStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring. @@ -206,7 +205,7 @@ func (pl *InterPodAffinity) PreScore( topoScores[atomic.AddInt32(&index, 1)] = topoScore } } - parallelize.Until(context.Background(), len(allNodes), processNode) + pl.parallelizer.Until(context.Background(), len(allNodes), processNode) for i := 0; i <= int(index); i++ { state.topologyScore.append(topoScores[i]) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go index 78ef9c584c3..fb90f9b82ce 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go @@ -743,13 +743,12 @@ func TestPreferredAffinity(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() state := framework.NewCycleState() - snapshot := cache.NewSnapshot(test.pods, test.nodes) n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { return New(plArgs, fh, feature.Features{ EnablePodAffinityNamespaceSelector: !test.disableNSSelector, }) } - p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: 1}, snapshot, namespaces) + p := plugintesting.SetupPluginWithInformers(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: 1}, cache.NewSnapshot(test.pods, test.nodes), namespaces) status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes) if !status.IsSuccess() { if !strings.Contains(status.Message(), test.wantStatus.Message()) { @@ -910,13 +909,12 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() state := framework.NewCycleState() - snapshot := cache.NewSnapshot(test.pods, test.nodes) n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { return New(plArgs, fh, feature.Features{ EnablePodAffinityNamespaceSelector: !test.disableNSSelector, }) } - p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, snapshot, namespaces) + p := plugintesting.SetupPluginWithInformers(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, cache.NewSnapshot(test.pods, test.nodes), namespaces) status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go index c4d9253c1d3..b66037bd2a0 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go @@ -27,7 +27,6 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" - "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" ) const preFilterStateKey = "PreFilter" + Name @@ -258,7 +257,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er atomic.AddInt32(tpCount, int32(count)) } } - parallelize.Until(context.Background(), len(allNodes), processNode) + pl.parallelizer.Until(context.Background(), len(allNodes), processNode) // calculate min match for each 
topology pair for i := 0; i < len(constraints); i++ { diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go index 1628604832f..a72e0abbd1f 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go @@ -27,11 +27,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" + plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" "k8s.io/kubernetes/pkg/scheduler/internal/cache" - "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/utils/pointer" ) @@ -515,16 +514,13 @@ func TestPreFilterState(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0) - pl := PodTopologySpread{ - sharedLister: cache.NewSnapshot(tt.existingPods, tt.nodes), - defaultConstraints: tt.defaultConstraints, + args := &config.PodTopologySpreadArgs{ + DefaultConstraints: tt.defaultConstraints, + DefaultingType: config.ListDefaulting, } - pl.setListers(informerFactory) - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) + p := plugintesting.SetupPluginWithInformers(ctx, t, New, args, cache.NewSnapshot(tt.existingPods, tt.nodes), tt.objs) cs := framework.NewCycleState() - if s := pl.PreFilter(ctx, cs, tt.pod); !s.IsSuccess() { + if s := p.(*PodTopologySpread).PreFilter(ctx, cs, tt.pod); !s.IsSuccess() { t.Fatal(s.AsError()) } got, err := getPreFilterState(cs) @@ -825,20 +821,19 @@ func TestPreFilterStateAddPod(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) - pl := PodTopologySpread{ - sharedLister: snapshot, - } - cs := framework.NewCycleState() ctx := context.Background() - if s := pl.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() { + snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) + pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) + p := pl.(*PodTopologySpread) + cs := framework.NewCycleState() + if s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() { t.Fatal(s.AsError()) } nodeInfo, err := snapshot.Get(tt.nodes[tt.nodeIdx].Name) if err != nil { t.Fatal(err) } - if s := pl.AddPod(ctx, cs, tt.preemptor, framework.NewPodInfo(tt.addedPod), nodeInfo); !s.IsSuccess() { + if s := p.AddPod(ctx, cs, tt.preemptor, framework.NewPodInfo(tt.addedPod), nodeInfo); !s.IsSuccess() { t.Fatal(s.AsError()) } state, err := getPreFilterState(cs) @@ -1030,13 +1025,12 @@ func TestPreFilterStateRemovePod(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) - pl := PodTopologySpread{ - sharedLister: snapshot, - } - cs := framework.NewCycleState() ctx := context.Background() - s := pl.PreFilter(ctx, cs, tt.preemptor) + snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) + pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) + p := pl.(*PodTopologySpread) 
+ cs := framework.NewCycleState() + s := p.PreFilter(ctx, cs, tt.preemptor) if !s.IsSuccess() { t.Fatal(s.AsError()) } @@ -1050,7 +1044,7 @@ func TestPreFilterStateRemovePod(t *testing.T) { if err != nil { t.Fatal(err) } - if s := pl.RemovePod(ctx, cs, tt.preemptor, framework.NewPodInfo(deletedPod), nodeInfo); !s.IsSuccess() { + if s := p.RemovePod(ctx, cs, tt.preemptor, framework.NewPodInfo(deletedPod), nodeInfo); !s.IsSuccess() { t.Fatal(s.AsError()) } @@ -1106,22 +1100,21 @@ func BenchmarkFilter(b *testing.B) { var state *framework.CycleState b.Run(tt.name, func(b *testing.B) { existingPods, allNodes, _ := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) - pl := PodTopologySpread{ - sharedLister: cache.NewSnapshot(existingPods, allNodes), - } ctx := context.Background() + pl := plugintesting.SetupPlugin(b, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes)) + p := pl.(*PodTopologySpread) b.ResetTimer() for i := 0; i < b.N; i++ { state = framework.NewCycleState() - s := pl.PreFilter(ctx, state, tt.pod) + s := p.PreFilter(ctx, state, tt.pod) if !s.IsSuccess() { b.Fatal(s.AsError()) } filterNode := func(i int) { - n, _ := pl.sharedLister.NodeInfos().Get(allNodes[i].Name) - pl.Filter(ctx, state, tt.pod, n) + n, _ := p.sharedLister.NodeInfos().Get(allNodes[i].Name) + p.Filter(ctx, state, tt.pod, n) } - parallelize.Until(ctx, len(allNodes), filterNode) + p.parallelizer.Until(ctx, len(allNodes), filterNode) } }) b.Run(tt.name+"/Clone", func(b *testing.B) { @@ -1411,7 +1404,8 @@ func TestSingleConstraint(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) - p := &PodTopologySpread{sharedLister: snapshot} + pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) + p := pl.(*PodTopologySpread) state := framework.NewCycleState() preFilterStatus := p.PreFilter(context.Background(), state, tt.pod) if !preFilterStatus.IsSuccess() { @@ -1637,7 +1631,8 @@ func TestMultipleConstraints(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) - p := &PodTopologySpread{sharedLister: snapshot} + pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) + p := pl.(*PodTopologySpread) state := framework.NewCycleState() preFilterStatus := p.PreFilter(context.Background(), state, tt.pod) if !preFilterStatus.IsSuccess() { @@ -1660,9 +1655,9 @@ func TestPreFilterDisabled(t *testing.T) { nodeInfo := framework.NewNodeInfo() node := v1.Node{} nodeInfo.SetNode(&node) - p := &PodTopologySpread{} + p := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewEmptySnapshot()) cycleState := framework.NewCycleState() - gotStatus := p.Filter(context.Background(), cycleState, pod, nodeInfo) + gotStatus := p.(*PodTopologySpread).Filter(context.Background(), cycleState, pod, nodeInfo) wantStatus := framework.AsStatus(fmt.Errorf(`reading "PreFilterPodTopologySpread" from cycleState: %w`, framework.ErrNotFound)) if !reflect.DeepEqual(gotStatus, wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go 
index d80355b8895..e58df1143b5 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" ) const ( @@ -51,6 +52,7 @@ var systemDefaultConstraints = []v1.TopologySpreadConstraint{ // PodTopologySpread is a plugin that ensures pod's topologySpreadConstraints is satisfied. type PodTopologySpread struct { + parallelizer parallelize.Parallelizer defaultConstraints []v1.TopologySpreadConstraint sharedLister framework.SharedLister services corelisters.ServiceLister @@ -87,6 +89,7 @@ func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { return nil, err } pl := &PodTopologySpread{ + parallelizer: h.Parallelizer(), sharedLister: h.SnapshotSharedLister(), defaultConstraints: args.DefaultConstraints, } diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go index bba5abe9750..3203e1dd1b4 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go @@ -26,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/scheduler/framework" pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" - "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" ) const preScoreStateKey = "PreScore" + Name @@ -163,7 +162,7 @@ func (pl *PodTopologySpread) PreScore( atomic.AddInt64(tpCount, int64(count)) } } - parallelize.Until(ctx, len(allNodes), processAllNode) + pl.parallelizer.Until(ctx, len(allNodes), processAllNode) cycleState.Write(preScoreStateKey, state) return nil diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go index ac9415d8e14..57a24ac76b9 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go @@ -29,9 +29,9 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" + plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" - "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/utils/pointer" ) @@ -687,8 +687,8 @@ func TestPodTopologySpreadScore(t *testing.T) { allNodes := append([]*v1.Node{}, tt.nodes...) allNodes = append(allNodes, tt.failedNodes...) 
state := framework.NewCycleState() - snapshot := cache.NewSnapshot(tt.existingPods, allNodes) - p := &PodTopologySpread{sharedLister: snapshot} + pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(tt.existingPods, allNodes)) + p := pl.(*PodTopologySpread) status := p.PreScore(context.Background(), state, tt.pod, tt.nodes) if !status.IsSuccess() { @@ -757,8 +757,8 @@ func BenchmarkTestPodTopologySpreadScore(b *testing.B) { b.Run(tt.name, func(b *testing.B) { existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) state := framework.NewCycleState() - snapshot := cache.NewSnapshot(existingPods, allNodes) - p := &PodTopologySpread{sharedLister: snapshot} + pl := plugintesting.SetupPlugin(b, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes)) + p := pl.(*PodTopologySpread) status := p.PreScore(context.Background(), state, tt.pod, filteredNodes) if !status.IsSuccess() { @@ -854,7 +854,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) { score, _ := p.Score(ctx, state, pod, n.Name) gotList[i] = framework.NodeScore{Name: n.Name, Score: score} } - parallelize.Until(ctx, len(filteredNodes), scoreNode) + p.parallelizer.Until(ctx, len(filteredNodes), scoreNode) status = p.NormalizeScore(ctx, state, pod, gotList) if !status.IsSuccess() { b.Fatal(status) diff --git a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go index e238eec90a4..b5ea1dd8c69 100644 --- a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go +++ b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go @@ -88,7 +88,8 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) { score, _ := plugin.Score(ctx, state, pod, n.Name) gotList[i] = framework.NodeScore{Name: n.Name, Score: score} } - parallelize.Until(ctx, len(filteredNodes), scoreNode) + parallelizer := parallelize.NewParallelizer(parallelize.DefaultParallelism) + parallelizer.Until(ctx, len(filteredNodes), scoreNode) status = plugin.NormalizeScore(ctx, state, pod, gotList) if !status.IsSuccess() { b.Fatal(status) diff --git a/pkg/scheduler/framework/plugins/testing/testing.go b/pkg/scheduler/framework/plugins/testing/testing.go index 72f6cfa83e2..e42c713915c 100644 --- a/pkg/scheduler/framework/plugins/testing/testing.go +++ b/pkg/scheduler/framework/plugins/testing/testing.go @@ -29,13 +29,13 @@ import ( frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" ) -// SetupPlugin creates a plugin using a framework handle that includes +// SetupPluginWithInformers creates a plugin using a framework handle that includes // the provided sharedLister and a SharedInformerFactory with the provided objects. // The function also creates an empty namespace (since most tests creates pods with // empty namespace), and start informer factory. 
-func SetupPlugin( +func SetupPluginWithInformers( ctx context.Context, - t *testing.T, + tb testing.TB, pf frameworkruntime.PluginFactory, config runtime.Object, sharedLister framework.SharedLister, @@ -47,13 +47,33 @@ func SetupPlugin( frameworkruntime.WithSnapshotSharedLister(sharedLister), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { - t.Fatalf("Failed creating framework runtime: %v", err) + tb.Fatalf("Failed creating framework runtime: %v", err) } p, err := pf(config, fh) if err != nil { - t.Fatal(err) + tb.Fatal(err) } informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) return p } + +// SetupPlugin creates a plugin using a framework handle that includes +// the provided sharedLister. +func SetupPlugin( + tb testing.TB, + pf frameworkruntime.PluginFactory, + config runtime.Object, + sharedLister framework.SharedLister, +) framework.Plugin { + fh, err := frameworkruntime.NewFramework(nil, nil, + frameworkruntime.WithSnapshotSharedLister(sharedLister)) + if err != nil { + tb.Fatalf("Failed creating framework runtime: %v", err) + } + p, err := pf(config, fh) + if err != nil { + tb.Fatal(err) + } + return p +} diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index fea07023af0..7177d2ba342 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -101,6 +101,8 @@ type frameworkImpl struct { extenders []framework.Extender framework.PodNominator + parallelizer parallelize.Parallelizer + // Indicates that RunFilterPlugins should accumulate all failed statuses and not return // after the first failure. runAllFilters bool @@ -149,6 +151,7 @@ type frameworkOptions struct { runAllFilters bool captureProfile CaptureProfile clusterEventMap map[framework.ClusterEvent]sets.String + parallelizer parallelize.Parallelizer } // Option for the frameworkImpl. @@ -204,6 +207,13 @@ func WithExtenders(extenders []framework.Extender) Option { } } +// WithParallelism sets parallelism for the scheduling frameworkImpl. +func WithParallelism(parallelism int) Option { + return func(o *frameworkOptions) { + o.parallelizer = parallelize.NewParallelizer(parallelism) + } +} + // CaptureProfile is a callback to capture a finalized profile. type CaptureProfile func(config.KubeSchedulerProfile) @@ -218,6 +228,7 @@ func defaultFrameworkOptions() frameworkOptions { return frameworkOptions{ metricsRecorder: newMetricsRecorder(1000, time.Second), clusterEventMap: make(map[framework.ClusterEvent]sets.String), + parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism), } } @@ -249,6 +260,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti runAllFilters: options.runAllFilters, extenders: options.extenders, PodNominator: options.podNominator, + parallelizer: options.parallelizer, } if profile == nil { @@ -757,7 +769,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy errCh := parallelize.NewErrorChannel() // Run Score method for each node in parallel. - parallelize.Until(ctx, len(nodes), func(index int) { + f.Parallelizer().Until(ctx, len(nodes), func(index int) { for _, pl := range f.scorePlugins { nodeName := nodes[index].Name s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) @@ -777,7 +789,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy } // Run NormalizeScore method for each ScorePlugin in parallel. 
- parallelize.Until(ctx, len(f.scorePlugins), func(index int) { + f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) { pl := f.scorePlugins[index] nodeScoreList := pluginToNodeScores[pl.Name()] if pl.ScoreExtensions() == nil { @@ -795,7 +807,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy } // Apply score defaultWeights for each ScorePlugin in parallel. - parallelize.Until(ctx, len(f.scorePlugins), func(index int) { + f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) { pl := f.scorePlugins[index] // Score plugins' weight has been checked when they are initialized. weight := f.pluginNameToWeightMap[pl.Name()] @@ -1169,3 +1181,8 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config func (f *frameworkImpl) ProfileName() string { return f.profileName } + +// Parallelizer returns a parallelizer holding parallelism for scheduler. +func (f *frameworkImpl) Parallelizer() parallelize.Parallelizer { + return f.parallelizer +} diff --git a/pkg/scheduler/internal/parallelize/parallelism.go b/pkg/scheduler/internal/parallelize/parallelism.go index db2df1c5eaa..55f54a50bbd 100644 --- a/pkg/scheduler/internal/parallelize/parallelism.go +++ b/pkg/scheduler/internal/parallelize/parallelism.go @@ -23,25 +23,23 @@ import ( "k8s.io/client-go/util/workqueue" ) -var ( - parallelism = 16 -) +// DefaultParallelism is the default parallelism used in scheduler. +const DefaultParallelism int = 16 -// GetParallelism returns the currently set parallelism. -func GetParallelism() int { - return parallelism +// Parallelizer holds the parallelism for scheduler. +type Parallelizer struct { + parallelism int } -// SetParallelism sets the parallelism for all scheduler algorithms. -// TODO(#95952): Remove global setter in favor of a struct that holds the configuration. -func SetParallelism(p int) { - parallelism = p +// NewParallelizer returns an object holding the parallelism. +func NewParallelizer(p int) Parallelizer { + return Parallelizer{parallelism: p} } // chunkSizeFor returns a chunk size for the given number of items to use for // parallel work. The size aims to produce good CPU utilization. // returns max(1, min(sqrt(n), n/Parallelism)) -func chunkSizeFor(n int) int { +func chunkSizeFor(n, parallelism int) int { s := int(math.Sqrt(float64(n))) if r := n/parallelism + 1; s > r { @@ -53,6 +51,6 @@ func chunkSizeFor(n int) int { } // Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms. 
-func Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) { - workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces))) +func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) { + workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism))) } diff --git a/pkg/scheduler/internal/parallelize/parallelism_test.go b/pkg/scheduler/internal/parallelize/parallelism_test.go index 4f22dea1062..8449c4f4510 100644 --- a/pkg/scheduler/internal/parallelize/parallelism_test.go +++ b/pkg/scheduler/internal/parallelize/parallelism_test.go @@ -46,8 +46,8 @@ func TestChunkSize(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("%d", test.input), func(t *testing.T) { - if chunkSizeFor(test.input) != test.wantOutput { - t.Errorf("Expected: %d, got: %d", test.wantOutput, chunkSizeFor(test.input)) + if chunkSizeFor(test.input, DefaultParallelism) != test.wantOutput { + t.Errorf("Expected: %d, got: %d", test.wantOutput, chunkSizeFor(test.input, DefaultParallelism)) } }) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index fa95b878a09..e1de5812fd7 100755 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -98,6 +98,7 @@ type schedulerOptions struct { profiles []schedulerapi.KubeSchedulerProfile extenders []schedulerapi.Extender frameworkCapturer FrameworkCapturer + parallelism int32 } // Option configures a Scheduler @@ -112,10 +113,9 @@ func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option { } // WithParallelism sets the parallelism for all scheduler algorithms. Default is 16. -// TODO(#95952): Remove global setter in favor of a struct that holds the configuration. func WithParallelism(threads int32) Option { return func(o *schedulerOptions) { - parallelize.SetParallelism(int(threads)) + o.parallelism = threads } } @@ -183,6 +183,7 @@ var defaultSchedulerOptions = schedulerOptions{ percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), + parallelism: int32(parallelize.DefaultParallelism), } // New returns a Scheduler @@ -225,6 +226,7 @@ func New(client clientset.Interface, nodeInfoSnapshot: snapshot, extenders: options.extenders, frameworkCapturer: options.frameworkCapturer, + parallellism: options.parallelism, } metrics.Register()
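End to end, the value now flows in one direction instead of through a mutable package global: scheduler.WithParallelism records it in schedulerOptions, the Configurator hands it to frameworkruntime.WithParallelism, NewFramework wraps it with parallelize.NewParallelizer, and plugins fetch it via Handle.Parallelizer(). Tests that need determinism can inject their own value, as TestDryRunPreemption does with WithParallelism(1) above. The sketch below exercises the new value type directly; it is assumed to sit next to parallelism_test.go in the parallelize package and the test name is illustrative, but NewParallelizer, DefaultParallelism, and Until are the APIs introduced here.

```go
// A minimal sketch, assuming it lives in the parallelize package: callers hold
// a Parallelizer value instead of mutating package state with the removed
// SetParallelism/GetParallelism.
package parallelize

import (
	"context"
	"sync/atomic"
	"testing"
)

func TestUntilProcessesAllPieces(t *testing.T) {
	p := NewParallelizer(DefaultParallelism) // 16 workers by default
	var processed int32
	p.Until(context.Background(), 1000, func(piece int) {
		atomic.AddInt32(&processed, 1) // stand-in for per-node work
	})
	if processed != 1000 {
		t.Errorf("expected 1000 pieces processed, got %d", processed)
	}
}
```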