diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go
index 6a0c11c149f..19c1ddc352c 100644
--- a/test/integration/scheduler_perf/scheduler_perf.go
+++ b/test/integration/scheduler_perf/scheduler_perf.go
@@ -1194,7 +1194,10 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin
 				b.Fatalf("workload %s is not valid: %v", w.Name, err)
 			}
 
-			results := runWorkload(tCtx, tc, w, informerFactory)
+			results, err := runWorkload(tCtx, tc, w, informerFactory)
+			if err != nil {
+				tCtx.Fatalf("Error running workload %s: %s", w.Name, err)
+			}
 			dataItems.DataItems = append(dataItems.DataItems, results...)
 
 			if len(results) > 0 {
@@ -1292,7 +1295,10 @@ func RunIntegrationPerfScheduling(t *testing.T, configFile string) {
 				t.Fatalf("workload %s is not valid: %v", w.Name, err)
 			}
 
-			runWorkload(tCtx, tc, w, informerFactory)
+			_, err = runWorkload(tCtx, tc, w, informerFactory)
+			if err != nil {
+				tCtx.Fatalf("Error running workload %s: %s", w.Name, err)
+			}
 
 			if featureGates[features.SchedulerQueueingHints] {
 				// In any case, we should make sure InFlightEvents is empty after running the scenario.
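The two hunks above change the contract of runWorkload: instead of aborting via tCtx.Fatalf deep inside helpers, it now reports an error, and only the top-level test/benchmark function decides to fail. A minimal, self-contained sketch of that pattern (not part of the patch; runScenario and Scenario are hypothetical names):

```go
package example

import (
	"fmt"
	"testing"
)

type Scenario struct{ Name string }

// runScenario reports problems to the caller instead of calling Fatalf itself,
// so it stays usable from goroutines and composes with error wrapping.
func runScenario(s Scenario) ([]int, error) {
	if s.Name == "" {
		return nil, fmt.Errorf("scenario has no name")
	}
	return []int{1, 2, 3}, nil
}

func TestScenario(t *testing.T) {
	results, err := runScenario(Scenario{Name: "demo"})
	if err != nil {
		// Failing here, in the test function itself, is safe: Fatalf must only
		// be called from the goroutine running the test.
		t.Fatalf("Error running scenario: %s", err)
	}
	if len(results) == 0 {
		t.Error("expected at least one result")
	}
}
```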
@@ -1410,7 +1416,7 @@ func checkEmptyInFlightEvents() error {
 	return nil
 }
 
-func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) {
+func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector, error) {
 	collectorCtx := ktesting.WithCancel(tCtx)
 	workloadName := tCtx.Name()
 	// The first part is the same for each workload, therefore we can strip it.
@@ -1421,20 +1427,23 @@ func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup,
 		collector := collector
 		err := collector.init()
 		if err != nil {
-			tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
+			return nil, nil, fmt.Errorf("failed to initialize data collector: %w", err)
 		}
+		tCtx.TB().Cleanup(func() {
+			collectorCtx.Cancel("cleaning up")
+		})
 		collectorWG.Add(1)
 		go func() {
 			defer collectorWG.Done()
 			collector.run(collectorCtx)
 		}()
 	}
-	return collectorCtx, collectors
+	return collectorCtx, collectors, nil
 }
 
-func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) []DataItem {
+func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) ([]DataItem, error) {
 	if collectorCtx == nil {
-		tCtx.Fatalf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
+		return nil, fmt.Errorf("missing startCollectingMetrics operation before stopping")
 	}
 	collectorCtx.Cancel("collecting metrics, collector must stop first")
 	collectorWG.Wait()
@@ -1447,10 +1456,25 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
 			tCtx.Errorf("op %d: %s", opIndex, err)
 		}
 	}
-	return dataItems
+	return dataItems, nil
 }
 
-func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
+type WorkloadExecutor struct {
+	tCtx                         ktesting.TContext
+	wg                           sync.WaitGroup
+	collectorCtx                 ktesting.TContext
+	collectorWG                  sync.WaitGroup
+	collectors                   []testDataCollector
+	dataItems                    []DataItem
+	numPodsScheduledPerNamespace map[string]int
+	podInformer                  coreinformers.PodInformer
+	throughputErrorMargin        float64
+	testCase                     *testCase
+	workload                     *workload
+	nextNodeIndex                int
+}
+
+func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) ([]DataItem, error) {
 	b, benchmarking := tCtx.TB().(*testing.B)
 	if benchmarking {
 		start := time.Now()
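The collector hunks above pair a cancellable context with a WaitGroup and additionally register a Cleanup so the collector goroutines are stopped even when the workload errors out early. A reduced, self-contained sketch of that lifecycle using plain context.Context (hypothetical names, not the ktesting API):

```go
package example

import (
	"context"
	"sync"
	"testing"
	"time"
)

type collector struct{}

func (c *collector) run(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// sample a metric here
		case <-ctx.Done():
			return
		}
	}
}

func startCollectors(t *testing.T, wg *sync.WaitGroup, n int) context.CancelFunc {
	ctx, cancel := context.WithCancel(context.Background())
	// Guarantee the goroutines are told to stop even if the test fails
	// before the explicit stop operation runs.
	t.Cleanup(cancel)
	for i := 0; i < n; i++ {
		c := &collector{}
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.run(ctx)
		}()
	}
	return cancel
}

func TestCollectors(t *testing.T) {
	var wg sync.WaitGroup
	cancel := startCollectors(t, &wg, 2)
	cancel()  // stop collecting
	wg.Wait() // and only then read the collected data
}
```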
@@ -1481,343 +1505,408 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	// Everything else started by this function gets stopped before it returns.
 	tCtx = ktesting.WithCancel(tCtx)
-	var wg sync.WaitGroup
-	defer wg.Wait()
-	defer tCtx.Cancel("workload is done")
-	var dataItems []DataItem
-	nextNodeIndex := 0
-	// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
-	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
-	numPodsScheduledPerNamespace := make(map[string]int)
+	executor := WorkloadExecutor{
+		tCtx:                         tCtx,
+		numPodsScheduledPerNamespace: make(map[string]int),
+		podInformer:                  podInformer,
+		throughputErrorMargin:        throughputErrorMargin,
+		testCase:                     tc,
+		workload:                     w,
+	}
 
-	var collectors []testDataCollector
-	// This needs a separate context and wait group because
-	// the metrics collecting needs to be sure that the goroutines
-	// are stopped.
-	var collectorCtx ktesting.TContext
-	var collectorWG sync.WaitGroup
-	defer collectorWG.Wait()
+	tCtx.TB().Cleanup(func() {
+		tCtx.Cancel("workload is done")
+		executor.collectorWG.Wait()
+		executor.wg.Wait()
+	})
 
 	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
 		realOp, err := op.realOp.patchParams(w)
 		if err != nil {
-			tCtx.Fatalf("op %d: %v", opIndex, err)
+			return nil, fmt.Errorf("op %d: %w", opIndex, err)
 		}
 		select {
 		case <-tCtx.Done():
-			tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
+			return nil, fmt.Errorf("op %d: %w", opIndex, context.Cause(tCtx))
 		default:
 		}
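The loop above checks for cancellation before each operation with a non-blocking select and reports the cancellation reason via context.Cause. A short sketch of that idiom under assumed names (requires Go 1.20+ for context.Cause):

```go
package example

import (
	"context"
	"fmt"
)

func runOps(ctx context.Context, ops []func() error) error {
	for i, op := range ops {
		select {
		case <-ctx.Done():
			// Prefer the recorded cause over the generic context.Canceled.
			return fmt.Errorf("op %d: %w", i, context.Cause(ctx))
		default:
			// Not cancelled; fall through and run the op.
		}
		if err := op(); err != nil {
			return fmt.Errorf("op %d: %w", i, err)
		}
	}
	return nil
}
```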
-		switch concreteOp := realOp.(type) {
-		case *createNodesOp:
-			nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client())
-			if err != nil {
-				tCtx.Fatalf("op %d: %v", opIndex, err)
-			}
-			if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil {
-				tCtx.Fatalf("op %d: %v", opIndex, err)
-			}
-			nextNodeIndex += concreteOp.Count
-
-		case *createNamespacesOp:
-			nsPreparer, err := newNamespacePreparer(tCtx, concreteOp)
-			if err != nil {
-				tCtx.Fatalf("op %d: %v", opIndex, err)
-			}
-			if err := nsPreparer.prepare(tCtx); err != nil {
-				err2 := nsPreparer.cleanup(tCtx)
-				if err2 != nil {
-					err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
-				}
-				tCtx.Fatalf("op %d: %v", opIndex, err)
-			}
-			for _, n := range nsPreparer.namespaces() {
-				if _, ok := numPodsScheduledPerNamespace[n]; ok {
-					// this namespace has been already created.
-					continue
-				}
-				numPodsScheduledPerNamespace[n] = 0
-			}
-
-		case *createPodsOp:
-			var namespace string
-			// define Pod's namespace automatically, and create that namespace.
-			namespace = fmt.Sprintf("namespace-%d", opIndex)
-			if concreteOp.Namespace != nil {
-				namespace = *concreteOp.Namespace
-			}
-			createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
-			if concreteOp.PodTemplatePath == nil {
-				concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
-			}
-
-			if concreteOp.CollectMetrics {
-				if collectorCtx != nil {
-					tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
-				}
-				collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
-				defer collectorCtx.Cancel("cleaning up")
-			}
-			if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil {
-				tCtx.Fatalf("op %d: %v", opIndex, err)
-			}
-			switch {
-			case concreteOp.SkipWaitToCompletion:
-				// Only record those namespaces that may potentially require barriers
-				// in the future.
-				numPodsScheduledPerNamespace[namespace] += concreteOp.Count
-			case concreteOp.SteadyState:
-				if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil {
-					tCtx.Fatalf("op %d: %v", opIndex, err)
-				}
-			default:
-				if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil {
-					tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
-				}
-			}
-			if concreteOp.CollectMetrics {
-				// CollectMetrics and SkipWaitToCompletion can never be true at the
-				// same time, so if we're here, it means that all pods have been
-				// scheduled.
-				items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
-				dataItems = append(dataItems, items...)
-				collectorCtx = nil
-			}
-
-		case *deletePodsOp:
-			labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
-
-			podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
-			if err != nil {
-				tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
-			}
-
-			deletePods := func(opIndex int) {
-				if concreteOp.DeletePodsPerSecond > 0 {
-					ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond))
-					defer ticker.Stop()
-
-					for i := 0; i < len(podsToDelete); i++ {
-						select {
-						case <-ticker.C:
-							if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
-								if errors.Is(err, context.Canceled) {
-									return
-								}
-								tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
-							}
-						case <-tCtx.Done():
-							return
-						}
-					}
-					return
-				}
-				listOpts := metav1.ListOptions{
-					LabelSelector: labelSelector.String(),
-				}
-				if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
-					if errors.Is(err, context.Canceled) {
-						return
-					}
-					tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
-				}
-			}
-
-			if concreteOp.SkipWaitToCompletion {
-				wg.Add(1)
-				go func(opIndex int) {
-					defer wg.Done()
-					deletePods(opIndex)
-				}(opIndex)
-			} else {
-				deletePods(opIndex)
-			}
-
-		case *churnOp:
-			var namespace string
-			if concreteOp.Namespace != nil {
-				namespace = *concreteOp.Namespace
-			} else {
-				namespace = fmt.Sprintf("namespace-%d", opIndex)
-			}
-			restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery()))
-			// Ensure the namespace exists.
-			nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
-			if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
-				tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
-			}
-
-			var churnFns []func(name string) string
-
-			for i, path := range concreteOp.TemplatePaths {
-				unstructuredObj, gvk, err := getUnstructuredFromFile(path)
-				if err != nil {
-					tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
-				}
-				// Obtain GVR.
-				mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
-				if err != nil {
-					tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
-				}
-				gvr := mapping.Resource
-				// Distinguish cluster-scoped with namespaced API objects.
-				var dynRes dynamic.ResourceInterface
-				if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
-					dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace)
-				} else {
-					dynRes = tCtx.Dynamic().Resource(gvr)
-				}
-
-				churnFns = append(churnFns, func(name string) string {
-					if name != "" {
-						if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
-							tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
-						}
-						return ""
-					}
-
-					live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{})
-					if err != nil {
-						return ""
-					}
-					return live.GetName()
-				})
-			}
-
-			var interval int64 = 500
-			if concreteOp.IntervalMilliseconds != 0 {
-				interval = concreteOp.IntervalMilliseconds
-			}
-			ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
-			defer ticker.Stop()
-
-			switch concreteOp.Mode {
-			case Create:
-				wg.Add(1)
-				go func() {
-					defer wg.Done()
-					count, threshold := 0, concreteOp.Number
-					if threshold == 0 {
-						threshold = math.MaxInt32
-					}
-					for count < threshold {
-						select {
-						case <-ticker.C:
-							for i := range churnFns {
-								churnFns[i]("")
-							}
-							count++
-						case <-tCtx.Done():
-							return
-						}
-					}
-				}()
-			case Recreate:
-				wg.Add(1)
-				go func() {
-					defer wg.Done()
-					retVals := make([][]string, len(churnFns))
-					// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
-					for i := range retVals {
-						retVals[i] = make([]string, concreteOp.Number)
-					}
-
-					count := 0
-					for {
-						select {
-						case <-ticker.C:
-							for i := range churnFns {
-								retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
-							}
-							count++
-						case <-tCtx.Done():
-							return
-						}
-					}
-				}()
-			}
-
-		case *barrierOp:
-			for _, namespace := range concreteOp.Namespaces {
-				if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
-					tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
-				}
-			}
-			switch concreteOp.StageRequirement {
-			case Attempted:
-				if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
-					tCtx.Fatalf("op %d: %v", opIndex, err)
-				}
-			case Scheduled:
-				// Default should be treated like "Scheduled", so handling both in the same way.
-				fallthrough
-			default:
-				if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
-					tCtx.Fatalf("op %d: %v", opIndex, err)
-				}
-				// At the end of the barrier, we can be sure that there are no pods
-				// pending scheduling in the namespaces that we just blocked on.
-				if len(concreteOp.Namespaces) == 0 {
-					numPodsScheduledPerNamespace = make(map[string]int)
-				} else {
-					for _, namespace := range concreteOp.Namespaces {
-						delete(numPodsScheduledPerNamespace, namespace)
-					}
-				}
-			}
-
-		case *sleepOp:
-			select {
-			case <-tCtx.Done():
-			case <-time.After(concreteOp.Duration.Duration):
-			}
-
-		case *startCollectingMetricsOp:
-			if collectorCtx != nil {
-				tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
-			}
-			collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
-			defer collectorCtx.Cancel("cleaning up")
-
-		case *stopCollectingMetricsOp:
-			items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
-			dataItems = append(dataItems, items...)
-			collectorCtx = nil
-
-		default:
-			runable, ok := concreteOp.(runnableOp)
-			if !ok {
-				tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
-			}
-			for _, namespace := range runable.requiredNamespaces() {
-				createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
-			}
-			runable.run(tCtx)
+		err = executor.runOp(realOp, opIndex)
+		if err != nil {
+			return nil, fmt.Errorf("op %d: %w", opIndex, err)
 		}
 	}
 
 	// check unused params and inform users
 	unusedParams := w.unusedParams()
 	if len(unusedParams) != 0 {
-		tCtx.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
+		return nil, fmt.Errorf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos", unusedParams, w.Name)
 	}
 	// Some tests have unschedulable pods. Do not add an implicit barrier at the
 	// end as we do not want to wait for them.
-	return dataItems
+	return executor.dataItems, nil
 }
 
-func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
+func (e *WorkloadExecutor) runOp(op realOp, opIndex int) error {
+	switch concreteOp := op.(type) {
+	case *createNodesOp:
+		return e.runCreateNodesOp(opIndex, concreteOp)
+	case *createNamespacesOp:
+		return e.runCreateNamespaceOp(opIndex, concreteOp)
+	case *createPodsOp:
+		return e.runCreatePodsOp(opIndex, concreteOp)
+	case *deletePodsOp:
+		return e.runDeletePodsOp(opIndex, concreteOp)
+	case *churnOp:
+		return e.runChurnOp(opIndex, concreteOp)
+	case *barrierOp:
+		return e.runBarrierOp(opIndex, concreteOp)
+	case *sleepOp:
+		return e.runSleepOp(concreteOp)
+	case *startCollectingMetricsOp:
+		return e.runStartCollectingMetricsOp(opIndex, concreteOp)
+	case *stopCollectingMetricsOp:
+		return e.runStopCollectingMetricsOp(opIndex)
+	default:
+		return e.runDefaultOp(opIndex, concreteOp)
+	}
+}
+
+func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) error {
+	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, e.tCtx.Client())
+	if err != nil {
+		return err
+	}
+	if err := nodePreparer.PrepareNodes(e.tCtx, e.nextNodeIndex); err != nil {
+		return err
+	}
+	e.nextNodeIndex += op.Count
+	return nil
+}
+
+func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) error {
+	nsPreparer, err := newNamespacePreparer(e.tCtx, op)
+	if err != nil {
+		return err
+	}
+	if err := nsPreparer.prepare(e.tCtx); err != nil {
+		err2 := nsPreparer.cleanup(e.tCtx)
+		if err2 != nil {
+			err = fmt.Errorf("prepare: %w; cleanup: %w", err, err2)
+		}
+		return err
+	}
+	for _, n := range nsPreparer.namespaces() {
+		if _, ok := e.numPodsScheduledPerNamespace[n]; ok {
+			// this namespace has already been created.
+			continue
+		}
+		e.numPodsScheduledPerNamespace[n] = 0
+	}
+	return nil
+}
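The runOp method above replaces the old inline switch with a type-switch dispatcher over concrete operation types. A reduced sketch of the pattern (hypothetical op types, not the real ones in this file):

```go
package example

import "fmt"

type op interface{}

type sleepOp struct{ seconds int }
type createOp struct{ count int }

type executor struct{ created int }

// runOp maps each concrete operation type to its handler; unknown types fall
// through to a default that rejects them with an error instead of panicking.
func (e *executor) runOp(o op) error {
	switch concrete := o.(type) {
	case *createOp:
		e.created += concrete.count
		return nil
	case *sleepOp:
		// handle sleep here
		return nil
	default:
		return fmt.Errorf("invalid op %v", concrete)
	}
}
```

Splitting each case into its own method keeps every handler independently testable and lets all of them share the single `return error` convention established above.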
+
+func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) error {
+	for _, namespace := range op.Namespaces {
+		if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok {
+			return fmt.Errorf("unknown namespace %s", namespace)
+		}
+	}
+	switch op.StageRequirement {
+	case Attempted:
+		if err := waitUntilPodsAttempted(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
+			return err
+		}
+	case Scheduled:
+		// Default should be treated like "Scheduled", so handling both in the same way.
+		fallthrough
+	default:
+		if err := waitUntilPodsScheduled(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
+			return err
+		}
+		// At the end of the barrier, we can be sure that there are no pods
+		// pending scheduling in the namespaces that we just blocked on.
+		if len(op.Namespaces) == 0 {
+			e.numPodsScheduledPerNamespace = make(map[string]int)
+		} else {
+			for _, namespace := range op.Namespaces {
+				delete(e.numPodsScheduledPerNamespace, namespace)
+			}
+		}
+	}
+	return nil
+}
+
+func (e *WorkloadExecutor) runSleepOp(op *sleepOp) error {
+	select {
+	case <-e.tCtx.Done():
+	case <-time.After(op.Duration.Duration):
+	}
+	return nil
+}
+
+func (e *WorkloadExecutor) runStopCollectingMetricsOp(opIndex int) error {
+	items, err := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+	if err != nil {
+		return err
+	}
+	e.dataItems = append(e.dataItems, items...)
+	e.collectorCtx = nil
+	return nil
+}
+
+func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) error {
+	// define Pod's namespace automatically, and create that namespace.
+	namespace := fmt.Sprintf("namespace-%d", opIndex)
+	if op.Namespace != nil {
+		namespace = *op.Namespace
+	}
+	err := createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace)
+	if err != nil {
+		return err
+	}
+	if op.PodTemplatePath == nil {
+		op.PodTemplatePath = e.testCase.DefaultPodTemplatePath
+	}
+
+	if op.CollectMetrics {
+		if e.collectorCtx != nil {
+			return fmt.Errorf("metrics collection is overlapping; probably a second collector was started before the previous one was stopped")
+		}
+		var err error
+		e.collectorCtx, e.collectors, err = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
+		if err != nil {
+			return err
+		}
+	}
+	if err := createPodsRapidly(e.tCtx, namespace, op); err != nil {
+		return err
+	}
+	switch {
+	case op.SkipWaitToCompletion:
+		// Only record those namespaces that may potentially require barriers
+		// in the future.
+		e.numPodsScheduledPerNamespace[namespace] += op.Count
+	case op.SteadyState:
+		if err := createPodsSteadily(e.tCtx, namespace, e.podInformer, op); err != nil {
+			return err
+		}
+	default:
+		if err := waitUntilPodsScheduledInNamespace(e.tCtx, e.podInformer, nil, namespace, op.Count); err != nil {
+			return fmt.Errorf("error in waiting for pods to get scheduled: %w", err)
+		}
+	}
+	if op.CollectMetrics {
+		// CollectMetrics and SkipWaitToCompletion can never be true at the
+		// same time, so if we're here, it means that all pods have been
+		// scheduled.
+		items, err := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+		if err != nil {
+			return err
+		}
+		e.dataItems = append(e.dataItems, items...)
+		e.collectorCtx = nil
+	}
+	return nil
+}
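runSleepOp above shows the standard interruptible sleep: time.After supplies the timeout arm, the context's Done channel the cancellation arm, so a cancelled workload never sits out the full duration. A minimal standalone version using plain context.Context:

```go
package example

import (
	"context"
	"time"
)

// sleepCtx blocks for d, or returns early if ctx is cancelled first.
func sleepCtx(ctx context.Context, d time.Duration) {
	select {
	case <-ctx.Done():
		// cancelled early; return immediately
	case <-time.After(d):
		// full duration elapsed
	}
}
```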
+
+func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) error {
+	labelSelector := labels.ValidatedSetSelector(op.LabelSelector)
+
+	podsToDelete, err := e.podInformer.Lister().Pods(op.Namespace).List(labelSelector)
+	if err != nil {
+		return fmt.Errorf("error in listing pods in the namespace %s: %w", op.Namespace, err)
+	}
+
+	deletePods := func(opIndex int) {
+		if op.DeletePodsPerSecond > 0 {
+			ticker := time.NewTicker(time.Second / time.Duration(op.DeletePodsPerSecond))
+			defer ticker.Stop()
+
+			for i := 0; i < len(podsToDelete); i++ {
+				select {
+				case <-ticker.C:
+					if err := e.tCtx.Client().CoreV1().Pods(op.Namespace).Delete(e.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
+						if errors.Is(err, context.Canceled) {
+							return
+						}
+						e.tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
+					}
+				case <-e.tCtx.Done():
+					return
+				}
+			}
+			return
+		}
+		listOpts := metav1.ListOptions{
+			LabelSelector: labelSelector.String(),
+		}
+		if err := e.tCtx.Client().CoreV1().Pods(op.Namespace).DeleteCollection(e.tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
+			if errors.Is(err, context.Canceled) {
+				return
+			}
+			e.tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, op.Namespace, err)
+		}
+	}
+
+	if op.SkipWaitToCompletion {
+		e.wg.Add(1)
+		go func(opIndex int) {
+			defer e.wg.Done()
+			deletePods(opIndex)
+		}(opIndex)
+	} else {
+		deletePods(opIndex)
+	}
+	return nil
+}
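runDeletePodsOp paces individual deletions with a ticker firing DeletePodsPerSecond times per second, aborting mid-list on cancellation. A self-contained sketch of that rate-limiting idea (deleteFn stands in for the real API call):

```go
package example

import (
	"context"
	"time"
)

// deletePaced issues one deleteFn call per tick, i.e. at most perSecond calls
// per second, and stops early when ctx is cancelled.
func deletePaced(ctx context.Context, names []string, perSecond int, deleteFn func(string) error) {
	ticker := time.NewTicker(time.Second / time.Duration(perSecond))
	defer ticker.Stop()
	for _, name := range names {
		select {
		case <-ticker.C:
			if err := deleteFn(name); err != nil {
				continue // the real code logs the error and keeps going
			}
		case <-ctx.Done():
			return
		}
	}
}
```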
+
+func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) error {
+	var namespace string
+	if op.Namespace != nil {
+		namespace = *op.Namespace
+	} else {
+		namespace = fmt.Sprintf("namespace-%d", opIndex)
+	}
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(e.tCtx.Client().Discovery()))
+	// Ensure the namespace exists.
+	nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
+	if _, err := e.tCtx.Client().CoreV1().Namespaces().Create(e.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
+		return fmt.Errorf("unable to create namespace %v: %w", namespace, err)
+	}
+
+	var churnFns []func(name string) string
+
+	for i, path := range op.TemplatePaths {
+		unstructuredObj, gvk, err := getUnstructuredFromFile(path)
+		if err != nil {
+			return fmt.Errorf("unable to parse the %v-th template path: %w", i, err)
+		}
+		// Obtain GVR.
+		mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+		if err != nil {
+			return fmt.Errorf("unable to find GVR for %v: %w", gvk, err)
+		}
+		gvr := mapping.Resource
+		// Distinguish cluster-scoped with namespaced API objects.
+		var dynRes dynamic.ResourceInterface
+		if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+			dynRes = e.tCtx.Dynamic().Resource(gvr).Namespace(namespace)
+		} else {
+			dynRes = e.tCtx.Dynamic().Resource(gvr)
+		}
+
+		churnFns = append(churnFns, func(name string) string {
+			if name != "" {
+				if err := dynRes.Delete(e.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
+					e.tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
+				}
+				return ""
+			}
+
+			live, err := dynRes.Create(e.tCtx, unstructuredObj, metav1.CreateOptions{})
+			if err != nil {
+				return ""
+			}
+			return live.GetName()
+		})
+	}
+
+	var interval int64 = 500
+	if op.IntervalMilliseconds != 0 {
+		interval = op.IntervalMilliseconds
+	}
+	ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
+
+	switch op.Mode {
+	case Create:
+		e.wg.Add(1)
+		go func() {
+			defer e.wg.Done()
+			defer ticker.Stop()
+			count, threshold := 0, op.Number
+			if threshold == 0 {
+				threshold = math.MaxInt32
+			}
+			for count < threshold {
+				select {
+				case <-ticker.C:
+					for i := range churnFns {
+						churnFns[i]("")
+					}
+					count++
+				case <-e.tCtx.Done():
+					return
+				}
+			}
+		}()
+	case Recreate:
+		e.wg.Add(1)
+		go func() {
+			defer e.wg.Done()
+			defer ticker.Stop()
+			retVals := make([][]string, len(churnFns))
+			// For each churn function, instantiate a slice of strings with length "op.Number".
+			for i := range retVals {
+				retVals[i] = make([]string, op.Number)
+			}
+
+			count := 0
+			for {
+				select {
+				case <-ticker.C:
+					for i := range churnFns {
+						retVals[i][count%op.Number] = churnFns[i](retVals[i][count%op.Number])
+					}
+					count++
+				case <-e.tCtx.Done():
+					return
+				}
+			}
+		}()
+	}
+	return nil
+}
+
+func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) error {
+	runable, ok := op.(runnableOp)
+	if !ok {
+		return fmt.Errorf("invalid op %v", op)
+	}
+	for _, namespace := range runable.requiredNamespaces() {
+		err := createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace)
+		if err != nil {
+			return err
+		}
+	}
+	runable.run(e.tCtx)
+	return nil
+}
+
+func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) error {
+	if e.collectorCtx != nil {
+		return fmt.Errorf("metrics collection is overlapping; probably a second collector was started before the previous one was stopped")
+	}
+	var err error
+	e.collectorCtx, e.collectors, err = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, op.Name, op.Namespaces, op.LabelSelector)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) error {
 	if _, ok := (*podsPerNamespace)[namespace]; !ok {
 		// The namespace has not created yet.
 		// So, create that and register it.
 		_, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
 		if err != nil {
-			tCtx.Fatalf("failed to create namespace for Pod: %v", namespace)
+			return fmt.Errorf("failed to create namespace %v for Pod: %w", namespace, err)
 		}
 		(*podsPerNamespace)[namespace] = 0
 	}
+	return nil
 }
 
 type testDataCollector interface {
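Both runChurnOp and createNamespaceIfNotPresent rely on create-if-absent semantics: attempt the create, treat AlreadyExists as success, and wrap any other failure with %w so callers can inspect the cause. A sketch of that pattern against the real client-go/apimachinery packages (ensureNamespace itself is a hypothetical helper, not part of the patch):

```go
package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// ensureNamespace creates the namespace if it does not exist yet. A concurrent
// or earlier create is not an error: AlreadyExists is treated as success,
// which makes the helper safe to call repeatedly.
func ensureNamespace(ctx context.Context, client kubernetes.Interface, name string) error {
	ns := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
	_, err := client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
	if err != nil && !apierrors.IsAlreadyExists(err) {
		return fmt.Errorf("failed to create namespace %s: %w", name, err)
	}
	return nil
}
```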