diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 312184fe7be..59581470d30 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -1451,10 +1451,10 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex } type WorkloadExecutor struct { - tCtx *ktesting.TContext - wg *sync.WaitGroup - collectorCtx *ktesting.TContext - collectorWG *sync.WaitGroup + tCtx ktesting.TContext + wg sync.WaitGroup + collectorCtx ktesting.TContext + collectorWG sync.WaitGroup collectors []testDataCollector dataItems []DataItem numPodsScheduledPerNamespace map[string]int @@ -1511,10 +1511,10 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact defer collectorWG.Wait() executor := WorkloadExecutor{ - tCtx: &tCtx, - wg: &wg, - collectorCtx: &collectorCtx, - collectorWG: &collectorWG, + tCtx: tCtx, + wg: wg, + collectorCtx: collectorCtx, + collectorWG: collectorWG, collectors: collectors, numPodsScheduledPerNamespace: make(map[string]int), podInformer: podInformer, @@ -1543,8 +1543,8 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact executor.runCreateNamespaceOp(opIndex, concreteOp) case *createPodsOp: executor.runCreatePodsOp(opIndex, concreteOp) - if *executor.collectorCtx != nil { - defer (*executor.collectorCtx).Cancel("cleaning up") + if executor.collectorCtx != nil { + executor.collectorCtx.Cancel("cleaning up") } case *deletePodsOp: executor.runDeletePodsOp(opIndex, concreteOp) @@ -1556,7 +1556,7 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact executor.runSleepOp(concreteOp) case *startCollectingMetricsOp: executor.runStartCollectingMetricsOp(opIndex, concreteOp) - defer (*executor.collectorCtx).Cancel("cleaning up") + defer executor.collectorCtx.Cancel("cleaning up") case *stopCollectingMetricsOp: executor.runStopCollectingMetrics(opIndex) default: @@ -1576,27 +1576,27 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact } func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) { - nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, (*e.tCtx).Client()) + nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, e.tCtx.Client()) if err != nil { - (*e.tCtx).Fatalf("op %d: %v", opIndex, err) + e.tCtx.Fatalf("op %d: %v", opIndex, err) } - if err := nodePreparer.PrepareNodes(*e.tCtx, e.nextNodeIndex); err != nil { - (*e.tCtx).Fatalf("op %d: %v", opIndex, err) + if err := nodePreparer.PrepareNodes(e.tCtx, e.nextNodeIndex); err != nil { + e.tCtx.Fatalf("op %d: %v", opIndex, err) } e.nextNodeIndex += op.Count } func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) { - nsPreparer, err := newNamespacePreparer(*e.tCtx, op) + nsPreparer, err := newNamespacePreparer(e.tCtx, op) if err != nil { - (*e.tCtx).Fatalf("op %d: %v", opIndex, err) + e.tCtx.Fatalf("op %d: %v", opIndex, err) } - if err := nsPreparer.prepare(*e.tCtx); err != nil { - err2 := nsPreparer.cleanup(*e.tCtx) + if err := nsPreparer.prepare(e.tCtx); err != nil { + err2 := nsPreparer.cleanup(e.tCtx) if err2 != nil { err = fmt.Errorf("prepare: %w; cleanup: %w", err, err2) } - (*e.tCtx).Fatalf("op %d: %v", opIndex, err) + e.tCtx.Fatalf("op %d: %v", opIndex, err) } for _, n := range nsPreparer.namespaces() { if _, ok := e.numPodsScheduledPerNamespace[n]; ok { @@ -1610,20 +1610,20 @@ func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespace func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) { for _, namespace := range op.Namespaces { if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok { - (*e.tCtx).Fatalf("op %d: unknown namespace %s", opIndex, namespace) + e.tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) } } switch op.StageRequirement { case Attempted: - if err := waitUntilPodsAttempted(*e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil { - (*e.tCtx).Fatalf("op %d: %v", opIndex, err) + if err := waitUntilPodsAttempted(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil { + e.tCtx.Fatalf("op %d: %v", opIndex, err) } case Scheduled: // Default should be treated like "Scheduled", so handling both in the same way. fallthrough default: - if err := waitUntilPodsScheduled(*e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil { - (*e.tCtx).Fatalf("op %d: %v", opIndex, err) + if err := waitUntilPodsScheduled(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil { + e.tCtx.Fatalf("op %d: %v", opIndex, err) } // At the end of the barrier, we can be sure that there are no pods // pending scheduling in the namespaces that we just blocked on. @@ -1639,15 +1639,15 @@ func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) { func (e *WorkloadExecutor) runSleepOp(op *sleepOp) { select { - case <-(*e.tCtx).Done(): + case <-(e.tCtx).Done(): case <-time.After(op.Duration.Duration): } } func (e *WorkloadExecutor) runStopCollectingMetrics(opIndex int) { - items := stopCollectingMetrics(*e.tCtx, *e.collectorCtx, e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors) + items := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors) e.dataItems = append(e.dataItems, items...) - *e.collectorCtx = nil + e.collectorCtx = nil } func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) { @@ -1656,19 +1656,22 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) { if op.Namespace != nil { namespace = *op.Namespace } - createNamespaceIfNotPresent(*e.tCtx, namespace, &e.numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace) if op.PodTemplatePath == nil { op.PodTemplatePath = e.testCase.DefaultPodTemplatePath } if op.CollectMetrics { - if *e.collectorCtx != nil { - (*e.tCtx).Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) + if e.collectorCtx != nil { + e.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) } - *e.collectorCtx, e.collectors = startCollectingMetrics(*e.tCtx, e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil) + e.collectorCtx, e.collectors = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil) + // e.collectorCtx.Cleanup(func() { + // e.collectorCtx.Cancel("cleaning up") + // }) } - if err := createPodsRapidly(*e.tCtx, namespace, op); err != nil { - (*e.tCtx).Fatalf("op %d: %v", opIndex, err) + if err := createPodsRapidly(e.tCtx, namespace, op); err != nil { + e.tCtx.Fatalf("op %d: %v", opIndex, err) } switch { case op.SkipWaitToCompletion: @@ -1676,21 +1679,21 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) { // in the future. e.numPodsScheduledPerNamespace[namespace] += op.Count case op.SteadyState: - if err := createPodsSteadily(*e.tCtx, namespace, e.podInformer, op); err != nil { - (*e.tCtx).Fatalf("op %d: %v", opIndex, err) + if err := createPodsSteadily(e.tCtx, namespace, e.podInformer, op); err != nil { + e.tCtx.Fatalf("op %d: %v", opIndex, err) } default: - if err := waitUntilPodsScheduledInNamespace(*e.tCtx, e.podInformer, nil, namespace, op.Count); err != nil { - (*e.tCtx).Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) + if err := waitUntilPodsScheduledInNamespace(e.tCtx, e.podInformer, nil, namespace, op.Count); err != nil { + e.tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } } if op.CollectMetrics { // CollectMetrics and SkipWaitToCompletion can never be true at the // same time, so if we're here, it means that all pods have been // scheduled. - items := stopCollectingMetrics((*e.tCtx), (*e.collectorCtx), e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors) + items := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors) e.dataItems = append(e.dataItems, items...) - *e.collectorCtx = nil + e.collectorCtx = nil } } @@ -1699,7 +1702,7 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) { podsToDelete, err := e.podInformer.Lister().Pods(op.Namespace).List(labelSelector) if err != nil { - (*e.tCtx).Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err) + e.tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err) } deletePods := func(opIndex int) { @@ -1710,13 +1713,13 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) { for i := 0; i < len(podsToDelete); i++ { select { case <-ticker.C: - if err := (*e.tCtx).Client().CoreV1().Pods(op.Namespace).Delete(*e.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil { + if err := e.tCtx.Client().CoreV1().Pods(op.Namespace).Delete(e.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil { if errors.Is(err, context.Canceled) { return } - (*e.tCtx).Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err) + e.tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err) } - case <-(*e.tCtx).Done(): + case <-(e.tCtx).Done(): return } } @@ -1725,11 +1728,11 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) { listOpts := metav1.ListOptions{ LabelSelector: labelSelector.String(), } - if err := (*e.tCtx).Client().CoreV1().Pods(op.Namespace).DeleteCollection(*e.tCtx, metav1.DeleteOptions{}, listOpts); err != nil { + if err := e.tCtx.Client().CoreV1().Pods(op.Namespace).DeleteCollection(e.tCtx, metav1.DeleteOptions{}, listOpts); err != nil { if errors.Is(err, context.Canceled) { return } - (*e.tCtx).Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, op.Namespace, err) + e.tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, op.Namespace, err) } } @@ -1751,11 +1754,11 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { } else { namespace = fmt.Sprintf("namespace-%d", opIndex) } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient((*e.tCtx).Client().Discovery())) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(e.tCtx.Client().Discovery())) // Ensure the namespace exists. nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} - if _, err := (*e.tCtx).Client().CoreV1().Namespaces().Create(*e.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - (*e.tCtx).Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) + if _, err := e.tCtx.Client().CoreV1().Namespaces().Create(e.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { + e.tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) } var churnFns []func(name string) string @@ -1763,31 +1766,31 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { for i, path := range op.TemplatePaths { unstructuredObj, gvk, err := getUnstructuredFromFile(path) if err != nil { - (*e.tCtx).Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) + e.tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) } // Obtain GVR. mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { - (*e.tCtx).Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) + e.tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) } gvr := mapping.Resource // Distinguish cluster-scoped with namespaced API objects. var dynRes dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { - dynRes = (*e.tCtx).Dynamic().Resource(gvr).Namespace(namespace) + dynRes = e.tCtx.Dynamic().Resource(gvr).Namespace(namespace) } else { - dynRes = (*e.tCtx).Dynamic().Resource(gvr) + dynRes = e.tCtx.Dynamic().Resource(gvr) } churnFns = append(churnFns, func(name string) string { if name != "" { - if err := dynRes.Delete(*e.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) { - (*e.tCtx).Errorf("op %d: unable to delete %v: %v", opIndex, name, err) + if err := dynRes.Delete(e.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) { + e.tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err) } return "" } - live, err := dynRes.Create(*e.tCtx, unstructuredObj, metav1.CreateOptions{}) + live, err := dynRes.Create(e.tCtx, unstructuredObj, metav1.CreateOptions{}) if err != nil { return "" } @@ -1818,7 +1821,7 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { churnFns[i]("") } count++ - case <-(*e.tCtx).Done(): + case <-(e.tCtx).Done(): return } } @@ -1842,7 +1845,7 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { retVals[i][count%op.Number] = churnFns[i](retVals[i][count%op.Number]) } count++ - case <-(*e.tCtx).Done(): + case <-(e.tCtx).Done(): return } } @@ -1853,19 +1856,22 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) { runable, ok := op.(runnableOp) if !ok { - (*e.tCtx).Fatalf("op %d: invalid op %v", opIndex, op) + e.tCtx.Fatalf("op %d: invalid op %v", opIndex, op) } for _, namespace := range runable.requiredNamespaces() { - createNamespaceIfNotPresent(*e.tCtx, namespace, &e.numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace) } - runable.run(*e.tCtx) + runable.run(e.tCtx) } func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) { - if *e.collectorCtx != nil { - (*e.tCtx).Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) + if e.collectorCtx != nil { + e.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) } - *e.collectorCtx, e.collectors = startCollectingMetrics((*e.tCtx), e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, op.Name, op.Namespaces, op.LabelSelector) + e.collectorCtx, e.collectors = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, op.Name, op.Namespaces, op.LabelSelector) + // e.collectorCtx.Cleanup(func() { + // collectorCtx.Cancel("cleaning up") + // }) } func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {