diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index e560f5a8a03..7348e9eba2c 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -835,7 +835,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 		if concreteOp.CollectMetrics {
 			collectorCtx, collectorCancel = context.WithCancel(ctx)
 			defer collectorCancel()
-			collectors = getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
+			collectors = getTestDataCollectors(b, podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
 			for _, collector := range collectors {
 				// Need loop-local variable for function below.
 				collector := collector
@@ -1025,12 +1025,12 @@ type testDataCollector interface {
 	collect() []DataItem
 }
 
-func getTestDataCollectors(podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
+func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
 	if mcc == nil {
 		mcc = &defaultMetricsCollectorConfig
 	}
 	return []testDataCollector{
-		newThroughputCollector(podInformer, map[string]string{"Name": name}, []string{namespace}),
+		newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}),
 		newMetricsCollector(mcc, map[string]string{"Name": name}),
 	}
 }
diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go
index ca94f5196b4..cf3c9292696 100644
--- a/test/integration/scheduler_perf/util.go
+++ b/test/integration/scheduler_perf/util.go
@@ -50,10 +50,10 @@ import (
 )
 
 const (
-	dateFormat                = "2006-01-02T15:04:05Z"
-	testNamespace             = "sched-test"
-	setupNamespace            = "sched-setup"
-	throughputSampleFrequency = time.Second
+	dateFormat               = "2006-01-02T15:04:05Z"
+	testNamespace            = "sched-test"
+	setupNamespace           = "sched-setup"
+	throughputSampleInterval = time.Second
 )
 
 var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
@@ -286,14 +286,16 @@ func collectHistogramVec(metric string, labels map[string]string, lvMap map[stri
 }
 
 type throughputCollector struct {
+	tb                    testing.TB
 	podInformer           coreinformers.PodInformer
 	schedulingThroughputs []float64
 	labels                map[string]string
 	namespaces            []string
 }
 
-func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
+func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
 	return &throughputCollector{
+		tb:          tb,
 		podInformer: podInformer,
 		labels:      labels,
 		namespaces:  namespaces,
@@ -306,28 +308,71 @@ func (tc *throughputCollector) run(ctx context.Context) {
 		klog.Fatalf("%v", err)
 	}
 	lastScheduledCount := len(podsScheduled)
-	ticker := time.NewTicker(throughputSampleFrequency)
+	ticker := time.NewTicker(throughputSampleInterval)
 	defer ticker.Stop()
+	lastSampleTime := time.Now()
+	started := false
+	skipped := 0
+
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
+			now := time.Now()
 			podsScheduled, err := getScheduledPods(tc.podInformer, tc.namespaces...)
 			if err != nil {
 				klog.Fatalf("%v", err)
 			}
 			scheduled := len(podsScheduled)
-			// Only do sampling if number of scheduled pods is greater than zero
-			if scheduled > 0 {
-				samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
-				throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
-				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+			// Only do sampling if number of scheduled pods is greater than zero.
+			if scheduled == 0 {
+				continue
+			}
+			if !started {
+				started = true
+				// Skip the initial sample. It's likely to be an outlier because
+				// sampling and creating pods get started independently.
 				lastScheduledCount = scheduled
-				klog.Infof("%d pods scheduled", lastScheduledCount)
+				lastSampleTime = now
+				continue
 			}
+			newScheduled := scheduled - lastScheduledCount
+			if newScheduled == 0 {
+				// Throughput would be zero for the interval.
+				// Instead of recording 0 pods/s, keep waiting
+				// until we see at least one additional pod
+				// being scheduled.
+				skipped++
+				continue
+			}
+
+			// This should be roughly equal to
+			// throughputSampleInterval * (skipped + 1), but we
+			// don't count on that because the goroutine might not
+			// be scheduled immediately when the timer
+			// triggers. Instead we track the actual time stamps.
+			duration := now.Sub(lastSampleTime)
+			durationInSeconds := duration.Seconds()
+			throughput := float64(newScheduled) / durationInSeconds
+			expectedDuration := throughputSampleInterval * time.Duration(skipped+1)
+			errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
+			if math.Abs(errorMargin) > 5 {
+				// This might affect the result, report it.
+				tc.tb.Errorf("ERROR: Expected throughput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
+			}
+
+			// To keep percentiles accurate, we have to record multiple samples with the same
+			// throughput value if we skipped some intervals.
+			for i := 0; i <= skipped; i++ {
+				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+			}
+			lastScheduledCount = scheduled
+			klog.Infof("%d pods scheduled", lastScheduledCount)
+			skipped = 0
+			lastSampleTime = now
 		}
 	}
 }
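For reviewing the percentile bookkeeping in isolation: below is a minimal, self-contained sketch of the new sampling step. The `recordSamples` helper and its example numbers are illustrative only and not part of the patch; it assumes, as the loop at the end of `run` does, that one sample is recorded per covered interval.

```go
package main

import (
	"fmt"
	"time"
)

// recordSamples mirrors the patch's bookkeeping: newScheduled pods were
// observed after `skipped` zero-progress ticks, over the actually measured
// elapsed `duration`. It returns one sample per covered interval, all
// carrying the same average throughput, so that percentiles keep weighting
// every interval equally.
func recordSamples(newScheduled, skipped int, duration time.Duration) []float64 {
	throughput := float64(newScheduled) / duration.Seconds()
	samples := make([]float64, skipped+1)
	for i := range samples {
		samples[i] = throughput
	}
	return samples
}

func main() {
	// Two 1s ticks pass with no newly scheduled pods, then 30 new pods are
	// visible on the third tick. The collector records [10 10 10] pods/s
	// instead of the old behavior's [0 0 30], keeping the median at the
	// true average rate of 10 pods/s.
	fmt.Println(recordSamples(30, 2, 3*time.Second))
}
```

The design choice worth noting: dividing by the measured `now.Sub(lastSampleTime)` rather than a nominal `skipped+1` seconds keeps the recorded rate correct when timer wakeups are late, and the 5% `errorMargin` check flags runs where that slack grew large enough to distort the results.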