YamasouA 2025-01-29 23:58:51 +09:00
parent 659804b765
commit 1b0ad78718

@@ -1450,38 +1450,55 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
 	return dataItems
 }
 
-type MetricsCollectionData struct {
-	Collectors []testDataCollector
+// metricsCollectionData manages the state and synchronization of metrics collection
+// during workload execution.
+type metricsCollectionData struct {
+	// collectors holds a list of test data collectors used to gather performance metrics.
+	collectors []testDataCollector
+	// collectorCtx is a separate context specifically for managing the lifecycle
+	// of metrics collection goroutines.
 	// This needs a separate context and wait group because
 	// the metrics collecting needs to be sure that the goroutines
 	// are stopped.
-	CollectorCtx ktesting.TContext
-	CollectorWG *sync.WaitGroup
-	// Disable error checking of the sampling interval length in the
+	collectorCtx ktesting.TContext
+	collectorWG *sync.WaitGroup
+	// disable error checking of the sampling interval length in the
 	// throughput collector by default. When running benchmarks, report
 	// it as test failure when samples are not taken regularly.
-	ThroughputErrorMargin float64
+	throughputErrorMargin float64
 }
 
-type WorkloadState struct {
+// workloadState holds the state information about the workload being executed.
-	DataItems []DataItem
-	NextNodeIndex int
+type workloadState struct {
+	// dataItems stores the collected data from the workload execution.
+	dataItems []DataItem
+	// nextNodeIndex keeps track of the next node index to be used when creating nodes.
+	nextNodeIndex int
 	// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
 	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
-	NumPodsScheduledPerNamespace map[string]int
+	numPodsScheduledPerNamespace map[string]int
 }
 
-type SharedOperationData struct {
+// sharedOperationData encapsulates all shared state and dependencies used during workload execution.
+type sharedOperationData struct {
+	// podInformer provides access to the informer for monitoring Pod events in the cluster.
 	// Additional informers needed for testing. The pod informer was
 	// already created before (scheduler.NewInformerFactory) and the
 	// factory was started for it (mustSetupCluster), therefore we don't
 	// need to start again.
-	PodInformer coreinformers.PodInformer
-	MetricsData *MetricsCollectionData
-	WorkloadState *WorkloadState
-	TCtx ktesting.TContext
-	WG sync.WaitGroup
-	CancelFunc context.CancelFunc
+	podInformer coreinformers.PodInformer
+	// metricsData contains information and synchronization primitives for managing
+	// metrics collection during workload execution.
+	metricsData *metricsCollectionData
+	// workloadState holds information about the current state of the workload,
+	// including scheduled pods and created namespaces.
+	workloadState *workloadState
+	// tCtx is the root test context, used for cancellation and logging throughout
+	// the execution of workload operations.
+	tCtx ktesting.TContext
+	// wg is a wait group that tracks all ongoing goroutines in the workload execution.
+	// Ensures proper synchronization and prevents premature termination of operations.
+	wg *sync.WaitGroup
 }
 
 func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
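Note: the comment on collectorCtx/collectorWG above explains why metrics collection gets its own context and wait group: the caller must be sure the collection goroutines have actually stopped before reading their results. Below is a minimal, self-contained sketch of that start/stop pattern; it is illustrative only, uses a plain context.Context instead of ktesting.TContext, and none of the names come from the commit.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// Separate context and wait group dedicated to the collector goroutine.
	collectorCtx, cancel := context.WithCancel(context.Background())
	var collectorWG sync.WaitGroup

	samples := 0
	collectorWG.Add(1)
	go func() {
		defer collectorWG.Done()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				samples++ // stand-in for taking one metrics sample
			case <-collectorCtx.Done():
				return
			}
		}
	}()

	time.Sleep(50 * time.Millisecond) // stand-in for running the workload

	cancel()           // signal the collector to stop
	collectorWG.Wait() // guarantee the goroutine has exited before reading the result
	fmt.Println("collected samples:", samples)
}
```
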
@@ -1507,25 +1524,27 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	// Everything else started by this function gets stopped before it returns.
 	tCtx = ktesting.WithCancel(tCtx)
 	var wg sync.WaitGroup
-	defer wg.Wait()
-	defer tCtx.Cancel("workload is done")
+	defer func() {
+		wg.Wait()
+		tCtx.Cancel("workload is done")
+	}()
 	var dataItems []DataItem
 	var collectorWG sync.WaitGroup
 	defer collectorWG.Wait()
-	sharedOperationData := SharedOperationData{
-		TCtx: tCtx,
-		WG: wg,
-		MetricsData: &MetricsCollectionData{
-			CollectorWG: &sync.WaitGroup{},
-			ThroughputErrorMargin: throughputErrorMargin,
+	sharedOperationData := sharedOperationData{
+		tCtx: tCtx,
+		wg: &wg,
+		metricsData: &metricsCollectionData{
+			collectorWG: &sync.WaitGroup{},
+			throughputErrorMargin: throughputErrorMargin,
 		},
-		WorkloadState: &WorkloadState{
-			NumPodsScheduledPerNamespace: make(map[string]int),
+		workloadState: &workloadState{
+			numPodsScheduledPerNamespace: make(map[string]int),
 		},
-		PodInformer: informerFactory.Core().V1().Pods(),
+		podInformer: informerFactory.Core().V1().Pods(),
 	}
 
 	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
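Note: in the hunk above, the old code stored the wait group by value (`WG: wg`), while the new code stores a pointer (`wg: &wg`). A sync.WaitGroup must not be copied after first use (go vet's copylocks check reports copying it into a struct literal), because Add/Done on a copy would not affect the caller's counter. A small illustrative sketch of sharing one wait group through a struct by pointer; the workerState/spawn names are made up for this example.

```go
package main

import (
	"fmt"
	"sync"
)

type workerState struct {
	wg *sync.WaitGroup // pointer: Add/Done/Wait all act on the caller's counter
}

func spawn(s *workerState, id int) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		fmt.Println("worker", id, "done")
	}()
}

func main() {
	var wg sync.WaitGroup
	state := &workerState{wg: &wg}
	for i := 0; i < 3; i++ {
		spawn(state, i)
	}
	wg.Wait() // waits for every goroutine registered through state.wg
}
```
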
@@ -1543,90 +1562,90 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	return dataItems
 }
 
-func runCreateNodesOp(opIndex int, concreteOp *createNodesOp, sharedOperationData *SharedOperationData) {
-	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, sharedOperationData.TCtx.Client())
+func runCreateNodesOp(opIndex int, concreteOp *createNodesOp, sharedOperationData *sharedOperationData) {
+	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, sharedOperationData.tCtx.Client())
 	if err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
-	if err := nodePreparer.PrepareNodes(sharedOperationData.TCtx, sharedOperationData.WorkloadState.NextNodeIndex); err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+	if err := nodePreparer.PrepareNodes(sharedOperationData.tCtx, sharedOperationData.workloadState.nextNodeIndex); err != nil {
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
-	sharedOperationData.WorkloadState.NextNodeIndex += concreteOp.Count
+	sharedOperationData.workloadState.nextNodeIndex += concreteOp.Count
 }
 
-func runCreateNamespacesOp(opIndex int, concreteOp *createNamespacesOp, sharedOperationData *SharedOperationData) {
-	nsPreparer, err := newNamespacePreparer(sharedOperationData.TCtx, concreteOp)
+func runCreateNamespacesOp(opIndex int, concreteOp *createNamespacesOp, sharedOperationData *sharedOperationData) {
+	nsPreparer, err := newNamespacePreparer(sharedOperationData.tCtx, concreteOp)
 	if err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
-	if err := nsPreparer.prepare(sharedOperationData.TCtx); err != nil {
-		err2 := nsPreparer.cleanup(sharedOperationData.TCtx)
+	if err := nsPreparer.prepare(sharedOperationData.tCtx); err != nil {
+		err2 := nsPreparer.cleanup(sharedOperationData.tCtx)
 		if err2 != nil {
 			err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
 		}
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
 	for _, n := range nsPreparer.namespaces() {
-		if _, ok := sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[n]; ok {
+		if _, ok := sharedOperationData.workloadState.numPodsScheduledPerNamespace[n]; ok {
 			// this namespace has been already created.
 			continue
 		}
-		sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[n] = 0
+		sharedOperationData.workloadState.numPodsScheduledPerNamespace[n] = 0
 	}
 }
 
-func runCreatePodsOp(tc *testCase, w *workload, opIndex int, concreteOp *createPodsOp, sharedOperationData *SharedOperationData) {
+func runCreatePodsOp(tc *testCase, w *workload, opIndex int, concreteOp *createPodsOp, sharedOperationData *sharedOperationData) {
 	var namespace string
 	// define Pod's namespace automatically, and create that namespace.
 	namespace = fmt.Sprintf("namespace-%d", opIndex)
 	if concreteOp.Namespace != nil {
 		namespace = *concreteOp.Namespace
 	}
-	createNamespaceIfNotPresent(sharedOperationData.TCtx, namespace, &sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace)
+	createNamespaceIfNotPresent(sharedOperationData.tCtx, namespace, &sharedOperationData.workloadState.numPodsScheduledPerNamespace)
 	if concreteOp.PodTemplatePath == nil {
 		concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
 	}
 	if concreteOp.CollectMetrics {
-		if sharedOperationData.MetricsData.CollectorCtx != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
+		if sharedOperationData.metricsData.collectorCtx != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 		}
-		sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.Collectors = startCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorWG, sharedOperationData.PodInformer, tc.MetricsCollectorConfig, sharedOperationData.MetricsData.ThroughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
-		defer sharedOperationData.MetricsData.CollectorCtx.Cancel("cleaning up")
+		sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectors = startCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorWG, sharedOperationData.podInformer, tc.MetricsCollectorConfig, sharedOperationData.metricsData.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
+		defer sharedOperationData.metricsData.collectorCtx.Cancel("cleaning up")
 	}
-	if err := createPodsRapidly(sharedOperationData.TCtx, namespace, concreteOp); err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+	if err := createPodsRapidly(sharedOperationData.tCtx, namespace, concreteOp); err != nil {
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
 	switch {
 	case concreteOp.SkipWaitToCompletion:
 		// Only record those namespaces that may potentially require barriers
 		// in the future.
-		sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[namespace] += concreteOp.Count
+		sharedOperationData.workloadState.numPodsScheduledPerNamespace[namespace] += concreteOp.Count
 	case concreteOp.SteadyState:
-		if err := createPodsSteadily(sharedOperationData.TCtx, namespace, sharedOperationData.PodInformer, concreteOp); err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := createPodsSteadily(sharedOperationData.tCtx, namespace, sharedOperationData.podInformer, concreteOp); err != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 	default:
-		if err := waitUntilPodsScheduledInNamespace(sharedOperationData.TCtx, sharedOperationData.PodInformer, nil, namespace, concreteOp.Count); err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
+		if err := waitUntilPodsScheduledInNamespace(sharedOperationData.tCtx, sharedOperationData.podInformer, nil, namespace, concreteOp.Count); err != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
 		}
 	}
 	if concreteOp.CollectMetrics {
 		// CollectMetrics and SkipWaitToCompletion can never be true at the
 		// same time, so if we're here, it means that all pods have been
 		// scheduled.
-		items := stopCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.CollectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.MetricsData.Collectors)
-		sharedOperationData.WorkloadState.DataItems = append(sharedOperationData.WorkloadState.DataItems, items...)
-		sharedOperationData.MetricsData.CollectorCtx = nil
+		items := stopCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.metricsData.collectors)
+		sharedOperationData.workloadState.dataItems = append(sharedOperationData.workloadState.dataItems, items...)
+		sharedOperationData.metricsData.collectorCtx = nil
 	}
 }
 
-func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData *SharedOperationData) {
+func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData *sharedOperationData) {
 	labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
-	podsToDelete, err := sharedOperationData.PodInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
+	podsToDelete, err := sharedOperationData.podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
 	if err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
+		sharedOperationData.tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
 	}
 	deletePods := func(opIndex int) {
@@ -1637,13 +1656,13 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData
 		for i := 0; i < len(podsToDelete); i++ {
 			select {
 			case <-ticker.C:
-				if err := sharedOperationData.TCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(sharedOperationData.TCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
+				if err := sharedOperationData.tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(sharedOperationData.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
 					if errors.Is(err, context.Canceled) {
 						return
 					}
-					sharedOperationData.TCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
+					sharedOperationData.tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
 				}
-			case <-sharedOperationData.TCtx.Done():
+			case <-sharedOperationData.tCtx.Done():
 				return
 			}
 		}
@@ -1652,18 +1671,18 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData
 		listOpts := metav1.ListOptions{
 			LabelSelector: labelSelector.String(),
 		}
-		if err := sharedOperationData.TCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(sharedOperationData.TCtx, metav1.DeleteOptions{}, listOpts); err != nil {
+		if err := sharedOperationData.tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(sharedOperationData.tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
 			if errors.Is(err, context.Canceled) {
 				return
 			}
-			sharedOperationData.TCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
+			sharedOperationData.tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
 		}
 	}
 
 	if concreteOp.SkipWaitToCompletion {
-		sharedOperationData.WG.Add(1)
+		sharedOperationData.wg.Add(1)
 		go func(opIndex int) {
-			defer sharedOperationData.WG.Done()
+			defer sharedOperationData.wg.Done()
 			deletePods(opIndex)
 		}(opIndex)
 	} else {
@@ -1671,18 +1690,18 @@ func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData
 	}
 }
 
-func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOperationData) {
+func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *sharedOperationData) {
 	var namespace string
 	if concreteOp.Namespace != nil {
 		namespace = *concreteOp.Namespace
 	} else {
 		namespace = fmt.Sprintf("namespace-%d", opIndex)
 	}
-	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(sharedOperationData.TCtx.Client().Discovery()))
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(sharedOperationData.tCtx.Client().Discovery()))
 	// Ensure the namespace exists.
 	nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
-	if _, err := sharedOperationData.TCtx.Client().CoreV1().Namespaces().Create(sharedOperationData.TCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
-		sharedOperationData.TCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
+	if _, err := sharedOperationData.tCtx.Client().CoreV1().Namespaces().Create(sharedOperationData.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
+		sharedOperationData.tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
 	}
 
 	var churnFns []func(name string) string
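Note: the hunk above ensures the namespace exists by creating it and tolerating an AlreadyExists error, which makes the operation idempotent. A minimal sketch of that pattern, runnable standalone against client-go's fake clientset; the ensureNamespace helper name is made up for this example and is not part of the commit.

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// ensureNamespace creates the namespace and ignores "already exists" errors,
// so calling it repeatedly is safe.
func ensureNamespace(ctx context.Context, client kubernetes.Interface, name string) error {
	nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
	if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
		return err
	}
	return nil
}

func main() {
	client := fake.NewSimpleClientset()
	ctx := context.Background()
	// Calling twice: the second call hits AlreadyExists and is ignored.
	fmt.Println(ensureNamespace(ctx, client, "namespace-0"))
	fmt.Println(ensureNamespace(ctx, client, "namespace-0"))
}
```
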
@@ -1690,31 +1709,31 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 	for i, path := range concreteOp.TemplatePaths {
 		unstructuredObj, gvk, err := getUnstructuredFromFile(path)
 		if err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
+			sharedOperationData.tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
 		}
 		// Obtain GVR.
 		mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
 		if err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
+			sharedOperationData.tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
 		}
 		gvr := mapping.Resource
 		// Distinguish cluster-scoped with namespaced API objects.
 		var dynRes dynamic.ResourceInterface
 		if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
-			dynRes = sharedOperationData.TCtx.Dynamic().Resource(gvr).Namespace(namespace)
+			dynRes = sharedOperationData.tCtx.Dynamic().Resource(gvr).Namespace(namespace)
 		} else {
-			dynRes = sharedOperationData.TCtx.Dynamic().Resource(gvr)
+			dynRes = sharedOperationData.tCtx.Dynamic().Resource(gvr)
 		}
 
 		churnFns = append(churnFns, func(name string) string {
 			if name != "" {
-				if err := dynRes.Delete(sharedOperationData.TCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
-					sharedOperationData.TCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
+				if err := dynRes.Delete(sharedOperationData.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
+					sharedOperationData.tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
 				}
 				return ""
 			}
-			live, err := dynRes.Create(sharedOperationData.TCtx, unstructuredObj, metav1.CreateOptions{})
+			live, err := dynRes.Create(sharedOperationData.tCtx, unstructuredObj, metav1.CreateOptions{})
 			if err != nil {
 				return ""
 			}
@@ -1731,9 +1750,9 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 	switch concreteOp.Mode {
 	case Create:
-		sharedOperationData.WG.Add(1)
+		sharedOperationData.wg.Add(1)
 		go func() {
-			defer sharedOperationData.WG.Done()
+			defer sharedOperationData.wg.Done()
 			count, threshold := 0, concreteOp.Number
 			if threshold == 0 {
 				threshold = math.MaxInt32
@@ -1745,15 +1764,15 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 						churnFns[i]("")
 					}
 					count++
-				case <-sharedOperationData.TCtx.Done():
+				case <-sharedOperationData.tCtx.Done():
 					return
 				}
 			}
 		}()
 	case Recreate:
-		sharedOperationData.WG.Add(1)
+		sharedOperationData.wg.Add(1)
 		go func() {
-			defer sharedOperationData.WG.Done()
+			defer sharedOperationData.wg.Done()
 			retVals := make([][]string, len(churnFns))
 			// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
 			for i := range retVals {
@@ -1768,7 +1787,7 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 						retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
 					}
 					count++
-				case <-sharedOperationData.TCtx.Done():
+				case <-sharedOperationData.tCtx.Done():
 					return
 				}
 			}
@@ -1776,77 +1795,75 @@ func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOpe
 	}
 }
 
-func runBarrierOp(opIndex int, concreteOp *barrierOp, sharedOperationData *SharedOperationData) {
+func runBarrierOp(opIndex int, concreteOp *barrierOp, sharedOperationData *sharedOperationData) {
 	for _, namespace := range concreteOp.Namespaces {
-		if _, ok := sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[namespace]; !ok {
-			sharedOperationData.TCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
+		if _, ok := sharedOperationData.workloadState.numPodsScheduledPerNamespace[namespace]; !ok {
+			sharedOperationData.tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
 		}
 	}
 	switch concreteOp.StageRequirement {
 	case Attempted:
-		if err := waitUntilPodsAttempted(sharedOperationData.TCtx, sharedOperationData.PodInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace); err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := waitUntilPodsAttempted(sharedOperationData.tCtx, sharedOperationData.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.workloadState.numPodsScheduledPerNamespace); err != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 	case Scheduled:
 		// Default should be treated like "Scheduled", so handling both in the same way.
 		fallthrough
 	default:
-		if err := waitUntilPodsScheduled(sharedOperationData.TCtx, sharedOperationData.PodInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace); err != nil {
-			sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := waitUntilPodsScheduled(sharedOperationData.tCtx, sharedOperationData.podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.workloadState.numPodsScheduledPerNamespace); err != nil {
+			sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 		}
 		// At the end of the barrier, we can be sure that there are no pods
 		// pending scheduling in the namespaces that we just blocked on.
 		if len(concreteOp.Namespaces) == 0 {
-			sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace = make(map[string]int)
+			sharedOperationData.workloadState.numPodsScheduledPerNamespace = make(map[string]int)
 		} else {
 			for _, namespace := range concreteOp.Namespaces {
-				delete(sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace, namespace)
+				delete(sharedOperationData.workloadState.numPodsScheduledPerNamespace, namespace)
 			}
 		}
 	}
 }
 
-func runSleepOp(concreteOp *sleepOp, sharedOperationData *SharedOperationData) {
+func runSleepOp(concreteOp *sleepOp, sharedOperationData *sharedOperationData) {
 	select {
-	case <-sharedOperationData.TCtx.Done():
+	case <-sharedOperationData.tCtx.Done():
 	case <-time.After(concreteOp.Duration.Duration):
 	}
 }
 
-func runStartCollectingMetricsOp(opIndex int, tc *testCase, concreteOp *startCollectingMetricsOp, sharedOperationData *SharedOperationData) {
-	if sharedOperationData.MetricsData.CollectorCtx != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
+func runStartCollectingMetricsOp(opIndex int, tc *testCase, concreteOp *startCollectingMetricsOp, sharedOperationData *sharedOperationData) {
+	if sharedOperationData.metricsData.collectorCtx != nil {
+		sharedOperationData.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 	}
-	sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.Collectors = startCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorWG, sharedOperationData.PodInformer, tc.MetricsCollectorConfig, sharedOperationData.MetricsData.ThroughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
-	defer sharedOperationData.MetricsData.CollectorCtx.Cancel("cleaning up")
+	sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectors = startCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorWG, sharedOperationData.podInformer, tc.MetricsCollectorConfig, sharedOperationData.metricsData.throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
 }
 
-func runStopCollectingMetricsOp(opIndex int, w *workload, sharedOperationData *SharedOperationData) {
-	items := stopCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.CollectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.MetricsData.Collectors)
-	sharedOperationData.WorkloadState.DataItems = append(sharedOperationData.WorkloadState.DataItems, items...)
-	sharedOperationData.MetricsData.CollectorCtx = nil
+func runStopCollectingMetricsOp(opIndex int, w *workload, sharedOperationData *sharedOperationData) {
+	items := stopCollectingMetrics(sharedOperationData.tCtx, sharedOperationData.metricsData.collectorCtx, sharedOperationData.metricsData.collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.metricsData.collectors)
+	sharedOperationData.workloadState.dataItems = append(sharedOperationData.workloadState.dataItems, items...)
+	sharedOperationData.metricsData.collectorCtx = nil
 }
 
-func runDefault(opIndex int, concreteOp realOp, sharedOperationData *SharedOperationData) {
+func runDefault(opIndex int, concreteOp realOp, sharedOperationData *sharedOperationData) {
 	runable, ok := concreteOp.(runnableOp)
 	if !ok {
-		sharedOperationData.TCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
+		sharedOperationData.tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
 	}
 	for _, namespace := range runable.requiredNamespaces() {
-		createNamespaceIfNotPresent(sharedOperationData.TCtx, namespace, &sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace)
+		createNamespaceIfNotPresent(sharedOperationData.tCtx, namespace, &sharedOperationData.workloadState.numPodsScheduledPerNamespace)
 	}
-	runable.run(sharedOperationData.TCtx)
+	runable.run(sharedOperationData.tCtx)
 }
 
-func runOperation(tc *testCase, opIndex int, op op, w *workload, sharedOperationData *SharedOperationData) {
+func runOperation(tc *testCase, opIndex int, op op, w *workload, sharedOperationData *sharedOperationData) {
 	realOp, err := op.realOp.patchParams(w)
 	if err != nil {
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, err)
 	}
 	select {
-	case <-sharedOperationData.TCtx.Done():
-		sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, context.Cause(sharedOperationData.TCtx))
+	case <-sharedOperationData.tCtx.Done():
+		sharedOperationData.tCtx.Fatalf("op %d: %v", opIndex, context.Cause(sharedOperationData.tCtx))
 	default:
 	}
 	switch concreteOp := realOp.(type) {
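Note: before dispatching, runOperation above checks for cancellation with a select that has an empty default branch, which is the standard non-blocking way to fail fast with the cancellation cause instead of starting more work. A minimal sketch of that pattern (illustrative only, using plain context instead of ktesting.TContext; requires Go 1.20+ for context.Cause):

```go
package main

import (
	"context"
	"fmt"
)

// checkCancelled returns immediately: nil if the context is still live,
// otherwise an error wrapping the cancellation cause.
func checkCancelled(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("aborting: %w", context.Cause(ctx))
	default:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	fmt.Println(checkCancelled(ctx)) // <nil>
	cancel(fmt.Errorf("workload is done"))
	fmt.Println(checkCancelled(ctx)) // aborting: workload is done
}
```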