From 659804b765c4be5f0a9ee5b9c055f21c8b19493e Mon Sep 17 00:00:00 2001 From: YamasouA Date: Sun, 26 Jan 2025 19:39:38 +0900 Subject: [PATCH] refactor runWorkloads --- .../scheduler_perf/scheduler_perf.go | 691 ++++++++++-------- 1 file changed, 378 insertions(+), 313 deletions(-) diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index a69100bff9e..0ce499443bc 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -1450,6 +1450,40 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex return dataItems } +type MetricsCollectionData struct { + Collectors []testDataCollector + // This needs a separate context and wait group because + // the metrics collecting needs to be sure that the goroutines + // are stopped. + CollectorCtx ktesting.TContext + CollectorWG *sync.WaitGroup + // Disable error checking of the sampling interval length in the + // throughput collector by default. When running benchmarks, report + // it as test failure when samples are not taken regularly. + ThroughputErrorMargin float64 +} + +type WorkloadState struct { + DataItems []DataItem + NextNodeIndex int + // numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have. + // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up. + NumPodsScheduledPerNamespace map[string]int +} + +type SharedOperationData struct { + // Additional informers needed for testing. The pod informer was + // already created before (scheduler.NewInformerFactory) and the + // factory was started for it (mustSetupCluster), therefore we don't + // need to start again. + PodInformer coreinformers.PodInformer + MetricsData *MetricsCollectionData + WorkloadState *WorkloadState + TCtx ktesting.TContext + WG sync.WaitGroup + CancelFunc context.CancelFunc +} + func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem { b, benchmarking := tCtx.TB().(*testing.B) if benchmarking { @@ -1463,9 +1497,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact }) } - // Disable error checking of the sampling interval length in the - // throughput collector by default. When running benchmarks, report - // it as test failure when samples are not taken regularly. var throughputErrorMargin float64 if benchmarking { // TODO: To prevent the perf-test failure, we increased the error margin, if still not enough @@ -1473,12 +1504,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact throughputErrorMargin = 30 } - // Additional informers needed for testing. The pod informer was - // already created before (scheduler.NewInformerFactory) and the - // factory was started for it (mustSetupCluster), therefore we don't - // need to start again. - podInformer := informerFactory.Core().V1().Pods() - // Everything else started by this function gets stopped before it returns. tCtx = ktesting.WithCancel(tCtx) var wg sync.WaitGroup @@ -1486,315 +1511,25 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact defer tCtx.Cancel("workload is done") var dataItems []DataItem - nextNodeIndex := 0 - // numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have. - // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up. 
- numPodsScheduledPerNamespace := make(map[string]int) - var collectors []testDataCollector - // This needs a separate context and wait group because - // the metrics collecting needs to be sure that the goroutines - // are stopped. - var collectorCtx ktesting.TContext var collectorWG sync.WaitGroup defer collectorWG.Wait() + sharedOperationData := SharedOperationData{ + TCtx: tCtx, + WG: wg, + MetricsData: &MetricsCollectionData{ + CollectorWG: &sync.WaitGroup{}, + ThroughputErrorMargin: throughputErrorMargin, + }, + WorkloadState: &WorkloadState{ + NumPodsScheduledPerNamespace: make(map[string]int), + }, + PodInformer: informerFactory.Core().V1().Pods(), + } + for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) { - realOp, err := op.realOp.patchParams(w) - if err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - select { - case <-tCtx.Done(): - tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx)) - default: - } - switch concreteOp := realOp.(type) { - case *createNodesOp: - nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client()) - if err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - nextNodeIndex += concreteOp.Count - - case *createNamespacesOp: - nsPreparer, err := newNamespacePreparer(tCtx, concreteOp) - if err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - if err := nsPreparer.prepare(tCtx); err != nil { - err2 := nsPreparer.cleanup(tCtx) - if err2 != nil { - err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2) - } - tCtx.Fatalf("op %d: %v", opIndex, err) - } - for _, n := range nsPreparer.namespaces() { - if _, ok := numPodsScheduledPerNamespace[n]; ok { - // this namespace has been already created. - continue - } - numPodsScheduledPerNamespace[n] = 0 - } - - case *createPodsOp: - var namespace string - // define Pod's namespace automatically, and create that namespace. - namespace = fmt.Sprintf("namespace-%d", opIndex) - if concreteOp.Namespace != nil { - namespace = *concreteOp.Namespace - } - createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace) - if concreteOp.PodTemplatePath == nil { - concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath - } - - if concreteOp.CollectMetrics { - if collectorCtx != nil { - tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) - } - collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}, nil) - defer collectorCtx.Cancel("cleaning up") - } - if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - switch { - case concreteOp.SkipWaitToCompletion: - // Only record those namespaces that may potentially require barriers - // in the future. 
- numPodsScheduledPerNamespace[namespace] += concreteOp.Count - case concreteOp.SteadyState: - if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - default: - if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil { - tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) - } - } - if concreteOp.CollectMetrics { - // CollectMetrics and SkipWaitToCompletion can never be true at the - // same time, so if we're here, it means that all pods have been - // scheduled. - items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors) - dataItems = append(dataItems, items...) - collectorCtx = nil - } - - case *deletePodsOp: - labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector) - - podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector) - if err != nil { - tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err) - } - - deletePods := func(opIndex int) { - if concreteOp.DeletePodsPerSecond > 0 { - ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond)) - defer ticker.Stop() - - for i := 0; i < len(podsToDelete); i++ { - select { - case <-ticker.C: - if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil { - if errors.Is(err, context.Canceled) { - return - } - tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err) - } - case <-tCtx.Done(): - return - } - } - return - } - listOpts := metav1.ListOptions{ - LabelSelector: labelSelector.String(), - } - if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil { - if errors.Is(err, context.Canceled) { - return - } - tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err) - } - } - - if concreteOp.SkipWaitToCompletion { - wg.Add(1) - go func(opIndex int) { - defer wg.Done() - deletePods(opIndex) - }(opIndex) - } else { - deletePods(opIndex) - } - - case *churnOp: - var namespace string - if concreteOp.Namespace != nil { - namespace = *concreteOp.Namespace - } else { - namespace = fmt.Sprintf("namespace-%d", opIndex) - } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery())) - // Ensure the namespace exists. - nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} - if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) - } - - var churnFns []func(name string) string - - for i, path := range concreteOp.TemplatePaths { - unstructuredObj, gvk, err := getUnstructuredFromFile(path) - if err != nil { - tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) - } - // Obtain GVR. - mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) - } - gvr := mapping.Resource - // Distinguish cluster-scoped with namespaced API objects. 
- var dynRes dynamic.ResourceInterface - if mapping.Scope.Name() == meta.RESTScopeNameNamespace { - dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace) - } else { - dynRes = tCtx.Dynamic().Resource(gvr) - } - - churnFns = append(churnFns, func(name string) string { - if name != "" { - if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) { - tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err) - } - return "" - } - - live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{}) - if err != nil { - return "" - } - return live.GetName() - }) - } - - var interval int64 = 500 - if concreteOp.IntervalMilliseconds != 0 { - interval = concreteOp.IntervalMilliseconds - } - ticker := time.NewTicker(time.Duration(interval) * time.Millisecond) - defer ticker.Stop() - - switch concreteOp.Mode { - case Create: - wg.Add(1) - go func() { - defer wg.Done() - count, threshold := 0, concreteOp.Number - if threshold == 0 { - threshold = math.MaxInt32 - } - for count < threshold { - select { - case <-ticker.C: - for i := range churnFns { - churnFns[i]("") - } - count++ - case <-tCtx.Done(): - return - } - } - }() - case Recreate: - wg.Add(1) - go func() { - defer wg.Done() - retVals := make([][]string, len(churnFns)) - // For each churn function, instantiate a slice of strings with length "concreteOp.Number". - for i := range retVals { - retVals[i] = make([]string, concreteOp.Number) - } - - count := 0 - for { - select { - case <-ticker.C: - for i := range churnFns { - retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number]) - } - count++ - case <-tCtx.Done(): - return - } - } - }() - } - - case *barrierOp: - for _, namespace := range concreteOp.Namespaces { - if _, ok := numPodsScheduledPerNamespace[namespace]; !ok { - tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) - } - } - switch concreteOp.StageRequirement { - case Attempted: - if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - case Scheduled: - // Default should be treated like "Scheduled", so handling both in the same way. - fallthrough - default: - if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - // At the end of the barrier, we can be sure that there are no pods - // pending scheduling in the namespaces that we just blocked on. - if len(concreteOp.Namespaces) == 0 { - numPodsScheduledPerNamespace = make(map[string]int) - } else { - for _, namespace := range concreteOp.Namespaces { - delete(numPodsScheduledPerNamespace, namespace) - } - } - } - - case *sleepOp: - select { - case <-tCtx.Done(): - case <-time.After(concreteOp.Duration.Duration): - } - - case *startCollectingMetricsOp: - if collectorCtx != nil { - tCtx.Fatalf("op %d: Metrics collection is overlapping. 
Probably second collector was started before stopping a previous one", opIndex) - } - collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector) - defer collectorCtx.Cancel("cleaning up") - - case *stopCollectingMetricsOp: - items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors) - dataItems = append(dataItems, items...) - collectorCtx = nil - - default: - runable, ok := concreteOp.(runnableOp) - if !ok { - tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp) - } - for _, namespace := range runable.requiredNamespaces() { - createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace) - } - runable.run(tCtx) - } + runOperation(tc, opIndex, op, w, &sharedOperationData) } // check unused params and inform users @@ -1808,6 +1543,336 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact return dataItems } +func runCreateNodesOp(opIndex int, concreteOp *createNodesOp, sharedOperationData *SharedOperationData) { + nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, sharedOperationData.TCtx.Client()) + if err != nil { + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + if err := nodePreparer.PrepareNodes(sharedOperationData.TCtx, sharedOperationData.WorkloadState.NextNodeIndex); err != nil { + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + sharedOperationData.WorkloadState.NextNodeIndex += concreteOp.Count +} + +func runCreateNamespacesOp(opIndex int, concreteOp *createNamespacesOp, sharedOperationData *SharedOperationData) { + nsPreparer, err := newNamespacePreparer(sharedOperationData.TCtx, concreteOp) + if err != nil { + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + if err := nsPreparer.prepare(sharedOperationData.TCtx); err != nil { + err2 := nsPreparer.cleanup(sharedOperationData.TCtx) + if err2 != nil { + err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2) + } + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + for _, n := range nsPreparer.namespaces() { + if _, ok := sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[n]; ok { + // this namespace has been already created. + continue + } + sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[n] = 0 + } +} + +func runCreatePodsOp(tc *testCase, w *workload, opIndex int, concreteOp *createPodsOp, sharedOperationData *SharedOperationData) { + var namespace string + // define Pod's namespace automatically, and create that namespace. + namespace = fmt.Sprintf("namespace-%d", opIndex) + if concreteOp.Namespace != nil { + namespace = *concreteOp.Namespace + } + createNamespaceIfNotPresent(sharedOperationData.TCtx, namespace, &sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace) + if concreteOp.PodTemplatePath == nil { + concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath + } + + if concreteOp.CollectMetrics { + if sharedOperationData.MetricsData.CollectorCtx != nil { + sharedOperationData.TCtx.Fatalf("op %d: Metrics collection is overlapping. 
Probably second collector was started before stopping a previous one", opIndex) + } + sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.Collectors = startCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorWG, sharedOperationData.PodInformer, tc.MetricsCollectorConfig, sharedOperationData.MetricsData.ThroughputErrorMargin, opIndex, namespace, []string{namespace}, nil) + defer sharedOperationData.MetricsData.CollectorCtx.Cancel("cleaning up") + } + if err := createPodsRapidly(sharedOperationData.TCtx, namespace, concreteOp); err != nil { + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + switch { + case concreteOp.SkipWaitToCompletion: + // Only record those namespaces that may potentially require barriers + // in the future. + sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[namespace] += concreteOp.Count + case concreteOp.SteadyState: + if err := createPodsSteadily(sharedOperationData.TCtx, namespace, sharedOperationData.PodInformer, concreteOp); err != nil { + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + default: + if err := waitUntilPodsScheduledInNamespace(sharedOperationData.TCtx, sharedOperationData.PodInformer, nil, namespace, concreteOp.Count); err != nil { + sharedOperationData.TCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) + } + } + if concreteOp.CollectMetrics { + // CollectMetrics and SkipWaitToCompletion can never be true at the + // same time, so if we're here, it means that all pods have been + // scheduled. + items := stopCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.CollectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.MetricsData.Collectors) + sharedOperationData.WorkloadState.DataItems = append(sharedOperationData.WorkloadState.DataItems, items...) 
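+		// Reset the collector context so a later collection op can start again
+		// without tripping the "Metrics collection is overlapping" check above.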
+ sharedOperationData.MetricsData.CollectorCtx = nil + } +} + +func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData *SharedOperationData) { + labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector) + + podsToDelete, err := sharedOperationData.PodInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector) + if err != nil { + sharedOperationData.TCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err) + } + + deletePods := func(opIndex int) { + if concreteOp.DeletePodsPerSecond > 0 { + ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond)) + defer ticker.Stop() + + for i := 0; i < len(podsToDelete); i++ { + select { + case <-ticker.C: + if err := sharedOperationData.TCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(sharedOperationData.TCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil { + if errors.Is(err, context.Canceled) { + return + } + sharedOperationData.TCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err) + } + case <-sharedOperationData.TCtx.Done(): + return + } + } + return + } + listOpts := metav1.ListOptions{ + LabelSelector: labelSelector.String(), + } + if err := sharedOperationData.TCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(sharedOperationData.TCtx, metav1.DeleteOptions{}, listOpts); err != nil { + if errors.Is(err, context.Canceled) { + return + } + sharedOperationData.TCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err) + } + } + + if concreteOp.SkipWaitToCompletion { + sharedOperationData.WG.Add(1) + go func(opIndex int) { + defer sharedOperationData.WG.Done() + deletePods(opIndex) + }(opIndex) + } else { + deletePods(opIndex) + } +} + +func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOperationData) { + var namespace string + if concreteOp.Namespace != nil { + namespace = *concreteOp.Namespace + } else { + namespace = fmt.Sprintf("namespace-%d", opIndex) + } + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(sharedOperationData.TCtx.Client().Discovery())) + // Ensure the namespace exists. + nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} + if _, err := sharedOperationData.TCtx.Client().CoreV1().Namespaces().Create(sharedOperationData.TCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { + sharedOperationData.TCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) + } + + var churnFns []func(name string) string + + for i, path := range concreteOp.TemplatePaths { + unstructuredObj, gvk, err := getUnstructuredFromFile(path) + if err != nil { + sharedOperationData.TCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) + } + // Obtain GVR. + mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + sharedOperationData.TCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) + } + gvr := mapping.Resource + // Distinguish cluster-scoped with namespaced API objects. 
+ var dynRes dynamic.ResourceInterface + if mapping.Scope.Name() == meta.RESTScopeNameNamespace { + dynRes = sharedOperationData.TCtx.Dynamic().Resource(gvr).Namespace(namespace) + } else { + dynRes = sharedOperationData.TCtx.Dynamic().Resource(gvr) + } + + churnFns = append(churnFns, func(name string) string { + if name != "" { + if err := dynRes.Delete(sharedOperationData.TCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) { + sharedOperationData.TCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err) + } + return "" + } + + live, err := dynRes.Create(sharedOperationData.TCtx, unstructuredObj, metav1.CreateOptions{}) + if err != nil { + return "" + } + return live.GetName() + }) + } + + var interval int64 = 500 + if concreteOp.IntervalMilliseconds != 0 { + interval = concreteOp.IntervalMilliseconds + } + ticker := time.NewTicker(time.Duration(interval) * time.Millisecond) + defer ticker.Stop() + + switch concreteOp.Mode { + case Create: + sharedOperationData.WG.Add(1) + go func() { + defer sharedOperationData.WG.Done() + count, threshold := 0, concreteOp.Number + if threshold == 0 { + threshold = math.MaxInt32 + } + for count < threshold { + select { + case <-ticker.C: + for i := range churnFns { + churnFns[i]("") + } + count++ + case <-sharedOperationData.TCtx.Done(): + return + } + } + }() + case Recreate: + sharedOperationData.WG.Add(1) + go func() { + defer sharedOperationData.WG.Done() + retVals := make([][]string, len(churnFns)) + // For each churn function, instantiate a slice of strings with length "concreteOp.Number". + for i := range retVals { + retVals[i] = make([]string, concreteOp.Number) + } + + count := 0 + for { + select { + case <-ticker.C: + for i := range churnFns { + retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number]) + } + count++ + case <-sharedOperationData.TCtx.Done(): + return + } + } + }() + } +} + +func runBarrierOp(opIndex int, concreteOp *barrierOp, sharedOperationData *SharedOperationData) { + for _, namespace := range concreteOp.Namespaces { + if _, ok := sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[namespace]; !ok { + sharedOperationData.TCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) + } + } + switch concreteOp.StageRequirement { + case Attempted: + if err := waitUntilPodsAttempted(sharedOperationData.TCtx, sharedOperationData.PodInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace); err != nil { + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + case Scheduled: + // Default should be treated like "Scheduled", so handling both in the same way. + fallthrough + default: + if err := waitUntilPodsScheduled(sharedOperationData.TCtx, sharedOperationData.PodInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace); err != nil { + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + // At the end of the barrier, we can be sure that there are no pods + // pending scheduling in the namespaces that we just blocked on. 
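+		// A barrier without explicit namespaces covered all tracked namespaces,
+		// so reset the whole map; otherwise forget only the namespaces we just waited for.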
+ if len(concreteOp.Namespaces) == 0 { + sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace = make(map[string]int) + } else { + for _, namespace := range concreteOp.Namespaces { + delete(sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace, namespace) + } + } + } +} + +func runSleepOp(concreteOp *sleepOp, sharedOperationData *SharedOperationData) { + select { + case <-sharedOperationData.TCtx.Done(): + case <-time.After(concreteOp.Duration.Duration): + } +} + +func runStartCollectingMetricsOp(opIndex int, tc *testCase, concreteOp *startCollectingMetricsOp, sharedOperationData *SharedOperationData) { + if sharedOperationData.MetricsData.CollectorCtx != nil { + sharedOperationData.TCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) + } + sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.Collectors = startCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorWG, sharedOperationData.PodInformer, tc.MetricsCollectorConfig, sharedOperationData.MetricsData.ThroughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector) + + defer sharedOperationData.MetricsData.CollectorCtx.Cancel("cleaning up") +} + +func runStopCollectingMetricsOp(opIndex int, w *workload, sharedOperationData *SharedOperationData) { + items := stopCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.CollectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.MetricsData.Collectors) + sharedOperationData.WorkloadState.DataItems = append(sharedOperationData.WorkloadState.DataItems, items...) + sharedOperationData.MetricsData.CollectorCtx = nil +} + +func runDefault(opIndex int, concreteOp realOp, sharedOperationData *SharedOperationData) { + runable, ok := concreteOp.(runnableOp) + if !ok { + sharedOperationData.TCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp) + } + for _, namespace := range runable.requiredNamespaces() { + createNamespaceIfNotPresent(sharedOperationData.TCtx, namespace, &sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace) + } + runable.run(sharedOperationData.TCtx) +} + +func runOperation(tc *testCase, opIndex int, op op, w *workload, sharedOperationData *SharedOperationData) { + realOp, err := op.realOp.patchParams(w) + if err != nil { + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err) + } + select { + case <-sharedOperationData.TCtx.Done(): + sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, context.Cause(sharedOperationData.TCtx)) + default: + } + switch concreteOp := realOp.(type) { + case *createNodesOp: + runCreateNodesOp(opIndex, concreteOp, sharedOperationData) + case *createNamespacesOp: + runCreateNamespacesOp(opIndex, concreteOp, sharedOperationData) + case *createPodsOp: + runCreatePodsOp(tc, w, opIndex, concreteOp, sharedOperationData) + case *deletePodsOp: + runDeletePodsOp(opIndex, concreteOp, sharedOperationData) + case *churnOp: + runChurnOp(opIndex, concreteOp, sharedOperationData) + case *barrierOp: + runBarrierOp(opIndex, concreteOp, sharedOperationData) + case *sleepOp: + runSleepOp(concreteOp, sharedOperationData) + case *startCollectingMetricsOp: + runStartCollectingMetricsOp(opIndex, tc, concreteOp, sharedOperationData) + case *stopCollectingMetricsOp: + runStopCollectingMetricsOp(opIndex, w, sharedOperationData) + default: + runDefault(opIndex, concreteOp, 
sharedOperationData) + } +} + func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) { if _, ok := (*podsPerNamespace)[namespace]; !ok { // The namespace has not created yet.