diff --git a/test/integration/scheduler_perf/BUILD b/test/integration/scheduler_perf/BUILD
index d16f19019ab..9b165e36230 100644
--- a/test/integration/scheduler_perf/BUILD
+++ b/test/integration/scheduler_perf/BUILD
@@ -45,6 +45,7 @@ go_test(
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
         "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
         "//test/integration/framework:go_default_library",
diff --git a/test/integration/scheduler_perf/scheduler_bench_test.go b/test/integration/scheduler_perf/scheduler_bench_test.go
index 77d61b4a1f6..5f24adea388 100644
--- a/test/integration/scheduler_perf/scheduler_bench_test.go
+++ b/test/integration/scheduler_perf/scheduler_bench_test.go
@@ -18,6 +18,7 @@ package benchmark
 
 import (
 	"fmt"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -26,6 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/tools/cache"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/csi-translation-lib/plugins"
 	csilibplugins "k8s.io/csi-translation-lib/plugins"
@@ -388,26 +390,30 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
 		}
 		time.Sleep(1 * time.Second)
 	}
+
+	scheduled := int32(0)
+	completedCh := make(chan struct{})
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(old, cur interface{}) {
+			curPod := cur.(*v1.Pod)
+			oldPod := old.(*v1.Pod)
+
+			if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
+				if atomic.AddInt32(&scheduled, 1) >= int32(b.N) {
+					completedCh <- struct{}{}
+				}
+			}
+		},
+	})
+
 	// start benchmark
 	b.ResetTimer()
 	config = testutils.NewTestPodCreatorConfig()
 	config.AddStrategy("sched-test", b.N, testPodStrategy)
 	podCreator = testutils.NewTestPodCreator(clientset, config)
 	podCreator.CreatePods()
-	for {
-		// TODO: Setup watch on apiserver and wait until all pods scheduled.
-		scheduled, err := getScheduledPods(podInformer)
-		if err != nil {
-			klog.Fatalf("%v", err)
-		}
-		if len(scheduled) >= numExistingPods+b.N {
-			break
-		}
-		// Note: This might introduce slight deviation in accuracy of benchmark results.
-		// Since the total amount of time is relatively large, it might not be a concern.
-		time.Sleep(100 * time.Millisecond)
-	}
+	<-completedCh
 
 	// Note: without this line we're taking the overhead of defer() into account.
 	b.StopTimer()
 
diff --git a/test/integration/scheduler_perf/scheduler_test.go b/test/integration/scheduler_perf/scheduler_test.go
index 314b6809ab1..bc84ce73205 100644
--- a/test/integration/scheduler_perf/scheduler_test.go
+++ b/test/integration/scheduler_perf/scheduler_test.go
@@ -17,9 +17,11 @@ limitations under the License.
 
 package benchmark
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"strconv"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -28,6 +30,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
 
 	"k8s.io/kubernetes/pkg/scheduler/factory"
 	testutils "k8s.io/kubernetes/test/utils"
@@ -131,7 +134,7 @@ func getBaseConfig(nodes int, pods int) *testConfig {
 // It returns the minimum of throughput over whole run.
 func schedulePods(config *testConfig) int32 {
 	defer config.destroyFunc()
-	prev := 0
+	prev := int32(0)
 	// On startup there may be a latent period where NO scheduling occurs (qps = 0).
 	// We are interested in low scheduling rates (i.e. qps=2),
 	minQPS := int32(math.MaxInt32)
@@ -151,39 +154,59 @@ func schedulePods(config *testConfig) int32 {
 			break
 		}
 	}
 
-	// map minimum QPS entries in a counter, useful for debugging tests.
-	qpsStats := map[int]int{}
-	// Now that scheduling has started, lets start taking the pulse on how many pods are happening per second.
-	for {
-		// TODO: Setup watch on apiserver and wait until all pods scheduled.
-		scheduled, err := getScheduledPods(podInformer)
-		if err != nil {
-			klog.Fatalf("%v", err)
-		}
-		// We will be completed when all pods are done being scheduled.
-		// return the worst-case-scenario interval that was seen during this time.
-		// Note this should never be low due to cold-start, so allow bake in sched time if necessary.
-		if len(scheduled) >= config.numPods {
-			consumed := int(time.Since(start) / time.Second)
-			if consumed <= 0 {
-				consumed = 1
+	scheduled := int32(0)
+	ctx, cancel := context.WithCancel(context.Background())
+	config.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(old, cur interface{}) {
+			curPod := cur.(*v1.Pod)
+			oldPod := old.(*v1.Pod)
+
+			if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
+				if atomic.AddInt32(&scheduled, 1) >= int32(config.numPods) {
+					cancel()
+				}
 			}
-			fmt.Printf("Scheduled %v Pods in %v seconds (%v per second on average). min QPS was %v\n",
-				config.numPods, consumed, config.numPods/consumed, minQPS)
-			return minQPS
-		}
+		},
+	})
 
-		// There's no point in printing it for the last iteration, as the value is random
-		qps := len(scheduled) - prev
-		qpsStats[qps]++
-		if int32(qps) < minQPS {
-			minQPS = int32(qps)
+	// map minimum QPS entries in a counter, useful for debugging tests.
+	qpsStats := map[int32]int{}
+
+	ticker := time.NewTicker(1 * time.Second)
+	go func() {
+		for {
+			select {
+			case <-ticker.C:
+				scheduled := atomic.LoadInt32(&scheduled)
+				qps := scheduled - prev
+				qpsStats[qps]++
+				if qps < minQPS {
+					minQPS = qps
+				}
+				fmt.Printf("%ds\trate: %d\ttotal: %d (qps frequency: %v)\n", time.Since(start)/time.Second, qps, scheduled, qpsStats)
+				prev = scheduled
+
+			case <-ctx.Done():
+				return
+			}
 		}
-		fmt.Printf("%ds\trate: %d\ttotal: %d (qps frequency: %v)\n", time.Since(start)/time.Second, qps, len(scheduled), qpsStats)
-		prev = len(scheduled)
-		time.Sleep(1 * time.Second)
+	}()
+
+	<-ctx.Done()
+
+	ticker.Stop()
+
+	// We will be completed when all pods are done being scheduled.
+	// return the worst-case-scenario interval that was seen during this time.
+	// Note this should never be low due to cold-start, so allow bake in sched time if necessary.
+	consumed := int(time.Since(start) / time.Second)
+	if consumed <= 0 {
+		consumed = 1
 	}
+	fmt.Printf("Scheduled %v Pods in %v seconds (%v per second on average). min QPS was %v\n",
+		config.numPods, consumed, config.numPods/consumed, minQPS)
+	return minQPS
 }
 
 // mutateNodeTemplate returns the modified node needed for creation of nodes.
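Both files replace the old poll-and-sleep loops with the same informer-driven pattern: an UpdateFunc registered on the pod informer counts pods whose spec.nodeName flips from empty to set, a channel (or context cancellation) signals the moment the expected count is reached, and schedulePods additionally samples the counter once per second to track the minimum observed QPS. The sketch below isolates those two pieces; the helper names waitForScheduled and sampleMinQPS are illustrative only and do not appear in the patch, though the client-go types (coreinformers.PodInformer, cache.ResourceEventHandlerFuncs) are the ones the diff already imports.

```go
package benchmark

import (
	"context"
	"math"
	"sync/atomic"
	"time"

	v1 "k8s.io/api/core/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/tools/cache"
)

// waitForScheduled is a hypothetical helper illustrating the event-handler
// pattern: it returns a channel that is closed once `expected` pods have
// transitioned from unbound (empty spec.nodeName) to bound.
func waitForScheduled(podInformer coreinformers.PodInformer, expected int32) <-chan struct{} {
	var scheduled int32
	done := make(chan struct{})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(old, cur interface{}) {
			oldPod, curPod := old.(*v1.Pod), cur.(*v1.Pod)
			// A pod counts as scheduled the first time spec.nodeName is populated.
			if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
				// Close the channel exactly once (==, not >=) so update events
				// arriving after the target count never block.
				if atomic.AddInt32(&scheduled, 1) == expected {
					close(done)
				}
			}
		},
	})
	return done
}

// sampleMinQPS is a hypothetical helper mirroring the measurement loop: it
// samples an atomically updated counter once per second until ctx is
// cancelled and returns the lowest per-second scheduling rate it observed.
func sampleMinQPS(ctx context.Context, counter *int32) int32 {
	minQPS := int32(math.MaxInt32)
	prev := int32(0)
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			total := atomic.LoadInt32(counter)
			if qps := total - prev; qps < minQPS {
				minQPS = qps
			}
			prev = total
		case <-ctx.Done():
			return minQPS
		}
	}
}
```

In benchmarkScheduling this corresponds to blocking on the completion signal between podCreator.CreatePods() and b.StopTimer(); schedulePods additionally prints the per-second rate and a frequency map of observed QPS values before returning the minimum.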