From 6d77624bc3c37917da5f572c2ef52f349931a499 Mon Sep 17 00:00:00 2001 From: draveness Date: Sun, 28 Jul 2019 12:57:12 +0800 Subject: [PATCH] feat: use scheduler.New instead in createSchedulerConfigurator --- test/integration/scheduler_perf/BUILD | 6 +- .../scheduler_perf/scheduler_bench_test.go | 17 ++--- .../scheduler_perf/scheduler_test.go | 52 +++++---------- test/integration/scheduler_perf/util.go | 24 ++++++- test/integration/util/BUILD | 3 +- test/integration/util/util.go | 66 ++++++++++--------- 6 files changed, 86 insertions(+), 82 deletions(-) diff --git a/test/integration/scheduler_perf/BUILD b/test/integration/scheduler_perf/BUILD index 792bdcbaa7e..85fd2bd64c9 100644 --- a/test/integration/scheduler_perf/BUILD +++ b/test/integration/scheduler_perf/BUILD @@ -15,6 +15,8 @@ go_library( importpath = "k8s.io/kubernetes/test/integration/scheduler_perf", deps = [ "//pkg/scheduler/factory:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", @@ -34,15 +36,13 @@ go_test( tags = ["integration"], deps = [ "//pkg/features:go_default_library", - "//pkg/scheduler/factory:go_default_library", "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", "//test/integration/framework:go_default_library", diff --git a/test/integration/scheduler_perf/scheduler_bench_test.go b/test/integration/scheduler_perf/scheduler_bench_test.go index 90e32f89ebb..7b154dafa6e 100644 --- a/test/integration/scheduler_perf/scheduler_bench_test.go +++ b/test/integration/scheduler_perf/scheduler_bench_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -358,12 +358,11 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int, if b.N < minPods { b.N = minPods } - schedulerConfigArgs, finalFunc := mustSetupScheduler() + _, finalFunc, clientset := mustSetupScheduler() defer finalFunc() - c := schedulerConfigArgs.Client nodePreparer := framework.NewIntegrationTestNodePreparer( - c, + clientset, []testutils.CountToStrategy{{Count: numNodes, Strategy: nodeStrategy}}, "scheduler-perf-", ) @@ -374,12 +373,11 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int, config := testutils.NewTestPodCreatorConfig() config.AddStrategy("sched-test", numExistingPods, setupPodStrategy) - podCreator := testutils.NewTestPodCreator(c, config) + podCreator := testutils.NewTestPodCreator(clientset, config) podCreator.CreatePods() - podLister := schedulerConfigArgs.PodInformer.Lister() for { - scheduled, err := getScheduledPods(podLister) + scheduled, err := getScheduledPods(clientset) if err != nil { klog.Fatalf("%v", err) } @@ -392,12 +390,11 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int, b.ResetTimer() config = testutils.NewTestPodCreatorConfig() config.AddStrategy("sched-test", b.N, testPodStrategy) - podCreator = testutils.NewTestPodCreator(c, config) + podCreator = testutils.NewTestPodCreator(clientset, config) podCreator.CreatePods() for { - // This can potentially affect performance of scheduler, since List() is done under mutex. // TODO: Setup watch on apiserver and wait until all pods scheduled. - scheduled, err := getScheduledPods(podLister) + scheduled, err := getScheduledPods(clientset) if err != nil { klog.Fatalf("%v", err) } diff --git a/test/integration/scheduler_perf/scheduler_test.go b/test/integration/scheduler_perf/scheduler_test.go index 9abc700fecb..eef16b23779 100644 --- a/test/integration/scheduler_perf/scheduler_test.go +++ b/test/integration/scheduler_perf/scheduler_test.go @@ -23,14 +23,13 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - listers "k8s.io/client-go/listers/core/v1" - "k8s.io/klog" - "k8s.io/kubernetes/pkg/scheduler/factory" + clientset "k8s.io/client-go/kubernetes" testutils "k8s.io/kubernetes/test/utils" + + "k8s.io/klog" ) const ( @@ -107,18 +106,18 @@ type testConfig struct { numNodes int mutatedNodeTemplate *v1.Node mutatedPodTemplate *v1.Pod - schedulerSupport *factory.ConfigFactoryArgs + clientset clientset.Interface destroyFunc func() } // getBaseConfig returns baseConfig after initializing number of nodes and pods. func getBaseConfig(nodes int, pods int) *testConfig { - schedulerConfigArgs, destroyFunc := mustSetupScheduler() + _, destroyFunc, clientset := mustSetupScheduler() return &testConfig{ - schedulerSupport: schedulerConfigArgs, - destroyFunc: destroyFunc, - numNodes: nodes, - numPods: pods, + clientset: clientset, + destroyFunc: destroyFunc, + numNodes: nodes, + numPods: pods, } } @@ -134,11 +133,10 @@ func schedulePods(config *testConfig) int32 { // We are interested in low scheduling rates (i.e. qps=2), minQPS := int32(math.MaxInt32) start := time.Now() - podLister := config.schedulerSupport.PodInformer.Lister() // Bake in time for the first pod scheduling event. for { time.Sleep(50 * time.Millisecond) - scheduled, err := getScheduledPods(podLister) + scheduled, err := getScheduledPods(config.clientset) if err != nil { klog.Fatalf("%v", err) } @@ -153,14 +151,11 @@ func schedulePods(config *testConfig) int32 { // Now that scheduling has started, lets start taking the pulse on how many pods are happening per second. for { - // This can potentially affect performance of scheduler, since List() is done under mutex. - // Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler. // TODO: Setup watch on apiserver and wait until all pods scheduled. - scheduled, err := getScheduledPods(podLister) + scheduled, err := getScheduledPods(config.clientset) if err != nil { klog.Fatalf("%v", err) } - // We will be completed when all pods are done being scheduled. // return the worst-case-scenario interval that was seen during this time. // Note this should never be low due to cold-start, so allow bake in sched time if necessary. @@ -186,20 +181,6 @@ func schedulePods(config *testConfig) int32 { } } -func getScheduledPods(lister listers.PodLister) ([]*v1.Pod, error) { - all, err := lister.List(labels.Everything()) - if err != nil { - return nil, err - } - scheduled := make([]*v1.Pod, 0, len(all)) - for _, pod := range all { - if len(pod.Spec.NodeName) > 0 { - scheduled = append(scheduled, pod) - } - } - return scheduled, nil -} - // mutateNodeTemplate returns the modified node needed for creation of nodes. func (na nodeAffinity) mutateNodeTemplate(node *v1.Node) { labels := make(map[string]string) @@ -237,17 +218,18 @@ func (na nodeAffinity) mutatePodTemplate(pod *v1.Pod) { // generateNodes generates nodes to be used for scheduling. func (inputConfig *schedulerPerfConfig) generateNodes(config *testConfig) { for i := 0; i < inputConfig.NodeCount; i++ { - config.schedulerSupport.Client.CoreV1().Nodes().Create(config.mutatedNodeTemplate) + config.clientset.CoreV1().Nodes().Create(config.mutatedNodeTemplate) + } for i := 0; i < config.numNodes-inputConfig.NodeCount; i++ { - config.schedulerSupport.Client.CoreV1().Nodes().Create(baseNodeTemplate) + config.clientset.CoreV1().Nodes().Create(baseNodeTemplate) } } // generatePods generates pods to be used for scheduling. func (inputConfig *schedulerPerfConfig) generatePods(config *testConfig) { - testutils.CreatePod(config.schedulerSupport.Client, "sample", inputConfig.PodCount, config.mutatedPodTemplate) - testutils.CreatePod(config.schedulerSupport.Client, "sample", config.numPods-inputConfig.PodCount, basePodTemplate) + testutils.CreatePod(config.clientset, "sample", inputConfig.PodCount, config.mutatedPodTemplate) + testutils.CreatePod(config.clientset, "sample", config.numPods-inputConfig.PodCount, basePodTemplate) } // generatePodAndNodeTopology is the wrapper function for modifying both pods and node objects. diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 8d90a19eed1..97f98286b0a 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -17,6 +17,8 @@ limitations under the License. package benchmark import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -31,7 +33,7 @@ import ( // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupScheduler() (*factory.ConfigFactoryArgs, util.ShutdownFunc) { +func mustSetupScheduler() (*factory.Config, util.ShutdownFunc, clientset.Interface) { apiURL, apiShutdown := util.StartApiserver() clientSet := clientset.NewForConfigOrDie(&restclient.Config{ Host: apiURL, @@ -39,11 +41,27 @@ func mustSetupScheduler() (*factory.ConfigFactoryArgs, util.ShutdownFunc) { QPS: 5000.0, Burst: 5000, }) - schedulerConfigArgs, schedulerShutdown := util.StartScheduler(clientSet) + schedulerConfig, schedulerShutdown := util.StartScheduler(clientSet) shutdownFunc := func() { schedulerShutdown() apiShutdown() } - return schedulerConfigArgs, shutdownFunc + return schedulerConfig, shutdownFunc, clientSet +} + +func getScheduledPods(clientset clientset.Interface) ([]*v1.Pod, error) { + podList, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + allPods := podList.Items + scheduled := make([]*v1.Pod, 0, len(allPods)) + for i := range allPods { + pod := allPods[i] + if len(pod.Spec.NodeName) > 0 { + scheduled = append(scheduled, &pod) + } + } + return scheduled, nil } diff --git a/test/integration/util/BUILD b/test/integration/util/BUILD index 32a9e52bed0..a198c395c24 100644 --- a/test/integration/util/BUILD +++ b/test/integration/util/BUILD @@ -16,8 +16,9 @@ go_library( "//pkg/api/legacyscheme:go_default_library", "//pkg/scheduler:go_default_library", "//pkg/scheduler/algorithmprovider/defaults:go_default_library", - "//pkg/scheduler/api:go_default_library", + "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/factory:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 621afc342fb..29ba1b9ba4a 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -30,8 +30,9 @@ import ( // import DefaultProvider _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults" - schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/factory" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" "k8s.io/kubernetes/test/integration/framework" ) @@ -59,7 +60,7 @@ func StartApiserver() (string, ShutdownFunc) { // StartScheduler configures and starts a scheduler given a handle to the clientSet interface // and event broadcaster. It returns a handle to the configurator args for the running scheduler // and the shutdown function to stop it. -func StartScheduler(clientSet clientset.Interface) (*factory.ConfigFactoryArgs, ShutdownFunc) { +func StartScheduler(clientSet clientset.Interface) (*factory.Config, ShutdownFunc) { informerFactory := informers.NewSharedInformerFactory(clientSet, 0) stopCh := make(chan struct{}) evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ @@ -67,16 +68,15 @@ func StartScheduler(clientSet clientset.Interface) (*factory.ConfigFactoryArgs, evtBroadcaster.StartRecordingToSink(stopCh) - configuratorArgs := createSchedulerConfiguratorArgs(clientSet, informerFactory, stopCh) - configurator := factory.NewConfigFactory(configuratorArgs) + recorder := evtBroadcaster.NewRecorder( + legacyscheme.Scheme, + v1.DefaultSchedulerName, + ) - config, err := configurator.CreateFromConfig(schedulerapi.Policy{}) + sched, err := createScheduler(clientSet, informerFactory, recorder, stopCh) if err != nil { klog.Fatalf("Error creating scheduler: %v", err) } - config.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, "scheduler") - - sched := scheduler.NewFromConfig(config) scheduler.AddAllEventHandlers(sched, v1.DefaultSchedulerName, informerFactory.Core().V1().Nodes(), @@ -96,32 +96,38 @@ func StartScheduler(clientSet clientset.Interface) (*factory.ConfigFactoryArgs, close(stopCh) klog.Infof("destroyed scheduler") } - return configuratorArgs, shutdownFunc + return sched.Config(), shutdownFunc } -// createSchedulerConfigurator create a configurator for scheduler with given informer factory. -func createSchedulerConfiguratorArgs( +// createScheduler create a scheduler with given informer factory and default name. +func createScheduler( clientSet clientset.Interface, informerFactory informers.SharedInformerFactory, + recorder events.EventRecorder, stopCh <-chan struct{}, -) *factory.ConfigFactoryArgs { +) (*scheduler.Scheduler, error) { + defaultProviderName := schedulerconfig.SchedulerDefaultProviderName - return &factory.ConfigFactoryArgs{ - Client: clientSet, - NodeInformer: informerFactory.Core().V1().Nodes(), - PodInformer: informerFactory.Core().V1().Pods(), - PvInformer: informerFactory.Core().V1().PersistentVolumes(), - PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(), - ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(), - ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(), - StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(), - ServiceInformer: informerFactory.Core().V1().Services(), - PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), - StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), - CSINodeInformer: informerFactory.Storage().V1beta1().CSINodes(), - HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, - DisablePreemption: false, - PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, - StopCh: stopCh, - } + return scheduler.New( + clientSet, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Apps().V1().ReplicaSets(), + informerFactory.Apps().V1().StatefulSets(), + informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), + informerFactory.Storage().V1beta1().CSINodes(), + recorder, + schedulerconfig.SchedulerAlgorithmSource{ + Provider: &defaultProviderName, + }, + stopCh, + schedulerframework.NewRegistry(), + nil, + []schedulerconfig.PluginConfig{}, + ) }