diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go
index dc8a00dd9b4..0cda120ff2f 100644
--- a/test/integration/scheduler_perf/scheduler_perf.go
+++ b/test/integration/scheduler_perf/scheduler_perf.go
@@ -1194,7 +1194,10 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin
 			b.Fatalf("workload %s is not valid: %v", w.Name, err)
 		}
 
-		results := runWorkload(tCtx, tc, w, informerFactory)
+		results, err := runWorkload(tCtx, tc, w, informerFactory)
+		if err != nil {
+			tCtx.Fatalf("workload %s: %v", w.Name, err)
+		}
 		dataItems.DataItems = append(dataItems.DataItems, results...)
 
 		if len(results) > 0 {
@@ -1410,7 +1413,7 @@ func checkEmptyInFlightEvents() error {
 	return nil
 }
 
-func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) {
+func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector, error) {
 	collectorCtx := ktesting.WithCancel(tCtx)
 	workloadName := tCtx.Name()
 	// The first part is the same for each workload, therefore we can strip it.
@@ -1421,7 +1424,7 @@ func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup,
 		collector := collector
 		err := collector.init()
 		if err != nil {
-			tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
+			return nil, nil, fmt.Errorf("op %d: failed to initialize data collector: %w", opIndex, err)
 		}
 		collectorWG.Add(1)
 		go func() {
@@ -1429,12 +1432,12 @@ func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup,
 			collector.run(collectorCtx)
 		}()
 	}
-	return collectorCtx, collectors
+	return collectorCtx, collectors, nil
 }
 
-func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) []DataItem {
+func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) ([]DataItem, error) {
 	if collectorCtx == nil {
-		tCtx.Fatalf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
+		return nil, fmt.Errorf("op %d: missing startCollectingMetrics operation before stopping", opIndex)
 	}
 	collectorCtx.Cancel("collecting metrics, collector must stop first")
 	collectorWG.Wait()
@@ -1447,7 +1450,7 @@ func stopCollectingMetrics(tCtx ktesting.TContex
 			tCtx.Errorf("op %d: %s", opIndex, err)
 		}
 	}
-	return dataItems
+	return dataItems, nil
 }
 
 type WorkloadExecutor struct {
@@ -1465,7 +1468,7 @@ type WorkloadExecutor struct {
 	nextNodeIndex int
 }
 
-func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
+func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) ([]DataItem, error) {
	b, benchmarking := tCtx.TB().(*testing.B)
 	if benchmarking {
 		start := time.Now()
@@ -1513,70 +1516,77 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
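+	// Each operation handler below returns an error instead of failing the
+	// test directly; the first failing op aborts the workload and the error
+	// is reported by runWorkload's caller.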
 	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
 		realOp, err := op.realOp.patchParams(w)
 		if err != nil {
-			tCtx.Fatalf("op %d: %v", opIndex, err)
+			return nil, fmt.Errorf("op %d: %w", opIndex, err)
 		}
 		select {
 		case <-tCtx.Done():
-			tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
+			return nil, fmt.Errorf("op %d: %w", opIndex, context.Cause(tCtx))
 		default:
 		}
 		switch concreteOp := realOp.(type) {
 		case *createNodesOp:
-			executor.runCreateNodesOp(opIndex, concreteOp)
+			err = executor.runCreateNodesOp(opIndex, concreteOp)
 		case *createNamespacesOp:
-			executor.runCreateNamespaceOp(opIndex, concreteOp)
+			err = executor.runCreateNamespaceOp(opIndex, concreteOp)
 		case *createPodsOp:
-			executor.runCreatePodsOp(opIndex, concreteOp)
+			err = executor.runCreatePodsOp(opIndex, concreteOp)
 		case *deletePodsOp:
-			executor.runDeletePodsOp(opIndex, concreteOp)
+			err = executor.runDeletePodsOp(opIndex, concreteOp)
 		case *churnOp:
-			executor.runChurnOp(opIndex, concreteOp)
+			err = executor.runChurnOp(opIndex, concreteOp)
 		case *barrierOp:
-			executor.runBarrierOp(opIndex, concreteOp)
+			err = executor.runBarrierOp(opIndex, concreteOp)
 		case *sleepOp:
 			executor.runSleepOp(concreteOp)
 		case *startCollectingMetricsOp:
-			executor.runStartCollectingMetricsOp(opIndex, concreteOp)
+			err = executor.runStartCollectingMetricsOp(opIndex, concreteOp)
 		case *stopCollectingMetricsOp:
-			executor.runStopCollectingMetrics(opIndex)
+			err = executor.runStopCollectingMetrics(opIndex)
 		default:
-			executor.runDefaultOp(opIndex, concreteOp)
+			err = executor.runDefaultOp(opIndex, concreteOp)
+		}
+		if err != nil {
+			return nil, err
 		}
 	}
 
 	// check unused params and inform users
 	unusedParams := w.unusedParams()
 	if len(unusedParams) != 0 {
-		tCtx.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
+		return nil, fmt.Errorf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
 	}
 	// Some tests have unschedulable pods. Do not add an implicit barrier at the
 	// end as we do not want to wait for them.
-	return executor.dataItems
+	return executor.dataItems, nil
 }
 
-func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) {
+func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) error {
 	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, e.tCtx.Client())
 	if err != nil {
-		e.tCtx.Fatalf("op %d: %v", opIndex, err)
+		return fmt.Errorf("op %d: %w", opIndex, err)
 	}
 	if err := nodePreparer.PrepareNodes(e.tCtx, e.nextNodeIndex); err != nil {
-		e.tCtx.Fatalf("op %d: %v", opIndex, err)
+		return fmt.Errorf("op %d: %w", opIndex, err)
 	}
 	e.nextNodeIndex += op.Count
+	return nil
 }
 
-func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) {
+func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) error {
 	nsPreparer, err := newNamespacePreparer(e.tCtx, op)
 	if err != nil {
-		e.tCtx.Fatalf("op %d: %v", opIndex, err)
+		return fmt.Errorf("op %d: %w", opIndex, err)
 	}
 	if err := nsPreparer.prepare(e.tCtx); err != nil {
 		err2 := nsPreparer.cleanup(e.tCtx)
 		if err2 != nil {
 			err = fmt.Errorf("prepare: %w; cleanup: %w", err, err2)
 		}
-		e.tCtx.Fatalf("op %d: %v", opIndex, err)
+		return fmt.Errorf("op %d: %w", opIndex, err)
 	}
 	for _, n := range nsPreparer.namespaces() {
 		if _, ok := e.numPodsScheduledPerNamespace[n]; ok {
@@ -1585,25 +1595,26 @@ func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespace
 		}
 		e.numPodsScheduledPerNamespace[n] = 0
 	}
+	return nil
 }
 
-func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) {
+func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) error {
 	for _, namespace := range op.Namespaces {
 		if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok {
-			e.tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
+			return fmt.Errorf("op %d: unknown namespace %s", opIndex, namespace)
 		}
 	}
 	switch op.StageRequirement {
 	case Attempted:
 		if err := waitUntilPodsAttempted(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
-			e.tCtx.Fatalf("op %d: %v", opIndex, err)
+			return fmt.Errorf("op %d: %w", opIndex, err)
 		}
 	case Scheduled:
 		// Default should be treated like "Scheduled", so handling both in the same way.
 		fallthrough
 	default:
 		if err := waitUntilPodsScheduled(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
-			e.tCtx.Fatalf("op %d: %v", opIndex, err)
+			return fmt.Errorf("op %d: %w", opIndex, err)
 		}
 		// At the end of the barrier, we can be sure that there are no pods
 		// pending scheduling in the namespaces that we just blocked on.
@@ -1615,6 +1626,7 @@ func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) {
 			}
 		}
 	}
+	return nil
 }
 
 func (e *WorkloadExecutor) runSleepOp(op *sleepOp) {
@@ -1624,13 +1636,19 @@ func (e *WorkloadExecutor) runSleepOp(op *sleepOp) {
 	}
 }
 
-func (e *WorkloadExecutor) runStopCollectingMetrics(opIndex int) {
-	items := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+func (e *WorkloadExecutor) runStopCollectingMetrics(opIndex int) error {
+	items, err := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+	if err != nil {
+		return err
+	}
 	e.dataItems = append(e.dataItems, items...)
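+	// Clearing the context marks the collector as stopped; the start
+	// operations treat a non-nil collectorCtx as an overlapping collector.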
 	e.collectorCtx = nil
+	return nil
 }
 
-func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
+func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) error {
 	// define Pod's namespace automatically, and create that namespace.
 	namespace := fmt.Sprintf("namespace-%d", opIndex)
 	if op.Namespace != nil {
@@ -1643,9 +1661,13 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
 
 	if op.CollectMetrics {
 		if e.collectorCtx != nil {
-			e.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
+			return fmt.Errorf("op %d: metrics collection is overlapping; probably a second collector was started before stopping the previous one", opIndex)
+		}
+		var err error
+		e.collectorCtx, e.collectors, err = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
+		if err != nil {
+			return err
 		}
-		e.collectorCtx, e.collectors = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
 		e.tCtx.TB().Cleanup(func() {
 			if e.collectorCtx != nil {
 				e.collectorCtx.Cancel("cleaning up")
@@ -1653,7 +1675,7 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
 		})
 	}
 	if err := createPodsRapidly(e.tCtx, namespace, op); err != nil {
-		e.tCtx.Fatalf("op %d: %v", opIndex, err)
+		return fmt.Errorf("op %d: %w", opIndex, err)
 	}
 	switch {
 	case op.SkipWaitToCompletion:
@@ -1662,29 +1684,34 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
 		e.numPodsScheduledPerNamespace[namespace] += op.Count
 	case op.SteadyState:
 		if err := createPodsSteadily(e.tCtx, namespace, e.podInformer, op); err != nil {
-			e.tCtx.Fatalf("op %d: %v", opIndex, err)
+			return fmt.Errorf("op %d: %w", opIndex, err)
 		}
 	default:
 		if err := waitUntilPodsScheduledInNamespace(e.tCtx, e.podInformer, nil, namespace, op.Count); err != nil {
-			e.tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
+			return fmt.Errorf("op %d: error in waiting for pods to get scheduled: %w", opIndex, err)
 		}
 	}
 	if op.CollectMetrics {
 		// CollectMetrics and SkipWaitToCompletion can never be true at the
 		// same time, so if we're here, it means that all pods have been
 		// scheduled.
-		items := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+		items, err := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+		if err != nil {
+			return err
+		}
 		e.dataItems = append(e.dataItems, items...)
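+		// Same bookkeeping as runStopCollectingMetrics: allow a later op to start a fresh collector.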
 		e.collectorCtx = nil
 	}
+	return nil
 }
 
-func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
+func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) error {
 	labelSelector := labels.ValidatedSetSelector(op.LabelSelector)
 
 	podsToDelete, err := e.podInformer.Lister().Pods(op.Namespace).List(labelSelector)
 	if err != nil {
-		e.tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err)
+		return fmt.Errorf("op %d: error in listing pods in the namespace %s: %w", opIndex, op.Namespace, err)
 	}
 
 	deletePods := func(opIndex int) {
@@ -1727,9 +1754,10 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
 	} else {
 		deletePods(opIndex)
 	}
+	return nil
 }
 
-func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
+func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) error {
 	var namespace string
 	if op.Namespace != nil {
 		namespace = *op.Namespace
@@ -1740,7 +1768,7 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
 	// Ensure the namespace exists.
 	nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
 	if _, err := e.tCtx.Client().CoreV1().Namespaces().Create(e.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
-		e.tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
+		return fmt.Errorf("op %d: unable to create namespace %v: %w", opIndex, namespace, err)
 	}
 
 	var churnFns []func(name string) string
@@ -1748,12 +1776,12 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
 	for i, path := range op.TemplatePaths {
 		unstructuredObj, gvk, err := getUnstructuredFromFile(path)
 		if err != nil {
-			e.tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
+			return fmt.Errorf("op %d: unable to parse the %v-th template path: %w", opIndex, i, err)
 		}
 		// Obtain GVR.
 		mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
 		if err != nil {
-			e.tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
+			return fmt.Errorf("op %d: unable to find GVR for %v: %w", opIndex, gvk, err)
 		}
 		gvr := mapping.Resource
 		// Distinguish cluster-scoped with namespaced API objects.
@@ -1833,41 +1861,53 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
 			}
 		}()
 	}
+	return nil
 }
 
-func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) {
+func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) error {
 	runable, ok := op.(runnableOp)
 	if !ok {
-		e.tCtx.Fatalf("op %d: invalid op %v", opIndex, op)
+		return fmt.Errorf("op %d: invalid op %v", opIndex, op)
 	}
 	for _, namespace := range runable.requiredNamespaces() {
-		createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace)
+		if err := createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace); err != nil {
+			return err
+		}
 	}
 	runable.run(e.tCtx)
+	return nil
 }
 
-func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) {
+func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) error {
 	if e.collectorCtx != nil {
-		e.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
+		return fmt.Errorf("op %d: metrics collection is overlapping; probably a second collector was started before stopping the previous one", opIndex)
+	}
+	var err error
+	e.collectorCtx, e.collectors, err = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, op.Name, op.Namespaces, op.LabelSelector)
+	if err != nil {
+		return err
 	}
-	e.collectorCtx, e.collectors = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, op.Name, op.Namespaces, op.LabelSelector)
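+	// Cancel a collector that is still running at test cleanup, e.g. because
+	// a later op failed before stopCollectingMetricsOp was reached.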
 	e.tCtx.TB().Cleanup(func() {
 		if e.collectorCtx != nil {
 			e.collectorCtx.Cancel("cleaning up")
 		}
 	})
+	return nil
 }
 
-func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
+func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) error {
 	if _, ok := (*podsPerNamespace)[namespace]; !ok {
 		// The namespace has not created yet.
 		// So, create that and register it.
 		_, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
 		if err != nil {
-			tCtx.Fatalf("failed to create namespace for Pod: %v", namespace)
+			return fmt.Errorf("failed to create namespace %v for Pod: %w", namespace, err)
 		}
 		(*podsPerNamespace)[namespace] = 0
 	}
+	return nil
 }
 
 type testDataCollector interface {