remove legacy scheduler policy config, as well as associated flags policy-config-file, policy-configmap, policy-configmap-namespace and use-legacy-policy-config

Signed-off-by: kerthcet <kerthcet@gmail.com>
This commit is contained in:
kerthcet
2021-10-08 23:57:49 +08:00
parent 1123a7041e
commit a6f695581b
24 changed files with 47 additions and 1669 deletions

View File

@@ -1,286 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/scheduler/profile"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
)
type testCase struct {
name string
JSON string
featureGates map[featuregate.Feature]bool
wantPlugins config.Plugins
wantExtenders []config.Extender
}
func TestPolicyCompatibility(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
testcases := []testCase{
// This is a special test for the "composite" predicate "GeneralPredicate". GeneralPredicate is a combination
// of predicates, and here we test that if given, it is mapped to the set of plugins that should be executed.
{
name: "GeneralPredicate",
JSON: `{
"kind": "Policy",
"apiVersion": "v1",
"predicates": [
{"name": "GeneralPredicates"}
],
"priorities": [
]
}`,
wantPlugins: config.Plugins{
QueueSort: config.PluginSet{Enabled: []config.Plugin{{Name: "PrioritySort"}}},
PreFilter: config.PluginSet{Enabled: []config.Plugin{
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
}},
Filter: config.PluginSet{Enabled: []config.Plugin{
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "TaintToleration"},
}},
PostFilter: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultPreemption"}}},
Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}},
},
},
// This is a special test for the case where a policy is specified without specifying any filters.
{
name: "default config",
JSON: `{
"kind": "Policy",
"apiVersion": "v1",
"predicates": [
],
"priorities": [
]
}`,
wantPlugins: config.Plugins{
QueueSort: config.PluginSet{Enabled: []config.Plugin{{Name: "PrioritySort"}}},
Filter: config.PluginSet{Enabled: []config.Plugin{
{Name: "NodeUnschedulable"},
{Name: "TaintToleration"},
}},
PostFilter: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultPreemption"}}},
Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}},
},
},
{
name: "all predicates and priorities",
JSON: `{
"kind": "Policy",
"apiVersion": "v1",
"predicates": [
{"name": "MatchNodeSelector"},
{"name": "PodFitsResources"},
{"name": "PodFitsHostPorts"},
{"name": "HostName"},
{"name": "NoDiskConflict"},
{"name": "NoVolumeZoneConflict"},
{"name": "PodToleratesNodeTaints"},
{"name": "MaxEBSVolumeCount"},
{"name": "MaxGCEPDVolumeCount"},
{"name": "MaxAzureDiskVolumeCount"},
{"name": "MaxCSIVolumeCountPred"},
{"name": "MaxCinderVolumeCount"},
{"name": "MatchInterPodAffinity"},
{"name": "CheckVolumeBinding"},
{"name": "TestServiceAffinity", "argument": {"serviceAffinity" : {"labels" : ["region"]}}},
{"name": "TestLabelsPresence", "argument": {"labelsPresence" : {"labels" : ["foo"], "presence":true}}}
],"priorities": [
{"name": "EqualPriority", "weight": 2},
{"name": "ImageLocalityPriority", "weight": 2},
{"name": "LeastRequestedPriority", "weight": 2},
{"name": "BalancedResourceAllocation", "weight": 2},
{"name": "SelectorSpreadPriority", "weight": 2},
{"name": "NodePreferAvoidPodsPriority", "weight": 2},
{"name": "NodeAffinityPriority", "weight": 2},
{"name": "TaintTolerationPriority", "weight": 2},
{"name": "InterPodAffinityPriority", "weight": 2},
{"name": "MostRequestedPriority", "weight": 2},
{
"name": "RequestedToCapacityRatioPriority",
"weight": 2,
"argument": {
"requestedToCapacityRatioArguments": {
"shape": [
{"utilization": 0, "score": 0},
{"utilization": 50, "score": 7}
],
"resources": [
{"name": "intel.com/foo", "weight": 3},
{"name": "intel.com/bar", "weight": 5}
]
}
}}
],"extenders": [{
"urlPrefix": "/prefix",
"filterVerb": "filter",
"prioritizeVerb": "prioritize",
"weight": 1,
"bindVerb": "bind",
"enableHttps": true,
"tlsConfig": {"Insecure":true},
"httpTimeout": 1,
"nodeCacheCapable": true,
"managedResources": [{"name":"example.com/foo","ignoredByScheduler":true}],
"ignorable":true
}]
}`,
wantPlugins: config.Plugins{
QueueSort: config.PluginSet{Enabled: []config.Plugin{{Name: "PrioritySort"}}},
PreFilter: config.PluginSet{Enabled: []config.Plugin{
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "NodeResourcesFit"},
{Name: "ServiceAffinity"},
{Name: "VolumeBinding"},
{Name: "InterPodAffinity"},
}},
Filter: config.PluginSet{Enabled: []config.Plugin{
{Name: "NodeUnschedulable"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "CinderLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
}},
PostFilter: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultPreemption"}}},
PreScore: config.PluginSet{Enabled: []config.Plugin{
{Name: "InterPodAffinity"},
{Name: "NodeAffinity"},
{Name: "PodTopologySpread"},
{Name: "TaintToleration"},
}},
Score: config.PluginSet{Enabled: []config.Plugin{
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
{Name: "InterPodAffinity", Weight: 2},
{Name: "NodeResourcesLeastAllocated", Weight: 2},
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "PodTopologySpread", Weight: 2},
{Name: "TaintToleration", Weight: 2},
}},
Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}},
Reserve: config.PluginSet{Enabled: []config.Plugin{{Name: "VolumeBinding"}}},
PreBind: config.PluginSet{Enabled: []config.Plugin{{Name: "VolumeBinding"}}},
},
wantExtenders: []config.Extender{{
URLPrefix: "/prefix",
FilterVerb: "filter",
PrioritizeVerb: "prioritize",
Weight: 1,
BindVerb: "bind", // 1.11 restored case-sensitivity, but allowed either "BindVerb" or "bindVerb"
EnableHTTPS: true,
TLSConfig: &config.ExtenderTLSConfig{Insecure: true},
HTTPTimeout: metav1.Duration{Duration: 1},
NodeCacheCapable: true,
ManagedResources: []config.ExtenderManagedResource{{Name: "example.com/foo", IgnoredByScheduler: true}},
Ignorable: true,
}},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
for feature, value := range tc.featureGates {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, value)()
}
policyConfigMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: "scheduler-custom-policy-config"},
Data: map[string]string{config.SchedulerPolicyConfigMapKey: tc.JSON},
}
client := fake.NewSimpleClientset(&policyConfigMap)
informerFactory := informers.NewSharedInformerFactory(client, 0)
recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}))
sched, err := scheduler.New(
client,
informerFactory,
nil,
recorderFactory,
make(chan struct{}),
scheduler.WithProfiles([]config.KubeSchedulerProfile(nil)...),
scheduler.WithLegacyPolicySource(&config.SchedulerPolicySource{
ConfigMap: &config.SchedulerPolicyConfigMapSource{
Namespace: policyConfigMap.Namespace,
Name: policyConfigMap.Name,
},
}),
)
if err != nil {
t.Fatalf("Error constructing: %v", err)
}
defProf := sched.Profiles["default-scheduler"]
gotPlugins := defProf.ListPlugins()
if diff := cmp.Diff(&tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
gotExtenders := sched.Extenders
var wantExtenders []*scheduler.HTTPExtender
for _, e := range tc.wantExtenders {
extender, err := scheduler.NewHTTPExtender(&e)
if err != nil {
t.Errorf("Error transforming extender: %+v", e)
}
wantExtenders = append(wantExtenders, extender.(*scheduler.HTTPExtender))
}
for i := range gotExtenders {
if !scheduler.Equal(wantExtenders[i], gotExtenders[i].(*scheduler.HTTPExtender)) {
t.Errorf("Got extender #%d %+v, want %+v", i, gotExtenders[i], wantExtenders[i])
}
}
})
}
}

View File

@@ -121,31 +121,6 @@ type KubeSchedulerProfile struct {
PluginConfig []PluginConfig
}
// SchedulerPolicySource configures a means to obtain a scheduler Policy. One
// source field must be specified, and source fields are mutually exclusive.
type SchedulerPolicySource struct {
// File is a file policy source.
File *SchedulerPolicyFileSource
// ConfigMap is a config map policy source.
ConfigMap *SchedulerPolicyConfigMapSource
}
// SchedulerPolicyFileSource is a policy serialized to disk and accessed via
// path.
type SchedulerPolicyFileSource struct {
// Path is the location of a serialized policy.
Path string
}
// SchedulerPolicyConfigMapSource is a policy serialized into a config map value
// under the SchedulerPolicyConfigMapKey key.
type SchedulerPolicyConfigMapSource struct {
// Namespace is the namespace of the policy config map.
Namespace string
// Name is the name of the policy config map.
Name string
}
// Plugins include multiple extension points. When specified, the list of plugins for
// a particular extension point are the only ones enabled. If an extension point is
// omitted from the config, then the default set of plugins is used for that extension point.

View File

@@ -825,64 +825,6 @@ func (in *ResourceSpec) DeepCopy() *ResourceSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SchedulerPolicyConfigMapSource) DeepCopyInto(out *SchedulerPolicyConfigMapSource) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulerPolicyConfigMapSource.
func (in *SchedulerPolicyConfigMapSource) DeepCopy() *SchedulerPolicyConfigMapSource {
if in == nil {
return nil
}
out := new(SchedulerPolicyConfigMapSource)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SchedulerPolicyFileSource) DeepCopyInto(out *SchedulerPolicyFileSource) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulerPolicyFileSource.
func (in *SchedulerPolicyFileSource) DeepCopy() *SchedulerPolicyFileSource {
if in == nil {
return nil
}
out := new(SchedulerPolicyFileSource)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SchedulerPolicySource) DeepCopyInto(out *SchedulerPolicySource) {
*out = *in
if in.File != nil {
in, out := &in.File, &out.File
*out = new(SchedulerPolicyFileSource)
**out = **in
}
if in.ConfigMap != nil {
in, out := &in.ConfigMap, &out.ConfigMap
*out = new(SchedulerPolicyConfigMapSource)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulerPolicySource.
func (in *SchedulerPolicySource) DeepCopy() *SchedulerPolicySource {
if in == nil {
return nil
}
out := new(SchedulerPolicySource)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ScoringStrategy) DeepCopyInto(out *ScoringStrategy) {
*out = *in

View File

@@ -22,11 +22,9 @@ import (
"fmt"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
@@ -34,14 +32,8 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta2"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
@@ -205,170 +197,6 @@ func (c *Configurator) create() (*Scheduler, error) {
}, nil
}
// createFromPolicy creates a scheduler from the legacy policy file.
func (c *Configurator) createFromPolicy(policy schedulerapi.Policy) (*Scheduler, error) {
lr := frameworkplugins.NewLegacyRegistry()
args := &frameworkplugins.ConfigProducerArgs{}
klog.V(2).InfoS("Creating scheduler from configuration", "policy", policy)
// validate the policy configuration
if err := validation.ValidatePolicy(policy); err != nil {
return nil, err
}
// If profiles is already set, it means the user is using both CC and policy config, error out
// since these configs are no longer merged and they should not be used simultaneously.
if c.profiles != nil {
return nil, fmt.Errorf("profiles and policy config both set, this should not happen")
}
predicateKeys := sets.NewString()
if policy.Predicates == nil {
predicateKeys = lr.DefaultPredicates
} else {
for _, predicate := range policy.Predicates {
klog.V(2).InfoS("Registering predicate", "predicate", predicate.Name)
predicateName, err := lr.ProcessPredicatePolicy(predicate, args)
if err != nil {
return nil, err
}
predicateKeys.Insert(predicateName)
}
}
priorityKeys := make(map[string]int64)
if policy.Priorities == nil {
klog.V(2).InfoS("Using default priorities")
priorityKeys = lr.DefaultPriorities
} else {
for _, priority := range policy.Priorities {
if priority.Name == frameworkplugins.EqualPriority {
klog.V(2).InfoS("Skip registering priority", "priority", priority.Name)
continue
}
klog.V(2).InfoS("Registering priority", "priority", priority.Name)
priorityName, err := lr.ProcessPriorityPolicy(priority, args)
if err != nil {
return nil, err
}
priorityKeys[priorityName] = priority.Weight
}
}
// HardPodAffinitySymmetricWeight in the policy config takes precedence over
// CLI configuration.
if policy.HardPodAffinitySymmetricWeight != 0 {
args.InterPodAffinityArgs = &schedulerapi.InterPodAffinityArgs{
HardPodAffinityWeight: policy.HardPodAffinitySymmetricWeight,
}
}
// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
// predicates even after one or more of them fails.
if policy.AlwaysCheckAllPredicates {
c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
}
klog.V(2).InfoS("Creating scheduler", "predicates", predicateKeys, "priorities", priorityKeys)
// Combine all framework configurations. If this results in any duplication, framework
// instantiation should fail.
// "PrioritySort", "DefaultPreemption" and "DefaultBinder" were neither predicates nor priorities
// before. We add them by default.
plugins := schedulerapi.Plugins{
QueueSort: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}},
},
PostFilter: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: defaultpreemption.Name}},
},
Bind: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: defaultbinder.Name}},
},
}
var pluginConfig []schedulerapi.PluginConfig
var err error
if plugins, pluginConfig, err = lr.AppendPredicateConfigs(predicateKeys, args, plugins, pluginConfig); err != nil {
return nil, err
}
if plugins, pluginConfig, err = lr.AppendPriorityConfigs(priorityKeys, args, plugins, pluginConfig); err != nil {
return nil, err
}
if pluginConfig, err = dedupPluginConfigs(pluginConfig); err != nil {
return nil, err
}
c.profiles = []schedulerapi.KubeSchedulerProfile{
{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &plugins,
PluginConfig: pluginConfig,
},
}
if err := defaultPluginConfigArgs(&c.profiles[0]); err != nil {
return nil, err
}
return c.create()
}
func defaultPluginConfigArgs(prof *schedulerapi.KubeSchedulerProfile) error {
scheme := v1beta2.GetPluginArgConversionScheme()
existingConfigs := sets.NewString()
for j := range prof.PluginConfig {
existingConfigs.Insert(prof.PluginConfig[j].Name)
// For existing plugin configs, we don't apply any defaulting, the assumption
// is that the legacy registry does it already.
}
// Append default configs for plugins that didn't have one explicitly set.
for _, name := range prof.Plugins.Names() {
if existingConfigs.Has(name) {
continue
}
gvk := v1beta2.SchemeGroupVersion.WithKind(name + "Args")
args, err := scheme.New(gvk)
if err != nil {
if runtime.IsNotRegisteredError(err) {
// This plugin is out-of-tree or doesn't require configuration.
continue
}
return err
}
scheme.Default(args)
internalArgs, err := scheme.ConvertToVersion(args, schedulerapi.SchemeGroupVersion)
if err != nil {
return fmt.Errorf("converting %q into internal type: %w", gvk.Kind, err)
}
prof.PluginConfig = append(prof.PluginConfig, schedulerapi.PluginConfig{
Name: name,
Args: internalArgs,
})
}
return nil
}
// dedupPluginConfigs removes duplicates from pluginConfig, ensuring that,
// if a plugin name is repeated, the arguments are the same.
func dedupPluginConfigs(pc []schedulerapi.PluginConfig) ([]schedulerapi.PluginConfig, error) {
args := make(map[string]runtime.Object)
result := make([]schedulerapi.PluginConfig, 0, len(pc))
for _, c := range pc {
if v, found := args[c.Name]; !found {
result = append(result, c)
args[c.Name] = c.Args
} else if !cmp.Equal(v, c.Args) {
// This should be unreachable.
return nil, fmt.Errorf("inconsistent configuration produced for plugin %s", c.Name)
}
}
return result, nil
}
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
return func(podInfo *framework.QueuedPodInfo, err error) {

View File

@@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
@@ -38,14 +37,6 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@@ -69,387 +60,6 @@ func TestCreate(t *testing.T) {
}
}
func createPolicySource(configData []byte, clientSet clientset.Interface) *schedulerapi.SchedulerPolicySource {
configPolicyName := "scheduler-custom-policy-config"
policyConfigMap := v1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: configPolicyName},
Data: map[string]string{schedulerapi.SchedulerPolicyConfigMapKey: string(configData)},
}
clientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &policyConfigMap, metav1.CreateOptions{})
return &schedulerapi.SchedulerPolicySource{
ConfigMap: &schedulerapi.SchedulerPolicyConfigMapSource{
Namespace: policyConfigMap.Namespace,
Name: policyConfigMap.Name,
},
}
}
// TestCreateFromConfig configures a scheduler from policies defined in a configMap.
// It combines some configurable predicate/priorities with some pre-defined ones
func TestCreateFromConfig(t *testing.T) {
testcases := []struct {
name string
configData []byte
wantPluginConfig []schedulerapi.PluginConfig
wantPlugins *schedulerapi.Plugins
wantErr string
}{
{
name: "policy with unspecified predicates or priorities uses default",
configData: []byte(`{
"kind" : "Policy",
"apiVersion" : "v1"
}`),
wantPluginConfig: []schedulerapi.PluginConfig{
{
Name: defaultpreemption.Name,
Args: &schedulerapi.DefaultPreemptionArgs{
MinCandidateNodesPercentage: 10,
MinCandidateNodesAbsolute: 100,
},
},
{
Name: interpodaffinity.Name,
Args: &schedulerapi.InterPodAffinityArgs{
HardPodAffinityWeight: 1,
},
},
{
Name: nodeaffinity.Name,
Args: &schedulerapi.NodeAffinityArgs{},
},
{
Name: noderesources.BalancedAllocationName,
Args: &schedulerapi.NodeResourcesBalancedAllocationArgs{
Resources: []schedulerapi.ResourceSpec{{Name: "cpu", Weight: 1}, {Name: "memory", Weight: 1}},
},
},
{
Name: noderesources.FitName,
Args: &schedulerapi.NodeResourcesFitArgs{
ScoringStrategy: &schedulerapi.ScoringStrategy{
Type: schedulerapi.LeastAllocated,
Resources: []schedulerapi.ResourceSpec{
{Name: "cpu", Weight: 1},
{Name: "memory", Weight: 1},
},
},
},
},
{
Name: noderesources.LeastAllocatedName,
Args: &schedulerapi.NodeResourcesLeastAllocatedArgs{
Resources: []schedulerapi.ResourceSpec{
{Name: "cpu", Weight: 1},
{Name: "memory", Weight: 1},
},
},
},
{
Name: podtopologyspread.Name,
Args: &schedulerapi.PodTopologySpreadArgs{DefaultingType: schedulerapi.SystemDefaulting},
},
{
Name: volumebinding.Name,
Args: &schedulerapi.VolumeBindingArgs{
BindTimeoutSeconds: 600,
},
},
},
wantPlugins: &schedulerapi.Plugins{
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
PreFilter: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeBinding"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
},
},
Filter: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
},
},
PostFilter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultPreemption"}}},
PreScore: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "PodTopologySpread"},
{Name: "InterPodAffinity"},
{Name: "NodeAffinity"},
{Name: "TaintToleration"},
},
},
Score: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "PodTopologySpread", Weight: 2},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "TaintToleration", Weight: 1},
},
},
Reserve: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "VolumeBinding"}}},
PreBind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "VolumeBinding"}}},
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
},
},
{
name: "policy with arguments",
configData: []byte(`{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["zone"]}}},
{"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["foo"]}}},
{"name" : "TestRequireZone", "argument" : {"labelsPresence" : {"labels" : ["zone"], "presence" : true}}},
{"name" : "TestNoFooLabel", "argument" : {"labelsPresence" : {"labels" : ["foo"], "presence" : false}}}
],
"priorities" : [
{"name" : "RackSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "rack"}}},
{"name" : "ZoneSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "zone"}}},
{
"name": "RequestedToCapacityRatioPriority",
"weight": 2,
"argument": {
"requestedToCapacityRatioArguments": {
"shape": [
{"utilization": 0, "score": 0},
{"utilization": 50, "score": 7}
]
}
}
},
{"name" : "LabelPreference1", "weight" : 3, "argument" : {"labelPreference" : {"label" : "l1", "presence": true}}},
{"name" : "LabelPreference2", "weight" : 3, "argument" : {"labelPreference" : {"label" : "l2", "presence": false}}},
{"name" : "NodeAffinityPriority", "weight" : 2},
{"name" : "InterPodAffinityPriority", "weight" : 1}
]
}`),
wantPluginConfig: []schedulerapi.PluginConfig{
{
Name: defaultpreemption.Name,
Args: &schedulerapi.DefaultPreemptionArgs{
MinCandidateNodesPercentage: 10,
MinCandidateNodesAbsolute: 100,
},
},
{
Name: interpodaffinity.Name,
Args: &schedulerapi.InterPodAffinityArgs{
HardPodAffinityWeight: 1,
},
},
{
Name: nodeaffinity.Name,
Args: &schedulerapi.NodeAffinityArgs{},
},
{
Name: nodelabel.Name,
Args: &schedulerapi.NodeLabelArgs{
PresentLabels: []string{"zone"},
AbsentLabels: []string{"foo"},
PresentLabelsPreference: []string{"l1"},
AbsentLabelsPreference: []string{"l2"},
},
},
{
Name: noderesources.RequestedToCapacityRatioName,
Args: &schedulerapi.RequestedToCapacityRatioArgs{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 0, Score: 0},
{Utilization: 50, Score: 7},
},
Resources: []schedulerapi.ResourceSpec{},
},
},
{
Name: serviceaffinity.Name,
Args: &schedulerapi.ServiceAffinityArgs{
AffinityLabels: []string{"zone", "foo"},
AntiAffinityLabelsPreference: []string{"rack", "zone"},
},
},
},
wantPlugins: &schedulerapi.Plugins{
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
PreFilter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "ServiceAffinity"}}},
Filter: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "NodeUnschedulable"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
},
},
PostFilter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultPreemption"}}},
PreScore: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "InterPodAffinity"},
{Name: "NodeAffinity"},
},
},
Score: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodeLabel", Weight: 6},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "ServiceAffinity", Weight: 6},
},
},
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
},
},
{
name: "policy with HardPodAffinitySymmetricWeight argument",
configData: []byte(`{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "PodFitsResources"},
{"name" : "PodFitsHostPorts"}
],
"priorities" : [
{"name" : "InterPodAffinityPriority", "weight" : 1}
],
"hardPodAffinitySymmetricWeight" : 10
}`),
wantPluginConfig: []schedulerapi.PluginConfig{
{
Name: defaultpreemption.Name,
Args: &schedulerapi.DefaultPreemptionArgs{
MinCandidateNodesPercentage: 10,
MinCandidateNodesAbsolute: 100,
},
},
{
Name: interpodaffinity.Name,
Args: &schedulerapi.InterPodAffinityArgs{
HardPodAffinityWeight: 10,
},
},
{
Name: "NodeResourcesFit",
Args: &schedulerapi.NodeResourcesFitArgs{
ScoringStrategy: &schedulerapi.ScoringStrategy{
Type: schedulerapi.LeastAllocated,
Resources: []schedulerapi.ResourceSpec{
{Name: "cpu", Weight: 1},
{Name: "memory", Weight: 1},
},
},
},
},
},
wantPlugins: &schedulerapi.Plugins{
QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
PreFilter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
}},
Filter: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "NodeUnschedulable"},
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
{Name: "TaintToleration"},
},
},
PostFilter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultPreemption"}}},
PreScore: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "InterPodAffinity"},
},
},
Score: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: "InterPodAffinity", Weight: 1},
},
},
Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
},
},
{
name: "policy with invalid arguments",
configData: []byte(`{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["zone"]}}}
],
"priorities" : [
{"name": "RequestedToCapacityRatioPriority", "weight": 2},
{"name" : "NodeAffinityPriority", "weight" : 10},
{"name" : "InterPodAffinityPriority", "weight" : 100}
]
}`),
wantErr: `couldn't create scheduler from policy: priority type not found for "RequestedToCapacityRatioPriority"`,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}))
_, err := New(
client,
informerFactory,
nil,
recorderFactory,
make(chan struct{}),
WithProfiles([]schedulerapi.KubeSchedulerProfile(nil)...),
WithLegacyPolicySource(createPolicySource(tc.configData, client)),
WithBuildFrameworkCapturer(func(p schedulerapi.KubeSchedulerProfile) {
if p.SchedulerName != v1.DefaultSchedulerName {
t.Errorf("unexpected scheduler name: want %q, got %q", v1.DefaultSchedulerName, p.SchedulerName)
}
if diff := cmp.Diff(tc.wantPluginConfig, p.PluginConfig); diff != "" {
t.Errorf("unexpected plugins config diff (-want, +got): %s", diff)
}
if diff := cmp.Diff(tc.wantPlugins, p.Plugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
}),
)
if err != nil {
if !strings.Contains(err.Error(), tc.wantErr) {
t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
}
}
})
}
}
func TestDefaultErrorFunc(t *testing.T) {
testPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}
testPodUpdated := testPod.DeepCopy()

View File

@@ -19,15 +19,12 @@ package scheduler
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strconv"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -98,7 +95,6 @@ type Scheduler struct {
type schedulerOptions struct {
componentConfigVersion string
kubeConfig *restclient.Config
legacyPolicySource *schedulerapi.SchedulerPolicySource
percentageOfNodesToScore int32
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
@@ -147,13 +143,6 @@ func WithParallelism(threads int32) Option {
}
}
// WithLegacyPolicySource sets legacy policy config file source.
func WithLegacyPolicySource(source *schedulerapi.SchedulerPolicySource) Option {
return func(o *schedulerOptions) {
o.legacyPolicySource = source
}
}
// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler, the default value is 50
func WithPercentageOfNodesToScore(percentageOfNodesToScore int32) Option {
return func(o *schedulerOptions) {
@@ -271,36 +260,10 @@ func New(client clientset.Interface,
metrics.Register()
var sched *Scheduler
if options.legacyPolicySource == nil {
// Create the config from component config
sc, err := configurator.create()
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler: %v", err)
}
sched = sc
} else {
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case options.legacyPolicySource.File != nil:
if err := initPolicyFromFile(options.legacyPolicySource.File.Path, policy); err != nil {
return nil, err
}
case options.legacyPolicySource.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, options.legacyPolicySource.ConfigMap, policy); err != nil {
return nil, err
}
}
// Set extenders on the configurator now that we've decoded the policy
// In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
// which would have set extenders in the above instantiation of Configurator from CC options)
configurator.extenders = policy.Extenders
sc, err := configurator.createFromPolicy(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
sched = sc
// Create the config from component config
sched, err := configurator.create()
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler: %v", err)
}
// Additional tweaks to the config produced by the configurator.
@@ -324,42 +287,6 @@ func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]fra
return gvkMap
}
// initPolicyFromFile initialize policy from file
func initPolicyFromFile(policyFile string, policy *schedulerapi.Policy) error {
// Use a policy serialized in a file.
_, err := os.Stat(policyFile)
if err != nil {
return fmt.Errorf("missing policy config file %s", policyFile)
}
data, err := ioutil.ReadFile(policyFile)
if err != nil {
return fmt.Errorf("couldn't read policy config: %v", err)
}
err = runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), []byte(data), policy)
if err != nil {
return fmt.Errorf("invalid policy: %v", err)
}
return nil
}
// initPolicyFromConfigMap initialize policy from configMap
func initPolicyFromConfigMap(client clientset.Interface, policyRef *schedulerapi.SchedulerPolicyConfigMapSource, policy *schedulerapi.Policy) error {
// Use a policy serialized in a config map value.
policyConfigMap, err := client.CoreV1().ConfigMaps(policyRef.Namespace).Get(context.TODO(), policyRef.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
}
data, found := policyConfigMap.Data[schedulerapi.SchedulerPolicyConfigMapKey]
if !found {
return fmt.Errorf("missing policy config map value at key %q", schedulerapi.SchedulerPolicyConfigMapKey)
}
err = runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), []byte(data), policy)
if err != nil {
return fmt.Errorf("invalid policy: %v", err)
}
return nil
}
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()

View File

@@ -20,9 +20,6 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"regexp"
"sort"
@@ -1136,83 +1133,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
}
}
func TestInitPolicyFromFile(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "policy")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
defer os.RemoveAll(dir)
for i, test := range []struct {
policy string
expectedPredicates sets.String
}{
// Test json format policy file
{
policy: `{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "PredicateOne"},
{"name" : "PredicateTwo"}
],
"priorities" : [
{"name" : "PriorityOne", "weight" : 1},
{"name" : "PriorityTwo", "weight" : 5}
]
}`,
expectedPredicates: sets.NewString(
"PredicateOne",
"PredicateTwo",
),
},
// Test yaml format policy file
{
policy: `apiVersion: v1
kind: Policy
predicates:
- name: PredicateOne
- name: PredicateTwo
priorities:
- name: PriorityOne
weight: 1
- name: PriorityTwo
weight: 5
`,
expectedPredicates: sets.NewString(
"PredicateOne",
"PredicateTwo",
),
},
} {
file := fmt.Sprintf("scheduler-policy-config-file-%d", i)
fullPath := path.Join(dir, file)
if err := ioutil.WriteFile(fullPath, []byte(test.policy), 0644); err != nil {
t.Fatalf("Failed writing a policy config file: %v", err)
}
policy := &schedulerapi.Policy{}
if err := initPolicyFromFile(fullPath, policy); err != nil {
t.Fatalf("Failed writing a policy config file: %v", err)
}
// Verify that the policy is initialized correctly.
schedPredicates := sets.NewString()
for _, p := range policy.Predicates {
schedPredicates.Insert(p.Name)
}
schedPrioritizers := sets.NewString()
for _, p := range policy.Priorities {
schedPrioritizers.Insert(p.Name)
}
if !schedPredicates.Equal(test.expectedPredicates) {
t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
}
}
}
func TestSchedulerBinding(t *testing.T) {
table := []struct {
podName string