diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go
index 9b9f6e58e0a..ed86a789dba 100644
--- a/test/integration/scheduler_perf/scheduler_perf.go
+++ b/test/integration/scheduler_perf/scheduler_perf.go
@@ -1410,17 +1410,6 @@ func checkEmptyInFlightEvents() error {
 	return nil
 }
 
-type workloadExecutor struct {
-	tCtx                         ktesting.TContext
-	wg                           *sync.WaitGroup
-	collectorCtx                 *ktesting.TContext
-	collectorWG                  *sync.WaitGroup
-	collectors                   *[]testDataCollector
-	numPodsScheduledPerNamespace map[string]int
-	podInformer                  coreinformers.PodInformer
-	throughputErrorMargin        float64
-}
-
 func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) {
 	collectorCtx := ktesting.WithCancel(tCtx)
 	workloadName := tCtx.Name()
@@ -1461,6 +1450,21 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
 	return dataItems
 }
 
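+// WorkloadExecutor holds the state shared by all operations of a single
+// workload run: the test context, wait groups, the active metrics collectors
+// and their collected data items, per-namespace pod counts, and the test case
+// and workload being executed. The do*Op methods below operate on this state.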
+type WorkloadExecutor struct {
+	tCtx                         *ktesting.TContext
+	wg                           *sync.WaitGroup
+	collectorCtx                 *ktesting.TContext
+	collectorWG                  *sync.WaitGroup
+	collectors                   []testDataCollector
+	dataItems                    []DataItem
+	numPodsScheduledPerNamespace map[string]int
+	podInformer                  coreinformers.PodInformer
+	throughputErrorMargin        float64
+	tc                           *testCase
+	w                            *workload
+	nextNodeIndex                int
+}
+
 func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
 	b, benchmarking := tCtx.TB().(*testing.B)
 	if benchmarking {
@@ -1488,42 +1492,84 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	// already created before (scheduler.NewInformerFactory) and the
 	// factory was started for it (mustSetupCluster), therefore we don't
 	// need to start again.
-	// podInformer := informerFactory.Core().V1().Pods()
+	podInformer := informerFactory.Core().V1().Pods()
 
 	// Everything else started by this function gets stopped before it returns.
 	tCtx = ktesting.WithCancel(tCtx)
-	// var wg sync.WaitGroup
-	// defer wg.Wait()
+	var wg sync.WaitGroup
+	defer wg.Wait()
 	defer tCtx.Cancel("workload is done")
 
 	var dataItems []DataItem
-	// nextNodeIndex := 0
+
 	// // numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
 	// // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
 	// numPodsScheduledPerNamespace := make(map[string]int)
-	// sharedOperationData := sharedOperationData{
-	// 	tCtx: tCtx,
-	// 	wg:   &wg,
-	// 	metricsData: &metricsCollectionData{
-	// 		collectorWG:           &sync.WaitGroup{},
-	// 		throughputErrorMargin: throughputErrorMargin,
-	// 	},
-	// 	workloadState: &workloadState{
-	// 		numPodsScheduledPerNamespace: make(map[string]int),
-	// 	},
-	// 	podInformer: informerFactory.Core().V1().Pods(),
-	// }
-
-	// var collectors []testDataCollector
+	var collectors []testDataCollector
 	// // This needs a separate context and wait group because
 	// // the metrics collecting needs to be sure that the goroutines
 	// // are stopped.
-	// var collectorCtx ktesting.TContext
-	// var collectorWG sync.WaitGroup
-	// defer collectorWG.Wait()
+	var collectorCtx ktesting.TContext
+	var collectorWG sync.WaitGroup
+	defer collectorWG.Wait()
 
-	runJobs(tCtx, tc, w, informerFactory, throughputErrorMargin)
+	executor := WorkloadExecutor{
+		tCtx:                         &tCtx,
+		wg:                           &wg,
+		collectorCtx:                 &collectorCtx,
+		collectorWG:                  &collectorWG,
+		collectors:                   collectors,
+		numPodsScheduledPerNamespace: make(map[string]int),
+		podInformer:                  podInformer,
+		throughputErrorMargin:        throughputErrorMargin,
+		tc:                           tc,
+		w:                            w,
+		nextNodeIndex:                0,
+		dataItems:                    dataItems,
+	}
+
+	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
+		realOp, err := op.realOp.patchParams(w)
+		if err != nil {
+			tCtx.Fatalf("op %d: %v", opIndex, err)
+		}
+		select {
+		case <-tCtx.Done():
+			tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
+		default:
+		}
+		switch concreteOp := realOp.(type) {
+		case *createNodesOp:
+			executor.doCreateNodesOp(opIndex, concreteOp)
+
+		case *createNamespacesOp:
+			executor.doCreateNamespaceOp(opIndex, concreteOp)
+		case *createPodsOp:
+			executor.doCreatePodsOp(opIndex, concreteOp)
+			if *executor.collectorCtx != nil {
+				defer (*executor.collectorCtx).Cancel("cleaning up")
+			}
+		case *deletePodsOp:
+			executor.doDeletePodsOp(opIndex, concreteOp)
+		case *churnOp:
+			executor.doChurnOp(opIndex, concreteOp)
+		case *barrierOp:
+			executor.doBarrierOp(opIndex, concreteOp)
+		case *sleepOp:
+			select {
+			case <-tCtx.Done():
+			case <-time.After(concreteOp.Duration.Duration):
+			}
+		case *startCollectingMetricsOp:
+			executor.doStartCollectingMetricsOp(opIndex, concreteOp)
+			defer (*executor.collectorCtx).Cancel("cleaning up")
+		case *stopCollectingMetricsOp:
+			executor.doStopCollectingMetrics(opIndex)
+		default:
+			executor.doDefaultOp(opIndex, concreteOp)
+		}
+	}
+	dataItems = executor.dataItems
 
 	// check unused params and inform users
 	unusedParams := w.unusedParams()
@@ -1536,18 +1582,20 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	return dataItems
 }
 
-func doCreateNodesOp(tCtx ktesting.TContext, opIndex int, concreteOp *createNodesOp, nextNodeIndex *int) {
+func (e *WorkloadExecutor) doCreateNodesOp(opIndex int, concreteOp *createNodesOp) {
+	tCtx := *e.tCtx
 	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client())
 	if err != nil {
 		tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
-	if err := nodePreparer.PrepareNodes(tCtx, *nextNodeIndex); err != nil {
+	if err := nodePreparer.PrepareNodes(tCtx, e.nextNodeIndex); err != nil {
 		tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
-	*nextNodeIndex += concreteOp.Count
+	e.nextNodeIndex += concreteOp.Count
 }
 
-func doCreateNamespaceOp(tCtx ktesting.TContext, opIndex int, concreteOp *createNamespacesOp, numPodsScheduledPerNamespace map[string]int) {
+func (e *WorkloadExecutor) doCreateNamespaceOp(opIndex int, concreteOp *createNamespacesOp) {
+	tCtx := *e.tCtx
 	nsPreparer, err := newNamespacePreparer(tCtx, concreteOp)
 	if err != nil {
 		tCtx.Fatalf("op %d: %v", opIndex, err)
@@ -1560,79 +1608,72 @@ func doCreateNamespaceOp(tCtx ktesting.TContext, opIndex int, concreteOp *create
 		tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
 	for _, n := range nsPreparer.namespaces() {
-		if _, ok := numPodsScheduledPerNamespace[n]; ok {
+		if _, ok := e.numPodsScheduledPerNamespace[n]; ok {
 			// this namespace has been already created.
 			continue
 		}
-		numPodsScheduledPerNamespace[n] = 0
+		e.numPodsScheduledPerNamespace[n] = 0
 	}
 }
 
-func doBarrierOp(tCtx ktesting.TContext, opIndex int, concreteOp *barrierOp, numPodsScheduledPerNamespace map[string]int, podInformer coreinformers.PodInformer) {
+func (e *WorkloadExecutor) doBarrierOp(opIndex int, concreteOp *barrierOp) {
+	tCtx := *e.tCtx
 	for _, namespace := range concreteOp.Namespaces {
-		if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
+		if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok {
			tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
 		}
 	}
 
 	switch concreteOp.StageRequirement {
 	case Attempted:
-		if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
+		if err := waitUntilPodsAttempted(tCtx, e.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
 			tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 	case Scheduled:
 		// Default should be treated like "Scheduled", so handling both in the same way.
 		fallthrough
 	default:
-		if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
+		if err := waitUntilPodsScheduled(tCtx, e.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
 			tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 		// At the end of the barrier, we can be sure that there are no pods
 		// pending scheduling in the namespaces that we just blocked on.
 		if len(concreteOp.Namespaces) == 0 {
-			numPodsScheduledPerNamespace = make(map[string]int)
+			e.numPodsScheduledPerNamespace = make(map[string]int)
 		} else {
 			for _, namespace := range concreteOp.Namespaces {
-				delete(numPodsScheduledPerNamespace, namespace)
+				delete(e.numPodsScheduledPerNamespace, namespace)
 			}
 		}
 	}
 }
 
-func doStopCollectingMetrics(tCtx ktesting.TContext, collectorCtx *ktesting.TContext, opIndex int, dataItems *[]DataItem, w *workload, collectors []testDataCollector, collectorWG *sync.WaitGroup) {
-	items := stopCollectingMetrics(tCtx, *collectorCtx, collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
-	*dataItems = append(*dataItems, items...)
-	*collectorCtx = nil
+func (e *WorkloadExecutor) doStopCollectingMetrics(opIndex int) {
+	tCtx := *e.tCtx
+	collectorCtx := *e.collectorCtx
+	items := stopCollectingMetrics(tCtx, collectorCtx, e.collectorWG, e.w.Threshold, *e.w.ThresholdMetricSelector, opIndex, e.collectors)
+	e.dataItems = append(e.dataItems, items...)
+	*e.collectorCtx = nil
 }
 
-func doCreatePodsOp(
-	tCtx ktesting.TContext,
-	opIndex int,
-	concreteOp *createPodsOp,
-	numPodsScheduledPerNamespace map[string]int,
-	dataItems *[]DataItem,
-	w *workload,
-	collectors *[]testDataCollector,
-	collectorWG *sync.WaitGroup,
-	throughputErrorMargin float64,
-	podInformer coreinformers.PodInformer,
-	tc *testCase,
-	collectorCtx *ktesting.TContext) {
+func (e *WorkloadExecutor) doCreatePodsOp(opIndex int, concreteOp *createPodsOp) {
+	tCtx := *e.tCtx
 	var namespace string
 	// define Pod's namespace automatically, and create that namespace.
 	namespace = fmt.Sprintf("namespace-%d", opIndex)
 	if concreteOp.Namespace != nil {
 		namespace = *concreteOp.Namespace
 	}
-	createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
+	createNamespaceIfNotPresent(tCtx, namespace, &e.numPodsScheduledPerNamespace)
 	if concreteOp.PodTemplatePath == nil {
-		concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
+		concreteOp.PodTemplatePath = e.tc.DefaultPodTemplatePath
 	}
 	if concreteOp.CollectMetrics {
-		if *collectorCtx != nil {
+		if *e.collectorCtx != nil {
 			tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 		}
-		*collectorCtx, *collectors = startCollectingMetrics(tCtx, collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
+		*e.collectorCtx, e.collectors = startCollectingMetrics(tCtx, e.collectorWG, e.podInformer, e.tc.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
 	}
 	if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil {
 		tCtx.Fatalf("op %d: %v", opIndex, err)
@@ -1641,13 +1682,13 @@ func doCreatePodsOp(
 	case concreteOp.SkipWaitToCompletion:
 		// Only record those namespaces that may potentially require barriers
 		// in the future.
-		numPodsScheduledPerNamespace[namespace] += concreteOp.Count
+		e.numPodsScheduledPerNamespace[namespace] += concreteOp.Count
 	case concreteOp.SteadyState:
-		if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil {
+		if err := createPodsSteadily(tCtx, namespace, e.podInformer, concreteOp); err != nil {
 			tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 	default:
-		if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil {
+		if err := waitUntilPodsScheduledInNamespace(tCtx, e.podInformer, nil, namespace, concreteOp.Count); err != nil {
 			tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
 		}
 	}
@@ -1655,16 +1696,17 @@ func doCreatePodsOp(
 		// CollectMetrics and SkipWaitToCompletion can never be true at the
 		// same time, so if we're here, it means that all pods have been
 		// scheduled.
-		items := stopCollectingMetrics(tCtx, *collectorCtx, collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, *collectors)
-		*dataItems = append(*dataItems, items...)
-		*collectorCtx = nil
+		items := stopCollectingMetrics(tCtx, *e.collectorCtx, e.collectorWG, e.w.Threshold, *e.w.ThresholdMetricSelector, opIndex, e.collectors)
+		e.dataItems = append(e.dataItems, items...)
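+		// Mark metrics collection as finished so that a later op can start a
+		// new collector without tripping the overlap check above.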
+		*e.collectorCtx = nil
 	}
 }
 
-func doDeletePodsOp(tCtx ktesting.TContext, opIndex int, concreteOp *deletePodsOp, podInformer coreinformers.PodInformer, wg *sync.WaitGroup) {
+func (e *WorkloadExecutor) doDeletePodsOp(opIndex int, concreteOp *deletePodsOp) {
+	tCtx := *e.tCtx
 	labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
-	podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
+	podsToDelete, err := e.podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
 	if err != nil {
 		tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
 	}
@@ -1701,9 +1743,9 @@
 	}
 
 	if concreteOp.SkipWaitToCompletion {
-		wg.Add(1)
+		e.wg.Add(1)
 		go func(opIndex int) {
-			defer wg.Done()
+			defer e.wg.Done()
 			deletePods(opIndex)
 		}(opIndex)
 	} else {
@@ -1711,7 +1753,8 @@
 	}
 }
 
-func doChurnOp(tCtx ktesting.TContext, opIndex int, concreteOp *churnOp, wg *sync.WaitGroup) {
+func (e *WorkloadExecutor) doChurnOp(opIndex int, concreteOp *churnOp) {
+	tCtx := *e.tCtx
 	var namespace string
 	if concreteOp.Namespace != nil {
 		namespace = *concreteOp.Namespace
 	}
@@ -1770,9 +1813,9 @@
 
 	switch concreteOp.Mode {
 	case Create:
-		wg.Add(1)
+		e.wg.Add(1)
 		go func() {
-			defer wg.Done()
+			defer e.wg.Done()
 			defer ticker.Stop()
 			count, threshold := 0, concreteOp.Number
 			if threshold == 0 {
@@ -1791,9 +1834,9 @@
 			}
 		}()
 	case Recreate:
-		wg.Add(1)
+		e.wg.Add(1)
 		go func() {
-			defer wg.Done()
+			defer e.wg.Done()
 			defer ticker.Stop()
 			retVals := make([][]string, len(churnFns))
 			// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
@@ -1817,86 +1860,23 @@
 	}
 }
 
-func doDefaultOp(tCtx ktesting.TContext, opIndex int, concreteOp realOp, numPodsScheduledPerNamespace map[string]int) {
+func (e *WorkloadExecutor) doDefaultOp(opIndex int, concreteOp realOp) {
+	tCtx := *e.tCtx
 	runable, ok := concreteOp.(runnableOp)
 	if !ok {
 		tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
 	}
 	for _, namespace := range runable.requiredNamespaces() {
-		createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
+		createNamespaceIfNotPresent(tCtx, namespace, &e.numPodsScheduledPerNamespace)
 	}
 	runable.run(tCtx)
 }
 
-func doStartCollectingMetricsOp(tCtx ktesting.TContext, opIndex int, concreteOp *startCollectingMetricsOp, collectorCtx *ktesting.TContext, collectors *[]testDataCollector, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, tc *testCase, throughputErrorMargin float64) {
-	if *collectorCtx != nil {
-		tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
-	}
-	*collectorCtx, *collectors = startCollectingMetrics(tCtx, collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
-}
-
-func runJobs(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, throughputErrorMargin float64) {
-	var wg sync.WaitGroup
-	defer wg.Wait()
-	defer tCtx.Cancel("workload is done")
-	numPodsScheduledPerNamespace := make(map[string]int)
-	nextNodeIndex := 0
-	podInformer := informerFactory.Core().V1().Pods()
-	var collectors []testDataCollector
-	// This needs a separate context and wait group because
-	// the metrics collecting needs to be sure that the goroutines
-	// are stopped.
-	var collectorCtx ktesting.TContext
-	var collectorWG sync.WaitGroup
-	defer collectorWG.Wait()
-	var dataItems []DataItem
-	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
-		realOp, err := op.realOp.patchParams(w)
-		if err != nil {
-			tCtx.Fatalf("op %d: %v", opIndex, err)
-		}
-		select {
-		case <-tCtx.Done():
-			tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
-		default:
-		}
-		switch concreteOp := realOp.(type) {
-		case *createNodesOp:
-			doCreateNodesOp(tCtx, opIndex, concreteOp, &nextNodeIndex)
-
-		case *createNamespacesOp:
-			doCreateNamespaceOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace)
-		case *createPodsOp:
-			doCreatePodsOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace, &dataItems, w, &collectors, &collectorWG, throughputErrorMargin, podInformer, tc, &collectorCtx)
-			if collectorCtx != nil {
-				defer collectorCtx.Cancel("cleaning up")
-			}
-		case *deletePodsOp:
-			doDeletePodsOp(tCtx, opIndex, concreteOp, podInformer, &wg)
-
-		case *churnOp:
-			doChurnOp(tCtx, opIndex, concreteOp, &wg)
-
-		case *barrierOp:
-			doBarrierOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace, podInformer)
-
-		case *sleepOp:
-			select {
-			case <-tCtx.Done():
-			case <-time.After(concreteOp.Duration.Duration):
-			}
-
-		case *startCollectingMetricsOp:
-			doStartCollectingMetricsOp(tCtx, opIndex, concreteOp, &collectorCtx, &collectors, &collectorWG, podInformer, tc, throughputErrorMargin)
-			defer collectorCtx.Cancel("cleaning up")
-
-		case *stopCollectingMetricsOp:
-			doStopCollectingMetrics(tCtx, &collectorCtx, opIndex, &dataItems, w, collectors, &collectorWG)
-
-		default:
-			doDefaultOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace)
-		}
+func (e *WorkloadExecutor) doStartCollectingMetricsOp(opIndex int, concreteOp *startCollectingMetricsOp) {
+	if *e.collectorCtx != nil {
+		(*e.tCtx).Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 	}
+	*e.collectorCtx, e.collectors = startCollectingMetrics((*e.tCtx), e.collectorWG, e.podInformer, e.tc.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
 }
 
 func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {