feat(scheduler): remove deprecated pattern in scheduler priority

This commit is contained in:
draveness 2019-11-07 14:59:04 +08:00
parent c843d9614c
commit 3bb88356f4
10 changed files with 76 additions and 126 deletions

View File

@ -85,7 +85,6 @@ go_test(
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",

View File

@ -48,10 +48,7 @@ type PriorityConfig struct {
Name string
Map PriorityMapFunction
Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to
// Map-Reduce pattern.
Function PriorityFunction
Weight int64
Weight int64
}
// EmptyPriorityMetadataProducer returns a no-op PriorityMetadataProducer type.

View File

@ -81,7 +81,6 @@ type PriorityFunctionFactory2 func(PluginFactoryArgs) (priorities.PriorityMapFun
// PriorityConfigFactory produces a PriorityConfig from the given function and weight
type PriorityConfigFactory struct {
Function PriorityFunctionFactory
MapReduceFunction PriorityFunctionFactory2
Weight int64
}
@ -341,19 +340,6 @@ func RegisterPredicateMetadataProducer(producer predicates.PredicateMetadataProd
predicateMetadataProducer = producer
}
// RegisterPriorityFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
// DEPRECATED
// Use Map-Reduce pattern for priority functions.
func RegisterPriorityFunction(name string, function priorities.PriorityFunction, weight int) string {
return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
Function: func(PluginFactoryArgs) priorities.PriorityFunction {
return function
},
Weight: int64(weight),
})
}
// RegisterPriorityMapReduceFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityMapReduceFunction(
@ -449,7 +435,6 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, args *pl
klog.V(2).Infof("Priority type %s already registered, reusing.", name)
// set/update the weight based on the policy
pcf = &PriorityConfigFactory{
Function: existingPcf.Function,
MapReduceFunction: existingPcf.MapReduceFunction,
Weight: policy.Weight,
}
@ -579,21 +564,13 @@ func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]pr
if !ok {
return nil, fmt.Errorf("invalid priority name %s specified - no corresponding function found", name)
}
if factory.Function != nil {
configs = append(configs, priorities.PriorityConfig{
Name: name,
Function: factory.Function(args),
Weight: factory.Weight,
})
} else {
mapFunction, reduceFunction := factory.MapReduceFunction(args)
configs = append(configs, priorities.PriorityConfig{
Name: name,
Map: mapFunction,
Reduce: reduceFunction,
Weight: factory.Weight,
})
}
mapFunction, reduceFunction := factory.MapReduceFunction(args)
configs = append(configs, priorities.PriorityConfig{
Name: name,
Map: mapFunction,
Reduce: reduceFunction,
Weight: factory.Weight,
})
}
if err := validateSelectedConfigs(configs); err != nil {
return nil, err

View File

@ -18,6 +18,7 @@ package core
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
@ -40,7 +41,6 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -107,16 +107,16 @@ func machine2PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.Node
return &result, nil
}
func machine2Prioritizer(_ *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := []framework.NodeScore{}
for _, node := range nodes {
score := 10
if node.Name == "machine2" {
score = 100
}
result = append(result, framework.NodeScore{Name: node.Name, Score: int64(score)})
func machine2Prioritizer(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, errors.New("node not found")
}
return result, nil
score := 10
if node.Name == "machine2" {
score = 100
}
return framework.NodeScore{Name: node.Name, Score: int64(score)}, nil
}
type FakeExtender struct {
@ -457,7 +457,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: machine2Prioritizer, Weight: 20}},
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 20}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -482,7 +482,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
// because of the errors from errorPredicateExtender and/or
// errorPrioritizerExtender.
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: machine2Prioritizer, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{errorPredicateExtender},

View File

@ -744,35 +744,13 @@ func (g *genericScheduler) prioritizeNodes(
results := make([]framework.NodeScoreList, len(g.prioritizers))
// DEPRECATED: we can remove this when all priorityConfigs implement the
// Map-Reduce pattern.
for i := range g.prioritizers {
if g.prioritizers[i].Function != nil {
wg.Add(1)
go func(index int) {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_legacy").Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_legacy").Dec()
wg.Done()
}()
var err error
results[index], err = g.prioritizers[index].Function(pod, g.nodeInfoSnapshot, nodes)
if err != nil {
appendError(err)
}
}(i)
} else {
results[i] = make(framework.NodeScoreList, len(nodes))
}
results[i] = make(framework.NodeScoreList, len(nodes))
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
for i := range g.prioritizers {
if g.prioritizers[i].Function != nil {
continue
}
var err error
results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
if err != nil {

View File

@ -83,42 +83,35 @@ func hasNoPodsPredicate(pod *v1.Pod, meta algorithmpredicates.PredicateMetadata,
return false, []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
func numericPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := []framework.NodeScore{}
for _, node := range nodes {
score, err := strconv.Atoi(node.Name)
if err != nil {
return nil, err
}
result = append(result, framework.NodeScore{
Name: node.Name,
Score: int64(score),
})
func numericMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
score, err := strconv.Atoi(node.Name)
if err != nil {
return framework.NodeScore{}, err
}
return result, nil
return framework.NodeScore{
Name: node.Name,
Score: int64(score),
}, nil
}
func reverseNumericPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
func reverseNumericReducePriority(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
var maxScore float64
minScore := math.MaxFloat64
reverseResult := []framework.NodeScore{}
result, err := numericPriority(pod, sharedLister, nodes)
if err != nil {
return nil, err
}
for _, hostPriority := range result {
maxScore = math.Max(maxScore, float64(hostPriority.Score))
minScore = math.Min(minScore, float64(hostPriority.Score))
}
for _, hostPriority := range result {
reverseResult = append(reverseResult, framework.NodeScore{
for i, hostPriority := range result {
result[i] = framework.NodeScore{
Name: hostPriority.Name,
Score: int64(maxScore + minScore - float64(hostPriority.Score)),
})
}
}
return reverseResult, nil
return nil
}
func trueMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
@ -326,7 +319,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
expectedHosts: sets.NewString("3"),
@ -335,7 +328,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"matches": matchesPredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("2"),
@ -343,8 +336,18 @@ func TestGenericScheduler(t *testing.T) {
wErr: nil,
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}, {Function: reverseNumericPriority, Weight: 2}},
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{
{
Map: numericMapPriority,
Weight: 1,
},
{
Map: numericMapPriority,
Reduce: reverseNumericReducePriority,
Weight: 2,
},
},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("1"),
@ -353,7 +356,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate, "false": falsePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
name: "test 7",
@ -385,7 +388,7 @@ func TestGenericScheduler(t *testing.T) {
},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"1", "2"},
name: "test 8",
wErr: &FitError{
@ -591,7 +594,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with filter plugin returning Unschedulable status",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
@ -608,7 +611,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with filter plugin returning UnschedulableAndUnresolvable status",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
@ -625,7 +628,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with partial failed filter plugin",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
@ -2156,7 +2159,7 @@ func TestPreempt(t *testing.T) {
internalqueue.NewSchedulingQueue(nil, nil),
map[string]algorithmpredicates.FitPredicate{"matches": predicate},
predMetaProducer,
[]priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
[]priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
emptyFramework,

View File

@ -50,7 +50,6 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -83,8 +82,8 @@ func TestCreateFromConfig(t *testing.T) {
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateFunc)
RegisterFitPredicate("PredicateTwo", PredicateFunc)
RegisterPriorityFunction("PriorityOne", PriorityFunc, 1)
RegisterPriorityFunction("PriorityTwo", PriorityFunc, 1)
RegisterPriorityMapReduceFunction("PriorityOne", PriorityFunc, nil, 1)
RegisterPriorityMapReduceFunction("PriorityTwo", PriorityFunc, nil, 1)
configData = []byte(`{
"kind" : "Policy",
@ -169,8 +168,8 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateFunc)
RegisterFitPredicate("PredicateTwo", PredicateFunc)
RegisterPriorityFunction("PriorityOne", PriorityFunc, 1)
RegisterPriorityFunction("PriorityTwo", PriorityFunc, 1)
RegisterPriorityMapReduceFunction("PriorityOne", PriorityFunc, nil, 1)
RegisterPriorityMapReduceFunction("PriorityTwo", PriorityFunc, nil, 1)
configData = []byte(`{
"kind" : "Policy",
@ -225,7 +224,7 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
RegisterFitPredicate("PredicateOne", PredicateFunc)
RegisterPriorityFunction("PriorityOne", PriorityFunc, 1)
RegisterPriorityMapReduceFunction("PriorityOne", PriorityFunc, nil, 1)
RegisterAlgorithmProvider(DefaultProvider, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
@ -260,7 +259,7 @@ func TestCreateFromConfigWithEmptyPredicatesOrPriorities(t *testing.T) {
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
RegisterFitPredicate("PredicateOne", PredicateFunc)
RegisterPriorityFunction("PriorityOne", PriorityFunc, 1)
RegisterPriorityMapReduceFunction("PriorityOne", PriorityFunc, nil, 1)
RegisterAlgorithmProvider(DefaultProvider, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
@ -291,8 +290,8 @@ func PredicateFunc(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *sch
return true, nil, nil
}
func PriorityFunc(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
return []framework.NodeScore{}, nil
func PriorityFunc(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
return framework.NodeScore{}, nil
}
func TestDefaultErrorFunc(t *testing.T) {
@ -782,9 +781,9 @@ func TestCreateWithFrameworkPlugins(t *testing.T) {
RegisterFitPredicate(predicateTwoName, PredicateFunc)
RegisterFitPredicate(predicateThreeName, PredicateFunc)
RegisterMandatoryFitPredicate(predicateFourName, PredicateFunc)
RegisterPriorityFunction(priorityOneName, PriorityFunc, 1)
RegisterPriorityFunction(priorityTwoName, PriorityFunc, 1)
RegisterPriorityFunction(priorityThreeName, PriorityFunc, 1)
RegisterPriorityMapReduceFunction(priorityOneName, PriorityFunc, nil, 1)
RegisterPriorityMapReduceFunction(priorityTwoName, PriorityFunc, nil, 1)
RegisterPriorityMapReduceFunction(priorityThreeName, PriorityFunc, nil, 1)
configData = []byte(`{
"kind" : "Policy",

View File

@ -52,7 +52,6 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -144,8 +143,8 @@ func PredicateOne(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *sche
return true, nil, nil
}
func PriorityOne(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
return []framework.NodeScore{}, nil
func PriorityOne(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
return framework.NodeScore{}, nil
}
type mockScheduler struct {
@ -184,7 +183,7 @@ func TestSchedulerCreation(t *testing.T) {
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
RegisterFitPredicate("PredicateOne", PredicateOne)
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
RegisterPriorityMapReduceFunction("PriorityOne", PriorityOne, nil, 1)
RegisterAlgorithmProvider(testSource, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
stopCh := make(chan struct{})

View File

@ -33,7 +33,6 @@ go_test(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//plugin/pkg/admission/defaulttolerationseconds:go_default_library",

View File

@ -43,7 +43,6 @@ import (
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/test/integration/framework"
)
@ -63,12 +62,12 @@ func PredicateTwo(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *sche
return true, nil, nil
}
func PriorityOne(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (schedulerframework.NodeScoreList, error) {
return []schedulerframework.NodeScore{}, nil
func PriorityOne(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerframework.NodeScore, error) {
return schedulerframework.NodeScore{}, nil
}
func PriorityTwo(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (schedulerframework.NodeScoreList, error) {
return []schedulerframework.NodeScore{}, nil
func PriorityTwo(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerframework.NodeScore, error) {
return schedulerframework.NodeScore{}, nil
}
// TestSchedulerCreationFromConfigMap verifies that scheduler can be created
@ -88,8 +87,8 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
// Pre-register some predicate and priority functions
scheduler.RegisterFitPredicate("PredicateOne", PredicateOne)
scheduler.RegisterFitPredicate("PredicateTwo", PredicateTwo)
scheduler.RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
scheduler.RegisterPriorityFunction("PriorityTwo", PriorityTwo, 1)
scheduler.RegisterPriorityMapReduceFunction("PriorityOne", PriorityOne, nil, 1)
scheduler.RegisterPriorityMapReduceFunction("PriorityTwo", PriorityTwo, nil, 1)
for i, test := range []struct {
policy string