From 1b0ad78718eb813e7b007ec1f1158cdaa9423cfe Mon Sep 17 00:00:00 2001
From: YamasouA
Date: Wed, 29 Jan 2025 23:58:51 +0900
Subject: [PATCH] fix

---
 .../scheduler_perf/scheduler_perf.go          | 245 ++++++++++--------
 1 file changed, 131 insertions(+), 114 deletions(-)

diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go
index 0ce499443bc..064dcc6965e 100644
--- a/test/integration/scheduler_perf/scheduler_perf.go
+++ b/test/integration/scheduler_perf/scheduler_perf.go
@@ -1450,38 +1450,55 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
 	return dataItems
 }
 
-type MetricsCollectionData struct {
-	Collectors []testDataCollector
+// metricsCollectionData manages the state and synchronization of metrics collection
+// during workload execution.
+type metricsCollectionData struct {
+	// collectors holds the test data collectors used to gather performance metrics.
+	collectors []testDataCollector
+	// collectorCtx is a separate context specifically for managing the lifecycle
+	// of metrics collection goroutines.
 	// This needs a separate context and wait group because
 	// the metrics collecting needs to be sure that the goroutines
 	// are stopped.
-	CollectorCtx ktesting.TContext
-	CollectorWG  *sync.WaitGroup
-	// Disable error checking of the sampling interval length in the
+	collectorCtx ktesting.TContext
+	collectorWG  *sync.WaitGroup
+	// throughputErrorMargin disables error checking of the sampling interval length in the
 	// throughput collector by default. When running benchmarks, report
 	// it as test failure when samples are not taken regularly.
-	ThroughputErrorMargin float64
+	throughputErrorMargin float64
 }
 
-type WorkloadState struct {
-	DataItems     []DataItem
-	NextNodeIndex int
+// workloadState holds state information about the workload being executed.
+type workloadState struct {
+	// dataItems stores the collected data from the workload execution.
+	dataItems []DataItem
+	// nextNodeIndex keeps track of the next node index to be used when creating nodes.
+	nextNodeIndex int
 	// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
 	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
-	NumPodsScheduledPerNamespace map[string]int
+	numPodsScheduledPerNamespace map[string]int
 }
 
-type SharedOperationData struct {
+// sharedOperationData encapsulates all shared state and dependencies used during workload execution.
+type sharedOperationData struct {
+	// podInformer provides access to the informer for monitoring Pod events in the cluster.
 	// Additional informers needed for testing. The pod informer was
 	// already created before (scheduler.NewInformerFactory) and the
 	// factory was started for it (mustSetupCluster), therefore we don't
 	// need to start again.
-	PodInformer coreinformers.PodInformer
-	MetricsData *MetricsCollectionData
-	WorkloadState *WorkloadState
-	TCtx ktesting.TContext
-	WG sync.WaitGroup
-	CancelFunc context.CancelFunc
+	podInformer coreinformers.PodInformer
+	// metricsData contains information and synchronization primitives for managing
+	// metrics collection during workload execution.
+	metricsData *metricsCollectionData
+	// workloadState holds information about the current state of the workload,
+	// including scheduled pods and created namespaces.
+	workloadState *workloadState
+	// tCtx is the root test context, used for cancellation and logging throughout
+	// the execution of workload operations.
+	tCtx ktesting.TContext
+	// wg is a wait group that tracks all ongoing goroutines in the workload execution.
+	// It ensures proper synchronization and prevents premature termination of operations.
+	wg *sync.WaitGroup
 }
 
 func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
@@ -1507,25 +1524,27 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	// Everything else started by this function gets stopped before it returns.
 	tCtx = ktesting.WithCancel(tCtx)
 	var wg sync.WaitGroup
-	defer wg.Wait()
-	defer tCtx.Cancel("workload is done")
+	defer func() {
+		tCtx.Cancel("workload is done")
+		wg.Wait()
+	}()
 
 	var dataItems []DataItem
 	var collectorWG sync.WaitGroup
 	defer collectorWG.Wait()
 
-	sharedOperationData := SharedOperationData{
-		TCtx: tCtx,
-		WG:   wg,
-		MetricsData: &MetricsCollectionData{
-			CollectorWG:           &sync.WaitGroup{},
-			ThroughputErrorMargin: throughputErrorMargin,
+	sharedOperationData := sharedOperationData{
+		tCtx: tCtx,
+		wg:   &wg,
+		metricsData: &metricsCollectionData{
+			collectorWG:           &sync.WaitGroup{},
+			throughputErrorMargin: throughputErrorMargin,
 		},
-		WorkloadState: &WorkloadState{
-			NumPodsScheduledPerNamespace: make(map[string]int),
+		workloadState: &workloadState{
+			numPodsScheduledPerNamespace: make(map[string]int),
 		},
-		PodInformer: informerFactory.Core().V1().Pods(),
+		podInformer: informerFactory.Core().V1().Pods(),
 	}
 
 	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
@@ -1543,90 +1562,90 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	return dataItems
 }
 
-func runCreateNodesOp(opIndex int, concreteOp *createNodesOp, sharedOperationData *SharedOperationData) {
-	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, sharedOperationData.TCtx.Client())
+func runCreateNodesOp(opIndex int, concreteOp *createNodesOp, sharedOperationData *sharedOperationData) {
+	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, sharedOperationData.tCtx.Client())
 	if err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
-	if err := nodePreparer.PrepareNodes(sharedOperationData.TCtx, sharedOperationData.WorkloadState.NextNodeIndex); err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+	if err := nodePreparer.PrepareNodes(sharedOperationData.tCtx, sharedOperationData.workloadState.nextNodeIndex); err != nil {
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
-	sharedOperationData.WorkloadState.NextNodeIndex += concreteOp.Count
+	sharedOperationData.workloadState.nextNodeIndex += concreteOp.Count
 }
 
-func runCreateNamespacesOp(opIndex int, concreteOp *createNamespacesOp, sharedOperationData *SharedOperationData) {
-	nsPreparer, err := newNamespacePreparer(sharedOperationData.TCtx, concreteOp)
+func runCreateNamespacesOp(opIndex int, concreteOp *createNamespacesOp, sharedOperationData *sharedOperationData) {
+	nsPreparer, err := newNamespacePreparer(sharedOperationData.tCtx, concreteOp)
 	if err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
-	if err := nsPreparer.prepare(sharedOperationData.TCtx); err != nil {
-		err2 := nsPreparer.cleanup(sharedOperationData.TCtx)
+	if err := nsPreparer.prepare(sharedOperationData.tCtx); err != nil {
+		err2 := nsPreparer.cleanup(sharedOperationData.tCtx)
 		if err2 != nil {
 			err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
 		}
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
 	for _, n := range nsPreparer.namespaces() {
-		if _, ok := sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[n]; ok {
+		if _, ok := sharedOperationData.workloadState.numPodsScheduledPerNamespace[n]; ok {
 			// this namespace has been already created.
 			continue
 		}
-		sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[n] = 0
+		sharedOperationData.workloadState.numPodsScheduledPerNamespace[n] = 0
 	}
 }
 
-func runCreatePodsOp(tc *testCase, w *workload, opIndex int, concreteOp *createPodsOp, sharedOperationData *SharedOperationData) {
+func runCreatePodsOp(tc *testCase, w *workload, opIndex int, concreteOp *createPodsOp, sharedOperationData *sharedOperationData) {
 	var namespace string
 	// define Pod's namespace automatically, and create that namespace.
 	namespace = fmt.Sprintf("namespace-%d", opIndex)
 	if concreteOp.Namespace != nil {
 		namespace = *concreteOp.Namespace
 	}
-	createNamespaceIfNotPresent(sharedOperationData.TCtx, namespace, &sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace)
+	createNamespaceIfNotPresent(sharedOperationData.tCtx, namespace, &sharedOperationData.workloadState.numPodsScheduledPerNamespace)
 	if concreteOp.PodTemplatePath == nil {
 		concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
 	}
 	if concreteOp.CollectMetrics {
-		if sharedOperationData.MetricsData.CollectorCtx != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
+		if sharedOperationData.metricsData.collectorCtx != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 		}
-		sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.Collectors = startCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorWG, sharedOperationData.PodInformer, tc.MetricsCollectorConfig, sharedOperationData.MetricsData.ThroughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
-		defer sharedOperationData.MetricsData.CollectorCtx.Cancel("cleaning up")
+		sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectors = startCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorWG, sharedOperationData.podInformer, tc.MetricsCollectorConfig, sharedOperationData.metricsData.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
		defer sharedOperationData.metricsData.collectorCtx.Cancel("cleaning up")
 	}
-	if err := createPodsRapidly(sharedOperationData.TCtx, namespace, concreteOp); err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+	if err := createPodsRapidly(sharedOperationData.tCtx, namespace, concreteOp); err != nil {
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
 	switch {
 	case concreteOp.SkipWaitToCompletion:
 		// Only record those namespaces that may potentially require barriers
 		// in the future.
-		sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[namespace] += concreteOp.Count
+		sharedOperationData.workloadState.numPodsScheduledPerNamespace[namespace] += concreteOp.Count
 	case concreteOp.SteadyState:
-		if err := createPodsSteadily(sharedOperationData.TCtx, namespace, sharedOperationData.PodInformer, concreteOp); err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := createPodsSteadily(sharedOperationData.tCtx, namespace, sharedOperationData.podInformer, concreteOp); err != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 	default:
-		if err := waitUntilPodsScheduledInNamespace(sharedOperationData.TCtx, sharedOperationData.PodInformer, nil, namespace, concreteOp.Count); err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
+		if err := waitUntilPodsScheduledInNamespace(sharedOperationData.tCtx, sharedOperationData.podInformer, nil, namespace, concreteOp.Count); err != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
 		}
 	}
 	if concreteOp.CollectMetrics {
 		// CollectMetrics and SkipWaitToCompletion can never be true at the
 		// same time, so if we're here, it means that all pods have been
 		// scheduled.
-		items := stopCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.CollectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.MetricsData.Collectors)
-		sharedOperationData.WorkloadState.DataItems = append(sharedOperationData.WorkloadState.DataItems, items...)
-		sharedOperationData.MetricsData.CollectorCtx = nil
+		items := stopCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.metricsData.collectors)
+		sharedOperationData.workloadState.dataItems = append(sharedOperationData.workloadState.dataItems, items...)
+		sharedOperationData.metricsData.collectorCtx = nil
 	}
 }
 
-func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData *SharedOperationData) {
+func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData *sharedOperationData) {
 	labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
-	podsToDelete, err := sharedOperationData.PodInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
+	podsToDelete, err := sharedOperationData.podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
 	if err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
+		sharedOperationData.tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
 	}
 
 	deletePods := func(opIndex int) {
@@ -1637,13 +1656,13 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData
 		for i := 0; i < len(podsToDelete); i++ {
 			select {
 			case <-ticker.C:
-				if err := sharedOperationData.TCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(sharedOperationData.TCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
+				if err := sharedOperationData.tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(sharedOperationData.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
 					if errors.Is(err, context.Canceled) {
 						return
 					}
-					sharedOperationData.TCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
+					sharedOperationData.tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
 				}
-			case <-sharedOperationData.TCtx.Done():
+			case <-sharedOperationData.tCtx.Done():
 				return
 			}
 		}
@@ -1652,18 +1671,18 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData
 		listOpts := metav1.ListOptions{
 			LabelSelector: labelSelector.String(),
 		}
-		if err := sharedOperationData.TCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(sharedOperationData.TCtx, metav1.DeleteOptions{}, listOpts); err != nil {
+		if err := sharedOperationData.tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(sharedOperationData.tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
 			if errors.Is(err, context.Canceled) {
 				return
 			}
-			sharedOperationData.TCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
+			sharedOperationData.tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
 		}
 	}
 
 	if concreteOp.SkipWaitToCompletion {
-		sharedOperationData.WG.Add(1)
+		sharedOperationData.wg.Add(1)
 		go func(opIndex int) {
-			defer sharedOperationData.WG.Done()
+			defer sharedOperationData.wg.Done()
 			deletePods(opIndex)
 		}(opIndex)
 	} else {
@@ -1671,18 +1690,18 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData
 	}
 }
 
-func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOperationData) {
+func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *sharedOperationData) {
 	var namespace string
 	if concreteOp.Namespace != nil {
 		namespace = *concreteOp.Namespace
 	} else {
 		namespace = fmt.Sprintf("namespace-%d", opIndex)
 	}
-	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(sharedOperationData.TCtx.Client().Discovery()))
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(sharedOperationData.tCtx.Client().Discovery()))
 	// Ensure the namespace exists.
 	nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
-	if _, err := sharedOperationData.TCtx.Client().CoreV1().Namespaces().Create(sharedOperationData.TCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
-		sharedOperationData.TCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
+	if _, err := sharedOperationData.tCtx.Client().CoreV1().Namespaces().Create(sharedOperationData.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
+		sharedOperationData.tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
 	}
 
 	var churnFns []func(name string) string
@@ -1690,31 +1709,31 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 	for i, path := range concreteOp.TemplatePaths {
 		unstructuredObj, gvk, err := getUnstructuredFromFile(path)
 		if err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
+			sharedOperationData.tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
 		}
 		// Obtain GVR.
 		mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
 		if err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
+			sharedOperationData.tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
 		}
 		gvr := mapping.Resource
 		// Distinguish cluster-scoped with namespaced API objects.
 		var dynRes dynamic.ResourceInterface
 		if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
-			dynRes = sharedOperationData.TCtx.Dynamic().Resource(gvr).Namespace(namespace)
+			dynRes = sharedOperationData.tCtx.Dynamic().Resource(gvr).Namespace(namespace)
 		} else {
-			dynRes = sharedOperationData.TCtx.Dynamic().Resource(gvr)
+			dynRes = sharedOperationData.tCtx.Dynamic().Resource(gvr)
 		}
 
 		churnFns = append(churnFns, func(name string) string {
 			if name != "" {
-				if err := dynRes.Delete(sharedOperationData.TCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
-					sharedOperationData.TCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
+				if err := dynRes.Delete(sharedOperationData.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
+					sharedOperationData.tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
 				}
 				return ""
 			}
 
-			live, err := dynRes.Create(sharedOperationData.TCtx, unstructuredObj, metav1.CreateOptions{})
+			live, err := dynRes.Create(sharedOperationData.tCtx, unstructuredObj, metav1.CreateOptions{})
 			if err != nil {
 				return ""
 			}
@@ -1731,9 +1750,9 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 
 	switch concreteOp.Mode {
 	case Create:
-		sharedOperationData.WG.Add(1)
+		sharedOperationData.wg.Add(1)
 		go func() {
-			defer sharedOperationData.WG.Done()
+			defer sharedOperationData.wg.Done()
 			count, threshold := 0, concreteOp.Number
 			if threshold == 0 {
 				threshold = math.MaxInt32
@@ -1745,15 +1764,15 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 					churnFns[i]("")
 				}
 				count++
-			case <-sharedOperationData.TCtx.Done():
+			case <-sharedOperationData.tCtx.Done():
 				return
 			}
 		}
 	}()
 	case Recreate:
-		sharedOperationData.WG.Add(1)
+		sharedOperationData.wg.Add(1)
 		go func() {
-			defer sharedOperationData.WG.Done()
+			defer sharedOperationData.wg.Done()
 			retVals := make([][]string, len(churnFns))
 			// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
 			for i := range retVals {
@@ -1768,7 +1787,7 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 					retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
 				}
 				count++
-			case <-sharedOperationData.TCtx.Done():
+			case <-sharedOperationData.tCtx.Done():
 				return
 			}
 		}
@@ -1776,77 +1795,75 @@
 	}
 }
 
-func runBarrierOp(opIndex int, concreteOp *barrierOp, sharedOperationData *SharedOperationData) {
+func runBarrierOp(opIndex int, concreteOp *barrierOp, sharedOperationData *sharedOperationData) {
 	for _, namespace := range concreteOp.Namespaces {
-		if _, ok := sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[namespace]; !ok {
-			sharedOperationData.TCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
+		if _, ok := sharedOperationData.workloadState.numPodsScheduledPerNamespace[namespace]; !ok {
+			sharedOperationData.tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
 		}
 	}
 	switch concreteOp.StageRequirement {
 	case Attempted:
-		if err := waitUntilPodsAttempted(sharedOperationData.TCtx, sharedOperationData.PodInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace); err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := waitUntilPodsAttempted(sharedOperationData.tCtx, sharedOperationData.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.workloadState.numPodsScheduledPerNamespace); err != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 	case Scheduled:
 		// Default should be treated like "Scheduled", so handling both in the same way.
 		fallthrough
 	default:
-		if err := waitUntilPodsScheduled(sharedOperationData.TCtx, sharedOperationData.PodInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace); err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := waitUntilPodsScheduled(sharedOperationData.tCtx, sharedOperationData.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.workloadState.numPodsScheduledPerNamespace); err != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 		// At the end of the barrier, we can be sure that there are no pods
 		// pending scheduling in the namespaces that we just blocked on.
 		if len(concreteOp.Namespaces) == 0 {
-			sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace = make(map[string]int)
+			sharedOperationData.workloadState.numPodsScheduledPerNamespace = make(map[string]int)
 		} else {
 			for _, namespace := range concreteOp.Namespaces {
-				delete(sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace, namespace)
+				delete(sharedOperationData.workloadState.numPodsScheduledPerNamespace, namespace)
 			}
 		}
 	}
 }
 
-func runSleepOp(concreteOp *sleepOp, sharedOperationData *SharedOperationData) {
+func runSleepOp(concreteOp *sleepOp, sharedOperationData *sharedOperationData) {
 	select {
-	case <-sharedOperationData.TCtx.Done():
+	case <-sharedOperationData.tCtx.Done():
 	case <-time.After(concreteOp.Duration.Duration):
 	}
 }
 
-func runStartCollectingMetricsOp(opIndex int, tc *testCase, concreteOp *startCollectingMetricsOp, sharedOperationData *SharedOperationData) {
-	if sharedOperationData.MetricsData.CollectorCtx != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
+func runStartCollectingMetricsOp(opIndex int, tc *testCase, concreteOp *startCollectingMetricsOp, sharedOperationData *sharedOperationData) {
+	if sharedOperationData.metricsData.collectorCtx != nil {
+		sharedOperationData.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 	}
-	sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.Collectors = startCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorWG, sharedOperationData.PodInformer, tc.MetricsCollectorConfig, sharedOperationData.MetricsData.ThroughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
-
-	defer sharedOperationData.MetricsData.CollectorCtx.Cancel("cleaning up")
+	sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectors = startCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorWG, sharedOperationData.podInformer, tc.MetricsCollectorConfig, sharedOperationData.metricsData.throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
 }
 
-func runStopCollectingMetricsOp(opIndex int, w *workload, sharedOperationData *SharedOperationData) {
-	items := stopCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.CollectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.MetricsData.Collectors)
-	sharedOperationData.WorkloadState.DataItems = append(sharedOperationData.WorkloadState.DataItems, items...)
-	sharedOperationData.MetricsData.CollectorCtx = nil
+func runStopCollectingMetricsOp(opIndex int, w *workload, sharedOperationData *sharedOperationData) {
+	items := stopCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.metricsData.collectors)
+	sharedOperationData.workloadState.dataItems = append(sharedOperationData.workloadState.dataItems, items...)
+	sharedOperationData.metricsData.collectorCtx = nil
 }
 
-func runDefault(opIndex int, concreteOp realOp, sharedOperationData *SharedOperationData) {
+func runDefault(opIndex int, concreteOp realOp, sharedOperationData *sharedOperationData) {
 	runable, ok := concreteOp.(runnableOp)
 	if !ok {
-		sharedOperationData.TCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
+		sharedOperationData.tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
 	}
 	for _, namespace := range runable.requiredNamespaces() {
-		createNamespaceIfNotPresent(sharedOperationData.TCtx, namespace, &sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace)
+		createNamespaceIfNotPresent(sharedOperationData.tCtx, namespace, &sharedOperationData.workloadState.numPodsScheduledPerNamespace)
 	}
-	runable.run(sharedOperationData.TCtx)
+	runable.run(sharedOperationData.tCtx)
 }
 
-func runOperation(tc *testCase, opIndex int, op op, w *workload, sharedOperationData *SharedOperationData) {
+func runOperation(tc *testCase, opIndex int, op op, w *workload, sharedOperationData *sharedOperationData) {
 	realOp, err := op.realOp.patchParams(w)
 	if err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
 	select {
-	case <-sharedOperationData.TCtx.Done():
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, context.Cause(sharedOperationData.TCtx))
+	case <-sharedOperationData.tCtx.Done():
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, context.Cause(sharedOperationData.tCtx))
 	default:
 	}
 	switch concreteOp := realOp.(type) {