diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 752fc7b3299..fe6ba7abb0a 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -105,13 +105,12 @@ func TestSkipPodUpdate(t *testing.T) { } for _, test := range table { t.Run(test.name, func(t *testing.T) { - c := NewFromConfig(&Config{ + c := &Scheduler{ SchedulerCache: &fakecache.Cache{ IsAssumedPodFunc: test.isAssumedPodFunc, GetPodFunc: test.getPodFunc, }, - }, - ) + } got := c.skipPodUpdate(test.pod) if got != test.expected { t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index f3261372db0..4eb83c86ba0 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -42,7 +42,6 @@ import ( storagelistersv1 "k8s.io/client-go/listers/storage/v1" storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/events" "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithm" @@ -70,46 +69,6 @@ type Binder interface { Bind(binding *v1.Binding) error } -// Config is an implementation of the Scheduler's configured input data. -// TODO over time we should make this struct a hidden implementation detail of the scheduler. -type Config struct { - SchedulerCache internalcache.Cache - - Algorithm core.ScheduleAlgorithm - GetBinder func(pod *v1.Pod) Binder - // Framework runs scheduler plugins at configured extension points. - Framework framework.Framework - - // NextPod should be a function that blocks until the next pod - // is available. We don't use a channel for this, because scheduling - // a pod may take some amount of time and we don't want pods to get - // stale while they sit in a channel. - NextPod func() *framework.PodInfo - - // Error is called if there is an error. It is passed the pod in - // question, and the error - Error func(*framework.PodInfo, error) - - // Recorder is the EventRecorder to use - Recorder events.EventRecorder - - // Close this to shut down the scheduler. - StopEverything <-chan struct{} - - // VolumeBinder handles PVC/PV binding for the pod. - VolumeBinder *volumebinder.VolumeBinder - - // Disable pod preemption or not. - DisablePreemption bool - - // SchedulingQueue holds pods to be scheduled - SchedulingQueue internalqueue.SchedulingQueue - - // The final configuration of the framework. - Plugins schedulerapi.Plugins - PluginConfig []schedulerapi.PluginConfig -} - // Configurator defines I/O, caching, and other functionality needed to // construct a new scheduler. type Configurator struct { @@ -291,12 +250,12 @@ func (c *Configurator) GetHardPodAffinitySymmetricWeight() int32 { } // Create creates a scheduler with the default algorithm provider. -func (c *Configurator) Create() (*Config, error) { +func (c *Configurator) Create() (*Scheduler, error) { return c.CreateFromProvider(DefaultProvider) } // CreateFromProvider creates a scheduler from the name of a registered algorithm provider. -func (c *Configurator) CreateFromProvider(providerName string) (*Config, error) { +func (c *Configurator) CreateFromProvider(providerName string) (*Scheduler, error) { klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) provider, err := GetAlgorithmProvider(providerName) if err != nil { @@ -306,7 +265,7 @@ func (c *Configurator) CreateFromProvider(providerName string) (*Config, error) } // CreateFromConfig creates a scheduler from the configuration file -func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) { +func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler, error) { klog.V(2).Infof("Creating scheduler from configuration: %v", policy) // validate the policy configuration @@ -388,7 +347,7 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, er } // CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys. -func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) { +func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Scheduler, error) { klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys) if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 { @@ -478,7 +437,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e c.enableNonPreempting, ) - return &Config{ + return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, GetBinder: getBinderFunc(c.client, extenders), diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index a32afb2617f..980284a4b34 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -109,7 +109,7 @@ func TestCreateFromConfig(t *testing.T) { t.Errorf("Invalid configuration: %v", err) } - conf, err := factory.CreateFromConfig(policy) + sched, err := factory.CreateFromConfig(policy) if err != nil { t.Fatalf("CreateFromConfig failed: %v", err) } @@ -120,15 +120,15 @@ func TestCreateFromConfig(t *testing.T) { // Verify that node label predicate/priority are converted to framework plugins. wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}` - verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, conf, 6, wantArgs) + verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, 6, wantArgs) // Verify that service affinity custom predicate/priority is converted to framework plugin. wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}` - verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, conf, 6, wantArgs) + verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, 6, wantArgs) } -func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, conf *Config, wantWeight int32, wantArgs string) { +func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, sched *Scheduler, wantWeight int32, wantArgs string) { for _, extensionPoint := range extentionPoints { - plugin, ok := findPlugin(name, extensionPoint, conf) + plugin, ok := findPlugin(name, extensionPoint, sched) if !ok { t.Fatalf("%q plugin does not exist in framework.", name) } @@ -138,7 +138,7 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, } } // Verify that the policy config is converted to plugin config. - pluginConfig := findPluginConfig(name, conf) + pluginConfig := findPluginConfig(name, sched) encoding, err := json.Marshal(pluginConfig) if err != nil { t.Errorf("Failed to marshal %+v: %v", pluginConfig, err) @@ -149,8 +149,8 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, } } -func findPlugin(name, extensionPoint string, schedConf *Config) (schedulerapi.Plugin, bool) { - for _, pl := range schedConf.Framework.ListPlugins()[extensionPoint] { +func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plugin, bool) { + for _, pl := range sched.Framework.ListPlugins()[extensionPoint] { if pl.Name == name { return pl, true } @@ -158,8 +158,8 @@ func findPlugin(name, extensionPoint string, schedConf *Config) (schedulerapi.Pl return schedulerapi.Plugin{}, false } -func findPluginConfig(name string, schedConf *Config) schedulerapi.PluginConfig { - for _, c := range schedConf.PluginConfig { +func findPluginConfig(name string, sched *Scheduler) schedulerapi.PluginConfig { + for _, c := range sched.PluginConfig { if c.Name == name { return c } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6fe15338bc0..b2afd8d5154 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -119,6 +119,10 @@ type Scheduler struct { SchedulingQueue internalqueue.SchedulingQueue scheduledPodsHasSynced func() bool + + // The final configuration of the framework. + Plugins schedulerapi.Plugins + PluginConfig []schedulerapi.PluginConfig } // Cache returns the cache in scheduler for test to check the data in scheduler. @@ -321,7 +325,7 @@ func New(client clientset.Interface, Plugins: options.frameworkPlugins, PluginConfig: options.frameworkPluginConfig, }) - var config *Config + var sched *Scheduler source := schedulerAlgorithmSource switch { case source.Provider != nil: @@ -330,7 +334,7 @@ func New(client clientset.Interface, if err != nil { return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err) } - config = sc + sched = sc case source.Policy != nil: // Create the config from a user specified policy source. policy := &schedulerapi.Policy{} @@ -348,17 +352,15 @@ func New(client clientset.Interface, if err != nil { return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) } - config = sc + sched = sc default: return nil, fmt.Errorf("unsupported algorithm source: %v", source) } + metrics.Register() // Additional tweaks to the config produced by the configurator. - config.Recorder = recorder - config.DisablePreemption = options.disablePreemption - config.StopEverything = stopCh - - // Create the scheduler. - sched := NewFromConfig(config) + sched.Recorder = recorder + sched.DisablePreemption = options.disablePreemption + sched.StopEverything = stopCh sched.podConditionUpdater = &podConditionUpdaterImpl{client} sched.podPreemptor = &podPreemptorImpl{client} sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced @@ -403,24 +405,6 @@ func initPolicyFromConfigMap(client clientset.Interface, policyRef *schedulerapi return nil } -// NewFromConfig returns a new scheduler using the provided Config. -func NewFromConfig(config *Config) *Scheduler { - metrics.Register() - return &Scheduler{ - SchedulerCache: config.SchedulerCache, - Algorithm: config.Algorithm, - GetBinder: config.GetBinder, - Framework: config.Framework, - NextPod: config.NextPod, - Error: config.Error, - Recorder: config.Recorder, - StopEverything: config.StopEverything, - VolumeBinder: config.VolumeBinder, - DisablePreemption: config.DisablePreemption, - SchedulingQueue: config.SchedulingQueue, - } -} - // Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done. func (sched *Scheduler) Run(ctx context.Context) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {