Merge pull request #40188 from jayunit100/NewSchedulerFromInterface

Automatic merge from submit-queue (batch tested with PRs 39538, 40188, 40357, 38214, 40195)

Decoupling scheduler creation from creation of scheduler.Config struc…

**What this PR does / why we need it**:

Adds functionality to the scheduler to initialize from a Configurator interface, rather than via a Config struct.

**Which issue this PR fixes**:

Reduces coupling to the `scheduler.Config` data structure so that we can move toward more interface-driven composition of scheduler components.
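
For a rough sketch of what that decoupling buys (the `Config`, `Configurator`, `Scheduler`, `New`, and `NewFromConfigurator` names mirror the diff below; `defaultConfigurator` and everything else is invented for illustration):

```go
package main

import "fmt"

// Config holds the assembled dependencies of a scheduler (trimmed down for illustration).
type Config struct{ SchedulerName string }

// Configurator abstracts *how* a Config gets built, so callers no longer
// depend on the struct's layout.
type Configurator interface {
	Create() (*Config, error)
}

type Scheduler struct{ config *Config }

// Before: the caller assembles the Config by hand and passes the struct in.
func New(c *Config) *Scheduler { return &Scheduler{config: c} }

// After: the scheduler builds itself from any Configurator implementation,
// with optional modifier functions applied to the freshly created Config.
func NewFromConfigurator(c Configurator, modifiers ...func(*Config)) (*Scheduler, error) {
	cfg, err := c.Create()
	if err != nil {
		return nil, err
	}
	for _, m := range modifiers {
		m(cfg) // e.g. a test can swap in a fake recorder here
	}
	return &Scheduler{config: cfg}, nil
}

// defaultConfigurator is a hypothetical stand-in for factory.NewConfigFactory.
type defaultConfigurator struct{}

func (defaultConfigurator) Create() (*Config, error) {
	return &Config{SchedulerName: "default-scheduler"}, nil
}

func main() {
	sched, err := NewFromConfigurator(defaultConfigurator{})
	if err != nil {
		panic(err)
	}
	fmt.Println(sched.config.SchedulerName)
}
```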
Kubernetes Submit Queue 2017-01-25 17:47:12 -08:00 committed by GitHub
commit bb323d8dea
3 changed files with 86 additions and 46 deletions


```diff
@@ -78,24 +78,20 @@ func Run(s *options.SchedulerServer) error {
 	if err != nil {
 		return fmt.Errorf("unable to create kube client: %v", err)
 	}
-	config, err := createConfig(s, kubecli)
+	recorder := createRecorder(kubecli, s)
+	sched, err := createScheduler(s, kubecli, recorder)
 	if err != nil {
-		return fmt.Errorf("failed to create scheduler configuration: %v", err)
+		return fmt.Errorf("error creating scheduler: %v", err)
 	}
-
-	sched := scheduler.New(config)
-
 	go startHTTP(s)

 	run := func(_ <-chan struct{}) {
 		sched.Run()
 		select {}
 	}

 	if !s.LeaderElection.LeaderElect {
 		run(nil)
 		panic("unreachable")
 	}

 	id, err := os.Hostname()
 	if err != nil {
 		return fmt.Errorf("unable to get hostname: %v", err)
@@ -109,7 +105,7 @@ func Run(s *options.SchedulerServer) error {
 		Client: kubecli,
 		LockConfig: resourcelock.ResourceLockConfig{
 			Identity:      id,
-			EventRecorder: config.Recorder,
+			EventRecorder: recorder,
 		},
 	}
 	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
@@ -127,6 +123,13 @@ func Run(s *options.SchedulerServer) error {
 	panic("unreachable")
 }

+func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder {
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubecli.Core().Events("")})
+	return eventBroadcaster.NewRecorder(v1.EventSource{Component: s.SchedulerName})
+}
+
 func startHTTP(s *options.SchedulerServer) {
 	mux := http.NewServeMux()
 	healthz.InstallHandler(mux)
@@ -171,33 +174,42 @@ func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) {
 	return cli, nil
 }

-func createConfig(s *options.SchedulerServer, kubecli *clientset.Clientset) (*scheduler.Config, error) {
-	configFactory := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)
-	if _, err := os.Stat(s.PolicyConfigFile); err == nil {
-		var (
-			policy     schedulerapi.Policy
-			configData []byte
-		)
-		configData, err := ioutil.ReadFile(s.PolicyConfigFile)
-		if err != nil {
-			return nil, fmt.Errorf("unable to read policy config: %v", err)
-		}
-		if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
-			return nil, fmt.Errorf("invalid configuration: %v", err)
-		}
-		return configFactory.CreateFromConfig(policy)
-	}
-
-	// if the config file isn't provided, use the specified (or default) provider
-	config, err := configFactory.CreateFromProvider(s.AlgorithmProvider)
-	if err != nil {
-		return nil, err
-	}
-
-	eventBroadcaster := record.NewBroadcaster()
-	config.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: s.SchedulerName})
-	eventBroadcaster.StartLogging(glog.Infof)
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubecli.Core().Events("")})
-
-	return config, nil
+// schedulerConfigurator is an interface wrapper that provides default Configuration creation based on user
+// provided config file.
+type schedulerConfigurator struct {
+	scheduler.Configurator
+	policyFile        string
+	algorithmProvider string
+}
+
+func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
+	if _, err := os.Stat(sc.policyFile); err != nil {
+		return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
+	}
+
+	// policy file is valid, try to create a configuration from it.
+	var policy schedulerapi.Policy
+	configData, err := ioutil.ReadFile(sc.policyFile)
+	if err != nil {
+		return nil, fmt.Errorf("unable to read policy config: %v", err)
+	}
+	if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
+		return nil, fmt.Errorf("invalid configuration: %v", err)
+	}
+	return sc.CreateFromConfig(policy)
+}
+
+// createScheduler encapsulates the entire creation of a runnable scheduler.
+func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) {
+	configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)

+	// Rebuild the configurator with a default Create(...) method.
+	configurator = &schedulerConfigurator{
+		configurator,
+		s.PolicyConfigFile,
+		s.AlgorithmProvider}
+
+	return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
+		cfg.Recorder = recorder
+	})
 }
```
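The `schedulerConfigurator` wrapper above leans on a standard Go idiom: embed an interface in a struct and redefine only the method you want to override; every other method is forwarded to the embedded value. A self-contained sketch of that pattern (all names here are hypothetical, not from this PR):

```go
package main

import "fmt"

type Greeter interface {
	Hello() string
	Bye() string
}

type base struct{}

func (base) Hello() string { return "hello" }
func (base) Bye() string   { return "bye" }

// wrapper embeds the Greeter interface; any method it does not define
// is forwarded to the embedded value, so only Hello is overridden here.
type wrapper struct {
	Greeter
}

func (w wrapper) Hello() string {
	// delegate to the wrapped implementation, then decorate the result
	return w.Greeter.Hello() + ", world"
}

func main() {
	var g Greeter = wrapper{base{}}
	fmt.Println(g.Hello()) // "hello, world" — overridden
	fmt.Println(g.Bye())   // "bye" — forwarded to base
}
```

This is why `schedulerConfigurator` only needs to supply `Create()`: calls to `CreateFromProvider` or `CreateFromConfig` fall through to the wrapped factory.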


```diff
@@ -51,6 +51,10 @@ type Scheduler struct {
 	config *Config
 }

+func (sched *Scheduler) StopEverything() {
+	close(sched.config.StopEverything)
+}
+
 // These are the functions which need to be provided in order to build a Scheduler configuration.
 // An implementation of this can be seen in factory.go.
 type Configurator interface {
@@ -78,6 +82,7 @@ type Configurator interface {
 	CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
 }

+// TODO over time we should make this struct a hidden implementation detail of the scheduler.
 type Config struct {
 	// It is expected that changes made via SchedulerCache will be observed
 	// by NodeLister and Algorithm.
@@ -108,6 +113,7 @@ type Config struct {
 }

 // New returns a new scheduler.
+// TODO replace this with NewFromConfigurator.
 func New(c *Config) *Scheduler {
 	s := &Scheduler{
 		config: c,
@@ -116,6 +122,25 @@ func New(c *Config) *Scheduler {
 	return s
 }

+// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented.
+// Supports intermediate Config mutation for now if you provide modifier functions which will run after Config is created.
+func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Scheduler, error) {
+	cfg, err := c.Create()
+	if err != nil {
+		return nil, err
+	}
+	// Mutate it if any functions were provided, changes might be required for certain types of tests (i.e. change the recorder).
+	for _, modifier := range modifiers {
+		modifier(cfg)
+	}
+	// From this point on the config is immutable to the outside.
+	s := &Scheduler{
+		config: cfg,
+	}
+	metrics.Register()
+	return s, nil
+}
+
 // Run begins watching and scheduling. It starts a goroutine and returns immediately.
 func (s *Scheduler) Run() {
 	go wait.Until(s.scheduleOne, 0, s.config.StopEverything)
```
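The variadic `modifiers ...func(c *Config)` parameter is essentially the functional-options pattern. A standalone sketch of the same idea (the `Server` and `Option` names are invented for the example):

```go
package main

import "fmt"

type Server struct {
	Addr    string
	Verbose bool
}

// Option mutates a Server after its defaults are set, mirroring how
// NewFromConfigurator lets callers tweak the Config (e.g. the Recorder)
// after the Configurator has created it.
type Option func(*Server)

func WithAddr(addr string) Option { return func(s *Server) { s.Addr = addr } }
func WithVerbose() Option         { return func(s *Server) { s.Verbose = true } }

func NewServer(opts ...Option) *Server {
	s := &Server{Addr: ":8080"} // defaults first
	for _, opt := range opts {
		opt(s) // then apply caller-supplied mutations
	}
	return s
}

func main() {
	s := NewServer(WithAddr(":9090"), WithVerbose())
	fmt.Printf("%+v\n", *s)
}
```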


```diff
@@ -40,7 +40,7 @@ import (
 // remove resources after finished.
 // Notes on rate limiter:
 // - client rate limit is set to 5000.
-func mustSetupScheduler() (schedulerConfigFactory scheduler.Configurator, destroyFunc func()) {
+func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroyFunc func()) {
 	h := &framework.MasterHolder{Initialized: make(chan struct{})}
 	s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@@ -57,20 +57,23 @@ func mustSetupScheduler() (schedulerConfigFactory scheduler.Configurator, destroyFunc func()) {
 		Burst: 5000,
 	})

-	schedulerConfigFactory = factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains)
-	schedulerConfig, err := schedulerConfigFactory.Create()
-	if err != nil {
-		panic("Couldn't create scheduler config")
-	}
+	schedulerConfigurator = factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains)

 	eventBroadcaster := record.NewBroadcaster()
-	schedulerConfig.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: "scheduler"})
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: clientSet.Core().Events("")})
-	scheduler.New(schedulerConfig).Run()
+
+	sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) {
+		conf.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: "scheduler"})
+	})
+	if err != nil {
+		glog.Fatalf("Error creating scheduler: %v", err)
+	}
+	sched.Run()

 	destroyFunc = func() {
 		glog.Infof("destroying")
-		close(schedulerConfig.StopEverything)
+		sched.StopEverything()
 		s.Close()
 		glog.Infof("destroyed")
 	}
```
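Finally, the benchmark teardown now calls `sched.StopEverything()` instead of reaching into the config to `close(schedulerConfig.StopEverything)`, keeping the stop channel an implementation detail. A minimal sketch of the close-a-channel-to-broadcast-shutdown idiom it encapsulates (hypothetical names):

```go
package main

import (
	"fmt"
	"time"
)

type worker struct{ stop chan struct{} }

// Run loops until the stop channel is closed; closing a channel is Go's
// broadcast signal, so every goroutine selecting on it unblocks at once.
func (w *worker) Run() {
	go func() {
		for {
			select {
			case <-w.stop:
				fmt.Println("stopped")
				return
			default:
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()
}

// StopEverything encapsulates the shutdown signal, like the method this PR
// adds to Scheduler, so callers no longer reach into the config struct.
func (w *worker) StopEverything() { close(w.stop) }

func main() {
	w := &worker{stop: make(chan struct{})}
	w.Run()
	time.Sleep(30 * time.Millisecond)
	w.StopEverything()
	time.Sleep(20 * time.Millisecond)
}
```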