Merge pull request #69057 from denkensk/create-a-new-scheduler-constructor

create-a-new-scheduler-constructor
This commit is contained in:
k8s-ci-robot 2018-10-11 13:45:02 -07:00 committed by GitHub
commit 94306c12f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 388 additions and 134 deletions

View File

@ -141,14 +141,34 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
return fmt.Errorf("unable to register configz: %s", err)
}
// Build a scheduler config from the provided algorithm source.
schedulerConfig, err := NewSchedulerConfig(cc)
if err != nil {
return err
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = c.InformerFactory.Storage().V1().StorageClasses()
}
// Create the scheduler.
sched := scheduler.NewFromConfig(schedulerConfig)
sched, err := scheduler.New(c.Client,
c.InformerFactory.Core().V1().Nodes(),
c.PodInformer,
c.InformerFactory.Core().V1().PersistentVolumes(),
c.InformerFactory.Core().V1().PersistentVolumeClaims(),
c.InformerFactory.Core().V1().ReplicationControllers(),
c.InformerFactory.Apps().V1().ReplicaSets(),
c.InformerFactory.Apps().V1().StatefulSets(),
c.InformerFactory.Core().V1().Services(),
c.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
c.Recorder,
c.ComponentConfig.AlgorithmSource,
scheduler.WithName(c.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(c.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(c.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(c.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(c.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*c.ComponentConfig.BindTimeoutSeconds))
if err != nil {
return err
}
// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
@ -284,7 +304,7 @@ func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, s
}
// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Config, error) {
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
@ -312,7 +332,7 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con
})
source := s.ComponentConfig.AlgorithmSource
var config *scheduler.Config
var config *factory.Config
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.

View File

@ -13,18 +13,24 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
@ -42,7 +48,10 @@ go_test(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/testing:go_default_library",
@ -53,8 +62,11 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
],

View File

@ -15,7 +15,6 @@ go_library(
"//pkg/apis/core/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
@ -49,6 +48,7 @@ go_library(
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
@ -62,7 +62,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/api/testing:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",

View File

@ -48,11 +48,11 @@ import (
policylisters "k8s.io/client-go/listers/policy/v1beta1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@ -79,6 +79,98 @@ var (
maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred}
)
// Binder knows how to write a binding.
type Binder interface {
Bind(binding *v1.Binding) error
}
// PodConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
// Close this to shut down the scheduler.
StopEverything chan struct{}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
// Disable pod preemption or not.
DisablePreemption bool
}
// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}
// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. An implementation of this can be seen in
// factory.go.
type Configurator interface {
// Exposed for testing
GetHardPodAffinitySymmetricWeight() int32
// Exposed for testing
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error)
// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetClient() clientset.Interface
// Exposed for testing
GetScheduledPodLister() corelisters.PodLister
Create() (*Config, error)
CreateFromProvider(providerName string) (*Config, error)
CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
}
// configFactory is the default implementation of the scheduler.Configurator interface.
type configFactory struct {
client clientset.Interface
@ -165,7 +257,7 @@ type ConfigFactoryArgs struct {
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
@ -993,12 +1085,12 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
}
// Create creates a scheduler with the default algorithm provider.
func (c *configFactory) Create() (*scheduler.Config, error) {
func (c *configFactory) Create() (*Config, error) {
return c.CreateFromProvider(DefaultProvider)
}
// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
@ -1009,7 +1101,7 @@ func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Conf
}
// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
glog.V(2).Infof("Creating scheduler from configuration: %v", policy)
// validate the policy configuration
@ -1080,7 +1172,7 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
}
// getBinderFunc returns an func which returns an extender that supports bind or a default binder based on the given pod.
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) scheduler.Binder {
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
var extenderBinder algorithm.SchedulerExtender
for i := range extenders {
if extenders[i].IsBinder() {
@ -1089,7 +1181,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f
}
}
defaultBinder := &binder{c.client}
return func(pod *v1.Pod) scheduler.Binder {
return func(pod *v1.Pod) Binder {
if extenderBinder != nil && extenderBinder.IsInterested(pod) {
return extenderBinder
}
@ -1098,7 +1190,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f
}
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
@ -1149,7 +1241,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
)
podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
return &Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.

View File

@ -36,7 +36,6 @@ import (
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
@ -540,7 +539,7 @@ func TestSkipPodUpdate(t *testing.T) {
}
}
func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) scheduler.Configurator {
func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
v1.DefaultSchedulerName,

View File

@ -17,55 +17,46 @@ limitations under the License.
package scheduler
import (
"fmt"
"io/ioutil"
"os"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
"github.com/golang/glog"
)
// Binder knows how to write a binding.
type Binder interface {
Bind(binding *v1.Binding) error
}
// PodConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}
const (
// BindTimeoutSeconds defines the default bind timeout
BindTimeoutSeconds = 100
)
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
config *Config
config *factory.Config
}
// StopEverything closes the scheduler config's StopEverything channel, to shut
@ -79,81 +70,174 @@ func (sched *Scheduler) Cache() schedulerinternalcache.Cache {
return sched.config.SchedulerCache
}
// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. An implementation of this can be seen in
// factory.go.
type Configurator interface {
// Exposed for testing
GetHardPodAffinitySymmetricWeight() int32
// Exposed for testing
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error)
// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetClient() clientset.Interface
// Exposed for testing
GetScheduledPodLister() corelisters.PodLister
Create() (*Config, error)
CreateFromProvider(providerName string) (*Config, error)
CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
type schedulerOptions struct {
schedulerName string
hardPodAffinitySymmetricWeight int32
enableEquivalenceClassCache bool
disablePreemption bool
percentageOfNodesToScore int32
bindTimeoutSeconds int64
}
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor
// Option configures a Scheduler
type Option func(*schedulerOptions)
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod
// WithName sets schedulerName for Scheduler, the default schedulerName is default-scheduler
func WithName(schedulerName string) Option {
return func(o *schedulerOptions) {
o.schedulerName = schedulerName
}
}
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// WithHardPodAffinitySymmetricWeight sets hardPodAffinitySymmetricWeight for Scheduler, the default value is 1
func WithHardPodAffinitySymmetricWeight(hardPodAffinitySymmetricWeight int32) Option {
return func(o *schedulerOptions) {
o.hardPodAffinitySymmetricWeight = hardPodAffinitySymmetricWeight
}
}
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)
// WithEquivalenceClassCacheEnabled sets enableEquivalenceClassCache for Scheduler, the default value is false
func WithEquivalenceClassCacheEnabled(enableEquivalenceClassCache bool) Option {
return func(o *schedulerOptions) {
o.enableEquivalenceClassCache = enableEquivalenceClassCache
}
}
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
// WithPreemptionDisabled sets disablePreemption for Scheduler, the default value is false
func WithPreemptionDisabled(disablePreemption bool) Option {
return func(o *schedulerOptions) {
o.disablePreemption = disablePreemption
}
}
// Close this to shut down the scheduler.
StopEverything chan struct{}
// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler, the default value is 50
func WithPercentageOfNodesToScore(percentageOfNodesToScore int32) Option {
return func(o *schedulerOptions) {
o.percentageOfNodesToScore = percentageOfNodesToScore
}
}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
// WithBindTimeoutSeconds sets bindTimeoutSeconds for Scheduler, the default value is 100
func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option {
return func(o *schedulerOptions) {
o.bindTimeoutSeconds = bindTimeoutSeconds
}
}
// Disable pod preemption or not.
DisablePreemption bool
var defaultSchedulerOptions = schedulerOptions{
schedulerName: v1.DefaultSchedulerName,
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: false,
disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds,
}
// New returns a Scheduler
func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: options.schedulerName,
Client: client,
NodeInformer: nodeInformer,
PodInformer: podInformer,
PvInformer: pvInformer,
PvcInformer: pvcInformer,
ReplicationControllerInformer: replicationControllerInformer,
ReplicaSetInformer: replicaSetInformer,
StatefulSetInformer: statefulSetInformer,
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: options.enableEquivalenceClassCache,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
})
var config *factory.Config
source := schedulerAlgorithmSource
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
sc, err := configurator.CreateFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
config = sc
case source.Policy != nil:
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
// Use a policy serialized in a file.
policyFile := source.Policy.File.Path
_, err := os.Stat(policyFile)
if err != nil {
return nil, fmt.Errorf("missing policy config file %s", policyFile)
}
data, err := ioutil.ReadFile(policyFile)
if err != nil {
return nil, fmt.Errorf("couldn't read policy config: %v", err)
}
err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
if err != nil {
return nil, fmt.Errorf("invalid policy: %v", err)
}
case source.Policy.ConfigMap != nil:
// Use a policy serialized in a config map value.
policyRef := source.Policy.ConfigMap
policyConfigMap, err := client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
}
data, found := policyConfigMap.Data[kubeschedulerconfig.SchedulerPolicyConfigMapKey]
if !found {
return nil, fmt.Errorf("missing policy config map value at key %q", kubeschedulerconfig.SchedulerPolicyConfigMapKey)
}
err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
if err != nil {
return nil, fmt.Errorf("invalid policy: %v", err)
}
}
sc, err := configurator.CreateFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
config = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
// Create the scheduler.
sched := NewFromConfig(config)
return sched, nil
}
// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented.
// Supports intermediate Config mutation for now if you provide modifier functions which will run after Config is created.
func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Scheduler, error) {
func NewFromConfigurator(c factory.Configurator, modifiers ...func(c *factory.Config)) (*Scheduler, error) {
cfg, err := c.Create()
if err != nil {
return nil, err
@ -171,7 +255,7 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul
}
// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *Config) *Scheduler {
func NewFromConfig(config *factory.Config) *Scheduler {
metrics.Register()
return &Scheduler{
config: config,
@ -188,7 +272,7 @@ func (sched *Scheduler) Run() {
}
// Config returns scheduler's config pointer. It is exposed for testing purposes.
func (sched *Scheduler) Config() *Config {
func (sched *Scheduler) Config() *factory.Config {
return sched.config
}

View File

@ -29,8 +29,11 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
@ -38,7 +41,10 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/api"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
@ -119,6 +125,14 @@ func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v
return pod
}
func PredicateOne(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}
func PriorityOne(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (api.HostPriorityList, error) {
return []api.HostPriority{}, nil
}
type mockScheduler struct {
machine string
err error
@ -139,6 +153,39 @@ func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, sc
return nil, nil, nil, nil
}
func TestSchedulerCreation(t *testing.T) {
client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
testSource := "testProvider"
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(t.Logf).Stop()
defaultBindTimeout := int64(30)
factory.RegisterFitPredicate("PredicateOne", PredicateOne)
factory.RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
factory.RegisterAlgorithmProvider(testSource, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
_, err := New(client,
informerFactory.Core().V1().Nodes(),
factory.NewPodInformer(client, 0),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
WithBindTimeoutSeconds(defaultBindTimeout))
if err != nil {
t.Fatalf("Failed to create scheduler: %v", err)
}
}
func TestScheduler(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(t.Logf).Stop()
@ -200,7 +247,7 @@ func TestScheduler(t *testing.T) {
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
configurator := &FakeConfigurator{
Config: &Config{
Config: &factory.Config{
SchedulerCache: &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod
@ -213,7 +260,7 @@ func TestScheduler(t *testing.T) {
[]*v1.Node{&testNode},
),
Algorithm: item.algo,
GetBinder: func(pod *v1.Pod) Binder {
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
gotBinding = b
return item.injectBindError
@ -569,11 +616,11 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
configurator := &FakeConfigurator{
Config: &Config{
Config: &factory.Config{
SchedulerCache: scache,
NodeLister: nodeLister,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) Binder {
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
bindingChan <- b
return nil
@ -619,11 +666,11 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
api.DefaultPercentageOfNodesToScore)
bindingChan := make(chan *v1.Binding, 2)
configurator := &FakeConfigurator{
Config: &Config{
Config: &factory.Config{
SchedulerCache: scache,
NodeLister: nodeLister,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) Binder {
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
time.Sleep(bindingTime)
bindingChan <- b

View File

@ -25,13 +25,14 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/util"
)
// FakeConfigurator is an implementation for test.
type FakeConfigurator struct {
Config *Config
Config *factory.Config
}
// GetPredicateMetadataProducer is not implemented yet.
@ -70,21 +71,21 @@ func (fc *FakeConfigurator) GetScheduledPodLister() corelisters.PodLister {
}
// Create returns FakeConfigurator.Config
func (fc *FakeConfigurator) Create() (*Config, error) {
func (fc *FakeConfigurator) Create() (*factory.Config, error) {
return fc.Config, nil
}
// CreateFromProvider returns FakeConfigurator.Config
func (fc *FakeConfigurator) CreateFromProvider(providerName string) (*Config, error) {
func (fc *FakeConfigurator) CreateFromProvider(providerName string) (*factory.Config, error) {
return fc.Config, nil
}
// CreateFromConfig returns FakeConfigurator.Config
func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*factory.Config, error) {
return fc.Config, nil
}
// CreateFromKeys returns FakeConfigurator.Config
func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) {
return fc.Config, nil
}

View File

@ -62,8 +62,8 @@ type TestContext struct {
ns *v1.Namespace
clientSet *clientset.Clientset
informerFactory informers.SharedInformerFactory
schedulerConfigFactory scheduler.Configurator
schedulerConfig *scheduler.Config
schedulerConfigFactory factory.Configurator
schedulerConfig *factory.Config
scheduler *scheduler.Scheduler
}
@ -73,7 +73,7 @@ func createConfiguratorWithPodInformer(
clientSet clientset.Interface,
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
) scheduler.Configurator {
) factory.Configurator {
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: schedulerName,
Client: clientSet,

View File

@ -14,8 +14,8 @@ go_library(
],
importpath = "k8s.io/kubernetes/test/integration/scheduler_perf",
deps = [
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
@ -35,7 +35,7 @@ go_test(
tags = ["integration"],
deps = [
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/factory"
testutils "k8s.io/kubernetes/test/utils"
"math"
"strconv"
@ -105,7 +105,7 @@ type testConfig struct {
numNodes int
mutatedNodeTemplate *v1.Node
mutatedPodTemplate *v1.Pod
schedulerSupportFunctions scheduler.Configurator
schedulerSupportFunctions factory.Configurator
destroyFunc func()
}

View File

@ -20,8 +20,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/scheduler"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/pkg/scheduler/factory"
"k8s.io/kubernetes/test/integration/util"
)
@ -32,7 +32,7 @@ import (
// remove resources after finished.
// Notes on rate limiter:
// - client rate limit is set to 5000.
func mustSetupScheduler() (scheduler.Configurator, util.ShutdownFunc) {
func mustSetupScheduler() (factory.Configurator, util.ShutdownFunc) {
apiURL, apiShutdown := util.StartApiserver()
clientSet := clientset.NewForConfigOrDie(&restclient.Config{
Host: apiURL,

View File

@ -59,7 +59,7 @@ func StartApiserver() (string, ShutdownFunc) {
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface
// and event broadcaster. It returns a handle to the configurator for the running scheduler
// and the shutdown function to stop it.
func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, ShutdownFunc) {
func StartScheduler(clientSet clientset.Interface) (factory.Configurator, ShutdownFunc) {
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
evtBroadcaster := record.NewBroadcaster()
@ -68,7 +68,7 @@ func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, Shut
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory)
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) {
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *factory.Config) {
conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
})
if err != nil {
@ -94,7 +94,7 @@ func StartScheduler(clientSet clientset.Interface) (scheduler.Configurator, Shut
func createSchedulerConfigurator(
clientSet clientset.Interface,
informerFactory informers.SharedInformerFactory,
) scheduler.Configurator {
) factory.Configurator {
// Enable EnableEquivalenceClassCache for all integration tests.
utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")