mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #83099 from ahg-g/ahg-config-layer2
[migration] Mapping Layer - 2. Convert predicates/priorities configurations to a framework plugin one.
This commit is contained in:
commit
a87c1b2d72
@ -18,6 +18,7 @@ go_library(
|
||||
"//pkg/scheduler/api/validation:go_default_library",
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
"//pkg/scheduler/core:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/internal/cache:go_default_library",
|
||||
"//pkg/scheduler/internal/cache/debugger:go_default_library",
|
||||
@ -65,6 +66,7 @@ go_test(
|
||||
"//pkg/scheduler/api/latest:go_default_library",
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/internal/cache:go_default_library",
|
||||
"//pkg/scheduler/internal/queue:go_default_library",
|
||||
"//pkg/scheduler/nodeinfo:go_default_library",
|
||||
@ -79,6 +81,7 @@ go_test(
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1/fake:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -54,6 +54,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/scheduler/api/validation"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/core"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
|
||||
@ -122,6 +123,10 @@ type Config struct {
|
||||
|
||||
// SchedulingQueue holds pods to be scheduled
|
||||
SchedulingQueue internalqueue.SchedulingQueue
|
||||
|
||||
// The final configuration of the framework.
|
||||
Plugins config.Plugins
|
||||
PluginConfig []config.PluginConfig
|
||||
}
|
||||
|
||||
// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
|
||||
@ -189,9 +194,10 @@ type Configurator struct {
|
||||
enableNonPreempting bool
|
||||
|
||||
// framework configuration arguments.
|
||||
registry framework.Registry
|
||||
plugins *config.Plugins
|
||||
pluginConfig []config.PluginConfig
|
||||
registry framework.Registry
|
||||
plugins *config.Plugins
|
||||
pluginConfig []config.PluginConfig
|
||||
pluginConfigProducerRegistry *plugins.ConfigProducerRegistry
|
||||
}
|
||||
|
||||
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
|
||||
@ -216,6 +222,7 @@ type ConfigFactoryArgs struct {
|
||||
Registry framework.Registry
|
||||
Plugins *config.Plugins
|
||||
PluginConfig []config.PluginConfig
|
||||
PluginConfigProducerRegistry *plugins.ConfigProducerRegistry
|
||||
}
|
||||
|
||||
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
|
||||
@ -261,6 +268,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
|
||||
registry: args.Registry,
|
||||
plugins: args.Plugins,
|
||||
pluginConfig: args.PluginConfig,
|
||||
pluginConfigProducerRegistry: args.PluginConfigProducerRegistry,
|
||||
}
|
||||
// Setup volume binder
|
||||
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
||||
@ -375,12 +383,12 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
|
||||
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
|
||||
}
|
||||
|
||||
predicateFuncs, err := c.GetPredicates(predicateKeys)
|
||||
predicateFuncs, pluginsForPredicates, pluginConfigForPredicates, err := c.getPredicateConfigs(predicateKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
priorityConfigs, err := c.getPriorityFunctionConfigs(priorityKeys)
|
||||
priorityConfigs, pluginsForPriorities, pluginConfigForPriorities, err := c.getPriorityConfigs(priorityKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -395,14 +403,25 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
framework, err := framework.NewFramework(c.registry, c.plugins, c.pluginConfig)
|
||||
// Combine all framework configurations. If this results in any duplication, framework
|
||||
// instantiation should fail.
|
||||
var plugins config.Plugins
|
||||
plugins.Append(pluginsForPredicates)
|
||||
plugins.Append(pluginsForPriorities)
|
||||
plugins.Append(c.plugins)
|
||||
var pluginConfig []config.PluginConfig
|
||||
pluginConfig = append(pluginConfig, pluginConfigForPredicates...)
|
||||
pluginConfig = append(pluginConfig, pluginConfigForPriorities...)
|
||||
pluginConfig = append(pluginConfig, c.pluginConfig...)
|
||||
|
||||
framework, err := framework.NewFramework(c.registry, &plugins, pluginConfig)
|
||||
if err != nil {
|
||||
klog.Fatalf("error initializing the scheduling framework: %v", err)
|
||||
}
|
||||
|
||||
podQueue := internalqueue.NewSchedulingQueue(c.StopEverything, framework)
|
||||
|
||||
// Setup cache debugger
|
||||
// Setup cache debugger.
|
||||
debugger := cachedebugger.New(
|
||||
c.nodeLister,
|
||||
c.podLister,
|
||||
@ -449,6 +468,8 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
|
||||
StopEverything: c.StopEverything,
|
||||
VolumeBinder: c.volumeBinder,
|
||||
SchedulingQueue: podQueue,
|
||||
Plugins: plugins,
|
||||
PluginConfig: pluginConfig,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -465,22 +486,43 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Configurator) getPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) {
|
||||
pluginArgs, err := c.getPluginArgs()
|
||||
// getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run
|
||||
// as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was
|
||||
// registered for that priority.
|
||||
func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, *config.Plugins, []config.PluginConfig, error) {
|
||||
algorithmArgs, configProducerArgs := c.getAlgorithmArgs()
|
||||
|
||||
allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, *algorithmArgs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
|
||||
if c.pluginConfigProducerRegistry == nil {
|
||||
return allPriorityConfigs, nil, nil, nil
|
||||
}
|
||||
|
||||
var priorityConfigs []priorities.PriorityConfig
|
||||
var plugins config.Plugins
|
||||
var pluginConfig []config.PluginConfig
|
||||
frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer
|
||||
for _, p := range allPriorityConfigs {
|
||||
if producer, exist := frameworkConfigProducers[p.Name]; exist {
|
||||
args := *configProducerArgs
|
||||
args.Weight = int32(p.Weight)
|
||||
pl, pc := producer(args)
|
||||
plugins.Append(&pl)
|
||||
pluginConfig = append(pluginConfig, pc...)
|
||||
} else {
|
||||
priorityConfigs = append(priorityConfigs, p)
|
||||
}
|
||||
}
|
||||
return priorityConfigs, &plugins, pluginConfig, nil
|
||||
}
|
||||
|
||||
func (c *Configurator) getPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) {
|
||||
pluginArgs, err := c.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
algorithmArgs, _ := c.getAlgorithmArgs()
|
||||
|
||||
return getPriorityMetadataProducer(*pluginArgs)
|
||||
return getPriorityMetadataProducer(*algorithmArgs)
|
||||
}
|
||||
|
||||
// GetPredicateMetadataProducer returns a function to build Predicate Metadata.
|
||||
@ -489,18 +531,63 @@ func (c *Configurator) GetPredicateMetadataProducer() (predicates.PredicateMetad
|
||||
return getPredicateMetadataProducer()
|
||||
}
|
||||
|
||||
// GetPredicates returns the predicate functions.
|
||||
// It is used by the scheduler and other components, such as k8s.io/autoscaler/cluster-autoscaler.
|
||||
func (c *Configurator) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
|
||||
pluginArgs, err := c.getPluginArgs()
|
||||
// getPredicateConfigs returns predicates configuration: ones that will run as fitPredicates and ones that will run
|
||||
// as framework plugins. Specifically, a predicate will run as a framework plugin if a plugin config producer was
|
||||
// registered for that predicate.
|
||||
// Note that the framework executes plugins according to their order in the Plugins list, and so predicates run as plugins
|
||||
// are added to the Plugins list according to the order specified in predicates.Ordering().
|
||||
func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[string]predicates.FitPredicate, *config.Plugins, []config.PluginConfig, error) {
|
||||
algorithmArgs, configProducerArgs := c.getAlgorithmArgs()
|
||||
|
||||
allFitPredicates, err := getFitPredicateFunctions(predicateKeys, *algorithmArgs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return getFitPredicateFunctions(predicateKeys, *pluginArgs)
|
||||
if c.pluginConfigProducerRegistry == nil {
|
||||
return allFitPredicates, nil, nil, nil
|
||||
}
|
||||
|
||||
asPlugins := sets.NewString()
|
||||
asFitPredicates := make(map[string]predicates.FitPredicate)
|
||||
frameworkConfigProducers := c.pluginConfigProducerRegistry.PredicateToConfigProducer
|
||||
|
||||
// First, identify the predicates that will run as actual fit predicates, and ones
|
||||
// that will run as framework plugins.
|
||||
for predicateKey := range allFitPredicates {
|
||||
if _, exist := frameworkConfigProducers[predicateKey]; exist {
|
||||
asPlugins.Insert(predicateKey)
|
||||
} else {
|
||||
asFitPredicates[predicateKey] = allFitPredicates[predicateKey]
|
||||
}
|
||||
}
|
||||
|
||||
// Second, create the framework plugin configurations, and place them in the order
|
||||
// that the corresponding predicates were supposed to run.
|
||||
var plugins config.Plugins
|
||||
var pluginConfig []config.PluginConfig
|
||||
for _, predicateKey := range predicates.Ordering() {
|
||||
if asPlugins.Has(predicateKey) {
|
||||
producer := frameworkConfigProducers[predicateKey]
|
||||
p, pc := producer(*configProducerArgs)
|
||||
plugins.Append(&p)
|
||||
pluginConfig = append(pluginConfig, pc...)
|
||||
asPlugins.Delete(predicateKey)
|
||||
}
|
||||
}
|
||||
|
||||
// Third, add the rest in no specific order.
|
||||
for predicateKey := range asPlugins {
|
||||
producer := frameworkConfigProducers[predicateKey]
|
||||
p, pc := producer(*configProducerArgs)
|
||||
plugins.Append(&p)
|
||||
pluginConfig = append(pluginConfig, pc...)
|
||||
}
|
||||
|
||||
return asFitPredicates, &plugins, pluginConfig, nil
|
||||
}
|
||||
|
||||
func (c *Configurator) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||
func (c *Configurator) getAlgorithmArgs() (*PluginFactoryArgs, *plugins.ConfigProducerArgs) {
|
||||
return &PluginFactoryArgs{
|
||||
PodLister: c.schedulerCache,
|
||||
ServiceLister: c.serviceLister,
|
||||
@ -516,7 +603,7 @@ func (c *Configurator) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
|
||||
VolumeBinder: c.volumeBinder,
|
||||
HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight,
|
||||
}, nil
|
||||
}, &plugins.ConfigProducerArgs{}
|
||||
}
|
||||
|
||||
type podInformer struct {
|
||||
|
@ -20,9 +20,12 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -39,8 +42,9 @@ import (
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
plugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
|
||||
config "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
@ -71,10 +75,10 @@ func TestCreateFromConfig(t *testing.T) {
|
||||
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
|
||||
|
||||
// Pre-register some predicate and priority functions
|
||||
RegisterFitPredicate("PredicateOne", PredicateOne)
|
||||
RegisterFitPredicate("PredicateTwo", PredicateTwo)
|
||||
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
|
||||
RegisterPriorityFunction("PriorityTwo", PriorityTwo, 1)
|
||||
RegisterFitPredicate("PredicateOne", PredicateFunc)
|
||||
RegisterFitPredicate("PredicateTwo", PredicateFunc)
|
||||
RegisterPriorityFunction("PriorityOne", PriorityFunc, 1)
|
||||
RegisterPriorityFunction("PriorityTwo", PriorityFunc, 1)
|
||||
|
||||
configData = []byte(`{
|
||||
"kind" : "Policy",
|
||||
@ -111,10 +115,10 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
|
||||
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
|
||||
|
||||
// Pre-register some predicate and priority functions
|
||||
RegisterFitPredicate("PredicateOne", PredicateOne)
|
||||
RegisterFitPredicate("PredicateTwo", PredicateTwo)
|
||||
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
|
||||
RegisterPriorityFunction("PriorityTwo", PriorityTwo, 1)
|
||||
RegisterFitPredicate("PredicateOne", PredicateFunc)
|
||||
RegisterFitPredicate("PredicateTwo", PredicateFunc)
|
||||
RegisterPriorityFunction("PriorityOne", PriorityFunc, 1)
|
||||
RegisterPriorityFunction("PriorityTwo", PriorityFunc, 1)
|
||||
|
||||
configData = []byte(`{
|
||||
"kind" : "Policy",
|
||||
@ -168,8 +172,8 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
|
||||
defer close(stopCh)
|
||||
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
|
||||
|
||||
RegisterFitPredicate("PredicateOne", PredicateOne)
|
||||
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
|
||||
RegisterFitPredicate("PredicateOne", PredicateFunc)
|
||||
RegisterPriorityFunction("PriorityOne", PriorityFunc, 1)
|
||||
|
||||
RegisterAlgorithmProvider(DefaultProvider, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
|
||||
|
||||
@ -182,14 +186,14 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
|
||||
t.Fatalf("Invalid configuration: %v", err)
|
||||
}
|
||||
|
||||
config, err := factory.CreateFromConfig(policy)
|
||||
c, err := factory.CreateFromConfig(policy)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create scheduler from configuration: %v", err)
|
||||
}
|
||||
if _, found := config.Algorithm.Predicates()["PredicateOne"]; !found {
|
||||
if _, found := c.Algorithm.Predicates()["PredicateOne"]; !found {
|
||||
t.Errorf("Expected predicate PredicateOne from %q", DefaultProvider)
|
||||
}
|
||||
if len(config.Algorithm.Prioritizers()) != 1 || config.Algorithm.Prioritizers()[0].Name != "PriorityOne" {
|
||||
if len(c.Algorithm.Prioritizers()) != 1 || c.Algorithm.Prioritizers()[0].Name != "PriorityOne" {
|
||||
t.Errorf("Expected priority PriorityOne from %q", DefaultProvider)
|
||||
}
|
||||
}
|
||||
@ -203,8 +207,8 @@ func TestCreateFromConfigWithEmptyPredicatesOrPriorities(t *testing.T) {
|
||||
defer close(stopCh)
|
||||
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
|
||||
|
||||
RegisterFitPredicate("PredicateOne", PredicateOne)
|
||||
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
|
||||
RegisterFitPredicate("PredicateOne", PredicateFunc)
|
||||
RegisterPriorityFunction("PriorityOne", PriorityFunc, 1)
|
||||
|
||||
RegisterAlgorithmProvider(DefaultProvider, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
|
||||
|
||||
@ -219,31 +223,23 @@ func TestCreateFromConfigWithEmptyPredicatesOrPriorities(t *testing.T) {
|
||||
t.Fatalf("Invalid configuration: %v", err)
|
||||
}
|
||||
|
||||
config, err := factory.CreateFromConfig(policy)
|
||||
c, err := factory.CreateFromConfig(policy)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create scheduler from configuration: %v", err)
|
||||
}
|
||||
if len(config.Algorithm.Predicates()) != 0 {
|
||||
if len(c.Algorithm.Predicates()) != 0 {
|
||||
t.Error("Expected empty predicate sets")
|
||||
}
|
||||
if len(config.Algorithm.Prioritizers()) != 0 {
|
||||
if len(c.Algorithm.Prioritizers()) != 0 {
|
||||
t.Error("Expected empty priority sets")
|
||||
}
|
||||
}
|
||||
|
||||
func PredicateOne(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
|
||||
func PredicateFunc(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
func PredicateTwo(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
func PriorityOne(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
|
||||
return []schedulerapi.HostPriority{}, nil
|
||||
}
|
||||
|
||||
func PriorityTwo(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
|
||||
func PriorityFunc(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
|
||||
return []schedulerapi.HostPriority{}, nil
|
||||
}
|
||||
|
||||
@ -476,7 +472,9 @@ func TestInvalidFactoryArgs(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator {
|
||||
func newConfigFactoryWithFrameworkRegistry(
|
||||
client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{},
|
||||
registry framework.Registry, pluginConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry) *Configurator {
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
return NewConfigFactory(&ConfigFactoryArgs{
|
||||
client,
|
||||
@ -496,12 +494,19 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight
|
||||
schedulerapi.DefaultPercentageOfNodesToScore,
|
||||
bindTimeoutSeconds,
|
||||
stopCh,
|
||||
plugins.NewDefaultRegistry(),
|
||||
registry,
|
||||
nil,
|
||||
[]config.PluginConfig{},
|
||||
pluginConfigProducerRegistry,
|
||||
})
|
||||
}
|
||||
|
||||
func newConfigFactory(
|
||||
client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator {
|
||||
return newConfigFactoryWithFrameworkRegistry(client, hardPodAffinitySymmetricWeight, stopCh,
|
||||
frameworkplugins.NewDefaultRegistry(), frameworkplugins.NewDefaultConfigProducerRegistry())
|
||||
}
|
||||
|
||||
type fakeExtender struct {
|
||||
isBinder bool
|
||||
interestedPodName string
|
||||
@ -616,3 +621,188 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm
|
||||
t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType)
|
||||
}
|
||||
}
|
||||
|
||||
type TestPlugin struct {
|
||||
name string
|
||||
}
|
||||
|
||||
func (t *TestPlugin) Name() string {
|
||||
return t.name
|
||||
}
|
||||
|
||||
func (t *TestPlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (t *TestPlugin) NormalizeScore(pc *framework.PluginContext, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Test configures a scheduler from a policies defined in a file
|
||||
// It combines some configurable predicate/priorities with some pre-defined ones
|
||||
func TestCreateWithFrameworkPlugins(t *testing.T) {
|
||||
var configData []byte
|
||||
var policy schedulerapi.Policy
|
||||
|
||||
client := fake.NewSimpleClientset()
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
predicateOneName := "PredicateOne"
|
||||
filterOneName := "FilterOne"
|
||||
predicateTwoName := "PredicateTwo"
|
||||
filterTwoName := "FilterTwo"
|
||||
predicateThreeName := "PredicateThree"
|
||||
predicateFourName := "PredicateFour"
|
||||
priorityOneName := "PriorityOne"
|
||||
scoreOneName := "ScoreOne"
|
||||
priorityTwoName := "PriorityTwo"
|
||||
scoreTwoName := "ScoreTwo"
|
||||
priorityThreeName := "PriorityThree"
|
||||
|
||||
configProducerRegistry := &frameworkplugins.ConfigProducerRegistry{
|
||||
PredicateToConfigProducer: make(map[string]frameworkplugins.ConfigProducer),
|
||||
PriorityToConfigProducer: make(map[string]frameworkplugins.ConfigProducer),
|
||||
}
|
||||
configProducerRegistry.RegisterPredicate(predicateOneName,
|
||||
func(_ frameworkplugins.ConfigProducerArgs) (config.Plugins, []config.PluginConfig) {
|
||||
return config.Plugins{
|
||||
Filter: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: filterOneName}}}}, nil
|
||||
})
|
||||
|
||||
configProducerRegistry.RegisterPredicate(predicateTwoName,
|
||||
func(_ frameworkplugins.ConfigProducerArgs) (config.Plugins, []config.PluginConfig) {
|
||||
return config.Plugins{
|
||||
Filter: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: filterTwoName}}}}, nil
|
||||
})
|
||||
|
||||
configProducerRegistry.RegisterPriority(priorityOneName,
|
||||
func(args frameworkplugins.ConfigProducerArgs) (config.Plugins, []config.PluginConfig) {
|
||||
return config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scoreOneName, Weight: args.Weight}}}}, nil
|
||||
})
|
||||
|
||||
configProducerRegistry.RegisterPriority(priorityTwoName,
|
||||
func(args frameworkplugins.ConfigProducerArgs) (config.Plugins, []config.PluginConfig) {
|
||||
return config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scoreTwoName, Weight: args.Weight}}}}, nil
|
||||
})
|
||||
|
||||
registry := framework.Registry{
|
||||
filterOneName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &TestPlugin{name: filterOneName}, nil
|
||||
},
|
||||
filterTwoName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &TestPlugin{name: filterTwoName}, nil
|
||||
},
|
||||
scoreOneName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &TestPlugin{name: scoreOneName}, nil
|
||||
},
|
||||
scoreTwoName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &TestPlugin{name: scoreTwoName}, nil
|
||||
},
|
||||
}
|
||||
|
||||
factory := newConfigFactoryWithFrameworkRegistry(
|
||||
client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh, registry, configProducerRegistry)
|
||||
|
||||
// Pre-register some predicate and priority functions
|
||||
RegisterMandatoryFitPredicate(predicateOneName, PredicateFunc)
|
||||
RegisterFitPredicate(predicateTwoName, PredicateFunc)
|
||||
RegisterFitPredicate(predicateThreeName, PredicateFunc)
|
||||
RegisterMandatoryFitPredicate(predicateFourName, PredicateFunc)
|
||||
RegisterPriorityFunction(priorityOneName, PriorityFunc, 1)
|
||||
RegisterPriorityFunction(priorityTwoName, PriorityFunc, 1)
|
||||
RegisterPriorityFunction(priorityThreeName, PriorityFunc, 1)
|
||||
|
||||
configData = []byte(`{
|
||||
"kind" : "Policy",
|
||||
"apiVersion" : "v1",
|
||||
"predicates" : [
|
||||
{"name" : "PredicateOne"},
|
||||
{"name" : "PredicateTwo"},
|
||||
{"name" : "PredicateThree"},
|
||||
{"name" : "PredicateThree"}
|
||||
],
|
||||
"priorities" : [
|
||||
{"name" : "PriorityOne", "weight" : 2},
|
||||
{"name" : "PriorityTwo", "weight" : 1},
|
||||
{"name" : "PriorityThree", "weight" : 1} ]
|
||||
}`)
|
||||
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
|
||||
t.Errorf("Invalid configuration: %v", err)
|
||||
}
|
||||
|
||||
c, err := factory.CreateFromConfig(policy)
|
||||
if err != nil {
|
||||
t.Fatalf("creating config: %v", err)
|
||||
}
|
||||
|
||||
gotPredicates := sets.NewString()
|
||||
for p := range c.Algorithm.Predicates() {
|
||||
gotPredicates.Insert(p)
|
||||
}
|
||||
wantPredicates := sets.NewString(
|
||||
predicateThreeName,
|
||||
predicateFourName,
|
||||
)
|
||||
if diff := cmp.Diff(wantPredicates, gotPredicates); diff != "" {
|
||||
t.Errorf("unexpected predicates diff (-want, +got): %s", diff)
|
||||
}
|
||||
|
||||
gotPriorities := sets.NewString()
|
||||
for _, p := range c.Algorithm.Prioritizers() {
|
||||
gotPriorities.Insert(p.Name)
|
||||
}
|
||||
wantPriorities := sets.NewString(priorityThreeName)
|
||||
if diff := cmp.Diff(wantPriorities, gotPriorities); diff != "" {
|
||||
t.Errorf("unexpected priorities diff (-want, +got): %s", diff)
|
||||
}
|
||||
|
||||
// Verify the aggregated configuration.
|
||||
wantPlugins := config.Plugins{
|
||||
QueueSort: &config.PluginSet{},
|
||||
PreFilter: &config.PluginSet{},
|
||||
Filter: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: filterOneName},
|
||||
{Name: filterTwoName},
|
||||
},
|
||||
},
|
||||
PostFilter: &config.PluginSet{},
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scoreOneName, Weight: 2},
|
||||
{Name: scoreTwoName, Weight: 1},
|
||||
},
|
||||
},
|
||||
Reserve: &config.PluginSet{},
|
||||
Permit: &config.PluginSet{},
|
||||
PreBind: &config.PluginSet{},
|
||||
Bind: &config.PluginSet{},
|
||||
PostBind: &config.PluginSet{},
|
||||
Unreserve: &config.PluginSet{},
|
||||
}
|
||||
|
||||
trans := cmp.Transformer("Sort", func(in []config.Plugin) []config.Plugin {
|
||||
out := append([]config.Plugin(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
|
||||
return out
|
||||
})
|
||||
|
||||
if diff := cmp.Diff(wantPlugins, c.Plugins, trans); diff != "" {
|
||||
t.Errorf("unexpected plugin configuration (-want, +got): %s", diff)
|
||||
}
|
||||
}
|
||||
|
@ -53,8 +53,8 @@ type ConfigProducerRegistry struct {
|
||||
PriorityToConfigProducer map[string]ConfigProducer
|
||||
}
|
||||
|
||||
// NewConfigProducerRegistry creates a new producer registry.
|
||||
func NewConfigProducerRegistry() *ConfigProducerRegistry {
|
||||
// NewDefaultConfigProducerRegistry creates a new producer registry.
|
||||
func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
|
||||
return &ConfigProducerRegistry{
|
||||
PredicateToConfigProducer: make(map[string]ConfigProducer),
|
||||
PriorityToConfigProducer: make(map[string]ConfigProducer),
|
||||
|
@ -53,7 +53,7 @@ func produceConfig(keys []string, producersMap map[string]ConfigProducer, args C
|
||||
}
|
||||
|
||||
func TestRegisterConfigProducers(t *testing.T) {
|
||||
registry := NewConfigProducerRegistry()
|
||||
registry := NewDefaultConfigProducerRegistry()
|
||||
testPredicateName1 := "testPredicate1"
|
||||
testFilterName1 := "testFilter1"
|
||||
registry.RegisterPredicate(testPredicateName1,
|
||||
|
Loading…
Reference in New Issue
Block a user