Merge pull request #86129 from damemi/remove-priority-code

Remove priority execution path from prioritizeNodes in generic_scheduler
This commit is contained in:
Kubernetes Prow Robot 2019-12-13 01:41:04 -08:00 committed by GitHub
commit c34d140241
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 240 additions and 283 deletions

View File

@ -39,13 +39,12 @@ import (
) )
type testCase struct { type testCase struct {
name string name string
JSON string JSON string
featureGates map[featuregate.Feature]bool featureGates map[featuregate.Feature]bool
wantPredicates sets.String wantPredicates sets.String
wantPrioritizers sets.String wantPlugins map[string][]config.Plugin
wantPlugins map[string][]config.Plugin wantExtenders []config.Extender
wantExtenders []config.Extender
} }
func TestCompatibility_v1_Scheduler(t *testing.T) { func TestCompatibility_v1_Scheduler(t *testing.T) {
@ -99,9 +98,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"PodFitsPorts", "PodFitsPorts",
), ),
wantPrioritizers: sets.NewString(
"ServiceSpreadingPriority",
),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@ -144,8 +140,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}} {"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}}
] ]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@ -197,8 +192,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}} {"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}}
] ]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@ -259,8 +253,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "InterPodAffinityPriority", "weight": 2} {"name": "InterPodAffinityPriority", "weight": 2}
] ]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -327,8 +320,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "MostRequestedPriority", "weight": 2} {"name": "MostRequestedPriority", "weight": 2}
] ]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -406,8 +398,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"nodeCacheCapable": true "nodeCacheCapable": true
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -496,8 +487,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"nodeCacheCapable": true "nodeCacheCapable": true
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -587,8 +577,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"nodeCacheCapable": true "nodeCacheCapable": true
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -682,8 +671,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true "ignorable":true
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -789,8 +777,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true "ignorable":true
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -898,8 +885,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true "ignorable":true
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -1007,8 +993,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true "ignorable":true
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -1121,8 +1106,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true "ignorable":true
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -1201,7 +1185,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}, },
} }
registeredPredicates := sets.NewString(scheduler.ListRegisteredFitPredicates()...) registeredPredicates := sets.NewString(scheduler.ListRegisteredFitPredicates()...)
registeredPriorities := sets.NewString(scheduler.ListRegisteredPriorityFunctions()...)
seenPredicates := sets.NewString() seenPredicates := sets.NewString()
seenPriorities := sets.NewString() seenPriorities := sets.NewString()
mandatoryPredicates := sets.NewString() mandatoryPredicates := sets.NewString()
@ -1285,14 +1268,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
t.Errorf("Got predicates %v, want %v", gotPredicates, wantPredicates) t.Errorf("Got predicates %v, want %v", gotPredicates, wantPredicates)
} }
gotPrioritizers := sets.NewString()
for _, p := range sched.Algorithm.Prioritizers() {
gotPrioritizers.Insert(p.Name)
}
if !gotPrioritizers.Equal(tc.wantPrioritizers) {
t.Errorf("Got prioritizers %v, want %v", gotPrioritizers, tc.wantPrioritizers)
}
gotPlugins := sched.Framework.ListPlugins() gotPlugins := sched.Framework.ListPlugins()
for _, p := range gotPlugins["FilterPlugin"] { for _, p := range gotPlugins["FilterPlugin"] {
seenPredicates.Insert(filterToPredicateMap[p.Name]) seenPredicates.Insert(filterToPredicateMap[p.Name])
@ -1326,16 +1301,12 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
} }
seenPredicates = seenPredicates.Union(gotPredicates) seenPredicates = seenPredicates.Union(gotPredicates)
seenPriorities = seenPriorities.Union(gotPrioritizers)
}) })
} }
if !seenPredicates.HasAll(registeredPredicates.List()...) { if !seenPredicates.HasAll(registeredPredicates.List()...) {
t.Errorf("Registered predicates are missing from compatibility test (add to test stanza for version currently in development): %#v", registeredPredicates.Difference(seenPredicates).List()) t.Errorf("Registered predicates are missing from compatibility test (add to test stanza for version currently in development): %#v", registeredPredicates.Difference(seenPredicates).List())
} }
if !seenPriorities.HasAll(registeredPriorities.List()...) {
t.Errorf("Registered priorities are missing from compatibility test (add to test stanza for version currently in development): %#v", registeredPriorities.Difference(seenPriorities).List())
}
} }
func pluginsToStringSet(plugins []config.Plugin) sets.String { func pluginsToStringSet(plugins []config.Plugin) sets.String {

View File

@ -28,7 +28,6 @@ go_library(
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
@ -55,13 +54,13 @@ go_test(
"//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library", "//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/framework/plugins/defaultpodtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library", "//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library", "//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library", "//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
@ -72,7 +71,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",

View File

@ -18,7 +18,6 @@ package core
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"sort" "sort"
@ -28,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
@ -108,16 +108,28 @@ func machine2PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.Node
return &result, nil return &result, nil
} }
func machine2Prioritizer(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { type machine2PrioritizerPlugin struct{}
node := nodeInfo.Node()
if node == nil { func newMachine2PrioritizerPlugin() framework.PluginFactory {
return framework.NodeScore{}, errors.New("node not found") return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &machine2PrioritizerPlugin{}, nil
} }
}
func (pl *machine2PrioritizerPlugin) Name() string {
return "Machine2Prioritizer"
}
func (pl *machine2PrioritizerPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
score := 10 score := 10
if node.Name == "machine2" { if nodeName == "machine2" {
score = 100 score = 100
} }
return framework.NodeScore{Name: node.Name, Score: int64(score)}, nil return int64(score), nil
}
func (pl *machine2PrioritizerPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
} }
type FakeExtender struct { type FakeExtender struct {
@ -351,7 +363,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
registerFilterPlugin st.RegisterFilterPluginFunc registerFilterPlugin st.RegisterFilterPluginFunc
prioritizers []priorities.PriorityConfig registerScorePlugin st.RegisterScorePluginFunc
extenders []FakeExtender extenders []FakeExtender
nodes []string nodes []string
expectedResult ScheduleResult expectedResult ScheduleResult
@ -458,7 +470,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
}, },
{ {
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 20}}, registerScorePlugin: st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 20),
extenders: []FakeExtender{ extenders: []FakeExtender{
{ {
predicates: []fitPredicate{truePredicateExtender}, predicates: []fitPredicate{truePredicateExtender},
@ -483,7 +495,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
// because of the errors from errorPredicateExtender and/or // because of the errors from errorPredicateExtender and/or
// errorPrioritizerExtender. // errorPrioritizerExtender.
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 1}}, registerScorePlugin: st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 1),
extenders: []FakeExtender{ extenders: []FakeExtender{
{ {
predicates: []fitPredicate{errorPredicateExtender}, predicates: []fitPredicate{errorPredicateExtender},
@ -545,9 +557,13 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
registry := framework.Registry{} registry := framework.Registry{}
plugins := &schedulerapi.Plugins{ plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
} }
var pluginConfigs []schedulerapi.PluginConfig var pluginConfigs []schedulerapi.PluginConfig
test.registerFilterPlugin(&registry, plugins, pluginConfigs) test.registerFilterPlugin(&registry, plugins, pluginConfigs)
if test.registerScorePlugin != nil {
test.registerScorePlugin(&registry, plugins, pluginConfigs)
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs) fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
@ -555,7 +571,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
queue, queue,
nil, nil,
predicates.EmptyMetadataProducer, predicates.EmptyMetadataProducer,
test.prioritizers,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,

View File

@ -33,7 +33,6 @@ import (
policy "k8s.io/api/policy/v1beta1" policy "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1" policylisters "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@ -129,9 +128,6 @@ type ScheduleAlgorithm interface {
Predicates() map[string]predicates.FitPredicate Predicates() map[string]predicates.FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for // Prioritizers returns a slice of priority config. This is exposed for
// testing. // testing.
Prioritizers() []priorities.PriorityConfig
// Extenders returns a slice of extender config. This is exposed for
// testing.
Extenders() []algorithm.SchedulerExtender Extenders() []algorithm.SchedulerExtender
// GetPredicateMetadataProducer returns the predicate metadata producer. This is needed // GetPredicateMetadataProducer returns the predicate metadata producer. This is needed
// for cluster autoscaler integration. // for cluster autoscaler integration.
@ -693,11 +689,10 @@ func (g *genericScheduler) podFitsOnNode(
return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
} }
// prioritizeNodes prioritizes the nodes by running the individual priority functions in parallel. // prioritizeNodes prioritizes the nodes by running the score plugins,
// Each priority function is expected to set a score of 0-10 // which return a score for each node from the call to RunScorePlugins().
// 0 is the lowest priority score (least preferred node) and 10 is the highest // The scores from each plugin are added together to make the score for that node, then
// Each priority function can also have its own weight // any extenders are run as well.
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes // All scores are finally combined (added) to get the total weighted scores of all nodes
func (g *genericScheduler) prioritizeNodes( func (g *genericScheduler) prioritizeNodes(
ctx context.Context, ctx context.Context,
@ -708,7 +703,7 @@ func (g *genericScheduler) prioritizeNodes(
) (framework.NodeScoreList, error) { ) (framework.NodeScoreList, error) {
// If no priority configs are provided, then all nodes will have a score of one. // If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format // This is required to generate the priority list in the required format
if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() { if len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
result := make(framework.NodeScoreList, 0, len(nodes)) result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes { for i := range nodes {
result = append(result, framework.NodeScore{ result = append(result, framework.NodeScore{
@ -719,62 +714,6 @@ func (g *genericScheduler) prioritizeNodes(
return result, nil return result, nil
} }
var (
mu = sync.Mutex{}
wg = sync.WaitGroup{}
errs []error
)
appendError := func(err error) {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
results := make([]framework.NodeScoreList, len(g.prioritizers))
for i := range g.prioritizers {
results[i] = make(framework.NodeScoreList, len(nodes))
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
for i := range g.prioritizers {
var err error
results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
results[i][index].Name = nodes[index].Name
}
}
})
for i := range g.prioritizers {
if g.prioritizers[i].Reduce == nil {
continue
}
wg.Add(1)
go func(index int) {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
wg.Done()
}()
if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
appendError(err)
}
if klog.V(10) {
for _, hostPriority := range results[index] {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, g.prioritizers[index].Name, hostPriority.Score)
}
}
}(i)
}
// Wait for all computations to be finished.
wg.Wait()
if len(errs) != 0 {
return framework.NodeScoreList{}, errors.NewAggregate(errs)
}
// Run the Score plugins. // Run the Score plugins.
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta}) state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes) scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes)
@ -787,16 +726,14 @@ func (g *genericScheduler) prioritizeNodes(
for i := range nodes { for i := range nodes {
result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
for j := range g.prioritizers {
result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
}
for j := range scoresMap { for j := range scoresMap {
result[i].Score += scoresMap[j][i].Score result[i].Score += scoresMap[j][i].Score
} }
} }
if len(g.extenders) != 0 && nodes != nil { if len(g.extenders) != 0 && nodes != nil {
var mu sync.Mutex
var wg sync.WaitGroup
combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList)) combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList))
for i := range g.extenders { for i := range g.extenders {
if !g.extenders[i].IsInterested(pod) { if !g.extenders[i].IsInterested(pod) {
@ -1258,7 +1195,6 @@ func NewGenericScheduler(
podQueue internalqueue.SchedulingQueue, podQueue internalqueue.SchedulingQueue,
predicates map[string]predicates.FitPredicate, predicates map[string]predicates.FitPredicate,
predicateMetaProducer predicates.MetadataProducer, predicateMetaProducer predicates.MetadataProducer,
prioritizers []priorities.PriorityConfig,
priorityMetaProducer priorities.MetadataProducer, priorityMetaProducer priorities.MetadataProducer,
nodeInfoSnapshot *nodeinfosnapshot.Snapshot, nodeInfoSnapshot *nodeinfosnapshot.Snapshot,
framework framework.Framework, framework framework.Framework,
@ -1275,7 +1211,6 @@ func NewGenericScheduler(
schedulingQueue: podQueue, schedulingQueue: podQueue,
predicates: predicates, predicates: predicates,
predicateMetaProducer: predicateMetaProducer, predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer, priorityMetaProducer: priorityMetaProducer,
framework: framework, framework: framework,
extenders: extenders, extenders: extenders,

View File

@ -32,7 +32,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
@ -43,13 +42,13 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1" extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -173,60 +172,120 @@ func NewFakeFilterPlugin(failedNodeReturnCodeMap map[string]framework.Code) fram
} }
} }
func numericMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { type numericMapPlugin struct{}
node := nodeInfo.Node()
score, err := strconv.Atoi(node.Name)
if err != nil {
return framework.NodeScore{}, err
}
return framework.NodeScore{ func newNumericMapPlugin() framework.PluginFactory {
Name: node.Name, return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
Score: int64(score), return &numericMapPlugin{}, nil
}, nil }
} }
func reverseNumericReducePriority(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error { func (pl *numericMapPlugin) Name() string {
return "NumericMap"
}
func (pl *numericMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
score, err := strconv.Atoi(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Error converting nodename to int: %+v", nodeName))
}
return int64(score), nil
}
func (pl *numericMapPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
type reverseNumericMapPlugin struct{}
func newReverseNumericMapPlugin() framework.PluginFactory {
return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &reverseNumericMapPlugin{}, nil
}
}
func (pl *reverseNumericMapPlugin) Name() string {
return "ReverseNumericMap"
}
func (pl *reverseNumericMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
score, err := strconv.Atoi(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Error converting nodename to int: %+v", nodeName))
}
return int64(score), nil
}
func (pl *reverseNumericMapPlugin) ScoreExtensions() framework.ScoreExtensions {
return pl
}
func (pl *reverseNumericMapPlugin) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeScores framework.NodeScoreList) *framework.Status {
var maxScore float64 var maxScore float64
minScore := math.MaxFloat64 minScore := math.MaxFloat64
for _, hostPriority := range result { for _, hostPriority := range nodeScores {
maxScore = math.Max(maxScore, float64(hostPriority.Score)) maxScore = math.Max(maxScore, float64(hostPriority.Score))
minScore = math.Min(minScore, float64(hostPriority.Score)) minScore = math.Min(minScore, float64(hostPriority.Score))
} }
for i, hostPriority := range result { for i, hostPriority := range nodeScores {
result[i] = framework.NodeScore{ nodeScores[i] = framework.NodeScore{
Name: hostPriority.Name, Name: hostPriority.Name,
Score: int64(maxScore + minScore - float64(hostPriority.Score)), Score: int64(maxScore + minScore - float64(hostPriority.Score)),
} }
} }
return nil return nil
} }
func trueMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { type trueMapPlugin struct{}
return framework.NodeScore{
Name: nodeInfo.Node().Name, func newTrueMapPlugin() framework.PluginFactory {
Score: 1, return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
}, nil return &trueMapPlugin{}, nil
}
} }
func falseMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { func (pl *trueMapPlugin) Name() string {
return framework.NodeScore{}, errPrioritize return "TrueMap"
} }
func getNodeReducePriority(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error { func (pl *trueMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (int64, *framework.Status) {
for _, host := range result { return 1, nil
}
func (pl *trueMapPlugin) ScoreExtensions() framework.ScoreExtensions {
return pl
}
func (pl *trueMapPlugin) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeScores framework.NodeScoreList) *framework.Status {
for _, host := range nodeScores {
if host.Name == "" { if host.Name == "" {
return fmt.Errorf("unexpected empty host name") return framework.NewStatus(framework.Error, "unexpected empty host name")
} }
} }
return nil return nil
} }
// emptyPluginRegistry is a test plugin set used by the default scheduler. type falseMapPlugin struct{}
var emptyPluginRegistry = framework.Registry{}
var emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, []schedulerapi.PluginConfig{}) func newFalseMapPlugin() framework.PluginFactory {
return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &falseMapPlugin{}, nil
}
}
func (pl *falseMapPlugin) Name() string {
return "FalseMap"
}
func (pl *falseMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (int64, *framework.Status) {
return 0, framework.NewStatus(framework.Error, errPrioritize.Error())
}
func (pl *falseMapPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
var emptySnapshot = nodeinfosnapshot.NewEmptySnapshot() var emptySnapshot = nodeinfosnapshot.NewEmptySnapshot()
func makeNodeList(nodeNames []string) []*v1.Node { func makeNodeList(nodeNames []string) []*v1.Node {
@ -313,7 +372,7 @@ func TestGenericScheduler(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
registerFilterPlugins []st.RegisterFilterPluginFunc registerFilterPlugins []st.RegisterFilterPluginFunc
prioritizers []priorities.PriorityConfig registerScorePlugins []st.RegisterScorePluginFunc
alwaysCheckAllPredicates bool alwaysCheckAllPredicates bool
nodes []string nodes []string
pvcs []v1.PersistentVolumeClaim pvcs []v1.PersistentVolumeClaim
@ -365,7 +424,9 @@ func TestGenericScheduler(t *testing.T) {
registerFilterPlugins: []st.RegisterFilterPluginFunc{ registerFilterPlugins: []st.RegisterFilterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
expectedHosts: sets.NewString("3"), expectedHosts: sets.NewString("3"),
@ -376,7 +437,9 @@ func TestGenericScheduler(t *testing.T) {
registerFilterPlugins: []st.RegisterFilterPluginFunc{ registerFilterPlugins: []st.RegisterFilterPluginFunc{
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("2"), expectedHosts: sets.NewString("2"),
@ -387,16 +450,9 @@ func TestGenericScheduler(t *testing.T) {
registerFilterPlugins: []st.RegisterFilterPluginFunc{ registerFilterPlugins: []st.RegisterFilterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{ registerScorePlugins: []st.RegisterScorePluginFunc{
{ st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
Map: numericMapPriority, st.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2),
Weight: 1,
},
{
Map: numericMapPriority,
Reduce: reverseNumericReducePriority,
Weight: 2,
},
}, },
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
@ -409,10 +465,12 @@ func TestGenericScheduler(t *testing.T) {
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin), st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
nodes: []string{"3", "2", "1"}, st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, },
name: "test 7", nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
name: "test 7",
wErr: &FitError{ wErr: &FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
NumAllNodes: 3, NumAllNodes: 3,
@ -429,6 +487,9 @@ func TestGenericScheduler(t *testing.T) {
st.RegisterFilterPlugin("NoPodsFilter", NewNoPodsFilterPlugin), st.RegisterFilterPlugin("NoPodsFilter", NewNoPodsFilterPlugin),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
}, },
registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
pods: []*v1.Pod{ pods: []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}, ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")},
@ -440,10 +501,9 @@ func TestGenericScheduler(t *testing.T) {
}, },
}, },
}, },
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, nodes: []string{"1", "2"},
nodes: []string{"1", "2"}, name: "test 8",
name: "test 8",
wErr: &FitError{ wErr: &FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
NumAllNodes: 2, NumAllNodes: 2,
@ -530,11 +590,14 @@ func TestGenericScheduler(t *testing.T) {
registerFilterPlugins: []st.RegisterFilterPluginFunc{ registerFilterPlugins: []st.RegisterFilterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{{Map: falseMapPriority, Weight: 1}, {Map: trueMapPriority, Reduce: getNodeReducePriority, Weight: 2}}, registerScorePlugins: []st.RegisterScorePluginFunc{
nodes: []string{"2", "1"}, st.RegisterScorePlugin("FalseMap", newFalseMapPlugin(), 1),
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}}, st.RegisterScorePlugin("TrueMap", newTrueMapPlugin(), 2),
name: "test error with priority map", },
wErr: errors.NewAggregate([]error{errPrioritize, errPrioritize}), nodes: []string{"2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
name: "test error with priority map",
wErr: fmt.Errorf("error while running score plugin for pod \"2\": %+v", errPrioritize),
}, },
{ {
name: "test even pods spread predicate - 2 nodes with maxskew=1", name: "test even pods spread predicate - 2 nodes with maxskew=1",
@ -644,7 +707,9 @@ func TestGenericScheduler(t *testing.T) {
NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}), NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}),
), ),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3"}, nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, expectedHosts: nil,
@ -665,7 +730,9 @@ func TestGenericScheduler(t *testing.T) {
NewFakeFilterPlugin(map[string]framework.Code{"3": framework.UnschedulableAndUnresolvable}), NewFakeFilterPlugin(map[string]framework.Code{"3": framework.UnschedulableAndUnresolvable}),
), ),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3"}, nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, expectedHosts: nil,
@ -686,7 +753,9 @@ func TestGenericScheduler(t *testing.T) {
NewFakeFilterPlugin(map[string]framework.Code{"1": framework.Unschedulable}), NewFakeFilterPlugin(map[string]framework.Code{"1": framework.Unschedulable}),
), ),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"1", "2"}, nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, expectedHosts: nil,
@ -701,11 +770,15 @@ func TestGenericScheduler(t *testing.T) {
registry := framework.Registry{} registry := framework.Registry{}
plugins := &schedulerapi.Plugins{ plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
} }
var pluginConfigs []schedulerapi.PluginConfig var pluginConfigs []schedulerapi.PluginConfig
for _, f := range test.registerFilterPlugins { for _, f := range test.registerFilterPlugins {
f(&registry, plugins, pluginConfigs) f(&registry, plugins, pluginConfigs)
} }
for _, f := range test.registerScorePlugins {
f(&registry, plugins, pluginConfigs)
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs) fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
cache := internalcache.New(time.Duration(0), wait.NeverStop) cache := internalcache.New(time.Duration(0), wait.NeverStop)
@ -730,7 +803,6 @@ func TestGenericScheduler(t *testing.T) {
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
predMetaProducer, predMetaProducer,
test.prioritizers,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,
@ -778,7 +850,6 @@ func makeScheduler(nodes []*v1.Node, fns ...st.RegisterFilterPluginFunc) *generi
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
algorithmpredicates.EmptyMetadataProducer, algorithmpredicates.EmptyMetadataProducer,
nil,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,
@ -908,7 +979,6 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
queue, queue,
nil, nil,
algorithmpredicates.EmptyMetadataProducer, algorithmpredicates.EmptyMetadataProducer,
nil,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,
@ -1063,22 +1133,6 @@ func TestZeroRequest(t *testing.T) {
client := clientsetfake.NewSimpleClientset() client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
// This should match the configuration in defaultPriorities() in
// pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
// to test what's actually in production.
priorityConfigs := []priorities.PriorityConfig{
{Map: priorities.LeastRequestedPriorityMap, Weight: 1},
{Map: priorities.BalancedResourceAllocationMap, Weight: 1},
}
selectorSpreadPriorityMap, selectorSpreadPriorityReduce := priorities.NewSelectorSpreadPriority(
informerFactory.Core().V1().Services().Lister(),
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
)
pc := priorities.PriorityConfig{Map: selectorSpreadPriorityMap, Reduce: selectorSpreadPriorityReduce, Weight: 1}
priorityConfigs = append(priorityConfigs, pc)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes)) snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
metaDataProducer := priorities.NewMetadataFactory( metaDataProducer := priorities.NewMetadataFactory(
@ -1091,15 +1145,40 @@ func TestZeroRequest(t *testing.T) {
metaData := metaDataProducer(test.pod, test.nodes, snapshot) metaData := metaDataProducer(test.pod, test.nodes, snapshot)
registry := framework.Registry{}
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
pluginRegistrations := []st.RegisterScorePluginFunc{
st.RegisterScorePlugin(noderesources.LeastAllocatedName, noderesources.NewLeastAllocated, 1),
st.RegisterScorePlugin(noderesources.BalancedAllocationName, noderesources.NewBalancedAllocation, 1),
st.RegisterScorePlugin(defaultpodtopologyspread.Name, defaultpodtopologyspread.New, 1),
}
for _, f := range pluginRegistrations {
f(&registry, plugins, pluginConfigs)
}
fwk, err := framework.NewFramework(
registry,
plugins,
pluginConfigs,
framework.WithInformerFactory(informerFactory),
framework.WithSnapshotSharedLister(snapshot),
framework.WithClientSet(client),
)
if err != nil {
t.Fatalf("error creating framework: %+v", err)
}
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
nil, nil,
nil, nil,
nil, nil,
nil, nil,
priorityConfigs,
metaDataProducer, metaDataProducer,
emptySnapshot, emptySnapshot,
emptyFramework, fwk,
[]algorithm.SchedulerExtender{}, []algorithm.SchedulerExtender{},
nil, nil,
nil, nil,
@ -1539,7 +1618,6 @@ func TestSelectNodesForPreemption(t *testing.T) {
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
factory.GetPredicateMetadata, factory.GetPredicateMetadata,
nil,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
snapshot, snapshot,
fwk, fwk,
@ -2283,7 +2361,6 @@ func TestPreempt(t *testing.T) {
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
predMetaProducer, predMetaProducer,
[]priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,

View File

@ -227,7 +227,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
return nil, err return nil, err
} }
priorityConfigs, pluginsForPriorities, pluginConfigForPriorities, err := c.getPriorityConfigs(priorityKeys) pluginsForPriorities, pluginConfigForPriorities, err := c.getPriorityConfigs(priorityKeys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -285,7 +285,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
podQueue, podQueue,
predicateFuncs, predicateFuncs,
predicateMetaProducer, predicateMetaProducer,
priorityConfigs,
priorityMetaProducer, priorityMetaProducer,
c.nodeInfoSnapshot, c.nodeInfoSnapshot,
framework, framework,
@ -327,20 +326,20 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx
} }
} }
// getPriorityConfigs
// getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run // getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run
// as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was // as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was
// registered for that priority. // registered for that priority.
func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, *schedulerapi.Plugins, []schedulerapi.PluginConfig, error) { func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) {
allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, c.algorithmFactoryArgs) allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, c.algorithmFactoryArgs)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
if c.pluginConfigProducerRegistry == nil { if c.pluginConfigProducerRegistry == nil {
return allPriorityConfigs, nil, nil, nil return nil, nil, nil
} }
var priorityConfigs []priorities.PriorityConfig
var plugins schedulerapi.Plugins var plugins schedulerapi.Plugins
var pluginConfig []schedulerapi.PluginConfig var pluginConfig []schedulerapi.PluginConfig
frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer
@ -351,11 +350,9 @@ func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]prioritie
pl, pc := producer(args) pl, pc := producer(args)
plugins.Append(&pl) plugins.Append(&pl)
pluginConfig = append(pluginConfig, pc...) pluginConfig = append(pluginConfig, pc...)
} else {
priorityConfigs = append(priorityConfigs, p)
} }
} }
return priorityConfigs, &plugins, pluginConfig, nil return &plugins, pluginConfig, nil
} }
// getPredicateConfigs returns predicates configuration: ones that will run as fitPredicates and ones that will run // getPredicateConfigs returns predicates configuration: ones that will run as fitPredicates and ones that will run

View File

@ -270,9 +270,6 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
if !foundPlugin(c.Plugins.Filter.Enabled, predicateOne) { if !foundPlugin(c.Plugins.Filter.Enabled, predicateOne) {
t.Errorf("Expected predicate PredicateOne from %q", schedulerapi.SchedulerDefaultProviderName) t.Errorf("Expected predicate PredicateOne from %q", schedulerapi.SchedulerDefaultProviderName)
} }
if len(c.Algorithm.Prioritizers()) != 1 || c.Algorithm.Prioritizers()[0].Name != "PriorityOne" {
t.Errorf("Expected priority PriorityOne from %q", schedulerapi.SchedulerDefaultProviderName)
}
} }
func foundPlugin(plugins []schedulerapi.Plugin, name string) bool { func foundPlugin(plugins []schedulerapi.Plugin, name string) bool {
@ -316,9 +313,6 @@ func TestCreateFromConfigWithEmptyPredicatesOrPriorities(t *testing.T) {
if len(c.Algorithm.Predicates()) != 0 { if len(c.Algorithm.Predicates()) != 0 {
t.Error("Expected empty predicate sets") t.Error("Expected empty predicate sets")
} }
if len(c.Algorithm.Prioritizers()) != 0 {
t.Error("Expected empty priority sets")
}
} }
func PredicateFunc(pod *v1.Pod, meta predicates.Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) { func PredicateFunc(pod *v1.Pod, meta predicates.Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
@ -854,15 +848,6 @@ func TestCreateWithFrameworkPlugins(t *testing.T) {
t.Errorf("unexpected predicates diff (-want, +got): %s", diff) t.Errorf("unexpected predicates diff (-want, +got): %s", diff)
} }
gotPriorities := sets.NewString()
for _, p := range c.Algorithm.Prioritizers() {
gotPriorities.Insert(p.Name)
}
wantPriorities := sets.NewString(priorityThreeName)
if diff := cmp.Diff(wantPriorities, gotPriorities); diff != "" {
t.Errorf("unexpected priorities diff (-want, +got): %s", diff)
}
// Verify the aggregated configuration. // Verify the aggregated configuration.
wantPlugins := schedulerapi.Plugins{ wantPlugins := schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{}, QueueSort: &schedulerapi.PluginSet{},

View File

@ -167,9 +167,7 @@ func (es mockScheduler) Schedule(ctx context.Context, state *framework.CycleStat
func (es mockScheduler) Predicates() map[string]predicates.FitPredicate { func (es mockScheduler) Predicates() map[string]predicates.FitPredicate {
return nil return nil
} }
func (es mockScheduler) Prioritizers() []priorities.PriorityConfig {
return nil
}
func (es mockScheduler) Extenders() []algorithm.SchedulerExtender { func (es mockScheduler) Extenders() []algorithm.SchedulerExtender {
return nil return nil
} }
@ -694,7 +692,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
predicates.EmptyMetadataProducer, predicates.EmptyMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
nodeinfosnapshot.NewEmptySnapshot(), nodeinfosnapshot.NewEmptySnapshot(),
fwk, fwk,
@ -753,7 +750,6 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
queue, queue,
nil, nil,
predicates.EmptyMetadataProducer, predicates.EmptyMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
nodeinfosnapshot.NewEmptySnapshot(), nodeinfosnapshot.NewEmptySnapshot(),
fwk, fwk,
@ -1006,9 +1002,8 @@ func TestInitPolicyFromFile(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
for i, test := range []struct { for i, test := range []struct {
policy string policy string
expectedPredicates sets.String expectedPredicates sets.String
expectedPrioritizers sets.String
}{ }{
// Test json format policy file // Test json format policy file
{ {
@ -1028,10 +1023,6 @@ func TestInitPolicyFromFile(t *testing.T) {
"PredicateOne", "PredicateOne",
"PredicateTwo", "PredicateTwo",
), ),
expectedPrioritizers: sets.NewString(
"PriorityOne",
"PriorityTwo",
),
}, },
// Test yaml format policy file // Test yaml format policy file
{ {
@ -1050,10 +1041,6 @@ priorities:
"PredicateOne", "PredicateOne",
"PredicateTwo", "PredicateTwo",
), ),
expectedPrioritizers: sets.NewString(
"PriorityOne",
"PriorityTwo",
),
}, },
} { } {
file := fmt.Sprintf("scheduler-policy-config-file-%d", i) file := fmt.Sprintf("scheduler-policy-config-file-%d", i)
@ -1081,8 +1068,5 @@ priorities:
if !schedPredicates.Equal(test.expectedPredicates) { if !schedPredicates.Equal(test.expectedPredicates) {
t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates) t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
} }
if !schedPrioritizers.Equal(test.expectedPrioritizers) {
t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers)
}
} }
} }

View File

@ -35,3 +35,18 @@ func RegisterFilterPlugin(pluginName string, pluginNewFunc framework.PluginFacto
pluginConfigs = append(pluginConfigs, schedulerapi.PluginConfig{Name: pluginName}) pluginConfigs = append(pluginConfigs, schedulerapi.PluginConfig{Name: pluginName})
} }
} }
// RegisterScorePluginFunc is a function signature used in method RegisterScorePlugin()
// to register a Score Plugin to a given registry.
type RegisterScorePluginFunc func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig)
// RegisterScorePlugin returns a function to register a Score Plugin to a given registry.
func RegisterScorePlugin(pluginName string, pluginNewFunc framework.PluginFactory, weight int32) RegisterScorePluginFunc {
return func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig) {
reg.Register(pluginName, pluginNewFunc)
plugins.Score.Enabled = append(plugins.Score.Enabled, schedulerapi.Plugin{Name: pluginName, Weight: weight})
//lint:ignore SA4006 this value of pluginConfigs is never used.
//lint:ignore SA4010 this result of append is never used.
pluginConfigs = append(pluginConfigs, schedulerapi.PluginConfig{Name: pluginName})
}
}

View File

@ -91,10 +91,9 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
scheduler.RegisterPriorityMapReduceFunction("PriorityTwo", PriorityTwo, nil, 1) scheduler.RegisterPriorityMapReduceFunction("PriorityTwo", PriorityTwo, nil, 1)
for i, test := range []struct { for i, test := range []struct {
policy string policy string
expectedPredicates sets.String expectedPredicates sets.String
expectedPrioritizers sets.String expectedPlugins map[string][]kubeschedulerconfig.Plugin
expectedPlugins map[string][]kubeschedulerconfig.Plugin
}{ }{
{ {
policy: `{ policy: `{
@ -113,10 +112,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"PredicateOne", "PredicateOne",
"PredicateTwo", "PredicateTwo",
), ),
expectedPrioritizers: sets.NewString(
"PriorityOne",
"PriorityTwo",
),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@ -129,7 +124,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"kind" : "Policy", "kind" : "Policy",
"apiVersion" : "v1" "apiVersion" : "v1"
}`, }`,
expectedPrioritizers: sets.NewString(),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -169,8 +163,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"predicates" : [], "predicates" : [],
"priorities" : [] "priorities" : []
}`, }`,
expectedPredicates: sets.NewString(), expectedPredicates: sets.NewString(),
expectedPrioritizers: sets.NewString(),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@ -194,10 +187,6 @@ priorities:
"PredicateOne", "PredicateOne",
"PredicateTwo", "PredicateTwo",
), ),
expectedPrioritizers: sets.NewString(
"PriorityOne",
"PriorityTwo",
),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@ -209,7 +198,6 @@ priorities:
policy: `apiVersion: v1 policy: `apiVersion: v1
kind: Policy kind: Policy
`, `,
expectedPrioritizers: sets.NewString(),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"PreFilterPlugin": { "PreFilterPlugin": {
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
@ -248,8 +236,7 @@ kind: Policy
predicates: [] predicates: []
priorities: [] priorities: []
`, `,
expectedPredicates: sets.NewString(), expectedPredicates: sets.NewString(),
expectedPrioritizers: sets.NewString(),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@ -300,16 +287,9 @@ priorities: []
for k := range sched.Algorithm.Predicates() { for k := range sched.Algorithm.Predicates() {
schedPredicates.Insert(k) schedPredicates.Insert(k)
} }
schedPrioritizers := sets.NewString()
for _, p := range sched.Algorithm.Prioritizers() {
schedPrioritizers.Insert(p.Name)
}
if !schedPredicates.Equal(test.expectedPredicates) { if !schedPredicates.Equal(test.expectedPredicates) {
t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates) t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
} }
if !schedPrioritizers.Equal(test.expectedPrioritizers) {
t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers)
}
schedPlugins := sched.Framework.ListPlugins() schedPlugins := sched.Framework.ListPlugins()
if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" { if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" {
t.Errorf("unexpected predicates diff (-want, +got): %s", diff) t.Errorf("unexpected predicates diff (-want, +got): %s", diff)