Merge pull request #86673 from ahg-g/ahg1-provider

Define algorithm providers in terms of plugins
This commit is contained in:
Kubernetes Prow Robot 2020-01-02 22:25:53 -08:00 committed by GitHub
commit b3c4c90a72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 1341 additions and 1869 deletions

View File

@ -15,6 +15,7 @@ go_library(
"//pkg/client/leaderelectionconfig:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/apis/config/v1alpha1:go_default_library",

View File

@ -22,7 +22,7 @@ import (
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
@ -44,7 +44,7 @@ func (o *DeprecatedOptions) AddFlags(fs *pflag.FlagSet, cfg *kubeschedulerconfig
return
}
fs.StringVar(&o.AlgorithmProvider, "algorithm-provider", o.AlgorithmProvider, "DEPRECATED: the scheduling algorithm provider to use, one of: "+scheduler.ListAlgorithmProviders())
fs.StringVar(&o.AlgorithmProvider, "algorithm-provider", o.AlgorithmProvider, "DEPRECATED: the scheduling algorithm provider to use, one of: "+algorithmprovider.ListAlgorithmProviders())
fs.StringVar(&o.PolicyConfigFile, "policy-config-file", o.PolicyConfigFile, "DEPRECATED: file with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config=true")
usage := fmt.Sprintf("DEPRECATED: name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config=false. The config must be provided as the value of an element in 'Data' map with the key='%v'", kubeschedulerconfig.SchedulerPolicyConfigMapKey)
fs.StringVar(&o.PolicyConfigMapName, "policy-configmap", o.PolicyConfigMapName, usage)

View File

@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"algorithm_factory.go",
"eventhandlers.go",
"factory.go",
"scheduler.go",
@ -16,20 +15,18 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/apis/config/validation:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
@ -59,7 +56,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"algorithm_factory_test.go",
"eventhandlers_test.go",
"factory_test.go",
"scheduler_test.go",
@ -110,7 +106,6 @@ go_test(
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)

View File

@ -396,9 +396,3 @@ func podToleratesNodeTaints(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, f
}
return false, []PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
}
// EvenPodsSpreadPredicate is the legacy function using old path of metadata.
// DEPRECATED
func EvenPodsSpreadPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
return false, nil, fmt.Errorf("this function should never be called")
}

View File

@ -27,11 +27,10 @@ import (
// MetadataFactory is a factory to produce PriorityMetadata.
type MetadataFactory struct {
serviceLister corelisters.ServiceLister
controllerLister corelisters.ReplicationControllerLister
replicaSetLister appslisters.ReplicaSetLister
statefulSetLister appslisters.StatefulSetLister
hardPodAffinityWeight int32
serviceLister corelisters.ServiceLister
controllerLister corelisters.ReplicationControllerLister
replicaSetLister appslisters.ReplicaSetLister
statefulSetLister appslisters.StatefulSetLister
}
// NewMetadataFactory creates a MetadataFactory.
@ -40,14 +39,12 @@ func NewMetadataFactory(
controllerLister corelisters.ReplicationControllerLister,
replicaSetLister appslisters.ReplicaSetLister,
statefulSetLister appslisters.StatefulSetLister,
hardPodAffinityWeight int32,
) MetadataProducer {
factory := &MetadataFactory{
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
hardPodAffinityWeight: hardPodAffinityWeight,
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
}
return factory.PriorityMetadata
}

View File

@ -155,7 +155,6 @@ func TestPriorityMetadata(t *testing.T) {
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
1,
)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

View File

@ -352,7 +352,6 @@ func TestSelectorSpreadPriority(t *testing.T) {
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
@ -590,7 +589,6 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
list, err := runMapReducePriority(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData, test.pod, snapshot, makeLabeledNodeList(labeledNodes))

View File

@ -1,460 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"regexp"
"sort"
"strings"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
"k8s.io/klog"
)
// AlgorithmFactoryArgs are passed to all factory functions.
type AlgorithmFactoryArgs struct {
SharedLister schedulerlisters.SharedLister
InformerFactory informers.SharedInformerFactory
VolumeBinder *volumebinder.VolumeBinder
HardPodAffinitySymmetricWeight int32
}
// PriorityMetadataProducerFactory produces MetadataProducer from the given args.
type PriorityMetadataProducerFactory func(AlgorithmFactoryArgs) priorities.MetadataProducer
var (
algorithmRegistry = &AlgorithmRegistry{
// predicate keys supported for backward compatibility with v1.Policy.
predicateKeys: sets.NewString(
"PodFitsPorts", // This exists for compatibility reasons.
predicates.PodFitsHostPortsPred,
predicates.PodFitsResourcesPred,
predicates.HostNamePred,
predicates.MatchNodeSelectorPred,
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MaxCinderVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckNodeUnschedulablePred,
predicates.CheckVolumeBindingPred,
),
// priority keys to weights, this exist for backward compatibility with v1.Policy.
priorityKeys: map[string]int64{
priorities.LeastRequestedPriority: 1,
priorities.BalancedResourceAllocation: 1,
priorities.MostRequestedPriority: 1,
priorities.ImageLocalityPriority: 1,
priorities.NodeAffinityPriority: 1,
priorities.SelectorSpreadPriority: 1,
priorities.ServiceSpreadingPriority: 1,
priorities.TaintTolerationPriority: 1,
priorities.InterPodAffinityPriority: 1,
priorities.NodePreferAvoidPodsPriority: 10000,
},
// MandatoryPredicates the set of keys for predicates that the scheduler will
// be configured with all the time.
mandatoryPredicateKeys: sets.NewString(
predicates.PodToleratesNodeTaintsPred,
predicates.CheckNodeUnschedulablePred,
),
algorithmProviders: make(map[string]AlgorithmProviderConfig),
}
// Registered metadata producers
priorityMetadataProducerFactory PriorityMetadataProducerFactory
schedulerFactoryMutex sync.RWMutex
)
// AlgorithmProviderConfig is used to store the configuration of algorithm providers.
type AlgorithmProviderConfig struct {
PredicateKeys sets.String
PriorityKeys sets.String
}
// AlgorithmRegistry is used to store current state of registered predicates and priorities.
type AlgorithmRegistry struct {
predicateKeys sets.String
priorityKeys map[string]int64
mandatoryPredicateKeys sets.String
algorithmProviders map[string]AlgorithmProviderConfig
}
// RegisteredPredicatesAndPrioritiesSnapshot returns a snapshot of current registered predicates and priorities.
func RegisteredPredicatesAndPrioritiesSnapshot() *AlgorithmRegistry {
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
copy := AlgorithmRegistry{
predicateKeys: sets.NewString(),
mandatoryPredicateKeys: sets.NewString(),
priorityKeys: make(map[string]int64),
algorithmProviders: make(map[string]AlgorithmProviderConfig),
}
for k := range algorithmRegistry.predicateKeys {
copy.predicateKeys.Insert(k)
}
for k := range algorithmRegistry.mandatoryPredicateKeys {
copy.mandatoryPredicateKeys.Insert(k)
}
for k, v := range algorithmRegistry.priorityKeys {
copy.priorityKeys[k] = v
}
for provider, config := range algorithmRegistry.algorithmProviders {
copyPredKeys, copyPrioKeys := sets.NewString(), sets.NewString()
for k := range config.PredicateKeys {
copyPredKeys.Insert(k)
}
for k := range config.PriorityKeys {
copyPrioKeys.Insert(k)
}
copy.algorithmProviders[provider] = AlgorithmProviderConfig{
PredicateKeys: copyPredKeys,
PriorityKeys: copyPrioKeys,
}
}
return &copy
}
// ApplyPredicatesAndPriorities sets state of predicates and priorities to `s`.
func ApplyPredicatesAndPriorities(s *AlgorithmRegistry) {
schedulerFactoryMutex.Lock()
algorithmRegistry = s
schedulerFactoryMutex.Unlock()
}
// RegisterPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
// TODO(Huang-Wei): remove this.
func RegisterPredicate(name string) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmRegistry.predicateKeys.Insert(name)
return name
}
// RegisterMandatoryPredicate registers a mandatory predicate.
func RegisterMandatoryPredicate(name string) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmRegistry.predicateKeys.Insert(name)
algorithmRegistry.mandatoryPredicateKeys.Insert(name)
return name
}
// AddPredicateToAlgorithmProviders adds a predicate key to all algorithm providers.
func AddPredicateToAlgorithmProviders(key string) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
for _, provider := range algorithmRegistry.algorithmProviders {
provider.PredicateKeys.Insert(key)
}
}
// AddPriorityToAlgorithmProviders adds a priority key to all algorithm providers.
func AddPriorityToAlgorithmProviders(key string) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
for _, provider := range algorithmRegistry.algorithmProviders {
provider.PriorityKeys.Insert(key)
}
}
// RegisterCustomPredicate registers a custom fit predicate with the algorithm registry.
// Returns the name, with which the predicate was registered.
func RegisterCustomPredicate(policy schedulerapi.PredicatePolicy, pluginArgs *plugins.ConfigProducerArgs) string {
var ok bool
var predicate string
validatePredicateOrDie(policy)
// generate the predicate function, if a custom type is requested
if policy.Argument != nil {
if policy.Argument.ServiceAffinity != nil {
// We use the ServiceAffinity predicate name for all ServiceAffinity custom predicates.
// It may get called multiple times but we essentially only register one instance of ServiceAffinity predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
predicate = predicates.CheckServiceAffinityPred
// map LabelsPresence policy to ConfigProducerArgs that's used to configure the ServiceAffinity plugin.
if pluginArgs.ServiceAffinityArgs == nil {
pluginArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
pluginArgs.ServiceAffinityArgs.AffinityLabels = append(pluginArgs.ServiceAffinityArgs.AffinityLabels, policy.Argument.ServiceAffinity.Labels...)
} else if policy.Argument.LabelsPresence != nil {
// We use the CheckNodeLabelPresencePred predicate name for all kNodeLabel custom predicates.
// It may get called multiple times but we essentially only register one instance of NodeLabel predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
predicate = predicates.CheckNodeLabelPresencePred
// Map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin.
if pluginArgs.NodeLabelArgs == nil {
pluginArgs.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelsPresence.Presence {
pluginArgs.NodeLabelArgs.PresentLabels = append(pluginArgs.NodeLabelArgs.PresentLabels, policy.Argument.LabelsPresence.Labels...)
} else {
pluginArgs.NodeLabelArgs.AbsentLabels = append(pluginArgs.NodeLabelArgs.AbsentLabels, policy.Argument.LabelsPresence.Labels...)
}
}
} else if _, ok = algorithmRegistry.predicateKeys[policy.Name]; ok {
// checking to see if a pre-defined predicate is requested
klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
return policy.Name
}
if len(predicate) == 0 {
klog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name)
}
return predicate
}
// RegisterPriorityMetadataProducerFactory registers a PriorityMetadataProducerFactory.
func RegisterPriorityMetadataProducerFactory(f PriorityMetadataProducerFactory) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
priorityMetadataProducerFactory = f
}
// RegisterPriority registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriority(name string, weight int64) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmRegistry.priorityKeys[name] = weight
return name
}
// RegisterCustomPriority registers a custom priority with the algorithm registry.
// Returns the name, with which the priority function was registered.
func RegisterCustomPriority(policy schedulerapi.PriorityPolicy, configProducerArgs *plugins.ConfigProducerArgs) string {
var priority string
var weight int64
validatePriorityOrDie(policy)
// generate the priority function, if a custom priority is requested
if policy.Argument != nil {
if policy.Argument.ServiceAntiAffinity != nil {
// We use the ServiceAffinity plugin name for all ServiceAffinity custom priorities.
// It may get called multiple times but we essentially only register one instance of
// ServiceAffinity priority.
// This name is then used to find the registered plugin and run the plugin instead of the priority.
priority = serviceaffinity.Name
if configProducerArgs.ServiceAffinityArgs == nil {
configProducerArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference = append(configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference, policy.Argument.ServiceAntiAffinity.Label)
weight = policy.Weight
schedulerFactoryMutex.RLock()
if existingWeight, ok := algorithmRegistry.priorityKeys[priority]; ok {
// If there are n ServiceAffinity priorities in the policy, the weight for the corresponding
// score plugin is n*(weight of each priority).
weight += existingWeight
}
schedulerFactoryMutex.RUnlock()
} else if policy.Argument.LabelPreference != nil {
// We use the NodeLabel plugin name for all NodeLabel custom priorities.
// It may get called multiple times but we essentially only register one instance of NodeLabel priority.
// This name is then used to find the registered plugin and run the plugin instead of the priority.
priority = nodelabel.Name
if configProducerArgs.NodeLabelArgs == nil {
configProducerArgs.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelPreference.Presence {
configProducerArgs.NodeLabelArgs.PresentLabelsPreference = append(configProducerArgs.NodeLabelArgs.PresentLabelsPreference, policy.Argument.LabelPreference.Label)
} else {
configProducerArgs.NodeLabelArgs.AbsentLabelsPreference = append(configProducerArgs.NodeLabelArgs.AbsentLabelsPreference, policy.Argument.LabelPreference.Label)
}
weight = policy.Weight
schedulerFactoryMutex.RLock()
if existingWeight, ok := algorithmRegistry.priorityKeys[priority]; ok {
// If there are n NodeLabel priority configured in the policy, the weight for the corresponding
// priority is n*(weight of each priority in policy).
weight += existingWeight
}
schedulerFactoryMutex.RUnlock()
} else if policy.Argument.RequestedToCapacityRatioArguments != nil {
scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
configProducerArgs.RequestedToCapacityRatioArgs = &noderesources.RequestedToCapacityRatioArgs{
FunctionShape: scoringFunctionShape,
ResourceToWeightMap: resources,
}
// We do not allow specifying the name for custom plugins, see #83472
priority = noderesources.RequestedToCapacityRatioName
weight = policy.Weight
}
} else if _, ok := algorithmRegistry.priorityKeys[policy.Name]; ok {
klog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name)
// set/update the weight based on the policy
priority = policy.Name
weight = policy.Weight
}
if len(priority) == 0 {
klog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name)
}
return RegisterPriority(priority, weight)
}
func buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(arguments *schedulerapi.RequestedToCapacityRatioArguments) (noderesources.FunctionShape, noderesources.ResourceToWeightMap) {
n := len(arguments.Shape)
points := make([]noderesources.FunctionShapePoint, 0, n)
for _, point := range arguments.Shape {
points = append(points, noderesources.FunctionShapePoint{
Utilization: int64(point.Utilization),
// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by requested to capacity ratio to the score range
// used by the scheduler.
Score: int64(point.Score) * (framework.MaxNodeScore / schedulerapi.MaxCustomPriorityScore),
})
}
shape, err := noderesources.NewFunctionShape(points)
if err != nil {
klog.Fatalf("invalid RequestedToCapacityRatioPriority arguments: %s", err.Error())
}
resourceToWeightMap := make(noderesources.ResourceToWeightMap, 0)
if len(arguments.Resources) == 0 {
resourceToWeightMap = noderesources.DefaultRequestedRatioResources
return shape, resourceToWeightMap
}
for _, resource := range arguments.Resources {
resourceToWeightMap[v1.ResourceName(resource.Name)] = resource.Weight
if resource.Weight == 0 {
resourceToWeightMap[v1.ResourceName(resource.Name)] = 1
}
}
return shape, resourceToWeightMap
}
// RegisterAlgorithmProvider registers a new algorithm provider with the algorithm registry.
func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys sets.String) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmRegistry.algorithmProviders[name] = AlgorithmProviderConfig{
PredicateKeys: predicateKeys,
PriorityKeys: priorityKeys,
}
return name
}
// GetAlgorithmProvider should not be used to modify providers. It is publicly visible for testing.
func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) {
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
provider, ok := algorithmRegistry.algorithmProviders[name]
if !ok {
return nil, fmt.Errorf("provider %q is not registered", name)
}
return &provider, nil
}
func getPriorityMetadataProducer(args AlgorithmFactoryArgs) (priorities.MetadataProducer, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
if priorityMetadataProducerFactory == nil {
return priorities.EmptyMetadataProducer, nil
}
return priorityMetadataProducerFactory(args), nil
}
var validName = regexp.MustCompile("^[a-zA-Z0-9]([-a-zA-Z0-9]*[a-zA-Z0-9])$")
func validateAlgorithmNameOrDie(name string) {
if !validName.MatchString(name) {
klog.Fatalf("Algorithm name %v does not match the name validation regexp \"%v\".", name, validName)
}
}
func validatePredicateOrDie(predicate schedulerapi.PredicatePolicy) {
if predicate.Argument != nil {
numArgs := 0
if predicate.Argument.ServiceAffinity != nil {
numArgs++
}
if predicate.Argument.LabelsPresence != nil {
numArgs++
}
if numArgs != 1 {
klog.Fatalf("Exactly 1 predicate argument is required, numArgs: %v, Predicate: %s", numArgs, predicate.Name)
}
}
}
func validatePriorityOrDie(priority schedulerapi.PriorityPolicy) {
if priority.Argument != nil {
numArgs := 0
if priority.Argument.ServiceAntiAffinity != nil {
numArgs++
}
if priority.Argument.LabelPreference != nil {
numArgs++
}
if priority.Argument.RequestedToCapacityRatioArguments != nil {
numArgs++
}
if numArgs != 1 {
klog.Fatalf("Exactly 1 priority argument is required, numArgs: %v, Priority: %s", numArgs, priority.Name)
}
}
}
// ListAlgorithmProviders is called when listing all available algorithm providers in `kube-scheduler --help`
func ListAlgorithmProviders() string {
var availableAlgorithmProviders []string
for name := range algorithmRegistry.algorithmProviders {
availableAlgorithmProviders = append(availableAlgorithmProviders, name)
}
sort.Strings(availableAlgorithmProviders)
return strings.Join(availableAlgorithmProviders, " | ")
}

View File

@ -1,100 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
)
func TestAlgorithmNameValidation(t *testing.T) {
algorithmNamesShouldValidate := []string{
"1SomeAlgo1rithm",
"someAlgor-ithm1",
}
algorithmNamesShouldNotValidate := []string{
"-SomeAlgorithm",
"SomeAlgorithm-",
"Some,Alg:orithm",
}
for _, name := range algorithmNamesShouldValidate {
t.Run(name, func(t *testing.T) {
if !validName.MatchString(name) {
t.Errorf("should be a valid algorithm name but is not valid.")
}
})
}
for _, name := range algorithmNamesShouldNotValidate {
t.Run(name, func(t *testing.T) {
if validName.MatchString(name) {
t.Errorf("should be an invalid algorithm name but is valid.")
}
})
}
}
func TestBuildScoringFunctionShapeFromRequestedToCapacityRatioArguments(t *testing.T) {
arguments := schedulerapi.RequestedToCapacityRatioArguments{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 10, Score: 1},
{Utilization: 30, Score: 5},
{Utilization: 70, Score: 2},
},
Resources: []schedulerapi.ResourceSpec{
{Name: string(v1.ResourceCPU)},
{Name: string(v1.ResourceMemory)},
},
}
builtShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(&arguments)
expectedShape, _ := noderesources.NewFunctionShape([]noderesources.FunctionShapePoint{
{Utilization: 10, Score: 10},
{Utilization: 30, Score: 50},
{Utilization: 70, Score: 20},
})
expectedResources := noderesources.ResourceToWeightMap{
v1.ResourceCPU: 1,
v1.ResourceMemory: 1,
}
assert.Equal(t, expectedShape, builtShape)
assert.Equal(t, expectedResources, resources)
}
func TestBuildScoringFunctionShapeFromRequestedToCapacityRatioArgumentsNilResourceToWeightMap(t *testing.T) {
arguments := schedulerapi.RequestedToCapacityRatioArguments{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 10, Score: 1},
{Utilization: 30, Score: 5},
{Utilization: 70, Score: 2},
},
}
builtShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(&arguments)
expectedShape, _ := noderesources.NewFunctionShape([]noderesources.FunctionShapePoint{
{Utilization: 10, Score: 10},
{Utilization: 30, Score: 50},
{Utilization: 70, Score: 20},
})
expectedResources := noderesources.ResourceToWeightMap{
v1.ResourceCPU: 1,
v1.ResourceMemory: 1,
}
assert.Equal(t, expectedShape, builtShape)
assert.Equal(t, expectedResources, resources)
}

View File

@ -8,14 +8,10 @@ load(
go_library(
name = "go_default_library",
srcs = [
"plugins.go",
"registry.go",
],
srcs = ["registry.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/algorithmprovider",
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithmprovider/defaults:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins/defaultpodtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/imagelocality:go_default_library",
@ -40,20 +36,29 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"plugins_test.go",
"registry_test.go",
],
srcs = ["registry_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//pkg/scheduler/framework/plugins/defaultpodtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/imagelocality:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/nodename:go_default_library",
"//pkg/scheduler/framework/plugins/nodeports:go_default_library",
"//pkg/scheduler/framework/plugins/nodepreferavoidpods:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
"//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
"//pkg/scheduler/framework/plugins/volumezone:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
)

View File

@ -1,44 +1,10 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"defaults.go",
"register_priorities.go",
],
srcs = ["defaults.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults",
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["defaults_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
visibility = ["//visibility:public"],
)
filegroup(
@ -52,4 +18,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -16,104 +16,4 @@ limitations under the License.
package defaults
import (
"k8s.io/klog"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
const (
// ClusterAutoscalerProvider defines the default autoscaler provider
ClusterAutoscalerProvider = "ClusterAutoscalerProvider"
)
func init() {
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}
func defaultPredicates() sets.String {
return sets.NewString(
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckVolumeBindingPred,
predicates.CheckNodeUnschedulablePred,
)
}
// ApplyFeatureGates applies algorithm by feature gates.
// The returned function is used to restore the state of registered predicates/priorities
// when this function is called, and should be called in tests which may modify the value
// of a feature gate temporarily.
// TODO(Huang-Wei): refactor this function to have a clean way to disable/enable plugins.
func ApplyFeatureGates() (restore func()) {
snapshot := scheduler.RegisteredPredicatesAndPrioritiesSnapshot()
// Only register EvenPodsSpread predicate & priority if the feature is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.EvenPodsSpread) {
klog.Infof("Registering EvenPodsSpread predicate and priority function")
// register predicate
scheduler.AddPredicateToAlgorithmProviders(predicates.EvenPodsSpreadPred)
scheduler.RegisterPredicate(predicates.EvenPodsSpreadPred)
// register priority
scheduler.AddPriorityToAlgorithmProviders(priorities.EvenPodsSpreadPriority)
scheduler.RegisterPriority(priorities.EvenPodsSpreadPriority, 1)
}
// Prioritizes nodes that satisfy pod's resource limits
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
klog.Infof("Registering resourcelimits priority function")
scheduler.RegisterPriority(priorities.ResourceLimitsPriority, 1)
// Register the priority function to specific provider too.
scheduler.AddPriorityToAlgorithmProviders(priorities.ResourceLimitsPriority)
}
restore = func() {
scheduler.ApplyPredicatesAndPriorities(snapshot)
}
return
}
func registerAlgorithmProvider(predSet, priSet sets.String) {
// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
// by specifying flag.
scheduler.RegisterAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName, predSet, priSet)
// Cluster autoscaler friendly scheduling algorithm.
scheduler.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}
func defaultPriorities() sets.String {
return sets.NewString(
priorities.SelectorSpreadPriority,
priorities.InterPodAffinityPriority,
priorities.LeastRequestedPriority,
priorities.BalancedResourceAllocation,
priorities.NodePreferAvoidPodsPriority,
priorities.NodeAffinityPriority,
priorities.TaintTolerationPriority,
priorities.ImageLocalityPriority,
)
}
func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.String {
result := sets.NewString(set.List()...)
if result.Has(replaceWhat) {
result.Delete(replaceWhat)
result.Insert(replaceWith)
}
return result
}
// TODO(ahg-g): remove this pkg.

View File

@ -1,258 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package defaults
import (
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
)
func TestCopyAndReplace(t *testing.T) {
testCases := []struct {
set sets.String
replaceWhat string
replaceWith string
expected sets.String
}{
{
set: sets.String{"A": sets.Empty{}, "B": sets.Empty{}},
replaceWhat: "A",
replaceWith: "C",
expected: sets.String{"B": sets.Empty{}, "C": sets.Empty{}},
},
{
set: sets.String{"A": sets.Empty{}, "B": sets.Empty{}},
replaceWhat: "D",
replaceWith: "C",
expected: sets.String{"A": sets.Empty{}, "B": sets.Empty{}},
},
}
for _, testCase := range testCases {
result := copyAndReplace(testCase.set, testCase.replaceWhat, testCase.replaceWith)
if !result.Equal(testCase.expected) {
t.Errorf("expected %v got %v", testCase.expected, result)
}
}
}
func TestDefaultPriorities(t *testing.T) {
result := sets.NewString(
priorities.SelectorSpreadPriority,
priorities.InterPodAffinityPriority,
priorities.LeastRequestedPriority,
priorities.BalancedResourceAllocation,
priorities.NodePreferAvoidPodsPriority,
priorities.NodeAffinityPriority,
priorities.TaintTolerationPriority,
priorities.ImageLocalityPriority,
)
if expected := defaultPriorities(); !result.Equal(expected) {
t.Errorf("expected %v got %v", expected, result)
}
}
func TestDefaultPredicates(t *testing.T) {
result := sets.NewString(
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckVolumeBindingPred,
predicates.CheckNodeUnschedulablePred,
)
if expected := defaultPredicates(); !result.Equal(expected) {
t.Errorf("expected %v got %v", expected, result)
}
}
func TestCompatibility(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
testcases := []struct {
name string
provider string
wantPlugins map[string][]config.Plugin
}{
{
name: "No Provider specified",
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
{
name: "DefaultProvider",
provider: config.SchedulerDefaultProviderName,
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
{
name: "ClusterAutoscalerProvider",
provider: ClusterAutoscalerProvider,
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesMostAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
var opts []scheduler.Option
if len(tc.provider) != 0 {
opts = append(opts, scheduler.WithAlgorithmSource(config.SchedulerAlgorithmSource{
Provider: &tc.provider,
}))
}
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := scheduler.New(
client,
informerFactory,
informerFactory.Core().V1().Pods(),
nil,
make(chan struct{}),
opts...,
)
if err != nil {
t.Fatalf("Error constructing: %v", err)
}
gotPlugins := sched.Framework.ListPlugins()
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
})
}
}

View File

@ -1,34 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package defaults
import (
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
)
func init() {
// Register functions that extract metadata used by priorities computations.
scheduler.RegisterPriorityMetadataProducerFactory(
func(args scheduler.AlgorithmFactoryArgs) priorities.MetadataProducer {
serviceLister := args.InformerFactory.Core().V1().Services().Lister()
controllerLister := args.InformerFactory.Core().V1().ReplicationControllers().Lister()
replicaSetLister := args.InformerFactory.Apps().V1().ReplicaSets().Lister()
statefulSetLister := args.InformerFactory.Apps().V1().StatefulSets().Lister()
return priorities.NewMetadataFactory(serviceLister, controllerLister, replicaSetLister, statefulSetLister, args.HardPodAffinitySymmetricWeight)
})
}

View File

@ -1,26 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package algorithmprovider
import (
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
)
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() func() {
return defaults.ApplyFeatureGates()
}

View File

@ -1,60 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package algorithmprovider
import (
"testing"
"k8s.io/kubernetes/pkg/scheduler"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
var (
algorithmProviderNames = []string{
schedulerapi.SchedulerDefaultProviderName,
}
)
func TestApplyFeatureGates(t *testing.T) {
for _, pn := range algorithmProviderNames {
t.Run(pn, func(t *testing.T) {
p, err := scheduler.GetAlgorithmProvider(pn)
if err != nil {
t.Fatalf("Error retrieving provider: %v", err)
}
if !p.PredicateKeys.Has("PodToleratesNodeTaints") {
t.Fatalf("Failed to find predicate: 'PodToleratesNodeTaints'")
}
})
}
defer ApplyFeatureGates()()
for _, pn := range algorithmProviderNames {
t.Run(pn, func(t *testing.T) {
p, err := scheduler.GetAlgorithmProvider(pn)
if err != nil {
t.Fatalf("Error retrieving '%s' provider: %v", pn, err)
}
if !p.PredicateKeys.Has("PodToleratesNodeTaints") {
t.Fatalf("Failed to find predicate: 'PodToleratesNodeTaints'")
}
})
}
}

View File

@ -18,6 +18,8 @@ package algorithmprovider
import (
"fmt"
"sort"
"strings"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -67,6 +69,17 @@ func NewRegistry(hardPodAffinityWeight int64) Registry {
}
}
// ListAlgorithmProviders lists registered algorithm providers.
func ListAlgorithmProviders() string {
r := NewRegistry(1)
var providers []string
for k := range r {
providers = append(providers, k)
}
sort.Strings(providers)
return strings.Join(providers, " | ")
}
func getDefaultConfig(hardPodAffinityWeight int64) *Config {
return &Config{
FrameworkPlugins: &schedulerapi.Plugins{
@ -148,6 +161,7 @@ func applyFeatureGates(config *Config) {
f := schedulerapi.Plugin{Name: podtopologyspread.Name}
config.FrameworkPlugins.PreFilter.Enabled = append(config.FrameworkPlugins.PreFilter.Enabled, f)
config.FrameworkPlugins.Filter.Enabled = append(config.FrameworkPlugins.Filter.Enabled, f)
config.FrameworkPlugins.PostFilter.Enabled = append(config.FrameworkPlugins.PostFilter.Enabled, f)
s := schedulerapi.Plugin{Name: podtopologyspread.Name, Weight: 1}
config.FrameworkPlugins.Score.Enabled = append(config.FrameworkPlugins.Score.Enabled, s)
}
@ -155,6 +169,13 @@ func applyFeatureGates(config *Config) {
// Prioritizes nodes that satisfy pod's resource limits
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
klog.Infof("Registering resourcelimits priority function")
// TODO(ahg-g): append to config.FrameworkPlugins.Score.Enabled when available.
s := schedulerapi.Plugin{Name: noderesources.ResourceLimitsName, Weight: 1}
config.FrameworkPlugins.Score.Enabled = append(config.FrameworkPlugins.Score.Enabled, s)
}
}
// ApplyFeatureGates applies algorithm by feature gates.
// TODO(ahg-g): DEPRECATED, remove.
func ApplyFeatureGates() func() {
return func() {}
}

View File

@ -17,82 +17,233 @@ limitations under the License.
package algorithmprovider
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodepreferavoidpods"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
)
func TestCompatibility(t *testing.T) {
testcases := []struct {
name string
provider string
wantPlugins map[string][]config.Plugin
}{
{
name: "DefaultProvider",
provider: config.SchedulerDefaultProviderName,
func TestClusterAutoscalerProvider(t *testing.T) {
hardPodAffinityWeight := int64(1)
wantConfig := &Config{
FrameworkPlugins: &schedulerapi.Plugins{
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},
{Name: nodeports.Name},
{Name: interpodaffinity.Name},
},
},
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: nodeunschedulable.Name},
{Name: noderesources.FitName},
{Name: nodename.Name},
{Name: nodeports.Name},
{Name: nodeaffinity.Name},
{Name: volumerestrictions.Name},
{Name: tainttoleration.Name},
{Name: nodevolumelimits.EBSName},
{Name: nodevolumelimits.GCEPDName},
{Name: nodevolumelimits.CSIName},
{Name: nodevolumelimits.AzureDiskName},
{Name: volumebinding.Name},
{Name: volumezone.Name},
{Name: interpodaffinity.Name},
},
},
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: tainttoleration.Name},
},
},
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.BalancedAllocationName, Weight: 1},
{Name: imagelocality.Name, Weight: 1},
{Name: interpodaffinity.Name, Weight: 1},
{Name: noderesources.MostAllocatedName, Weight: 1},
{Name: nodeaffinity.Name, Weight: 1},
{Name: nodepreferavoidpods.Name, Weight: 10000},
{Name: defaultpodtopologyspread.Name, Weight: 1},
{Name: tainttoleration.Name, Weight: 1},
},
},
},
{
name: "ClusterAutoscalerProvider",
provider: ClusterAutoscalerProvider,
FrameworkPluginConfig: []schedulerapi.PluginConfig{
{
Name: interpodaffinity.Name,
Args: runtime.Unknown{Raw: []byte(fmt.Sprintf(`{"hardPodAffinityWeight":%d}`, hardPodAffinityWeight))},
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := scheduler.New(
client,
informerFactory,
informerFactory.Core().V1().Pods(),
nil,
make(chan struct{}),
scheduler.WithAlgorithmSource(config.SchedulerAlgorithmSource{
Provider: &tc.provider,
}))
if err != nil {
t.Fatalf("Error constructing: %v", err)
}
gotPlugins := sched.Framework.ListPlugins()
r := NewRegistry(hardPodAffinityWeight)
gotConfig := r[ClusterAutoscalerProvider]
if diff := cmp.Diff(wantConfig, gotConfig); diff != "" {
t.Errorf("unexpected config diff (-want, +got): %s", diff)
}
}
volumeBinder := volumebinder.NewVolumeBinder(
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Storage().V1().CSINodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().StorageClasses(),
time.Second,
)
providerRegistry := NewRegistry(1)
config := providerRegistry[tc.provider]
fwk, err := framework.NewFramework(
plugins.NewInTreeRegistry(&plugins.RegistryArgs{
VolumeBinder: volumeBinder,
}),
config.FrameworkPlugins,
config.FrameworkPluginConfig,
framework.WithClientSet(client),
framework.WithInformerFactory(informerFactory),
framework.WithSnapshotSharedLister(nodeinfosnapshot.NewEmptySnapshot()),
)
if err != nil {
t.Fatalf("error initializing the scheduling framework: %v", err)
}
wantPlugins := fwk.ListPlugins()
func TestApplyFeatureGates(t *testing.T) {
hardPodAffinityWeight := int64(1)
tests := []struct {
name string
featuresEnabled bool
wantConfig *Config
}{
{
name: "Feature gates disabled",
featuresEnabled: false,
wantConfig: &Config{
FrameworkPlugins: &schedulerapi.Plugins{
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},
{Name: nodeports.Name},
{Name: interpodaffinity.Name},
},
},
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: nodeunschedulable.Name},
{Name: noderesources.FitName},
{Name: nodename.Name},
{Name: nodeports.Name},
{Name: nodeaffinity.Name},
{Name: volumerestrictions.Name},
{Name: tainttoleration.Name},
{Name: nodevolumelimits.EBSName},
{Name: nodevolumelimits.GCEPDName},
{Name: nodevolumelimits.CSIName},
{Name: nodevolumelimits.AzureDiskName},
{Name: volumebinding.Name},
{Name: volumezone.Name},
{Name: interpodaffinity.Name},
},
},
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: tainttoleration.Name},
},
},
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.BalancedAllocationName, Weight: 1},
{Name: imagelocality.Name, Weight: 1},
{Name: interpodaffinity.Name, Weight: 1},
{Name: noderesources.LeastAllocatedName, Weight: 1},
{Name: nodeaffinity.Name, Weight: 1},
{Name: nodepreferavoidpods.Name, Weight: 10000},
{Name: defaultpodtopologyspread.Name, Weight: 1},
{Name: tainttoleration.Name, Weight: 1},
},
},
},
FrameworkPluginConfig: []schedulerapi.PluginConfig{
{
Name: interpodaffinity.Name,
Args: runtime.Unknown{Raw: []byte(fmt.Sprintf(`{"hardPodAffinityWeight":%d}`, hardPodAffinityWeight))},
},
},
},
},
{
name: "Feature gates enabled",
featuresEnabled: true,
wantConfig: &Config{
FrameworkPlugins: &schedulerapi.Plugins{
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},
{Name: nodeports.Name},
{Name: interpodaffinity.Name},
{Name: podtopologyspread.Name},
},
},
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: nodeunschedulable.Name},
{Name: noderesources.FitName},
{Name: nodename.Name},
{Name: nodeports.Name},
{Name: nodeaffinity.Name},
{Name: volumerestrictions.Name},
{Name: tainttoleration.Name},
{Name: nodevolumelimits.EBSName},
{Name: nodevolumelimits.GCEPDName},
{Name: nodevolumelimits.CSIName},
{Name: nodevolumelimits.AzureDiskName},
{Name: volumebinding.Name},
{Name: volumezone.Name},
{Name: interpodaffinity.Name},
{Name: podtopologyspread.Name},
},
},
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: tainttoleration.Name},
{Name: podtopologyspread.Name},
},
},
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.BalancedAllocationName, Weight: 1},
{Name: imagelocality.Name, Weight: 1},
{Name: interpodaffinity.Name, Weight: 1},
{Name: noderesources.LeastAllocatedName, Weight: 1},
{Name: nodeaffinity.Name, Weight: 1},
{Name: nodepreferavoidpods.Name, Weight: 10000},
{Name: defaultpodtopologyspread.Name, Weight: 1},
{Name: tainttoleration.Name, Weight: 1},
{Name: podtopologyspread.Name, Weight: 1},
{Name: noderesources.ResourceLimitsName, Weight: 1},
},
},
},
FrameworkPluginConfig: []schedulerapi.PluginConfig{
{
Name: interpodaffinity.Name,
Args: runtime.Unknown{Raw: []byte(fmt.Sprintf(`{"hardPodAffinityWeight":%d}`, hardPodAffinityWeight))},
},
},
},
},
}
if diff := cmp.Diff(wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceLimitsPriorityFunction, test.featuresEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EvenPodsSpread, test.featuresEnabled)()
r := NewRegistry(hardPodAffinityWeight)
gotConfig := r[schedulerapi.SchedulerDefaultProviderName]
if diff := cmp.Diff(test.wantConfig, gotConfig); diff != "" {
t.Errorf("unexpected config diff (-want, +got): %s", diff)
}
})
}

View File

@ -11,7 +11,6 @@ go_test(
"//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/algorithmprovider/defaults:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/core:go_default_library",

View File

@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
)
@ -134,6 +133,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ScorePlugin": {
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeLabel", Weight: 4},
{Name: "DefaultPodTopologySpread", Weight: 2},
{Name: "ServiceAffinity", Weight: 3},
},
},
@ -159,8 +159,10 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "LeastRequestedPriority", "weight": 2},
{"name": "BalancedResourceAllocation", "weight": 2},
{"name": "SelectorSpreadPriority", "weight": 2},
{"name": "TestServiceAntiAffinity", "weight": 3, "argument": {"serviceAntiAffinity": {"label": "zone"}}},
{"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}}
{"name": "TestServiceAntiAffinity1", "weight": 3, "argument": {"serviceAntiAffinity": {"label": "zone"}}},
{"name": "TestServiceAntiAffinity2", "weight": 3, "argument": {"serviceAntiAffinity": {"label": "region"}}},
{"name": "TestLabelPreference1", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}},
{"name": "TestLabelPreference2", "weight": 4, "argument": {"labelPreference": {"label": "foo", "presence":false}}}
]
}`,
wantPlugins: map[string][]config.Plugin{
@ -183,9 +185,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "NodeResourcesLeastAllocated", Weight: 2},
{Name: "NodeLabel", Weight: 4},
{Name: "NodeLabel", Weight: 8}, // Weight is 4 * number of LabelPreference priorities
{Name: "DefaultPodTopologySpread", Weight: 2},
{Name: "ServiceAffinity", Weight: 3},
{Name: "ServiceAffinity", Weight: 6}, // Weight is the 3 * number of custom ServiceAntiAffinity priorities
},
},
},
@ -1312,7 +1314,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
for feature, value := range tc.featureGates {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, value)()
}
defer algorithmprovider.ApplyFeatureGates()()
policyConfigMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: "scheduler-custom-policy-config"},
Data: map[string]string{config.SchedulerPolicyConfigMapKey: tc.JSON},
@ -1363,3 +1365,133 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
})
}
}
func TestAlgorithmProviderCompatibility(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
defaultPlugins := map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
}
testcases := []struct {
name string
provider string
wantPlugins map[string][]config.Plugin
}{
{
name: "No Provider specified",
wantPlugins: defaultPlugins,
},
{
name: "DefaultProvider",
provider: config.SchedulerDefaultProviderName,
wantPlugins: defaultPlugins,
},
{
name: "ClusterAutoscalerProvider",
provider: algorithmprovider.ClusterAutoscalerProvider,
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesMostAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
var opts []scheduler.Option
if len(tc.provider) != 0 {
opts = append(opts, scheduler.WithAlgorithmSource(config.SchedulerAlgorithmSource{
Provider: &tc.provider,
}))
}
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := scheduler.New(
client,
informerFactory,
informerFactory.Core().V1().Pods(),
nil,
make(chan struct{}),
opts...,
)
if err != nil {
t.Fatalf("Error constructing: %v", err)
}
gotPlugins := sched.Framework.ListPlugins()
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
})
}
}

View File

@ -108,6 +108,10 @@ func ValidatePolicy(policy config.Policy) error {
if binders > 1 {
validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders))
}
if policy.HardPodAffinitySymmetricWeight < 0 || policy.HardPodAffinitySymmetricWeight > 100 {
validationErrors = append(validationErrors, field.Invalid(field.NewPath("hardPodAffinitySymmetricWeight"), policy.HardPodAffinitySymmetricWeight, "not in valid range [0-100]"))
}
return utilerrors.NewAggregate(validationErrors)
}

View File

@ -262,6 +262,20 @@ func TestValidatePolicy(t *testing.T) {
},
expected: errors.New("ServiceAntiAffinity priority \"customPriority2\" has a different weight with \"customPriority1\""),
},
{
name: "invalid hardPodAffinitySymmetricWeight, above the range",
policy: config.Policy{
HardPodAffinitySymmetricWeight: 101,
},
expected: errors.New("hardPodAffinitySymmetricWeight: Invalid value: 101: not in valid range [0-100]"),
},
{
name: "invalid hardPodAffinitySymmetricWeight, below the range",
policy: config.Policy{
HardPodAffinitySymmetricWeight: -1,
},
expected: errors.New("hardPodAffinitySymmetricWeight: Invalid value: -1: not in valid range [0-100]"),
},
}
for _, test := range tests {

View File

@ -1117,7 +1117,6 @@ func TestZeroRequest(t *testing.T) {
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
1,
)
metadata := metadataProducer(test.pod, test.nodes, snapshot)

View File

@ -18,6 +18,7 @@ package scheduler
import (
"fmt"
"sort"
"time"
v1 "k8s.io/api/core/v1"
@ -40,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/core"
@ -104,157 +106,18 @@ type Configurator struct {
enableNonPreempting bool
// framework configuration arguments.
registry framework.Registry
plugins *schedulerapi.Plugins
pluginConfig []schedulerapi.PluginConfig
pluginConfigProducerRegistry *plugins.ConfigProducerRegistry
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
algorithmFactoryArgs AlgorithmFactoryArgs
configProducerArgs *plugins.ConfigProducerArgs
registry framework.Registry
plugins *schedulerapi.Plugins
pluginConfig []schedulerapi.PluginConfig
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
}
// Create creates a scheduler with the default algorithm provider.
func (c *Configurator) Create() (*Scheduler, error) {
return c.CreateFromProvider(schedulerapi.SchedulerDefaultProviderName)
}
// CreateFromProvider creates a scheduler from the name of a registered algorithm provider.
func (c *Configurator) CreateFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
return nil, err
}
return c.CreateFromKeys(provider.PredicateKeys, provider.PriorityKeys, []algorithm.SchedulerExtender{})
}
// CreateFromConfig creates a scheduler from the configuration file
func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
// validate the policy configuration
if err := validation.ValidatePolicy(policy); err != nil {
return nil, err
}
predicateKeys := sets.NewString()
if policy.Predicates == nil {
klog.V(2).Infof("Using predicates from algorithm provider '%v'", schedulerapi.SchedulerDefaultProviderName)
provider, err := GetAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName)
if err != nil {
return nil, err
}
predicateKeys = provider.PredicateKeys
} else {
for _, predicate := range policy.Predicates {
klog.V(2).Infof("Registering predicate: %s", predicate.Name)
predicateKeys.Insert(RegisterCustomPredicate(predicate, c.configProducerArgs))
}
}
priorityKeys := sets.NewString()
if policy.Priorities == nil {
klog.V(2).Infof("Using priorities from algorithm provider '%v'", schedulerapi.SchedulerDefaultProviderName)
provider, err := GetAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName)
if err != nil {
return nil, err
}
priorityKeys = provider.PriorityKeys
} else {
for _, priority := range policy.Priorities {
if priority.Name == priorities.EqualPriority {
klog.V(2).Infof("Skip registering priority: %s", priority.Name)
continue
}
klog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys.Insert(RegisterCustomPriority(priority, c.configProducerArgs))
}
}
var extenders []algorithm.SchedulerExtender
if len(policy.Extenders) != 0 {
var ignorableExtenders []algorithm.SchedulerExtender
var ignoredExtendedResources []string
for ii := range policy.Extenders {
klog.V(2).Infof("Creating extender with config %+v", policy.Extenders[ii])
extender, err := core.NewHTTPExtender(&policy.Extenders[ii])
if err != nil {
return nil, err
}
if !extender.IsIgnorable() {
extenders = append(extenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range policy.Extenders[ii].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
}
}
}
c.configProducerArgs.NodeResourcesFitArgs = &noderesources.FitArgs{
IgnoredResources: ignoredExtendedResources,
}
// place ignorable extenders to the tail of extenders
extenders = append(extenders, ignorableExtenders...)
}
// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
// Give it higher precedence than scheduler CLI configuration when it is provided.
if policy.HardPodAffinitySymmetricWeight != 0 {
c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
}
// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
// predicates even after one or more of them fails.
if policy.AlwaysCheckAllPredicates {
c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
}
return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}
// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.hardPodAffinitySymmetricWeight < 1 || c.hardPodAffinitySymmetricWeight > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.hardPodAffinitySymmetricWeight)
}
c.configProducerArgs.InterPodAffinityArgs = &interpodaffinity.Args{
HardPodAffinityWeight: c.hardPodAffinitySymmetricWeight,
}
pluginsForPredicates, pluginConfigForPredicates, err := c.getPredicateConfigs(predicateKeys)
if err != nil {
return nil, err
}
pluginsForPriorities, pluginConfigForPriorities, err := c.getPriorityConfigs(priorityKeys)
if err != nil {
return nil, err
}
priorityMetaProducer, err := getPriorityMetadataProducer(c.algorithmFactoryArgs)
if err != nil {
return nil, err
}
// Combine all framework configurations. If this results in any duplication, framework
// instantiation should fail.
var plugins schedulerapi.Plugins
plugins.Append(pluginsForPredicates)
plugins.Append(pluginsForPriorities)
plugins.Append(c.plugins)
var pluginConfig []schedulerapi.PluginConfig
pluginConfig = append(pluginConfig, pluginConfigForPredicates...)
pluginConfig = append(pluginConfig, pluginConfigForPriorities...)
pluginConfig = append(pluginConfig, c.pluginConfig...)
// create a scheduler from a set of registered plugins.
func (c *Configurator) create(extenders []algorithm.SchedulerExtender) (*Scheduler, error) {
framework, err := framework.NewFramework(
c.registry,
&plugins,
pluginConfig,
c.plugins,
c.pluginConfig,
framework.WithClientSet(c.client),
framework.WithInformerFactory(c.informerFactory),
framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
@ -283,7 +146,12 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
c.schedulerCache,
podQueue,
nil,
priorityMetaProducer,
priorities.NewMetadataFactory(
c.informerFactory.Core().V1().Services().Lister(),
c.informerFactory.Core().V1().ReplicationControllers().Lister(),
c.informerFactory.Apps().V1().ReplicaSets().Lister(),
c.informerFactory.Apps().V1().StatefulSets().Lister(),
),
c.nodeInfoSnapshot,
framework,
extenders,
@ -305,11 +173,215 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: podQueue,
Plugins: plugins,
PluginConfig: pluginConfig,
}, nil
}
// createFromProvider creates a scheduler from the name of a registered algorithm provider.
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
r := algorithmprovider.NewRegistry(int64(c.hardPodAffinitySymmetricWeight))
provider, exist := r[providerName]
if !exist {
return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
}
// Combine the provided plugins with the ones from component config.
// TODO(#86789): address disabled plugins.
var plugins schedulerapi.Plugins
plugins.Append(provider.FrameworkPlugins)
plugins.Append(c.plugins)
c.plugins = &plugins
var pluginConfig []schedulerapi.PluginConfig
pluginConfig = append(pluginConfig, provider.FrameworkPluginConfig...)
pluginConfig = append(pluginConfig, c.pluginConfig...)
c.pluginConfig = pluginConfig
return c.create([]algorithm.SchedulerExtender{})
}
// CreateFromConfig creates a scheduler from the configuration file
func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error) {
lr := plugins.NewLegacyRegistry()
args := &plugins.ConfigProducerArgs{}
klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
// validate the policy configuration
if err := validation.ValidatePolicy(policy); err != nil {
return nil, err
}
predicateKeys := sets.NewString()
if policy.Predicates == nil {
klog.V(2).Infof("Using predicates from algorithm provider '%v'", schedulerapi.SchedulerDefaultProviderName)
predicateKeys = lr.DefaultPredicates
} else {
for _, predicate := range policy.Predicates {
klog.V(2).Infof("Registering predicate: %s", predicate.Name)
predicateKeys.Insert(lr.ProcessPredicatePolicy(predicate, args))
}
}
priorityKeys := make(map[string]int64)
if policy.Priorities == nil {
klog.V(2).Infof("Using default priorities")
priorityKeys = lr.DefaultPriorities
} else {
for _, priority := range policy.Priorities {
if priority.Name == priorities.EqualPriority {
klog.V(2).Infof("Skip registering priority: %s", priority.Name)
continue
}
klog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys[lr.ProcessPriorityPolicy(priority, args)] = priority.Weight
}
}
var extenders []algorithm.SchedulerExtender
if len(policy.Extenders) != 0 {
var ignorableExtenders []algorithm.SchedulerExtender
var ignoredExtendedResources []string
for ii := range policy.Extenders {
klog.V(2).Infof("Creating extender with config %+v", policy.Extenders[ii])
extender, err := core.NewHTTPExtender(&policy.Extenders[ii])
if err != nil {
return nil, err
}
if !extender.IsIgnorable() {
extenders = append(extenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range policy.Extenders[ii].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
}
}
}
args.NodeResourcesFitArgs = &noderesources.FitArgs{
IgnoredResources: ignoredExtendedResources,
}
// place ignorable extenders to the tail of extenders
extenders = append(extenders, ignorableExtenders...)
}
// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
// Give it higher precedence than scheduler CLI configuration when it is provided.
if policy.HardPodAffinitySymmetricWeight != 0 {
c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
}
// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
// predicates even after one or more of them fails.
if policy.AlwaysCheckAllPredicates {
c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
}
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.hardPodAffinitySymmetricWeight < 1 || c.hardPodAffinitySymmetricWeight > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.hardPodAffinitySymmetricWeight)
}
args.InterPodAffinityArgs = &interpodaffinity.Args{
HardPodAffinityWeight: c.hardPodAffinitySymmetricWeight,
}
pluginsForPredicates, pluginConfigForPredicates, err := getPredicateConfigs(predicateKeys, lr, args)
if err != nil {
return nil, err
}
pluginsForPriorities, pluginConfigForPriorities, err := getPriorityConfigs(priorityKeys, lr, args)
if err != nil {
return nil, err
}
// Combine all framework configurations. If this results in any duplication, framework
// instantiation should fail.
var plugins schedulerapi.Plugins
plugins.Append(pluginsForPredicates)
plugins.Append(pluginsForPriorities)
plugins.Append(c.plugins)
c.plugins = &plugins
var pluginConfig []schedulerapi.PluginConfig
pluginConfig = append(pluginConfig, pluginConfigForPredicates...)
pluginConfig = append(pluginConfig, pluginConfigForPriorities...)
pluginConfig = append(pluginConfig, c.pluginConfig...)
c.pluginConfig = pluginConfig
return c.create(extenders)
}
// getPriorityConfigs
// getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run
// as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was
// registered for that priority.
func getPriorityConfigs(keys map[string]int64, lr *plugins.LegacyRegistry, args *plugins.ConfigProducerArgs) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) {
var plugins schedulerapi.Plugins
var pluginConfig []schedulerapi.PluginConfig
// Sort the keys so that it is easier for unit tests to do compare.
var sortedKeys []string
for k := range keys {
sortedKeys = append(sortedKeys, k)
}
sort.Strings(sortedKeys)
for _, priority := range sortedKeys {
weight := keys[priority]
producer, exist := lr.PriorityToConfigProducer[priority]
if !exist {
return nil, nil, fmt.Errorf("no config producer registered for %q", priority)
}
a := *args
a.Weight = int32(weight)
pl, plc := producer(a)
plugins.Append(&pl)
pluginConfig = append(pluginConfig, plc...)
}
return &plugins, pluginConfig, nil
}
// getPredicateConfigs returns predicates configuration: ones that will run as fitPredicates and ones that will run
// as framework plugins. Specifically, a predicate will run as a framework plugin if a plugin config producer was
// registered for that predicate.
// Note that the framework executes plugins according to their order in the Plugins list, and so predicates run as plugins
// are added to the Plugins list according to the order specified in predicates.Ordering().
func getPredicateConfigs(keys sets.String, lr *plugins.LegacyRegistry, args *plugins.ConfigProducerArgs) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) {
allPredicates := keys.Union(lr.MandatoryPredicates)
// Create the framework plugin configurations, and place them in the order
// that the corresponding predicates were supposed to run.
var plugins schedulerapi.Plugins
var pluginConfig []schedulerapi.PluginConfig
for _, predicateKey := range predicates.Ordering() {
if allPredicates.Has(predicateKey) {
producer, exist := lr.PredicateToConfigProducer[predicateKey]
if !exist {
return nil, nil, fmt.Errorf("no framework config producer registered for %q", predicateKey)
}
pl, plc := producer(*args)
plugins.Append(&pl)
pluginConfig = append(pluginConfig, plc...)
allPredicates.Delete(predicateKey)
}
}
// Third, add the rest in no specific order.
for predicateKey := range allPredicates {
producer, exist := lr.PredicateToConfigProducer[predicateKey]
if !exist {
return nil, nil, fmt.Errorf("no framework config producer registered for %q", predicateKey)
}
pl, plc := producer(*args)
plugins.Append(&pl)
pluginConfig = append(pluginConfig, plc...)
}
return &plugins, pluginConfig, nil
}
// getBinderFunc returns a func which returns an extender that supports bind or a default binder based on the given pod.
func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
defaultBinder := &binder{client}
@ -323,85 +395,6 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx
}
}
// getPriorityConfigs
// getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run
// as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was
// registered for that priority.
func (c *Configurator) getPriorityConfigs(keys sets.String) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) {
if c.pluginConfigProducerRegistry == nil {
return nil, nil, nil
}
var plugins schedulerapi.Plugins
var pluginConfig []schedulerapi.PluginConfig
frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer
for _, p := range keys.List() {
weight, exist := algorithmRegistry.priorityKeys[p]
if !exist {
return nil, nil, fmt.Errorf("priority key %q is not registered", p)
}
if producer, exist := frameworkConfigProducers[p]; exist {
args := *c.configProducerArgs
args.Weight = int32(weight)
pl, pc := producer(args)
plugins.Append(&pl)
pluginConfig = append(pluginConfig, pc...)
}
}
return &plugins, pluginConfig, nil
}
// getPredicateConfigs returns predicates configuration: ones that will run as fitPredicates and ones that will run
// as framework plugins. Specifically, a predicate will run as a framework plugin if a plugin config producer was
// registered for that predicate.
// Note that the framework executes plugins according to their order in the Plugins list, and so predicates run as plugins
// are added to the Plugins list according to the order specified in predicates.Ordering().
func (c *Configurator) getPredicateConfigs(keys sets.String) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) {
if c.pluginConfigProducerRegistry == nil {
// No config producer registry available, so predicates can't be translated to plugins.
return nil, nil, fmt.Errorf("No config producer registry available, can't producer plugins configs for provided predicate keys")
}
allPredicates := keys.Union(algorithmRegistry.mandatoryPredicateKeys)
if allPredicates.Has("PodFitsPorts") {
// For compatibility reasons, "PodFitsPorts" as a key is still supported.
allPredicates.Delete("PodFitsPorts")
allPredicates.Insert(predicates.PodFitsHostPortsPred)
}
frameworkConfigProducers := c.pluginConfigProducerRegistry.PredicateToConfigProducer
// Create the framework plugin configurations, and place them in the order
// that the corresponding predicates were supposed to run.
var plugins schedulerapi.Plugins
var pluginConfig []schedulerapi.PluginConfig
for _, predicateKey := range predicates.Ordering() {
if allPredicates.Has(predicateKey) {
producer, exist := frameworkConfigProducers[predicateKey]
if !exist {
return nil, nil, fmt.Errorf("no framework config producer registered for %q", predicateKey)
}
p, pc := producer(*c.configProducerArgs)
plugins.Append(&p)
pluginConfig = append(pluginConfig, pc...)
allPredicates.Delete(predicateKey)
}
}
// Third, add the rest in no specific order.
for predicateKey := range allPredicates {
producer, exist := frameworkConfigProducers[predicateKey]
if !exist {
return nil, nil, fmt.Errorf("no framework config producer registered for %q", predicateKey)
}
p, pc := producer(*c.configProducerArgs)
plugins.Append(&p)
pluginConfig = append(pluginConfig, pc...)
}
return &plugins, pluginConfig, nil
}
type podInformer struct {
informer cache.SharedIndexInformer
}

View File

@ -29,7 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
@ -65,7 +64,7 @@ func TestCreate(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
factory.Create()
factory.createFromProvider(schedulerapi.SchedulerDefaultProviderName)
}
// Test configures a scheduler from a policies defined in a file
@ -102,9 +101,9 @@ func TestCreateFromConfig(t *testing.T) {
t.Errorf("Invalid configuration: %v", err)
}
sched, err := factory.CreateFromConfig(policy)
sched, err := factory.createFromConfig(policy)
if err != nil {
t.Fatalf("CreateFromConfig failed: %v", err)
t.Fatalf("createFromConfig failed: %v", err)
}
hpa := factory.hardPodAffinitySymmetricWeight
if hpa != v1.DefaultHardPodAffinitySymmetricWeight {
@ -113,13 +112,13 @@ func TestCreateFromConfig(t *testing.T) {
// Verify that node label predicate/priority are converted to framework plugins.
wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`
verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, 6, wantArgs)
verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
// Verify that service affinity custom predicate/priority is converted to framework plugin.
wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}`
verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, 6, wantArgs)
verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
}
func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, sched *Scheduler, wantWeight int32, wantArgs string) {
func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, sched *Scheduler, configurator *Configurator, wantWeight int32, wantArgs string) {
for _, extensionPoint := range extentionPoints {
plugin, ok := findPlugin(name, extensionPoint, sched)
if !ok {
@ -131,7 +130,7 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string,
}
}
// Verify that the policy config is converted to plugin config.
pluginConfig := findPluginConfig(name, sched)
pluginConfig := findPluginConfig(name, configurator)
encoding, err := json.Marshal(pluginConfig)
if err != nil {
t.Errorf("Failed to marshal %+v: %v", pluginConfig, err)
@ -151,8 +150,8 @@ func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plu
return schedulerapi.Plugin{}, false
}
func findPluginConfig(name string, sched *Scheduler) schedulerapi.PluginConfig {
for _, c := range sched.PluginConfig {
func findPluginConfig(name string, configurator *Configurator) schedulerapi.PluginConfig {
for _, c := range configurator.pluginConfig {
if c.Name == name {
return c
}
@ -188,7 +187,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
t.Errorf("Invalid configuration: %v", err)
}
factory.CreateFromConfig(policy)
factory.createFromConfig(policy)
hpa := factory.hardPodAffinitySymmetricWeight
if hpa != 10 {
t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", 10, hpa)
@ -209,21 +208,18 @@ func TestCreateFromEmptyConfig(t *testing.T) {
t.Errorf("Invalid configuration: %v", err)
}
factory.CreateFromConfig(policy)
factory.createFromConfig(policy)
}
// Test configures a scheduler from a policy that does not specify any
// predicate/priority.
// The predicate/priority from DefaultProvider will be used.
// TODO(Huang-Wei): refactor (or remove) this test along with eliminating 'RegisterFitPredicate()'.
func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
client := fake.NewSimpleClientset()
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
RegisterAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName, sets.NewString("PodFitsResources"), sets.NewString("NodeAffinityPriority"))
configData := []byte(`{
"kind" : "Policy",
"apiVersion" : "v1"
@ -233,24 +229,15 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
t.Fatalf("Invalid configuration: %v", err)
}
c, err := factory.CreateFromConfig(policy)
sched, err := factory.createFromConfig(policy)
if err != nil {
t.Fatalf("Failed to create scheduler from configuration: %v", err)
}
if !foundPlugin(c.Plugins.Filter.Enabled, "NodeResourcesFit") {
if _, exist := findPlugin("NodeResourcesFit", "FilterPlugin", sched); !exist {
t.Errorf("Expected plugin NodeResourcesFit")
}
}
func foundPlugin(plugins []schedulerapi.Plugin, name string) bool {
for _, plugin := range plugins {
if plugin.Name == name {
return true
}
}
return false
}
func TestDefaultErrorFunc(t *testing.T) {
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
@ -435,55 +422,9 @@ func testBind(binding *v1.Binding, t *testing.T) {
}
}
func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
client := fake.NewSimpleClientset()
// factory of "default-scheduler"
stopCh := make(chan struct{})
factory := newConfigFactory(client, -1, stopCh)
defer close(stopCh)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: invalid hardPodAffinitySymmetricWeight, got nothing")
}
}
func TestInvalidFactoryArgs(t *testing.T) {
client := fake.NewSimpleClientset()
testCases := []struct {
name string
hardPodAffinitySymmetricWeight int32
expectErr string
}{
{
name: "symmetric weight below range",
hardPodAffinitySymmetricWeight: -1,
expectErr: "invalid hardPodAffinitySymmetricWeight: -1, must be in the range [0-100]",
},
{
name: "symmetric weight above range",
hardPodAffinitySymmetricWeight: 101,
expectErr: "invalid hardPodAffinitySymmetricWeight: 101, must be in the range [0-100]",
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
stopCh := make(chan struct{})
factory := newConfigFactory(client, test.hardPodAffinitySymmetricWeight, stopCh)
defer close(stopCh)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: %s, got nothing", test.expectErr)
}
})
}
}
func newConfigFactoryWithFrameworkRegistry(
client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{},
registry framework.Registry, pluginConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry) *Configurator {
registry framework.Registry) *Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
snapshot := nodeinfosnapshot.NewEmptySnapshot()
return &Configurator{
@ -501,21 +442,14 @@ func newConfigFactoryWithFrameworkRegistry(
registry: registry,
plugins: nil,
pluginConfig: []schedulerapi.PluginConfig{},
pluginConfigProducerRegistry: pluginConfigProducerRegistry,
nodeInfoSnapshot: snapshot,
algorithmFactoryArgs: AlgorithmFactoryArgs{
SharedLister: snapshot,
InformerFactory: informerFactory,
HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
},
configProducerArgs: &frameworkplugins.ConfigProducerArgs{},
}
}
func newConfigFactory(
client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator {
return newConfigFactoryWithFrameworkRegistry(client, hardPodAffinitySymmetricWeight, stopCh,
frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{}), frameworkplugins.NewConfigProducerRegistry())
frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{}))
}
type fakeExtender struct {

View File

@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["registry.go"],
srcs = [
"legacy_registry.go",
"registry.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
@ -28,7 +32,10 @@ go_library(
"//pkg/scheduler/framework/plugins/volumezone:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@ -71,10 +78,13 @@ filegroup(
go_test(
name = "go_default_test",
srcs = ["registry_test.go"],
srcs = ["legacy_registry_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)

View File

@ -357,7 +357,6 @@ func TestDefaultPodTopologySpreadScore(t *testing.T) {
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
@ -615,7 +614,6 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)

View File

@ -0,0 +1,581 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugins
import (
"encoding/json"
"fmt"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodepreferavoidpods"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// LegacyRegistry is used to store current state of registered predicates and priorities.
type LegacyRegistry struct {
// maps that associate predicates/priorities with framework plugin configurations.
PredicateToConfigProducer map[string]ConfigProducer
PriorityToConfigProducer map[string]ConfigProducer
// predicates that will always be configured.
MandatoryPredicates sets.String
// predicates and priorities that will be used if either was set to nil in a
// given v1.Policy configuration.
DefaultPredicates sets.String
DefaultPriorities map[string]int64
}
// ConfigProducerArgs contains arguments that are passed to the producer.
// As we add more predicates/priorities to framework plugins mappings, more arguments
// may be added here.
type ConfigProducerArgs struct {
// Weight used for priority functions.
Weight int32
// NodeLabelArgs is the args for the NodeLabel plugin.
NodeLabelArgs *nodelabel.Args
// RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin.
RequestedToCapacityRatioArgs *noderesources.RequestedToCapacityRatioArgs
// ServiceAffinityArgs is the args for the ServiceAffinity plugin.
ServiceAffinityArgs *serviceaffinity.Args
// NodeResourcesFitArgs is the args for the NodeResources fit filter.
NodeResourcesFitArgs *noderesources.FitArgs
// InterPodAffinityArgs is the args for InterPodAffinity plugin
InterPodAffinityArgs *interpodaffinity.Args
}
// ConfigProducer returns the set of plugins and their configuration for a
// predicate/priority given the args.
type ConfigProducer func(args ConfigProducerArgs) (config.Plugins, []config.PluginConfig)
// NewLegacyRegistry returns a legacy algorithm registry of predicates and priorities.
func NewLegacyRegistry() *LegacyRegistry {
registry := &LegacyRegistry{
// MandatoryPredicates the set of keys for predicates that the scheduler will
// be configured with all the time.
MandatoryPredicates: sets.NewString(
predicates.PodToleratesNodeTaintsPred,
predicates.CheckNodeUnschedulablePred,
),
// Used as the default set of predicates if Policy was specified, but predicates was nil.
DefaultPredicates: sets.NewString(
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckVolumeBindingPred,
predicates.CheckNodeUnschedulablePred,
),
// Used as the default set of predicates if Policy was specified, but priorities was nil.
DefaultPriorities: map[string]int64{
priorities.SelectorSpreadPriority: 1,
priorities.InterPodAffinityPriority: 1,
priorities.LeastRequestedPriority: 1,
priorities.BalancedResourceAllocation: 1,
priorities.NodePreferAvoidPodsPriority: 10000,
priorities.NodeAffinityPriority: 1,
priorities.TaintTolerationPriority: 1,
priorities.ImageLocalityPriority: 1,
},
PredicateToConfigProducer: make(map[string]ConfigProducer),
PriorityToConfigProducer: make(map[string]ConfigProducer),
}
registry.registerPredicateConfigProducer(predicates.GeneralPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
// GeneralPredicate is a combination of predicates.
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.PodToleratesNodeTaintsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.PodFitsResourcesPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
return
})
registry.registerPredicateConfigProducer(predicates.HostNamePred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.PodFitsHostPortsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.MatchNodeSelectorPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.CheckNodeUnschedulablePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeunschedulable.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.CheckVolumeBindingPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.NoDiskConflictPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumerestrictions.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.NoVolumeZoneConflictPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumezone.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.MaxCSIVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CSIName, nil)
return
})
registry.registerPredicateConfigProducer(predicates.MaxEBSVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.EBSName, nil)
return
})
registry.registerPredicateConfigProducer(predicates.MaxGCEPDVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.GCEPDName, nil)
return
})
registry.registerPredicateConfigProducer(predicates.MaxAzureDiskVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.AzureDiskName, nil)
return
})
registry.registerPredicateConfigProducer(predicates.MaxCinderVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CinderName, nil)
return
})
registry.registerPredicateConfigProducer(predicates.MatchInterPodAffinityPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, interpodaffinity.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, interpodaffinity.Name, nil)
return
})
registry.registerPredicateConfigProducer(predicates.CheckNodeLabelPresencePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.registerPredicateConfigProducer(predicates.CheckServiceAffinityPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, serviceaffinity.Name, nil)
return
})
// Register Priorities.
registry.registerPriorityConfigProducer(priorities.SelectorSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, defaultpodtopologyspread.Name, &args.Weight)
return
})
registry.registerPriorityConfigProducer(priorities.TaintTolerationPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, tainttoleration.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, tainttoleration.Name, &args.Weight)
return
})
registry.registerPriorityConfigProducer(priorities.NodeAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodeaffinity.Name, &args.Weight)
return
})
registry.registerPriorityConfigProducer(priorities.ImageLocalityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, imagelocality.Name, &args.Weight)
return
})
registry.registerPriorityConfigProducer(priorities.InterPodAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, interpodaffinity.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, interpodaffinity.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(interpodaffinity.Name, args.InterPodAffinityArgs))
return
})
registry.registerPriorityConfigProducer(priorities.NodePreferAvoidPodsPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight)
return
})
registry.registerPriorityConfigProducer(priorities.MostRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight)
return
})
registry.registerPriorityConfigProducer(priorities.BalancedResourceAllocation,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight)
return
})
registry.registerPriorityConfigProducer(priorities.LeastRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight)
return
})
registry.registerPriorityConfigProducer(noderesources.RequestedToCapacityRatioName,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.RequestedToCapacityRatioName, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.RequestedToCapacityRatioName, args.RequestedToCapacityRatioArgs))
return
})
registry.registerPriorityConfigProducer(nodelabel.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
// If there are n LabelPreference priorities in the policy, the weight for the corresponding
// score plugin is n*weight (note that the validation logic verifies that all LabelPreference
// priorities specified in Policy have the same weight).
weight := args.Weight * int32(len(args.NodeLabelArgs.PresentLabelsPreference)+len(args.NodeLabelArgs.AbsentLabelsPreference))
plugins.Score = appendToPluginSet(plugins.Score, nodelabel.Name, &weight)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.registerPriorityConfigProducer(serviceaffinity.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
// If there are n ServiceAffinity priorities in the policy, the weight for the corresponding
// score plugin is n*weight (note that the validation logic verifies that all ServiceAffinity
// priorities specified in Policy have the same weight).
weight := args.Weight * int32(len(args.ServiceAffinityArgs.AntiAffinityLabelsPreference))
plugins.Score = appendToPluginSet(plugins.Score, serviceaffinity.Name, &weight)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
return
})
// The following two features are the last ones to be supported as predicate/priority.
// Once they graduate to GA, there will be no more checking for featue gates here.
// Only register EvenPodsSpread predicate & priority if the feature is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.EvenPodsSpread) {
klog.Infof("Registering EvenPodsSpread predicate and priority function")
registry.registerPredicateConfigProducer(predicates.EvenPodsSpreadPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, podtopologyspread.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil)
return
})
registry.DefaultPredicates.Insert(predicates.EvenPodsSpreadPred)
registry.registerPriorityConfigProducer(priorities.EvenPodsSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, podtopologyspread.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
return
})
registry.DefaultPriorities[priorities.EvenPodsSpreadPriority] = 1
}
// Prioritizes nodes that satisfy pod's resource limits
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
klog.Infof("Registering resourcelimits priority function")
registry.registerPriorityConfigProducer(priorities.ResourceLimitsPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, noderesources.ResourceLimitsName, nil)
plugins.Score = appendToPluginSet(plugins.Score, noderesources.ResourceLimitsName, &args.Weight)
return
})
registry.DefaultPriorities[priorities.ResourceLimitsPriority] = 1
}
return registry
}
// registers a config producer for a predicate.
func (lr *LegacyRegistry) registerPredicateConfigProducer(name string, producer ConfigProducer) {
if _, exist := lr.PredicateToConfigProducer[name]; exist {
klog.Fatalf("already registered %q", name)
}
lr.PredicateToConfigProducer[name] = producer
}
// registers a framework config producer for a priority.
func (lr *LegacyRegistry) registerPriorityConfigProducer(name string, producer ConfigProducer) {
if _, exist := lr.PriorityToConfigProducer[name]; exist {
klog.Fatalf("already registered %q", name)
}
lr.PriorityToConfigProducer[name] = producer
}
func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *config.PluginSet {
if set == nil {
set = &config.PluginSet{}
}
cfg := config.Plugin{Name: name}
if weight != nil {
cfg.Weight = *weight
}
set.Enabled = append(set.Enabled, cfg)
return set
}
func makePluginConfig(pluginName string, args interface{}) config.PluginConfig {
encoding, err := json.Marshal(args)
if err != nil {
klog.Fatal(fmt.Errorf("failed to marshal %+v: %v", args, err))
return config.PluginConfig{}
}
config := config.PluginConfig{
Name: pluginName,
Args: runtime.Unknown{Raw: encoding},
}
return config
}
// ProcessPredicatePolicy given a PredicatePolicy, return the plugin name implementing the predicate and update
// the ConfigProducerArgs if necessary.
func (lr *LegacyRegistry) ProcessPredicatePolicy(policy config.PredicatePolicy, pluginArgs *ConfigProducerArgs) string {
validatePredicateOrDie(policy)
predicateName := policy.Name
if policy.Name == "PodFitsPorts" {
// For compatibility reasons, "PodFitsPorts" as a key is still supported.
predicateName = predicates.PodFitsHostPortsPred
}
if _, ok := lr.PredicateToConfigProducer[predicateName]; ok {
// checking to see if a pre-defined predicate is requested
klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
return predicateName
}
if policy.Argument == nil || (policy.Argument.ServiceAffinity == nil &&
policy.Argument.LabelsPresence == nil) {
klog.Fatalf("Invalid configuration: Predicate type not found for %q", policy.Name)
}
// generate the predicate function, if a custom type is requested
if policy.Argument.ServiceAffinity != nil {
// map LabelsPresence policy to ConfigProducerArgs that's used to configure the ServiceAffinity plugin.
if pluginArgs.ServiceAffinityArgs == nil {
pluginArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
pluginArgs.ServiceAffinityArgs.AffinityLabels = append(pluginArgs.ServiceAffinityArgs.AffinityLabels, policy.Argument.ServiceAffinity.Labels...)
// We use the ServiceAffinity predicate name for all ServiceAffinity custom predicates.
// It may get called multiple times but we essentially only register one instance of ServiceAffinity predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
predicateName = predicates.CheckServiceAffinityPred
}
if policy.Argument.LabelsPresence != nil {
// Map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin.
if pluginArgs.NodeLabelArgs == nil {
pluginArgs.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelsPresence.Presence {
pluginArgs.NodeLabelArgs.PresentLabels = append(pluginArgs.NodeLabelArgs.PresentLabels, policy.Argument.LabelsPresence.Labels...)
} else {
pluginArgs.NodeLabelArgs.AbsentLabels = append(pluginArgs.NodeLabelArgs.AbsentLabels, policy.Argument.LabelsPresence.Labels...)
}
// We use the CheckNodeLabelPresencePred predicate name for all kNodeLabel custom predicates.
// It may get called multiple times but we essentially only register one instance of NodeLabel predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
predicateName = predicates.CheckNodeLabelPresencePred
}
return predicateName
}
// ProcessPriorityPolicy given a PriorityPolicy, return the plugin name implementing the priority and update
// the ConfigProducerArgs if necessary.
func (lr *LegacyRegistry) ProcessPriorityPolicy(policy config.PriorityPolicy, configProducerArgs *ConfigProducerArgs) string {
validatePriorityOrDie(policy)
priorityName := policy.Name
if policy.Name == priorities.ServiceSpreadingPriority {
// For compatibility reasons, "ServiceSpreadingPriority" as a key is still supported.
priorityName = priorities.SelectorSpreadPriority
}
if _, ok := lr.PriorityToConfigProducer[priorityName]; ok {
klog.V(2).Infof("Priority type %s already registered, reusing.", priorityName)
return priorityName
}
// generate the priority function, if a custom priority is requested
if policy.Argument == nil ||
(policy.Argument.ServiceAntiAffinity == nil &&
policy.Argument.RequestedToCapacityRatioArguments == nil &&
policy.Argument.LabelPreference == nil) {
klog.Fatalf("Invalid configuration: Priority type not found for %q", priorityName)
}
if policy.Argument.ServiceAntiAffinity != nil {
// We use the ServiceAffinity plugin name for all ServiceAffinity custom priorities.
// It may get called multiple times but we essentially only register one instance of
// ServiceAffinity priority.
// This name is then used to find the registered plugin and run the plugin instead of the priority.
priorityName = serviceaffinity.Name
if configProducerArgs.ServiceAffinityArgs == nil {
configProducerArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference = append(
configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference,
policy.Argument.ServiceAntiAffinity.Label,
)
}
if policy.Argument.LabelPreference != nil {
// We use the NodeLabel plugin name for all NodeLabel custom priorities.
// It may get called multiple times but we essentially only register one instance of NodeLabel priority.
// This name is then used to find the registered plugin and run the plugin instead of the priority.
priorityName = nodelabel.Name
if configProducerArgs.NodeLabelArgs == nil {
configProducerArgs.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelPreference.Presence {
configProducerArgs.NodeLabelArgs.PresentLabelsPreference = append(
configProducerArgs.NodeLabelArgs.PresentLabelsPreference,
policy.Argument.LabelPreference.Label,
)
} else {
configProducerArgs.NodeLabelArgs.AbsentLabelsPreference = append(
configProducerArgs.NodeLabelArgs.AbsentLabelsPreference,
policy.Argument.LabelPreference.Label,
)
}
}
if policy.Argument.RequestedToCapacityRatioArguments != nil {
scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
configProducerArgs.RequestedToCapacityRatioArgs = &noderesources.RequestedToCapacityRatioArgs{
FunctionShape: scoringFunctionShape,
ResourceToWeightMap: resources,
}
// We do not allow specifying the name for custom plugins, see #83472
priorityName = noderesources.RequestedToCapacityRatioName
}
return priorityName
}
// TODO(ahg-g): move to RequestedToCapacityRatio plugin.
func buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(arguments *config.RequestedToCapacityRatioArguments) (noderesources.FunctionShape, noderesources.ResourceToWeightMap) {
n := len(arguments.Shape)
points := make([]noderesources.FunctionShapePoint, 0, n)
for _, point := range arguments.Shape {
points = append(points, noderesources.FunctionShapePoint{
Utilization: int64(point.Utilization),
// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by requested to capacity ratio to the score range
// used by the scheduler.
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
shape, err := noderesources.NewFunctionShape(points)
if err != nil {
klog.Fatalf("invalid RequestedToCapacityRatioPriority arguments: %s", err.Error())
}
resourceToWeightMap := make(noderesources.ResourceToWeightMap)
if len(arguments.Resources) == 0 {
resourceToWeightMap = noderesources.DefaultRequestedRatioResources
return shape, resourceToWeightMap
}
for _, resource := range arguments.Resources {
resourceToWeightMap[v1.ResourceName(resource.Name)] = resource.Weight
if resource.Weight == 0 {
resourceToWeightMap[v1.ResourceName(resource.Name)] = 1
}
}
return shape, resourceToWeightMap
}
func validatePredicateOrDie(predicate config.PredicatePolicy) {
if predicate.Argument != nil {
numArgs := 0
if predicate.Argument.ServiceAffinity != nil {
numArgs++
}
if predicate.Argument.LabelsPresence != nil {
numArgs++
}
if numArgs != 1 {
klog.Fatalf("Exactly 1 predicate argument is required, numArgs: %v, Predicate: %s", numArgs, predicate.Name)
}
}
}
func validatePriorityOrDie(priority config.PriorityPolicy) {
if priority.Argument != nil {
numArgs := 0
if priority.Argument.ServiceAntiAffinity != nil {
numArgs++
}
if priority.Argument.LabelPreference != nil {
numArgs++
}
if priority.Argument.RequestedToCapacityRatioArguments != nil {
numArgs++
}
if numArgs != 1 {
klog.Fatalf("Exactly 1 priority argument is required, numArgs: %v, Priority: %s", numArgs, priority.Name)
}
}
}

View File

@ -22,29 +22,81 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
)
func TestBuildScoringFunctionShapeFromRequestedToCapacityRatioArguments(t *testing.T) {
arguments := schedulerapi.RequestedToCapacityRatioArguments{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 10, Score: 1},
{Utilization: 30, Score: 5},
{Utilization: 70, Score: 2},
},
Resources: []schedulerapi.ResourceSpec{
{Name: string(v1.ResourceCPU)},
{Name: string(v1.ResourceMemory)},
},
}
builtShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(&arguments)
expectedShape, _ := noderesources.NewFunctionShape([]noderesources.FunctionShapePoint{
{Utilization: 10, Score: 10},
{Utilization: 30, Score: 50},
{Utilization: 70, Score: 20},
})
expectedResources := noderesources.ResourceToWeightMap{
v1.ResourceCPU: 1,
v1.ResourceMemory: 1,
}
assert.Equal(t, expectedShape, builtShape)
assert.Equal(t, expectedResources, resources)
}
func TestBuildScoringFunctionShapeFromRequestedToCapacityRatioArgumentsNilResourceToWeightMap(t *testing.T) {
arguments := schedulerapi.RequestedToCapacityRatioArguments{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 10, Score: 1},
{Utilization: 30, Score: 5},
{Utilization: 70, Score: 2},
},
}
builtShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(&arguments)
expectedShape, _ := noderesources.NewFunctionShape([]noderesources.FunctionShapePoint{
{Utilization: 10, Score: 10},
{Utilization: 30, Score: 50},
{Utilization: 70, Score: 20},
})
expectedResources := noderesources.ResourceToWeightMap{
v1.ResourceCPU: 1,
v1.ResourceMemory: 1,
}
assert.Equal(t, expectedShape, builtShape)
assert.Equal(t, expectedResources, resources)
}
func produceConfig(keys []string, producersMap map[string]ConfigProducer, args ConfigProducerArgs) (*config.Plugins, []config.PluginConfig, error) {
var plugins config.Plugins
var pluginConfig []config.PluginConfig
for _, k := range keys {
producer, exist := producersMap[k]
p, exist := producersMap[k]
if !exist {
return nil, nil, fmt.Errorf("finding key %q", k)
}
p, pc := producer(args)
plugins.Append(&p)
pluginConfig = append(pluginConfig, pc...)
pl, plc := p(args)
plugins.Append(&pl)
pluginConfig = append(pluginConfig, plc...)
}
return &plugins, pluginConfig, nil
}
func TestRegisterConfigProducers(t *testing.T) {
registry := NewConfigProducerRegistry()
registry := NewLegacyRegistry()
testPredicateName1 := "testPredicate1"
testFilterName1 := "testFilter1"
registry.RegisterPredicate(testPredicateName1,
registry.registerPredicateConfigProducer(testPredicateName1,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, testFilterName1, nil)
return
@ -52,7 +104,7 @@ func TestRegisterConfigProducers(t *testing.T) {
testPredicateName2 := "testPredicate2"
testFilterName2 := "testFilter2"
registry.RegisterPredicate(testPredicateName2,
registry.registerPredicateConfigProducer(testPredicateName2,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, testFilterName2, nil)
return
@ -60,7 +112,7 @@ func TestRegisterConfigProducers(t *testing.T) {
testPriorityName1 := "testPriority1"
testScoreName1 := "testScore1"
registry.RegisterPriority(testPriorityName1,
registry.registerPriorityConfigProducer(testPriorityName1,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, testScoreName1, &args.Weight)
return
@ -68,7 +120,7 @@ func TestRegisterConfigProducers(t *testing.T) {
testPriorityName2 := "testPriority2"
testScoreName2 := "testScore2"
registry.RegisterPriority(testPriorityName2,
registry.registerPriorityConfigProducer(testPriorityName2,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, testScoreName2, &args.Weight)
return

View File

@ -17,14 +17,7 @@ limitations under the License.
package plugins
import (
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
@ -86,275 +79,3 @@ func NewInTreeRegistry(args *RegistryArgs) framework.Registry {
serviceaffinity.Name: serviceaffinity.New,
}
}
// ConfigProducerArgs contains arguments that are passed to the producer.
// As we add more predicates/priorities to framework plugins mappings, more arguments
// may be added here.
type ConfigProducerArgs struct {
// Weight used for priority functions.
Weight int32
// NodeLabelArgs is the args for the NodeLabel plugin.
NodeLabelArgs *nodelabel.Args
// RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin.
RequestedToCapacityRatioArgs *noderesources.RequestedToCapacityRatioArgs
// ServiceAffinityArgs is the args for the ServiceAffinity plugin.
ServiceAffinityArgs *serviceaffinity.Args
// NodeResourcesFitArgs is the args for the NodeResources fit filter.
NodeResourcesFitArgs *noderesources.FitArgs
// InterPodAffinityArgs is the args for InterPodAffinity plugin
InterPodAffinityArgs *interpodaffinity.Args
}
// ConfigProducer produces a framework's configuration.
type ConfigProducer func(args ConfigProducerArgs) (config.Plugins, []config.PluginConfig)
// ConfigProducerRegistry tracks mappings from predicates/priorities to framework config producers.
type ConfigProducerRegistry struct {
// maps that associate predicates/priorities with framework plugin configurations.
PredicateToConfigProducer map[string]ConfigProducer
PriorityToConfigProducer map[string]ConfigProducer
}
// NewConfigProducerRegistry creates a new producer registry.
func NewConfigProducerRegistry() *ConfigProducerRegistry {
registry := &ConfigProducerRegistry{
PredicateToConfigProducer: make(map[string]ConfigProducer),
PriorityToConfigProducer: make(map[string]ConfigProducer),
}
// Register Predicates.
registry.RegisterPredicate(predicates.GeneralPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
// GeneralPredicate is a combination of predicates.
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodToleratesNodeTaintsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodFitsResourcesPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
return
})
registry.RegisterPredicate(predicates.HostNamePred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodFitsHostPortsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
return
})
registry.RegisterPredicate(predicates.MatchNodeSelectorPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckNodeUnschedulablePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeunschedulable.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckVolumeBindingPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
return
})
registry.RegisterPredicate(predicates.NoDiskConflictPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumerestrictions.Name, nil)
return
})
registry.RegisterPredicate(predicates.NoVolumeZoneConflictPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumezone.Name, nil)
return
})
registry.RegisterPredicate(predicates.MaxCSIVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CSIName, nil)
return
})
registry.RegisterPredicate(predicates.MaxEBSVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.EBSName, nil)
return
})
registry.RegisterPredicate(predicates.MaxGCEPDVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.GCEPDName, nil)
return
})
registry.RegisterPredicate(predicates.MaxAzureDiskVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.AzureDiskName, nil)
return
})
registry.RegisterPredicate(predicates.MaxCinderVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CinderName, nil)
return
})
registry.RegisterPredicate(predicates.MatchInterPodAffinityPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, interpodaffinity.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, interpodaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.EvenPodsSpreadPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, podtopologyspread.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckNodeLabelPresencePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.RegisterPredicate(predicates.CheckServiceAffinityPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, serviceaffinity.Name, nil)
return
})
// Register Priorities.
registry.RegisterPriority(priorities.SelectorSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, defaultpodtopologyspread.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.TaintTolerationPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, tainttoleration.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, tainttoleration.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.NodeAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodeaffinity.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.ImageLocalityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, imagelocality.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.InterPodAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, interpodaffinity.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, interpodaffinity.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(interpodaffinity.Name, args.InterPodAffinityArgs))
return
})
registry.RegisterPriority(priorities.NodePreferAvoidPodsPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.MostRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight)
return
})
registry.RegisterPriority(priorities.BalancedResourceAllocation,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight)
return
})
registry.RegisterPriority(priorities.LeastRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight)
return
})
registry.RegisterPriority(priorities.EvenPodsSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, podtopologyspread.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
return
})
registry.RegisterPriority(noderesources.RequestedToCapacityRatioName,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.RequestedToCapacityRatioName, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.RequestedToCapacityRatioName, args.RequestedToCapacityRatioArgs))
return
})
registry.RegisterPriority(nodelabel.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodelabel.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.RegisterPriority(serviceaffinity.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, serviceaffinity.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
return
})
registry.RegisterPriority(priorities.ResourceLimitsPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, noderesources.ResourceLimitsName, nil)
plugins.Score = appendToPluginSet(plugins.Score, noderesources.ResourceLimitsName, &args.Weight)
return
})
return registry
}
func registerProducer(name string, producer ConfigProducer, producersMap map[string]ConfigProducer) error {
if _, exist := producersMap[name]; exist {
return fmt.Errorf("already registered %q", name)
}
producersMap[name] = producer
return nil
}
// RegisterPredicate registers a config producer for a predicate.
func (f *ConfigProducerRegistry) RegisterPredicate(name string, producer ConfigProducer) error {
return registerProducer(name, producer, f.PredicateToConfigProducer)
}
// RegisterPriority registers a framework config producer for a priority.
func (f *ConfigProducerRegistry) RegisterPriority(name string, producer ConfigProducer) error {
return registerProducer(name, producer, f.PriorityToConfigProducer)
}
func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *config.PluginSet {
if set == nil {
set = &config.PluginSet{}
}
cfg := config.Plugin{Name: name}
if weight != nil {
cfg.Weight = *weight
}
set.Enabled = append(set.Enabled, cfg)
return set
}
func makePluginConfig(pluginName string, args interface{}) config.PluginConfig {
encoding, err := json.Marshal(args)
if err != nil {
klog.Fatal(fmt.Errorf("Failed to marshal %+v: %v", args, err))
return config.PluginConfig{}
}
config := config.PluginConfig{
Name: pluginName,
Args: runtime.Unknown{Raw: encoding},
}
return config
}

View File

@ -403,8 +403,7 @@ func TestServiceAffinityScore(t *testing.T) {
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(rcs),
fakelisters.ReplicaSetLister(rss),
fakelisters.StatefulSetLister(sss),
1)
fakelisters.StatefulSetLister(sss))
metaData := metaDataProducer(test.pod, nodes, snapshot)
state := framework.NewCycleState()
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData})

View File

@ -24,8 +24,6 @@ import (
"os"
"time"
"k8s.io/klog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -36,6 +34,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
kubefeatures "k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -121,10 +120,6 @@ type Scheduler struct {
SchedulingQueue internalqueue.SchedulingQueue
scheduledPodsHasSynced func() bool
// The final configuration of the framework.
Plugins schedulerapi.Plugins
PluginConfig []schedulerapi.PluginConfig
}
// Cache returns the cache in scheduler for test to check the data in scheduler.
@ -141,13 +136,11 @@ type schedulerOptions struct {
bindTimeoutSeconds int64
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
// Contains all in-tree plugins.
frameworkInTreeRegistry framework.Registry
// This registry contains out of tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry framework.Registry
frameworkConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry
frameworkPlugins *schedulerapi.Plugins
frameworkPluginConfig []schedulerapi.PluginConfig
// Contains out-of-tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry framework.Registry
// Plugins and PluginConfig set from ComponentConfig.
frameworkPlugins *schedulerapi.Plugins
frameworkPluginConfig []schedulerapi.PluginConfig
}
// Option configures a Scheduler
@ -195,13 +188,6 @@ func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option {
}
}
// WithFrameworkInTreeRegistry sets the framework's in-tree registry. This is only used in integration tests.
func WithFrameworkInTreeRegistry(registry framework.Registry) Option {
return func(o *schedulerOptions) {
o.frameworkInTreeRegistry = registry
}
}
// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default registry.
func WithFrameworkOutOfTreeRegistry(registry framework.Registry) Option {
@ -210,13 +196,6 @@ func WithFrameworkOutOfTreeRegistry(registry framework.Registry) Option {
}
}
// WithFrameworkConfigProducerRegistry sets the framework plugin producer registry.
func WithFrameworkConfigProducerRegistry(registry *frameworkplugins.ConfigProducerRegistry) Option {
return func(o *schedulerOptions) {
o.frameworkConfigProducerRegistry = registry
}
}
// WithFrameworkPlugins sets the plugins that the framework should be configured with.
func WithFrameworkPlugins(plugins *schedulerapi.Plugins) Option {
return func(o *schedulerOptions) {
@ -250,22 +229,12 @@ var defaultSchedulerOptions = schedulerOptions{
schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
Provider: defaultAlgorithmSourceProviderName(),
},
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
frameworkConfigProducerRegistry: frameworkplugins.NewConfigProducerRegistry(),
// The plugins and pluginConfig options are currently nil because we currently don't have
// "default" plugins. All plugins that we run through the framework currently come from two
// sources: 1) specified in component config, in which case those two options should be
// set using their corresponding With* functions, 2) predicate/priority-mapped plugins, which
// pluginConfigProducerRegistry contains a mapping for and produces their configurations.
// TODO(ahg-g) Once predicates and priorities are migrated to natively run as plugins, the
// below two parameters will be populated accordingly.
frameworkPlugins: nil,
frameworkPluginConfig: nil,
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}
// New returns a Scheduler
@ -297,12 +266,9 @@ func New(client clientset.Interface,
time.Duration(options.bindTimeoutSeconds)*time.Second,
)
registry := options.frameworkInTreeRegistry
if registry == nil {
registry = frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{
VolumeBinder: volumeBinder,
})
}
registry := frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{
VolumeBinder: volumeBinder,
})
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
@ -326,15 +292,7 @@ func New(client clientset.Interface,
registry: registry,
plugins: options.frameworkPlugins,
pluginConfig: options.frameworkPluginConfig,
pluginConfigProducerRegistry: options.frameworkConfigProducerRegistry,
nodeInfoSnapshot: snapshot,
algorithmFactoryArgs: AlgorithmFactoryArgs{
SharedLister: snapshot,
InformerFactory: informerFactory,
VolumeBinder: volumeBinder,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
},
configProducerArgs: &frameworkplugins.ConfigProducerArgs{},
}
var sched *Scheduler
@ -342,7 +300,7 @@ func New(client clientset.Interface,
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
sc, err := configurator.CreateFromProvider(*source.Provider)
sc, err := configurator.createFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
@ -360,7 +318,7 @@ func New(client clientset.Interface,
return nil, err
}
}
sc, err := configurator.CreateFromConfig(*policy)
sc, err := configurator.createFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}

View File

@ -28,7 +28,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -879,8 +878,6 @@ func TestInterPodAffinity(t *testing.T) {
// TestEvenPodsSpreadPredicate verifies that EvenPodsSpread predicate functions well.
func TestEvenPodsSpreadPredicate(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EvenPodsSpread, true)()
// Apply feature gates to enable EvenPodsSpread
defer algorithmprovider.ApplyFeatureGates()()
context := initTest(t, "eps-predicate")
cs := context.clientSet

View File

@ -28,7 +28,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -247,8 +246,6 @@ func makeContainersWithImages(images []string) []v1.Container {
// TestEvenPodsSpreadPriority verifies that EvenPodsSpread priority functions well.
func TestEvenPodsSpreadPriority(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EvenPodsSpread, true)()
// Apply feature gates to enable EvenPodsSpread
defer algorithmprovider.ApplyFeatureGates()()
context := initTest(t, "eps-priority")
cs := context.clientSet

View File

@ -37,7 +37,6 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller/nodelifecycle"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds"
"k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction"
pluginapi "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction"
@ -83,9 +82,6 @@ func TestTaintNodeByCondition(t *testing.T) {
admission.SetExternalKubeClientSet(externalClientset)
admission.SetExternalKubeInformerFactory(externalInformers)
// Apply feature gates to enable TaintNodesByCondition
defer algorithmprovider.ApplyFeatureGates()()
context = initTestScheduler(t, context, false, nil)
defer cleanupTest(t, context)
@ -648,8 +644,6 @@ func TestTaintBasedEvictions(t *testing.T) {
// Enable TaintBasedEvictions
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TaintBasedEvictions, true)()
// ApplyFeatureGates() is called to ensure TaintNodesByCondition related logic is applied/restored properly.
defer algorithmprovider.ApplyFeatureGates()()
// Build admission chain handler.
podTolerations := podtolerationrestriction.NewPodTolerationsPlugin(&pluginapi.Configuration{})