From a273e5381a46db8969f486cf98e859b7af6b91aa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Skocze=C5=84?=
Date: Thu, 19 Sep 2024 13:31:16 +0000
Subject: [PATCH] Add separate ops for collecting metrics from multiple
 namespaces in scheduler_perf

---
 .../scheduler_perf/scheduler_perf.go | 188 +++++++++++++-----
 1 file changed, 137 insertions(+), 51 deletions(-)

diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go
index eb75dd5a3e7..4a22f0bf033 100644
--- a/test/integration/scheduler_perf/scheduler_perf.go
+++ b/test/integration/scheduler_perf/scheduler_perf.go
@@ -72,17 +72,19 @@ import (
 type operationCode string
 
 const (
-	createAnyOpcode            operationCode = "createAny"
-	createNodesOpcode          operationCode = "createNodes"
-	createNamespacesOpcode     operationCode = "createNamespaces"
-	createPodsOpcode           operationCode = "createPods"
-	createPodSetsOpcode        operationCode = "createPodSets"
-	deletePodsOpcode           operationCode = "deletePods"
-	createResourceClaimsOpcode operationCode = "createResourceClaims"
-	createResourceDriverOpcode operationCode = "createResourceDriver"
-	churnOpcode                operationCode = "churn"
-	barrierOpcode              operationCode = "barrier"
-	sleepOpcode                operationCode = "sleep"
+	createAnyOpcode              operationCode = "createAny"
+	createNodesOpcode            operationCode = "createNodes"
+	createNamespacesOpcode       operationCode = "createNamespaces"
+	createPodsOpcode             operationCode = "createPods"
+	createPodSetsOpcode          operationCode = "createPodSets"
+	deletePodsOpcode             operationCode = "deletePods"
+	createResourceClaimsOpcode   operationCode = "createResourceClaims"
+	createResourceDriverOpcode   operationCode = "createResourceDriver"
+	churnOpcode                  operationCode = "churn"
+	barrierOpcode                operationCode = "barrier"
+	sleepOpcode                  operationCode = "sleep"
+	startCollectingMetricsOpcode operationCode = "startCollectingMetrics"
+	stopCollectingMetricsOpcode  operationCode = "stopCollectingMetrics"
 )
 
 const (
@@ -414,6 +416,8 @@ func (op *op) UnmarshalJSON(b []byte) error {
 		&churnOp{},
 		&barrierOp{},
 		&sleepOp{},
+		&startCollectingMetricsOp{},
+		&stopCollectingMetricsOp{},
 		// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
 	}
 	var firstError error
@@ -815,6 +819,58 @@ func (so sleepOp) patchParams(_ *workload) (realOp, error) {
 	return &so, nil
 }
 
+// startCollectingMetricsOp defines an op that starts metrics collectors.
+// stopCollectingMetricsOp has to be used after this op to finish collecting.
+type startCollectingMetricsOp struct {
+	// Must be "startCollectingMetrics".
+	Opcode operationCode
+	// Name appended to the workload's name in results.
+	Name string
+	// Namespaces for which the scheduling throughput metric is calculated.
+	Namespaces []string
+}
+
+func (scm *startCollectingMetricsOp) isValid(_ bool) error {
+	if scm.Opcode != startCollectingMetricsOpcode {
+		return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, startCollectingMetricsOpcode)
+	}
+	if len(scm.Namespaces) == 0 {
+		return fmt.Errorf("namespaces cannot be empty")
+	}
+	return nil
+}
+
+func (*startCollectingMetricsOp) collectsMetrics() bool {
+	return false
+}
+
+func (scm startCollectingMetricsOp) patchParams(_ *workload) (realOp, error) {
+	return &scm, nil
+}
+
+// stopCollectingMetricsOp defines an op that stops collecting the metrics
+// and writes them into the result slice.
+// startCollectingMetricsOp has to be used before this op to begin collecting.
+type stopCollectingMetricsOp struct {
+	// Must be "stopCollectingMetrics".
+	Opcode operationCode
+}
+
+func (scm *stopCollectingMetricsOp) isValid(_ bool) error {
+	if scm.Opcode != stopCollectingMetricsOpcode {
+		return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, stopCollectingMetricsOpcode)
+	}
+	return nil
+}
+
+func (*stopCollectingMetricsOp) collectsMetrics() bool {
+	return true
+}
+
+func (scm stopCollectingMetricsOp) patchParams(_ *workload) (realOp, error) {
+	return &scm, nil
+}
+
 var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
 
 func initTestOutput(tb testing.TB) io.Writer {
@@ -1110,6 +1166,46 @@ func checkEmptyInFlightEvents() error {
 	return nil
 }
 
+func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string) (ktesting.TContext, []testDataCollector) {
+	collectorCtx := ktesting.WithCancel(tCtx)
+	workloadName := tCtx.Name()
+	// The first part is the same for each workload, therefore we can strip it.
+	workloadName = workloadName[strings.Index(workloadName, "/")+1:]
+	collectors := getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", workloadName, name), namespaces, mcc, throughputErrorMargin)
+	for _, collector := range collectors {
+		// Need loop-local variable for function below.
+		collector := collector
+		err := collector.init()
+		if err != nil {
+			tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
+		}
+		collectorWG.Add(1)
+		go func() {
+			defer collectorWG.Done()
+			collector.run(collectorCtx)
+		}()
+	}
+	return collectorCtx, collectors
+}
+
+func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) []DataItem {
+	if collectorCtx == nil {
+		tCtx.Fatalf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
+	}
+	collectorCtx.Cancel("collecting metrics, collector must stop first")
+	collectorWG.Wait()
+	var dataItems []DataItem
+	for _, collector := range collectors {
+		items := collector.collect()
+		dataItems = append(dataItems, items...)
+		err := compareMetricWithThreshold(items, threshold, tms)
+		if err != nil {
+			tCtx.Errorf("op %d: %s", opIndex, err)
+		}
+	}
+	return dataItems
+}
+
 func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
 	b, benchmarking := tCtx.TB().(*testing.B)
 	if benchmarking {
@@ -1145,13 +1241,20 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	defer wg.Wait()
 	defer tCtx.Cancel("workload is done")
 
-	var mu sync.Mutex
 	var dataItems []DataItem
 	nextNodeIndex := 0
 	// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
 	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
 	numPodsScheduledPerNamespace := make(map[string]int)
 
+	var collectors []testDataCollector
+	// This needs a separate context and wait group because
+	// the metrics collection needs to be sure that the goroutines
+	// are stopped.
+	var collectorCtx ktesting.TContext
+	var collectorWG sync.WaitGroup
+	defer collectorWG.Wait()
+
 	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
 		realOp, err := op.realOp.patchParams(w)
 		if err != nil {
@@ -1204,34 +1307,13 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 			if concreteOp.PodTemplatePath == nil {
 				concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
 			}
-			var collectors []testDataCollector
-			// This needs a separate context and wait group because
-			// the code below needs to be sure that the goroutines
-			// are stopped.
-			var collectorCtx ktesting.TContext
-			var collectorWG sync.WaitGroup
-			defer collectorWG.Wait()
 			if concreteOp.CollectMetrics {
-				collectorCtx = ktesting.WithCancel(tCtx)
-				defer collectorCtx.Cancel("cleaning up")
-				name := tCtx.Name()
-				// The first part is the same for each work load, therefore we can strip it.
-				name = name[strings.Index(name, "/")+1:]
-				collectors = getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
-				for _, collector := range collectors {
-					// Need loop-local variable for function below.
-					collector := collector
-					err = collector.init()
-					if err != nil {
-						tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
-					}
-					collectorWG.Add(1)
-					go func() {
-						defer collectorWG.Done()
-						collector.run(collectorCtx)
-					}()
+				if collectorCtx != nil {
+					tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably a second collector was started before stopping the previous one", opIndex)
 				}
+				collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace})
+				defer collectorCtx.Cancel("cleaning up")
 			}
 			if err := createPods(tCtx, namespace, concreteOp); err != nil {
 				tCtx.Fatalf("op %d: %v", opIndex, err)
 			}
@@ -1249,18 +1331,9 @@
 				// CollectMetrics and SkipWaitToCompletion can never be true at the
 				// same time, so if we're here, it means that all pods have been
 				// scheduled.
-				collectorCtx.Cancel("collecting metrix, collector must stop first")
-				collectorWG.Wait()
-				mu.Lock()
-				for _, collector := range collectors {
-					items := collector.collect()
-					dataItems = append(dataItems, items...)
-					err := compareMetricWithThreshold(items, w.Threshold, *w.ThresholdMetricSelector)
-					if err != nil {
-						tCtx.Errorf("op %d: %s", opIndex, err)
-					}
-				}
-				mu.Unlock()
+				items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
+				dataItems = append(dataItems, items...)
+				collectorCtx = nil
 			}
 
 		case *deletePodsOp:
@@ -1440,6 +1513,19 @@
 			case <-tCtx.Done():
 			case <-time.After(concreteOp.Duration):
 			}
+
+		case *startCollectingMetricsOp:
+			if collectorCtx != nil {
+				tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably a second collector was started before stopping the previous one", opIndex)
+			}
+			collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces)
+			defer collectorCtx.Cancel("cleaning up")
+
+		case *stopCollectingMetricsOp:
+			items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
+			dataItems = append(dataItems, items...)
+			collectorCtx = nil
+
 		default:
 			runable, ok := concreteOp.(runnableOp)
 			if !ok {
@@ -1481,12 +1567,12 @@ type testDataCollector interface {
 	collect() []DataItem
 }
 
-func getTestDataCollectors(podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
+func getTestDataCollectors(podInformer coreinformers.PodInformer, name string, namespaces []string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
 	if mcc == nil {
 		mcc = &defaultMetricsCollectorConfig
 	}
 	return []testDataCollector{
-		newThroughputCollector(podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
+		newThroughputCollector(podInformer, map[string]string{"Name": name}, namespaces, throughputErrorMargin),
 		newMetricsCollector(mcc, map[string]string{"Name": name}),
 	}
 }
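The sketch below is a minimal, package-internal example of how the two new op types behave when decoded from the JSON form that op.UnmarshalJSON handles. It is illustrative only: it assumes it lives inside the scheduler_perf package (the op types are unexported), that "encoding/json" and "fmt" are already imported there, and the "multi-ns" name and namespace values are made up.

func exampleCollectingMetricsOps() {
	// Decode a startCollectingMetrics op the way a workload template would define it.
	startJSON := []byte(`{"opcode": "startCollectingMetrics", "name": "multi-ns", "namespaces": ["ns-1", "ns-2"]}`)
	var start startCollectingMetricsOp
	if err := json.Unmarshal(startJSON, &start); err != nil {
		panic(err)
	}
	fmt.Println(start.isValid(false)) // <nil>: opcode matches and the namespace list is non-empty

	// isValid rejects an empty namespace list.
	empty := startCollectingMetricsOp{Opcode: startCollectingMetricsOpcode}
	fmt.Println(empty.isValid(false) != nil) // true

	// The stop op only carries its opcode.
	stopJSON := []byte(`{"opcode": "stopCollectingMetrics"}`)
	var stop stopCollectingMetricsOp
	if err := json.Unmarshal(stopJSON, &stop); err != nil {
		panic(err)
	}
	fmt.Println(stop.isValid(false)) // <nil>
}

In a workload template this corresponds to bracketing one or more createPods ops between a startCollectingMetrics op and a stopCollectingMetrics op; combining that with createPods' own collectMetrics flag trips the overlap check in runWorkload.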
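Both new runWorkload cases and the existing CollectMetrics path share the same cancel-then-wait lifecycle: collectors run in goroutines under a cancelable context, a sync.WaitGroup tracks them, results are read only after the context is canceled and the wait group has drained, and a nil collectorCtx guard catches overlapping or unmatched start/stop operations. The standalone sketch below illustrates that lifecycle with the standard library's context package instead of ktesting.TContext; the fakeCollector type and every name in it are invented for illustration.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// fakeCollector stands in for a testDataCollector: it keeps sampling until
// its context is canceled. The type and its behaviour are invented for this sketch.
type fakeCollector struct {
	mu      sync.Mutex
	samples int
}

func (c *fakeCollector) run(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.mu.Lock()
			c.samples++
			c.mu.Unlock()
		}
	}
}

func (c *fakeCollector) collect() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.samples
}

func main() {
	var (
		collectorCancel context.CancelFunc // non-nil while a collection is in progress
		collectorWG     sync.WaitGroup
		collector       *fakeCollector
	)

	start := func(ctx context.Context) {
		// Mirrors the nil-collectorCtx guard in runWorkload.
		if collectorCancel != nil {
			panic("metrics collection is overlapping; stop the previous collector first")
		}
		var collectorCtx context.Context
		collectorCtx, collectorCancel = context.WithCancel(ctx)
		collector = &fakeCollector{}
		collectorWG.Add(1)
		go func() {
			defer collectorWG.Done()
			collector.run(collectorCtx)
		}()
	}

	stop := func() int {
		if collectorCancel == nil {
			panic("missing start before stop")
		}
		// Cancel first, then wait: the goroutine must have fully stopped
		// before its results are read, as in stopCollectingMetrics.
		collectorCancel()
		collectorWG.Wait()
		collectorCancel = nil
		return collector.collect()
	}

	start(context.Background())
	time.Sleep(100 * time.Millisecond) // stands in for running workload ops
	fmt.Println("samples collected:", stop())
}

Canceling before waiting matters here: the collector goroutines only exit once their context is done, so waiting first would block forever, and reading results without waiting would race with the final samples.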
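Because getTestDataCollectors now takes a namespace list and hands it to newThroughputCollector, a single throughput collector can cover pods spread over several namespaces. The helper below is a hedged sketch of the kind of per-namespace filtering that implies, written against k8s.io/api/core/v1; it is not code from this patch, and the real collector's internals are not shown in the diff.

package example

import (
	v1 "k8s.io/api/core/v1"
)

// scheduledPodsInNamespaces counts pods that have already been assigned to a
// node, restricted to an allow-list of namespaces. Illustrative only.
func scheduledPodsInNamespaces(pods []*v1.Pod, namespaces []string) int {
	allowed := make(map[string]bool, len(namespaces))
	for _, ns := range namespaces {
		allowed[ns] = true
	}
	scheduled := 0
	for _, pod := range pods {
		if allowed[pod.Namespace] && pod.Spec.NodeName != "" {
			scheduled++
		}
	}
	return scheduled
}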