feat: use scheduler.New instead in createSchedulerConfigurator

This commit is contained in:
draveness 2019-07-28 12:57:12 +08:00
parent b97d08fbe0
commit 6d77624bc3
6 changed files with 86 additions and 82 deletions

View File

@ -15,6 +15,8 @@ go_library(
importpath = "k8s.io/kubernetes/test/integration/scheduler_perf", importpath = "k8s.io/kubernetes/test/integration/scheduler_perf",
deps = [ deps = [
"//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/factory:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library",
@ -34,15 +36,13 @@ go_test(
tags = ["integration"], tags = ["integration"],
deps = [ deps = [
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/volume/util:go_default_library", "//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
"//test/integration/framework:go_default_library", "//test/integration/framework:go_default_library",

View File

@ -21,7 +21,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1" storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -358,12 +358,11 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
if b.N < minPods { if b.N < minPods {
b.N = minPods b.N = minPods
} }
schedulerConfigArgs, finalFunc := mustSetupScheduler() _, finalFunc, clientset := mustSetupScheduler()
defer finalFunc() defer finalFunc()
c := schedulerConfigArgs.Client
nodePreparer := framework.NewIntegrationTestNodePreparer( nodePreparer := framework.NewIntegrationTestNodePreparer(
c, clientset,
[]testutils.CountToStrategy{{Count: numNodes, Strategy: nodeStrategy}}, []testutils.CountToStrategy{{Count: numNodes, Strategy: nodeStrategy}},
"scheduler-perf-", "scheduler-perf-",
) )
@ -374,12 +373,11 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
config := testutils.NewTestPodCreatorConfig() config := testutils.NewTestPodCreatorConfig()
config.AddStrategy("sched-test", numExistingPods, setupPodStrategy) config.AddStrategy("sched-test", numExistingPods, setupPodStrategy)
podCreator := testutils.NewTestPodCreator(c, config) podCreator := testutils.NewTestPodCreator(clientset, config)
podCreator.CreatePods() podCreator.CreatePods()
podLister := schedulerConfigArgs.PodInformer.Lister()
for { for {
scheduled, err := getScheduledPods(podLister) scheduled, err := getScheduledPods(clientset)
if err != nil { if err != nil {
klog.Fatalf("%v", err) klog.Fatalf("%v", err)
} }
@ -392,12 +390,11 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
b.ResetTimer() b.ResetTimer()
config = testutils.NewTestPodCreatorConfig() config = testutils.NewTestPodCreatorConfig()
config.AddStrategy("sched-test", b.N, testPodStrategy) config.AddStrategy("sched-test", b.N, testPodStrategy)
podCreator = testutils.NewTestPodCreator(c, config) podCreator = testutils.NewTestPodCreator(clientset, config)
podCreator.CreatePods() podCreator.CreatePods()
for { for {
// This can potentially affect performance of scheduler, since List() is done under mutex.
// TODO: Setup watch on apiserver and wait until all pods scheduled. // TODO: Setup watch on apiserver and wait until all pods scheduled.
scheduled, err := getScheduledPods(podLister) scheduled, err := getScheduledPods(clientset)
if err != nil { if err != nil {
klog.Fatalf("%v", err) klog.Fatalf("%v", err)
} }

View File

@ -23,14 +23,13 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" clientset "k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/factory"
testutils "k8s.io/kubernetes/test/utils" testutils "k8s.io/kubernetes/test/utils"
"k8s.io/klog"
) )
const ( const (
@ -107,18 +106,18 @@ type testConfig struct {
numNodes int numNodes int
mutatedNodeTemplate *v1.Node mutatedNodeTemplate *v1.Node
mutatedPodTemplate *v1.Pod mutatedPodTemplate *v1.Pod
schedulerSupport *factory.ConfigFactoryArgs clientset clientset.Interface
destroyFunc func() destroyFunc func()
} }
// getBaseConfig returns baseConfig after initializing number of nodes and pods. // getBaseConfig returns baseConfig after initializing number of nodes and pods.
func getBaseConfig(nodes int, pods int) *testConfig { func getBaseConfig(nodes int, pods int) *testConfig {
schedulerConfigArgs, destroyFunc := mustSetupScheduler() _, destroyFunc, clientset := mustSetupScheduler()
return &testConfig{ return &testConfig{
schedulerSupport: schedulerConfigArgs, clientset: clientset,
destroyFunc: destroyFunc, destroyFunc: destroyFunc,
numNodes: nodes, numNodes: nodes,
numPods: pods, numPods: pods,
} }
} }
@ -134,11 +133,10 @@ func schedulePods(config *testConfig) int32 {
// We are interested in low scheduling rates (i.e. qps=2), // We are interested in low scheduling rates (i.e. qps=2),
minQPS := int32(math.MaxInt32) minQPS := int32(math.MaxInt32)
start := time.Now() start := time.Now()
podLister := config.schedulerSupport.PodInformer.Lister()
// Bake in time for the first pod scheduling event. // Bake in time for the first pod scheduling event.
for { for {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
scheduled, err := getScheduledPods(podLister) scheduled, err := getScheduledPods(config.clientset)
if err != nil { if err != nil {
klog.Fatalf("%v", err) klog.Fatalf("%v", err)
} }
@ -153,14 +151,11 @@ func schedulePods(config *testConfig) int32 {
// Now that scheduling has started, lets start taking the pulse on how many pods are happening per second. // Now that scheduling has started, lets start taking the pulse on how many pods are happening per second.
for { for {
// This can potentially affect performance of scheduler, since List() is done under mutex.
// Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler.
// TODO: Setup watch on apiserver and wait until all pods scheduled. // TODO: Setup watch on apiserver and wait until all pods scheduled.
scheduled, err := getScheduledPods(podLister) scheduled, err := getScheduledPods(config.clientset)
if err != nil { if err != nil {
klog.Fatalf("%v", err) klog.Fatalf("%v", err)
} }
// We will be completed when all pods are done being scheduled. // We will be completed when all pods are done being scheduled.
// return the worst-case-scenario interval that was seen during this time. // return the worst-case-scenario interval that was seen during this time.
// Note this should never be low due to cold-start, so allow bake in sched time if necessary. // Note this should never be low due to cold-start, so allow bake in sched time if necessary.
@ -186,20 +181,6 @@ func schedulePods(config *testConfig) int32 {
} }
} }
func getScheduledPods(lister listers.PodLister) ([]*v1.Pod, error) {
all, err := lister.List(labels.Everything())
if err != nil {
return nil, err
}
scheduled := make([]*v1.Pod, 0, len(all))
for _, pod := range all {
if len(pod.Spec.NodeName) > 0 {
scheduled = append(scheduled, pod)
}
}
return scheduled, nil
}
// mutateNodeTemplate returns the modified node needed for creation of nodes. // mutateNodeTemplate returns the modified node needed for creation of nodes.
func (na nodeAffinity) mutateNodeTemplate(node *v1.Node) { func (na nodeAffinity) mutateNodeTemplate(node *v1.Node) {
labels := make(map[string]string) labels := make(map[string]string)
@ -237,17 +218,18 @@ func (na nodeAffinity) mutatePodTemplate(pod *v1.Pod) {
// generateNodes generates nodes to be used for scheduling. // generateNodes generates nodes to be used for scheduling.
func (inputConfig *schedulerPerfConfig) generateNodes(config *testConfig) { func (inputConfig *schedulerPerfConfig) generateNodes(config *testConfig) {
for i := 0; i < inputConfig.NodeCount; i++ { for i := 0; i < inputConfig.NodeCount; i++ {
config.schedulerSupport.Client.CoreV1().Nodes().Create(config.mutatedNodeTemplate) config.clientset.CoreV1().Nodes().Create(config.mutatedNodeTemplate)
} }
for i := 0; i < config.numNodes-inputConfig.NodeCount; i++ { for i := 0; i < config.numNodes-inputConfig.NodeCount; i++ {
config.schedulerSupport.Client.CoreV1().Nodes().Create(baseNodeTemplate) config.clientset.CoreV1().Nodes().Create(baseNodeTemplate)
} }
} }
// generatePods generates pods to be used for scheduling. // generatePods generates pods to be used for scheduling.
func (inputConfig *schedulerPerfConfig) generatePods(config *testConfig) { func (inputConfig *schedulerPerfConfig) generatePods(config *testConfig) {
testutils.CreatePod(config.schedulerSupport.Client, "sample", inputConfig.PodCount, config.mutatedPodTemplate) testutils.CreatePod(config.clientset, "sample", inputConfig.PodCount, config.mutatedPodTemplate)
testutils.CreatePod(config.schedulerSupport.Client, "sample", config.numPods-inputConfig.PodCount, basePodTemplate) testutils.CreatePod(config.clientset, "sample", config.numPods-inputConfig.PodCount, basePodTemplate)
} }
// generatePodAndNodeTopology is the wrapper function for modifying both pods and node objects. // generatePodAndNodeTopology is the wrapper function for modifying both pods and node objects.

View File

@ -17,6 +17,8 @@ limitations under the License.
package benchmark package benchmark
import ( import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
@ -31,7 +33,7 @@ import (
// remove resources after finished. // remove resources after finished.
// Notes on rate limiter: // Notes on rate limiter:
// - client rate limit is set to 5000. // - client rate limit is set to 5000.
func mustSetupScheduler() (*factory.ConfigFactoryArgs, util.ShutdownFunc) { func mustSetupScheduler() (*factory.Config, util.ShutdownFunc, clientset.Interface) {
apiURL, apiShutdown := util.StartApiserver() apiURL, apiShutdown := util.StartApiserver()
clientSet := clientset.NewForConfigOrDie(&restclient.Config{ clientSet := clientset.NewForConfigOrDie(&restclient.Config{
Host: apiURL, Host: apiURL,
@ -39,11 +41,27 @@ func mustSetupScheduler() (*factory.ConfigFactoryArgs, util.ShutdownFunc) {
QPS: 5000.0, QPS: 5000.0,
Burst: 5000, Burst: 5000,
}) })
schedulerConfigArgs, schedulerShutdown := util.StartScheduler(clientSet) schedulerConfig, schedulerShutdown := util.StartScheduler(clientSet)
shutdownFunc := func() { shutdownFunc := func() {
schedulerShutdown() schedulerShutdown()
apiShutdown() apiShutdown()
} }
return schedulerConfigArgs, shutdownFunc return schedulerConfig, shutdownFunc, clientSet
}
func getScheduledPods(clientset clientset.Interface) ([]*v1.Pod, error) {
podList, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
return nil, err
}
allPods := podList.Items
scheduled := make([]*v1.Pod, 0, len(allPods))
for i := range allPods {
pod := allPods[i]
if len(pod.Spec.NodeName) > 0 {
scheduled = append(scheduled, &pod)
}
}
return scheduled, nil
} }

View File

@ -16,8 +16,9 @@ go_library(
"//pkg/api/legacyscheme:go_default_library", "//pkg/api/legacyscheme:go_default_library",
"//pkg/scheduler:go_default_library", "//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider/defaults:go_default_library", "//pkg/scheduler/algorithmprovider/defaults:go_default_library",
"//pkg/scheduler/api:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -30,8 +30,9 @@ import (
// import DefaultProvider // import DefaultProvider
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults" _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/factory" "k8s.io/kubernetes/pkg/scheduler/factory"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
@ -59,7 +60,7 @@ func StartApiserver() (string, ShutdownFunc) {
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface // StartScheduler configures and starts a scheduler given a handle to the clientSet interface
// and event broadcaster. It returns a handle to the configurator args for the running scheduler // and event broadcaster. It returns a handle to the configurator args for the running scheduler
// and the shutdown function to stop it. // and the shutdown function to stop it.
func StartScheduler(clientSet clientset.Interface) (*factory.ConfigFactoryArgs, ShutdownFunc) { func StartScheduler(clientSet clientset.Interface) (*factory.Config, ShutdownFunc) {
informerFactory := informers.NewSharedInformerFactory(clientSet, 0) informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
@ -67,16 +68,15 @@ func StartScheduler(clientSet clientset.Interface) (*factory.ConfigFactoryArgs,
evtBroadcaster.StartRecordingToSink(stopCh) evtBroadcaster.StartRecordingToSink(stopCh)
configuratorArgs := createSchedulerConfiguratorArgs(clientSet, informerFactory, stopCh) recorder := evtBroadcaster.NewRecorder(
configurator := factory.NewConfigFactory(configuratorArgs) legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
config, err := configurator.CreateFromConfig(schedulerapi.Policy{}) sched, err := createScheduler(clientSet, informerFactory, recorder, stopCh)
if err != nil { if err != nil {
klog.Fatalf("Error creating scheduler: %v", err) klog.Fatalf("Error creating scheduler: %v", err)
} }
config.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, "scheduler")
sched := scheduler.NewFromConfig(config)
scheduler.AddAllEventHandlers(sched, scheduler.AddAllEventHandlers(sched,
v1.DefaultSchedulerName, v1.DefaultSchedulerName,
informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().Nodes(),
@ -96,32 +96,38 @@ func StartScheduler(clientSet clientset.Interface) (*factory.ConfigFactoryArgs,
close(stopCh) close(stopCh)
klog.Infof("destroyed scheduler") klog.Infof("destroyed scheduler")
} }
return configuratorArgs, shutdownFunc return sched.Config(), shutdownFunc
} }
// createSchedulerConfigurator create a configurator for scheduler with given informer factory. // createScheduler create a scheduler with given informer factory and default name.
func createSchedulerConfiguratorArgs( func createScheduler(
clientSet clientset.Interface, clientSet clientset.Interface,
informerFactory informers.SharedInformerFactory, informerFactory informers.SharedInformerFactory,
recorder events.EventRecorder,
stopCh <-chan struct{}, stopCh <-chan struct{},
) *factory.ConfigFactoryArgs { ) (*scheduler.Scheduler, error) {
defaultProviderName := schedulerconfig.SchedulerDefaultProviderName
return &factory.ConfigFactoryArgs{ return scheduler.New(
Client: clientSet, clientSet,
NodeInformer: informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(), informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(), informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(), informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(), informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(), informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), informerFactory.Storage().V1().StorageClasses(),
CSINodeInformer: informerFactory.Storage().V1beta1().CSINodes(), informerFactory.Storage().V1beta1().CSINodes(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, recorder,
DisablePreemption: false, schedulerconfig.SchedulerAlgorithmSource{
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, Provider: &defaultProviderName,
StopCh: stopCh, },
} stopCh,
schedulerframework.NewRegistry(),
nil,
[]schedulerconfig.PluginConfig{},
)
} }