fix: scheduler perf test with pod informer

This commit is contained in:
draveness 2019-10-15 12:57:49 +08:00
parent 63bd1d7a5c
commit e5a23f8dda
6 changed files with 17 additions and 20 deletions

View File

@ -36,7 +36,6 @@ go_test(
tags = ["integration"], tags = ["integration"],
deps = [ deps = [
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/volume/util:go_default_library", "//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",

View File

@ -33,7 +33,6 @@ import (
csilibplugins "k8s.io/csi-translation-lib/plugins" csilibplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils" testutils "k8s.io/kubernetes/test/utils"
@ -361,10 +360,9 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
if b.N < minPods { if b.N < minPods {
b.N = minPods b.N = minPods
} }
finalFunc, clientset := mustSetupScheduler() finalFunc, podInformer, clientset := mustSetupScheduler()
defer finalFunc() defer finalFunc()
podInformer := scheduler.NewPodInformer(clientset, 0)
nodePreparer := framework.NewIntegrationTestNodePreparer( nodePreparer := framework.NewIntegrationTestNodePreparer(
clientset, clientset,
[]testutils.CountToStrategy{{Count: numNodes, Strategy: nodeStrategy}}, []testutils.CountToStrategy{{Count: numNodes, Strategy: nodeStrategy}},
@ -388,6 +386,7 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
if len(scheduled) >= numExistingPods { if len(scheduled) >= numExistingPods {
break break
} }
klog.Infof("got %d existing pods, required: %d", len(scheduled), numExistingPods)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }

View File

@ -31,7 +31,6 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/scheduler"
testutils "k8s.io/kubernetes/test/utils" testutils "k8s.io/kubernetes/test/utils"
"k8s.io/klog" "k8s.io/klog"
@ -116,12 +115,13 @@ type testConfig struct {
// getBaseConfig returns baseConfig after initializing number of nodes and pods. // getBaseConfig returns baseConfig after initializing number of nodes and pods.
func getBaseConfig(nodes int, pods int) *testConfig { func getBaseConfig(nodes int, pods int) *testConfig {
destroyFunc, clientset := mustSetupScheduler() destroyFunc, podInformer, clientset := mustSetupScheduler()
return &testConfig{ return &testConfig{
clientset: clientset, clientset: clientset,
destroyFunc: destroyFunc, destroyFunc: destroyFunc,
numNodes: nodes, numNodes: nodes,
numPods: pods, numPods: pods,
podInformer: podInformer,
} }
} }
@ -138,11 +138,10 @@ func schedulePods(config *testConfig) int32 {
minQPS := int32(math.MaxInt32) minQPS := int32(math.MaxInt32)
start := time.Now() start := time.Now()
podInformer := scheduler.NewPodInformer(config.clientset, 0)
// Bake in time for the first pod scheduling event. // Bake in time for the first pod scheduling event.
for { for {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
scheduled, err := getScheduledPods(podInformer) scheduled, err := getScheduledPods(config.podInformer)
if err != nil { if err != nil {
klog.Fatalf("%v", err) klog.Fatalf("%v", err)
} }

View File

@ -33,7 +33,7 @@ import (
// remove resources after finished. // remove resources after finished.
// Notes on rate limiter: // Notes on rate limiter:
// - client rate limit is set to 5000. // - client rate limit is set to 5000.
func mustSetupScheduler() (util.ShutdownFunc, clientset.Interface) { func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface) {
apiURL, apiShutdown := util.StartApiserver() apiURL, apiShutdown := util.StartApiserver()
clientSet := clientset.NewForConfigOrDie(&restclient.Config{ clientSet := clientset.NewForConfigOrDie(&restclient.Config{
Host: apiURL, Host: apiURL,
@ -41,14 +41,14 @@ func mustSetupScheduler() (util.ShutdownFunc, clientset.Interface) {
QPS: 5000.0, QPS: 5000.0,
Burst: 5000, Burst: 5000,
}) })
_, schedulerShutdown := util.StartScheduler(clientSet) _, podInformer, schedulerShutdown := util.StartScheduler(clientSet)
shutdownFunc := func() { shutdownFunc := func() {
schedulerShutdown() schedulerShutdown()
apiShutdown() apiShutdown()
} }
return shutdownFunc, clientSet return shutdownFunc, podInformer, clientSet
} }
func getScheduledPods(podInformer coreinformers.PodInformer) ([]*v1.Pod, error) { func getScheduledPods(podInformer coreinformers.PodInformer) ([]*v1.Pod, error) {
@ -56,6 +56,7 @@ func getScheduledPods(podInformer coreinformers.PodInformer) ([]*v1.Pod, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
scheduled := make([]*v1.Pod, 0, len(pods)) scheduled := make([]*v1.Pod, 0, len(pods))
for i := range pods { for i := range pods {
pod := pods[i] pod := pods[i]

View File

@ -19,6 +19,7 @@ go_library(
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library", "//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library",

View File

@ -22,6 +22,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/klog" "k8s.io/klog"
@ -57,8 +58,9 @@ func StartApiserver() (string, ShutdownFunc) {
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface // StartScheduler configures and starts a scheduler given a handle to the clientSet interface
// and event broadcaster. It returns the running scheduler and the shutdown function to stop it. // and event broadcaster. It returns the running scheduler and the shutdown function to stop it.
func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, ShutdownFunc) { func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, coreinformers.PodInformer, ShutdownFunc) {
informerFactory := informers.NewSharedInformerFactory(clientSet, 0) informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
podInformer := informerFactory.Core().V1().Pods()
stopCh := make(chan struct{}) stopCh := make(chan struct{})
evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: clientSet.EventsV1beta1().Events("")}) Interface: clientSet.EventsV1beta1().Events("")})
@ -70,15 +72,10 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, Shutdo
v1.DefaultSchedulerName, v1.DefaultSchedulerName,
) )
sched, err := createScheduler(clientSet, informerFactory, recorder, stopCh) sched, err := createScheduler(clientSet, informerFactory, podInformer, recorder, stopCh)
if err != nil { if err != nil {
klog.Fatalf("Error creating scheduler: %v", err) klog.Fatalf("Error creating scheduler: %v", err)
} }
scheduler.AddAllEventHandlers(sched,
v1.DefaultSchedulerName,
informerFactory,
informerFactory.Core().V1().Pods(),
)
informerFactory.Start(stopCh) informerFactory.Start(stopCh)
sched.Run() sched.Run()
@ -88,13 +85,14 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, Shutdo
close(stopCh) close(stopCh)
klog.Infof("destroyed scheduler") klog.Infof("destroyed scheduler")
} }
return sched, shutdownFunc return sched, podInformer, shutdownFunc
} }
// createScheduler create a scheduler with given informer factory and default name. // createScheduler create a scheduler with given informer factory and default name.
func createScheduler( func createScheduler(
clientSet clientset.Interface, clientSet clientset.Interface,
informerFactory informers.SharedInformerFactory, informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
recorder events.EventRecorder, recorder events.EventRecorder,
stopCh <-chan struct{}, stopCh <-chan struct{},
) (*scheduler.Scheduler, error) { ) (*scheduler.Scheduler, error) {
@ -103,7 +101,7 @@ func createScheduler(
return scheduler.New( return scheduler.New(
clientSet, clientSet,
informerFactory, informerFactory,
informerFactory.Core().V1().Pods(), podInformer,
recorder, recorder,
schedulerconfig.SchedulerAlgorithmSource{ schedulerconfig.SchedulerAlgorithmSource{
Provider: &defaultProviderName, Provider: &defaultProviderName,