diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 064dcc6965e..9b9f6e58e0a 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -1410,6 +1410,17 @@ func checkEmptyInFlightEvents() error { return nil } +type workloadExecutor struct { + tCtx ktesting.TContext + wg *sync.WaitGroup + collectorCtx *ktesting.TContext + collectorWG *sync.WaitGroup + collectors *[]testDataCollector + numPodsScheduledPerNamespace map[string]int + podInformer coreinformers.PodInformer + throughputErrorMargin float64 +} + func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) { collectorCtx := ktesting.WithCancel(tCtx) workloadName := tCtx.Name() @@ -1450,57 +1461,6 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex return dataItems } -// metricsCollectionData manages the state and synchronization of metrics collection -// during workload execution. -type metricsCollectionData struct { - // collectors holds a list of test data collectors used to gather performance metrics. - collectors []testDataCollector - // collectorCtx is a separate context specifically for managing the lifecycle - // of metrics collection goroutines. - // This needs a separate context and wait group because - // the metrics collecting needs to be sure that the goroutines - // are stopped. - collectorCtx ktesting.TContext - collectorWG *sync.WaitGroup - // disable error checking of the sampling interval length in the - // throughput collector by default. When running benchmarks, report - // it as test failure when samples are not taken regularly. - throughputErrorMargin float64 -} - -// WorkloadState holds the state information about the workload being executed. -type workloadState struct { - // dataItems stores the collected data from the workload execution. - dataItems []DataItem - // nextNodeIndex keeps track of the next node index to be used when creating nodes. - nextNodeIndex int - // numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have. - // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up. - numPodsScheduledPerNamespace map[string]int -} - -// sharedOperationData encapsulates all shared state and dependencies used during workload execution. -type sharedOperationData struct { - // podInformer provides access to the informer for monitoring Pod events in the cluster. - // Additional informers needed for testing. The pod informer was - // already created before (scheduler.NewInformerFactory) and the - // factory was started for it (mustSetupCluster), therefore we don't - // need to start again. - podInformer coreinformers.PodInformer - // metricsData contains information and synchronization primitives for managing - // metrics collection during workload execution. - metricsData *metricsCollectionData - // workloadState holds information about the current state of the workload, - // including scheduled pods and created namespaces. - workloadState *workloadState - // tCtx is the root test context, used for cancellation and logging throughout - // the execution of workload operations. - tCtx ktesting.TContext - // wg is a wait group that tracks all ongoing goroutines in the workload execution. - // Ensures proper synchronization and prevents premature termination of operations. - wg *sync.WaitGroup -} - func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem { b, benchmarking := tCtx.TB().(*testing.B) if benchmarking { @@ -1514,6 +1474,9 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact }) } + // Disable error checking of the sampling interval length in the + // throughput collector by default. When running benchmarks, report + // it as test failure when samples are not taken regularly. var throughputErrorMargin float64 if benchmarking { // TODO: To prevent the perf-test failure, we increased the error margin, if still not enough @@ -1521,35 +1484,46 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact throughputErrorMargin = 30 } + // Additional informers needed for testing. The pod informer was + // already created before (scheduler.NewInformerFactory) and the + // factory was started for it (mustSetupCluster), therefore we don't + // need to start again. + // podInformer := informerFactory.Core().V1().Pods() + // Everything else started by this function gets stopped before it returns. tCtx = ktesting.WithCancel(tCtx) - var wg sync.WaitGroup - defer func() { - wg.Wait() - tCtx.Cancel("workload is done") - }() + // var wg sync.WaitGroup + // defer wg.Wait() + defer tCtx.Cancel("workload is done") var dataItems []DataItem + // nextNodeIndex := 0 + // // numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have. + // // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up. + // numPodsScheduledPerNamespace := make(map[string]int) - var collectorWG sync.WaitGroup - defer collectorWG.Wait() + // sharedOperationData := sharedOperationData{ + // tCtx: tCtx, + // wg: &wg, + // metricsData: &metricsCollectionData{ + // collectorWG: &sync.WaitGroup{}, + // throughputErrorMargin: throughputErrorMargin, + // }, + // workloadState: &workloadState{ + // numPodsScheduledPerNamespace: make(map[string]int), + // }, + // podInformer: informerFactory.Core().V1().Pods(), + // } - sharedOperationData := sharedOperationData{ - tCtx: tCtx, - wg: &wg, - metricsData: &metricsCollectionData{ - collectorWG: &sync.WaitGroup{}, - throughputErrorMargin: throughputErrorMargin, - }, - workloadState: &workloadState{ - numPodsScheduledPerNamespace: make(map[string]int), - }, - podInformer: informerFactory.Core().V1().Pods(), - } + // var collectors []testDataCollector + // // This needs a separate context and wait group because + // // the metrics collecting needs to be sure that the goroutines + // // are stopped. + // var collectorCtx ktesting.TContext + // var collectorWG sync.WaitGroup + // defer collectorWG.Wait() - for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) { - runOperation(tc, opIndex, op, w, &sharedOperationData) - } + runJobs(tCtx, tc, w, informerFactory, throughputErrorMargin) // check unused params and inform users unusedParams := w.unusedParams() @@ -1562,90 +1536,137 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact return dataItems } -func runCreateNodesOp(opIndex int, concreteOp *createNodesOp, sharedOperationData *sharedOperationData) { - nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, sharedOperationData.tCtx.Client()) +func doCreateNodesOp(tCtx ktesting.TContext, opIndex int, concreteOp *createNodesOp, nextNodeIndex *int) { + nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client()) if err != nil { - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) + tCtx.Fatalf("op %d: %v", opIndex, err) } - if err := nodePreparer.PrepareNodes(sharedOperationData.tCtx, sharedOperationData.workloadState.nextNodeIndex); err != nil { - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) + if err := nodePreparer.PrepareNodes(tCtx, *nextNodeIndex); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) } - sharedOperationData.workloadState.nextNodeIndex += concreteOp.Count + *nextNodeIndex += concreteOp.Count } -func runCreateNamespacesOp(opIndex int, concreteOp *createNamespacesOp, sharedOperationData *sharedOperationData) { - nsPreparer, err := newNamespacePreparer(sharedOperationData.tCtx, concreteOp) +func doCreateNamespaceOp(tCtx ktesting.TContext, opIndex int, concreteOp *createNamespacesOp, numPodsScheduledPerNamespace map[string]int) { + nsPreparer, err := newNamespacePreparer(tCtx, concreteOp) if err != nil { - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) + tCtx.Fatalf("op %d: %v", opIndex, err) } - if err := nsPreparer.prepare(sharedOperationData.tCtx); err != nil { - err2 := nsPreparer.cleanup(sharedOperationData.tCtx) + if err := nsPreparer.prepare(tCtx); err != nil { + err2 := nsPreparer.cleanup(tCtx) if err2 != nil { err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2) } - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) + tCtx.Fatalf("op %d: %v", opIndex, err) } for _, n := range nsPreparer.namespaces() { - if _, ok := sharedOperationData.workloadState.numPodsScheduledPerNamespace[n]; ok { + if _, ok := numPodsScheduledPerNamespace[n]; ok { // this namespace has been already created. continue } - sharedOperationData.workloadState.numPodsScheduledPerNamespace[n] = 0 + numPodsScheduledPerNamespace[n] = 0 } } -func runCreatePodsOp(tc *testCase, w *workload, opIndex int, concreteOp *createPodsOp, sharedOperationData *sharedOperationData) { +func doBarrierOp(tCtx ktesting.TContext, opIndex int, concreteOp *barrierOp, numPodsScheduledPerNamespace map[string]int, podInformer coreinformers.PodInformer) { + for _, namespace := range concreteOp.Namespaces { + if _, ok := numPodsScheduledPerNamespace[namespace]; !ok { + tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) + } + } + switch concreteOp.StageRequirement { + case Attempted: + if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) + } + case Scheduled: + // Default should be treated like "Scheduled", so handling both in the same way. + fallthrough + default: + if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) + } + // At the end of the barrier, we can be sure that there are no pods + // pending scheduling in the namespaces that we just blocked on. + if len(concreteOp.Namespaces) == 0 { + numPodsScheduledPerNamespace = make(map[string]int) + } else { + for _, namespace := range concreteOp.Namespaces { + delete(numPodsScheduledPerNamespace, namespace) + } + } + } +} + +func doStopCollectingMetrics(tCtx ktesting.TContext, collectorCtx *ktesting.TContext, opIndex int, dataItems *[]DataItem, w *workload, collectors []testDataCollector, collectorWG *sync.WaitGroup) { + items := stopCollectingMetrics(tCtx, *collectorCtx, collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors) + *dataItems = append(*dataItems, items...) + *collectorCtx = nil +} + +func doCreatePodsOp( + tCtx ktesting.TContext, + opIndex int, + concreteOp *createPodsOp, + numPodsScheduledPerNamespace map[string]int, + dataItems *[]DataItem, + w *workload, + collectors *[]testDataCollector, + collectorWG *sync.WaitGroup, + throughputErrorMargin float64, + podInformer coreinformers.PodInformer, + tc *testCase, + collectorCtx *ktesting.TContext) { var namespace string // define Pod's namespace automatically, and create that namespace. namespace = fmt.Sprintf("namespace-%d", opIndex) if concreteOp.Namespace != nil { namespace = *concreteOp.Namespace } - createNamespaceIfNotPresent(sharedOperationData.tCtx, namespace, &sharedOperationData.workloadState.numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace) if concreteOp.PodTemplatePath == nil { concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath } if concreteOp.CollectMetrics { - if sharedOperationData.metricsData.collectorCtx != nil { - sharedOperationData.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) + if *collectorCtx != nil { + tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) } - sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectors = startCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorWG, sharedOperationData.podInformer, tc.MetricsCollectorConfig, sharedOperationData.metricsData.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil) - defer sharedOperationData.metricsData.collectorCtx.Cancel("cleaning up") + *collectorCtx, *collectors = startCollectingMetrics(tCtx, collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}, nil) } - if err := createPodsRapidly(sharedOperationData.tCtx, namespace, concreteOp); err != nil { - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) + if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) } switch { case concreteOp.SkipWaitToCompletion: // Only record those namespaces that may potentially require barriers // in the future. - sharedOperationData.workloadState.numPodsScheduledPerNamespace[namespace] += concreteOp.Count + numPodsScheduledPerNamespace[namespace] += concreteOp.Count case concreteOp.SteadyState: - if err := createPodsSteadily(sharedOperationData.tCtx, namespace, sharedOperationData.podInformer, concreteOp); err != nil { - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) + if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) } default: - if err := waitUntilPodsScheduledInNamespace(sharedOperationData.tCtx, sharedOperationData.podInformer, nil, namespace, concreteOp.Count); err != nil { - sharedOperationData.tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) + if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil { + tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } } if concreteOp.CollectMetrics { // CollectMetrics and SkipWaitToCompletion can never be true at the // same time, so if we're here, it means that all pods have been // scheduled. - items := stopCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.metricsData.collectors) - sharedOperationData.workloadState.dataItems = append(sharedOperationData.workloadState.dataItems, items...) - sharedOperationData.metricsData.collectorCtx = nil + items := stopCollectingMetrics(tCtx, *collectorCtx, collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, *collectors) + *dataItems = append(*dataItems, items...) + *collectorCtx = nil } } -func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData *sharedOperationData) { +func doDeletePodsOp(tCtx ktesting.TContext, opIndex int, concreteOp *deletePodsOp, podInformer coreinformers.PodInformer, wg *sync.WaitGroup) { labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector) - podsToDelete, err := sharedOperationData.podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector) + podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector) if err != nil { - sharedOperationData.tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err) + tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err) } deletePods := func(opIndex int) { @@ -1656,13 +1677,13 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData for i := 0; i < len(podsToDelete); i++ { select { case <-ticker.C: - if err := sharedOperationData.tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(sharedOperationData.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil { + if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil { if errors.Is(err, context.Canceled) { return } - sharedOperationData.tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err) + tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err) } - case <-sharedOperationData.tCtx.Done(): + case <-tCtx.Done(): return } } @@ -1671,18 +1692,18 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData listOpts := metav1.ListOptions{ LabelSelector: labelSelector.String(), } - if err := sharedOperationData.tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(sharedOperationData.tCtx, metav1.DeleteOptions{}, listOpts); err != nil { + if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil { if errors.Is(err, context.Canceled) { return } - sharedOperationData.tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err) + tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err) } } if concreteOp.SkipWaitToCompletion { - sharedOperationData.wg.Add(1) + wg.Add(1) go func(opIndex int) { - defer sharedOperationData.wg.Done() + defer wg.Done() deletePods(opIndex) }(opIndex) } else { @@ -1690,18 +1711,18 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData } } -func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *sharedOperationData) { +func doChurnOp(tCtx ktesting.TContext, opIndex int, concreteOp *churnOp, wg *sync.WaitGroup) { var namespace string if concreteOp.Namespace != nil { namespace = *concreteOp.Namespace } else { namespace = fmt.Sprintf("namespace-%d", opIndex) } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(sharedOperationData.tCtx.Client().Discovery())) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery())) // Ensure the namespace exists. nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} - if _, err := sharedOperationData.tCtx.Client().CoreV1().Namespaces().Create(sharedOperationData.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - sharedOperationData.tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) + if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { + tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) } var churnFns []func(name string) string @@ -1709,31 +1730,31 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *sharedOpe for i, path := range concreteOp.TemplatePaths { unstructuredObj, gvk, err := getUnstructuredFromFile(path) if err != nil { - sharedOperationData.tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) + tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) } // Obtain GVR. mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { - sharedOperationData.tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) + tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) } gvr := mapping.Resource // Distinguish cluster-scoped with namespaced API objects. var dynRes dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { - dynRes = sharedOperationData.tCtx.Dynamic().Resource(gvr).Namespace(namespace) + dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace) } else { - dynRes = sharedOperationData.tCtx.Dynamic().Resource(gvr) + dynRes = tCtx.Dynamic().Resource(gvr) } churnFns = append(churnFns, func(name string) string { if name != "" { - if err := dynRes.Delete(sharedOperationData.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) { - sharedOperationData.tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err) + if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) { + tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err) } return "" } - live, err := dynRes.Create(sharedOperationData.tCtx, unstructuredObj, metav1.CreateOptions{}) + live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{}) if err != nil { return "" } @@ -1746,13 +1767,13 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *sharedOpe interval = concreteOp.IntervalMilliseconds } ticker := time.NewTicker(time.Duration(interval) * time.Millisecond) - defer ticker.Stop() switch concreteOp.Mode { case Create: - sharedOperationData.wg.Add(1) + wg.Add(1) go func() { - defer sharedOperationData.wg.Done() + defer wg.Done() + defer ticker.Stop() count, threshold := 0, concreteOp.Number if threshold == 0 { threshold = math.MaxInt32 @@ -1764,15 +1785,16 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *sharedOpe churnFns[i]("") } count++ - case <-sharedOperationData.tCtx.Done(): + case <-tCtx.Done(): return } } }() case Recreate: - sharedOperationData.wg.Add(1) + wg.Add(1) go func() { - defer sharedOperationData.wg.Done() + defer wg.Done() + defer ticker.Stop() retVals := make([][]string, len(churnFns)) // For each churn function, instantiate a slice of strings with length "concreteOp.Number". for i := range retVals { @@ -1787,7 +1809,7 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *sharedOpe retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number]) } count++ - case <-sharedOperationData.tCtx.Done(): + case <-tCtx.Done(): return } } @@ -1795,98 +1817,85 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *sharedOpe } } -func runBarrierOp(opIndex int, concreteOp *barrierOp, sharedOperationData *sharedOperationData) { - for _, namespace := range concreteOp.Namespaces { - if _, ok := sharedOperationData.workloadState.numPodsScheduledPerNamespace[namespace]; !ok { - sharedOperationData.tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) - } - } - switch concreteOp.StageRequirement { - case Attempted: - if err := waitUntilPodsAttempted(sharedOperationData.tCtx, sharedOperationData.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.workloadState.numPodsScheduledPerNamespace); err != nil { - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) - } - case Scheduled: - // Default should be treated like "Scheduled", so handling both in the same way. - fallthrough - default: - if err := waitUntilPodsScheduled(sharedOperationData.tCtx, sharedOperationData.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.workloadState.numPodsScheduledPerNamespace); err != nil { - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) - } - // At the end of the barrier, we can be sure that there are no pods - // pending scheduling in the namespaces that we just blocked on. - if len(concreteOp.Namespaces) == 0 { - sharedOperationData.workloadState.numPodsScheduledPerNamespace = make(map[string]int) - } else { - for _, namespace := range concreteOp.Namespaces { - delete(sharedOperationData.workloadState.numPodsScheduledPerNamespace, namespace) - } - } - } -} - -func runSleepOp(concreteOp *sleepOp, sharedOperationData *sharedOperationData) { - select { - case <-sharedOperationData.tCtx.Done(): - case <-time.After(concreteOp.Duration.Duration): - } -} - -func runStartCollectingMetricsOp(opIndex int, tc *testCase, concreteOp *startCollectingMetricsOp, sharedOperationData *sharedOperationData) { - if sharedOperationData.metricsData.collectorCtx != nil { - sharedOperationData.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) - } - sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectors = startCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorWG, sharedOperationData.podInformer, tc.MetricsCollectorConfig, sharedOperationData.metricsData.throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector) -} - -func runStopCollectingMetricsOp(opIndex int, w *workload, sharedOperationData *sharedOperationData) { - items := stopCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.metricsData.collectors) - sharedOperationData.workloadState.dataItems = append(sharedOperationData.workloadState.dataItems, items...) - sharedOperationData.metricsData.collectorCtx = nil -} - -func runDefault(opIndex int, concreteOp realOp, sharedOperationData *sharedOperationData) { +func doDefaultOp(tCtx ktesting.TContext, opIndex int, concreteOp realOp, numPodsScheduledPerNamespace map[string]int) { runable, ok := concreteOp.(runnableOp) if !ok { - sharedOperationData.tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp) + tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp) } for _, namespace := range runable.requiredNamespaces() { - createNamespaceIfNotPresent(sharedOperationData.tCtx, namespace, &sharedOperationData.workloadState.numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace) } - runable.run(sharedOperationData.tCtx) + runable.run(tCtx) } -func runOperation(tc *testCase, opIndex int, op op, w *workload, sharedOperationData *sharedOperationData) { - realOp, err := op.realOp.patchParams(w) - if err != nil { - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err) +func doStartCollectingMetricsOp(tCtx ktesting.TContext, opIndex int, concreteOp *startCollectingMetricsOp, collectorCtx *ktesting.TContext, collectors *[]testDataCollector, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, tc *testCase, throughputErrorMargin float64) { + if *collectorCtx != nil { + tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) } - select { - case <-sharedOperationData.tCtx.Done(): - sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, context.Cause(sharedOperationData.tCtx)) - default: - } - switch concreteOp := realOp.(type) { - case *createNodesOp: - runCreateNodesOp(opIndex, concreteOp, sharedOperationData) - case *createNamespacesOp: - runCreateNamespacesOp(opIndex, concreteOp, sharedOperationData) - case *createPodsOp: - runCreatePodsOp(tc, w, opIndex, concreteOp, sharedOperationData) - case *deletePodsOp: - runDeletePodsOp(opIndex, concreteOp, sharedOperationData) - case *churnOp: - runChurnOp(opIndex, concreteOp, sharedOperationData) - case *barrierOp: - runBarrierOp(opIndex, concreteOp, sharedOperationData) - case *sleepOp: - runSleepOp(concreteOp, sharedOperationData) - case *startCollectingMetricsOp: - runStartCollectingMetricsOp(opIndex, tc, concreteOp, sharedOperationData) - case *stopCollectingMetricsOp: - runStopCollectingMetricsOp(opIndex, w, sharedOperationData) - default: - runDefault(opIndex, concreteOp, sharedOperationData) + *collectorCtx, *collectors = startCollectingMetrics(tCtx, collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector) +} + +func runJobs(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, throughputErrorMargin float64) { + var wg sync.WaitGroup + defer wg.Wait() + defer tCtx.Cancel("workload is done") + numPodsScheduledPerNamespace := make(map[string]int) + nextNodeIndex := 0 + podInformer := informerFactory.Core().V1().Pods() + var collectors []testDataCollector + // This needs a separate context and wait group because + // the metrics collecting needs to be sure that the goroutines + // are stopped. + var collectorCtx ktesting.TContext + var collectorWG sync.WaitGroup + defer collectorWG.Wait() + var dataItems []DataItem + for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) { + realOp, err := op.realOp.patchParams(w) + if err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) + } + select { + case <-tCtx.Done(): + tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx)) + default: + } + switch concreteOp := realOp.(type) { + case *createNodesOp: + doCreateNodesOp(tCtx, opIndex, concreteOp, &nextNodeIndex) + + case *createNamespacesOp: + doCreateNamespaceOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace) + case *createPodsOp: + doCreatePodsOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace, &dataItems, w, &collectors, &collectorWG, throughputErrorMargin, podInformer, tc, &collectorCtx) + if collectorCtx != nil { + defer collectorCtx.Cancel("cleaning up") + } + case *deletePodsOp: + doDeletePodsOp(tCtx, opIndex, concreteOp, podInformer, &wg) + + case *churnOp: + doChurnOp(tCtx, opIndex, concreteOp, &wg) + + case *barrierOp: + doBarrierOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace, podInformer) + + case *sleepOp: + select { + case <-tCtx.Done(): + case <-time.After(concreteOp.Duration.Duration): + } + + case *startCollectingMetricsOp: + doStartCollectingMetricsOp(tCtx, opIndex, concreteOp, &collectorCtx, &collectors, &collectorWG, podInformer, tc, throughputErrorMargin) + defer collectorCtx.Cancel("cleaning up") + + case *stopCollectingMetricsOp: + doStopCollectingMetrics(tCtx, &collectorCtx, opIndex, &dataItems, w, collectors, &collectorWG) + + default: + doDefaultOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace) + } } }