Merge pull request #85049 from ahmad-diaa/make-configurator-create-return-sched

Change Configurator.Create to Return a Scheduler
This commit is contained in:
Kubernetes Prow Robot 2019-11-12 09:06:36 -08:00 committed by GitHub
commit ed4d515f0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 28 additions and 86 deletions

View File

@ -105,13 +105,12 @@ func TestSkipPodUpdate(t *testing.T) {
} }
for _, test := range table { for _, test := range table {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
c := NewFromConfig(&Config{ c := &Scheduler{
SchedulerCache: &fakecache.Cache{ SchedulerCache: &fakecache.Cache{
IsAssumedPodFunc: test.isAssumedPodFunc, IsAssumedPodFunc: test.isAssumedPodFunc,
GetPodFunc: test.getPodFunc, GetPodFunc: test.getPodFunc,
}, },
}, }
)
got := c.skipPodUpdate(test.pod) got := c.skipPodUpdate(test.pod)
if got != test.expected { if got != test.expected {
t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected) t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)

View File

@ -42,7 +42,6 @@ import (
storagelistersv1 "k8s.io/client-go/listers/storage/v1" storagelistersv1 "k8s.io/client-go/listers/storage/v1"
storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1" storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
@ -70,46 +69,6 @@ type Binder interface {
Bind(binding *v1.Binding) error Bind(binding *v1.Binding) error
} }
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// Framework runs scheduler plugins at configured extension points.
Framework framework.Framework
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *framework.PodInfo
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.PodInfo, error)
// Recorder is the EventRecorder to use
Recorder events.EventRecorder
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
// Disable pod preemption or not.
DisablePreemption bool
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
// The final configuration of the framework.
Plugins schedulerapi.Plugins
PluginConfig []schedulerapi.PluginConfig
}
// Configurator defines I/O, caching, and other functionality needed to // Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. // construct a new scheduler.
type Configurator struct { type Configurator struct {
@ -291,12 +250,12 @@ func (c *Configurator) GetHardPodAffinitySymmetricWeight() int32 {
} }
// Create creates a scheduler with the default algorithm provider. // Create creates a scheduler with the default algorithm provider.
func (c *Configurator) Create() (*Config, error) { func (c *Configurator) Create() (*Scheduler, error) {
return c.CreateFromProvider(DefaultProvider) return c.CreateFromProvider(DefaultProvider)
} }
// CreateFromProvider creates a scheduler from the name of a registered algorithm provider. // CreateFromProvider creates a scheduler from the name of a registered algorithm provider.
func (c *Configurator) CreateFromProvider(providerName string) (*Config, error) { func (c *Configurator) CreateFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName) provider, err := GetAlgorithmProvider(providerName)
if err != nil { if err != nil {
@ -306,7 +265,7 @@ func (c *Configurator) CreateFromProvider(providerName string) (*Config, error)
} }
// CreateFromConfig creates a scheduler from the configuration file // CreateFromConfig creates a scheduler from the configuration file
func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) { func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from configuration: %v", policy) klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
// validate the policy configuration // validate the policy configuration
@ -388,7 +347,7 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, er
} }
// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys. // CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) { func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys) klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 { if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
@ -478,7 +437,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
c.enableNonPreempting, c.enableNonPreempting,
) )
return &Config{ return &Scheduler{
SchedulerCache: c.schedulerCache, SchedulerCache: c.schedulerCache,
Algorithm: algo, Algorithm: algo,
GetBinder: getBinderFunc(c.client, extenders), GetBinder: getBinderFunc(c.client, extenders),

View File

@ -109,7 +109,7 @@ func TestCreateFromConfig(t *testing.T) {
t.Errorf("Invalid configuration: %v", err) t.Errorf("Invalid configuration: %v", err)
} }
conf, err := factory.CreateFromConfig(policy) sched, err := factory.CreateFromConfig(policy)
if err != nil { if err != nil {
t.Fatalf("CreateFromConfig failed: %v", err) t.Fatalf("CreateFromConfig failed: %v", err)
} }
@ -120,15 +120,15 @@ func TestCreateFromConfig(t *testing.T) {
// Verify that node label predicate/priority are converted to framework plugins. // Verify that node label predicate/priority are converted to framework plugins.
wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}` wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`
verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, conf, 6, wantArgs) verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, 6, wantArgs)
// Verify that service affinity custom predicate/priority is converted to framework plugin. // Verify that service affinity custom predicate/priority is converted to framework plugin.
wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}` wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}`
verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, conf, 6, wantArgs) verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, 6, wantArgs)
} }
func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, conf *Config, wantWeight int32, wantArgs string) { func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, sched *Scheduler, wantWeight int32, wantArgs string) {
for _, extensionPoint := range extentionPoints { for _, extensionPoint := range extentionPoints {
plugin, ok := findPlugin(name, extensionPoint, conf) plugin, ok := findPlugin(name, extensionPoint, sched)
if !ok { if !ok {
t.Fatalf("%q plugin does not exist in framework.", name) t.Fatalf("%q plugin does not exist in framework.", name)
} }
@ -138,7 +138,7 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string,
} }
} }
// Verify that the policy config is converted to plugin config. // Verify that the policy config is converted to plugin config.
pluginConfig := findPluginConfig(name, conf) pluginConfig := findPluginConfig(name, sched)
encoding, err := json.Marshal(pluginConfig) encoding, err := json.Marshal(pluginConfig)
if err != nil { if err != nil {
t.Errorf("Failed to marshal %+v: %v", pluginConfig, err) t.Errorf("Failed to marshal %+v: %v", pluginConfig, err)
@ -149,8 +149,8 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string,
} }
} }
func findPlugin(name, extensionPoint string, schedConf *Config) (schedulerapi.Plugin, bool) { func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plugin, bool) {
for _, pl := range schedConf.Framework.ListPlugins()[extensionPoint] { for _, pl := range sched.Framework.ListPlugins()[extensionPoint] {
if pl.Name == name { if pl.Name == name {
return pl, true return pl, true
} }
@ -158,8 +158,8 @@ func findPlugin(name, extensionPoint string, schedConf *Config) (schedulerapi.Pl
return schedulerapi.Plugin{}, false return schedulerapi.Plugin{}, false
} }
func findPluginConfig(name string, schedConf *Config) schedulerapi.PluginConfig { func findPluginConfig(name string, sched *Scheduler) schedulerapi.PluginConfig {
for _, c := range schedConf.PluginConfig { for _, c := range sched.PluginConfig {
if c.Name == name { if c.Name == name {
return c return c
} }

View File

@ -119,6 +119,10 @@ type Scheduler struct {
SchedulingQueue internalqueue.SchedulingQueue SchedulingQueue internalqueue.SchedulingQueue
scheduledPodsHasSynced func() bool scheduledPodsHasSynced func() bool
// The final configuration of the framework.
Plugins schedulerapi.Plugins
PluginConfig []schedulerapi.PluginConfig
} }
// Cache returns the cache in scheduler for test to check the data in scheduler. // Cache returns the cache in scheduler for test to check the data in scheduler.
@ -321,7 +325,7 @@ func New(client clientset.Interface,
Plugins: options.frameworkPlugins, Plugins: options.frameworkPlugins,
PluginConfig: options.frameworkPluginConfig, PluginConfig: options.frameworkPluginConfig,
}) })
var config *Config var sched *Scheduler
source := schedulerAlgorithmSource source := schedulerAlgorithmSource
switch { switch {
case source.Provider != nil: case source.Provider != nil:
@ -330,7 +334,7 @@ func New(client clientset.Interface,
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err) return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
} }
config = sc sched = sc
case source.Policy != nil: case source.Policy != nil:
// Create the config from a user specified policy source. // Create the config from a user specified policy source.
policy := &schedulerapi.Policy{} policy := &schedulerapi.Policy{}
@ -348,17 +352,15 @@ func New(client clientset.Interface,
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
} }
config = sc sched = sc
default: default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source) return nil, fmt.Errorf("unsupported algorithm source: %v", source)
} }
metrics.Register()
// Additional tweaks to the config produced by the configurator. // Additional tweaks to the config produced by the configurator.
config.Recorder = recorder sched.Recorder = recorder
config.DisablePreemption = options.disablePreemption sched.DisablePreemption = options.disablePreemption
config.StopEverything = stopCh sched.StopEverything = stopCh
// Create the scheduler.
sched := NewFromConfig(config)
sched.podConditionUpdater = &podConditionUpdaterImpl{client} sched.podConditionUpdater = &podConditionUpdaterImpl{client}
sched.podPreemptor = &podPreemptorImpl{client} sched.podPreemptor = &podPreemptorImpl{client}
sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
@ -403,24 +405,6 @@ func initPolicyFromConfigMap(client clientset.Interface, policyRef *schedulerapi
return nil return nil
} }
// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *Config) *Scheduler {
metrics.Register()
return &Scheduler{
SchedulerCache: config.SchedulerCache,
Algorithm: config.Algorithm,
GetBinder: config.GetBinder,
Framework: config.Framework,
NextPod: config.NextPod,
Error: config.Error,
Recorder: config.Recorder,
StopEverything: config.StopEverything,
VolumeBinder: config.VolumeBinder,
DisablePreemption: config.DisablePreemption,
SchedulingQueue: config.SchedulingQueue,
}
}
// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done. // Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) { func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {