use workloadExecutor

This commit is contained in:
YamasouA 2025-02-11 12:18:14 +09:00
parent 479f9cd898
commit 297b35873f

View File

@ -1410,17 +1410,6 @@ func checkEmptyInFlightEvents() error {
return nil return nil
} }
type workloadExecutor struct {
tCtx ktesting.TContext
wg *sync.WaitGroup
collectorCtx *ktesting.TContext
collectorWG *sync.WaitGroup
collectors *[]testDataCollector
numPodsScheduledPerNamespace map[string]int
podInformer coreinformers.PodInformer
throughputErrorMargin float64
}
func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) { func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) {
collectorCtx := ktesting.WithCancel(tCtx) collectorCtx := ktesting.WithCancel(tCtx)
workloadName := tCtx.Name() workloadName := tCtx.Name()
@ -1461,6 +1450,21 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
return dataItems return dataItems
} }
type WorkloadExecutor struct {
tCtx *ktesting.TContext
wg *sync.WaitGroup
collectorCtx *ktesting.TContext
collectorWG *sync.WaitGroup
collectors []testDataCollector
dataItems []DataItem
numPodsScheduledPerNamespace map[string]int
podInformer coreinformers.PodInformer
throughputErrorMargin float64
tc *testCase
w *workload
nextNodeIndex int
}
func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem { func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
b, benchmarking := tCtx.TB().(*testing.B) b, benchmarking := tCtx.TB().(*testing.B)
if benchmarking { if benchmarking {
@ -1488,42 +1492,84 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
// already created before (scheduler.NewInformerFactory) and the // already created before (scheduler.NewInformerFactory) and the
// factory was started for it (mustSetupCluster), therefore we don't // factory was started for it (mustSetupCluster), therefore we don't
// need to start again. // need to start again.
// podInformer := informerFactory.Core().V1().Pods() podInformer := informerFactory.Core().V1().Pods()
// Everything else started by this function gets stopped before it returns. // Everything else started by this function gets stopped before it returns.
tCtx = ktesting.WithCancel(tCtx) tCtx = ktesting.WithCancel(tCtx)
// var wg sync.WaitGroup var wg sync.WaitGroup
// defer wg.Wait() defer wg.Wait()
defer tCtx.Cancel("workload is done") defer tCtx.Cancel("workload is done")
var dataItems []DataItem var dataItems []DataItem
// nextNodeIndex := 0
// // numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have. // // numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
// // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up. // // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
// numPodsScheduledPerNamespace := make(map[string]int) // numPodsScheduledPerNamespace := make(map[string]int)
// sharedOperationData := sharedOperationData{ var collectors []testDataCollector
// tCtx: tCtx,
// wg: &wg,
// metricsData: &metricsCollectionData{
// collectorWG: &sync.WaitGroup{},
// throughputErrorMargin: throughputErrorMargin,
// },
// workloadState: &workloadState{
// numPodsScheduledPerNamespace: make(map[string]int),
// },
// podInformer: informerFactory.Core().V1().Pods(),
// }
// var collectors []testDataCollector
// // This needs a separate context and wait group because // // This needs a separate context and wait group because
// // the metrics collecting needs to be sure that the goroutines // // the metrics collecting needs to be sure that the goroutines
// // are stopped. // // are stopped.
// var collectorCtx ktesting.TContext var collectorCtx ktesting.TContext
// var collectorWG sync.WaitGroup var collectorWG sync.WaitGroup
// defer collectorWG.Wait() defer collectorWG.Wait()
runJobs(tCtx, tc, w, informerFactory, throughputErrorMargin) executor := WorkloadExecutor{
tCtx: &tCtx,
wg: &wg,
collectorCtx: &collectorCtx,
collectorWG: &collectorWG,
collectors: collectors,
numPodsScheduledPerNamespace: make(map[string]int),
podInformer: podInformer,
throughputErrorMargin: throughputErrorMargin,
tc: tc,
w: w,
nextNodeIndex: 0,
dataItems: dataItems,
}
for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
select {
case <-tCtx.Done():
tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
default:
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
executor.doCreateNodesOp(opIndex, concreteOp)
case *createNamespacesOp:
executor.doCreateNamespaceOp(opIndex, concreteOp)
case *createPodsOp:
executor.doCreatePodsOp(opIndex, concreteOp)
if *executor.collectorCtx != nil {
defer (*executor.collectorCtx).Cancel("cleaning up")
}
case *deletePodsOp:
executor.doDeletePodsOp(opIndex, concreteOp)
case *churnOp:
executor.doChurnOp(opIndex, concreteOp)
case *barrierOp:
executor.doBarrierOp(opIndex, concreteOp)
case *sleepOp:
select {
case <-tCtx.Done():
case <-time.After(concreteOp.Duration.Duration):
}
case *startCollectingMetricsOp:
executor.doStartCollectingMetricsOp(opIndex, concreteOp)
defer (*executor.collectorCtx).Cancel("cleaning up")
case *stopCollectingMetricsOp:
executor.doStopCollectingMetrics(opIndex)
default:
executor.doDefaultOp(opIndex, concreteOp)
}
}
// check unused params and inform users // check unused params and inform users
unusedParams := w.unusedParams() unusedParams := w.unusedParams()
@ -1536,18 +1582,20 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
return dataItems return dataItems
} }
func doCreateNodesOp(tCtx ktesting.TContext, opIndex int, concreteOp *createNodesOp, nextNodeIndex *int) { func (e *WorkloadExecutor) doCreateNodesOp(opIndex int, concreteOp *createNodesOp) {
tCtx := *e.tCtx
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client()) nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client())
if err != nil { if err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err) tCtx.Fatalf("op %d: %v", opIndex, err)
} }
if err := nodePreparer.PrepareNodes(tCtx, *nextNodeIndex); err != nil { if err := nodePreparer.PrepareNodes((*e.tCtx), e.nextNodeIndex); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err) tCtx.Fatalf("op %d: %v", opIndex, err)
} }
*nextNodeIndex += concreteOp.Count e.nextNodeIndex += concreteOp.Count
} }
func doCreateNamespaceOp(tCtx ktesting.TContext, opIndex int, concreteOp *createNamespacesOp, numPodsScheduledPerNamespace map[string]int) { func (e *WorkloadExecutor) doCreateNamespaceOp(opIndex int, concreteOp *createNamespacesOp) {
tCtx := *e.tCtx
nsPreparer, err := newNamespacePreparer(tCtx, concreteOp) nsPreparer, err := newNamespacePreparer(tCtx, concreteOp)
if err != nil { if err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err) tCtx.Fatalf("op %d: %v", opIndex, err)
@ -1560,79 +1608,72 @@ func doCreateNamespaceOp(tCtx ktesting.TContext, opIndex int, concreteOp *create
tCtx.Fatalf("op %d: %v", opIndex, err) tCtx.Fatalf("op %d: %v", opIndex, err)
} }
for _, n := range nsPreparer.namespaces() { for _, n := range nsPreparer.namespaces() {
if _, ok := numPodsScheduledPerNamespace[n]; ok { if _, ok := e.numPodsScheduledPerNamespace[n]; ok {
// this namespace has been already created. // this namespace has been already created.
continue continue
} }
numPodsScheduledPerNamespace[n] = 0 e.numPodsScheduledPerNamespace[n] = 0
} }
} }
func doBarrierOp(tCtx ktesting.TContext, opIndex int, concreteOp *barrierOp, numPodsScheduledPerNamespace map[string]int, podInformer coreinformers.PodInformer) { func (e *WorkloadExecutor) doBarrierOp(opIndex int, concreteOp *barrierOp) {
tCtx := *e.tCtx
for _, namespace := range concreteOp.Namespaces { for _, namespace := range concreteOp.Namespaces {
if _, ok := numPodsScheduledPerNamespace[namespace]; !ok { if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok {
tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
} }
} }
switch concreteOp.StageRequirement { switch concreteOp.StageRequirement {
case Attempted: case Attempted:
if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { if err := waitUntilPodsAttempted(tCtx, e.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err) tCtx.Fatalf("op %d: %v", opIndex, err)
} }
case Scheduled: case Scheduled:
// Default should be treated like "Scheduled", so handling both in the same way. // Default should be treated like "Scheduled", so handling both in the same way.
fallthrough fallthrough
default: default:
if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { if err := waitUntilPodsScheduled(tCtx, e.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err) tCtx.Fatalf("op %d: %v", opIndex, err)
} }
// At the end of the barrier, we can be sure that there are no pods // At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on. // pending scheduling in the namespaces that we just blocked on.
if len(concreteOp.Namespaces) == 0 { if len(concreteOp.Namespaces) == 0 {
numPodsScheduledPerNamespace = make(map[string]int) e.numPodsScheduledPerNamespace = make(map[string]int)
} else { } else {
for _, namespace := range concreteOp.Namespaces { for _, namespace := range concreteOp.Namespaces {
delete(numPodsScheduledPerNamespace, namespace) delete(e.numPodsScheduledPerNamespace, namespace)
} }
} }
} }
} }
func doStopCollectingMetrics(tCtx ktesting.TContext, collectorCtx *ktesting.TContext, opIndex int, dataItems *[]DataItem, w *workload, collectors []testDataCollector, collectorWG *sync.WaitGroup) { func (e *WorkloadExecutor) doStopCollectingMetrics(opIndex int) {
items := stopCollectingMetrics(tCtx, *collectorCtx, collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors) tCtx := *e.tCtx
*dataItems = append(*dataItems, items...) collectorCtx := *e.collectorCtx
*collectorCtx = nil items := stopCollectingMetrics(tCtx, collectorCtx, e.collectorWG, e.w.Threshold, *e.w.ThresholdMetricSelector, opIndex, e.collectors)
e.dataItems = append(e.dataItems, items...)
collectorCtx = nil
} }
func doCreatePodsOp( func (e *WorkloadExecutor) doCreatePodsOp(opIndex int, concreteOp *createPodsOp) {
tCtx ktesting.TContext, tCtx := *e.tCtx
opIndex int, collectorCtx := *e.tCtx
concreteOp *createPodsOp,
numPodsScheduledPerNamespace map[string]int,
dataItems *[]DataItem,
w *workload,
collectors *[]testDataCollector,
collectorWG *sync.WaitGroup,
throughputErrorMargin float64,
podInformer coreinformers.PodInformer,
tc *testCase,
collectorCtx *ktesting.TContext) {
var namespace string var namespace string
// define Pod's namespace automatically, and create that namespace. // define Pod's namespace automatically, and create that namespace.
namespace = fmt.Sprintf("namespace-%d", opIndex) namespace = fmt.Sprintf("namespace-%d", opIndex)
if concreteOp.Namespace != nil { if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace namespace = *concreteOp.Namespace
} }
createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace) createNamespaceIfNotPresent(tCtx, namespace, &e.numPodsScheduledPerNamespace)
if concreteOp.PodTemplatePath == nil { if concreteOp.PodTemplatePath == nil {
concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath concreteOp.PodTemplatePath = e.tc.DefaultPodTemplatePath
} }
if concreteOp.CollectMetrics { if concreteOp.CollectMetrics {
if *collectorCtx != nil { if *e.collectorCtx != nil {
tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
} }
*collectorCtx, *collectors = startCollectingMetrics(tCtx, collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}, nil) *e.collectorCtx, e.collectors = startCollectingMetrics(tCtx, e.collectorWG, e.podInformer, e.tc.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
} }
if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil { if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err) tCtx.Fatalf("op %d: %v", opIndex, err)
@ -1641,13 +1682,13 @@ func doCreatePodsOp(
case concreteOp.SkipWaitToCompletion: case concreteOp.SkipWaitToCompletion:
// Only record those namespaces that may potentially require barriers // Only record those namespaces that may potentially require barriers
// in the future. // in the future.
numPodsScheduledPerNamespace[namespace] += concreteOp.Count e.numPodsScheduledPerNamespace[namespace] += concreteOp.Count
case concreteOp.SteadyState: case concreteOp.SteadyState:
if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil { if err := createPodsSteadily(tCtx, namespace, e.podInformer, concreteOp); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err) tCtx.Fatalf("op %d: %v", opIndex, err)
} }
default: default:
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil { if err := waitUntilPodsScheduledInNamespace(tCtx, e.podInformer, nil, namespace, concreteOp.Count); err != nil {
tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
} }
} }
@ -1655,16 +1696,17 @@ func doCreatePodsOp(
// CollectMetrics and SkipWaitToCompletion can never be true at the // CollectMetrics and SkipWaitToCompletion can never be true at the
// same time, so if we're here, it means that all pods have been // same time, so if we're here, it means that all pods have been
// scheduled. // scheduled.
items := stopCollectingMetrics(tCtx, *collectorCtx, collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, *collectors) items := stopCollectingMetrics(tCtx, collectorCtx, e.collectorWG, e.w.Threshold, *e.w.ThresholdMetricSelector, opIndex, e.collectors)
*dataItems = append(*dataItems, items...) e.dataItems = append(e.dataItems, items...)
*collectorCtx = nil *e.collectorCtx = nil
} }
} }
func doDeletePodsOp(tCtx ktesting.TContext, opIndex int, concreteOp *deletePodsOp, podInformer coreinformers.PodInformer, wg *sync.WaitGroup) { func (e *WorkloadExecutor) doDeletePodsOp(opIndex int, concreteOp *deletePodsOp) {
tCtx := *e.tCtx
labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector) labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector) podsToDelete, err := e.podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
if err != nil { if err != nil {
tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err) tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
} }
@ -1701,9 +1743,9 @@ func doDeletePodsOp(tCtx ktesting.TContext, opIndex int, concreteOp *deletePodsO
} }
if concreteOp.SkipWaitToCompletion { if concreteOp.SkipWaitToCompletion {
wg.Add(1) e.wg.Add(1)
go func(opIndex int) { go func(opIndex int) {
defer wg.Done() defer e.wg.Done()
deletePods(opIndex) deletePods(opIndex)
}(opIndex) }(opIndex)
} else { } else {
@ -1711,7 +1753,8 @@ func doDeletePodsOp(tCtx ktesting.TContext, opIndex int, concreteOp *deletePodsO
} }
} }
func doChurnOp(tCtx ktesting.TContext, opIndex int, concreteOp *churnOp, wg *sync.WaitGroup) { func (e *WorkloadExecutor) doChurnOp(opIndex int, concreteOp *churnOp) {
tCtx := *e.tCtx
var namespace string var namespace string
if concreteOp.Namespace != nil { if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace namespace = *concreteOp.Namespace
@ -1770,9 +1813,9 @@ func doChurnOp(tCtx ktesting.TContext, opIndex int, concreteOp *churnOp, wg *syn
switch concreteOp.Mode { switch concreteOp.Mode {
case Create: case Create:
wg.Add(1) e.wg.Add(1)
go func() { go func() {
defer wg.Done() defer e.wg.Done()
defer ticker.Stop() defer ticker.Stop()
count, threshold := 0, concreteOp.Number count, threshold := 0, concreteOp.Number
if threshold == 0 { if threshold == 0 {
@ -1791,9 +1834,9 @@ func doChurnOp(tCtx ktesting.TContext, opIndex int, concreteOp *churnOp, wg *syn
} }
}() }()
case Recreate: case Recreate:
wg.Add(1) e.wg.Add(1)
go func() { go func() {
defer wg.Done() defer e.wg.Done()
defer ticker.Stop() defer ticker.Stop()
retVals := make([][]string, len(churnFns)) retVals := make([][]string, len(churnFns))
// For each churn function, instantiate a slice of strings with length "concreteOp.Number". // For each churn function, instantiate a slice of strings with length "concreteOp.Number".
@ -1817,86 +1860,23 @@ func doChurnOp(tCtx ktesting.TContext, opIndex int, concreteOp *churnOp, wg *syn
} }
} }
func doDefaultOp(tCtx ktesting.TContext, opIndex int, concreteOp realOp, numPodsScheduledPerNamespace map[string]int) { func (e *WorkloadExecutor) doDefaultOp(opIndex int, concreteOp realOp) {
tCtx := *e.tCtx
runable, ok := concreteOp.(runnableOp) runable, ok := concreteOp.(runnableOp)
if !ok { if !ok {
tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp) tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
} }
for _, namespace := range runable.requiredNamespaces() { for _, namespace := range runable.requiredNamespaces() {
createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace) createNamespaceIfNotPresent(tCtx, namespace, &e.numPodsScheduledPerNamespace)
} }
runable.run(tCtx) runable.run(tCtx)
} }
func doStartCollectingMetricsOp(tCtx ktesting.TContext, opIndex int, concreteOp *startCollectingMetricsOp, collectorCtx *ktesting.TContext, collectors *[]testDataCollector, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, tc *testCase, throughputErrorMargin float64) { func (e *WorkloadExecutor) doStartCollectingMetricsOp(opIndex int, concreteOp *startCollectingMetricsOp) {
if *collectorCtx != nil { if *e.collectorCtx != nil {
tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex) (*e.tCtx).Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
*collectorCtx, *collectors = startCollectingMetrics(tCtx, collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
}
func runJobs(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, throughputErrorMargin float64) {
var wg sync.WaitGroup
defer wg.Wait()
defer tCtx.Cancel("workload is done")
numPodsScheduledPerNamespace := make(map[string]int)
nextNodeIndex := 0
podInformer := informerFactory.Core().V1().Pods()
var collectors []testDataCollector
// This needs a separate context and wait group because
// the metrics collecting needs to be sure that the goroutines
// are stopped.
var collectorCtx ktesting.TContext
var collectorWG sync.WaitGroup
defer collectorWG.Wait()
var dataItems []DataItem
for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
select {
case <-tCtx.Done():
tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
default:
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
doCreateNodesOp(tCtx, opIndex, concreteOp, &nextNodeIndex)
case *createNamespacesOp:
doCreateNamespaceOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace)
case *createPodsOp:
doCreatePodsOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace, &dataItems, w, &collectors, &collectorWG, throughputErrorMargin, podInformer, tc, &collectorCtx)
if collectorCtx != nil {
defer collectorCtx.Cancel("cleaning up")
}
case *deletePodsOp:
doDeletePodsOp(tCtx, opIndex, concreteOp, podInformer, &wg)
case *churnOp:
doChurnOp(tCtx, opIndex, concreteOp, &wg)
case *barrierOp:
doBarrierOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace, podInformer)
case *sleepOp:
select {
case <-tCtx.Done():
case <-time.After(concreteOp.Duration.Duration):
}
case *startCollectingMetricsOp:
doStartCollectingMetricsOp(tCtx, opIndex, concreteOp, &collectorCtx, &collectors, &collectorWG, podInformer, tc, throughputErrorMargin)
defer collectorCtx.Cancel("cleaning up")
case *stopCollectingMetricsOp:
doStopCollectingMetrics(tCtx, &collectorCtx, opIndex, &dataItems, w, collectors, &collectorWG)
default:
doDefaultOp(tCtx, opIndex, concreteOp, numPodsScheduledPerNamespace)
}
} }
*e.collectorCtx, e.collectors = startCollectingMetrics((*e.tCtx), e.collectorWG, e.podInformer, e.tc.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
} }
func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) { func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {