Merge pull request #99644 from gavinfish/sched-param

Scheduler: make parallelism part of schedulerOptions
commit d256468a09
Kubernetes Prow Robot, 2021-03-09 11:05:42 -08:00, committed by GitHub
21 changed files with 149 additions and 107 deletions
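
Before the per-file hunks, a hedged orientation sketch (the helper below is illustrative, not code from this PR): parallelism now flows from schedulerOptions through the Configurator and frameworkruntime.WithParallelism into the framework, and callers fan work out through framework.Handle rather than the removed package-level parallelize.Until.

package example

import (
	"context"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// checkAllNodes mirrors the genericScheduler change in the first hunk:
// the per-node closure runs via the Handle's Parallelizer, not a global.
func checkAllNodes(ctx context.Context, fwk framework.Handle, nodes []*framework.NodeInfo, checkNode func(int)) {
	fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
}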

View File

@ -333,7 +333,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
parallelize.Until(ctx, len(nodes), checkNode)
fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)

View File

@ -83,6 +83,7 @@ type Configurator struct {
nodeInfoSnapshot *internalcache.Snapshot
extenders []schedulerapi.Extender
frameworkCapturer FrameworkCapturer
parallelism int32
}
// create a scheduler from a set of registered plugins.
@ -142,6 +143,7 @@ func (c *Configurator) create() (*Scheduler, error) {
frameworkruntime.WithPodNominator(nominator),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithParallelism(int(c.parallelism)),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)

View File

@ -33,6 +33,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
// NodeScoreList declares a list of nodes and their scores.
@ -597,6 +598,9 @@ type Handle interface {
// Extenders returns registered scheduler extenders.
Extenders() []Extender
// Parallelizer returns a parallelizer holding parallelism for scheduler.
Parallelizer() parallelize.Parallelizer
}
// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
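
The Parallelizer accessor added to Handle above is what plugins consume. A minimal plugin sketch mirroring the InterPodAffinity and PodTopologySpread constructors further down (ExamplePlugin, New, and processNodes are placeholders, not part of this PR; like the in-tree plugins, it must live under pkg/scheduler because parallelize is an internal package): the parallelizer is captured once at construction and reused in every scheduling phase.

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)

// ExamplePlugin keeps the framework's Parallelizer as a field, like the
// in-tree plugins touched by this PR.
type ExamplePlugin struct {
	parallelizer parallelize.Parallelizer
}

// New is a plugin factory: it captures h.Parallelizer() at construction time.
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
	return &ExamplePlugin{parallelizer: h.Parallelizer()}, nil
}

func (pl *ExamplePlugin) Name() string { return "Example" }

// processNodes fans a per-node function out across the configured workers.
func (pl *ExamplePlugin) processNodes(ctx context.Context, nodes []*framework.NodeInfo) {
	pl.parallelizer.Until(ctx, len(nodes), func(i int) {
		_ = nodes[i] // per-node work goes here
	})
}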

View File

@ -44,7 +44,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -360,7 +359,7 @@ func dryRunPreemption(ctx context.Context, fh framework.Handle,
statusesLock.Unlock()
}
}
parallelize.Until(parallelCtx, len(potentialNodes), checkNode)
fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
}

View File

@ -996,11 +996,19 @@ func TestDryRunPreemption(t *testing.T) {
registeredPlugins = append(registeredPlugins, tt.registerPlugins...)
objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
parallelism := parallelize.DefaultParallelism
if tt.disableParallelism {
// We need disableParallelism because of the non-deterministic nature
// of the results of tests that set custom minCandidateNodesPercentage
// or minCandidateNodesAbsolute. This is only done in a handful of tests.
parallelism = 1
}
fwk, err := st.NewFramework(
registeredPlugins, "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithParallelism(parallelism),
)
if err != nil {
t.Fatal(err)
@ -1019,17 +1027,6 @@ func TestDryRunPreemption(t *testing.T) {
return nodeInfos[i].Node().Name < nodeInfos[j].Node().Name
})
if tt.disableParallelism {
// We need disableParallelism because of the non-deterministic nature
// of the results of tests that set custom minCandidateNodesPercentage
// or minCandidateNodesAbsolute. This is only done in a handful of tests.
oldParallelism := parallelize.GetParallelism()
parallelize.SetParallelism(1)
t.Cleanup(func() {
parallelize.SetParallelism(oldParallelism)
})
}
if tt.args == nil {
tt.args = getDefaultDefaultPreemptionArgs()
}

View File

@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
const (
@ -157,7 +156,7 @@ func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, ena
// calculates the following for each existing pod on each node:
// (1) Whether it has PodAntiAffinity
// (2) Whether any AffinityTerm matches the incoming pod
func getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo, enableNamespaceSelector bool) topologyToMatchedTermCount {
func (pl *InterPodAffinity) getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo, enableNamespaceSelector bool) topologyToMatchedTermCount {
topoMaps := make([]topologyToMatchedTermCount, len(nodes))
index := int32(-1)
processNode := func(i int) {
@ -175,7 +174,7 @@ func getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*fr
topoMaps[atomic.AddInt32(&index, 1)] = topoMap
}
}
parallelize.Until(context.Background(), len(nodes), processNode)
pl.parallelizer.Until(context.Background(), len(nodes), processNode)
result := make(topologyToMatchedTermCount)
for i := 0; i <= int(index); i++ {
@ -189,7 +188,7 @@ func getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*fr
// It returns a topologyToMatchedTermCount that are checked later by the affinity
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
// need to check all the pods in the cluster.
func getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo, enableNamespaceSelector bool) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo, enableNamespaceSelector bool) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
affinityCounts := make(topologyToMatchedTermCount)
antiAffinityCounts := make(topologyToMatchedTermCount)
if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
@ -221,7 +220,7 @@ func getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes
antiAffinityCountsList[k] = antiAffinity
}
}
parallelize.Until(context.Background(), len(allNodes), processNode)
pl.parallelizer.Until(context.Background(), len(allNodes), processNode)
for i := 0; i <= int(index); i++ {
affinityCounts.append(affinityCountsList[i])
@ -266,8 +265,8 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
}
s.existingAntiAffinityCounts = getExistingAntiAffinityCounts(pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods, pl.enableNamespaceSelector)
s.affinityCounts, s.antiAffinityCounts = getIncomingAffinityAntiAffinityCounts(s.podInfo, allNodes, pl.enableNamespaceSelector)
s.existingAntiAffinityCounts = pl.getExistingAntiAffinityCounts(pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods, pl.enableNamespaceSelector)
s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(s.podInfo, allNodes, pl.enableNamespaceSelector)
cycleState.Write(preFilterStateKey, s)
return nil

View File

@ -1016,7 +1016,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
EnablePodAffinityNamespaceSelector: !test.disableNSSelector,
})
}
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, namespaces)
p := plugintesting.SetupPluginWithInformers(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, namespaces)
state := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
if !preFilterStatus.IsSuccess() {
@ -1908,7 +1908,7 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{})
}
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot,
p := plugintesting.SetupPluginWithInformers(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot,
[]runtime.Object{
&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "NS1"}},
})
@ -1933,9 +1933,12 @@ func TestPreFilterDisabled(t *testing.T) {
nodeInfo := framework.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p := &InterPodAffinity{}
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{})
}
p := plugintesting.SetupPlugin(t, n, &config.InterPodAffinityArgs{}, cache.NewEmptySnapshot())
cycleState := framework.NewCycleState()
gotStatus := p.Filter(context.Background(), cycleState, pod, nodeInfo)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, pod, nodeInfo)
wantStatus := framework.AsStatus(fmt.Errorf(`error reading "PreFilterInterPodAffinity" from cycleState: %w`, framework.ErrNotFound))
if !reflect.DeepEqual(gotStatus, wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus)
@ -2204,7 +2207,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{})
}
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, nil)
p := plugintesting.SetupPlugin(t, n, &config.InterPodAffinityArgs{}, snapshot)
cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod)
if !preFilterStatus.IsSuccess() {
@ -2485,9 +2488,13 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := cache.NewSnapshot(tt.existingPods, tt.nodes)
l, _ := s.NodeInfos().List()
gotAffinityPodsMap, gotAntiAffinityPodsMap := getIncomingAffinityAntiAffinityCounts(framework.NewPodInfo(tt.pod), l, true)
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
l, _ := snapshot.NodeInfos().List()
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{})
}
p := plugintesting.SetupPlugin(t, n, &config.InterPodAffinityArgs{}, snapshot)
gotAffinityPodsMap, gotAntiAffinityPodsMap := p.(*InterPodAffinity).getIncomingAffinityAntiAffinityCounts(framework.NewPodInfo(tt.pod), l, true)
if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) {
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap)
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
const (
@ -41,6 +42,7 @@ var _ framework.ScorePlugin = &InterPodAffinity{}
// InterPodAffinity is a plugin that checks inter pod affinity
type InterPodAffinity struct {
parallelizer parallelize.Parallelizer
args config.InterPodAffinityArgs
sharedLister framework.SharedLister
nsLister listersv1.NamespaceLister
@ -65,6 +67,7 @@ func New(plArgs runtime.Object, h framework.Handle, fts feature.Features) (frame
return nil, err
}
pl := &InterPodAffinity{
parallelizer: h.Parallelizer(),
args: args,
sharedLister: h.SnapshotSharedLister(),
enableNamespaceSelector: fts.EnablePodAffinityNamespaceSelector,

View File

@ -25,7 +25,6 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
// preScoreStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring.
@ -206,7 +205,7 @@ func (pl *InterPodAffinity) PreScore(
topoScores[atomic.AddInt32(&index, 1)] = topoScore
}
}
parallelize.Until(context.Background(), len(allNodes), processNode)
pl.parallelizer.Until(context.Background(), len(allNodes), processNode)
for i := 0; i <= int(index); i++ {
state.topologyScore.append(topoScores[i])

View File

@ -743,13 +743,12 @@ func TestPreferredAffinity(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.pods, test.nodes)
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{
EnablePodAffinityNamespaceSelector: !test.disableNSSelector,
})
}
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: 1}, snapshot, namespaces)
p := plugintesting.SetupPluginWithInformers(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: 1}, cache.NewSnapshot(test.pods, test.nodes), namespaces)
status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)
if !status.IsSuccess() {
if !strings.Contains(status.Message(), test.wantStatus.Message()) {
@ -910,13 +909,12 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.pods, test.nodes)
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{
EnablePodAffinityNamespaceSelector: !test.disableNSSelector,
})
}
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, snapshot, namespaces)
p := plugintesting.SetupPluginWithInformers(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, cache.NewSnapshot(test.pods, test.nodes), namespaces)
status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)

View File

@ -27,7 +27,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
const preFilterStateKey = "PreFilter" + Name
@ -258,7 +257,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
atomic.AddInt32(tpCount, int32(count))
}
}
parallelize.Until(context.Background(), len(allNodes), processNode)
pl.parallelizer.Until(context.Background(), len(allNodes), processNode)
// calculate min match for each topology pair
for i := 0; i < len(constraints); i++ {

View File

@ -27,11 +27,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/utils/pointer"
)
@ -515,16 +514,13 @@ func TestPreFilterState(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0)
pl := PodTopologySpread{
sharedLister: cache.NewSnapshot(tt.existingPods, tt.nodes),
defaultConstraints: tt.defaultConstraints,
args := &config.PodTopologySpreadArgs{
DefaultConstraints: tt.defaultConstraints,
DefaultingType: config.ListDefaulting,
}
pl.setListers(informerFactory)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
p := plugintesting.SetupPluginWithInformers(ctx, t, New, args, cache.NewSnapshot(tt.existingPods, tt.nodes), tt.objs)
cs := framework.NewCycleState()
if s := pl.PreFilter(ctx, cs, tt.pod); !s.IsSuccess() {
if s := p.(*PodTopologySpread).PreFilter(ctx, cs, tt.pod); !s.IsSuccess() {
t.Fatal(s.AsError())
}
got, err := getPreFilterState(cs)
@ -825,20 +821,19 @@ func TestPreFilterStateAddPod(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := PodTopologySpread{
sharedLister: snapshot,
}
cs := framework.NewCycleState()
ctx := context.Background()
if s := pl.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() {
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
cs := framework.NewCycleState()
if s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() {
t.Fatal(s.AsError())
}
nodeInfo, err := snapshot.Get(tt.nodes[tt.nodeIdx].Name)
if err != nil {
t.Fatal(err)
}
if s := pl.AddPod(ctx, cs, tt.preemptor, framework.NewPodInfo(tt.addedPod), nodeInfo); !s.IsSuccess() {
if s := p.AddPod(ctx, cs, tt.preemptor, framework.NewPodInfo(tt.addedPod), nodeInfo); !s.IsSuccess() {
t.Fatal(s.AsError())
}
state, err := getPreFilterState(cs)
@ -1030,13 +1025,12 @@ func TestPreFilterStateRemovePod(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := PodTopologySpread{
sharedLister: snapshot,
}
cs := framework.NewCycleState()
ctx := context.Background()
s := pl.PreFilter(ctx, cs, tt.preemptor)
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
cs := framework.NewCycleState()
s := p.PreFilter(ctx, cs, tt.preemptor)
if !s.IsSuccess() {
t.Fatal(s.AsError())
}
@ -1050,7 +1044,7 @@ func TestPreFilterStateRemovePod(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if s := pl.RemovePod(ctx, cs, tt.preemptor, framework.NewPodInfo(deletedPod), nodeInfo); !s.IsSuccess() {
if s := p.RemovePod(ctx, cs, tt.preemptor, framework.NewPodInfo(deletedPod), nodeInfo); !s.IsSuccess() {
t.Fatal(s.AsError())
}
@ -1106,22 +1100,21 @@ func BenchmarkFilter(b *testing.B) {
var state *framework.CycleState
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, _ := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
pl := PodTopologySpread{
sharedLister: cache.NewSnapshot(existingPods, allNodes),
}
ctx := context.Background()
pl := plugintesting.SetupPlugin(b, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes))
p := pl.(*PodTopologySpread)
b.ResetTimer()
for i := 0; i < b.N; i++ {
state = framework.NewCycleState()
s := pl.PreFilter(ctx, state, tt.pod)
s := p.PreFilter(ctx, state, tt.pod)
if !s.IsSuccess() {
b.Fatal(s.AsError())
}
filterNode := func(i int) {
n, _ := pl.sharedLister.NodeInfos().Get(allNodes[i].Name)
pl.Filter(ctx, state, tt.pod, n)
n, _ := p.sharedLister.NodeInfos().Get(allNodes[i].Name)
p.Filter(ctx, state, tt.pod, n)
}
parallelize.Until(ctx, len(allNodes), filterNode)
p.parallelizer.Until(ctx, len(allNodes), filterNode)
}
})
b.Run(tt.name+"/Clone", func(b *testing.B) {
@ -1411,7 +1404,8 @@ func TestSingleConstraint(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
p := &PodTopologySpread{sharedLister: snapshot}
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, tt.pod)
if !preFilterStatus.IsSuccess() {
@ -1637,7 +1631,8 @@ func TestMultipleConstraints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
p := &PodTopologySpread{sharedLister: snapshot}
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, tt.pod)
if !preFilterStatus.IsSuccess() {
@ -1660,9 +1655,9 @@ func TestPreFilterDisabled(t *testing.T) {
nodeInfo := framework.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p := &PodTopologySpread{}
p := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewEmptySnapshot())
cycleState := framework.NewCycleState()
gotStatus := p.Filter(context.Background(), cycleState, pod, nodeInfo)
gotStatus := p.(*PodTopologySpread).Filter(context.Background(), cycleState, pod, nodeInfo)
wantStatus := framework.AsStatus(fmt.Errorf(`reading "PreFilterPodTopologySpread" from cycleState: %w`, framework.ErrNotFound))
if !reflect.DeepEqual(gotStatus, wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus)

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
const (
@ -51,6 +52,7 @@ var systemDefaultConstraints = []v1.TopologySpreadConstraint{
// PodTopologySpread is a plugin that ensures pod's topologySpreadConstraints is satisfied.
type PodTopologySpread struct {
parallelizer parallelize.Parallelizer
defaultConstraints []v1.TopologySpreadConstraint
sharedLister framework.SharedLister
services corelisters.ServiceLister
@ -87,6 +89,7 @@ func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
return nil, err
}
pl := &PodTopologySpread{
parallelizer: h.Parallelizer(),
sharedLister: h.SnapshotSharedLister(),
defaultConstraints: args.DefaultConstraints,
}

View File

@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/framework"
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
const preScoreStateKey = "PreScore" + Name
@ -163,7 +162,7 @@ func (pl *PodTopologySpread) PreScore(
atomic.AddInt64(tpCount, int64(count))
}
}
parallelize.Until(ctx, len(allNodes), processAllNode)
pl.parallelizer.Until(ctx, len(allNodes), processAllNode)
cycleState.Write(preScoreStateKey, state)
return nil

View File

@ -29,9 +29,9 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/utils/pointer"
)
@ -687,8 +687,8 @@ func TestPodTopologySpreadScore(t *testing.T) {
allNodes := append([]*v1.Node{}, tt.nodes...)
allNodes = append(allNodes, tt.failedNodes...)
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(tt.existingPods, allNodes)
p := &PodTopologySpread{sharedLister: snapshot}
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(tt.existingPods, allNodes))
p := pl.(*PodTopologySpread)
status := p.PreScore(context.Background(), state, tt.pod, tt.nodes)
if !status.IsSuccess() {
@ -757,8 +757,8 @@ func BenchmarkTestPodTopologySpreadScore(b *testing.B) {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(existingPods, allNodes)
p := &PodTopologySpread{sharedLister: snapshot}
pl := plugintesting.SetupPlugin(b, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes))
p := pl.(*PodTopologySpread)
status := p.PreScore(context.Background(), state, tt.pod, filteredNodes)
if !status.IsSuccess() {
@ -854,7 +854,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
score, _ := p.Score(ctx, state, pod, n.Name)
gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
}
parallelize.Until(ctx, len(filteredNodes), scoreNode)
p.parallelizer.Until(ctx, len(filteredNodes), scoreNode)
status = p.NormalizeScore(ctx, state, pod, gotList)
if !status.IsSuccess() {
b.Fatal(status)

View File

@ -88,7 +88,8 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
score, _ := plugin.Score(ctx, state, pod, n.Name)
gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
}
parallelize.Until(ctx, len(filteredNodes), scoreNode)
parallelizer := parallelize.NewParallelizer(parallelize.DefaultParallelism)
parallelizer.Until(ctx, len(filteredNodes), scoreNode)
status = plugin.NormalizeScore(ctx, state, pod, gotList)
if !status.IsSuccess() {
b.Fatal(status)

View File

@ -29,13 +29,13 @@ import (
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)
// SetupPlugin creates a plugin using a framework handle that includes
// SetupPluginWithInformers creates a plugin using a framework handle that includes
// the provided sharedLister and a SharedInformerFactory with the provided objects.
// The function also creates an empty namespace (since most tests create pods with an
// empty namespace) and starts the informer factory.
func SetupPlugin(
func SetupPluginWithInformers(
ctx context.Context,
t *testing.T,
tb testing.TB,
pf frameworkruntime.PluginFactory,
config runtime.Object,
sharedLister framework.SharedLister,
@ -47,13 +47,33 @@ func SetupPlugin(
frameworkruntime.WithSnapshotSharedLister(sharedLister),
frameworkruntime.WithInformerFactory(informerFactory))
if err != nil {
t.Fatalf("Failed creating framework runtime: %v", err)
tb.Fatalf("Failed creating framework runtime: %v", err)
}
p, err := pf(config, fh)
if err != nil {
t.Fatal(err)
tb.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
return p
}
// SetupPlugin creates a plugin using a framework handle that includes
// the provided sharedLister.
func SetupPlugin(
tb testing.TB,
pf frameworkruntime.PluginFactory,
config runtime.Object,
sharedLister framework.SharedLister,
) framework.Plugin {
fh, err := frameworkruntime.NewFramework(nil, nil,
frameworkruntime.WithSnapshotSharedLister(sharedLister))
if err != nil {
tb.Fatalf("Failed creating framework runtime: %v", err)
}
p, err := pf(config, fh)
if err != nil {
tb.Fatal(err)
}
return p
}
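
For reference, a hedged sketch of how the split helpers are meant to be used from a plugin test, modeled on the call sites elsewhere in this PR; TestSetupSketch is a placeholder name and the file would sit in the podtopologyspread package.

package podtopologyspread

import (
	"context"
	"testing"

	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

// TestSetupSketch is illustrative only, not a test added by this PR.
func TestSetupSketch(t *testing.T) {
	snapshot := cache.NewSnapshot(nil, nil)
	args := &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}

	// No informers needed: the lighter-weight helper.
	p := plugintesting.SetupPlugin(t, New, args, snapshot)
	_ = p.(*PodTopologySpread)

	// Informers needed (e.g. namespace objects): the ctx-aware variant,
	// which also starts the informer factory and waits for cache sync.
	pi := plugintesting.SetupPluginWithInformers(context.Background(), t, New, args, snapshot, nil)
	_ = pi.(*PodTopologySpread)
}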

View File

@ -101,6 +101,8 @@ type frameworkImpl struct {
extenders []framework.Extender
framework.PodNominator
parallelizer parallelize.Parallelizer
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
// after the first failure.
runAllFilters bool
@ -149,6 +151,7 @@ type frameworkOptions struct {
runAllFilters bool
captureProfile CaptureProfile
clusterEventMap map[framework.ClusterEvent]sets.String
parallelizer parallelize.Parallelizer
}
// Option for the frameworkImpl.
@ -204,6 +207,13 @@ func WithExtenders(extenders []framework.Extender) Option {
}
}
// WithParallelism sets parallelism for the scheduling frameworkImpl.
func WithParallelism(parallelism int) Option {
return func(o *frameworkOptions) {
o.parallelizer = parallelize.NewParallelizer(parallelism)
}
}
// CaptureProfile is a callback to capture a finalized profile.
type CaptureProfile func(config.KubeSchedulerProfile)
@ -218,6 +228,7 @@ func defaultFrameworkOptions() frameworkOptions {
return frameworkOptions{
metricsRecorder: newMetricsRecorder(1000, time.Second),
clusterEventMap: make(map[framework.ClusterEvent]sets.String),
parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism),
}
}
@ -249,6 +260,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
runAllFilters: options.runAllFilters,
extenders: options.extenders,
PodNominator: options.podNominator,
parallelizer: options.parallelizer,
}
if profile == nil {
@ -757,7 +769,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
errCh := parallelize.NewErrorChannel()
// Run Score method for each node in parallel.
parallelize.Until(ctx, len(nodes), func(index int) {
f.Parallelizer().Until(ctx, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
@ -777,7 +789,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
}
// Run NormalizeScore method for each ScorePlugin in parallel.
parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
nodeScoreList := pluginToNodeScores[pl.Name()]
if pl.ScoreExtensions() == nil {
@ -795,7 +807,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
}
// Apply score defaultWeights for each ScorePlugin in parallel.
parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]
@ -1169,3 +1181,8 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config
func (f *frameworkImpl) ProfileName() string {
return f.profileName
}
// Parallelizer returns a parallelizer holding parallelism for scheduler.
func (f *frameworkImpl) Parallelizer() parallelize.Parallelizer {
return f.parallelizer
}
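
A hedged construction-side sketch (the nil registry/profile and the value 8 are illustrative; without WithParallelism, the defaultFrameworkOptions above fall back to parallelize.DefaultParallelism).

package example

import (
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

// newFrameworkWith8Workers is illustrative only: it builds a framework whose
// Parallelizer uses 8 workers instead of the default 16.
func newFrameworkWith8Workers() error {
	fwk, err := frameworkruntime.NewFramework(nil, nil,
		frameworkruntime.WithParallelism(8),
	)
	if err != nil {
		return err
	}
	_ = fwk.Parallelizer() // the same value plugins see via Handle.Parallelizer()
	return nil
}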

View File

@ -23,25 +23,23 @@ import (
"k8s.io/client-go/util/workqueue"
)
var (
parallelism = 16
)
// DefaultParallelism is the default parallelism used in scheduler.
const DefaultParallelism int = 16
// GetParallelism returns the currently set parallelism.
func GetParallelism() int {
return parallelism
// Parallelizer holds the parallelism for scheduler.
type Parallelizer struct {
parallelism int
}
// SetParallelism sets the parallelism for all scheduler algorithms.
// TODO(#95952): Remove global setter in favor of a struct that holds the configuration.
func SetParallelism(p int) {
parallelism = p
// NewParallelizer returns an object holding the parallelism.
func NewParallelizer(p int) Parallelizer {
return Parallelizer{parallelism: p}
}
// chunkSizeFor returns a chunk size for the given number of items to use for
// parallel work. The size aims to produce good CPU utilization.
// returns max(1, min(sqrt(n), n/Parallelism))
func chunkSizeFor(n int) int {
func chunkSizeFor(n, parallelism int) int {
s := int(math.Sqrt(float64(n)))
if r := n/parallelism + 1; s > r {
@ -53,6 +51,6 @@ func chunkSizeFor(n int) int {
}
// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
func Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) {
workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces)))
func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) {
workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
}
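
A self-contained usage sketch of the reworked package (hedged: because parallelize is internal, it only compiles from inside the scheduler tree, and the squaring workload is illustrative). It also works through the chunk-size rule above: for 1000 pieces at the default parallelism of 16, sqrt(1000) ≈ 31 and 1000/16+1 = 63, so pieces are handed out in chunks of 31.

package main

import (
	"context"
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)

func main() {
	// 16 workers, the same default the scheduler itself uses.
	p := parallelize.NewParallelizer(parallelize.DefaultParallelism)

	squares := make([]int, 1000)
	// Workers grab the 1000 pieces in chunks of 31 (see chunkSizeFor above).
	p.Until(context.Background(), len(squares), func(i int) {
		squares[i] = i * i
	})
	fmt.Println(squares[999]) // 998001
}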

View File

@ -46,8 +46,8 @@ func TestChunkSize(t *testing.T) {
for _, test := range tests {
t.Run(fmt.Sprintf("%d", test.input), func(t *testing.T) {
if chunkSizeFor(test.input) != test.wantOutput {
t.Errorf("Expected: %d, got: %d", test.wantOutput, chunkSizeFor(test.input))
if chunkSizeFor(test.input, DefaultParallelism) != test.wantOutput {
t.Errorf("Expected: %d, got: %d", test.wantOutput, chunkSizeFor(test.input, DefaultParallelism))
}
})
}

View File

@ -98,6 +98,7 @@ type schedulerOptions struct {
profiles []schedulerapi.KubeSchedulerProfile
extenders []schedulerapi.Extender
frameworkCapturer FrameworkCapturer
parallelism int32
}
// Option configures a Scheduler
@ -112,10 +113,9 @@ func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
}
// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
// TODO(#95952): Remove global setter in favor of a struct that holds the configuration.
func WithParallelism(threads int32) Option {
return func(o *schedulerOptions) {
parallelize.SetParallelism(int(threads))
o.parallelism = threads
}
}
@ -183,6 +183,7 @@ var defaultSchedulerOptions = schedulerOptions{
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
parallelism: int32(parallelize.DefaultParallelism),
}
// New returns a Scheduler
@ -225,6 +226,7 @@ func New(client clientset.Interface,
nodeInfoSnapshot: snapshot,
extenders: options.extenders,
frameworkCapturer: options.frameworkCapturer,
parallelism: options.parallelism,
}
metrics.Register()
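
Finally, a hedged note on the reworked option: instead of mutating package-level state, WithParallelism now records the value on schedulerOptions, and the Configurator forwards it to frameworkruntime.WithParallelism. The helper below is hypothetical (it would have to live in package scheduler, since schedulerOptions is unexported).

package scheduler

// exampleParallelismOption is hypothetical, not part of this PR.
func exampleParallelismOption() int32 {
	opts := defaultSchedulerOptions // parallelism starts at DefaultParallelism (16)
	WithParallelism(32)(&opts)      // an Option is just a func(*schedulerOptions)
	return opts.parallelism         // 32, later passed to frameworkruntime.WithParallelism
}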