diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index 898cdf0a914..a5ddbfa5abb 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -19,13 +19,13 @@ package benchmark
 import (
 	"fmt"
 	"io/ioutil"
-	"sync/atomic"
 	"testing"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
-	"k8s.io/client-go/tools/cache"
+	coreinformers "k8s.io/client-go/informers/core/v1"
+	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/component-base/featuregate"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/klog"
@@ -39,11 +39,13 @@ const (
 )
 
 var (
-	defaultMetrics = []string{
-		"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
-		"scheduler_scheduling_algorithm_priority_evaluation_seconds",
-		"scheduler_binding_duration_seconds",
-		"scheduler_e2e_scheduling_duration_seconds",
+	defaultMetricsCollectorConfig = metricsCollectorConfig{
+		Metrics: []string{
+			"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
+			"scheduler_scheduling_algorithm_priority_evaluation_seconds",
+			"scheduler_binding_duration_seconds",
+			"scheduler_e2e_scheduling_duration_seconds",
+		},
 	}
 )
 
@@ -52,8 +54,8 @@ var (
 //
 // It specifies nodes and pods in the cluster before running the test. It also specifies the pods to
 // schedule during the test. The config can be as simple as just specify number of nodes/pods, where
-// default spec will be applied. It also allows the user to specify a pod spec template for more compicated
-// test cases.
+// default spec will be applied. It also allows the user to specify a pod spec template for more
+// complicated test cases.
 //
 // It also specifies the metrics to be collected after the test. If nothing is specified, default metrics
 // such as scheduling throughput and latencies will be collected.
@@ -68,6 +70,8 @@ type testCase struct {
 	PodsToSchedule podCase
 	// optional, feature gates to set before running the test
 	FeatureGates map[featuregate.Feature]bool
+	// optional, replaces defaultMetricsCollectorConfig if supplied.
+	MetricsCollectorConfig *metricsCollectorConfig
 }
 
 type nodeCase struct {
@@ -100,6 +104,11 @@ type testParams struct {
 	NumPodsToSchedule int
 }
 
+type testDataCollector interface {
+	run(stopCh chan struct{})
+	collect() []DataItem
+}
+
 func BenchmarkPerfScheduling(b *testing.B) {
 	dataItems := DataItems{Version: "v1"}
 	tests := getSimpleTestCases(configFile)
@@ -119,119 +128,97 @@ func BenchmarkPerfScheduling(b *testing.B) {
 }
 
 func perfScheduling(test testCase, b *testing.B) []DataItem {
-	var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
-	if test.Nodes.NodeAllocatableStrategy != nil {
-		nodeStrategy = test.Nodes.NodeAllocatableStrategy
-	} else if test.Nodes.LabelNodePrepareStrategy != nil {
-		nodeStrategy = test.Nodes.LabelNodePrepareStrategy
-	} else if test.Nodes.UniqueNodeLabelStrategy != nil {
-		nodeStrategy = test.Nodes.UniqueNodeLabelStrategy
-	}
-
-	setupPodStrategy := getPodStrategy(test.InitPods)
-	testPodStrategy := getPodStrategy(test.PodsToSchedule)
-
-	var nodeSpec *v1.Node
-	if test.Nodes.NodeTemplatePath != nil {
-		nodeSpec = getNodeSpecFromFile(test.Nodes.NodeTemplatePath)
-	}
-
 	finalFunc, podInformer, clientset := mustSetupScheduler()
 	defer finalFunc()
 
-	var nodePreparer testutils.TestNodePreparer
-	if nodeSpec != nil {
-		nodePreparer = framework.NewIntegrationTestNodePreparerWithNodeSpec(
-			clientset,
-			[]testutils.CountToStrategy{{Count: test.Nodes.Num, Strategy: nodeStrategy}},
-			nodeSpec,
-		)
-	} else {
-		nodePreparer = framework.NewIntegrationTestNodePreparer(
-			clientset,
-			[]testutils.CountToStrategy{{Count: test.Nodes.Num, Strategy: nodeStrategy}},
-			"scheduler-perf-",
-		)
-	}
-
+	nodePreparer := getNodePreparer(test.Nodes, clientset)
 	if err := nodePreparer.PrepareNodes(); err != nil {
 		klog.Fatalf("%v", err)
 	}
 	defer nodePreparer.CleanupNodes()
 
-	config := testutils.NewTestPodCreatorConfig()
-	config.AddStrategy(setupNamespace, test.InitPods.Num, setupPodStrategy)
-	podCreator := testutils.NewTestPodCreator(clientset, config)
-	podCreator.CreatePods()
+	createPods(setupNamespace, test.InitPods, clientset)
+	waitNumPodsScheduled(test.InitPods.Num, podInformer)
 
+	// start benchmark
+	b.ResetTimer()
+
+	// Start test data collectors.
+	stopCh := make(chan struct{})
+	collectors := getTestDataCollectors(test, podInformer, b)
+	for _, collector := range collectors {
+		go collector.run(stopCh)
+	}
+
+	// Schedule the main workload
+	createPods(testNamespace, test.PodsToSchedule, clientset)
+	waitNumPodsScheduled(test.InitPods.Num+test.PodsToSchedule.Num, podInformer)
+
+	close(stopCh)
+	// Note: without this line we're taking the overhead of defer() into account.
+	b.StopTimer()
+
+	var dataItems []DataItem
+	for _, collector := range collectors {
+		dataItems = append(dataItems, collector.collect()...)
+	}
+	return dataItems
+}
+
+func waitNumPodsScheduled(num int, podInformer coreinformers.PodInformer) {
 	for {
 		scheduled, err := getScheduledPods(podInformer)
 		if err != nil {
 			klog.Fatalf("%v", err)
 		}
-		if len(scheduled) >= test.InitPods.Num {
+		if len(scheduled) >= num {
 			break
 		}
-		klog.Infof("got %d existing pods, required: %d", len(scheduled), test.InitPods.Num)
+		klog.Infof("got %d existing pods, required: %d", len(scheduled), num)
 		time.Sleep(1 * time.Second)
 	}
+}
 
-	scheduled := int32(0)
-	completedCh := make(chan struct{})
-	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		UpdateFunc: func(old, cur interface{}) {
-			curPod := cur.(*v1.Pod)
-			oldPod := old.(*v1.Pod)
+func getTestDataCollectors(tc testCase, podInformer coreinformers.PodInformer, b *testing.B) []testDataCollector {
+	collectors := []testDataCollector{newThroughputCollector(podInformer, map[string]string{"Name": b.Name()})}
+	metricsCollectorConfig := defaultMetricsCollectorConfig
+	if tc.MetricsCollectorConfig != nil {
+		metricsCollectorConfig = *tc.MetricsCollectorConfig
+	}
+	collectors = append(collectors, newMetricsCollector(metricsCollectorConfig, map[string]string{"Name": b.Name()}))
+	return collectors
+}
 
-			if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
-				if atomic.AddInt32(&scheduled, 1) >= int32(test.PodsToSchedule.Num) {
-					completedCh <- struct{}{}
-				}
-			}
-		},
-	})
+func getNodePreparer(nc nodeCase, clientset clientset.Interface) testutils.TestNodePreparer {
+	var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
+	if nc.NodeAllocatableStrategy != nil {
+		nodeStrategy = nc.NodeAllocatableStrategy
+	} else if nc.LabelNodePrepareStrategy != nil {
+		nodeStrategy = nc.LabelNodePrepareStrategy
+	} else if nc.UniqueNodeLabelStrategy != nil {
+		nodeStrategy = nc.UniqueNodeLabelStrategy
+	}
 
-	// start benchmark
-	b.ResetTimer()
+	if nc.NodeTemplatePath != nil {
+		return framework.NewIntegrationTestNodePreparerWithNodeSpec(
+			clientset,
+			[]testutils.CountToStrategy{{Count: nc.Num, Strategy: nodeStrategy}},
+			getNodeSpecFromFile(nc.NodeTemplatePath),
+		)
+	}
+	return framework.NewIntegrationTestNodePreparer(
+		clientset,
+		[]testutils.CountToStrategy{{Count: nc.Num, Strategy: nodeStrategy}},
+		"scheduler-perf-",
+	)
+}
 
-	// Start measuring throughput
-	stopCh := make(chan struct{})
-	throughputCollector := newThroughputCollector(podInformer)
-	go throughputCollector.run(stopCh)
-
-	// Scheduling the main workload
-	config = testutils.NewTestPodCreatorConfig()
-	config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy)
-	podCreator = testutils.NewTestPodCreator(clientset, config)
+func createPods(ns string, pc podCase, clientset clientset.Interface) {
+	strategy := getPodStrategy(pc)
+	config := testutils.NewTestPodCreatorConfig()
+	config.AddStrategy(ns, pc.Num, strategy)
+	podCreator := testutils.NewTestPodCreator(clientset, config)
 	podCreator.CreatePods()
-
-	<-completedCh
-	close(stopCh)
-
-	// Note: without this line we're taking the overhead of defer() into account.
-	b.StopTimer()
-
-	setNameLabel := func(dataItem *DataItem) DataItem {
-		if dataItem.Labels == nil {
-			dataItem.Labels = map[string]string{}
-		}
-		dataItem.Labels["Name"] = b.Name()
-		return *dataItem
-	}
-
-	dataItems := []DataItem{
-		setNameLabel(throughputCollector.collect()),
-	}
-
-	for _, metric := range defaultMetrics {
-		dataItem := newMetricsCollector(metric).collect()
-		if dataItem == nil {
-			continue
-		}
-		dataItems = append(dataItems, setNameLabel(dataItem))
-	}
-
-	return dataItems
 }
 
 func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy {
diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go
index fb8ea837bab..b099b73caa5 100644
--- a/test/integration/scheduler_perf/util.go
+++ b/test/integration/scheduler_perf/util.go
@@ -118,20 +118,42 @@ func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
 	return ioutil.WriteFile(destFile, b, 0644)
 }
 
+// metricsCollectorConfig is the config to be marshalled to YAML config file.
+type metricsCollectorConfig struct {
+	Metrics []string
+}
+
 // metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint.
 // Currently only Histrogram metrics are supported.
 type metricsCollector struct {
-	metric string
+	metricsCollectorConfig
+	labels map[string]string
 }
 
-func newMetricsCollector(metric string) *metricsCollector {
+func newMetricsCollector(config metricsCollectorConfig, labels map[string]string) *metricsCollector {
 	return &metricsCollector{
-		metric: metric,
+		metricsCollectorConfig: config,
+		labels:                 labels,
 	}
 }
 
-func (pc *metricsCollector) collect() *DataItem {
-	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, pc.metric)
+func (*metricsCollector) run(stopCh chan struct{}) {
+	// metricsCollector doesn't need to start before the tests, so nothing to do here.
+}
+
+func (pc *metricsCollector) collect() []DataItem {
+	var dataItems []DataItem
+	for _, metric := range pc.Metrics {
+		dataItem := collectHistogram(metric, pc.labels)
+		if dataItem != nil {
+			dataItems = append(dataItems, *dataItem)
+		}
+	}
+	return dataItems
+}
+
+func collectHistogram(metric string, labels map[string]string) *DataItem {
+	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, metric)
 	if err != nil {
 		klog.Error(err)
 		return nil
@@ -153,10 +175,13 @@ func (pc *metricsCollector) collect() *DataItem {
 
 	msFactor := float64(time.Second) / float64(time.Millisecond)
 
+	// Copy labels and add "Metric" label for this metric.
+	labelMap := map[string]string{"Metric": metric}
+	for k, v := range labels {
+		labelMap[k] = v
+	}
 	return &DataItem{
-		Labels: map[string]string{
-			"Metric": pc.metric,
-		},
+		Labels: labelMap,
 		Data: map[string]float64{
 			"Perc50": q50 * msFactor,
 			"Perc90": q90 * msFactor,
@@ -170,11 +195,13 @@
 type throughputCollector struct {
 	podInformer           coreinformers.PodInformer
 	schedulingThroughputs []float64
+	labels                map[string]string
 }
 
-func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
+func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string) *throughputCollector {
 	return &throughputCollector{
 		podInformer: podInformer,
+		labels:      labels,
 	}
 }
 
@@ -205,8 +232,8 @@ func (tc *throughputCollector) run(stopCh chan struct{}) {
 	}
 }
 
-func (tc *throughputCollector) collect() *DataItem {
-	throughputSummary := &DataItem{}
+func (tc *throughputCollector) collect() []DataItem {
+	throughputSummary := DataItem{Labels: tc.labels}
 	if length := len(tc.schedulingThroughputs); length > 0 {
 		sort.Float64s(tc.schedulingThroughputs)
 		sum := 0.0
@@ -225,5 +252,6 @@
 		}
 		throughputSummary.Unit = "pods/s"
 	}
-	return throughputSummary
+
+	return []DataItem{throughputSummary}
 }
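
Illustrative sketch (not part of the patch): how a test case could use the new optional MetricsCollectorConfig field to override the default metric set. The helper name and the numbers below are made up; the field names and the metric name come from the code in this diff.

	// exampleCustomMetricsCase is a hypothetical helper in package benchmark.
	// It builds a testCase that collects a single histogram instead of the four
	// metrics in defaultMetricsCollectorConfig.
	func exampleCustomMetricsCase() testCase {
		return testCase{
			Nodes:          nodeCase{Num: 500},
			InitPods:       podCase{Num: 500},
			PodsToSchedule: podCase{Num: 1000},
			MetricsCollectorConfig: &metricsCollectorConfig{
				Metrics: []string{"scheduler_e2e_scheduling_duration_seconds"},
			},
		}
	}

getTestDataCollectors falls back to defaultMetricsCollectorConfig only when MetricsCollectorConfig is nil, so a case like this would report the throughput collector's data plus the one listed histogram.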