create a new scheduler constructor

This commit is contained in:
wangqingcan 2018-10-04 07:44:44 +08:00
parent d3fe0ea7ff
commit a74fd15e62
13 changed files with 328 additions and 134 deletions

View File

@ -141,14 +141,24 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
return fmt.Errorf("unable to register configz: %s", err)
}
// Build a scheduler config from the provided algorithm source.
schedulerConfig, err := NewSchedulerConfig(cc)
if err != nil {
return err
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = c.InformerFactory.Storage().V1().StorageClasses()
}
// Create the scheduler.
sched := scheduler.NewFromConfig(schedulerConfig)
sched, err := scheduler.New(c.Client, c.InformerFactory.Core().V1().Nodes(), c.PodInformer,
c.InformerFactory.Core().V1().PersistentVolumes(), c.InformerFactory.Core().V1().PersistentVolumeClaims(),
c.InformerFactory.Core().V1().ReplicationControllers(), c.InformerFactory.Apps().V1().ReplicaSets(),
c.InformerFactory.Apps().V1().StatefulSets(), c.InformerFactory.Core().V1().Services(),
c.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), storageClassInformer, c.Recorder, c.ComponentConfig.AlgorithmSource,
scheduler.WithName(c.ComponentConfig.SchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(c.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(c.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(c.ComponentConfig.DisablePreemption), scheduler.WithPercentageOfNodesToScore(c.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*c.ComponentConfig.BindTimeoutSeconds))
if err != nil {
return err
}
// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
@ -284,7 +294,7 @@ func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, s
}
// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Config, error) {
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
@ -312,7 +322,7 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con
})
source := s.ComponentConfig.AlgorithmSource
var config *scheduler.Config
var config *factory.Config
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.

View File

@ -13,18 +13,24 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
@ -43,6 +49,7 @@ go_test(
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/testing:go_default_library",

View File

@ -16,7 +16,6 @@ go_library(
"//pkg/apis/core/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
@ -50,6 +49,7 @@ go_library(
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
@ -64,7 +64,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/api/testing:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",

View File

@ -48,11 +48,11 @@ import (
policylisters "k8s.io/client-go/listers/policy/v1beta1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@ -78,6 +78,98 @@ var (
maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred}
)
// Binder knows how to write a binding.
type Binder interface {
Bind(binding *v1.Binding) error
}
// PodConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
// Close this to shut down the scheduler.
StopEverything chan struct{}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
// Disable pod preemption or not.
DisablePreemption bool
}
// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}
// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. An implementation of this can be seen in
// factory.go.
type Configurator interface {
// Exposed for testing
GetHardPodAffinitySymmetricWeight() int32
// Exposed for testing
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error)
// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetClient() clientset.Interface
// Exposed for testing
GetScheduledPodLister() corelisters.PodLister
Create() (*Config, error)
CreateFromProvider(providerName string) (*Config, error)
CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
}
// configFactory is the default implementation of the scheduler.Configurator interface.
type configFactory struct {
client clientset.Interface
@ -164,7 +256,7 @@ type ConfigFactoryArgs struct {
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
@ -992,12 +1084,12 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
}
// Create creates a scheduler with the default algorithm provider.
func (c *configFactory) Create() (*scheduler.Config, error) {
func (c *configFactory) Create() (*Config, error) {
return c.CreateFromProvider(DefaultProvider)
}
// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
@ -1008,7 +1100,7 @@ func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Conf
}
// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
glog.V(2).Infof("Creating scheduler from configuration: %v", policy)
// validate the policy configuration
@ -1079,7 +1171,7 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
}
// getBinderFunc returns an func which returns an extender that supports bind or a default binder based on the given pod.
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) scheduler.Binder {
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
var extenderBinder algorithm.SchedulerExtender
for i := range extenders {
if extenders[i].IsBinder() {
@ -1088,7 +1180,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f
}
}
defaultBinder := &binder{c.client}
return func(pod *v1.Pod) scheduler.Binder {
return func(pod *v1.Pod) Binder {
if extenderBinder != nil && extenderBinder.IsInterested(pod) {
return extenderBinder
}
@ -1097,7 +1189,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f
}
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
@ -1148,7 +1240,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
)
podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
return &Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.

View File

@ -36,7 +36,6 @@ import (
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
@ -540,7 +539,7 @@ func TestSkipPodUpdate(t *testing.T) {
}
}
func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) scheduler.Configurator {
func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
v1.DefaultSchedulerName,

View File

@ -17,55 +17,46 @@ limitations under the License.
package scheduler
import (
"fmt"
"io/ioutil"
"os"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
"github.com/golang/glog"
)
// Binder knows how to write a binding.
type Binder interface {
Bind(binding *v1.Binding) error
}
// PodConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}
const (
// BindTimeoutSeconds defines the default bind timeout
BindTimeoutSeconds = 100
)
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
config *Config
config *factory.Config
}
// StopEverything closes the scheduler config's StopEverything channel, to shut
@ -79,81 +70,175 @@ func (sched *Scheduler) Cache() schedulerinternalcache.Cache {
return sched.config.SchedulerCache
}
// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. An implementation of this can be seen in
// factory.go.
type Configurator interface {
// Exposed for testing
GetHardPodAffinitySymmetricWeight() int32
// Exposed for testing
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error)
// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetClient() clientset.Interface
// Exposed for testing
GetScheduledPodLister() corelisters.PodLister
Create() (*Config, error)
CreateFromProvider(providerName string) (*Config, error)
CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
type schedulerOptions struct {
schedulerName string
hardPodAffinitySymmetricWeight int32
enableEquivalenceClassCache bool
disablePreemption bool
percentageOfNodesToScore int32
bindTimeoutSeconds int64
}
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor
// Option configures a Scheduler
type Option func(*schedulerOptions)
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod
// WithName sets schedulerName for Scheduler, the default schedulerName is default-scheduler
func WithName(schedulerName string) Option {
return func(o *schedulerOptions) {
o.schedulerName = schedulerName
}
}
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// WithHardPodAffinitySymmetricWeight sets hardPodAffinitySymmetricWeight for Scheduler, the default value is 1
func WithHardPodAffinitySymmetricWeight(hardPodAffinitySymmetricWeight int32) Option {
return func(o *schedulerOptions) {
o.hardPodAffinitySymmetricWeight = hardPodAffinitySymmetricWeight
}
}
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)
// WithEquivalenceClassCacheEnabled sets enableEquivalenceClassCache for Scheduler, the default value is false
func WithEquivalenceClassCacheEnabled(enableEquivalenceClassCache bool) Option {
return func(o *schedulerOptions) {
o.enableEquivalenceClassCache = enableEquivalenceClassCache
}
}
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
// WithPreemptionDisabled sets disablePreemption for Scheduler, the default value is false
func WithPreemptionDisabled(disablePreemption bool) Option {
return func(o *schedulerOptions) {
o.disablePreemption = disablePreemption
}
}
// Close this to shut down the scheduler.
StopEverything chan struct{}
// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler, the default value is 50
func WithPercentageOfNodesToScore(percentageOfNodesToScore int32) Option {
return func(o *schedulerOptions) {
o.percentageOfNodesToScore = percentageOfNodesToScore
}
}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
// WithBindTimeoutSeconds sets bindTimeoutSeconds for Scheduler, the default value is 100
func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option {
return func(o *schedulerOptions) {
o.bindTimeoutSeconds = bindTimeoutSeconds
}
}
// Disable pod preemption or not.
DisablePreemption bool
var defaultSchedulerOptions = schedulerOptions{
schedulerName: v1.DefaultSchedulerName,
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: false,
disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds,
}
// New returns a Scheduler
// TODOOnce we have the nice constructorwe should modify cmd/kube-scheduler to use it.
func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: options.schedulerName,
Client: client,
NodeInformer: nodeInformer,
PodInformer: podInformer,
PvInformer: pvInformer,
PvcInformer: pvcInformer,
ReplicationControllerInformer: replicationControllerInformer,
ReplicaSetInformer: replicaSetInformer,
StatefulSetInformer: statefulSetInformer,
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: options.enableEquivalenceClassCache,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
})
var config *factory.Config
source := schedulerAlgorithmSource
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
sc, err := configurator.CreateFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
config = sc
case source.Policy != nil:
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
// Use a policy serialized in a file.
policyFile := source.Policy.File.Path
_, err := os.Stat(policyFile)
if err != nil {
return nil, fmt.Errorf("missing policy config file %s", policyFile)
}
data, err := ioutil.ReadFile(policyFile)
if err != nil {
return nil, fmt.Errorf("couldn't read policy config: %v", err)
}
err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
if err != nil {
return nil, fmt.Errorf("invalid policy: %v", err)
}
case source.Policy.ConfigMap != nil:
// Use a policy serialized in a config map value.
policyRef := source.Policy.ConfigMap
policyConfigMap, err := client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
}
data, found := policyConfigMap.Data[kubeschedulerconfig.SchedulerPolicyConfigMapKey]
if !found {
return nil, fmt.Errorf("missing policy config map value at key %q", kubeschedulerconfig.SchedulerPolicyConfigMapKey)
}
err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
if err != nil {
return nil, fmt.Errorf("invalid policy: %v", err)
}
}
sc, err := configurator.CreateFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
config = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
// Create the scheduler.
sched := NewFromConfig(config)
return sched, nil
}
// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented.
// Supports intermediate Config mutation for now if you provide modifier functions which will run after Config is created.
func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Scheduler, error) {
func NewFromConfigurator(c factory.Configurator, modifiers ...func(c *factory.Config)) (*Scheduler, error) {
cfg, err := c.Create()
if err != nil {
return nil, err
@ -171,7 +256,7 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul
}
// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *Config) *Scheduler {
func NewFromConfig(config *factory.Config) *Scheduler {
metrics.Register()
return &Scheduler{
config: config,
@ -188,7 +273,7 @@ func (sched *Scheduler) Run() {
}
// Config returns scheduler's config pointer. It is exposed for testing purposes.
func (sched *Scheduler) Config() *Config {
func (sched *Scheduler) Config() *factory.Config {
return sched.config
}

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
@ -200,7 +201,7 @@ func TestScheduler(t *testing.T) {
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
configurator := &FakeConfigurator{
Config: &Config{
Config: &factory.Config{
SchedulerCache: &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod
@ -213,7 +214,7 @@ func TestScheduler(t *testing.T) {
[]*v1.Node{&testNode},
),
Algorithm: item.algo,
GetBinder: func(pod *v1.Pod) Binder {
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
gotBinding = b
return item.injectBindError
@ -569,11 +570,11 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
configurator := &FakeConfigurator{
Config: &Config{
Config: &factory.Config{
SchedulerCache: scache,
NodeLister: nodeLister,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) Binder {
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
bindingChan <- b
return nil
@ -619,11 +620,11 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
api.DefaultPercentageOfNodesToScore)
bindingChan := make(chan *v1.Binding, 2)
configurator := &FakeConfigurator{
Config: &Config{
Config: &factory.Config{
SchedulerCache: scache,
NodeLister: nodeLister,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) Binder {
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
time.Sleep(bindingTime)
bindingChan <- b

View File

@ -25,13 +25,14 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/util"
)
// FakeConfigurator is an implementation for test.
type FakeConfigurator struct {
Config *Config
Config *factory.Config
}
// GetPredicateMetadataProducer is not implemented yet.
@ -70,21 +71,21 @@ func (fc *FakeConfigurator) GetScheduledPodLister() corelisters.PodLister {
}
// Create returns FakeConfigurator.Config
func (fc *FakeConfigurator) Create() (*Config, error) {
func (fc *FakeConfigurator) Create() (*factory.Config, error) {
return fc.Config, nil
}
// CreateFromProvider returns FakeConfigurator.Config
func (fc *FakeConfigurator) CreateFromProvider(providerName string) (*Config, error) {
func (fc *FakeConfigurator) CreateFromProvider(providerName string) (*factory.Config, error) {
return fc.Config, nil
}
// CreateFromConfig returns FakeConfigurator.Config
func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*factory.Config, error) {
return fc.Config, nil
}
// CreateFromKeys returns FakeConfigurator.Config
func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) {
return fc.Config, nil
}

View File

@ -62,8 +62,8 @@ type TestContext struct {
ns *v1.Namespace
clientSet *clientset.Clientset
informerFactory informers.SharedInformerFactory
schedulerConfigFactory scheduler.Configurator
schedulerConfig *scheduler.Config
schedulerConfigFactory factory.Configurator
schedulerConfig *factory.Config
scheduler *scheduler.Scheduler
}
@ -73,7 +73,7 @@ func createConfiguratorWithPodInformer(
clientSet clientset.Interface,
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
) scheduler.Configurator {
) factory.Configurator {
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: schedulerName,
Client: clientSet,

View File

@ -14,8 +14,8 @@ go_library(
],
importpath = "k8s.io/kubernetes/test/integration/scheduler_perf",
deps = [
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
@ -35,7 +35,7 @@ go_test(
tags = ["integration"],
deps = [
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/factory"
testutils "k8s.io/kubernetes/test/utils"
"math"
"strconv"
@ -105,7 +105,7 @@ type testConfig struct {
numNodes int
mutatedNodeTemplate *v1.Node
mutatedPodTemplate *v1.Pod
schedulerSupportFunctions scheduler.Configurator
schedulerSupportFunctions factory.Configurator
destroyFunc func()
}

View File

@ -20,8 +20,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/scheduler"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/pkg/scheduler/factory"
"k8s.io/kubernetes/test/integration/util"
)
@ -32,7 +32,7 @@ import (
// remove resources after finished.
// Notes on rate limiter:
// - client rate limit is set to 5000.
func mustSetupScheduler() (scheduler.Configurator, util.ShutdownFunc) {
func mustSetupScheduler() (factory.Configurator, util.ShutdownFunc) {
apiURL, apiShutdown := util.StartApiserver()
clientSet := clientset.NewForConfigOrDie(&restclient.Config{
Host: apiURL,

View File

@ -59,7 +59,7 @@ func StartApiserver() (string, ShutdownFunc) {
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface
// and event broadcaster. It returns a handle to the configurator for the running scheduler
// and the shutdown function to stop it.
func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, ShutdownFunc) {
func StartScheduler(clientSet clientset.Interface) (factory.Configurator, ShutdownFunc) {
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
evtBroadcaster := record.NewBroadcaster()
@ -68,7 +68,7 @@ func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, Shut
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory)
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) {
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *factory.Config) {
conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
})
if err != nil {
@ -94,7 +94,7 @@ func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, Shut
func createSchedulerConfigurator(
clientSet clientset.Interface,
informerFactory informers.SharedInformerFactory,
) scheduler.Configurator {
) factory.Configurator {
// Enable EnableEquivalenceClassCache for all integration tests.
utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")