diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 4a952971921..1a126c8de48 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -1579,29 +1579,27 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact } func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) { - tCtx := *e.tCtx - nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, tCtx.Client()) + nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, (*e.tCtx).Client()) if err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) + (*e.tCtx).Fatalf("op %d: %v", opIndex, err) } - if err := nodePreparer.PrepareNodes((*e.tCtx), e.nextNodeIndex); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) + if err := nodePreparer.PrepareNodes(*e.tCtx, e.nextNodeIndex); err != nil { + (*e.tCtx).Fatalf("op %d: %v", opIndex, err) } e.nextNodeIndex += op.Count } func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) { - tCtx := *e.tCtx - nsPreparer, err := newNamespacePreparer(tCtx, op) + nsPreparer, err := newNamespacePreparer(*e.tCtx, op) if err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) + (*e.tCtx).Fatalf("op %d: %v", opIndex, err) } - if err := nsPreparer.prepare(tCtx); err != nil { - err2 := nsPreparer.cleanup(tCtx) + if err := nsPreparer.prepare(*e.tCtx); err != nil { + err2 := nsPreparer.cleanup(*e.tCtx) if err2 != nil { err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2) } - tCtx.Fatalf("op %d: %v", opIndex, err) + (*e.tCtx).Fatalf("op %d: %v", opIndex, err) } for _, n := range nsPreparer.namespaces() { if _, ok := e.numPodsScheduledPerNamespace[n]; ok { @@ -1613,23 +1611,22 @@ func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespace } func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) { - tCtx := *e.tCtx for _, namespace := range op.Namespaces { if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok { - tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) + (*e.tCtx).Fatalf("op %d: unknown namespace %s", opIndex, namespace) } } switch op.StageRequirement { case Attempted: - if err := waitUntilPodsAttempted(tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) + if err := waitUntilPodsAttempted(*e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil { + (*e.tCtx).Fatalf("op %d: %v", opIndex, err) } case Scheduled: // Default should be treated like "Scheduled", so handling both in the same way. fallthrough default: - if err := waitUntilPodsScheduled(tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) + if err := waitUntilPodsScheduled(*e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil { + (*e.tCtx).Fatalf("op %d: %v", opIndex, err) } // At the end of the barrier, we can be sure that there are no pods // pending scheduling in the namespaces that we just blocked on. @@ -1644,34 +1641,31 @@ func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) { } func (e *WorkloadExecutor) runStopCollectingMetrics(opIndex int) { - tCtx := *e.tCtx collectorCtx := *e.collectorCtx - items := stopCollectingMetrics(tCtx, collectorCtx, e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors) + items := stopCollectingMetrics(*e.tCtx, collectorCtx, e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors) e.dataItems = append(e.dataItems, items...) collectorCtx = nil } func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) { - tCtx := *e.tCtx - collectorCtx := *e.tCtx // define Pod's namespace automatically, and create that namespace. namespace := fmt.Sprintf("namespace-%d", opIndex) if op.Namespace != nil { namespace = *op.Namespace } - createNamespaceIfNotPresent(tCtx, namespace, &e.numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(*e.tCtx, namespace, &e.numPodsScheduledPerNamespace) if op.PodTemplatePath == nil { op.PodTemplatePath = e.testCase.DefaultPodTemplatePath } if op.CollectMetrics { if *e.collectorCtx != nil { - tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) + (*e.tCtx).Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) } - *e.collectorCtx, e.collectors = startCollectingMetrics(tCtx, e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil) + *e.collectorCtx, e.collectors = startCollectingMetrics(*e.tCtx, e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil) } - if err := createPodsRapidly(tCtx, namespace, op); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) + if err := createPodsRapidly(*e.tCtx, namespace, op); err != nil { + (*e.tCtx).Fatalf("op %d: %v", opIndex, err) } switch { case op.SkipWaitToCompletion: @@ -1679,31 +1673,30 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) { // in the future. e.numPodsScheduledPerNamespace[namespace] += op.Count case op.SteadyState: - if err := createPodsSteadily(tCtx, namespace, e.podInformer, op); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) + if err := createPodsSteadily(*e.tCtx, namespace, e.podInformer, op); err != nil { + (*e.tCtx).Fatalf("op %d: %v", opIndex, err) } default: - if err := waitUntilPodsScheduledInNamespace(tCtx, e.podInformer, nil, namespace, op.Count); err != nil { - tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) + if err := waitUntilPodsScheduledInNamespace(*e.tCtx, e.podInformer, nil, namespace, op.Count); err != nil { + (*e.tCtx).Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } } if op.CollectMetrics { // CollectMetrics and SkipWaitToCompletion can never be true at the // same time, so if we're here, it means that all pods have been // scheduled. - items := stopCollectingMetrics(tCtx, collectorCtx, e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors) + items := stopCollectingMetrics((*e.tCtx), (*e.collectorCtx), e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors) e.dataItems = append(e.dataItems, items...) *e.collectorCtx = nil } } func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) { - tCtx := *e.tCtx labelSelector := labels.ValidatedSetSelector(op.LabelSelector) podsToDelete, err := e.podInformer.Lister().Pods(op.Namespace).List(labelSelector) if err != nil { - tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err) + (*e.tCtx).Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err) } deletePods := func(opIndex int) { @@ -1714,13 +1707,13 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) { for i := 0; i < len(podsToDelete); i++ { select { case <-ticker.C: - if err := tCtx.Client().CoreV1().Pods(op.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil { + if err := (*e.tCtx).Client().CoreV1().Pods(op.Namespace).Delete(*e.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil { if errors.Is(err, context.Canceled) { return } - tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err) + (*e.tCtx).Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err) } - case <-tCtx.Done(): + case <-(*e.tCtx).Done(): return } } @@ -1729,11 +1722,11 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) { listOpts := metav1.ListOptions{ LabelSelector: labelSelector.String(), } - if err := tCtx.Client().CoreV1().Pods(op.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil { + if err := (*e.tCtx).Client().CoreV1().Pods(op.Namespace).DeleteCollection(*e.tCtx, metav1.DeleteOptions{}, listOpts); err != nil { if errors.Is(err, context.Canceled) { return } - tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, op.Namespace, err) + (*e.tCtx).Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, op.Namespace, err) } } @@ -1749,18 +1742,17 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) { } func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { - tCtx := *e.tCtx var namespace string if op.Namespace != nil { namespace = *op.Namespace } else { namespace = fmt.Sprintf("namespace-%d", opIndex) } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery())) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient((*e.tCtx).Client().Discovery())) // Ensure the namespace exists. nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} - if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) + if _, err := (*e.tCtx).Client().CoreV1().Namespaces().Create(*e.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { + (*e.tCtx).Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) } var churnFns []func(name string) string @@ -1768,31 +1760,31 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { for i, path := range op.TemplatePaths { unstructuredObj, gvk, err := getUnstructuredFromFile(path) if err != nil { - tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) + (*e.tCtx).Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) } // Obtain GVR. mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { - tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) + (*e.tCtx).Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) } gvr := mapping.Resource // Distinguish cluster-scoped with namespaced API objects. var dynRes dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { - dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace) + dynRes = (*e.tCtx).Dynamic().Resource(gvr).Namespace(namespace) } else { - dynRes = tCtx.Dynamic().Resource(gvr) + dynRes = (*e.tCtx).Dynamic().Resource(gvr) } churnFns = append(churnFns, func(name string) string { if name != "" { - if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) { - tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err) + if err := dynRes.Delete(*e.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) { + (*e.tCtx).Errorf("op %d: unable to delete %v: %v", opIndex, name, err) } return "" } - live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{}) + live, err := dynRes.Create(*e.tCtx, unstructuredObj, metav1.CreateOptions{}) if err != nil { return "" } @@ -1823,7 +1815,7 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { churnFns[i]("") } count++ - case <-tCtx.Done(): + case <-(*e.tCtx).Done(): return } } @@ -1847,7 +1839,7 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { retVals[i][count%op.Number] = churnFns[i](retVals[i][count%op.Number]) } count++ - case <-tCtx.Done(): + case <-(*e.tCtx).Done(): return } } @@ -1856,15 +1848,14 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) { } func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) { - tCtx := *e.tCtx runable, ok := op.(runnableOp) if !ok { - tCtx.Fatalf("op %d: invalid op %v", opIndex, op) + (*e.tCtx).Fatalf("op %d: invalid op %v", opIndex, op) } for _, namespace := range runable.requiredNamespaces() { - createNamespaceIfNotPresent(tCtx, namespace, &e.numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(*e.tCtx, namespace, &e.numPodsScheduledPerNamespace) } - runable.run(tCtx) + runable.run(*e.tCtx) } func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) {