From 995e741ef9e0a20c3cd2cf8aa250a99700bafdd2 Mon Sep 17 00:00:00 2001 From: Ahmad Diaa Date: Sun, 25 Aug 2019 18:14:57 +0200 Subject: [PATCH] use scheduler.New in createConfiguratorArgsWithPodInformer --- test/integration/scheduler/BUILD | 2 + test/integration/scheduler/scheduler_test.go | 48 ++++--- test/integration/scheduler/util.go | 134 +++++++++---------- 3 files changed, 93 insertions(+), 91 deletions(-) diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index d73d3068505..3ec832c4b60 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -93,6 +93,7 @@ go_library( "//pkg/scheduler:go_default_library", "//pkg/scheduler/algorithmprovider:go_default_library", "//pkg/scheduler/api:go_default_library", + "//pkg/scheduler/api/latest:go_default_library", "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library", @@ -104,6 +105,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index dd824c327c1..e23c4cbf542 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -41,6 +41,7 @@ import ( _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/factory" schedulerplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -352,7 +353,7 @@ func TestUnschedulableNodes(t *testing.T) { context := initTest(t, "unschedulable-nodes") defer cleanupTest(t, context) - nodeLister := context.schedulerConfigArgs.NodeInformer.Lister() + nodeLister := context.informerFactory.Core().V1().Nodes().Lister() // NOTE: This test cannot run in parallel, because it is creating and deleting // non-namespaced objects (Nodes). defer context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{}) @@ -603,28 +604,39 @@ func TestMultiScheduler(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - schedulerConfigFactory2 := factory.NewConfigFactory(createConfiguratorArgsWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerplugins.NewDefaultRegistry(), - nil, []kubeschedulerconfig.PluginConfig{}, stopCh)) - schedulerConfig2, err := schedulerConfigFactory2.Create() + eventBroadcaster2 := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet2.EventsV1beta1().Events("")}) + recorder := eventBroadcaster2.NewRecorder(legacyscheme.Scheme, "k8s.io/"+fooScheduler) + provider := schedulerconfig.SchedulerDefaultProviderName + algorithmSrc := schedulerconfig.SchedulerAlgorithmSource{ + Provider: &provider, + } + sched2, err := scheduler.New( + clientSet2, + informerFactory2.Core().V1().Nodes(), + podInformer2, + informerFactory2.Core().V1().PersistentVolumes(), + informerFactory2.Core().V1().PersistentVolumeClaims(), + informerFactory2.Core().V1().ReplicationControllers(), + informerFactory2.Apps().V1().ReplicaSets(), + informerFactory2.Apps().V1().StatefulSets(), + informerFactory2.Core().V1().Services(), + informerFactory2.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory2.Storage().V1().StorageClasses(), + informerFactory2.Storage().V1beta1().CSINodes(), + recorder, + algorithmSrc, + stopCh, + schedulerplugins.NewDefaultRegistry(), + nil, + []kubeschedulerconfig.PluginConfig{}, + scheduler.WithName(fooScheduler), + scheduler.WithBindTimeoutSeconds(600), + ) if err != nil { t.Errorf("Couldn't create scheduler config: %v", err) } - eventBroadcaster2 := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet2.EventsV1beta1().Events("")}) - schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, "k8s.io/"+fooScheduler) eventBroadcaster2.StartRecordingToSink(stopCh) - sched2 := scheduler.NewFromConfig(schedulerConfig2) - scheduler.AddAllEventHandlers(sched2, - fooScheduler, - context.informerFactory.Core().V1().Nodes(), - podInformer2, - context.informerFactory.Core().V1().PersistentVolumes(), - context.informerFactory.Core().V1().PersistentVolumeClaims(), - context.informerFactory.Core().V1().Services(), - context.informerFactory.Storage().V1().StorageClasses(), - context.informerFactory.Storage().V1beta1().CSINodes(), - ) - go podInformer2.Informer().Run(stopCh) informerFactory2.Start(stopCh) sched2.Run() diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 8a13d1dc25b..58b23e3b36f 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" @@ -48,6 +49,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/pkg/scheduler" + latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" // Register defaults in pkg/scheduler/algorithmprovider. @@ -62,49 +64,32 @@ import ( ) type testContext struct { - closeFn framework.CloseFunc - httpServer *httptest.Server - ns *v1.Namespace - clientSet *clientset.Clientset - informerFactory informers.SharedInformerFactory - schedulerConfigArgs *factory.ConfigFactoryArgs - schedulerConfig *factory.Config - scheduler *scheduler.Scheduler - stopCh chan struct{} + closeFn framework.CloseFunc + httpServer *httptest.Server + ns *v1.Namespace + clientSet *clientset.Clientset + informerFactory informers.SharedInformerFactory + scheduler *scheduler.Scheduler + stopCh chan struct{} } -// createConfiguratorWithPodInformer creates a configurator for scheduler. -func createConfiguratorArgsWithPodInformer( - schedulerName string, - clientSet clientset.Interface, - podInformer coreinformers.PodInformer, - informerFactory informers.SharedInformerFactory, - pluginRegistry schedulerframework.Registry, - plugins *schedulerconfig.Plugins, - pluginConfig []schedulerconfig.PluginConfig, - stopCh <-chan struct{}, -) *factory.ConfigFactoryArgs { - return &factory.ConfigFactoryArgs{ - Client: clientSet, - NodeInformer: informerFactory.Core().V1().Nodes(), - PodInformer: podInformer, - PvInformer: informerFactory.Core().V1().PersistentVolumes(), - PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(), - ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(), - ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(), - StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(), - ServiceInformer: informerFactory.Core().V1().Services(), - PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), - StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), - CSINodeInformer: informerFactory.Storage().V1beta1().CSINodes(), - Registry: pluginRegistry, - Plugins: plugins, - PluginConfig: pluginConfig, - HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, - DisablePreemption: false, - PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, - BindTimeoutSeconds: 600, - StopCh: stopCh, +func createAlgorithmSourceFromPolicy(policy *schedulerapi.Policy, clientSet clientset.Interface) schedulerconfig.SchedulerAlgorithmSource { + policyString := runtime.EncodeOrDie(latestschedulerapi.Codec, policy) + configPolicyName := "scheduler-custom-policy-config" + policyConfigMap := v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: configPolicyName}, + Data: map[string]string{schedulerconfig.SchedulerPolicyConfigMapKey: policyString}, + } + policyConfigMap.APIVersion = "v1" + clientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(&policyConfigMap) + + return schedulerconfig.SchedulerAlgorithmSource{ + Policy: &schedulerconfig.SchedulerPolicySource{ + ConfigMap: &schedulerconfig.SchedulerPolicyConfigMapSource{ + Namespace: policyConfigMap.Namespace, + Name: policyConfigMap.Name, + }, + }, } } @@ -185,58 +170,61 @@ func initTestSchedulerWithOptions( } else { podInformer = context.informerFactory.Core().V1().Pods() } - - context.schedulerConfigArgs = createConfiguratorArgsWithPodInformer( - v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, plugins, - pluginConfig, context.stopCh) - configFactory := factory.NewConfigFactory(context.schedulerConfigArgs) - var err error - - if policy != nil { - context.schedulerConfig, err = configFactory.CreateFromConfig(*policy) - } else { - context.schedulerConfig, err = configFactory.Create() - } - - if err != nil { - t.Fatalf("Couldn't create scheduler config: %v", err) - } - - // set DisablePreemption option - context.schedulerConfig.DisablePreemption = disablePreemption eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ Interface: context.clientSet.EventsV1beta1().Events(""), }) - context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder( + recorder := eventBroadcaster.NewRecorder( legacyscheme.Scheme, v1.DefaultSchedulerName, ) - - context.scheduler = scheduler.NewFromConfig(context.schedulerConfig) - - scheduler.AddAllEventHandlers(context.scheduler, - v1.DefaultSchedulerName, + var algorithmSrc schedulerconfig.SchedulerAlgorithmSource + if policy != nil { + algorithmSrc = createAlgorithmSourceFromPolicy(policy, context.clientSet) + } else { + provider := schedulerconfig.SchedulerDefaultProviderName + algorithmSrc = schedulerconfig.SchedulerAlgorithmSource{ + Provider: &provider, + } + } + context.scheduler, err = scheduler.New( + context.clientSet, context.informerFactory.Core().V1().Nodes(), podInformer, context.informerFactory.Core().V1().PersistentVolumes(), context.informerFactory.Core().V1().PersistentVolumeClaims(), + context.informerFactory.Core().V1().ReplicationControllers(), + context.informerFactory.Apps().V1().ReplicaSets(), + context.informerFactory.Apps().V1().StatefulSets(), context.informerFactory.Core().V1().Services(), + context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(), context.informerFactory.Storage().V1().StorageClasses(), context.informerFactory.Storage().V1beta1().CSINodes(), + recorder, + algorithmSrc, + context.stopCh, + pluginRegistry, + plugins, + pluginConfig, + scheduler.WithPreemptionDisabled(disablePreemption), + scheduler.WithBindTimeoutSeconds(600), ) + if err != nil { + t.Fatalf("Couldn't create scheduler: %v", err) + } + // set setPodInformer if provided. if setPodInformer { - go podInformer.Informer().Run(context.schedulerConfig.StopEverything) - cache.WaitForNamedCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced) + go podInformer.Informer().Run(context.scheduler.StopEverything) + cache.WaitForNamedCacheSync("scheduler", context.scheduler.StopEverything, podInformer.Informer().HasSynced) } stopCh := make(chan struct{}) eventBroadcaster.StartRecordingToSink(stopCh) - context.informerFactory.Start(context.schedulerConfig.StopEverything) - context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything) + context.informerFactory.Start(context.scheduler.StopEverything) + context.informerFactory.WaitForCacheSync(context.scheduler.StopEverything) context.scheduler.Run() return context @@ -268,9 +256,9 @@ func initDisruptionController(t *testing.T, context *testContext) *disruption.Di mapper, scaleClient) - informers.Start(context.schedulerConfig.StopEverything) - informers.WaitForCacheSync(context.schedulerConfig.StopEverything) - go dc.Run(context.schedulerConfig.StopEverything) + informers.Start(context.scheduler.StopEverything) + informers.WaitForCacheSync(context.scheduler.StopEverything) + go dc.Run(context.scheduler.StopEverything) return dc }