From a74fd15e621fe64ec7220182f7de2347e4195635 Mon Sep 17 00:00:00 2001 From: wangqingcan Date: Thu, 4 Oct 2018 07:44:44 +0800 Subject: [PATCH] create a new scheduler constructor --- cmd/kube-scheduler/app/server.go | 24 +- pkg/scheduler/BUILD | 11 +- pkg/scheduler/factory/BUILD | 3 +- pkg/scheduler/factory/factory.go | 110 +++++++- pkg/scheduler/factory/factory_test.go | 3 +- pkg/scheduler/scheduler.go | 263 ++++++++++++------ pkg/scheduler/scheduler_test.go | 13 +- pkg/scheduler/testutil.go | 11 +- test/integration/scheduler/util.go | 6 +- test/integration/scheduler_perf/BUILD | 4 +- .../scheduler_perf/scheduler_test.go | 4 +- test/integration/scheduler_perf/util.go | 4 +- test/integration/util/util.go | 6 +- 13 files changed, 328 insertions(+), 134 deletions(-) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 32eaa86c0e0..f33835d9139 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -141,14 +141,24 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error { return fmt.Errorf("unable to register configz: %s", err) } - // Build a scheduler config from the provided algorithm source. - schedulerConfig, err := NewSchedulerConfig(cc) - if err != nil { - return err + var storageClassInformer storageinformers.StorageClassInformer + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + storageClassInformer = c.InformerFactory.Storage().V1().StorageClasses() } // Create the scheduler. 
- sched := scheduler.NewFromConfig(schedulerConfig) + sched, err := scheduler.New(c.Client, c.InformerFactory.Core().V1().Nodes(), c.PodInformer, + c.InformerFactory.Core().V1().PersistentVolumes(), c.InformerFactory.Core().V1().PersistentVolumeClaims(), + c.InformerFactory.Core().V1().ReplicationControllers(), c.InformerFactory.Apps().V1().ReplicaSets(), + c.InformerFactory.Apps().V1().StatefulSets(), c.InformerFactory.Core().V1().Services(), + c.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), storageClassInformer, c.Recorder, c.ComponentConfig.AlgorithmSource, + scheduler.WithName(c.ComponentConfig.SchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(c.ComponentConfig.HardPodAffinitySymmetricWeight), + scheduler.WithEquivalenceClassCacheEnabled(utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache)), + scheduler.WithPreemptionDisabled(c.ComponentConfig.DisablePreemption), scheduler.WithPercentageOfNodesToScore(c.ComponentConfig.PercentageOfNodesToScore), + scheduler.WithBindTimeoutSeconds(*c.ComponentConfig.BindTimeoutSeconds)) + if err != nil { + return err + } // Prepare the event broadcaster. if cc.Broadcaster != nil && cc.EventClient != nil { @@ -284,7 +294,7 @@ func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, s } // NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests. 
-func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) { +func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Config, error) { var storageClassInformer storageinformers.StorageClassInformer if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses() @@ -312,7 +322,7 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con }) source := s.ComponentConfig.AlgorithmSource - var config *scheduler.Config + var config *factory.Config switch { case source.Provider != nil: // Create the config from a named algorithm provider. diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index fbeefee1ee8..ec840b4b9af 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -13,18 +13,24 @@ go_library( "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/api:go_default_library", + "//pkg/scheduler/api/latest:go_default_library", + "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/core:go_default_library", - "//pkg/scheduler/core/equivalence:go_default_library", + "//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/util:go_default_library", - "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + 
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", @@ -43,6 +49,7 @@ go_test( "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/core:go_default_library", + "//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/fake:go_default_library", "//pkg/scheduler/testing:go_default_library", diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index fe8aaac8676..db4ff1bff7c 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -16,7 +16,6 @@ go_library( "//pkg/apis/core/helper:go_default_library", "//pkg/features:go_default_library", "//pkg/kubelet/apis:go_default_library", - "//pkg/scheduler:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/priorities:go_default_library", @@ -50,6 +49,7 @@ go_library( "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) @@ -64,7 +64,6 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/api/testing:go_default_library", - "//pkg/scheduler:go_default_library", "//pkg/scheduler/algorithm:go_default_library", 
"//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/api:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index e18866a5699..37bc3c4dee3 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -48,11 +48,11 @@ import ( policylisters "k8s.io/client-go/listers/policy/v1beta1" storagelisters "k8s.io/client-go/listers/storage/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/core/helper" "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - "k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" @@ -78,6 +78,98 @@ var ( maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred} ) +// Binder knows how to write a binding. +type Binder interface { + Bind(binding *v1.Binding) error +} + +// PodConditionUpdater updates the condition of a pod based on the passed +// PodCondition +type PodConditionUpdater interface { + Update(pod *v1.Pod, podCondition *v1.PodCondition) error +} + +// Config is an implementation of the Scheduler's configured input data. +// TODO over time we should make this struct a hidden implementation detail of the scheduler. +type Config struct { + // It is expected that changes made via SchedulerCache will be observed + // by NodeLister and Algorithm. + SchedulerCache schedulerinternalcache.Cache + // Ecache is used for optimistically invalid affected cache items after + // successfully binding a pod + Ecache *equivalence.Cache + NodeLister algorithm.NodeLister + Algorithm algorithm.ScheduleAlgorithm + GetBinder func(pod *v1.Pod) Binder + // PodConditionUpdater is used only in case of scheduling errors. 
If we succeed + // with scheduling, PodScheduled condition will be updated in apiserver in /bind + // handler so that binding and setting PodCondition it is atomic. + PodConditionUpdater PodConditionUpdater + // PodPreemptor is used to evict pods and update pod annotations. + PodPreemptor PodPreemptor + + // NextPod should be a function that blocks until the next pod + // is available. We don't use a channel for this, because scheduling + // a pod may take some amount of time and we don't want pods to get + // stale while they sit in a channel. + NextPod func() *v1.Pod + + // WaitForCacheSync waits for scheduler cache to populate. + // It returns true if it was successful, false if the controller should shutdown. + WaitForCacheSync func() bool + + // Error is called if there is an error. It is passed the pod in + // question, and the error + Error func(*v1.Pod, error) + + // Recorder is the EventRecorder to use + Recorder record.EventRecorder + + // Close this to shut down the scheduler. + StopEverything chan struct{} + + // VolumeBinder handles PVC/PV binding for the pod. + VolumeBinder *volumebinder.VolumeBinder + + // Disable pod preemption or not. + DisablePreemption bool +} + +// PodPreemptor has methods needed to delete a pod and to update +// annotations of the preemptor pod. +type PodPreemptor interface { + GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) + DeletePod(pod *v1.Pod) error + SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error + RemoveNominatedNodeName(pod *v1.Pod) error +} + +// Configurator defines I/O, caching, and other functionality needed to +// construct a new scheduler. An implementation of this can be seen in +// factory.go. 
+type Configurator interface { + // Exposed for testing + GetHardPodAffinitySymmetricWeight() int32 + // Exposed for testing + MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) + + // Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler + GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) + GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) + + // Needs to be exposed for things like integration tests where we want to make fake nodes. + GetNodeLister() corelisters.NodeLister + // Exposed for testing + GetClient() clientset.Interface + // Exposed for testing + GetScheduledPodLister() corelisters.PodLister + + Create() (*Config, error) + CreateFromProvider(providerName string) (*Config, error) + CreateFromConfig(policy schedulerapi.Policy) (*Config, error) + CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) +} + // configFactory is the default implementation of the scheduler.Configurator interface. type configFactory struct { client clientset.Interface @@ -164,7 +256,7 @@ type ConfigFactoryArgs struct { // NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only // return the interface. -func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator { +func NewConfigFactory(args *ConfigFactoryArgs) Configurator { stopEverything := make(chan struct{}) schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything) @@ -992,12 +1084,12 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) { } // Create creates a scheduler with the default algorithm provider. 
-func (c *configFactory) Create() (*scheduler.Config, error) { +func (c *configFactory) Create() (*Config, error) { return c.CreateFromProvider(DefaultProvider) } // Creates a scheduler from the name of a registered algorithm provider. -func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) { +func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) { glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) provider, err := GetAlgorithmProvider(providerName) if err != nil { @@ -1008,7 +1100,7 @@ func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Conf } // Creates a scheduler from the configuration file -func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) { +func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) { glog.V(2).Infof("Creating scheduler from configuration: %v", policy) // validate the policy configuration @@ -1079,7 +1171,7 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler } // getBinderFunc returns an func which returns an extender that supports bind or a default binder based on the given pod. 
-func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) scheduler.Binder { +func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder { var extenderBinder algorithm.SchedulerExtender for i := range extenders { if extenders[i].IsBinder() { @@ -1088,7 +1180,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f } } defaultBinder := &binder{c.client} - return func(pod *v1.Pod) scheduler.Binder { + return func(pod *v1.Pod) Binder { if extenderBinder != nil && extenderBinder.IsInterested(pod) { return extenderBinder } @@ -1097,7 +1189,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f } // Creates a scheduler from a set of registered fit predicate keys and priority keys. -func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { +func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) { glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys) if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 { @@ -1148,7 +1240,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, ) podBackoff := util.CreateDefaultPodBackoff() - return &scheduler.Config{ + return &Config{ SchedulerCache: c.schedulerCache, Ecache: c.equivalencePodCache, // The scheduler only needs to consider schedulable nodes. 
diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index 6500db194e1..e14c89c261e 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -36,7 +36,6 @@ import ( "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" apitesting "k8s.io/kubernetes/pkg/api/testing" - "k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" @@ -540,7 +539,7 @@ func TestSkipPodUpdate(t *testing.T) { } } -func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) scheduler.Configurator { +func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) Configurator { informerFactory := informers.NewSharedInformerFactory(client, 0) return NewConfigFactory(&ConfigFactoryArgs{ v1.DefaultSchedulerName, diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 51250217e39..7074768a466 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -17,55 +17,46 @@ limitations under the License. 
package scheduler import ( + "fmt" + "io/ioutil" + "os" "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" + appsinformers "k8s.io/client-go/informers/apps/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + policyinformers "k8s.io/client-go/informers/policy/v1beta1" + storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" + kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/core" - "k8s.io/kubernetes/pkg/scheduler/core/equivalence" + "k8s.io/kubernetes/pkg/scheduler/factory" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" - internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" - "k8s.io/kubernetes/pkg/scheduler/volumebinder" "github.com/golang/glog" ) -// Binder knows how to write a binding. -type Binder interface { - Bind(binding *v1.Binding) error -} - -// PodConditionUpdater updates the condition of a pod based on the passed -// PodCondition -type PodConditionUpdater interface { - Update(pod *v1.Pod, podCondition *v1.PodCondition) error -} - -// PodPreemptor has methods needed to delete a pod and to update -// annotations of the preemptor pod. 
-type PodPreemptor interface { - GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) - DeletePod(pod *v1.Pod) error - SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error - RemoveNominatedNodeName(pod *v1.Pod) error -} +const ( + // BindTimeoutSeconds defines the default bind timeout + BindTimeoutSeconds = 100 +) // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { - config *Config + config *factory.Config } // StopEverything closes the scheduler config's StopEverything channel, to shut @@ -79,81 +70,175 @@ func (sched *Scheduler) Cache() schedulerinternalcache.Cache { return sched.config.SchedulerCache } -// Configurator defines I/O, caching, and other functionality needed to -// construct a new scheduler. An implementation of this can be seen in -// factory.go. -type Configurator interface { - // Exposed for testing - GetHardPodAffinitySymmetricWeight() int32 - // Exposed for testing - MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) - - // Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler - GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) - GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) - - // Needs to be exposed for things like integration tests where we want to make fake nodes. 
- GetNodeLister() corelisters.NodeLister - // Exposed for testing - GetClient() clientset.Interface - // Exposed for testing - GetScheduledPodLister() corelisters.PodLister - - Create() (*Config, error) - CreateFromProvider(providerName string) (*Config, error) - CreateFromConfig(policy schedulerapi.Policy) (*Config, error) - CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) +type schedulerOptions struct { + schedulerName string + hardPodAffinitySymmetricWeight int32 + enableEquivalenceClassCache bool + disablePreemption bool + percentageOfNodesToScore int32 + bindTimeoutSeconds int64 } -// Config is an implementation of the Scheduler's configured input data. -// TODO over time we should make this struct a hidden implementation detail of the scheduler. -type Config struct { - // It is expected that changes made via SchedulerCache will be observed - // by NodeLister and Algorithm. - SchedulerCache schedulerinternalcache.Cache - // Ecache is used for optimistically invalid affected cache items after - // successfully binding a pod - Ecache *equivalence.Cache - NodeLister algorithm.NodeLister - Algorithm algorithm.ScheduleAlgorithm - GetBinder func(pod *v1.Pod) Binder - // PodConditionUpdater is used only in case of scheduling errors. If we succeed - // with scheduling, PodScheduled condition will be updated in apiserver in /bind - // handler so that binding and setting PodCondition it is atomic. - PodConditionUpdater PodConditionUpdater - // PodPreemptor is used to evict pods and update pod annotations. - PodPreemptor PodPreemptor +// Option configures a Scheduler +type Option func(*schedulerOptions) - // NextPod should be a function that blocks until the next pod - // is available. We don't use a channel for this, because scheduling - // a pod may take some amount of time and we don't want pods to get - // stale while they sit in a channel. 
- NextPod func() *v1.Pod +// WithName sets schedulerName for Scheduler, the default schedulerName is default-scheduler +func WithName(schedulerName string) Option { + return func(o *schedulerOptions) { + o.schedulerName = schedulerName + } +} - // WaitForCacheSync waits for scheduler cache to populate. - // It returns true if it was successful, false if the controller should shutdown. - WaitForCacheSync func() bool +// WithHardPodAffinitySymmetricWeight sets hardPodAffinitySymmetricWeight for Scheduler, the default value is 1 +func WithHardPodAffinitySymmetricWeight(hardPodAffinitySymmetricWeight int32) Option { + return func(o *schedulerOptions) { + o.hardPodAffinitySymmetricWeight = hardPodAffinitySymmetricWeight + } +} - // Error is called if there is an error. It is passed the pod in - // question, and the error - Error func(*v1.Pod, error) +// WithEquivalenceClassCacheEnabled sets enableEquivalenceClassCache for Scheduler, the default value is false +func WithEquivalenceClassCacheEnabled(enableEquivalenceClassCache bool) Option { + return func(o *schedulerOptions) { + o.enableEquivalenceClassCache = enableEquivalenceClassCache + } +} - // Recorder is the EventRecorder to use - Recorder record.EventRecorder +// WithPreemptionDisabled sets disablePreemption for Scheduler, the default value is false +func WithPreemptionDisabled(disablePreemption bool) Option { + return func(o *schedulerOptions) { + o.disablePreemption = disablePreemption + } +} - // Close this to shut down the scheduler. - StopEverything chan struct{} +// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler, the default value is 50 +func WithPercentageOfNodesToScore(percentageOfNodesToScore int32) Option { + return func(o *schedulerOptions) { + o.percentageOfNodesToScore = percentageOfNodesToScore + } +} - // VolumeBinder handles PVC/PV binding for the pod. 
- VolumeBinder *volumebinder.VolumeBinder +// WithBindTimeoutSeconds sets bindTimeoutSeconds for Scheduler, the default value is 100 +func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option { + return func(o *schedulerOptions) { + o.bindTimeoutSeconds = bindTimeoutSeconds + } +} - // Disable pod preemption or not. - DisablePreemption bool +var defaultSchedulerOptions = schedulerOptions{ + schedulerName: v1.DefaultSchedulerName, + hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceClassCache: false, + disablePreemption: false, + percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, + bindTimeoutSeconds: BindTimeoutSeconds, +} + +// New returns a Scheduler configured from the given client, informers, recorder and algorithm source (a named provider or a policy file/ConfigMap). +// Each opt is applied, in order, on top of defaultSchedulerOptions; an unsupported or unreadable algorithm source yields an error. +func New(client clientset.Interface, + nodeInformer coreinformers.NodeInformer, + podInformer coreinformers.PodInformer, + pvInformer coreinformers.PersistentVolumeInformer, + pvcInformer coreinformers.PersistentVolumeClaimInformer, + replicationControllerInformer coreinformers.ReplicationControllerInformer, + replicaSetInformer appsinformers.ReplicaSetInformer, + statefulSetInformer appsinformers.StatefulSetInformer, + serviceInformer coreinformers.ServiceInformer, + pdbInformer policyinformers.PodDisruptionBudgetInformer, + storageClassInformer storageinformers.StorageClassInformer, + recorder record.EventRecorder, + schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource, + opts ...Option) (*Scheduler, error) { + + options := defaultSchedulerOptions + for _, opt := range opts { + opt(&options) + } + + // Set up the configurator which can create schedulers from configs. 
+ configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{ + SchedulerName: options.schedulerName, + Client: client, + NodeInformer: nodeInformer, + PodInformer: podInformer, + PvInformer: pvInformer, + PvcInformer: pvcInformer, + ReplicationControllerInformer: replicationControllerInformer, + ReplicaSetInformer: replicaSetInformer, + StatefulSetInformer: statefulSetInformer, + ServiceInformer: serviceInformer, + PdbInformer: pdbInformer, + StorageClassInformer: storageClassInformer, + HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight, + EnableEquivalenceClassCache: options.enableEquivalenceClassCache, + DisablePreemption: options.disablePreemption, + PercentageOfNodesToScore: options.percentageOfNodesToScore, + BindTimeoutSeconds: options.bindTimeoutSeconds, + }) + var config *factory.Config + source := schedulerAlgorithmSource + switch { + case source.Provider != nil: + // Create the config from a named algorithm provider. + sc, err := configurator.CreateFromProvider(*source.Provider) + if err != nil { + return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err) + } + config = sc + case source.Policy != nil: + // Create the config from a user specified policy source. + policy := &schedulerapi.Policy{} + switch { + case source.Policy.File != nil: + // Use a policy serialized in a file. + policyFile := source.Policy.File.Path + _, err := os.Stat(policyFile) + if err != nil { + return nil, fmt.Errorf("missing policy config file %s", policyFile) + } + data, err := ioutil.ReadFile(policyFile) + if err != nil { + return nil, fmt.Errorf("couldn't read policy config: %v", err) + } + err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy) + if err != nil { + return nil, fmt.Errorf("invalid policy: %v", err) + } + case source.Policy.ConfigMap != nil: + // Use a policy serialized in a config map value. 
+ policyRef := source.Policy.ConfigMap + policyConfigMap, err := client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err) + } + data, found := policyConfigMap.Data[kubeschedulerconfig.SchedulerPolicyConfigMapKey] + if !found { + return nil, fmt.Errorf("missing policy config map value at key %q", kubeschedulerconfig.SchedulerPolicyConfigMapKey) + } + err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy) + if err != nil { + return nil, fmt.Errorf("invalid policy: %v", err) + } + } + sc, err := configurator.CreateFromConfig(*policy) + if err != nil { + return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) + } + config = sc + default: + return nil, fmt.Errorf("unsupported algorithm source: %v", source) + } + // Additional tweaks to the config produced by the configurator. + config.Recorder = recorder + config.DisablePreemption = options.disablePreemption + // Create the scheduler. + sched := NewFromConfig(config) + return sched, nil } // NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented. // Supports intermediate Config mutation for now if you provide modifier functions which will run after Config is created. -func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Scheduler, error) { +func NewFromConfigurator(c factory.Configurator, modifiers ...func(c *factory.Config)) (*Scheduler, error) { cfg, err := c.Create() if err != nil { return nil, err @@ -171,7 +256,7 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul } // NewFromConfig returns a new scheduler using the provided Config. 
-func NewFromConfig(config *Config) *Scheduler { +func NewFromConfig(config *factory.Config) *Scheduler { metrics.Register() return &Scheduler{ config: config, @@ -188,7 +273,7 @@ func (sched *Scheduler) Run() { } // Config returns scheduler's config pointer. It is exposed for testing purposes. -func (sched *Scheduler) Config() *Config { +func (sched *Scheduler) Config() *factory.Config { return sched.config } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index e7865dfa5a7..7960e5a39c9 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/core" + "k8s.io/kubernetes/pkg/scheduler/factory" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" @@ -200,7 +201,7 @@ func TestScheduler(t *testing.T) { var gotAssumedPod *v1.Pod var gotBinding *v1.Binding configurator := &FakeConfigurator{ - Config: &Config{ + Config: &factory.Config{ SchedulerCache: &fakecache.Cache{ ForgetFunc: func(pod *v1.Pod) { gotForgetPod = pod @@ -213,7 +214,7 @@ func TestScheduler(t *testing.T) { []*v1.Node{&testNode}, ), Algorithm: item.algo, - GetBinder: func(pod *v1.Pod) Binder { + GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b *v1.Binding) error { gotBinding = b return item.injectBindError @@ -569,11 +570,11 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern bindingChan := make(chan *v1.Binding, 1) errChan := make(chan error, 1) configurator := &FakeConfigurator{ - Config: &Config{ + Config: &factory.Config{ SchedulerCache: scache, NodeLister: nodeLister, Algorithm: algo, - GetBinder: func(pod *v1.Pod) Binder { + GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b 
*v1.Binding) error { bindingChan <- b return nil @@ -619,11 +620,11 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc api.DefaultPercentageOfNodesToScore) bindingChan := make(chan *v1.Binding, 2) configurator := &FakeConfigurator{ - Config: &Config{ + Config: &factory.Config{ SchedulerCache: scache, NodeLister: nodeLister, Algorithm: algo, - GetBinder: func(pod *v1.Pod) Binder { + GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b *v1.Binding) error { time.Sleep(bindingTime) bindingChan <- b diff --git a/pkg/scheduler/testutil.go b/pkg/scheduler/testutil.go index 5469263b321..0c101a54573 100644 --- a/pkg/scheduler/testutil.go +++ b/pkg/scheduler/testutil.go @@ -25,13 +25,14 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + "k8s.io/kubernetes/pkg/scheduler/factory" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/util" ) // FakeConfigurator is an implementation for test. type FakeConfigurator struct { - Config *Config + Config *factory.Config } // GetPredicateMetadataProducer is not implemented yet. 
@@ -70,21 +71,21 @@ func (fc *FakeConfigurator) GetScheduledPodLister() corelisters.PodLister { } // Create returns FakeConfigurator.Config -func (fc *FakeConfigurator) Create() (*Config, error) { +func (fc *FakeConfigurator) Create() (*factory.Config, error) { return fc.Config, nil } // CreateFromProvider returns FakeConfigurator.Config -func (fc *FakeConfigurator) CreateFromProvider(providerName string) (*Config, error) { +func (fc *FakeConfigurator) CreateFromProvider(providerName string) (*factory.Config, error) { return fc.Config, nil } // CreateFromConfig returns FakeConfigurator.Config -func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) { +func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*factory.Config, error) { return fc.Config, nil } // CreateFromKeys returns FakeConfigurator.Config -func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) { +func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) { return fc.Config, nil } diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 2ed1d5c45c1..ddc624a050f 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -62,8 +62,8 @@ type TestContext struct { ns *v1.Namespace clientSet *clientset.Clientset informerFactory informers.SharedInformerFactory - schedulerConfigFactory scheduler.Configurator - schedulerConfig *scheduler.Config + schedulerConfigFactory factory.Configurator + schedulerConfig *factory.Config scheduler *scheduler.Scheduler } @@ -73,7 +73,7 @@ func createConfiguratorWithPodInformer( clientSet clientset.Interface, podInformer coreinformers.PodInformer, informerFactory informers.SharedInformerFactory, -) scheduler.Configurator { +) factory.Configurator { return 
factory.NewConfigFactory(&factory.ConfigFactoryArgs{ SchedulerName: schedulerName, Client: clientSet, diff --git a/test/integration/scheduler_perf/BUILD b/test/integration/scheduler_perf/BUILD index 9cf4f3cd674..ab2aeeb0eaa 100644 --- a/test/integration/scheduler_perf/BUILD +++ b/test/integration/scheduler_perf/BUILD @@ -14,8 +14,8 @@ go_library( ], importpath = "k8s.io/kubernetes/test/integration/scheduler_perf", deps = [ - "//pkg/scheduler:go_default_library", "//pkg/scheduler/algorithmprovider:go_default_library", + "//pkg/scheduler/factory:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", @@ -35,7 +35,7 @@ go_test( tags = ["integration"], deps = [ "//pkg/kubelet/apis:go_default_library", - "//pkg/scheduler:go_default_library", + "//pkg/scheduler/factory:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/test/integration/scheduler_perf/scheduler_test.go b/test/integration/scheduler_perf/scheduler_test.go index c6ecc2e751c..e8b232b8e2d 100644 --- a/test/integration/scheduler_perf/scheduler_test.go +++ b/test/integration/scheduler_perf/scheduler_test.go @@ -23,7 +23,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/kubernetes/pkg/scheduler" + "k8s.io/kubernetes/pkg/scheduler/factory" testutils "k8s.io/kubernetes/test/utils" "math" "strconv" @@ -105,7 +105,7 @@ type testConfig struct { numNodes int mutatedNodeTemplate *v1.Node mutatedPodTemplate *v1.Pod - schedulerSupportFunctions scheduler.Configurator + schedulerSupportFunctions factory.Configurator destroyFunc func() } diff --git a/test/integration/scheduler_perf/util.go 
b/test/integration/scheduler_perf/util.go index 13f71a6c1b8..9991da31d72 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -20,8 +20,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" - "k8s.io/kubernetes/pkg/scheduler" _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" + "k8s.io/kubernetes/pkg/scheduler/factory" "k8s.io/kubernetes/test/integration/util" ) @@ -32,7 +32,7 @@ import ( // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupScheduler() (scheduler.Configurator, util.ShutdownFunc) { +func mustSetupScheduler() (factory.Configurator, util.ShutdownFunc) { apiURL, apiShutdown := util.StartApiserver() clientSet := clientset.NewForConfigOrDie(&restclient.Config{ Host: apiURL, diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 53e27757fcf..fa39562d513 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -59,7 +59,7 @@ func StartApiserver() (string, ShutdownFunc) { // StartScheduler configures and starts a scheduler given a handle to the clientSet interface // and event broadcaster. It returns a handle to the configurator for the running scheduler // and the shutdown function to stop it. 
-func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, ShutdownFunc) { +func StartScheduler(clientSet clientset.Interface) (factory.Configurator, ShutdownFunc) { informerFactory := informers.NewSharedInformerFactory(clientSet, 0) evtBroadcaster := record.NewBroadcaster() @@ -68,7 +68,7 @@ func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, Shut schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory) - sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) { + sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *factory.Config) { conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}) }) if err != nil { @@ -94,7 +94,7 @@ func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, Shut func createSchedulerConfigurator( clientSet clientset.Interface, informerFactory informers.SharedInformerFactory, -) scheduler.Configurator { +) factory.Configurator { // Enable EnableEquivalenceClassCache for all integration tests. utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")