Move users of factory.NewConfigFactory to scheduler.New

This commit is contained in:
Guoliang Wang 2018-12-08 11:26:41 +08:00
parent 13e59ab9ad
commit 3c24c99b08
10 changed files with 131 additions and 258 deletions

View File

@ -16,17 +16,12 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/util/configz:go_default_library",
"//pkg/util/flag:go_default_library",
"//pkg/version:go_default_library",
"//pkg/version/verflag:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",

View File

@ -21,13 +21,10 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
goruntime "runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/authentication/authenticator"
@ -48,10 +45,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/factory"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/util/configz"
utilflag "k8s.io/kubernetes/pkg/util/flag"
@ -324,85 +318,3 @@ func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, s
}
return pathRecorderMux
}
// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Config, error) {
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: s.ComponentConfig.SchedulerName,
Client: s.Client,
NodeInformer: s.InformerFactory.Core().V1().Nodes(),
PodInformer: s.PodInformer,
PvInformer: s.InformerFactory.Core().V1().PersistentVolumes(),
PvcInformer: s.InformerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: s.InformerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: s.InformerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: s.InformerFactory.Apps().V1().StatefulSets(),
ServiceInformer: s.InformerFactory.Core().V1().Services(),
PdbInformer: s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: s.InformerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
DisablePreemption: s.ComponentConfig.DisablePreemption,
PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore,
BindTimeoutSeconds: *s.ComponentConfig.BindTimeoutSeconds,
})
source := s.ComponentConfig.AlgorithmSource
var config *factory.Config
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
sc, err := configurator.CreateFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
config = sc
case source.Policy != nil:
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
// Use a policy serialized in a file.
policyFile := source.Policy.File.Path
_, err := os.Stat(policyFile)
if err != nil {
return nil, fmt.Errorf("missing policy config file %s", policyFile)
}
data, err := ioutil.ReadFile(policyFile)
if err != nil {
return nil, fmt.Errorf("couldn't read policy config: %v", err)
}
err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
if err != nil {
return nil, fmt.Errorf("invalid policy: %v", err)
}
case source.Policy.ConfigMap != nil:
// Use a policy serialized in a config map value.
policyRef := source.Policy.ConfigMap
policyConfigMap, err := s.Client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
}
data, found := policyConfigMap.Data[kubeschedulerconfig.SchedulerPolicyConfigMapKey]
if !found {
return nil, fmt.Errorf("missing policy config map value at key %q", kubeschedulerconfig.SchedulerPolicyConfigMapKey)
}
err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
if err != nil {
return nil, fmt.Errorf("invalid policy: %v", err)
}
}
sc, err := configurator.CreateFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
config = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = s.Recorder
config.DisablePreemption = s.ComponentConfig.DisablePreemption
return config, nil
}

View File

@ -234,25 +234,6 @@ func initPolicyFromConfigMap(client clientset.Interface, policyRef *kubeschedule
return nil
}
// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented.
// Supports intermediate Config mutation for now if you provide modifier functions which will run after Config is created.
func NewFromConfigurator(c factory.Configurator, modifiers ...func(c *factory.Config)) (*Scheduler, error) {
cfg, err := c.Create()
if err != nil {
return nil, err
}
// Mutate it if any functions were provided, changes might be required for certain types of tests (i.e. change the recorder).
for _, modifier := range modifiers {
modifier(cfg)
}
// From this point on the config is immutable to the outside.
s := &Scheduler{
config: cfg,
}
metrics.Register()
return s, nil
}
// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *factory.Config) *Scheduler {
metrics.Register()

View File

@ -266,39 +266,36 @@ func TestScheduler(t *testing.T) {
var gotForgetPod *v1.Pod
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
configurator := &FakeConfigurator{
Config: &factory.Config{
SchedulerCache: &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod
},
AssumeFunc: func(pod *v1.Pod) {
gotAssumedPod = pod
},
},
NodeLister: &nodeLister{nl},
Algorithm: item.algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
gotBinding = b
return item.injectBindError
}}
},
PodConditionUpdater: fakePodConditionUpdater{},
Error: func(p *v1.Pod, err error) {
gotPod = p
gotError = err
},
NextPod: func() *v1.Pod {
return item.sendPod
},
PluginSet: &EmptyPluginSet{},
Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
},
}
s, _ := NewFromConfigurator(configurator, nil...)
s := NewFromConfig(&factory.Config{
SchedulerCache: &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod
},
AssumeFunc: func(pod *v1.Pod) {
gotAssumedPod = pod
},
},
NodeLister: &nodeLister{nl},
Algorithm: item.algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
gotBinding = b
return item.injectBindError
}}
},
PodConditionUpdater: fakePodConditionUpdater{},
Error: func(p *v1.Pod, err error) {
gotPod = p
gotError = err
},
NextPod: func() *v1.Pod {
return item.sendPod
},
PluginSet: &EmptyPluginSet{},
Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
})
called := make(chan struct{})
events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
if e, a := item.eventReason, e.Reason; e != a {
@ -650,36 +647,35 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
api.DefaultPercentageOfNodesToScore)
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
configurator := &FakeConfigurator{
Config: &factory.Config{
SchedulerCache: scache,
NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
Algorithm: algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
bindingChan <- b
return nil
}}
},
NextPod: func() *v1.Pod {
return clientcache.Pop(queuedPodStore).(*v1.Pod)
},
Error: func(p *v1.Pod, err error) {
errChan <- err
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
PluginSet: &EmptyPluginSet{},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
config := &factory.Config{
SchedulerCache: scache,
NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
Algorithm: algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
bindingChan <- b
return nil
}}
},
NextPod: func() *v1.Pod {
return clientcache.Pop(queuedPodStore).(*v1.Pod)
},
Error: func(p *v1.Pod, err error) {
errChan <- err
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
PluginSet: &EmptyPluginSet{},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
}
if recorder != nil {
configurator.Config.Recorder = recorder
config.Recorder = recorder
}
sched, _ := NewFromConfigurator(configurator, nil...)
sched := NewFromConfig(config)
return sched, bindingChan, errChan
}
@ -701,37 +697,34 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
false,
api.DefaultPercentageOfNodesToScore)
bindingChan := make(chan *v1.Binding, 2)
configurator := &FakeConfigurator{
Config: &factory.Config{
SchedulerCache: scache,
NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
Algorithm: algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
time.Sleep(bindingTime)
bindingChan <- b
return nil
}}
},
WaitForCacheSync: func() bool {
return true
},
NextPod: func() *v1.Pod {
return clientcache.Pop(queuedPodStore).(*v1.Pod)
},
Error: func(p *v1.Pod, err error) {
queuedPodStore.AddIfNotPresent(p)
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
PluginSet: &EmptyPluginSet{},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
},
}
sched, _ := NewFromConfigurator(configurator, nil...)
sched := NewFromConfig(&factory.Config{
SchedulerCache: scache,
NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
Algorithm: algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
time.Sleep(bindingTime)
bindingChan <- b
return nil
}}
},
WaitForCacheSync: func() bool {
return true
},
NextPod: func() *v1.Pod {
return clientcache.Pop(queuedPodStore).(*v1.Pod)
},
Error: func(p *v1.Pod, err error) {
queuedPodStore.AddIfNotPresent(p)
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
PluginSet: &EmptyPluginSet{},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
})
return sched, bindingChan
}

View File

@ -132,11 +132,7 @@ func setupScheduler(
Interface: cs.CoreV1().Events(""),
})
sched, err := scheduler.NewFromConfigurator(
&scheduler.FakeConfigurator{Config: schedulerConfig}, nil...)
if err != nil {
t.Fatalf("error creating scheduler: %v", err)
}
sched := scheduler.NewFromConfig(schedulerConfig)
algorithmprovider.ApplyFeatureGates()

View File

@ -23,8 +23,6 @@ go_test(
embed = [":go_default_library"],
tags = ["integration"],
deps = [
"//cmd/kube-scheduler/app:go_default_library",
"//cmd/kube-scheduler/app/config:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library",

View File

@ -36,8 +36,6 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
schedulerapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@ -179,33 +177,38 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
defaultBindTimeout := int64(30)
ss := &schedulerappconfig.Config{
ComponentConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
SchedulerName: v1.DefaultSchedulerName,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{
ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
Namespace: policyConfigMap.Namespace,
Name: policyConfigMap.Name,
},
sched, err := scheduler.New(clientSet,
informerFactory.Core().V1().Nodes(),
factory.NewPodInformer(clientSet, 0),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{
ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
Namespace: policyConfigMap.Namespace,
Name: policyConfigMap.Name,
},
},
BindTimeoutSeconds: &defaultBindTimeout,
},
Client: clientSet,
InformerFactory: informerFactory,
PodInformer: factory.NewPodInformer(clientSet, 0),
EventClient: clientSet.CoreV1(),
Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
Broadcaster: eventBroadcaster,
}
config, err := schedulerapp.NewSchedulerConfig(ss.Complete())
nil,
scheduler.WithName(v1.DefaultSchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(v1.DefaultHardPodAffinitySymmetricWeight),
scheduler.WithBindTimeoutSeconds(defaultBindTimeout),
)
if err != nil {
t.Fatalf("couldn't make scheduler config: %v", err)
}
config := sched.Config()
// Verify that the config is applied correctly.
schedPredicates := sets.NewString()
for k := range config.Algorithm.Predicates() {
@ -242,29 +245,32 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
defaultBindTimeout := int64(30)
ss := &schedulerappconfig.Config{
ComponentConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
SchedulerName: v1.DefaultSchedulerName,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{
ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
Namespace: "non-existent-config",
Name: "non-existent-config",
},
_, err := scheduler.New(clientSet,
informerFactory.Core().V1().Nodes(),
factory.NewPodInformer(clientSet, 0),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{
ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
Namespace: "non-existent-config",
Name: "non-existent-config",
},
},
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
BindTimeoutSeconds: &defaultBindTimeout,
},
Client: clientSet,
InformerFactory: informerFactory,
PodInformer: factory.NewPodInformer(clientSet, 0),
EventClient: clientSet.CoreV1(),
Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
Broadcaster: eventBroadcaster,
}
nil,
scheduler.WithName(v1.DefaultSchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(v1.DefaultHardPodAffinitySymmetricWeight),
scheduler.WithBindTimeoutSeconds(defaultBindTimeout))
_, err := schedulerapp.NewSchedulerConfig(ss.Complete())
if err == nil {
t.Fatalf("Creation of scheduler didn't fail while the policy ConfigMap didn't exist.")
}
@ -536,7 +542,7 @@ func TestMultiScheduler(t *testing.T) {
go podInformer2.Informer().Run(stopCh)
informerFactory2.Start(stopCh)
sched2, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig2}, nil...)
sched2 := scheduler.NewFromConfig(schedulerConfig2)
sched2.Run()
// 6. **check point-2**:

View File

@ -212,12 +212,7 @@ func initTestSchedulerWithOptions(
context.informerFactory.Start(context.schedulerConfig.StopEverything)
context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything)
context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{
Config: context.schedulerConfig},
nil...)
if err != nil {
t.Fatalf("Couldn't create scheduler: %v", err)
}
context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)
context.scheduler.Run()
return context
}

View File

@ -13,7 +13,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/test/integration/util",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/api:go_default_library",

View File

@ -26,7 +26,6 @@ import (
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/scheduler"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
@ -67,13 +66,12 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
stopCh := make(chan struct{})
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory, stopCh)
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *factory.Config) {
conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
})
config, err := schedulerConfigurator.CreateFromConfig(schedulerapi.Policy{})
if err != nil {
klog.Fatalf("Error creating scheduler: %v", err)
}
sched := scheduler.NewFromConfig(config)
informerFactory.Start(stopCh)
sched.Run()